Change RTT tracking to reduce erroneous appearance of low-traffic flows.

* Change the "median_rtt" function signature to return an Option
  rather than 0.0 on "no data".
* Change the "median_rtt" function to return None for flows with
  less than 1 mbit of traffic.
* Change the "worst rtt" list to ignore any None entries.
* Change the "funnel" view to only report RTT when there is
  a value.
This commit is contained in:
Herbert Wolverson 2023-06-09 19:54:13 +00:00
parent 5346be5c84
commit 890c8b05ec
3 changed files with 23 additions and 10 deletions

View File

@ -84,7 +84,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
*te.key(), *te.key(),
te.bytes_per_second, te.bytes_per_second,
te.packets_per_second, te.packets_per_second,
te.median_latency(), te.median_latency().unwrap_or(0.0),
te.tc_handle, te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(), te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
) )
@ -124,13 +124,13 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
.iter() .iter()
.filter(|v| !v.key().as_ip().is_loopback()) .filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle)) .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0) .filter(|te| te.median_latency().is_some())
.map(|te| { .map(|te| {
( (
*te.key(), *te.key(),
te.bytes_per_second, te.bytes_per_second,
te.packets_per_second, te.packets_per_second,
te.median_latency(), te.median_latency().unwrap_or(0.0),
te.tc_handle, te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(), te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
) )
@ -162,6 +162,7 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
.collect(); .collect();
BusResponse::WorstRtt(result) BusResponse::WorstRtt(result)
} }
pub fn best_n(start: u32, end: u32) -> BusResponse { pub fn best_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = { let mut full_list: Vec<TopList> = {
let tp_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed); let tp_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
@ -169,13 +170,13 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
.iter() .iter()
.filter(|v| !v.key().as_ip().is_loopback()) .filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle)) .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0) .filter(|te| te.median_latency().is_some())
.map(|te| { .map(|te| {
( (
*te.key(), *te.key(),
te.bytes_per_second, te.bytes_per_second,
te.packets_per_second, te.packets_per_second,
te.median_latency(), te.median_latency().unwrap_or(0.0),
te.tc_handle, te.tc_handle,
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(), te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
) )
@ -311,7 +312,7 @@ pub fn all_unknown_ips() -> BusResponse {
*te.key(), *te.key(),
te.bytes, te.bytes,
te.packets, te.packets,
te.median_latency(), te.median_latency().unwrap_or(0.0),
te.tc_handle, te.tc_handle,
te.most_recent_cycle, te.most_recent_cycle,
) )

View File

@ -19,7 +19,17 @@ pub(crate) struct ThroughputEntry {
} }
impl ThroughputEntry { impl ThroughputEntry {
pub(crate) fn median_latency(&self) -> f32 { /// Calculate the median latency from the recent_rtt_data
/// Returns an optional, because there might not be any
/// data to track.
/// Also explicitly rejects 0 values, and flows that have
/// less than 1 Mb of data---they are usually long-polling.
pub(crate) fn median_latency(&self) -> Option<f32> {
// Reject sub 1Mb flows
if self.bytes.0 < 1_000_000 || self.bytes.1 < 1_000_000 {
return None;
}
let mut shifted: Vec<f32> = self let mut shifted: Vec<f32> = self
.recent_rtt_data .recent_rtt_data
.iter() .iter()
@ -27,9 +37,9 @@ impl ThroughputEntry {
.map(|n| *n as f32 / 100.0) .map(|n| *n as f32 / 100.0)
.collect(); .collect();
if shifted.len() < 5 { if shifted.len() < 5 {
return 0.0; return None;
} }
shifted.sort_by(|a, b| a.partial_cmp(b).unwrap()); shifted.sort_by(|a, b| a.partial_cmp(b).unwrap());
shifted[shifted.len() / 2] Some(shifted[shifted.len() / 2])
} }
} }

View File

@ -178,7 +178,9 @@ impl ThroughputTracker {
tracker.last_fresh_rtt_data_cycle = self_cycle; tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents { if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap(); let net_json = NETWORK_JSON.write().unwrap();
net_json.add_rtt_cycle(parents, tracker.median_latency()); if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
} }
} }
} }