From 53b257f25b6eac3ad184b196612d6929007a1c6a Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Mon, 8 Jul 2024 09:45:28 -0500 Subject: [PATCH] Cleaning up debug code from the queue tracker, and including the "total stats" that we'll need. --- src/rust/lqos_queue_tracker/src/lib.rs | 2 +- .../src/tracking/all_queue_data.rs | 78 ++++++++++++++----- .../lqos_queue_tracker/src/tracking/mod.rs | 2 +- src/rust/lqos_utils/src/units/down_up.rs | 10 +++ .../src/throughput_tracker/tracking_data.rs | 7 +- 5 files changed, 73 insertions(+), 26 deletions(-) diff --git a/src/rust/lqos_queue_tracker/src/lib.rs b/src/rust/lqos_queue_tracker/src/lib.rs index 593bf976..8f202ea4 100644 --- a/src/rust/lqos_queue_tracker/src/lib.rs +++ b/src/rust/lqos_queue_tracker/src/lib.rs @@ -23,4 +23,4 @@ pub use queue_structure::spawn_queue_structure_monitor; pub use queue_types::deserialize_tc_tree; // Exported for the benchmarker pub use tracking::spawn_queue_monitor; pub use tracking::{add_watched_queue, still_watching}; -pub use tracking::ALL_QUEUE_SUMMARY; \ No newline at end of file +pub use tracking::{ALL_QUEUE_SUMMARY, TOTAL_QUEUE_STATS}; \ No newline at end of file diff --git a/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs b/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs index 0ea359ec..d6fc2c8d 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs @@ -1,23 +1,42 @@ use std::collections::HashMap; use std::sync::Mutex; use once_cell::sync::Lazy; -use lqos_utils::units::DownUpOrder; +use lqos_utils::units::{AtomicDownUp, DownUpOrder}; use crate::tracking::TrackedQueue; pub static ALL_QUEUE_SUMMARY: Lazy = Lazy::new(|| AllQueueData::new()); +pub static TOTAL_QUEUE_STATS: TotalQueueStats = TotalQueueStats::new(); + +pub struct TotalQueueStats { + pub drops: AtomicDownUp, + pub mark: AtomicDownUp, +} + +impl TotalQueueStats { + pub const fn new() -> Self { + Self { + drops: AtomicDownUp::zeroed(), + mark: AtomicDownUp::zeroed(), + } + } +} #[derive(Debug)] - pub struct QueueData { pub drops: DownUpOrder, pub marks: DownUpOrder, - pub prev_drops: DownUpOrder, - pub prev_marks: DownUpOrder, + pub prev_drops: Option>, + pub prev_marks: Option>, +} + +fn zero_total_queue_stats() { + TOTAL_QUEUE_STATS.drops.set_to_zero(); + TOTAL_QUEUE_STATS.mark.set_to_zero(); } #[derive(Debug)] pub struct AllQueueData { - data: Mutex> + data: Mutex>, } impl AllQueueData { @@ -34,9 +53,9 @@ impl AllQueueData { let mut lock = self.data.lock().unwrap(); // Roll through moving current to previous - for (_,q) in lock.iter_mut() { - q.prev_drops = q.drops; - q.prev_marks = q.marks; + for (_, q) in lock.iter_mut() { + q.prev_drops = Some(q.drops); + q.prev_marks = Some(q.marks); q.drops = DownUpOrder::zeroed(); q.marks = DownUpOrder::zeroed(); } @@ -52,12 +71,11 @@ impl AllQueueData { let mut new_record = QueueData { drops: Default::default(), marks: Default::default(), - prev_drops: Default::default(), - prev_marks: Default::default(), + prev_drops: None, + prev_marks: None, }; new_record.drops.down = dl.drops; new_record.marks.down = dl.marks; - println!("Inserting for circuit_id: {}", dl.circuit_id); lock.insert(dl.circuit_id.clone(), new_record); } } @@ -78,24 +96,44 @@ impl AllQueueData { }; new_record.drops.up = ul.drops; new_record.marks.up = ul.marks; - println!("Inserting for circuit_id: {}", ul.circuit_id); lock.insert(ul.circuit_id.clone(), new_record); } } - - //println!("{:?}", lock); } pub fn iterate_queues(&self, f: impl Fn(&str, &DownUpOrder, &DownUpOrder)) { let lock = self.data.lock().unwrap(); for (circuit_id, q) in lock.iter() { - println!("Checking for change in {}", circuit_id); - if q.drops > q.prev_drops || q.marks > q.prev_marks { - println!("Change detected"); - let drops = q.drops.checked_sub_or_zero(q.prev_drops); - let marks = q.marks.checked_sub_or_zero(q.prev_marks); - f(circuit_id, &drops, &marks); + if let Some(prev_drops) = q.prev_drops { + if let Some(prev_marks) = q.prev_marks { + if q.drops > prev_drops || q.marks > prev_marks { + let drops = q.drops.checked_sub_or_zero(prev_drops); + let marks = q.marks.checked_sub_or_zero(prev_marks); + f(circuit_id, &drops, &marks); + } + } } } } + + pub fn calculate_total_queue_stats(&self) { + zero_total_queue_stats(); + let lock = self.data.lock().unwrap(); + + let mut drops = DownUpOrder::zeroed(); + let mut marks = DownUpOrder::zeroed(); + + lock + .iter() + .filter(|(_, q)| q.prev_drops.is_some() && q.prev_marks.is_some()) + .for_each(|(_, q)| { + drops += q.drops.checked_sub_or_zero(q.prev_drops.unwrap()); + marks += q.marks.checked_sub_or_zero(q.prev_marks.unwrap()); + }); + + TOTAL_QUEUE_STATS.drops.set_down(drops.down); + TOTAL_QUEUE_STATS.drops.set_up(drops.up); + TOTAL_QUEUE_STATS.mark.set_down(marks.down); + TOTAL_QUEUE_STATS.mark.set_up(marks.up); + } } \ No newline at end of file diff --git a/src/rust/lqos_queue_tracker/src/tracking/mod.rs b/src/rust/lqos_queue_tracker/src/tracking/mod.rs index 91f1439c..131f3df9 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/mod.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/mod.rs @@ -10,7 +10,7 @@ use lqos_utils::fdtimer::periodic; mod reader; mod watched_queues; mod all_queue_data; -pub use all_queue_data::ALL_QUEUE_SUMMARY; +pub use all_queue_data::*; use self::watched_queues::expire_watched_queues; use watched_queues::WATCHED_QUEUES; diff --git a/src/rust/lqos_utils/src/units/down_up.rs b/src/rust/lqos_utils/src/units/down_up.rs index e35a49b8..ec8b6fc4 100644 --- a/src/rust/lqos_utils/src/units/down_up.rs +++ b/src/rust/lqos_utils/src/units/down_up.rs @@ -1,3 +1,4 @@ +use std::ops::AddAssign; use serde::{Deserialize, Serialize}; use zerocopy::FromBytes; use crate::units::UpDownOrder; @@ -77,6 +78,15 @@ impl Into> for DownUpOrder { } } +impl AddAssign for DownUpOrder +where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedAdd +{ + fn add_assign(&mut self, rhs: Self) { + self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero()); + self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero()); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 9c09677d..53ff30cd 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -164,9 +164,11 @@ impl ThroughputTracker { } pub(crate) fn apply_queue_stats(&self) { + // Apply totals + ALL_QUEUE_SUMMARY.calculate_total_queue_stats(); + // Iterate through the queue data and find the matching circuit_id ALL_QUEUE_SUMMARY.iterate_queues(|circuit_id, drops, marks| { - println!("Found marks or drops!"); if let Some(entry) = self.raw_data.iter().find(|v| { match v.circuit_id { Some(ref id) => id == circuit_id, @@ -174,7 +176,6 @@ impl ThroughputTracker { } }) { // Find the net_json parents - println!("Found a matching circuit"); if let Some(parents) = &entry.network_json_parents { let net_json = NETWORK_JSON.read().unwrap(); // Send it upstream @@ -323,8 +324,6 @@ impl ThroughputTracker { // Send it upstream if let Some(parents) = &tracker.network_json_parents { let net_json = NETWORK_JSON.write().unwrap(); - // Send it upstream - println!("{:?}", tracker.tcp_retransmits); net_json.add_retransmit_cycle(parents, tracker.tcp_retransmits); } }