From f0ea3edcff95c68eaa7fe7ffa1f73af1e78ceef6 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 18 Apr 2023 17:24:25 +0000 Subject: [PATCH] Use eBPF iterators (requires kernel 5.8+) instead of next_entry Replaces the eBPF interface "bpf_map_get_next_key" followed by "bpf_map_lookup_elem" cycle for iterating maps with a call to the kernel-provided eBPF iterator service. Currently only implemented for throughput and RTT tracking. Further commits will tighten the scope and eventually remove the legacy setup completely. For 10k hosts, this reduces the number of system calls from 20,002 to 4. The remaining items are a futex lock (required for safety), and an iterator call. --- src/rust/Cargo.lock | 1 + src/rust/lqos_sys/Cargo.toml | 1 + src/rust/lqos_sys/src/bpf/lqos_kern.c | 55 ++++++ src/rust/lqos_sys/src/bpf/wrapper.c | 74 ++++++++ src/rust/lqos_sys/src/bpf/wrapper.h | 7 +- src/rust/lqos_sys/src/bpf_iterator.rs | 168 ++++++++++++++++++ src/rust/lqos_sys/src/bpf_map.rs | 1 + src/rust/lqos_sys/src/kernel_wrapper.rs | 29 ++- src/rust/lqos_sys/src/lib.rs | 1 + src/rust/lqos_sys/src/lqos_kernel.rs | 7 +- src/rust/lqos_sys/src/tcp_rtt.rs | 14 +- src/rust/lqos_sys/src/throughput.rs | 13 +- .../src/throughput_tracker/tracking_data.rs | 41 +++-- 13 files changed, 367 insertions(+), 45 deletions(-) create mode 100644 src/rust/lqos_sys/src/bpf_iterator.rs diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 160a8b5c..189785a0 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1532,6 +1532,7 @@ dependencies = [ "nix", "once_cell", "thiserror", + "zerocopy", ] [[package]] diff --git a/src/rust/lqos_sys/Cargo.toml b/src/rust/lqos_sys/Cargo.toml index a743824f..dcffa3a9 100644 --- a/src/rust/lqos_sys/Cargo.toml +++ b/src/rust/lqos_sys/Cargo.toml @@ -16,6 +16,7 @@ lqos_utils = { path = "../lqos_utils" } once_cell = "1" dashmap = "5" thiserror = "1" +zerocopy = { version = "0.6.1", features = ["simd"] } [build-dependencies] bindgen = "0" diff --git 
a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c index 9e4e5295..20bfc1a3 100644 --- a/src/rust/lqos_sys/src/bpf/lqos_kern.c +++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c @@ -319,4 +319,59 @@ int bifrost(struct __sk_buff *skb) return TC_ACT_UNSPEC; } +/* + * Structs for map iteration programs + * See https://github.com/xdp-project/bpf-examples + */ +struct bpf_iter_meta { + struct seq_file *seq; + __u64 session_id; + __u64 seq_num; +} __attribute__((preserve_access_index)); + +struct bpf_iter__bpf_map_elem { + struct bpf_iter_meta *meta; + struct bpf_map *map; + void *key; + void *value; +}; + +SEC("iter/bpf_map_elem") +int throughput_reader(struct bpf_iter__bpf_map_elem *ctx) +{ + // The sequence file + struct seq_file *seq = ctx->meta->seq; + struct host_counter *counter = ctx->value; + struct in6_addr *ip = ctx->key; + + // Bail on end + if (counter == NULL || ip == NULL) { + return 0; + } + + bpf_seq_write(seq, ip, sizeof(struct in6_addr)); + bpf_seq_write(seq, counter, sizeof(struct host_counter)); + //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->download_bytes, counter->upload_bytes); + return 0; +} + +SEC("iter/bpf_map_elem") +int rtt_reader(struct bpf_iter__bpf_map_elem *ctx) +{ + // The sequence file + struct seq_file *seq = ctx->meta->seq; + struct rotating_performance *counter = ctx->value; + struct in6_addr *ip = ctx->key; + + // Bail on end + if (counter == NULL || ip == NULL) { + return 0; + } + + //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]); + bpf_seq_write(seq, ip, sizeof(struct in6_addr)); + bpf_seq_write(seq, counter, sizeof(struct rotating_performance)); + return 0; +} + char _license[] SEC("license") = "GPL"; diff --git a/src/rust/lqos_sys/src/bpf/wrapper.c b/src/rust/lqos_sys/src/bpf/wrapper.c index 8ca2f74d..94ae7914 100644 --- a/src/rust/lqos_sys/src/bpf/wrapper.c +++ b/src/rust/lqos_sys/src/bpf/wrapper.c @@ -246,3 +246,77 @@ int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern 
*obj) out: return err; } + +// Iterator code +#include +#include + +struct bpf_link *setup_iterator_link( + struct bpf_program *prog, + struct bpf_map *map +) { + int map_fd; // File descriptor for the map itself + struct bpf_link *link; // Value to return with the link + union bpf_iter_link_info linfo = { 0 }; + DECLARE_LIBBPF_OPTS(bpf_iter_attach_opts, iter_opts, + .link_info = &linfo, + .link_info_len = sizeof(linfo)); + + map_fd = bpf_map__fd(map); + if (map_fd < 0) { + fprintf(stderr, "bpf_map__fd() fails\n"); + return NULL; + } + linfo.map.map_fd = map_fd; + + link = bpf_program__attach_iter(prog, &iter_opts); + if (!link) { + fprintf(stderr, "bpf_program__attach_iter() fails\n"); + return NULL; + } + return link; +} + +int read_tp_buffer(struct bpf_program *prog, struct bpf_map *map) +{ + struct bpf_link *link; + char buf[16] = {}; + int iter_fd = -1, len; + int ret = 0; + int map_fd; + + union bpf_iter_link_info linfo = { 0 }; + DECLARE_LIBBPF_OPTS(bpf_iter_attach_opts, iter_opts, + .link_info = &linfo, + .link_info_len = sizeof(linfo)); + + map_fd = bpf_map__fd(map); + if (map_fd < 0) { + fprintf(stderr, "bpf_map__fd() fails\n"); + return map_fd; + } + linfo.map.map_fd = map_fd; + + link = bpf_program__attach_iter(prog, &iter_opts); + if (!link) { + fprintf(stderr, "bpf_program__attach_iter() fails\n"); + return -1; + } + iter_fd = bpf_iter_create(bpf_link__fd(link)); + if (iter_fd < 0) { + fprintf(stderr, "bpf_iter_create() fails\n"); + ret = -1; + goto free_link; + } + /* not check contents, but ensure read() ends without error */ + while ((len = read(iter_fd, buf, sizeof(buf) - 1)) > 0) { + buf[len] = 0; + printf("%s", buf); + } + printf("\n"); +free_link: + if (iter_fd >= 0) + close(iter_fd); + bpf_link__destroy(link); + return ret; /* -1 if bpf_iter_create() failed above, 0 otherwise */ +} \ No newline at end of file diff --git a/src/rust/lqos_sys/src/bpf/wrapper.h b/src/rust/lqos_sys/src/bpf/wrapper.h index a6c8e074..e24a3dc0 100644 --- a/src/rust/lqos_sys/src/bpf/wrapper.h +++ 
b/src/rust/lqos_sys/src/bpf/wrapper.h @@ -1,5 +1,8 @@ #include "lqos_kern_skel.h" #include +#include +#include +#include extern struct lqos_kern * lqos_kern_open(); extern int lqos_kern_load(struct lqos_kern * skel); @@ -8,4 +11,6 @@ extern int tc_detach_egress(int ifindex, bool verbose, bool flush_hook, const ch extern int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj); extern int tc_detach_ingress(int ifindex, bool verbose, bool flush_hook, const char * ifname); extern __u64 max_tracker_ips(); -extern void do_not_print(); \ No newline at end of file +extern void do_not_print(); +int read_tp_buffer(struct bpf_program *prog, struct bpf_map *map); +struct bpf_link * setup_iterator_link(struct bpf_program *prog, struct bpf_map *map); diff --git a/src/rust/lqos_sys/src/bpf_iterator.rs b/src/rust/lqos_sys/src/bpf_iterator.rs new file mode 100644 index 00000000..90b90a6a --- /dev/null +++ b/src/rust/lqos_sys/src/bpf_iterator.rs @@ -0,0 +1,168 @@ +use crate::{ + kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter, + RttTrackingEntry, +}; +use lqos_utils::XdpIpAddress; +use once_cell::sync::Lazy; +use std::{ + fs::File, io::Read, marker::PhantomData, os::fd::FromRawFd, sync::Mutex, fmt::Debug, +}; +use thiserror::Error; +use zerocopy::FromBytes; + +struct BpfMapIterator { + link: *mut bpf::bpf_link, + _phantom: PhantomData<(KEY, VALUE)>, +} + +unsafe impl Sync for BpfMapIterator {} +unsafe impl Send for BpfMapIterator {} + +impl BpfMapIterator { + fn new( + program: *mut bpf::bpf_program, + map: *mut bpf::bpf_map, + ) -> Result { + let link = unsafe { bpf::setup_iterator_link(program, map) }; + if !link.is_null() { + Ok(Self { link, _phantom: PhantomData }) + } else { + Err(BpfIteratorError::FailedToLink) + } + } + + fn as_file(&self) -> Result { + let link_fd = unsafe { bpf::bpf_link__fd(self.link) }; + let iter_fd = unsafe { bpf::bpf_iter_create(link_fd) }; + if iter_fd < 0 { + log::error!("Unable to create map file descriptor"); + 
Err(BpfIteratorError::FailedToCreateFd) + } else { + unsafe { Ok(File::from_raw_fd(iter_fd)) } + } + } + + fn iter(&self) -> Result, BpfIteratorError> { + let mut file = self.as_file()?; + let mut buf = Vec::new(); + let bytes_read = file.read_to_end(&mut buf); + match bytes_read { + Ok(_) => Ok(BpfMapIter { buffer: buf, index: 0, _phantom: PhantomData }), + Err(e) => { + log::error!("Unable to read from kernel map iterator file"); + log::error!("{e:?}"); + Err(BpfIteratorError::UnableToCreateIterator) + } + } + } +} + +impl Drop for BpfMapIterator { + fn drop(&mut self) { + unsafe { + bpf::bpf_link__destroy(self.link); + } + } +} + +pub(crate) struct BpfMapIter { + buffer: Vec, + index: usize, + _phantom: PhantomData<(K, V)>, +} + +impl BpfMapIter { + const KEY_SIZE: usize = std::mem::size_of::(); + const VALUE_SIZE: usize = std::mem::size_of::(); + const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE; +} + +impl Iterator for BpfMapIter +where + K: FromBytes + Debug, + V: FromBytes + Debug, +{ + type Item = (K, V); + + fn next(&mut self) -> Option { + if self.index + Self::TOTAL_SIZE <= self.buffer.len() { + let key = K::read_from(&self.buffer[self.index..self.index + Self::KEY_SIZE]); + self.index += Self::KEY_SIZE; + let value = V::read_from( + &self.buffer + [self.index ..self.index + Self::VALUE_SIZE], + ); + self.index += Self::VALUE_SIZE; + Some((key.unwrap(), value.unwrap())) + } else { + None + } + } +} + +#[derive(Debug, Error)] +enum BpfIteratorError { + #[error("Failed to create iterator link")] + FailedToLink, + #[error("Failed to create file descriptor")] + FailedToCreateFd, + #[error("Iterator error")] + UnableToCreateIterator, +} + +static MAP_TRAFFIC: Lazy< + Mutex>>, +> = Lazy::new(|| Mutex::new(None)); + +static RTT_TRACKER: Lazy< + Mutex>>, +> = Lazy::new(|| Mutex::new(None)); + +pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter)) { + let mut traffic = MAP_TRAFFIC.lock().unwrap(); + if traffic.is_none() { + 
let lock = BPF_SKELETON.lock().unwrap(); + if let Some(skeleton) = lock.as_ref() { + let skeleton = skeleton.get_ptr(); + if let Ok(iter) = unsafe { + BpfMapIterator::new( + (*skeleton).progs.throughput_reader, + (*skeleton).maps.map_traffic, + ) + } { + *traffic = Some(iter); + } + } + } + + if let Some(iter) = traffic.as_mut() { + // iter() logs its own failures; skip this cycle instead of panicking with the lock held. + if let Ok(entries) = iter.iter() { + entries.for_each(|(k, v)| callback(&k, &v)); + } + } +} + +pub fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) { + let mut traffic = RTT_TRACKER.lock().unwrap(); + if traffic.is_none() { + let lock = BPF_SKELETON.lock().unwrap(); + if let Some(skeleton) = lock.as_ref() { + let skeleton = skeleton.get_ptr(); + if let Ok(iter) = unsafe { + BpfMapIterator::new( + (*skeleton).progs.rtt_reader, + (*skeleton).maps.rtt_tracker, + ) + } { + *traffic = Some(iter); + } + } + } + + if let Some(iter) = traffic.as_mut() { + if let Ok(entries) = iter.iter() { // failures are logged by iter(); skip, don't panic + entries.for_each(|(k, v)| callback(&k, &v)); + } + } +} diff --git a/src/rust/lqos_sys/src/bpf_map.rs b/src/rust/lqos_sys/src/bpf_map.rs index 6f6f3043..f52aa7e1 100644 --- a/src/rust/lqos_sys/src/bpf_map.rs +++ b/src/rust/lqos_sys/src/bpf_map.rs @@ -94,6 +94,7 @@ where value_ptr as *mut c_void, ); //result.push((key.clone(), value.clone())); + println!("old callback"); callback(&key, &value); prev_key = key_ptr; } diff --git a/src/rust/lqos_sys/src/kernel_wrapper.rs b/src/rust/lqos_sys/src/kernel_wrapper.rs index 6375a83f..1c6cb133 100644 --- a/src/rust/lqos_sys/src/kernel_wrapper.rs +++ b/src/rust/lqos_sys/src/kernel_wrapper.rs @@ -1,8 +1,28 @@ +use std::sync::Mutex; +use once_cell::sync::Lazy; use crate::lqos_kernel::{ attach_xdp_and_tc_to_interface, unload_xdp_from_interface, - InterfaceDirection, bpf::ring_buffer_sample_fn, + InterfaceDirection, bpf::{ring_buffer_sample_fn, self}, }; +/// Safer wrapper around pointers to `bpf::lqos_kern`. 
It really isn't +/// a great idea to be passing mutable pointers around like this, but the C +/// world insists on it. +pub(crate) struct LqosKernBpfWrapper { + ptr: *mut bpf::lqos_kern, +} + +impl LqosKernBpfWrapper { + pub(crate) fn get_ptr(&self) -> *mut bpf::lqos_kern { + self.ptr + } +} + +unsafe impl Sync for LqosKernBpfWrapper {} +unsafe impl Send for LqosKernBpfWrapper {} + +pub(crate) static BPF_SKELETON: Lazy>> = Lazy::new(|| Mutex::new(None)); + /// A wrapper-type that stores the interfaces to which the XDP and TC programs should /// be attached. Performs the attachment process, and hooks "drop" to unattach the /// programs when the structure falls out of scope. @@ -31,7 +51,7 @@ impl LibreQoSKernels { to_isp: to_isp.to_string(), on_a_stick: false, }; - attach_xdp_and_tc_to_interface( + let skeleton = attach_xdp_and_tc_to_interface( &kernel.to_internet, InterfaceDirection::Internet, heimdall_event_handler, @@ -41,6 +61,7 @@ impl LibreQoSKernels { InterfaceDirection::IspNetwork, heimdall_event_handler, )?; + BPF_SKELETON.lock().unwrap().replace(LqosKernBpfWrapper { ptr: skeleton }); Ok(kernel) } @@ -66,12 +87,12 @@ impl LibreQoSKernels { to_isp: String::new(), on_a_stick: true, }; - attach_xdp_and_tc_to_interface( + let skeleton = attach_xdp_and_tc_to_interface( &kernel.to_internet, InterfaceDirection::OnAStick(internet_vlan, isp_vlan), heimdall_event_handler, )?; - + BPF_SKELETON.lock().unwrap().replace(LqosKernBpfWrapper { ptr: skeleton }); Ok(kernel) } } diff --git a/src/rust/lqos_sys/src/lib.rs b/src/rust/lqos_sys/src/lib.rs index 40b56dae..c3c64aba 100644 --- a/src/rust/lqos_sys/src/lib.rs +++ b/src/rust/lqos_sys/src/lib.rs @@ -22,6 +22,7 @@ mod lqos_kernel; mod tcp_rtt; mod throughput; mod linux; +mod bpf_iterator; pub use ip_mapping::{ add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips, diff --git a/src/rust/lqos_sys/src/lqos_kernel.rs b/src/rust/lqos_sys/src/lqos_kernel.rs index 89a132d3..ff67d196 100644 --- 
a/src/rust/lqos_sys/src/lqos_kernel.rs +++ b/src/rust/lqos_sys/src/lqos_kernel.rs @@ -14,6 +14,8 @@ use log::{info, warn}; use nix::libc::{geteuid, if_nametoindex}; use std::{ffi::{CString, c_void}, process::Command}; +use self::bpf::lqos_kern; + pub(crate) mod bpf { #![allow(warnings, unused)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); @@ -114,7 +116,7 @@ pub fn attach_xdp_and_tc_to_interface( interface_name: &str, direction: InterfaceDirection, heimdall_event_handler: bpf::ring_buffer_sample_fn, -) -> Result<()> { +) -> Result<*mut lqos_kern> { check_root()?; // Check the interface is valid let interface_index = interface_name_to_index(interface_name)?; @@ -229,7 +231,8 @@ pub fn attach_xdp_and_tc_to_interface( } } - Ok(()) + + Ok(skeleton) } unsafe fn attach_xdp_best_available( diff --git a/src/rust/lqos_sys/src/tcp_rtt.rs b/src/rust/lqos_sys/src/tcp_rtt.rs index 5742f17b..2740c7c6 100644 --- a/src/rust/lqos_sys/src/tcp_rtt.rs +++ b/src/rust/lqos_sys/src/tcp_rtt.rs @@ -1,8 +1,10 @@ -use crate::bpf_map::BpfMap; +use lqos_utils::XdpIpAddress; +use zerocopy::FromBytes; +use crate::bpf_iterator::iterate_rtt; /// Entry from the XDP rtt_tracker map. #[repr(C)] -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, FromBytes)] pub struct RttTrackingEntry { /// Array containing TCP round-trip times. Convert to an `f32` and divide by `100.0` for actual numbers. pub rtt: [u32; 60], @@ -29,10 +31,6 @@ impl Default for RttTrackingEntry { /// Only IP addresses facing the ISP Network side are tracked. /// /// Executes `callback` for each entry. 
-pub fn rtt_for_each(callback: &mut dyn FnMut(&[u8; 16], &RttTrackingEntry)) { - if let Ok(rtt_tracker) = - BpfMap::<[u8; 16], RttTrackingEntry>::from_path("/sys/fs/bpf/rtt_tracker") - { - rtt_tracker.for_each(callback); - } +pub fn rtt_for_each(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) { + iterate_rtt(callback); } diff --git a/src/rust/lqos_sys/src/throughput.rs b/src/rust/lqos_sys/src/throughput.rs index 75c458b7..c7749457 100644 --- a/src/rust/lqos_sys/src/throughput.rs +++ b/src/rust/lqos_sys/src/throughput.rs @@ -1,10 +1,9 @@ use lqos_utils::XdpIpAddress; - -use crate::{bpf_per_cpu_map::BpfPerCpuMap}; +use zerocopy::FromBytes; /// Representation of the XDP map from map_traffic #[repr(C)] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, FromBytes)] pub struct HostCounter { /// Download bytes counter (keeps incrementing) pub download_bytes: u64, @@ -28,11 +27,7 @@ pub struct HostCounter { /// Iterates through all throughput entries, and sends them in turn to `callback`. /// This elides the need to clone or copy data. 
pub fn throughput_for_each( - callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter]), + callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter), ) { - if let Ok(throughput) = BpfPerCpuMap::::from_path( - "/sys/fs/bpf/map_traffic", - ) { - throughput.for_each(callback); - } + crate::bpf_iterator::iterate_throughput(callback); } diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 73af07fa..c54e2336 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -111,18 +111,18 @@ impl ThroughputTracker { if let Some(mut entry) = raw_data.get_mut(xdp_ip) { entry.bytes = (0, 0); entry.packets = (0, 0); - for c in counts { - entry.bytes.0 += c.download_bytes; - entry.bytes.1 += c.upload_bytes; - entry.packets.0 += c.download_packets; - entry.packets.1 += c.upload_packets; - if c.tc_handle != 0 { - entry.tc_handle = TcHandle::from_u32(c.tc_handle); + //for c in counts { + entry.bytes.0 += counts.download_bytes; + entry.bytes.1 += counts.upload_bytes; + entry.packets.0 += counts.download_packets; + entry.packets.1 += counts.upload_packets; + if counts.tc_handle != 0 { + entry.tc_handle = TcHandle::from_u32(counts.tc_handle); } - if c.last_seen != 0 { - entry.last_seen = c.last_seen; + if counts.last_seen != 0 { + entry.last_seen = counts.last_seen; } - } + //} if entry.packets != entry.prev_packets { entry.most_recent_cycle = self_cycle; @@ -155,15 +155,15 @@ impl ThroughputTracker { last_fresh_rtt_data_cycle: 0, last_seen: 0, }; - for c in counts { - entry.bytes.0 += c.download_bytes; - entry.bytes.1 += c.upload_bytes; - entry.packets.0 += c.download_packets; - entry.packets.1 += c.upload_packets; - if c.tc_handle != 0 { - entry.tc_handle = TcHandle::from_u32(c.tc_handle); + //for c in counts { + entry.bytes.0 += counts.download_bytes; + entry.bytes.1 += counts.upload_bytes; + entry.packets.0 += counts.download_packets; + 
entry.packets.1 += counts.upload_packets; + if counts.tc_handle != 0 { + entry.tc_handle = TcHandle::from_u32(counts.tc_handle); } - } + //} raw_data.insert(*xdp_ip, entry); } }); @@ -171,10 +171,9 @@ impl ThroughputTracker { pub(crate) fn apply_rtt_data(&self) { let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed); - rtt_for_each(&mut |raw_ip, rtt| { + rtt_for_each(&mut |ip, rtt| { if rtt.has_fresh_data != 0 { - let ip = XdpIpAddress(*raw_ip); - if let Some(mut tracker) = self.raw_data.get_mut(&ip) { + if let Some(mut tracker) = self.raw_data.get_mut(ip) { tracker.recent_rtt_data = rtt.rtt; tracker.last_fresh_rtt_data_cycle = self_cycle; if let Some(parents) = &tracker.network_json_parents {