EBPF garbage collector: hourly remove throughput entries that haven't been seen recently. Runs in a background thread, so iteration time should be a non-issue.

This commit is contained in:
Herbert Wolverson 2024-10-31 10:55:41 -05:00
parent 0d6c9d38fd
commit de264bd200
5 changed files with 73 additions and 2 deletions

View File

@ -258,3 +258,12 @@ pub fn end_flows(flows: &mut [FlowbeeKey]) -> anyhow::Result<()> {
Ok(()) Ok(())
} }
/// Bulk-deletes the given keys from the eBPF throughput map at
/// `/sys/fs/bpf/map_traffic`, releasing their kernel-side entries.
///
/// Used by the garbage collector to expire hosts that have not been
/// seen recently.
///
/// # Errors
/// Returns an error if the map cannot be opened, or if any individual
/// deletion fails. (Previously a failed delete would panic via `unwrap`,
/// taking down the daemon; errors are now propagated to the caller.)
pub(crate) fn expire_throughput(keys: &mut [XdpIpAddress]) -> anyhow::Result<()> {
    let mut map = BpfMap::<XdpIpAddress, HostCounter>::from_path("/sys/fs/bpf/map_traffic")?;
    for key in keys {
        // Propagate deletion failures instead of panicking mid-iteration.
        map.delete(key)?;
    }
    Ok(())
}

View File

@ -0,0 +1,59 @@
use std::time::Duration;
use tracing::{debug, error, info};
use lqos_utils::unix_time::time_since_boot;
/// Starts a periodic garbage collector that will run every hour.
/// This is used to clean up old eBPF map entries to limit memory usage.
///
/// The collector runs on a named, detached background thread. If the
/// thread cannot be spawned, the error is logged and the daemon
/// continues without garbage collection.
pub fn bpf_garbage_collector() {
    // Interval between collection passes.
    const SLEEP_TIME: u64 = 60 * 60; // 1 Hour
    info!("Starting BPF garbage collector");
    let result = std::thread::Builder::new()
        .name("bpf_garbage_collector".to_string())
        .spawn(|| {
            loop {
                // Sleep first: immediately after startup there is nothing
                // old enough to expire.
                std::thread::sleep(Duration::from_secs(SLEEP_TIME));
                info!("Running BPF garbage collector");
                throughput_garbage_collect();
            }
        });
    if let Err(e) = result {
        error!("Failed to start BPF garbage collector: {:?}", e);
    }
}
/// Iterates through all throughput entries, building a list of any that
/// haven't been seen for an hour. These are then bulk deleted.
fn throughput_garbage_collect() {
    // Entries idle for longer than this (in seconds) are removed.
    const EXPIRY_TIME: u64 = 60 * 60; // 1 Hour
    let Ok(now) = time_since_boot() else { return };
    let now = Duration::from(now).as_nanos() as u64;
    let period_nanos = EXPIRY_TIME * 1_000_000_000;
    // Saturate rather than subtract: if the system booted less than
    // EXPIRY_TIME ago, a plain `now - period_nanos` would underflow and
    // panic in debug builds. With saturation, nothing expires until the
    // system has been up long enough.
    let period_ago = now.saturating_sub(period_nanos);
    let mut expired = Vec::new();
    unsafe {
        // SAFETY: iterate_throughput is an unsafe project API; we only
        // read the references it hands to the callback and clone the key
        // before storing it — TODO confirm against bpf_iterator's contract.
        crate::bpf_iterator::iterate_throughput(&mut |ip, counters| {
            // Most recent activity across all per-CPU counters for this host.
            let last_seen = counters
                .iter()
                .map(|c| c.last_seen)
                .max()
                .unwrap_or(0);
            if last_seen < period_ago {
                expired.push(ip.clone());
            }
        });
    }
    if !expired.is_empty() {
        info!("Garbage collecting {} throughput entries", expired.len());
        if let Err(e) = crate::bpf_iterator::expire_throughput(&mut expired) {
            error!("Failed to garbage collect throughput: {:?}", e);
        }
    }
}

View File

@ -21,6 +21,7 @@ mod bpf_iterator;
/// Data shared between eBPF and Heimdall that needs local access /// Data shared between eBPF and Heimdall that needs local access
/// for map control. /// for map control.
pub mod flowbee_data; pub mod flowbee_data;
mod garbage_collector;
pub use ip_mapping::{ pub use ip_mapping::{
add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips, clear_hot_cache, add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips, clear_hot_cache,
@ -31,3 +32,4 @@ pub use lqos_kernel::max_tracked_ips;
pub use throughput::{throughput_for_each, HostCounter}; pub use throughput::{throughput_for_each, HostCounter};
pub use bpf_iterator::{iterate_flows, end_flows}; pub use bpf_iterator::{iterate_flows, end_flows};
pub use lqos_kernel::interface_name_to_index; pub use lqos_kernel::interface_name_to_index;
pub use garbage_collector::bpf_garbage_collector;

View File

@ -141,6 +141,7 @@ fn main() -> Result<()> {
throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone(), flow_tx)?; throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone(), flow_tx)?;
spawn_queue_monitor()?; spawn_queue_monitor()?;
let system_usage_tx = system_stats::start_system_stats()?; let system_usage_tx = system_stats::start_system_stats()?;
lqos_sys::bpf_garbage_collector();
// Handle signals // Handle signals
let mut signals = Signals::new([SIGINT, SIGHUP, SIGTERM])?; let mut signals = Signals::new([SIGINT, SIGHUP, SIGTERM])?;