Add a little padding to the output stream on the kernel side, to align records neatly on 64-bit boundaries. Then use aligned casting for zero-copy userspace access to the data. Another small speed increase.
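For context, a minimal sketch (not this project's code) of why the padding matters on the userspace side: with the 4-byte CPU count written twice, the records that follow start at offset 8, so `slice::align_to` can reinterpret the buffer in place instead of copying each entry out. The `Record` struct and its field names below are hypothetical stand-ins for the real map value types.

#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct Record {
  bytes: u64,   // hypothetical counter fields; any 8-byte-aligned POD works
  packets: u64,
}

/// View the records in a kernel-produced buffer without copying.
fn view_records(buf: &[u8]) -> &[Record] {
  // Skip the 8-byte header (CPU count plus its padding copy), then
  // reinterpret. align_to returns a non-empty `head` if the data is
  // misaligned; the doubled header write is what keeps `head` empty here.
  let (head, records, _tail) = unsafe { buf[8..].align_to::<Record>() };
  assert!(head.is_empty(), "expected the padded header to align the records");
  records
}

fn main() {
  // Fake the kernel's output: CPU count written twice, then one record.
  let mut buf = Vec::new();
  buf.extend_from_slice(&1u32.to_ne_bytes());
  buf.extend_from_slice(&1u32.to_ne_bytes()); // the padding copy
  buf.extend_from_slice(&1500u64.to_ne_bytes());
  buf.extend_from_slice(&3u64.to_ne_bytes());
  println!("{:?}", view_records(&buf));
}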

Herbert Wolverson 2023-04-19 18:31:16 +00:00
parent 0c62268ed6
commit 6db2cb0245
2 changed files with 90 additions and 88 deletions

View File

@@ -354,6 +354,7 @@ int throughput_reader(struct bpf_iter__bpf_map_elem *ctx)
   if (ctx->meta->seq_num == 0) {
     bpf_seq_write(seq, &num_cpus, sizeof(__u32));
+    bpf_seq_write(seq, &num_cpus, sizeof(__u32)); // Repeat for padding
   }
   bpf_seq_write(seq, ip, sizeof(struct in6_addr));
@@ -382,6 +383,7 @@ int rtt_reader(struct bpf_iter__bpf_map_elem *ctx)
   if (ctx->meta->seq_num == 0) {
     bpf_seq_write(seq, &num_cpus, sizeof(__u32));
+    bpf_seq_write(seq, &num_cpus, sizeof(__u32)); // Padding
   }
   //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]);

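The effect of the doubled write, sketched as the byte layout userspace now sees (the key is the 16-byte struct in6_addr from the writes above; that the value structs are multiples of 8 bytes is an assumption this scheme relies on):

offset  0  num_cpus           (__u32)
offset  4  num_cpus, repeated (__u32, pure padding)
offset  8  key 0              (struct in6_addr, 16 bytes)
offset 24  values for key 0   (num_cpus entries for per-CPU maps, one otherwise)
...and so on, each record a multiple of 8 bytes.

With the header widened to 8 bytes, every subsequent key and value lands on a 64-bit boundary, which is what lets userspace cast into the buffer instead of copying out of it.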
View File

@@ -5,7 +5,7 @@ use crate::{
 use lqos_utils::XdpIpAddress;
 use once_cell::sync::Lazy;
 use std::{
-  fs::File, io::Read, marker::PhantomData, os::fd::FromRawFd, fmt::Debug,
+  fmt::Debug, fs::File, io::Read, marker::PhantomData, os::fd::FromRawFd,
 };
 use thiserror::Error;
 use zerocopy::FromBytes;
@@ -15,7 +15,7 @@ use zerocopy::FromBytes;
 /// be loaded. These are designed to be lazy-initialized on a
 /// per-map basis. The `MAP_TRAFFIC` and `RTT_ITERATOR` types
 /// implement this type.
-/// 
+///
 /// Normal usage is to initialize the iterator and keep it around.
 /// When you need to query the iterator, execute the `iter` method
 /// and treat it as a normal Rust iterator.
@@ -29,20 +29,22 @@ struct BpfMapIterator<KEY, VALUE> {
 unsafe impl<KEY, VALUE> Sync for BpfMapIterator<KEY, VALUE> {}
 unsafe impl<KEY, VALUE> Send for BpfMapIterator<KEY, VALUE> {}
 
-impl<KEY, VALUE> BpfMapIterator<KEY, VALUE>
-where VALUE: FromBytes + Debug + Clone + Default
+impl<KEY, VALUE> BpfMapIterator<KEY, VALUE>
+where
+  KEY: FromBytes + Debug + Clone,
+  VALUE: FromBytes + Debug + Clone + Default,
 {
   /// Create a new link to an eBPF map, that *must* have an iterator
   /// function defined in the eBPF program - and exposed in the
   /// skeleton.
-  /// 
+  ///
   /// # Safety
-  /// 
+  ///
   /// * This is unsafe, it relies on the skeleton having been properly
   ///   initialized prior to using this type.
-  /// 
+  ///
   /// # Arguments
-  /// 
+  ///
   /// * `program` - The eBPF program that points to the iterator function.
   /// * `map` - The eBPF map that the iterator function will iterate over.
   fn new(
@@ -71,22 +73,85 @@ where VALUE: FromBytes + Debug + Clone + Default
     }
   }
 
-  /// Transform the iterator into a Rust iterator. This can then
-  /// be used like a regular iterator. The iterator owns the
-  /// file's buffer and provides references. The iterator MUST
-  /// outlive the functions that use it (you can clone all you
-  /// like).
-  fn iter(&self) -> Result<BpfMapIter<KEY, VALUE>, BpfIteratorError> {
+  const KEY_SIZE: usize = std::mem::size_of::<KEY>();
+  const VALUE_SIZE: usize = std::mem::size_of::<VALUE>();
+  const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE;
+
+  fn for_each_per_cpu(
+    &self,
+    callback: &mut dyn FnMut(&KEY, &[VALUE]),
+  ) -> Result<(), BpfIteratorError> {
     let mut file = self.as_file()?;
     let mut buf = Vec::new();
     let bytes_read = file.read_to_end(&mut buf);
     match bytes_read {
-      Ok(_) => Ok(BpfMapIter::new(buf)),
       Err(e) => {
         log::error!("Unable to read from kernel map iterator file");
         log::error!("{e:?}");
         Err(BpfIteratorError::UnableToCreateIterator)
       }
+      Ok(_) => {
+        let first_four_bytes: [u8; 4] = [buf[0], buf[1], buf[2], buf[3]];
+        let num_cpus = u32::from_ne_bytes(first_four_bytes) as usize;
+        let mut index = 8;
+        while index + Self::TOTAL_SIZE <= buf.len() {
+          let key_start = index;
+          let key_end = key_start + Self::KEY_SIZE;
+          let key_slice = &buf[key_start..key_end];
+          let (_head, key, _tail) = unsafe { &key_slice.align_to::<KEY>() };
+          let value_start = key_end;
+          let value_end = value_start + (num_cpus * Self::VALUE_SIZE);
+          let value_slice = &buf[value_start..value_end];
+          let (_head, values, _tail) =
+            unsafe { &value_slice.align_to::<VALUE>() };
+          debug_assert_eq!(values.len(), num_cpus);
+          callback(&key[0], values);
+          index += Self::KEY_SIZE + (num_cpus * Self::VALUE_SIZE);
+        }
+        Ok(())
+      }
     }
   }
+
+  fn for_each(
+    &self,
+    callback: &mut dyn FnMut(&KEY, &VALUE),
+  ) -> Result<(), BpfIteratorError> {
+    let mut file = self.as_file()?;
+    let mut buf = Vec::new();
+    let bytes_read = file.read_to_end(&mut buf);
+    match bytes_read {
+      Err(e) => {
+        log::error!("Unable to read from kernel map iterator file");
+        log::error!("{e:?}");
+        Err(BpfIteratorError::UnableToCreateIterator)
+      }
+      Ok(_) => {
+        let first_four_bytes: [u8; 4] = [buf[0], buf[1], buf[2], buf[3]];
+        let num_cpus = u32::from_ne_bytes(first_four_bytes) as usize;
+        debug_assert_eq!(num_cpus, 1);
+        let mut index = 8;
+        while index + Self::TOTAL_SIZE <= buf.len() {
+          let key_start = index;
+          let key_end = key_start + Self::KEY_SIZE;
+          let key_slice = &buf[key_start..key_end];
+          let (_head, key, _tail) = unsafe { &key_slice.align_to::<KEY>() };
+          let value_start = key_end;
+          let value_end = value_start + Self::VALUE_SIZE;
+          let value_slice = &buf[value_start..value_end];
+          let (_head, values, _tail) =
+            unsafe { &value_slice.align_to::<VALUE>() };
+          callback(&key[0], &values[0]);
+          index += Self::KEY_SIZE + Self::VALUE_SIZE;
+        }
+        Ok(())
+      }
+    }
+  }
 }
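The `align_to` calls above trust the kernel-side padding unconditionally: `_head` and `_tail` are discarded, so a misaligned or truncated buffer would silently yield garbage in a release build. A hardened variant is easy to sketch (hypothetical, not part of this commit); the `zerocopy::FromBytes` bound is what makes the reinterpretation sound, since any byte pattern is a valid value:

use zerocopy::FromBytes;

/// Reinterpret a value slice, verifying that align_to consumed it fully.
fn view_values<V: FromBytes>(value_slice: &[u8], num_cpus: usize) -> Option<&[V]> {
  let (head, values, tail) = unsafe { value_slice.align_to::<V>() };
  if head.is_empty() && tail.is_empty() && values.len() == num_cpus {
    Some(values) // zero-copy: references point straight into the read buffer
  } else {
    None // misaligned or short; the caller could fall back to V::read_from()
  }
}

The fallback named in the comment mirrors the copying path this commit deletes: the old `BpfMapIter` below used `read_from`, which memcpys each entry instead of borrowing it.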
@@ -101,70 +166,6 @@ impl<KEY, VALUE> Drop for BpfMapIterator<KEY, VALUE> {
   }
 }
 
-/// Rust iterator for reading data from eBPF map iterators.
-/// Transforms the data into the appropriate types, and returns
-/// a tuple of the key, and a vector of values (1 per CPU for
-/// CPU_MAP types, 1 entry for all others).
-pub(crate) struct BpfMapIter<K, V> {
-  buffer: Vec<u8>,
-  index: usize,
-  _phantom: PhantomData<(K, V)>,
-  num_cpus: usize,
-  row_data: Vec<V>,
-}
-
-impl<K, V> BpfMapIter<K, V>
-where V: FromBytes + Debug + Clone + Default
-{
-  const KEY_SIZE: usize = std::mem::size_of::<K>();
-  const VALUE_SIZE: usize = std::mem::size_of::<V>();
-  const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE;
-
-  /// Transforms the buffer into a Rust iterator. The buffer
-  /// is *moved* into the iterator, which retains ownership
-  /// throughout.
-  fn new(buffer: Vec<u8>) -> Self {
-    let first_four: [u8; 4] = [buffer[0], buffer[1], buffer[2], buffer[3]];
-    let num_cpus = u32::from_ne_bytes(first_four) as usize;
-    //println!("CPUs: {num_cpus}");
-    Self {
-      buffer,
-      index: std::mem::size_of::<i32>(),
-      _phantom: PhantomData,
-      num_cpus,
-      row_data: vec![V::default(); num_cpus],
-    }
-  }
-}
-
-impl<K, V> Iterator for BpfMapIter<K, V>
-where
-  K: FromBytes + Debug + Clone,
-  V: FromBytes + Debug + Clone + Default,
-{
-  type Item = (K, Vec<V>);
-
-  fn next(&mut self) -> Option<Self::Item> {
-    if self.index + Self::TOTAL_SIZE <= self.buffer.len() {
-      let key = K::read_from(&self.buffer[self.index..self.index + Self::KEY_SIZE]);
-      self.index += Self::KEY_SIZE;
-      for cpu in 0..self.num_cpus {
-        let value = V::read_from(
-          &self.buffer[self.index..self.index + Self::VALUE_SIZE],
-        );
-        self.row_data[cpu] = value.unwrap();
-        self.index += Self::VALUE_SIZE;
-      }
-      //println!("{key:?} {vals:?}");
-      Some((key.unwrap(), self.row_data.clone()))
-    } else {
-      None
-    }
-  }
-}
-
 #[derive(Debug, Error)]
 enum BpfIteratorError {
   #[error("Failed to create iterator link")]
@@ -183,7 +184,9 @@ static mut RTT_TRACKER: Lazy<
   Option<BpfMapIterator<XdpIpAddress, RttTrackingEntry>>,
 > = Lazy::new(|| None);
 
-pub unsafe fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter])) {
+pub unsafe fn iterate_throughput(
+  callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter]),
+) {
   if MAP_TRAFFIC.is_none() {
     let lock = BPF_SKELETON.lock().unwrap();
     if let Some(skeleton) = lock.as_ref() {
@@ -200,14 +203,13 @@ pub unsafe fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &[HostC
   }
 
   if let Some(iter) = MAP_TRAFFIC.as_mut() {
-    iter.iter().unwrap().for_each(|(k, v)| {
-      //println!("{:?} {:?}", k, v);
-      callback(&k, &v);
-    });
+    let _ = iter.for_each_per_cpu(callback);
   }
 }
 
-pub unsafe fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) {
+pub unsafe fn iterate_rtt(
+  callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry),
+) {
   if RTT_TRACKER.is_none() {
     let lock = BPF_SKELETON.lock().unwrap();
     if let Some(skeleton) = lock.as_ref() {
@@ -224,8 +226,6 @@ pub unsafe fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEn
   }
 
   if let Some(iter) = RTT_TRACKER.as_mut() {
-    iter.iter().unwrap().for_each(|(k, v)| {
-      callback(&k, &v[0]); // Not per-CPU
-    });
+    let _ = iter.for_each(callback);
   }
 }
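A hedged sketch of the new call shape: the callback now plugs straight into `for_each_per_cpu`, receiving one `HostCounter` per CPU and borrowing directly from the iterator's read buffer. The `download_bytes` field name is assumed here for illustration; substitute whichever counter fields `HostCounter` actually defines.

unsafe {
  iterate_throughput(&mut |ip: &XdpIpAddress, counters: &[HostCounter]| {
    // counters holds one entry per CPU; sum them for a per-host total.
    let total: u64 = counters.iter().map(|c| c.download_bytes).sum();
    println!("{ip:?}: {total} bytes downloaded");
  });
}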