Rearrange Heimdall

Refactor Heimdall into a more self-contained subsystem, moving its
operation into its own thread rather than occupying the main
tick thread.
This commit is contained in:
Herbert Wolverson 2023-03-16 14:24:53 +00:00
parent 5ff671567a
commit 2779cf20c9
18 changed files with 181 additions and 48 deletions

3
src/rust/Cargo.lock generated
View File

@ -1458,9 +1458,11 @@ dependencies = [
name = "lqos_heimdall"
version = "0.1.0"
dependencies = [
"anyhow",
"dashmap",
"log",
"lqos_bus",
"lqos_sys",
"lqos_utils",
"once_cell",
"zerocopy",
@ -1541,7 +1543,6 @@ dependencies = [
"log",
"lqos_bus",
"lqos_config",
"lqos_heimdall",
"lqos_utils",
"nix",
"once_cell",

View File

@ -6,7 +6,9 @@ edition = "2021"
[dependencies]
lqos_utils = { path = "../lqos_utils" }
lqos_bus = { path = "../lqos_bus" }
lqos_sys = { path = "../lqos_sys" }
log = "0"
zerocopy = {version = "0.6.1", features = [ "simd" ] }
once_cell = "1.17.1"
dashmap = "5.4.0"
anyhow = "1"

View File

@ -6,7 +6,7 @@ pub enum HeimdallMode {
Off = 0,
/// Only look at flows on hosts we are watching via the circuit monitor
WatchOnly = 1,
/// Capture everything (this may set your CPU on fire)
/// Capture detailed packet data from flows
Analysis = 2,
}

View File

@ -1,4 +1,4 @@
use crate::{perf_interface::HeimdallEvent, timeline::expire_timeline};
use crate::{perf_interface::HeimdallEvent, timeline::expire_timeline, FLOW_EXPIRE_SECS};
use dashmap::DashMap;
use lqos_bus::{tos_parser, BusResponse, FlowTransport};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
@ -59,8 +59,7 @@ pub(crate) fn record_flow(event: &HeimdallEvent) {
pub fn expire_heimdall_flows() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let thirty_secs_ago = since_boot - Duration::from_secs(30);
let expire = thirty_secs_ago.as_nanos() as u64;
let expire = (since_boot - Duration::from_secs(FLOW_EXPIRE_SECS)).as_nanos() as u64;
FLOW_DATA.retain(|_k, v| v.last_seen > expire);
expire_timeline();
}

View File

@ -5,9 +5,45 @@
mod config;
pub mod perf_interface;
pub mod stats;
pub use config::{HeimdallMode, HeimdalConfig};
pub use config::{HeimdalConfig, HeimdallMode};
mod flows;
pub use flows::{expire_heimdall_flows, get_flow_stats};
mod timeline;
pub use timeline::{ten_second_packet_dump, ten_second_pcap};
mod pcap;
mod watchlist;
use lqos_utils::fdtimer::periodic;
pub use watchlist::{heimdall_expire, heimdall_watch_ip, set_heimdall_mode};
/// How long should Heimdall keep watching a flow after being requested
/// to do so? Setting this to a long period increases CPU load after the
/// client has stopped looking. Too short a delay will lead to missed
/// collections if the client hasn't maintained the 1s request cadence.
const EXPIRE_WATCHES_SECS: u64 = 5;
/// How long should Heimdall retain flow summary data?
const FLOW_EXPIRE_SECS: u64 = 10;
/// How long should Heimdall retain packet timeline data?
const TIMELINE_EXPIRE_SECS: u64 = 10;
/// Interface to running Heimdall (start this when lqosd starts)
/// This is async to match the other spawning systems.
///
/// Sets the eBPF capture mode to `WatchOnly` and spawns a dedicated
/// thread that performs Heimdall housekeeping (flow and watch expiry)
/// once per second, so the work never occupies the main tick thread.
pub async fn start_heimdall() {
  // If the eBPF config map can't be written, Heimdall can't function at
  // all. Log the underlying cause rather than discarding it.
  if let Err(e) = set_heimdall_mode(HeimdallMode::WatchOnly) {
    log::error!(
      "Unable to set Heimdall Mode. Packet watching will be unavailable. {e:?}"
    );
    return;
  }
  let interval_ms = 1000; // 1 second
  log::info!("Heimdall check period set to {interval_ms} ms.");
  std::thread::spawn(move || {
    periodic(interval_ms, "Heimdall Packet Watcher", &mut || {
      expire_heimdall_flows();
      heimdall_expire();
    });
  });
}

View File

@ -4,7 +4,7 @@ use lqos_bus::{PacketHeader, tos_parser};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use zerocopy::AsBytes;
use crate::{perf_interface::{HeimdallEvent, PACKET_OCTET_SIZE}, pcap::{PcapFileHeader, PcapPacketHeader}};
use crate::{perf_interface::{HeimdallEvent, PACKET_OCTET_SIZE}, pcap::{PcapFileHeader, PcapPacketHeader}, TIMELINE_EXPIRE_SECS};
impl HeimdallEvent {
fn as_header(&self) -> PacketHeader {
@ -45,8 +45,7 @@ pub(crate) fn store_on_timeline(event: HeimdallEvent) {
pub(crate) fn expire_timeline() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let ten_secs_ago = since_boot - Duration::from_secs(10);
let expire = ten_secs_ago.as_nanos() as u64;
let expire = (since_boot - Duration::from_secs(TIMELINE_EXPIRE_SECS)).as_nanos() as u64;
TIMELINE.data.retain(|v| v.timestamp > expire);
}
}

View File

@ -0,0 +1,79 @@
use crate::{HeimdalConfig, HeimdallMode, EXPIRE_WATCHES_SECS};
use dashmap::DashMap;
use lqos_sys::bpf_map::BpfMap;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::time::Duration;
const HEIMDALL_CFG_PATH: &str = "/sys/fs/bpf/heimdall_config";
const HEIMDALL_WATCH_PATH: &str = "/sys/fs/bpf/heimdall_watching";
/// Change the eBPF Heimdall System mode.
///
/// Writes the requested mode into the pinned eBPF configuration map,
/// where the kernel-side programs read it for every packet.
pub fn set_heimdall_mode(mode: HeimdallMode) -> anyhow::Result<()> {
  let mut key = 0u32;
  let mut cfg = HeimdalConfig { mode: mode as u32 };
  BpfMap::<u32, HeimdalConfig>::from_path(HEIMDALL_CFG_PATH)?
    .insert_or_update(&mut key, &mut cfg)?;
  Ok(())
}
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct HeimdallWatching {
  // Boot-relative expiration time, in nanoseconds.
  expiration: u128,
  // The watched IP; also the key in the pinned eBPF watch map.
  ip_address: XdpIpAddress,
}

impl HeimdallWatching {
  /// Start watching `ip`: insert it into the pinned eBPF watch map and
  /// record an expiration `EXPIRE_WATCHES_SECS` from now.
  pub fn new(mut ip: XdpIpAddress) -> anyhow::Result<Self> {
    let now = time_since_boot()?;
    let expire =
      Duration::from(now) + Duration::from_secs(EXPIRE_WATCHES_SECS);
    // Propagate map-open failures instead of unwrapping; the caller
    // already handles an Err from this constructor.
    let mut map =
      BpfMap::<XdpIpAddress, u32>::from_path(HEIMDALL_WATCH_PATH)?;
    let _ = map.insert(&mut ip, &mut 1);
    Ok(Self { ip_address: ip, expiration: expire.as_nanos() })
  }

  /// Remove this IP from the eBPF watch map.
  ///
  /// Must never panic: it runs from `Drop` (e.g. inside the DashMap
  /// `retain` in `heimdall_expire`), and a panic while unwinding there
  /// would abort the process. Failures are logged instead.
  fn stop_watching(&mut self) {
    log::info!("Heimdall stopped watching {}", self.ip_address.as_ip().to_string());
    match BpfMap::<XdpIpAddress, u32>::from_path(HEIMDALL_WATCH_PATH) {
      Ok(mut map) => {
        if let Err(e) = map.delete(&mut self.ip_address) {
          log::warn!("Unable to remove IP from Heimdall watch map: {e:?}");
        }
      }
      Err(e) => log::warn!("Unable to open Heimdall watch map: {e:?}"),
    }
  }
}

impl Drop for HeimdallWatching {
  // NOTE(review): this type derives `Clone` while `Drop` has a side
  // effect (deleting the eBPF map entry); dropping any clone cancels the
  // watch for all copies — confirm clones are never created.
  fn drop(&mut self) {
    self.stop_watching();
  }
}
static HEIMDALL_WATCH_LIST: Lazy<DashMap<XdpIpAddress, HeimdallWatching>> =
Lazy::new(DashMap::new);
/// Run this periodically (once per second) to expire any watched traffic
/// flows whose watch window has lapsed. The window length is
/// `EXPIRE_WATCHES_SECS` (see `heimdall_watch_ip`, which refreshes it).
pub fn heimdall_expire() {
  if let Ok(now) = time_since_boot() {
    // Compare boot-relative nanoseconds against each entry's stored
    // expiration; dropping an expired entry also removes it from the
    // eBPF watch map via `HeimdallWatching::drop`.
    let now = Duration::from(now).as_nanos();
    HEIMDALL_WATCH_LIST.retain(|_k, v| {
      v.expiration > now
    });
  }
}
/// Instruct Heimdall to start watching an IP address.
/// You want to call this when you refresh a flow; the watch will
/// auto-expire after `EXPIRE_WATCHES_SECS` seconds unless refreshed.
pub fn heimdall_watch_ip(ip: XdpIpAddress) {
  if let Some(mut watch) = HEIMDALL_WATCH_LIST.get_mut(&ip) {
    // Already watching: push the expiration window forward.
    if let Ok(now) = time_since_boot() {
      let expire =
        Duration::from(now) + Duration::from_secs(EXPIRE_WATCHES_SECS);
      watch.expiration = expire.as_nanos();
    }
  } else if let Ok(h) = HeimdallWatching::new(ip) {
    // NOTE(review): between the `get_mut` miss above and this `insert`,
    // another thread may insert the same IP; the displaced entry's Drop
    // would then delete the IP from the eBPF watch map, cancelling the
    // watch just established — TODO confirm, consider the DashMap entry API.
    log::info!("Heimdall is watching {}", ip.as_ip().to_string());
    HEIMDALL_WATCH_LIST.insert(ip, h);
  }
}

View File

@ -15,7 +15,6 @@ lqos_utils = { path = "../lqos_utils" }
once_cell = "1"
dashmap = "5"
thiserror = "1"
lqos_heimdall = { path = "../lqos_heimdall" }
[build-dependencies]
bindgen = "0"

View File

@ -65,6 +65,9 @@ static __always_inline __u8 get_heimdall_mode()
struct heimdall_config_t *cfg = (struct heimdall_config_t *)bpf_map_lookup_elem(&heimdall_config, &index);
if (cfg)
{
#ifdef VERBOSE
bpf_debug("Heimdall Mode: %d", cfg->monitor_mode);
#endif
return cfg->monitor_mode;
}
else
@ -76,12 +79,12 @@ static __always_inline __u8 get_heimdall_mode()
static __always_inline bool is_heimdall_watching(struct dissector_t *dissector, int effective_direction)
{
if (effective_direction == 2) {
__u32 *watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->src_ip);
__u32 * watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->src_ip);
if (watching) {
return true;
}
} else {
__u32 *watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->dst_ip);
__u32 * watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->dst_ip);
if (watching) {
return true;
}
@ -89,7 +92,7 @@ static __always_inline bool is_heimdall_watching(struct dissector_t *dissector,
return false;
}
static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size)
static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size, __u8 mode)
{
struct heimdall_event event = {0};
event.timetamp = bpf_ktime_get_boot_ns();
@ -102,13 +105,12 @@ static __always_inline void update_heimdall(struct dissector_t *dissector, __u32
event.size = size;
event.tcp_flags = dissector->tcp_flags;
event.tcp_window = dissector->window;
//event.tsval = dissector->tsval;
//event.tsecr = dissector->tsecr;
event.tsval = dissector->tsval;
event.tsecr = dissector->tsecr;
if (size > PACKET_OCTET_SIZE) size = PACKET_OCTET_SIZE;
//if ((char *)dissector->start + size < dissector->end) {
bpf_probe_read_kernel(&event.dump, size, dissector->start);
//}
bpf_ringbuf_output(&heimdall_events, &event, sizeof(event), 0);
// Commented out because we don't really care - some will be missed
// during very heavy load.
//if (err != 0) {

View File

@ -136,11 +136,11 @@ int xdp_prog(struct xdp_md *ctx)
if (tc_handle != 0) {
// Send data to Heimdall
__u8 heimdall_mode = get_heimdall_mode();
if (heimdall_mode == 2 || (heimdall_mode==1 && is_heimdall_watching(&dissector, effective_direction))) {
if (heimdall_mode > 0 && is_heimdall_watching(&dissector, effective_direction)) {
#ifdef VERBOSE
bpf_debug("(XDP) Storing Heimdall Data");
#endif
update_heimdall(&dissector, ctx->data_end - ctx->data);
update_heimdall(&dissector, ctx->data_end - ctx->data, heimdall_mode);
}
// Handle CPU redirection if there is one specified

View File

@ -15,7 +15,7 @@ use std::{
///
/// `K` is the *key* type, indexing the map.
/// `V` is the *value* type, and must exactly match the underlying C data type.
pub(crate) struct BpfMap<K, V> {
pub struct BpfMap<K, V> {
fd: i32,
_key_phantom: PhantomData<K>,
_val_phantom: PhantomData<V>,
@ -29,7 +29,7 @@ where
/// Connect to a BPF map via a filename. Connects the internal
/// file descriptor, which is held until the structure is
/// dropped.
pub(crate) fn from_path(filename: &str) -> Result<Self> {
pub fn from_path(filename: &str) -> Result<Self> {
let filename_c = CString::new(filename)?;
let fd = unsafe { bpf_obj_get(filename_c.as_ptr()) };
if fd < 0 {
@ -43,7 +43,7 @@ where
/// to a vector. Each entry contains a `key, value` tuple.
///
/// This has performance issues due to excessive cloning
pub(crate) fn dump_vec(&self) -> Vec<(K, V)> {
pub fn dump_vec(&self) -> Vec<(K, V)> {
let mut result = Vec::new();
let mut prev_key: *mut K = null_mut();
@ -74,7 +74,7 @@ where
/// Iterates the underlying BPF map, and sends references to the
/// results directly to a callback
pub(crate) fn for_each(&self, callback: &mut dyn FnMut(&K, &V)) {
pub fn for_each(&self, callback: &mut dyn FnMut(&K, &V)) {
let mut prev_key: *mut K = null_mut();
let mut key: K = K::default();
let key_ptr: *mut K = &mut key;
@ -112,7 +112,7 @@ where
///
/// Returns Ok if insertion succeeded, a generic error (no details yet)
/// if it fails.
pub(crate) fn insert(&mut self, key: &mut K, value: &mut V) -> Result<()> {
pub fn insert(&mut self, key: &mut K, value: &mut V) -> Result<()> {
let key_ptr: *mut K = key;
let val_ptr: *mut V = value;
let err = unsafe {
@ -142,7 +142,7 @@ where
///
/// Returns Ok if insertion succeeded, a generic error (no details yet)
/// if it fails.
pub(crate) fn insert_or_update(&mut self, key: &mut K, value: &mut V) -> Result<()> {
pub fn insert_or_update(&mut self, key: &mut K, value: &mut V) -> Result<()> {
let key_ptr: *mut K = key;
let val_ptr: *mut V = value;
let err = unsafe {
@ -169,7 +169,7 @@ where
/// * `key` - the key to delete.
///
/// Return `Ok` if deletion succeeded.
pub(crate) fn delete(&mut self, key: &mut K) -> Result<()> {
pub fn delete(&mut self, key: &mut K) -> Result<()> {
let key_ptr: *mut K = key;
let err = unsafe { bpf_map_delete_elem(self.fd, key_ptr as *mut c_void) };
if err != 0 {
@ -189,7 +189,7 @@ where
/// heavy load, it WILL eventually terminate - but it might
/// take a very long time. Only use this for cleaning up
/// sparsely allocated map data.
pub(crate) fn clear(&mut self) -> Result<()> {
pub fn clear(&mut self) -> Result<()> {
loop {
let mut key = K::default();
let mut prev_key: *mut K = null_mut();
@ -223,7 +223,15 @@ where
Ok(())
}
pub(crate) fn clear_no_repeat(&mut self) -> Result<()> {
/// Delete all entries in the underlying eBPF map.
/// Use this sparingly, it locks the underlying map. Under
/// heavy load, it WILL eventually terminate - but it might
/// take a very long time. Only use this for cleaning up
/// sparsely allocated map data.
///
/// This version skips the "did it really clear?" repeat
/// found in the main version.
pub fn clear_no_repeat(&mut self) -> Result<()> {
let mut key = K::default();
let mut prev_key: *mut K = null_mut();
unsafe {

View File

@ -3,7 +3,13 @@ use dashmap::DashMap;
use lqos_heimdall::{HeimdallMode, HeimdalConfig};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use crate::{bpf_map::BpfMap};
use crate::bpf_map::BpfMap;
/// How long should Heimdall keep watching a flow after being requested
/// to do so? Setting this to a long period increases CPU load after the
/// client has stopped looking. Too short a delay will lead to missed
/// collections if the client hasn't maintained the 1s request cadence.
const EXPIRE_WATCHES_SECS: u64 = 5;
/// Change the eBPF Heimdall System mode.
pub fn set_heimdall_mode(mode: HeimdallMode) -> anyhow::Result<()> {
@ -21,7 +27,7 @@ pub struct HeimdallWatching {
impl HeimdallWatching {
pub fn new(mut ip: XdpIpAddress) -> anyhow::Result<Self> {
let now = time_since_boot()?;
let expire = Duration::from(now) + Duration::from_secs(30);
let expire = Duration::from(now) + Duration::from_secs(EXPIRE_WATCHES_SECS);
let mut map = BpfMap::<XdpIpAddress, u32>::from_path("/sys/fs/bpf/heimdall_watching").unwrap();
let _ = map.insert(&mut ip, &mut 1);

View File

@ -1,6 +1,6 @@
use crate::lqos_kernel::{
attach_xdp_and_tc_to_interface, unload_xdp_from_interface,
InterfaceDirection,
InterfaceDirection, bpf::ring_buffer_sample_fn,
};
/// A wrapper-type that stores the interfaces to which the XDP and TC programs should
@ -23,7 +23,9 @@ impl LibreQoSKernels {
///
/// * `to_internet` - the name of the Internet-facing interface (e.g. `eth1`).
/// * `to_isp` - the name of the ISP-network facing interface (e.g. `eth2`).
pub fn new<S: ToString>(to_internet: S, to_isp: S) -> anyhow::Result<Self> {
/// * `heimdall_event_handler` - C function pointer to the ringbuffer
/// event handler exported by Heimdall.
pub fn new<S: ToString>(to_internet: S, to_isp: S, heimdall_event_handler: ring_buffer_sample_fn) -> anyhow::Result<Self> {
let kernel = Self {
to_internet: to_internet.to_string(),
to_isp: to_isp.to_string(),
@ -32,10 +34,12 @@ impl LibreQoSKernels {
attach_xdp_and_tc_to_interface(
&kernel.to_internet,
InterfaceDirection::Internet,
heimdall_event_handler,
)?;
attach_xdp_and_tc_to_interface(
&kernel.to_isp,
InterfaceDirection::IspNetwork,
heimdall_event_handler,
)?;
Ok(kernel)
}
@ -55,6 +59,7 @@ impl LibreQoSKernels {
stick_interface: S,
internet_vlan: u16,
isp_vlan: u16,
heimdall_event_handler: ring_buffer_sample_fn,
) -> anyhow::Result<Self> {
let kernel = Self {
to_internet: stick_interface.to_string(),
@ -64,6 +69,7 @@ impl LibreQoSKernels {
attach_xdp_and_tc_to_interface(
&kernel.to_internet,
InterfaceDirection::OnAStick(internet_vlan, isp_vlan),
heimdall_event_handler,
)?;
Ok(kernel)

View File

@ -7,10 +7,12 @@
//! and statically embeds the result in this crate.
mod bifrost_maps;
mod bpf_map;
/// Provides direct access to LibBPF functionality, as exposed by the
/// built-in, compiled eBPF programs. This is very-low level and should
/// be handled with caution.
pub mod bpf_map;
mod bpf_per_cpu_map;
mod cpu_map;
mod heimdall_map;
mod ip_mapping;
mod kernel_wrapper;
mod lqos_kernel;
@ -18,9 +20,6 @@ mod tcp_rtt;
mod throughput;
mod linux;
pub use heimdall_map::{
heimdall_expire, heimdall_watch_ip, set_heimdall_mode
};
pub use ip_mapping::{
add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips,
};

View File

@ -11,7 +11,6 @@ use libbpf_sys::{
XDP_FLAGS_UPDATE_IF_NOEXIST,
};
use log::{info, warn};
use lqos_heimdall::perf_interface::heimdall_handle_events;
use nix::libc::{geteuid, if_nametoindex};
use std::{ffi::{CString, c_void}, process::Command};
@ -114,6 +113,7 @@ pub enum InterfaceDirection {
pub fn attach_xdp_and_tc_to_interface(
interface_name: &str,
direction: InterfaceDirection,
heimdall_event_handler: bpf::ring_buffer_sample_fn,
) -> Result<()> {
check_root()?;
// Check the interface is valid
@ -170,7 +170,7 @@ pub fn attach_xdp_and_tc_to_interface(
let heimdall_perf_buffer = unsafe {
bpf::ring_buffer__new(
heimdall_events_fd,
Some(heimdall_handle_events),
heimdall_event_handler,
opts as *mut c_void, opts)
};
if unsafe { bpf::libbpf_get_error(heimdall_perf_buffer as *mut c_void) != 0 } {

View File

@ -18,12 +18,12 @@ use anyhow::Result;
use log::{info, warn};
use lqos_bus::{BusRequest, BusResponse, UnixSocketServer};
use lqos_config::LibreQoSConfig;
use lqos_heimdall::{ten_second_packet_dump, HeimdallMode};
use lqos_heimdall::{ten_second_packet_dump, perf_interface::heimdall_handle_events, start_heimdall};
use lqos_queue_tracker::{
add_watched_queue, get_raw_circuit_data, spawn_queue_monitor,
spawn_queue_structure_monitor,
};
use lqos_sys::{set_heimdall_mode, LibreQoSKernels};
use lqos_sys::LibreQoSKernels;
use lqos_utils::XdpIpAddress;
use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM},
@ -66,14 +66,15 @@ async fn main() -> Result<()> {
&config.internet_interface,
config.stick_vlans.1,
config.stick_vlans.0,
Some(heimdall_handle_events),
)?
} else {
LibreQoSKernels::new(&config.internet_interface, &config.isp_interface)?
LibreQoSKernels::new(&config.internet_interface, &config.isp_interface, Some(heimdall_handle_events))?
};
set_heimdall_mode(HeimdallMode::WatchOnly)?; // TODO: Set by config
// Spawn tracking sub-systems
join!(
start_heimdall(),
spawn_queue_structure_monitor(),
shaped_devices_tracker::shaped_devices_watcher(),
shaped_devices_tracker::network_json_watcher(),

View File

@ -1,6 +1,6 @@
use std::net::IpAddr;
use lqos_bus::BusResponse;
use lqos_sys::heimdall_watch_ip;
use lqos_heimdall::heimdall_watch_ip;
use lqos_utils::XdpIpAddress;
pub fn get_flow_stats(ip: &str) -> BusResponse {

View File

@ -2,14 +2,12 @@ mod throughput_entry;
mod tracking_data;
mod heimdall_data;
pub use heimdall_data::get_flow_stats;
use lqos_heimdall::expire_heimdall_flows;
use crate::{
shaped_devices_tracker::NETWORK_JSON,
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
};
use log::{info, warn};
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
use lqos_sys::heimdall_expire;
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::time::Duration;
@ -31,7 +29,6 @@ pub fn spawn_throughput_monitor() {
let net_json = NETWORK_JSON.read().unwrap();
net_json.zero_throughput_and_rtt();
} // Scope to end the lock
expire_heimdall_flows();
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
THROUGHPUT_TRACKER.apply_new_throughput_counters();
THROUGHPUT_TRACKER.apply_rtt_data();
@ -39,7 +36,6 @@ pub fn spawn_throughput_monitor() {
THROUGHPUT_TRACKER.next_cycle();
let duration_ms = start.elapsed().as_micros();
TIME_TO_POLL_HOSTS.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed);
heimdall_expire();
});
});
}