From 130c888e22a4aa1d8ad3c930a78ddaabddba4e9a Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Sat, 11 Mar 2023 17:10:08 +0000 Subject: [PATCH] Heimdall now only watches those who request to be watched by default. --- src/rust/Cargo.lock | 3 + src/rust/lqos_node_manager/src/queue_info.rs | 2 +- src/rust/lqos_sys/Cargo.toml | 3 + src/rust/lqos_sys/src/bpf/common/heimdall.h | 40 ++++++++++ src/rust/lqos_sys/src/bpf/lqos_kern.c | 6 +- src/rust/lqos_sys/src/bpf_map.rs | 6 +- src/rust/lqos_sys/src/heimdall_map.rs | 79 ++++++++++++++++++- src/rust/lqos_sys/src/lib.rs | 7 +- src/rust/lqosd/src/main.rs | 3 +- .../src/throughput_tracker/heimdall_data.rs | 3 +- src/rust/lqosd/src/throughput_tracker/mod.rs | 3 +- src/rust/remove_pinned_maps.sh | 3 +- 12 files changed, 145 insertions(+), 13 deletions(-) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 5b418b2c..09dc4940 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1497,11 +1497,14 @@ dependencies = [ "anyhow", "bindgen", "byteorder", + "dashmap", "libbpf-sys", "log", "lqos_bus", "lqos_config", + "lqos_utils", "nix", + "once_cell", ] [[package]] diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index 3c4867d1..9a3f782a 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -102,7 +102,7 @@ pub async fn raw_queue_by_circuit( #[get("/api/flows/")] pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache>> { let mut result = Vec::new(); - let request: Vec = ip_list.split(",").map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect(); + let request: Vec = ip_list.split(',').map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect(); let responses = bus_request(request).await.unwrap(); for r in responses.iter() { if let BusResponse::FlowData(flow) = r { diff --git a/src/rust/lqos_sys/Cargo.toml b/src/rust/lqos_sys/Cargo.toml index d17ead2a..f3b39f4e 100644 --- a/src/rust/lqos_sys/Cargo.toml +++ b/src/rust/lqos_sys/Cargo.toml @@ -11,6 +11,9 @@ byteorder = "1.4" lqos_bus = { path = "../lqos_bus" } lqos_config = { path = "../lqos_config" } log = "0" +lqos_utils = { path = "../lqos_utils" } +once_cell = "1" +dashmap = "5" [build-dependencies] bindgen = "0" diff --git a/src/rust/lqos_sys/src/bpf/common/heimdall.h b/src/rust/lqos_sys/src/bpf/common/heimdall.h index f6066d37..50711730 100644 --- a/src/rust/lqos_sys/src/bpf/common/heimdall.h +++ b/src/rust/lqos_sys/src/bpf/common/heimdall.h @@ -7,6 +7,28 @@ #include "debug.h" #include "dissector.h" +// Array containing one element, the Heimdall configuration +struct heimdall_config_t { + __u32 monitor_mode; // 0 = Off, 1 = Targets only, 2 = Analysis Mode +}; + +struct { + __uint(type, BPF_MAP_TYPE_ARRAY); + __type(key, __u32); + __type(value, struct heimdall_config_t); + __uint(max_entries, 2); + __uint(pinning, LIBBPF_PIN_BY_NAME); +} heimdall_config SEC(".maps"); + +struct +{ + __uint(type, BPF_MAP_TYPE_HASH); + __type(key, struct in6_addr); + __type(value, __u32); + __uint(max_entries, 64); + __uint(pinning, LIBBPF_PIN_BY_NAME); +} heimdall_watching SEC(".maps"); + struct heimdall_key { struct in6_addr src; struct in6_addr dst; @@ -32,6 +54,24 @@ struct __uint(pinning, LIBBPF_PIN_BY_NAME); } heimdall SEC(".maps"); +static __always_inline __u8 get_heimdall_mode() { + __u32 index = 0; + struct heimdall_config_t * cfg = (struct heimdall_config_t *)bpf_map_lookup_elem(&heimdall_config, &index); + if (cfg) { + return cfg->monitor_mode; + } else { + return 0; + } +} + +static __always_inline bool is_heimdall_watching(struct dissector_t *dissector) { + __u32 * watching = bpf_map_lookup_elem(&heimdall_watching, &dissector->src_ip); + if (watching) return true; + watching = bpf_map_lookup_elem(&heimdall_watching, &dissector->dst_ip); + if (watching) return true; + return false; +} + static __always_inline void update_heimdall(struct dissector_t * dissector, __u32 size, int dir) { if (dissector->src_port == 0 || dissector->dst_port == 0) return; struct heimdall_key key = {0}; diff --git a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c index 807d110c..2eb0fb61 100644 --- a/src/rust/lqos_sys/src/bpf/lqos_kern.c +++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c @@ -66,6 +66,8 @@ int xdp_prog(struct xdp_md *ctx) return XDP_PASS; } + __u8 heimdall_mode = get_heimdall_mode(); + // Do we need to perform a VLAN redirect? bool vlan_redirect = false; { // Note: scope for removing temporaries from the stack @@ -138,7 +140,9 @@ int xdp_prog(struct xdp_md *ctx) #ifdef VERBOSE bpf_debug("(XDP) Storing Heimdall Data"); #endif - update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction); + if (heimdall_mode == 2 || (heimdall_mode==1 && is_heimdall_watching(&dissector))) { + update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction); + } // Handle CPU redirection if there is one specified __u32 *cpu_lookup; diff --git a/src/rust/lqos_sys/src/bpf_map.rs b/src/rust/lqos_sys/src/bpf_map.rs index be461b79..6ac800aa 100644 --- a/src/rust/lqos_sys/src/bpf_map.rs +++ b/src/rust/lqos_sys/src/bpf_map.rs @@ -130,9 +130,7 @@ where } } - /// Inserts an entry into a BPF map, or updates an existing entry with - /// the same key. - /// + /// Inserts an entry into a BPF map. /// Use this sparingly, because it briefly pauses XDP access to the /// underlying map (through internal locking we can't reach from /// userland). @@ -152,7 +150,7 @@ where self.fd, key_ptr as *mut c_void, val_ptr as *mut c_void, - BPF_NOEXIST.into(), + 0, ) }; if err != 0 { diff --git a/src/rust/lqos_sys/src/heimdall_map.rs b/src/rust/lqos_sys/src/heimdall_map.rs index 46fe1c8b..a67cb5be 100644 --- a/src/rust/lqos_sys/src/heimdall_map.rs +++ b/src/rust/lqos_sys/src/heimdall_map.rs @@ -1,4 +1,9 @@ -use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress}; +use std::time::Duration; +use dashmap::DashMap; +use lqos_utils::unix_time::time_since_boot; +use once_cell::sync::Lazy; + +use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress, bpf_map::BpfMap}; #[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] #[repr(C)] @@ -31,3 +36,75 @@ pub fn heimdall_for_each( heimdall.for_each(callback); } } + +#[repr(u8)] +pub enum HeimdallMode { + Off = 0, + WatchOnly = 1, + Analysis = 2, +} + +#[derive(Default, Clone)] +#[repr(C)] +struct HeimdalConfig { + mode: u32, +} + +/// Change the eBPF Heimdall System mode. +pub fn set_heimdall_mode(mode: HeimdallMode) -> anyhow::Result<()> { + let mut map = BpfMap::::from_path("/sys/fs/bpf/heimdall_config")?; + map.clear_no_repeat()?; + map.insert_or_update(&mut 0, &mut HeimdalConfig { mode: mode as u32 })?; + Ok(()) +} + +#[derive(Clone, Eq, PartialEq, Hash)] +pub struct HeimdallWatching { + expiration: u128, + ip_address: XdpIpAddress +} + +impl HeimdallWatching { + pub fn new(mut ip: XdpIpAddress) -> anyhow::Result { + let now = time_since_boot()?; + let expire = Duration::from(now) + Duration::from_secs(30); + + let mut map = BpfMap::::from_path("/sys/fs/bpf/heimdall_watching").unwrap(); + let _ = map.insert(&mut ip, &mut 1); + + Ok(Self { + ip_address: ip, + expiration: expire.as_nanos(), + }) + } + + fn stop_watching(&mut self) { + //println!("I stopped watching {:?}", self.ip_address); + let mut map = BpfMap::::from_path("/sys/fs/bpf/heimdall_watching").unwrap(); + map.delete(&mut self.ip_address).unwrap(); + } +} + +static HEIMDALL_WATCH_LIST: Lazy> = Lazy::new(DashMap::new); + +pub fn heimdall_expire() { + if let Ok(now) = time_since_boot() { + let now = Duration::from(now).as_nanos(); + HEIMDALL_WATCH_LIST.retain(|_k,v| { + if v.expiration < now { + v.stop_watching(); + } + v.expiration > now + }); + } +} + +pub fn heimdall_watch_ip(ip: XdpIpAddress) { + if HEIMDALL_WATCH_LIST.contains_key(&ip) { + return; + } + if let Ok(h) = HeimdallWatching::new(ip) { + //println!("Watching {:?}", ip); + HEIMDALL_WATCH_LIST.insert(ip, h); + } +} \ No newline at end of file diff --git a/src/rust/lqos_sys/src/lib.rs b/src/rust/lqos_sys/src/lib.rs index 54335a5e..495de395 100644 --- a/src/rust/lqos_sys/src/lib.rs +++ b/src/rust/lqos_sys/src/lib.rs @@ -10,14 +10,18 @@ mod bifrost_maps; mod bpf_map; mod bpf_per_cpu_map; mod cpu_map; -mod ip_mapping; mod heimdall_map; +mod ip_mapping; mod kernel_wrapper; mod lqos_kernel; mod tcp_rtt; mod throughput; mod xdp_ip_address; +pub use heimdall_map::{ + heimdall_expire, heimdall_for_each, heimdall_watch_ip, set_heimdall_mode, + HeimdallData, HeimdallKey, HeimdallMode, +}; pub use ip_mapping::{ add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips, }; @@ -27,4 +31,3 @@ pub use lqos_kernel::max_tracked_ips; pub use tcp_rtt::{rtt_for_each, RttTrackingEntry}; pub use throughput::{throughput_for_each, HostCounter}; pub use xdp_ip_address::XdpIpAddress; -pub use heimdall_map::{heimdall_for_each, HeimdallKey, HeimdallData}; diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index 823ca7a1..92305706 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -20,7 +20,7 @@ use lqos_queue_tracker::{ add_watched_queue, get_raw_circuit_data, spawn_queue_monitor, spawn_queue_structure_monitor, }; -use lqos_sys::LibreQoSKernels; +use lqos_sys::{LibreQoSKernels, set_heimdall_mode}; use signal_hook::{ consts::{SIGHUP, SIGINT, SIGTERM}, iterator::Signals, @@ -66,6 +66,7 @@ async fn main() -> Result<()> { } else { LibreQoSKernels::new(&config.internet_interface, &config.isp_interface)? }; + set_heimdall_mode(lqos_sys::HeimdallMode::WatchOnly)?; // TODO: Set by config // Spawn tracking sub-systems join!( diff --git a/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs b/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs index f3915d67..88af373c 100644 --- a/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs @@ -2,7 +2,7 @@ use std::{time::Duration, net::IpAddr}; use dashmap::DashMap; use lqos_bus::{BusResponse, FlowTransport}; -use lqos_sys::{HeimdallData, HeimdallKey, XdpIpAddress}; +use lqos_sys::{HeimdallData, HeimdallKey, XdpIpAddress, heimdall_watch_ip}; use lqos_utils::unix_time::time_since_boot; use once_cell::sync::Lazy; @@ -90,6 +90,7 @@ pub fn get_flow_stats(ip: &str) -> BusResponse { let ip = ip.parse::(); if let Ok(ip) = ip { let ip = XdpIpAddress::from_ip(ip); + heimdall_watch_ip(ip); let mut result = Vec::new(); for value in HEIMDALL.data.iter() { diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index d4475760..0f597b08 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -8,7 +8,7 @@ use crate::{ }; use log::{info, warn}; use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult}; -use lqos_sys::XdpIpAddress; +use lqos_sys::{XdpIpAddress, heimdall_expire}; use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot}; use once_cell::sync::Lazy; use std::time::Duration; @@ -38,6 +38,7 @@ pub fn spawn_throughput_monitor() { THROUGHPUT_TRACKER.next_cycle(); let duration_ms = start.elapsed().as_micros(); TIME_TO_POLL_HOSTS.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed); + heimdall_expire(); }); }); } diff --git a/src/rust/remove_pinned_maps.sh b/src/rust/remove_pinned_maps.sh index c484a670..44e2ecf7 100755 --- a/src/rust/remove_pinned_maps.sh +++ b/src/rust/remove_pinned_maps.sh @@ -11,4 +11,5 @@ rm -v /sys/fs/bpf/map_txq_config rm -v /sys/fs/bpf/bifrost_interface_map rm -v /sys/fs/bpf/bifrost_vlan_map rm -v /sys/fs/bpf/heimdall - +rm -v /sys/fs/bpf/heimdall_config +rm -v /sys/fs/bpf/heimdall_watching