From dec1440b31277e6bd7e4f52bf4f4451ce608e686 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Sat, 16 Mar 2024 09:16:05 -0500 Subject: [PATCH] Flow data: the RTT tracker now keeps track of if new data has arrived for a flow, and only reports an RTT if it has. This prevents stuck flows with no new values coming in from biasing our results. --- .../flow_data/flow_analysis/finished_flows.rs | 2 +- .../flow_analysis/kernel_ringbuffer.rs | 25 +++++++++++++++---- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs index 2981b2de..e0872afb 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs @@ -151,7 +151,7 @@ impl FinishedFlowAnalysis { impl FlowbeeRecipient for FinishedFlowAnalysis { fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) { - log::info!("Finished flow analysis"); + log::debug!("Finished flow analysis"); RECENT_FLOWS.push(TimeEntry { time: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index 62d59851..52c59d7f 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -21,6 +21,7 @@ struct RttBuffer { index: usize, buffer: [[RttData; BUFFER_SIZE]; 2], last_seen: u64, + has_new_data: [bool; 2], } impl RttBuffer { @@ -34,12 +35,14 @@ impl RttBuffer { index: 1, buffer: [empty, filled], last_seen, + has_new_data: [false, true], } } else { Self { index: 0, buffer: [filled, empty], last_seen, + has_new_data: [true, false], } } } @@ -48,9 +51,14 @@ impl RttBuffer { self.buffer[direction as usize][self.index] = RttData::from_nanos(reading); self.index = (self.index + 1) % BUFFER_SIZE; self.last_seen = last_seen; + self.has_new_data[direction as usize] = true; } - fn median(&self, direction: usize) -> RttData { + fn median_new_data(&self, direction: usize) -> RttData { + if !self.has_new_data[direction] { + // Reject with no new data + return RttData::from_nanos(0); + } let mut sorted = self.buffer[direction].iter().filter(|x| x.as_nanos() > 0).collect::>(); if sorted.is_empty() { return RttData::from_nanos(0); @@ -132,10 +140,17 @@ pub fn expire_rtt_flows() { } pub fn flowbee_rtt_map() -> FxHashMap { - let lock = FLOW_RTT.lock().unwrap(); - lock.iter() - .map(|(k, v)| (k.clone(), [v.median(0), v.median(1)])) - .collect() + let mut lock = FLOW_RTT.lock().unwrap(); + let result = lock.iter() + .map(|(k, v)| (k.clone(), [v.median_new_data(0), v.median_new_data(1)])) + .collect(); + + // Clear all fresh data labeling + lock.iter_mut().for_each(|(_, v)| { + v.has_new_data = [false, false]; + }); + + result } pub fn get_rtt_events_per_second() -> u64 {