A great start at tracking all of the Cake queues drop/marks stats and storing them in a central structure. The plan (probably after the holiday) is to merge these into the tree stats.

This commit is contained in:
Herbert Wolverson 2024-07-03 16:40:21 -05:00
parent e4840583a3
commit a019890b31
6 changed files with 242 additions and 3 deletions

View File

@ -23,3 +23,4 @@ pub use queue_structure::spawn_queue_structure_monitor;
pub use queue_types::deserialize_tc_tree; // Exported for the benchmarker
pub use tracking::spawn_queue_monitor;
pub use tracking::{add_watched_queue, still_watching};
pub use tracking::ALL_QUEUE_SUMMARY;

View File

@ -5,7 +5,7 @@ use log::error;
pub use queing_structure_json_monitor::spawn_queue_structure_monitor;
pub(crate) use queing_structure_json_monitor::QUEUE_STRUCTURE;
use queue_network::QueueNetwork;
use queue_node::QueueNode;
pub(crate) use queue_node::QueueNode;
use thiserror::Error;
pub(crate) fn read_queueing_structure(

View File

@ -8,6 +8,7 @@ use lqos_utils::file_watcher::FileWatcher;
use once_cell::sync::Lazy;
use thiserror::Error;
use tokio::task::spawn_blocking;
use crate::tracking::ALL_QUEUE_SUMMARY;
pub(crate) static QUEUE_STRUCTURE: Lazy<RwLock<QueueStructure>> =
Lazy::new(|| RwLock::new(QueueStructure::new()));
@ -27,6 +28,7 @@ impl QueueStructure {
}
fn update(&mut self) {
ALL_QUEUE_SUMMARY.clear();
if let Ok(queues) = read_queueing_structure() {
self.maybe_queues = Some(queues);
} else {
@ -49,7 +51,7 @@ fn update_queue_structure() {
}
/// Fires up a Linux file system watcher than notifies
/// when `ShapedDevices.csv` changes, and triggers a reload.
/// when `queuingStructure.json` changes, and triggers a reload.
fn watch_for_queueing_structure_changing() -> Result<(), QueueWatcherError> {
// Obtain the path to watch
let watch_path = QueueNetwork::path();

View File

@ -0,0 +1,86 @@
use std::collections::HashMap;
use std::sync::Mutex;
use once_cell::sync::Lazy;
use lqos_utils::units::DownUpOrder;
use crate::tracking::TrackedQueue;
pub static ALL_QUEUE_SUMMARY: Lazy<AllQueueData> = Lazy::new(|| AllQueueData::new());
#[derive(Debug)]
pub struct QueueData {
pub drops: DownUpOrder<u32>,
pub marks: DownUpOrder<u32>,
pub prev_drops: DownUpOrder<u32>,
pub prev_marks: DownUpOrder<u32>,
}
#[derive(Debug)]
pub struct AllQueueData {
data: Mutex<HashMap<String, QueueData>>
}
impl AllQueueData {
pub fn new() -> Self {
Self { data: Mutex::new(HashMap::new()) }
}
pub fn clear(&self) {
let mut lock = self.data.lock().unwrap();
lock.clear();
}
pub fn ingest_batch(&self, download: Vec<TrackedQueue>, upload: Vec<TrackedQueue>) {
let mut lock = self.data.lock().unwrap();
// Roll through moving current to previous
for (_,q) in lock.iter_mut() {
q.prev_drops = q.drops;
q.prev_marks = q.marks;
q.drops = DownUpOrder::zeroed();
q.marks = DownUpOrder::zeroed();
}
// Make download markings
for dl in download.into_iter() {
if let Some(q) = lock.get_mut(&dl.circuit_id) {
// We need to update it
q.drops.down = dl.drops;
q.marks.down = dl.marks;
} else {
// We need to add it
let mut new_record = QueueData {
drops: Default::default(),
marks: Default::default(),
prev_drops: Default::default(),
prev_marks: Default::default(),
};
new_record.drops.down = dl.drops;
new_record.marks.down = dl.marks;
lock.insert(dl.circuit_id.clone(), new_record);
}
}
// Make upload markings
for ul in upload.into_iter() {
if let Some(q) = lock.get_mut(&ul.circuit_id) {
// We need to update it
q.drops.up = ul.drops;
q.marks.up = ul.marks;
} else {
// We need to add it
let mut new_record = QueueData {
drops: Default::default(),
marks: Default::default(),
prev_drops: Default::default(),
prev_marks: Default::default(),
};
new_record.drops.up = ul.drops;
new_record.marks.up = ul.marks;
lock.insert(ul.circuit_id.clone(), new_record);
}
}
println!("{:?}", lock);
}
}

View File

@ -1,3 +1,6 @@
use std::collections::HashMap;
use lqos_bus::TcHandle;
use std::time::Instant;
use crate::{
circuit_to_queue::CIRCUIT_TO_QUEUE, interval::QUEUE_MONITOR_INTERVAL,
queue_store::QueueStore, tracking::reader::read_named_queue_from_interface,
@ -6,9 +9,15 @@ use log::info;
use lqos_utils::fdtimer::periodic;
mod reader;
mod watched_queues;
mod all_queue_data;
pub use all_queue_data::ALL_QUEUE_SUMMARY;
use self::watched_queues::expire_watched_queues;
use watched_queues::WATCHED_QUEUES;
pub use watched_queues::{add_watched_queue, still_watching};
use crate::queue_structure::{QUEUE_STRUCTURE, QueueNode};
use crate::queue_types::QueueType;
use crate::tracking::reader::read_all_queues_from_interface;
fn track_queues() {
if WATCHED_QUEUES.is_empty() {
@ -75,6 +84,105 @@ fn track_queues() {
expire_watched_queues();
}
struct TrackedQueue {
circuit_id: String,
drops: u32,
marks: u32,
}
fn connect_queues_to_circuit(structure: &[QueueNode], queues: &[QueueType]) -> Vec<TrackedQueue> {
queues
.iter()
.filter_map(|q| {
if let QueueType::Cake(cake) = q {
let (major, minor) = cake.parent.get_major_minor();
if let Some (s) = structure.iter().find(|s| s.class_major == major as u32 && s.class_minor == minor as u32) {
if let Some(circuit_id) = &s.circuit_id {
let marks: u32 = cake.tins.iter().map(|tin| tin.ecn_marks).sum();
if (cake.drops > 0 || marks > 0) {
return Some(TrackedQueue {
circuit_id: circuit_id.clone(),
drops: cake.drops,
marks,
})
}
}
}
}
None
})
.collect()
}
fn connect_queues_to_circuit_up(structure: &[QueueNode], queues: &[QueueType]) -> Vec<TrackedQueue> {
queues
.iter()
.filter_map(|q| {
if let QueueType::Cake(cake) = q {
let (major, minor) = cake.parent.get_major_minor();
if let Some (s) = structure.iter().find(|s| s.up_class_major == major as u32 && s.class_minor == minor as u32) {
if let Some(circuit_id) = &s.circuit_id {
let marks: u32 = cake.tins.iter().map(|tin| tin.ecn_marks).sum();
if (cake.drops > 0 || marks > 0) {
return Some(TrackedQueue {
circuit_id: circuit_id.clone(),
drops: cake.drops,
marks,
})
}
}
}
}
None
})
.collect()
}
fn all_queue_reader() {
let start = Instant::now();
let structure = QUEUE_STRUCTURE.read().unwrap();
if let Some(structure) = &structure.maybe_queues {
if let Ok(config) = lqos_config::load_config() {
// Get all the queues
let (download, upload) = if config.on_a_stick_mode() {
let all_queues = read_all_queues_from_interface(&config.internet_interface());
let (download, upload) = if let Ok(q) = all_queues {
let download = connect_queues_to_circuit(&structure, &q);
let upload = connect_queues_to_circuit_up(&structure, &q);
(download, upload)
} else {
(Vec::new(), Vec::new())
};
(download, upload)
} else {
let all_queues_down = read_all_queues_from_interface(&config.internet_interface());
let all_queues_up = read_all_queues_from_interface(&config.isp_interface());
let download = if let Ok(q) = all_queues_down {
connect_queues_to_circuit(&structure, &q)
} else {
Vec::new()
};
let upload = if let Ok(q) = all_queues_up {
connect_queues_to_circuit(&structure, &q)
} else {
Vec::new()
};
(download, upload)
};
println!("{}", download.len() + upload.len());
ALL_QUEUE_SUMMARY.ingest_batch(download, upload);
} else {
log::warn!("(TC monitor) Unable to read configuration");
}
} else {
log::warn!("(TC monitor) Not reading queues due to structure not yet ready");
}
let elapsed = start.elapsed();
log::warn!("(TC monitor) Completed in {:.5} seconds", elapsed.as_secs_f32());
}
/// Spawns a thread that periodically reads the queue statistics from
/// the Linux `tc` shaper, and stores them in a `QueueStore` for later
/// retrieval.
@ -96,4 +204,11 @@ pub fn spawn_queue_monitor() {
track_queues();
});
});
// Set up a 2nd thread to periodically gather ALL the queue stats
std::thread::spawn(|| {
periodic(2000, "All Queues", &mut || {
all_queue_reader();
})
});
}

View File

@ -1,11 +1,46 @@
use crate::{deserialize_tc_tree, queue_types::QueueType};
use log::error;
use log::{error, info};
use lqos_bus::TcHandle;
use std::process::Command;
use thiserror::Error;
const TC: &str = "/sbin/tc";
pub fn read_all_queues_from_interface(
interface: &str
) -> Result<Vec<QueueType>, QueueReaderError> {
let command_output = Command::new(TC)
.args([
"-s",
"-j",
"qdisc",
"show",
"dev",
interface,
])
.output()
.map_err(|e| {
info!("Failed to poll TC for queues: {interface}");
info!("{:?}", e);
QueueReaderError::CommandError
})?;
let raw_json = String::from_utf8(command_output.stdout)
.map_err(|e| {
info!("Failed to convert byte stream to UTF-8 string");
info!("{:?}", e);
QueueReaderError::Utf8Error
})?;
let result = deserialize_tc_tree(&raw_json)
.map_err(|e| {
info!("Failed to deserialize TC tree result.");
info!("{:?}", e);
QueueReaderError::Deserialization
})?;
Ok(result)
}
pub fn read_named_queue_from_interface(
interface: &str,
tc_handle: TcHandle,