Experiment with no LTS1 to see if it affects memory usage over time

This commit is contained in:
Herbert Wolverson 2024-10-09 15:28:54 -05:00
parent ed835ef621
commit ad8ffc16d4
3 changed files with 20 additions and 16 deletions

View File

@ -30,7 +30,7 @@ members = [
"lqos_heimdall", # Library for managing Heimdall flow watching "lqos_heimdall", # Library for managing Heimdall flow watching
"lqos_map_perf", # A CLI tool for testing eBPF map performance "lqos_map_perf", # A CLI tool for testing eBPF map performance
"lqstats", # A CLI utility for retrieving long-term statistics "lqstats", # A CLI utility for retrieving long-term statistics
"lts_client", # Shared data and client-side code for long-term stats #"lts_client", # Shared data and client-side code for long-term stats
"lqos_map_perf", # A CLI tool for testing eBPF map performance "lqos_map_perf", # A CLI tool for testing eBPF map performance
"uisp", # REST support for the UISP API "uisp", # REST support for the UISP API
"uisp_integration", # UISP Integration in Rust "uisp_integration", # UISP Integration in Rust

View File

@ -135,7 +135,7 @@ fn main() -> Result<()> {
} else { } else {
info!("LTS2 client started successfully"); info!("LTS2 client started successfully");
} }
let long_term_stats_tx = start_long_term_stats(); //let long_term_stats_tx = start_long_term_stats();
let flow_tx = setup_netflow_tracker()?; let flow_tx = setup_netflow_tracker()?;
let _ = throughput_tracker::flow_data::setup_flow_analysis(); let _ = throughput_tracker::flow_data::setup_flow_analysis();
start_heimdall()?; start_heimdall()?;
@ -143,7 +143,7 @@ fn main() -> Result<()> {
shaped_devices_tracker::shaped_devices_watcher()?; shaped_devices_tracker::shaped_devices_watcher()?;
shaped_devices_tracker::network_json_watcher()?; shaped_devices_tracker::network_json_watcher()?;
anonymous_usage::start_anonymous_usage(); anonymous_usage::start_anonymous_usage();
throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone(), flow_tx)?; throughput_tracker::spawn_throughput_monitor(flow_tx)?;
spawn_queue_monitor()?; spawn_queue_monitor()?;
let system_usage_tx = system_stats::start_system_stats()?; let system_usage_tx = system_stats::start_system_stats()?;
@ -160,9 +160,9 @@ fn main() -> Result<()> {
warn!("This should never happen - terminating on unknown signal") warn!("This should never happen - terminating on unknown signal")
} }
} }
let _ = tokio::runtime::Runtime::new() //let _ = tokio::runtime::Runtime::new()
.unwrap() // .unwrap()
.block_on(long_term_stats_tx.send(lts_client::collector::stats_availability::StatsUpdateMessage::Quit)); // .block_on(long_term_stats_tx.send(lts_client::collector::stats_availability::StatsUpdateMessage::Quit));
std::mem::drop(kernels); std::mem::drop(kernels);
UnixSocketServer::signal_cleanup(); UnixSocketServer::signal_cleanup();
std::mem::drop(file_lock); std::mem::drop(file_lock);

View File

@ -42,14 +42,14 @@ pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> = Lazy::new(ThroughputTra
/// * `long_term_stats_tx` - an optional MPSC sender to notify the /// * `long_term_stats_tx` - an optional MPSC sender to notify the
/// collection thread that there is fresh data. /// collection thread that there is fresh data.
pub fn spawn_throughput_monitor( pub fn spawn_throughput_monitor(
long_term_stats_tx: Sender<StatsUpdateMessage>, //long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
debug!("Starting the bandwidth monitor thread."); debug!("Starting the bandwidth monitor thread.");
std::thread::Builder::new() std::thread::Builder::new()
.name("Throughput Monitor".to_string()) .name("Throughput Monitor".to_string())
.spawn(|| {throughput_task( .spawn(|| {throughput_task(
long_term_stats_tx, //long_term_stats_tx,
netflow_sender, netflow_sender,
)})?; )})?;
@ -106,7 +106,7 @@ impl ThroughputTaskTimeMetrics {
} }
fn throughput_task( fn throughput_task(
long_term_stats_tx: Sender<StatsUpdateMessage>, //long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) { ) {
// Obtain the flow timeout from the config, default to 30 seconds // Obtain the flow timeout from the config, default to 30 seconds
@ -180,14 +180,14 @@ fn throughput_task(
} }
if last_submitted_to_lts.is_none() { if last_submitted_to_lts.is_none() {
submit_throughput_stats(long_term_stats_tx.clone(), 1.0); submit_throughput_stats(1.0);
} else { } else {
let elapsed = last_submitted_to_lts.unwrap().elapsed(); let elapsed = last_submitted_to_lts.unwrap().elapsed();
let elapsed_f64 = elapsed.as_secs_f64(); let elapsed_f64 = elapsed.as_secs_f64();
// Temporary: place this in a thread to not block the timer // Temporary: place this in a thread to not block the timer
let my_lts_tx = long_term_stats_tx.clone(); //let my_lts_tx = long_term_stats_tx.clone();
std::thread::Builder::new().name("Throughput Stats Submit".to_string()).spawn(move || { std::thread::Builder::new().name("Throughput Stats Submit".to_string()).spawn(move || {
submit_throughput_stats(my_lts_tx, elapsed_f64); submit_throughput_stats(elapsed_f64);
}).unwrap().join().unwrap(); }).unwrap().join().unwrap();
//submit_throughput_stats(long_term_stats_tx.clone(), elapsed_f64); //submit_throughput_stats(long_term_stats_tx.clone(), elapsed_f64);
} }
@ -230,7 +230,10 @@ impl LtsSubmitMetrics {
} }
} }
fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>, scale: f64) { fn submit_throughput_stats(
//long_term_stats_tx: Sender<StatsUpdateMessage>,
scale: f64
) {
let mut metrics = LtsSubmitMetrics::new(); let mut metrics = LtsSubmitMetrics::new();
let mut lts2_needs_shaped_devices = false; let mut lts2_needs_shaped_devices = false;
// If ShapedDevices has changed, notify the stats thread // If ShapedDevices has changed, notify the stats thread
@ -243,8 +246,8 @@ fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>, scale
if changed { if changed {
lts2_needs_shaped_devices = true; lts2_needs_shaped_devices = true;
let shaped_devices = SHAPED_DEVICES.read().unwrap().devices.clone(); let shaped_devices = SHAPED_DEVICES.read().unwrap().devices.clone();
let _ = long_term_stats_tx //let _ = long_term_stats_tx
.blocking_send(StatsUpdateMessage::ShapedDevicesChanged(shaped_devices)); // .blocking_send(StatsUpdateMessage::ShapedDevicesChanged(shaped_devices));
} }
} }
metrics.shaped_devices = metrics.start.elapsed().as_secs_f64(); metrics.shaped_devices = metrics.start.elapsed().as_secs_f64();
@ -259,6 +262,7 @@ fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>, scale
let bits_per_second = THROUGHPUT_TRACKER.bits_per_second(); let bits_per_second = THROUGHPUT_TRACKER.bits_per_second();
let shaped_bits_per_second = THROUGHPUT_TRACKER.shaped_bits_per_second(); let shaped_bits_per_second = THROUGHPUT_TRACKER.shaped_bits_per_second();
metrics.total_throughput = metrics.start.elapsed().as_secs_f64(); metrics.total_throughput = metrics.start.elapsed().as_secs_f64();
/*
if let Ok(config) = load_config() { if let Ok(config) = load_config() {
if bits_per_second.down > (config.queues.downlink_bandwidth_mbps as u64 * 1_000_000) { if bits_per_second.down > (config.queues.downlink_bandwidth_mbps as u64 * 1_000_000) {
@ -306,7 +310,7 @@ fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>, scale
if metrics.start.elapsed().as_secs_f64() > 1.0 { if metrics.start.elapsed().as_secs_f64() > 1.0 {
warn!("{:?}", metrics); warn!("{:?}", metrics);
} }*/
// LTS2 Block // LTS2 Block
if let Ok(now) = unix_now() { if let Ok(now) = unix_now() {