diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index 016ff2d6..43839491 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -130,6 +130,9 @@ pub enum BusRequest { target: String, }, + /// Obtain the lqosd statistics + GetLqosStats, + /// If running on Equinix (the `equinix_test` feature is enabled), /// display a "run bandwidht test" link. #[cfg(feature = "equinix_tests")] diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 78b8e7d3..57f307b7 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -74,4 +74,12 @@ pub enum BusResponse { /// Named nodes from network.json NodeNames(Vec<(usize, String)>), + + /// Statistics from lqosd + LqosdStats{ + /// Number of bus requests handled + bus_requests: u64, + /// Us to poll hosts + time_to_poll_hosts: u64, + } } diff --git a/src/rust/lqos_node_manager/src/config_control.rs b/src/rust/lqos_node_manager/src/config_control.rs index 27e3398f..e1f86618 100644 --- a/src/rust/lqos_node_manager/src/config_control.rs +++ b/src/rust/lqos_node_manager/src/config_control.rs @@ -1,8 +1,8 @@ use crate::{auth_guard::AuthGuard, cache_control::NoCache}; use default_net::get_interfaces; -use lqos_bus::{bus_request, BusRequest}; +use lqos_bus::{bus_request, BusRequest, BusResponse}; use lqos_config::{EtcLqos, LibreQoSConfig, Tunables}; -use rocket::{fs::NamedFile, serde::json::Json}; +use rocket::{fs::NamedFile, serde::{json::Json, Serialize}}; // Note that NoCache can be replaced with a cache option // once the design work is complete. @@ -76,3 +76,23 @@ pub async fn update_lqos_tuning( Json("OK".to_string()) } + +#[derive(Serialize, Clone, Default)] +#[serde(crate = "rocket::serde")] +pub struct LqosStats { + pub bus_requests_since_start: u64, + pub time_to_poll_hosts_us: u64, +} + +#[get("/api/stats")] +pub async fn stats() -> NoCache> { + for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() { + if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts } = msg { + return NoCache::new(Json(LqosStats { + bus_requests_since_start: bus_requests, + time_to_poll_hosts_us: time_to_poll_hosts + })); + } + } + NoCache::new(Json(LqosStats::default())) +} \ No newline at end of file diff --git a/src/rust/lqos_node_manager/src/main.rs b/src/rust/lqos_node_manager/src/main.rs index e5e3d0f4..354b4461 100644 --- a/src/rust/lqos_node_manager/src/main.rs +++ b/src/rust/lqos_node_manager/src/main.rs @@ -82,6 +82,7 @@ fn rocket() -> _ { network_tree::network_tree_summary, network_tree::node_names, network_tree::funnel_for_queue, + config_control::stats, // Supporting files static_pages::bootsrap_css, static_pages::plotly_js, diff --git a/src/rust/lqos_queue_tracker/src/bus.rs b/src/rust/lqos_queue_tracker/src/bus.rs index c7a6d716..992093e3 100644 --- a/src/rust/lqos_queue_tracker/src/bus.rs +++ b/src/rust/lqos_queue_tracker/src/bus.rs @@ -3,9 +3,8 @@ use lqos_bus::BusResponse; pub fn get_raw_circuit_data(circuit_id: &str) -> BusResponse { still_watching(circuit_id); - let reader = CIRCUIT_TO_QUEUE.read().unwrap(); - if let Some(circuit) = reader.get(circuit_id) { - if let Ok(json) = serde_json::to_string(circuit) { + if let Some(circuit) = CIRCUIT_TO_QUEUE.get(circuit_id) { + if let Ok(json) = serde_json::to_string(circuit.value()) { BusResponse::RawQueueData(json) } else { BusResponse::RawQueueData(String::new()) diff --git a/src/rust/lqos_queue_tracker/src/circuit_to_queue.rs b/src/rust/lqos_queue_tracker/src/circuit_to_queue.rs index 4401c7c9..5818e141 100644 --- a/src/rust/lqos_queue_tracker/src/circuit_to_queue.rs +++ b/src/rust/lqos_queue_tracker/src/circuit_to_queue.rs @@ -1,8 +1,6 @@ +use dashmap::DashMap; use once_cell::sync::Lazy; - use crate::queue_store::QueueStore; -use std::collections::HashMap; -use std::sync::RwLock; -pub(crate) static CIRCUIT_TO_QUEUE: Lazy>> = - Lazy::new(|| RwLock::new(HashMap::new())); +pub(crate) static CIRCUIT_TO_QUEUE: Lazy> = + Lazy::new(DashMap::new); diff --git a/src/rust/lqos_queue_tracker/src/tracking/mod.rs b/src/rust/lqos_queue_tracker/src/tracking/mod.rs index bc4ca6f1..cd2044e7 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/mod.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/mod.rs @@ -48,13 +48,12 @@ fn track_queues() { if let Ok(download) = download { if let Ok(upload) = upload { - let mut mapping = CIRCUIT_TO_QUEUE.write().unwrap(); - if let Some(circuit) = mapping.get_mut(circuit_id) { + if let Some(mut circuit) = CIRCUIT_TO_QUEUE.get_mut(circuit_id) { circuit.update(&download[0], &upload[0]); } else { // It's new: insert it if !download.is_empty() && !upload.is_empty() { - mapping.insert( + CIRCUIT_TO_QUEUE.insert( circuit_id.to_string(), QueueStore::new(download[0].clone(), upload[0].clone()), ); diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index d64db399..05615c59 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -24,7 +24,9 @@ use signal_hook::{ consts::{SIGHUP, SIGINT, SIGTERM}, iterator::Signals, }; +use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS}; use tokio::join; +mod stats; // Use JemAllocator only on supported platforms #[cfg(any(target_arch = "x86", target_arch = "x86_64"))] @@ -120,6 +122,7 @@ fn handle_bus_requests( ) { for req in requests.iter() { //println!("Request: {:?}", req); + BUS_REQUESTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); responses.push(match req { BusRequest::Ping => lqos_bus::BusResponse::Ack, BusRequest::GetCurrentThroughput => { @@ -173,6 +176,12 @@ fn handle_bus_requests( BusRequest::GetFunnel { target: parent } => { shaped_devices_tracker::get_funnel(parent) } + BusRequest::GetLqosStats => { + BusResponse::LqosdStats { + bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed), + time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed), + } + } }); } } diff --git a/src/rust/lqosd/src/stats.rs b/src/rust/lqosd/src/stats.rs new file mode 100644 index 00000000..96edffe6 --- /dev/null +++ b/src/rust/lqosd/src/stats.rs @@ -0,0 +1,4 @@ +use std::sync::atomic::AtomicU64; + +pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0); +pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0); diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 86077ca1..aef0db2b 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -2,7 +2,7 @@ mod throughput_entry; mod tracking_data; use crate::{ shaped_devices_tracker::NETWORK_JSON, - throughput_tracker::tracking_data::ThroughputTracker, + throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS, }; use log::{info, warn}; use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult}; @@ -23,6 +23,7 @@ pub fn spawn_throughput_monitor() { std::thread::spawn(move || { periodic(interval_ms, "Throughput Monitor", &mut || { + let start = std::time::Instant::now(); { let net_json = NETWORK_JSON.read().unwrap(); net_json.zero_throughput_and_rtt(); @@ -32,6 +33,8 @@ pub fn spawn_throughput_monitor() { THROUGHPUT_TRACKER.apply_rtt_data(); THROUGHPUT_TRACKER.update_totals(); THROUGHPUT_TRACKER.next_cycle(); + let duration_ms = start.elapsed().as_micros(); + TIME_TO_POLL_HOSTS.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed); }); }); }