High watermark uses the same atomic up/down structure

This commit is contained in:
Herbert Wolverson 2024-07-01 15:37:44 -05:00
parent 5a69ca5ea8
commit c704f231e9
5 changed files with 21 additions and 13 deletions

View File

@ -38,4 +38,12 @@ impl AtomicDownUp {
pub fn get_up(&self) -> u64 { pub fn get_up(&self) -> u64 {
self.up.load(Relaxed) self.up.load(Relaxed)
} }
pub fn set_down(&self, n: u64) {
self.down.store(n, Relaxed);
}
pub fn set_up(&self, n: u64) {
self.up.store(n, Relaxed);
}
} }

View File

@ -4,7 +4,7 @@ use std::{time::Duration, net::TcpStream, io::Write};
use lqos_bus::anonymous::{AnonymousUsageV1, build_stats}; use lqos_bus::anonymous::{AnonymousUsageV1, build_stats};
use lqos_sys::num_possible_cpus; use lqos_sys::num_possible_cpus;
use sysinfo::System; use sysinfo::System;
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}}; use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::HIGH_WATERMARK};
const SLOW_START_SECS: u64 = 1; const SLOW_START_SECS: u64 = 1;
const INTERVAL_SECS: u64 = 60 * 60 * 24; const INTERVAL_SECS: u64 = 60 * 60 * 24;
@ -74,8 +74,8 @@ fn anonymous_usage_dump() -> anyhow::Result<()> {
data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len(); data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len();
data.high_watermark_bps = ( data.high_watermark_bps = (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK.get_down(),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK.get_up(),
); );

View File

@ -28,7 +28,7 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM}, consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals, iterator::Signals,
}; };
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED}; use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK, FLOWS_TRACKED};
use throughput_tracker::flow_data::get_rtt_events_per_second; use throughput_tracker::flow_data::get_rtt_events_per_second;
use tokio::join; use tokio::join;
mod stats; mod stats;
@ -224,8 +224,8 @@ fn handle_bus_requests(
bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed), bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed),
time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed), time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed),
high_watermark: ( high_watermark: (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK.get_down(),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK.get_up(),
), ),
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed), tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
rtt_events_per_second: get_rtt_events_per_second(), rtt_events_per_second: get_rtt_events_per_second(),

View File

@ -1,7 +1,7 @@
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use lqos_utils::units::AtomicDownUp;
pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0); pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0);
pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0); pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0); pub static HIGH_WATERMARK: AtomicDownUp = AtomicDownUp::zeroed();
pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0);
pub static FLOWS_TRACKED: AtomicU64 = AtomicU64::new(0); pub static FLOWS_TRACKED: AtomicU64 = AtomicU64::new(0);

View File

@ -1,5 +1,5 @@
use std::{sync::atomic::AtomicU64, time::Duration}; use std::{sync::atomic::AtomicU64, time::Duration};
use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}}; use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::HIGH_WATERMARK, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}};
use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap; use dashmap::DashMap;
use fxhash::FxHashMap; use fxhash::FxHashMap;
@ -382,14 +382,14 @@ impl ThroughputTracker {
let current = self.bits_per_second(); let current = self.bits_per_second();
if current.0 < 100000000000 && current.1 < 100000000000 { if current.0 < 100000000000 && current.1 < 100000000000 {
let prev_max = ( let prev_max = (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK.get_down(),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK.get_up(),
); );
if current.0 > prev_max.0 { if current.0 > prev_max.0 {
HIGH_WATERMARK_DOWN.store(current.0, std::sync::atomic::Ordering::Relaxed); HIGH_WATERMARK.set_down(current.0);
} }
if current.1 > prev_max.1 { if current.1 > prev_max.1 {
HIGH_WATERMARK_UP.store(current.1, std::sync::atomic::Ordering::Relaxed); HIGH_WATERMARK.set_up(current.1);
} }
} }
} }