Throughput tracker refactor part 1: raw_data is now a DashMap (no locking required).

This commit is contained in:
Herbert Wolverson 2023-03-08 17:19:35 +00:00
parent 497abd15f8
commit ae1d5efb4a
4 changed files with 54 additions and 40 deletions

14
src/rust/Cargo.lock generated
View File

@ -615,6 +615,19 @@ dependencies = [
"cipher",
]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "default-net"
version = "0.12.0"
@ -1431,6 +1444,7 @@ name = "lqosd"
version = "0.1.0"
dependencies = [
"anyhow",
"dashmap",
"env_logger",
"jemallocator",
"log",

View File

@ -23,7 +23,8 @@ env_logger = "0"
log = "0"
nix = "0"
sysinfo = "0"
dashmap = "5"
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
jemallocator = "0.5"
jemallocator = "0.5"

View File

@ -57,8 +57,8 @@ pub fn current_throughput() -> BusResponse {
pub fn host_counters() -> BusResponse {
let mut result = Vec::new();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data.iter().for_each(|(k, v)| {
let ip = k.as_ip();
tp.raw_data.iter().for_each(|v| {
let ip = v.key().as_ip();
let (down, up) = v.bytes_per_second;
result.push((ip, down, up));
});
@ -77,11 +77,11 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.map(|(ip, te)| {
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.map(|te| {
(
*ip,
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
@ -122,12 +122,12 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|(_, te)| te.median_latency() > 0.0)
.map(|(ip, te)| {
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
*ip,
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
@ -167,12 +167,12 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|(_, te)| te.median_latency() > 0.0)
.map(|(ip, te)| {
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
*ip,
*te.key(),
te.bytes_per_second,
te.packets_per_second,
te.median_latency(),
@ -214,8 +214,8 @@ pub fn xdp_pping_compat() -> BusResponse {
let result = raw
.raw_data
.iter()
.filter(|(_, d)| retire_check(raw.cycle, d.most_recent_cycle))
.filter_map(|(_ip, data)| {
.filter(|d| retire_check(raw.cycle, d.most_recent_cycle))
.filter_map(|data| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> =
data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect();
@ -250,10 +250,10 @@ pub fn xdp_pping_compat() -> BusResponse {
pub fn rtt_histogram() -> BusResponse {
let mut result = vec![0; 20];
let reader = THROUGHPUT_TRACKER.read().unwrap();
for (_, data) in reader
for data in reader
.raw_data
.iter()
.filter(|(_, d)| retire_check(reader.cycle, d.most_recent_cycle))
.filter(|d| retire_check(reader.cycle, d.most_recent_cycle))
{
let valid_samples: Vec<u32> =
data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect();
@ -275,8 +275,8 @@ pub fn host_counts() -> BusResponse {
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
.for_each(|(_, d)| {
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.for_each(|d| {
total += 1;
if d.tc_handle.as_u32() != 0 {
shaped += 1;
@ -304,12 +304,12 @@ pub fn all_unknown_ips() -> BusResponse {
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
.filter(|(_, d)| d.tc_handle.as_u32() == 0)
.filter(|(_, d)| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|(ip, te)| {
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| d.tc_handle.as_u32() == 0)
.filter(|d| d.last_seen as u128 > five_minutes_ago_nanoseconds)
.map(|te| {
(
*ip,
*te.key(),
te.bytes,
te.packets,
te.median_latency(),

View File

@ -1,13 +1,12 @@
use crate::shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress};
use std::collections::HashMap;
pub struct ThroughputTracker {
pub(crate) cycle: u64,
pub(crate) raw_data: HashMap<XdpIpAddress, ThroughputEntry>,
pub(crate) raw_data: DashMap<XdpIpAddress, ThroughputEntry>,
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
pub(crate) shaped_bytes_per_second: (u64, u64),
@ -20,7 +19,7 @@ impl ThroughputTracker {
// from there via the C API.
Self {
cycle: RETIRE_AFTER_SECONDS,
raw_data: HashMap::with_capacity(lqos_sys::max_tracked_ips()),
raw_data: DashMap::with_capacity(lqos_sys::max_tracked_ips()),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
shaped_bytes_per_second: (0, 0),
@ -31,7 +30,7 @@ impl ThroughputTracker {
// Copy previous byte/packet numbers and reset RTT data
// We're using Rayon's "par_iter_mut" to spread the operation across
// all CPU cores.
self.raw_data.iter_mut().for_each(|(_k, v)| {
self.raw_data.iter_mut().for_each(|mut v| {
if v.first_cycle < self.cycle {
v.bytes_per_second.0 =
u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
@ -93,8 +92,8 @@ impl ThroughputTracker {
}
pub(crate) fn refresh_circuit_ids(&mut self) {
self.raw_data.iter_mut().for_each(|(ip, data)| {
data.circuit_id = Self::lookup_circuit_id(ip);
self.raw_data.iter_mut().for_each(|mut data| {
data.circuit_id = Self::lookup_circuit_id(data.key());
data.network_json_parents =
Self::lookup_network_parents(data.circuit_id.clone());
});
@ -106,7 +105,7 @@ impl ThroughputTracker {
let cycle = self.cycle;
let raw_data = &mut self.raw_data;
throughput_for_each(&mut |xdp_ip, counts| {
if let Some(entry) = raw_data.get_mut(xdp_ip) {
if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
entry.bytes = (0, 0);
entry.packets = (0, 0);
for c in counts {
@ -171,7 +170,7 @@ impl ThroughputTracker {
rtt_for_each(&mut |raw_ip, rtt| {
if rtt.has_fresh_data != 0 {
let ip = XdpIpAddress(*raw_ip);
if let Some(tracker) = self.raw_data.get_mut(&ip) {
if let Some(mut tracker) = self.raw_data.get_mut(&ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self.cycle;
if let Some(parents) = &tracker.network_json_parents {
@ -189,7 +188,7 @@ impl ThroughputTracker {
self.shaped_bytes_per_second = (0, 0);
self
.raw_data
.values()
.iter()
.map(|v| {
(
v.bytes.0.saturating_sub(v.prev_bytes.0),
@ -238,8 +237,8 @@ impl ThroughputTracker {
#[allow(dead_code)]
pub(crate) fn dump(&self) {
for (k, v) in self.raw_data.iter() {
let ip = k.as_ip();
for v in self.raw_data.iter() {
let ip = v.key().as_ip();
log::info!("{:<34}{:?}", ip, v.tc_handle);
}
}