Network JSON atomic counters use the AtomicDownUp structure

This commit is contained in:
Herbert Wolverson 2024-07-01 15:51:08 -05:00
parent c704f231e9
commit 0969ba438a
4 changed files with 16 additions and 15 deletions

View File

@ -19,3 +19,4 @@ dashmap = "5"
pyo3 = "0.20" pyo3 = "0.20"
toml = "0.8.8" toml = "0.8.8"
once_cell = "1.19.0" once_cell = "1.19.0"
lqos_utils = { path = "../lqos_utils" }

View File

@ -7,6 +7,7 @@ use std::{
path::{Path, PathBuf}, sync::atomic::AtomicU64, path::{Path, PathBuf}, sync::atomic::AtomicU64,
}; };
use thiserror::Error; use thiserror::Error;
use lqos_utils::units::AtomicDownUp;
/// Describes a node in the network map tree. /// Describes a node in the network map tree.
#[derive(Debug)] #[derive(Debug)]
@ -18,7 +19,7 @@ pub struct NetworkJsonNode {
pub max_throughput: (u32, u32), // In mbps pub max_throughput: (u32, u32), // In mbps
/// Current throughput (in bytes/second) at this node /// Current throughput (in bytes/second) at this node
pub current_throughput: (AtomicU64, AtomicU64), // In bytes pub current_throughput: AtomicDownUp, // In bytes
/// Approximate RTTs reported for this level of the tree. /// Approximate RTTs reported for this level of the tree.
/// It's never going to be as statistically accurate as the actual /// It's never going to be as statistically accurate as the actual
@ -44,8 +45,8 @@ impl NetworkJsonNode {
name: self.name.clone(), name: self.name.clone(),
max_throughput: self.max_throughput, max_throughput: self.max_throughput,
current_throughput: ( current_throughput: (
self.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed), self.current_throughput.get_down(),
self.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed), self.current_throughput.get_up(),
), ),
rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(), rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(),
parents: self.parents.clone(), parents: self.parents.clone(),
@ -123,7 +124,7 @@ impl NetworkJson {
let mut nodes = vec![NetworkJsonNode { let mut nodes = vec![NetworkJsonNode {
name: "Root".to_string(), name: "Root".to_string(),
max_throughput: (0, 0), max_throughput: (0, 0),
current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), current_throughput: AtomicDownUp::zeroed(),
parents: Vec::new(), parents: Vec::new(),
immediate_parent: None, immediate_parent: None,
rtts: DashSet::new(), rtts: DashSet::new(),
@ -199,8 +200,7 @@ impl NetworkJson {
/// access. /// access.
pub fn zero_throughput_and_rtt(&self) { pub fn zero_throughput_and_rtt(&self) {
self.nodes.iter().for_each(|n| { self.nodes.iter().for_each(|n| {
n.current_throughput.0.store(0, std::sync::atomic::Ordering::Relaxed); n.current_throughput.set_to_zero();
n.current_throughput.1.store(0, std::sync::atomic::Ordering::Relaxed);
n.rtts.clear(); n.rtts.clear();
}); });
} }
@ -216,8 +216,7 @@ impl NetworkJson {
for idx in targets { for idx in targets {
// Safety first: use "get" to ensure that the node exists // Safety first: use "get" to ensure that the node exists
if let Some(node) = self.nodes.get(*idx) { if let Some(node) = self.nodes.get(*idx) {
node.current_throughput.0.fetch_add(bytes.0, std::sync::atomic::Ordering::Relaxed); node.current_throughput.checked_add_tuple(bytes);
node.current_throughput.1.fetch_add(bytes.1, std::sync::atomic::Ordering::Relaxed);
} else { } else {
warn!("No network tree entry for index {idx}"); warn!("No network tree entry for index {idx}");
} }
@ -271,7 +270,7 @@ fn recurse_node(
json_to_u32(json.get("downloadBandwidthMbps")), json_to_u32(json.get("downloadBandwidthMbps")),
json_to_u32(json.get("uploadBandwidthMbps")), json_to_u32(json.get("uploadBandwidthMbps")),
), ),
current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), current_throughput: AtomicDownUp::zeroed(),
name: name.to_string(), name: name.to_string(),
immediate_parent: Some(immediate_parent), immediate_parent: Some(immediate_parent),
rtts: DashSet::new(), rtts: DashSet::new(),

View File

@ -1,6 +1,7 @@
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
#[derive(Debug)]
pub struct AtomicDownUp { pub struct AtomicDownUp {
down: AtomicU64, down: AtomicU64,
up: AtomicU64, up: AtomicU64,
@ -13,12 +14,12 @@ impl AtomicDownUp {
up: AtomicU64::new(0), up: AtomicU64::new(0),
} }
} }
pub fn set_to_zero(&self) { pub fn set_to_zero(&self) {
self.up.store(0, Relaxed); self.up.store(0, Relaxed);
self.down.store(0, Relaxed); self.down.store(0, Relaxed);
} }
pub fn checked_add_tuple(&self, n: (u64, u64)) { pub fn checked_add_tuple(&self, n: (u64, u64)) {
let n0 = self.down.load(std::sync::atomic::Ordering::Relaxed); let n0 = self.down.load(std::sync::atomic::Ordering::Relaxed);
if let Some(n) = n0.checked_add(n.0) { if let Some(n) = n0.checked_add(n.0) {
@ -30,7 +31,7 @@ impl AtomicDownUp {
self.up.store(n, std::sync::atomic::Ordering::Relaxed); self.up.store(n, std::sync::atomic::Ordering::Relaxed);
} }
} }
pub fn get_down(&self) -> u64 { pub fn get_down(&self) -> u64 {
self.down.load(Relaxed) self.down.load(Relaxed)
} }
@ -38,7 +39,7 @@ impl AtomicDownUp {
pub fn get_up(&self) -> u64 { pub fn get_up(&self) -> u64 {
self.up.load(Relaxed) self.up.load(Relaxed)
} }
pub fn set_down(&self, n: u64) { pub fn set_down(&self, n: u64) {
self.down.store(n, Relaxed); self.down.store(n, Relaxed);
} }

View File

@ -38,8 +38,8 @@ impl From<&NetworkJsonNode> for NetworkTreeEntry {
parents: value.parents.clone(), parents: value.parents.clone(),
immediate_parent: value.immediate_parent, immediate_parent: value.immediate_parent,
current_throughput: ( current_throughput: (
value.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed) as u32, value.current_throughput.get_down() as u32,
value.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed) as u32, value.current_throughput.get_up() as u32,
), ),
node_type: value.node_type.clone(), node_type: value.node_type.clone(),
rtts: (min as u16, max as u16, avg as u16), rtts: (min as u16, max as u16, avg as u16),