diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 46e744c1..951c1825 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -587,9 +587,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359" +checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad" dependencies = [ "csv-core", "itoa", @@ -615,6 +615,19 @@ dependencies = [ "cipher", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "default-net" version = "0.12.0" @@ -1116,9 +1129,9 @@ dependencies = [ [[package]] name = "io-lifetimes" -version = "1.0.5" +version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3" +checksum = "cfa919a82ea574332e2de6e74b4c36e74d41982b335080fa59d4ef31be20fdf3" dependencies = [ "libc", "windows-sys 0.45.0", @@ -1330,6 +1343,7 @@ name = "lqos_config" version = "0.1.0" dependencies = [ "csv", + "dashmap", "ip_network", "ip_network_table", "log", @@ -1348,13 +1362,11 @@ dependencies = [ "anyhow", "default-net", "jemallocator", - "lazy_static", "lqos_bus", "lqos_config", "lqos_utils", "nix", "once_cell", - "parking_lot", "rocket", "rocket_async_compression", "sysinfo", @@ -1378,15 +1390,14 @@ name = "lqos_queue_tracker" version = "0.1.0" dependencies = [ "criterion", - "lazy_static", + "dashmap", "log", "log-once", "lqos_bus", "lqos_config", "lqos_sys", "lqos_utils", - "parking_lot", - "rayon", + "once_cell", "serde", "serde_json", "thiserror", @@ -1435,6 +1446,7 @@ name = "lqosd" version = "0.1.0" dependencies = [ "anyhow", + "dashmap", "env_logger", "jemallocator", "log", @@ -1445,8 +1457,6 @@ dependencies = [ "lqos_utils", "nix", "once_cell", - "parking_lot", - "rayon", "serde", "serde_json", "signal-hook", @@ -2200,18 +2210,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.152" +version = "1.0.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb" +checksum = "3a382c72b4ba118526e187430bb4963cd6d55051ebf13d9b25574d379cc98d20" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.152" +version = "1.0.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e" +checksum = "1ef476a5790f0f6decbc66726b6e5d63680ed518283e64c7df415989d880954f" dependencies = [ "proc-macro2", "quote", diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index 448f936b..43839491 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -112,11 +112,27 @@ pub enum BusRequest { ValidateShapedDevicesCsv, /// Request details of part of the network tree - GetNetworkMap{ + GetNetworkMap { /// The parent of the map to retrieve - parent: usize + parent: usize, }, + /// Retrieves the top N queues from the root level, and summarizes + /// the others as "other" + TopMapQueues(usize), + + /// Retrieve node names from network.json + GetNodeNamesFromIds(Vec), + + /// Retrieve stats for all queues above a named circuit id + GetFunnel { + /// Circuit being analyzed, as the named circuit id + target: String, + }, + + /// Obtain the lqosd statistics + GetLqosStats, + /// If running on Equinix (the `equinix_test` feature is enabled), /// display a "run bandwidht test" link. #[cfg(feature = "equinix_tests")] diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 666aaa0f..57f307b7 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -70,5 +70,16 @@ pub enum BusResponse { RawQueueData(String), /// Results from network map queries - NetworkMap(Vec<(usize, lqos_config::NetworkJsonNode)>), + NetworkMap(Vec<(usize, lqos_config::NetworkJsonTransport)>), + + /// Named nodes from network.json + NodeNames(Vec<(usize, String)>), + + /// Statistics from lqosd + LqosdStats{ + /// Number of bus requests handled + bus_requests: u64, + /// Us to poll hosts + time_to_poll_hosts: u64, + } } diff --git a/src/rust/lqos_bus/src/tc_handle.rs b/src/rust/lqos_bus/src/tc_handle.rs index dea3073d..20585c42 100644 --- a/src/rust/lqos_bus/src/tc_handle.rs +++ b/src/rust/lqos_bus/src/tc_handle.rs @@ -5,7 +5,7 @@ use thiserror::Error; /// Provides consistent handling of TC handle types. #[derive( - Copy, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, + Copy, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, Hash )] pub struct TcHandle(u32); diff --git a/src/rust/lqos_config/Cargo.toml b/src/rust/lqos_config/Cargo.toml index 3a4d942c..96ea3779 100644 --- a/src/rust/lqos_config/Cargo.toml +++ b/src/rust/lqos_config/Cargo.toml @@ -14,3 +14,4 @@ ip_network = "0" sha2 = "0" uuid = { version = "1", features = ["v4", "fast-rng" ] } log = "0" +dashmap = "5" diff --git a/src/rust/lqos_config/src/lib.rs b/src/rust/lqos_config/src/lib.rs index ba4b04af..65cced4e 100644 --- a/src/rust/lqos_config/src/lib.rs +++ b/src/rust/lqos_config/src/lib.rs @@ -9,16 +9,16 @@ mod authentication; mod etc; mod libre_qos_config; +mod network_json; mod program_control; mod shaped_devices; -mod network_json; pub use authentication::{UserRole, WebUsers}; pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables}; pub use libre_qos_config::LibreQoSConfig; +pub use network_json::{NetworkJson, NetworkJsonNode, NetworkJsonTransport}; pub use program_control::load_libreqos; pub use shaped_devices::{ConfigShapedDevices, ShapedDevice}; -pub use network_json::{NetworkJson, NetworkJsonNode}; /// Used as a constant in determining buffer preallocation pub const SUPPORTED_CUSTOMERS: usize = 16_000_000; diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index cfcd531a..d4fadb0b 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -1,15 +1,16 @@ use crate::etc; +use dashmap::DashSet; use log::{error, info, warn}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::{ fs, - path::{Path, PathBuf}, + path::{Path, PathBuf}, sync::atomic::AtomicU64, }; use thiserror::Error; /// Describes a node in the network map tree. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug)] pub struct NetworkJsonNode { /// The node name, as it appears in `network.json` pub name: String, @@ -18,12 +19,12 @@ pub struct NetworkJsonNode { pub max_throughput: (u32, u32), // In mbps /// Current throughput (in bytes/second) at this node - pub current_throughput: (u64, u64), // In bytes + pub current_throughput: (AtomicU64, AtomicU64), // In bytes /// Approximate RTTs reported for this level of the tree. /// It's never going to be as statistically accurate as the actual /// numbers, being based on medians. - pub rtts: Vec, + pub rtts: DashSet, /// A list of indices in the `NetworkJson` vector of nodes /// linking to parent nodes @@ -33,12 +34,51 @@ pub struct NetworkJsonNode { pub immediate_parent: Option, } +impl NetworkJsonNode { + /// Make a deep copy of a `NetworkJsonNode`, converting atomics + /// into concrete values. + pub fn clone_to_transit(&self) -> NetworkJsonTransport { + NetworkJsonTransport { + name: self.name.clone(), + max_throughput: self.max_throughput, + current_throughput: ( + self.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed), + self.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed), + ), + rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(), + parents: self.parents.clone(), + immediate_parent: self.immediate_parent, + } + } +} + +/// A "transport-friendly" version of `NetworkJsonNode`. Designed +/// to be quickly cloned from original nodes and efficiently +/// transmitted/received. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct NetworkJsonTransport { + /// Display name + pub name: String, + /// Max throughput for node (not clamped) + pub max_throughput: (u32, u32), + /// Current node throughput + pub current_throughput: (u64, u64), + /// Set of RTT data + pub rtts: Vec, + /// Node indices of parents + pub parents: Vec, + /// The immediate parent node in the tree + pub immediate_parent: Option, +} + /// Holder for the network.json representation. /// This is condensed into a single level vector with index-based referencing /// for easy use in funnel calculations. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug)] pub struct NetworkJson { - nodes: Vec, + /// Nodes that make up the tree, flattened and referenced by index number. + /// TODO: We should add a primary key to nodes in network.json. + pub nodes: Vec, } impl Default for NetworkJson { @@ -77,10 +117,10 @@ impl NetworkJson { let mut nodes = vec![NetworkJsonNode { name: "Root".to_string(), max_throughput: (0, 0), - current_throughput: (0, 0), + current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), parents: Vec::new(), immediate_parent: None, - rtts: Vec::new(), + rtts: DashSet::new(), }]; if !Self::exists() { return Err(NetworkJsonError::FileNotFound); @@ -114,19 +154,22 @@ impl NetworkJson { pub fn get_cloned_entry_by_index( &self, index: usize, - ) -> Option { - self.nodes.get(index).cloned() + ) -> Option { + self.nodes.get(index).map(|n| n.clone_to_transit()) } /// Retrieve a cloned copy of all children with a parent containing a specific /// node index. - pub fn get_cloned_children(&self, index: usize) -> Vec<(usize, NetworkJsonNode)> { + pub fn get_cloned_children( + &self, + index: usize, + ) -> Vec<(usize, NetworkJsonTransport)> { self .nodes .iter() .enumerate() - .filter(|(_i,n)| n.immediate_parent == Some(index)) - .map(|(i, n)| (i, n.clone())) + .filter(|(_i, n)| n.immediate_parent == Some(index)) + .map(|(i, n)| (i, n.clone_to_transit())) .collect() } @@ -144,28 +187,42 @@ impl NetworkJson { } /// Sets all current throughput values to zero - pub fn zero_throughput_and_rtt(&mut self) { - self.nodes.iter_mut().for_each(|n| { - n.current_throughput = (0, 0); + /// Note that due to interior mutability, this does not require mutable + /// access. + pub fn zero_throughput_and_rtt(&self) { + self.nodes.iter().for_each(|n| { + n.current_throughput.0.store(0, std::sync::atomic::Ordering::Relaxed); + n.current_throughput.1.store(0, std::sync::atomic::Ordering::Relaxed); n.rtts.clear(); }); } - /// Add throughput numbers to node entries + /// Add throughput numbers to node entries. Note that this does *not* require + /// mutable access due to atomics and interior mutability - so it is safe to use + /// a read lock. pub fn add_throughput_cycle( - &mut self, + &self, targets: &[usize], bytes: (u64, u64), - median_rtt: f32, ) { for idx in targets { // Safety first: use "get" to ensure that the node exists - if let Some(node) = self.nodes.get_mut(*idx) { - node.current_throughput.0 += bytes.0; - node.current_throughput.1 += bytes.1; - if median_rtt > 0.0 { - node.rtts.push(median_rtt); - } + if let Some(node) = self.nodes.get(*idx) { + node.current_throughput.0.fetch_add(bytes.0, std::sync::atomic::Ordering::Relaxed); + node.current_throughput.1.fetch_add(bytes.1, std::sync::atomic::Ordering::Relaxed); + } else { + warn!("No network tree entry for index {idx}"); + } + } + } + + /// Record RTT time in the tree. Note that due to interior mutability, + /// this does not require mutable access. + pub fn add_rtt_cycle(&self, targets: &[usize], rtt: f32) { + for idx in targets { + // Safety first: use "get" to ensure that the node exists + if let Some(node) = self.nodes.get(*idx) { + node.rtts.insert((rtt * 100.0) as u16); } else { warn!("No network tree entry for index {idx}"); } @@ -193,22 +250,28 @@ fn recurse_node( immediate_parent: usize, ) { info!("Mapping {name} from network.json"); + let mut parents = parents.to_vec(); + let my_id = if name != "children" { + parents.push(nodes.len()); + nodes.len() + } else { + nodes.len() - 1 + }; let node = NetworkJsonNode { parents: parents.to_vec(), max_throughput: ( json_to_u32(json.get("downloadBandwidthMbps")), json_to_u32(json.get("uploadBandwidthMbps")), ), - current_throughput: (0, 0), + current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), name: name.to_string(), immediate_parent: Some(immediate_parent), - rtts: Vec::new(), + rtts: DashSet::new(), }; - let my_id = nodes.len(); - nodes.push(node); - let mut parents = parents.to_vec(); - parents.push(my_id); + if node.name != "children" { + nodes.push(node); + } // Recurse children for (key, value) in json.iter() { diff --git a/src/rust/lqos_node_manager/Cargo.toml b/src/rust/lqos_node_manager/Cargo.toml index 8c7bf029..69eb1030 100644 --- a/src/rust/lqos_node_manager/Cargo.toml +++ b/src/rust/lqos_node_manager/Cargo.toml @@ -10,8 +10,6 @@ equinix_tests = [] [dependencies] rocket = { version = "0.5.0-rc.2", features = [ "json", "msgpack", "uuid" ] } rocket_async_compression = "0.2.0" -lazy_static = "1.4" -parking_lot = "0.12" lqos_bus = { path = "../lqos_bus" } lqos_config = { path = "../lqos_config" } lqos_utils = { path = "../lqos_utils" } diff --git a/src/rust/lqos_node_manager/src/auth_guard.rs b/src/rust/lqos_node_manager/src/auth_guard.rs index a23cd4d9..e714f342 100644 --- a/src/rust/lqos_node_manager/src/auth_guard.rs +++ b/src/rust/lqos_node_manager/src/auth_guard.rs @@ -1,7 +1,8 @@ +use std::sync::Mutex; + use anyhow::Error; -use lazy_static::*; use lqos_config::{UserRole, WebUsers}; -use parking_lot::Mutex; +use once_cell::sync::Lazy; use rocket::serde::{json::Json, Deserialize, Serialize}; use rocket::{ http::{Cookie, CookieJar, Status}, @@ -9,9 +10,8 @@ use rocket::{ Request, }; -lazy_static! { - static ref WEB_USERS: Mutex> = Mutex::new(None); -} +static WEB_USERS: Lazy>> = + Lazy::new(|| Mutex::new(None)); #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum AuthGuard { @@ -27,7 +27,7 @@ impl<'r> FromRequest<'r> for AuthGuard { async fn from_request( request: &'r Request<'_>, ) -> Outcome { - let mut lock = WEB_USERS.lock(); + let mut lock = WEB_USERS.lock().unwrap(); if lock.is_none() { if WebUsers::does_users_file_exist().unwrap() { *lock = Some(WebUsers::load_or_create().unwrap()); @@ -82,7 +82,7 @@ pub fn create_first_user( if WebUsers::does_users_file_exist().unwrap() { return Json("ERROR".to_string()); } - let mut lock = WEB_USERS.lock(); + let mut lock = WEB_USERS.lock().unwrap(); let mut users = WebUsers::load_or_create().unwrap(); users.allow_anonymous(info.allow_anonymous).unwrap(); let token = users @@ -102,7 +102,7 @@ pub struct LoginAttempt { #[post("/api/login", data = "")] pub fn login(cookies: &CookieJar, info: Json) -> Json { - let mut lock = WEB_USERS.lock(); + let mut lock = WEB_USERS.lock().unwrap(); if lock.is_none() && WebUsers::does_users_file_exist().unwrap() { *lock = Some(WebUsers::load_or_create().unwrap()); } @@ -126,7 +126,7 @@ pub fn admin_check(auth: AuthGuard) -> Json { #[get("/api/username")] pub fn username(_auth: AuthGuard, cookies: &CookieJar) -> Json { if let Some(token) = cookies.get("User-Token") { - let lock = WEB_USERS.lock(); + let lock = WEB_USERS.lock().unwrap(); if let Some(users) = &*lock { return Json(users.get_username(token.value())); } diff --git a/src/rust/lqos_node_manager/src/config_control.rs b/src/rust/lqos_node_manager/src/config_control.rs index 27e3398f..e1f86618 100644 --- a/src/rust/lqos_node_manager/src/config_control.rs +++ b/src/rust/lqos_node_manager/src/config_control.rs @@ -1,8 +1,8 @@ use crate::{auth_guard::AuthGuard, cache_control::NoCache}; use default_net::get_interfaces; -use lqos_bus::{bus_request, BusRequest}; +use lqos_bus::{bus_request, BusRequest, BusResponse}; use lqos_config::{EtcLqos, LibreQoSConfig, Tunables}; -use rocket::{fs::NamedFile, serde::json::Json}; +use rocket::{fs::NamedFile, serde::{json::Json, Serialize}}; // Note that NoCache can be replaced with a cache option // once the design work is complete. @@ -76,3 +76,23 @@ pub async fn update_lqos_tuning( Json("OK".to_string()) } + +#[derive(Serialize, Clone, Default)] +#[serde(crate = "rocket::serde")] +pub struct LqosStats { + pub bus_requests_since_start: u64, + pub time_to_poll_hosts_us: u64, +} + +#[get("/api/stats")] +pub async fn stats() -> NoCache> { + for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() { + if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts } = msg { + return NoCache::new(Json(LqosStats { + bus_requests_since_start: bus_requests, + time_to_poll_hosts_us: time_to_poll_hosts + })); + } + } + NoCache::new(Json(LqosStats::default())) +} \ No newline at end of file diff --git a/src/rust/lqos_node_manager/src/main.rs b/src/rust/lqos_node_manager/src/main.rs index b9fe840b..a54deeae 100644 --- a/src/rust/lqos_node_manager/src/main.rs +++ b/src/rust/lqos_node_manager/src/main.rs @@ -9,8 +9,8 @@ mod unknown_devices; use rocket_async_compression::Compression; mod auth_guard; mod config_control; -mod queue_info; mod network_tree; +mod queue_info; // Use JemAllocator only on supported platforms #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] @@ -46,14 +46,12 @@ fn rocket() -> _ { static_pages::klingon, // API calls tracker::current_throughput, - tracker::throughput_ring, tracker::cpu_usage, tracker::ram_usage, tracker::top_10_downloaders, tracker::worst_10_rtt, tracker::rtt_histogram, tracker::host_counts, - tracker::busy_quantile, shaped_devices::all_shaped_devices, shaped_devices::shaped_devices_count, shaped_devices::shaped_devices_range, @@ -80,10 +78,16 @@ fn rocket() -> _ { static_pages::login_page, auth_guard::username, network_tree::tree_entry, + network_tree::tree_clients, + network_tree::network_tree_summary, + network_tree::node_names, + network_tree::funnel_for_queue, + config_control::stats, // Supporting files static_pages::bootsrap_css, static_pages::plotly_js, static_pages::jquery_js, + static_pages::msgpack_js, static_pages::bootsrap_js, static_pages::tinylogo, static_pages::favicon, diff --git a/src/rust/lqos_node_manager/src/network_tree.rs b/src/rust/lqos_node_manager/src/network_tree.rs index c79c065d..481df1c4 100644 --- a/src/rust/lqos_node_manager/src/network_tree.rs +++ b/src/rust/lqos_node_manager/src/network_tree.rs @@ -1,8 +1,13 @@ -use lqos_bus::{bus_request, BusRequest, BusResponse}; -use lqos_config::NetworkJsonNode; -use rocket::{fs::NamedFile, serde::json::Json}; +use std::net::IpAddr; -use crate::cache_control::NoCache; +use lqos_bus::{bus_request, BusRequest, BusResponse}; +use lqos_config::NetworkJsonTransport; +use rocket::{ + fs::NamedFile, + serde::{json::Json, Serialize, msgpack::MsgPack}, +}; + +use crate::{cache_control::NoCache, tracker::SHAPED_DEVICES}; // Note that NoCache can be replaced with a cache option // once the design work is complete. @@ -14,7 +19,7 @@ pub async fn tree_page<'a>() -> NoCache> { #[get("/api/network_tree/")] pub async fn tree_entry( parent: usize, -) -> NoCache>> { +) -> NoCache>> { let responses = bus_request(vec![BusRequest::GetNetworkMap { parent }]).await.unwrap(); let result = match &responses[0] { @@ -22,5 +27,105 @@ pub async fn tree_entry( _ => Vec::new(), }; + NoCache::new(MsgPack(result)) +} + +#[get("/api/network_tree_summary")] +pub async fn network_tree_summary( +) -> NoCache>> { + let responses = + bus_request(vec![BusRequest::TopMapQueues(4)]).await.unwrap(); + let result = match &responses[0] { + BusResponse::NetworkMap(nodes) => nodes.to_owned(), + _ => Vec::new(), + }; + NoCache::new(MsgPack(result)) +} + +#[derive(Serialize, Clone)] +#[serde(crate = "rocket::serde")] +pub struct CircuitThroughput { + pub id: String, + pub name: String, + pub traffic: (u64, u64), + pub limit: (u64, u64), +} + +#[get("/api/tree_clients/")] +pub async fn tree_clients( + parent: String, +) -> NoCache>> { + let mut result = Vec::new(); + for msg in + bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter() + { + let devices = SHAPED_DEVICES.read().unwrap(); + if let BusResponse::HostCounters(hosts) = msg { + for (ip, down, up) in hosts.iter() { + let lookup = match ip { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => *ip, + }; + if let Some(c) = devices.trie.longest_match(lookup) { + if devices.devices[*c.1].parent_node == parent { + result.push(CircuitThroughput { + id: devices.devices[*c.1].circuit_id.clone(), + name: devices.devices[*c.1].circuit_name.clone(), + traffic: (*down, *up), + limit: ( + devices.devices[*c.1].download_max_mbps as u64, + devices.devices[*c.1].upload_max_mbps as u64, + ), + }); + } + } + } + } + } + NoCache::new(MsgPack(result)) +} + +#[post("/api/node_names", data = "")] +pub async fn node_names( + nodes: Json>, +) -> NoCache>> { + let mut result = Vec::new(); + for msg in bus_request(vec![BusRequest::GetNodeNamesFromIds(nodes.0)]) + .await + .unwrap() + .iter() + { + if let BusResponse::NodeNames(map) = msg { + result.extend_from_slice(map); + } + } + + NoCache::new(Json(result)) +} + +#[get("/api/funnel_for_queue/")] +pub async fn funnel_for_queue( + circuit_id: String, +) -> NoCache>> { + let mut result = Vec::new(); + + let target = SHAPED_DEVICES + .read() + .unwrap() + .devices + .iter() + .find(|d| d.circuit_id == circuit_id) + .as_ref() + .unwrap() + .parent_node + .clone(); + + for msg in + bus_request(vec![BusRequest::GetFunnel { target }]).await.unwrap().iter() + { + if let BusResponse::NetworkMap(map) = msg { + result.extend_from_slice(map); + } + } NoCache::new(Json(result)) } diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index ff4e4f5e..5047f1be 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -29,8 +29,12 @@ pub async fn circuit_info( circuit_id: String, _auth: AuthGuard, ) -> NoCache> { - if let Some(device) = - SHAPED_DEVICES.read().devices.iter().find(|d| d.circuit_id == circuit_id) + if let Some(device) = SHAPED_DEVICES + .read() + .unwrap() + .devices + .iter() + .find(|d| d.circuit_id == circuit_id) { let result = CircuitInfo { name: device.circuit_name.clone(), @@ -63,7 +67,7 @@ pub async fn current_circuit_throughput( bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter() { if let BusResponse::HostCounters(hosts) = msg { - let devices = SHAPED_DEVICES.read(); + let devices = SHAPED_DEVICES.read().unwrap(); for (ip, down, up) in hosts.iter() { let lookup = match ip { IpAddr::V4(ip) => ip.to_ipv6_mapped(), diff --git a/src/rust/lqos_node_manager/src/shaped_devices.rs b/src/rust/lqos_node_manager/src/shaped_devices.rs index 0200ee73..64737a0b 100644 --- a/src/rust/lqos_node_manager/src/shaped_devices.rs +++ b/src/rust/lqos_node_manager/src/shaped_devices.rs @@ -13,12 +13,12 @@ static RELOAD_REQUIRED: AtomicBool = AtomicBool::new(false); pub fn all_shaped_devices( _auth: AuthGuard, ) -> NoCache>> { - NoCache::new(Json(SHAPED_DEVICES.read().devices.clone())) + NoCache::new(Json(SHAPED_DEVICES.read().unwrap().devices.clone())) } #[get("/api/shaped_devices_count")] pub fn shaped_devices_count(_auth: AuthGuard) -> NoCache> { - NoCache::new(Json(SHAPED_DEVICES.read().devices.len())) + NoCache::new(Json(SHAPED_DEVICES.read().unwrap().devices.len())) } #[get("/api/shaped_devices_range//")] @@ -27,7 +27,7 @@ pub fn shaped_devices_range( end: usize, _auth: AuthGuard, ) -> NoCache>> { - let reader = SHAPED_DEVICES.read(); + let reader = SHAPED_DEVICES.read().unwrap(); let result: Vec = reader.devices.iter().skip(start).take(end).cloned().collect(); NoCache::new(Json(result)) @@ -39,7 +39,7 @@ pub fn shaped_devices_search( _auth: AuthGuard, ) -> NoCache>> { let term = term.trim().to_lowercase(); - let reader = SHAPED_DEVICES.read(); + let reader = SHAPED_DEVICES.read().unwrap(); let result: Vec = reader .devices .iter() diff --git a/src/rust/lqos_node_manager/src/static_pages.rs b/src/rust/lqos_node_manager/src/static_pages.rs index 5626863e..7e625824 100644 --- a/src/rust/lqos_node_manager/src/static_pages.rs +++ b/src/rust/lqos_node_manager/src/static_pages.rs @@ -104,6 +104,11 @@ pub async fn jquery_js<'a>() -> LongCache> { LongCache::new(NamedFile::open("static/vendor/jquery.min.js").await.ok()) } +#[get("/vendor/msgpack.min.js")] +pub async fn msgpack_js<'a>() -> LongCache> { + LongCache::new(NamedFile::open("static/vendor/msgpack.min.js").await.ok()) +} + #[get("/vendor/bootstrap.bundle.min.js")] pub async fn bootsrap_js<'a>() -> LongCache> { LongCache::new( diff --git a/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs b/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs deleted file mode 100644 index 4c00bc08..00000000 --- a/src/rust/lqos_node_manager/src/tracker/cache/lqosd_stats.rs +++ /dev/null @@ -1,22 +0,0 @@ -use lazy_static::*; -use lqos_bus::IpStats; -use parking_lot::RwLock; - -lazy_static! { - pub static ref TOP_10_DOWNLOADERS: RwLock> = - RwLock::new(Vec::with_capacity(10)); -} - -lazy_static! { - pub static ref WORST_10_RTT: RwLock> = - RwLock::new(Vec::with_capacity(10)); -} - -lazy_static! { - pub static ref RTT_HISTOGRAM: RwLock> = - RwLock::new(Vec::with_capacity(100)); -} - -lazy_static! { - pub static ref HOST_COUNTS: RwLock<(u32, u32)> = RwLock::new((0, 0)); -} diff --git a/src/rust/lqos_node_manager/src/tracker/cache/mod.rs b/src/rust/lqos_node_manager/src/tracker/cache/mod.rs index cbfdc864..dcc23457 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache/mod.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache/mod.rs @@ -3,11 +3,7 @@ //! of the system. mod cpu_ram; -mod lqosd_stats; mod shaped_devices; -mod throughput; pub use cpu_ram::*; -pub use lqosd_stats::*; pub use shaped_devices::*; -pub use throughput::*; diff --git a/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs b/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs index 28ebb2f7..961b7550 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache/shaped_devices.rs @@ -1,18 +1,9 @@ -use lazy_static::*; -use lqos_bus::IpStats; use lqos_config::ConfigShapedDevices; -use parking_lot::RwLock; +use once_cell::sync::Lazy; +use std::sync::RwLock; -lazy_static! { - /// Global storage of the shaped devices csv data. - /// Updated by the file system watcher whenever - /// the underlying file changes. - pub static ref SHAPED_DEVICES : RwLock = RwLock::new(ConfigShapedDevices::default()); -} - -lazy_static! { - /// Global storage of the shaped devices csv data. - /// Updated by the file system watcher whenever - /// the underlying file changes. - pub static ref UNKNOWN_DEVICES : RwLock> = RwLock::new(Vec::new()); -} +/// Global storage of the shaped devices csv data. +/// Updated by the file system watcher whenever +/// the underlying file changes. +pub static SHAPED_DEVICES: Lazy> = + Lazy::new(|| RwLock::new(ConfigShapedDevices::default())); diff --git a/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs b/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs deleted file mode 100644 index 033d9cca..00000000 --- a/src/rust/lqos_node_manager/src/tracker/cache/throughput.rs +++ /dev/null @@ -1,63 +0,0 @@ -use lazy_static::*; -use parking_lot::RwLock; -use rocket::serde::Serialize; - -lazy_static! { - /// Global storage of the current throughput counter. - pub static ref CURRENT_THROUGHPUT : RwLock = RwLock::new(ThroughputPerSecond::default()); -} - -lazy_static! { - /// Global storage of the last N seconds throughput buffer. - pub static ref THROUGHPUT_BUFFER : RwLock = RwLock::new(ThroughputRingbuffer::new()); -} - -/// Stores total system throughput per second. -#[derive(Debug, Clone, Copy, Serialize, Default)] -#[serde(crate = "rocket::serde")] -pub struct ThroughputPerSecond { - pub bits_per_second: (u64, u64), - pub packets_per_second: (u64, u64), - pub shaped_bits_per_second: (u64, u64), -} - -/// How many entries (at one per second) should we keep in the -/// throughput ringbuffer? -const RINGBUFFER_SAMPLES: usize = 300; - -/// Stores Throughput samples in a ringbuffer, continually -/// updating. There are always RINGBUFFER_SAMPLES available, -/// allowing for non-allocating/non-growing storage of -/// throughput for the dashboard summaries. -pub struct ThroughputRingbuffer { - readings: Vec, - next: usize, -} - -impl ThroughputRingbuffer { - fn new() -> Self { - Self { - readings: vec![ThroughputPerSecond::default(); RINGBUFFER_SAMPLES], - next: 0, - } - } - - pub fn store(&mut self, reading: ThroughputPerSecond) { - self.readings[self.next] = reading; - self.next += 1; - self.next %= RINGBUFFER_SAMPLES; - } - - pub fn get_result(&self) -> Vec { - let mut result = Vec::with_capacity(RINGBUFFER_SAMPLES); - - for i in self.next..RINGBUFFER_SAMPLES { - result.push(self.readings[i]); - } - for i in 0..self.next { - result.push(self.readings[i]); - } - - result - } -} diff --git a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs index fbde2066..2de8c8e5 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs @@ -3,7 +3,6 @@ //! when there are multiple clients. use super::cache::*; use anyhow::Result; -use lqos_bus::{bus_request, BusRequest, BusResponse, IpStats}; use lqos_config::ConfigShapedDevices; use lqos_utils::file_watcher::FileWatcher; use nix::sys::{ @@ -11,7 +10,7 @@ use nix::sys::{ timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags}, }; use rocket::tokio::task::spawn_blocking; -use std::{net::IpAddr, sync::atomic::AtomicBool}; +use std::{sync::atomic::AtomicBool}; /// Once per second, update CPU and RAM usage and ask /// `lqosd` for updated system statistics. @@ -69,10 +68,6 @@ pub async fn update_tracking() { .store(sys.used_memory(), std::sync::atomic::Ordering::Relaxed); TOTAL_RAM .store(sys.total_memory(), std::sync::atomic::Ordering::Relaxed); - let error = get_data_from_server().await; // Ignoring errors to keep running - if let Err(error) = error { - error!("Error in usage update loop: {:?}", error); - } monitor_busy.store(false, std::sync::atomic::Ordering::Relaxed); } @@ -95,10 +90,10 @@ fn load_shaped_devices() { let shaped_devices = ConfigShapedDevices::load(); if let Ok(new_file) = shaped_devices { info!("ShapedDevices.csv loaded"); - *SHAPED_DEVICES.write() = new_file; + *SHAPED_DEVICES.write().unwrap() = new_file; } else { warn!("ShapedDevices.csv failed to load, see previous error messages. Reverting to empty set."); - *SHAPED_DEVICES.write() = ConfigShapedDevices::default(); + *SHAPED_DEVICES.write().unwrap() = ConfigShapedDevices::default(); } } @@ -123,77 +118,3 @@ fn watch_for_shaped_devices_changing() -> Result<()> { info!("ShapedDevices watcher returned: {result:?}"); } } - -/// Requests data from `lqosd` and stores it in local -/// caches. -async fn get_data_from_server() -> Result<()> { - // Send request to lqosd - let requests = vec![ - BusRequest::GetCurrentThroughput, - BusRequest::GetTopNDownloaders { start: 0, end: 10 }, - BusRequest::GetWorstRtt { start: 0, end: 10 }, - BusRequest::RttHistogram, - BusRequest::AllUnknownIps, - ]; - - for r in bus_request(requests).await?.iter() { - match r { - BusResponse::CurrentThroughput { - bits_per_second, - packets_per_second, - shaped_bits_per_second, - } => { - { - let mut lock = CURRENT_THROUGHPUT.write(); - lock.bits_per_second = *bits_per_second; - lock.packets_per_second = *packets_per_second; - } // Lock scope - { - let mut lock = THROUGHPUT_BUFFER.write(); - lock.store(ThroughputPerSecond { - packets_per_second: *packets_per_second, - bits_per_second: *bits_per_second, - shaped_bits_per_second: *shaped_bits_per_second, - }); - } - } - BusResponse::TopDownloaders(stats) => { - *TOP_10_DOWNLOADERS.write() = stats.clone(); - } - BusResponse::WorstRtt(stats) => { - *WORST_10_RTT.write() = stats.clone(); - } - BusResponse::RttHistogram(stats) => { - *RTT_HISTOGRAM.write() = stats.clone(); - } - BusResponse::AllUnknownIps(unknowns) => { - *HOST_COUNTS.write() = (unknowns.len() as u32, 0); - let cfg = SHAPED_DEVICES.read(); - let really_unknown: Vec = unknowns - .iter() - .filter(|ip| { - if let Ok(ip) = ip.ip_address.parse::() { - let lookup = match ip { - IpAddr::V4(ip) => ip.to_ipv6_mapped(), - IpAddr::V6(ip) => ip, - }; - cfg.trie.longest_match(lookup).is_none() - } else { - false - } - }) - .cloned() - .collect(); - *HOST_COUNTS.write() = (really_unknown.len() as u32, 0); - *UNKNOWN_DEVICES.write() = really_unknown; - } - BusResponse::NotReadyYet => { - warn!("Host system isn't ready to answer all queries yet."); - } - // Default - _ => {} - } - } - - Ok(()) -} diff --git a/src/rust/lqos_node_manager/src/tracker/mod.rs b/src/rust/lqos_node_manager/src/tracker/mod.rs index 56b9f15a..fc3e4f72 100644 --- a/src/rust/lqos_node_manager/src/tracker/mod.rs +++ b/src/rust/lqos_node_manager/src/tracker/mod.rs @@ -1,18 +1,15 @@ mod cache; mod cache_manager; +use std::net::IpAddr; + use self::cache::{ - CPU_USAGE, CURRENT_THROUGHPUT, HOST_COUNTS, NUM_CPUS, RAM_USED, - RTT_HISTOGRAM, THROUGHPUT_BUFFER, TOP_10_DOWNLOADERS, TOTAL_RAM, - WORST_10_RTT, + CPU_USAGE, NUM_CPUS, RAM_USED, TOTAL_RAM, }; -use crate::{auth_guard::AuthGuard, tracker::cache::ThroughputPerSecond}; -pub use cache::{SHAPED_DEVICES, UNKNOWN_DEVICES}; +use crate::{auth_guard::AuthGuard, cache_control::NoCache}; +pub use cache::SHAPED_DEVICES; pub use cache_manager::update_tracking; -use lazy_static::lazy_static; -use lqos_bus::{IpStats, TcHandle}; -use lqos_config::LibreQoSConfig; -use parking_lot::Mutex; -use rocket::serde::{json::Json, Deserialize, Serialize}; +use lqos_bus::{bus_request, BusRequest, BusResponse, IpStats, TcHandle}; +use rocket::serde::{Deserialize, Serialize, msgpack::MsgPack}; #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(crate = "rocket::serde")] @@ -41,6 +38,7 @@ impl From<&IpStats> for IpStatsWithPlan { if !result.circuit_id.is_empty() { if let Some(circuit) = SHAPED_DEVICES .read() + .unwrap() .devices .iter() .find(|sd| sd.circuit_id == result.circuit_id) @@ -50,8 +48,7 @@ impl From<&IpStats> for IpStatsWithPlan { } else { &circuit.circuit_name }; - result.ip_address = - format!("{} ({})", name, result.ip_address); + result.ip_address = format!("{} ({})", name, result.ip_address); result.plan = (circuit.download_max_mbps, circuit.download_min_mbps); } } @@ -60,91 +57,130 @@ impl From<&IpStats> for IpStatsWithPlan { } } -#[get("/api/current_throughput")] -pub fn current_throughput(_auth: AuthGuard) -> Json { - let result = *CURRENT_THROUGHPUT.read(); - Json(result) +/// Stores total system throughput per second. +#[derive(Debug, Clone, Copy, Serialize, Default)] +#[serde(crate = "rocket::serde")] +pub struct ThroughputPerSecond { + pub bits_per_second: (u64, u64), + pub packets_per_second: (u64, u64), + pub shaped_bits_per_second: (u64, u64), } -#[get("/api/throughput_ring")] -pub fn throughput_ring(_auth: AuthGuard) -> Json> { - let result = THROUGHPUT_BUFFER.read().get_result(); - Json(result) +#[get("/api/current_throughput")] +pub async fn current_throughput( + _auth: AuthGuard, +) -> NoCache> { + let mut result = ThroughputPerSecond::default(); + if let Ok(messages) = + bus_request(vec![BusRequest::GetCurrentThroughput]).await + { + for msg in messages { + if let BusResponse::CurrentThroughput { + bits_per_second, + packets_per_second, + shaped_bits_per_second, + } = msg + { + result.bits_per_second = bits_per_second; + result.packets_per_second = packets_per_second; + result.shaped_bits_per_second = shaped_bits_per_second; + } + } + } + NoCache::new(MsgPack(result)) } #[get("/api/cpu")] -pub fn cpu_usage(_auth: AuthGuard) -> Json> { +pub fn cpu_usage(_auth: AuthGuard) -> NoCache>> { let usage: Vec = CPU_USAGE .iter() .take(NUM_CPUS.load(std::sync::atomic::Ordering::Relaxed)) .map(|cpu| cpu.load(std::sync::atomic::Ordering::Relaxed)) .collect(); - Json(usage) + NoCache::new(MsgPack(usage)) } #[get("/api/ram")] -pub fn ram_usage(_auth: AuthGuard) -> Json> { +pub fn ram_usage(_auth: AuthGuard) -> NoCache>> { let ram_usage = RAM_USED.load(std::sync::atomic::Ordering::Relaxed); let total_ram = TOTAL_RAM.load(std::sync::atomic::Ordering::Relaxed); - Json(vec![ram_usage, total_ram]) + NoCache::new(MsgPack(vec![ram_usage, total_ram])) } #[get("/api/top_10_downloaders")] -pub fn top_10_downloaders(_auth: AuthGuard) -> Json> { - let tt: Vec = - TOP_10_DOWNLOADERS.read().iter().map(|tt| tt.into()).collect(); - Json(tt) +pub async fn top_10_downloaders(_auth: AuthGuard) -> NoCache>> { + if let Ok(messages) = bus_request(vec![BusRequest::GetTopNDownloaders { start: 0, end: 10 }]).await + { + for msg in messages { + if let BusResponse::TopDownloaders(stats) = msg { + let result = stats.iter().map(|tt| tt.into()).collect(); + return NoCache::new(MsgPack(result)); + } + } + } + + NoCache::new(MsgPack(Vec::new())) } #[get("/api/worst_10_rtt")] -pub fn worst_10_rtt(_auth: AuthGuard) -> Json> { - let tt: Vec = - WORST_10_RTT.read().iter().map(|tt| tt.into()).collect(); - Json(tt) +pub async fn worst_10_rtt(_auth: AuthGuard) -> NoCache>> { + if let Ok(messages) = bus_request(vec![BusRequest::GetWorstRtt { start: 0, end: 10 }]).await + { + for msg in messages { + if let BusResponse::WorstRtt(stats) = msg { + let result = stats.iter().map(|tt| tt.into()).collect(); + return NoCache::new(MsgPack(result)); + } + } + } + + NoCache::new(MsgPack(Vec::new())) } #[get("/api/rtt_histogram")] -pub fn rtt_histogram(_auth: AuthGuard) -> Json> { - Json(RTT_HISTOGRAM.read().clone()) +pub async fn rtt_histogram(_auth: AuthGuard) -> NoCache>> { + if let Ok(messages) = bus_request(vec![BusRequest::RttHistogram]).await + { + for msg in messages { + if let BusResponse::RttHistogram(stats) = msg { + let result = stats; + return NoCache::new(MsgPack(result)); + } + } + } + + NoCache::new(MsgPack(Vec::new())) } #[get("/api/host_counts")] -pub fn host_counts(_auth: AuthGuard) -> Json<(u32, u32)> { - let shaped_reader = SHAPED_DEVICES.read(); - let n_devices = shaped_reader.devices.len(); - let host_counts = HOST_COUNTS.read(); +pub async fn host_counts(_auth: AuthGuard) -> NoCache> { + let mut host_counts = (0, 0); + if let Ok(messages) = bus_request(vec![BusRequest::AllUnknownIps]).await { + for msg in messages { + if let BusResponse::AllUnknownIps(unknowns) = msg { + let really_unknown: Vec = unknowns + .iter() + .filter(|ip| { + if let Ok(ip) = ip.ip_address.parse::() { + let lookup = match ip { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + SHAPED_DEVICES.read().unwrap().trie.longest_match(lookup).is_none() + } else { + false + } + }) + .cloned() + .collect(); + + host_counts = (really_unknown.len() as u32, 0); + } + } + } + + let n_devices = SHAPED_DEVICES.read().unwrap().devices.len(); let unknown = host_counts.0 - host_counts.1; - Json((n_devices as u32, unknown)) -} - -lazy_static! { - static ref CONFIG: Mutex = - Mutex::new(lqos_config::LibreQoSConfig::load().unwrap()); -} - -#[get("/api/busy_quantile")] -pub fn busy_quantile(_auth: AuthGuard) -> Json> { - let (down_capacity, up_capacity) = { - let lock = CONFIG.lock(); - ( - lock.total_download_mbps as f64 * 1_000_000.0, - lock.total_upload_mbps as f64 * 1_000_000.0, - ) - }; - let throughput = THROUGHPUT_BUFFER.read().get_result(); - let mut result = vec![(0, 0); 10]; - throughput.iter().for_each(|tp| { - let (down, up) = tp.bits_per_second; - let (down, up) = (down * 8, up * 8); - //println!("{down_capacity}, {up_capacity}, {down}, {up}"); - let (down, up) = ( - if down_capacity > 0.0 { down as f64 / down_capacity } else { 0.0 }, - if up_capacity > 0.0 { up as f64 / up_capacity } else { 0.0 }, - ); - let (down, up) = ((down * 10.0) as usize, (up * 10.0) as usize); - result[usize::min(9, down)].0 += 1; - result[usize::min(0, up)].1 += 1; - }); - Json(result) + NoCache::new(MsgPack((n_devices as u32, unknown))) } diff --git a/src/rust/lqos_node_manager/src/unknown_devices.rs b/src/rust/lqos_node_manager/src/unknown_devices.rs index 632dd949..c1ca9c3d 100644 --- a/src/rust/lqos_node_manager/src/unknown_devices.rs +++ b/src/rust/lqos_node_manager/src/unknown_devices.rs @@ -1,35 +1,65 @@ +use std::net::IpAddr; + use crate::{ - auth_guard::AuthGuard, cache_control::NoCache, tracker::UNKNOWN_DEVICES, + auth_guard::AuthGuard, cache_control::NoCache, tracker::SHAPED_DEVICES }; -use lqos_bus::IpStats; +use lqos_bus::{IpStats, bus_request, BusRequest, BusResponse}; use rocket::serde::json::Json; +async fn unknown_devices() -> Vec { + if let Ok(messages) = bus_request(vec![BusRequest::AllUnknownIps]).await { + for msg in messages { + if let BusResponse::AllUnknownIps(unknowns) = msg { + let cfg = SHAPED_DEVICES.read().unwrap(); + let really_unknown: Vec = unknowns + .iter() + .filter(|ip| { + if let Ok(ip) = ip.ip_address.parse::() { + let lookup = match ip { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + cfg.trie.longest_match(lookup).is_none() + } else { + false + } + }) + .cloned() + .collect(); + return really_unknown; + } + } + } + + Vec::new() +} + #[get("/api/all_unknown_devices")] -pub fn all_unknown_devices(_auth: AuthGuard) -> NoCache>> { - NoCache::new(Json(UNKNOWN_DEVICES.read().clone())) +pub async fn all_unknown_devices(_auth: AuthGuard) -> NoCache>> { + NoCache::new(Json(unknown_devices().await)) } #[get("/api/unknown_devices_count")] -pub fn unknown_devices_count(_auth: AuthGuard) -> NoCache> { - NoCache::new(Json(UNKNOWN_DEVICES.read().len())) +pub async fn unknown_devices_count(_auth: AuthGuard) -> NoCache> { + NoCache::new(Json(unknown_devices().await.len())) } #[get("/api/unknown_devices_range//")] -pub fn unknown_devices_range( +pub async fn unknown_devices_range( start: usize, end: usize, _auth: AuthGuard, ) -> NoCache>> { - let reader = UNKNOWN_DEVICES.read(); + let reader = unknown_devices().await; let result: Vec = reader.iter().skip(start).take(end).cloned().collect(); NoCache::new(Json(result)) } #[get("/api/unknown_devices_csv")] -pub fn unknown_devices_csv(_auth: AuthGuard) -> NoCache { +pub async fn unknown_devices_csv(_auth: AuthGuard) -> NoCache { let mut result = String::new(); - let reader = UNKNOWN_DEVICES.read(); + let reader = unknown_devices().await; for unknown in reader.iter() { result += &format!("{}\n", unknown.ip_address); diff --git a/src/rust/lqos_node_manager/static/circuit_queue.html b/src/rust/lqos_node_manager/static/circuit_queue.html index 2574924e..fcd7f8a0 100644 --- a/src/rust/lqos_node_manager/static/circuit_queue.html +++ b/src/rust/lqos_node_manager/static/circuit_queue.html @@ -9,7 +9,7 @@ LibreQoS - Local Node Manager - + @@ -25,10 +25,9 @@ - - + @@ -39,6 +38,7 @@
@@ -139,7 +142,7 @@
-
+
@@ -181,6 +184,9 @@
+ +
+
@@ -450,6 +456,63 @@ setTimeout(getThroughput, 1000); } + let funnels = new MultiRingBuffer(300); + let rtts = {}; + let circuitId = ""; + + function getFunnel(c) { + circuitId = encodeURI(c); + $.get("/api/funnel_for_queue/" + circuitId, (data) => { + let html = ""; + for (let i=0; i" + redactText(data[i][1].name) + " Throughput"; + row += "
"; + row += ""; + row += ""; + + row += "
"; + row += "
"; + row += "
" + redactText(data[i][1].name) + " TCP RTT
"; + row += "
"; + row += "
"; + row += "
"; + + row += ""; + html += row; + } + $("#pills-funnel").html(html); + setTimeout(plotFunnels, 1000); + }); + } + + function plotFunnels() { + $.get("/api/funnel_for_queue/" + encodeURI(circuitId), (data) => { + for (let i=0; i { pollQueue(); getThroughput(); + getFunnel(params.id); }); } diff --git a/src/rust/lqos_node_manager/static/config.html b/src/rust/lqos_node_manager/static/config.html index 4c447942..f53cea92 100644 --- a/src/rust/lqos_node_manager/static/config.html +++ b/src/rust/lqos_node_manager/static/config.html @@ -9,7 +9,7 @@ LibreQoS - Local Node Manager - + @@ -25,10 +25,9 @@ - - + @@ -39,6 +38,7 @@ @@ -54,227 +63,200 @@
- -
- -
-
-
-
Current Throughput
- - - - - - - - - - - -
Packets/Second
Bits/Second
+ +
+ +
+
+
+
Current Throughput
+ + + + + + + + + + + +
Packets/Second
Bits/Second
+
+
+
+ + +
+
+
+
Memory Status
+
+
+
+
+ + +
+
+
+
CPU Status
+
+
- -
-
-
-
Memory Status
-
+ +
+ +
+
+
+
Last 5 Minutes
+
+
+
+
+ + +
+
+
+
TCP Round-Trip Time Histogram
+
+
+
+
+ + +
+
+
+
Site Funnel
+
+
- -
-
-
-
CPU Status
-
+ +
+ +
+
+
+
Top 10 Downloaders
+
+
-
-
- -
- -
-
-
-
Last 5 Minutes
-
+ +
+
+
+
Worst 10 RTT
+
+
- -
-
-
-
TCP Round-Trip Time Histogram
-
-
-
-
- - -
-
-
-
Utilization Quantiles
-
-
-
-
-
- - -
- -
-
-
-
Top 10 Downloaders
-
-
-
-
- - -
-
-
-
Worst 10 RTT
-
-
-
-
-
-
© 2022-2023, LibreQoE LLC
- + + \ No newline at end of file diff --git a/src/rust/lqos_node_manager/static/shaped-add.html b/src/rust/lqos_node_manager/static/shaped-add.html index 99a734da..d589312e 100644 --- a/src/rust/lqos_node_manager/static/shaped-add.html +++ b/src/rust/lqos_node_manager/static/shaped-add.html @@ -9,7 +9,7 @@ LibreQoS - Local Node Manager - + @@ -25,10 +25,9 @@ - - + @@ -39,6 +38,7 @@