Handle per-CPU maps correctly in eBPF iterators

The eBPF iterator interface is now per-CPU aware: it extracts a
slice of values (one entry per CPU) from each per-CPU map, and a
slice of length 1 from maps that are not per-CPU.
Herbert Wolverson 2023-04-18 20:37:49 +00:00
parent f0ea3edcff
commit 082a16e532
5 changed files with 70 additions and 34 deletions
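
The diff establishes a simple stream format between the kernel iterators and
userspace: a single u32 CPU count, written once at the start of the stream,
followed by repeated records of one key plus one value per CPU (non-per-CPU
maps report a count of 1). A minimal stand-alone sketch of a reader for that
framing, not the project's code, with u32 keys and u64 values standing in
for the real in6_addr/host_counter types:

    // Sketch (not the project's code) of the stream format the new iterators
    // emit: a u32 CPU count, then repeated (key, value * num_cpus) records.
    fn parse_stream(buf: &[u8]) -> Vec<(u32, Vec<u64>)> {
        let num_cpus = u32::from_ne_bytes(buf[0..4].try_into().unwrap()) as usize;
        let mut records = Vec::new();
        let mut i = 4; // skip the CPU-count header
        while i + 4 + num_cpus * 8 <= buf.len() {
            let key = u32::from_ne_bytes(buf[i..i + 4].try_into().unwrap());
            i += 4;
            let mut per_cpu = Vec::with_capacity(num_cpus);
            for _ in 0..num_cpus {
                per_cpu.push(u64::from_ne_bytes(buf[i..i + 8].try_into().unwrap()));
                i += 8;
            }
            records.push((key, per_cpu));
        }
        records
    }

    fn main() {
        // Two CPUs, one record: key 7 with per-CPU values 10 and 32.
        let mut buf = 2u32.to_ne_bytes().to_vec();
        buf.extend_from_slice(&7u32.to_ne_bytes());
        buf.extend_from_slice(&10u64.to_ne_bytes());
        buf.extend_from_slice(&32u64.to_ne_bytes());
        assert_eq!(parse_stream(&buf), vec![(7, vec![10, 32])]);
    }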

View File

@@ -336,21 +336,32 @@ struct bpf_iter__bpf_map_elem {
     void *value;
 };
 
+volatile const int NUM_CPUS = 0;
+
 SEC("iter/bpf_map_elem")
 int throughput_reader(struct bpf_iter__bpf_map_elem *ctx)
 {
     // The sequence file
     struct seq_file *seq = ctx->meta->seq;
-    struct host_counter *counter = ctx->value;
+    void *counter = ctx->value;
     struct in6_addr *ip = ctx->key;
+    __u32 num_cpus = NUM_CPUS;
 
     // Bail on end
     if (counter == NULL || ip == NULL) {
         return 0;
     }
 
+    if (ctx->meta->seq_num == 0) {
+        bpf_seq_write(seq, &num_cpus, sizeof(__u32));
+    }
+
     bpf_seq_write(seq, ip, sizeof(struct in6_addr));
-    bpf_seq_write(seq, counter, sizeof(struct host_counter));
+    for (__u32 i = 0; i < NUM_CPUS; i++) {
+        struct host_counter *content = counter + (i * 48);
+        bpf_seq_write(seq, content, sizeof(struct host_counter));
+    }
     //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->download_bytes, counter->upload_bytes);
     return 0;
 }
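
A note on the hard-coded 48-byte stride in the loop above: the kernel stores
per-CPU map values at 8-byte-aligned offsets, so the per-CPU stride is
sizeof(struct host_counter) rounded up to a multiple of 8, which here is
presumably exactly 48. A hedged sketch of deriving the stride rather than
hard-coding it (HOST_COUNTER_SIZE is an assumed stand-in for the real sizeof):

    // Sketch: derive the per-CPU stride instead of hard-coding 48. Per-CPU
    // map values sit at 8-byte-aligned offsets, so the stride is the value
    // size rounded up to the next multiple of 8. HOST_COUNTER_SIZE is an
    // assumed stand-in for sizeof(struct host_counter) in the C code above.
    const HOST_COUNTER_SIZE: usize = 48;

    fn per_cpu_stride(value_size: usize) -> usize {
        (value_size + 7) & !7 // round up to a multiple of 8
    }

    fn main() {
        assert_eq!(per_cpu_stride(HOST_COUNTER_SIZE), 48); // 48 is already aligned
        assert_eq!(per_cpu_stride(44), 48); // a 44-byte value would also stride 48
    }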
@@ -362,12 +373,17 @@ int rtt_reader(struct bpf_iter__bpf_map_elem *ctx)
     struct seq_file *seq = ctx->meta->seq;
     struct rotating_performance *counter = ctx->value;
     struct in6_addr *ip = ctx->key;
+    __u32 num_cpus = 1;
 
     // Bail on end
     if (counter == NULL || ip == NULL) {
         return 0;
     }
 
+    if (ctx->meta->seq_num == 0) {
+        bpf_seq_write(seq, &num_cpus, sizeof(__u32));
+    }
+
     //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]);
     bpf_seq_write(seq, ip, sizeof(struct in6_addr));
     bpf_seq_write(seq, counter, sizeof(struct rotating_performance));

View File

@@ -47,7 +47,7 @@ impl<KEY, VALUE> BpfMapIterator<KEY, VALUE> {
         let mut buf = Vec::new();
         let bytes_read = file.read_to_end(&mut buf);
         match bytes_read {
-            Ok(_) => Ok(BpfMapIter { buffer: buf, index: 0, _phantom: PhantomData }),
+            Ok(_) => Ok(BpfMapIter::new(buf)),
             Err(e) => {
                 log::error!("Unable to read from kernel map iterator file");
                 log::error!("{e:?}");
@@ -69,12 +69,26 @@ pub(crate) struct BpfMapIter<K, V> {
     buffer: Vec<u8>,
     index: usize,
     _phantom: PhantomData<(K, V)>,
+    num_cpus: u32,
 }
 
 impl<K, V> BpfMapIter<K, V> {
     const KEY_SIZE: usize = std::mem::size_of::<K>();
     const VALUE_SIZE: usize = std::mem::size_of::<V>();
     const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE;
+
+    fn new(buffer: Vec<u8>) -> Self {
+        let first_four: [u8; 4] = [buffer[0], buffer[1], buffer[2], buffer[3]];
+        let num_cpus = u32::from_ne_bytes(first_four);
+        //println!("CPUs: {num_cpus}");
+        Self {
+            buffer,
+            index: std::mem::size_of::<i32>(),
+            _phantom: PhantomData,
+            num_cpus,
+        }
+    }
 }
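
One caveat with BpfMapIter::new as committed: it indexes buffer[0..4]
directly, so an empty stream would panic; the kernel side only writes the
CPU-count header when it visits the first element (seq_num == 0), and an
empty map writes nothing. A defensive variant might look like this sketch,
which is not the committed code and assumes the surrounding impl from the
hunk above:

    // Defensive sketch (not the committed code): tolerate a stream too short
    // to contain the CPU-count header, e.g. when the map had no entries and
    // the kernel iterator never wrote anything.
    fn new(buffer: Vec<u8>) -> Self {
        let num_cpus = buffer
            .get(0..4)
            .map(|b| u32::from_ne_bytes(b.try_into().unwrap()))
            .unwrap_or(0); // empty stream: the iterator simply yields nothing
        Self {
            index: buffer.len().min(4),
            buffer,
            _phantom: PhantomData,
            num_cpus,
        }
    }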
@@ -82,18 +96,23 @@ impl<K, V> Iterator for BpfMapIter<K, V>
 where
     K: FromBytes + Debug,
     V: FromBytes + Debug,
 {
-    type Item = (K, V);
+    type Item = (K, Vec<V>);
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.index + Self::TOTAL_SIZE <= self.buffer.len() {
             let key = K::read_from(&self.buffer[self.index..self.index + Self::KEY_SIZE]);
             self.index += Self::KEY_SIZE;
-            let value = V::read_from(
-                &self.buffer
-                    [self.index..self.index + Self::VALUE_SIZE],
-            );
-            self.index += Self::VALUE_SIZE;
-            Some((key.unwrap(), value.unwrap()))
+            let mut vals = Vec::new();
+            for _ in 0..self.num_cpus {
+                let value = V::read_from(
+                    &self.buffer
+                        [self.index..self.index + Self::VALUE_SIZE],
+                );
+                vals.push(value.unwrap());
+                self.index += Self::VALUE_SIZE;
+            }
+            //println!("{key:?} {vals:?}");
+            Some((key.unwrap(), vals))
         } else {
             None
         }
@@ -118,7 +137,7 @@ static RTT_TRACKER: Lazy<
     Mutex<Option<BpfMapIterator<XdpIpAddress, RttTrackingEntry>>>,
 > = Lazy::new(|| Mutex::new(None));
 
-pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter)) {
+pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter])) {
     let mut traffic = MAP_TRAFFIC.lock().unwrap();
     if traffic.is_none() {
         let lock = BPF_SKELETON.lock().unwrap();
@@ -162,7 +181,7 @@ pub fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) {
     if let Some(iter) = traffic.as_mut() {
         iter.iter().unwrap().for_each(|(k, v)| {
-            callback(&k, &v);
+            callback(&k, &v[0]); // Not per-CPU
         });
     }
 }

View File

@@ -14,7 +14,7 @@ use log::{info, warn};
 use nix::libc::{geteuid, if_nametoindex};
 use std::{ffi::{CString, c_void}, process::Command};
 
-use self::bpf::lqos_kern;
+use self::bpf::{lqos_kern, libbpf_num_possible_cpus};
 
 pub(crate) mod bpf {
     #![allow(warnings, unused)]
@@ -123,6 +123,7 @@ pub fn attach_xdp_and_tc_to_interface(
     set_strict_mode()?;
     let skeleton = unsafe {
         let skeleton = open_kernel()?;
+        (*(*skeleton).rodata).NUM_CPUS = libbpf_num_possible_cpus();
         (*(*skeleton).data).direction = match direction {
             InterfaceDirection::Internet => 1,
             InterfaceDirection::IspNetwork => 2,
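
NUM_CPUS is declared volatile const on the BPF side, which places it in the
skeleton's .rodata section; such globals can only be written between opening
and loading the skeleton, because the kernel freezes the map backing .rodata
at load time. libbpf_num_possible_cpus() is libbpf's helper for the
possible-CPU count, the same dimension the kernel uses to size per-CPU map
storage. An ordering sketch reusing the names from the hunk above (load and
attach steps elided):

    // Ordering sketch: .rodata constants must be written after open and
    // before load; the kernel freezes the backing map when the program loads.
    let skeleton = unsafe {
        let skeleton = open_kernel()?;                               // open: rodata still writable
        (*(*skeleton).rodata).NUM_CPUS = libbpf_num_possible_cpus(); // set before load
        // ... other globals, then load/attach; NUM_CPUS is read-only from here on
        skeleton
    };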

View File

@@ -27,7 +27,7 @@ pub struct HostCounter {
 /// Iterates through all throughput entries, and sends them in turn to `callback`.
 /// This elides the need to clone or copy data.
 pub fn throughput_for_each(
-    callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter),
+    callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter]),
 ) {
     crate::bpf_iterator::iterate_throughput(callback);
 }
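
With the new signature, a caller receives one HostCounter per possible CPU
and aggregates the shards itself. A hedged usage sketch, not from the commit;
it assumes XdpIpAddress is Copy and uses the counter field names visible in
the ThroughputTracker hunks below:

    // Usage sketch (not from the commit): fold the per-CPU shards into one
    // total per host.
    let mut totals: Vec<(XdpIpAddress, u64, u64)> = Vec::new();
    throughput_for_each(&mut |ip, counters| {
        // `counters` holds one entry per possible CPU
        let down: u64 = counters.iter().map(|c| c.download_bytes).sum();
        let up: u64 = counters.iter().map(|c| c.upload_bytes).sum();
        totals.push((*ip, down, up));
    });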

View File

@@ -111,18 +111,18 @@ impl ThroughputTracker {
         if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
           entry.bytes = (0, 0);
           entry.packets = (0, 0);
-          //for c in counts {
-            entry.bytes.0 += counts.download_bytes;
-            entry.bytes.1 += counts.upload_bytes;
-            entry.packets.0 += counts.download_packets;
-            entry.packets.1 += counts.upload_packets;
-            if counts.tc_handle != 0 {
-              entry.tc_handle = TcHandle::from_u32(counts.tc_handle);
+          for c in counts {
+            entry.bytes.0 += c.download_bytes;
+            entry.bytes.1 += c.upload_bytes;
+            entry.packets.0 += c.download_packets;
+            entry.packets.1 += c.upload_packets;
+            if c.tc_handle != 0 {
+              entry.tc_handle = TcHandle::from_u32(c.tc_handle);
             }
-            if counts.last_seen != 0 {
-              entry.last_seen = counts.last_seen;
+            if c.last_seen != 0 {
+              entry.last_seen = c.last_seen;
             }
-          //}
+          }
           if entry.packets != entry.prev_packets {
             entry.most_recent_cycle = self_cycle;
@@ -131,8 +131,8 @@ impl ThroughputTracker {
           net_json.add_throughput_cycle(
             parents,
             (
-              entry.bytes.0 - entry.prev_bytes.0,
-              entry.bytes.1 - entry.prev_bytes.1,
+              entry.bytes.0.saturating_sub(entry.prev_bytes.0),
+              entry.bytes.1.saturating_sub(entry.prev_bytes.1),
             ),
           );
         }
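
The switch from plain subtraction to saturating_sub guards the per-cycle
delta against a counter that moves backwards (entry.bytes is rebuilt from
zero each cycle, so a shrinking total is possible); on u64, a bare minus
panics in debug builds and wraps to a huge value in release builds. A quick
stand-alone illustration:

    fn main() {
        let prev: u64 = 100;
        let curr: u64 = 40; // counter moved backwards, e.g. after a reset
        // curr - prev would panic in debug builds and wrap in release builds;
        // saturating_sub clamps the delta to zero instead.
        assert_eq!(curr.saturating_sub(prev), 0);
        assert_eq!(250u64.saturating_sub(100), 150); // normal case unchanged
    }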
@@ -155,15 +155,15 @@ impl ThroughputTracker {
             last_fresh_rtt_data_cycle: 0,
             last_seen: 0,
           };
-          //for c in counts {
-            entry.bytes.0 += counts.download_bytes;
-            entry.bytes.1 += counts.upload_bytes;
-            entry.packets.0 += counts.download_packets;
-            entry.packets.1 += counts.upload_packets;
-            if counts.tc_handle != 0 {
-              entry.tc_handle = TcHandle::from_u32(counts.tc_handle);
+          for c in counts {
+            entry.bytes.0 += c.download_bytes;
+            entry.bytes.1 += c.upload_bytes;
+            entry.packets.0 += c.download_packets;
+            entry.packets.1 += c.upload_packets;
+            if c.tc_handle != 0 {
+              entry.tc_handle = TcHandle::from_u32(c.tc_handle);
             }
-          //}
+          }
           raw_data.insert(*xdp_ip, entry);
         }
       });