mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
Refactor fd timer wait into a "periodic" function in lqos_utils,
only available for non-async use at this time. Adjust the two non-async usages of timer-fd based timers to use the more canonical setup.
This commit is contained in:
4
src/rust/Cargo.lock
generated
4
src/rust/Cargo.lock
generated
@@ -1379,7 +1379,6 @@ dependencies = [
|
|||||||
"lqos_config",
|
"lqos_config",
|
||||||
"lqos_sys",
|
"lqos_sys",
|
||||||
"lqos_utils",
|
"lqos_utils",
|
||||||
"nix",
|
|
||||||
"notify",
|
"notify",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.12.1",
|
||||||
"rayon",
|
"rayon",
|
||||||
@@ -1410,6 +1409,8 @@ dependencies = [
|
|||||||
name = "lqos_utils"
|
name = "lqos_utils"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"log",
|
||||||
|
"nix",
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
@@ -1426,6 +1427,7 @@ dependencies = [
|
|||||||
"lqos_config",
|
"lqos_config",
|
||||||
"lqos_queue_tracker",
|
"lqos_queue_tracker",
|
||||||
"lqos_sys",
|
"lqos_sys",
|
||||||
|
"lqos_utils",
|
||||||
"nix",
|
"nix",
|
||||||
"notify",
|
"notify",
|
||||||
"parking_lot 0.12.1",
|
"parking_lot 0.12.1",
|
||||||
|
|||||||
@@ -18,7 +18,6 @@ parking_lot = "0"
|
|||||||
notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
|
notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
|
||||||
tokio = { version = "1.22", features = [ "full", "parking_lot" ] }
|
tokio = { version = "1.22", features = [ "full", "parking_lot" ] }
|
||||||
rayon = "1"
|
rayon = "1"
|
||||||
nix = "0"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
criterion = { version = "0.3", features = [ "html_reports"] }
|
criterion = { version = "0.3", features = [ "html_reports"] }
|
||||||
|
|||||||
@@ -1,18 +1,10 @@
|
|||||||
use std::sync::atomic::AtomicBool;
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
circuit_to_queue::CIRCUIT_TO_QUEUE, interval::QUEUE_MONITOR_INTERVAL, queue_store::QueueStore,
|
circuit_to_queue::CIRCUIT_TO_QUEUE, interval::QUEUE_MONITOR_INTERVAL, queue_store::QueueStore,
|
||||||
tracking::reader::read_named_queue_from_interface,
|
tracking::reader::read_named_queue_from_interface,
|
||||||
};
|
};
|
||||||
use log::{info, warn, error};
|
use log::{info, warn};
|
||||||
use lqos_config::LibreQoSConfig;
|
use lqos_config::LibreQoSConfig;
|
||||||
use nix::sys::time::TimeSpec;
|
use lqos_utils::fdtimer::periodic;
|
||||||
use nix::sys::time::TimeValLike;
|
|
||||||
use nix::sys::timerfd::ClockId;
|
|
||||||
use nix::sys::timerfd::Expiration;
|
|
||||||
use nix::sys::timerfd::TimerFd;
|
|
||||||
use nix::sys::timerfd::TimerFlags;
|
|
||||||
use nix::sys::timerfd::TimerSetTimeFlags;
|
|
||||||
use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
|
use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
|
||||||
mod reader;
|
mod reader;
|
||||||
mod watched_queues;
|
mod watched_queues;
|
||||||
@@ -83,28 +75,8 @@ pub fn spawn_queue_monitor() {
|
|||||||
info!("Queue check period set to {interval_ms} ms.");
|
info!("Queue check period set to {interval_ms} ms.");
|
||||||
|
|
||||||
// Setup the Linux timer fd system
|
// Setup the Linux timer fd system
|
||||||
let monitor_busy = AtomicBool::new(false);
|
periodic(interval_ms, "Queue Reader", &mut || {
|
||||||
if let Ok(timer) = TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty()) {
|
|
||||||
if timer.set(Expiration::Interval(TimeSpec::milliseconds(interval_ms as i64)), TimerSetTimeFlags::TFD_TIMER_ABSTIME).is_ok() {
|
|
||||||
loop {
|
|
||||||
if timer.wait().is_ok() {
|
|
||||||
if monitor_busy.load(std::sync::atomic::Ordering::Relaxed) {
|
|
||||||
warn!("Queue tick fired while another queue read is ongoing. Skipping this cycle.");
|
|
||||||
} else {
|
|
||||||
monitor_busy.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
//info!("Queue tracking timer fired.");
|
|
||||||
track_queues();
|
track_queues();
|
||||||
monitor_busy.store(false, std::sync::atomic::Ordering::Relaxed);
|
});
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("Error in timer wait (Linux fdtimer). This should never happen.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("Unable to set the Linux fdtimer timer interval. Queues will not be monitored.");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("Unable to acquire Linux fdtimer. Queues will not be monitored.");
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,3 +5,5 @@ edition = "2021"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
nix = "0"
|
||||||
|
log = "0"
|
||||||
29
src/rust/lqos_utils/src/fdtimer.rs
Normal file
29
src/rust/lqos_utils/src/fdtimer.rs
Normal file
@@ -0,0 +1,29 @@
|
|||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use nix::sys::{timerfd::{TimerFd, ClockId, TimerFlags, Expiration, TimerSetTimeFlags}, time::{TimeSpec, TimeValLike}};
|
||||||
|
use log::{warn, error};
|
||||||
|
|
||||||
|
pub fn periodic(interval_ms: u64, task_name: &str, tick_function: &mut dyn FnMut()) {
|
||||||
|
let monitor_busy = AtomicBool::new(false);
|
||||||
|
if let Ok(timer) = TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty()) {
|
||||||
|
if timer.set(Expiration::Interval(TimeSpec::milliseconds(interval_ms as i64)), TimerSetTimeFlags::TFD_TIMER_ABSTIME).is_ok() {
|
||||||
|
loop {
|
||||||
|
if timer.wait().is_ok() {
|
||||||
|
if monitor_busy.load(std::sync::atomic::Ordering::Relaxed) {
|
||||||
|
warn!("{task_name} tick fired while another queue read is ongoing. Skipping this cycle.");
|
||||||
|
} else {
|
||||||
|
monitor_busy.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
//info!("Queue tracking timer fired.");
|
||||||
|
tick_function();
|
||||||
|
monitor_busy.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Error in timer wait (Linux fdtimer). This should never happen.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Unable to set the Linux fdtimer timer interval. Queues will not be monitored.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Unable to acquire Linux fdtimer. Queues will not be monitored.");
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
mod commands;
|
mod commands;
|
||||||
pub mod packet_scale;
|
pub mod packet_scale;
|
||||||
mod string_table_enum;
|
mod string_table_enum;
|
||||||
|
pub mod fdtimer;
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ anyhow = "1"
|
|||||||
lqos_config = { path = "../lqos_config" }
|
lqos_config = { path = "../lqos_config" }
|
||||||
lqos_sys = { path = "../lqos_sys" }
|
lqos_sys = { path = "../lqos_sys" }
|
||||||
lqos_queue_tracker = { path = "../lqos_queue_tracker" }
|
lqos_queue_tracker = { path = "../lqos_queue_tracker" }
|
||||||
|
lqos_utils = { path = "../lqos_utils" }
|
||||||
tokio = { version = "1.22", features = [ "full", "parking_lot" ] }
|
tokio = { version = "1.22", features = [ "full", "parking_lot" ] }
|
||||||
lazy_static = "1.4"
|
lazy_static = "1.4"
|
||||||
parking_lot = "0.12"
|
parking_lot = "0.12"
|
||||||
|
|||||||
@@ -4,10 +4,10 @@ use crate::throughput_tracker::tracking_data::ThroughputTracker;
|
|||||||
use lazy_static::*;
|
use lazy_static::*;
|
||||||
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
|
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
|
||||||
use lqos_sys::XdpIpAddress;
|
use lqos_sys::XdpIpAddress;
|
||||||
use nix::sys::{timerfd::{TimerFd, ClockId, TimerFlags, Expiration, TimerSetTimeFlags}, time::{TimeSpec, TimeValLike}};
|
use lqos_utils::fdtimer::periodic;
|
||||||
use parking_lot::RwLock;
|
use parking_lot::RwLock;
|
||||||
use std::{time::Duration, sync::atomic::AtomicBool};
|
use std::{time::Duration};
|
||||||
use log::{info, warn, error};
|
use log::info;
|
||||||
|
|
||||||
const RETIRE_AFTER_SECONDS: u64 = 30;
|
const RETIRE_AFTER_SECONDS: u64 = 30;
|
||||||
|
|
||||||
@@ -22,34 +22,14 @@ pub fn spawn_throughput_monitor() {
|
|||||||
info!("Bandwidth check period set to {interval_ms} ms.");
|
info!("Bandwidth check period set to {interval_ms} ms.");
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let monitor_busy = AtomicBool::new(false);
|
periodic(interval_ms, "Throughput Monitor", &mut || {
|
||||||
if let Ok(timer) = TimerFd::new(ClockId::CLOCK_MONOTONIC, TimerFlags::empty()) {
|
|
||||||
if timer.set(Expiration::Interval(TimeSpec::milliseconds(interval_ms as i64)), TimerSetTimeFlags::TFD_TIMER_ABSTIME).is_ok() {
|
|
||||||
loop {
|
|
||||||
if timer.wait().is_ok() {
|
|
||||||
if monitor_busy.load(std::sync::atomic::Ordering::Relaxed) {
|
|
||||||
warn!("Queue tick fired while another queue read is ongoing. Skipping this cycle.");
|
|
||||||
} else {
|
|
||||||
monitor_busy.store(true, std::sync::atomic::Ordering::Relaxed);
|
|
||||||
//info!("Bandwidth tracking timer fired.");
|
|
||||||
let mut throughput = THROUGHPUT_TRACKER.write();
|
let mut throughput = THROUGHPUT_TRACKER.write();
|
||||||
throughput.copy_previous_and_reset_rtt();
|
throughput.copy_previous_and_reset_rtt();
|
||||||
throughput.apply_new_throughput_counters();
|
throughput.apply_new_throughput_counters();
|
||||||
throughput.apply_rtt_data();
|
throughput.apply_rtt_data();
|
||||||
throughput.update_totals();
|
throughput.update_totals();
|
||||||
throughput.next_cycle();
|
throughput.next_cycle();
|
||||||
monitor_busy.store(false, std::sync::atomic::Ordering::Relaxed);
|
});
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("Error in timer wait (Linux fdtimer). This should never happen.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("Unable to set the Linux fdtimer timer interval. Bandwidth will not be monitored.");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
error!("Unable to acquire Linux fdtimer. Bandwidth will not be monitored.");
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user