diff --git a/src/rust/lqos_config/Cargo.toml b/src/rust/lqos_config/Cargo.toml index a8f552fd..60cb2719 100644 --- a/src/rust/lqos_config/Cargo.toml +++ b/src/rust/lqos_config/Cargo.toml @@ -19,3 +19,4 @@ dashmap = "5" pyo3 = "0.20" toml = "0.8.8" once_cell = "1.19.0" +lqos_utils = { path = "../lqos_utils" } \ No newline at end of file diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index 1414c4b6..5224e0a7 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -7,6 +7,7 @@ use std::{ path::{Path, PathBuf}, sync::atomic::AtomicU64, }; use thiserror::Error; +use lqos_utils::units::AtomicDownUp; /// Describes a node in the network map tree. #[derive(Debug)] @@ -18,7 +19,7 @@ pub struct NetworkJsonNode { pub max_throughput: (u32, u32), // In mbps /// Current throughput (in bytes/second) at this node - pub current_throughput: (AtomicU64, AtomicU64), // In bytes + pub current_throughput: AtomicDownUp, // In bytes /// Approximate RTTs reported for this level of the tree. /// It's never going to be as statistically accurate as the actual @@ -44,8 +45,8 @@ impl NetworkJsonNode { name: self.name.clone(), max_throughput: self.max_throughput, current_throughput: ( - self.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed), - self.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed), + self.current_throughput.get_down(), + self.current_throughput.get_up(), ), rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(), parents: self.parents.clone(), @@ -123,7 +124,7 @@ impl NetworkJson { let mut nodes = vec![NetworkJsonNode { name: "Root".to_string(), max_throughput: (0, 0), - current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), + current_throughput: AtomicDownUp::zeroed(), parents: Vec::new(), immediate_parent: None, rtts: DashSet::new(), @@ -199,8 +200,7 @@ impl NetworkJson { /// access. pub fn zero_throughput_and_rtt(&self) { self.nodes.iter().for_each(|n| { - n.current_throughput.0.store(0, std::sync::atomic::Ordering::Relaxed); - n.current_throughput.1.store(0, std::sync::atomic::Ordering::Relaxed); + n.current_throughput.set_to_zero(); n.rtts.clear(); }); } @@ -216,8 +216,7 @@ impl NetworkJson { for idx in targets { // Safety first: use "get" to ensure that the node exists if let Some(node) = self.nodes.get(*idx) { - node.current_throughput.0.fetch_add(bytes.0, std::sync::atomic::Ordering::Relaxed); - node.current_throughput.1.fetch_add(bytes.1, std::sync::atomic::Ordering::Relaxed); + node.current_throughput.checked_add_tuple(bytes); } else { warn!("No network tree entry for index {idx}"); } @@ -271,7 +270,7 @@ fn recurse_node( json_to_u32(json.get("downloadBandwidthMbps")), json_to_u32(json.get("uploadBandwidthMbps")), ), - current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), + current_throughput: AtomicDownUp::zeroed(), name: name.to_string(), immediate_parent: Some(immediate_parent), rtts: DashSet::new(), diff --git a/src/rust/lqos_utils/src/units/atomic_down_up.rs b/src/rust/lqos_utils/src/units/atomic_down_up.rs index 413fa1d2..cc5f332c 100644 --- a/src/rust/lqos_utils/src/units/atomic_down_up.rs +++ b/src/rust/lqos_utils/src/units/atomic_down_up.rs @@ -1,6 +1,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::Relaxed; +#[derive(Debug)] pub struct AtomicDownUp { down: AtomicU64, up: AtomicU64, @@ -13,12 +14,12 @@ impl AtomicDownUp { up: AtomicU64::new(0), } } - + pub fn set_to_zero(&self) { self.up.store(0, Relaxed); self.down.store(0, Relaxed); } - + pub fn checked_add_tuple(&self, n: (u64, u64)) { let n0 = self.down.load(std::sync::atomic::Ordering::Relaxed); if let Some(n) = n0.checked_add(n.0) { @@ -30,7 +31,7 @@ impl AtomicDownUp { self.up.store(n, std::sync::atomic::Ordering::Relaxed); } } - + pub fn get_down(&self) -> u64 { self.down.load(Relaxed) } @@ -38,7 +39,7 @@ impl AtomicDownUp { pub fn get_up(&self) -> u64 { self.up.load(Relaxed) } - + pub fn set_down(&self, n: u64) { self.down.store(n, Relaxed); } diff --git a/src/rust/lts_client/src/collector/network_tree.rs b/src/rust/lts_client/src/collector/network_tree.rs index b7ade091..efb34b64 100644 --- a/src/rust/lts_client/src/collector/network_tree.rs +++ b/src/rust/lts_client/src/collector/network_tree.rs @@ -38,8 +38,8 @@ impl From<&NetworkJsonNode> for NetworkTreeEntry { parents: value.parents.clone(), immediate_parent: value.immediate_parent, current_throughput: ( - value.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed) as u32, - value.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed) as u32, + value.current_throughput.get_down() as u32, + value.current_throughput.get_up() as u32, ), node_type: value.node_type.clone(), rtts: (min as u16, max as u16, avg as u16),