From c70d01b3f6df079d1ad417aa9c8577fb3bac1b45 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 14 Mar 2023 17:13:02 +0000 Subject: [PATCH] VERY work in progress - do not try to use this. The good: creates a ringbuffer and successfully sends events to userspace. The bad: it's currently watching all packets, eats a lot of CPU and has a horrible lack of abstractions. --- src/rust/Cargo.lock | 34 +++++ src/rust/Cargo.toml | 1 + src/rust/lqos_heimdall/Cargo.toml | 9 ++ src/rust/lqos_heimdall/src/config.rs | 19 +++ src/rust/lqos_heimdall/src/lib.rs | 8 + src/rust/lqos_heimdall/src/perf_interface.rs | 48 ++++++ src/rust/lqos_heimdall/src/stats.rs | 6 + src/rust/lqos_sys/Cargo.toml | 1 + src/rust/lqos_sys/src/bpf/common/heimdall.h | 93 ++++-------- src/rust/lqos_sys/src/bpf/lqos_kern.c | 6 +- src/rust/lqos_sys/src/heimdall_map.rs | 80 +--------- src/rust/lqos_sys/src/ip_mapping/mod.rs | 3 +- src/rust/lqos_sys/src/lib.rs | 5 +- src/rust/lqos_sys/src/lqos_kernel.rs | 18 +-- src/rust/lqos_sys/src/throughput.rs | 4 +- src/rust/lqos_utils/Cargo.toml | 2 + src/rust/lqos_utils/src/lib.rs | 3 + .../src/xdp_ip_address.rs | 3 +- src/rust/lqosd/Cargo.toml | 1 + src/rust/lqosd/src/ip_mapping.rs | 2 +- src/rust/lqosd/src/main.rs | 3 +- .../src/throughput_tracker/heimdall_data.rs | 140 +----------------- src/rust/lqosd/src/throughput_tracker/mod.rs | 5 +- .../src/throughput_tracker/tracking_data.rs | 11 +- 24 files changed, 191 insertions(+), 314 deletions(-) create mode 100644 src/rust/lqos_heimdall/Cargo.toml create mode 100644 src/rust/lqos_heimdall/src/config.rs create mode 100644 src/rust/lqos_heimdall/src/lib.rs create mode 100644 src/rust/lqos_heimdall/src/perf_interface.rs create mode 100644 src/rust/lqos_heimdall/src/stats.rs rename src/rust/{lqos_sys => lqos_utils}/src/xdp_ip_address.rs (97%) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index fc49f606..80678313 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1428,6 +1428,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "lqos_heimdall" +version = "0.1.0" +dependencies = [ + "log", + "lqos_utils", + "zerocopy", +] + [[package]] name = "lqos_node_manager" version = "0.1.0" @@ -1502,6 +1511,7 @@ dependencies = [ "log", "lqos_bus", "lqos_config", + "lqos_heimdall", "lqos_utils", "nix", "once_cell", @@ -1512,11 +1522,13 @@ dependencies = [ name = "lqos_utils" version = "0.1.0" dependencies = [ + "byteorder", "log", "nix", "notify", "serde", "thiserror", + "zerocopy", ] [[package]] @@ -1530,6 +1542,7 @@ dependencies = [ "log", "lqos_bus", "lqos_config", + "lqos_heimdall", "lqos_queue_tracker", "lqos_sys", "lqos_utils", @@ -3287,3 +3300,24 @@ name = "yansi" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" + +[[package]] +name = "zerocopy" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 186982e9..6bd36707 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -24,4 +24,5 @@ members = [ "lqos_utils", # A collection of macros and helpers we find useful "lqos_setup", # A quick CLI setup for first-time users "lqos_anonymous_stats_server", # The server for gathering anonymous usage data. + "lqos_heimdall", # Library for managing Heimdall flow watching ] diff --git a/src/rust/lqos_heimdall/Cargo.toml b/src/rust/lqos_heimdall/Cargo.toml new file mode 100644 index 00000000..953c325b --- /dev/null +++ b/src/rust/lqos_heimdall/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "lqos_heimdall" +version = "0.1.0" +edition = "2021" + +[dependencies] +lqos_utils = { path = "../lqos_utils" } +log = "0" +zerocopy = {version = "0.6.1", features = [ "simd" ] } diff --git a/src/rust/lqos_heimdall/src/config.rs b/src/rust/lqos_heimdall/src/config.rs new file mode 100644 index 00000000..e33e479f --- /dev/null +++ b/src/rust/lqos_heimdall/src/config.rs @@ -0,0 +1,19 @@ +/// Currently unused, represents the current operation mode of the Heimdall +/// sub-system. Defaults to 1. +#[repr(u8)] +pub enum HeimdallMode { + /// Do not monitor + Off = 0, + /// Only look at flows on hosts we are watching via the circuit monitor + WatchOnly = 1, + /// Capture everything (this may set your CPU on fire) + Analysis = 2, +} + +/// Configuration options passed to Heimdall +#[derive(Default, Clone)] +#[repr(C)] +pub struct HeimdalConfig { + /// Current operation mode + pub mode: u32, +} \ No newline at end of file diff --git a/src/rust/lqos_heimdall/src/lib.rs b/src/rust/lqos_heimdall/src/lib.rs new file mode 100644 index 00000000..6ea1436a --- /dev/null +++ b/src/rust/lqos_heimdall/src/lib.rs @@ -0,0 +1,8 @@ +//! Provides an interface to the Heimdall packet watching +//! system. Heimdall watches traffic flows, and is notified +//! about their contents via the eBPF Perf system. + +mod config; +pub mod perf_interface; +pub mod stats; +pub use config::{HeimdallMode, HeimdalConfig}; diff --git a/src/rust/lqos_heimdall/src/perf_interface.rs b/src/rust/lqos_heimdall/src/perf_interface.rs new file mode 100644 index 00000000..6588d974 --- /dev/null +++ b/src/rust/lqos_heimdall/src/perf_interface.rs @@ -0,0 +1,48 @@ +use std::{ffi::c_void, slice}; +use lqos_utils::XdpIpAddress; +use zerocopy::FromBytes; + +#[derive(FromBytes, Debug)] +#[repr(C)] +struct HeimdallEvent { + timestamp: u64, + src: XdpIpAddress, + dst: XdpIpAddress, + src_port : u16, + dst_port: u16, + ip_protocol: u8, + tos: u8, + size: u32, +} + +/// Callback for the Heimdall Perf map system. Called whenever Heimdall has +/// events for the system to read. +/// +/// # Safety +/// +/// This function is inherently unsafe, because it interfaces directly with +/// C and the Linux-kernel eBPF system. +#[no_mangle] +pub unsafe extern "C" fn heimdall_handle_events( + _ctx: *mut c_void, + data: *mut c_void, + data_size: usize, +) -> i32 { + const EVENT_SIZE: usize = std::mem::size_of::(); + if data_size < EVENT_SIZE { + log::warn!("Warning: incoming data too small in Heimdall buffer"); + return 0; + } + + //COLLECTED_EVENTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let data_u8 = data as *const u8; + let data_slice : &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); + + if let Some(incoming) = HeimdallEvent::read_from(data_slice) { + //println!("{incoming:?}"); + } else { + println!("Failed to decode"); + } + + 0 +} diff --git a/src/rust/lqos_heimdall/src/stats.rs b/src/rust/lqos_heimdall/src/stats.rs new file mode 100644 index 00000000..8c9b6c85 --- /dev/null +++ b/src/rust/lqos_heimdall/src/stats.rs @@ -0,0 +1,6 @@ +//! Count statistics + +use std::sync::atomic::AtomicU64; + +/// Perf event counter +pub static COLLECTED_EVENTS: AtomicU64 = AtomicU64::new(0); diff --git a/src/rust/lqos_sys/Cargo.toml b/src/rust/lqos_sys/Cargo.toml index 86adf996..1cbf270d 100644 --- a/src/rust/lqos_sys/Cargo.toml +++ b/src/rust/lqos_sys/Cargo.toml @@ -15,6 +15,7 @@ lqos_utils = { path = "../lqos_utils" } once_cell = "1" dashmap = "5" thiserror = "1" +lqos_heimdall = { path = "../lqos_heimdall" } [build-dependencies] bindgen = "0" diff --git a/src/rust/lqos_sys/src/bpf/common/heimdall.h b/src/rust/lqos_sys/src/bpf/common/heimdall.h index 32443ea4..f6c99d52 100644 --- a/src/rust/lqos_sys/src/bpf/common/heimdall.h +++ b/src/rust/lqos_sys/src/bpf/common/heimdall.h @@ -13,6 +13,7 @@ struct heimdall_config_t __u32 monitor_mode; // 0 = Off, 1 = Targets only, 2 = Analysis Mode }; +// Pinned map containing the Heimdall config struct { __uint(type, BPF_MAP_TYPE_ARRAY); @@ -22,6 +23,8 @@ struct __uint(pinning, LIBBPF_PIN_BY_NAME); } heimdall_config SEC(".maps"); +// Pinned map containing the IP addresses (in packed IPv6 format) +// currently being watched by the Heimdall system. struct { __uint(type, BPF_MAP_TYPE_HASH); @@ -31,39 +34,24 @@ struct __uint(pinning, LIBBPF_PIN_BY_NAME); } heimdall_watching SEC(".maps"); -struct heimdall_key -{ +// Perf map for communicating with userspace +struct { + __uint(type, BPF_MAP_TYPE_RINGBUF); + __uint(max_entries, 256 * 1024 /* 256 KB */); +} heimdall_events SEC(".maps"); + +// Basic event type to send to userspace +struct heimdall_event { + __u64 timetamp; struct in6_addr src; struct in6_addr dst; - __u8 ip_protocol; __u16 src_port; __u16 dst_port; -}; - -struct heimdall_data -{ - __u64 last_seen; - __u64 bytes; - __u64 packets; + __u8 ip_protocol; __u8 tos; - __u8 reserved[3]; + __u32 size; }; -struct -{ - __uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH); - __type(key, struct heimdall_key); - __type(value, struct heimdall_data); - __uint(max_entries, MAX_FLOWS); - __uint(pinning, LIBBPF_PIN_BY_NAME); -} heimdall SEC(".maps"); - -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); - __uint(key_size, sizeof(__u32)); - __uint(value_size, sizeof(__u32)); -} heimdall_events SEC(".maps"); - static __always_inline __u8 get_heimdall_mode() { __u32 index = 0; @@ -91,46 +79,17 @@ static __always_inline bool is_heimdall_watching(struct dissector_t *dissector) static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size, int dir) { - __u32 e = 1; - if (bpf_perf_event_output(dissector->ctx, &heimdall_events, BPF_F_CURRENT_CPU, &e, sizeof(e)) != 0) { - bpf_debug("Failed to send perf event"); - } - - // Don't report any non-ICMP without ports - if (dissector->ip_protocol != 1 && (dissector->src_port == 0 || dissector->dst_port == 0)) - return; - // Don't report ICMP with invalid numbers - if (dissector->ip_protocol == 1 && dissector->src_port > 18) return; - struct heimdall_key key = {0}; - key.src = dissector->src_ip; - key.dst = dissector->dst_ip; - key.ip_protocol = dissector->ip_protocol; - key.src_port = bpf_ntohs(dissector->src_port); - key.dst_port = bpf_ntohs(dissector->dst_port); - struct heimdall_data *counter = (struct heimdall_data *)bpf_map_lookup_elem(&heimdall, &key); - if (counter) - { - counter->last_seen = bpf_ktime_get_boot_ns(); - counter->packets += 1; - counter->bytes += size; - if (dissector->tos != 0) - { - counter->tos = dissector->tos; - } - } - else - { - struct heimdall_data counter = {0}; - counter.last_seen = bpf_ktime_get_boot_ns(); - counter.bytes = size; - counter.packets = 1; - counter.tos = dissector->tos; - counter.reserved[0] = 0; - counter.reserved[1] = 0; - counter.reserved[2] = 0; - if (bpf_map_update_elem(&heimdall, &key, &counter, BPF_NOEXIST) != 0) - { - bpf_debug("Failed to insert tracking"); - } + struct heimdall_event event = {0}; + event.timetamp = bpf_ktime_get_boot_ns(); + event.src = dissector->src_ip; + event.dst = dissector->dst_ip; + event.src_port = dissector->src_port; + event.dst_port = dissector->dst_port; + event.ip_protocol = dissector->ip_protocol; + event.tos = dissector->tos; + event.size = size; + long err = bpf_ringbuf_output(&heimdall_events, &event, sizeof(event), 0); + if (err != 0) { + bpf_debug("Failed to send perf event %d", err); } } \ No newline at end of file diff --git a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c index 5cd41bef..084c06f7 100644 --- a/src/rust/lqos_sys/src/bpf/lqos_kern.c +++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c @@ -135,12 +135,14 @@ int xdp_prog(struct xdp_md *ctx) // Send on its way + update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction); if (tc_handle != 0) { // Send data to Heimdall if (heimdall_mode == 2 || (heimdall_mode==1 && is_heimdall_watching(&dissector))) { #ifdef VERBOSE - bpf_debug("(XDP) Storing Heimdall Data"); -#endif update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction); + bpf_debug("(XDP) Storing Heimdall Data"); +#endif + //update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction); } // Handle CPU redirection if there is one specified diff --git a/src/rust/lqos_sys/src/heimdall_map.rs b/src/rust/lqos_sys/src/heimdall_map.rs index 6ae149c4..1b196c77 100644 --- a/src/rust/lqos_sys/src/heimdall_map.rs +++ b/src/rust/lqos_sys/src/heimdall_map.rs @@ -1,71 +1,9 @@ -use std::{time::Duration, ffi::c_void}; +use std::time::Duration; use dashmap::DashMap; -use lqos_utils::unix_time::time_since_boot; +use lqos_heimdall::{HeimdallMode, HeimdalConfig}; +use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; use once_cell::sync::Lazy; - -use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress, bpf_map::BpfMap}; - -/// Representation of the eBPF `heimdall_key` type. -#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] -#[repr(C)] -pub struct HeimdallKey { - /// Mapped `XdpIpAddress` source for the flow. - pub src_ip: XdpIpAddress, - /// Mapped `XdpIpAddress` destination for the flow - pub dst_ip: XdpIpAddress, - /// IP protocol (see the Linux kernel!) - pub ip_protocol: u8, - /// Source port number, or ICMP type. - pub src_port: u16, - /// Destination port number. - pub dst_port: u16, -} - -/// Mapped representation of the eBPF `heimdall_data` type. -#[derive(Debug, Clone, Default)] -#[repr(C)] -pub struct HeimdallData { - /// Last seen, in nanoseconds (since boot time). - pub last_seen: u64, - /// Number of bytes since the flow started being tracked - pub bytes: u64, - /// Number of packets since the flow started being tracked - pub packets: u64, - /// IP header TOS value - pub tos: u8, - /// Reserved to pad the structure - pub reserved: [u8; 3], -} - -/// Iterates through all throughput entries, and sends them in turn to `callback`. -/// This elides the need to clone or copy data. -pub fn heimdall_for_each( - callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]), -) { - if let Ok(heimdall) = BpfPerCpuMap::::from_path( - "/sys/fs/bpf/heimdall", - ) { - heimdall.for_each(callback); - } -} - -/// Currently unused, represents the current operation mode of the Heimdall -/// sub-system. Defaults to 1. -#[repr(u8)] -pub enum HeimdallMode { - /// Do not monitor - Off = 0, - /// Only look at flows on hosts we are watching via the circuit monitor - WatchOnly = 1, - /// Capture everything (this may set your CPU on fire) - Analysis = 2, -} - -#[derive(Default, Clone)] -#[repr(C)] -struct HeimdalConfig { - mode: u32, -} +use crate::{bpf_map::BpfMap}; /// Change the eBPF Heimdall System mode. pub fn set_heimdall_mode(mode: HeimdallMode) -> anyhow::Result<()> { @@ -130,13 +68,3 @@ pub fn heimdall_watch_ip(ip: XdpIpAddress) { HEIMDALL_WATCH_LIST.insert(ip, h); } } - -#[no_mangle] -pub unsafe extern "C" fn missed_events(ctx: *mut c_void, cpu: i32, lost_count: u64) { - log::warn!("Missed {lost_count} Heimdall events on {cpu}"); -} - -#[no_mangle] -pub unsafe extern "C" fn handle_events(ctx: *mut c_void, cpu: i32, data: *mut c_void, data_size: u32) { - //log::info!("Received a callback on {cpu}"); -} \ No newline at end of file diff --git a/src/rust/lqos_sys/src/ip_mapping/mod.rs b/src/rust/lqos_sys/src/ip_mapping/mod.rs index 80ecb3b9..d5f038fc 100644 --- a/src/rust/lqos_sys/src/ip_mapping/mod.rs +++ b/src/rust/lqos_sys/src/ip_mapping/mod.rs @@ -1,6 +1,7 @@ -use crate::{bpf_map::BpfMap, XdpIpAddress}; +use crate::bpf_map::BpfMap; use anyhow::Result; use lqos_bus::TcHandle; +use lqos_utils::XdpIpAddress; use std::net::IpAddr; mod ip_hash_data; mod ip_hash_key; diff --git a/src/rust/lqos_sys/src/lib.rs b/src/rust/lqos_sys/src/lib.rs index 31d01c8f..056a76e3 100644 --- a/src/rust/lqos_sys/src/lib.rs +++ b/src/rust/lqos_sys/src/lib.rs @@ -16,12 +16,10 @@ mod kernel_wrapper; mod lqos_kernel; mod tcp_rtt; mod throughput; -mod xdp_ip_address; mod linux; pub use heimdall_map::{ - heimdall_expire, heimdall_for_each, heimdall_watch_ip, set_heimdall_mode, - HeimdallData, HeimdallKey, HeimdallMode, + heimdall_expire, heimdall_watch_ip, set_heimdall_mode }; pub use ip_mapping::{ add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips, @@ -31,4 +29,3 @@ pub use linux::num_possible_cpus; pub use lqos_kernel::max_tracked_ips; pub use tcp_rtt::{rtt_for_each, RttTrackingEntry}; pub use throughput::{throughput_for_each, HostCounter}; -pub use xdp_ip_address::XdpIpAddress; diff --git a/src/rust/lqos_sys/src/lqos_kernel.rs b/src/rust/lqos_sys/src/lqos_kernel.rs index ac4d965d..b16fa3ad 100644 --- a/src/rust/lqos_sys/src/lqos_kernel.rs +++ b/src/rust/lqos_sys/src/lqos_kernel.rs @@ -11,8 +11,9 @@ use libbpf_sys::{ XDP_FLAGS_UPDATE_IF_NOEXIST, }; use log::{info, warn}; +use lqos_heimdall::perf_interface::heimdall_handle_events; use nix::libc::{geteuid, if_nametoindex}; -use std::{ffi::{CString, c_void}, process::Command, thread::Thread}; +use std::{ffi::{CString, c_void}, process::Command}; pub(crate) mod bpf { #![allow(warnings, unused)] @@ -165,15 +166,12 @@ pub fn attach_xdp_and_tc_to_interface( log::error!("Unable to load Heimdall Events FD"); return Err(anyhow::Error::msg("Unable to load Heimdall Events FD")); } - let opts: *const bpf::perf_buffer_opts = std::ptr::null(); + let opts: *const bpf::ring_buffer_opts = std::ptr::null(); let heimdall_perf_buffer = unsafe { - bpf::perf_buffer__new( + bpf::ring_buffer__new( heimdall_events_fd, - 8, - Some(crate::heimdall_map::handle_events), - Some(crate::heimdall_map::missed_events), - opts as *mut c_void, - opts) + Some(heimdall_handle_events), + opts as *mut c_void, opts) }; if unsafe { bpf::libbpf_get_error(heimdall_perf_buffer as *mut c_void) != 0 } { log::error!("Failed to create Heimdall event buffer"); @@ -286,7 +284,7 @@ unsafe fn try_xdp_attach( // Handle type used to wrap *mut bpf::perf_buffer and indicate // that it can be moved. Really unsafe code in theory. -struct PerfBufferHandle(*mut bpf::perf_buffer); +struct PerfBufferHandle(*mut bpf::ring_buffer); unsafe impl Send for PerfBufferHandle {} unsafe impl Sync for PerfBufferHandle {} @@ -294,7 +292,7 @@ unsafe impl Sync for PerfBufferHandle {} fn poll_perf_events(heimdall_perf_buffer: PerfBufferHandle) { let heimdall_perf_buffer = heimdall_perf_buffer.0; loop { - let err = unsafe { bpf::perf_buffer__poll(heimdall_perf_buffer, 100) }; + let err = unsafe { bpf::ring_buffer__poll(heimdall_perf_buffer, 100) }; if err < 0 { log::error!("Error polling perfbuffer"); } diff --git a/src/rust/lqos_sys/src/throughput.rs b/src/rust/lqos_sys/src/throughput.rs index 452e135d..75c458b7 100644 --- a/src/rust/lqos_sys/src/throughput.rs +++ b/src/rust/lqos_sys/src/throughput.rs @@ -1,4 +1,6 @@ -use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress}; +use lqos_utils::XdpIpAddress; + +use crate::{bpf_per_cpu_map::BpfPerCpuMap}; /// Representation of the XDP map from map_traffic #[repr(C)] diff --git a/src/rust/lqos_utils/Cargo.toml b/src/rust/lqos_utils/Cargo.toml index 95e201a1..0e856273 100644 --- a/src/rust/lqos_utils/Cargo.toml +++ b/src/rust/lqos_utils/Cargo.toml @@ -9,3 +9,5 @@ nix = "0" log = "0" notify = { version = "5.0.0", default-features = false } # Not using crossbeam because of Tokio thiserror = "1" +byteorder = "1.4" +zerocopy = { version = "0.6.1", features = ["simd"] } diff --git a/src/rust/lqos_utils/src/lib.rs b/src/rust/lqos_utils/src/lib.rs index dd6fa32a..b0654c0d 100644 --- a/src/rust/lqos_utils/src/lib.rs +++ b/src/rust/lqos_utils/src/lib.rs @@ -5,3 +5,6 @@ pub mod hex_string; pub mod packet_scale; mod string_table_enum; pub mod unix_time; +mod xdp_ip_address; + +pub use xdp_ip_address::XdpIpAddress; \ No newline at end of file diff --git a/src/rust/lqos_sys/src/xdp_ip_address.rs b/src/rust/lqos_utils/src/xdp_ip_address.rs similarity index 97% rename from src/rust/lqos_sys/src/xdp_ip_address.rs rename to src/rust/lqos_utils/src/xdp_ip_address.rs index 2e971faa..8580f783 100644 --- a/src/rust/lqos_sys/src/xdp_ip_address.rs +++ b/src/rust/lqos_utils/src/xdp_ip_address.rs @@ -1,11 +1,12 @@ use byteorder::{BigEndian, ByteOrder}; +use zerocopy::FromBytes; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; /// XdpIpAddress provides helpful conversion between the XDP program's /// native storage of IP addresses in `[u8; 16]` blocks of bytes and /// Rust `IpAddr` types. #[repr(C)] -#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FromBytes)] pub struct XdpIpAddress(pub [u8; 16]); impl Default for XdpIpAddress { diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index cc154f7f..584275f6 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -13,6 +13,7 @@ lqos_config = { path = "../lqos_config" } lqos_sys = { path = "../lqos_sys" } lqos_queue_tracker = { path = "../lqos_queue_tracker" } lqos_utils = { path = "../lqos_utils" } +lqos_heimdall = { path = "../lqos_heimdall" } tokio = { version = "1", features = [ "full", "parking_lot" ] } once_cell = "1.17.1" lqos_bus = { path = "../lqos_bus" } diff --git a/src/rust/lqosd/src/ip_mapping.rs b/src/rust/lqosd/src/ip_mapping.rs index cd7f7aab..46789ef7 100644 --- a/src/rust/lqosd/src/ip_mapping.rs +++ b/src/rust/lqosd/src/ip_mapping.rs @@ -1,6 +1,6 @@ use anyhow::Result; use lqos_bus::{BusResponse, IpMapping, TcHandle}; -use lqos_sys::XdpIpAddress; +use lqos_utils::XdpIpAddress; fn expect_ack(result: Result<()>) -> BusResponse { if result.is_ok() { diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index 92305706..7c84f97f 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -16,6 +16,7 @@ use anyhow::Result; use log::{info, warn}; use lqos_bus::{BusRequest, BusResponse, UnixSocketServer}; use lqos_config::LibreQoSConfig; +use lqos_heimdall::HeimdallMode; use lqos_queue_tracker::{ add_watched_queue, get_raw_circuit_data, spawn_queue_monitor, spawn_queue_structure_monitor, @@ -66,7 +67,7 @@ async fn main() -> Result<()> { } else { LibreQoSKernels::new(&config.internet_interface, &config.isp_interface)? }; - set_heimdall_mode(lqos_sys::HeimdallMode::WatchOnly)?; // TODO: Set by config + set_heimdall_mode(HeimdallMode::WatchOnly)?; // TODO: Set by config // Spawn tracking sub-systems join!( diff --git a/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs b/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs index 82015013..97ba86fa 100644 --- a/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs @@ -1,141 +1,5 @@ -use std::{time::Duration, net::IpAddr, collections::HashSet}; +use lqos_bus::BusResponse; -use dashmap::DashMap; -use lqos_bus::{BusResponse, FlowTransport, tos_parser}; -use lqos_sys::{HeimdallData, HeimdallKey, XdpIpAddress, heimdall_watch_ip}; -use lqos_utils::unix_time::time_since_boot; -use once_cell::sync::Lazy; - -use crate::stats::FLOWS_TRACKED; - -pub(crate) static HEIMDALL: Lazy = - Lazy::new(PalantirMonitor::new); - -pub(crate) struct PalantirMonitor { - pub(crate) data: DashMap, -} - -#[derive(Default)] -pub(crate) struct FlowData { - last_seen: u64, - bytes: u64, - packets: u64, - tos: u8, -} - -impl PalantirMonitor { - fn new() -> Self { - Self { data: DashMap::new() } - } - - fn combine_flows(values: &[HeimdallData]) -> FlowData { - let mut result = FlowData::default(); - let mut ls = 0; - values.iter().for_each(|v| { - result.bytes += v.bytes; - result.packets += v.packets; - result.tos += v.tos; - if v.last_seen > ls { - ls = v.last_seen; - } - }); - result.last_seen = ls; - result - } - - pub(crate) fn ingest(&self, key: &HeimdallKey, values: &[HeimdallData]) { - //println!("{key:?}"); - //println!("{values:?}"); - if let Some(expire_ns) = Self::get_expire_time() { - let combined = Self::combine_flows(values); - if combined.last_seen > expire_ns { - if let Some(mut flow) = self.data.get_mut(key) { - // Update - flow.bytes = combined.bytes; - flow.packets = combined.packets; - flow.last_seen = combined.last_seen; - flow.tos = combined.tos; - } else { - // Insert - self.data.insert(key.clone(), combined); - } - } - } - } - - fn get_expire_time() -> Option { - let boot_time = time_since_boot(); - if let Ok(boot_time) = boot_time { - let time_since_boot = Duration::from(boot_time); - let five_minutes_ago = - time_since_boot.saturating_sub(Duration::from_secs(30)); - let expire_ns = five_minutes_ago.as_nanos() as u64; - Some(expire_ns) - } else { - None - } - } - - pub(crate) fn expire(&self) { - if let Some(expire_ns) = Self::get_expire_time() { - self.data.retain(|_k, v| v.last_seen > expire_ns); - } - FLOWS_TRACKED.store(self.data.len() as u64, std::sync::atomic::Ordering::Relaxed); - } -} - -pub fn get_flow_stats(ip: &str) -> BusResponse { - let ip = ip.parse::(); - if let Ok(ip) = ip { - let ip = XdpIpAddress::from_ip(ip); - heimdall_watch_ip(ip); - let mut result = Vec::new(); - - // Obtain all the flows - let mut all_flows = Vec::new(); - for value in HEIMDALL.data.iter() { - let key = value.key(); - if key.src_ip == ip || key.dst_ip == ip { - let (dscp, ecn) = tos_parser(value.tos); - all_flows.push(FlowTransport{ - src: key.src_ip.as_ip().to_string(), - dst: key.dst_ip.as_ip().to_string(), - src_port: key.src_port, - dst_port: key.dst_port, - proto: match key.ip_protocol { - 6 => lqos_bus::FlowProto::TCP, - 17 => lqos_bus::FlowProto::UDP, - _ => lqos_bus::FlowProto::ICMP, - }, - bytes: value.bytes, - packets: value.packets, - dscp, - ecn - }); - } - } - - // Turn them into reciprocal pairs - let mut done = HashSet::new(); - for (i,flow) in all_flows.iter().enumerate() { - if !done.contains(&i) { - let flow_a = flow.clone(); - let flow_b = if let Some(flow_b) = all_flows.iter().position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port) { - done.insert(flow_b); - Some(all_flows[flow_b].clone()) - } else { - None - }; - - result.push((flow_a, flow_b)); - } - } - - result.sort_by(|a,b| { - b.0.bytes.cmp(&a.0.bytes) - }); - - return BusResponse::FlowData(result); - } +pub fn get_flow_stats(_ip: &str) -> BusResponse { BusResponse::Fail("No Stats or bad IP".to_string()) } \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 0f597b08..5d27095a 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -8,8 +8,8 @@ use crate::{ }; use log::{info, warn}; use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult}; -use lqos_sys::{XdpIpAddress, heimdall_expire}; -use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot}; +use lqos_sys::heimdall_expire; +use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress}; use once_cell::sync::Lazy; use std::time::Duration; @@ -31,7 +31,6 @@ pub fn spawn_throughput_monitor() { net_json.zero_throughput_and_rtt(); } // Scope to end the lock THROUGHPUT_TRACKER.copy_previous_and_reset_rtt(); - THROUGHPUT_TRACKER.pantir_tracking(); THROUGHPUT_TRACKER.apply_new_throughput_counters(); THROUGHPUT_TRACKER.apply_rtt_data(); THROUGHPUT_TRACKER.update_totals(); diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 9ad38880..a871a5e9 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -3,7 +3,8 @@ use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS, heimdall_data::HEIMDALL}; use dashmap::DashMap; use lqos_bus::TcHandle; -use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress, heimdall_for_each}; +use lqos_sys::{rtt_for_each, throughput_for_each}; +use lqos_utils::XdpIpAddress; pub struct ThroughputTracker { pub(crate) cycle: AtomicU64, @@ -101,14 +102,6 @@ impl ThroughputTracker { }); } - pub(crate) fn pantir_tracking(&self) { - HEIMDALL.expire(); - heimdall_for_each(&mut |key, values| { - HEIMDALL.ingest(key, values); - }); - //println!("Tracking {} flows", HEIMDALL.data.len()); - } - pub(crate) fn apply_new_throughput_counters( &self, ) {