RTT gathering improvement: RTTs are grouped per circuit and then recorded as a single median value, to help preserve per-circuit sample stability.
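A rough sketch of that grouping step, using plain standard-library types in place of lqosd's FxHashMap, XdpIpAddress, and RttData (all names below are illustrative stand-ins, not the real ones): each flow contributes a down/up RTT pair keyed by the circuit's local IP, and the circuit keeps only one median per polling cycle.

    use std::collections::HashMap;
    use std::net::IpAddr;

    /// Stand-in for an RTT sample in nanoseconds.
    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
    struct RttNanos(u64);

    /// Bucket each flow's down/up RTT samples under its circuit key, then
    /// reduce every circuit to a single median sample for this cycle.
    fn per_circuit_median(flow_samples: &[(IpAddr, [RttNanos; 2])]) -> HashMap<IpAddr, RttNanos> {
        let mut per_circuit: HashMap<IpAddr, Vec<RttNanos>> = HashMap::new();
        for (circuit_ip, rtt) in flow_samples {
            let bucket = per_circuit.entry(*circuit_ip).or_default();
            bucket.push(rtt[0]); // download direction
            bucket.push(rtt[1]); // upload direction
        }
        per_circuit
            .into_iter()
            .filter_map(|(ip, mut samples)| {
                samples.retain(|r| r.0 > 0); // zero means "no sample seen"
                if samples.is_empty() {
                    return None;
                }
                samples.sort();
                Some((ip, samples[samples.len() / 2])) // median damps per-flow outliers
            })
            .collect()
    }

    fn main() {
        let circuit: IpAddr = "100.64.0.2".parse().unwrap();
        let flows = vec![
            (circuit, [RttNanos(12_000_000), RttNanos(15_000_000)]),
            (circuit, [RttNanos(90_000_000), RttNanos(0)]), // one outlier, one missing sample
        ];
        let medians = per_circuit_median(&flows);
        println!("circuit median RTT: {} ns", medians[&circuit].0);
    }

Taking the median of the combined samples, rather than feeding every flow's RTT into the history individually, is what keeps a single chatty flow from dominating the circuit's latency figure.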

Herbert Wolverson 2024-03-16 09:08:31 -05:00
parent b6eb08751c
commit c9600f831d


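In the diff below, the merge loop then folds each circuit's median into a fixed 60-entry RTT history on that circuit's throughput entry. A minimal sketch of that update, where CircuitRtt and its median_latency() are hypothetical stand-ins for the relevant ThroughputEntry fields and method; the shift here walks from the tail so every slot genuinely moves by one.

    /// Hypothetical stand-in for the RTT-related parts of a ThroughputEntry.
    struct CircuitRtt {
        recent_rtt_data: [u64; 60],     // newest sample lives at index 0, in nanoseconds
        last_fresh_rtt_data_cycle: u64, // polling cycle that last produced a sample
    }

    impl CircuitRtt {
        /// Record one per-circuit median for this cycle: shift the history one
        /// slot toward the tail, write the new head, and stamp the cycle.
        fn push_rtt(&mut self, median_nanos: u64, cycle: u64) {
            for i in (1..60).rev() {
                self.recent_rtt_data[i] = self.recent_rtt_data[i - 1];
            }
            self.recent_rtt_data[0] = median_nanos;
            self.last_fresh_rtt_data_cycle = cycle;
        }

        /// Median over the non-zero history entries (assumed behaviour of the
        /// real median_latency(); zero slots are treated as "no sample yet").
        fn median_latency(&self) -> Option<u64> {
            let mut samples: Vec<u64> =
                self.recent_rtt_data.iter().copied().filter(|r| *r > 0).collect();
            if samples.is_empty() {
                return None;
            }
            samples.sort_unstable();
            Some(samples[samples.len() / 2])
        }
    }

    fn main() {
        let mut entry = CircuitRtt { recent_rtt_data: [0; 60], last_fresh_rtt_data_cycle: 0 };
        entry.push_rtt(15_000_000, 1);
        entry.push_rtt(18_000_000, 2);
        println!("median latency: {:?} ns", entry.median_latency());
    }

Stamping last_fresh_rtt_data_cycle alongside the write presumably lets later consumers tell circuits that produced fresh RTT data this cycle apart from ones coasting on old samples.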
@@ -2,6 +2,7 @@ use std::{sync::atomic::AtomicU64, time::Duration};
use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}};
use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use fxhash::FxHashMap;
use lqos_bus::TcHandle;
use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
@@ -183,6 +184,11 @@ impl ThroughputTracker {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(timeout_seconds)).as_nanos() as u64;
// Tracker for per-circuit RTT data. We're losing some of the smoothness by sampling
// every flow; the idea is to combine them into a single entry for the circuit. This
// should limit outliers.
let mut rtt_circuit_tracker: FxHashMap<XdpIpAddress, [Vec<RttData>; 2]> = FxHashMap::default();
// Track the expired keys
let mut expired_keys = Vec::new();
@@ -217,10 +223,17 @@ impl ThroughputTracker {
}
// TCP - we have RTT data? 6 is TCP
if key.ip_protocol == 6 && data.end_status == 0 {
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
if key.ip_protocol == 6 && data.end_status == 0 && self.raw_data.contains_key(&key.local_ip) {
if let Some(rtt) = rtt_samples.get(&key) {
for i in 0..2 {
// Add the RTT data to the per-circuit tracker
if let Some(tracker) = rtt_circuit_tracker.get_mut(&key.local_ip) {
tracker[0].push(rtt[0]);
tracker[1].push(rtt[1]);
} else {
rtt_circuit_tracker.insert(key.local_ip, [vec![rtt[0]], vec![rtt[1]]]);
}
/*for i in 0..2 {
if rtt[i].as_nanos() > 0 {
// Shift left
for i in 1..60 {
@@ -235,18 +248,42 @@ impl ThroughputTracker {
}
}
}
}
}*/
}
if data.end_status != 0 {
// The flow has ended. We need to remove it from the map.
expired_keys.push(key.clone());
}
}
}
}
}); // End flow iterator
// Merge the per-flow RTT data into the per-circuit tracker
for (local_ip, rtt_data) in rtt_circuit_tracker {
let mut rtts = rtt_data[0].iter().filter(|r| r.as_nanos() > 0).collect::<Vec<_>>();
rtts.extend(rtt_data[1].iter().filter(|r| r.as_nanos() > 0));
if !rtts.is_empty() {
rtts.sort();
let median = rtts[rtts.len() / 2];
if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = *median;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
}
}
}
}
// Key Expiration
if !expired_keys.is_empty() {
for key in expired_keys.iter() {
// Send it off to netperf for analysis if we support doing so.