diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 160a8b5c..189785a0 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1532,6 +1532,7 @@ dependencies = [ "nix", "once_cell", "thiserror", + "zerocopy", ] [[package]] diff --git a/src/rust/lqos_sys/Cargo.toml b/src/rust/lqos_sys/Cargo.toml index a743824f..dcffa3a9 100644 --- a/src/rust/lqos_sys/Cargo.toml +++ b/src/rust/lqos_sys/Cargo.toml @@ -16,6 +16,7 @@ lqos_utils = { path = "../lqos_utils" } once_cell = "1" dashmap = "5" thiserror = "1" +zerocopy = { version = "0.6.1", features = ["simd"] } [build-dependencies] bindgen = "0" diff --git a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c index 9e4e5295..20bfc1a3 100644 --- a/src/rust/lqos_sys/src/bpf/lqos_kern.c +++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c @@ -319,4 +319,59 @@ int bifrost(struct __sk_buff *skb) return TC_ACT_UNSPEC; } +/* + * Structs for map iteration programs + * See https://github.com/xdp-project/bpf-examples + */ +struct bpf_iter_meta { + struct seq_file *seq; + __u64 session_id; + __u64 seq_num; +} __attribute__((preserve_access_index)); + +struct bpf_iter__bpf_map_elem { + struct bpf_iter_meta *meta; + struct bpf_map *map; + void *key; + void *value; +}; + +SEC("iter/bpf_map_elem") +int throughput_reader(struct bpf_iter__bpf_map_elem *ctx) +{ + // The sequence file + struct seq_file *seq = ctx->meta->seq; + struct host_counter *counter = ctx->value; + struct in6_addr *ip = ctx->key; + + // Bail on end + if (counter == NULL || ip == NULL) { + return 0; + } + + bpf_seq_write(seq, ip, sizeof(struct in6_addr)); + bpf_seq_write(seq, counter, sizeof(struct host_counter)); + //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->download_bytes, counter->upload_bytes); + return 0; +} + +SEC("iter/bpf_map_elem") +int rtt_reader(struct bpf_iter__bpf_map_elem *ctx) +{ + // The sequence file + struct seq_file *seq = ctx->meta->seq; + struct rotating_performance *counter = ctx->value; + struct in6_addr *ip = ctx->key; + + // Bail on end + if (counter == NULL || ip == NULL) { + return 0; + } + + //BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]); + bpf_seq_write(seq, ip, sizeof(struct in6_addr)); + bpf_seq_write(seq, counter, sizeof(struct rotating_performance)); + return 0; +} + char _license[] SEC("license") = "GPL"; diff --git a/src/rust/lqos_sys/src/bpf/wrapper.c b/src/rust/lqos_sys/src/bpf/wrapper.c index 8ca2f74d..94ae7914 100644 --- a/src/rust/lqos_sys/src/bpf/wrapper.c +++ b/src/rust/lqos_sys/src/bpf/wrapper.c @@ -246,3 +246,77 @@ int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj) out: return err; } + +// Iterator code +#include +#include + +struct bpf_link *setup_iterator_link( + struct bpf_program *prog, + struct bpf_map *map +) { + int map_fd; // File descriptor for the map itself + struct bpf_link *link; // Value to return with the link + union bpf_iter_link_info linfo = { 0 }; + DECLARE_LIBBPF_OPTS(bpf_iter_attach_opts, iter_opts, + .link_info = &linfo, + .link_info_len = sizeof(linfo)); + + map_fd = bpf_map__fd(map); + if (map_fd < 0) { + fprintf(stderr, "bpf_map__fd() fails\n"); + return NULL; + } + linfo.map.map_fd = map_fd; + + link = bpf_program__attach_iter(prog, &iter_opts); + if (!link) { + fprintf(stderr, "bpf_program__attach_iter() fails\n"); + return NULL; + } + return link; +} + +int read_tp_buffer(struct bpf_program *prog, struct bpf_map *map) +{ + struct bpf_link *link; + char buf[16] = {}; + int iter_fd = -1, len; + int ret = 0; + int map_fd; + + union bpf_iter_link_info linfo = { 0 }; + DECLARE_LIBBPF_OPTS(bpf_iter_attach_opts, iter_opts, + .link_info = &linfo, + .link_info_len = sizeof(linfo)); + + map_fd = bpf_map__fd(map); + if (map_fd < 0) { + fprintf(stderr, "bpf_map__fd() fails\n"); + return map_fd; + } + linfo.map.map_fd = map_fd; + + link = bpf_program__attach_iter(prog, &iter_opts); + if (!link) { + fprintf(stderr, "bpf_program__attach_iter() fails\n"); + return -1; + } + iter_fd = bpf_iter_create(bpf_link__fd(link)); + if (iter_fd < 0) { + fprintf(stderr, "bpf_iter_create() fails\n"); + ret = -1; + goto free_link; + } + /* not check contents, but ensure read() ends without error */ + while ((len = read(iter_fd, buf, sizeof(buf) - 1)) > 0) { + buf[len] = 0; + printf("%s", buf); + } + printf("\n"); +free_link: + if (iter_fd >= 0) + close(iter_fd); + bpf_link__destroy(link); + return 0; +} \ No newline at end of file diff --git a/src/rust/lqos_sys/src/bpf/wrapper.h b/src/rust/lqos_sys/src/bpf/wrapper.h index a6c8e074..e24a3dc0 100644 --- a/src/rust/lqos_sys/src/bpf/wrapper.h +++ b/src/rust/lqos_sys/src/bpf/wrapper.h @@ -1,5 +1,8 @@ #include "lqos_kern_skel.h" #include +#include +#include +#include extern struct lqos_kern * lqos_kern_open(); extern int lqos_kern_load(struct lqos_kern * skel); @@ -8,4 +11,6 @@ extern int tc_detach_egress(int ifindex, bool verbose, bool flush_hook, const ch extern int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj); extern int tc_detach_ingress(int ifindex, bool verbose, bool flush_hook, const char * ifname); extern __u64 max_tracker_ips(); -extern void do_not_print(); \ No newline at end of file +extern void do_not_print(); +int read_tp_buffer(struct bpf_program *prog, struct bpf_map *map); +struct bpf_link * setup_iterator_link(struct bpf_program *prog, struct bpf_map *map); diff --git a/src/rust/lqos_sys/src/bpf_iterator.rs b/src/rust/lqos_sys/src/bpf_iterator.rs new file mode 100644 index 00000000..90b90a6a --- /dev/null +++ b/src/rust/lqos_sys/src/bpf_iterator.rs @@ -0,0 +1,168 @@ +use crate::{ + kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter, + RttTrackingEntry, +}; +use lqos_utils::XdpIpAddress; +use once_cell::sync::Lazy; +use std::{ + fs::File, io::Read, marker::PhantomData, os::fd::FromRawFd, sync::Mutex, fmt::Debug, +}; +use thiserror::Error; +use zerocopy::FromBytes; + +struct BpfMapIterator { + link: *mut bpf::bpf_link, + _phantom: PhantomData<(KEY, VALUE)>, +} + +unsafe impl Sync for BpfMapIterator {} +unsafe impl Send for BpfMapIterator {} + +impl BpfMapIterator { + fn new( + program: *mut bpf::bpf_program, + map: *mut bpf::bpf_map, + ) -> Result { + let link = unsafe { bpf::setup_iterator_link(program, map) }; + if !link.is_null() { + Ok(Self { link, _phantom: PhantomData }) + } else { + Err(BpfIteratorError::FailedToLink) + } + } + + fn as_file(&self) -> Result { + let link_fd = unsafe { bpf::bpf_link__fd(self.link) }; + let iter_fd = unsafe { bpf::bpf_iter_create(link_fd) }; + if iter_fd < 0 { + log::error!("Unable to create map file descriptor"); + Err(BpfIteratorError::FailedToCreateFd) + } else { + unsafe { Ok(File::from_raw_fd(iter_fd)) } + } + } + + fn iter(&self) -> Result, BpfIteratorError> { + let mut file = self.as_file()?; + let mut buf = Vec::new(); + let bytes_read = file.read_to_end(&mut buf); + match bytes_read { + Ok(_) => Ok(BpfMapIter { buffer: buf, index: 0, _phantom: PhantomData }), + Err(e) => { + log::error!("Unable to read from kernel map iterator file"); + log::error!("{e:?}"); + Err(BpfIteratorError::UnableToCreateIterator) + } + } + } +} + +impl Drop for BpfMapIterator { + fn drop(&mut self) { + unsafe { + bpf::bpf_link__destroy(self.link); + } + } +} + +pub(crate) struct BpfMapIter { + buffer: Vec, + index: usize, + _phantom: PhantomData<(K, V)>, +} + +impl BpfMapIter { + const KEY_SIZE: usize = std::mem::size_of::(); + const VALUE_SIZE: usize = std::mem::size_of::(); + const TOTAL_SIZE: usize = Self::KEY_SIZE + Self::VALUE_SIZE; +} + +impl Iterator for BpfMapIter +where + K: FromBytes + Debug, + V: FromBytes + Debug, +{ + type Item = (K, V); + + fn next(&mut self) -> Option { + if self.index + Self::TOTAL_SIZE <= self.buffer.len() { + let key = K::read_from(&self.buffer[self.index..self.index + Self::KEY_SIZE]); + self.index += Self::KEY_SIZE; + let value = V::read_from( + &self.buffer + [self.index ..self.index + Self::VALUE_SIZE], + ); + self.index += Self::VALUE_SIZE; + Some((key.unwrap(), value.unwrap())) + } else { + None + } + } +} + +#[derive(Debug, Error)] +enum BpfIteratorError { + #[error("Failed to create iterator link")] + FailedToLink, + #[error("Failed to create file descriptor")] + FailedToCreateFd, + #[error("Iterator error")] + UnableToCreateIterator, +} + +static MAP_TRAFFIC: Lazy< + Mutex>>, +> = Lazy::new(|| Mutex::new(None)); + +static RTT_TRACKER: Lazy< + Mutex>>, +> = Lazy::new(|| Mutex::new(None)); + +pub fn iterate_throughput(callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter)) { + let mut traffic = MAP_TRAFFIC.lock().unwrap(); + if traffic.is_none() { + let lock = BPF_SKELETON.lock().unwrap(); + if let Some(skeleton) = lock.as_ref() { + let skeleton = skeleton.get_ptr(); + if let Ok(iter) = unsafe { + BpfMapIterator::new( + (*skeleton).progs.throughput_reader, + (*skeleton).maps.map_traffic, + ) + } { + *traffic = Some(iter); + } + } + } + + if let Some(iter) = traffic.as_mut() { + iter.iter().unwrap().for_each(|(k, v)| { + //println!("{:?} {:?}", k, v); + callback(&k, &v); + }); + } +} + +pub fn iterate_rtt(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) { + let mut traffic = RTT_TRACKER.lock().unwrap(); + if traffic.is_none() { + let lock = BPF_SKELETON.lock().unwrap(); + if let Some(skeleton) = lock.as_ref() { + let skeleton = skeleton.get_ptr(); + if let Ok(iter) = unsafe { + BpfMapIterator::new( + (*skeleton).progs.rtt_reader, + (*skeleton).maps.rtt_tracker, + ) + } { + *traffic = Some(iter); + } + } + } + + if let Some(iter) = traffic.as_mut() { + iter.iter().unwrap().for_each(|(k, v)| { + callback(&k, &v); + }); + } +} diff --git a/src/rust/lqos_sys/src/bpf_map.rs b/src/rust/lqos_sys/src/bpf_map.rs index 6f6f3043..f52aa7e1 100644 --- a/src/rust/lqos_sys/src/bpf_map.rs +++ b/src/rust/lqos_sys/src/bpf_map.rs @@ -94,6 +94,7 @@ where value_ptr as *mut c_void, ); //result.push((key.clone(), value.clone())); + println!("old callback"); callback(&key, &value); prev_key = key_ptr; } diff --git a/src/rust/lqos_sys/src/kernel_wrapper.rs b/src/rust/lqos_sys/src/kernel_wrapper.rs index 6375a83f..1c6cb133 100644 --- a/src/rust/lqos_sys/src/kernel_wrapper.rs +++ b/src/rust/lqos_sys/src/kernel_wrapper.rs @@ -1,8 +1,28 @@ +use std::sync::Mutex; +use once_cell::sync::Lazy; use crate::lqos_kernel::{ attach_xdp_and_tc_to_interface, unload_xdp_from_interface, - InterfaceDirection, bpf::ring_buffer_sample_fn, + InterfaceDirection, bpf::{ring_buffer_sample_fn, self}, }; +/// Safer wrapper around pointers to `bpf::lqos_kern`. It really isn't +/// a great idea to be passing mutable pointers around like this, but the C +/// world insists on it. +pub(crate) struct LqosKernBpfWrapper { + ptr: *mut bpf::lqos_kern, +} + +impl LqosKernBpfWrapper { + pub(crate) fn get_ptr(&self) -> *mut bpf::lqos_kern { + self.ptr + } +} + +unsafe impl Sync for LqosKernBpfWrapper {} +unsafe impl Send for LqosKernBpfWrapper {} + +pub(crate) static BPF_SKELETON: Lazy>> = Lazy::new(|| Mutex::new(None)); + /// A wrapper-type that stores the interfaces to which the XDP and TC programs should /// be attached. Performs the attachment process, and hooks "drop" to unattach the /// programs when the structure falls out of scope. @@ -31,7 +51,7 @@ impl LibreQoSKernels { to_isp: to_isp.to_string(), on_a_stick: false, }; - attach_xdp_and_tc_to_interface( + let skeleton = attach_xdp_and_tc_to_interface( &kernel.to_internet, InterfaceDirection::Internet, heimdall_event_handler, @@ -41,6 +61,7 @@ impl LibreQoSKernels { InterfaceDirection::IspNetwork, heimdall_event_handler, )?; + BPF_SKELETON.lock().unwrap().replace(LqosKernBpfWrapper { ptr: skeleton }); Ok(kernel) } @@ -66,12 +87,12 @@ impl LibreQoSKernels { to_isp: String::new(), on_a_stick: true, }; - attach_xdp_and_tc_to_interface( + let skeleton = attach_xdp_and_tc_to_interface( &kernel.to_internet, InterfaceDirection::OnAStick(internet_vlan, isp_vlan), heimdall_event_handler, )?; - + BPF_SKELETON.lock().unwrap().replace(LqosKernBpfWrapper { ptr: skeleton }); Ok(kernel) } } diff --git a/src/rust/lqos_sys/src/lib.rs b/src/rust/lqos_sys/src/lib.rs index 40b56dae..c3c64aba 100644 --- a/src/rust/lqos_sys/src/lib.rs +++ b/src/rust/lqos_sys/src/lib.rs @@ -22,6 +22,7 @@ mod lqos_kernel; mod tcp_rtt; mod throughput; mod linux; +mod bpf_iterator; pub use ip_mapping::{ add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips, diff --git a/src/rust/lqos_sys/src/lqos_kernel.rs b/src/rust/lqos_sys/src/lqos_kernel.rs index 89a132d3..ff67d196 100644 --- a/src/rust/lqos_sys/src/lqos_kernel.rs +++ b/src/rust/lqos_sys/src/lqos_kernel.rs @@ -14,6 +14,8 @@ use log::{info, warn}; use nix::libc::{geteuid, if_nametoindex}; use std::{ffi::{CString, c_void}, process::Command}; +use self::bpf::lqos_kern; + pub(crate) mod bpf { #![allow(warnings, unused)] include!(concat!(env!("OUT_DIR"), "/bindings.rs")); @@ -114,7 +116,7 @@ pub fn attach_xdp_and_tc_to_interface( interface_name: &str, direction: InterfaceDirection, heimdall_event_handler: bpf::ring_buffer_sample_fn, -) -> Result<()> { +) -> Result<*mut lqos_kern> { check_root()?; // Check the interface is valid let interface_index = interface_name_to_index(interface_name)?; @@ -229,7 +231,8 @@ pub fn attach_xdp_and_tc_to_interface( } } - Ok(()) + + Ok(skeleton) } unsafe fn attach_xdp_best_available( diff --git a/src/rust/lqos_sys/src/tcp_rtt.rs b/src/rust/lqos_sys/src/tcp_rtt.rs index 5742f17b..2740c7c6 100644 --- a/src/rust/lqos_sys/src/tcp_rtt.rs +++ b/src/rust/lqos_sys/src/tcp_rtt.rs @@ -1,8 +1,10 @@ -use crate::bpf_map::BpfMap; +use lqos_utils::XdpIpAddress; +use zerocopy::FromBytes; +use crate::bpf_iterator::iterate_rtt; /// Entry from the XDP rtt_tracker map. #[repr(C)] -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, FromBytes)] pub struct RttTrackingEntry { /// Array containing TCP round-trip times. Convert to an `f32` and divide by `100.0` for actual numbers. pub rtt: [u32; 60], @@ -29,10 +31,6 @@ impl Default for RttTrackingEntry { /// Only IP addresses facing the ISP Network side are tracked. /// /// Executes `callback` for each entry. -pub fn rtt_for_each(callback: &mut dyn FnMut(&[u8; 16], &RttTrackingEntry)) { - if let Ok(rtt_tracker) = - BpfMap::<[u8; 16], RttTrackingEntry>::from_path("/sys/fs/bpf/rtt_tracker") - { - rtt_tracker.for_each(callback); - } +pub fn rtt_for_each(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) { + iterate_rtt(callback); } diff --git a/src/rust/lqos_sys/src/throughput.rs b/src/rust/lqos_sys/src/throughput.rs index 75c458b7..c7749457 100644 --- a/src/rust/lqos_sys/src/throughput.rs +++ b/src/rust/lqos_sys/src/throughput.rs @@ -1,10 +1,9 @@ use lqos_utils::XdpIpAddress; - -use crate::{bpf_per_cpu_map::BpfPerCpuMap}; +use zerocopy::FromBytes; /// Representation of the XDP map from map_traffic #[repr(C)] -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, FromBytes)] pub struct HostCounter { /// Download bytes counter (keeps incrementing) pub download_bytes: u64, @@ -28,11 +27,7 @@ pub struct HostCounter { /// Iterates through all throughput entries, and sends them in turn to `callback`. /// This elides the need to clone or copy data. pub fn throughput_for_each( - callback: &mut dyn FnMut(&XdpIpAddress, &[HostCounter]), + callback: &mut dyn FnMut(&XdpIpAddress, &HostCounter), ) { - if let Ok(throughput) = BpfPerCpuMap::::from_path( - "/sys/fs/bpf/map_traffic", - ) { - throughput.for_each(callback); - } + crate::bpf_iterator::iterate_throughput(callback); } diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 73af07fa..c54e2336 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -111,18 +111,18 @@ impl ThroughputTracker { if let Some(mut entry) = raw_data.get_mut(xdp_ip) { entry.bytes = (0, 0); entry.packets = (0, 0); - for c in counts { - entry.bytes.0 += c.download_bytes; - entry.bytes.1 += c.upload_bytes; - entry.packets.0 += c.download_packets; - entry.packets.1 += c.upload_packets; - if c.tc_handle != 0 { - entry.tc_handle = TcHandle::from_u32(c.tc_handle); + //for c in counts { + entry.bytes.0 += counts.download_bytes; + entry.bytes.1 += counts.upload_bytes; + entry.packets.0 += counts.download_packets; + entry.packets.1 += counts.upload_packets; + if counts.tc_handle != 0 { + entry.tc_handle = TcHandle::from_u32(counts.tc_handle); } - if c.last_seen != 0 { - entry.last_seen = c.last_seen; + if counts.last_seen != 0 { + entry.last_seen = counts.last_seen; } - } + //} if entry.packets != entry.prev_packets { entry.most_recent_cycle = self_cycle; @@ -155,15 +155,15 @@ impl ThroughputTracker { last_fresh_rtt_data_cycle: 0, last_seen: 0, }; - for c in counts { - entry.bytes.0 += c.download_bytes; - entry.bytes.1 += c.upload_bytes; - entry.packets.0 += c.download_packets; - entry.packets.1 += c.upload_packets; - if c.tc_handle != 0 { - entry.tc_handle = TcHandle::from_u32(c.tc_handle); + //for c in counts { + entry.bytes.0 += counts.download_bytes; + entry.bytes.1 += counts.upload_bytes; + entry.packets.0 += counts.download_packets; + entry.packets.1 += counts.upload_packets; + if counts.tc_handle != 0 { + entry.tc_handle = TcHandle::from_u32(counts.tc_handle); } - } + //} raw_data.insert(*xdp_ip, entry); } }); @@ -171,10 +171,9 @@ impl ThroughputTracker { pub(crate) fn apply_rtt_data(&self) { let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed); - rtt_for_each(&mut |raw_ip, rtt| { + rtt_for_each(&mut |ip, rtt| { if rtt.has_fresh_data != 0 { - let ip = XdpIpAddress(*raw_ip); - if let Some(mut tracker) = self.raw_data.get_mut(&ip) { + if let Some(mut tracker) = self.raw_data.get_mut(ip) { tracker.recent_rtt_data = rtt.rtt; tracker.last_fresh_rtt_data_cycle = self_cycle; if let Some(parents) = &tracker.network_json_parents {