diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 6dd80110..c04f4843 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1322,6 +1322,19 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "lqos_anonymous_stats_server" +version = "0.1.0" +dependencies = [ + "anyhow", + "env_logger", + "log", + "lqos_bus", + "serde", + "serde_cbor", + "tokio", +] + [[package]] name = "lqos_bus" version = "0.1.0" @@ -1333,6 +1346,7 @@ dependencies = [ "lqos_utils", "nix", "serde", + "serde_cbor", "thiserror", "tokio", ] @@ -2217,6 +2231,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_cbor" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5" +dependencies = [ + "half", + "serde", +] + [[package]] name = "serde_derive" version = "1.0.153" diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index 006da64b..186982e9 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -23,4 +23,5 @@ members = [ "lqusers", # CLI control for managing the web user list "lqos_utils", # A collection of macros and helpers we find useful "lqos_setup", # A quick CLI setup for first-time users + "lqos_anonymous_stats_server", # The server for gathering anonymous usage data. ] diff --git a/src/rust/lqos_anonymous_stats_server/Cargo.toml b/src/rust/lqos_anonymous_stats_server/Cargo.toml new file mode 100644 index 00000000..e1b324c6 --- /dev/null +++ b/src/rust/lqos_anonymous_stats_server/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "lqos_anonymous_stats_server" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio = { version = "1.25.0", features = ["full"] } +anyhow = "1" +env_logger = "0" +log = "0" +lqos_bus = { path = "../lqos_bus" } +serde = { version = "1.0", features = ["derive"] } +serde_cbor = "0" # For RFC8949/7409 format C binary objects diff --git a/src/rust/lqos_anonymous_stats_server/src/main.rs b/src/rust/lqos_anonymous_stats_server/src/main.rs new file mode 100644 index 00000000..a56fd674 --- /dev/null +++ b/src/rust/lqos_anonymous_stats_server/src/main.rs @@ -0,0 +1,13 @@ +mod stats_server; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Start the logger + env_logger::init_from_env( + env_logger::Env::default() + .filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"), + ); + + let _ = stats_server::gather_stats().await; + Ok(()) +} \ No newline at end of file diff --git a/src/rust/lqos_anonymous_stats_server/src/stats_server.rs b/src/rust/lqos_anonymous_stats_server/src/stats_server.rs new file mode 100644 index 00000000..06e60c80 --- /dev/null +++ b/src/rust/lqos_anonymous_stats_server/src/stats_server.rs @@ -0,0 +1,65 @@ +use lqos_bus::anonymous::AnonymousUsageV1; +use std::net::SocketAddr; +use tokio::{io::AsyncReadExt, net::TcpListener, spawn}; + +pub async fn gather_stats() -> anyhow::Result<()> { + let listener = TcpListener::bind(":::9125").await?; + log::info!("Listening on :::9125"); + + loop { + let (mut socket, address) = listener.accept().await?; + log::info!("Connection from {address:?}"); + spawn(async move { + let mut buf = vec![0; 10240]; + if let Ok(n) = socket.read(&mut buf).await { + log::info!("Received {n} bytes from {address:?}"); + if let Err(e) = decode(&buf, address).await { + log::error!("Decode error from {address:?}"); + log::error!("{e:?}"); + } + } + }); + } +} + +async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<()> { + const U64SIZE: usize = std::mem::size_of::(); + let version_buf = &buf[0..2].try_into()?; + let version = u16::from_be_bytes(*version_buf); + let size_buf = &buf[2..2 + U64SIZE].try_into()?; + let size = u64::from_be_bytes(*size_buf); + log::info!("Received a version {version} payload of serialized size {size} from {address:?}"); + + match version { + 1 => { + let start = 2 + U64SIZE; + let end = start + size as usize; + let payload: Result = + serde_cbor::from_slice(&buf[start..end]); + match payload { + Ok(payload) => store_stats_v1(&payload, address).await, + Err(e) => { + log::error!( + "Unable to deserialize statistics sent from {address:?}" + ); + log::error!("{e:?}"); + Err(anyhow::Error::msg("Deserialize error")) + } + } + } + _ => { + log::error!( + "Unknown version of statistics: {version}, dumped {size} bytes" + ); + Err(anyhow::Error::msg("Version error")) + } + } +} + +async fn store_stats_v1( + payload: &AnonymousUsageV1, + address: SocketAddr, +) -> anyhow::Result<()> { + println!("{payload:?} {address:?}"); + Ok(()) +} diff --git a/src/rust/lqos_bus/Cargo.toml b/src/rust/lqos_bus/Cargo.toml index bb3fd25b..fd849777 100644 --- a/src/rust/lqos_bus/Cargo.toml +++ b/src/rust/lqos_bus/Cargo.toml @@ -16,6 +16,7 @@ lqos_utils = { path = "../lqos_utils" } tokio = { version = "1", features = [ "rt", "macros", "net", "io-util", "time" ] } log = "0" nix = "0" +serde_cbor = "0" # For RFC8949/7409 format C binary objects [dev-dependencies] criterion = { version = "0", features = [ "html_reports", "async_tokio"] } diff --git a/src/rust/lqos_bus/src/anonymous/mod.rs b/src/rust/lqos_bus/src/anonymous/mod.rs index 26af6293..cca8e0c1 100644 --- a/src/rust/lqos_bus/src/anonymous/mod.rs +++ b/src/rust/lqos_bus/src/anonymous/mod.rs @@ -1,2 +1,45 @@ mod v1; -pub use v1::*; \ No newline at end of file +use serde::{Serialize, Deserialize}; +use thiserror::Error; +pub use v1::*; + +#[derive(Debug, Clone, Serialize, Deserialize)] +/// Header for stats submission +pub struct StatsHeader { + /// The version to use (should be 1) + pub version: u16, + /// The number of bytes being submitted following the header + pub size: usize, +} + +/// Build an anonymous usage statistics buffer. +/// Transforms `stats` (`AnonymousUsageV1`) into a matching +/// header and payload, in a single buffer ready to send. +pub fn build_stats(stats: &AnonymousUsageV1) -> Result, StatsError> { + let mut result = Vec::new(); + let payload = serde_cbor::to_vec(stats); + if let Err(e) = payload { + log::warn!("Unable to serialize statistics. Not sending them."); + log::warn!("{e:?}"); + return Err(StatsError::SerializeFail); + } + let payload = payload.unwrap(); + + // Store the version as network order + result.extend( 1u16.to_be_bytes() ); + // Store the payload size as network order + result.extend( (payload.len() as u64).to_be_bytes() ); + // Store the payload itself + result.extend(payload); + + Ok(result) +} + + +/// Errors for anonymous usage statistics failure +#[derive(Error, Debug)] +pub enum StatsError { + /// Serializing the object failed + #[error("Unable to serialize object")] + SerializeFail +} \ No newline at end of file diff --git a/src/rust/lqos_bus/src/anonymous/v1.rs b/src/rust/lqos_bus/src/anonymous/v1.rs index dc501575..0ce33e74 100644 --- a/src/rust/lqos_bus/src/anonymous/v1.rs +++ b/src/rust/lqos_bus/src/anonymous/v1.rs @@ -1,4 +1,5 @@ -#[derive(Default, Debug)] +use serde::{Serialize, Deserialize}; +#[derive(Default, Debug, Serialize, Deserialize)] /// Defines data to be submitted if anonymous usage submission is /// enabled. This is protocol version 1. @@ -64,7 +65,7 @@ pub struct AnonymousUsageV1 { /// Description of installed NIC (version 1 data) -#[derive(Default, Debug)] +#[derive(Default, Debug, Serialize, Deserialize)] pub struct NicV1 { /// Description, usually "Ethernet" pub description: String, diff --git a/src/rust/lqosd/src/anonymous_usage/mod.rs b/src/rust/lqosd/src/anonymous_usage/mod.rs index 411781bf..5c8f035b 100644 --- a/src/rust/lqosd/src/anonymous_usage/mod.rs +++ b/src/rust/lqosd/src/anonymous_usage/mod.rs @@ -1,7 +1,7 @@ mod lshw; mod version; -use std::time::Duration; -use lqos_bus::anonymous::AnonymousUsageV1; +use std::{time::Duration, net::TcpStream, io::Write}; +use lqos_bus::anonymous::{AnonymousUsageV1, build_stats}; use lqos_config::{EtcLqos, LibreQoSConfig}; use lqos_sys::num_possible_cpus; use sysinfo::{System, SystemExt, CpuExt}; @@ -29,6 +29,7 @@ pub async fn start_anonymous_usage() { fn anonymous_usage_dump() -> anyhow::Result<()> { let mut data = AnonymousUsageV1::default(); let mut sys = System::new_all(); + let mut server = String::new(); sys.refresh_all(); data.total_memory = sys.total_memory(); data.available_memory = sys.available_memory(); @@ -74,12 +75,39 @@ fn anonymous_usage_dump() -> anyhow::Result<()> { data.using_xdp_bridge = bridge.use_xdp_bridge; } } + if let Some(anon) = cfg.usage_stats { + server = anon.anonymous_server; + } } data.git_hash = env!("GIT_HASH").to_string(); data.shaped_device_count = SHAPED_DEVICES.read().unwrap().devices.len(); data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len(); - println!("{data:#?}"); + send_stats(data, &server); Ok(()) +} + +fn send_stats(data: AnonymousUsageV1, server: &str) { + let buffer = build_stats(&data); + if let Err(e) = buffer { + log::warn!("Unable to serialize stats buffer"); + log::warn!("{e:?}"); + return; + } + let buffer = buffer.unwrap(); + + let stream = TcpStream::connect(server); + if let Err(e) = stream { + log::warn!("Unable to connect to {server}"); + log::warn!("{e:?}"); + return; + } + let mut stream = stream.unwrap(); + let result = stream.write(&buffer); + if let Err(e) = result { + log::warn!("Unable to send bytes to {server}"); + log::warn!("{e:?}"); + } + log::info!("Anonymous usage stats submitted"); } \ No newline at end of file