diff --git a/lact-daemon/src/config.rs b/lact-daemon/src/config.rs index 037bb93..9671d77 100644 --- a/lact-daemon/src/config.rs +++ b/lact-daemon/src/config.rs @@ -13,11 +13,13 @@ use std::{ sync::{Arc, Mutex}, time::{Duration, Instant}, }; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time}; use tracing::{debug, error}; const FILE_NAME: &str = "config.yaml"; const DEFAULT_ADMIN_GROUPS: [&str; 2] = ["wheel", "sudo"]; +/// Minimum amount of time between separate config reloads +const CONFIG_RELOAD_INTERVAL_MILLIS: u64 = 50; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct Config { @@ -173,13 +175,14 @@ impl Config { } } -pub fn start_watcher(config_last_applied: Arc>) -> mpsc::UnboundedReceiver { +pub fn start_watcher(config_last_saved: Arc>) -> mpsc::UnboundedReceiver { let (config_tx, config_rx) = mpsc::unbounded_channel(); - let (event_tx, event_rx) = std::sync::mpsc::channel(); + let (event_tx, mut event_rx) = mpsc::channel(64); - tokio::task::spawn_blocking(move || { - let mut watcher = RecommendedWatcher::new(event_tx, notify::Config::default()) - .expect("Could not create config file watcher"); + tokio::spawn(async move { + let mut watcher = + RecommendedWatcher::new(SenderEventHandler(event_tx), notify::Config::default()) + .expect("Could not create config file watcher"); let config_path = get_path(); let watch_path = config_path @@ -189,33 +192,56 @@ pub fn start_watcher(config_last_applied: Arc>) -> mpsc::Unbounde .watch(watch_path, notify::RecursiveMode::Recursive) .expect("Could not subscribe to config file changes"); - for res in event_rx { + while let Some(res) = event_rx.recv().await { debug!("got config file event {res:?}"); match res { Ok(event) => { use notify::EventKind; - let elapsed = config_last_applied.lock().unwrap().elapsed(); - if elapsed < Duration::from_millis(50) { - debug!("config was applied very recently, skipping fs event"); - continue; - } + if let EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) = + event.kind + { + if config_last_saved.lock().unwrap().elapsed() + < Duration::from_millis(CONFIG_RELOAD_INTERVAL_MILLIS) + { + debug!("ignoring fs event after self-inflicted config change"); + continue; + } - if !event.paths.contains(&config_path) { - continue; - } + // Accumulate FS events, reload config only after a period has passed since the last event + debug!( + "waiting for {CONFIG_RELOAD_INTERVAL_MILLIS}ms before reloading config" + ); + let timeout = + time::sleep(Duration::from_millis(CONFIG_RELOAD_INTERVAL_MILLIS)); + tokio::pin!(timeout); - match event.kind { - EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => { - match Config::load() { - Ok(Some(new_config)) => config_tx.send(new_config).unwrap(), - Ok(None) => error!("config was removed!"), - Err(err) => { - error!("could not read config after it was changed: {err:#}"); - } + loop { + tokio::select! { + () = &mut timeout => { + break; + } + Some(res) = event_rx.recv() => { + match res { + Ok(event) => { + if let EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) = event.kind { + debug!("got another fs event, resetting reload timer"); + timeout.as_mut().reset(time::Instant::now() + Duration::from_millis(CONFIG_RELOAD_INTERVAL_MILLIS)); + } + } + Err(err) => error!("filesystem event error: {err}") + } + } + } + } + + match Config::load() { + Ok(Some(new_config)) => config_tx.send(new_config).unwrap(), + Ok(None) => error!("config was removed!"), + Err(err) => { + error!("could not read config after it was changed: {err:#}"); } } - _ => (), } } Err(err) => error!("filesystem event error: {err}"), @@ -228,6 +254,14 @@ pub fn start_watcher(config_last_applied: Arc>) -> mpsc::Unbounde config_rx } +struct SenderEventHandler(mpsc::Sender>); + +impl notify::EventHandler for SenderEventHandler { + fn handle_event(&mut self, event: notify::Result) { + let _ = self.0.blocking_send(event); + } +} + fn get_path() -> PathBuf { let uid = getuid(); if uid.is_root() { diff --git a/lact-daemon/src/lib.rs b/lact-daemon/src/lib.rs index fe02d75..f74b696 100644 --- a/lact-daemon/src/lib.rs +++ b/lact-daemon/src/lib.rs @@ -104,7 +104,7 @@ async fn listen_exit_signals(handler: Handler) { } async fn listen_config_changes(handler: Handler) { - let mut rx = config::start_watcher(handler.config_last_applied.clone()); + let mut rx = config::start_watcher(handler.config_last_saved.clone()); while let Some(new_config) = rx.recv().await { info!("config file was changed, reloading"); handler.config.replace(new_config); diff --git a/lact-daemon/src/server/handler.rs b/lact-daemon/src/server/handler.rs index 139aba5..ec79a34 100644 --- a/lact-daemon/src/server/handler.rs +++ b/lact-daemon/src/server/handler.rs @@ -78,7 +78,7 @@ pub struct Handler { pub config: Rc>, pub gpu_controllers: Rc>, confirm_config_tx: Rc>>>, - pub config_last_applied: Arc>, + pub config_last_saved: Arc>, } impl<'a> Handler { @@ -133,7 +133,7 @@ impl<'a> Handler { gpu_controllers: Rc::new(controllers), config: Rc::new(RefCell::new(config)), confirm_config_tx: Rc::new(RefCell::new(None)), - config_last_applied: Arc::new(Mutex::new(Instant::now())), + config_last_saved: Arc::new(Mutex::new(Instant::now())), }; handler.apply_current_config().await; @@ -151,8 +151,6 @@ impl<'a> Handler { pub async fn apply_current_config(&self) { let config = self.config.borrow().clone(); // Clone to avoid locking the RwLock on an await point - *self.config_last_applied.lock().unwrap() = Instant::now(); - for (id, gpu_config) in &config.gpus { if let Some(controller) = self.gpu_controllers.get(id) { if let Err(err) = controller.apply_config(gpu_config).await { @@ -242,7 +240,7 @@ impl<'a> Handler { match result { Ok(ConfirmCommand::Confirm) => { info!("saving updated config"); - *handler.config_last_applied.lock().unwrap() = Instant::now(); + *handler.config_last_saved.lock().unwrap() = Instant::now(); let mut config_guard = handler.config.borrow_mut(); config_guard.gpus.insert(id, new_config); @@ -250,6 +248,8 @@ impl<'a> Handler { if let Err(err) = config_guard.save() { error!("{err}"); } + + *handler.config_last_saved.lock().unwrap() = Instant::now(); } Ok(ConfirmCommand::Revert) | Err(_) => { if let Err(err) = controller.apply_config(&previous_config).await {