Change much of the bus API to require DownUp types. Adjust lqtop and the (to be replaced) node manager.

This commit is contained in:
Herbert Wolverson 2024-07-02 11:44:47 -05:00
parent ff97674885
commit d07bfcf0d6
14 changed files with 128 additions and 100 deletions

View File

@ -5,6 +5,7 @@ use crate::{
use lts_client::transport_data::{StatsTotals, StatsHost, StatsTreeNode};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use lqos_utils::units::DownUpOrder;
/// A `BusResponse` object represents a single
/// reply generated from a `BusRequest`, and batched
@ -132,7 +133,7 @@ pub enum BusResponse {
FlowsByIp(Vec<FlowbeeSummaryData>),
/// Current endpoints by country
CurrentEndpointsByCountry(Vec<(String, [u64; 2], [f32; 2])>),
CurrentEndpointsByCountry(Vec<(String, DownUpOrder<u64>, [f32; 2])>),
/// Current Lat/Lon of endpoints
CurrentLatLon(Vec<(f64, f64, String, u64, f32)>),

View File

@ -1,5 +1,6 @@
use crate::TcHandle;
use serde::{Deserialize, Serialize};
use lqos_utils::units::DownUpOrder;
/// Transmission representation of IP statistics associated
/// with a host.
@ -11,13 +12,12 @@ pub struct IpStats {
/// The host's mapped circuit ID
pub circuit_id: String,
/// The current bits-per-second passing through this host. Tuple
/// 0 is download, tuple 1 is upload.
pub bits_per_second: (u64, u64),
/// The current bits-per-second passing through this host.
pub bits_per_second: DownUpOrder<u64>,
/// The current packets-per-second passing through this host. Tuple
/// 0 is download, tuple 1 is upload.
pub packets_per_second: (u64, u64),
pub packets_per_second: DownUpOrder<u64>,
/// Median TCP round-trip-time for this host at the current time.
pub median_tcp_rtt: f32,
@ -26,7 +26,7 @@ pub struct IpStats {
pub tc_handle: TcHandle,
/// TCP Retransmits for this host at the current time.
pub tcp_retransmits: (u64, u64),
pub tcp_retransmits: DownUpOrder<u64>,
}
/// Represents an IP Mapping in the XDP IP to TC/CPU mapping system.
@ -153,13 +153,13 @@ pub struct FlowbeeSummaryData {
/// Time (nanos) when the connection was last seen
pub last_seen: u64,
/// Bytes transmitted
pub bytes_sent: [u64; 2],
pub bytes_sent: DownUpOrder<u64>,
/// Packets transmitted
pub packets_sent: [u64; 2],
pub packets_sent: DownUpOrder<u64>,
/// Rate estimate
pub rate_estimate_bps: [u32; 2],
pub rate_estimate_bps: DownUpOrder<u32>,
/// TCP Retransmission count (also counts duplicates)
pub tcp_retransmits: [u16; 2],
pub tcp_retransmits: DownUpOrder<u16>,
/// Has the connection ended?
/// 0 = Alive, 1 = FIN, 2 = RST
pub end_status: u8,
@ -168,7 +168,7 @@ pub struct FlowbeeSummaryData {
/// Raw TCP flags
pub flags: u8,
/// Recent RTT median
pub rtt_nanos: [u64; 2],
pub rtt_nanos: DownUpOrder<u64>,
/// Remote ASN
pub remote_asn: u32,
/// Remote ASN Name

View File

@ -55,6 +55,10 @@ pub async fn flows_by_country() -> NoCache<Json<Vec<(String, [u64; 2], [f32; 2])
BusResponse::CurrentEndpointsByCountry(country_summary) => country_summary.to_owned(),
_ => Vec::new(),
};
let result = result
.into_iter()
.map(|(name, bytes, rtt)| (name, [bytes.down, bytes.up], rtt) )
.collect();
NoCache::new(Json(result))
}

View File

@ -29,13 +29,13 @@ impl From<&IpStats> for IpStatsWithPlan {
fn from(i: &IpStats) -> Self {
let mut result = Self {
ip_address: i.ip_address.clone(),
bits_per_second: i.bits_per_second,
packets_per_second: i.packets_per_second,
bits_per_second: (i.bits_per_second.down, i.bits_per_second.up),
packets_per_second: (i.packets_per_second.down, i.packets_per_second.up),
median_tcp_rtt: i.median_tcp_rtt,
tc_handle: i.tc_handle,
circuit_id: i.circuit_id.clone(),
plan: (0, 0),
tcp_retransmits: i.tcp_retransmits,
tcp_retransmits: (i.tcp_retransmits.down, i.tcp_retransmits.up),
};
if !result.circuit_id.is_empty() {

View File

@ -62,7 +62,7 @@ pub async fn unknown_devices_csv(_auth: AuthGuard) -> NoCache<String> {
let reader = unknown_devices().await;
for unknown in reader.iter() {
result += &format!("{},{},{}\n", unknown.ip_address, unknown.bits_per_second.0, unknown.bits_per_second.1);
result += &format!("{},{},{}\n", unknown.ip_address, unknown.bits_per_second.down, unknown.bits_per_second.up);
}
NoCache::new(result)
}

View File

@ -2,6 +2,7 @@
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;
use lqos_utils::units::DownUpOrder;
/// Representation of the eBPF `flow_key_t` type.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, FromBytes)]
@ -32,29 +33,29 @@ pub struct FlowbeeData {
/// Time (nanos) when the connection was last seen
pub last_seen: u64,
/// Bytes transmitted
pub bytes_sent: [u64; 2],
pub bytes_sent: DownUpOrder<u64>,
/// Packets transmitted
pub packets_sent: [u64; 2],
pub packets_sent: DownUpOrder<u64>,
/// Clock for the next rate estimate
pub next_count_time: [u64; 2],
pub next_count_time: DownUpOrder<u64>,
/// Clock for the previous rate estimate
pub last_count_time: [u64; 2],
pub last_count_time: DownUpOrder<u64>,
/// Bytes at the next rate estimate
pub next_count_bytes: [u64; 2],
pub next_count_bytes: DownUpOrder<u64>,
/// Rate estimate
pub rate_estimate_bps: [u32; 2],
pub rate_estimate_bps: DownUpOrder<u32>,
/// Sequence number of the last packet
pub last_sequence: [u32; 2],
pub last_sequence: DownUpOrder<u32>,
/// Acknowledgement number of the last packet
pub last_ack: [u32; 2],
pub last_ack: DownUpOrder<u32>,
/// TCP Retransmission count (also counts duplicates)
pub tcp_retransmits: [u16; 2],
pub tcp_retransmits: DownUpOrder<u16>,
/// Timestamp values
pub tsval: [u32; 2],
pub tsval: DownUpOrder<u32>,
/// Timestamp echo values
pub tsecr: [u32; 2],
pub tsecr: DownUpOrder<u32>,
/// When did the timestamp change?
pub ts_change_time: [u64; 2],
pub ts_change_time: DownUpOrder<u64>,
/// Has the connection ended?
/// 0 = Alive, 1 = FIN, 2 = RST
pub end_status: u8,

View File

@ -1,9 +1,11 @@
use serde::{Deserialize, Serialize};
use zerocopy::FromBytes;
use crate::units::UpDownOrder;
/// Provides strong download/upload separation for
/// stored statistics to eliminate confusion.
#[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default)]
pub struct DownUpOrder<T> {
pub down: T,
pub up: T,
@ -11,12 +13,21 @@ pub struct DownUpOrder<T> {
impl <T> DownUpOrder<T>
where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub
+ num_traits::CheckedAdd + num_traits::SaturatingSub
+ num_traits::CheckedAdd + num_traits::SaturatingSub + num_traits::SaturatingMul
+ num_traits::FromPrimitive
{
pub fn new(down: T, up: T) -> Self {
Self { down, up }
}
pub fn dir(&self, direction: usize) -> T {
if direction == 0 {
self.down
} else {
self.up
}
}
pub fn zeroed() -> Self {
Self { down: T::zero(), up: T::zero() }
}
@ -48,6 +59,13 @@ where T: std::cmp::Ord + num_traits::Zero + Copy + num_traits::CheckedSub
pub fn sum(&self) -> T {
self.down + self.up
}
pub fn to_bits_from_bytes(&self) -> DownUpOrder<T> {
DownUpOrder {
down: self.down.saturating_mul(&T::from_u32(8).unwrap()),
up: self.up.saturating_mul(&T::from_u32(8).unwrap()),
}
}
}
impl <T> Into<UpDownOrder<T>> for DownUpOrder<T> {

View File

@ -1,17 +1,18 @@
use serde::{Deserialize, Serialize};
use lqos_bus::{IpStats, TcHandle};
use lqos_utils::units::DownUpOrder;
use crate::shaped_devices_tracker::SHAPED_DEVICES;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IpStatsWithPlan {
pub ip_address: String,
pub bits_per_second: (u64, u64),
pub packets_per_second: (u64, u64),
pub bits_per_second: DownUpOrder<u64>,
pub packets_per_second: DownUpOrder<u64>,
pub median_tcp_rtt: f32,
pub tc_handle: TcHandle,
pub circuit_id: String,
pub plan: (u32, u32),
pub tcp_retransmits: (u64, u64),
pub plan: DownUpOrder<u32>,
pub tcp_retransmits: DownUpOrder<u64>,
}
impl From<&IpStats> for IpStatsWithPlan {
@ -23,7 +24,7 @@ impl From<&IpStats> for IpStatsWithPlan {
median_tcp_rtt: i.median_tcp_rtt,
tc_handle: i.tc_handle,
circuit_id: i.circuit_id.clone(),
plan: (0, 0),
plan: DownUpOrder::zeroed(),
tcp_retransmits: i.tcp_retransmits,
};
@ -41,7 +42,7 @@ impl From<&IpStats> for IpStatsWithPlan {
&circuit.circuit_name
};
result.ip_address = format!("{} ({})", name, result.ip_address);
result.plan = (circuit.download_max_mbps, circuit.upload_max_mbps);
result.plan = DownUpOrder::new(circuit.download_max_mbps, circuit.upload_max_mbps);
}
}

View File

@ -5,6 +5,7 @@ use lqos_bus::BusResponse;
use lqos_sys::flowbee_data::FlowbeeKey;
use once_cell::sync::Lazy;
use std::sync::{Arc, Mutex};
use lqos_utils::units::DownUpOrder;
pub struct TimeBuffer {
buffer: Mutex<Vec<TimeEntry>>,
@ -44,7 +45,7 @@ impl TimeBuffer {
let (key, data, _analysis) = &v.data;
let (lat, lon) = get_asn_lat_lon(key.remote_ip.as_ip());
let (_name, country) = get_asn_name_and_country(key.remote_ip.as_ip());
(lat, lon, country, data.bytes_sent[1], data.rtt[1].as_nanos() as f32)
(lat, lon, country, data.bytes_sent.up, data.rtt[1].as_nanos() as f32)
})
.filter(|(lat, lon, ..)| *lat != 0.0 && *lon != 0.0)
.collect::<Vec<(f64, f64, String, u64, f32)>>();
@ -58,7 +59,7 @@ impl TimeBuffer {
my_buffer
}
pub fn country_summary(&self) -> Vec<(String, [u64; 2], [f32; 2])> {
pub fn country_summary(&self) -> Vec<(String, DownUpOrder<u64>, [f32; 2])> {
let buffer = self.buffer.lock().unwrap();
let mut my_buffer = buffer
.iter()
@ -71,7 +72,7 @@ impl TimeBuffer {
];
(country, data.bytes_sent, rtt)
})
.collect::<Vec<(String, [u64; 2], [f32; 2])>>();
.collect::<Vec<(String, DownUpOrder<u64>, [f32; 2])>>();
// Sort by country
my_buffer.sort_by(|a, b| a.0.cmp(&b.0));
@ -79,7 +80,7 @@ impl TimeBuffer {
// Summarize by country
let mut country_summary = Vec::new();
let mut last_country = String::new();
let mut total_bytes = [0, 0];
let mut total_bytes = DownUpOrder::zeroed();
let mut total_rtt = [0.0f64, 0.0f64];
let mut rtt_count = [0, 0];
for (country, bytes, rtt) in my_buffer {
@ -102,12 +103,11 @@ impl TimeBuffer {
}
last_country = country.to_string();
total_bytes = [0, 0];
total_bytes = DownUpOrder::zeroed();
total_rtt = [0.0, 0.0];
rtt_count = [0, 0];
}
total_bytes[0] += bytes[0];
total_bytes[1] += bytes[1];
total_bytes.checked_add(bytes);
if rtt[0] > 0.0 {
total_rtt[0] += rtt[0] as f64;
rtt_count[0] += 1;
@ -135,7 +135,7 @@ impl TimeBuffer {
country_summary.push((last_country, total_bytes, rtt));
// Sort by bytes downloaded descending
country_summary.sort_by(|a, b| b.1[1].cmp(&a.1[1]));
country_summary.sort_by(|a, b| b.1.up.cmp(&a.1.up));
country_summary
}
@ -170,10 +170,10 @@ impl TimeBuffer {
let (key, data, _analysis) = &v.data;
if key.local_ip.is_v4() {
// It's V4
v4_bytes_sent[0] += data.bytes_sent[0];
v4_bytes_sent[1] += data.bytes_sent[1];
v4_packets_sent[0] += data.packets_sent[0];
v4_packets_sent[1] += data.packets_sent[1];
v4_bytes_sent[0] += data.bytes_sent.down;
v4_bytes_sent[1] += data.bytes_sent.up;
v4_packets_sent[0] += data.packets_sent.down;
v4_packets_sent[1] += data.packets_sent.up;
if data.rtt[0].as_nanos() > 0 {
v4_rtt[0].push(data.rtt[0].as_nanos());
}
@ -182,10 +182,10 @@ impl TimeBuffer {
}
} else {
// It's V6
v6_bytes_sent[0] += data.bytes_sent[0];
v6_bytes_sent[1] += data.bytes_sent[1];
v6_packets_sent[0] += data.packets_sent[0];
v6_packets_sent[1] += data.packets_sent[1];
v6_bytes_sent[0] += data.bytes_sent.down;
v6_bytes_sent[1] += data.bytes_sent.up;
v6_packets_sent[0] += data.packets_sent.down;
v6_packets_sent[1] += data.packets_sent.up;
if data.rtt[0].as_nanos() > 0 {
v6_rtt[0].push(data.rtt[0].as_nanos());
}
@ -226,8 +226,8 @@ impl TimeBuffer {
let (_key, data, analysis) = &v.data;
let proto = analysis.protocol_analysis.to_string();
let entry = results.entry(proto).or_insert((0, 0));
entry.0 += data.bytes_sent[0];
entry.1 += data.bytes_sent[1];
entry.0 += data.bytes_sent.down;
entry.1 += data.bytes_sent.up;
});
let mut results = results.into_iter().collect::<Vec<(String, (u64, u64))>>();

View File

@ -5,6 +5,7 @@ use super::{flow_analysis::FlowAnalysis, RttData};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use once_cell::sync::Lazy;
use std::{collections::HashMap, sync::Mutex};
use lqos_utils::units::DownUpOrder;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AsnId(pub u32);
@ -23,13 +24,13 @@ pub struct FlowbeeLocalData {
/// Time (nanos) when the connection was last seen
pub last_seen: u64,
/// Bytes transmitted
pub bytes_sent: [u64; 2],
pub bytes_sent: DownUpOrder<u64>,
/// Packets transmitted
pub packets_sent: [u64; 2],
pub packets_sent: DownUpOrder<u64>,
/// Rate estimate
pub rate_estimate_bps: [u32; 2],
pub rate_estimate_bps: DownUpOrder<u32>,
/// TCP Retransmission count (also counts duplicates)
pub tcp_retransmits: [u16; 2],
pub tcp_retransmits: DownUpOrder<u16>,
/// Has the connection ended?
/// 0 = Alive, 1 = FIN, 2 = RST
pub end_status: u8,

View File

@ -74,10 +74,10 @@ pub(crate) fn to_netflow_5(key: &FlowbeeKey, data: &FlowbeeLocalData) -> anyhow:
let src_ip = u32::from_ne_bytes(local.octets());
let dst_ip = u32::from_ne_bytes(remote.octets());
// Convert d_pkts to network order
let d_pkts = (data.packets_sent[0] as u32).to_be();
let d_octets = (data.bytes_sent[0] as u32).to_be();
let d_pkts2 = (data.packets_sent[1] as u32).to_be();
let d_octets2 = (data.bytes_sent[1] as u32).to_be();
let d_pkts = (data.packets_sent.down as u32).to_be();
let d_octets = (data.bytes_sent.down as u32).to_be();
let d_pkts2 = (data.packets_sent.up as u32).to_be();
let d_octets2 = (data.bytes_sent.up as u32).to_be();
let record = Netflow5Record {
src_addr: src_ip,

View File

@ -11,8 +11,8 @@ pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: us
let mut result = Vec::with_capacity(total_size as usize);
for (field_type, field_length) in template.iter() {
match (*field_type, *field_length) {
IN_BYTES => encode_u64(data.bytes_sent[direction], &mut result),
IN_PKTS => encode_u64(data.packets_sent[direction], &mut result),
IN_BYTES => encode_u64(data.bytes_sent.dir(direction), &mut result),
IN_PKTS => encode_u64(data.packets_sent.dir(direction), &mut result),
PROTOCOL => result.push(key.ip_protocol),
L4_SRC_PORT => encode_u16(src_port, &mut result),
L4_DST_PORT => encode_u16(dst_port, &mut result),

View File

@ -240,11 +240,11 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
bits_per_second: bytes.to_bits_from_bytes(),
packets_per_second: *packets,
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up),
tcp_retransmits: *tcp_retransmits,
},
)
.collect();
@ -292,11 +292,11 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
bits_per_second: bytes.to_bits_from_bytes(),
packets_per_second: *packets,
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up),
tcp_retransmits: *tcp_retransmits,
},
)
.collect();
@ -348,11 +348,11 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
bits_per_second: bytes.to_bits_from_bytes(),
packets_per_second: *packets,
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up),
tcp_retransmits: *tcp_retransmits,
},
)
.collect();
@ -401,11 +401,11 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: circuit_id.clone(),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
bits_per_second: bytes.to_bits_from_bytes(),
packets_per_second: *packets,
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: (tcp_retransmits.down, tcp_retransmits.up),
tcp_retransmits: *tcp_retransmits,
},
)
.collect();
@ -550,11 +550,11 @@ pub fn all_unknown_ips() -> BusResponse {
)| IpStats {
ip_address: ip.as_ip().to_string(),
circuit_id: String::new(),
bits_per_second: (bytes.down * 8, bytes.up * 8),
packets_per_second: (packets.down, packets.up),
bits_per_second: bytes.to_bits_from_bytes(),
packets_per_second: *packets,
median_tcp_rtt: *median_rtt,
tc_handle: *tc_handle,
tcp_retransmits: (0, 0),
tcp_retransmits: DownUpOrder::zeroed(),
},
)
.collect();
@ -589,7 +589,7 @@ pub fn dump_active_flows() -> BusResponse {
analysis: row.1.protocol_analysis.to_string(),
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: [row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()],
rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()),
}
})
.collect();
@ -615,29 +615,29 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
match flow_type {
TopFlowType::RateEstimate => {
table.sort_by(|a, b| {
let a_total = a.1 .0.rate_estimate_bps[0] + a.1 .0.rate_estimate_bps[1];
let b_total = b.1 .0.rate_estimate_bps[0] + b.1 .0.rate_estimate_bps[1];
let a_total = a.1 .0.rate_estimate_bps.sum();
let b_total = b.1 .0.rate_estimate_bps.sum();
b_total.cmp(&a_total)
});
}
TopFlowType::Bytes => {
table.sort_by(|a, b| {
let a_total = a.1 .0.bytes_sent[0] + a.1 .0.bytes_sent[1];
let b_total = b.1 .0.bytes_sent[0] + b.1 .0.bytes_sent[1];
let a_total = a.1 .0.bytes_sent.sum();
let b_total = b.1 .0.bytes_sent.sum();
b_total.cmp(&a_total)
});
}
TopFlowType::Packets => {
table.sort_by(|a, b| {
let a_total = a.1 .0.packets_sent[0] + a.1 .0.packets_sent[1];
let b_total = b.1 .0.packets_sent[0] + b.1 .0.packets_sent[1];
let a_total = a.1 .0.packets_sent.sum();
let b_total = b.1 .0.packets_sent.sum();
b_total.cmp(&a_total)
});
}
TopFlowType::Drops => {
table.sort_by(|a, b| {
let a_total = a.1 .0.tcp_retransmits[0] + a.1 .0.tcp_retransmits[1];
let b_total = b.1 .0.tcp_retransmits[0] + b.1 .0.tcp_retransmits[1];
let a_total = a.1 .0.tcp_retransmits.sum();
let b_total = b.1 .0.tcp_retransmits.sum();
b_total.cmp(&a_total)
});
}
@ -675,7 +675,7 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
analysis: flow.1.protocol_analysis.to_string(),
last_seen: flow.0.last_seen,
start_time: flow.0.start_time,
rtt_nanos: [flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()],
rtt_nanos: DownUpOrder::new(flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()),
}
})
.collect();
@ -714,7 +714,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
analysis: row.1.protocol_analysis.to_string(),
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: [row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()],
rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()),
}
})
.collect();

View File

@ -183,7 +183,7 @@ impl ThroughputTracker {
let mut rtt_circuit_tracker: FxHashMap<XdpIpAddress, [Vec<RttData>; 2]> = FxHashMap::default();
// Tracker for TCP retries. We're storing these per second.
let mut tcp_retries: FxHashMap<XdpIpAddress, [u64; 2]> = FxHashMap::default();
let mut tcp_retries: FxHashMap<XdpIpAddress, DownUpOrder<u64>> = FxHashMap::default();
// Track the expired keys
let mut expired_keys = Vec::new();
@ -242,10 +242,12 @@ impl ThroughputTracker {
// TCP Retries
if let Some(retries) = tcp_retries.get_mut(&key.local_ip) {
retries[0] += data.tcp_retransmits[0] as u64;
retries[1] += data.tcp_retransmits[1] as u64;
retries.down += data.tcp_retransmits.down as u64;
retries.up += data.tcp_retransmits.up as u64;
} else {
tcp_retries.insert(key.local_ip, [data.tcp_retransmits[0] as u64, data.tcp_retransmits[1] as u64]);
tcp_retries.insert(key.local_ip,
DownUpOrder::new(data.tcp_retransmits.down as u64, data.tcp_retransmits.up as u64)
);
}
if data.end_status != 0 {
@ -291,10 +293,10 @@ impl ThroughputTracker {
// Apply the new ones
for (local_ip, retries) in tcp_retries {
if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) {
tracker.tcp_retransmits.down = retries[0].saturating_sub(tracker.last_tcp_retransmits.down);
tracker.tcp_retransmits.up = retries[1].saturating_sub(tracker.last_tcp_retransmits.up);
tracker.last_tcp_retransmits.down = retries[0];
tracker.last_tcp_retransmits.up = retries[1];
tracker.tcp_retransmits.down = retries.down.saturating_sub(tracker.last_tcp_retransmits.down);
tracker.tcp_retransmits.up = retries.up.saturating_sub(tracker.last_tcp_retransmits.up);
tracker.last_tcp_retransmits.down = retries.down;
tracker.last_tcp_retransmits.up = retries.up;
}
}