diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index b9c8b5fd..f422a358 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1334,6 +1334,7 @@ dependencies = [ "ip_network_table", "log", "serde", + "serde_json", "sha2", "thiserror", "toml 0.7.2", diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index 42962022..448f936b 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -111,6 +111,12 @@ pub enum BusRequest { /// Request that the Rust side of things validate the CSV ValidateShapedDevicesCsv, + /// Request details of part of the network tree + GetNetworkMap{ + /// The parent of the map to retrieve + parent: usize + }, + /// If running on Equinix (the `equinix_test` feature is enabled), /// display a "run bandwidht test" link. #[cfg(feature = "equinix_tests")] diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index d6a4aa44..666aaa0f 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -68,4 +68,7 @@ pub enum BusResponse { /// A string containing a JSON dump of a queue stats. Analagos to /// the response from `tc show qdisc`. RawQueueData(String), + + /// Results from network map queries + NetworkMap(Vec<(usize, lqos_config::NetworkJsonNode)>), } diff --git a/src/rust/lqos_config/Cargo.toml b/src/rust/lqos_config/Cargo.toml index 6287680c..3a4d942c 100644 --- a/src/rust/lqos_config/Cargo.toml +++ b/src/rust/lqos_config/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" thiserror = "1" toml = "0" serde = { version = "1.0", features = [ "derive" ] } +serde_json = "1" csv = "1" ip_network_table = "0" ip_network = "0" diff --git a/src/rust/lqos_config/src/lib.rs b/src/rust/lqos_config/src/lib.rs index 3d48ae2b..ba4b04af 100644 --- a/src/rust/lqos_config/src/lib.rs +++ b/src/rust/lqos_config/src/lib.rs @@ -11,12 +11,14 @@ mod etc; mod libre_qos_config; mod program_control; mod shaped_devices; +mod network_json; pub use authentication::{UserRole, WebUsers}; pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables}; pub use libre_qos_config::LibreQoSConfig; pub use program_control::load_libreqos; pub use shaped_devices::{ConfigShapedDevices, ShapedDevice}; +pub use network_json::{NetworkJson, NetworkJsonNode}; /// Used as a constant in determining buffer preallocation pub const SUPPORTED_CUSTOMERS: usize = 16_000_000; diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs new file mode 100644 index 00000000..cfcd531a --- /dev/null +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -0,0 +1,230 @@ +use crate::etc; +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use std::{ + fs, + path::{Path, PathBuf}, +}; +use thiserror::Error; + +/// Describes a node in the network map tree. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct NetworkJsonNode { + /// The node name, as it appears in `network.json` + pub name: String, + + /// The maximum throughput allowed per `network.json` for this node + pub max_throughput: (u32, u32), // In mbps + + /// Current throughput (in bytes/second) at this node + pub current_throughput: (u64, u64), // In bytes + + /// Approximate RTTs reported for this level of the tree. + /// It's never going to be as statistically accurate as the actual + /// numbers, being based on medians. + pub rtts: Vec, + + /// A list of indices in the `NetworkJson` vector of nodes + /// linking to parent nodes + pub parents: Vec, + + /// The immediate parent node + pub immediate_parent: Option, +} + +/// Holder for the network.json representation. +/// This is condensed into a single level vector with index-based referencing +/// for easy use in funnel calculations. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct NetworkJson { + nodes: Vec, +} + +impl Default for NetworkJson { + fn default() -> Self { + Self::new() + } +} + +impl NetworkJson { + /// Generates an empty network.json + pub fn new() -> Self { + Self { nodes: Vec::new() } + } + + /// The path to the current `network.json` file, determined + /// by acquiring the prefix from the `/etc/lqos.conf` configuration + /// file. + pub fn path() -> Result { + let cfg = + etc::EtcLqos::load().map_err(|_| NetworkJsonError::ConfigLoadError)?; + let base_path = Path::new(&cfg.lqos_directory); + Ok(base_path.join("network.json")) + } + + /// Does network.json exist? + pub fn exists() -> bool { + if let Ok(path) = Self::path() { + path.exists() + } else { + false + } + } + + /// Attempt to load network.json from disk + pub fn load() -> Result { + let mut nodes = vec![NetworkJsonNode { + name: "Root".to_string(), + max_throughput: (0, 0), + current_throughput: (0, 0), + parents: Vec::new(), + immediate_parent: None, + rtts: Vec::new(), + }]; + if !Self::exists() { + return Err(NetworkJsonError::FileNotFound); + } + let path = Self::path()?; + let raw = fs::read_to_string(path) + .map_err(|_| NetworkJsonError::ConfigLoadError)?; + let json: Value = serde_json::from_str(&raw) + .map_err(|_| NetworkJsonError::ConfigLoadError)?; + + // Start reading from the top. We are at the root node. + let parents = vec![0]; + if let Value::Object(map) = &json { + for (key, value) in map.iter() { + if let Value::Object(inner_map) = value { + recurse_node(&mut nodes, key, inner_map, &parents, 0); + } + } + } + + Ok(Self { nodes }) + } + + /// Find the index of a circuit_id + pub fn get_index_for_name(&self, name: &str) -> Option { + self.nodes.iter().position(|n| n.name == name) + } + + /// Retrieve a cloned copy of a NetworkJsonNode entry, or None if there isn't + /// an entry at that index. + pub fn get_cloned_entry_by_index( + &self, + index: usize, + ) -> Option { + self.nodes.get(index).cloned() + } + + /// Retrieve a cloned copy of all children with a parent containing a specific + /// node index. + pub fn get_cloned_children(&self, index: usize) -> Vec<(usize, NetworkJsonNode)> { + self + .nodes + .iter() + .enumerate() + .filter(|(_i,n)| n.immediate_parent == Some(index)) + .map(|(i, n)| (i, n.clone())) + .collect() + } + + /// Find a circuit_id, and if it exists return its list of parent nodes + /// as indices within the network_json layout. + pub fn get_parents_for_circuit_id( + &self, + circuit_id: &str, + ) -> Option> { + self + .nodes + .iter() + .find(|n| n.name == circuit_id) + .map(|node| node.parents.clone()) + } + + /// Sets all current throughput values to zero + pub fn zero_throughput_and_rtt(&mut self) { + self.nodes.iter_mut().for_each(|n| { + n.current_throughput = (0, 0); + n.rtts.clear(); + }); + } + + /// Add throughput numbers to node entries + pub fn add_throughput_cycle( + &mut self, + targets: &[usize], + bytes: (u64, u64), + median_rtt: f32, + ) { + for idx in targets { + // Safety first: use "get" to ensure that the node exists + if let Some(node) = self.nodes.get_mut(*idx) { + node.current_throughput.0 += bytes.0; + node.current_throughput.1 += bytes.1; + if median_rtt > 0.0 { + node.rtts.push(median_rtt); + } + } else { + warn!("No network tree entry for index {idx}"); + } + } + } +} + +fn json_to_u32(val: Option<&Value>) -> u32 { + if let Some(val) = val { + if let Some(n) = val.as_u64() { + n as u32 + } else { + 0 + } + } else { + 0 + } +} + +fn recurse_node( + nodes: &mut Vec, + name: &str, + json: &Map, + parents: &[usize], + immediate_parent: usize, +) { + info!("Mapping {name} from network.json"); + let node = NetworkJsonNode { + parents: parents.to_vec(), + max_throughput: ( + json_to_u32(json.get("downloadBandwidthMbps")), + json_to_u32(json.get("uploadBandwidthMbps")), + ), + current_throughput: (0, 0), + name: name.to_string(), + immediate_parent: Some(immediate_parent), + rtts: Vec::new(), + }; + + let my_id = nodes.len(); + nodes.push(node); + let mut parents = parents.to_vec(); + parents.push(my_id); + + // Recurse children + for (key, value) in json.iter() { + let key_str = key.as_str(); + if key_str != "uploadBandwidthMbps" && key_str != "downloadBandwidthMbps" { + if let Value::Object(value) = value { + recurse_node(nodes, key, value, &parents, my_id); + } + } + } +} + +#[derive(Error, Debug)] +pub enum NetworkJsonError { + #[error("Unable to find or load network.json")] + ConfigLoadError, + #[error("network.json not found or does not exist")] + FileNotFound, +} diff --git a/src/rust/lqos_node_manager/src/main.rs b/src/rust/lqos_node_manager/src/main.rs index 64264291..b9fe840b 100644 --- a/src/rust/lqos_node_manager/src/main.rs +++ b/src/rust/lqos_node_manager/src/main.rs @@ -10,6 +10,7 @@ use rocket_async_compression::Compression; mod auth_guard; mod config_control; mod queue_info; +mod network_tree; // Use JemAllocator only on supported platforms #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] @@ -38,6 +39,7 @@ fn rocket() -> _ { static_pages::unknown_devices_page, static_pages::circuit_queue, config_control::config_page, + network_tree::tree_page, // Our JS library static_pages::lqos_js, static_pages::lqos_css, @@ -77,6 +79,7 @@ fn rocket() -> _ { auth_guard::admin_check, static_pages::login_page, auth_guard::username, + network_tree::tree_entry, // Supporting files static_pages::bootsrap_css, static_pages::plotly_js, diff --git a/src/rust/lqos_node_manager/src/network_tree.rs b/src/rust/lqos_node_manager/src/network_tree.rs new file mode 100644 index 00000000..c79c065d --- /dev/null +++ b/src/rust/lqos_node_manager/src/network_tree.rs @@ -0,0 +1,26 @@ +use lqos_bus::{bus_request, BusRequest, BusResponse}; +use lqos_config::NetworkJsonNode; +use rocket::{fs::NamedFile, serde::json::Json}; + +use crate::cache_control::NoCache; + +// Note that NoCache can be replaced with a cache option +// once the design work is complete. +#[get("/tree")] +pub async fn tree_page<'a>() -> NoCache> { + NoCache::new(NamedFile::open("static/tree.html").await.ok()) +} + +#[get("/api/network_tree/")] +pub async fn tree_entry( + parent: usize, +) -> NoCache>> { + let responses = + bus_request(vec![BusRequest::GetNetworkMap { parent }]).await.unwrap(); + let result = match &responses[0] { + BusResponse::NetworkMap(nodes) => nodes.to_owned(), + _ => Vec::new(), + }; + + NoCache::new(Json(result)) +} diff --git a/src/rust/lqos_node_manager/static/main.html b/src/rust/lqos_node_manager/static/main.html index cbc6301f..468599b5 100644 --- a/src/rust/lqos_node_manager/static/main.html +++ b/src/rust/lqos_node_manager/static/main.html @@ -26,9 +26,9 @@ Dashboard - + diff --git a/src/rust/lqos_node_manager/static/tree.html b/src/rust/lqos_node_manager/static/tree.html new file mode 100644 index 00000000..5997783c --- /dev/null +++ b/src/rust/lqos_node_manager/static/tree.html @@ -0,0 +1,129 @@ + + + + + + + + + LibreQoS - Local Node Manager + + + + + + + + + +
+ +
+
+ THIS NODE +
+
+ +
+
+
+
+
+ +
+ +
© 2022-2023, LibreQoE LLC
+ + + + + diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index d35279e8..8923e20c 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -3,8 +3,8 @@ mod ip_mapping; #[cfg(feature = "equinix_tests")] mod lqos_daht_test; mod program_control; -mod throughput_tracker; mod shaped_devices_tracker; +mod throughput_tracker; mod tuning; mod validation; use crate::{ @@ -64,7 +64,11 @@ async fn main() -> Result<()> { }; // Spawn tracking sub-systems - join!(spawn_queue_structure_monitor(), shaped_devices_tracker::shaped_devices_watcher()); + join!( + spawn_queue_structure_monitor(), + shaped_devices_tracker::shaped_devices_watcher(), + shaped_devices_tracker::network_json_watcher() + ); throughput_tracker::spawn_throughput_monitor(); spawn_queue_monitor(); @@ -156,7 +160,10 @@ fn handle_bus_requests( BusRequest::RequestLqosEquinixTest => lqos_daht_test::lqos_daht_test(), BusRequest::ValidateShapedDevicesCsv => { validation::validate_shaped_devices_csv() - } + }, + BusRequest::GetNetworkMap { parent } => { + shaped_devices_tracker::get_one_network_map_layer(*parent) + }, }); } } diff --git a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs index 8a34c626..70f62e56 100644 --- a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs +++ b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs @@ -1,10 +1,13 @@ use anyhow::Result; use log::{error, info, warn}; +use lqos_bus::BusResponse; use lqos_config::ConfigShapedDevices; use lqos_utils::file_watcher::FileWatcher; use once_cell::sync::Lazy; use parking_lot::RwLock; use tokio::task::spawn_blocking; +mod netjson; +pub use netjson::*; pub static SHAPED_DEVICES: Lazy> = Lazy::new(|| RwLock::new(ConfigShapedDevices::default())); @@ -50,3 +53,14 @@ fn watch_for_shaped_devices_changing() -> Result<()> { info!("ShapedDevices watcher returned: {result:?}"); } } + +pub fn get_one_network_map_layer(parent_idx: usize) -> BusResponse { + let net_json = NETWORK_JSON.read(); + if let Some(parent) = net_json.get_cloned_entry_by_index(parent_idx) { + let mut nodes = vec![(parent_idx, parent)]; + nodes.extend_from_slice(&net_json.get_cloned_children(parent_idx)); + BusResponse::NetworkMap(nodes) + } else { + BusResponse::Fail("No such node".to_string()) + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/shaped_devices_tracker/netjson.rs b/src/rust/lqosd/src/shaped_devices_tracker/netjson.rs new file mode 100644 index 00000000..8c5d794a --- /dev/null +++ b/src/rust/lqosd/src/shaped_devices_tracker/netjson.rs @@ -0,0 +1,47 @@ +use log::{info, error, warn}; +use lqos_config::NetworkJson; +use lqos_utils::file_watcher::FileWatcher; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use tokio::task::spawn_blocking; +use anyhow::Result; + +pub static NETWORK_JSON: Lazy> = Lazy::new(|| RwLock::new(NetworkJson::default())); + +pub async fn network_json_watcher() { + spawn_blocking(|| { + info!("Watching for network.kson changes"); + let _ = watch_for_network_json_changing(); + }); +} + +/// Fires up a Linux file system watcher than notifies +/// when `network.json` changes, and triggers a reload. +fn watch_for_network_json_changing() -> Result<()> { + let watch_path = NetworkJson::path(); + if watch_path.is_err() { + error!("Unable to generate path for network.json"); + return Err(anyhow::Error::msg( + "Unable to create path for network.json", + )); + } + let watch_path = watch_path.unwrap(); + + let mut watcher = FileWatcher::new("network.json", watch_path); + watcher.set_file_exists_callback(load_network_json); + watcher.set_file_created_callback(load_network_json); + watcher.set_file_changed_callback(load_network_json); + loop { + let result = watcher.watch(); + info!("network.json watcher returned: {result:?}"); + } + } + + fn load_network_json() { + let njs = NetworkJson::load(); + if let Ok(njs) = njs { + *NETWORK_JSON.write() = njs; + } else { + warn!("Unable to load network.json"); + } + } \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 64cea3cb..aba21e20 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -1,6 +1,6 @@ mod throughput_entry; mod tracking_data; -use crate::throughput_tracker::tracking_data::ThroughputTracker; +use crate::{throughput_tracker::tracking_data::ThroughputTracker, shaped_devices_tracker::NETWORK_JSON}; use log::{info, warn}; use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult}; use lqos_sys::XdpIpAddress; @@ -21,10 +21,11 @@ pub fn spawn_throughput_monitor() { std::thread::spawn(move || { periodic(interval_ms, "Throughput Monitor", &mut || { let mut throughput = THROUGHPUT_TRACKER.write(); - throughput.copy_previous_and_reset_rtt(); + let mut net_json = NETWORK_JSON.write(); + throughput.copy_previous_and_reset_rtt(&mut net_json); throughput.apply_new_throughput_counters(); throughput.apply_rtt_data(); - throughput.update_totals(); + throughput.update_totals(&mut net_json); throughput.next_cycle(); }); }); diff --git a/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs b/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs index f5efaa46..c754cdd6 100644 --- a/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs +++ b/src/rust/lqosd/src/throughput_tracker/throughput_entry.rs @@ -3,6 +3,7 @@ use lqos_bus::TcHandle; #[derive(Debug)] pub(crate) struct ThroughputEntry { pub(crate) circuit_id: Option, + pub(crate) network_json_parents: Option>, pub(crate) first_cycle: u64, pub(crate) most_recent_cycle: u64, pub(crate) bytes: (u64, u64), diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 9b4d9884..65283fe6 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -2,6 +2,7 @@ use crate::shaped_devices_tracker::SHAPED_DEVICES; use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use lqos_bus::TcHandle; +use lqos_config::NetworkJson; use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress}; use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator}; use std::collections::HashMap; @@ -28,7 +29,13 @@ impl ThroughputTracker { } } - pub(crate) fn copy_previous_and_reset_rtt(&mut self) { + pub(crate) fn copy_previous_and_reset_rtt( + &mut self, + netjson: &mut NetworkJson, + ) { + // Zero the previous funnel hierarchy current numbers + netjson.zero_throughput_and_rtt(); + // Copy previous byte/packet numbers and reset RTT data // We're using Rayon's "par_iter_mut" to spread the operation across // all CPU cores. @@ -65,9 +72,37 @@ impl ThroughputTracker { circuit_id } + pub(crate) fn get_node_name_for_circuit_id( + circuit_id: Option, + ) -> Option { + if let Some(circuit_id) = circuit_id { + let shaped = SHAPED_DEVICES.read(); + shaped + .devices + .iter() + .find(|d| d.circuit_id == circuit_id) + .map(|device| device.parent_node.clone()) + } else { + None + } + } + + pub(crate) fn lookup_network_parents( + circuit_id: Option, + ) -> Option> { + if let Some(parent) = Self::get_node_name_for_circuit_id(circuit_id) { + let lock = crate::shaped_devices_tracker::NETWORK_JSON.read(); + lock.get_parents_for_circuit_id(&parent) + } else { + None + } + } + pub(crate) fn refresh_circuit_ids(&mut self) { self.raw_data.par_iter_mut().for_each(|(ip, data)| { data.circuit_id = Self::lookup_circuit_id(ip); + data.network_json_parents = + Self::lookup_network_parents(data.circuit_id.clone()); }); } @@ -94,8 +129,10 @@ impl ThroughputTracker { entry.most_recent_cycle = cycle; } } else { + let circuit_id = Self::lookup_circuit_id(xdp_ip); let mut entry = ThroughputEntry { - circuit_id: Self::lookup_circuit_id(xdp_ip), + circuit_id: circuit_id.clone(), + network_json_parents: Self::lookup_network_parents(circuit_id), first_cycle: self.cycle, most_recent_cycle: 0, bytes: (0, 0), @@ -135,7 +172,7 @@ impl ThroughputTracker { }); } - pub(crate) fn update_totals(&mut self) { + pub(crate) fn update_totals(&mut self, net_json: &mut NetworkJson) { self.bytes_per_second = (0, 0); self.packets_per_second = (0, 0); self.shaped_bytes_per_second = (0, 0); @@ -149,27 +186,43 @@ impl ThroughputTracker { v.packets.0.saturating_sub(v.prev_packets.0), v.packets.1.saturating_sub(v.prev_packets.1), v.tc_handle.as_u32() > 0, + &v.network_json_parents, + v.median_latency(), ) }) - .for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| { - self.bytes_per_second.0 = - self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0); - self.bytes_per_second.1 = - self.bytes_per_second.1.checked_add(bytes_up).unwrap_or(0); - self.packets_per_second.0 = - self.packets_per_second.0.checked_add(packets_down).unwrap_or(0); - self.packets_per_second.1 = - self.packets_per_second.1.checked_add(packets_up).unwrap_or(0); - if shaped { - self.shaped_bytes_per_second.0 = self - .shaped_bytes_per_second - .0 - .checked_add(bytes_down) - .unwrap_or(0); - self.shaped_bytes_per_second.1 = - self.shaped_bytes_per_second.1.checked_add(bytes_up).unwrap_or(0); - } - }); + .for_each( + |(bytes_down, bytes_up, packets_down, packets_up, shaped, parents, median_rtt)| { + self.bytes_per_second.0 = + self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0); + self.bytes_per_second.1 = + self.bytes_per_second.1.checked_add(bytes_up).unwrap_or(0); + self.packets_per_second.0 = + self.packets_per_second.0.checked_add(packets_down).unwrap_or(0); + self.packets_per_second.1 = + self.packets_per_second.1.checked_add(packets_up).unwrap_or(0); + if shaped { + self.shaped_bytes_per_second.0 = self + .shaped_bytes_per_second + .0 + .checked_add(bytes_down) + .unwrap_or(0); + self.shaped_bytes_per_second.1 = self + .shaped_bytes_per_second + .1 + .checked_add(bytes_up) + .unwrap_or(0); + } + + // If we have parent node data, we apply it now + if let Some(parents) = parents { + net_json.add_throughput_cycle( + parents, + (self.bytes_per_second.0, self.bytes_per_second.1), + median_rtt, + ) + } + }, + ); } pub(crate) fn next_cycle(&mut self) {