diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index ce2aa956..c23c8975 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -2019,6 +2019,7 @@ dependencies = [ "log", "nix 0.29.0", "notify", + "num-traits", "serde", "thiserror", "zerocopy 0.6.6", diff --git a/src/rust/lqos_utils/Cargo.toml b/src/rust/lqos_utils/Cargo.toml index 20e6deaa..32f200f6 100644 --- a/src/rust/lqos_utils/Cargo.toml +++ b/src/rust/lqos_utils/Cargo.toml @@ -12,3 +12,4 @@ notify = { version = "5.0.0", default-features = false } # Not using crossbeam b thiserror = "1" byteorder = "1.4" zerocopy = { version = "0.6.1", features = ["simd"] } +num-traits = "0.2.19" diff --git a/src/rust/lqos_utils/src/units.rs b/src/rust/lqos_utils/src/units.rs index 251fc5a4..fe8b6380 100644 --- a/src/rust/lqos_utils/src/units.rs +++ b/src/rust/lqos_utils/src/units.rs @@ -1,3 +1,7 @@ mod atomic_down_up; +mod down_up; +mod up_down; -pub use atomic_down_up::*; \ No newline at end of file +pub use atomic_down_up::*; +pub use down_up::*; +pub use up_down::*; diff --git a/src/rust/lqos_utils/src/units/down_up.rs b/src/rust/lqos_utils/src/units/down_up.rs new file mode 100644 index 00000000..17a4a3e8 --- /dev/null +++ b/src/rust/lqos_utils/src/units/down_up.rs @@ -0,0 +1,107 @@ +use crate::units::UpDownOrder; + +/// Provides strong download/upload separation for +/// stored statistics to eliminate confusion. +#[repr(C)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub struct DownUpOrder { + pub down: T, + pub up: T, +} + +impl DownUpOrder +where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub + + num_traits::CheckedAdd + num_traits::SaturatingSub +{ + pub fn new(down: T, up: T) -> Self { + Self { down, up } + } + + pub fn zeroed() -> Self { + Self { down: T::zero(), up: T::zero() } + } + + pub fn both_less_than(&self, limit: T) -> bool { + self.down < limit && self.up < limit + } + + pub fn sum_exceeds(&self, limit: T) -> bool { + self.down + self.up > limit + } + + pub fn checked_sub_or_zero(&self, rhs: DownUpOrder) -> DownUpOrder { + let down = T::checked_sub(&self.down, &rhs.down).unwrap_or(T::zero()); + let up = T::checked_sub(&self.up, &rhs.up).unwrap_or(T::zero()); + DownUpOrder { down, up } + } + + pub fn checked_add(&mut self, rhs: DownUpOrder) { + self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero()); + self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero()); + } + + pub fn checked_add_direct(&mut self, down: T, up: T) { + self.down = self.down.checked_add(&down).unwrap_or(T::zero()); + self.up = self.up.checked_add(&up).unwrap_or(T::zero()); + } +} + +impl Into> for DownUpOrder { + fn into(self) -> UpDownOrder { + UpDownOrder { + up: self.down, + down: self.up + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reverse() { + let a = UpDownOrder::new(1, 2); + let b: DownUpOrder = a.into(); + assert_eq!(a.down, b.up); + } + + #[test] + fn test_checked_sub() { + let a = DownUpOrder::new(1u64, 1); + let b= DownUpOrder::new(1, 1); + let c = a.checked_sub_or_zero(b); + assert_eq!(c.up, 0); + assert_eq!(c.down, 0); + + let b= DownUpOrder::new(2, 2); + let c = a.checked_sub_or_zero(b); + assert_eq!(c.up, 0); + assert_eq!(c.down, 0); + } + + #[test] + fn test_checked_add() { + let mut a = DownUpOrder::new(u64::MAX, u64::MAX); + let b = DownUpOrder::new(1, 1); + a.checked_add(b); + assert_eq!(a.down, 0); + assert_eq!(a.up, 0); + let mut a = DownUpOrder::new(1, 2); + a.checked_add(b); + assert_eq!(a.down, 2); + assert_eq!(a.up, 3); + } + + #[test] + fn test_checked_add_direct() { + let mut a = DownUpOrder::new(u64::MAX, u64::MAX); + a.checked_add_direct(1, 1); + assert_eq!(a.down, 0); + assert_eq!(a.up, 0); + let mut a = DownUpOrder::new(1, 2); + a.checked_add_direct(1, 1); + assert_eq!(a.down, 2); + assert_eq!(a.up, 3); + } +} \ No newline at end of file diff --git a/src/rust/lqos_utils/src/units/up_down.rs b/src/rust/lqos_utils/src/units/up_down.rs new file mode 100644 index 00000000..e3007cb1 --- /dev/null +++ b/src/rust/lqos_utils/src/units/up_down.rs @@ -0,0 +1,88 @@ +use crate::units::DownUpOrder; + +/// Provides strong download/upload separation for +/// stored statistics to eliminate confusion. +#[repr(C)] +#[derive(Copy, Clone, Debug)] +pub struct UpDownOrder { + pub up: T, + pub down: T, +} + +impl UpDownOrder +where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub + + num_traits::CheckedAdd +{ + pub fn new(up: T, down: T) -> Self { + Self { + up, down + } + } + + pub fn zeroed() -> Self { + Self { down: T::zero(), up: T::zero() } + } + + pub fn both_less_than(&self, limit: T) -> bool { + self.down < limit && self.up < limit + } + + pub fn checked_sub_or_zero(&self, rhs: UpDownOrder) -> UpDownOrder { + let down = T::checked_sub(&self.down, &rhs.down).unwrap_or(T::zero()); + let up = T::checked_sub(&self.up, &rhs.up).unwrap_or(T::zero()); + UpDownOrder { down, up } + } + + pub fn checked_add(&mut self, rhs: UpDownOrder) { + self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero()); + self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero()); + } +} + +impl Into> for UpDownOrder { + fn into(self) -> DownUpOrder { + DownUpOrder { + up: self.down, + down: self.up, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reverse() { + let a = DownUpOrder::new(1, 2); + let b: UpDownOrder = a.into(); + assert_eq!(a.down, b.up); + } + + #[test] + fn test_checked_sub() { + let a = UpDownOrder::new(1u64, 1); + let b= UpDownOrder::new(1, 1); + let c = a.checked_sub_or_zero(b); + assert_eq!(c.up, 0); + assert_eq!(c.down, 0); + + let b= UpDownOrder::new(2, 2); + let c = a.checked_sub_or_zero(b); + assert_eq!(c.up, 0); + assert_eq!(c.down, 0); + } + + #[test] + fn test_checked_add() { + let mut a = UpDownOrder::new(u64::MAX, u64::MAX); + let b = UpDownOrder::new(1, 1); + a.checked_add(b); + assert_eq!(a.down, 0); + assert_eq!(a.up, 0); + let mut a = UpDownOrder::new(1, 2); + a.checked_add(b); + assert_eq!(a.down, 3); + assert_eq!(a.up, 2); + } +} \ No newline at end of file diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index a69f47f8..d448414e 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -27,7 +27,7 @@ log = "0" nix = "0" sysinfo = "0" dashmap = "5" -num-traits = "0.2" +num-traits = "0.2.19" thiserror = "1" itertools = "0.12.1" csv = "1" diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index d9e6cc9a..f520fb45 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -20,6 +20,7 @@ use tokio::{ sync::mpsc::Sender, time::{Duration, Instant}, }; +use lqos_utils::units::DownUpOrder; const RETIRE_AFTER_SECONDS: u64 = 30; @@ -144,7 +145,7 @@ async fn submit_throughput_stats(long_term_stats_tx: Sender) .map(|host| HostSummary { ip: host.key().as_ip(), circuit_id: host.circuit_id.clone(), - bits_per_second: (host.bytes_per_second.0 * 8, host.bytes_per_second.1 * 8), + bits_per_second: (host.bytes_per_second.down * 8, host.bytes_per_second.up * 8), median_rtt: host.median_latency().unwrap_or(0.0), }) .collect(); @@ -187,8 +188,7 @@ pub fn host_counters() -> BusResponse { let mut result = Vec::new(); THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| { let ip = v.key().as_ip(); - let (down, up) = v.bytes_per_second; - result.push((ip, down, up)); + result.push((ip, v.bytes_per_second.down, v.bytes_per_second.up)); }); BusResponse::HostCounters(result) } @@ -198,7 +198,7 @@ fn retire_check(cycle: u64, recent_cycle: u64) -> bool { cycle < recent_cycle + RETIRE_AFTER_SECONDS } -type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String, (u64, u64)); +type TopList = (XdpIpAddress, DownUpOrder,DownUpOrder, f32, TcHandle, String, (u64, u64)); pub fn top_n(start: u32, end: u32) -> BusResponse { let mut full_list: Vec = { @@ -223,7 +223,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { }) .collect() }; - full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0)); + full_list.sort_by(|a, b| b.1.down.cmp(&a.1.down)); let result = full_list .iter() .skip(start as usize) @@ -231,8 +231,8 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { .map( |( ip, - (bytes_dn, bytes_up), - (packets_dn, packets_up), + bytes, + packets, median_rtt, tc_handle, circuit_id, @@ -240,8 +240,8 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes_dn * 8, bytes_up * 8), - packets_per_second: (*packets_dn, *packets_up), + bits_per_second: (bytes.down * 8, bytes.up * 8), + packets_per_second: (packets.down, packets.up), median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, tcp_retransmits: *tcp_retransmits, @@ -283,8 +283,8 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { .map( |( ip, - (bytes_dn, bytes_up), - (packets_dn, packets_up), + bytes, + packets, median_rtt, tc_handle, circuit_id, @@ -292,8 +292,8 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes_dn * 8, bytes_up * 8), - packets_per_second: (*packets_dn, *packets_up), + bits_per_second: (bytes.down * 8, bytes.up * 8), + packets_per_second: (packets.down, packets.up), median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, tcp_retransmits: *tcp_retransmits, @@ -339,8 +339,8 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse { .map( |( ip, - (bytes_dn, bytes_up), - (packets_dn, packets_up), + bytes, + packets, median_rtt, tc_handle, circuit_id, @@ -348,8 +348,8 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes_dn * 8, bytes_up * 8), - packets_per_second: (*packets_dn, *packets_up), + bits_per_second: (bytes.down * 8, bytes.up * 8), + packets_per_second: (packets.down, packets.up), median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, tcp_retransmits: *tcp_retransmits, @@ -392,8 +392,8 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { .map( |( ip, - (bytes_dn, bytes_up), - (packets_dn, packets_up), + bytes, + packets, median_rtt, tc_handle, circuit_id, @@ -401,8 +401,8 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes_dn * 8, bytes_up * 8), - packets_per_second: (*packets_dn, *packets_up), + bits_per_second: (bytes.down * 8, bytes.up * 8), + packets_per_second: (packets.down, packets.up), median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, tcp_retransmits: *tcp_retransmits, @@ -503,7 +503,7 @@ pub fn host_counts() -> BusResponse { BusResponse::HostCounts((total, shaped)) } -type FullList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, u64); +type FullList = (XdpIpAddress, DownUpOrder, DownUpOrder, f32, TcHandle, u64); pub fn all_unknown_ips() -> BusResponse { let boot_time = time_since_boot(); @@ -542,16 +542,16 @@ pub fn all_unknown_ips() -> BusResponse { .map( |( ip, - (bytes_dn, bytes_up), - (packets_dn, packets_up), + bytes, + packets, median_rtt, tc_handle, _last_seen, )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: String::new(), - bits_per_second: (bytes_dn * 8, bytes_up * 8), - packets_per_second: (*packets_dn, *packets_up), + bits_per_second: (bytes.down * 8, bytes.up * 8), + packets_per_second: (packets.down, packets.up), median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, tcp_retransmits: (0, 0), diff --git a/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs b/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs index dbd29f8c..1c0ca187 100644 --- a/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs +++ b/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs @@ -1,4 +1,5 @@ use lqos_bus::TcHandle; +use lqos_utils::units::DownUpOrder; use super::flow_data::RttData; #[derive(Debug)] @@ -7,12 +8,12 @@ pub(crate) struct ThroughputEntry { pub(crate) network_json_parents: Option>, pub(crate) first_cycle: u64, pub(crate) most_recent_cycle: u64, - pub(crate) bytes: (u64, u64), - pub(crate) packets: (u64, u64), - pub(crate) prev_bytes: (u64, u64), - pub(crate) prev_packets: (u64, u64), - pub(crate) bytes_per_second: (u64, u64), - pub(crate) packets_per_second: (u64, u64), + pub(crate) bytes: DownUpOrder, // 0 DL, 1 UL + pub(crate) packets: DownUpOrder, // 0 DL, 1 UL + pub(crate) prev_bytes: DownUpOrder, // Has to mirror + pub(crate) prev_packets: DownUpOrder, + pub(crate) bytes_per_second: DownUpOrder, + pub(crate) packets_per_second: DownUpOrder, pub(crate) tc_handle: TcHandle, pub(crate) recent_rtt_data: [RttData; 60], pub(crate) last_fresh_rtt_data_cycle: u64, @@ -29,7 +30,7 @@ impl ThroughputEntry { /// less than 1 Mb of data---they are usually long-polling. pub(crate) fn median_latency(&self) -> Option { // Reject sub 1Mb flows - if self.bytes.0 < 1_000_000 || self.bytes.1 < 1_000_000 { + if self.bytes.both_less_than(1_000_000) { return None; } diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 4257af65..8a7f0a4b 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -6,7 +6,7 @@ use fxhash::FxHashMap; use lqos_bus::TcHandle; use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each}; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; -use lqos_utils::units::AtomicDownUp; +use lqos_utils::units::{AtomicDownUp, DownUpOrder}; pub struct ThroughputTracker { pub(crate) cycle: AtomicU64, @@ -35,14 +35,8 @@ impl ThroughputTracker { let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed); self.raw_data.iter_mut().for_each(|mut v| { if v.first_cycle < self_cycle { - v.bytes_per_second.0 = - u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0); - v.bytes_per_second.1 = - u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0); - v.packets_per_second.0 = - u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0); - v.packets_per_second.1 = - u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0); + v.bytes_per_second = v.bytes.checked_sub_or_zero(v.prev_bytes); + v.packets_per_second = v.packets.checked_sub_or_zero(v.prev_packets); } v.prev_bytes = v.bytes; v.prev_packets = v.packets; @@ -110,13 +104,11 @@ impl ThroughputTracker { let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed); throughput_for_each(&mut |xdp_ip, counts| { if let Some(mut entry) = raw_data.get_mut(xdp_ip) { - entry.bytes = (0, 0); - entry.packets = (0, 0); + entry.bytes = DownUpOrder::zeroed(); + entry.packets = DownUpOrder::zeroed(); for c in counts { - entry.bytes.0 += c.download_bytes; - entry.bytes.1 += c.upload_bytes; - entry.packets.0 += c.download_packets; - entry.packets.1 += c.upload_packets; + entry.bytes.checked_add_direct(c.download_bytes, c.upload_bytes); + entry.packets.checked_add_direct(c.download_packets, c.upload_packets); if c.tc_handle != 0 { entry.tc_handle = TcHandle::from_u32(c.tc_handle); } @@ -132,8 +124,8 @@ impl ThroughputTracker { net_json.add_throughput_cycle( parents, ( - entry.bytes.0.saturating_sub(entry.prev_bytes.0), - entry.bytes.1.saturating_sub(entry.prev_bytes.1), + entry.bytes.down.saturating_sub(entry.prev_bytes.down), + entry.bytes.up.saturating_sub(entry.prev_bytes.up), ), ); } @@ -145,12 +137,12 @@ impl ThroughputTracker { network_json_parents: Self::lookup_network_parents(circuit_id), first_cycle: self_cycle, most_recent_cycle: 0, - bytes: (0, 0), - packets: (0, 0), - prev_bytes: (0, 0), - prev_packets: (0, 0), - bytes_per_second: (0, 0), - packets_per_second: (0, 0), + bytes: DownUpOrder::zeroed(), + packets: DownUpOrder::zeroed(), + prev_bytes: DownUpOrder::zeroed(), + prev_packets: DownUpOrder::zeroed(), + bytes_per_second: DownUpOrder::zeroed(), + packets_per_second: DownUpOrder::zeroed(), tc_handle: TcHandle::zero(), recent_rtt_data: [RttData::from_nanos(0); 60], last_fresh_rtt_data_cycle: 0, @@ -159,10 +151,8 @@ impl ThroughputTracker { last_tcp_retransmits: (0, 0), }; for c in counts { - entry.bytes.0 += c.download_bytes; - entry.bytes.1 += c.upload_bytes; - entry.packets.0 += c.download_packets; - entry.packets.1 += c.upload_packets; + entry.bytes.checked_add_direct(c.download_bytes, c.upload_bytes); + entry.packets.checked_add_direct(c.download_packets, c.upload_packets); if c.tc_handle != 0 { entry.tc_handle = TcHandle::from_u32(c.tc_handle); } @@ -275,7 +265,7 @@ impl ThroughputTracker { let median = rtts[rtts.len() / 2]; if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) { // Only apply if the flow has achieved 1 Mbps or more - if tracker.bytes_per_second.0 + tracker.bytes_per_second.1 > 125000 { + if tracker.bytes_per_second.sum_exceeds(125_000) { // Shift left for i in 1..60 { tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1]; @@ -345,10 +335,10 @@ impl ThroughputTracker { ) .map(|v| { ( - v.bytes.0.saturating_sub(v.prev_bytes.0), - v.bytes.1.saturating_sub(v.prev_bytes.1), - v.packets.0.saturating_sub(v.prev_packets.0), - v.packets.1.saturating_sub(v.prev_packets.1), + v.bytes.down.saturating_sub(v.prev_bytes.down), + v.bytes.up.saturating_sub(v.prev_bytes.up), + v.packets.down.saturating_sub(v.prev_packets.down), + v.packets.up.saturating_sub(v.prev_packets.up), v.tc_handle.as_u32() > 0, ) })