Throughput tracker unlocking part 2, cycle is an atomic.

This commit is contained in:
Herbert Wolverson
2023-03-08 17:29:15 +00:00
parent ae1d5efb4a
commit 395b8e0857
2 changed files with 26 additions and 17 deletions

View File

@@ -75,10 +75,11 @@ type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String);
pub fn top_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp = THROUGHPUT_TRACKER.read().unwrap();
let tp_cycle = tp.cycle.load(std::sync::atomic::Ordering::Relaxed);
tp.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.map(|te| {
(
*te.key(),
@@ -120,10 +121,11 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
pub fn worst_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp = THROUGHPUT_TRACKER.read().unwrap();
let tp_cycle = tp.cycle.load(std::sync::atomic::Ordering::Relaxed);
tp.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
@@ -165,10 +167,11 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
pub fn best_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp = THROUGHPUT_TRACKER.read().unwrap();
let tp_cycle = tp.cycle.load(std::sync::atomic::Ordering::Relaxed);
tp.raw_data
.iter()
.filter(|v| !v.key().as_ip().is_loopback())
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.filter(|te| te.median_latency() > 0.0)
.map(|te| {
(
@@ -211,10 +214,11 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
pub fn xdp_pping_compat() -> BusResponse {
let raw = THROUGHPUT_TRACKER.read().unwrap();
let raw_cycle = raw.cycle.load(std::sync::atomic::Ordering::Relaxed);
let result = raw
.raw_data
.iter()
.filter(|d| retire_check(raw.cycle, d.most_recent_cycle))
.filter(|d| retire_check(raw_cycle, d.most_recent_cycle))
.filter_map(|data| {
if data.tc_handle.as_u32() > 0 {
let mut valid_samples: Vec<u32> =
@@ -250,10 +254,11 @@ pub fn xdp_pping_compat() -> BusResponse {
pub fn rtt_histogram() -> BusResponse {
let mut result = vec![0; 20];
let reader = THROUGHPUT_TRACKER.read().unwrap();
let reader_cycle = reader.cycle.load(std::sync::atomic::Ordering::Relaxed);
for data in reader
.raw_data
.iter()
.filter(|d| retire_check(reader.cycle, d.most_recent_cycle))
.filter(|d| retire_check(reader_cycle, d.most_recent_cycle))
{
let valid_samples: Vec<u32> =
data.recent_rtt_data.iter().filter(|d| **d > 0).copied().collect();
@@ -273,9 +278,10 @@ pub fn host_counts() -> BusResponse {
let mut total = 0;
let mut shaped = 0;
let tp = THROUGHPUT_TRACKER.read().unwrap();
let tp_cycle = tp.cycle.load(std::sync::atomic::Ordering::Relaxed);
tp.raw_data
.iter()
.filter(|d| retire_check(tp.cycle, d.most_recent_cycle))
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
.for_each(|d| {
total += 1;
if d.tc_handle.as_u32() != 0 {

View File

@@ -1,3 +1,4 @@
use std::sync::atomic::AtomicU64;
use crate::shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
@@ -5,7 +6,7 @@ use lqos_bus::TcHandle;
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress};
pub struct ThroughputTracker {
pub(crate) cycle: u64,
pub(crate) cycle: AtomicU64,
pub(crate) raw_data: DashMap<XdpIpAddress, ThroughputEntry>,
pub(crate) bytes_per_second: (u64, u64),
pub(crate) packets_per_second: (u64, u64),
@@ -18,7 +19,7 @@ impl ThroughputTracker {
// maximums.h (MAX_TRACKED_IPS), so we grab it
// from there via the C API.
Self {
cycle: RETIRE_AFTER_SECONDS,
cycle: AtomicU64::new(RETIRE_AFTER_SECONDS),
raw_data: DashMap::with_capacity(lqos_sys::max_tracked_ips()),
bytes_per_second: (0, 0),
packets_per_second: (0, 0),
@@ -30,8 +31,9 @@ impl ThroughputTracker {
// Copy previous byte/packet numbers and reset RTT data
// We're using Rayon's "par_iter_mut" to spread the operation across
// all CPU cores.
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
self.raw_data.iter_mut().for_each(|mut v| {
if v.first_cycle < self.cycle {
if v.first_cycle < self_cycle {
v.bytes_per_second.0 =
u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
v.bytes_per_second.1 =
@@ -44,8 +46,8 @@ impl ThroughputTracker {
v.prev_packets = v.packets;
}
// Roll out stale RTT data
if self.cycle > RETIRE_AFTER_SECONDS
&& v.last_fresh_rtt_data_cycle < self.cycle - RETIRE_AFTER_SECONDS
if self_cycle > RETIRE_AFTER_SECONDS
&& v.last_fresh_rtt_data_cycle < self_cycle - RETIRE_AFTER_SECONDS
{
v.recent_rtt_data = [0; 60];
}
@@ -102,8 +104,8 @@ impl ThroughputTracker {
pub(crate) fn apply_new_throughput_counters(
&mut self,
) {
let cycle = self.cycle;
let raw_data = &mut self.raw_data;
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
throughput_for_each(&mut |xdp_ip, counts| {
if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
entry.bytes = (0, 0);
@@ -121,7 +123,7 @@ impl ThroughputTracker {
}
}
if entry.packets != entry.prev_packets {
entry.most_recent_cycle = cycle;
entry.most_recent_cycle = self_cycle;
if let Some(parents) = &entry.network_json_parents {
let net_json = NETWORK_JSON.read().unwrap();
@@ -139,7 +141,7 @@ impl ThroughputTracker {
let mut entry = ThroughputEntry {
circuit_id: circuit_id.clone(),
network_json_parents: Self::lookup_network_parents(circuit_id),
first_cycle: self.cycle,
first_cycle: self_cycle,
most_recent_cycle: 0,
bytes: (0, 0),
packets: (0, 0),
@@ -167,12 +169,13 @@ impl ThroughputTracker {
}
pub(crate) fn apply_rtt_data(&mut self) {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
rtt_for_each(&mut |raw_ip, rtt| {
if rtt.has_fresh_data != 0 {
let ip = XdpIpAddress(*raw_ip);
if let Some(mut tracker) = self.raw_data.get_mut(&ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self.cycle;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
net_json.add_rtt_cycle(parents, tracker.median_latency());
@@ -219,8 +222,8 @@ impl ThroughputTracker {
});
}
pub(crate) fn next_cycle(&mut self) {
self.cycle += 1;
pub(crate) fn next_cycle(&self) {
self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn bits_per_second(&self) -> (u64, u64) {