In progress. The RTT data from the flows system is integrated into the high-level per-IP RTT tracker.

This commit is contained in:
Herbert Wolverson
2024-02-27 14:06:13 -06:00
parent b7c02d251d
commit f33d22faa0
4 changed files with 35 additions and 10 deletions

View File

@@ -250,7 +250,10 @@ pub unsafe fn iterate_rtt(
}
// TEMPORARY
iterate_flows();
let mut callback = |key: &FlowbeeKey, data: &FlowbeeData| {
println!("{:?} {:?}", key, data);
};
iterate_flows(&mut callback);
}
/// Iterate through the heimdall map and call the callback for each entry.
@@ -280,7 +283,9 @@ pub fn iterate_heimdall(
}
/// Iterate through the Flows 2 system tracker, retrieving all flows
pub fn iterate_flows() {
pub fn iterate_flows(
callback: &mut dyn FnMut(&FlowbeeKey, &FlowbeeData)
) {
unsafe {
if FLOWBEE_TRACKER.is_none() {
let lock = BPF_SKELETON.lock().unwrap();
@@ -297,12 +302,8 @@ pub fn iterate_flows() {
}
}
let mut callback = |key: &FlowbeeKey, data: &FlowbeeData| {
log::info!("Flow: {:#?} -> {:#?}", key, data);
};
if let Some(iter) = FLOWBEE_TRACKER.as_mut() {
let _ = iter.for_each(&mut callback);
let _ = iter.for_each(callback);
}
}
}

View File

@@ -1,3 +1,5 @@
//! Data structures for the Flowbee eBPF program.
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;

View File

@@ -32,4 +32,4 @@ pub use linux::num_possible_cpus;
pub use lqos_kernel::max_tracked_ips;
pub use tcp_rtt::{rtt_for_each, RttTrackingEntry};
pub use throughput::{throughput_for_each, HostCounter};
pub use bpf_iterator::iterate_heimdall;
pub use bpf_iterator::{iterate_heimdall, iterate_flows};

View File

@@ -3,7 +3,7 @@ use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
use lqos_sys::{rtt_for_each, throughput_for_each};
use lqos_sys::{iterate_flows, throughput_for_each};
use lqos_utils::XdpIpAddress;
pub struct ThroughputTracker {
@@ -170,7 +170,7 @@ impl ThroughputTracker {
pub(crate) fn apply_rtt_data(&self) {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
rtt_for_each(&mut |ip, rtt| {
/*rtt_for_each(&mut |ip, rtt| {
if rtt.has_fresh_data != 0 {
if let Some(mut tracker) = self.raw_data.get_mut(ip) {
tracker.recent_rtt_data = rtt.rtt;
@@ -183,6 +183,28 @@ impl ThroughputTracker {
}
}
}
});*/
iterate_flows(&mut |key, data| {
// 6 is TCP, not expired
if key.ip_protocol == 6 && data.end_status == 0 {
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
let rtt_as_nanos = data.last_rtt[0];
let data_as_ms_times_10 = rtt_as_nanos / 10000;
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = data_as_ms_times_10 as u32;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
}
}
}
});
}