From 2defbf7ce4acb71768a20e9548201acb73f3c737 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Wed, 9 Oct 2024 13:11:05 -0500 Subject: [PATCH] Replace flow submission channel with a bounded crossbeam channel. --- .../lqosd/src/throughput_tracker/flow_data/mod.rs | 12 +++++------- src/rust/lqosd/src/throughput_tracker/mod.rs | 4 ++-- .../lqosd/src/throughput_tracker/tracking_data.rs | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs index 49bbb5df..67e7bc5e 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs @@ -9,13 +9,10 @@ mod flow_analysis; use crate::throughput_tracker::flow_data::{flow_analysis::FinishedFlowAnalysis, netflow5::Netflow5, netflow9::Netflow9}; pub(crate) use flow_tracker::{ALL_FLOWS, AsnId, FlowbeeLocalData}; use lqos_sys::flowbee_data::FlowbeeKey; -use std::sync::{ - mpsc::{channel, Sender}, - Arc, -}; +use std::sync::Arc; use tracing::{debug, error, info}; use anyhow::Result; - +use crossbeam_channel::Sender; pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country, FlowAnalysis, RECENT_FLOWS, flowbee_handle_events, get_flowbee_event_count_and_reset, expire_rtt_flows, flowbee_rtt_map, RttData, get_rtt_events_per_second, AsnListEntry, @@ -28,8 +25,9 @@ trait FlowbeeRecipient { // Creates the netflow tracker and returns the sender pub fn setup_netflow_tracker() -> Result> { - let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>(); - let config = lqos_config::load_config().unwrap(); + let (tx, rx) = crossbeam_channel::bounded::<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>(65535); + let config = lqos_config::load_config() + .inspect_err(|e| error!("Failed to load configuration: {e}"))?; std::thread::Builder::new() .name("Netflow Tracker".to_string()) diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 3d19c098..437de23a 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -37,7 +37,7 @@ pub static THROUGHPUT_TRACKER: Lazy = Lazy::new(ThroughputTra /// collection thread that there is fresh data. pub fn spawn_throughput_monitor( long_term_stats_tx: Sender, - netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, + netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) -> anyhow::Result<()> { debug!("Starting the bandwidth monitor thread."); std::thread::Builder::new() @@ -101,7 +101,7 @@ impl ThroughputTaskTimeMetrics { fn throughput_task( long_term_stats_tx: Sender, - netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, + netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) { // Obtain the flow timeout from the config, default to 30 seconds let timeout_seconds = if let Ok(config) = lqos_config::load_config() { diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 06a01075..58263e9a 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -191,7 +191,7 @@ impl ThroughputTracker { &self, timeout_seconds: u64, _netflow_enabled: bool, - sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, + sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, net_json_calc: &mut NetworkJsonCounting, ) { //log::debug!("Flowbee events this second: {}", get_flowbee_event_count_and_reset());