Use Arc<String> on channel broadcasts to avoid duplicating strings

This commit is contained in:
Herbert Wolverson 2024-10-24 09:50:29 -05:00
parent f1bfa42347
commit 0cc55889b1
5 changed files with 14 additions and 14 deletions

View File

@ -18,7 +18,7 @@ use std::sync::Arc;
use axum::{Extension, extract::{WebSocketUpgrade, ws::{Message, WebSocket}}, response::IntoResponse, Router, routing::get};
use serde::Deserialize;
use tokio::sync::mpsc::Sender;
use tracing::{debug, info};
use tracing::debug;
use lqos_bus::BusRequest;
use crate::node_manager::auth::auth_layer;
use crate::node_manager::ws::publish_subscribe::PubSub;
@ -69,7 +69,7 @@ struct Subscribe {
async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
debug!("Websocket connected");
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(128);
let (tx, mut rx) = tokio::sync::mpsc::channel::<Arc<String>>(128);
let mut subscribed_channels = HashSet::new();
loop {
tokio::select! {
@ -84,7 +84,7 @@ async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
outbound = rx.recv() => {
match outbound {
Some(msg) => {
if let Err(_) = socket.send(Message::Text(msg)).await {
if let Err(_) = socket.send(Message::Text((*msg).clone())).await {
// The outbound websocket has closed. That's ok, it's not
// an error. We're relying on *this* task terminating to in
// turn close the subscription channel, which will in turn
@ -102,7 +102,7 @@ async fn handle_socket(mut socket: WebSocket, channels: Arc<PubSub>) {
debug!("Websocket disconnected");
}
async fn receive_channel_message(msg: Message, channels: Arc<PubSub>, tx: Sender<String>, subscribed_channels: &mut HashSet<PublishedChannels>) {
async fn receive_channel_message(msg: Message, channels: Arc<PubSub>, tx: Sender<Arc<String>>, subscribed_channels: &mut HashSet<PublishedChannels>) {
if let Ok(text) = msg.to_text() {
if let Ok(sub) = serde_json::from_str::<Subscribe>(text) {
if let Ok(channel) = PublishedChannels::from_str(&sub.channel) {

View File

@ -5,7 +5,6 @@
mod publisher_channel;
mod subscriber;
use std::collections::HashSet;
use std::sync::Arc;
use arc_swap::ArcSwap;
use fxhash::FxHashSet;
@ -48,7 +47,7 @@ impl PubSub {
/// Adds a subscriber to a channel set. Once added, they are
/// self-managing and will be deleted when they become inactive
/// automatically.
pub(super) async fn subscribe(&self, channel: PublishedChannels, sender: Sender<String>) {
pub(super) async fn subscribe(&self, channel: PublishedChannels, sender: Sender<Arc<String>>) {
let mut channels = self.channels.lock().await;
if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) {
channel.subscribe(sender).await;
@ -79,7 +78,7 @@ impl PubSub {
pub(super) async fn send(&self, channel: PublishedChannels, message: String) {
let mut channels = self.channels.lock().await;
if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) {
channel.send(message).await;
channel.send(Arc::new(message)).await;
}
}

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use serde_json::json;
use tokio::sync::mpsc::Sender;
use tracing::info;
use crate::node_manager::ws::publish_subscribe::subscriber::Subscriber;
use crate::node_manager::ws::published_channels::PublishedChannels;
@ -21,22 +21,22 @@ impl PublisherChannel {
!self.subscribers.is_empty()
}
pub(super) async fn subscribe(&mut self, sender: Sender<String>) {
pub(super) async fn subscribe(&mut self, sender: Sender<Arc<String>>) {
self.subscribers.push(Subscriber{
is_alive: true,
sender: sender.clone(),
});
let welcome = json!(
let welcome = Arc::new(json!(
{
"event" : "join",
"channel" : self.channel_type.to_string(),
}
).to_string();
).to_string());
let _ = sender.send(welcome).await;
}
/// Submit a message to an entire channel
pub(super) async fn send(&mut self, message: String) {
pub(super) async fn send(&mut self, message: Arc<String>) {
for subscriber in self.subscribers.iter_mut() {
if subscriber.sender.send(message.clone()).await.is_err() {
subscriber.is_alive = false;

View File

@ -1,7 +1,8 @@
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
pub(super) struct Subscriber {
pub(super) is_alive: bool,
pub(super) sender: Sender<String>
pub(super) sender: Sender<Arc<String>>
}

View File

@ -10,7 +10,7 @@ use crate::{
stats::TIME_TO_POLL_HOSTS,
throughput_tracker::tracking_data::ThroughputTracker,
};
use tracing::{debug, info, warn};
use tracing::{debug, warn};
use lqos_bus::{BusResponse, FlowbeeProtocol, IpStats, TcHandle, TopFlowType, XdpPpingResult};
use lqos_sys::flowbee_data::FlowbeeKey;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};