Flow data: the RTT tracker now keeps track of if new data has arrived for a flow, and only reports an RTT if it has. This prevents stuck flows with no new values coming in from biasing our results.

This commit is contained in:
Herbert Wolverson 2024-03-16 09:16:05 -05:00
parent c9600f831d
commit dec1440b31
2 changed files with 21 additions and 6 deletions

View File

@ -151,7 +151,7 @@ impl FinishedFlowAnalysis {
impl FlowbeeRecipient for FinishedFlowAnalysis { impl FlowbeeRecipient for FinishedFlowAnalysis {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) { fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) {
log::info!("Finished flow analysis"); log::debug!("Finished flow analysis");
RECENT_FLOWS.push(TimeEntry { RECENT_FLOWS.push(TimeEntry {
time: std::time::SystemTime::now() time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH) .duration_since(std::time::UNIX_EPOCH)

View File

@ -21,6 +21,7 @@ struct RttBuffer {
index: usize, index: usize,
buffer: [[RttData; BUFFER_SIZE]; 2], buffer: [[RttData; BUFFER_SIZE]; 2],
last_seen: u64, last_seen: u64,
has_new_data: [bool; 2],
} }
impl RttBuffer { impl RttBuffer {
@ -34,12 +35,14 @@ impl RttBuffer {
index: 1, index: 1,
buffer: [empty, filled], buffer: [empty, filled],
last_seen, last_seen,
has_new_data: [false, true],
} }
} else { } else {
Self { Self {
index: 0, index: 0,
buffer: [filled, empty], buffer: [filled, empty],
last_seen, last_seen,
has_new_data: [true, false],
} }
} }
} }
@ -48,9 +51,14 @@ impl RttBuffer {
self.buffer[direction as usize][self.index] = RttData::from_nanos(reading); self.buffer[direction as usize][self.index] = RttData::from_nanos(reading);
self.index = (self.index + 1) % BUFFER_SIZE; self.index = (self.index + 1) % BUFFER_SIZE;
self.last_seen = last_seen; self.last_seen = last_seen;
self.has_new_data[direction as usize] = true;
} }
fn median(&self, direction: usize) -> RttData { fn median_new_data(&self, direction: usize) -> RttData {
if !self.has_new_data[direction] {
// Reject with no new data
return RttData::from_nanos(0);
}
let mut sorted = self.buffer[direction].iter().filter(|x| x.as_nanos() > 0).collect::<Vec<_>>(); let mut sorted = self.buffer[direction].iter().filter(|x| x.as_nanos() > 0).collect::<Vec<_>>();
if sorted.is_empty() { if sorted.is_empty() {
return RttData::from_nanos(0); return RttData::from_nanos(0);
@ -132,10 +140,17 @@ pub fn expire_rtt_flows() {
} }
pub fn flowbee_rtt_map() -> FxHashMap<FlowbeeKey, [RttData; 2]> { pub fn flowbee_rtt_map() -> FxHashMap<FlowbeeKey, [RttData; 2]> {
let lock = FLOW_RTT.lock().unwrap(); let mut lock = FLOW_RTT.lock().unwrap();
lock.iter() let result = lock.iter()
.map(|(k, v)| (k.clone(), [v.median(0), v.median(1)])) .map(|(k, v)| (k.clone(), [v.median_new_data(0), v.median_new_data(1)]))
.collect() .collect();
// Clear all fresh data labeling
lock.iter_mut().for_each(|(_, v)| {
v.has_new_data = [false, false];
});
result
} }
pub fn get_rtt_events_per_second() -> u64 { pub fn get_rtt_events_per_second() -> u64 {