Work in progress: added channels for circuit capacity and tree node capacity, ready for dashboard entries.

This commit is contained in:
Herbert Wolverson 2024-07-16 15:26:15 -05:00
parent 46e9182561
commit afda5d3bdf
4 changed files with 145 additions and 0 deletions

View File

@ -21,4 +21,6 @@ pub enum PublishedChannels {
QueueStatsTotal,
NetworkTree,
NetworkTreeClients,
CircuitCapacity,
TreeCapacity,
}

View File

@ -14,6 +14,9 @@ pub mod system_info;
mod tree_summary;
mod queue_stats_total;
mod network_tree;
mod circuit_capacity;
mod tree_capacity;
pub use network_tree::{Circuit, all_circuits};
/// Runs a periodic tick to feed data to the node manager.
@ -45,6 +48,8 @@ async fn one_second_cadence(channels: Arc<PubSub>) {
flow_endpoints::ip_protocols(channels.clone()),
tree_summary::tree_summary(channels.clone()),
network_tree::network_tree(channels.clone()),
circuit_capacity::circuit_capacity(channels.clone()),
tree_capacity::tree_capacity(channels.clone()),
);
}
}

View File

@ -0,0 +1,78 @@
use std::collections::HashMap;
use std::sync::Arc;
use serde::Serialize;
use serde_json::json;
use lqos_utils::units::DownUpOrder;
use crate::node_manager::ws::publish_subscribe::PubSub;
use crate::node_manager::ws::published_channels::PublishedChannels;
use crate::node_manager::ws::published_channels::PublishedChannels::CircuitCapacity;
use crate::shaped_devices_tracker::SHAPED_DEVICES;
use crate::throughput_tracker::THROUGHPUT_TRACKER;
struct CircuitAccumulator {
bytes: DownUpOrder<u64>,
median_rtt: f32,
}
#[derive(Serialize)]
struct Capacity {
circuit_id: String,
circuit_name: String,
capacity: [f64; 2],
median_rtt: f32,
}
pub async fn circuit_capacity(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::CircuitCapacity).await {
return;
}
let mut circuits: HashMap<String, CircuitAccumulator> = HashMap::new();
// Aggregate the data by circuit id
THROUGHPUT_TRACKER.raw_data.iter().for_each(|c| {
if let Some(circuit_id) = &c.circuit_id {
if let Some(accumulator) = circuits.get_mut(circuit_id) {
accumulator.bytes += c.bytes;
if let Some(latency) = c.median_latency() {
accumulator.median_rtt = latency;
}
} else {
circuits.insert(circuit_id.clone(), CircuitAccumulator {
bytes: c.bytes,
median_rtt: c.median_latency().unwrap_or(0.0),
});
}
}
});
// Map circuits to capacities
let capacities: Vec<Capacity> = {
let shaped_devices = SHAPED_DEVICES.read().unwrap();
circuits.iter().filter_map(|(circuit_id, accumulator)| {
if let Some(device) = shaped_devices.devices.iter().find(|sd| sd.circuit_id == *circuit_id) {
let down_mbps = accumulator.bytes.down as f64 * 8.0 / 1_000_000.0;
let down = down_mbps / device.download_max_mbps as f64;
let up_mbps = accumulator.bytes.up as f64 * 8.0 / 1_000_000.0;
let up = up_mbps / device.upload_max_mbps as f64;
Some(Capacity {
circuit_name: device.circuit_name.clone(),
circuit_id: circuit_id.clone(),
capacity: [down, up],
median_rtt: accumulator.median_rtt,
})
} else {
None
}
}).collect()
};
let message = json!(
{
"event": PublishedChannels::CircuitCapacity.to_string(),
"data": capacities,
}
).to_string();
channels.send(PublishedChannels::CircuitCapacity, message).await;
}

View File

@ -0,0 +1,60 @@
use std::sync::Arc;
use serde::Serialize;
use serde_json::json;
use crate::node_manager::ws::publish_subscribe::PubSub;
use crate::node_manager::ws::published_channels::PublishedChannels;
use crate::shaped_devices_tracker::NETWORK_JSON;
#[derive(Serialize)]
struct NodeCapacity {
name: String,
down: f64,
up: f64,
max_down: f64,
max_up: f64,
median_rtt: f32,
}
pub async fn tree_capacity(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::TreeCapacity).await {
return;
}
let capacities: Vec<_> = {
let reader = NETWORK_JSON.read().unwrap();
reader.get_nodes_when_ready().iter().map(|node| {
let node = node.clone_to_transit();
let down = node.current_throughput.0 as f64 * 8.0 / 1_000_000.0;
let up = node.current_throughput.1 as f64 * 8.0 / 1_000_000.0;
let max_down = node.max_throughput.0 as f64;
let max_up = node.max_throughput.1 as f64;
let median_rtt = if node.rtts.is_empty() {
0.0
} else {
let n = node.rtts.len() / 2;
if node.rtts.len() % 2 == 0 {
(node.rtts[n - 1] + node.rtts[n]) / 2.0
} else {
node.rtts[n]
}
};
NodeCapacity {
name: node.name.clone(),
down,
up,
max_down,
max_up,
median_rtt,
}
}).collect()
};
let message = json!(
{
"event": PublishedChannels::TreeCapacity.to_string(),
"data": capacities,
}
).to_string();
channels.send(PublishedChannels::TreeCapacity, message).await;
}