From c9600f831de6d995dfa88b8bd44b449a2133f079 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Sat, 16 Mar 2024 09:08:31 -0500 Subject: [PATCH] RTT gathering improvement: RTTs are grouped per circuit, and then added as a single value to help preserve per-circuit sample stability. --- .../src/throughput_tracker/tracking_data.rs | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index e2924b7c..4027b112 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -2,6 +2,7 @@ use std::{sync::atomic::AtomicU64, time::Duration}; use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}}; use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use dashmap::DashMap; +use fxhash::FxHashMap; use lqos_bus::TcHandle; use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each}; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; @@ -183,6 +184,11 @@ impl ThroughputTracker { let since_boot = Duration::from(now); let expire = (since_boot - Duration::from_secs(timeout_seconds)).as_nanos() as u64; + // Tracker for per-circuit RTT data. We're losing some of the smoothness by sampling + // every flow; the idea is to combine them into a single entry for the circuit. This + // should limit outliers. + let mut rtt_circuit_tracker: FxHashMap<XdpIpAddress, [Vec<RttData>; 2]> = FxHashMap::default(); + // Track the expired keys let mut expired_keys = Vec::new(); @@ -217,10 +223,17 @@ impl ThroughputTracker { } // TCP - we have RTT data? 
6 is TCP - if key.ip_protocol == 6 && data.end_status == 0 { - if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) { + if key.ip_protocol == 6 && data.end_status == 0 && self.raw_data.contains_key(&key.local_ip) { if let Some(rtt) = rtt_samples.get(&key) { - for i in 0..2 { + // Add the RTT data to the per-circuit tracker + if let Some(tracker) = rtt_circuit_tracker.get_mut(&key.local_ip) { + tracker[0].push(rtt[0]); + tracker[1].push(rtt[1]); + } else { + rtt_circuit_tracker.insert(key.local_ip, [vec![rtt[0]], vec![rtt[1]]]); + } + + /*for i in 0..2 { if rtt[i].as_nanos() > 0 { // Shift left for i in 1..60 { @@ -235,18 +248,42 @@ impl ThroughputTracker { } } } - } + }*/ } if data.end_status != 0 { // The flow has ended. We need to remove it from the map. expired_keys.push(key.clone()); } - } } } }); // End flow iterator + // Merge in the per-flow RTT data into the per-circuit tracker + for (local_ip, rtt_data) in rtt_circuit_tracker { + let mut rtts = rtt_data[0].iter().filter(|r| r.as_nanos() > 0).collect::<Vec<_>>(); + rtts.extend(rtt_data[1].iter().filter(|r| r.as_nanos() > 0)); + if !rtts.is_empty() { + rtts.sort(); + let median = rtts[rtts.len() / 2]; + if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) { + // Shift left + for i in 1..60 { + tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1]; + } + tracker.recent_rtt_data[0] = *median; + tracker.last_fresh_rtt_data_cycle = self_cycle; + if let Some(parents) = &tracker.network_json_parents { + let net_json = NETWORK_JSON.write().unwrap(); + if let Some(rtt) = tracker.median_latency() { + net_json.add_rtt_cycle(parents, rtt); + } + } + } + } + } + + // Key Expiration if !expired_keys.is_empty() { for key in expired_keys.iter() { // Send it off to netperf for analysis if we are supporting doing so.