Add the beginnings of internal stats keeping for lqosd and a not-advertised /api/stats page to view.

This commit is contained in:
Herbert Wolverson 2023-03-08 19:25:30 +00:00
parent bbbe1e5b83
commit b1091a94aa
10 changed files with 58 additions and 14 deletions

View File

@ -130,6 +130,9 @@ pub enum BusRequest {
target: String,
},
/// Obtain the lqosd statistics
GetLqosStats,
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidht test" link.
#[cfg(feature = "equinix_tests")]

View File

@ -74,4 +74,12 @@ pub enum BusResponse {
/// Named nodes from network.json
NodeNames(Vec<(usize, String)>),
/// Statistics from lqosd
LqosdStats{
/// Number of bus requests handled
bus_requests: u64,
/// Us to poll hosts
time_to_poll_hosts: u64,
}
}

View File

@ -1,8 +1,8 @@
use crate::{auth_guard::AuthGuard, cache_control::NoCache};
use default_net::get_interfaces;
use lqos_bus::{bus_request, BusRequest};
use lqos_bus::{bus_request, BusRequest, BusResponse};
use lqos_config::{EtcLqos, LibreQoSConfig, Tunables};
use rocket::{fs::NamedFile, serde::json::Json};
use rocket::{fs::NamedFile, serde::{json::Json, Serialize}};
// Note that NoCache can be replaced with a cache option
// once the design work is complete.
@ -76,3 +76,23 @@ pub async fn update_lqos_tuning(
Json("OK".to_string())
}
#[derive(Serialize, Clone, Default)]
#[serde(crate = "rocket::serde")]
pub struct LqosStats {
pub bus_requests_since_start: u64,
pub time_to_poll_hosts_us: u64,
}
#[get("/api/stats")]
pub async fn stats() -> NoCache<Json<LqosStats>> {
for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts } = msg {
return NoCache::new(Json(LqosStats {
bus_requests_since_start: bus_requests,
time_to_poll_hosts_us: time_to_poll_hosts
}));
}
}
NoCache::new(Json(LqosStats::default()))
}

View File

@ -82,6 +82,7 @@ fn rocket() -> _ {
network_tree::network_tree_summary,
network_tree::node_names,
network_tree::funnel_for_queue,
config_control::stats,
// Supporting files
static_pages::bootsrap_css,
static_pages::plotly_js,

View File

@ -3,9 +3,8 @@ use lqos_bus::BusResponse;
pub fn get_raw_circuit_data(circuit_id: &str) -> BusResponse {
still_watching(circuit_id);
let reader = CIRCUIT_TO_QUEUE.read().unwrap();
if let Some(circuit) = reader.get(circuit_id) {
if let Ok(json) = serde_json::to_string(circuit) {
if let Some(circuit) = CIRCUIT_TO_QUEUE.get(circuit_id) {
if let Ok(json) = serde_json::to_string(circuit.value()) {
BusResponse::RawQueueData(json)
} else {
BusResponse::RawQueueData(String::new())

View File

@ -1,8 +1,6 @@
use dashmap::DashMap;
use once_cell::sync::Lazy;
use crate::queue_store::QueueStore;
use std::collections::HashMap;
use std::sync::RwLock;
pub(crate) static CIRCUIT_TO_QUEUE: Lazy<RwLock<HashMap<String, QueueStore>>> =
Lazy::new(|| RwLock::new(HashMap::new()));
pub(crate) static CIRCUIT_TO_QUEUE: Lazy<DashMap<String, QueueStore>> =
Lazy::new(DashMap::new);

View File

@ -48,13 +48,12 @@ fn track_queues() {
if let Ok(download) = download {
if let Ok(upload) = upload {
let mut mapping = CIRCUIT_TO_QUEUE.write().unwrap();
if let Some(circuit) = mapping.get_mut(circuit_id) {
if let Some(mut circuit) = CIRCUIT_TO_QUEUE.get_mut(circuit_id) {
circuit.update(&download[0], &upload[0]);
} else {
// It's new: insert it
if !download.is_empty() && !upload.is_empty() {
mapping.insert(
CIRCUIT_TO_QUEUE.insert(
circuit_id.to_string(),
QueueStore::new(download[0].clone(), upload[0].clone()),
);

View File

@ -24,7 +24,9 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals,
};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS};
use tokio::join;
mod stats;
// Use JemAllocator only on supported platforms
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
@ -120,6 +122,7 @@ fn handle_bus_requests(
) {
for req in requests.iter() {
//println!("Request: {:?}", req);
BUS_REQUESTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
responses.push(match req {
BusRequest::Ping => lqos_bus::BusResponse::Ack,
BusRequest::GetCurrentThroughput => {
@ -173,6 +176,12 @@ fn handle_bus_requests(
BusRequest::GetFunnel { target: parent } => {
shaped_devices_tracker::get_funnel(parent)
}
BusRequest::GetLqosStats => {
BusResponse::LqosdStats {
bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed),
time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed),
}
}
});
}
}

View File

@ -0,0 +1,4 @@
use std::sync::atomic::AtomicU64;
pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0);
pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0);

View File

@ -2,7 +2,7 @@ mod throughput_entry;
mod tracking_data;
use crate::{
shaped_devices_tracker::NETWORK_JSON,
throughput_tracker::tracking_data::ThroughputTracker,
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
};
use log::{info, warn};
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
@ -23,6 +23,7 @@ pub fn spawn_throughput_monitor() {
std::thread::spawn(move || {
periodic(interval_ms, "Throughput Monitor", &mut || {
let start = std::time::Instant::now();
{
let net_json = NETWORK_JSON.read().unwrap();
net_json.zero_throughput_and_rtt();
@ -32,6 +33,8 @@ pub fn spawn_throughput_monitor() {
THROUGHPUT_TRACKER.apply_rtt_data();
THROUGHPUT_TRACKER.update_totals();
THROUGHPUT_TRACKER.next_cycle();
let duration_ms = start.elapsed().as_micros();
TIME_TO_POLL_HOSTS.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed);
});
});
}