feat: gracefully shut down on more signals

This commit is contained in:
Ilya Zlobintsev
2023-01-07 17:38:29 +02:00
parent d06fb359ea
commit cc9dd98f29
8 changed files with 72 additions and 29 deletions

26
Cargo.lock generated
View File

@@ -288,6 +288,20 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "futures"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.25"
@@ -295,6 +309,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@@ -331,6 +346,12 @@ dependencies = [
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"
[[package]]
name = "futures-task"
version = "0.3.25"
@@ -343,9 +364,13 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
@@ -730,6 +755,7 @@ dependencies = [
"amdgpu-sysfs",
"anyhow",
"bincode",
"futures",
"lact-schema",
"nix",
"pciid-parser",

View File

@@ -29,3 +29,4 @@ tracing = "0.1"
tracing-subscriber = "0.3"
vulkano = { git = "https://github.com/vulkano-rs/vulkano" }
lact-schema = { path = "../lact-schema" }
futures = { version = "0.3.25", default-features = false, features = ["std", "alloc"] }

View File

@@ -57,7 +57,7 @@ impl Config {
pub fn save(&self) -> anyhow::Result<()> {
let path = get_path();
debug!("Saving config to {path:?}");
debug!("saving config to {path:?}");
let raw_config = serde_yaml::to_string(self)?;
fs::write(path, raw_config).context("Could not write config")
}

View File

@@ -22,7 +22,7 @@ where
match fork()? {
ForkResult::Parent { child } => {
trace!("Waiting for message from child");
trace!("waiting for message from child");
let mut size_buf = [0u8; size_of::<usize>()];
rx.read_exact(&mut size_buf)?;
@@ -31,7 +31,7 @@ where
let mut data_buf = vec![0u8; size];
rx.read_exact(&mut data_buf)?;
trace!("Received {} data bytes from child", data_buf.len());
trace!("received {} data bytes from child", data_buf.len());
waitpid(child, None)?;
@@ -42,7 +42,7 @@ where
}
ForkResult::Child => {
let response = f();
trace!("Sending response to parent: {response:?}");
trace!("sending response to parent: {response:?}");
let send_result = (|| {
let data = bincode::serialize(&response)?;
@@ -55,7 +55,7 @@ where
Ok(()) => 0,
Err(_) => 1,
};
trace!("Exiting child with code {exit_code}");
trace!("exiting child with code {exit_code}");
std::process::exit(exit_code);
}
}

View File

@@ -5,11 +5,22 @@ mod socket;
use anyhow::Context;
use config::Config;
use futures::future::select_all;
use server::{handle_stream, handler::Handler, Server};
use std::os::unix::net::UnixStream as StdUnixStream;
use std::str::FromStr;
use tokio::{runtime, signal::ctrl_c};
use tracing::{debug_span, Instrument, Level};
use tokio::{
runtime,
signal::unix::{signal, SignalKind},
};
use tracing::{debug_span, info, Instrument, Level};
const SHUTDOWN_SIGNALS: [SignalKind; 4] = [
SignalKind::terminate(),
SignalKind::interrupt(),
SignalKind::quit(),
SignalKind::hangup(),
];
pub fn run() -> anyhow::Result<()> {
let rt = runtime::Builder::new_current_thread()
@@ -25,18 +36,7 @@ pub fn run() -> anyhow::Result<()> {
let server = Server::new(config).await?;
let handler = server.handler.clone();
tokio::spawn(async move {
ctrl_c().await.expect("Could not listen to shutdown signal");
async {
handler.cleanup().await;
socket::cleanup();
}
.instrument(debug_span!("shutdown_cleanup"))
.await;
std::process::exit(0);
});
tokio::spawn(listen_shutdown(handler));
server.run().await;
Ok(())
})
@@ -55,3 +55,19 @@ pub fn run_embedded(stream: StdUnixStream) -> anyhow::Result<()> {
handle_stream(stream, handler).await
})
}
async fn listen_shutdown(handler: Handler) {
let mut signals = SHUTDOWN_SIGNALS
.map(|signal_kind| signal(signal_kind).expect("Could not listen to shutdown signal"));
let signal_futures = signals.iter_mut().map(|signal| Box::pin(signal.recv()));
select_all(signal_futures).await;
info!("cleaning up and shutting down...");
async {
handler.cleanup().await;
socket::cleanup();
}
.instrument(debug_span!("shutdown_cleanup"))
.await;
std::process::exit(0);
}

View File

@@ -100,7 +100,7 @@ impl GpuController {
) {
Ok(info) => Some(info),
Err(err) => {
warn!("Could not load vulkan info: {err}");
warn!("could not load vulkan info: {err}");
None
}
}
@@ -215,7 +215,7 @@ impl GpuController {
let notify = Arc::new(Notify::new());
let task_notify = notify.clone();
let task_curve = curve.clone();
debug!("Using curve {curve:?}");
debug!("using curve {curve:?}");
let handle = tokio::spawn(async move {
loop {
@@ -229,20 +229,20 @@ impl GpuController {
.remove(&temp_key)
.expect("Could not get temperature by given key");
let target_pwm = task_curve.pwm_at_temp(temp);
trace!("Fan control tick: setting pwm to {target_pwm}");
trace!("fan control tick: setting pwm to {target_pwm}");
if let Err(err) = hw_mon.set_fan_pwm(target_pwm) {
error!("Could not set fan speed: {err}, disabling fan control");
error!("could not set fan speed: {err}, disabling fan control");
break;
}
}
info!("Exited fan control task");
info!("exited fan control task");
});
*notify_guard = Some((notify, handle, curve));
info!(
"Started fan control with interval {}ms",
"started fan control with interval {}ms",
interval.as_millis()
);

View File

@@ -98,7 +98,7 @@ impl<'a> Handler {
.context("Could not set power cap")?;
}
} else {
info!("Could not find GPU with id {id} defined in configuration");
info!("could not find GPU with id {id} defined in configuration");
}
}
@@ -235,7 +235,7 @@ impl<'a> Handler {
for (id, gpu_config) in config.gpus {
if let Ok(controller) = self.controller_by_id(&id) {
if gpu_config.fan_control_enabled {
debug!("Stopping fan control");
debug!("stopping fan control");
controller
.stop_fan_control(true)
.await
@@ -246,7 +246,7 @@ impl<'a> Handler {
(gpu_config.power_cap, controller.handle.hw_monitors.first())
{
if let Ok(default_cap) = hw_mon.get_power_cap_default() {
debug!("Setting power limit to default");
debug!("setting power limit to default");
hw_mon
.set_power_cap(default_cap)
.expect("Could not set power cap to default");

View File

@@ -53,7 +53,7 @@ pub async fn handle_stream(stream: UnixStream, handler: Handler) -> anyhow::Resu
let mut buf = String::new();
while stream.read_line(&mut buf).await? != 0 {
debug!("Handling request: {}", buf.trim_end());
debug!("handling request: {}", buf.trim_end());
let maybe_request = serde_json::from_str(&buf);
let response = match maybe_request {