diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 66e1a88e..cb7294b0 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -615,6 +615,19 @@ dependencies = [ "cipher", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "default-net" version = "0.12.0" @@ -1431,6 +1444,7 @@ name = "lqosd" version = "0.1.0" dependencies = [ "anyhow", + "dashmap", "env_logger", "jemallocator", "log", diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index 91cbb882..cc154f7f 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -23,7 +23,8 @@ env_logger = "0" log = "0" nix = "0" sysinfo = "0" +dashmap = "5" # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] -jemallocator = "0.5" \ No newline at end of file +jemallocator = "0.5" diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 42f1cbbc..9f4d46f9 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -57,8 +57,8 @@ pub fn current_throughput() -> BusResponse { pub fn host_counters() -> BusResponse { let mut result = Vec::new(); let tp = THROUGHPUT_TRACKER.read().unwrap(); - tp.raw_data.iter().for_each(|(k, v)| { - let ip = k.as_ip(); + tp.raw_data.iter().for_each(|v| { + let ip = v.key().as_ip(); let (down, up) = v.bytes_per_second; result.push((ip, down, up)); }); @@ -77,11 +77,11 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { let tp = THROUGHPUT_TRACKER.read().unwrap(); tp.raw_data .iter() - .filter(|(ip, _)| !ip.as_ip().is_loopback()) - .filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle)) - .map(|(ip, te)| { + .filter(|v| !v.key().as_ip().is_loopback()) + .filter(|d| retire_check(tp.cycle, d.most_recent_cycle)) + .map(|te| { ( - *ip, + *te.key(), te.bytes_per_second, te.packets_per_second, te.median_latency(), @@ -122,12 +122,12 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { let tp = THROUGHPUT_TRACKER.read().unwrap(); tp.raw_data .iter() - .filter(|(ip, _)| !ip.as_ip().is_loopback()) - .filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle)) - .filter(|(_, te)| te.median_latency() > 0.0) - .map(|(ip, te)| { + .filter(|v| !v.key().as_ip().is_loopback()) + .filter(|d| retire_check(tp.cycle, d.most_recent_cycle)) + .filter(|te| te.median_latency() > 0.0) + .map(|te| { ( - *ip, + *te.key(), te.bytes_per_second, te.packets_per_second, te.median_latency(), @@ -167,12 +167,12 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { let tp = THROUGHPUT_TRACKER.read().unwrap(); tp.raw_data .iter() - .filter(|(ip, _)| !ip.as_ip().is_loopback()) - .filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle)) - .filter(|(_, te)| te.median_latency() > 0.0) - .map(|(ip, te)| { + .filter(|v| !v.key().as_ip().is_loopback()) + .filter(|d| retire_check(tp.cycle, d.most_recent_cycle)) + .filter(|te| te.median_latency() > 0.0) + .map(|te| { ( - *ip, + *te.key(), te.bytes_per_second, te.packets_per_second, te.median_latency(), @@ -214,8 +214,8 @@ pub fn xdp_pping_compat() -> BusResponse { let result = raw .raw_data .iter() - .filter(|(_, d)| retire_check(raw.cycle, d.most_recent_cycle)) - .filter_map(|(_ip, data)| { + .filter(|d| retire_check(raw.cycle, d.most_recent_cycle)) + .filter_map(|data| { if data.tc_handle.as_u32() > 0 { let mut valid_samples: Vec = data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect(); @@ -250,10 +250,10 @@ pub fn xdp_pping_compat() -> BusResponse { pub fn rtt_histogram() -> BusResponse { let mut result = vec![0; 20]; let reader = THROUGHPUT_TRACKER.read().unwrap(); - for (_, data) in reader + for data in reader .raw_data .iter() - .filter(|(_, d)| retire_check(reader.cycle, d.most_recent_cycle)) + .filter(|d| retire_check(reader.cycle, d.most_recent_cycle)) { let valid_samples: Vec = data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect(); @@ -275,8 +275,8 @@ pub fn host_counts() -> BusResponse { let tp = THROUGHPUT_TRACKER.read().unwrap(); tp.raw_data .iter() - .filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle)) - .for_each(|(_, d)| { + .filter(|d| retire_check(tp.cycle, d.most_recent_cycle)) + .for_each(|d| { total += 1; if d.tc_handle.as_u32() != 0 { shaped += 1; @@ -304,12 +304,12 @@ pub fn all_unknown_ips() -> BusResponse { let tp = THROUGHPUT_TRACKER.read().unwrap(); tp.raw_data .iter() - .filter(|(ip, _)| !ip.as_ip().is_loopback()) - .filter(|(_, d)| d.tc_handle.as_u32() == 0) - .filter(|(_, d)| d.last_seen as u128 > five_minutes_ago_nanoseconds) - .map(|(ip, te)| { + .filter(|v| !v.key().as_ip().is_loopback()) + .filter(|d| d.tc_handle.as_u32() == 0) + .filter(|d| d.last_seen as u128 > five_minutes_ago_nanoseconds) + .map(|te| { ( - *ip, + *te.key(), te.bytes, te.packets, te.median_latency(), diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index a023b855..54789fe6 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -1,13 +1,12 @@ use crate::shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}; - use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; +use dashmap::DashMap; use lqos_bus::TcHandle; use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress}; -use std::collections::HashMap; pub struct ThroughputTracker { pub(crate) cycle: u64, - pub(crate) raw_data: HashMap, + pub(crate) raw_data: DashMap, pub(crate) bytes_per_second: (u64, u64), pub(crate) packets_per_second: (u64, u64), pub(crate) shaped_bytes_per_second: (u64, u64), @@ -20,7 +19,7 @@ impl ThroughputTracker { // from there via the C API. Self { cycle: RETIRE_AFTER_SECONDS, - raw_data: HashMap::with_capacity(lqos_sys::max_tracked_ips()), + raw_data: DashMap::with_capacity(lqos_sys::max_tracked_ips()), bytes_per_second: (0, 0), packets_per_second: (0, 0), shaped_bytes_per_second: (0, 0), @@ -31,7 +30,7 @@ impl ThroughputTracker { // Copy previous byte/packet numbers and reset RTT data // We're using Rayon's "par_iter_mut" to spread the operation across // all CPU cores. - self.raw_data.iter_mut().for_each(|(_k, v)| { + self.raw_data.iter_mut().for_each(|mut v| { if v.first_cycle < self.cycle { v.bytes_per_second.0 = u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0); @@ -93,8 +92,8 @@ impl ThroughputTracker { } pub(crate) fn refresh_circuit_ids(&mut self) { - self.raw_data.iter_mut().for_each(|(ip, data)| { - data.circuit_id = Self::lookup_circuit_id(ip); + self.raw_data.iter_mut().for_each(|mut data| { + data.circuit_id = Self::lookup_circuit_id(data.key()); data.network_json_parents = Self::lookup_network_parents(data.circuit_id.clone()); }); @@ -106,7 +105,7 @@ impl ThroughputTracker { let cycle = self.cycle; let raw_data = &mut self.raw_data; throughput_for_each(&mut |xdp_ip, counts| { - if let Some(entry) = raw_data.get_mut(xdp_ip) { + if let Some(mut entry) = raw_data.get_mut(xdp_ip) { entry.bytes = (0, 0); entry.packets = (0, 0); for c in counts { @@ -171,7 +170,7 @@ impl ThroughputTracker { rtt_for_each(&mut |raw_ip, rtt| { if rtt.has_fresh_data != 0 { let ip = XdpIpAddress(*raw_ip); - if let Some(tracker) = self.raw_data.get_mut(&ip) { + if let Some(mut tracker) = self.raw_data.get_mut(&ip) { tracker.recent_rtt_data = rtt.rtt; tracker.last_fresh_rtt_data_cycle = self.cycle; if let Some(parents) = &tracker.network_json_parents { @@ -189,7 +188,7 @@ impl ThroughputTracker { self.shaped_bytes_per_second = (0, 0); self .raw_data - .values() + .iter() .map(|v| { ( v.bytes.0.saturating_sub(v.prev_bytes.0), @@ -238,8 +237,8 @@ impl ThroughputTracker { #[allow(dead_code)] pub(crate) fn dump(&self) { - for (k, v) in self.raw_data.iter() { - let ip = k.as_ip(); + for v in self.raw_data.iter() { + let ip = v.key().as_ip(); log::info!("{:<34}{:?}", ip, v.tc_handle); } }