Add the ability to ignore subnets from TCP RTT tracking.

If you add:

[netflow]
do_not_track_subnets = [ "192.168.66.0/24" ]

to your lqos.conf file, the listed subnets will never have RTT data
collected. This is useful if you wish to ignore the time spent
updating a cloud management system for CPEs, for example.
This commit is contained in:
Herbert Wolverson 2024-03-20 09:31:57 -05:00
parent c6830abae6
commit d72dd29996
2 changed files with 79 additions and 11 deletions

View File

@ -11,6 +11,7 @@ pub struct FlowConfig {
pub netflow_port: Option<u16>,
pub netflow_ip: Option<String>,
pub netflow_version: Option<u8>,
pub do_not_track_subnets: Option<Vec<String>>,
}
impl Default for FlowConfig {
@ -21,6 +22,7 @@ impl Default for FlowConfig {
netflow_port: None,
netflow_ip: None,
netflow_version: None,
do_not_track_subnets: None,
}
}
}

View File

@ -5,10 +5,7 @@ use lqos_sys::flowbee_data::FlowbeeKey;
use lqos_utils::unix_time::time_since_boot;
use once_cell::sync::Lazy;
use std::{
ffi::c_void,
slice,
sync::{atomic::AtomicU64, Mutex},
time::Duration,
ffi::c_void, net::{IpAddr, Ipv4Addr, Ipv6Addr}, slice, sync::{atomic::AtomicU64, Mutex}, time::Duration
};
use zerocopy::FromBytes;
@ -69,8 +66,61 @@ impl RttBuffer {
}
}
static FLOW_RTT: Lazy<Mutex<FxHashMap<FlowbeeKey, RttBuffer>>> =
Lazy::new(|| Mutex::new(FxHashMap::default()));
struct FlowTracker {
flow_rtt: FxHashMap<FlowbeeKey, RttBuffer>,
ignore_subnets: ip_network_table::IpNetworkTable<bool>,
}
impl FlowTracker {
fn new() -> Self {
let config = lqos_config::load_config().unwrap();
let mut ignore_subnets = ip_network_table::IpNetworkTable::new();
if let Some(flows) = &config.flows {
if let Some(subnets) = &flows.do_not_track_subnets {
// Subnets are in CIDR notation
for subnet in subnets.iter() {
let mut mask;
if subnet.contains('/') {
let split = subnet.split('/').collect::<Vec<_>>();
println!("{:?}", split);
if split.len() != 2 {
log::error!("Invalid subnet: {}", subnet);
continue;
}
let ip = if split[0].contains(":") {
// It's IPv6
mask = split[1].parse().unwrap_or(128);
let ip: Ipv6Addr = split[0].parse().unwrap();
ip
} else {
// It's IPv4
mask = split[1].parse().unwrap_or(32);
let ip: Ipv4Addr = split[0].parse().unwrap();
mask += 96;
ip.to_ipv6_mapped()
};
println!("{:?} {:?}", ip, mask);
let addr = ip_network::IpNetwork::new(ip, mask).unwrap();
ignore_subnets.insert(addr, true);
} else {
log::error!("Invalid subnet: {}", subnet);
continue;
}
}
}
}
Self {
flow_rtt: FxHashMap::default(),
ignore_subnets,
}
}
}
static FLOW_RTT: Lazy<Mutex<FlowTracker>> =
Lazy::new(|| Mutex::new(FlowTracker::new()));
#[repr(C)]
#[derive(FromBytes, Debug, Clone, PartialEq, Eq, Hash)]
@ -103,14 +153,30 @@ pub unsafe extern "C" fn flowbee_handle_events(
return 0;
}
let mut lock = FLOW_RTT.lock().unwrap();
if let Some(entry) = lock.get_mut(&incoming.key) {
// Check if it should be ignored
let ip = incoming.key.remote_ip.as_ip();
let ip = match ip {
IpAddr::V4(ip) => {
ip.to_ipv6_mapped()
}
IpAddr::V6(ip) => {
ip
}
};
if lock.ignore_subnets.longest_match(ip).is_some() {
return 0;
}
// Insert it
if let Some(entry) = lock.flow_rtt.get_mut(&incoming.key) {
entry.push(
incoming.rtt,
incoming.effective_direction,
since_boot.as_nanos() as u64,
);
} else {
lock.insert(
lock.flow_rtt.insert(
incoming.key,
RttBuffer::new(
incoming.rtt,
@ -138,18 +204,18 @@ pub fn expire_rtt_flows() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(30)).as_nanos() as u64;
let mut lock = FLOW_RTT.lock().unwrap();
lock.retain(|_, v| v.last_seen > expire);
lock.flow_rtt.retain(|_, v| v.last_seen > expire);
}
}
pub fn flowbee_rtt_map() -> FxHashMap<FlowbeeKey, [RttData; 2]> {
let mut lock = FLOW_RTT.lock().unwrap();
let result = lock.iter()
let result = lock.flow_rtt.iter()
.map(|(k, v)| (k.clone(), [v.median_new_data(0), v.median_new_data(1)]))
.collect();
// Clear all fresh data labeling
lock.iter_mut().for_each(|(_, v)| {
lock.flow_rtt.iter_mut().for_each(|(_, v)| {
v.has_new_data = [false, false];
});