fix: handle profile watcher reloads with gamemode reconnect correctly

This commit is contained in:
Ilya Zlobintsev
2025-01-16 22:34:23 +02:00
parent 5eb782eec4
commit 9ba8dd6588

View File

@@ -9,8 +9,15 @@ use std::{
rc::Rc, rc::Rc,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::{select, sync::mpsc, time::sleep}; use tokio::{
runtime, select,
sync::{mpsc, Mutex, Notify},
time::sleep,
};
use tracing::{debug, error, info, trace}; use tracing::{debug, error, info, trace};
use zbus::AsyncDrop;
static PROFILE_WATCHER_LOCK: Mutex<()> = Mutex::const_new(());
const PROFILE_WATCHER_MIN_DELAY_MS: u64 = 50; const PROFILE_WATCHER_MIN_DELAY_MS: u64 = 50;
const PROFILE_WATCHER_MAX_DELAY_MS: u64 = 500; const PROFILE_WATCHER_MAX_DELAY_MS: u64 = 500;
@@ -28,6 +35,13 @@ pub enum ProfileWatcherCommand {
} }
pub async fn run_watcher(handler: Handler, mut command_rx: mpsc::Receiver<ProfileWatcherCommand>) { pub async fn run_watcher(handler: Handler, mut command_rx: mpsc::Receiver<ProfileWatcherCommand>) {
debug!(
"starting new task watcher (total task count: {})",
runtime::Handle::current().metrics().num_alive_tasks()
);
let _guard = PROFILE_WATCHER_LOCK.lock().await;
let mut state = ProfileWatcherState::default(); let mut state = ProfileWatcherState::default();
process::load_full_process_list(&mut state); process::load_full_process_list(&mut state);
info!("loaded {} processes", state.process_list.len()); info!("loaded {} processes", state.process_list.len());
@@ -36,6 +50,7 @@ pub async fn run_watcher(handler: Handler, mut command_rx: mpsc::Receiver<Profil
process::start_listener(event_tx.clone()); process::start_listener(event_tx.clone());
let gamemode_stop_notify = Rc::new(Notify::new());
let mut gamemode_task = None; let mut gamemode_task = None;
if let Some(gamemode_proxy) = gamemode::connect(&state.process_list).await { if let Some(gamemode_proxy) = gamemode::connect(&state.process_list).await {
match gamemode_proxy.list_games().await { match gamemode_proxy.list_games().await {
@@ -55,6 +70,7 @@ pub async fn run_watcher(handler: Handler, mut command_rx: mpsc::Receiver<Profil
) { ) {
(Ok(mut registered_stream), Ok(mut unregistered_stream)) => { (Ok(mut registered_stream), Ok(mut unregistered_stream)) => {
let event_tx = event_tx.clone(); let event_tx = event_tx.clone();
let stop_notify = gamemode_stop_notify.clone();
let handle = tokio::task::spawn_local(async move { let handle = tokio::task::spawn_local(async move {
loop { loop {
@@ -79,12 +95,18 @@ pub async fn run_watcher(handler: Handler, mut command_rx: mpsc::Receiver<Profil
Err(err) => error!("could not get event args: {err}"), Err(err) => error!("could not get event args: {err}"),
} }
}, },
() = stop_notify.notified() => {
break;
}
}; };
if let Some(event) = event { if let Some(event) = event {
let _ = event_tx.send(ProfileWatcherEvent::Gamemode(event)).await; let _ = event_tx.send(ProfileWatcherEvent::Gamemode(event)).await;
} }
} }
registered_stream.async_drop().await;
unregistered_stream.async_drop().await;
debug!("exited gamemode watcher");
}); });
gamemode_task = Some(handle); gamemode_task = Some(handle);
} }
@@ -151,7 +173,8 @@ pub async fn run_watcher(handler: Handler, mut command_rx: mpsc::Receiver<Profil
handler.profile_watcher_state.borrow_mut().take(); handler.profile_watcher_state.borrow_mut().take();
if let Some(handle) = gamemode_task { if let Some(handle) = gamemode_task {
handle.abort(); gamemode_stop_notify.notify_one();
handle.await.unwrap();
} }
} }