Minimal - not very safe at all - submission.

This commit is contained in:
Herbert Wolverson 2023-04-07 19:50:05 +00:00
parent ac97eb450e
commit 99b3702142
10 changed files with 561 additions and 119 deletions

276
src/rust/Cargo.lock generated
View File

@ -207,6 +207,12 @@ version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
[[package]]
name = "binascii"
version = "0.1.4"
@ -455,7 +461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb"
dependencies = [
"aes-gcm",
"base64",
"base64 0.20.0",
"hkdf",
"hmac",
"percent-encoding",
@ -845,6 +851,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
@ -1136,6 +1157,29 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "idna"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "indexmap"
version = "1.9.2"
@ -1229,6 +1273,12 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e537132deb99c0eb4b752f0346b6a836200eaaa3516dd7e5514b63930a09e5d"
[[package]]
name = "ipnet"
version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f"
[[package]]
name = "is-terminal"
version = "0.4.4"
@ -1581,6 +1631,7 @@ dependencies = [
"nix",
"num-traits",
"once_cell",
"reqwest",
"serde",
"serde_json",
"signal-hook",
@ -1619,6 +1670,20 @@ dependencies = [
"lqos_config",
]
[[package]]
name = "lts_node"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"env_logger",
"log",
"lqos_bus",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -1717,6 +1782,24 @@ dependencies = [
"version_check",
]
[[package]]
name = "native-tls"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]
[[package]]
name = "nix"
version = "0.26.2"
@ -1813,6 +1896,50 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openssl"
version = "0.10.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d2f106ab837a24e03672c59b1239669a0596406ff657c3c0835b6b7f0f35a33"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-sys"
version = "0.9.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a20eace9dc2d82904039cb76dcf50fb1a0bba071cfd1629720b5d6f1ddba0fa"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]]
name = "os_str_bytes"
version = "6.4.1"
@ -2195,6 +2322,43 @@ version = "0.6.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
[[package]]
name = "reqwest"
version = "0.11.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254"
dependencies = [
"base64 0.21.0",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "rmp"
version = "0.8.11"
@ -2356,6 +2520,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "schannel"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "713cfb06c7059f3588fb8044c0fad1d09e3c01d225e25b9220dbfdcf16dbb1b3"
dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "scoped-tls"
version = "1.0.1"
@ -2368,6 +2541,29 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "security-framework"
version = "2.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a332be01508d814fed64bf28f798a146d73792121129962fdf335bb3c49a4254"
dependencies = [
"bitflags",
"core-foundation",
"core-foundation-sys",
"libc",
"security-framework-sys",
]
[[package]]
name = "security-framework-sys"
version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31c9bb296072e961fcbd8853511dd39c2d8be2deb1e17c6860b1d30732b323b4"
dependencies = [
"core-foundation-sys",
"libc",
]
[[package]]
name = "serde"
version = "1.0.153"
@ -2746,6 +2942,21 @@ dependencies = [
"serde_json",
]
[[package]]
name = "tinyvec"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.26.0"
@ -2777,6 +2988,16 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]]
name = "tokio-stream"
version = "0.1.12"
@ -2968,12 +3189,27 @@ dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460"
[[package]]
name = "unicode-ident"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4"
[[package]]
name = "unicode-normalization"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921"
dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.10.1"
@ -3008,6 +3244,17 @@ dependencies = [
"subtle",
]
[[package]]
name = "url"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
]
[[package]]
name = "uuid"
version = "1.3.0"
@ -3025,6 +3272,12 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "version_check"
version = "0.9.4"
@ -3083,6 +3336,18 @@ dependencies = [
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f219e0d211ba40266969f6dbdd90636da12f75bee4fc9d6c23d1260dadb51454"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.84"
@ -3306,6 +3571,15 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "xdp_iphash_to_cpu_cmdline"
version = "0.1.0"

View File

@ -28,4 +28,5 @@ members = [
"lqos_heimdall", # Library for managing Heimdall flow watching
"lqstats", # A CLI utility for retrieving long-term statistics
"long_term_stats/license_server", # Licensing Server for LibreQoS Long-term stats
"long_term_stats/lts_node", # Long-term stats cluster node
]

View File

@ -0,0 +1,15 @@
[package]
name = "lts_node"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
tokio = { version = "1.25.0", features = ["full"] }
anyhow = "1"
env_logger = "0"
log = "0"
serde = { version = "1.0", features = ["derive"] }
axum = "0.6"
lqos_bus = { path = "../../lqos_bus" }
serde_json = "1"

View File

@ -0,0 +1,40 @@
use axum::{
response::Html,
routing::{get, post},
Router, Json,
};
use lqos_bus::long_term_stats::StatsSubmission;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Start the logger
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"),
);
tokio::spawn(server()).await;
Ok(())
}
async fn server() {
let app = Router::new()
.route("/", get(index_page))
.route("/submit", post(on_submission));
log::info!("Listening for web traffic on 0.0.0.0:9127");
axum::Server::bind(&"0.0.0.0:9127".parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
}
async fn index_page() -> Html<String> {
Html("Hello, World!".to_string())
}
async fn on_submission(Json(payload): Json<StatsSubmission>) -> Html<String> {
log::info!("Submission arrived");
println!("{payload:#?}");
Html("Hello, World!".to_string())
}

View File

@ -69,6 +69,23 @@ pub struct StatsTreeNode {
pub immediate_parent: Option<usize>,
}
/// Collation of all stats for a given time period
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct StatsSubmission {
/// License Key
pub key: String,
/// Node ID
pub node_id: String,
/// Timestamp of the collation (UNIX time)
pub timestamp: u64,
/// Total traffic statistics
pub totals: StatsTotals,
/// Per-host statistics
pub hosts: Vec<StatsHost>,
/// Tree of traffic summaries
pub tree: Vec<StatsTreeNode>,
}
/// Network-transmitted query to ask the status of a license
/// key.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]

View File

@ -28,6 +28,7 @@ sysinfo = "0"
dashmap = "5"
num-traits = "0.2"
thiserror = "1"
reqwest = { version = "0.11", features = ["json"] }
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]

View File

@ -1,67 +1,101 @@
use lqos_config::EtcLqos;
use lqos_utils::unix_time::unix_now;
use super::{
collation_utils::{MinMaxAvg, MinMaxAvgPair},
submission::new_submission, tree::{NetworkTreeEntry, get_network_tree},
collation_utils::{MinMaxAvg, MinMaxAvgPair},
submission::new_submission,
tree::{get_network_tree, NetworkTreeEntry},
};
use crate::long_term_stats::data_collector::SESSION_BUFFER;
use std::{collections::HashMap, net::IpAddr};
#[derive(Debug, Clone)]
pub(crate) struct StatsSubmission {
pub(crate) timestamp: u64,
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
pub(crate) shaped_bits_per_second: MinMaxAvgPair<u64>,
pub(crate) packets_per_second: MinMaxAvgPair<u64>,
pub(crate) hosts: Vec<SubmissionHost>,
pub(crate) tree: Vec<NetworkTreeEntry>,
pub(crate) timestamp: u64,
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
pub(crate) shaped_bits_per_second: MinMaxAvgPair<u64>,
pub(crate) packets_per_second: MinMaxAvgPair<u64>,
pub(crate) hosts: Vec<SubmissionHost>,
pub(crate) tree: Vec<NetworkTreeEntry>,
}
#[derive(Debug, Clone)]
pub(crate) struct SubmissionHost {
pub(crate) circuit_id: String,
pub(crate) ip_address: IpAddr,
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
pub(crate) median_rtt: MinMaxAvg<u32>,
pub(crate) tree_parent_indices: Vec<usize>,
pub(crate) circuit_id: String,
pub(crate) ip_address: IpAddr,
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
pub(crate) median_rtt: MinMaxAvg<u32>,
pub(crate) tree_parent_indices: Vec<usize>,
}
impl From<StatsSubmission> for lqos_bus::long_term_stats::StatsSubmission {
fn from(value: StatsSubmission) -> Self {
let cfg = EtcLqos::load().unwrap();
let lts_cfg = cfg.long_term_stats.unwrap();
let key = lts_cfg.license_key.unwrap_or("".to_string());
let node_id = cfg.node_id.unwrap_or("".to_string());
Self {
key,
node_id,
timestamp: value.timestamp,
totals: value.clone().into(),
hosts: value.hosts.into_iter().map(Into::into).collect(),
tree: value.tree.into_iter().map(Into::into).collect(),
}
}
}
impl From<NetworkTreeEntry> for lqos_bus::long_term_stats::StatsTreeNode {
fn from(value: NetworkTreeEntry) -> Self {
Self {
name: value.name.clone(),
max_throughput: value.max_throughput,
parents: value.parents,
immediate_parent: value.immediate_parent,
}
}
}
impl From<StatsSubmission> for lqos_bus::long_term_stats::StatsTotals {
fn from(value: StatsSubmission) -> Self {
Self {
bits: value.bits_per_second.into(),
shaped_bits: value.shaped_bits_per_second.into(),
packets: value.packets_per_second.into(),
fn from(value: StatsSubmission) -> Self {
Self {
bits: value.bits_per_second.into(),
shaped_bits: value.shaped_bits_per_second.into(),
packets: value.packets_per_second.into(),
}
}
}
}
impl From<MinMaxAvgPair<u64>> for lqos_bus::long_term_stats::StatsSummary {
fn from(value: MinMaxAvgPair<u64>) -> Self {
Self {
min: (value.down.min, value.up.min),
max: (value.down.max, value.up.max),
avg: (value.down.avg, value.up.avg),
fn from(value: MinMaxAvgPair<u64>) -> Self {
Self {
min: (value.down.min, value.up.min),
max: (value.down.max, value.up.max),
avg: (value.down.avg, value.up.avg),
}
}
}
}
impl From<MinMaxAvg<u32>> for lqos_bus::long_term_stats::StatsRttSummary {
fn from(value: MinMaxAvg<u32>) -> Self {
Self { min: value.min, max: value.max, avg: value.avg }
}
fn from(value: MinMaxAvg<u32>) -> Self {
Self {
min: value.min,
max: value.max,
avg: value.avg,
}
}
}
impl From<SubmissionHost> for lqos_bus::long_term_stats::StatsHost {
fn from(value: SubmissionHost) -> Self {
Self {
circuit_id: value.circuit_id.to_string(),
ip_address: value.ip_address.to_string(),
bits: value.bits_per_second.into(),
rtt: value.median_rtt.into(),
tree_indices: value.tree_parent_indices,
fn from(value: SubmissionHost) -> Self {
Self {
circuit_id: value.circuit_id.to_string(),
ip_address: value.ip_address.to_string(),
bits: value.bits_per_second.into(),
rtt: value.median_rtt.into(),
tree_indices: value.tree_parent_indices,
}
}
}
}
/// Every (n) seconds, collate the accumulated stats buffer
@ -71,89 +105,85 @@ impl From<SubmissionHost> for lqos_bus::long_term_stats::StatsHost {
/// (n) is defined in /etc/lqos.conf in the `collation_period_seconds`
/// field of the `[long_term_stats]` section.
pub(crate) async fn collate_stats() {
// Obtain exclusive access to the session
let mut writer = SESSION_BUFFER.lock().await;
if writer.is_empty() {
// Nothing to do - so exit
return;
}
// Obtain exclusive access to the session
let mut writer = SESSION_BUFFER.lock().await;
if writer.is_empty() {
// Nothing to do - so exit
return;
}
// Collate total stats for the period
let bps: Vec<(u64, u64)> =
writer.iter().map(|e| e.bits_per_second).collect();
let pps: Vec<(u64, u64)> =
writer.iter().map(|e| e.packets_per_second).collect();
let sbps: Vec<(u64, u64)> =
writer.iter().map(|e| e.shaped_bits_per_second).collect();
let bits_per_second = MinMaxAvgPair::from_slice(&bps);
let packets_per_second = MinMaxAvgPair::from_slice(&pps);
let shaped_bits_per_second = MinMaxAvgPair::from_slice(&sbps);
// Collate total stats for the period
let bps: Vec<(u64, u64)> = writer.iter().map(|e| e.bits_per_second).collect();
let pps: Vec<(u64, u64)> = writer.iter().map(|e| e.packets_per_second).collect();
let sbps: Vec<(u64, u64)> = writer.iter().map(|e| e.shaped_bits_per_second).collect();
let bits_per_second = MinMaxAvgPair::from_slice(&bps);
let packets_per_second = MinMaxAvgPair::from_slice(&pps);
let shaped_bits_per_second = MinMaxAvgPair::from_slice(&sbps);
let mut submission = StatsSubmission {
timestamp: unix_now().unwrap_or(0),
bits_per_second,
shaped_bits_per_second,
packets_per_second,
hosts: Vec::new(),
tree: get_network_tree(),
};
let mut submission = StatsSubmission {
timestamp: unix_now().unwrap_or(0),
bits_per_second,
shaped_bits_per_second,
packets_per_second,
hosts: Vec::new(),
tree: get_network_tree(),
};
// Collate host stats
let mut host_accumulator =
HashMap::<(&IpAddr, &String), Vec<(u64, u64, f32, Vec<usize>)>>::new();
writer.iter().for_each(|session| {
session.hosts.iter().for_each(|host| {
if let Some(ha) =
host_accumulator.get_mut(&(&host.ip_address, &host.circuit_id))
{
ha.push((
host.bits_per_second.0,
host.bits_per_second.1,
host.median_rtt,
host.tree_parent_indices.clone(),
));
} else {
host_accumulator.insert(
(&host.ip_address, &host.circuit_id),
vec![(
host.bits_per_second.0,
host.bits_per_second.1,
host.median_rtt,
host.tree_parent_indices.clone(),
)],
);
}
// Collate host stats
let mut host_accumulator =
HashMap::<(&IpAddr, &String), Vec<(u64, u64, f32, Vec<usize>)>>::new();
writer.iter().for_each(|session| {
session.hosts.iter().for_each(|host| {
if let Some(ha) = host_accumulator.get_mut(&(&host.ip_address, &host.circuit_id)) {
ha.push((
host.bits_per_second.0,
host.bits_per_second.1,
host.median_rtt,
host.tree_parent_indices.clone(),
));
} else {
host_accumulator.insert(
(&host.ip_address, &host.circuit_id),
vec![(
host.bits_per_second.0,
host.bits_per_second.1,
host.median_rtt,
host.tree_parent_indices.clone(),
)],
);
}
});
});
});
for ((ip, circuit), data) in host_accumulator.iter() {
let bps: Vec<(u64, u64)> =
data.iter().map(|(d, u, _rtt, _tree)| (*d, *u)).collect();
let bps = MinMaxAvgPair::<u64>::from_slice(&bps);
let fps: Vec<u32> =
data.iter().map(|(_d, _u, rtt, _tree)| (*rtt * 100.0) as u32).collect();
let fps = MinMaxAvg::<u32>::from_slice(&fps);
let tree = data
.iter()
.cloned()
.map(|(_d, _u, _rtt, tree)| tree)
.next()
.unwrap_or(Vec::new());
submission.hosts.push(SubmissionHost {
circuit_id: circuit.to_string(),
ip_address: **ip,
bits_per_second: bps,
median_rtt: fps,
tree_parent_indices: tree,
});
}
for ((ip, circuit), data) in host_accumulator.iter() {
let bps: Vec<(u64, u64)> = data.iter().map(|(d, u, _rtt, _tree)| (*d, *u)).collect();
let bps = MinMaxAvgPair::<u64>::from_slice(&bps);
let fps: Vec<u32> = data
.iter()
.map(|(_d, _u, rtt, _tree)| (*rtt * 100.0) as u32)
.collect();
let fps = MinMaxAvg::<u32>::from_slice(&fps);
let tree = data
.iter()
.cloned()
.map(|(_d, _u, _rtt, tree)| tree)
.next()
.unwrap_or(Vec::new());
submission.hosts.push(SubmissionHost {
circuit_id: circuit.to_string(),
ip_address: **ip,
bits_per_second: bps,
median_rtt: fps,
tree_parent_indices: tree,
});
}
// Remove all gathered stats
writer.clear();
// Remove all gathered stats
writer.clear();
// Drop the lock
std::mem::drop(writer);
// Drop the lock
std::mem::drop(writer);
// Submit
new_submission(submission).await;
// Submit
new_submission(submission).await;
}

View File

@ -0,0 +1,64 @@
use lqos_bus::long_term_stats::StatsSubmission;
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
struct QueueSubmission {
attempts: u8,
body: StatsSubmission,
sent: bool,
}
pub(crate) struct Queue {
queue: Mutex<Vec<QueueSubmission>>,
}
impl Queue {
fn new() -> Self {
Self {
queue: Mutex::new(Vec::new()),
}
}
pub async fn push(&self, data: lqos_bus::long_term_stats::StatsSubmission, host: &str) {
{
let mut lock = self.queue.lock().await;
lock.push(QueueSubmission {
attempts: 0,
sent: false,
body: data,
});
}
tokio::spawn(send_queue(host.to_string()));
}
}
pub(crate) static QUEUE: Lazy<Queue> = Lazy::new(Queue::new);
async fn send_queue(host: String) {
let url = format!("http://{host}/submit");
let mut lock = QUEUE.queue.lock().await;
if lock.is_empty() {
return;
}
for s in lock.iter_mut() {
let client = reqwest::Client::new();
let res = client.post(&url)
.json(&s.body)
.send();
match res.await {
Ok(_) => {
s.sent = true;
}
Err(e) => {
log::warn!("Error sending stats: {}", e);
s.attempts += 1;
}
}
}
lock.retain(|s| !s.sent);
lock.retain(|s| s.attempts < 200);
}

View File

@ -8,6 +8,7 @@ mod collator;
mod submission;
mod tree;
mod licensing;
mod lts_queue;
use std::time::Duration;
use log::{info, warn};
use lqos_config::EtcLqos;

View File

@ -1,12 +1,12 @@
use std::sync::RwLock;
use lqos_bus::{BusResponse, long_term_stats::StatsHost};
use once_cell::sync::Lazy;
use super::{collator::StatsSubmission, licensing::{get_license_status, LicenseState}};
use super::{collator::StatsSubmission, licensing::{get_license_status, LicenseState}, lts_queue::QUEUE};
pub(crate) static CURRENT_STATS: Lazy<RwLock<Option<StatsSubmission>>> = Lazy::new(|| RwLock::new(None));
pub(crate) async fn new_submission(data: StatsSubmission) {
*CURRENT_STATS.write().unwrap() = Some(data);
*CURRENT_STATS.write().unwrap() = Some(data.clone());
let license = get_license_status().await;
match license {
@ -16,9 +16,8 @@ pub(crate) async fn new_submission(data: StatsSubmission) {
LicenseState::Denied => {
log::error!("Your license is invalid. Please contact support.");
}
LicenseState::Valid{ expiry, stats_host } => {
// TODO: Send to server
println!("Send stats to {stats_host} before {expiry}");
LicenseState::Valid{ stats_host, .. } => {
QUEUE.push(data.into(), &stats_host).await;
}
}
}