diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 8ff7eaea..2c0e6fc7 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -5,6 +5,7 @@ use crate::{ use lts_client::transport_data::{StatsTotals, StatsHost, StatsTreeNode}; use serde::{Deserialize, Serialize}; use std::net::IpAddr; +use lqos_utils::units::DownUpOrder; /// A `BusResponse` object represents a single /// reply generated from a `BusRequest`, and batched @@ -132,7 +133,7 @@ pub enum BusResponse { FlowsByIp(Vec), /// Current endpoints by country - CurrentEndpointsByCountry(Vec<(String, [u64; 2], [f32; 2])>), + CurrentEndpointsByCountry(Vec<(String, DownUpOrder, [f32; 2])>), /// Current Lat/Lon of endpoints CurrentLatLon(Vec<(f64, f64, String, u64, f32)>), diff --git a/src/rust/lqos_bus/src/ip_stats.rs b/src/rust/lqos_bus/src/ip_stats.rs index 665068ff..81a65180 100644 --- a/src/rust/lqos_bus/src/ip_stats.rs +++ b/src/rust/lqos_bus/src/ip_stats.rs @@ -1,5 +1,6 @@ use crate::TcHandle; use serde::{Deserialize, Serialize}; +use lqos_utils::units::DownUpOrder; /// Transmission representation of IP statistics associated /// with a host. @@ -11,13 +12,12 @@ pub struct IpStats { /// The host's mapped circuit ID pub circuit_id: String, - /// The current bits-per-second passing through this host. Tuple - /// 0 is download, tuple 1 is upload. - pub bits_per_second: (u64, u64), + /// The current bits-per-second passing through this host. + pub bits_per_second: DownUpOrder, /// The current packets-per-second passing through this host. Tuple /// 0 is download, tuple 1 is upload. - pub packets_per_second: (u64, u64), + pub packets_per_second: DownUpOrder, /// Median TCP round-trip-time for this host at the current time. pub median_tcp_rtt: f32, @@ -26,7 +26,7 @@ pub struct IpStats { pub tc_handle: TcHandle, /// TCP Retransmits for this host at the current time. - pub tcp_retransmits: (u64, u64), + pub tcp_retransmits: DownUpOrder, } /// Represents an IP Mapping in the XDP IP to TC/CPU mapping system. @@ -153,13 +153,13 @@ pub struct FlowbeeSummaryData { /// Time (nanos) when the connection was last seen pub last_seen: u64, /// Bytes transmitted - pub bytes_sent: [u64; 2], + pub bytes_sent: DownUpOrder, /// Packets transmitted - pub packets_sent: [u64; 2], + pub packets_sent: DownUpOrder, /// Rate estimate - pub rate_estimate_bps: [u32; 2], + pub rate_estimate_bps: DownUpOrder, /// TCP Retransmission count (also counts duplicates) - pub tcp_retransmits: [u16; 2], + pub tcp_retransmits: DownUpOrder, /// Has the connection ended? /// 0 = Alive, 1 = FIN, 2 = RST pub end_status: u8, @@ -168,7 +168,7 @@ pub struct FlowbeeSummaryData { /// Raw TCP flags pub flags: u8, /// Recent RTT median - pub rtt_nanos: [u64; 2], + pub rtt_nanos: DownUpOrder, /// Remote ASN pub remote_asn: u32, /// Remote ASN Name diff --git a/src/rust/lqos_node_manager/src/flow_monitor.rs b/src/rust/lqos_node_manager/src/flow_monitor.rs index 08588ff6..4e949d85 100644 --- a/src/rust/lqos_node_manager/src/flow_monitor.rs +++ b/src/rust/lqos_node_manager/src/flow_monitor.rs @@ -55,6 +55,10 @@ pub async fn flows_by_country() -> NoCache country_summary.to_owned(), _ => Vec::new(), }; + let result = result + .into_iter() + .map(|(name, bytes, rtt)| (name, [bytes.down, bytes.up], rtt) ) + .collect(); NoCache::new(Json(result)) } diff --git a/src/rust/lqos_node_manager/src/tracker/mod.rs b/src/rust/lqos_node_manager/src/tracker/mod.rs index 7d6994a2..8f047889 100644 --- a/src/rust/lqos_node_manager/src/tracker/mod.rs +++ b/src/rust/lqos_node_manager/src/tracker/mod.rs @@ -29,13 +29,13 @@ impl From<&IpStats> for IpStatsWithPlan { fn from(i: &IpStats) -> Self { let mut result = Self { ip_address: i.ip_address.clone(), - bits_per_second: i.bits_per_second, - packets_per_second: i.packets_per_second, + bits_per_second: (i.bits_per_second.down, i.bits_per_second.up), + packets_per_second: (i.packets_per_second.down, i.packets_per_second.up), median_tcp_rtt: i.median_tcp_rtt, tc_handle: i.tc_handle, circuit_id: i.circuit_id.clone(), plan: (0, 0), - tcp_retransmits: i.tcp_retransmits, + tcp_retransmits: (i.tcp_retransmits.down, i.tcp_retransmits.up), }; if !result.circuit_id.is_empty() { diff --git a/src/rust/lqos_node_manager/src/unknown_devices.rs b/src/rust/lqos_node_manager/src/unknown_devices.rs index 06cef5cf..c75e7f3a 100644 --- a/src/rust/lqos_node_manager/src/unknown_devices.rs +++ b/src/rust/lqos_node_manager/src/unknown_devices.rs @@ -62,7 +62,7 @@ pub async fn unknown_devices_csv(_auth: AuthGuard) -> NoCache { let reader = unknown_devices().await; for unknown in reader.iter() { - result += &format!("{},{},{}\n", unknown.ip_address, unknown.bits_per_second.0, unknown.bits_per_second.1); + result += &format!("{},{},{}\n", unknown.ip_address, unknown.bits_per_second.down, unknown.bits_per_second.up); } NoCache::new(result) } diff --git a/src/rust/lqos_sys/src/flowbee_data.rs b/src/rust/lqos_sys/src/flowbee_data.rs index b04fae24..6abb145a 100644 --- a/src/rust/lqos_sys/src/flowbee_data.rs +++ b/src/rust/lqos_sys/src/flowbee_data.rs @@ -2,6 +2,7 @@ use lqos_utils::XdpIpAddress; use zerocopy::FromBytes; +use lqos_utils::units::DownUpOrder; /// Representation of the eBPF `flow_key_t` type. #[derive(Debug, Clone, Default, PartialEq, Eq, Hash, FromBytes)] @@ -32,29 +33,29 @@ pub struct FlowbeeData { /// Time (nanos) when the connection was last seen pub last_seen: u64, /// Bytes transmitted - pub bytes_sent: [u64; 2], + pub bytes_sent: DownUpOrder, /// Packets transmitted - pub packets_sent: [u64; 2], + pub packets_sent: DownUpOrder, /// Clock for the next rate estimate - pub next_count_time: [u64; 2], + pub next_count_time: DownUpOrder, /// Clock for the previous rate estimate - pub last_count_time: [u64; 2], + pub last_count_time: DownUpOrder, /// Bytes at the next rate estimate - pub next_count_bytes: [u64; 2], + pub next_count_bytes: DownUpOrder, /// Rate estimate - pub rate_estimate_bps: [u32; 2], + pub rate_estimate_bps: DownUpOrder, /// Sequence number of the last packet - pub last_sequence: [u32; 2], + pub last_sequence: DownUpOrder, /// Acknowledgement number of the last packet - pub last_ack: [u32; 2], + pub last_ack: DownUpOrder, /// TCP Retransmission count (also counts duplicates) - pub tcp_retransmits: [u16; 2], + pub tcp_retransmits: DownUpOrder, /// Timestamp values - pub tsval: [u32; 2], + pub tsval: DownUpOrder, /// Timestamp echo values - pub tsecr: [u32; 2], + pub tsecr: DownUpOrder, /// When did the timestamp change? - pub ts_change_time: [u64; 2], + pub ts_change_time: DownUpOrder, /// Has the connection ended? /// 0 = Alive, 1 = FIN, 2 = RST pub end_status: u8, diff --git a/src/rust/lqos_utils/src/units/down_up.rs b/src/rust/lqos_utils/src/units/down_up.rs index 94eb9197..4dbc3a01 100644 --- a/src/rust/lqos_utils/src/units/down_up.rs +++ b/src/rust/lqos_utils/src/units/down_up.rs @@ -1,9 +1,11 @@ +use serde::{Deserialize, Serialize}; +use zerocopy::FromBytes; use crate::units::UpDownOrder; /// Provides strong download/upload separation for /// stored statistics to eliminate confusion. #[repr(C)] -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default)] pub struct DownUpOrder { pub down: T, pub up: T, @@ -11,11 +13,20 @@ pub struct DownUpOrder { impl DownUpOrder where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub - + num_traits::CheckedAdd + num_traits::SaturatingSub + + num_traits::CheckedAdd + num_traits::SaturatingSub + num_traits::SaturatingMul + + num_traits::FromPrimitive { pub fn new(down: T, up: T) -> Self { Self { down, up } } + + pub fn dir(&self, direction: usize) -> T { + if direction == 0 { + self.down + } else { + self.up + } + } pub fn zeroed() -> Self { Self { down: T::zero(), up: T::zero() } @@ -24,7 +35,7 @@ where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub pub fn both_less_than(&self, limit: T) -> bool { self.down < limit && self.up < limit } - + pub fn sum_exceeds(&self, limit: T) -> bool { self.down + self.up > limit } @@ -39,15 +50,22 @@ where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub self.down = self.down.checked_add(&rhs.down).unwrap_or(T::zero()); self.up = self.up.checked_add(&rhs.up).unwrap_or(T::zero()); } - + pub fn checked_add_direct(&mut self, down: T, up: T) { self.down = self.down.checked_add(&down).unwrap_or(T::zero()); self.up = self.up.checked_add(&up).unwrap_or(T::zero()); } - + pub fn sum(&self) -> T { self.down + self.up } + + pub fn to_bits_from_bytes(&self) -> DownUpOrder { + DownUpOrder { + down: self.down.saturating_mul(&T::from_u32(8).unwrap()), + up: self.up.saturating_mul(&T::from_u32(8).unwrap()), + } + } } impl Into> for DownUpOrder { diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/ipstats_conversion.rs b/src/rust/lqosd/src/node_manager/ws/ticker/ipstats_conversion.rs index 64ff3561..cace374d 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker/ipstats_conversion.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker/ipstats_conversion.rs @@ -1,17 +1,18 @@ use serde::{Deserialize, Serialize}; use lqos_bus::{IpStats, TcHandle}; +use lqos_utils::units::DownUpOrder; use crate::shaped_devices_tracker::SHAPED_DEVICES; #[derive(Serialize, Deserialize, Clone, Debug)] pub struct IpStatsWithPlan { pub ip_address: String, - pub bits_per_second: (u64, u64), - pub packets_per_second: (u64, u64), + pub bits_per_second: DownUpOrder, + pub packets_per_second: DownUpOrder, pub median_tcp_rtt: f32, pub tc_handle: TcHandle, pub circuit_id: String, - pub plan: (u32, u32), - pub tcp_retransmits: (u64, u64), + pub plan: DownUpOrder, + pub tcp_retransmits: DownUpOrder, } impl From<&IpStats> for IpStatsWithPlan { @@ -23,7 +24,7 @@ impl From<&IpStats> for IpStatsWithPlan { median_tcp_rtt: i.median_tcp_rtt, tc_handle: i.tc_handle, circuit_id: i.circuit_id.clone(), - plan: (0, 0), + plan: DownUpOrder::zeroed(), tcp_retransmits: i.tcp_retransmits, }; @@ -41,7 +42,7 @@ impl From<&IpStats> for IpStatsWithPlan { &circuit.circuit_name }; result.ip_address = format!("{} ({})", name, result.ip_address); - result.plan = (circuit.download_max_mbps, circuit.upload_max_mbps); + result.plan = DownUpOrder::new(circuit.download_max_mbps, circuit.upload_max_mbps); } } diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs index 6dab3dda..ca04a96b 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs @@ -5,6 +5,7 @@ use lqos_bus::BusResponse; use lqos_sys::flowbee_data::FlowbeeKey; use once_cell::sync::Lazy; use std::sync::{Arc, Mutex}; +use lqos_utils::units::DownUpOrder; pub struct TimeBuffer { buffer: Mutex>, @@ -44,7 +45,7 @@ impl TimeBuffer { let (key, data, _analysis) = &v.data; let (lat, lon) = get_asn_lat_lon(key.remote_ip.as_ip()); let (_name, country) = get_asn_name_and_country(key.remote_ip.as_ip()); - (lat, lon, country, data.bytes_sent[1], data.rtt[1].as_nanos() as f32) + (lat, lon, country, data.bytes_sent.up, data.rtt[1].as_nanos() as f32) }) .filter(|(lat, lon, ..)| *lat != 0.0 && *lon != 0.0) .collect::>(); @@ -58,7 +59,7 @@ impl TimeBuffer { my_buffer } - pub fn country_summary(&self) -> Vec<(String, [u64; 2], [f32; 2])> { + pub fn country_summary(&self) -> Vec<(String, DownUpOrder, [f32; 2])> { let buffer = self.buffer.lock().unwrap(); let mut my_buffer = buffer .iter() @@ -71,7 +72,7 @@ impl TimeBuffer { ]; (country, data.bytes_sent, rtt) }) - .collect::>(); + .collect::, [f32; 2])>>(); // Sort by country my_buffer.sort_by(|a, b| a.0.cmp(&b.0)); @@ -79,7 +80,7 @@ impl TimeBuffer { // Summarize by country let mut country_summary = Vec::new(); let mut last_country = String::new(); - let mut total_bytes = [0, 0]; + let mut total_bytes = DownUpOrder::zeroed(); let mut total_rtt = [0.0f64, 0.0f64]; let mut rtt_count = [0, 0]; for (country, bytes, rtt) in my_buffer { @@ -102,12 +103,11 @@ impl TimeBuffer { } last_country = country.to_string(); - total_bytes = [0, 0]; + total_bytes = DownUpOrder::zeroed(); total_rtt = [0.0, 0.0]; rtt_count = [0, 0]; } - total_bytes[0] += bytes[0]; - total_bytes[1] += bytes[1]; + total_bytes.checked_add(bytes); if rtt[0] > 0.0 { total_rtt[0] += rtt[0] as f64; rtt_count[0] += 1; @@ -135,7 +135,7 @@ impl TimeBuffer { country_summary.push((last_country, total_bytes, rtt)); // Sort by bytes downloaded descending - country_summary.sort_by(|a, b| b.1[1].cmp(&a.1[1])); + country_summary.sort_by(|a, b| b.1.up.cmp(&a.1.up)); country_summary } @@ -170,10 +170,10 @@ impl TimeBuffer { let (key, data, _analysis) = &v.data; if key.local_ip.is_v4() { // It's V4 - v4_bytes_sent[0] += data.bytes_sent[0]; - v4_bytes_sent[1] += data.bytes_sent[1]; - v4_packets_sent[0] += data.packets_sent[0]; - v4_packets_sent[1] += data.packets_sent[1]; + v4_bytes_sent[0] += data.bytes_sent.down; + v4_bytes_sent[1] += data.bytes_sent.up; + v4_packets_sent[0] += data.packets_sent.down; + v4_packets_sent[1] += data.packets_sent.up; if data.rtt[0].as_nanos() > 0 { v4_rtt[0].push(data.rtt[0].as_nanos()); } @@ -182,10 +182,10 @@ impl TimeBuffer { } } else { // It's V6 - v6_bytes_sent[0] += data.bytes_sent[0]; - v6_bytes_sent[1] += data.bytes_sent[1]; - v6_packets_sent[0] += data.packets_sent[0]; - v6_packets_sent[1] += data.packets_sent[1]; + v6_bytes_sent[0] += data.bytes_sent.down; + v6_bytes_sent[1] += data.bytes_sent.up; + v6_packets_sent[0] += data.packets_sent.down; + v6_packets_sent[1] += data.packets_sent.up; if data.rtt[0].as_nanos() > 0 { v6_rtt[0].push(data.rtt[0].as_nanos()); } @@ -226,8 +226,8 @@ impl TimeBuffer { let (_key, data, analysis) = &v.data; let proto = analysis.protocol_analysis.to_string(); let entry = results.entry(proto).or_insert((0, 0)); - entry.0 += data.bytes_sent[0]; - entry.1 += data.bytes_sent[1]; + entry.0 += data.bytes_sent.down; + entry.1 += data.bytes_sent.up; }); let mut results = results.into_iter().collect::>(); diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs index 4d9b2f54..6b0ccb34 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs @@ -5,6 +5,7 @@ use super::{flow_analysis::FlowAnalysis, RttData}; use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; use once_cell::sync::Lazy; use std::{collections::HashMap, sync::Mutex}; +use lqos_utils::units::DownUpOrder; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct AsnId(pub u32); @@ -23,13 +24,13 @@ pub struct FlowbeeLocalData { /// Time (nanos) when the connection was last seen pub last_seen: u64, /// Bytes transmitted - pub bytes_sent: [u64; 2], + pub bytes_sent: DownUpOrder, /// Packets transmitted - pub packets_sent: [u64; 2], + pub packets_sent: DownUpOrder, /// Rate estimate - pub rate_estimate_bps: [u32; 2], + pub rate_estimate_bps: DownUpOrder, /// TCP Retransmission count (also counts duplicates) - pub tcp_retransmits: [u16; 2], + pub tcp_retransmits: DownUpOrder, /// Has the connection ended? /// 0 = Alive, 1 = FIN, 2 = RST pub end_status: u8, diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs index f2567c75..9dd8f38a 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs @@ -74,10 +74,10 @@ pub(crate) fn to_netflow_5(key: &FlowbeeKey, data: &FlowbeeLocalData) -> anyhow: let src_ip = u32::from_ne_bytes(local.octets()); let dst_ip = u32::from_ne_bytes(remote.octets()); // Convert d_pkts to network order - let d_pkts = (data.packets_sent[0] as u32).to_be(); - let d_octets = (data.bytes_sent[0] as u32).to_be(); - let d_pkts2 = (data.packets_sent[1] as u32).to_be(); - let d_octets2 = (data.bytes_sent[1] as u32).to_be(); + let d_pkts = (data.packets_sent.down as u32).to_be(); + let d_octets = (data.bytes_sent.down as u32).to_be(); + let d_pkts2 = (data.packets_sent.up as u32).to_be(); + let d_octets2 = (data.bytes_sent.up as u32).to_be(); let record = Netflow5Record { src_addr: src_ip, diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs index c248a622..f5793fad 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs @@ -11,8 +11,8 @@ pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: us let mut result = Vec::with_capacity(total_size as usize); for (field_type, field_length) in template.iter() { match (*field_type, *field_length) { - IN_BYTES => encode_u64(data.bytes_sent[direction], &mut result), - IN_PKTS => encode_u64(data.packets_sent[direction], &mut result), + IN_BYTES => encode_u64(data.bytes_sent.dir(direction), &mut result), + IN_PKTS => encode_u64(data.packets_sent.dir(direction), &mut result), PROTOCOL => result.push(key.ip_protocol), L4_SRC_PORT => encode_u16(src_port, &mut result), L4_DST_PORT => encode_u16(dst_port, &mut result), diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 3a265769..c33e6adb 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -240,11 +240,11 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes.down * 8, bytes.up * 8), - packets_per_second: (packets.down, packets.up), + bits_per_second: bytes.to_bits_from_bytes(), + packets_per_second: *packets, median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, - tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up), + tcp_retransmits: *tcp_retransmits, }, ) .collect(); @@ -292,11 +292,11 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes.down * 8, bytes.up * 8), - packets_per_second: (packets.down, packets.up), + bits_per_second: bytes.to_bits_from_bytes(), + packets_per_second: *packets, median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, - tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up), + tcp_retransmits: *tcp_retransmits, }, ) .collect(); @@ -348,11 +348,11 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes.down * 8, bytes.up * 8), - packets_per_second: (packets.down, packets.up), + bits_per_second: bytes.to_bits_from_bytes(), + packets_per_second: *packets, median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, - tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up), + tcp_retransmits: *tcp_retransmits, }, ) .collect(); @@ -401,11 +401,11 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: circuit_id.clone(), - bits_per_second: (bytes.down * 8, bytes.up * 8), - packets_per_second: (packets.down, packets.up), + bits_per_second: bytes.to_bits_from_bytes(), + packets_per_second: *packets, median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, - tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up), + tcp_retransmits: *tcp_retransmits, }, ) .collect(); @@ -550,11 +550,11 @@ pub fn all_unknown_ips() -> BusResponse { )| IpStats { ip_address: ip.as_ip().to_string(), circuit_id: String::new(), - bits_per_second: (bytes.down * 8, bytes.up * 8), - packets_per_second: (packets.down, packets.up), + bits_per_second: bytes.to_bits_from_bytes(), + packets_per_second: *packets, median_tcp_rtt: *median_rtt, tc_handle: *tc_handle, - tcp_retransmits: (0, 0), + tcp_retransmits: DownUpOrder::zeroed(), }, ) .collect(); @@ -589,7 +589,7 @@ pub fn dump_active_flows() -> BusResponse { analysis: row.1.protocol_analysis.to_string(), last_seen: row.0.last_seen, start_time: row.0.start_time, - rtt_nanos: [row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()], + rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()), } }) .collect(); @@ -615,29 +615,29 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { match flow_type { TopFlowType::RateEstimate => { table.sort_by(|a, b| { - let a_total = a.1 .0.rate_estimate_bps[0] + a.1 .0.rate_estimate_bps[1]; - let b_total = b.1 .0.rate_estimate_bps[0] + b.1 .0.rate_estimate_bps[1]; + let a_total = a.1 .0.rate_estimate_bps.sum(); + let b_total = b.1 .0.rate_estimate_bps.sum(); b_total.cmp(&a_total) }); } TopFlowType::Bytes => { table.sort_by(|a, b| { - let a_total = a.1 .0.bytes_sent[0] + a.1 .0.bytes_sent[1]; - let b_total = b.1 .0.bytes_sent[0] + b.1 .0.bytes_sent[1]; + let a_total = a.1 .0.bytes_sent.sum(); + let b_total = b.1 .0.bytes_sent.sum(); b_total.cmp(&a_total) }); } TopFlowType::Packets => { table.sort_by(|a, b| { - let a_total = a.1 .0.packets_sent[0] + a.1 .0.packets_sent[1]; - let b_total = b.1 .0.packets_sent[0] + b.1 .0.packets_sent[1]; + let a_total = a.1 .0.packets_sent.sum(); + let b_total = b.1 .0.packets_sent.sum(); b_total.cmp(&a_total) }); } TopFlowType::Drops => { table.sort_by(|a, b| { - let a_total = a.1 .0.tcp_retransmits[0] + a.1 .0.tcp_retransmits[1]; - let b_total = b.1 .0.tcp_retransmits[0] + b.1 .0.tcp_retransmits[1]; + let a_total = a.1 .0.tcp_retransmits.sum(); + let b_total = b.1 .0.tcp_retransmits.sum(); b_total.cmp(&a_total) }); } @@ -675,7 +675,7 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { analysis: flow.1.protocol_analysis.to_string(), last_seen: flow.0.last_seen, start_time: flow.0.start_time, - rtt_nanos: [flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()], + rtt_nanos: DownUpOrder::new(flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()), } }) .collect(); @@ -714,7 +714,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { analysis: row.1.protocol_analysis.to_string(), last_seen: row.0.last_seen, start_time: row.0.start_time, - rtt_nanos: [row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()], + rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()), } }) .collect(); diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index e396c806..fd357cad 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -183,7 +183,7 @@ impl ThroughputTracker { let mut rtt_circuit_tracker: FxHashMap; 2]> = FxHashMap::default(); // Tracker for TCP retries. We're storing these per second. - let mut tcp_retries: FxHashMap = FxHashMap::default(); + let mut tcp_retries: FxHashMap> = FxHashMap::default(); // Track the expired keys let mut expired_keys = Vec::new(); @@ -242,10 +242,12 @@ impl ThroughputTracker { // TCP Retries if let Some(retries) = tcp_retries.get_mut(&key.local_ip) { - retries[0] += data.tcp_retransmits[0] as u64; - retries[1] += data.tcp_retransmits[1] as u64; + retries.down += data.tcp_retransmits.down as u64; + retries.up += data.tcp_retransmits.up as u64; } else { - tcp_retries.insert(key.local_ip, [data.tcp_retransmits[0] as u64, data.tcp_retransmits[1] as u64]); + tcp_retries.insert(key.local_ip, + DownUpOrder::new(data.tcp_retransmits.down as u64, data.tcp_retransmits.up as u64) + ); } if data.end_status != 0 { @@ -291,10 +293,10 @@ impl ThroughputTracker { // Apply the new ones for (local_ip, retries) in tcp_retries { if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) { - tracker.tcp_retransmits.down = retries[0].saturating_sub(tracker.last_tcp_retransmits.down); - tracker.tcp_retransmits.up = retries[1].saturating_sub(tracker.last_tcp_retransmits.up); - tracker.last_tcp_retransmits.down = retries[0]; - tracker.last_tcp_retransmits.up = retries[1]; + tracker.tcp_retransmits.down = retries.down.saturating_sub(tracker.last_tcp_retransmits.down); + tracker.tcp_retransmits.up = retries.up.saturating_sub(tracker.last_tcp_retransmits.up); + tracker.last_tcp_retransmits.down = retries.down; + tracker.last_tcp_retransmits.up = retries.up; } }