Merge branch 'anonymouse'

This commit is contained in:
Herbert Wolverson
2023-03-21 17:48:53 +00:00
26 changed files with 1188 additions and 11 deletions

1
.gitignore vendored
View File

@@ -53,6 +53,7 @@ src/liblqos_python.so
src/webusers.toml
src/lqusers.toml
src/dist
src/rust/lqos_anonymous_stats_server/anonymous.sqlite
# Ignore Rust build artifacts
src/rust/target

View File

@@ -5,6 +5,10 @@
lqos_directory = '/opt/libreqos/src'
queue_check_period_ms = 1000
[usage_stats]
send_anonymous = true
anonymous_server = "127.0.0.1:9125"
[tuning]
# IRQ balance breaks XDP_Redirect, which we use. Recommended to leave as true.
stop_irq_balance = true

191
src/rust/Cargo.lock generated
View File

@@ -152,6 +152,55 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13d8068b6ccb8b34db9de397c7043f91db8b4c66414952c6db944f238c4d3db3"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2f958c80c248b34b9a877a643811be8dbca03ca5ba827f2b63baf3a81e5fc4e"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "base64"
version = "0.20.0"
@@ -784,6 +833,15 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.26"
@@ -1322,6 +1380,21 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "lqos_anonymous_stats_server"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"env_logger",
"log",
"lqos_bus",
"serde",
"serde_cbor",
"sqlite",
"tokio",
]
[[package]]
name = "lqos_bus"
version = "0.1.0"
@@ -1333,6 +1406,7 @@ dependencies = [
"lqos_utils",
"nix",
"serde",
"serde_cbor",
"thiserror",
"tokio",
]
@@ -1413,6 +1487,7 @@ version = "0.1.0"
dependencies = [
"colored",
"default-net",
"uuid",
]
[[package]]
@@ -1493,6 +1568,12 @@ dependencies = [
"regex-automata",
]
[[package]]
name = "matchit"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]]
name = "memalloc"
version = "0.1.0"
@@ -1748,6 +1829,26 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@@ -2216,6 +2317,16 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_cbor"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2bef2ebfde456fb76bbcf9f59315333decc4fda0b2b44b420243c11e0f5ec1f5"
dependencies = [
"half",
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.153"
@@ -2238,6 +2349,15 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db0969fff533976baadd92e08b1d102c5a3d8a8049eadfd69d4d1e3c5b2ed189"
dependencies = [
"serde",
]
[[package]]
name = "serde_spanned"
version = "0.6.1"
@@ -2247,6 +2367,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "sha2"
version = "0.10.6"
@@ -2334,6 +2466,36 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dccf47db1b41fa1573ed27ccf5e08e3ca771cb994f776668c5ebda893b248fc"
[[package]]
name = "sqlite"
version = "0.30.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b1908664131c21a38e5b531344d52a196ec338af5bf44f7fa2c83d539e9561d"
dependencies = [
"libc",
"sqlite3-sys",
]
[[package]]
name = "sqlite3-src"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1815a7a02c996eb8e5c64f61fcb6fd9b12e593ce265c512c5853b2513635691"
dependencies = [
"cc",
"pkg-config",
]
[[package]]
name = "sqlite3-sys"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d47c99824fc55360ba00caf28de0b8a0458369b832e016a64c13af0ad9fbb9ee"
dependencies = [
"libc",
"sqlite3-src",
]
[[package]]
name = "stable-pattern"
version = "0.1.0"
@@ -2381,6 +2543,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sysinfo"
version = "0.28.2"
@@ -2617,6 +2785,28 @@ dependencies = [
"winnow",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]]
name = "tower-service"
version = "0.3.2"
@@ -2630,6 +2820,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",

View File

@@ -23,4 +23,5 @@ members = [
"lqusers", # CLI control for managing the web user list
"lqos_utils", # A collection of macros and helpers we find useful
"lqos_setup", # A quick CLI setup for first-time users
"lqos_anonymous_stats_server", # The server for gathering anonymous usage data.
]

View File

@@ -0,0 +1,15 @@
[package]
name = "lqos_anonymous_stats_server"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.25.0", features = ["full"] }
anyhow = "1"
env_logger = "0"
log = "0"
lqos_bus = { path = "../lqos_bus" }
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0" # For RFC8949/7409 format C binary objects
sqlite = "0"
axum = "0.6"

View File

@@ -0,0 +1,296 @@
use std::{path::Path, sync::atomic::AtomicI64, time::SystemTime};
use lqos_bus::anonymous::AnonymousUsageV1;
use sqlite::{Value, State};
const DBPATH: &str = "anonymous.sqlite";
const SETUP_QUERY: &str =
"CREATE TABLE nodes (
node_id TEXT PRIMARY KEY,
last_seen INTEGER NOT NULL
);
CREATE TABLE submissions (
id INTEGER PRIMARY KEY,
date INTEGER,
node_id TEXT,
ip_address TEXT,
git_hash TEXT,
using_xdp_bridge INTEGER,
on_a_stick INTEGER,
total_memory INTEGER,
available_memory INTEGER,
kernel_version TEXT,
distro TEXT,
usable_cores INTEGER,
cpu_brand TEXT,
cpu_vendor TEXT,
cpu_frequency INTEGER,
sqm TEXT,
monitor_mode INTEGER,
capacity_down INTEGER,
capacity_up INTEGER,
generated_pdn_down INTEGER,
generated_pdn_up INTEGER,
shaped_device_count INTEGER,
net_json_len INTEGER,
peak_down INTEGER,
peak_up INTEGER
);
CREATE TABLE nics (
id INTEGER PRIMARY KEY,
parent INTEGER,
description TEXT,
product TEXT,
vendor TEXT,
clock TEXT,
capacity TEXT,
FOREIGN KEY(parent)
REFERENCES submissions (id)
ON DELETE CASCADE
ON UPDATE NO ACTION
);
";
static SUBMISSION_ID: AtomicI64 = AtomicI64::new(0);
pub fn create_if_not_exist() {
let path = Path::new(DBPATH);
if !path.exists() {
if let Ok(cn) = sqlite::open(DBPATH) {
let result = cn.execute(SETUP_QUERY);
if let Err(e) = result {
log::error!("{e:?}");
panic!("Failed to create database");
}
} else {
panic!("Unable to create database");
}
}
}
pub fn check_id() {
log::info!("Checking primary keys");
if let Ok(cn) = sqlite::open(DBPATH) {
cn.iterate("SELECT MAX(id) as id FROM submissions;", |pairs| {
for &(name, value) in pairs.iter() {
if name == "id" {
if let Some(val) = value {
if let Ok(n) = val.parse::<i64>() {
log::info!("Last id: {n}");
SUBMISSION_ID.store(n+1, std::sync::atomic::Ordering::Relaxed);
}
}
}
}
true
}).unwrap();
} else {
panic!("Unable to connect to database");
}
}
const INSERT_STATS: &str =
"INSERT INTO submissions VALUES (
:id, :date, :node_id, :ip_address, :git_hash, :using_xdp_bridge, :on_a_stick,
:total_memory, :available_memory, :kernel_version, :distro, :usable_cores,
:cpu_brand, :cpu_vendor, :cpu_frequency, :sqm, :monitor_mode, :capcity_down,
:capacity_up, :genereated_pdn_down, :generated_pdn_up, :shaped_device_count,
:net_json_len, :peak_down, :peak_up
);";
const INSERT_NIC: &str =
"INSERT INTO nics
(parent, description, product, vendor, clock, capacity)
VALUES (
:parent, :description, :product, :vendor, :clock, :capacity
);";
fn bool_to_n(x: bool) -> i64 {
if x {
1
} else {
0
}
}
fn get_sys_time_in_secs() -> u64 {
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(n) => n.as_secs(),
Err(_) => panic!("SystemTime before UNIX EPOCH!"),
}
}
pub fn insert_stats_dump(stats: &AnonymousUsageV1, ip: &str) -> anyhow::Result<()> {
let date = get_sys_time_in_secs() as i64;
let new_id = SUBMISSION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let cn = sqlite::open(DBPATH)?;
let mut statement = cn.prepare(INSERT_STATS)?;
statement.bind_iter::<_, (_, Value)>([
(":id", new_id.into()),
(":date", date.into()),
(":node_id", stats.node_id.clone().into()),
(":ip_address", ip.into()),
(":git_hash", stats.git_hash.clone().into()),
(":using_xdp_bridge", bool_to_n(stats.using_xdp_bridge).into()),
(":on_a_stick", bool_to_n(stats.on_a_stick).into()),
(":total_memory", (stats.total_memory as i64).into()),
(":available_memory", (stats.available_memory as i64).into()),
(":kernel_version", stats.kernel_version.clone().into()),
(":distro", stats.distro.clone().into()),
(":usable_cores", (stats.usable_cores as i64).into()),
(":cpu_brand", stats.cpu_brand.clone().into()),
(":cpu_vendor", stats.cpu_vendor.clone().into()),
(":cpu_frequency", (stats.cpu_frequency as i64).into()),
(":sqm", stats.sqm.clone().into()),
(":monitor_mode", bool_to_n(stats.monitor_mode).into()),
(":capcity_down", (stats.total_capacity.0 as i64).into()),
(":capacity_up", (stats.total_capacity.1 as i64).into()),
(":genereated_pdn_down", (stats.generated_pdn_capacity.0 as i64).into()),
(":generated_pdn_up", (stats.generated_pdn_capacity.1 as i64).into()),
(":shaped_device_count", (stats.shaped_device_count as i64).into()),
(":net_json_len", (stats.net_json_len as i64).into()),
(":peak_down", (stats.high_watermark_bps.0 as i64).into()),
(":peak_up", (stats.high_watermark_bps.0 as i64).into()),
])?;
statement.next()?;
for nic in stats.nics.iter() {
let mut statement = cn.prepare(INSERT_NIC)?;
statement.bind_iter::<_, (_, Value)>([
(":parent", new_id.into()),
(":description", nic.description.clone().into()),
(":product", nic.product.clone().into()),
(":vendor", nic.vendor.clone().into()),
(":clock", nic.clock.clone().into()),
(":capacity", nic.capacity.clone().into()),
])?;
statement.next()?;
}
// Find out if its a new host
let mut found = false;
let mut statement = cn.prepare("SELECT * FROM nodes WHERE node_id=:id")?;
statement.bind_iter::<_, (_, Value)>([
(":id", stats.node_id.clone().into()),
])?;
while let Ok(State::Row) = statement.next() {
found = true;
}
if found {
log::info!("Updating last seen date for {}", stats.node_id);
let mut statement = cn.prepare("UPDATE nodes SET last_seen=:date WHERE node_id=:id")?;
statement.bind_iter::<_, (_, Value)>([
(":id", stats.node_id.clone().into()),
(":date", date.into()),
])?;
statement.next()?;
} else {
log::info!("New host: {}", stats.node_id);
let mut statement = cn.prepare("INSERT INTO nodes (node_id, last_seen) VALUES(:id, :date)")?;
statement.bind_iter::<_, (_, Value)>([
(":id", stats.node_id.clone().into()),
(":date", date.into()),
])?;
statement.next()?;
}
log::info!("Submitted");
Ok(())
}
// Not a great idea, this is for test data
#[allow(dead_code)]
pub fn dump_all_to_string() -> anyhow::Result<String> {
let mut result = String::new();
let cn = sqlite::open(DBPATH)?;
cn.iterate("SELECT * FROM submissions;", |pairs| {
for &(name, value) in pairs.iter() {
result += &format!(";{name}={value:?}");
}
result += "\n";
true
}).unwrap();
Ok(result)
}
pub fn count_unique_node_ids() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
cn.iterate("SELECT COUNT(DISTINCT node_id) AS count FROM nodes;", |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
true
}).unwrap();
Ok(result)
}
pub fn count_unique_node_ids_this_week() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
let last_week = (get_sys_time_in_secs() - 604800).to_string();
cn.iterate(format!("SELECT COUNT(DISTINCT node_id) AS count FROM nodes WHERE last_seen > {last_week};"), |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
true
}).unwrap();
Ok(result)
}
pub fn shaped_devices() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
cn.iterate("SELECT SUM(shaped_device_count) AS total FROM (SELECT DISTINCT node_id, shaped_device_count FROM submissions);", |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
true
}).unwrap();
Ok(result)
}
pub fn net_json_nodes() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
cn.iterate("SELECT SUM(net_json_len) AS total FROM (SELECT DISTINCT node_id, net_json_len FROM submissions);", |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
true
}).unwrap();
Ok(result)
}
pub fn bandwidth() -> anyhow::Result<u64> {
let mut result = 0;
let cn = sqlite::open(DBPATH)?;
cn.iterate("SELECT SUM(capacity_down) AS total FROM (SELECT DISTINCT node_id, capacity_down FROM submissions);", |pairs| {
for &(_name, value) in pairs.iter() {
if let Some(val) = value {
if let Ok(val) = val.parse::<u64>() {
result = val;
}
}
}
true
}).unwrap();
Ok(result)
}

View File

@@ -0,0 +1,22 @@
mod stats_server;
mod db;
mod webserver;
use tokio::spawn;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Start the logger
env_logger::init_from_env(
env_logger::Env::default()
.filter_or(env_logger::DEFAULT_FILTER_ENV, "warn"),
);
db::create_if_not_exist();
db::check_id();
spawn(webserver::stats_viewer());
let _ = stats_server::gather_stats().await;
Ok(())
}

View File

@@ -0,0 +1,67 @@
use lqos_bus::anonymous::AnonymousUsageV1;
use std::net::SocketAddr;
use tokio::{io::AsyncReadExt, net::TcpListener, spawn};
use crate::db::insert_stats_dump;
pub async fn gather_stats() -> anyhow::Result<()> {
let listener = TcpListener::bind(":::9125").await?;
log::info!("Listening on :::9125");
loop {
let (mut socket, address) = listener.accept().await?;
log::info!("Connection from {address:?}");
spawn(async move {
let mut buf = vec![0; 10240];
if let Ok(n) = socket.read(&mut buf).await {
log::info!("Received {n} bytes from {address:?}");
if let Err(e) = decode(&buf, address).await {
log::error!("Decode error from {address:?}");
log::error!("{e:?}");
}
}
});
}
}
async fn decode(buf: &[u8], address: SocketAddr) -> anyhow::Result<()> {
const U64SIZE: usize = std::mem::size_of::<u64>();
let version_buf = &buf[0..2].try_into()?;
let version = u16::from_be_bytes(*version_buf);
let size_buf = &buf[2..2 + U64SIZE].try_into()?;
let size = u64::from_be_bytes(*size_buf);
log::info!("Received a version {version} payload of serialized size {size} from {address:?}");
match version {
1 => {
let start = 2 + U64SIZE;
let end = start + size as usize;
let payload: Result<AnonymousUsageV1, _> =
serde_cbor::from_slice(&buf[start..end]);
match payload {
Ok(payload) => store_stats_v1(&payload, address).await,
Err(e) => {
log::error!(
"Unable to deserialize statistics sent from {address:?}"
);
log::error!("{e:?}");
Err(anyhow::Error::msg("Deserialize error"))
}
}
}
_ => {
log::error!(
"Unknown version of statistics: {version}, dumped {size} bytes"
);
Err(anyhow::Error::msg("Version error"))
}
}
}
async fn store_stats_v1(
payload: &AnonymousUsageV1,
address: SocketAddr,
) -> anyhow::Result<()> {
insert_stats_dump(payload, &address.to_string())?;
Ok(())
}

View File

@@ -0,0 +1,91 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>LibreQoS Installation Statistics</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-rbsA2VBKQhggwzxH7pPCaAqO46MgnOM80zW1RWuH61DGLwZJEdK2Kadq2F9CUG65" crossorigin="anonymous">
</head>
<body style="margin: 8px;">
<header>
<div class="container py-3">
<div class="d-flex flex-column flex-md-row align-items-center pb-3 mb-4 border-bottom">
<a href="/" class="d-flex align-items-center text-dark text-decoration-none">
<svg xmlns="http://www.w3.org/2000/svg" width="40" height="32" class="me-2" viewBox="0 0 118 94" role="img">
<title>Bootstrap</title>
<path fill-rule="evenodd" clip-rule="evenodd"
d="M24.509 0c-6.733 0-11.715 5.893-11.492 12.284.214 6.14-.064 14.092-2.066 20.577C8.943 39.365 5.547 43.485 0 44.014v5.972c5.547.529 8.943 4.649 10.951 11.153 2.002 6.485 2.28 14.437 2.066 20.577C12.794 88.106 17.776 94 24.51 94H93.5c6.733 0 11.714-5.893 11.491-12.284-.214-6.14.064-14.092 2.066-20.577 2.009-6.504 5.396-10.624 10.943-11.153v-5.972c-5.547-.529-8.934-4.649-10.943-11.153-2.002-6.484-2.28-14.437-2.066-20.577C105.214 5.894 100.233 0 93.5 0H24.508zM80 57.863C80 66.663 73.436 72 62.543 72H44a2 2 0 01-2-2V24a2 2 0 012-2h18.437c9.083 0 15.044 4.92 15.044 12.474 0 5.302-4.01 10.049-9.119 10.88v.277C75.317 46.394 80 51.21 80 57.863zM60.521 28.34H49.948v14.934h8.905c6.884 0 10.68-2.772 10.68-7.727 0-4.643-3.264-7.207-9.012-7.207zM49.948 49.2v16.458H60.91c7.167 0 10.964-2.876 10.964-8.281 0-5.406-3.903-8.178-11.425-8.178H49.948z"
fill="currentColor"></path>
</svg>
<span class="fs-4">LibreQoS Installation Statistics</span>
</a>
<!--
<nav class="d-inline-flex mt-2 mt-md-0 ms-md-auto">
<a class="me-3 py-2 text-dark text-decoration-none" href="#">Features</a>
<a class="me-3 py-2 text-dark text-decoration-none" href="#">Enterprise</a>
<a class="me-3 py-2 text-dark text-decoration-none" href="#">Support</a>
<a class="py-2 text-dark text-decoration-none" href="#">Pricing</a>
</nav>
-->
</div>
<div class="pricing-header p-3 pb-md-4 mx-auto text-center">
<h1 class="display-4 fw-normal">Installation Statistics</h1>
<p class="fs-5 text-muted">LibreQoS is fixing the Internet, one ISP at a time.</p>
</div>
</div>
</div>
</header>
<main>
<div class="row row-cols-1 row-cols-md-3 mb-3 text-center">
<div class="col">
<div class="card mb-4 rounded-3 shadow-sm">
<div class="card-header py-3">
<h4 class="my-0 fw-normal">Deployments</h4>
</div>
<div class="card-body">
$$UNIQUE_HOSTS$$<br />
$$NEW_HOSTS$$
</div>
</div>
</div>
<div class="col">
<div class="card mb-4 rounded-3 shadow-sm">
<div class="card-header py-3">
<h4 class="my-0 fw-normal">Connections Debloated</h4>
</div>
<div class="card-body">
$$TOTAL_SHAPED$$<br />
$$NODES$$<br />
</div>
</div>
</div>
<div class="col">
<div class="card mb-4 rounded-3 shadow-sm">
<div class="card-header py-3">
<h4 class="my-0 fw-normal">Bandwidth</h4>
</div>
<div class="card-body">
$$BW$$
</div>
</div>
</div>
</div>
</div>
</main>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.2.3/dist/js/bootstrap.bundle.min.js"
integrity="sha384-kenU1KFdBIe4zVF0s0G1M5b4hcpxyD9F7jL+jjXkk+Q2h455rYXK/7HAuoJl+0I4"
crossorigin="anonymous"></script>
<script>
</script>
</body>
</html>

View File

@@ -0,0 +1,29 @@
use axum::{routing::get, Router, response::Html};
use crate::db::{count_unique_node_ids, count_unique_node_ids_this_week, shaped_devices, net_json_nodes, bandwidth};
pub async fn stats_viewer() -> anyhow::Result<()> {
let app = Router::new().route("/", get(index_page));
log::info!("Listening for web traffic on 0.0.0.0:3000");
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await?;
Ok(())
}
async fn index_page() -> Html<String> {
let result = include_str!("./index.html");
let unique = count_unique_node_ids().unwrap_or(0);
let new = count_unique_node_ids_this_week().unwrap_or(0);
let total_shaped = shaped_devices().unwrap_or(0);
let net_json_nodes = net_json_nodes().unwrap_or(0);
let bw = bandwidth().unwrap_or(0) as f64 / 1024.0 / 1024.0;
let result = result.replace("$$UNIQUE_HOSTS$$", &format!("<strong>{unique}</strong> Total LQOS Installs"));
let result = result.replace("$$NEW_HOSTS$$", &format!("<strong>{new}</strong> New Installs This Week"));
let result = result.replace("$$TOTAL_SHAPED$$", &format!("<strong>{total_shaped}</strong> Shaped Devices"));
let result = result.replace("$$NODES$$", &format!("<strong>{net_json_nodes}</strong> Network Hierarchy Nodes"));
let result = result.replace("$$BW$$", &format!("<strong>{bw:.2}</strong> Tbits/s Monitored"));
Html(result)
}

View File

@@ -16,6 +16,7 @@ lqos_utils = { path = "../lqos_utils" }
tokio = { version = "1", features = [ "rt", "macros", "net", "io-util", "time" ] }
log = "0"
nix = "0"
serde_cbor = "0" # For RFC8949/7409 format C binary objects
[dev-dependencies]
criterion = { version = "0", features = [ "html_reports", "async_tokio"] }

View File

@@ -0,0 +1,45 @@
mod v1;
use serde::{Serialize, Deserialize};
use thiserror::Error;
pub use v1::*;
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Header for stats submission
pub struct StatsHeader {
/// The version to use (should be 1)
pub version: u16,
/// The number of bytes being submitted following the header
pub size: usize,
}
/// Build an anonymous usage statistics buffer.
/// Transforms `stats` (`AnonymousUsageV1`) into a matching
/// header and payload, in a single buffer ready to send.
pub fn build_stats(stats: &AnonymousUsageV1) -> Result<Vec<u8>, StatsError> {
let mut result = Vec::new();
let payload = serde_cbor::to_vec(stats);
if let Err(e) = payload {
log::warn!("Unable to serialize statistics. Not sending them.");
log::warn!("{e:?}");
return Err(StatsError::SerializeFail);
}
let payload = payload.unwrap();
// Store the version as network order
result.extend( 1u16.to_be_bytes() );
// Store the payload size as network order
result.extend( (payload.len() as u64).to_be_bytes() );
// Store the payload itself
result.extend(payload);
Ok(result)
}
/// Errors for anonymous usage statistics failure
#[derive(Error, Debug)]
pub enum StatsError {
/// Serializing the object failed
#[error("Unable to serialize object")]
SerializeFail
}

View File

@@ -0,0 +1,87 @@
use serde::{Serialize, Deserialize};
#[derive(Default, Debug, Serialize, Deserialize)]
/// Defines data to be submitted if anonymous usage submission is
/// enabled. This is protocol version 1.
pub struct AnonymousUsageV1 {
/// Unique but anonymous node identifier
pub node_id: String,
/// The git hash from which this version was compiled
pub git_hash: String,
/// Are they using the Bifrost bridge?
pub using_xdp_bridge: bool,
/// Is it an "On a stick" config?
pub on_a_stick: bool,
/// Total installed RAM (bytes)
pub total_memory: u64,
/// Total available RAM (bytes)
pub available_memory: u64,
/// Linux Kernel Version
pub kernel_version: String,
/// Linux distro
pub distro: String,
/// Number of "usable" CPU cores, as used by eBPF. This may not
/// be exactly equal to the number of actual cores.
pub usable_cores: u32,
/// CPU brand
pub cpu_brand: String,
/// CPU vendor
pub cpu_vendor: String,
/// CPU frequency
pub cpu_frequency: u64,
/// Installed network cards
pub nics: Vec<NicV1>,
/// SQM setting from the ispConfig.py file
pub sqm: String,
/// Is Monitor-ony mode enabled?
pub monitor_mode: bool,
/// Capacity as specified in ispConfig.py
pub total_capacity: (u32, u32),
/// Generated node capacity from ispConfig.py
pub generated_pdn_capacity: (u32, u32),
/// Number of shaped devices from ShapedDevices.csv
pub shaped_device_count: usize,
/// Number of nodes read from network.json
pub net_json_len: usize,
/// Peak number of bits/s passing through the shaper
pub high_watermark_bps: (u64, u64),
}
/// Description of installed NIC (version 1 data)
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct NicV1 {
/// Description, usually "Ethernet"
pub description: String,
/// Product name as specified by the driver
pub product: String,
/// Vendor as specified by the driver
pub vendor: String,
/// Clock speed, specified by the vendor (may not be accurate)
pub clock: String,
/// NIC possible capacity (as reported by the driver)
pub capacity: String,
}

View File

@@ -81,5 +81,7 @@ pub enum BusResponse {
bus_requests: u64,
/// Us to poll hosts
time_to_poll_hosts: u64,
/// High traffic watermark
high_watermark: (u64, u64),
}
}

View File

@@ -20,3 +20,6 @@ pub use bus::{
UnixSocketServer, BUS_SOCKET_PATH,
};
pub use tc_handle::TcHandle;
/// Anonymous Usage Statistics Data Types
pub mod anonymous;

View File

@@ -1,7 +1,7 @@
//! Manages the `/etc/lqos.conf` file.
use log::error;
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::{fs, path::Path};
use thiserror::Error;
/// Represents the top-level of the `/etc/lqos.conf` file. Serialization
@@ -15,12 +15,21 @@ pub struct EtcLqos {
/// In ms.
pub queue_check_period_ms: u64,
/// If present, provides a unique ID for the node. Used for
/// anonymous stats (to identify nodes without providing an actual
/// identity), and will be used for long-term data retention to
/// disambiguate cluster or multi-head-end nodes.
pub node_id: Option<String>,
/// If present, defines how the Bifrost XDP bridge operates.
pub bridge: Option<BridgeConfig>,
/// If present, defines the values for various `sysctl` and `ethtool`
/// tweaks.
pub tuning: Option<Tunables>,
/// If present, defined anonymous usage stat sending
pub usage_stats: Option<UsageStats>,
}
/// Represents a set of `sysctl` and `ethtool` tweaks that may be
@@ -104,6 +113,16 @@ pub struct BridgeVlan {
pub redirect_to: u32,
}
/// Definitions for anonymous usage submission
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct UsageStats {
/// Are we allowed to send stats at all?
pub send_anonymous: bool,
/// Where do we send them?
pub anonymous_server: String,
}
impl EtcLqos {
/// Loads `/etc/lqos.conf`.
pub fn load() -> Result<Self, EtcLqosError> {
@@ -114,7 +133,10 @@ impl EtcLqos {
if let Ok(raw) = std::fs::read_to_string("/etc/lqos.conf") {
let config_result: Result<Self, toml::de::Error> = toml::from_str(&raw);
match config_result {
Ok(config) => Ok(config),
Ok(mut config) => {
check_config(&mut config);
Ok(config)
}
Err(e) => {
error!("Unable to parse TOML from /etc/lqos.conf");
error!("Full error: {:?}", e);
@@ -126,6 +148,46 @@ impl EtcLqos {
Err(EtcLqosError::CannotReadFile)
}
}
/// Saves changes made to /etc/lqos.conf
/// Copies current configuration into /etc/lqos.conf.backup first
pub fn save(&self) -> Result<(), EtcLqosError> {
let cfg_path = Path::new("/etc/lqos.conf");
let backup_path = Path::new("/etc/lqos.conf.backup");
if let Err(e) = std::fs::copy(cfg_path, backup_path) {
log::error!("Unable to backup /etc/lqos.conf");
log::error!("{e:?}");
return Err(EtcLqosError::BackupFail);
}
let new_cfg = toml::to_string_pretty(&self);
match new_cfg {
Err(e) => {
log::error!("Unable to serialize new /etc/lqos.conf");
log::error!("{e:?}");
return Err(EtcLqosError::SerializeFail);
}
Ok(new_cfg) => {
if let Err(e) = fs::write(cfg_path, new_cfg) {
log::error!("Unable to write to /etc/lqos.conf");
log::error!("{e:?}");
return Err(EtcLqosError::WriteFail);
}
}
}
Ok(())
}
}
fn check_config(cfg: &mut EtcLqos) {
use sha2::digest::Update;
use sha2::Digest;
if cfg.node_id.is_none() {
if let Ok(machine_id) = std::fs::read_to_string("/etc/machine-id") {
let hash = sha2::Sha256::new().chain(machine_id).finalize();
cfg.node_id = Some(format!("{:x}", hash));
}
}
}
#[derive(Error, Debug)]
@@ -138,4 +200,10 @@ pub enum EtcLqosError {
CannotReadFile,
#[error("Unable to parse TOML in /etc/lqos.conf")]
CannotParseToml,
#[error("Unable to backup /etc/lqos.conf to /etc/lqos.conf.backup")]
BackupFail,
#[error("Unable to serialize new configuration")]
SerializeFail,
#[error("Unable to write to /etc/lqos.conf")]
WriteFail,
}

View File

@@ -82,15 +82,17 @@ pub async fn update_lqos_tuning(
pub struct LqosStats {
pub bus_requests_since_start: u64,
pub time_to_poll_hosts_us: u64,
pub high_watermark: (u64, u64),
}
#[get("/api/stats")]
pub async fn stats() -> NoCache<Json<LqosStats>> {
for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts } = msg {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark } = msg {
return NoCache::new(Json(LqosStats {
bus_requests_since_start: bus_requests,
time_to_poll_hosts_us: time_to_poll_hosts
time_to_poll_hosts_us: time_to_poll_hosts,
high_watermark
}));
}
}

View File

@@ -5,4 +5,5 @@ edition = "2021"
[dependencies]
colored = "2"
default-net = "0" # For obtaining an easy-to-use NIC list
default-net = "0" # For obtaining an easy-to-use NIC list
uuid = { version = "1", features = ["v4", "fast-rng" ] }

View File

@@ -1,5 +1,6 @@
use colored::Colorize;
use default_net::{get_interfaces, interface::InterfaceType, Interface};
use uuid::Uuid;
use std::{fs, path::Path, process::Command};
fn get_available_interfaces() -> Vec<Interface> {
@@ -117,6 +118,7 @@ fn get_bandwidth(up: bool) -> u32 {
const ETC_LQOS_CONF: &str = "lqos_directory = '/opt/libreqos/src'
queue_check_period_ms = 1000
node_id = {NODE_ID}
[tuning]
stop_irq_balance = true
@@ -135,11 +137,18 @@ interface_mapping = [
{ name = \"{ISP}\", redirect_to = \"{INTERNET}\", scan_vlans = false }
]
vlan_mapping = []
[usage_stats]
send_anonymous = {ALLOW_ANONYMOUS}
anonymous_server = \"stats.libreqos.io:9125\"
";
fn write_etc_lqos_conf(internet: &str, isp: &str) {
fn write_etc_lqos_conf(internet: &str, isp: &str, allow_anonymous: bool) {
let new_id = Uuid::new_v4().to_string();
let output =
ETC_LQOS_CONF.replace("{INTERNET}", internet).replace("{ISP}", isp);
ETC_LQOS_CONF.replace("{INTERNET}", internet).replace("{ISP}", isp)
.replace("{NODE_ID}", &new_id)
.replace("{ALLOW_ANONYMOUS}", &allow_anonymous.to_string());
fs::write(LQOS_CONF, output).expect("Unable to write file");
}
@@ -198,6 +207,22 @@ fn write_shaped_devices() {
.expect("Unable to write file");
}
fn anonymous() -> bool {
println!("{}", "Help Improve LibreQoS with Anonymous Statistics?".yellow());
println!("{}", "We'd really appreciate it if you'd allow anonymous statistics".green());
println!("{}", "to be sent to our way. They help us focus our develpoment,".green());
println!("{}", "and let us know that you're out there!".green());
loop {
println!("{}", "Reply YES or NO".cyan());
let input = read_line().trim().to_uppercase();
if input == "YES" {
return true;
} else if input == "NO" {
return false;
}
}
}
fn main() {
println!("{:^80}", "LibreQoS 1.4 Setup Assistant".yellow().on_blue());
println!();
@@ -212,8 +237,9 @@ fn main() {
);
get_internet_interface(&interfaces, &mut if_internet);
get_isp_interface(&interfaces, &mut if_isp);
let allow_anonymous = anonymous();
if let (Some(internet), Some(isp)) = (&if_internet, &if_isp) {
write_etc_lqos_conf(internet, isp);
write_etc_lqos_conf(internet, isp, allow_anonymous);
}
}

7
src/rust/lqosd/build.rs Normal file
View File

@@ -0,0 +1,7 @@
use std::process::Command;
fn main() {
// Adds a git commit hash to the program
let output = Command::new("git").args(["rev-parse", "HEAD"]).output().unwrap();
let git_hash = String::from_utf8(output.stdout).unwrap();
println!("cargo:rustc-env=GIT_HASH={}", git_hash);
}

View File

@@ -0,0 +1,70 @@
use std::process::Command;
use lqos_bus::anonymous::NicV1;
#[derive(Default)]
pub(crate) struct Nic {
pub(crate) description: String,
pub(crate) product: String,
pub(crate) vendor: String,
pub(crate) clock: String,
pub(crate) capacity: String,
}
#[allow(clippy::from_over_into)]
impl Into<NicV1> for Nic {
fn into(self) -> NicV1 {
NicV1 {
description: self.description,
product: self.product,
vendor: self.vendor,
clock: self.clock,
capacity: self.capacity,
}
}
}
pub(crate) fn get_nic_info() -> anyhow::Result<Vec<Nic>> {
let mut current_nic = None;
let mut result = Vec::new();
let output = Command::new("/bin/lshw")
.args(["-C", "network"])
.output()?;
let stdout = String::from_utf8(output.stdout)?;
let lines = stdout.split('\n');
for line in lines {
let trimmed = line.trim();
// Starting a new record
if trimmed.starts_with("*-network:") {
if let Some(nic) = current_nic {
result.push(nic);
}
current_nic = Some(Nic::default());
}
if let Some(mut nic) = current_nic.as_mut() {
if let Some(d) = trimmed.strip_prefix("description: ") {
nic.description = d.to_string();
}
if let Some(d) = trimmed.strip_prefix("product: ") {
nic.product = d.to_string();
}
if let Some(d) = trimmed.strip_prefix("vendor: ") {
nic.vendor = d.to_string();
}
if let Some(d) = trimmed.strip_prefix("clock: ") {
nic.clock = d.to_string();
}
if let Some(d) = trimmed.strip_prefix("capacity: ") {
nic.capacity = d.to_string();
}
}
}
if let Some(nic) = current_nic {
result.push(nic);
}
Ok(result)
}

View File

@@ -0,0 +1,117 @@
mod lshw;
mod version;
use std::{time::Duration, net::TcpStream, io::Write};
use lqos_bus::anonymous::{AnonymousUsageV1, build_stats};
use lqos_config::{EtcLqos, LibreQoSConfig};
use lqos_sys::libbpf_num_possible_cpus;
use sysinfo::{System, SystemExt, CpuExt};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
const SLOW_START_SECS: u64 = 1;
const INTERVAL_SECS: u64 = 60 * 60 * 24;
pub async fn start_anonymous_usage() {
if let Ok(cfg) = EtcLqos::load() {
if let Some(usage) = cfg.usage_stats {
if usage.send_anonymous {
std::thread::spawn(|| {
std::thread::sleep(Duration::from_secs(SLOW_START_SECS));
loop {
let _ = anonymous_usage_dump();
std::thread::sleep(Duration::from_secs(INTERVAL_SECS));
}
});
}
}
}
}
fn anonymous_usage_dump() -> anyhow::Result<()> {
let mut data = AnonymousUsageV1::default();
let mut sys = System::new_all();
let mut server = String::new();
sys.refresh_all();
data.total_memory = sys.total_memory();
data.available_memory = sys.available_memory();
if let Some(kernel) = sys.kernel_version() {
data.kernel_version = kernel;
}
data.usable_cores = unsafe { libbpf_num_possible_cpus() } as u32;
let cpu = sys.cpus().first();
if let Some(cpu) = cpu {
data.cpu_brand = cpu.brand().to_string();
data.cpu_vendor = cpu.vendor_id().to_string();
data.cpu_frequency = cpu.frequency();
}
if let Ok(nics) = lshw::get_nic_info() {
for nic in nics {
data.nics.push(nic.into());
}
}
if let Ok(pv) = version::get_proc_version() {
data.distro = pv.trim().to_string();
}
if let Ok(cfg) = LibreQoSConfig::load() {
data.sqm = cfg.sqm;
data.monitor_mode = cfg.monitor_mode;
data.total_capacity = (
cfg.total_download_mbps,
cfg.total_upload_mbps,
);
data.generated_pdn_capacity = (
cfg.generated_download_mbps,
cfg.generated_upload_mbps,
);
data.on_a_stick = cfg.on_a_stick_mode;
}
if let Ok(cfg) = EtcLqos::load() {
if let Some(node_id) = cfg.node_id {
data.node_id = node_id;
if let Some(bridge) = cfg.bridge {
data.using_xdp_bridge = bridge.use_xdp_bridge;
}
}
if let Some(anon) = cfg.usage_stats {
server = anon.anonymous_server;
}
}
data.git_hash = env!("GIT_HASH").to_string();
data.shaped_device_count = SHAPED_DEVICES.read().unwrap().devices.len();
data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len();
data.high_watermark_bps = (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
);
send_stats(data, &server);
Ok(())
}
fn send_stats(data: AnonymousUsageV1, server: &str) {
let buffer = build_stats(&data);
if let Err(e) = buffer {
log::warn!("Unable to serialize stats buffer");
log::warn!("{e:?}");
return;
}
let buffer = buffer.unwrap();
let stream = TcpStream::connect(server);
if let Err(e) = stream {
log::warn!("Unable to connect to {server}");
log::warn!("{e:?}");
return;
}
let mut stream = stream.unwrap();
let result = stream.write(&buffer);
if let Err(e) = result {
log::warn!("Unable to send bytes to {server}");
log::warn!("{e:?}");
}
log::info!("Anonymous usage stats submitted");
}

View File

@@ -0,0 +1,9 @@
use std::process::Command;
pub(crate) fn get_proc_version() -> anyhow::Result<String> {
let output = Command::new("/bin/cat")
.args(["/proc/version"])
.output()?;
let stdout = String::from_utf8(output.stdout)?;
Ok(stdout)
}

View File

@@ -5,6 +5,7 @@ mod lqos_daht_test;
mod program_control;
mod shaped_devices_tracker;
mod throughput_tracker;
mod anonymous_usage;
mod tuning;
mod validation;
use crate::{
@@ -24,7 +25,7 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals,
};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP};
use tokio::join;
mod stats;
@@ -69,7 +70,8 @@ async fn main() -> Result<()> {
join!(
spawn_queue_structure_monitor(),
shaped_devices_tracker::shaped_devices_watcher(),
shaped_devices_tracker::network_json_watcher()
shaped_devices_tracker::network_json_watcher(),
anonymous_usage::start_anonymous_usage(),
);
throughput_tracker::spawn_throughput_monitor();
spawn_queue_monitor();
@@ -180,6 +182,10 @@ fn handle_bus_requests(
BusResponse::LqosdStats {
bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed),
time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed),
high_watermark: (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
)
}
}
});

View File

@@ -2,3 +2,5 @@ use std::sync::atomic::AtomicU64;
pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0);
pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0);

View File

@@ -1,5 +1,5 @@
use std::sync::atomic::AtomicU64;
use crate::shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
@@ -220,6 +220,20 @@ impl ThroughputTracker {
Self::add_atomic_tuple(&self.shaped_bytes_per_second, (bytes_down, bytes_up));
}
});
let current = self.bits_per_second();
if current.0 < 100000000000 && current.1 < 100000000000 {
let prev_max = (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
);
if current.0 > prev_max.0 {
HIGH_WATERMARK_DOWN.store(current.0, std::sync::atomic::Ordering::Relaxed);
}
if current.1 > prev_max.1 {
HIGH_WATERMARK_UP.store(current.1, std::sync::atomic::Ordering::Relaxed);
}
}
}
pub(crate) fn next_cycle(&self) {