Use a dual-structure to store RTT in both directions.

This commit is contained in:
Herbert Wolverson 2024-03-15 12:29:47 -05:00
parent 9938a94caf
commit 9d0e536089
5 changed files with 25 additions and 22 deletions

View File

@ -165,7 +165,7 @@ pub struct FlowbeeSummaryData {
/// Raw TCP flags
pub flags: u8,
/// Recent RTT median
pub rtt_nanos: u64,
pub rtt_nanos: [u64; 2],
/// Remote ASN
pub remote_asn: u32,
/// Remote ASN Name

View File

@ -50,8 +50,8 @@ impl RttBuffer {
self.last_seen = last_seen;
}
fn median(&self) -> RttData {
let mut sorted = self.buffer[0].iter().filter(|x| x.as_nanos() > 0).collect::<Vec<_>>();
fn median(&self, direction: usize) -> RttData {
let mut sorted = self.buffer[direction].iter().filter(|x| x.as_nanos() > 0).collect::<Vec<_>>();
if sorted.is_empty() {
return RttData::from_nanos(0);
}
@ -131,10 +131,10 @@ pub fn expire_rtt_flows() {
}
}
pub fn flowbee_rtt_map() -> FxHashMap<FlowbeeKey, RttData> {
pub fn flowbee_rtt_map() -> FxHashMap<FlowbeeKey, [RttData; 2]> {
let lock = FLOW_RTT.lock().unwrap();
lock.iter()
.map(|(k, v)| (k.clone(), v.median()))
.map(|(k, v)| (k.clone(), [v.median(0), v.median(1)]))
.collect()
}

View File

@ -38,7 +38,7 @@ pub struct FlowbeeLocalData {
/// Raw TCP flags
pub flags: u8,
/// Recent RTT median
pub rtt: RttData,
pub rtt: [RttData; 2],
}
impl From<&FlowbeeData> for FlowbeeLocalData {
@ -53,7 +53,7 @@ impl From<&FlowbeeData> for FlowbeeLocalData {
end_status: data.end_status,
tos: data.tos,
flags: data.flags,
rtt: RttData::from_nanos(0),
rtt: [RttData::from_nanos(0); 2],
}
}
}

View File

@ -527,7 +527,7 @@ pub fn dump_active_flows() -> BusResponse {
analysis: row.1.protocol_analysis.to_string(),
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: row.0.rtt.as_nanos(),
rtt_nanos: [row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()],
}
})
.collect();
@ -613,7 +613,7 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
analysis: flow.1.protocol_analysis.to_string(),
last_seen: flow.0.last_seen,
start_time: flow.0.start_time,
rtt_nanos: flow.0.rtt.as_nanos(),
rtt_nanos: [flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()],
}
})
.collect();
@ -652,7 +652,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
analysis: row.1.protocol_analysis.to_string(),
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: row.0.rtt.as_nanos(),
rtt_nanos: [row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()],
}
})
.collect();

View File

@ -208,7 +208,8 @@ impl ThroughputTracker {
this_flow.0.end_status = data.end_status;
this_flow.0.tos = data.tos;
this_flow.0.flags = data.flags;
this_flow.0.rtt = rtt_samples.get(&key).copied().unwrap_or(RttData::from_nanos(0)).clone();
let rtt = rtt_samples.get(&key).copied().unwrap_or([RttData::from_nanos(0); 2]);
this_flow.0.rtt = rtt;
} else {
// Insert it into the map
let flow_analysis = FlowAnalysis::new(&key);
@ -219,17 +220,19 @@ impl ThroughputTracker {
if key.ip_protocol == 6 && data.end_status == 0 {
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
if let Some(rtt) = rtt_samples.get(&key) {
if rtt.as_nanos() > 0 {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = rtt.as_millis_times_100() as u32;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
for i in 0..2 {
if rtt[i].as_nanos() > 0 {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = rtt[i].as_millis_times_100() as u32;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
}
}
}