Use eBPF iterators (requires kernel 5.8+) instead of next_entry

Replaces the "bpf_map_get_next_key"/"bpf_map_lookup_elem" syscall
cycle previously used to iterate maps with the kernel-provided
eBPF iterator service.

Currently only implemented for throughput and RTT tracking.
Further commits will tighten the scope and eventually remove
the legacy setup completely.

For 10k hosts, this reduces the number of system calls from
20,002 to 4. The remaining calls are a futex lock (required
for safety) and an iterator call.
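
For illustration, a minimal sketch of the legacy pattern this
replaces (not the LibreQoS code itself): two syscalls per map
entry. It assumes "map_fd" is the map's file descriptor, the
repo's "struct host_counter" is in scope, and "handle_entry" is a
hypothetical callback; for a per-CPU map the looked-up value would
really be an array with one slot per CPU.

#include <bpf/bpf.h>    /* bpf_map_get_next_key, bpf_map_lookup_elem */
#include <netinet/in.h> /* struct in6_addr */

struct in6_addr key, next_key;
struct host_counter value;
void *prev = NULL;
/* one bpf(BPF_MAP_GET_NEXT_KEY) syscall per entry... */
while (bpf_map_get_next_key(map_fd, prev, &next_key) == 0) {
    /* ...plus one bpf(BPF_MAP_LOOKUP_ELEM) syscall per entry */
    if (bpf_map_lookup_elem(map_fd, &next_key, &value) == 0)
        handle_entry(&next_key, &value);
    key = next_key;
    prev = &key;
}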
Herbert Wolverson 2023-04-18 17:24:25 +00:00
parent cadd3aa800
commit f0ea3edcff
13 changed files with 367 additions and 45 deletions

src/rust/Cargo.lock generated
View File

@@ -1532,6 +1532,7 @@ dependencies = [
"nix",
"once_cell",
"thiserror",
"zerocopy",
]
[[package]]

View File

@@ -16,6 +16,7 @@ lqos_utils = { path = "../lqos_utils" }
once_cell = "1"
dashmap = "5"
thiserror = "1"
zerocopy = { version = "0.6.1", features = ["simd"] }
[build-dependencies]
bindgen = "0"

View File

@@ -319,4 +319,59 @@ int bifrost(struct __sk_buff *skb)
return TC_ACT_UNSPEC;
}
/*
* Structs for map iteration programs
* See https://github.com/xdp-project/bpf-examples
*/
struct bpf_iter_meta {
struct seq_file *seq;
__u64 session_id;
__u64 seq_num;
} __attribute__((preserve_access_index));
struct bpf_iter__bpf_map_elem {
struct bpf_iter_meta *meta;
struct bpf_map *map;
void *key;
void *value;
};
SEC("iter/bpf_map_elem")
int throughput_reader(struct bpf_iter__bpf_map_elem *ctx)
{
// The sequence file
struct seq_file *seq = ctx->meta->seq;
struct host_counter *counter = ctx->value;
struct in6_addr *ip = ctx->key;
// Bail on end
if (counter == NULL || ip == NULL) {
return 0;
}
bpf_seq_write(seq, ip, sizeof(struct in6_addr));
bpf_seq_write(seq, counter, sizeof(struct host_counter));
//BPF_SEQ_PRINTF(seq, "%d %d\n", counter->download_bytes, counter->upload_bytes);
return 0;
}
SEC("iter/bpf_map_elem")
int rtt_reader(struct bpf_iter__bpf_map_elem *ctx)
{
// The sequence file
struct seq_file *seq = ctx->meta->seq;
struct rotating_performance *counter = ctx->value;
struct in6_addr *ip = ctx->key;
// Bail on end
if (counter == NULL || ip == NULL) {
return 0;
}
//BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]);
bpf_seq_write(seq, ip, sizeof(struct in6_addr));
bpf_seq_write(seq, counter, sizeof(struct rotating_performance));
return 0;
}
char _license[] SEC("license") = "GPL";
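
Since the two readers above emit raw structs through consecutive
bpf_seq_write() calls, the stream a consumer read()s from the
iterator file descriptor is simply fixed-size records laid end to
end: 16 key bytes, then the value struct. A sketch of decoding one
buffer (assuming "buf" and "len" came from read()); this mirrors
the Rust BpfMapIter added later in this commit:

/* memcpy rather than pointer casts: record offsets inside the
   buffer carry no alignment guarantees */
const size_t rec = sizeof(struct in6_addr) + sizeof(struct host_counter);
for (size_t off = 0; off + rec <= (size_t)len; off += rec) {
    struct in6_addr ip;
    struct host_counter counter;
    memcpy(&ip, buf + off, sizeof(ip));
    memcpy(&counter, buf + off + sizeof(ip), sizeof(counter));
    /* ...consume ip / counter... */
}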

View File

@@ -246,3 +246,77 @@ int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj)
out:
return err;
}
// Iterator code
#include <stdio.h>
#include <unistd.h>
struct bpf_link *setup_iterator_link(
struct bpf_program *prog,
struct bpf_map *map
) {
int map_fd; // File descriptor for the map itself
struct bpf_link *link; // Value to return with the link
union bpf_iter_link_info linfo = { 0 };
DECLARE_LIBBPF_OPTS(bpf_iter_attach_opts, iter_opts,
.link_info = &linfo,
.link_info_len = sizeof(linfo));
map_fd = bpf_map__fd(map);
if (map_fd < 0) {
fprintf(stderr, "bpf_map__fd() fails\n");
return NULL;
}
linfo.map.map_fd = map_fd;
link = bpf_program__attach_iter(prog, &iter_opts);
if (!link) {
fprintf(stderr, "bpf_program__attach_iter() fails\n");
return NULL;
}
return link;
}
int read_tp_buffer(struct bpf_program *prog, struct bpf_map *map)
{
struct bpf_link *link;
char buf[16] = {};
int iter_fd = -1, len;
int ret = 0;
int map_fd;
union bpf_iter_link_info linfo = { 0 };
DECLARE_LIBBPF_OPTS(bpf_iter_attach_opts, iter_opts,
.link_info = &linfo,
.link_info_len = sizeof(linfo));
map_fd = bpf_map__fd(map);
if (map_fd < 0) {
fprintf(stderr, "bpf_map__fd() fails\n");
return map_fd;
}
linfo.map.map_fd = map_fd;
link = bpf_program__attach_iter(prog, &iter_opts);
if (!link) {
fprintf(stderr, "bpf_program__attach_iter() fails\n");
return -1;
}
iter_fd = bpf_iter_create(bpf_link__fd(link));
if (iter_fd < 0) {
fprintf(stderr, "bpf_iter_create() fails\n");
ret = -1;
goto free_link;
}
/* don't check the contents; just ensure read() completes without error */
while ((len = read(iter_fd, buf, sizeof(buf) - 1)) > 0) {
buf[len] = 0;
printf("%s", buf);
}
printf("\n");
free_link:
if (iter_fd >= 0)
close(iter_fd);
bpf_link__destroy(link);
return ret;
}
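
The split between setup_iterator_link() and the demo reader above
is deliberate: the link is attached once, and each subsequent poll
costs only a bpf_iter_create() plus read()/close(), which is where
the syscall savings come from. A usage sketch with error handling
elided; "skel" stands in for a loaded lqos_kern skeleton:

/* once, at startup */
struct bpf_link *link = setup_iterator_link(
    skel->progs.throughput_reader, skel->maps.map_traffic);

/* every polling pass */
int iter_fd = bpf_iter_create(bpf_link__fd(link));
char buf[4096];
ssize_t n;
while ((n = read(iter_fd, buf, sizeof(buf))) > 0) {
    /* decode (key, value) records as sketched earlier */
}
close(iter_fd);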

View File

@@ -1,5 +1,8 @@
#include "lqos_kern_skel.h"
#include <stdbool.h>
#include <linux/bpf.h>
#include <bpf/libbpf.h>
#include <bpf/bpf.h>
extern struct lqos_kern * lqos_kern_open();
extern int lqos_kern_load(struct lqos_kern * skel);
@@ -8,4 +11,6 @@ extern int tc_detach_egress(int ifindex, bool verbose, bool flush_hook, const char * ifname);
extern int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj);
extern int tc_detach_ingress(int ifindex, bool verbose, bool flush_hook, const char * ifname);
extern __u64 max_tracker_ips();
extern void do_not_print();
int read_tp_buffer(struct bpf_program *prog, struct bpf_map *map);
struct bpf_link * setup_iterator_link(struct bpf_program *prog, struct bpf_map *map);

View File

@@ -0,0 +1,168 @@
use crate::{
kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter,
RttTrackingEntry,
};
use lqos_utils::XdpIpAddress;
use once_cell::sync::Lazy;
use std::{
fs::File, io::Read, marker::PhantomData, os::fd::FromRawFd, sync::Mutex, fmt::Debug,
};
use thiserror::Error;
use zerocopy::FromBytes;
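/// Wraps the kernel link for an eBPF map iterator program. Created once
/// per (program, map) pair; each call to `iter()` then opens a fresh
/// iterator file and reads the whole map in a single pass.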
struct BpfMapIterator<KEY, VALUE> {
link: *mut bpf::bpf_link,
_phantom: PhantomData<(KEY, VALUE)>,
}
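// SAFETY: the raw link pointer is never handed out; instances live only
// inside the Mutex-guarded statics at the bottom of this file, so access
// is serialized across threads.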
unsafe impl<KEY, VALUE> Sync for BpfMapIterator<KEY, VALUE> {}
unsafe impl<KEY, VALUE> Send for BpfMapIterator<KEY, VALUE> {}
impl<KEY, VALUE> BpfMapIterator<KEY, VALUE> {
fn new(
program: *mut bpf::bpf_program,
map: *mut bpf::bpf_map,
) -> Result<Self, BpfIteratorError> {
let link = unsafe { bpf::setup_iterator_link(program, map) };
if !link.is_null() {
Ok(Self { link, _phantom: PhantomData })
} else {
Err(BpfIteratorError::FailedToLink)
}
}
fn as_file(&self) -> Result<File, BpfIteratorError> {
let link_fd = unsafe { bpf::bpf_link__fd(self.link) };
let iter_fd = unsafe { bpf::bpf_iter_create(link_fd) };
if iter_fd < 0 {
log::error!("Unable to create map file descriptor");
Err(BpfIteratorError::FailedToCreateFd)
} else {
unsafe { Ok(File::from_raw_fd(iter_fd)) }
}
}
fn iter(&self) -> Result<BpfMapIter<KEY, VALUE>, BpfIteratorError> {
let mut file = self.as_file()?;
let mut buf = Vec::new();
let bytes_read = file.read_to_end(&mut buf);
match bytes_read {
Ok(_) => Ok(BpfMapIter { buffer: buf, index: 0, _phantom: PhantomData }),
Err(e) => {
log::error!("Unable to read from kernel map iterator file");
log::error!("{e:?}");
Err(BpfIteratorError::UnableToCreateIterator)
}
}
}
}
impl<KEY, VALUE> Drop for BpfMapIterator<KEY, VALUE> {
fn drop(&mut self) {
unsafe {
bpf::bpf_link__destroy(self.link);
}
}
}
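/// Decodes the buffer read from a kernel map iterator as a back-to-back
/// sequence of fixed-size (key, value) records.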
pub(crate) struct BpfMapIter<K, V> {
buffer: Vec<u8>,
index: usize,
_phantom: PhantomData<(K, V)>,
}
impl<K, V> BpfMapIter<K, V> {
const KEY_SIZE: usize = std::mem::size_of::<K>();
const VALUE_SIZE: usize = std::mem::size_of::<V>();
const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE;
}
impl<K, V> Iterator for BpfMapIter<K, V>
where
K: FromBytes + Debug,
V: FromBytes + Debug,
{
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
if self.index + Self::TOTAL_SIZE <= self.buffer.len() {
let key = K::read_from(&self.buffer[self.index..self.index + Self::KEY_SIZE]);
self.index += Self::KEY_SIZE;
let value = V::read_from(&self.buffer[self.index..self.index + Self::VALUE_SIZE]);
self.index += Self::VALUE_SIZE;
Some((key.unwrap(), value.unwrap()))
} else {
None
}
}
}
#[derive(Debug, Error)]
enum BpfIteratorError {
#[error("Failed to create iterator link")]
FailedToLink,
#[error("Failed to create file descriptor")]
FailedToCreateFd,
#[error("Iterator error")]
UnableToCreateIterator,
}
static MAP_TRAFFIC: Lazy<
Mutex<Option<BpfMapIterator<XdpIpAddress, HostCounter>>>,
> = Lazy::new(|| Mutex::new(None));
static RTT_TRACKER: Lazy<
Mutex<Option<BpfMapIterator<XdpIpAddress, RttTrackingEntry>>>,
> = Lazy::new(|| Mutex::new(None));
pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter)) {
let mut traffic = MAP_TRAFFIC.lock().unwrap();
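// Lazily attach the iterator program on first use, once the BPF
// skeleton has been stored by the kernel wrapper.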
if traffic.is_none() {
let lock = BPF_SKELETON.lock().unwrap();
if let Some(skeleton) = lock.as_ref() {
let skeleton = skeleton.get_ptr();
if let Ok(iter) = unsafe {
BpfMapIterator::new(
(*skeleton).progs.throughput_reader,
(*skeleton).maps.map_traffic,
)
} {
*traffic = Some(iter);
}
}
}
if let Some(iter) = traffic.as_mut() {
iter.iter().unwrap().for_each(|(k, v)| {
//println!("{:?} {:?}", k, v);
callback(&k, &v);
});
}
}
pub fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) {
let mut traffic = RTT_TRACKER.lock().unwrap();
if traffic.is_none() {
let lock = BPF_SKELETON.lock().unwrap();
if let Some(skeleton) = lock.as_ref() {
let skeleton = skeleton.get_ptr();
if let Ok(iter) = unsafe {
BpfMapIterator::new(
(*skeleton).progs.rtt_reader,
(*skeleton).maps.rtt_tracker,
)
} {
*traffic = Some(iter);
}
}
}
if let Some(iter) = traffic.as_mut() {
iter.iter().unwrap().for_each(|(k, v)| {
callback(&k, &v);
});
}
}

View File

@@ -94,6 +94,7 @@ where
value_ptr as *mut c_void,
);
//result.push((key.clone(), value.clone()));
println!("old callback");
callback(&key, &value);
prev_key = key_ptr;
}

View File

@@ -1,8 +1,28 @@
use std::sync::Mutex;
use once_cell::sync::Lazy;
use crate::lqos_kernel::{
attach_xdp_and_tc_to_interface, unload_xdp_from_interface,
InterfaceDirection, bpf::ring_buffer_sample_fn,
InterfaceDirection, bpf::{ring_buffer_sample_fn, self},
};
/// Safer wrapper around pointers to `bpf::lqos_kern`. It really isn't
/// a great idea to be passing mutable pointers around like this, but the C
/// world insists on it.
pub(crate) struct LqosKernBpfWrapper {
ptr: *mut bpf::lqos_kern,
}
impl LqosKernBpfWrapper {
pub(crate) fn get_ptr(&self) -> *mut bpf::lqos_kern {
self.ptr
}
}
unsafe impl Sync for LqosKernBpfWrapper {}
unsafe impl Send for LqosKernBpfWrapper {}
pub(crate) static BPF_SKELETON: Lazy<Mutex<Option<LqosKernBpfWrapper>>> = Lazy::new(|| Mutex::new(None));
/// A wrapper-type that stores the interfaces to which the XDP and TC programs should
/// be attached. Performs the attachment process, and hooks "drop" to unattach the
/// programs when the structure falls out of scope.
@@ -31,7 +51,7 @@ impl LibreQoSKernels {
to_isp: to_isp.to_string(),
on_a_stick: false,
};
attach_xdp_and_tc_to_interface(
let skeleton = attach_xdp_and_tc_to_interface(
&kernel.to_internet,
InterfaceDirection::Internet,
heimdall_event_handler,
@@ -41,6 +61,7 @@
InterfaceDirection::IspNetwork,
heimdall_event_handler,
)?;
BPF_SKELETON.lock().unwrap().replace(LqosKernBpfWrapper { ptr: skeleton });
Ok(kernel)
}
@@ -66,12 +87,12 @@ impl LibreQoSKernels {
to_isp: String::new(),
on_a_stick: true,
};
attach_xdp_and_tc_to_interface(
let skeleton = attach_xdp_and_tc_to_interface(
&kernel.to_internet,
InterfaceDirection::OnAStick(internet_vlan, isp_vlan),
heimdall_event_handler,
)?;
BPF_SKELETON.lock().unwrap().replace(LqosKernBpfWrapper { ptr: skeleton });
Ok(kernel)
}
}

View File

@@ -22,6 +22,7 @@ mod lqos_kernel;
mod tcp_rtt;
mod throughput;
mod linux;
mod bpf_iterator;
pub use ip_mapping::{
add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips,

View File

@@ -14,6 +14,8 @@ use log::{info, warn};
use nix::libc::{geteuid, if_nametoindex};
use std::{ffi::{CString, c_void}, process::Command};
use self::bpf::lqos_kern;
pub(crate) mod bpf {
#![allow(warnings, unused)]
include!(concat!(env!("OUT_DIR"), "/bindings.rs"));
@@ -114,7 +116,7 @@ pub fn attach_xdp_and_tc_to_interface(
interface_name: &str,
direction: InterfaceDirection,
heimdall_event_handler: bpf::ring_buffer_sample_fn,
) -> Result<()> {
) -> Result<*mut lqos_kern> {
check_root()?;
// Check the interface is valid
let interface_index = interface_name_to_index(interface_name)?;
@@ -229,7 +231,8 @@
}
}
Ok(())
Ok(skeleton)
}
unsafe fn attach_xdp_best_available(

View File

@@ -1,8 +1,10 @@
use crate::bpf_map::BpfMap;
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;
use crate::bpf_iterator::iterate_rtt;
/// Entry from the XDP rtt_tracker map.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, FromBytes)]
pub struct RttTrackingEntry {
/// Array containing TCP round-trip times. Convert to an `f32` and divide by `100.0` for actual numbers.
pub rtt: [u32; 60],
@@ -29,10 +31,6 @@ impl Default for RttTrackingEntry {
/// Only IP addresses facing the ISP Network side are tracked.
///
/// Executes `callback` for each entry.
pub fn rtt_for_each(callback: &mut dyn FnMut(&[u8; 16], &RttTrackingEntry)) {
if let Ok(rtt_tracker) =
BpfMap::<[u8; 16], RttTrackingEntry>::from_path("/sys/fs/bpf/rtt_tracker")
{
rtt_tracker.for_each(callback);
}
pub fn rtt_for_each(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) {
iterate_rtt(callback);
}

View File

@@ -1,10 +1,9 @@
use lqos_utils::XdpIpAddress;
use crate::{bpf_per_cpu_map::BpfPerCpuMap};
use zerocopy::FromBytes;
/// Representation of the XDP map from map_traffic
#[repr(C)]
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, FromBytes)]
pub struct HostCounter {
/// Download bytes counter (keeps incrementing)
pub download_bytes: u64,
@@ -28,11 +27,7 @@ pub struct HostCounter {
/// Iterates through all throughput entries, and sends them in turn to `callback`.
/// This elides the need to clone or copy data.
pub fn throughput_for_each(
callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter]),
callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter),
) {
if let Ok(throughput) = BpfPerCpuMap::<XdpIpAddress, HostCounter>::from_path(
"/sys/fs/bpf/map_traffic",
) {
throughput.for_each(callback);
}
crate::bpf_iterator::iterate_throughput(callback);
}

View File

@@ -111,18 +111,18 @@ impl ThroughputTracker {
if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
entry.bytes = (0, 0);
entry.packets = (0, 0);
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
//for c in counts {
entry.bytes.0 += counts.download_bytes;
entry.bytes.1 += counts.upload_bytes;
entry.packets.0 += counts.download_packets;
entry.packets.1 += counts.upload_packets;
if counts.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(counts.tc_handle);
}
if c.last_seen != 0 {
entry.last_seen = c.last_seen;
if counts.last_seen != 0 {
entry.last_seen = counts.last_seen;
}
}
//}
if entry.packets != entry.prev_packets {
entry.most_recent_cycle = self_cycle;
@@ -155,15 +155,15 @@ impl ThroughputTracker {
last_fresh_rtt_data_cycle: 0,
last_seen: 0,
};
for c in counts {
entry.bytes.0 += c.download_bytes;
entry.bytes.1 += c.upload_bytes;
entry.packets.0 += c.download_packets;
entry.packets.1 += c.upload_packets;
if c.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(c.tc_handle);
//for c in counts {
entry.bytes.0 += counts.download_bytes;
entry.bytes.1 += counts.upload_bytes;
entry.packets.0 += counts.download_packets;
entry.packets.1 += counts.upload_packets;
if counts.tc_handle != 0 {
entry.tc_handle = TcHandle::from_u32(counts.tc_handle);
}
}
//}
raw_data.insert(*xdp_ip, entry);
}
});
@@ -171,10 +171,9 @@
pub(crate) fn apply_rtt_data(&self) {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
rtt_for_each(&mut |raw_ip, rtt| {
rtt_for_each(&mut |ip, rtt| {
if rtt.has_fresh_data != 0 {
let ip = XdpIpAddress(*raw_ip);
if let Some(mut tracker) = self.raw_data.get_mut(&ip) {
if let Some(mut tracker) = self.raw_data.get_mut(ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {