Improved queue watching system

* Queue timing is now provided by Linux "timer file descriptors"
  instead of Tokio timers.
* Added an atomic bool that tracks whether a queue read is still in
  progress ("we're going faster than we should"); if the timer fires
  while the previous read is running, that cycle is skipped and a
  warning is issued.
* Queue tracking is no longer async; it now runs on its own dedicated
  thread. (A minimal sketch of the new pattern follows this list.)
* Queue watcher is now more verbose about any issues it encounters.
* Queue structures with children will now correctly track all the
  children, avoiding the blank queue data issue.
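
For illustration, the core of the new approach looks roughly like the sketch below. This is not the lqosd code itself: `spawn_ticker` and `do_work` are placeholder names, and it assumes the `nix` crate's timerfd bindings (the same ones the diff below pulls in).

    use std::sync::atomic::{AtomicBool, Ordering};

    use nix::sys::time::{TimeSpec, TimeValLike};
    use nix::sys::timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags};

    // Spawn a plain OS thread that ticks off a Linux timer file descriptor
    // every `interval_ms` milliseconds, skipping a cycle if the previous
    // read is still flagged as in progress.
    fn spawn_ticker(interval_ms: i64) {
        std::thread::spawn(move || {
            let busy = AtomicBool::new(false);
            let timer = TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty())
                .expect("unable to create timerfd");
            timer
                .set(
                    Expiration::Interval(TimeSpec::milliseconds(interval_ms)),
                    // The commit itself passes TFD_TIMER_ABSTIME here; empty()
                    // arms a plain relative interval.
                    TimerSetTimeFlags::empty(),
                )
                .expect("unable to arm timerfd");
            loop {
                // Blocks this thread until the next tick; no async runtime involved.
                if timer.wait().is_err() {
                    eprintln!("timerfd wait failed");
                    continue;
                }
                if busy.load(Ordering::Relaxed) {
                    // A tick fired while a read was still flagged as running:
                    // skip it (mirrors the commit's monitor_busy guard).
                    eprintln!("tick fired while a read was still running - skipping");
                } else {
                    busy.store(true, Ordering::Relaxed);
                    do_work(); // stand-in for the real track_queues()
                    busy.store(false, Ordering::Relaxed);
                }
            }
        });
    }

    fn do_work() {
        // Placeholder for reading the tc queue statistics.
    }

The real versions of this logic are spawn_queue_monitor() and track_queues() in the diff below.
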
Herbert Wolverson, 2023-01-30 16:55:42 +00:00
parent 2486355c1d · commit 6b6bdc1395
7 changed files with 85 additions and 31 deletions

src/rust/Cargo.lock (generated)

@@ -1378,6 +1378,7 @@ dependencies = [
  "lqos_config",
  "lqos_sys",
  "lqos_utils",
+ "nix",
  "notify",
  "parking_lot 0.12.1",
  "rayon",

@@ -18,6 +18,7 @@ parking_lot = "0"
 notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
 tokio = { version = "1.22", features = [ "full", "parking_lot" ] }
 rayon = "1"
+nix = "0"
 
 [dev-dependencies]
 criterion = { version = "0.3", features = [ "html_reports"] }

@@ -13,5 +13,6 @@ fn read_hex_string(s: &str) -> Result<u32> {
 pub(crate) fn read_queueing_structure() -> Result<Vec<QueueNode>> {
     let network = QueueNetwork::from_json()?;
-    Ok(network.to_flat())
+    let flattened = network.to_flat();
+    Ok(flattened)
 }

@@ -27,6 +27,7 @@ pub struct QueueNode {
     pub device_id: Option<String>,
     pub device_name: Option<String>,
     pub mac: Option<String>,
+    pub children: Vec<QueueNode>,
 }
 
 impl QueueNode {
@@ -101,7 +102,16 @@ impl QueueNode {
                     }
                 }
             }
-            "children" => {} // Ignore for now
+            "children" => {
+                if let Value::Object(map) = value {
+                    for (key, c) in map.iter() {
+                        result.circuits.push(QueueNode::from_json(key, c)?);
+                    }
+                } else {
+                    log::warn!("Children was not an object");
+                    log::warn!("{:?}", value);
+                }
+            }
             _ => log::error!("I don't know how to parse key: [{key}]"),
         }
     }
@@ -125,6 +135,11 @@ impl QueueNode {
             let children = c.to_flat();
             result.extend_from_slice(&children);
         }
+        for c in self.children.iter() {
+            result.push(c.clone());
+            let children = c.to_flat();
+            result.extend_from_slice(&children);
+        }
         result
     }
 }

@@ -1,26 +1,37 @@
+use std::sync::atomic::AtomicBool;
 use crate::{
     circuit_to_queue::CIRCUIT_TO_QUEUE, interval::QUEUE_MONITOR_INTERVAL, queue_store::QueueStore,
     tracking::reader::read_named_queue_from_interface,
 };
-use anyhow::Result;
+use log::{info, warn, error};
 use lqos_config::LibreQoSConfig;
+use nix::sys::time::TimeSpec;
+use nix::sys::time::TimeValLike;
+use nix::sys::timerfd::ClockId;
+use nix::sys::timerfd::Expiration;
+use nix::sys::timerfd::TimerFd;
+use nix::sys::timerfd::TimerFlags;
+use nix::sys::timerfd::TimerSetTimeFlags;
 use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
-use std::time::{Duration, Instant};
-use tokio::task;
-use tokio::time;
 
 mod reader;
 mod watched_queues;
 use watched_queues::WATCHED_QUEUES;
 pub use watched_queues::{add_watched_queue, still_watching};
 use self::watched_queues::expire_watched_queues;
 
-async fn track_queues() -> Result<()> {
+fn track_queues() {
     let mut watching = WATCHED_QUEUES.write();
     if watching.is_empty() {
-        return Ok(()); // There's nothing to do - bail out fast
+        info!("No queues marked for read.");
+        return; // There's nothing to do - bail out fast
     }
-    let config = LibreQoSConfig::load()?;
+    let config = LibreQoSConfig::load();
+    if config.is_err() {
+        warn!("Unable to read LibreQoS config. Skipping queue collection cycle.");
+        return;
+    }
+    let config = config.unwrap();
 
     watching.par_iter_mut().for_each(|q| {
         let (circuit_id, download_class, upload_class) = q.get();
@@ -54,33 +65,46 @@ async fn track_queues() -> Result<()> {
     std::mem::drop(watching); // Release the lock
     expire_watched_queues();
-    Ok(())
 }
 
-pub async fn spawn_queue_monitor() {
-    let _ = task::spawn(async {
+pub fn spawn_queue_monitor() {
+    std::thread::spawn(|| {
         // Setup the queue monitor loop
+        info!("Starting Queue Monitor Thread.");
+        let interval_ms = if let Ok(config) = lqos_config::EtcLqos::load() {
+            config.queue_check_period_ms
+        } else {
+            1000
+        };
         QUEUE_MONITOR_INTERVAL.store(
-            lqos_config::EtcLqos::load().unwrap().queue_check_period_ms,
+            interval_ms,
             std::sync::atomic::Ordering::Relaxed,
         );
-        loop {
-            let queue_check_period_ms =
-                QUEUE_MONITOR_INTERVAL.load(std::sync::atomic::Ordering::Relaxed);
-            let mut interval = time::interval(Duration::from_millis(queue_check_period_ms));
+        info!("Queue check period set to {interval_ms} ms.");
 
-            let now = Instant::now();
-            let _ = track_queues().await;
-            let elapsed = now.elapsed();
-            //info!(
-            //    "TC Reader tick with mapping consumed {} ms.",
-            //    elapsed.as_millis()
-            //);
-            if elapsed.as_millis() < queue_check_period_ms as u128 {
-                let duration = Duration::from_millis(queue_check_period_ms) - elapsed;
-                tokio::time::sleep(duration).await;
+        // Setup the Linux timer fd system
+        let monitor_busy = AtomicBool::new(false);
+        if let Ok(timer) = TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty()) {
+            if timer.set(Expiration::Interval(TimeSpec::milliseconds(interval_ms as i64)), TimerSetTimeFlags::TFD_TIMER_ABSTIME).is_ok() {
+                loop {
+                    if timer.wait().is_ok() {
+                        if monitor_busy.load(std::sync::atomic::Ordering::Relaxed) {
+                            warn!("Queue tick fired while another queue read is ongoing. Skipping this cycle.");
+                        } else {
+                            monitor_busy.store(true, std::sync::atomic::Ordering::Relaxed);
+                            //info!("Queue tracking timer fired.");
+                            track_queues();
+                            monitor_busy.store(false, std::sync::atomic::Ordering::Relaxed);
+                        }
+                    } else {
+                        error!("Error in timer wait (Linux fdtimer). This should never happen.");
+                    }
+                }
             } else {
-                interval.tick().await;
+                error!("Unable to set the Linux fdtimer timer interval. Queues will not be monitored.");
             }
+        } else {
+            error!("Unable to acquire Linux fdtimer. Queues will not be monitored.");
         }
     });
 }

@@ -3,6 +3,7 @@ use lazy_static::*;
 use lqos_bus::TcHandle;
 use parking_lot::RwLock;
 use std::time::{SystemTime, UNIX_EPOCH};
+use log::{info, warn};
 
 lazy_static! {
     pub(crate) static ref WATCHED_QUEUES: RwLock<Vec<WatchedQueue>> = RwLock::new(Vec::new());
@@ -37,6 +38,7 @@ fn unix_now() -> u64 {
 }
 
 pub fn add_watched_queue(circuit_id: &str) {
+    //info!("Watching queue {circuit_id}");
     let max = unsafe { lqos_sys::libbpf_num_possible_cpus() } * 2;
     {
         let read_lock = WATCHED_QUEUES.read();
@@ -45,10 +47,12 @@ pub fn add_watched_queue(circuit_id: &str) {
             .find(|q| q.circuit_id == circuit_id)
             .is_some()
         {
+            warn!("Queue {circuit_id} is already being watched. Duplicate ignored.");
             return; // No duplicates, please
         }
 
         if read_lock.len() > max as usize {
+            warn!("Watching too many queues - didn't add {circuit_id} to watch list.");
             return; // Too many watched pots
         }
     }
@@ -66,9 +70,12 @@ pub fn add_watched_queue(circuit_id: &str) {
             };
             WATCHED_QUEUES.write().push(new_watch);
             //info!("Added {circuit_id} to watched queues. Now watching {} queues.", WATCHED_QUEUES.read().len());
+        } else {
+            warn!("No circuit ID of {circuit_id}");
         }
     } else {
-        log::warn!("No circuit ID of {circuit_id}");
+        warn!("Unable to access watched queue list. Try again later.");
     }
 }
@@ -81,6 +88,11 @@ pub(crate) fn expire_watched_queues() {
 pub fn still_watching(circuit_id: &str) {
     let mut lock = WATCHED_QUEUES.write();
     if let Some(q) = lock.iter_mut().find(|q| q.circuit_id == circuit_id) {
+        //info!("Still watching circuit: {circuit_id}");
         q.refresh_timer();
+    } else {
+        info!("Still watching circuit, but it had expired: {circuit_id}");
+        std::mem::drop(lock);
+        add_watched_queue(circuit_id);
     }
 }

@@ -63,9 +63,9 @@ async fn main() -> Result<()> {
     // Spawn tracking sub-systems
     join!(
         throughput_tracker::spawn_throughput_monitor(),
-        spawn_queue_monitor(),
         spawn_queue_structure_monitor(),
     );
+    spawn_queue_monitor();
 
     // Handle signals
     let mut signals = Signals::new(&[SIGINT, SIGHUP, SIGTERM])?;