From ebb95ce82e4b35aefae65da6cb71b9ab7fab9ec0 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Sun, 7 Jul 2024 21:45:18 -0500 Subject: [PATCH] Needs a LOT of cleaning, but marks/drops are being calculated and applied to the tree. --- src/rust/lqos_config/src/network_json/mod.rs | 36 +++++++++++++++++++ .../src/tracking/all_queue_data.rs | 25 ++++++++++--- .../lqos_queue_tracker/src/tracking/mod.rs | 12 +++---- src/rust/lqos_utils/src/units/down_up.rs | 2 +- .../js_build/src/dashlets/top_tree_summary.js | 4 +++ .../lqosd/src/shaped_devices_tracker/mod.rs | 8 +++++ src/rust/lqosd/src/throughput_tracker/mod.rs | 1 + .../src/throughput_tracker/tracking_data.rs | 22 ++++++++++++ 8 files changed, 98 insertions(+), 12 deletions(-) diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index 91490320..2ef24265 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -24,6 +24,12 @@ pub struct NetworkJsonNode { /// Current TCP Retransmits pub current_tcp_retransmits: AtomicDownUp, // In retries + /// Current Cake Marks + pub current_marks: AtomicDownUp, + + /// Current Cake Drops + pub current_drops: AtomicDownUp, + /// Approximate RTTs reported for this level of the tree. /// It's never going to be as statistically accurate as the actual /// numbers, being based on medians. @@ -55,6 +61,14 @@ impl NetworkJsonNode { self.current_tcp_retransmits.get_down(), self.current_tcp_retransmits.get_up(), ), + current_marks: ( + self.current_marks.get_down(), + self.current_marks.get_up(), + ), + current_drops: ( + self.current_drops.get_down(), + self.current_drops.get_up(), + ), rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(), parents: self.parents.clone(), immediate_parent: self.immediate_parent, @@ -76,6 +90,10 @@ pub struct NetworkJsonTransport { pub current_throughput: (u64, u64), /// Current count of TCP retransmits pub current_retransmits: (u64, u64), + /// Cake marks + pub current_marks: (u64, u64), + /// Cake drops + pub current_drops: (u64, u64), /// Set of RTT data pub rtts: Vec, /// Node indices of parents @@ -135,6 +153,8 @@ impl NetworkJson { max_throughput: (0, 0), current_throughput: AtomicDownUp::zeroed(), current_tcp_retransmits: AtomicDownUp::zeroed(), + current_drops: AtomicDownUp::zeroed(), + current_marks: AtomicDownUp::zeroed(), parents: Vec::new(), immediate_parent: None, rtts: DashSet::new(), @@ -213,6 +233,8 @@ impl NetworkJson { n.current_throughput.set_to_zero(); n.current_tcp_retransmits.set_to_zero(); n.rtts.clear(); + n.current_drops.set_to_zero(); + n.current_marks.set_to_zero(); }); } @@ -257,6 +279,18 @@ impl NetworkJson { } } } + + pub fn add_queue_cycle(&self, targets: &[usize], marks: &DownUpOrder, drops: &DownUpOrder) { + for idx in targets { + // Safety first; use "get" to ensure that the node exists + if let Some(node) = self.nodes.get(*idx) { + node.current_marks.checked_add(*marks); + node.current_drops.checked_add(*drops); + } else { + warn!("No network tree entry for index {idx}"); + } + } + } } fn json_to_u32(val: Option<&Value>) -> u32 { @@ -294,6 +328,8 @@ fn recurse_node( ), current_throughput: AtomicDownUp::zeroed(), current_tcp_retransmits: AtomicDownUp::zeroed(), + current_drops: AtomicDownUp::zeroed(), + current_marks: AtomicDownUp::zeroed(), name: name.to_string(), immediate_parent: Some(immediate_parent), rtts: DashSet::new(), diff --git a/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs b/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs index 8a1a980c..0ea359ec 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/all_queue_data.rs @@ -9,10 +9,10 @@ pub static ALL_QUEUE_SUMMARY: Lazy = Lazy::new(|| AllQueueData::ne #[derive(Debug)] pub struct QueueData { - pub drops: DownUpOrder, - pub marks: DownUpOrder, - pub prev_drops: DownUpOrder, - pub prev_marks: DownUpOrder, + pub drops: DownUpOrder, + pub marks: DownUpOrder, + pub prev_drops: DownUpOrder, + pub prev_marks: DownUpOrder, } #[derive(Debug)] @@ -57,6 +57,7 @@ impl AllQueueData { }; new_record.drops.down = dl.drops; new_record.marks.down = dl.marks; + println!("Inserting for circuit_id: {}", dl.circuit_id); lock.insert(dl.circuit_id.clone(), new_record); } } @@ -77,10 +78,24 @@ impl AllQueueData { }; new_record.drops.up = ul.drops; new_record.marks.up = ul.marks; + println!("Inserting for circuit_id: {}", ul.circuit_id); lock.insert(ul.circuit_id.clone(), new_record); } } - println!("{:?}", lock); + //println!("{:?}", lock); + } + + pub fn iterate_queues(&self, f: impl Fn(&str, &DownUpOrder, &DownUpOrder)) { + let lock = self.data.lock().unwrap(); + for (circuit_id, q) in lock.iter() { + println!("Checking for change in {}", circuit_id); + if q.drops > q.prev_drops || q.marks > q.prev_marks { + println!("Change detected"); + let drops = q.drops.checked_sub_or_zero(q.prev_drops); + let marks = q.marks.checked_sub_or_zero(q.prev_marks); + f(circuit_id, &drops, &marks); + } + } } } \ No newline at end of file diff --git a/src/rust/lqos_queue_tracker/src/tracking/mod.rs b/src/rust/lqos_queue_tracker/src/tracking/mod.rs index a9ab2165..91f1439c 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/mod.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/mod.rs @@ -86,8 +86,8 @@ fn track_queues() { struct TrackedQueue { circuit_id: String, - drops: u32, - marks: u32, + drops: u64, + marks: u64, } fn connect_queues_to_circuit(structure: &[QueueNode], queues: &[QueueType]) -> Vec { @@ -102,8 +102,8 @@ fn connect_queues_to_circuit(structure: &[QueueNode], queues: &[QueueType]) -> V if (cake.drops > 0 || marks > 0) { return Some(TrackedQueue { circuit_id: circuit_id.clone(), - drops: cake.drops, - marks, + drops: cake.drops as u64, + marks: marks as u64, }) } } @@ -126,8 +126,8 @@ fn connect_queues_to_circuit_up(structure: &[QueueNode], queues: &[QueueType]) - if (cake.drops > 0 || marks > 0) { return Some(TrackedQueue { circuit_id: circuit_id.clone(), - drops: cake.drops, - marks, + drops: cake.drops as u64, + marks: marks as u64, }) } } diff --git a/src/rust/lqos_utils/src/units/down_up.rs b/src/rust/lqos_utils/src/units/down_up.rs index 4dbc3a01..e35a49b8 100644 --- a/src/rust/lqos_utils/src/units/down_up.rs +++ b/src/rust/lqos_utils/src/units/down_up.rs @@ -5,7 +5,7 @@ use crate::units::UpDownOrder; /// Provides strong download/upload separation for /// stored statistics to eliminate confusion. #[repr(C)] -#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default, Ord, PartialOrd)] pub struct DownUpOrder { pub down: T, pub up: T, diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top_tree_summary.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top_tree_summary.js index 922168d3..cf042db1 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top_tree_summary.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top_tree_summary.js @@ -38,6 +38,8 @@ export class TopTreeSummary extends BaseDashlet { th.appendChild(theading("DL ⬇️")); th.appendChild(theading("UL ⬆️")); th.appendChild(theading("TCP Re-xmit")); + th.appendChild(theading("Cake Marks")); + th.appendChild(theading("Cake Drops")); t.appendChild(th); let tbody = document.createElement("tbody"); @@ -47,6 +49,8 @@ export class TopTreeSummary extends BaseDashlet { row.appendChild(simpleRow(scaleNumber(r[1].current_throughput[0] * 8))); row.appendChild(simpleRow(scaleNumber(r[1].current_throughput[1] * 8))); row.appendChild(simpleRow(r[1].current_retransmits[0] + " / " + r[1].current_retransmits[1])) + row.appendChild(simpleRow(r[1].current_marks[0] + " / " + r[1].current_marks[1])); + row.appendChild(simpleRow(r[1].current_drops[0] + " / " + r[1].current_drops[1])); t.appendChild(row); }); diff --git a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs index 1927c100..9a440afc 100644 --- a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs +++ b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs @@ -84,11 +84,17 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse { if nodes.len() > n_queues { let mut other_bw = (0, 0); let mut other_xmit = (0, 0); + let mut other_marks = (0, 0); + let mut other_drops = (0, 0); nodes.drain(n_queues..).for_each(|n| { other_bw.0 += n.1.current_throughput.0; other_bw.1 += n.1.current_throughput.1; other_xmit.0 += n.1.current_retransmits.0; other_xmit.1 += n.1.current_retransmits.1; + other_marks.0 += n.1.current_marks.0; + other_marks.1 += n.1.current_marks.1; + other_drops.0 += n.1.current_drops.0; + other_drops.1 += n.1.current_drops.1; }); nodes.push(( @@ -98,6 +104,8 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse { max_throughput: (0, 0), current_throughput: other_bw, current_retransmits: other_xmit, + current_marks: other_marks, + current_drops: other_drops, rtts: Vec::new(), parents: Vec::new(), immediate_parent: None, diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 7ee7f687..8389c171 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -92,6 +92,7 @@ async fn throughput_task( netflow_enabled, my_netflow_sender.clone(), ); + THROUGHPUT_TRACKER.apply_queue_stats(); THROUGHPUT_TRACKER.update_totals(); THROUGHPUT_TRACKER.next_cycle(); let duration_ms = start.elapsed().as_micros(); diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 650cbe15..9c09677d 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -4,6 +4,7 @@ use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, Flowbee use dashmap::DashMap; use fxhash::FxHashMap; use lqos_bus::TcHandle; +use lqos_queue_tracker::ALL_QUEUE_SUMMARY; use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each}; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; use lqos_utils::units::{AtomicDownUp, DownUpOrder}; @@ -162,6 +163,27 @@ impl ThroughputTracker { }); } + pub(crate) fn apply_queue_stats(&self) { + // Iterate through the queue data and find the matching circuit_id + ALL_QUEUE_SUMMARY.iterate_queues(|circuit_id, drops, marks| { + println!("Found marks or drops!"); + if let Some(entry) = self.raw_data.iter().find(|v| { + match v.circuit_id { + Some(ref id) => id == circuit_id, + None => false, + } + }) { + // Find the net_json parents + println!("Found a matching circuit"); + if let Some(parents) = &entry.network_json_parents { + let net_json = NETWORK_JSON.read().unwrap(); + // Send it upstream + net_json.add_queue_cycle(parents, drops, marks); + } + } + }); + } + pub(crate) fn apply_flow_data( &self, timeout_seconds: u64,