From 67cc8d8e99b73adc00b47be0b4749b020714ab9a Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 7 Mar 2023 21:37:23 +0000 Subject: [PATCH] Large batch of improvements: * The JavaScript RingBuffer structure updated correctly. * Replaced the funnel graph with text - easier to read. * Discovered that the current "parking_lot" could become unstable under very heavy load, and only with "fat" LTO. Since it's no longer recommended (recent change), removed it. * Replaced the "lazy_static" macro suite with the newly recommended "once_cell" system. Less code. * Full source format. * Update some dependency versions. --- src/rust/Cargo.lock | 24 +- src/rust/lqos_bus/src/bus/request.rs | 10 +- src/rust/lqos_config/src/lib.rs | 4 +- src/rust/lqos_config/src/network_json/mod.rs | 38 +-- src/rust/lqos_node_manager/Cargo.toml | 2 - src/rust/lqos_node_manager/src/auth_guard.rs | 18 +- src/rust/lqos_node_manager/src/main.rs | 3 +- .../lqos_node_manager/src/network_tree.rs | 53 ++++- src/rust/lqos_node_manager/src/queue_info.rs | 10 +- .../lqos_node_manager/src/shaped_devices.rs | 8 +- .../src/tracker/cache/lqosd_stats.rs | 27 +-- .../src/tracker/cache/shaped_devices.rs | 26 +-- .../src/tracker/cache_manager.rs | 20 +- src/rust/lqos_node_manager/src/tracker/mod.rs | 42 ++-- .../lqos_node_manager/src/unknown_devices.rs | 8 +- .../static/circuit_queue.html | 66 +++++- src/rust/lqos_node_manager/static/lqos.js | 65 +++--- src/rust/lqos_node_manager/static/main.html | 52 +++-- src/rust/lqos_node_manager/static/tree.html | 4 +- src/rust/lqos_python/src/lib.rs | 12 +- src/rust/lqos_queue_tracker/Cargo.toml | 4 +- src/rust/lqos_queue_tracker/src/bus.rs | 2 +- .../src/circuit_to_queue.rs | 15 +- src/rust/lqos_queue_tracker/src/interval.rs | 6 +- .../queing_structure_json_monitor.rs | 15 +- .../src/queue_structure/queue_node.rs | 2 +- .../lqos_queue_tracker/src/tracking/mod.rs | 7 +- .../src/tracking/watched_queues.rs | 20 +- src/rust/lqos_sys/src/xdp_ip_address.rs | 9 +- src/rust/lqos_utils/src/hex_string.rs | 34 +-- src/rust/lqos_utils/src/lib.rs | 2 +- src/rust/lqosd/Cargo.toml | 2 - src/rust/lqosd/src/main.rs | 5 +- .../lqosd/src/shaped_devices_tracker/mod.rs | 61 +++-- .../src/shaped_devices_tracker/netjson.rs | 59 +++-- src/rust/lqosd/src/throughput_tracker/mod.rs | 41 ++-- .../src/throughput_tracker/tracking_data.rs | 217 ++++-------------- 37 files changed, 510 insertions(+), 483 deletions(-) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 46e744c1..66e1a88e 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359" +checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" dependencies = [ "csv-core", "itoa", @@ -1116,9 +1116,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3" +checksum = "cfa919a82ea574332e2de6e74b4c36e74d41982b335080fa59d4ef31be20fdf3" dependencies = [ "libc", "windows-sys 0.45.0", @@ -1348,13 +1348,11 @@ dependencies = [ "anyhow", "default-net", "jemallocator", - "lazy_static", "lqos_bus", "lqos_config", "lqos_utils", "nix", "once_cell", - "parking_lot", "rocket", "rocket_async_compression", "sysinfo", @@ -1378,15 +1376,13 @@ name = "lqos_queue_tracker" version = "0.1.0" dependencies = [ "criterion", - "lazy_static", "log", "log-once", "lqos_bus", "lqos_config", "lqos_sys", "lqos_utils", - "parking_lot", - "rayon", + "once_cell", "serde", "serde_json", "thiserror", @@ -1445,8 +1441,6 @@ dependencies = [ "lqos_utils", "nix", "once_cell", - "parking_lot", - "rayon", "serde", "serde_json", "signal-hook", @@ -2200,18 +2194,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.152" +version = "1.0.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "3a382c72b4ba118526e187430bb4963cd6d55051ebf13d9b25574d379cc98d20" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum = "1ef476a5790f0f6decbc66726b6e5d63680ed518283e64c7df415989d880954f" dependencies = [ "proc-macro2", "quote", diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index 35a6e2d2..016ff2d6 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -112,9 +112,9 @@ pub enum BusRequest { ValidateShapedDevicesCsv, /// Request details of part of the network tree - GetNetworkMap{ + GetNetworkMap { /// The parent of the map to retrieve - parent: usize + parent: usize, }, /// Retrieves the top N queues from the root level, and summarizes @@ -124,6 +124,12 @@ pub enum BusRequest { /// Retrieve node names from network.json GetNodeNamesFromIds(Vec), + /// Retrieve stats for all queues above a named circuit id + GetFunnel { + /// Circuit being analyzed, as the named circuit id + target: String, + }, + /// If running on Equinix (the `equinix_test` feature is enabled), /// display a "run bandwidht test" link. #[cfg(feature = "equinix_tests")] diff --git a/src/rust/lqos_config/src/lib.rs b/src/rust/lqos_config/src/lib.rs index ba4b04af..0d456a06 100644 --- a/src/rust/lqos_config/src/lib.rs +++ b/src/rust/lqos_config/src/lib.rs @@ -9,16 +9,16 @@ mod authentication; mod etc; mod libre_qos_config; +mod network_json; mod program_control; mod shaped_devices; -mod network_json; pub use authentication::{UserRole, WebUsers}; pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables}; pub use libre_qos_config::LibreQoSConfig; +pub use network_json::{NetworkJson, NetworkJsonNode}; pub use program_control::load_libreqos; pub use shaped_devices::{ConfigShapedDevices, ShapedDevice}; -pub use network_json::{NetworkJson, NetworkJsonNode}; /// Used as a constant in determining buffer preallocation pub const SUPPORTED_CUSTOMERS: usize = 16_000_000; diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index 2b02347a..c9b0f766 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -122,12 +122,15 @@ impl NetworkJson { /// Retrieve a cloned copy of all children with a parent containing a specific /// node index. - pub fn get_cloned_children(&self, index: usize) -> Vec<(usize, NetworkJsonNode)> { + pub fn get_cloned_children( + &self, + index: usize, + ) -> Vec<(usize, NetworkJsonNode)> { self .nodes .iter() .enumerate() - .filter(|(_i,n)| n.immediate_parent == Some(index)) + .filter(|(_i, n)| n.immediate_parent == Some(index)) .map(|(i, n)| (i, n.clone())) .collect() } @@ -158,16 +161,24 @@ impl NetworkJson { &mut self, targets: &[usize], bytes: (u64, u64), - median_rtt: f32, ) { for idx in targets { // Safety first: use "get" to ensure that the node exists if let Some(node) = self.nodes.get_mut(*idx) { node.current_throughput.0 += bytes.0; node.current_throughput.1 += bytes.1; - if median_rtt > 0.0 { - node.rtts.push(median_rtt); - } + } else { + warn!("No network tree entry for index {idx}"); + } + } + } + + /// Record RTT time in the tree + pub fn add_rtt_cycle(&mut self, targets: &[usize], rtt: f32) { + for idx in targets { + // Safety first: use "get" to ensure that the node exists + if let Some(node) = self.nodes.get_mut(*idx) { + node.rtts.push(rtt); } else { warn!("No network tree entry for index {idx}"); } @@ -195,14 +206,13 @@ fn recurse_node( immediate_parent: usize, ) { info!("Mapping {name} from network.json"); - /*let my_id = if name != "children" { + let mut parents = parents.to_vec(); + let my_id = if name != "children" { + parents.push(nodes.len()); nodes.len() } else { - nodes.len()-1 - };*/ - let my_id = nodes.len(); - let mut parents = parents.to_vec(); - parents.push(my_id); + nodes.len() - 1 + }; let node = NetworkJsonNode { parents: parents.to_vec(), max_throughput: ( @@ -215,9 +225,9 @@ fn recurse_node( rtts: Vec::new(), }; - //if node.name != "children" { + if node.name != "children" { nodes.push(node); - //} + } // Recurse children for (key, value) in json.iter() { diff --git a/src/rust/lqos_node_manager/Cargo.toml b/src/rust/lqos_node_manager/Cargo.toml index 8c7bf029..69eb1030 100644 --- a/src/rust/lqos_node_manager/Cargo.toml +++ b/src/rust/lqos_node_manager/Cargo.toml @@ -10,8 +10,6 @@ equinix_tests = [] [dependencies] rocket = { version = "0.5.0-rc.2", features = [ "json", "msgpack", "uuid" ] } rocket_async_compression = "0.2.0" -lazy_static = "1.4" -parking_lot = "0.12" lqos_bus = { path = "../lqos_bus" } lqos_config = { path = "../lqos_config" } lqos_utils = { path = "../lqos_utils" } diff --git a/src/rust/lqos_node_manager/src/auth_guard.rs b/src/rust/lqos_node_manager/src/auth_guard.rs index a23cd4d9..e714f342 100644 --- a/src/rust/lqos_node_manager/src/auth_guard.rs +++ b/src/rust/lqos_node_manager/src/auth_guard.rs @@ -1,7 +1,8 @@ +use std::sync::Mutex; + use anyhow::Error; -use lazy_static::*; use lqos_config::{UserRole, WebUsers}; -use parking_lot::Mutex; +use once_cell::sync::Lazy; use rocket::serde::{json::Json, Deserialize, Serialize}; use rocket::{ http::{Cookie, CookieJar, Status}, @@ -9,9 +10,8 @@ use rocket::{ Request, }; -lazy_static! { - static ref WEB_USERS: Mutex> = Mutex::new(None); -} +static WEB_USERS: Lazy>> = + Lazy::new(|| Mutex::new(None)); #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AuthGuard { @@ -27,7 +27,7 @@ impl<'r> FromRequest<'r> for AuthGuard { async fn from_request( request: &'r Request<'_>, ) -> Outcome { - let mut lock = WEB_USERS.lock(); + let mut lock = WEB_USERS.lock().unwrap(); if lock.is_none() { if WebUsers::does_users_file_exist().unwrap() { *lock = Some(WebUsers::load_or_create().unwrap()); @@ -82,7 +82,7 @@ pub fn create_first_user( if WebUsers::does_users_file_exist().unwrap() { return Json("ERROR".to_string()); } - let mut lock = WEB_USERS.lock(); + let mut lock = WEB_USERS.lock().unwrap(); let mut users = WebUsers::load_or_create().unwrap(); users.allow_anonymous(info.allow_anonymous).unwrap(); let token = users @@ -102,7 +102,7 @@ pub struct LoginAttempt { #[post("/api/login", data = "")] pub fn login(cookies: &CookieJar, info: Json) -> Json { - let mut lock = WEB_USERS.lock(); + let mut lock = WEB_USERS.lock().unwrap(); if lock.is_none() && WebUsers::does_users_file_exist().unwrap() { *lock = Some(WebUsers::load_or_create().unwrap()); } @@ -126,7 +126,7 @@ pub fn admin_check(auth: AuthGuard) -> Json { #[get("/api/username")] pub fn username(_auth: AuthGuard, cookies: &CookieJar) -> Json { if let Some(token) = cookies.get("User-Token") { - let lock = WEB_USERS.lock(); + let lock = WEB_USERS.lock().unwrap(); if let Some(users) = &*lock { return Json(users.get_username(token.value())); } diff --git a/src/rust/lqos_node_manager/src/main.rs b/src/rust/lqos_node_manager/src/main.rs index 581e9691..e5e3d0f4 100644 --- a/src/rust/lqos_node_manager/src/main.rs +++ b/src/rust/lqos_node_manager/src/main.rs @@ -9,8 +9,8 @@ mod unknown_devices; use rocket_async_compression::Compression; mod auth_guard; mod config_control; -mod queue_info; mod network_tree; +mod queue_info; // Use JemAllocator only on supported platforms #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] @@ -81,6 +81,7 @@ fn rocket() -> _ { network_tree::tree_clients, network_tree::network_tree_summary, network_tree::node_names, + network_tree::funnel_for_queue, // Supporting files static_pages::bootsrap_css, static_pages::plotly_js, diff --git a/src/rust/lqos_node_manager/src/network_tree.rs b/src/rust/lqos_node_manager/src/network_tree.rs index df8fd784..350da873 100644 --- a/src/rust/lqos_node_manager/src/network_tree.rs +++ b/src/rust/lqos_node_manager/src/network_tree.rs @@ -2,7 +2,10 @@ use std::net::IpAddr; use lqos_bus::{bus_request, BusRequest, BusResponse}; use lqos_config::NetworkJsonNode; -use rocket::{fs::NamedFile, serde::{json::Json, Serialize}}; +use rocket::{ + fs::NamedFile, + serde::{json::Json, Serialize}, +}; use crate::{cache_control::NoCache, tracker::SHAPED_DEVICES}; @@ -28,7 +31,8 @@ pub async fn tree_entry( } #[get("/api/network_tree_summary")] -pub async fn network_tree_summary() -> NoCache>> { +pub async fn network_tree_summary( +) -> NoCache>> { let responses = bus_request(vec![BusRequest::TopMapQueues(4)]).await.unwrap(); let result = match &responses[0] { @@ -55,7 +59,7 @@ pub async fn tree_clients( for msg in bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter() { - let devices = SHAPED_DEVICES.read(); + let devices = SHAPED_DEVICES.read().unwrap(); if let BusResponse::HostCounters(hosts) = msg { for (ip, down, up) in hosts.iter() { let lookup = match ip { @@ -71,7 +75,7 @@ pub async fn tree_clients( limit: ( devices.devices[*c.1].download_max_mbps as u64, devices.devices[*c.1].upload_max_mbps as u64, - ) + ), }); } } @@ -81,11 +85,15 @@ pub async fn tree_clients( NoCache::new(Json(result)) } -#[post("/api/node_names", data= "")] -pub async fn node_names(nodes: Json>) -> NoCache>> { +#[post("/api/node_names", data = "")] +pub async fn node_names( + nodes: Json>, +) -> NoCache>> { let mut result = Vec::new(); - for msg in - bus_request(vec![BusRequest::GetNodeNamesFromIds(nodes.0)]).await.unwrap().iter() + for msg in bus_request(vec![BusRequest::GetNodeNamesFromIds(nodes.0)]) + .await + .unwrap() + .iter() { if let BusResponse::NodeNames(map) = msg { result.extend_from_slice(map); @@ -93,4 +101,31 @@ pub async fn node_names(nodes: Json>) -> NoCache")] +pub async fn funnel_for_queue( + circuit_id: String, +) -> NoCache>> { + let mut result = Vec::new(); + + let target = SHAPED_DEVICES + .read() + .unwrap() + .devices + .iter() + .find(|d| d.circuit_id == circuit_id) + .as_ref() + .unwrap() + .parent_node + .clone(); + + for msg in + bus_request(vec![BusRequest::GetFunnel { target }]).await.unwrap().iter() + { + if let BusResponse::NetworkMap(map) = msg { + result.extend_from_slice(map); + } + } + NoCache::new(Json(result)) +} diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index ff4e4f5e..5047f1be 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -29,8 +29,12 @@ pub async fn circuit_info( circuit_id: String, _auth: AuthGuard, ) -> NoCache> { - if let Some(device) = - SHAPED_DEVICES.read().devices.iter().find(|d| d.circuit_id == circuit_id) + if let Some(device) = SHAPED_DEVICES + .read() + .unwrap() + .devices + .iter() + .find(|d| d.circuit_id == circuit_id) { let result = CircuitInfo { name: device.circuit_name.clone(), @@ -63,7 +67,7 @@ pub async fn current_circuit_throughput( bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter() { if let BusResponse::HostCounters(hosts) = msg { - let devices = SHAPED_DEVICES.read(); + let devices = SHAPED_DEVICES.read().unwrap(); for (ip, down, up) in hosts.iter() { let lookup = match ip { IpAddr::V4(ip) => ip.to_ipv6_mapped(), diff --git a/src/rust/lqos_node_manager/src/shaped_devices.rs b/src/rust/lqos_node_manager/src/shaped_devices.rs index 0200ee73..64737a0b 100644 --- a/src/rust/lqos_node_manager/src/shaped_devices.rs +++ b/src/rust/lqos_node_manager/src/shaped_devices.rs @@ -13,12 +13,12 @@ static RELOAD_REQUIRED: AtomicBool = AtomicBool::new(false); pub fn all_shaped_devices( _auth: AuthGuard, ) -> NoCache>> { - NoCache::new(Json(SHAPED_DEVICES.read().devices.clone())) + NoCache::new(Json(SHAPED_DEVICES.read().unwrap().devices.clone())) } #[get("/api/shaped_devices_count")] pub fn shaped_devices_count(_auth: AuthGuard) -> NoCache> { - NoCache::new(Json(SHAPED_DEVICES.read().devices.len())) + NoCache::new(Json(SHAPED_DEVICES.read().unwrap().devices.len())) } #[get("/api/shaped_devices_range//")] @@ -27,7 +27,7 @@ pub fn shaped_devices_range( end: usize, _auth: AuthGuard, ) -> NoCache>> { - let reader = SHAPED_DEVICES.read(); + let reader = SHAPED_DEVICES.read().unwrap(); let result: Vec = reader.devices.iter().skip(start).take(end).cloned().collect(); NoCache::new(Json(result)) @@ -39,7 +39,7 @@ pub fn shaped_devices_search( _auth: AuthGuard, ) -> NoCache>> { let term = term.trim().to_lowercase(); - let reader = SHAPED_DEVICES.read(); + let reader = SHAPED_DEVICES.read().unwrap(); let result: Vec = reader .devices .iter() diff --git a/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs b/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs index 4c00bc08..6b66247b 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs @@ -1,22 +1,15 @@ -use lazy_static::*; use lqos_bus::IpStats; -use parking_lot::RwLock; +use once_cell::sync::Lazy; +use std::sync::RwLock; -lazy_static! { - pub static ref TOP_10_DOWNLOADERS: RwLock> = - RwLock::new(Vec::with_capacity(10)); -} +pub static TOP_10_DOWNLOADERS: Lazy>> = + Lazy::new(|| RwLock::new(Vec::with_capacity(10))); -lazy_static! { - pub static ref WORST_10_RTT: RwLock> = - RwLock::new(Vec::with_capacity(10)); -} +pub static WORST_10_RTT: Lazy>> = + Lazy::new(|| RwLock::new(Vec::with_capacity(10))); -lazy_static! { - pub static ref RTT_HISTOGRAM: RwLock> = - RwLock::new(Vec::with_capacity(100)); -} +pub static RTT_HISTOGRAM: Lazy>> = + Lazy::new(|| RwLock::new(Vec::with_capacity(100))); -lazy_static! { - pub static ref HOST_COUNTS: RwLock<(u32, u32)> = RwLock::new((0, 0)); -} +pub static HOST_COUNTS: Lazy> = + Lazy::new(|| RwLock::new((0, 0))); diff --git a/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs b/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs index 28ebb2f7..45cd5ae9 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs @@ -1,18 +1,16 @@ -use lazy_static::*; use lqos_bus::IpStats; use lqos_config::ConfigShapedDevices; -use parking_lot::RwLock; +use once_cell::sync::Lazy; +use std::sync::RwLock; -lazy_static! { - /// Global storage of the shaped devices csv data. - /// Updated by the file system watcher whenever - /// the underlying file changes. - pub static ref SHAPED_DEVICES : RwLock = RwLock::new(ConfigShapedDevices::default()); -} +/// Global storage of the shaped devices csv data. +/// Updated by the file system watcher whenever +/// the underlying file changes. +pub static SHAPED_DEVICES: Lazy> = + Lazy::new(|| RwLock::new(ConfigShapedDevices::default())); -lazy_static! { - /// Global storage of the shaped devices csv data. - /// Updated by the file system watcher whenever - /// the underlying file changes. - pub static ref UNKNOWN_DEVICES : RwLock> = RwLock::new(Vec::new()); -} +/// Global storage of the shaped devices csv data. +/// Updated by the file system watcher whenever +/// the underlying file changes. +pub static UNKNOWN_DEVICES: Lazy>> = + Lazy::new(|| RwLock::new(Vec::new())); diff --git a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs index ca00be49..c28ce6ee 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs @@ -95,10 +95,10 @@ fn load_shaped_devices() { let shaped_devices = ConfigShapedDevices::load(); if let Ok(new_file) = shaped_devices { info!("ShapedDevices.csv loaded"); - *SHAPED_DEVICES.write() = new_file; + *SHAPED_DEVICES.write().unwrap() = new_file; } else { warn!("ShapedDevices.csv failed to load, see previous error messages. Reverting to empty set."); - *SHAPED_DEVICES.write() = ConfigShapedDevices::default(); + *SHAPED_DEVICES.write().unwrap() = ConfigShapedDevices::default(); } } @@ -136,19 +136,19 @@ async fn get_data_from_server() -> Result<()> { ]; for r in bus_request(requests).await?.iter() { - match r { + match r { BusResponse::TopDownloaders(stats) => { - *TOP_10_DOWNLOADERS.write() = stats.clone(); + *TOP_10_DOWNLOADERS.write().unwrap() = stats.clone(); } BusResponse::WorstRtt(stats) => { - *WORST_10_RTT.write() = stats.clone(); + *WORST_10_RTT.write().unwrap() = stats.clone(); } BusResponse::RttHistogram(stats) => { - *RTT_HISTOGRAM.write() = stats.clone(); + *RTT_HISTOGRAM.write().unwrap() = stats.clone(); } BusResponse::AllUnknownIps(unknowns) => { - *HOST_COUNTS.write() = (unknowns.len() as u32, 0); - let cfg = SHAPED_DEVICES.read(); + *HOST_COUNTS.write().unwrap() = (unknowns.len() as u32, 0); + let cfg = SHAPED_DEVICES.read().unwrap(); let really_unknown: Vec = unknowns .iter() .filter(|ip| { @@ -164,8 +164,8 @@ async fn get_data_from_server() -> Result<()> { }) .cloned() .collect(); - *HOST_COUNTS.write() = (really_unknown.len() as u32, 0); - *UNKNOWN_DEVICES.write() = really_unknown; + *HOST_COUNTS.write().unwrap() = (really_unknown.len() as u32, 0); + *UNKNOWN_DEVICES.write().unwrap() = really_unknown; } BusResponse::NotReadyYet => { warn!("Host system isn't ready to answer all queries yet."); diff --git a/src/rust/lqos_node_manager/src/tracker/mod.rs b/src/rust/lqos_node_manager/src/tracker/mod.rs index 9f5390e4..49ce5e41 100644 --- a/src/rust/lqos_node_manager/src/tracker/mod.rs +++ b/src/rust/lqos_node_manager/src/tracker/mod.rs @@ -1,17 +1,13 @@ mod cache; mod cache_manager; use self::cache::{ - CPU_USAGE, HOST_COUNTS, NUM_CPUS, RAM_USED, - RTT_HISTOGRAM, TOP_10_DOWNLOADERS, TOTAL_RAM, - WORST_10_RTT, + CPU_USAGE, HOST_COUNTS, NUM_CPUS, RAM_USED, RTT_HISTOGRAM, + TOP_10_DOWNLOADERS, TOTAL_RAM, WORST_10_RTT, }; use crate::auth_guard::AuthGuard; pub use cache::{SHAPED_DEVICES, UNKNOWN_DEVICES}; pub use cache_manager::update_tracking; -use lazy_static::lazy_static; -use lqos_bus::{IpStats, TcHandle, bus_request, BusRequest, BusResponse}; -use lqos_config::LibreQoSConfig; -use parking_lot::Mutex; +use lqos_bus::{bus_request, BusRequest, BusResponse, IpStats, TcHandle}; use rocket::serde::{json::Json, Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug)] @@ -41,6 +37,7 @@ impl From<&IpStats> for IpStatsWithPlan { if !result.circuit_id.is_empty() { if let Some(circuit) = SHAPED_DEVICES .read() + .unwrap() .devices .iter() .find(|sd| sd.circuit_id == result.circuit_id) @@ -50,8 +47,7 @@ impl From<&IpStats> for IpStatsWithPlan { } else { &circuit.circuit_name }; - result.ip_address = - format!("{} ({})", name, result.ip_address); + result.ip_address = format!("{} ({})", name, result.ip_address); result.plan = (circuit.download_max_mbps, circuit.download_min_mbps); } } @@ -70,15 +66,20 @@ pub struct ThroughputPerSecond { } #[get("/api/current_throughput")] -pub async fn current_throughput(_auth: AuthGuard) -> Json { +pub async fn current_throughput( + _auth: AuthGuard, +) -> Json { let mut result = ThroughputPerSecond::default(); - if let Ok(messages) = bus_request(vec![BusRequest::GetCurrentThroughput]).await { + if let Ok(messages) = + bus_request(vec![BusRequest::GetCurrentThroughput]).await + { for msg in messages { if let BusResponse::CurrentThroughput { bits_per_second, packets_per_second, shaped_bits_per_second, - } = msg { + } = msg + { result.bits_per_second = bits_per_second; result.packets_per_second = packets_per_second; result.shaped_bits_per_second = shaped_bits_per_second; @@ -109,33 +110,30 @@ pub fn ram_usage(_auth: AuthGuard) -> Json> { #[get("/api/top_10_downloaders")] pub fn top_10_downloaders(_auth: AuthGuard) -> Json> { let tt: Vec = - TOP_10_DOWNLOADERS.read().iter().map(|tt| tt.into()).collect(); + TOP_10_DOWNLOADERS.read().unwrap().iter().map(|tt| tt.into()).collect(); Json(tt) } #[get("/api/worst_10_rtt")] pub fn worst_10_rtt(_auth: AuthGuard) -> Json> { let tt: Vec = - WORST_10_RTT.read().iter().map(|tt| tt.into()).collect(); + WORST_10_RTT.read().unwrap().iter().map(|tt| tt.into()).collect(); Json(tt) } #[get("/api/rtt_histogram")] pub fn rtt_histogram(_auth: AuthGuard) -> Json> { - Json(RTT_HISTOGRAM.read().clone()) + Json(RTT_HISTOGRAM.read().unwrap().clone()) } #[get("/api/host_counts")] pub fn host_counts(_auth: AuthGuard) -> Json<(u32, u32)> { - let shaped_reader = SHAPED_DEVICES.read(); + let shaped_reader = SHAPED_DEVICES.read().unwrap(); let n_devices = shaped_reader.devices.len(); - let host_counts = HOST_COUNTS.read(); + let host_counts = HOST_COUNTS.read().unwrap(); let unknown = host_counts.0 - host_counts.1; Json((n_devices as u32, unknown)) } -lazy_static! { - static ref CONFIG: Mutex = - Mutex::new(lqos_config::LibreQoSConfig::load().unwrap()); -} - +//static CONFIG: Lazy> = +// Lazy::new(|| Mutex::new(lqos_config::LibreQoSConfig::load().unwrap())); diff --git a/src/rust/lqos_node_manager/src/unknown_devices.rs b/src/rust/lqos_node_manager/src/unknown_devices.rs index 632dd949..3dd201c2 100644 --- a/src/rust/lqos_node_manager/src/unknown_devices.rs +++ b/src/rust/lqos_node_manager/src/unknown_devices.rs @@ -6,12 +6,12 @@ use rocket::serde::json::Json; #[get("/api/all_unknown_devices")] pub fn all_unknown_devices(_auth: AuthGuard) -> NoCache>> { - NoCache::new(Json(UNKNOWN_DEVICES.read().clone())) + NoCache::new(Json(UNKNOWN_DEVICES.read().unwrap().clone())) } #[get("/api/unknown_devices_count")] pub fn unknown_devices_count(_auth: AuthGuard) -> NoCache> { - NoCache::new(Json(UNKNOWN_DEVICES.read().len())) + NoCache::new(Json(UNKNOWN_DEVICES.read().unwrap().len())) } #[get("/api/unknown_devices_range//")] @@ -20,7 +20,7 @@ pub fn unknown_devices_range( end: usize, _auth: AuthGuard, ) -> NoCache>> { - let reader = UNKNOWN_DEVICES.read(); + let reader = UNKNOWN_DEVICES.read().unwrap(); let result: Vec = reader.iter().skip(start).take(end).cloned().collect(); NoCache::new(Json(result)) @@ -29,7 +29,7 @@ pub fn unknown_devices_range( #[get("/api/unknown_devices_csv")] pub fn unknown_devices_csv(_auth: AuthGuard) -> NoCache { let mut result = String::new(); - let reader = UNKNOWN_DEVICES.read(); + let reader = UNKNOWN_DEVICES.read().unwrap(); for unknown in reader.iter() { result += &format!("{}\n", unknown.ip_address); diff --git a/src/rust/lqos_node_manager/static/circuit_queue.html b/src/rust/lqos_node_manager/static/circuit_queue.html index 4b654177..9d3632a3 100644 --- a/src/rust/lqos_node_manager/static/circuit_queue.html +++ b/src/rust/lqos_node_manager/static/circuit_queue.html @@ -68,6 +68,9 @@ +
@@ -139,7 +142,7 @@
-
+
@@ -181,6 +184,9 @@
+ +
+
@@ -450,6 +456,63 @@ setTimeout(getThroughput, 1000); } + let funnels = new MultiRingBuffer(300); + let rtts = {}; + let circuitId = ""; + + function getFunnel(c) { + circuitId = encodeURI(c); + $.get("/api/funnel_for_queue/" + circuitId, (data) => { + let html = ""; + for (let i=0; i" + data[i][1].name + " Throughput"; + row += "
"; + row += ""; + row += ""; + + row += "
"; + row += "
"; + row += "
" + data[i][1].name + " TCP RTT
"; + row += "
"; + row += "
"; + row += "
"; + + row += ""; + html += row; + } + $("#pills-funnel").html(html); + setTimeout(plotFunnels, 1000); + }); + } + + function plotFunnels() { + $.get("/api/funnel_for_queue/" + encodeURI(circuitId), (data) => { + for (let i=0; i { pollQueue(); getThroughput(); + getFunnel(params.id); }); } diff --git a/src/rust/lqos_node_manager/static/lqos.js b/src/rust/lqos_node_manager/static/lqos.js index f3ded684..34e6e455 100644 --- a/src/rust/lqos_node_manager/static/lqos.js +++ b/src/rust/lqos_node_manager/static/lqos.js @@ -190,16 +190,6 @@ const reloadModal = ` `; -function yValsRingSort(y, head, capacity) { - let result = []; - for (let i=0; i a + b) + - v.upload.reduce((a, b) => a + b); - if (total > 0) { - let dn = { x: v.x_axis, y: yValsRingSort(v.download, v.head, v.capacity), name: k + "_DL", type: 'scatter', stackgroup: 'dn' }; - let up = { x: v.x_axis, y: yValsRingSort(v.upload, v.head, v.capacity), name: k + "_UL", type: 'scatter', stackgroup: 'up' }; - graphData.push(dn); - graphData.push(up); - } + let y = v.sortedY; + let dn = { x: v.x_axis, y: y.down, name: k + "_DL", type: 'scatter', stackgroup: 'dn' }; + let up = { x: v.x_axis, y: y.up, name: k + "_UL", type: 'scatter', stackgroup: 'up' }; + graphData.push(dn); + graphData.push(up); } } @@ -246,17 +233,16 @@ class MultiRingBuffer { plotTotalThroughput(target_div) { let graph = document.getElementById(target_div); - let totalDown = yValsRingSort(this.data['total'].download, this.data['total'].head, this.data['total'].capacity); - let totalUp = yValsRingSort(this.data['total'].upload, this.data['total'].head, this.data['total'].capacity); - let shapedDown = yValsRingSort(this.data['shaped'].download, this.data['shaped'].head, this.data['shaped'].capacity); - let shapedUp = yValsRingSort(this.data['shaped'].upload, this.data['shaped'].head, this.data['shaped'].capacity); + let total = this.data['total'].sortedY(); + let shaped = this.data['shaped'].sortedY(); + let x = this.data['total'].x_axis; let data = [ - {x: x, y:totalDown, name: 'Download', type: 'scatter', marker: {color: 'rgb(255,160,122)'}}, - {x: x, y:totalUp, name: 'Upload', type: 'scatter', marker: {color: 'rgb(255,160,122)'}}, - {x: x, y:shapedDown, name: 'Shaped Download', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}}, - {x: x, y:shapedUp, name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}}, + {x: x, y:total.down, name: 'Download', type: 'scatter', marker: {color: 'rgb(255,160,122)'}}, + {x: x, y:total.up, name: 'Upload', type: 'scatter', marker: {color: 'rgb(255,160,122)'}}, + {x: x, y:shaped.down, name: 'Shaped Download', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}}, + {x: x, y:shaped.up, name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}}, ]; Plotly.newPlot(graph, data, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true }, xaxis: {automargin: true, title: "Time since now (seconds)"} }, { responsive: true }); } @@ -272,7 +258,7 @@ class RingBuffer { for (var i = 0; i < capacity; ++i) { this.download.push(0.0); this.upload.push(0.0); - this.x_axis.push(0-i); + this.x_axis.push(i); } } @@ -283,10 +269,27 @@ class RingBuffer { this.head %= this.capacity; } + sortedY() { + let result = { + down: [], + up: [], + }; + for (let i=this.head; i© 2022-2023, LibreQoE LLC