feat: improve config change watcher logic to avoid unnecessary reloads

This commit is contained in:
Ilya Zlobintsev 2024-09-07 21:55:59 +03:00
parent 9dbce2a812
commit 4007e0a360
3 changed files with 64 additions and 30 deletions

View File

@ -13,11 +13,13 @@ use std::{
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time};
use tracing::{debug, error};
const FILE_NAME: &str = "config.yaml";
const DEFAULT_ADMIN_GROUPS: [&str; 2] = ["wheel", "sudo"];
/// Minimum amount of time between separate config reloads
const CONFIG_RELOAD_INTERVAL_MILLIS: u64 = 50;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct Config {
@ -173,13 +175,14 @@ impl Config {
}
}
pub fn start_watcher(config_last_applied: Arc<Mutex<Instant>>) -> mpsc::UnboundedReceiver<Config> {
pub fn start_watcher(config_last_saved: Arc<Mutex<Instant>>) -> mpsc::UnboundedReceiver<Config> {
let (config_tx, config_rx) = mpsc::unbounded_channel();
let (event_tx, event_rx) = std::sync::mpsc::channel();
let (event_tx, mut event_rx) = mpsc::channel(64);
tokio::task::spawn_blocking(move || {
let mut watcher = RecommendedWatcher::new(event_tx, notify::Config::default())
.expect("Could not create config file watcher");
tokio::spawn(async move {
let mut watcher =
RecommendedWatcher::new(SenderEventHandler(event_tx), notify::Config::default())
.expect("Could not create config file watcher");
let config_path = get_path();
let watch_path = config_path
@ -189,33 +192,56 @@ pub fn start_watcher(config_last_applied: Arc<Mutex<Instant>>) -> mpsc::Unbounde
.watch(watch_path, notify::RecursiveMode::Recursive)
.expect("Could not subscribe to config file changes");
for res in event_rx {
while let Some(res) = event_rx.recv().await {
debug!("got config file event {res:?}");
match res {
Ok(event) => {
use notify::EventKind;
let elapsed = config_last_applied.lock().unwrap().elapsed();
if elapsed < Duration::from_millis(50) {
debug!("config was applied very recently, skipping fs event");
continue;
}
if let EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) =
event.kind
{
if config_last_saved.lock().unwrap().elapsed()
< Duration::from_millis(CONFIG_RELOAD_INTERVAL_MILLIS)
{
debug!("ignoring fs event after self-inflicted config change");
continue;
}
if !event.paths.contains(&config_path) {
continue;
}
// Accumulate FS events, reload config only after a period has passed since the last event
debug!(
"waiting for {CONFIG_RELOAD_INTERVAL_MILLIS}ms before reloading config"
);
let timeout =
time::sleep(Duration::from_millis(CONFIG_RELOAD_INTERVAL_MILLIS));
tokio::pin!(timeout);
match event.kind {
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) => {
match Config::load() {
Ok(Some(new_config)) => config_tx.send(new_config).unwrap(),
Ok(None) => error!("config was removed!"),
Err(err) => {
error!("could not read config after it was changed: {err:#}");
}
loop {
tokio::select! {
() = &mut timeout => {
break;
}
Some(res) = event_rx.recv() => {
match res {
Ok(event) => {
if let EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) = event.kind {
debug!("got another fs event, resetting reload timer");
timeout.as_mut().reset(time::Instant::now() + Duration::from_millis(CONFIG_RELOAD_INTERVAL_MILLIS));
}
}
Err(err) => error!("filesystem event error: {err}")
}
}
}
}
match Config::load() {
Ok(Some(new_config)) => config_tx.send(new_config).unwrap(),
Ok(None) => error!("config was removed!"),
Err(err) => {
error!("could not read config after it was changed: {err:#}");
}
}
_ => (),
}
}
Err(err) => error!("filesystem event error: {err}"),
@ -228,6 +254,14 @@ pub fn start_watcher(config_last_applied: Arc<Mutex<Instant>>) -> mpsc::Unbounde
config_rx
}
struct SenderEventHandler(mpsc::Sender<notify::Result<notify::Event>>);
impl notify::EventHandler for SenderEventHandler {
fn handle_event(&mut self, event: notify::Result<notify::Event>) {
let _ = self.0.blocking_send(event);
}
}
fn get_path() -> PathBuf {
let uid = getuid();
if uid.is_root() {

View File

@ -104,7 +104,7 @@ async fn listen_exit_signals(handler: Handler) {
}
async fn listen_config_changes(handler: Handler) {
let mut rx = config::start_watcher(handler.config_last_applied.clone());
let mut rx = config::start_watcher(handler.config_last_saved.clone());
while let Some(new_config) = rx.recv().await {
info!("config file was changed, reloading");
handler.config.replace(new_config);

View File

@ -78,7 +78,7 @@ pub struct Handler {
pub config: Rc<RefCell<Config>>,
pub gpu_controllers: Rc<BTreeMap<String, GpuController>>,
confirm_config_tx: Rc<RefCell<Option<oneshot::Sender<ConfirmCommand>>>>,
pub config_last_applied: Arc<Mutex<Instant>>,
pub config_last_saved: Arc<Mutex<Instant>>,
}
impl<'a> Handler {
@ -133,7 +133,7 @@ impl<'a> Handler {
gpu_controllers: Rc::new(controllers),
config: Rc::new(RefCell::new(config)),
confirm_config_tx: Rc::new(RefCell::new(None)),
config_last_applied: Arc::new(Mutex::new(Instant::now())),
config_last_saved: Arc::new(Mutex::new(Instant::now())),
};
handler.apply_current_config().await;
@ -151,8 +151,6 @@ impl<'a> Handler {
pub async fn apply_current_config(&self) {
let config = self.config.borrow().clone(); // Clone to avoid locking the RwLock on an await point
*self.config_last_applied.lock().unwrap() = Instant::now();
for (id, gpu_config) in &config.gpus {
if let Some(controller) = self.gpu_controllers.get(id) {
if let Err(err) = controller.apply_config(gpu_config).await {
@ -242,7 +240,7 @@ impl<'a> Handler {
match result {
Ok(ConfirmCommand::Confirm) => {
info!("saving updated config");
*handler.config_last_applied.lock().unwrap() = Instant::now();
*handler.config_last_saved.lock().unwrap() = Instant::now();
let mut config_guard = handler.config.borrow_mut();
config_guard.gpus.insert(id, new_config);
@ -250,6 +248,8 @@ impl<'a> Handler {
if let Err(err) = config_guard.save() {
error!("{err}");
}
*handler.config_last_saved.lock().unwrap() = Instant::now();
}
Ok(ConfirmCommand::Revert) | Err(_) => {
if let Err(err) = controller.apply_config(&previous_config).await {