From 7e5b432253aaa29f2ff3b99f4b3fdefd92b5db0b Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Fri, 3 Mar 2023 21:42:22 +0000 Subject: [PATCH] First refactor towards the "funnel" - shaped devices in lqosd ShapedDevices.csv is now monitored in lqosd. This brings some advantages: * The Tracked Devices list now knows the circuit id association for every tracked IP. * The associations auto-update after a ShapedDevices reload. * The webserver is no longer doing Trie lookups to figure out what name to display. Moving forwards, this will allow for stats gathering to group IPs by circuit, and allow calculation of the "funnel". --- src/rust/lqos_bus/src/ip_stats.rs | 3 ++ src/rust/lqos_node_manager/src/tracker/mod.rs | 30 ++++++----- src/rust/lqos_sys/src/xdp_ip_address.rs | 30 +++++++++-- src/rust/lqosd/src/main.rs | 3 +- .../lqosd/src/shaped_devices_tracker/mod.rs | 52 +++++++++++++++++++ src/rust/lqosd/src/throughput_tracker/mod.rs | 14 ++++- .../throughput_tracker/throughput_entry.rs | 1 + .../src/throughput_tracker/tracking_data.rs | 20 +++++++ 8 files changed, 134 insertions(+), 19 deletions(-) create mode 100644 src/rust/lqosd/src/shaped_devices_tracker/mod.rs diff --git a/src/rust/lqos_bus/src/ip_stats.rs b/src/rust/lqos_bus/src/ip_stats.rs index 385ad5af..b2efa5df 100644 --- a/src/rust/lqos_bus/src/ip_stats.rs +++ b/src/rust/lqos_bus/src/ip_stats.rs @@ -8,6 +8,9 @@ pub struct IpStats { /// The host's IP address, as detected by the XDP program. pub ip_address: String, + /// The host's mapped circuit ID + pub circuit_id: String, + /// The current bits-per-second passing through this host. Tuple /// 0 is download, tuple 1 is upload. pub bits_per_second: (u64, u64), diff --git a/src/rust/lqos_node_manager/src/tracker/mod.rs b/src/rust/lqos_node_manager/src/tracker/mod.rs index 0f5aa5ef..56b9f15a 100644 --- a/src/rust/lqos_node_manager/src/tracker/mod.rs +++ b/src/rust/lqos_node_manager/src/tracker/mod.rs @@ -13,7 +13,6 @@ use lqos_bus::{IpStats, TcHandle}; use lqos_config::LibreQoSConfig; use parking_lot::Mutex; use rocket::serde::{json::Json, Deserialize, Serialize}; -use std::net::IpAddr; #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(crate = "rocket::serde")] @@ -35,23 +34,28 @@ impl From<&IpStats> for IpStatsWithPlan { packets_per_second: i.packets_per_second, median_tcp_rtt: i.median_tcp_rtt, tc_handle: i.tc_handle, - circuit_id: String::new(), + circuit_id: i.circuit_id.clone(), plan: (0, 0), }; - if let Ok(ip) = result.ip_address.parse::() { - let lookup = match ip { - IpAddr::V4(ip) => ip.to_ipv6_mapped(), - IpAddr::V6(ip) => ip, - }; - let cfg = SHAPED_DEVICES.read(); - if let Some((_, id)) = cfg.trie.longest_match(lookup) { + + if !result.circuit_id.is_empty() { + if let Some(circuit) = SHAPED_DEVICES + .read() + .devices + .iter() + .find(|sd| sd.circuit_id == result.circuit_id) + { + let name = if circuit.circuit_name.len() > 20 { + &circuit.circuit_name[0..20] + } else { + &circuit.circuit_name + }; result.ip_address = - format!("{} ({})", cfg.devices[*id].circuit_name, result.ip_address); - result.plan.0 = cfg.devices[*id].download_max_mbps; - result.plan.1 = cfg.devices[*id].upload_max_mbps; - result.circuit_id = cfg.devices[*id].circuit_id.clone(); + format!("{} ({})", name, result.ip_address); + result.plan = (circuit.download_max_mbps, circuit.download_min_mbps); } } + result } } diff --git a/src/rust/lqos_sys/src/xdp_ip_address.rs b/src/rust/lqos_sys/src/xdp_ip_address.rs index 7da8acb5..aef59514 100644 --- a/src/rust/lqos_sys/src/xdp_ip_address.rs +++ b/src/rust/lqos_sys/src/xdp_ip_address.rs @@ -41,9 +41,8 @@ impl XdpIpAddress { result } - /// Converts an `XdpIpAddress` type to a Rust `IpAddr` type - pub fn as_ip(&self) -> IpAddr { - if self.0[0] == 0xFF + fn is_v4(&self) -> bool { + self.0[0] == 0xFF && self.0[1] == 0xFF && self.0[2] == 0xFF && self.0[3] == 0xFF @@ -55,6 +54,31 @@ impl XdpIpAddress { && self.0[9] == 0xFF && self.0[10] == 0xFF && self.0[11] == 0xFF + } + + /// Convers an `XdpIpAddress` type to a Rust `IpAddr` type, using + /// the in-build mapped function for squishing IPv4 into IPv6 + pub fn as_ipv6(&self) -> Ipv6Addr { + if self.is_v4() + { + Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15]).to_ipv6_mapped() + } else { + Ipv6Addr::new( + BigEndian::read_u16(&self.0[0..2]), + BigEndian::read_u16(&self.0[2..4]), + BigEndian::read_u16(&self.0[4..6]), + BigEndian::read_u16(&self.0[6..8]), + BigEndian::read_u16(&self.0[8..10]), + BigEndian::read_u16(&self.0[10..12]), + BigEndian::read_u16(&self.0[12..14]), + BigEndian::read_u16(&self.0[14..]), + ) + } + } + + /// Converts an `XdpIpAddress` type to a Rust `IpAddr` type + pub fn as_ip(&self) -> IpAddr { + if self.is_v4() { // It's an IPv4 Address IpAddr::V4(Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15])) diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index c992f4aa..d35279e8 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -4,6 +4,7 @@ mod ip_mapping; mod lqos_daht_test; mod program_control; mod throughput_tracker; +mod shaped_devices_tracker; mod tuning; mod validation; use crate::{ @@ -63,7 +64,7 @@ async fn main() -> Result<()> { }; // Spawn tracking sub-systems - join!(spawn_queue_structure_monitor(),); + join!(spawn_queue_structure_monitor(), shaped_devices_tracker::shaped_devices_watcher()); throughput_tracker::spawn_throughput_monitor(); spawn_queue_monitor(); diff --git a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs new file mode 100644 index 00000000..8a34c626 --- /dev/null +++ b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs @@ -0,0 +1,52 @@ +use anyhow::Result; +use log::{error, info, warn}; +use lqos_config::ConfigShapedDevices; +use lqos_utils::file_watcher::FileWatcher; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use tokio::task::spawn_blocking; + +pub static SHAPED_DEVICES: Lazy> = + Lazy::new(|| RwLock::new(ConfigShapedDevices::default())); + +fn load_shaped_devices() { + info!("ShapedDevices.csv has changed. Attempting to load it."); + let shaped_devices = ConfigShapedDevices::load(); + if let Ok(new_file) = shaped_devices { + info!("ShapedDevices.csv loaded"); + *SHAPED_DEVICES.write() = new_file; + crate::throughput_tracker::THROUGHPUT_TRACKER.write().refresh_circuit_ids(); + } else { + warn!("ShapedDevices.csv failed to load, see previous error messages. Reverting to empty set."); + *SHAPED_DEVICES.write() = ConfigShapedDevices::default(); + } +} + +pub async fn shaped_devices_watcher() { + spawn_blocking(|| { + info!("Watching for ShapedDevices.csv changes"); + let _ = watch_for_shaped_devices_changing(); + }); +} + +/// Fires up a Linux file system watcher than notifies +/// when `ShapedDevices.csv` changes, and triggers a reload. +fn watch_for_shaped_devices_changing() -> Result<()> { + let watch_path = ConfigShapedDevices::path(); + if watch_path.is_err() { + error!("Unable to generate path for ShapedDevices.csv"); + return Err(anyhow::Error::msg( + "Unable to create path for ShapedDevices.csv", + )); + } + let watch_path = watch_path.unwrap(); + + let mut watcher = FileWatcher::new("ShapedDevices.csv", watch_path); + watcher.set_file_exists_callback(load_shaped_devices); + watcher.set_file_created_callback(load_shaped_devices); + watcher.set_file_changed_callback(load_shaped_devices); + loop { + let result = watcher.watch(); + info!("ShapedDevices watcher returned: {result:?}"); + } +} diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 2c25d85a..64cea3cb 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -11,7 +11,7 @@ use std::time::Duration; const RETIRE_AFTER_SECONDS: u64 = 30; -static THROUGHPUT_TRACKER: Lazy> = Lazy::new(|| RwLock::new(ThroughputTracker::new())); +pub static THROUGHPUT_TRACKER: Lazy> = Lazy::new(|| RwLock::new(ThroughputTracker::new())); pub fn spawn_throughput_monitor() { info!("Starting the bandwidth monitor thread."); @@ -62,7 +62,7 @@ fn retire_check(cycle: u64, recent_cycle: u64) -> bool { cycle < recent_cycle + RETIRE_AFTER_SECONDS } -type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle); +type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String); pub fn top_n(start: u32, end: u32) -> BusResponse { let mut full_list: Vec = { @@ -78,6 +78,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { te.packets_per_second, te.median_latency(), te.tc_handle, + te.circuit_id.as_ref().unwrap_or(&String::new()).clone(), ) }) .collect() @@ -94,8 +95,10 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { (packets_dn, packets_up), median_rtt, tc_handle, + circuit_id, )| IpStats { ip_address: ip.as_ip().to_string(), + circuit_id: circuit_id.clone(), bits_per_second: (bytes_dn * 8, bytes_up * 8), packets_per_second: (*packets_dn, *packets_up), median_tcp_rtt: *median_rtt, @@ -121,6 +124,7 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { te.packets_per_second, te.median_latency(), te.tc_handle, + te.circuit_id.as_ref().unwrap_or(&String::new()).clone(), ) }) .collect() @@ -137,8 +141,10 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { (packets_dn, packets_up), median_rtt, tc_handle, + circuit_id, )| IpStats { ip_address: ip.as_ip().to_string(), + circuit_id: circuit_id.clone(), bits_per_second: (bytes_dn * 8, bytes_up * 8), packets_per_second: (*packets_dn, *packets_up), median_tcp_rtt: *median_rtt, @@ -163,6 +169,7 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { te.packets_per_second, te.median_latency(), te.tc_handle, + te.circuit_id.as_ref().unwrap_or(&String::new()).clone(), ) }) .collect() @@ -180,8 +187,10 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { (packets_dn, packets_up), median_rtt, tc_handle, + circuit_id, )| IpStats { ip_address: ip.as_ip().to_string(), + circuit_id: circuit_id.clone(), bits_per_second: (bytes_dn * 8, bytes_up * 8), packets_per_second: (*packets_dn, *packets_up), median_tcp_rtt: *median_rtt, @@ -315,6 +324,7 @@ pub fn all_unknown_ips() -> BusResponse { _last_seen, )| IpStats { ip_address: ip.as_ip().to_string(), + circuit_id: String::new(), bits_per_second: (bytes_dn * 8, bytes_up * 8), packets_per_second: (*packets_dn, *packets_up), median_tcp_rtt: *median_rtt, diff --git a/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs b/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs index 65ea8f1b..f5efaa46 100644 --- a/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs +++ b/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs @@ -2,6 +2,7 @@ use lqos_bus::TcHandle; #[derive(Debug)] pub(crate) struct ThroughputEntry { + pub(crate) circuit_id: Option, pub(crate) first_cycle: u64, pub(crate) most_recent_cycle: u64, pub(crate) bytes: (u64, u64), diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 9ea150f2..9b4d9884 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -1,3 +1,5 @@ +use crate::shaped_devices_tracker::SHAPED_DEVICES; + use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use lqos_bus::TcHandle; use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress}; @@ -52,6 +54,23 @@ impl ThroughputTracker { }); } + fn lookup_circuit_id(xdp_ip: &XdpIpAddress) -> Option { + let mut circuit_id = None; + let lookup = xdp_ip.as_ipv6(); + let cfg = SHAPED_DEVICES.read(); + if let Some((_, id)) = cfg.trie.longest_match(lookup) { + circuit_id = Some(cfg.devices[*id].circuit_id.clone()); + } + //println!("{lookup:?} Found circuit_id: {circuit_id:?}"); + circuit_id + } + + pub(crate) fn refresh_circuit_ids(&mut self) { + self.raw_data.par_iter_mut().for_each(|(ip, data)| { + data.circuit_id = Self::lookup_circuit_id(ip); + }); + } + pub(crate) fn apply_new_throughput_counters(&mut self) { let cycle = self.cycle; let raw_data = &mut self.raw_data; @@ -76,6 +95,7 @@ impl ThroughputTracker { } } else { let mut entry = ThroughputEntry { + circuit_id: Self::lookup_circuit_id(xdp_ip), first_cycle: self.cycle, most_recent_cycle: 0, bytes: (0, 0),