feat: listen to netlink kernel events

This commit is contained in:
Ilya Zlobintsev 2025-01-17 22:24:47 +02:00
parent 599adf63cc
commit e2257a83d2
3 changed files with 100 additions and 7 deletions

View File

@ -12,11 +12,14 @@ mod tests;
use anyhow::Context;
use config::Config;
use futures::future::select_all;
use server::system;
use server::{handle_stream, handler::Handler, Server};
use std::cell::Cell;
use std::sync::Arc;
use std::time::Instant;
use std::{os::unix::net::UnixStream as StdUnixStream, time::Duration};
use tokio::net::UnixStream;
use tokio::sync::Notify;
use tokio::{
runtime,
signal::unix::{signal, SignalKind},
@ -72,6 +75,7 @@ pub fn run() -> anyhow::Result<()> {
tokio::task::spawn_local(listen_config_changes(handler.clone()));
tokio::task::spawn_local(listen_exit_signals(handler.clone()));
tokio::task::spawn_local(listen_device_events(handler.clone()));
tokio::task::spawn_local(suspend::listen_events(handler));
server.run().await;
@ -136,6 +140,22 @@ async fn listen_config_changes(handler: Handler) {
}
}
async fn listen_device_events(handler: Handler) {
let notify = Arc::new(Notify::new());
let task_notify = notify.clone();
tokio::task::spawn_blocking(move || {
if let Err(err) = system::listen_netlink_kernel_event(&task_notify) {
error!("kernel event listener error: {err:#}");
}
});
loop {
notify.notified().await;
info!("got kernel drm subsystem event, reloading GPUs");
handler.reload_gpus().await;
}
}
async fn ensure_sufficient_uptime() {
match get_uptime() {
Ok(current_uptime) => {

View File

@ -96,10 +96,7 @@ pub struct Handler {
impl<'a> Handler {
pub async fn new(config: Config) -> anyhow::Result<Self> {
let base_path = match env::var("_LACT_DRM_SYSFS_PATH") {
Ok(custom_path) => PathBuf::from(custom_path),
Err(_) => PathBuf::from("/sys/class/drm"),
};
let base_path = drm_base_path();
Self::with_base_path(&base_path, config).await
}
@ -199,6 +196,23 @@ impl<'a> Handler {
Ok(())
}
pub async fn reload_gpus(&self) {
let base_path = drm_base_path();
match load_controllers(&base_path) {
Ok(new_controllers) => {
info!("GPU list reloaded with {} devices", new_controllers.len());
*self.gpu_controllers.write().await = new_controllers;
if let Err(err) = self.apply_current_config().await {
error!("could not reapply config: {err:#}");
}
}
Err(err) => {
error!("could not load GPU controllers: {err:#}");
}
}
}
async fn stop_profile_watcher(&self) {
let tx = self.profile_watcher_tx.borrow_mut().take();
if let Some(existing_stop_notify) = tx {
@ -1074,3 +1088,10 @@ fn add_path_to_archive(
}
Ok(())
}
fn drm_base_path() -> PathBuf {
match env::var("_LACT_DRM_SYSFS_PATH") {
Ok(custom_path) => PathBuf::from(custom_path),
Err(_) => PathBuf::from("/sys/class/drm"),
}
}

View File

@ -1,15 +1,19 @@
use anyhow::{anyhow, ensure, Context};
use lact_schema::{InitramfsType, SystemInfo, GIT_COMMIT};
use nix::sys::socket::{
bind, recv, socket, AddressFamily, MsgFlags, NetlinkAddr, SockFlag, SockProtocol, SockType,
};
use os_release::{OsRelease, OS_RELEASE};
use std::{
fs::{self, File, Permissions},
io::Write,
os::unix::prelude::PermissionsExt,
os::{fd::AsRawFd, unix::prelude::PermissionsExt},
path::Path,
process,
sync::atomic::{AtomicBool, Ordering},
};
use tokio::process::Command;
use tracing::{info, warn};
use tokio::{process::Command, sync::Notify};
use tracing::{error, info, warn};
static OC_TOGGLED: AtomicBool = AtomicBool::new(false);
@ -191,6 +195,54 @@ async fn run_command(exec: &str, args: &[&str]) -> anyhow::Result<()> {
}
}
pub(crate) fn listen_netlink_kernel_event(notify: &Notify) -> anyhow::Result<()> {
let socket = socket(
AddressFamily::Netlink,
SockType::Raw,
SockFlag::empty(),
SockProtocol::NetlinkKObjectUEvent,
)
.context("Could not setup netlink socket")?;
let sa = NetlinkAddr::new(process::id(), 1);
bind(socket.as_raw_fd(), &sa).context("Could not bind netlink socket")?;
let mut buf = Vec::new();
loop {
// Read only the size using the peek and truncate flags first
let msg_size = recv(
socket.as_raw_fd(),
&mut [],
MsgFlags::MSG_PEEK | MsgFlags::MSG_TRUNC,
)
.context("Could not read netlink message")?;
buf.clear();
buf.resize(msg_size, 0);
// Read the actual message into the buffer
recv(socket.as_raw_fd(), &mut buf, MsgFlags::empty())
.context("Could not read netlink message")?;
for raw_line in buf.split(|c| *c == b'\0') {
match std::str::from_utf8(raw_line) {
Ok(line) => {
if let Some(subsystem) = line.strip_prefix("SUBSYSTEM=") {
if subsystem == "drm" {
notify.notify_one();
}
}
}
Err(_) => {
error!(
"Got invalid unicode in uevent line {}",
String::from_utf8_lossy(raw_line)
);
}
}
}
}
}
#[cfg(test)]
mod tests {
use crate::server::system::detect_initramfs_type;