From afda5d3bdf73bf950759465ae032fbba782daac4 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 16 Jul 2024 15:26:15 -0500 Subject: [PATCH] Work in progress: added channels for circuit capacity and tree node capacity, ready for dashboard entries. --- .../src/node_manager/ws/published_channels.rs | 2 + src/rust/lqosd/src/node_manager/ws/ticker.rs | 5 ++ .../ws/ticker/circuit_capacity.rs | 78 +++++++++++++++++++ .../node_manager/ws/ticker/tree_capacity.rs | 60 ++++++++++++++ 4 files changed, 145 insertions(+) create mode 100644 src/rust/lqosd/src/node_manager/ws/ticker/circuit_capacity.rs create mode 100644 src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs diff --git a/src/rust/lqosd/src/node_manager/ws/published_channels.rs b/src/rust/lqosd/src/node_manager/ws/published_channels.rs index b0b6b670..c7e5a256 100644 --- a/src/rust/lqosd/src/node_manager/ws/published_channels.rs +++ b/src/rust/lqosd/src/node_manager/ws/published_channels.rs @@ -21,4 +21,6 @@ pub enum PublishedChannels { QueueStatsTotal, NetworkTree, NetworkTreeClients, + CircuitCapacity, + TreeCapacity, } diff --git a/src/rust/lqosd/src/node_manager/ws/ticker.rs b/src/rust/lqosd/src/node_manager/ws/ticker.rs index 189eadf5..2bc0579f 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker.rs @@ -14,6 +14,9 @@ pub mod system_info; mod tree_summary; mod queue_stats_total; mod network_tree; +mod circuit_capacity; +mod tree_capacity; + pub use network_tree::{Circuit, all_circuits}; /// Runs a periodic tick to feed data to the node manager. @@ -45,6 +48,8 @@ async fn one_second_cadence(channels: Arc) { flow_endpoints::ip_protocols(channels.clone()), tree_summary::tree_summary(channels.clone()), network_tree::network_tree(channels.clone()), + circuit_capacity::circuit_capacity(channels.clone()), + tree_capacity::tree_capacity(channels.clone()), ); } } diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/circuit_capacity.rs b/src/rust/lqosd/src/node_manager/ws/ticker/circuit_capacity.rs new file mode 100644 index 00000000..8d6781d4 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/ws/ticker/circuit_capacity.rs @@ -0,0 +1,78 @@ +use std::collections::HashMap; +use std::sync::Arc; +use serde::Serialize; +use serde_json::json; +use lqos_utils::units::DownUpOrder; +use crate::node_manager::ws::publish_subscribe::PubSub; +use crate::node_manager::ws::published_channels::PublishedChannels; +use crate::node_manager::ws::published_channels::PublishedChannels::CircuitCapacity; +use crate::shaped_devices_tracker::SHAPED_DEVICES; +use crate::throughput_tracker::THROUGHPUT_TRACKER; + +struct CircuitAccumulator { + bytes: DownUpOrder, + median_rtt: f32, +} + +#[derive(Serialize)] +struct Capacity { + circuit_id: String, + circuit_name: String, + capacity: [f64; 2], + median_rtt: f32, +} + +pub async fn circuit_capacity(channels: Arc) { + if !channels.is_channel_alive(PublishedChannels::CircuitCapacity).await { + return; + } + + let mut circuits: HashMap = HashMap::new(); + + // Aggregate the data by circuit id + THROUGHPUT_TRACKER.raw_data.iter().for_each(|c| { + if let Some(circuit_id) = &c.circuit_id { + if let Some(accumulator) = circuits.get_mut(circuit_id) { + accumulator.bytes += c.bytes; + if let Some(latency) = c.median_latency() { + accumulator.median_rtt = latency; + } + } else { + circuits.insert(circuit_id.clone(), CircuitAccumulator { + bytes: c.bytes, + median_rtt: c.median_latency().unwrap_or(0.0), + }); + } + } + }); + + // Map circuits to capacities + let capacities: Vec = { + let shaped_devices = SHAPED_DEVICES.read().unwrap(); + circuits.iter().filter_map(|(circuit_id, accumulator)| { + if let Some(device) = shaped_devices.devices.iter().find(|sd| sd.circuit_id == *circuit_id) { + let down_mbps = accumulator.bytes.down as f64 * 8.0 / 1_000_000.0; + let down = down_mbps / device.download_max_mbps as f64; + let up_mbps = accumulator.bytes.up as f64 * 8.0 / 1_000_000.0; + let up = up_mbps / device.upload_max_mbps as f64; + + Some(Capacity { + circuit_name: device.circuit_name.clone(), + circuit_id: circuit_id.clone(), + capacity: [down, up], + median_rtt: accumulator.median_rtt, + }) + } else { + None + } + }).collect() + }; + + let message = json!( + { + "event": PublishedChannels::CircuitCapacity.to_string(), + "data": capacities, + } + ).to_string(); + channels.send(PublishedChannels::CircuitCapacity, message).await; +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs b/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs new file mode 100644 index 00000000..c3f70699 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs @@ -0,0 +1,60 @@ +use std::sync::Arc; +use serde::Serialize; +use serde_json::json; +use crate::node_manager::ws::publish_subscribe::PubSub; +use crate::node_manager::ws::published_channels::PublishedChannels; +use crate::shaped_devices_tracker::NETWORK_JSON; + +#[derive(Serialize)] +struct NodeCapacity { + name: String, + down: f64, + up: f64, + max_down: f64, + max_up: f64, + median_rtt: f32, +} + +pub async fn tree_capacity(channels: Arc) { + if !channels.is_channel_alive(PublishedChannels::TreeCapacity).await { + return; + } + + let capacities: Vec<_> = { + let reader = NETWORK_JSON.read().unwrap(); + reader.get_nodes_when_ready().iter().map(|node| { + let node = node.clone_to_transit(); + let down = node.current_throughput.0 as f64 * 8.0 / 1_000_000.0; + let up = node.current_throughput.1 as f64 * 8.0 / 1_000_000.0; + let max_down = node.max_throughput.0 as f64; + let max_up = node.max_throughput.1 as f64; + let median_rtt = if node.rtts.is_empty() { + 0.0 + } else { + let n = node.rtts.len() / 2; + if node.rtts.len() % 2 == 0 { + (node.rtts[n - 1] + node.rtts[n]) / 2.0 + } else { + node.rtts[n] + } + }; + + NodeCapacity { + name: node.name.clone(), + down, + up, + max_down, + max_up, + median_rtt, + } + }).collect() + }; + + let message = json!( + { + "event": PublishedChannels::TreeCapacity.to_string(), + "data": capacities, + } + ).to_string(); + channels.send(PublishedChannels::TreeCapacity, message).await; +} \ No newline at end of file