diff --git a/src/rust/lqos_sys/src/bpf_iterator.rs b/src/rust/lqos_sys/src/bpf_iterator.rs index 20b7d36a..27378e49 100644 --- a/src/rust/lqos_sys/src/bpf_iterator.rs +++ b/src/rust/lqos_sys/src/bpf_iterator.rs @@ -250,7 +250,10 @@ pub unsafe fn iterate_rtt( } // TEMPORARY - iterate_flows(); + let mut callback = |key: &FlowbeeKey, data: &FlowbeeData| { + println!("{:?} {:?}", key, data); + }; + iterate_flows(&mut callback); } /// Iterate through the heimdall map and call the callback for each entry. @@ -280,7 +283,9 @@ pub fn iterate_heimdall( } /// Iterate through the Flows 2 system tracker, retrieving all flows -pub fn iterate_flows() { +pub fn iterate_flows( + callback: &mut dyn FnMut(&FlowbeeKey, &FlowbeeData) +) { unsafe { if FLOWBEE_TRACKER.is_none() { let lock = BPF_SKELETON.lock().unwrap(); @@ -297,12 +302,8 @@ pub fn iterate_flows() { } } - let mut callback = |key: &FlowbeeKey, data: &FlowbeeData| { - log::info!("Flow: {:#?} -> {:#?}", key, data); - }; - if let Some(iter) = FLOWBEE_TRACKER.as_mut() { - let _ = iter.for_each(&mut callback); + let _ = iter.for_each(callback); } } } \ No newline at end of file diff --git a/src/rust/lqos_sys/src/flowbee_data.rs b/src/rust/lqos_sys/src/flowbee_data.rs index 692c19bf..cebf79b5 100644 --- a/src/rust/lqos_sys/src/flowbee_data.rs +++ b/src/rust/lqos_sys/src/flowbee_data.rs @@ -1,3 +1,5 @@ +//! Data structures for the Flowbee eBPF program. + use lqos_utils::XdpIpAddress; use zerocopy::FromBytes; diff --git a/src/rust/lqos_sys/src/lib.rs b/src/rust/lqos_sys/src/lib.rs index a957f198..d2870968 100644 --- a/src/rust/lqos_sys/src/lib.rs +++ b/src/rust/lqos_sys/src/lib.rs @@ -32,4 +32,4 @@ pub use linux::num_possible_cpus; pub use lqos_kernel::max_tracked_ips; pub use tcp_rtt::{rtt_for_each, RttTrackingEntry}; pub use throughput::{throughput_for_each, HostCounter}; -pub use bpf_iterator::iterate_heimdall; \ No newline at end of file +pub use bpf_iterator::{iterate_heimdall, iterate_flows}; \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 246332f8..b8383ff6 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -3,7 +3,7 @@ use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use dashmap::DashMap; use lqos_bus::TcHandle; -use lqos_sys::{rtt_for_each, throughput_for_each}; +use lqos_sys::{iterate_flows, throughput_for_each}; use lqos_utils::XdpIpAddress; pub struct ThroughputTracker { @@ -170,7 +170,7 @@ impl ThroughputTracker { pub(crate) fn apply_rtt_data(&self) { let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed); - rtt_for_each(&mut |ip, rtt| { + /*rtt_for_each(&mut |ip, rtt| { if rtt.has_fresh_data != 0 { if let Some(mut tracker) = self.raw_data.get_mut(ip) { tracker.recent_rtt_data = rtt.rtt; @@ -183,6 +183,28 @@ impl ThroughputTracker { } } } + });*/ + + iterate_flows(&mut |key, data| { + // 6 is TCP, not expired + if key.ip_protocol == 6 && data.end_status == 0 { + if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) { + let rtt_as_nanos = data.last_rtt[0]; + let data_as_ms_times_10 = rtt_as_nanos / 10000; + // Shift left + for i in 1..60 { + tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1]; + } + tracker.recent_rtt_data[0] = data_as_ms_times_10 as u32; + tracker.last_fresh_rtt_data_cycle = self_cycle; + if let Some(parents) = &tracker.network_json_parents { + let net_json = NETWORK_JSON.write().unwrap(); + if let Some(rtt) = tracker.median_latency() { + net_json.add_rtt_cycle(parents, rtt); + } + } + } + } }); }