Cleaning up debug code from the queue tracker, and including the "total stats" that we'll need.

This commit is contained in:
Herbert Wolverson 2024-07-08 09:45:28 -05:00
parent ebb95ce82e
commit 53b257f25b
5 changed files with 73 additions and 26 deletions

View File

@ -23,4 +23,4 @@ pub use queue_structure::spawn_queue_structure_monitor;
pub use queue_types::deserialize_tc_tree; // Exported for the benchmarker pub use queue_types::deserialize_tc_tree; // Exported for the benchmarker
pub use tracking::spawn_queue_monitor; pub use tracking::spawn_queue_monitor;
pub use tracking::{add_watched_queue, still_watching}; pub use tracking::{add_watched_queue, still_watching};
pub use tracking::ALL_QUEUE_SUMMARY; pub use tracking::{ALL_QUEUE_SUMMARY, TOTAL_QUEUE_STATS};

View File

@ -1,23 +1,42 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Mutex; use std::sync::Mutex;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use lqos_utils::units::DownUpOrder; use lqos_utils::units::{AtomicDownUp, DownUpOrder};
use crate::tracking::TrackedQueue; use crate::tracking::TrackedQueue;
pub static ALL_QUEUE_SUMMARY: Lazy<AllQueueData> = Lazy::new(|| AllQueueData::new()); pub static ALL_QUEUE_SUMMARY: Lazy<AllQueueData> = Lazy::new(|| AllQueueData::new());
pub static TOTAL_QUEUE_STATS: TotalQueueStats = TotalQueueStats::new();
pub struct TotalQueueStats {
pub drops: AtomicDownUp,
pub mark: AtomicDownUp,
}
impl TotalQueueStats {
pub const fn new() -> Self {
Self {
drops: AtomicDownUp::zeroed(),
mark: AtomicDownUp::zeroed(),
}
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct QueueData { pub struct QueueData {
pub drops: DownUpOrder<u64>, pub drops: DownUpOrder<u64>,
pub marks: DownUpOrder<u64>, pub marks: DownUpOrder<u64>,
pub prev_drops: DownUpOrder<u64>, pub prev_drops: Option<DownUpOrder<u64>>,
pub prev_marks: DownUpOrder<u64>, pub prev_marks: Option<DownUpOrder<u64>>,
}
fn zero_total_queue_stats() {
TOTAL_QUEUE_STATS.drops.set_to_zero();
TOTAL_QUEUE_STATS.mark.set_to_zero();
} }
#[derive(Debug)] #[derive(Debug)]
pub struct AllQueueData { pub struct AllQueueData {
data: Mutex<HashMap<String, QueueData>> data: Mutex<HashMap<String, QueueData>>,
} }
impl AllQueueData { impl AllQueueData {
@ -34,9 +53,9 @@ impl AllQueueData {
let mut lock = self.data.lock().unwrap(); let mut lock = self.data.lock().unwrap();
// Roll through moving current to previous // Roll through moving current to previous
for (_,q) in lock.iter_mut() { for (_, q) in lock.iter_mut() {
q.prev_drops = q.drops; q.prev_drops = Some(q.drops);
q.prev_marks = q.marks; q.prev_marks = Some(q.marks);
q.drops = DownUpOrder::zeroed(); q.drops = DownUpOrder::zeroed();
q.marks = DownUpOrder::zeroed(); q.marks = DownUpOrder::zeroed();
} }
@ -52,12 +71,11 @@ impl AllQueueData {
let mut new_record = QueueData { let mut new_record = QueueData {
drops: Default::default(), drops: Default::default(),
marks: Default::default(), marks: Default::default(),
prev_drops: Default::default(), prev_drops: None,
prev_marks: Default::default(), prev_marks: None,
}; };
new_record.drops.down = dl.drops; new_record.drops.down = dl.drops;
new_record.marks.down = dl.marks; new_record.marks.down = dl.marks;
println!("Inserting for circuit_id: {}", dl.circuit_id);
lock.insert(dl.circuit_id.clone(), new_record); lock.insert(dl.circuit_id.clone(), new_record);
} }
} }
@ -78,24 +96,44 @@ impl AllQueueData {
}; };
new_record.drops.up = ul.drops; new_record.drops.up = ul.drops;
new_record.marks.up = ul.marks; new_record.marks.up = ul.marks;
println!("Inserting for circuit_id: {}", ul.circuit_id);
lock.insert(ul.circuit_id.clone(), new_record); lock.insert(ul.circuit_id.clone(), new_record);
} }
} }
//println!("{:?}", lock);
} }
pub fn iterate_queues(&self, f: impl Fn(&str, &DownUpOrder<u64>, &DownUpOrder<u64>)) { pub fn iterate_queues(&self, f: impl Fn(&str, &DownUpOrder<u64>, &DownUpOrder<u64>)) {
let lock = self.data.lock().unwrap(); let lock = self.data.lock().unwrap();
for (circuit_id, q) in lock.iter() { for (circuit_id, q) in lock.iter() {
println!("Checking for change in {}", circuit_id); if let Some(prev_drops) = q.prev_drops {
if q.drops > q.prev_drops || q.marks > q.prev_marks { if let Some(prev_marks) = q.prev_marks {
println!("Change detected"); if q.drops > prev_drops || q.marks > prev_marks {
let drops = q.drops.checked_sub_or_zero(q.prev_drops); let drops = q.drops.checked_sub_or_zero(prev_drops);
let marks = q.marks.checked_sub_or_zero(q.prev_marks); let marks = q.marks.checked_sub_or_zero(prev_marks);
f(circuit_id, &drops, &marks); f(circuit_id, &drops, &marks);
}
}
} }
} }
} }
pub fn calculate_total_queue_stats(&self) {
zero_total_queue_stats();
let lock = self.data.lock().unwrap();
let mut drops = DownUpOrder::zeroed();
let mut marks = DownUpOrder::zeroed();
lock
.iter()
.filter(|(_, q)| q.prev_drops.is_some() && q.prev_marks.is_some())
.for_each(|(_, q)| {
drops += q.drops.checked_sub_or_zero(q.prev_drops.unwrap());
marks += q.marks.checked_sub_or_zero(q.prev_marks.unwrap());
});
TOTAL_QUEUE_STATS.drops.set_down(drops.down);
TOTAL_QUEUE_STATS.drops.set_up(drops.up);
TOTAL_QUEUE_STATS.mark.set_down(marks.down);
TOTAL_QUEUE_STATS.mark.set_up(marks.up);
}
} }

View File

@ -10,7 +10,7 @@ use lqos_utils::fdtimer::periodic;
mod reader; mod reader;
mod watched_queues; mod watched_queues;
mod all_queue_data; mod all_queue_data;
pub use all_queue_data::ALL_QUEUE_SUMMARY; pub use all_queue_data::*;
use self::watched_queues::expire_watched_queues; use self::watched_queues::expire_watched_queues;
use watched_queues::WATCHED_QUEUES; use watched_queues::WATCHED_QUEUES;

View File

@ -1,3 +1,4 @@
use std::ops::AddAssign;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use zerocopy::FromBytes; use zerocopy::FromBytes;
use crate::units::UpDownOrder; use crate::units::UpDownOrder;
@ -77,6 +78,15 @@ impl <T> Into<UpDownOrder<T>> for DownUpOrder<T> {
} }
} }
impl <T> AddAssign for DownUpOrder<T>
where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedAdd
{
fn add_assign(&mut self, rhs: Self) {
self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero());
self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero());
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -164,9 +164,11 @@ impl ThroughputTracker {
} }
pub(crate) fn apply_queue_stats(&self) { pub(crate) fn apply_queue_stats(&self) {
// Apply totals
ALL_QUEUE_SUMMARY.calculate_total_queue_stats();
// Iterate through the queue data and find the matching circuit_id // Iterate through the queue data and find the matching circuit_id
ALL_QUEUE_SUMMARY.iterate_queues(|circuit_id, drops, marks| { ALL_QUEUE_SUMMARY.iterate_queues(|circuit_id, drops, marks| {
println!("Found marks or drops!");
if let Some(entry) = self.raw_data.iter().find(|v| { if let Some(entry) = self.raw_data.iter().find(|v| {
match v.circuit_id { match v.circuit_id {
Some(ref id) => id == circuit_id, Some(ref id) => id == circuit_id,
@ -174,7 +176,6 @@ impl ThroughputTracker {
} }
}) { }) {
// Find the net_json parents // Find the net_json parents
println!("Found a matching circuit");
if let Some(parents) = &entry.network_json_parents { if let Some(parents) = &entry.network_json_parents {
let net_json = NETWORK_JSON.read().unwrap(); let net_json = NETWORK_JSON.read().unwrap();
// Send it upstream // Send it upstream
@ -323,8 +324,6 @@ impl ThroughputTracker {
// Send it upstream // Send it upstream
if let Some(parents) = &tracker.network_json_parents { if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap(); let net_json = NETWORK_JSON.write().unwrap();
// Send it upstream
println!("{:?}", tracker.tcp_retransmits);
net_json.add_retransmit_cycle(parents, tracker.tcp_retransmits); net_json.add_retransmit_cycle(parents, tracker.tcp_retransmits);
} }
} }