The watched queues structure no longer RwLocks - it's an interior mutable DashMap now.

This commit is contained in:
Herbert Wolverson 2023-03-08 18:17:17 +00:00
parent c5dca04ade
commit bbbe1e5b83
5 changed files with 14 additions and 17 deletions

1
src/rust/Cargo.lock generated
View File

@ -1390,6 +1390,7 @@ name = "lqos_queue_tracker"
version = "0.1.0"
dependencies = [
"criterion",
"dashmap",
"log",
"log-once",
"lqos_bus",

View File

@ -5,7 +5,7 @@ use thiserror::Error;
/// Provides consistent handling of TC handle types.
#[derive(
Copy, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq,
Copy, Clone, Serialize, Deserialize, Debug, Default, PartialEq, Eq, Hash
)]
pub struct TcHandle(u32);

View File

@ -15,6 +15,7 @@ log = "0"
log-once = "0.4.0"
tokio = { version = "1", features = [ "full", "parking_lot" ] }
once_cell = "1"
dashmap = "5"
[dev-dependencies]
criterion = { version = "0", features = [ "html_reports"] }

View File

@ -12,8 +12,7 @@ use watched_queues::WATCHED_QUEUES;
pub use watched_queues::{add_watched_queue, still_watching};
fn track_queues() {
let mut watching = WATCHED_QUEUES.write().unwrap();
if watching.is_empty() {
if WATCHED_QUEUES.is_empty() {
//info!("No queues marked for read.");
return; // There's nothing to do - bail out fast
}
@ -23,7 +22,7 @@ fn track_queues() {
return;
}
let config = config.unwrap();
watching.iter_mut().for_each(|q| {
WATCHED_QUEUES.iter_mut().for_each(|q| {
let (circuit_id, download_class, upload_class) = q.get();
let (download, upload) = if config.on_a_stick_mode {
@ -73,7 +72,6 @@ fn track_queues() {
}
});
std::mem::drop(watching); // Release the lock
expire_watched_queues();
}

View File

@ -1,13 +1,14 @@
use crate::queue_structure::QUEUE_STRUCTURE;
use dashmap::DashMap;
use log::{info, warn};
use lqos_bus::TcHandle;
use lqos_utils::unix_time::unix_now;
use once_cell::sync::Lazy;
use std::sync::RwLock;
pub(crate) static WATCHED_QUEUES: Lazy<RwLock<Vec<WatchedQueue>>> =
Lazy::new(|| RwLock::new(Vec::new()));
pub(crate) static WATCHED_QUEUES: Lazy<DashMap<String, WatchedQueue>> =
Lazy::new(DashMap::new);
#[derive(PartialEq, Eq, Hash)]
pub(crate) struct WatchedQueue {
circuit_id: String,
expires_unix_time: u64,
@ -33,13 +34,12 @@ pub fn add_watched_queue(circuit_id: &str) {
//info!("Watching queue {circuit_id}");
let max = unsafe { lqos_sys::libbpf_num_possible_cpus() } * 2;
{
let read_lock = WATCHED_QUEUES.read().unwrap();
if read_lock.iter().any(|q| q.circuit_id == circuit_id) {
if WATCHED_QUEUES.contains_key(circuit_id) {
warn!("Queue {circuit_id} is already being watched. Duplicate ignored.");
return; // No duplicates, please
}
if read_lock.len() > max as usize {
if WATCHED_QUEUES.len() > max as usize {
warn!(
"Watching too many queues - didn't add {circuit_id} to watch list."
);
@ -58,7 +58,7 @@ pub fn add_watched_queue(circuit_id: &str) {
upload_class: circuit.up_class_id,
};
WATCHED_QUEUES.write().unwrap().push(new_watch);
WATCHED_QUEUES.insert(circuit.circuit_id.as_ref().unwrap().clone(), new_watch);
//info!("Added {circuit_id} to watched queues. Now watching {} queues.", WATCHED_QUEUES.read().len());
} else {
warn!("No circuit ID of {circuit_id}");
@ -69,19 +69,16 @@ pub fn add_watched_queue(circuit_id: &str) {
}
pub(crate) fn expire_watched_queues() {
let mut lock = WATCHED_QUEUES.write().unwrap();
let now = unix_now().unwrap_or(0);
lock.retain(|w| w.expires_unix_time > now);
WATCHED_QUEUES.retain(|_,w| w.expires_unix_time > now);
}
pub fn still_watching(circuit_id: &str) {
let mut lock = WATCHED_QUEUES.write().unwrap();
if let Some(q) = lock.iter_mut().find(|q| q.circuit_id == circuit_id) {
if let Some(mut q) = WATCHED_QUEUES.get_mut(circuit_id) {
//info!("Still watching circuit: {circuit_id}");
q.refresh_timer();
} else {
info!("Still watching circuit, but it had expired: {circuit_id}");
std::mem::drop(lock);
add_watched_queue(circuit_id);
}
}