Refactor the multiple "notify" systems into a single helper

structure.

* Creates FileWatcher, in lqos_utils.
* Removes "notify" dependency from other crates.

FileWatcher is designed to watch a file. If the file doesn't exist,
then an optional callback is called - and the watcher waits,
periodically checking to see if the file has appeared yet. When the
file appears, another optional callback is executed.

Once the file exists, a `notify` system is started (it uses Linux's
`inotify` system internally) for that file. When the file changes,
the process sleeps briefly and then executes an `on_change` callback.
Further messages are then suppressed for a short period to avoid
duplicates.

All uses of notify have been updated to use this system. Errors are
handled cleanly, per ISSUE #209.
This commit is contained in:
Herbert Wolverson 2023-01-31 17:52:35 +00:00
parent 3e4e7ebe64
commit 816ca7e651
9 changed files with 192 additions and 106 deletions

6
src/rust/Cargo.lock generated
View File

@ -1348,8 +1348,8 @@ dependencies = [
"lazy_static",
"lqos_bus",
"lqos_config",
"lqos_utils",
"nix",
"notify",
"parking_lot 0.12.1",
"rocket",
"rocket_async_compression",
@ -1379,7 +1379,6 @@ dependencies = [
"lqos_config",
"lqos_sys",
"lqos_utils",
"notify",
"parking_lot 0.12.1",
"rayon",
"serde",
@ -1412,7 +1411,9 @@ version = "0.1.0"
dependencies = [
"log",
"nix",
"notify",
"serde",
"thiserror",
]
[[package]]
@ -1430,7 +1431,6 @@ dependencies = [
"lqos_sys",
"lqos_utils",
"nix",
"notify",
"parking_lot 0.12.1",
"rayon",
"serde",

View File

@ -14,9 +14,9 @@ lazy_static = "1.4"
parking_lot = "0.12"
lqos_bus = { path = "../lqos_bus" }
lqos_config = { path = "../lqos_config" }
lqos_utils = { path = "../lqos_utils" }
anyhow = "1"
sysinfo = "0"
notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
default-net = "0"
nix = "0"

View File

@ -7,11 +7,12 @@ use lqos_bus::{
BusRequest, BusResponse, IpStats, bus_request,
};
use lqos_config::ConfigShapedDevices;
use lqos_utils::file_watcher::FileWatcher;
use nix::sys::{timerfd::{TimerFd, ClockId, TimerFlags, Expiration, TimerSetTimeFlags}, time::{TimeSpec, TimeValLike}};
use rocket::tokio::{
task::spawn_blocking,
};
use std::{net::IpAddr, sync::atomic::AtomicBool, time::Duration};
use std::{net::IpAddr, sync::atomic::AtomicBool};
/// Once per second, update CPU and RAM usage and ask
/// `lqosd` for updated system statistics.
@ -74,12 +75,17 @@ pub async fn update_tracking() {
}
}
fn load_shaped_devices() {
let shaped_devices = ConfigShapedDevices::load();
if let Ok(new_file) = shaped_devices {
info!("ShapedDevices.csv loaded");
*SHAPED_DEVICES.write() = new_file;
}
}
/// Fires up a Linux file system watcher than notifies
/// when `ShapedDevices.csv` changes, and triggers a reload.
fn watch_for_shaped_devices_changing() -> Result<()> {
info!("Starting to watch ShapedDevices.csv");
use notify::{Config, RecursiveMode, Watcher};
let watch_path = ConfigShapedDevices::path();
if watch_path.is_err() {
error!("Unable to generate path for ShapedDevices.csv");
@ -87,52 +93,12 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
}
let watch_path = watch_path.unwrap();
// Chicken and egg. You can't watch a file that doesn't exist yet.
if !watch_path.exists() {
info!("ShapedDevices.csv does not exist yet. Waiting for it.");
loop {
std::thread::sleep(Duration::from_secs(30));
if watch_path.exists() {
info!("ShapedDevcices.csv was just created. Waiting a second and loading it.");
std::thread::sleep(Duration::from_secs(1));
if let Ok(new_file) = ConfigShapedDevices::load() {
info!("ShapedDevices.csv loaded");
*SHAPED_DEVICES.write() = new_file;
}
break;
}
}
} else {
// Since the file exists, load it.
if let Ok(new_file) = ConfigShapedDevices::load() {
info!("ShapedDevices.csv loaded");
*SHAPED_DEVICES.write() = new_file;
}
}
let (tx, rx) = std::sync::mpsc::channel();
let watcher = notify::RecommendedWatcher::new(tx, Config::default());
if watcher.is_err() {
error!("Unable to create watcher for ShapedDevices.csv");
error!("{:?}", watcher);
return Err(anyhow::Error::msg("Unable to create watcher for ShapedDevices.csv"));
}
let mut watcher = watcher.unwrap();
let retval = watcher.watch(&watch_path, RecursiveMode::NonRecursive);
if retval.is_err() {
error!("Unable to start watcher for ShapedDevices.csv");
error!("{:?}", retval);
return Err(anyhow::Error::msg("Unable to start watcher for ShapedDevices.csv"));
}
loop {
let _ = rx.recv();
info!("ShapedDevices.csv changed");
if let Ok(new_file) = ConfigShapedDevices::load() {
*SHAPED_DEVICES.write() = new_file;
info!("ShapedDevices.csv loaded correctly.");
}
}
let mut watcher = FileWatcher::new("ShapedDevices.csv", watch_path);
watcher.set_file_exists_callback(load_shaped_devices);
watcher.set_file_created_callback(load_shaped_devices);
watcher.set_file_changed_callback(load_shaped_devices);
let _ = watcher.watch();
Ok(())
}
/// Requests data from `lqosd` and stores it in local

View File

@ -16,7 +16,6 @@ log = "0"
log-once = "0.4.0"
lazy_static = "1.4"
parking_lot = "0"
notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
tokio = { version = "1.22", features = [ "full", "parking_lot" ] }
rayon = "1"

View File

@ -1,13 +1,12 @@
use std::time::Duration;
use crate::queue_structure::{
queue_network::QueueNetwork, queue_node::QueueNode, read_queueing_structure,
};
use lazy_static::*;
use lqos_utils::file_watcher::FileWatcher;
use parking_lot::RwLock;
use thiserror::Error;
use tokio::task::spawn_blocking;
use log::{info, error};
use log::{error, info};
lazy_static! {
/// Global storage of the shaped devices csv data.
@ -49,12 +48,14 @@ pub async fn spawn_queue_structure_monitor() {
});
}
fn update_queue_structure() {
info!("queueingStructure.csv reloaded");
QUEUE_STRUCTURE.write().update();
}
/// Fires up a Linux file system watcher than notifies
/// when `ShapedDevices.csv` changes, and triggers a reload.
fn watch_for_queueing_structure_changing() -> Result<(), QueueWatcherError> {
info!("Starting the queue structure monitor.");
use notify::{Config, RecursiveMode, Watcher};
// Obtain the path to watch
let watch_path = QueueNetwork::path();
if watch_path.is_err() {
@ -63,54 +64,19 @@ fn watch_for_queueing_structure_changing() -> Result<(), QueueWatcherError> {
}
let watch_path = watch_path.unwrap();
// File notify doesn't work for files that don't exist
// It's quite possible that a user is just starting, and will
// not have a queueingStructure.json yet - so we need to keep
// trying to obtain one.
if !watch_path.exists() {
info!("queueingStructure.json does not exist yet.");
loop {
std::thread::sleep(Duration::from_secs(30));
if watch_path.exists() {
info!("queueingStructure.json was just created. Sleeping 1 second and watching it.");
std::thread::sleep(Duration::from_secs(1));
QUEUE_STRUCTURE.write().update();
break;
}
}
}
// Build the monitor
let (tx, rx) = std::sync::mpsc::channel();
let watcher = notify::RecommendedWatcher::new(tx, Config::default());
if watcher.is_err() {
error!("Could not create file watcher for queueingStructure.json");
error!("{:?}", watcher);
return Err(QueueWatcherError::WatcherFail);
}
let mut watcher = watcher.unwrap();
// Start monitoring
let result = watcher.watch(&watch_path, RecursiveMode::NonRecursive);
if result.is_ok() {
info!("Watching queueingStructure.csv for changes.");
loop {
let _ = rx.recv();
log::info!("queuingStructure.csv changed");
QUEUE_STRUCTURE.write().update();
}
} else {
error!("Unable to start queueingStructure watcher.");
error!("{:?}", watcher);
Err(QueueWatcherError::WatcherFail)
// Do the watching
let mut watcher = FileWatcher::new("queueingStructure.json", watch_path);
watcher.set_file_created_callback(update_queue_structure);
watcher.set_file_changed_callback(update_queue_structure);
let retval = watcher.watch();
if retval.is_err() {
error!("Unable to create queueingStructure.json watcher");
}
Ok(())
}
#[derive(Error, Debug)]
pub enum QueueWatcherError {
#[error("Could not create the path buffer to find queuingStructure.json")]
CannotCreatePath,
#[error("Cannot watch queueingStructure.json")]
WatcherFail,
}

View File

@ -6,4 +6,6 @@ edition = "2021"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
nix = "0"
log = "0"
log = "0"
notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
thiserror = "1"

View File

@ -0,0 +1,153 @@
use log::{error, info};
use notify::{Config, RecursiveMode, Watcher};
use std::{
path::PathBuf,
time::{Duration, Instant},
};
use thiserror::Error;
const SLEEP_UNTIL_EXISTS_SECONDS: u64 = 10;
const SLEEP_AFTER_CREATION_SECONDS: u64 = 3;
const SLEEP_AFTER_CHANGE_SECONDS: u64 = 3;
const SLEEP_DEBOUNCE_DURATION: u64 = 10;
/// Provides a convenient mechanism for watching a file for changes.
/// On Linux, it uses `inotify` - this varies for other operating systems.
///
/// Do not create the structure directly: use new(), followed by
/// setting the appropriate callbacks.
///
/// ## Example
///
/// ```rust
/// use lqos_utils::file_watcher::FileWatcher;
/// use std::path::Path;
///
/// let path = Path::new("/opt/libreqos/src").join("ShapedDevices.csv");
/// let mut watcher = FileWatcher::new("ShapedDevices.csv", path);
/// watcher.set_file_changed_callback(|| println!("ShapedDevices.csv has changed"));
/// //let _ = watcher.watch(); // Commented out because the test will hang
/// ```
pub struct FileWatcher {
nice_name: String,
path: PathBuf,
file_created_callback: Option<fn()>,
file_exists_callback: Option<fn()>,
file_changed_callback: Option<fn()>,
}
impl FileWatcher {
/// Creates a new `FileWatcher`.
///
/// ## Arguments
///
/// * `nice_name` - the print-friendly (short) name of the file to watch.
/// * `path` - a generated `PathBuf` pointing to the file to watch.
pub fn new<S: ToString>(nice_name: S, path: PathBuf) -> Self {
Self {
nice_name: nice_name.to_string(),
path,
file_created_callback: None,
file_exists_callback: None,
file_changed_callback: None,
}
}
/// Set a callback function to run if the file did not exist
/// initially, and has been created since execution started.
pub fn set_file_created_callback(&mut self, callback: fn()) {
self.file_created_callback = Some(callback);
}
/// Set a callback function to run if the file exists when
/// the watching process being.
pub fn set_file_exists_callback(&mut self, callback: fn()) {
self.file_exists_callback = Some(callback);
}
/// Set a callback function to run whenever the file changes.
pub fn set_file_changed_callback(&mut self, callback: fn()) {
self.file_changed_callback = Some(callback);
}
/// Start watching the file. NOTE: this function will only
/// return if something bad happens. It is designed to be
/// executed in a thread, and take over the executing thread.
pub fn watch(&mut self) -> Result<(), WatchedFileError> {
// Handle the case in which the file does not yet exist
if !self.path.exists() {
info!(
"{} does not exist yet. Waiting for it to appear.",
self.nice_name
);
loop {
std::thread::sleep(Duration::from_secs(SLEEP_UNTIL_EXISTS_SECONDS));
if self.path.exists() {
info!("{} has been created. Waiting a second.", self.nice_name);
std::thread::sleep(Duration::from_secs(SLEEP_AFTER_CREATION_SECONDS));
if let Some(callback) = &mut self.file_created_callback {
callback();
}
break;
}
}
} else {
if let Some(callback) = &mut self.file_exists_callback {
callback();
}
}
// Build the watcher
let (tx, rx) = std::sync::mpsc::channel();
let watcher = notify::RecommendedWatcher::new(tx, Config::default());
if watcher.is_err() {
error!("Unable to create watcher for ShapedDevices.csv");
error!("{:?}", watcher);
return Err(WatchedFileError::CreateWatcherError);
}
let mut watcher = watcher.unwrap();
// Try to start watching for changes
let retval = watcher.watch(&self.path, RecursiveMode::NonRecursive);
if retval.is_err() {
error!("Unable to start watcher for ShapedDevices.csv");
error!("{:?}", retval);
return Err(WatchedFileError::StartWatcherError);
}
let mut last_event: Option<Instant> = None;
loop {
let ret = rx.recv();
if ret.is_err() {
error!("Error from monitor thread, watching {}", self.nice_name);
error!("{:?}", ret);
}
// A change event has arrived
// Are we taking a short break to avoid duplicates?
let mut process = true;
if let Some(last_event) = last_event {
if last_event.elapsed().as_secs() < SLEEP_DEBOUNCE_DURATION {
process = false;
//info!("Ignoring duplicate event");
}
}
if process {
std::thread::sleep(Duration::from_secs(SLEEP_AFTER_CHANGE_SECONDS));
last_event = Some(Instant::now());
info!("{} changed", self.nice_name);
if let Some(callback) = &mut self.file_changed_callback {
callback();
}
}
}
}
}
#[derive(Error, Debug)]
pub enum WatchedFileError {
#[error("Unable to create watcher")]
CreateWatcherError,
#[error("Unable to start watcher")]
StartWatcherError,
}

View File

@ -2,3 +2,4 @@ mod commands;
pub mod packet_scale;
mod string_table_enum;
pub mod fdtimer;
pub mod file_watcher;

View File

@ -20,7 +20,6 @@ lqos_bus = { path = "../lqos_bus" }
signal-hook = "0.3"
serde_json = "1"
serde = { version = "1.0", features = ["derive"] }
notify = { version = "5.0.0", default-features = false, feature=["macos_kqueue"] } # Not using crossbeam because of Tokio
env_logger = "0"
log = "0"
nix = "0"