Stability: no more unwrap in the main websocket handler. Instead, websocket disconnection is graceful.

This commit is contained in:
Herbert Wolverson 2024-07-17 08:27:17 -05:00
parent 1d9f1bd2ce
commit 3e5e63924d
2 changed files with 33 additions and 23 deletions

View File

@ -1,17 +1,20 @@
mod publish_subscribe;
mod published_channels;
mod ticker;
mod single_user_channels;
use std::str::FromStr;
use std::sync::Arc;
use axum::{extract::{ws::{Message, WebSocket}, WebSocketUpgrade}, response::IntoResponse, routing::get, Extension, Router};
use axum::{Extension, extract::{WebSocketUpgrade, ws::{Message, WebSocket}}, response::IntoResponse, Router, routing::get};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
use crate::node_manager::auth::auth_layer;
use crate::node_manager::ws::publish_subscribe::PubSub;
use crate::node_manager::ws::published_channels::PublishedChannels;
use crate::node_manager::ws::ticker::channel_ticker;
mod publish_subscribe;
mod published_channels;
mod ticker;
mod single_user_channels;
pub fn websocket_router() -> Router {
let channels = PubSub::new();
tokio::spawn(channel_ticker(channels.clone()));
@ -48,27 +51,21 @@ async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
inbound = socket.recv() => {
// Received a websocket message
match inbound {
Some(Ok(msg)) => {
log::info!("Received message: {:?}", msg);
if let Ok(text) = msg.to_text() {
if let Ok(sub) = serde_json::from_str::<Subscribe>(text) {
channels.subscribe(PublishedChannels::from_str(&sub.channel).unwrap(), tx.clone()).await;
}
}
}
Some(Err(e)) => {
log::warn!("Error receiving websocket message: {:?}", e);
break;
}
None => {
break;
}
Some(Ok(msg)) => receive_channel_message(msg, channels.clone(), tx.clone()).await,
Some(Err(_)) => break, // The channel has closed
None => break, // The channel has closed
}
}
outbound = rx.recv() => {
match outbound {
Some(msg) => {
socket.send(Message::Text(msg)).await.unwrap();
if let Err(_) = socket.send(Message::Text(msg)).await {
// The outbound websocket has closed. That's ok, it's not
// an error. We're relying on *this* task terminating to in
// turn close the subscription channel, which will in turn
// cause the subscription to end.
break;
}
}
None => {
log::info!("WebSocket Disconnected");
@ -79,4 +76,15 @@ async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
}
}
log::info!("Websocket disconnected");
}
}
async fn receive_channel_message(msg: Message, channels: Arc<PubSub>, tx: Sender<String>) {
log::debug!("Received message: {:?}", msg);
if let Ok(text) = msg.to_text() {
if let Ok(sub) = serde_json::from_str::<Subscribe>(text) {
if let Ok(channel) = PublishedChannels::from_str(&sub.channel) {
channels.subscribe(channel, tx.clone()).await;
}
}
}
}

View File

@ -62,6 +62,8 @@ impl PubSub {
}
}
/// Sends a message to everyone subscribed to a topic. If senders' channels
/// are dead, they are removed from the list.
pub(super) async fn send(&self, channel: PublishedChannels, message: String) {
let mut channels = self.channels.lock().await;
if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) {