diff --git a/src/rust/lqos_utils/src/units/atomic_down_up.rs b/src/rust/lqos_utils/src/units/atomic_down_up.rs index 7570d2a5..413fa1d2 100644 --- a/src/rust/lqos_utils/src/units/atomic_down_up.rs +++ b/src/rust/lqos_utils/src/units/atomic_down_up.rs @@ -38,4 +38,12 @@ impl AtomicDownUp { pub fn get_up(&self) -> u64 { self.up.load(Relaxed) } + + pub fn set_down(&self, n: u64) { + self.down.store(n, Relaxed); + } + + pub fn set_up(&self, n: u64) { + self.up.store(n, Relaxed); + } } \ No newline at end of file diff --git a/src/rust/lqosd/src/anonymous_usage/mod.rs b/src/rust/lqosd/src/anonymous_usage/mod.rs index 24339e16..fbbb44c1 100644 --- a/src/rust/lqosd/src/anonymous_usage/mod.rs +++ b/src/rust/lqosd/src/anonymous_usage/mod.rs @@ -4,7 +4,7 @@ use std::{time::Duration, net::TcpStream, io::Write}; use lqos_bus::anonymous::{AnonymousUsageV1, build_stats}; use lqos_sys::num_possible_cpus; use sysinfo::System; -use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}}; +use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::HIGH_WATERMARK}; const SLOW_START_SECS: u64 = 1; const INTERVAL_SECS: u64 = 60 * 60 * 24; @@ -74,8 +74,8 @@ fn anonymous_usage_dump() -> anyhow::Result<()> { data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len(); data.high_watermark_bps = ( - HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), - HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), + HIGH_WATERMARK.get_down(), + HIGH_WATERMARK.get_up(), ); diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index ce54a009..50705672 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -28,7 +28,7 @@ use signal_hook::{ consts::{SIGHUP, SIGINT, SIGTERM}, iterator::Signals, }; -use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED}; +use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK, FLOWS_TRACKED}; use throughput_tracker::flow_data::get_rtt_events_per_second; use tokio::join; mod stats; @@ -224,8 +224,8 @@ fn handle_bus_requests( bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed), time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed), high_watermark: ( - HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), - HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), + HIGH_WATERMARK.get_down(), + HIGH_WATERMARK.get_up(), ), tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed), rtt_events_per_second: get_rtt_events_per_second(), diff --git a/src/rust/lqosd/src/stats.rs b/src/rust/lqosd/src/stats.rs index e3763b7c..135ca763 100644 --- a/src/rust/lqosd/src/stats.rs +++ b/src/rust/lqosd/src/stats.rs @@ -1,7 +1,7 @@ use std::sync::atomic::AtomicU64; +use lqos_utils::units::AtomicDownUp; pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0); pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0); -pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0); -pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0); +pub static HIGH_WATERMARK: AtomicDownUp = AtomicDownUp::zeroed(); pub static FLOWS_TRACKED: AtomicU64 = AtomicU64::new(0); diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index d4b8639e..df940916 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -1,5 +1,5 @@ use std::{sync::atomic::AtomicU64, time::Duration}; -use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}}; +use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::HIGH_WATERMARK, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}}; use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use dashmap::DashMap; use fxhash::FxHashMap; @@ -382,14 +382,14 @@ impl ThroughputTracker { let current = self.bits_per_second(); if current.0 < 100000000000 && current.1 < 100000000000 { let prev_max = ( - HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), - HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), + HIGH_WATERMARK.get_down(), + HIGH_WATERMARK.get_up(), ); if current.0 > prev_max.0 { - HIGH_WATERMARK_DOWN.store(current.0, std::sync::atomic::Ordering::Relaxed); + HIGH_WATERMARK.set_down(current.0); } if current.1 > prev_max.1 { - HIGH_WATERMARK_UP.store(current.1, std::sync::atomic::Ordering::Relaxed); + HIGH_WATERMARK.set_up(current.1); } } }