diff --git a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c
index 20bfc1a3..bd445a11 100644
--- a/src/rust/lqos_sys/src/bpf/lqos_kern.c
+++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c
@@ -336,21 +336,32 @@ struct bpf_iter__bpf_map_elem {
     void *value;
 };
 
+volatile const int NUM_CPUS = 0;
+
 SEC("iter/bpf_map_elem")
 int throughput_reader(struct bpf_iter__bpf_map_elem *ctx)
 {
     // The sequence file
     struct seq_file *seq = ctx->meta->seq;
-    struct host_counter *counter = ctx->value;
+    void *counter = ctx->value;
     struct in6_addr *ip = ctx->key;
+    __u32 num_cpus = NUM_CPUS;
 
     // Bail on end
     if (counter == NULL || ip == NULL) {
         return 0;
     }
 
+    if (ctx->meta->seq_num == 0) {
+        bpf_seq_write(seq, &num_cpus, sizeof(__u32));
+    }
+
     bpf_seq_write(seq, ip, sizeof(struct in6_addr));
-    bpf_seq_write(seq, counter, sizeof(struct host_counter));
+    for (__u32 i=0; i<num_cpus; i++) {
+        bpf_seq_write(seq, counter, sizeof(struct host_counter));
+        counter += sizeof(struct host_counter);
+    }
+    //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->download_bytes, counter->upload_bytes);
     return 0;
 }
 
@@ -362,12 +373,17 @@ int rtt_reader(struct bpf_iter__bpf_map_elem *ctx)
     struct seq_file *seq = ctx->meta->seq;
     struct rotating_performance *counter = ctx->value;
     struct in6_addr *ip = ctx->key;
+    __u32 num_cpus = 1;
 
     // Bail on end
     if (counter == NULL || ip == NULL) {
         return 0;
     }
 
+    if (ctx->meta->seq_num == 0) {
+        bpf_seq_write(seq, &num_cpus, sizeof(__u32));
+    }
+
     //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]);
     bpf_seq_write(seq, ip, sizeof(struct in6_addr));
     bpf_seq_write(seq, counter, sizeof(struct rotating_performance));
diff --git a/src/rust/lqos_sys/src/bpf_iterator.rs b/src/rust/lqos_sys/src/bpf_iterator.rs
index 90b90a6a..f4ccb5e3 100644
--- a/src/rust/lqos_sys/src/bpf_iterator.rs
+++ b/src/rust/lqos_sys/src/bpf_iterator.rs
@@ -47,7 +47,7 @@ impl<K, V> BpfMapIterator<K, V> {
     let mut buf = Vec::new();
     let bytes_read = file.read_to_end(&mut buf);
     match bytes_read {
-      Ok(_) => Ok(BpfMapIter { buffer: buf, index: 0, _phantom: PhantomData }),
+      Ok(_) => Ok(BpfMapIter::new(buf)),
       Err(e) => {
         log::error!("Unable to read from kernel map iterator file");
         log::error!("{e:?}");
@@ -69,12 +69,26 @@ pub(crate) struct BpfMapIter<K, V> {
   buffer: Vec<u8>,
   index: usize,
   _phantom: PhantomData<(K, V)>,
+  num_cpus: u32,
 }
 
 impl<K, V> BpfMapIter<K, V> {
   const KEY_SIZE: usize = std::mem::size_of::<K>();
   const VALUE_SIZE: usize = std::mem::size_of::<V>();
   const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE;
+
+  fn new(buffer: Vec<u8>) -> Self {
+    let first_four : [u8; 4] = [buffer[0], buffer[1], buffer[2], buffer[3]];
+    let num_cpus = u32::from_ne_bytes(first_four);
+    //println!("CPUs: {num_cpus}");
+
+    Self {
+      buffer,
+      index: std::mem::size_of::<u32>(),
+      _phantom: PhantomData,
+      num_cpus,
+    }
+  }
 }
 
 impl<K, V> Iterator for BpfMapIter<K, V>
@@ -82,18 +96,23 @@ where
   K: FromBytes + Debug,
   V: FromBytes + Debug,
 {
-  type Item = (K, V);
+  type Item = (K, Vec<V>);
 
   fn next(&mut self) -> Option<Self::Item> {
     if self.index + Self::TOTAL_SIZE <= self.buffer.len() {
      let key = K::read_from(&self.buffer[self.index..self.index + Self::KEY_SIZE]);
       self.index += Self::KEY_SIZE;
-      let value = V::read_from(
-        &self.buffer
-          [self.index ..self.index + Self::VALUE_SIZE],
-      );
-      self.index += Self::VALUE_SIZE;
-      Some((key.unwrap(), value.unwrap()))
+      let mut vals = Vec::new();
+      for _ in 0..self.num_cpus {
+        let value = V::read_from(
+          &self.buffer
+            [self.index ..self.index + Self::VALUE_SIZE],
+        );
+        vals.push(value.unwrap());
+        self.index += Self::VALUE_SIZE;
+      }
+      //println!("{key:?} {vals:?}");
+      Some((key.unwrap(), vals))
     } else {
       None
     }
@@ -118,7 +137,7 @@ static RTT_TRACKER: Lazy<
   Mutex<Option<BpfMapIterator<XdpIpAddress, RttTrackingEntry>>>,
 > = Lazy::new(|| Mutex::new(None));
 
-pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter)) {
+pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter])) {
   let mut traffic = MAP_TRAFFIC.lock().unwrap();
   if traffic.is_none() {
     let lock = BPF_SKELETON.lock().unwrap();
@@ -162,7 +181,7 @@ pub fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) {
 
   if let Some(iter) = traffic.as_mut() {
     iter.iter().unwrap().for_each(|(k, v)| {
-      callback(&k, &v);
+      callback(&k, &v[0]); // Not per-CPU
     });
   }
 }
diff --git a/src/rust/lqos_sys/src/lqos_kernel.rs b/src/rust/lqos_sys/src/lqos_kernel.rs
index ff67d196..d3db199a 100644
--- a/src/rust/lqos_sys/src/lqos_kernel.rs
+++ b/src/rust/lqos_sys/src/lqos_kernel.rs
@@ -14,7 +14,7 @@ use log::{info, warn};
 use nix::libc::{geteuid, if_nametoindex};
 use std::{ffi::{CString, c_void}, process::Command};
 
-use self::bpf::lqos_kern;
+use self::bpf::{lqos_kern, libbpf_num_possible_cpus};
 
 pub(crate) mod bpf {
   #![allow(warnings, unused)]
@@ -123,6 +123,7 @@ pub fn attach_xdp_and_tc_to_interface(
   set_strict_mode()?;
   let skeleton = unsafe {
     let skeleton = open_kernel()?;
+    (*(*skeleton).rodata).NUM_CPUS = libbpf_num_possible_cpus();
     (*(*skeleton).data).direction = match direction {
       InterfaceDirection::Internet => 1,
       InterfaceDirection::IspNetwork => 2,
diff --git a/src/rust/lqos_sys/src/throughput.rs b/src/rust/lqos_sys/src/throughput.rs
index c7749457..df5ef2d3 100644
--- a/src/rust/lqos_sys/src/throughput.rs
+++ b/src/rust/lqos_sys/src/throughput.rs
@@ -27,7 +27,7 @@ pub struct HostCounter {
 /// Iterates through all throughput entries, and sends them in turn to `callback`.
 /// This elides the need to clone or copy data.
 pub fn throughput_for_each(
-  callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter),
+  callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter]),
 ) {
   crate::bpf_iterator::iterate_throughput(callback);
 }
diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs
index c54e2336..f204d8df 100644
--- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs
+++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs
@@ -111,18 +111,18 @@ impl ThroughputTracker {
       if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
         entry.bytes = (0, 0);
         entry.packets = (0, 0);
-        //for c in counts {
-          entry.bytes.0 += counts.download_bytes;
-          entry.bytes.1 += counts.upload_bytes;
-          entry.packets.0 += counts.download_packets;
-          entry.packets.1 += counts.upload_packets;
-          if counts.tc_handle != 0 {
-            entry.tc_handle = TcHandle::from_u32(counts.tc_handle);
+        for c in counts {
+          entry.bytes.0 += c.download_bytes;
+          entry.bytes.1 += c.upload_bytes;
+          entry.packets.0 += c.download_packets;
+          entry.packets.1 += c.upload_packets;
+          if c.tc_handle != 0 {
+            entry.tc_handle = TcHandle::from_u32(c.tc_handle);
           }
-          if counts.last_seen != 0 {
-            entry.last_seen = counts.last_seen;
+          if c.last_seen != 0 {
+            entry.last_seen = c.last_seen;
           }
-        //}
+        }
 
         if entry.packets != entry.prev_packets {
           entry.most_recent_cycle = self_cycle;
@@ -131,8 +131,8 @@ impl ThroughputTracker {
             net_json.add_throughput_cycle(
               parents,
               (
-                entry.bytes.0 - entry.prev_bytes.0,
-                entry.bytes.1 - entry.prev_bytes.1,
+                entry.bytes.0.saturating_sub(entry.prev_bytes.0),
+                entry.bytes.1.saturating_sub(entry.prev_bytes.1),
               ),
             );
           }
@@ -155,15 +155,15 @@ impl ThroughputTracker {
           last_fresh_rtt_data_cycle: 0,
           last_seen: 0,
         };
-        //for c in counts {
-          entry.bytes.0 += counts.download_bytes;
-          entry.bytes.1 += counts.upload_bytes;
-          entry.packets.0 += counts.download_packets;
-          entry.packets.1 += counts.upload_packets;
-          if counts.tc_handle != 0 {
-            entry.tc_handle = TcHandle::from_u32(counts.tc_handle);
+        for c in counts {
+          entry.bytes.0 += c.download_bytes;
+          entry.bytes.1 += c.upload_bytes;
+          entry.packets.0 += c.download_packets;
+          entry.packets.1 += c.upload_packets;
+          if c.tc_handle != 0 {
+            entry.tc_handle = TcHandle::from_u32(c.tc_handle);
           }
-        //}
+        }
         raw_data.insert(*xdp_ip, entry);
       }
     });