diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 2c0e6fc7..8c6a5fc8 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -25,18 +25,18 @@ pub enum BusResponse { /// Current throughput for the overall system. CurrentThroughput { /// In bps - bits_per_second: (u64, u64), + bits_per_second: DownUpOrder, /// In pps - packets_per_second: (u64, u64), + packets_per_second: DownUpOrder, /// How much of the response has been subject to the shaper? - shaped_bits_per_second: (u64, u64), + shaped_bits_per_second: DownUpOrder, }, /// Provides a list of ALL mapped hosts traffic counters, /// listing the IP Address and upload/download in a tuple. - HostCounters(Vec<(IpAddr, u64, u64)>), + HostCounters(Vec<(IpAddr, DownUpOrder)>), /// Provides the Top N downloaders IP stats. TopDownloaders(Vec), @@ -90,7 +90,7 @@ pub enum BusResponse { /// Us to poll hosts time_to_poll_hosts: u64, /// High traffic watermark - high_watermark: (u64, u64), + high_watermark: DownUpOrder, /// Number of flows tracked tracked_flows: u64, /// RTT events per second @@ -141,19 +141,19 @@ pub enum BusResponse { /// Summary of Ether Protocol EtherProtocols{ /// Number of IPv4 Bytes - v4_bytes: [u64; 2], + v4_bytes: DownUpOrder, /// Number of IPv6 Bytes - v6_bytes: [u64; 2], + v6_bytes: DownUpOrder, /// Number of IPv4 Packets - v4_packets: [u64; 2], + v4_packets: DownUpOrder, /// Number of IPv6 Packets - v6_packets: [u64; 2], + v6_packets: DownUpOrder, /// Number of IPv4 Flows - v4_rtt: [u64; 2], + v4_rtt: DownUpOrder, /// Number of IPv6 Flows - v6_rtt: [u64; 2], + v6_rtt: DownUpOrder, }, /// Summary of IP Protocols - IpProtocols(Vec<(String, (u64, u64))>), + IpProtocols(Vec<(String, DownUpOrder)>), } diff --git a/src/rust/lqos_node_manager/src/config_control.rs b/src/rust/lqos_node_manager/src/config_control.rs index e3e70966..f166c362 100644 --- a/src/rust/lqos_node_manager/src/config_control.rs +++ b/src/rust/lqos_node_manager/src/config_control.rs @@ -135,7 +135,7 @@ pub async fn stats() -> NoCache> { return NoCache::new(Json(LqosStats { bus_requests_since_start: bus_requests, time_to_poll_hosts_us: time_to_poll_hosts, - high_watermark, + high_watermark: (high_watermark.down, high_watermark.up), tracked_flows, rtt_events_per_second, })); diff --git a/src/rust/lqos_node_manager/src/flow_monitor.rs b/src/rust/lqos_node_manager/src/flow_monitor.rs index 4e949d85..7627aaa1 100644 --- a/src/rust/lqos_node_manager/src/flow_monitor.rs +++ b/src/rust/lqos_node_manager/src/flow_monitor.rs @@ -92,6 +92,10 @@ pub async fn flows_ip_protocol() -> NoCache>> { BusResponse::IpProtocols(ip_protocols) => ip_protocols.to_owned(), _ => Vec::new(), }; + let result = result. + into_iter() + .map(|(name, bytes)| (name, (bytes.down, bytes.up))) + .collect(); NoCache::new(Json(result)) } \ No newline at end of file diff --git a/src/rust/lqos_node_manager/src/network_tree.rs b/src/rust/lqos_node_manager/src/network_tree.rs index b02ccff3..0afb130e 100644 --- a/src/rust/lqos_node_manager/src/network_tree.rs +++ b/src/rust/lqos_node_manager/src/network_tree.rs @@ -62,7 +62,7 @@ pub async fn tree_clients( { let devices = SHAPED_DEVICES.read().unwrap(); if let BusResponse::HostCounters(hosts) = msg { - for (ip, down, up) in hosts.iter() { + for (ip, bytes) in hosts.iter() { let lookup = match ip { IpAddr::V4(ip) => ip.to_ipv6_mapped(), IpAddr::V6(ip) => *ip, @@ -72,7 +72,7 @@ pub async fn tree_clients( result.push(CircuitThroughput { id: devices.devices[*c.1].circuit_id.clone(), name: devices.devices[*c.1].circuit_name.clone(), - traffic: (*down, *up), + traffic: (bytes.down, bytes.up), limit: ( devices.devices[*c.1].download_max_mbps as u64, devices.devices[*c.1].upload_max_mbps as u64, diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index 89a77bff..6d8a6d74 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -71,14 +71,14 @@ pub async fn current_circuit_throughput( { if let BusResponse::HostCounters(hosts) = msg { let devices = SHAPED_DEVICES.read().unwrap(); - for (ip, down, up) in hosts.iter() { + for (ip, bytes) in hosts.iter() { let lookup = match ip { IpAddr::V4(ip) => ip.to_ipv6_mapped(), IpAddr::V6(ip) => *ip, }; if let Some(c) = devices.trie.longest_match(lookup) { if devices.devices[*c.1].circuit_id == circuit_id { - result.push((ip.to_string(), *down, *up)); + result.push((ip.to_string(), bytes.down, bytes.up)); } } } diff --git a/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs b/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs index ebf5c54a..4f18d212 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs @@ -45,9 +45,9 @@ impl TotalThroughput { { let mut lock = self.inner.lock().unwrap(); let head = lock.head; - lock.data[head].bits_per_second = bits_per_second; - lock.data[head].packets_per_second = packets_per_second; - lock.data[head].shaped_bits_per_second = shaped_bits_per_second; + lock.data[head].bits_per_second = (bits_per_second.down, bits_per_second.up); + lock.data[head].packets_per_second = (packets_per_second.down, packets_per_second.up); + lock.data[head].shaped_bits_per_second = (shaped_bits_per_second.down, shaped_bits_per_second.up); lock.prev_head = lock.head; lock.head += 1; lock.head %= 300; diff --git a/src/rust/lqos_utils/src/units/atomic_down_up.rs b/src/rust/lqos_utils/src/units/atomic_down_up.rs index cc5f332c..9f67242f 100644 --- a/src/rust/lqos_utils/src/units/atomic_down_up.rs +++ b/src/rust/lqos_utils/src/units/atomic_down_up.rs @@ -1,5 +1,6 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering::Relaxed; +use crate::units::DownUpOrder; #[derive(Debug)] pub struct AtomicDownUp { @@ -47,4 +48,11 @@ impl AtomicDownUp { pub fn set_up(&self, n: u64) { self.up.store(n, Relaxed); } + + pub fn as_down_up(&self) -> DownUpOrder { + DownUpOrder::new( + self.get_down(), + self.get_up() + ) + } } \ No newline at end of file diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index 50705672..6bab9fda 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -223,10 +223,7 @@ fn handle_bus_requests( BusResponse::LqosdStats { bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed), time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed), - high_watermark: ( - HIGH_WATERMARK.get_down(), - HIGH_WATERMARK.get_up(), - ), + high_watermark: HIGH_WATERMARK.as_down_up(), tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed), rtt_events_per_second: get_rtt_events_per_second(), } diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs index ca04a96b..df899a37 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs @@ -157,10 +157,10 @@ impl TimeBuffer { pub fn ether_protocol_summary(&self) -> BusResponse { let buffer = self.buffer.lock().unwrap(); - let mut v4_bytes_sent = [0,0]; - let mut v4_packets_sent = [0,0]; - let mut v6_bytes_sent = [0,0]; - let mut v6_packets_sent = [0,0]; + let mut v4_bytes_sent = DownUpOrder::zeroed(); + let mut v4_packets_sent = DownUpOrder::zeroed(); + let mut v6_bytes_sent = DownUpOrder::zeroed(); + let mut v6_packets_sent = DownUpOrder::zeroed(); let mut v4_rtt = [Vec::new(), Vec::new()]; let mut v6_rtt = [Vec::new(), Vec::new()]; @@ -170,10 +170,8 @@ impl TimeBuffer { let (key, data, _analysis) = &v.data; if key.local_ip.is_v4() { // It's V4 - v4_bytes_sent[0] += data.bytes_sent.down; - v4_bytes_sent[1] += data.bytes_sent.up; - v4_packets_sent[0] += data.packets_sent.down; - v4_packets_sent[1] += data.packets_sent.up; + v4_bytes_sent.checked_add(data.bytes_sent); + v4_packets_sent.checked_add(data.packets_sent); if data.rtt[0].as_nanos() > 0 { v4_rtt[0].push(data.rtt[0].as_nanos()); } @@ -182,10 +180,8 @@ impl TimeBuffer { } } else { // It's V6 - v6_bytes_sent[0] += data.bytes_sent.down; - v6_bytes_sent[1] += data.bytes_sent.up; - v6_packets_sent[0] += data.packets_sent.down; - v6_packets_sent[1] += data.packets_sent.up; + v6_bytes_sent.checked_add(data.bytes_sent); + v6_packets_sent.checked_add(data.packets_sent); if data.rtt[0].as_nanos() > 0 { v6_rtt[0].push(data.rtt[0].as_nanos()); } @@ -196,14 +192,14 @@ impl TimeBuffer { } }); - let v4_rtt = [ + let v4_rtt = DownUpOrder::new( Self::median(&v4_rtt[0]), Self::median(&v4_rtt[1]), - ]; - let v6_rtt = [ + ); + let v6_rtt = DownUpOrder::new( Self::median(&v6_rtt[0]), Self::median(&v6_rtt[1]), - ]; + ); BusResponse::EtherProtocols { v4_bytes: v4_bytes_sent, @@ -215,7 +211,7 @@ impl TimeBuffer { } } - pub fn ip_protocol_summary(&self) -> Vec<(String, (u64, u64))> { + pub fn ip_protocol_summary(&self) -> Vec<(String, DownUpOrder)> { let buffer = self.buffer.lock().unwrap(); let mut results = FxHashMap::default(); @@ -225,13 +221,12 @@ impl TimeBuffer { .for_each(|v| { let (_key, data, analysis) = &v.data; let proto = analysis.protocol_analysis.to_string(); - let entry = results.entry(proto).or_insert((0, 0)); - entry.0 += data.bytes_sent.down; - entry.1 += data.bytes_sent.up; + let entry = results.entry(proto).or_insert(DownUpOrder::zeroed()); + entry.checked_add(data.bytes_sent); }); - let mut results = results.into_iter().collect::>(); - results.sort_by(|a, b| b.1.1.cmp(&a.1.1)); + let mut results = results.into_iter().collect::)>>(); + results.sort_by(|a, b| b.1.up.cmp(&a.1.up)); // Keep only the top 10 results.truncate(10); results diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index c33e6adb..7ee7f687 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -152,8 +152,8 @@ async fn submit_throughput_stats(long_term_stats_tx: Sender) let summary = Box::new(( ThroughputSummary { - bits_per_second, - shaped_bits_per_second, + bits_per_second: (bits_per_second.down, bits_per_second.up), + shaped_bits_per_second: (shaped_bits_per_second.down, shaped_bits_per_second.up), packets_per_second, hosts, }, @@ -188,7 +188,7 @@ pub fn host_counters() -> BusResponse { let mut result = Vec::new(); THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| { let ip = v.key().as_ip(); - result.push((ip, v.bytes_per_second.down, v.bytes_per_second.up)); + result.push((ip, v.bytes_per_second)); }); BusResponse::HostCounters(result) } diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index fd357cad..e47a9ffe 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -353,16 +353,16 @@ impl ThroughputTracker { }); let current = self.bits_per_second(); - if current.0 < 100000000000 && current.1 < 100000000000 { + if current.both_less_than(100000000000) { let prev_max = ( HIGH_WATERMARK.get_down(), HIGH_WATERMARK.get_up(), ); - if current.0 > prev_max.0 { - HIGH_WATERMARK.set_down(current.0); + if current.down > prev_max.0 { + HIGH_WATERMARK.set_down(current.down); } - if current.1 > prev_max.1 { - HIGH_WATERMARK.set_up(current.1); + if current.up > prev_max.1 { + HIGH_WATERMARK.set_up(current.up); } } } @@ -371,25 +371,16 @@ impl ThroughputTracker { self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } - pub(crate) fn bits_per_second(&self) -> (u64, u64) { - ( - self.bytes_per_second.get_down() * 8, - self.bytes_per_second.get_up() * 8, - ) + pub(crate) fn bits_per_second(&self) -> DownUpOrder { + self.bytes_per_second.as_down_up().to_bits_from_bytes() } - pub(crate) fn shaped_bits_per_second(&self) -> (u64, u64) { - ( - self.shaped_bytes_per_second.get_down() * 8, - self.shaped_bytes_per_second.get_up() * 8 - ) + pub(crate) fn shaped_bits_per_second(&self) -> DownUpOrder { + self.shaped_bytes_per_second.as_down_up().to_bits_from_bytes() } - pub(crate) fn packets_per_second(&self) -> (u64, u64) { - ( - self.packets_per_second.get_down(), - self.packets_per_second.get_up(), - ) + pub(crate) fn packets_per_second(&self) -> DownUpOrder { + self.packets_per_second.as_down_up() } #[allow(dead_code)] diff --git a/src/rust/lqtop/src/widgets/network_sparkline.rs b/src/rust/lqtop/src/widgets/network_sparkline.rs index ac5409b2..25adb3f6 100644 --- a/src/rust/lqtop/src/widgets/network_sparkline.rs +++ b/src/rust/lqtop/src/widgets/network_sparkline.rs @@ -9,6 +9,7 @@ use ratatui::{ widgets::*, }; use std::sync::mpsc::{Receiver, Sender}; +use lqos_utils::units::DownUpOrder; pub struct NetworkSparkline { bus_link: tokio::sync::mpsc::Sender, @@ -67,32 +68,31 @@ impl TopWidget for NetworkSparkline { let bps_down: Vec<(f64, f64)> = raw_data .iter() .enumerate() - .map(|(i, &val)| (i as f64, val.bits_per_second.1 as f64)) + .map(|(i, &val)| (i as f64, val.bits_per_second.up as f64)) .collect(); let bps_up: Vec<(f64, f64)> = raw_data .iter() .enumerate() - .map(|(i, &val)| (i as f64, val.bits_per_second.0 as f64)) + .map(|(i, &val)| (i as f64, val.bits_per_second.down as f64)) .collect(); let shaped_down: Vec<(f64, f64)> = raw_data .iter() .enumerate() - .map(|(i, &val)| (i as f64, val.shaped_bits_per_second.1 as f64)) + .map(|(i, &val)| (i as f64, val.shaped_bits_per_second.up as f64)) .collect(); let shaped_up: Vec<(f64, f64)> = raw_data .iter() .enumerate() - .map(|(i, &val)| (i as f64, val.shaped_bits_per_second.0 as f64)) + .map(|(i, &val)| (i as f64, val.shaped_bits_per_second.down as f64)) .collect(); - let (up, down) = self.current_throughput.bits_per_second; let title = format!( " [Throughput (Down: {} Up: {})]", - scale_bits(up), - scale_bits(down) + scale_bits(self.current_throughput.bits_per_second.down), + scale_bits(self.current_throughput.bits_per_second.up) ); let block = Block::default() @@ -169,7 +169,7 @@ impl NetworkSparkline { #[derive(Default, Copy, Clone)] struct CurrentThroughput { - pub bits_per_second: (u64, u64), - pub _packets_per_second: (u64, u64), - pub shaped_bits_per_second: (u64, u64), + pub bits_per_second: DownUpOrder, + pub _packets_per_second: DownUpOrder, + pub shaped_bits_per_second: DownUpOrder, }