Include high traffic watermark in stats

This commit is contained in:
Herbert Wolverson
2023-03-21 17:22:48 +00:00
parent e415d699db
commit 769b9e73bd
9 changed files with 45 additions and 8 deletions

View File

@@ -11,5 +11,5 @@ log = "0"
lqos_bus = { path = "../lqos_bus" }
serde = { version = "1.0", features = ["derive"] }
serde_cbor = "0" # For RFC8949/7409 format C binary objects
sqlite = "0.30"
sqlite = "0"
axum = "0.6"

View File

@@ -32,7 +32,9 @@ CREATE TABLE submissions (
generated_pdn_down INTEGER,
generated_pdn_up INTEGER,
shaped_device_count INTEGER,
net_json_len INTEGER
net_json_len INTEGER,
peak_down INTEGER,
peak_up INTEGER
);
CREATE TABLE nics (
@@ -94,7 +96,7 @@ const INSERT_STATS: &str =
:total_memory, :available_memory, :kernel_version, :distro, :usable_cores,
:cpu_brand, :cpu_vendor, :cpu_frequency, :sqm, :monitor_mode, :capcity_down,
:capacity_up, :genereated_pdn_down, :generated_pdn_up, :shaped_device_count,
:net_json_len
:net_json_len, :peak_down, :peak_up
);";
const INSERT_NIC: &str =
@@ -148,6 +150,8 @@ pub fn insert_stats_dump(stats: &AnonymousUsageV1, ip: &str) -> anyhow::Result<(
(":generated_pdn_up", (stats.generated_pdn_capacity.1 as i64).into()),
(":shaped_device_count", (stats.shaped_device_count as i64).into()),
(":net_json_len", (stats.net_json_len as i64).into()),
(":peak_down", (stats.high_watermark_bps.0 as i64).into()),
(":peak_up", (stats.high_watermark_bps.0 as i64).into()),
])?;
statement.next()?;

View File

@@ -61,6 +61,9 @@ pub struct AnonymousUsageV1 {
/// Number of nodes read from network.json
pub net_json_len: usize,
/// Peak number of bits/s passing through the shaper
pub high_watermark_bps: (u64, u64),
}

View File

@@ -81,5 +81,7 @@ pub enum BusResponse {
bus_requests: u64,
/// Us to poll hosts
time_to_poll_hosts: u64,
/// High traffic watermark
high_watermark: (u64, u64),
}
}

View File

@@ -82,15 +82,17 @@ pub async fn update_lqos_tuning(
pub struct LqosStats {
pub bus_requests_since_start: u64,
pub time_to_poll_hosts_us: u64,
pub high_watermark: (u64, u64),
}
#[get("/api/stats")]
pub async fn stats() -> NoCache<Json<LqosStats>> {
for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts } = msg {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark } = msg {
return NoCache::new(Json(LqosStats {
bus_requests_since_start: bus_requests,
time_to_poll_hosts_us: time_to_poll_hosts
time_to_poll_hosts_us: time_to_poll_hosts,
high_watermark
}));
}
}

View File

@@ -5,7 +5,7 @@ use lqos_bus::anonymous::{AnonymousUsageV1, build_stats};
use lqos_config::{EtcLqos, LibreQoSConfig};
use lqos_sys::libbpf_num_possible_cpus;
use sysinfo::{System, SystemExt, CpuExt};
use crate::shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
const SLOW_START_SECS: u64 = 1;
const INTERVAL_SECS: u64 = 60 * 60 * 24;
@@ -82,6 +82,12 @@ fn anonymous_usage_dump() -> anyhow::Result<()> {
data.shaped_device_count = SHAPED_DEVICES.read().unwrap().devices.len();
data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len();
data.high_watermark_bps = (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
);
send_stats(data, &server);
Ok(())
}

View File

@@ -25,7 +25,7 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals,
};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP};
use tokio::join;
mod stats;
@@ -182,6 +182,10 @@ fn handle_bus_requests(
BusResponse::LqosdStats {
bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed),
time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed),
high_watermark: (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
)
}
}
});

View File

@@ -2,3 +2,5 @@ use std::sync::atomic::AtomicU64;
pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0);
pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0);

View File

@@ -1,5 +1,5 @@
use std::sync::atomic::AtomicU64;
use crate::shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
@@ -220,6 +220,20 @@ impl ThroughputTracker {
Self::add_atomic_tuple(&self.shaped_bytes_per_second, (bytes_down, bytes_up));
}
});
let current = self.bits_per_second();
if current.0 < 100000000000 && current.1 < 100000000000 {
let prev_max = (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
);
if current.0 > prev_max.0 {
HIGH_WATERMARK_DOWN.store(current.0, std::sync::atomic::Ordering::Relaxed);
}
if current.1 > prev_max.1 {
HIGH_WATERMARK_UP.store(current.1, std::sync::atomic::Ordering::Relaxed);
}
}
}
pub(crate) fn next_cycle(&self) {