chore: use LocalSet for daemon tasks, replace mutex/rwlock with refcell

This commit is contained in:
Ilya Zlobintsev 2023-09-23 09:23:13 +03:00
parent a395d1abe2
commit 2fc395d284
4 changed files with 51 additions and 44 deletions

View File

@ -15,6 +15,7 @@ use std::str::FromStr;
use tokio::{ use tokio::{
runtime, runtime,
signal::unix::{signal, SignalKind}, signal::unix::{signal, SignalKind},
task::LocalSet,
}; };
use tracing::{debug_span, info, Instrument, Level}; use tracing::{debug_span, info, Instrument, Level};
@ -42,13 +43,17 @@ pub fn run() -> anyhow::Result<()> {
let max_level = Level::from_str(&config.daemon.log_level).context("Invalid log level")?; let max_level = Level::from_str(&config.daemon.log_level).context("Invalid log level")?;
tracing_subscriber::fmt().with_max_level(max_level).init(); tracing_subscriber::fmt().with_max_level(max_level).init();
let server = Server::new(config).await?; LocalSet::new()
let handler = server.handler.clone(); .run_until(async move {
let server = Server::new(config).await?;
let handler = server.handler.clone();
tokio::spawn(listen_exit_signals(handler.clone())); tokio::task::spawn_local(listen_exit_signals(handler.clone()));
tokio::spawn(suspend::listen_events(handler)); tokio::task::spawn_local(suspend::listen_events(handler));
server.run().await; server.run().await;
Ok(()) Ok(())
})
.await
}) })
} }
@ -63,11 +68,15 @@ pub fn run_embedded(stream: StdUnixStream) -> anyhow::Result<()> {
.build() .build()
.expect("Could not initialize tokio runtime"); .expect("Could not initialize tokio runtime");
rt.block_on(async { rt.block_on(async {
let config = Config::default(); LocalSet::new()
let handler = Handler::new(config).await?; .run_until(async move {
let stream = stream.try_into()?; let config = Config::default();
let handler = Handler::new(config).await?;
let stream = stream.try_into()?;
handle_stream(stream, handler).await handle_stream(stream, handler).await
})
.await
}) })
} }

View File

@ -20,8 +20,9 @@ use lact_schema::{
use pciid_parser::Database; use pciid_parser::Database;
use std::{ use std::{
borrow::Cow, borrow::Cow,
cell::RefCell,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Mutex}, rc::Rc,
time::Duration, time::Duration,
}; };
use tokio::{select, sync::Notify, task::JoinHandle, time::sleep}; use tokio::{select, sync::Notify, task::JoinHandle, time::sleep};
@ -33,14 +34,14 @@ use {
std::{fs::File, os::fd::IntoRawFd}, std::{fs::File, os::fd::IntoRawFd},
}; };
type FanControlHandle = (Arc<Notify>, JoinHandle<()>); type FanControlHandle = (Rc<Notify>, JoinHandle<()>);
pub struct GpuController { pub struct GpuController {
pub handle: GpuHandle, pub handle: GpuHandle,
#[cfg(feature = "libdrm_amdgpu_sys")] #[cfg(feature = "libdrm_amdgpu_sys")]
pub drm_handle: Option<DrmHandle>, pub drm_handle: Option<DrmHandle>,
pub pci_info: Option<GpuPciInfo>, pub pci_info: Option<GpuPciInfo>,
pub fan_control_handle: Mutex<Option<FanControlHandle>>, pub fan_control_handle: RefCell<Option<FanControlHandle>>,
} }
impl GpuController { impl GpuController {
@ -111,7 +112,7 @@ impl GpuController {
#[cfg(feature = "libdrm_amdgpu_sys")] #[cfg(feature = "libdrm_amdgpu_sys")]
drm_handle, drm_handle,
pci_info, pci_info,
fan_control_handle: Mutex::new(None), fan_control_handle: RefCell::new(None),
}) })
} }
@ -232,7 +233,7 @@ impl GpuController {
pub fn get_stats(&self, gpu_config: Option<&config::Gpu>) -> anyhow::Result<DeviceStats> { pub fn get_stats(&self, gpu_config: Option<&config::Gpu>) -> anyhow::Result<DeviceStats> {
let fan_control_enabled = self let fan_control_enabled = self
.fan_control_handle .fan_control_handle
.lock() .try_borrow()
.map_err(|err| anyhow!("Could not lock fan control mutex: {err}"))? .map_err(|err| anyhow!("Could not lock fan control mutex: {err}"))?
.is_some(); .is_some();
@ -311,13 +312,13 @@ impl GpuController {
let mut notify_guard = self let mut notify_guard = self
.fan_control_handle .fan_control_handle
.lock() .try_borrow_mut()
.map_err(|err| anyhow!("Lock error: {err}"))?; .map_err(|err| anyhow!("Lock error: {err}"))?;
let notify = Arc::new(Notify::new()); let notify = Rc::new(Notify::new());
let task_notify = notify.clone(); let task_notify = notify.clone();
let handle = tokio::spawn(async move { let handle = tokio::task::spawn_local(async move {
loop { loop {
select! { select! {
_ = sleep(interval) => (), _ = sleep(interval) => (),
@ -352,7 +353,7 @@ impl GpuController {
async fn stop_fan_control(&self, reset_mode: bool) -> anyhow::Result<()> { async fn stop_fan_control(&self, reset_mode: bool) -> anyhow::Result<()> {
let maybe_notify = self let maybe_notify = self
.fan_control_handle .fan_control_handle
.lock() .try_borrow_mut()
.map_err(|err| anyhow!("Lock error: {err}"))? .map_err(|err| anyhow!("Lock error: {err}"))?
.take(); .take();
if let Some((notify, handle)) = maybe_notify { if let Some((notify, handle)) = maybe_notify {

View File

@ -6,13 +6,7 @@ use lact_schema::{
request::{ConfirmCommand, SetClocksCommand}, request::{ConfirmCommand, SetClocksCommand},
ClocksInfo, DeviceInfo, DeviceListEntry, DeviceStats, FanCurveMap, ClocksInfo, DeviceInfo, DeviceListEntry, DeviceStats, FanCurveMap,
}; };
use std::{ use std::{cell::RefCell, collections::BTreeMap, env, path::PathBuf, rc::Rc, time::Duration};
collections::BTreeMap,
env,
path::PathBuf,
sync::{Arc, Mutex, RwLock},
time::Duration,
};
use tokio::{sync::oneshot, time::sleep}; use tokio::{sync::oneshot, time::sleep};
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -21,9 +15,9 @@ const CONTROLLERS_LOAD_RETRY_INTERVAL: u64 = 1;
#[derive(Clone)] #[derive(Clone)]
pub struct Handler { pub struct Handler {
pub config: Arc<RwLock<Config>>, pub config: Rc<RefCell<Config>>,
pub gpu_controllers: Arc<BTreeMap<String, GpuController>>, pub gpu_controllers: Rc<BTreeMap<String, GpuController>>,
confirm_config_tx: Arc<Mutex<Option<oneshot::Sender<ConfirmCommand>>>>, confirm_config_tx: Rc<RefCell<Option<oneshot::Sender<ConfirmCommand>>>>,
} }
impl<'a> Handler { impl<'a> Handler {
@ -45,9 +39,9 @@ impl<'a> Handler {
info!("initialized {} GPUs", controllers.len()); info!("initialized {} GPUs", controllers.len());
let handler = Self { let handler = Self {
gpu_controllers: Arc::new(controllers), gpu_controllers: Rc::new(controllers),
config: Arc::new(RwLock::new(config)), config: Rc::new(RefCell::new(config)),
confirm_config_tx: Arc::new(Mutex::new(None)), confirm_config_tx: Rc::new(RefCell::new(None)),
}; };
handler.load_config().await; handler.load_config().await;
@ -55,7 +49,7 @@ impl<'a> Handler {
} }
pub async fn load_config(&self) { pub async fn load_config(&self) {
let config = self.config.read().expect("Faied to lock config").clone(); // Clone to avoid locking the RwLock on an await point let config = self.config.borrow().clone(); // Clone to avoid locking the RwLock on an await point
for (id, gpu_config) in &config.gpus { for (id, gpu_config) in &config.gpus {
if let Some(controller) = self.gpu_controllers.get(id) { if let Some(controller) = self.gpu_controllers.get(id) {
@ -75,7 +69,7 @@ impl<'a> Handler {
) -> anyhow::Result<u64> { ) -> anyhow::Result<u64> {
if self if self
.confirm_config_tx .confirm_config_tx
.try_lock() .try_borrow_mut()
.map_err(|err| anyhow!("{err}"))? .map_err(|err| anyhow!("{err}"))?
.is_some() .is_some()
{ {
@ -85,7 +79,7 @@ impl<'a> Handler {
} }
let (gpu_config, apply_timer) = { let (gpu_config, apply_timer) = {
let config = self.config.read().map_err(|err| anyhow!("{err}"))?; let config = self.config.try_borrow().map_err(|err| anyhow!("{err}"))?;
let apply_timer = config.apply_settings_timer; let apply_timer = config.apply_settings_timer;
let gpu_config = config.gpus.get(&id).cloned().unwrap_or_default(); let gpu_config = config.gpus.get(&id).cloned().unwrap_or_default();
(gpu_config, apply_timer) (gpu_config, apply_timer)
@ -123,12 +117,12 @@ impl<'a> Handler {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
*self *self
.confirm_config_tx .confirm_config_tx
.try_lock() .try_borrow_mut()
.map_err(|err| anyhow!("{err}"))? = Some(tx); .map_err(|err| anyhow!("{err}"))? = Some(tx);
let handler = self.clone(); let handler = self.clone();
tokio::task::spawn(async move { tokio::task::spawn_local(async move {
let controller = handler let controller = handler
.controller_by_id(&id) .controller_by_id(&id)
.expect("GPU controller disappeared"); .expect("GPU controller disappeared");
@ -146,7 +140,7 @@ impl<'a> Handler {
Ok(ConfirmCommand::Confirm) => { Ok(ConfirmCommand::Confirm) => {
info!("saving updated config"); info!("saving updated config");
let mut config_guard = handler.config.write().unwrap(); let mut config_guard = handler.config.borrow_mut();
config_guard.gpus.insert(id, new_config); config_guard.gpus.insert(id, new_config);
if let Err(err) = config_guard.save() { if let Err(err) = config_guard.save() {
@ -162,7 +156,7 @@ impl<'a> Handler {
} }
} }
match handler.confirm_config_tx.try_lock() { match handler.confirm_config_tx.try_borrow_mut() {
Ok(mut guard) => *guard = None, Ok(mut guard) => *guard = None,
Err(err) => error!("{err}"), Err(err) => error!("{err}"),
} }
@ -199,7 +193,7 @@ impl<'a> Handler {
pub fn get_gpu_stats(&'a self, id: &str) -> anyhow::Result<DeviceStats> { pub fn get_gpu_stats(&'a self, id: &str) -> anyhow::Result<DeviceStats> {
let config = self let config = self
.config .config
.read() .try_borrow()
.map_err(|err| anyhow!("Could not read config: {err:?}"))?; .map_err(|err| anyhow!("Could not read config: {err:?}"))?;
let gpu_config = config.gpus.get(id); let gpu_config = config.gpus.get(id);
self.controller_by_id(id)?.get_stats(gpu_config) self.controller_by_id(id)?.get_stats(gpu_config)
@ -220,7 +214,10 @@ impl<'a> Handler {
let curve = FanCurve(raw_curve); let curve = FanCurve(raw_curve);
curve.validate()?; curve.validate()?;
let mut config_guard = self.config.write().map_err(|err| anyhow!("{err}"))?; let mut config_guard = self
.config
.try_borrow_mut()
.map_err(|err| anyhow!("{err}"))?;
let gpu_config = config_guard.gpus.entry(id.to_owned()).or_default(); let gpu_config = config_guard.gpus.entry(id.to_owned()).or_default();
if let Some(mut existing_settings) = gpu_config.fan_control_settings.clone() { if let Some(mut existing_settings) = gpu_config.fan_control_settings.clone() {
@ -312,7 +309,7 @@ impl<'a> Handler {
pub fn confirm_pending_config(&self, command: ConfirmCommand) -> anyhow::Result<()> { pub fn confirm_pending_config(&self, command: ConfirmCommand) -> anyhow::Result<()> {
if let Some(tx) = self if let Some(tx) = self
.confirm_config_tx .confirm_config_tx
.try_lock() .try_borrow_mut()
.map_err(|err| anyhow!("{err}"))? .map_err(|err| anyhow!("{err}"))?
.take() .take()
{ {
@ -326,7 +323,7 @@ impl<'a> Handler {
pub async fn cleanup(self) { pub async fn cleanup(self) {
let disable_clocks_cleanup = self let disable_clocks_cleanup = self
.config .config
.try_read() .try_borrow()
.map(|config| config.daemon.disable_clocks_cleanup) .map(|config| config.daemon.disable_clocks_cleanup)
.unwrap_or(false); .unwrap_or(false);

View File

@ -34,7 +34,7 @@ impl Server {
match self.listener.accept().await { match self.listener.accept().await {
Ok((stream, _)) => { Ok((stream, _)) => {
let handler = self.handler.clone(); let handler = self.handler.clone();
tokio::spawn(async move { tokio::task::spawn_local(async move {
if let Err(error) = handle_stream(stream, handler).await { if let Err(error) = handle_stream(stream, handler).await {
error!("{error}"); error!("{error}");
} }