mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
Prevent duplicate channel subscription.
This commit is contained in:
parent
bcd7842755
commit
87257df5ad
@ -11,6 +11,7 @@
|
|||||||
//!
|
//!
|
||||||
//! Both types of websocket are authenticated using the auth layer.
|
//! Both types of websocket are authenticated using the auth layer.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
@ -64,12 +65,13 @@ async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
|
|||||||
log::info!("Websocket connected");
|
log::info!("Websocket connected");
|
||||||
|
|
||||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(10);
|
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(10);
|
||||||
|
let mut subscribed_channels = HashSet::new();
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
inbound = socket.recv() => {
|
inbound = socket.recv() => {
|
||||||
// Received a websocket message
|
// Received a websocket message
|
||||||
match inbound {
|
match inbound {
|
||||||
Some(Ok(msg)) => receive_channel_message(msg, channels.clone(), tx.clone()).await,
|
Some(Ok(msg)) => receive_channel_message(msg, channels.clone(), tx.clone(), &mut subscribed_channels).await,
|
||||||
Some(Err(_)) => break, // The channel has closed
|
Some(Err(_)) => break, // The channel has closed
|
||||||
None => break, // The channel has closed
|
None => break, // The channel has closed
|
||||||
}
|
}
|
||||||
@ -96,12 +98,15 @@ async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
|
|||||||
log::info!("Websocket disconnected");
|
log::info!("Websocket disconnected");
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_channel_message(msg: Message, channels: Arc<PubSub>, tx: Sender<String>) {
|
async fn receive_channel_message(msg: Message, channels: Arc<PubSub>, tx: Sender<String>, subscribed_channels: &mut HashSet<PublishedChannels>) {
|
||||||
log::debug!("Received message: {:?}", msg);
|
log::debug!("Received message: {:?}", msg);
|
||||||
if let Ok(text) = msg.to_text() {
|
if let Ok(text) = msg.to_text() {
|
||||||
if let Ok(sub) = serde_json::from_str::<Subscribe>(text) {
|
if let Ok(sub) = serde_json::from_str::<Subscribe>(text) {
|
||||||
if let Ok(channel) = PublishedChannels::from_str(&sub.channel) {
|
if let Ok(channel) = PublishedChannels::from_str(&sub.channel) {
|
||||||
|
if !subscribed_channels.contains(&channel) {
|
||||||
channels.subscribe(channel, tx.clone()).await;
|
channels.subscribe(channel, tx.clone()).await;
|
||||||
|
subscribed_channels.insert(channel);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
use strum::{Display, EnumIter, EnumString};
|
use strum::{Display, EnumIter, EnumString};
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Copy, Debug, EnumIter, Display, EnumString)]
|
#[derive(PartialEq, Clone, Copy, Debug, EnumIter, Display, EnumString, Hash, Eq)]
|
||||||
pub enum PublishedChannels {
|
pub enum PublishedChannels {
|
||||||
/// Provides a 1-second tick notification to the client
|
/// Provides a 1-second tick notification to the client
|
||||||
Cadence,
|
Cadence,
|
||||||
|
Loading…
Reference in New Issue
Block a user