diff --git a/src/rust/lqos_queue_tracker/src/bus.rs b/src/rust/lqos_queue_tracker/src/bus.rs index 8904989f..bbbb07c6 100644 --- a/src/rust/lqos_queue_tracker/src/bus.rs +++ b/src/rust/lqos_queue_tracker/src/bus.rs @@ -3,6 +3,10 @@ use crate::{ }; use lqos_bus::BusResponse; +/// Retrieves the raw queue data for a given circuit ID. +/// +/// # Arguments +/// * `circuit_id` - The circuit ID to retrieve data for. pub fn get_raw_circuit_data(circuit_id: &str) -> BusResponse { still_watching(circuit_id); if let Some(circuit) = CIRCUIT_TO_QUEUE.get(circuit_id) { diff --git a/src/rust/lqos_queue_tracker/src/interval.rs b/src/rust/lqos_queue_tracker/src/interval.rs index 126d8622..f5327bf8 100644 --- a/src/rust/lqos_queue_tracker/src/interval.rs +++ b/src/rust/lqos_queue_tracker/src/interval.rs @@ -2,6 +2,12 @@ use std::sync::atomic::AtomicU64; pub(crate) static QUEUE_MONITOR_INTERVAL: AtomicU64 = AtomicU64::new(1000); +/// Sets the interval at which the queue monitor thread will poll the +/// Linux `tc` shaper for queue statistics. +/// +/// # Arguments +/// * `interval_ms` - The interval, in milliseconds, at which the queue +/// monitor thread will poll the Linux `tc` shaper for queue statistics. pub fn set_queue_refresh_interval(interval_ms: u64) { QUEUE_MONITOR_INTERVAL .store(interval_ms, std::sync::atomic::Ordering::Relaxed); diff --git a/src/rust/lqos_queue_tracker/src/lib.rs b/src/rust/lqos_queue_tracker/src/lib.rs index 31c01050..e5a14f25 100644 --- a/src/rust/lqos_queue_tracker/src/lib.rs +++ b/src/rust/lqos_queue_tracker/src/lib.rs @@ -1,3 +1,10 @@ +//! Retrieves queue statistics from the Linux `tc` shaper, and stores +//! them in a `QueueStore` for later retrieval. The `QueueStore` is +//! thread-safe, and can be accessed from multiple threads. It is +//! updated periodically by a separate thread, and accumulates statistics +//! between polling periods. + +#![warn(missing_docs)] mod bus; mod circuit_to_queue; mod interval; diff --git a/src/rust/lqos_queue_tracker/src/tracking/mod.rs b/src/rust/lqos_queue_tracker/src/tracking/mod.rs index 93a13fcb..952e8165 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/mod.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/mod.rs @@ -76,6 +76,9 @@ fn track_queues() { expire_watched_queues(); } +/// Spawns a thread that periodically reads the queue statistics from +/// the Linux `tc` shaper, and stores them in a `QueueStore` for later +/// retrieval. pub fn spawn_queue_monitor() { std::thread::spawn(|| { // Setup the queue monitor loop diff --git a/src/rust/lqos_queue_tracker/src/tracking/watched_queues.rs b/src/rust/lqos_queue_tracker/src/tracking/watched_queues.rs index 3c6d49aa..2196c880 100644 --- a/src/rust/lqos_queue_tracker/src/tracking/watched_queues.rs +++ b/src/rust/lqos_queue_tracker/src/tracking/watched_queues.rs @@ -31,6 +31,13 @@ pub fn expiration_in_the_future() -> u64 { unix_now().unwrap_or(0) + 10 } +/// Start watching a queue. This will cause the queue to be read +/// periodically, and its statistics stored in the `QueueStore`. +/// If the queue is already being watched, this function will +/// do nothing. +/// +/// # Arguments +/// * `circuit_id` - The circuit ID to watch pub fn add_watched_queue(circuit_id: &str) { //info!("Watching queue {circuit_id}"); let max = num_possible_cpus().unwrap() * 2; @@ -74,6 +81,11 @@ pub(crate) fn expire_watched_queues() { WATCHED_QUEUES.retain(|_,w| w.expires_unix_time > now); } +/// Indicates that a watched queue is still being watched. Update the +/// expiration time for the queue. +/// +/// # Arguments +/// * `circuit_id` - The circuit ID to watch pub fn still_watching(circuit_id: &str) { if let Some(mut q) = WATCHED_QUEUES.get_mut(circuit_id) { //info!("Still watching circuit: {circuit_id}");