From 0c6ecd3e916a6992797287c95e87434fa58bf4bd Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Fri, 26 Jul 2024 14:39:43 -0500 Subject: [PATCH] Filter to non-ephemeral flows, and collect flow tracking data a bit better. --- .../src/node_manager/js_build/src/asn_explorer.js | 1 + .../src/node_manager/local_api/flow_explorer.rs | 4 ++++ .../flow_data/flow_analysis/finished_flows.rs | 4 ++++ .../lqosd/src/throughput_tracker/tracking_data.rs | 14 +++++++++++--- 4 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/rust/lqosd/src/node_manager/js_build/src/asn_explorer.js b/src/rust/lqosd/src/node_manager/js_build/src/asn_explorer.js index 82e90208..c0433d38 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/asn_explorer.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/asn_explorer.js @@ -159,6 +159,7 @@ function drawTimeline() { for (let i=0; i) -> Json> { let flows = all_flows_for_asn .iter() + .filter(|flow| { + // Total flow time > 2 seconds + flow.1.last_seen - flow.1.start_time > 2_000_000_000 + }) .map(|flow| { FlowTimeline { diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs index 7ed0535f..9f4186fc 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs @@ -284,6 +284,10 @@ impl TimeBuffer { let buffer = self.buffer.lock().unwrap(); buffer .iter() + .filter(|flow| { + // Total flow time > 2 seconds + flow.data.1.last_seen - flow.data.1.start_time > 2_000_000_000 + }) .map(|flow| flow.data.2.asn_id.0) .sorted() .dedup_with_count() diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 9d37fdc8..7f2cdd53 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -200,7 +200,6 @@ impl ThroughputTracker { get_flowbee_event_count_and_reset(); let since_boot = Duration::from(now); let expire = (since_boot - Duration::from_secs(timeout_seconds)).as_nanos() as u64; - let zeroed = DownUpOrder::zeroed(); // Tracker for per-circuit RTT data. We're losing some of the smoothness by sampling // every flow; the idea is to combine them into a single entry for the circuit. This @@ -234,6 +233,11 @@ impl ThroughputTracker { if data.tcp_retransmits.up != this_flow.0.tcp_retransmits.up { this_flow.0.retry_times_up.push(data.last_seen); } + + let change_since_last_time = data.bytes_sent.checked_sub_or_zero(this_flow.0.bytes_sent); + this_flow.0.throughput_buffer.push(change_since_last_time); + println!("{change_since_last_time:?}"); + this_flow.0.last_seen = data.last_seen; this_flow.0.bytes_sent = data.bytes_sent; this_flow.0.packets_sent = data.packets_sent; @@ -242,8 +246,12 @@ impl ThroughputTracker { this_flow.0.end_status = data.end_status; this_flow.0.tos = data.tos; this_flow.0.flags = data.flags; - let prev_bytes = this_flow.0.throughput_buffer.last().unwrap_or(&zeroed); - this_flow.0.throughput_buffer.push(data.bytes_sent.checked_sub_or_zero(*prev_bytes)); + + + + //let prev_bytes = this_flow.0.throughput_buffer.last().unwrap_or(&zeroed); + //this_flow.0.throughput_buffer.push(data.bytes_sent.checked_sub_or_zero(*prev_bytes)); + if let Some([up, down]) = rtt_samples.get(&key) { if up.as_nanos() != 0 { this_flow.0.rtt[0] = *up;