Stop using DashMap for the raw data. Go with a simple Mutex-wrapped FxHashMap instead.

Herbert Wolverson 2024-10-24 14:01:40 -05:00
parent f53dac3424
commit aed23f85fe
6 changed files with 91 additions and 71 deletions
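
The change in a nutshell: raw_data goes from a DashMap<XdpIpAddress, ThroughputEntry> to a Mutex<FxHashMap<XdpIpAddress, ThroughputEntry>>. The once-per-second task now takes the lock a single time, hands &mut borrows of the map to the helper methods, and drops the guard when it is done; the read-side endpoints lock before iterating and see (key, value) tuples instead of DashMap entries. A minimal sketch of that pattern, using stand-in types and std's HashMap in place of the real XdpIpAddress/ThroughputEntry/FxHashMap (illustrative only, not the lqosd code):

use std::collections::HashMap;
use std::sync::Mutex;

// Stand-ins for XdpIpAddress / ThroughputEntry (hypothetical, for illustration only).
type Ip = u32;

struct Entry {
    bytes: u64,
    prev_bytes: u64,
}

struct Tracker {
    // Was: DashMap<Ip, Entry>. Now: one Mutex around a plain map.
    raw_data: Mutex<HashMap<Ip, Entry>>,
}

impl Tracker {
    // Helpers no longer lock internally; they borrow the already-locked map.
    fn copy_previous(&self, raw_data: &mut HashMap<Ip, Entry>) {
        for entry in raw_data.values_mut() {
            entry.prev_bytes = entry.bytes;
        }
    }

    // One lock acquisition per update cycle, released explicitly when finished.
    fn update_cycle(&self) {
        let mut raw_data = self.raw_data.lock().unwrap();
        self.copy_previous(&mut raw_data);
        // ...further per-cycle passes reuse the same guard...
        std::mem::drop(raw_data);
    }
}

fn main() {
    let tracker = Tracker { raw_data: Mutex::new(HashMap::new()) };
    tracker.raw_data.lock().unwrap().insert(1, Entry { bytes: 42, prev_bytes: 0 });
    tracker.update_cycle();
    // Readers lock briefly and iterate over (key, value) tuples.
    for (ip, entry) in tracker.raw_data.lock().unwrap().iter() {
        println!("{ip}: {} bytes (prev {})", entry.bytes, entry.prev_bytes);
    }
}

The same shape appears in throughput_task below: the guard is taken once per pass and dropped via std::mem::drop(raw_data) before the slower follow-up work.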

View File

@@ -40,9 +40,10 @@ fn recent_flows() -> ContainerSize {
 }
 fn throughput_tracker() -> ContainerSize {
+    let raw_data = THROUGHPUT_TRACKER.raw_data.lock().unwrap();
     ContainerSize {
-        size: THROUGHPUT_TRACKER.raw_data.len(),
-        capacity: THROUGHPUT_TRACKER.raw_data.capacity(),
+        size: raw_data.len(),
+        capacity: raw_data.capacity(),
     }
 }

View File

@@ -21,16 +21,18 @@ pub fn get_unknown_ips() -> Vec<UnknownIp> {
     let sd_reader = SHAPED_DEVICES.read().unwrap();
     THROUGHPUT_TRACKER
         .raw_data
+        .lock()
+        .unwrap()
         .iter()
-        .filter(|v| !v.key().as_ip().is_loopback())
-        .filter(|d| d.tc_handle.as_u32() == 0)
-        .filter(|d| {
-            let ip = d.key().as_ip();
+        .filter(|(k,_v)| !k.as_ip().is_loopback())
+        .filter(|(_k,d)| d.tc_handle.as_u32() == 0)
+        .filter(|(k,_d)| {
+            let ip = k.as_ip();
             !sd_reader.trie.longest_match(ip).is_some()
         })
-        .map(|d| {
+        .map(|(k,d)| {
             UnknownIp {
-                ip: d.key().as_ip().to_string(),
+                ip: k.as_ip().to_string(),
                 last_seen_nanos: now - d.last_seen,
                 total_bytes: d.bytes,
                 current_bytes: d.bytes_per_second,

View File

@@ -29,7 +29,7 @@ pub async fn circuit_capacity(channels: Arc<PubSub>) {
     let mut circuits: HashMap<String, CircuitAccumulator> = HashMap::new();
     // Aggregate the data by circuit id
-    THROUGHPUT_TRACKER.raw_data.iter().for_each(|c| {
+    THROUGHPUT_TRACKER.raw_data.lock().unwrap().iter().for_each(|(_,c)| {
         if let Some(circuit_id) = &c.circuit_id {
             if let Some(accumulator) = circuits.get_mut(circuit_id) {
                 accumulator.bytes += c.bytes_per_second;

View File

@@ -173,9 +173,11 @@ pub fn get_all_circuits() -> BusResponse {
     if let Ok(devices) = SHAPED_DEVICES.read() {
         let data = THROUGHPUT_TRACKER.
             raw_data.
+            lock().
+            unwrap().
             iter()
-            .map(|v| {
-                let ip = v.key().as_ip();
+            .map(|(k,v)| {
+                let ip = k.as_ip();
                 let last_seen_nanos = if v.last_seen > 0 {
                     let last_seen_nanos = v.last_seen as u128;
                     let since_boot = Duration::from(kernel_now).as_nanos();
@@ -207,7 +209,7 @@ pub fn get_all_circuits() -> BusResponse {
                 }
                 Circuit {
-                    ip: v.key().as_ip(),
+                    ip: k.as_ip(),
                     bytes_per_second: v.bytes_per_second,
                     median_latency: v.median_latency(),
                     tcp_retransmits: v.tcp_retransmits,

View File

@@ -147,13 +147,14 @@ fn throughput_task(
         // Formerly a "spawn blocking" blob
         {
+            let mut raw_data = THROUGHPUT_TRACKER.raw_data.lock().unwrap();
             let mut net_json_calc = NETWORK_JSON.write().unwrap();
             timer_metrics.update_cycle = timer_metrics.start.elapsed().as_secs_f64();
             net_json_calc.zero_throughput_and_rtt();
             timer_metrics.zero_throughput_and_rtt = timer_metrics.start.elapsed().as_secs_f64();
-            THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
+            THROUGHPUT_TRACKER.copy_previous_and_reset_rtt(&mut raw_data);
             timer_metrics.copy_previous_and_reset_rtt = timer_metrics.start.elapsed().as_secs_f64();
-            THROUGHPUT_TRACKER.apply_new_throughput_counters(&mut net_json_calc);
+            THROUGHPUT_TRACKER.apply_new_throughput_counters(&mut net_json_calc, &mut raw_data);
             timer_metrics.apply_new_throughput_counters = timer_metrics.start.elapsed().as_secs_f64();
             THROUGHPUT_TRACKER.apply_flow_data(
                 timeout_seconds,
@@ -163,12 +164,14 @@ fn throughput_task(
                 &mut rtt_circuit_tracker,
                 &mut tcp_retries,
                 &mut expired_flows,
+                &mut raw_data,
             );
             rtt_circuit_tracker.clear();
             tcp_retries.clear();
             expired_flows.clear();
             timer_metrics.apply_flow_data = timer_metrics.start.elapsed().as_secs_f64();
-            THROUGHPUT_TRACKER.apply_queue_stats(&mut net_json_calc);
+            THROUGHPUT_TRACKER.apply_queue_stats(&mut net_json_calc, &mut raw_data);
+            std::mem::drop(raw_data);
             timer_metrics.apply_queue_stats = timer_metrics.start.elapsed().as_secs_f64();
             THROUGHPUT_TRACKER.update_totals();
             timer_metrics.update_totals = timer_metrics.start.elapsed().as_secs_f64();
@@ -271,11 +274,11 @@ fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>, scale
     }
     let hosts = THROUGHPUT_TRACKER
-        .raw_data
+        .raw_data.lock().unwrap()
         .iter()
         //.filter(|host| host.median_latency().is_some())
-        .map(|host| HostSummary {
-            ip: host.key().as_ip(),
+        .map(|(k, host)| HostSummary {
+            ip: k.as_ip(),
             circuit_id: host.circuit_id.clone(),
             bits_per_second: (scale_u64_by_f64(host.bytes_per_second.down * 8, scale), scale_u64_by_f64(host.bytes_per_second.up * 8, scale)),
             median_rtt: host.median_latency().unwrap_or(0.0),
@@ -324,8 +327,8 @@ pub fn current_throughput() -> BusResponse {
 pub fn host_counters() -> BusResponse {
     let mut result = Vec::new();
-    THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| {
-        let ip = v.key().as_ip();
+    THROUGHPUT_TRACKER.raw_data.lock().unwrap().iter().for_each(|(k,v)| {
+        let ip = k.as_ip();
         result.push((ip, v.bytes_per_second));
     });
     BusResponse::HostCounters(result)
@@ -345,12 +348,14 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
         .load(std::sync::atomic::Ordering::Relaxed);
     THROUGHPUT_TRACKER
         .raw_data
+        .lock()
+        .unwrap()
         .iter()
-        .filter(|v| !v.key().as_ip().is_loopback())
-        .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
-        .map(|te| {
+        .filter(|(k,_v)| !k.as_ip().is_loopback())
+        .filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
+        .map(|(k,te)| {
             (
-                *te.key(),
+                *k,
                 te.bytes_per_second,
                 te.packets_per_second,
                 te.median_latency().unwrap_or(0.0),
@@ -396,13 +401,14 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
         .load(std::sync::atomic::Ordering::Relaxed);
     THROUGHPUT_TRACKER
         .raw_data
+        .lock().unwrap()
         .iter()
-        .filter(|v| !v.key().as_ip().is_loopback())
-        .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
-        .filter(|te| te.median_latency().is_some())
-        .map(|te| {
+        .filter(|(k,_v)| !k.as_ip().is_loopback())
+        .filter(|(_k, d)| retire_check(tp_cycle, d.most_recent_cycle))
+        .filter(|(_k, te)| te.median_latency().is_some())
+        .map(|(k,te)| {
             (
-                *te.key(),
+                *k,
                 te.bytes_per_second,
                 te.packets_per_second,
                 te.median_latency().unwrap_or(0.0),
@@ -448,13 +454,14 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse {
         .load(std::sync::atomic::Ordering::Relaxed);
     THROUGHPUT_TRACKER
         .raw_data
+        .lock().unwrap()
         .iter()
-        .filter(|v| !v.key().as_ip().is_loopback())
-        .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
-        .filter(|te| te.median_latency().is_some())
-        .map(|te| {
+        .filter(|(k,_v)| !k.as_ip().is_loopback())
+        .filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
+        .filter(|(_k, te)| te.median_latency().is_some())
+        .map(|(k,te)| {
             (
-                *te.key(),
+                *k,
                 te.bytes_per_second,
                 te.packets_per_second,
                 te.median_latency().unwrap_or(0.0),
@@ -504,13 +511,14 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
         .load(std::sync::atomic::Ordering::Relaxed);
     THROUGHPUT_TRACKER
         .raw_data
+        .lock().unwrap()
         .iter()
-        .filter(|v| !v.key().as_ip().is_loopback())
-        .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
-        .filter(|te| te.median_latency().is_some())
-        .map(|te| {
+        .filter(|(k,_v)| !k.as_ip().is_loopback())
+        .filter(|(_k, d)| retire_check(tp_cycle, d.most_recent_cycle))
+        .filter(|(_k, te)| te.median_latency().is_some())
+        .map(|(k,te)| {
             (
-                *te.key(),
+                *k,
                 te.bytes_per_second,
                 te.packets_per_second,
                 te.median_latency().unwrap_or(0.0),
@@ -556,9 +564,10 @@ pub fn xdp_pping_compat() -> BusResponse {
         .load(std::sync::atomic::Ordering::Relaxed);
     let result = THROUGHPUT_TRACKER
         .raw_data
+        .lock().unwrap()
         .iter()
-        .filter(|d| retire_check(raw_cycle, d.most_recent_cycle))
-        .filter_map(|data| {
+        .filter(|(_k,d)| retire_check(raw_cycle, d.most_recent_cycle))
+        .filter_map(|(_k,data)| {
             if data.tc_handle.as_u32() > 0 {
                 let mut valid_samples: Vec<u32> = data
                     .recent_rtt_data
@@ -599,10 +608,11 @@ pub fn rtt_histogram<const N: usize>() -> BusResponse {
     let reader_cycle = THROUGHPUT_TRACKER
         .cycle
         .load(std::sync::atomic::Ordering::Relaxed);
-    for data in THROUGHPUT_TRACKER
+    for (_k,data) in THROUGHPUT_TRACKER
         .raw_data
+        .lock().unwrap()
         .iter()
-        .filter(|d| retire_check(reader_cycle, d.most_recent_cycle))
+        .filter(|(_k,d)| retire_check(reader_cycle, d.most_recent_cycle))
     {
         let valid_samples: Vec<f64> = data
             .recent_rtt_data
@@ -630,9 +640,10 @@ pub fn host_counts() -> BusResponse {
         .load(std::sync::atomic::Ordering::Relaxed);
     THROUGHPUT_TRACKER
         .raw_data
+        .lock().unwrap()
         .iter()
-        .filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
-        .for_each(|d| {
+        .filter(|(_k,d)| retire_check(tp_cycle, d.most_recent_cycle))
+        .for_each(|(_k,d)| {
             total += 1;
             if d.tc_handle.as_u32() != 0 {
                 shaped += 1;
@@ -658,13 +669,14 @@ pub fn all_unknown_ips() -> BusResponse {
     let mut full_list: Vec<FullList> = {
         THROUGHPUT_TRACKER
             .raw_data
+            .lock().unwrap()
             .iter()
-            .filter(|v| !v.key().as_ip().is_loopback())
-            .filter(|d| d.tc_handle.as_u32() == 0)
-            .filter(|d| d.last_seen as u128 > five_minutes_ago_nanoseconds)
-            .map(|te| {
+            .filter(|(k,_v)| !k.as_ip().is_loopback())
+            .filter(|(_k,d)| d.tc_handle.as_u32() == 0)
+            .filter(|(_k,d)| d.last_seen as u128 > five_minutes_ago_nanoseconds)
+            .map(|(k,te)| {
                 (
-                    *te.key(),
+                    *k,
                     te.bytes,
                     te.packets,
                     te.median_latency().unwrap_or(0.0),

View File

@@ -1,7 +1,7 @@
 use std::{sync::atomic::AtomicU64, time::Duration};
+use std::sync::Mutex;
 use crate::{shaped_devices_tracker::SHAPED_DEVICES, stats::HIGH_WATERMARK, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}};
 use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
-use dashmap::DashMap;
 use fxhash::FxHashMap;
 use tracing::{info, warn};
 use lqos_bus::TcHandle;
@@ -13,7 +13,7 @@ use lqos_utils::units::{AtomicDownUp, DownUpOrder};
 pub struct ThroughputTracker {
     pub(crate) cycle: AtomicU64,
-    pub(crate) raw_data: DashMap<XdpIpAddress, ThroughputEntry>,
+    pub(crate) raw_data: Mutex<FxHashMap<XdpIpAddress, ThroughputEntry>>,
     pub(crate) bytes_per_second: AtomicDownUp,
     pub(crate) packets_per_second: AtomicDownUp,
     pub(crate) shaped_bytes_per_second: AtomicDownUp,
@@ -27,17 +27,17 @@ impl ThroughputTracker {
         // first few cycles, but it should be fine after that.
         Self {
             cycle: AtomicU64::new(RETIRE_AFTER_SECONDS),
-            raw_data: DashMap::default(),
+            raw_data: Mutex::new(FxHashMap::default()),
             bytes_per_second: AtomicDownUp::zeroed(),
             packets_per_second: AtomicDownUp::zeroed(),
             shaped_bytes_per_second: AtomicDownUp::zeroed(),
         }
     }
-    pub(crate) fn copy_previous_and_reset_rtt(&self) {
+    pub(crate) fn copy_previous_and_reset_rtt(&self, raw_data: &mut FxHashMap<XdpIpAddress, ThroughputEntry>) {
         // Copy previous byte/packet numbers and reset RTT data
         let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
-        self.raw_data.iter_mut().for_each(|mut v| {
+        raw_data.iter_mut().for_each(|(_k,v)| {
             if v.first_cycle < self_cycle {
                 v.bytes_per_second = v.bytes.checked_sub_or_zero(v.prev_bytes);
                 v.packets_per_second = v.packets.checked_sub_or_zero(v.prev_packets);
@@ -95,8 +95,9 @@ impl ThroughputTracker {
     }
     pub(crate) fn refresh_circuit_ids(&self, lock: &NetworkJson) {
-        self.raw_data.iter_mut().for_each(|mut data| {
-            data.circuit_id = Self::lookup_circuit_id(data.key());
+        let mut raw_data = self.raw_data.lock().unwrap();
+        raw_data.iter_mut().for_each(|(key, data)| {
+            data.circuit_id = Self::lookup_circuit_id(key);
             data.network_json_parents =
                 Self::lookup_network_parents(data.circuit_id.clone(), lock);
         });
@@ -105,11 +106,11 @@ impl ThroughputTracker {
     pub(crate) fn apply_new_throughput_counters(
         &self,
         net_json_calc: &mut NetworkJson,
+        raw_data: &mut FxHashMap<XdpIpAddress, ThroughputEntry>,
     ) {
-        let raw_data = &self.raw_data;
         let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
         throughput_for_each(&mut |xdp_ip, counts| {
-            if let Some(mut entry) = raw_data.get_mut(xdp_ip) {
+            if let Some(entry) = raw_data.get_mut(xdp_ip) {
                 entry.bytes = DownUpOrder::zeroed();
                 entry.packets = DownUpOrder::zeroed();
                 for c in counts {
@@ -167,13 +168,13 @@ impl ThroughputTracker {
         });
     }
-    pub(crate) fn apply_queue_stats(&self, net_json_calc: &mut NetworkJson) {
+    pub(crate) fn apply_queue_stats(&self, net_json_calc: &mut NetworkJson, raw_data: &mut FxHashMap<XdpIpAddress, ThroughputEntry>) {
         // Apply totals
         ALL_QUEUE_SUMMARY.calculate_total_queue_stats();
         // Iterate through the queue data and find the matching circuit_id
         ALL_QUEUE_SUMMARY.iterate_queues(|circuit_id, drops, marks| {
-            if let Some(entry) = self.raw_data.iter().find(|v| {
+            if let Some((_k, entry)) = raw_data.iter().find(|(_k,v)| {
                 match v.circuit_id {
                     Some(ref id) => id == circuit_id,
                     None => false,
@@ -197,6 +198,7 @@ impl ThroughputTracker {
         rtt_circuit_tracker: &mut FxHashMap<XdpIpAddress, [Vec<RttData>; 2]>,
         tcp_retries: &mut FxHashMap<XdpIpAddress, DownUpOrder<u64>>,
         expired_keys: &mut Vec<FlowbeeKey>,
+        raw_data: &mut FxHashMap<XdpIpAddress, ThroughputEntry>,
     ) {
         //log::debug!("Flowbee events this second: {}", get_flowbee_event_count_and_reset());
         let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
@@ -257,7 +259,7 @@ impl ThroughputTracker {
             }
             // TCP - we have RTT data? 6 is TCP
-            if key.ip_protocol == 6 && data.end_status == 0 && self.raw_data.contains_key(&key.local_ip) {
+            if key.ip_protocol == 6 && data.end_status == 0 && raw_data.contains_key(&key.local_ip) {
                 if let Some(rtt) = rtt_samples.get(&key) {
                     // Add the RTT data to the per-circuit tracker
                     if let Some(tracker) = rtt_circuit_tracker.get_mut(&key.local_ip) {
@@ -297,7 +299,7 @@ impl ThroughputTracker {
             if !rtts.is_empty() {
                 rtts.sort();
                 let median = rtts[rtts.len() / 2];
-                if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) {
+                if let Some(tracker) = raw_data.get_mut(&local_ip) {
                     // Only apply if the flow has achieved 1 Mbps or more
                     if tracker.bytes_per_second.sum_exceeds(125_000) {
                         // Shift left
@@ -318,12 +320,12 @@ impl ThroughputTracker {
         // Merge in the TCP retries
         // Reset all entries in the tracker to 0
-        for mut circuit in self.raw_data.iter_mut() {
+        for (_,circuit) in raw_data.iter_mut() {
             circuit.tcp_retransmits = DownUpOrder::zeroed();
         }
         // Apply the new ones
         for (local_ip, retries) in tcp_retries {
-            if let Some(mut tracker) = self.raw_data.get_mut(&local_ip) {
+            if let Some(tracker) = raw_data.get_mut(&local_ip) {
                 tracker.tcp_retransmits.down = retries.down.saturating_sub(tracker.prev_tcp_retransmits.down);
                 tracker.tcp_retransmits.up = retries.up.saturating_sub(tracker.prev_tcp_retransmits.up);
                 tracker.prev_tcp_retransmits.down = retries.down;
@@ -355,7 +357,7 @@ impl ThroughputTracker {
             // Cleaning run
             all_flows_lock.retain(|_k,v| v.0.last_seen >= expire);
             expire_rtt_flows();
-            self.raw_data.shrink_to_fit();
+            raw_data.shrink_to_fit();
         }
     }
@@ -366,12 +368,14 @@ impl ThroughputTracker {
         self.shaped_bytes_per_second.set_to_zero();
         self
             .raw_data
+            .lock()
+            .unwrap()
             .iter()
-            .filter(|v|
+            .filter(|(_k,v)|
                 v.most_recent_cycle == current_cycle &&
                 v.first_cycle + 2 < current_cycle
             )
-            .map(|v| {
+            .map(|(_k,v)| {
                 (
                     v.bytes.down.saturating_sub(v.prev_bytes.down),
                     v.bytes.up.saturating_sub(v.prev_bytes.up),
@@ -405,7 +409,6 @@ impl ThroughputTracker {
     pub(crate) fn next_cycle(&self) {
         self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
-        self.raw_data.shrink_to_fit();
     }

     pub(crate) fn bits_per_second(&self) -> DownUpOrder<u64> {
@@ -422,8 +425,8 @@ impl ThroughputTracker {
     #[allow(dead_code)]
     pub(crate) fn dump(&self) {
-        for v in self.raw_data.iter() {
-            let ip = v.key().as_ip();
+        for (k,v) in self.raw_data.lock().unwrap().iter() {
+            let ip = k.as_ip();
             info!("{:<34}{:?}", ip, v.tc_handle);
         }
     }