Change the core throughput tracker entries to use DownUpOrder<u64> rather than (u64, u64) for clarity. Added lots of helpers and improved code readability.

This commit is contained in:
Herbert Wolverson 2024-07-02 10:37:01 -05:00
parent b29831a5ae
commit 2675d56e6b
9 changed files with 259 additions and 67 deletions

1
src/rust/Cargo.lock generated
View File

@ -2019,6 +2019,7 @@ dependencies = [
"log",
"nix 0.29.0",
"notify",
"num-traits",
"serde",
"thiserror",
"zerocopy 0.6.6",

View File

@ -12,3 +12,4 @@ notify = { version = "5.0.0", default-features = false } # Not using crossbeam b
thiserror = "1"
byteorder = "1.4"
zerocopy = { version = "0.6.1", features = ["simd"] }
num-traits = "0.2.19"

View File

@ -1,3 +1,7 @@
mod atomic_down_up;
mod down_up;
mod up_down;
pub use atomic_down_up::*;
pub use down_up::*;
pub use up_down::*;

View File

@ -0,0 +1,107 @@
use crate::units::UpDownOrder;
/// Provides strong download/upload separation for
/// stored statistics to eliminate confusion.
///
/// Field order is download-first (compare with `UpDownOrder`, which
/// stores upload first); `#[repr(C)]` keeps that layout stable.
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct DownUpOrder<T> {
    /// The download-direction value (stored first).
    pub down: T,
    /// The upload-direction value (stored second).
    pub up: T,
}
impl<T> DownUpOrder<T>
where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub
    + num_traits::CheckedAdd + num_traits::SaturatingSub
{
    /// Builds a new pair from explicit download/upload values.
    pub fn new(down: T, up: T) -> Self {
        Self { down, up }
    }

    /// Builds a pair with both directions set to zero.
    pub fn zeroed() -> Self {
        Self { down: T::zero(), up: T::zero() }
    }

    /// True when BOTH the download and upload values are below `limit`.
    pub fn both_less_than(&self, limit: T) -> bool {
        self.down < limit && self.up < limit
    }

    /// True when the combined download + upload total exceeds `limit`.
    ///
    /// Uses `checked_add` rather than a raw `+`: if the sum overflows `T`
    /// it is larger than any representable `limit`, so this returns `true`
    /// instead of panicking (debug) or wrapping (release).
    pub fn sum_exceeds(&self, limit: T) -> bool {
        match self.down.checked_add(&self.up) {
            Some(sum) => sum > limit,
            None => true, // overflow: the true sum certainly exceeds `limit`
        }
    }

    /// Subtracts `rhs` element-wise, clamping each direction to zero on
    /// underflow instead of panicking.
    pub fn checked_sub_or_zero(&self, rhs: DownUpOrder<T>) -> DownUpOrder<T> {
        let down = T::checked_sub(&self.down, &rhs.down).unwrap_or(T::zero());
        let up = T::checked_sub(&self.up, &rhs.up).unwrap_or(T::zero());
        DownUpOrder { down, up }
    }

    /// Adds `rhs` element-wise in place. On overflow the affected direction
    /// resets to zero — a deliberate counter-reset behavior, pinned by the
    /// unit tests below.
    pub fn checked_add(&mut self, rhs: DownUpOrder<T>) {
        self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero());
        self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero());
    }

    /// Adds raw download/upload values in place, with the same
    /// zero-on-overflow behavior as [`Self::checked_add`].
    pub fn checked_add_direct(&mut self, down: T, up: T) {
        self.down = self.down.checked_add(&down).unwrap_or(T::zero());
        self.up = self.up.checked_add(&up).unwrap_or(T::zero());
    }
}
/// Converts to the upload-first ordering WITHOUT mixing up the directions:
/// `down` maps to `down` and `up` maps to `up`. Only the field ORDER
/// differs between the two types, not the meaning of the fields.
///
/// (The previous `Into` impl swapped the two values, silently turning
/// download counts into upload counts. Implementing `From` also supplies
/// `Into` via the standard blanket impl, so existing `.into()` callers
/// keep working.)
impl<T> From<DownUpOrder<T>> for UpDownOrder<T> {
    fn from(value: DownUpOrder<T>) -> Self {
        UpDownOrder {
            up: value.up,
            down: value.down,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Converting to `UpDownOrder` must preserve each direction's value.
    #[test]
    fn test_reverse() {
        let a = DownUpOrder::new(1, 2);
        let b: UpDownOrder<i32> = a.into();
        assert_eq!(a.down, b.down);
        assert_eq!(a.up, b.up);
    }

    #[test]
    fn test_checked_sub() {
        let a = DownUpOrder::new(1u64, 1);
        let b = DownUpOrder::new(1, 1);
        let c = a.checked_sub_or_zero(b);
        assert_eq!(c.up, 0);
        assert_eq!(c.down, 0);

        // Underflow clamps to zero rather than panicking.
        let b = DownUpOrder::new(2, 2);
        let c = a.checked_sub_or_zero(b);
        assert_eq!(c.up, 0);
        assert_eq!(c.down, 0);
    }

    #[test]
    fn test_checked_add() {
        // Overflow resets the counters to zero.
        let mut a = DownUpOrder::new(u64::MAX, u64::MAX);
        let b = DownUpOrder::new(1, 1);
        a.checked_add(b);
        assert_eq!(a.down, 0);
        assert_eq!(a.up, 0);

        let mut a = DownUpOrder::new(1, 2);
        a.checked_add(b);
        assert_eq!(a.down, 2);
        assert_eq!(a.up, 3);
    }

    #[test]
    fn test_checked_add_direct() {
        // Overflow resets the counters to zero.
        let mut a = DownUpOrder::new(u64::MAX, u64::MAX);
        a.checked_add_direct(1, 1);
        assert_eq!(a.down, 0);
        assert_eq!(a.up, 0);

        let mut a = DownUpOrder::new(1, 2);
        a.checked_add_direct(1, 1);
        assert_eq!(a.down, 2);
        assert_eq!(a.up, 3);
    }
}

View File

@ -0,0 +1,88 @@
use crate::units::DownUpOrder;
/// Provides strong download/upload separation for
/// stored statistics to eliminate confusion.
///
/// Field order is upload-first (compare with `DownUpOrder`, which stores
/// download first); `#[repr(C)]` keeps that layout stable. Derives
/// `Eq`/`PartialEq` for consistency with `DownUpOrder`.
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct UpDownOrder<T> {
    /// The upload-direction value (stored first).
    pub up: T,
    /// The download-direction value (stored second).
    pub down: T,
}
impl<T> UpDownOrder<T>
where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub
    + num_traits::CheckedAdd
{
    /// Builds a new pair from explicit upload/download values.
    /// Note the upload-first argument order, mirroring the field order.
    pub fn new(up: T, down: T) -> Self {
        Self {
            up, down
        }
    }

    /// Builds a pair with both directions set to zero.
    pub fn zeroed() -> Self {
        Self { down: T::zero(), up: T::zero() }
    }

    /// True when BOTH the download and upload values are below `limit`.
    pub fn both_less_than(&self, limit: T) -> bool {
        self.down < limit && self.up < limit
    }

    /// True when the combined upload + download total exceeds `limit`.
    /// Returns `true` if the sum overflows `T`, since the true sum then
    /// exceeds any representable limit (parity with `DownUpOrder`).
    pub fn sum_exceeds(&self, limit: T) -> bool {
        match self.up.checked_add(&self.down) {
            Some(sum) => sum > limit,
            None => true,
        }
    }

    /// Subtracts `rhs` element-wise, clamping each direction to zero on
    /// underflow instead of panicking.
    pub fn checked_sub_or_zero(&self, rhs: UpDownOrder<T>) -> UpDownOrder<T> {
        let down = T::checked_sub(&self.down, &rhs.down).unwrap_or(T::zero());
        let up = T::checked_sub(&self.up, &rhs.up).unwrap_or(T::zero());
        UpDownOrder { down, up }
    }

    /// Adds `rhs` element-wise in place; an overflowing direction resets
    /// to zero (same wrap-to-zero behavior as `DownUpOrder::checked_add`).
    pub fn checked_add(&mut self, rhs: UpDownOrder<T>) {
        self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero());
        self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero());
    }
}
/// Converts to the download-first ordering WITHOUT mixing up the
/// directions: `down` maps to `down` and `up` maps to `up`. Only the field
/// ORDER differs between the two types, not the meaning of the fields.
///
/// (The previous `Into` impl swapped the two values, silently turning
/// upload counts into download counts. Implementing `From` also supplies
/// `Into` via the standard blanket impl, so existing `.into()` callers
/// keep working.)
impl<T> From<UpDownOrder<T>> for DownUpOrder<T> {
    fn from(value: UpDownOrder<T>) -> Self {
        DownUpOrder {
            up: value.up,
            down: value.down,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Converting to `DownUpOrder` must preserve each direction's value.
    #[test]
    fn test_reverse() {
        // up = 1, down = 2 (upload-first constructor ordering).
        let a = UpDownOrder::new(1, 2);
        let b: DownUpOrder<i32> = a.into();
        assert_eq!(a.down, b.down);
        assert_eq!(a.up, b.up);
    }

    #[test]
    fn test_checked_sub() {
        let a = UpDownOrder::new(1u64, 1);
        let b = UpDownOrder::new(1, 1);
        let c = a.checked_sub_or_zero(b);
        assert_eq!(c.up, 0);
        assert_eq!(c.down, 0);

        // Underflow clamps to zero rather than panicking.
        let b = UpDownOrder::new(2, 2);
        let c = a.checked_sub_or_zero(b);
        assert_eq!(c.up, 0);
        assert_eq!(c.down, 0);
    }

    #[test]
    fn test_checked_add() {
        // Overflow resets the counters to zero.
        let mut a = UpDownOrder::new(u64::MAX, u64::MAX);
        let b = UpDownOrder::new(1, 1);
        a.checked_add(b);
        assert_eq!(a.down, 0);
        assert_eq!(a.up, 0);

        // up = 1, down = 2 (upload-first constructor ordering).
        let mut a = UpDownOrder::new(1, 2);
        a.checked_add(b);
        assert_eq!(a.down, 3);
        assert_eq!(a.up, 2);
    }
}

View File

@ -27,7 +27,7 @@ log = "0"
nix = "0"
sysinfo = "0"
dashmap = "5"
num-traits = "0.2"
num-traits = "0.2.19"
thiserror = "1"
itertools = "0.12.1"
csv = "1"

View File

@ -20,6 +20,7 @@ use tokio::{
sync::mpsc::Sender,
time::{Duration, Instant},
};
use lqos_utils::units::DownUpOrder;
const RETIRE_AFTER_SECONDS: u64 = 30;
@ -144,7 +145,7 @@ async fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>)
.map(|host| HostSummary {
ip: host.key().as_ip(),
circuit_id: host.circuit_id.clone(),
bits_per_second: (host.bytes_per_second.0 * 8, host.bytes_per_second.1 * 8),
bits_per_second: (host.bytes_per_second.down * 8, host.bytes_per_second.up * 8),
median_rtt: host.median_latency().unwrap_or(0.0),
})
.collect();
@ -187,8 +188,7 @@ pub fn host_counters() -> BusResponse {
let mut result = Vec::new();
THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| {
let ip = v.key().as_ip();
let (down, up) = v.bytes_per_second;
result.push((ip, down, up));
result.push((ip, v.bytes_per_second.down, v.bytes_per_second.up));
});
BusResponse::HostCounters(result)
}
@ -198,7 +198,7 @@ fn retire_check(cycle: u64, recent_cycle: u64) -> bool {
cycle < recent_cycle + RETIRE_AFTER_SECONDS
}
type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String, (u64, u64));
type TopList = (XdpIpAddress, DownUpOrder<u64>, DownUpOrder<u64>, f32, TcHandle, String, (u64, u64));
pub fn top_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
@ -223,7 +223,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
})
.collect()
};
full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
full_list.sort_by(|a, b| b.1.down.cmp(&a.1.down));
let result = full_list
.iter()
.skip(start as usize)
@ -231,8 +231,8 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
bytes,
packets,
median_rtt,
tc_handle,
circuit_id,
@ -240,8 +240,8 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: *tcp_retransmits,
@ -283,8 +283,8 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
bytes,
packets,
median_rtt,
tc_handle,
circuit_id,
@ -292,8 +292,8 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: *tcp_retransmits,
@ -339,8 +339,8 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse {
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
bytes,
packets,
median_rtt,
tc_handle,
circuit_id,
@ -348,8 +348,8 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: *tcp_retransmits,
@ -392,8 +392,8 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
bytes,
packets,
median_rtt,
tc_handle,
circuit_id,
@ -401,8 +401,8 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: *tcp_retransmits,
@ -503,7 +503,7 @@ pub fn host_counts() -> BusResponse {
BusResponse::HostCounts((total, shaped))
}
type FullList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, u64);
type FullList = (XdpIpAddress, DownUpOrder<u64>, DownUpOrder<u64>, f32, TcHandle, u64);
pub fn all_unknown_ips() -> BusResponse {
let boot_time = time_since_boot();
@ -542,16 +542,16 @@ pub fn all_unknown_ips() -> BusResponse {
.map(
|(
ip,
(bytes_dn, bytes_up),
(packets_dn, packets_up),
bytes,
packets,
median_rtt,
tc_handle,
_last_seen,
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: String::new(),
bits_per_second: (bytes_dn * 8, bytes_up * 8),
packets_per_second: (*packets_dn, *packets_up),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: (0, 0),

View File

@ -1,4 +1,5 @@
use lqos_bus::TcHandle;
use lqos_utils::units::DownUpOrder;
use super::flow_data::RttData;
#[derive(Debug)]
@ -7,12 +8,12 @@ pub(crate) struct ThroughputEntry {
pub(crate) network_json_parents: Option<Vec<usize>>,
pub(crate) first_cycle: u64,
pub(crate) most_recent_cycle: u64,
pub(crate) bytes: (u64, u64),
pub(crate) packets: (u64, u64),
pub(crate) prev_bytes: (u64, u64),
pub(crate) prev_packets: (u64, u64),
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
pub(crate) bytes: DownUpOrder<u64>, // Running byte totals (download/upload)
pub(crate) packets: DownUpOrder<u64>, // Running packet totals (download/upload)
pub(crate) prev_bytes: DownUpOrder<u64>, // Byte totals from the previous cycle, for per-second deltas
pub(crate) prev_packets: DownUpOrder<u64>, // Packet totals from the previous cycle
pub(crate) bytes_per_second: DownUpOrder<u64>,
pub(crate) packets_per_second: DownUpOrder<u64>,
pub(crate) tc_handle: TcHandle,
pub(crate) recent_rtt_data: [RttData; 60],
pub(crate) last_fresh_rtt_data_cycle: u64,
@ -29,7 +30,7 @@ impl ThroughputEntry {
/// less than 1 Mb of data---they are usually long-polling.
pub(crate) fn median_latency(&self) -> Option<f32> {
// Reject sub 1Mb flows
if self.bytes.0 < 1_000_000 || self.bytes.1 < 1_000_000 {
if self.bytes.both_less_than(1_000_000) {
return None;
}

View File

@ -6,7 +6,7 @@ use fxhash::FxHashMap;
use lqos_bus::TcHandle;
use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use lqos_utils::units::AtomicDownUp;
use lqos_utils::units::{AtomicDownUp, DownUpOrder};
pub struct ThroughputTracker {
pub(crate) cycle: AtomicU64,
@ -35,14 +35,8 @@ impl ThroughputTracker {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
self.raw_data.iter_mut().for_each(|mut v| {
if v.first_cycle < self_cycle {
v.bytes_per_second.0 =
u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
v.bytes_per_second.1 =
u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0);
v.packets_per_second.0 =
u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0);
v.packets_per_second.1 =
u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0);
v.bytes_per_second = v.bytes.checked_sub_or_zero(v.prev_bytes);
v.packets_per_second = v.packets.checked_sub_or_zero(v.prev_packets);
}
v.prev_bytes = v.bytes;
v.prev_packets = v.packets;
@ -110,13 +104,11 @@ impl ThroughputTracker {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
throughput_for_each(&mut |xdp_ip, counts| {
if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
entry.bytes = (0, 0);
entry.packets = (0, 0);
entry.bytes = DownUpOrder::zeroed();
entry.packets = DownUpOrder::zeroed();
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
entry.bytes.checked_add_direct(c.download_bytes, c.upload_bytes);
entry.packets.checked_add_direct(c.download_packets, c.upload_packets);
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
}
@ -132,8 +124,8 @@ impl ThroughputTracker {
net_json.add_throughput_cycle(
parents,
(
entry.bytes.0.saturating_sub(entry.prev_bytes.0),
entry.bytes.1.saturating_sub(entry.prev_bytes.1),
entry.bytes.down.saturating_sub(entry.prev_bytes.down),
entry.bytes.up.saturating_sub(entry.prev_bytes.up),
),
);
}
@ -145,12 +137,12 @@ impl ThroughputTracker {
network_json_parents: Self::lookup_network_parents(circuit_id),
first_cycle: self_cycle,
most_recent_cycle: 0,
bytes: (0, 0),
packets: (0, 0),
prev_bytes: (0, 0),
prev_packets: (0, 0),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
bytes: DownUpOrder::zeroed(),
packets: DownUpOrder::zeroed(),
prev_bytes: DownUpOrder::zeroed(),
prev_packets: DownUpOrder::zeroed(),
bytes_per_second: DownUpOrder::zeroed(),
packets_per_second: DownUpOrder::zeroed(),
tc_handle: TcHandle::zero(),
recent_rtt_data: [RttData::from_nanos(0); 60],
last_fresh_rtt_data_cycle: 0,
@ -159,10 +151,8 @@ impl ThroughputTracker {
last_tcp_retransmits: (0, 0),
};
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
entry.bytes.checked_add_direct(c.download_bytes, c.upload_bytes);
entry.packets.checked_add_direct(c.download_packets, c.upload_packets);
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
}
@ -275,7 +265,7 @@ impl ThroughputTracker {
let median = rtts[rtts.len() / 2];
if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) {
// Only apply if the flow has achieved 1 Mbps or more
if tracker.bytes_per_second.0 + tracker.bytes_per_second.1 > 125000 {
if tracker.bytes_per_second.sum_exceeds(125_000) {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
@ -345,10 +335,10 @@ impl ThroughputTracker {
)
.map(|v| {
(
v.bytes.0.saturating_sub(v.prev_bytes.0),
v.bytes.1.saturating_sub(v.prev_bytes.1),
v.packets.0.saturating_sub(v.prev_packets.0),
v.packets.1.saturating_sub(v.prev_packets.1),
v.bytes.down.saturating_sub(v.prev_bytes.down),
v.bytes.up.saturating_sub(v.prev_bytes.up),
v.packets.down.saturating_sub(v.prev_packets.down),
v.packets.up.saturating_sub(v.prev_packets.up),
v.tc_handle.as_u32() > 0,
)
})