Initial stats collection protocol.

This commit is contained in:
Herbert Wolverson
2023-03-17 16:39:38 +00:00
parent 6d1519b9aa
commit c1c5f3bba4
9 changed files with 195 additions and 6 deletions

24
src/rust/Cargo.lock generated
View File

@@ -1322,6 +1322,19 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "lqos_anonymous_stats_server"
version = "0.1.0"
dependencies = [
"anyhow",
"env_logger",
"log",
"lqos_bus",
"serde",
"serde_cbor",
"tokio",
]
[[package]]
name = "lqos_bus"
version = "0.1.0"
@@ -1333,6 +1346,7 @@ dependencies = [
"lqos_utils",
"nix",
"serde",
"serde_cbor",
"thiserror",
"tokio",
]
@@ -2217,6 +2231,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.153"

View File

@@ -23,4 +23,5 @@ members = [
"lqusers", # CLI control for managing the web user list
"lqos_utils", # A collection of macros and helpers we find useful
"lqos_setup", # A quick CLI setup for first-time users
"lqos_anonymous_stats_server", # The server for gathering anonymous usage data.
]

View File

@@ -0,0 +1,13 @@
[package]
name = "lqos_anonymous_stats_server"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.25.0", features = ["full"] }
anyhow = "1"
env_logger = "0"
log = "0"
lqos_bus = { path = "../lqos_bus" }
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0" # For RFC8949/7409 format C binary objects

View File

@@ -0,0 +1,13 @@
mod stats_server;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Start the logger
env_logger::init_from_env(
env_logger::Env::default()
.filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"),
);
let _ = stats_server::gather_stats().await;
Ok(())
}

View File

@@ -0,0 +1,65 @@
use lqos_bus::anonymous::AnonymousUsageV1;
use std::net::SocketAddr;
use tokio::{io::AsyncReadExt, net::TcpListener, spawn};
pub async fn gather_stats() -> anyhow::Result<()> {
let listener = TcpListener::bind(":::9125").await?;
log::info!("Listening on :::9125");
loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
spawn(async move {
let mut buf = vec![0; 10240];
if let Ok(n) = socket.read(&mut buf).await {
log::info!("Received {n} bytes from {address:?}");
if let Err(e) = decode(&buf, address).await {
log::error!("Decode error from {address:?}");
log::error!("{e:?}");
}
}
});
}
}
async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<()> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
log::info!("Received a version {version} payload of serialized size {size} from {address:?}");
match version {
1 => {
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: Result<AnonymousUsageV1, _> =
serde_cbor::from_slice(&buf[start..end]);
match payload {
Ok(payload) => store_stats_v1(&payload, address).await,
Err(e) => {
log::error!(
"Unable to deserialize statistics sent from {address:?}"
);
log::error!("{e:?}");
Err(anyhow::Error::msg("Deserialize error"))
}
}
}
_ => {
log::error!(
"Unknown version of statistics: {version}, dumped {size} bytes"
);
Err(anyhow::Error::msg("Version error"))
}
}
}
async fn store_stats_v1(
payload: &AnonymousUsageV1,
address: SocketAddr,
) -> anyhow::Result<()> {
println!("{payload:?} {address:?}");
Ok(())
}

View File

@@ -16,6 +16,7 @@ lqos_utils = { path = "../lqos_utils" }
tokio = { version = "1", features = [ "rt", "macros", "net", "io-util", "time" ] }
log = "0"
nix = "0"
serde_cbor = "0" # For RFC8949/7409 format C binary objects
[dev-dependencies]
criterion = { version = "0", features = [ "html_reports", "async_tokio"] }

View File

@@ -1,2 +1,45 @@
mod v1;
use serde::{Serialize, Deserialize};
use thiserror::Error;
pub use v1::*;
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Header for stats submission
pub struct StatsHeader {
/// The version to use (should be 1)
pub version: u16,
/// The number of bytes being submitted following the header
pub size: usize,
}
/// Build an anonymous usage statistics buffer.
/// Transforms `stats` (`AnonymousUsageV1`) into a matching
/// header and payload, in a single buffer ready to send.
pub fn build_stats(stats: &AnonymousUsageV1) -> Result<Vec<u8>, StatsError> {
let mut result = Vec::new();
let payload = serde_cbor::to_vec(stats);
if let Err(e) = payload {
log::warn!("Unable to serialize statistics. Not sending them.");
log::warn!("{e:?}");
return Err(StatsError::SerializeFail);
}
let payload = payload.unwrap();
// Store the version as network order
result.extend( 1u16.to_be_bytes() );
// Store the payload size as network order
result.extend( (payload.len() as u64).to_be_bytes() );
// Store the payload itself
result.extend(payload);
Ok(result)
}
/// Errors for anonymous usage statistics failure
#[derive(Error, Debug)]
pub enum StatsError {
/// Serializing the object failed
#[error("Unable to serialize object")]
SerializeFail
}

View File

@@ -1,4 +1,5 @@
#[derive(Default, Debug)]
use serde::{Serialize, Deserialize};
#[derive(Default, Debug, Serialize, Deserialize)]
/// Defines data to be submitted if anonymous usage submission is
/// enabled. This is protocol version 1.
@@ -64,7 +65,7 @@ pub struct AnonymousUsageV1 {
/// Description of installed NIC (version 1 data)
#[derive(Default, Debug)]
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct NicV1 {
/// Description, usually "Ethernet"
pub description: String,

View File

@@ -1,7 +1,7 @@
mod lshw;
mod version;
use std::time::Duration;
use lqos_bus::anonymous::AnonymousUsageV1;
use std::{time::Duration, net::TcpStream, io::Write};
use lqos_bus::anonymous::{AnonymousUsageV1, build_stats};
use lqos_config::{EtcLqos, LibreQoSConfig};
use lqos_sys::num_possible_cpus;
use sysinfo::{System, SystemExt, CpuExt};
@@ -29,6 +29,7 @@ pub async fn start_anonymous_usage() {
fn anonymous_usage_dump() -> anyhow::Result<()> {
let mut data = AnonymousUsageV1::default();
let mut sys = System::new_all();
let mut server = String::new();
sys.refresh_all();
data.total_memory = sys.total_memory();
data.available_memory = sys.available_memory();
@@ -74,12 +75,39 @@ fn anonymous_usage_dump() -> anyhow::Result<()> {
data.using_xdp_bridge = bridge.use_xdp_bridge;
}
}
if let Some(anon) = cfg.usage_stats {
server = anon.anonymous_server;
}
}
data.git_hash = env!("GIT_HASH").to_string();
data.shaped_device_count = SHAPED_DEVICES.read().unwrap().devices.len();
data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len();
println!("{data:#?}");
send_stats(data, &server);
Ok(())
}
fn send_stats(data: AnonymousUsageV1, server: &str) {
let buffer = build_stats(&data);
if let Err(e) = buffer {
log::warn!("Unable to serialize stats buffer");
log::warn!("{e:?}");
return;
}
let buffer = buffer.unwrap();
let stream = TcpStream::connect(server);
if let Err(e) = stream {
log::warn!("Unable to connect to {server}");
log::warn!("{e:?}");
return;
}
let mut stream = stream.unwrap();
let result = stream.write(&buffer);
if let Err(e) = result {
log::warn!("Unable to send bytes to {server}");
log::warn!("{e:?}");
}
log::info!("Anonymous usage stats submitted");
}