diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 7b50178b..7f5661c3 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -30,7 +30,7 @@ members = [ "lqos_heimdall", # Library for managing Heimdall flow watching "lqos_map_perf", # A CLI tool for testing eBPF map performance "lqstats", # A CLI utility for retrieving long-term statistics - "lts_client", # Shared data and client-side code for long-term stats + #"lts_client", # Shared data and client-side code for long-term stats "lqos_map_perf", # A CLI tool for testing eBPF map performance "uisp", # REST support for the UISP API "uisp_integration", # UISP Integration in Rust diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index 37f0c84e..0d891547 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -135,7 +135,7 @@ fn main() -> Result<()> { } else { info!("LTS2 client started successfully"); } - let long_term_stats_tx = start_long_term_stats(); + //let long_term_stats_tx = start_long_term_stats(); let flow_tx = setup_netflow_tracker()?; let _ = throughput_tracker::flow_data::setup_flow_analysis(); start_heimdall()?; @@ -143,7 +143,7 @@ fn main() -> Result<()> { shaped_devices_tracker::shaped_devices_watcher()?; shaped_devices_tracker::network_json_watcher()?; anonymous_usage::start_anonymous_usage(); - throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone(), flow_tx)?; + throughput_tracker::spawn_throughput_monitor(flow_tx)?; spawn_queue_monitor()?; let system_usage_tx = system_stats::start_system_stats()?; @@ -160,9 +160,9 @@ fn main() -> Result<()> { warn!("This should never happen - terminating on unknown signal") } } - let _ = tokio::runtime::Runtime::new() - .unwrap() - .block_on(long_term_stats_tx.send(lts_client::collector::stats_availability::StatsUpdateMessage::Quit)); + //let _ = tokio::runtime::Runtime::new() + // .unwrap() + // .block_on(long_term_stats_tx.send(lts_client::collector::stats_availability::StatsUpdateMessage::Quit)); std::mem::drop(kernels); UnixSocketServer::signal_cleanup(); std::mem::drop(file_lock); diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 26ed77db..54334525 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -42,14 +42,14 @@ pub static THROUGHPUT_TRACKER: Lazy = Lazy::new(ThroughputTra /// * `long_term_stats_tx` - an optional MPSC sender to notify the /// collection thread that there is fresh data. pub fn spawn_throughput_monitor( - long_term_stats_tx: Sender, + //long_term_stats_tx: Sender, netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) -> anyhow::Result<()> { debug!("Starting the bandwidth monitor thread."); std::thread::Builder::new() .name("Throughput Monitor".to_string()) .spawn(|| {throughput_task( - long_term_stats_tx, + //long_term_stats_tx, netflow_sender, )})?; @@ -106,7 +106,7 @@ impl ThroughputTaskTimeMetrics { } fn throughput_task( - long_term_stats_tx: Sender, + //long_term_stats_tx: Sender, netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) { // Obtain the flow timeout from the config, default to 30 seconds @@ -180,14 +180,14 @@ fn throughput_task( } if last_submitted_to_lts.is_none() { - submit_throughput_stats(long_term_stats_tx.clone(), 1.0); + submit_throughput_stats(1.0); } else { let elapsed = last_submitted_to_lts.unwrap().elapsed(); let elapsed_f64 = elapsed.as_secs_f64(); // Temporary: place this in a thread to not block the timer - let my_lts_tx = long_term_stats_tx.clone(); + //let my_lts_tx = long_term_stats_tx.clone(); std::thread::Builder::new().name("Throughput Stats Submit".to_string()).spawn(move || { - submit_throughput_stats(my_lts_tx, elapsed_f64); + submit_throughput_stats(elapsed_f64); }).unwrap().join().unwrap(); //submit_throughput_stats(long_term_stats_tx.clone(), elapsed_f64); } @@ -230,7 +230,10 @@ impl LtsSubmitMetrics { } } -fn submit_throughput_stats(long_term_stats_tx: Sender, scale: f64) { +fn submit_throughput_stats( + //long_term_stats_tx: Sender, + scale: f64 +) { let mut metrics = LtsSubmitMetrics::new(); let mut lts2_needs_shaped_devices = false; // If ShapedDevices has changed, notify the stats thread @@ -243,8 +246,8 @@ fn submit_throughput_stats(long_term_stats_tx: Sender, scale if changed { lts2_needs_shaped_devices = true; let shaped_devices = SHAPED_DEVICES.read().unwrap().devices.clone(); - let _ = long_term_stats_tx - .blocking_send(StatsUpdateMessage::ShapedDevicesChanged(shaped_devices)); + //let _ = long_term_stats_tx + // .blocking_send(StatsUpdateMessage::ShapedDevicesChanged(shaped_devices)); } } metrics.shaped_devices = metrics.start.elapsed().as_secs_f64(); @@ -259,6 +262,7 @@ fn submit_throughput_stats(long_term_stats_tx: Sender, scale let bits_per_second = THROUGHPUT_TRACKER.bits_per_second(); let shaped_bits_per_second = THROUGHPUT_TRACKER.shaped_bits_per_second(); metrics.total_throughput = metrics.start.elapsed().as_secs_f64(); + /* if let Ok(config) = load_config() { if bits_per_second.down > (config.queues.downlink_bandwidth_mbps as u64 * 1_000_000) { @@ -306,7 +310,7 @@ fn submit_throughput_stats(long_term_stats_tx: Sender, scale if metrics.start.elapsed().as_secs_f64() > 1.0 { warn!("{:?}", metrics); - } + }*/ // LTS2 Block if let Ok(now) = unix_now() {