feat: remote management over TCP (#379)

* feat: abstract daemon client

* feat: daemon TCP listener implementation

* feat: support remote connections in the UI

* feat: make the client async

* feat: add documentation about remote management
This commit is contained in:
Ilya Zlobintsev
2024-09-22 14:17:33 +03:00
committed by GitHub
parent 573a599589
commit c08bdfef0b
19 changed files with 507 additions and 256 deletions

2
API.md
View File

@@ -1,6 +1,6 @@
# Description
The LACT Daemon exposes a JSON API over a unix socket, available on `/var/run/lactd.sock`. You can configure who has access to the socket in `/etc/lact/config.yaml` in the `daemon.admin_groups` field.
The LACT Daemon exposes a JSON API over a unix socket or TCP, available on `/var/run/lactd.sock` or an arbitrary TCP port. You can configure who has access to the unix socket in `/etc/lact/config.yaml` in the `daemon.admin_groups` field. The TCP listener is disabled by default for security reasons, see [this README section](./README.md#remote-management) for how to enable it.
The API expects newline-separated JSON objects, and returns a JSON object for every request.

3
Cargo.lock generated
View File

@@ -1585,6 +1585,7 @@ dependencies = [
"anyhow",
"lact-client",
"lact-schema",
"tokio",
]
[[package]]
@@ -1593,10 +1594,12 @@ version = "0.5.6"
dependencies = [
"amdgpu-sysfs",
"anyhow",
"futures",
"lact-schema",
"nix",
"serde",
"serde_json",
"tokio",
"tracing",
]

View File

@@ -19,6 +19,8 @@ serde_json = "1.0.111"
anyhow = "1.0.79"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
futures = { version = "0.3.30", default-features = false }
tokio = { version = "1.35.1", default-features = false }
nix = { version = "0.29.0", default-features = false }
chrono = "0.4.31"

View File

@@ -116,12 +116,12 @@ Steps:
- `make`
- `sudo make install`
It's also possible to build LACT without some of the features by using cargo feature flags.
This can be useful if some dependency is not available on your system, or is too old.
It's possible to change which features LACT gets built with.
To do so, replace the `make` command with the following variation:
Minimal build (no GUI!):
Headless build with no GUI:
```
cargo build --no-default-features -p lact
make build-release-headless
```
Build GUI with libadwaita support:
@@ -131,7 +131,31 @@ make build-release-libadwaita
# API
There is an API available over a unix socket. See [here](API.md) for more information.
There is an API available over a unix or TCP socket. See [here](API.md) for more information.
# Remote management
It's possible to have the LACT daemon running on one machine, and then manage it remotely from another.
This is disabled by default, as the TCP connection **does not have any authentication or encryption mechanism!**
Make sure to only use it in trusted networks and/or set up appropriate firewall rules.
To enable it, edit `/etc/lact/config.yaml` and add `tcp_listen_address` with your desired address and in the `daemon` section.
Example:
```yaml
daemon:
tcp_listen_address: 0.0.0.0:12853
log_level: info
admin_groups:
- wheel
- sudo
disable_clocks_cleanup: false
```
After this restart the service (`sudo systemctl restart lactd`).
To connect to a remote instance with the GUI, run it with `lact gui --tcp-address 192.168.1.10:12853`.
# CLI

View File

@@ -7,3 +7,6 @@ edition = "2021"
lact-client = { path = "../lact-client" }
lact-schema = { path = "../lact-schema", features = ["args"] }
anyhow = "1.0.79"
tokio = { workspace = true, features = [
"rt",
] }

View File

@@ -3,17 +3,22 @@ use lact_client::DaemonClient;
use lact_schema::args::{CliArgs, CliCommand};
pub fn run(args: CliArgs) -> Result<()> {
let client = DaemonClient::connect()?;
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
let client = DaemonClient::connect().await?;
let f = match args.subcommand {
CliCommand::ListGpus => list_gpus,
CliCommand::Info => info,
};
f(&args, &client)
match args.subcommand {
CliCommand::ListGpus => list_gpus(&args, &client).await,
CliCommand::Info => info(&args, &client).await,
}
})
}
fn list_gpus(_: &CliArgs, client: &DaemonClient) -> Result<()> {
let buffer = client.list_devices()?;
async fn list_gpus(_: &CliArgs, client: &DaemonClient) -> Result<()> {
let buffer = client.list_devices().await?;
for entry in buffer.inner()? {
let id = entry.id;
if let Some(name) = entry.name {
@@ -25,9 +30,9 @@ fn list_gpus(_: &CliArgs, client: &DaemonClient) -> Result<()> {
Ok(())
}
fn info(args: &CliArgs, client: &DaemonClient) -> Result<()> {
for id in extract_gpu_ids(args, client) {
let info_buffer = client.get_device_info(&id)?;
async fn info(args: &CliArgs, client: &DaemonClient) -> Result<()> {
for id in extract_gpu_ids(args, client).await {
let info_buffer = client.get_device_info(&id).await?;
let info = info_buffer.inner()?;
let pci_info = info.pci_info.context("GPU reports no pci info")?;
@@ -46,11 +51,11 @@ fn info(args: &CliArgs, client: &DaemonClient) -> Result<()> {
Ok(())
}
fn extract_gpu_ids(args: &CliArgs, client: &DaemonClient) -> Vec<String> {
async fn extract_gpu_ids(args: &CliArgs, client: &DaemonClient) -> Vec<String> {
match args.gpu_id {
Some(ref id) => vec![id.clone()],
None => {
let buffer = client.list_devices().expect("Could not list GPUs");
let buffer = client.list_devices().await.expect("Could not list GPUs");
buffer
.inner()
.expect("Could not deserialize GPUs response")

View File

@@ -11,5 +11,9 @@ anyhow = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = [
"net",
"sync",
] }
futures = { workspace = true }
nix = { workspace = true }

View File

@@ -0,0 +1,30 @@
pub mod tcp;
pub mod unix;
use anyhow::anyhow;
use futures::future::BoxFuture;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
pub trait DaemonConnection {
fn request<'a>(&'a mut self, payload: &'a str) -> BoxFuture<'a, anyhow::Result<String>>;
/// Establish a new connection to the same service
fn new_connection(&self) -> BoxFuture<'_, anyhow::Result<Box<dyn DaemonConnection>>>;
}
async fn request(
socket: &mut BufReader<impl AsyncRead + AsyncWrite + Unpin>,
payload: &str,
) -> anyhow::Result<String> {
if !socket.buffer().is_empty() {
return Err(anyhow!("Another request was not processed properly"));
}
socket.write_all(payload.as_bytes()).await?;
socket.write_all(b"\n").await?;
let mut response_payload = String::new();
socket.read_line(&mut response_payload).await?;
Ok(response_payload)
}

View File

@@ -0,0 +1,40 @@
use super::{request, DaemonConnection};
use anyhow::Context;
use futures::future::BoxFuture;
use tokio::{
io::BufReader,
net::{TcpStream, ToSocketAddrs},
};
use tracing::info;
pub struct TcpConnection {
inner: BufReader<TcpStream>,
}
impl TcpConnection {
pub async fn connect(addr: impl ToSocketAddrs) -> anyhow::Result<Box<Self>> {
info!("connecting to remote TCP service");
let inner = TcpStream::connect(addr).await?;
Ok(Box::new(Self {
inner: BufReader::new(inner),
}))
}
}
impl DaemonConnection for TcpConnection {
fn request<'a>(&'a mut self, payload: &'a str) -> BoxFuture<'a, anyhow::Result<String>> {
Box::pin(async { request(&mut self.inner, payload).await })
}
fn new_connection(&self) -> BoxFuture<'_, anyhow::Result<Box<dyn DaemonConnection>>> {
Box::pin(async {
let peer_addr = self
.inner
.get_ref()
.peer_addr()
.context("Could not read peer address")?;
Ok(Self::connect(peer_addr).await? as Box<dyn DaemonConnection>)
})
}
}

View File

@@ -0,0 +1,58 @@
use super::{request, DaemonConnection};
use anyhow::Context;
use futures::future::BoxFuture;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::path::Path;
use tokio::{io::BufReader, net::UnixStream};
use tracing::info;
pub struct UnixConnection {
inner: BufReader<UnixStream>,
}
impl UnixConnection {
pub async fn connect(path: &Path) -> anyhow::Result<Box<Self>> {
info!("connecting to service at {path:?}");
let inner = UnixStream::connect(path).await?;
Ok(Box::new(Self {
inner: BufReader::new(inner),
}))
}
}
impl From<UnixStream> for UnixConnection {
fn from(inner: UnixStream) -> Self {
Self {
inner: BufReader::new(inner),
}
}
}
impl TryFrom<StdUnixStream> for UnixConnection {
type Error = anyhow::Error;
fn try_from(stream: StdUnixStream) -> Result<Self, Self::Error> {
Ok(UnixStream::from_std(stream)?.into())
}
}
impl DaemonConnection for UnixConnection {
fn request<'a>(&'a mut self, payload: &'a str) -> BoxFuture<'a, anyhow::Result<String>> {
Box::pin(async { request(&mut self.inner, payload).await })
}
fn new_connection(&self) -> BoxFuture<'_, anyhow::Result<Box<dyn DaemonConnection>>> {
Box::pin(async {
let peer_addr = self
.inner
.get_ref()
.peer_addr()
.context("Could not read peer address")?;
let path = peer_addr
.as_pathname()
.context("Connected socket addr is not a path")?;
Ok(Self::connect(path).await? as Box<dyn DaemonConnection>)
})
}
}

View File

@@ -1,3 +1,4 @@
mod connection;
#[macro_use]
mod macros;
@@ -6,7 +7,8 @@ pub use lact_schema as schema;
use amdgpu_sysfs::gpu_handle::{
power_profile_mode::PowerProfileModesTable, PerformanceLevel, PowerLevelKind,
};
use anyhow::{anyhow, Context};
use anyhow::Context;
use connection::{tcp::TcpConnection, unix::UnixConnection, DaemonConnection};
use nix::unistd::getuid;
use schema::{
request::{ConfirmCommand, SetClocksCommand},
@@ -15,101 +17,98 @@ use schema::{
};
use serde::Deserialize;
use std::{
cell::RefCell,
io::{BufRead, BufReader, Write},
marker::PhantomData,
os::unix::net::UnixStream,
path::{Path, PathBuf},
rc::Rc,
time::Duration,
future::Future, marker::PhantomData, os::unix::net::UnixStream, path::PathBuf, pin::Pin,
rc::Rc, time::Duration,
};
use tokio::{net::ToSocketAddrs, sync::Mutex};
use tracing::{error, info};
const RECONNECT_INTERVAL_MS: u64 = 250;
#[derive(Clone)]
pub struct DaemonClient {
stream: Rc<RefCell<(BufReader<UnixStream>, UnixStream)>>,
stream: Rc<Mutex<Box<dyn DaemonConnection>>>,
pub embedded: bool,
}
impl DaemonClient {
pub fn connect() -> anyhow::Result<Self> {
pub async fn connect() -> anyhow::Result<Self> {
let path =
get_socket_path().context("Could not connect to daemon: socket file not found")?;
info!("connecting to service at {path:?}");
let stream_pair = connect_pair(&path)?;
let stream = UnixConnection::connect(&path).await?;
Ok(Self {
stream: Rc::new(RefCell::new(stream_pair)),
stream: Rc::new(Mutex::new(stream)),
embedded: false,
})
}
pub async fn connect_tcp(addr: impl ToSocketAddrs) -> anyhow::Result<Self> {
let stream = TcpConnection::connect(addr).await?;
Ok(Self {
stream: Rc::new(Mutex::new(stream)),
embedded: false,
})
}
pub fn from_stream(stream: UnixStream, embedded: bool) -> anyhow::Result<Self> {
let reader = BufReader::new(stream.try_clone()?);
let connection = UnixConnection::try_from(stream)?;
Ok(Self {
stream: Rc::new(RefCell::new((reader, stream))),
stream: Rc::new(Mutex::new(Box::new(connection))),
embedded,
})
}
fn make_request<'a, T: Deserialize<'a>>(
&self,
request: Request,
) -> anyhow::Result<ResponseBuffer<T>> {
let mut stream_guard = self
.stream
.try_borrow_mut()
.map_err(|err| anyhow!("{err}"))?;
let (reader, writer) = &mut *stream_guard;
fn make_request<'a, 'r, T: Deserialize<'r>>(
&'a self,
request: Request<'a>,
) -> Pin<Box<dyn Future<Output = anyhow::Result<ResponseBuffer<T>>> + 'a>> {
Box::pin(async {
let mut stream = self.stream.lock().await;
if !reader.buffer().is_empty() {
return Err(anyhow!("Another request was not processed properly"));
}
let request_payload = serde_json::to_string(&request)?;
match stream.request(&request_payload).await {
Ok(response_payload) => Ok(ResponseBuffer {
buf: response_payload,
_phantom: PhantomData,
}),
Err(err) => {
error!("Could not make request: {err}, reconnecting to socket");
let response_payload = match process_request(&request, reader, writer) {
Ok(payload) => payload,
Err(err) => {
error!("Could not make request: {err}, reconnecting to socket");
let peer_addr = writer.peer_addr().context("Could not read peer address")?;
let path = peer_addr
.as_pathname()
.context("Connected socket addr is not a path")?;
loop {
match connect_pair(path) {
Ok(new_connection) => {
info!("Established new socket connection");
*stream_guard = new_connection;
drop(stream_guard);
return self.make_request(request);
}
Err(err) => {
error!("Could not reconnect: {err:#}, retrying in {RECONNECT_INTERVAL_MS}ms");
std::thread::sleep(Duration::from_millis(RECONNECT_INTERVAL_MS));
loop {
match stream.new_connection().await {
Ok(new_connection) => {
info!("Established new socket connection");
*stream = new_connection;
drop(stream);
return self.make_request(request).await;
}
Err(err) => {
error!("Could not reconnect: {err:#}, retrying in {RECONNECT_INTERVAL_MS}ms");
std::thread::sleep(Duration::from_millis(RECONNECT_INTERVAL_MS));
}
}
}
}
}
};
Ok(ResponseBuffer {
buf: response_payload,
_phantom: PhantomData,
})
}
pub fn list_devices(&self) -> anyhow::Result<ResponseBuffer<Vec<DeviceListEntry>>> {
self.make_request(Request::ListDevices)
pub async fn list_devices(&self) -> anyhow::Result<ResponseBuffer<Vec<DeviceListEntry>>> {
self.make_request(Request::ListDevices).await
}
pub fn set_fan_control(&self, cmd: FanOptions) -> anyhow::Result<u64> {
self.make_request(Request::SetFanControl(cmd))?.inner()
pub async fn set_fan_control(&self, cmd: FanOptions<'_>) -> anyhow::Result<u64> {
self.make_request(Request::SetFanControl(cmd))
.await?
.inner()
}
pub fn set_power_cap(&self, id: &str, cap: Option<f64>) -> anyhow::Result<u64> {
self.make_request(Request::SetPowerCap { id, cap })?.inner()
pub async fn set_power_cap(&self, id: &str, cap: Option<f64>) -> anyhow::Result<u64> {
self.make_request(Request::SetPowerCap { id, cap })
.await?
.inner()
}
request_plain!(get_system_info, SystemInfo, SystemInfo);
@@ -129,7 +128,7 @@ impl DaemonClient {
request_with_id!(reset_pmfw, ResetPmfw, u64);
request_with_id!(dump_vbios, VbiosDump, Vec<u8>);
pub fn set_performance_level(
pub async fn set_performance_level(
&self,
id: &str,
performance_level: PerformanceLevel,
@@ -137,35 +136,43 @@ impl DaemonClient {
self.make_request(Request::SetPerformanceLevel {
id,
performance_level,
})?
})
.await?
.inner()
}
pub fn set_clocks_value(&self, id: &str, command: SetClocksCommand) -> anyhow::Result<u64> {
self.make_request(Request::SetClocksValue { id, command })?
pub async fn set_clocks_value(
&self,
id: &str,
command: SetClocksCommand,
) -> anyhow::Result<u64> {
self.make_request(Request::SetClocksValue { id, command })
.await?
.inner()
}
pub fn batch_set_clocks_value(
pub async fn batch_set_clocks_value(
&self,
id: &str,
commands: Vec<SetClocksCommand>,
) -> anyhow::Result<u64> {
self.make_request(Request::BatchSetClocksValue { id, commands })?
self.make_request(Request::BatchSetClocksValue { id, commands })
.await?
.inner()
}
pub fn set_enabled_power_states(
pub async fn set_enabled_power_states(
&self,
id: &str,
kind: PowerLevelKind,
states: Vec<u8>,
) -> anyhow::Result<u64> {
self.make_request(Request::SetEnabledPowerStates { id, kind, states })?
self.make_request(Request::SetEnabledPowerStates { id, kind, states })
.await?
.inner()
}
pub fn set_power_profile_mode(
pub async fn set_power_profile_mode(
&self,
id: &str,
index: Option<u16>,
@@ -175,12 +182,14 @@ impl DaemonClient {
id,
index,
custom_heuristics,
})?
})
.await?
.inner()
}
pub fn confirm_pending_config(&self, command: ConfirmCommand) -> anyhow::Result<()> {
self.make_request(Request::ConfirmPendingConfig(command))?
pub async fn confirm_pending_config(&self, command: ConfirmCommand) -> anyhow::Result<()> {
self.make_request(Request::ConfirmPendingConfig(command))
.await?
.inner()
}
}
@@ -220,24 +229,3 @@ impl<'a, T: Deserialize<'a>> ResponseBuffer<T> {
}
}
}
fn connect_pair(path: &Path) -> anyhow::Result<(BufReader<UnixStream>, UnixStream)> {
let stream = UnixStream::connect(path).context("Could not connect to daemon")?;
let reader = BufReader::new(stream.try_clone()?);
Ok((reader, stream))
}
fn process_request(
request: &Request,
reader: &mut BufReader<UnixStream>,
writer: &mut UnixStream,
) -> anyhow::Result<String> {
let request_payload = serde_json::to_string(request)?;
writer.write_all(request_payload.as_bytes())?;
writer.write_all(b"\n")?;
let mut response_payload = String::new();
reader.read_line(&mut response_payload)?;
Ok(response_payload)
}

View File

@@ -1,15 +1,15 @@
macro_rules! request_with_id {
($name:ident, $variant:ident, $response:ty) => {
pub fn $name(&self, id: &str) -> anyhow::Result<ResponseBuffer<$response>> {
self.make_request(Request::$variant { id })
pub async fn $name(&self, id: &str) -> anyhow::Result<ResponseBuffer<$response>> {
self.make_request(Request::$variant { id }).await
}
};
}
macro_rules! request_plain {
($name:ident, $variant:ident, $response:ty) => {
pub fn $name(&self) -> anyhow::Result<ResponseBuffer<$response>> {
self.make_request(Request::$variant)
pub async fn $name(&self) -> anyhow::Result<ResponseBuffer<$response>> {
self.make_request(Request::$variant).await
}
};
}

View File

@@ -18,10 +18,7 @@ serde_json = { workspace = true }
tracing-subscriber = { workspace = true }
nix = { workspace = true, features = ["user", "fs"] }
chrono = { workspace = true }
pciid-parser = { version = "0.7", features = ["serde"] }
serde_yaml = "0.9"
tokio = { version = "1.35.1", features = [
tokio = { workspace = true, features = [
"rt",
"macros",
"net",
@@ -29,8 +26,11 @@ tokio = { version = "1.35.1", features = [
"signal",
"sync",
] }
futures = { workspace = true }
pciid-parser = { version = "0.7", features = ["serde"] }
serde_yaml = "0.9"
vulkano = { version = "0.34.1", default-features = false }
futures = { version = "0.3.30", default-features = false }
zbus = { version = "4.1.2", default-features = false, features = ["tokio"] }
libdrm_amdgpu_sys = "0.7.3"
tar = "0.4.40"

View File

@@ -45,6 +45,7 @@ pub struct Daemon {
pub admin_groups: Vec<String>,
#[serde(default)]
pub disable_clocks_cleanup: bool,
pub tcp_listen_address: Option<String>,
}
impl Default for Daemon {
@@ -53,6 +54,7 @@ impl Default for Daemon {
log_level: "info".to_owned(),
admin_groups: DEFAULT_ADMIN_GROUPS.map(str::to_owned).to_vec(),
disable_clocks_cleanup: false,
tcp_listen_address: None,
}
}
}

View File

@@ -12,6 +12,7 @@ use futures::future::select_all;
use server::{handle_stream, handler::Handler, Server};
use std::str::FromStr;
use std::{os::unix::net::UnixStream as StdUnixStream, time::Duration};
use tokio::net::UnixStream;
use tokio::{
runtime,
signal::unix::{signal, SignalKind},
@@ -79,7 +80,7 @@ pub fn run_embedded(stream: StdUnixStream) -> anyhow::Result<()> {
.run_until(async move {
let config = Config::default();
let handler = Handler::new(config).await?;
let stream = stream.try_into()?;
let stream = UnixStream::try_from(stream)?;
handle_stream(stream, handler).await
})

View File

@@ -5,49 +5,100 @@ mod vulkan;
use self::handler::Handler;
use crate::{config::Config, socket};
use anyhow::Context;
use futures::future::join_all;
use lact_schema::{Pong, Request, Response};
use serde::Serialize;
use std::fmt::Debug;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::{UnixListener, UnixStream},
io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader},
net::{TcpListener, UnixListener},
};
use tracing::{error, instrument, trace};
use tracing::{error, info, instrument, trace};
pub struct Server {
pub handler: Handler,
listener: UnixListener,
unix_listener: UnixListener,
tcp_listener: Option<TcpListener>,
}
impl Server {
pub async fn new(config: Config) -> anyhow::Result<Self> {
let listener = socket::listen(&config.daemon.admin_groups)?;
let unix_listener = socket::listen(&config.daemon.admin_groups)?;
let tcp_listener = if let Some(address) = &config.daemon.tcp_listen_address {
let listener = TcpListener::bind(address)
.await
.with_context(|| format!("Could not bind to TCP address {address}"))?;
info!("TCP listening on {}", listener.local_addr()?);
Some(listener)
} else {
info!("TCP listener disabled");
None
};
let handler = Handler::new(config).await?;
Ok(Self { handler, listener })
Ok(Self {
handler,
unix_listener,
tcp_listener,
})
}
pub async fn run(self) {
loop {
match self.listener.accept().await {
Ok((stream, _)) => {
let handler = self.handler.clone();
tokio::task::spawn_local(async move {
if let Err(error) = handle_stream(stream, handler).await {
error!("{error}");
}
});
}
Err(error) => {
error!("failed to handle connection: {error}");
let mut tasks = vec![];
let unix_handler = self.handler.clone();
let unix_task = tokio::task::spawn_local(async move {
loop {
match self.unix_listener.accept().await {
Ok((stream, _)) => {
let handler = unix_handler.clone();
tokio::task::spawn_local(async move {
if let Err(error) = handle_stream(stream, handler).await {
error!("{error}");
}
});
}
Err(error) => {
error!("failed to handle connection: {error}");
}
}
}
});
tasks.push(unix_task);
if let Some(tcp_listener) = self.tcp_listener {
let tcp_task = tokio::task::spawn_local(async move {
loop {
match tcp_listener.accept().await {
Ok((stream, _)) => {
let handler = self.handler.clone();
tokio::task::spawn_local(async move {
if let Err(error) = handle_stream(stream, handler).await {
error!("{error}");
}
});
}
Err(error) => {
error!("failed to handle connection: {error}");
}
}
}
});
tasks.push(tcp_task);
}
join_all(tasks).await;
}
}
#[instrument(level = "debug", skip(stream, handler))]
pub async fn handle_stream(stream: UnixStream, handler: Handler) -> anyhow::Result<()> {
pub async fn handle_stream<T: AsyncRead + AsyncWrite + Unpin>(
stream: T,
handler: Handler,
) -> anyhow::Result<()> {
let mut stream = BufReader::new(stream);
let mut buf = String::new();

View File

@@ -10,7 +10,7 @@ mod root_stack;
#[cfg(feature = "bench")]
pub use graphs_window::plot::{Plot, PlotData};
use crate::{create_connection, APP_ID, GUI_VERSION};
use crate::{APP_ID, GUI_VERSION};
use anyhow::{anyhow, Context};
use apply_revealer::{ApplyRevealer, ApplyRevealerMsg};
use confirmation_dialog::ConfirmationDialog;
@@ -28,17 +28,19 @@ use header::Header;
use lact_client::DaemonClient;
use lact_daemon::MODULE_CONF_PATH;
use lact_schema::{
args::GuiArgs,
request::{ConfirmCommand, SetClocksCommand},
FanOptions, GIT_COMMIT,
};
use msg::AppMsg;
use relm4::{
actions::{RelmAction, RelmActionGroup},
tokio, Component, ComponentController, ComponentParts, ComponentSender,
prelude::{AsyncComponent, AsyncComponentParts},
tokio, AsyncComponentSender, Component, ComponentController,
};
use root_stack::RootStack;
use std::{cell::RefCell, rc::Rc, sync::atomic::AtomicBool, time::Duration};
use tracing::{debug, error, trace, warn};
use std::{os::unix::net::UnixStream, rc::Rc, sync::atomic::AtomicBool, time::Duration};
use tracing::{debug, error, info, trace, warn};
const STATS_POLL_INTERVAL_MS: u64 = 250;
@@ -51,9 +53,9 @@ pub struct AppModel {
stats_task_handle: Option<glib::JoinHandle<()>>,
}
#[relm4::component(pub)]
impl Component for AppModel {
type Init = (DaemonClient, Option<anyhow::Error>);
#[relm4::component(pub, async)]
impl AsyncComponent for AppModel {
type Init = GuiArgs;
type Input = AppMsg;
type Output = ();
@@ -79,20 +81,41 @@ impl Component for AppModel {
}
}
fn init(
(daemon_client, conn_err): Self::Init,
async fn init(
args: Self::Init,
root: Self::Root,
sender: ComponentSender<Self>,
) -> ComponentParts<Self> {
sender: AsyncComponentSender<Self>,
) -> AsyncComponentParts<Self> {
let (daemon_client, conn_err) = match args.tcp_address {
Some(remote_addr) => {
info!("establishing connection to {remote_addr}");
match DaemonClient::connect_tcp(&remote_addr).await {
Ok(conn) => (conn, None),
Err(err) => {
error!("TCP connection error: {err:#}");
let (conn, _) = create_connection()
.await
.expect("Could not create fallback connection");
(conn, Some(err))
}
}
}
None => create_connection()
.await
.expect("Could not establish any daemon connection"),
};
register_actions(&sender);
let system_info_buf = daemon_client
.get_system_info()
.await
.expect("Could not fetch system info");
let system_info = system_info_buf.inner().expect("Invalid system info buffer");
let devices_buf = daemon_client
.list_devices()
.await
.expect("Could not list devices");
let devices = devices_buf.inner().expect("Could not access devices");
@@ -154,30 +177,24 @@ impl Component for AppModel {
let widgets = view_output!();
let embedded = model.daemon_client.embedded;
let conn_err = RefCell::new(conn_err);
root.connect_visible_notify(move |root| {
if embedded {
if let Some(err) = conn_err.borrow_mut().take() {
show_embedded_info(root, err);
}
}
});
if let Some(err) = conn_err {
show_embedded_info(&root, err);
}
sender.input(AppMsg::ReloadData { full: true });
ComponentParts { model, widgets }
AsyncComponentParts { model, widgets }
}
fn update_with_view(
async fn update_with_view(
&mut self,
widgets: &mut Self::Widgets,
msg: Self::Input,
sender: ComponentSender<Self>,
sender: AsyncComponentSender<Self>,
root: &Self::Root,
) {
trace!("update {msg:#?}");
if let Err(err) = self.handle_msg(msg, sender.clone(), root) {
if let Err(err) = self.handle_msg(msg, sender.clone(), root).await {
show_error(root, &err);
}
self.update_view(widgets, sender);
@@ -185,10 +202,10 @@ impl Component for AppModel {
}
impl AppModel {
fn handle_msg(
async fn handle_msg(
&mut self,
msg: AppMsg,
sender: ComponentSender<Self>,
sender: AsyncComponentSender<Self>,
root: &gtk::ApplicationWindow,
) -> Result<(), Rc<anyhow::Error>> {
match msg {
@@ -196,9 +213,9 @@ impl AppModel {
AppMsg::ReloadData { full } => {
let gpu_id = self.current_gpu_id()?;
if full {
self.update_gpu_data_full(gpu_id, sender)?;
self.update_gpu_data_full(gpu_id, sender).await?;
} else {
self.update_gpu_data(gpu_id, sender)?;
self.update_gpu_data(gpu_id, sender).await?;
}
Ok(())
}
@@ -211,6 +228,7 @@ impl AppModel {
}
AppMsg::ApplyChanges => self
.apply_settings(self.current_gpu_id()?, root, &sender)
.await
.map_err(|err| {
sender.input(AppMsg::ReloadData { full: false });
err.into()
@@ -222,18 +240,21 @@ impl AppModel {
AppMsg::ResetClocks => {
let gpu_id = self.current_gpu_id()?;
self.daemon_client
.set_clocks_value(&gpu_id, SetClocksCommand::Reset)?;
.set_clocks_value(&gpu_id, SetClocksCommand::Reset)
.await?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)?;
.confirm_pending_config(ConfirmCommand::Confirm)
.await?;
sender.input(AppMsg::ReloadData { full: false });
Ok(())
}
AppMsg::ResetPmfw => {
let gpu_id = self.current_gpu_id()?;
self.daemon_client.reset_pmfw(&gpu_id)?;
self.daemon_client.reset_pmfw(&gpu_id).await?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)?;
.confirm_pending_config(ConfirmCommand::Confirm)
.await?;
sender.input(AppMsg::ReloadData { full: false });
Ok(())
@@ -243,23 +264,23 @@ impl AppModel {
Ok(())
}
AppMsg::DumpVBios => {
self.dump_vbios(&self.current_gpu_id()?, root);
self.dump_vbios(&self.current_gpu_id()?, root).await;
Ok(())
}
AppMsg::DebugSnapshot => {
self.generate_debug_snapshot(root);
self.generate_debug_snapshot(root).await;
Ok(())
}
AppMsg::EnableOverdrive => {
toggle_overdrive(true, root.clone());
toggle_overdrive(&self.daemon_client, true, root.clone()).await;
Ok(())
}
AppMsg::DisableOverdrive => {
toggle_overdrive(false, root.clone());
toggle_overdrive(&self.daemon_client, false, root.clone()).await;
Ok(())
}
AppMsg::ResetConfig => {
self.daemon_client.reset_config()?;
self.daemon_client.reset_config().await?;
sender.input(AppMsg::ReloadData { full: true });
Ok(())
}
@@ -287,14 +308,15 @@ impl AppModel {
.context("No GPU selected")
}
fn update_gpu_data_full(
async fn update_gpu_data_full(
&mut self,
gpu_id: String,
sender: ComponentSender<AppModel>,
sender: AsyncComponentSender<AppModel>,
) -> anyhow::Result<()> {
let daemon_client = self.daemon_client.clone();
let info_buf = daemon_client
.get_device_info(&gpu_id)
.await
.context("Could not fetch info")?;
let info = info_buf.inner()?;
@@ -308,17 +330,17 @@ impl AppModel {
.unwrap_or(1.0);
self.graphs_window.set_vram_clock_ratio(vram_clock_ratio);
self.update_gpu_data(gpu_id, sender)?;
self.update_gpu_data(gpu_id, sender).await?;
self.root_stack.thermals_page.set_info(&info);
Ok(())
}
fn update_gpu_data(
async fn update_gpu_data(
&mut self,
gpu_id: String,
sender: ComponentSender<AppModel>,
sender: AsyncComponentSender<AppModel>,
) -> anyhow::Result<()> {
if let Some(stats_task) = self.stats_task_handle.take() {
stats_task.abort();
@@ -329,6 +351,7 @@ impl AppModel {
let stats = self
.daemon_client
.get_device_stats(&gpu_id)
.await
.context("Could not fetch stats")?
.inner()?;
@@ -336,7 +359,7 @@ impl AppModel {
self.root_stack.thermals_page.set_stats(&stats, true);
self.root_stack.info_page.set_stats(&stats);
let maybe_clocks_table = match self.daemon_client.get_device_clocks_info(&gpu_id) {
let maybe_clocks_table = match self.daemon_client.get_device_clocks_info(&gpu_id).await {
Ok(clocks_buf) => match clocks_buf.inner() {
Ok(info) => info.table,
Err(err) => {
@@ -351,7 +374,11 @@ impl AppModel {
};
self.root_stack.oc_page.set_clocks_table(maybe_clocks_table);
let maybe_modes_table = match self.daemon_client.get_device_power_profile_modes(&gpu_id) {
let maybe_modes_table = match self
.daemon_client
.get_device_power_profile_modes(&gpu_id)
.await
{
Ok(buf) => match buf.inner() {
Ok(table) => Some(table),
Err(err) => {
@@ -372,6 +399,7 @@ impl AppModel {
match self
.daemon_client
.get_power_states(&gpu_id)
.await
.and_then(|states| states.inner())
{
Ok(power_states) => {
@@ -417,11 +445,11 @@ impl AppModel {
Ok(())
}
fn apply_settings(
async fn apply_settings(
&self,
gpu_id: String,
root: &gtk::ApplicationWindow,
sender: &ComponentSender<Self>,
sender: &AsyncComponentSender<Self>,
) -> anyhow::Result<()> {
// TODO: Ask confirmation for everything, not just clocks
@@ -430,27 +458,33 @@ impl AppModel {
if let Some(cap) = self.root_stack.oc_page.get_power_cap() {
self.daemon_client
.set_power_cap(&gpu_id, Some(cap))
.await
.context("Failed to set power cap")?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)
.await
.context("Could not commit config")?;
}
// Reset the power profile mode for switching to/from manual performance level
self.daemon_client
.set_power_profile_mode(&gpu_id, None, vec![])
.await
.context("Could not set default power profile mode")?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)
.await
.context("Could not commit config")?;
if let Some(level) = self.root_stack.oc_page.get_performance_level() {
self.daemon_client
.set_performance_level(&gpu_id, level)
.await
.context("Failed to set power profile")?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)
.await
.context("Could not commit config")?;
let mode_index = self
@@ -466,9 +500,11 @@ impl AppModel {
self.daemon_client
.set_power_profile_mode(&gpu_id, mode_index, custom_heuristics)
.await
.context("Could not set active power profile mode")?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)
.await
.context("Could not commit config")?;
}
@@ -487,9 +523,11 @@ impl AppModel {
self.daemon_client
.set_fan_control(opts)
.await
.context("Could not set fan control")?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)
.await
.context("Could not commit config")?;
}
@@ -532,10 +570,12 @@ impl AppModel {
if !states.is_empty() {
self.daemon_client
.set_enabled_power_states(&gpu_id, kind, states)
.await
.context("Could not set power states")?;
self.daemon_client
.confirm_pending_config(ConfirmCommand::Confirm)
.await
.context("Could not commit config")?;
}
}
@@ -544,8 +584,9 @@ impl AppModel {
let delay = self
.daemon_client
.batch_set_clocks_value(&gpu_id, clocks_commands)
.await
.context("Could not commit clocks settings")?;
self.ask_settings_confirmation(delay, root, sender);
self.ask_settings_confirmation(delay, root, sender).await;
}
sender.input(AppMsg::ReloadData { full: false });
@@ -553,11 +594,11 @@ impl AppModel {
Ok(())
}
fn ask_settings_confirmation(
async fn ask_settings_confirmation(
&self,
mut delay: u64,
window: &gtk::ApplicationWindow,
sender: &ComponentSender<AppModel>,
sender: &AsyncComponentSender<AppModel>,
) {
let text = confirmation_text(delay);
let dialog = MessageDialog::builder()
@@ -616,18 +657,21 @@ impl AppModel {
diag.close();
if let Err(err) = daemon_client.confirm_pending_config(command) {
show_error(&window, &err);
}
sender.input(AppMsg::ReloadData { full: false });
relm4::spawn_local(async move {
if let Err(err) = daemon_client.confirm_pending_config(command).await {
show_error(&window, &err);
}
sender.input(AppMsg::ReloadData { full: false });
});
}
));
}
fn dump_vbios(&self, gpu_id: &str, root: &gtk::ApplicationWindow) {
async fn dump_vbios(&self, gpu_id: &str, root: &gtk::ApplicationWindow) {
match self
.daemon_client
.dump_vbios(gpu_id)
.await
.and_then(|response| response.inner())
{
Ok(vbios_data) => {
@@ -676,10 +720,11 @@ impl AppModel {
}
}
fn generate_debug_snapshot(&self, root: &gtk::ApplicationWindow) {
async fn generate_debug_snapshot(&self, root: &gtk::ApplicationWindow) {
match self
.daemon_client
.generate_debug_snapshot()
.await
.and_then(|response| response.inner())
{
Ok(path) => {
@@ -800,7 +845,7 @@ fn show_embedded_info(parent: &ApplicationWindow, err: anyhow::Error) {
fn start_stats_update_loop(
gpu_id: String,
daemon_client: DaemonClient,
sender: ComponentSender<AppModel>,
sender: AsyncComponentSender<AppModel>,
) -> glib::JoinHandle<()> {
debug!("spawning new stats update task with {STATS_POLL_INTERVAL_MS}ms interval");
let duration = Duration::from_millis(STATS_POLL_INTERVAL_MS);
@@ -810,6 +855,7 @@ fn start_stats_update_loop(
match daemon_client
.get_device_stats(&gpu_id)
.await
.and_then(|buffer| buffer.inner())
{
Ok(stats) => {
@@ -868,35 +914,30 @@ fn confirmation_text(seconds_left: u64) -> String {
format!("Do you want to keep the new settings? (Reverting in {seconds_left} seconds)")
}
fn toggle_overdrive(enable: bool, root: ApplicationWindow) {
let handle = relm4::spawn_blocking(move || {
let (daemon_client, _) =
create_connection().expect("Could not create new daemon connection");
if enable {
daemon_client
.enable_overdrive()
.and_then(|buffer| buffer.inner())
} else {
daemon_client
.disable_overdrive()
.and_then(|buffer| buffer.inner())
}
});
async fn toggle_overdrive(daemon_client: &DaemonClient, enable: bool, root: ApplicationWindow) {
let dialog = spinner_dialog(&root, "Regenerating initramfs (this may take a while)");
dialog.show();
relm4::spawn_local(async move {
let result = handle.await.unwrap();
dialog.hide();
let result = if enable {
daemon_client
.enable_overdrive()
.await
.and_then(|buffer| buffer.inner())
} else {
daemon_client
.disable_overdrive()
.await
.and_then(|buffer| buffer.inner())
};
match result {
Ok(msg) => oc_toggled_dialog(false, &msg),
Err(err) => {
show_error(&root, &err);
}
dialog.hide();
match result {
Ok(msg) => oc_toggled_dialog(false, &msg),
Err(err) => {
show_error(&root, &err);
}
});
}
}
fn spinner_dialog(parent: &ApplicationWindow, title: &str) -> MessageDialog {
@@ -920,7 +961,7 @@ fn spinner_dialog(parent: &ApplicationWindow, title: &str) -> MessageDialog {
dialog
}
fn register_actions(sender: &ComponentSender<AppModel>) {
fn register_actions(sender: &AsyncComponentSender<AppModel>) {
let mut group = RelmActionGroup::<AppActionGroup>::new();
macro_rules! actions {
@@ -968,3 +1009,27 @@ relm4::new_stateless_action!(DumpVBios, AppActionGroup, "dump-vbios");
relm4::new_stateless_action!(DebugSnapshot, AppActionGroup, "generate-debug-snapshot");
relm4::new_stateless_action!(DisableOverdrive, AppActionGroup, "disable-overdrive");
relm4::new_stateless_action!(ResetConfig, AppActionGroup, "reset-config");
async fn create_connection() -> anyhow::Result<(DaemonClient, Option<anyhow::Error>)> {
match DaemonClient::connect().await {
Ok(connection) => {
debug!("Established daemon connection");
Ok((connection, None))
}
Err(err) => {
info!("could not connect to socket: {err:#}");
info!("using a local daemon");
let (server_stream, client_stream) = UnixStream::pair()?;
std::thread::spawn(move || {
if let Err(err) = lact_daemon::run_embedded(server_stream) {
error!("Builtin daemon error: {err}");
}
});
let client = DaemonClient::from_stream(client_stream, true)?;
Ok((client, Some(err)))
}
}
}

View File

@@ -1,12 +1,10 @@
pub mod app;
use anyhow::{anyhow, Context};
use anyhow::Context;
use app::AppModel;
use lact_client::DaemonClient;
use lact_schema::args::GuiArgs;
use relm4::RelmApp;
use std::os::unix::net::UnixStream;
use tracing::{debug, error, info, metadata::LevelFilter};
use tracing::metadata::LevelFilter;
use tracing_subscriber::EnvFilter;
const GUI_VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -15,41 +13,15 @@ const APP_ID: &str = "io.github.lact-linux";
pub fn run(args: GuiArgs) -> anyhow::Result<()> {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.parse(args.log_level.unwrap_or_default())
.parse(args.log_level.as_deref().unwrap_or_default())
.context("Invalid log level")?;
tracing_subscriber::fmt().with_env_filter(env_filter).init();
if let Err(err) = gtk::init() {
return Err(anyhow!("Cannot initialize GTK: {err}"));
}
let (connection, connection_err) = create_connection()?;
// if let Err(err) = gtk::init() {
// return Err(anyhow!("Cannot initialize GTK: {err}"));
// }
let app = RelmApp::new(APP_ID).with_args(vec![]);
app.run::<AppModel>((connection, connection_err));
app.run_async::<AppModel>(args);
Ok(())
}
fn create_connection() -> anyhow::Result<(DaemonClient, Option<anyhow::Error>)> {
match DaemonClient::connect() {
Ok(connection) => {
debug!("Established daemon connection");
Ok((connection, None))
}
Err(err) => {
info!("could not connect to socket: {err:#}");
info!("using a local daemon");
let (server_stream, client_stream) = UnixStream::pair()?;
std::thread::spawn(move || {
if let Err(err) = lact_daemon::run_embedded(server_stream) {
error!("Builtin daemon error: {err}");
}
});
let client = DaemonClient::from_stream(client_stream, true)?;
Ok((client, Some(err)))
}
}
}

View File

@@ -22,6 +22,9 @@ pub enum Command {
pub struct GuiArgs {
#[arg(long)]
pub log_level: Option<String>,
/// Remote TCP address to connect to
#[arg(long)]
pub tcp_address: Option<String>,
}
#[derive(Parser)]