mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
Initial work on long-term stats collection
Still very much a work-in-progress. * Adds configuration entries for enabling long-term stats collection (`gather_stats`) and collation period. * Spawns a collection thread (if `gather_stats` is enabled) that collects total traffic and per-host traffic/RTT (with circuit ID) on the same 1s cadence as throughput collection. * Spawns a collator thread that gathers all of the collection summaries generated since the last collation (every `collation_period_seconds` seconds). The gathered stats are divided into min/max/mean average for each circuit. * Collated stats are handed to a submissions system, that is currently a bare-bones "keep last" - this will be extended! * Add a `lqstats` tool that lets you use the bus to see the current long-term stats. This is very barebones, but will provide a tool allowing stat extraction for inclusion in external stats collectors (such as Zabbix), if you aren't using the long-term stats server we're developing.
This commit is contained in:
parent
77ecb8afcd
commit
24588621c6
10
src/rust/Cargo.lock
generated
10
src/rust/Cargo.lock
generated
@ -1566,6 +1566,7 @@ dependencies = [
|
||||
"lqos_sys",
|
||||
"lqos_utils",
|
||||
"nix",
|
||||
"num-traits",
|
||||
"once_cell",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@ -1574,6 +1575,15 @@ dependencies = [
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lqstats"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"lqos_bus",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lqtop"
|
||||
version = "0.1.0"
|
||||
|
@ -26,4 +26,5 @@ members = [
|
||||
"lqos_setup", # A quick CLI setup for first-time users
|
||||
"lqos_anonymous_stats_server", # The server for gathering anonymous usage data.
|
||||
"lqos_heimdall", # Library for managing Heimdall flow watching
|
||||
"lqstats", # A CLI utility for retrieving long-term statistics
|
||||
]
|
||||
|
@ -10,7 +10,7 @@ pub use client::bus_request;
|
||||
use log::error;
|
||||
pub use persistent_client::BusClient;
|
||||
pub use reply::BusReply;
|
||||
pub use request::BusRequest;
|
||||
pub use request::{BusRequest, StatsRequest};
|
||||
pub use response::BusResponse;
|
||||
pub use session::BusSession;
|
||||
use thiserror::Error;
|
||||
|
@ -145,8 +145,18 @@ pub enum BusRequest {
|
||||
/// Give me a libpcap format packet dump (shortened) of the last 10 seconds
|
||||
GetPcapDump(usize),
|
||||
|
||||
/// Request data from the long-term stats system
|
||||
GetLongTermStats(StatsRequest),
|
||||
|
||||
/// If running on Equinix (the `equinix_tests` feature is enabled),
|
||||
/// display a "run bandwidth test" link.
|
||||
#[cfg(feature = "equinix_tests")]
|
||||
RequestLqosEquinixTest,
|
||||
}
|
||||
|
||||
/// Specific requests that can be made of the long-term stats system.
/// Carried inside `BusRequest::GetLongTermStats`.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
|
||||
pub enum StatsRequest {
|
||||
/// Ask for the network-wide totals. lqosd answers with
/// `BusResponse::LongTermTotals`, or `BusResponse::Fail` when no
/// collated data is available yet.
CurrentTotals,
|
||||
/// Ask for the per-host statistics. lqosd answers with
/// `BusResponse::LongTermHosts`, or `BusResponse::Fail` when no
/// collated data is available yet.
AllHosts,
|
||||
}
|
@ -1,7 +1,10 @@
|
||||
use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport, ip_stats::PacketHeader};
|
||||
use super::QueueStoreTransit;
|
||||
use crate::{
|
||||
ip_stats::PacketHeader, long_term_stats::{StatsTotals, StatsHost}, FlowTransport,
|
||||
IpMapping, IpStats, XdpPpingResult,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::IpAddr;
|
||||
use super::QueueStoreTransit;
|
||||
|
||||
/// A `BusResponse` object represents a single
|
||||
/// reply generated from a `BusRequest`, and batched
|
||||
@ -77,7 +80,7 @@ pub enum BusResponse {
|
||||
NodeNames(Vec<(usize, String)>),
|
||||
|
||||
/// Statistics from lqosd
|
||||
LqosdStats{
|
||||
LqosdStats {
|
||||
/// Number of bus requests handled
|
||||
bus_requests: u64,
|
||||
/// Us to poll hosts
|
||||
@ -92,11 +95,11 @@ pub enum BusResponse {
|
||||
FlowData(Vec<(FlowTransport, Option<FlowTransport>)>),
|
||||
|
||||
/// The index of the new packet collection session
|
||||
PacketCollectionSession{
|
||||
PacketCollectionSession {
|
||||
/// The identifier of the capture session
|
||||
session_id: usize,
|
||||
session_id: usize,
|
||||
/// Number of seconds for which data will be captured
|
||||
countdown: usize
|
||||
countdown: usize,
|
||||
},
|
||||
|
||||
/// Packet header dump
|
||||
@ -104,4 +107,10 @@ pub enum BusResponse {
|
||||
|
||||
/// Pcap format dump
|
||||
PcapDump(Option<String>),
|
||||
|
||||
/// Long-term stats top-level totals
|
||||
LongTermTotals(StatsTotals),
|
||||
|
||||
/// Long-term stats host totals
|
||||
LongTermHosts(Vec<StatsHost>),
|
||||
}
|
||||
|
@ -21,9 +21,12 @@ pub use bus::{
|
||||
bus_request, decode_request, decode_response, encode_request,
|
||||
encode_response, BusClient, BusReply, BusRequest, BusResponse, BusSession,
|
||||
CakeDiffTinTransit, CakeDiffTransit, CakeTransit, QueueStoreTransit,
|
||||
UnixSocketServer, BUS_SOCKET_PATH,
|
||||
UnixSocketServer, BUS_SOCKET_PATH, StatsRequest
|
||||
};
|
||||
pub use tc_handle::TcHandle;
|
||||
|
||||
/// Anonymous Usage Statistics Data Types
|
||||
pub mod anonymous;
|
||||
|
||||
/// Module offering types for long-term stats transit
|
||||
pub mod long_term_stats;
|
||||
|
30
src/rust/lqos_bus/src/long_term_stats.rs
Normal file
30
src/rust/lqos_bus/src/long_term_stats.rs
Normal file
@ -0,0 +1,30 @@
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
/// Minimum, maximum and average of a two-direction counter over one
/// collation period. lqosd fills each tuple as `(down, up)`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct StatsSummary {
|
||||
/// Smallest value observed, per direction.
pub min: (u64, u64),
|
||||
/// Largest value observed, per direction.
pub max: (u64, u64),
|
||||
/// Average (integer division) of the observed values, per direction.
pub avg: (u64, u64),
|
||||
}
|
||||
|
||||
/// Minimum, maximum and average round-trip-time figures for one host
/// over one collation period.
///
/// NOTE(review): lqosd populates these from each sample's median RTT
/// multiplied by 100 and truncated to an integer; the underlying time
/// unit is not visible here - confirm before interpreting the values.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct StatsRttSummary {
|
||||
/// Smallest RTT figure observed.
pub min: u32,
|
||||
/// Largest RTT figure observed.
pub max: u32,
|
||||
/// Average (integer division) of the observed RTT figures.
pub avg: u32,
|
||||
}
|
||||
|
||||
/// Network-wide totals for one collation period. Payload of
/// `BusResponse::LongTermTotals`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct StatsTotals {
|
||||
/// Summary of the packets-per-second samples.
pub packets: StatsSummary,
|
||||
/// Summary of the bits-per-second samples.
pub bits: StatsSummary,
|
||||
/// Summary of the shaped bits-per-second samples.
pub shaped_bits: StatsSummary,
|
||||
}
|
||||
|
||||
/// Statistics for a single host over one collation period. A list of
/// these is the payload of `BusResponse::LongTermHosts`.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
|
||||
pub struct StatsHost {
|
||||
/// Identifier of the circuit the host belongs to.
pub circuit_id: String,
|
||||
/// The host's IP address, rendered as text.
pub ip_address: String,
|
||||
/// Summary of the host's bits-per-second samples.
pub bits: StatsSummary,
|
||||
/// Summary of the host's round-trip-time samples.
pub rtt: StatsRttSummary,
|
||||
}
|
@ -36,6 +36,9 @@ pub struct EtcLqos {
|
||||
/// run. Short times are good, there's a real performance penalty to
|
||||
/// capturing high-throughput streams. Defaults to 10 seconds.
|
||||
pub packet_capture_time: Option<usize>,
|
||||
|
||||
/// Long-term statistics retention settings.
|
||||
pub long_term_stats: Option<LongTermStats>,
|
||||
}
|
||||
|
||||
/// Represents a set of `sysctl` and `ethtool` tweaks that may be
|
||||
@ -129,6 +132,17 @@ pub struct UsageStats {
|
||||
pub anonymous_server: String,
|
||||
}
|
||||
|
||||
/// Long Term Data Retention
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
pub struct LongTermStats {
|
||||
/// Should we store long-term stats at all?
|
||||
pub gather_stats: bool,
|
||||
|
||||
/// How frequently should stats be accumulated into a long-term
|
||||
/// min/max/avg format per tick?
|
||||
pub collation_period_seconds: u32,
|
||||
}
|
||||
|
||||
impl EtcLqos {
|
||||
/// Loads `/etc/lqos.conf`.
|
||||
pub fn load() -> Result<Self, EtcLqosError> {
|
||||
|
@ -26,6 +26,7 @@ log = "0"
|
||||
nix = "0"
|
||||
sysinfo = "0"
|
||||
dashmap = "5"
|
||||
num-traits = "0.2"
|
||||
|
||||
# Support JemAlloc on supported platforms
|
||||
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
|
||||
|
61
src/rust/lqosd/src/long_term_stats/collation_utils.rs
Normal file
61
src/rust/lqosd/src/long_term_stats/collation_utils.rs
Normal file
@ -0,0 +1,61 @@
|
||||
use num_traits::{Bounded, CheckedDiv, NumCast, Zero};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MinMaxAvg<T> {
|
||||
pub(crate) min: T,
|
||||
pub(crate) max: T,
|
||||
pub(crate) avg: T,
|
||||
}
|
||||
|
||||
impl<
|
||||
T: Bounded
|
||||
+ Zero
|
||||
+ std::ops::AddAssign<T>
|
||||
+ Copy
|
||||
+ std::cmp::Ord
|
||||
+ CheckedDiv
|
||||
+ NumCast,
|
||||
> MinMaxAvg<T>
|
||||
{
|
||||
pub(crate) fn from_slice(stats: &[T]) -> Self {
|
||||
let mut min = T::max_value();
|
||||
let mut max = T::min_value();
|
||||
let mut avg = T::zero();
|
||||
|
||||
stats.iter().for_each(|n| {
|
||||
avg += *n;
|
||||
min = T::min(min, *n);
|
||||
max = T::max(max, *n);
|
||||
});
|
||||
let len = T::from(stats.len()).unwrap();
|
||||
avg = avg.checked_div(&len).unwrap_or(T::zero());
|
||||
|
||||
Self { max, min, avg }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct MinMaxAvgPair<T> {
|
||||
pub(crate) down: MinMaxAvg<T>,
|
||||
pub(crate) up: MinMaxAvg<T>,
|
||||
}
|
||||
|
||||
impl<
|
||||
T: Bounded
|
||||
+ Zero
|
||||
+ std::ops::AddAssign<T>
|
||||
+ Copy
|
||||
+ std::cmp::Ord
|
||||
+ CheckedDiv
|
||||
+ NumCast,
|
||||
> MinMaxAvgPair<T>
|
||||
{
|
||||
pub(crate) fn from_slice(stats: &[(T, T)]) -> Self {
|
||||
let down: Vec<T> = stats.iter().map(|(down, _up)| *down).collect();
|
||||
let up: Vec<T> = stats.iter().map(|(_down, up)| *up).collect();
|
||||
Self {
|
||||
down: MinMaxAvg::<T>::from_slice(&down),
|
||||
up: MinMaxAvg::<T>::from_slice(&up),
|
||||
}
|
||||
}
|
||||
}
|
143
src/rust/lqosd/src/long_term_stats/collator.rs
Normal file
143
src/rust/lqosd/src/long_term_stats/collator.rs
Normal file
@ -0,0 +1,143 @@
|
||||
use super::{collation_utils::{MinMaxAvg, MinMaxAvgPair}, submission::new_submission};
|
||||
use crate::long_term_stats::data_collector::SESSION_BUFFER;
|
||||
use std::{collections::HashMap, net::IpAddr};
|
||||
|
||||
/// The collated result of one collation period: network-wide summaries
/// plus one entry per (IP address, circuit) pair seen in the period.
/// Built by `collate_stats` and handed to the submission system.
#[derive(Debug, Clone)]
|
||||
pub(crate) struct StatsSubmission {
|
||||
/// Summary of the total bits-per-second samples.
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
|
||||
/// Summary of the shaped bits-per-second samples.
pub(crate) shaped_bits_per_second: MinMaxAvgPair<u64>,
|
||||
/// Summary of the packets-per-second samples.
pub(crate) packets_per_second: MinMaxAvgPair<u64>,
|
||||
/// Per-host summaries for the period.
pub(crate) hosts: Vec<SubmissionHost>,
|
||||
}
|
||||
|
||||
/// Collated statistics for a single host within one collation period.
#[derive(Debug, Clone)]
|
||||
pub(crate) struct SubmissionHost {
|
||||
/// Identifier of the circuit the host belongs to.
pub(crate) circuit_id: String,
|
||||
/// The host's IP address.
pub(crate) ip_address: IpAddr,
|
||||
/// Summary of the host's bits-per-second samples.
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
|
||||
/// Summary of the host's median RTT samples, each multiplied by 100
/// and truncated to an integer before summarising.
pub(crate) median_rtt: MinMaxAvg<u32>,
|
||||
}
|
||||
|
||||
impl From<StatsSubmission> for lqos_bus::long_term_stats::StatsTotals {
|
||||
fn from(value: StatsSubmission) -> Self {
|
||||
Self {
|
||||
bits: value.bits_per_second.into(),
|
||||
shaped_bits: value.shaped_bits_per_second.into(),
|
||||
packets: value.packets_per_second.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MinMaxAvgPair<u64>> for lqos_bus::long_term_stats::StatsSummary {
|
||||
fn from(value: MinMaxAvgPair<u64>) -> Self {
|
||||
Self {
|
||||
min: (value.down.min, value.up.min),
|
||||
max: (value.down.max, value.up.max),
|
||||
avg: (value.down.avg, value.up.avg),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MinMaxAvg<u32>> for lqos_bus::long_term_stats::StatsRttSummary {
|
||||
fn from(value: MinMaxAvg<u32>) -> Self {
|
||||
Self {
|
||||
min: value.min,
|
||||
max: value.max,
|
||||
avg: value.avg,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts a collated host record into the bus representation. The IP
/// address is rendered as text; the circuit id is moved, not copied.
impl From<SubmissionHost> for lqos_bus::long_term_stats::StatsHost {
  fn from(value: SubmissionHost) -> Self {
    Self {
      // `value` is owned, so the String can be moved straight across
      // instead of being re-allocated with `to_string()`.
      circuit_id: value.circuit_id,
      ip_address: value.ip_address.to_string(),
      bits: value.bits_per_second.into(),
      rtt: value.median_rtt.into(),
    }
  }
}
|
||||
|
||||
/// Every (n) seconds, collate the accumulated stats buffer
|
||||
/// into a current statistics block (min/max/avg format)
|
||||
/// ready for submission to the stats system.
|
||||
///
|
||||
/// (n) is defined in /etc/lqos.conf in the `collation_period_seconds`
|
||||
/// field of the `[long_term_stats]` section.
|
||||
pub(crate) fn collate_stats() {
|
||||
// Obtain exclusive access to the session
|
||||
let mut writer = SESSION_BUFFER.lock().unwrap();
|
||||
if writer.is_empty() {
|
||||
// Nothing to do - so exit
|
||||
return;
|
||||
}
|
||||
|
||||
// Collate total stats for the period
|
||||
let bps: Vec<(u64, u64)> =
|
||||
writer.iter().map(|e| e.bits_per_second).collect();
|
||||
let pps: Vec<(u64, u64)> =
|
||||
writer.iter().map(|e| e.packets_per_second).collect();
|
||||
let sbps: Vec<(u64, u64)> =
|
||||
writer.iter().map(|e| e.shaped_bits_per_second).collect();
|
||||
let bits_per_second = MinMaxAvgPair::from_slice(&bps);
|
||||
let packets_per_second = MinMaxAvgPair::from_slice(&pps);
|
||||
let shaped_bits_per_second = MinMaxAvgPair::from_slice(&sbps);
|
||||
|
||||
let mut submission = StatsSubmission {
|
||||
bits_per_second,
|
||||
shaped_bits_per_second,
|
||||
packets_per_second,
|
||||
hosts: Vec::new(),
|
||||
};
|
||||
|
||||
// Collate host stats
|
||||
let mut host_accumulator =
|
||||
HashMap::<(&IpAddr, &String), Vec<(u64, u64, f32)>>::new();
|
||||
writer.iter().for_each(|session| {
|
||||
session.hosts.iter().for_each(|host| {
|
||||
if let Some(ha) =
|
||||
host_accumulator.get_mut(&(&host.ip_address, &host.circuit_id))
|
||||
{
|
||||
ha.push((
|
||||
host.bits_per_second.0,
|
||||
host.bits_per_second.1,
|
||||
host.median_rtt,
|
||||
));
|
||||
} else {
|
||||
host_accumulator.insert(
|
||||
(&host.ip_address, &host.circuit_id),
|
||||
vec![(
|
||||
host.bits_per_second.0,
|
||||
host.bits_per_second.1,
|
||||
host.median_rtt,
|
||||
)],
|
||||
);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
for ((ip, circuit), data) in host_accumulator.iter() {
|
||||
let bps: Vec<(u64, u64)> =
|
||||
data.iter().map(|(d, u, _rtt)| (*d, *u)).collect();
|
||||
let bps = MinMaxAvgPair::<u64>::from_slice(&bps);
|
||||
let fps: Vec<u32> =
|
||||
data.iter().map(|(_d, _u, rtt)| (*rtt * 100.0) as u32).collect();
|
||||
let fps = MinMaxAvg::<u32>::from_slice(&fps);
|
||||
submission.hosts.push(SubmissionHost {
|
||||
circuit_id: circuit.to_string(),
|
||||
ip_address: **ip,
|
||||
bits_per_second: bps,
|
||||
median_rtt: fps,
|
||||
});
|
||||
}
|
||||
|
||||
// Remove all gathered stats
|
||||
writer.clear();
|
||||
|
||||
// Drop the lock
|
||||
std::mem::drop(writer);
|
||||
|
||||
// Submit
|
||||
new_submission(submission);
|
||||
}
|
72
src/rust/lqosd/src/long_term_stats/data_collector.rs
Normal file
72
src/rust/lqosd/src/long_term_stats/data_collector.rs
Normal file
@ -0,0 +1,72 @@
|
||||
use crate::throughput_tracker::THROUGHPUT_TRACKER;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::{
|
||||
net::IpAddr,
|
||||
sync::{atomic::AtomicU64, Mutex},
|
||||
};
|
||||
|
||||
/// Counts calls to `gather_throughput_stats`, which uses it to skip the
/// first few sets of data.
static SUBMISSION_COUNTER: AtomicU64 = AtomicU64::new(0);
|
||||
|
||||
/// One snapshot of throughput data, taken by a single call to
/// `gather_throughput_stats`. The collator treats each tuple as
/// `(down, up)`.
pub(crate) struct StatsSession {
|
||||
/// Total bits per second at the time of the snapshot.
pub(crate) bits_per_second: (u64, u64),
|
||||
/// Total packets per second at the time of the snapshot.
pub(crate) packets_per_second: (u64, u64),
|
||||
/// Shaped bits per second at the time of the snapshot.
pub(crate) shaped_bits_per_second: (u64, u64),
|
||||
/// One entry per tracked host that has a circuit id.
pub(crate) hosts: Vec<SessionHost>,
|
||||
}
|
||||
|
||||
/// A single host's readings within one `StatsSession` snapshot.
pub(crate) struct SessionHost {
|
||||
/// Identifier of the circuit the host belongs to.
pub(crate) circuit_id: String,
|
||||
/// The host's IP address.
pub(crate) ip_address: IpAddr,
|
||||
/// The host's bytes-per-second figures multiplied by 8.
pub(crate) bits_per_second: (u64, u64),
|
||||
/// The host's median latency as reported by the throughput tracker.
/// NOTE(review): unit is not visible from here - confirm.
pub(crate) median_rtt: f32,
|
||||
}
|
||||
|
||||
/// Snapshots gathered since the last collation. The collector pushes one
/// `StatsSession` per call; the collator reads and then empties it.
pub(crate) static SESSION_BUFFER: Lazy<Mutex<Vec<StatsSession>>> =
|
||||
Lazy::new(|| Mutex::new(Vec::new()));
|
||||
|
||||
pub(crate) fn gather_throughput_stats() {
|
||||
let count =
|
||||
SUBMISSION_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
if count < 5 {
|
||||
// Ignore the first few sets of data
|
||||
return;
|
||||
}
|
||||
|
||||
// Gather Global Stats
|
||||
let packets_per_second = (
|
||||
THROUGHPUT_TRACKER
|
||||
.packets_per_second
|
||||
.0
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
THROUGHPUT_TRACKER
|
||||
.packets_per_second
|
||||
.1
|
||||
.load(std::sync::atomic::Ordering::Relaxed),
|
||||
);
|
||||
let bits_per_second = THROUGHPUT_TRACKER.bits_per_second();
|
||||
let shaped_bits_per_second = THROUGHPUT_TRACKER.shaped_bits_per_second();
|
||||
|
||||
let mut session = StatsSession {
|
||||
bits_per_second,
|
||||
shaped_bits_per_second,
|
||||
packets_per_second,
|
||||
hosts: Vec::with_capacity(THROUGHPUT_TRACKER.raw_data.len()),
|
||||
};
|
||||
|
||||
THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
.filter(|t| t.circuit_id.is_some())
|
||||
.for_each(|tp| {
|
||||
let bytes_per_second = tp.bytes_per_second;
|
||||
let bits_per_second = (bytes_per_second.0 * 8, bytes_per_second.1 * 8);
|
||||
session.hosts.push(SessionHost {
|
||||
circuit_id: tp.circuit_id.as_ref().unwrap().clone(),
|
||||
ip_address: tp.key().as_ip(),
|
||||
bits_per_second,
|
||||
median_rtt: tp.median_latency(),
|
||||
});
|
||||
});
|
||||
|
||||
SESSION_BUFFER.lock().unwrap().push(session);
|
||||
}
|
75
src/rust/lqosd/src/long_term_stats/mod.rs
Normal file
75
src/rust/lqosd/src/long_term_stats/mod.rs
Normal file
@ -0,0 +1,75 @@
|
||||
//! Long-term stats. This module *gathers* the statistics and makes them
|
||||
//! available for tools to provide export to other systems - including
|
||||
//! our own.
|
||||
|
||||
mod data_collector;
|
||||
mod collation_utils;
|
||||
mod collator;
|
||||
mod submission;
|
||||
use log::{info, warn};
|
||||
use lqos_config::EtcLqos;
|
||||
use lqos_utils::fdtimer::periodic;
|
||||
use std::{
|
||||
sync::mpsc::{self, Receiver, Sender},
|
||||
thread,
|
||||
};
|
||||
pub(crate) use submission::{get_stats_totals, get_stats_host};
|
||||
|
||||
/// Messages sent to the long-term stats collection thread.
|
||||
pub enum StatsMessage {
|
||||
/// Fresh throughput stats have been collected; the collection thread
/// responds by taking a snapshot of them.
|
||||
ThroughputReady,
|
||||
/// Request that the stats thread terminate
|
||||
Quit,
|
||||
}
|
||||
|
||||
/// Launch the statistics system
|
||||
pub fn start_long_term_stats() -> Option<Sender<StatsMessage>> {
|
||||
if let Ok(cfg) = EtcLqos::load() {
|
||||
if let Some(cfg) = cfg.long_term_stats {
|
||||
if cfg.gather_stats {
|
||||
start_collating_stats(cfg.collation_period_seconds);
|
||||
return Some(start_collecting_stats());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn start_collecting_stats() -> Sender<StatsMessage> {
|
||||
// Spawn the manager thread, which will wait for message to maintain
|
||||
// sync with the generation of stats.
|
||||
let (tx, rx): (Sender<StatsMessage>, Receiver<StatsMessage>) =
|
||||
mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
info!("Long-term stats gathering thread started");
|
||||
loop {
|
||||
let msg = rx.recv();
|
||||
match msg {
|
||||
Ok(StatsMessage::Quit) => {
|
||||
info!("Exiting the long-term stats thread");
|
||||
break;
|
||||
}
|
||||
Ok(StatsMessage::ThroughputReady) => {
|
||||
data_collector::gather_throughput_stats();
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error in the long-term stats thread message receiver");
|
||||
warn!("{e:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
tx
|
||||
}
|
||||
|
||||
fn start_collating_stats(seconds: u32) {
|
||||
let interval_ms = (seconds * 1000).into();
|
||||
thread::spawn(move || {
|
||||
periodic(
|
||||
interval_ms,
|
||||
"Long-Term Stats Collation",
|
||||
&mut collator::collate_stats
|
||||
);
|
||||
});
|
||||
}
|
30
src/rust/lqosd/src/long_term_stats/submission.rs
Normal file
30
src/rust/lqosd/src/long_term_stats/submission.rs
Normal file
@ -0,0 +1,30 @@
|
||||
use std::sync::RwLock;
|
||||
use lqos_bus::{BusResponse, long_term_stats::StatsHost};
|
||||
use once_cell::sync::Lazy;
|
||||
use super::collator::StatsSubmission;
|
||||
|
||||
/// The most recently submitted collation, or `None` until the first
/// submission arrives. Each new submission replaces the previous one.
pub(crate) static CURRENT_STATS: Lazy<RwLock<Option<StatsSubmission>>> = Lazy::new(|| RwLock::new(None));
|
||||
|
||||
pub(crate) fn new_submission(data: StatsSubmission) {
|
||||
*CURRENT_STATS.write().unwrap() = Some(data);
|
||||
}
|
||||
|
||||
pub fn get_stats_totals() -> BusResponse {
|
||||
let current = CURRENT_STATS.read().unwrap().clone();
|
||||
if let Some(c) = current {
|
||||
BusResponse::LongTermTotals(c.into())
|
||||
} else {
|
||||
BusResponse::Fail("No Data".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_stats_host() -> BusResponse {
|
||||
let current = CURRENT_STATS.read().unwrap();
|
||||
if let Some(c) = &*current {
|
||||
BusResponse::LongTermHosts(
|
||||
c.hosts.iter().cloned().map(|h| std::convert::Into::<StatsHost>::into(h)).collect()
|
||||
)
|
||||
} else {
|
||||
BusResponse::Fail("No Data".to_string())
|
||||
}
|
||||
}
|
@ -8,15 +8,15 @@ mod throughput_tracker;
|
||||
mod anonymous_usage;
|
||||
mod tuning;
|
||||
mod validation;
|
||||
mod long_term_stats;
|
||||
use std::net::IpAddr;
|
||||
|
||||
use crate::{
|
||||
file_lock::FileLock,
|
||||
ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow},
|
||||
};
|
||||
use anyhow::Result;
|
||||
use log::{info, warn};
|
||||
use lqos_bus::{BusRequest, BusResponse, UnixSocketServer};
|
||||
use lqos_bus::{BusRequest, BusResponse, UnixSocketServer, StatsRequest};
|
||||
use lqos_config::LibreQoSConfig;
|
||||
use lqos_heimdall::{n_second_packet_dump, perf_interface::heimdall_handle_events, start_heimdall};
|
||||
use lqos_queue_tracker::{
|
||||
@ -72,6 +72,7 @@ async fn main() -> Result<()> {
|
||||
};
|
||||
|
||||
// Spawn tracking sub-systems
|
||||
let long_term_stats_tx = long_term_stats::start_long_term_stats();
|
||||
join!(
|
||||
start_heimdall(),
|
||||
spawn_queue_structure_monitor(),
|
||||
@ -79,7 +80,7 @@ async fn main() -> Result<()> {
|
||||
shaped_devices_tracker::network_json_watcher(),
|
||||
anonymous_usage::start_anonymous_usage(),
|
||||
);
|
||||
throughput_tracker::spawn_throughput_monitor();
|
||||
throughput_tracker::spawn_throughput_monitor(long_term_stats_tx.clone());
|
||||
spawn_queue_monitor();
|
||||
|
||||
// Handle signals
|
||||
@ -95,6 +96,11 @@ async fn main() -> Result<()> {
|
||||
warn!("This should never happen - terminating on unknown signal")
|
||||
}
|
||||
}
|
||||
if let Some(tx) = long_term_stats_tx {
|
||||
// Deliberately ignoring the error because we're trying to
|
||||
// exit ASAP and don't really care!
|
||||
let _ = tx.send(long_term_stats::StatsMessage::Quit);
|
||||
}
|
||||
std::mem::drop(kernels);
|
||||
UnixSocketServer::signal_cleanup();
|
||||
std::mem::drop(file_lock);
|
||||
@ -214,6 +220,12 @@ fn handle_bus_requests(
|
||||
BusResponse::Fail("Invalid IP".to_string())
|
||||
}
|
||||
}
|
||||
BusRequest::GetLongTermStats(StatsRequest::CurrentTotals) => {
|
||||
long_term_stats::get_stats_totals()
|
||||
}
|
||||
BusRequest::GetLongTermStats(StatsRequest::AllHosts) => {
|
||||
long_term_stats::get_stats_host()
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -1,23 +1,35 @@
|
||||
mod heimdall_data;
|
||||
mod throughput_entry;
|
||||
mod tracking_data;
|
||||
mod heimdall_data;
|
||||
pub use heimdall_data::get_flow_stats;
|
||||
use crate::{
|
||||
shaped_devices_tracker::NETWORK_JSON,
|
||||
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
|
||||
long_term_stats::StatsMessage, shaped_devices_tracker::NETWORK_JSON,
|
||||
stats::TIME_TO_POLL_HOSTS,
|
||||
throughput_tracker::tracking_data::ThroughputTracker,
|
||||
};
|
||||
pub use heimdall_data::get_flow_stats;
|
||||
use log::{info, warn};
|
||||
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
|
||||
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress};
|
||||
use lqos_utils::{
|
||||
fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress,
|
||||
};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::time::Duration;
|
||||
use std::{sync::mpsc::Sender, time::Duration};
|
||||
|
||||
const RETIRE_AFTER_SECONDS: u64 = 30;
|
||||
|
||||
pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> =
|
||||
Lazy::new(ThroughputTracker::new);
|
||||
|
||||
pub fn spawn_throughput_monitor() {
|
||||
/// Create the throughput monitor thread, and begin polling for
|
||||
/// throughput data every second.
|
||||
///
|
||||
/// ## Arguments
|
||||
///
|
||||
/// * `long_term_stats_tx` - an optional MPSC sender to notify the
|
||||
/// collection thread that there is fresh data.
|
||||
pub fn spawn_throughput_monitor(
|
||||
long_term_stats_tx: Option<Sender<StatsMessage>>,
|
||||
) {
|
||||
info!("Starting the bandwidth monitor thread.");
|
||||
let interval_ms = 1000; // 1 second
|
||||
info!("Bandwidth check period set to {interval_ms} ms.");
|
||||
@ -35,7 +47,14 @@ pub fn spawn_throughput_monitor() {
|
||||
THROUGHPUT_TRACKER.update_totals();
|
||||
THROUGHPUT_TRACKER.next_cycle();
|
||||
let duration_ms = start.elapsed().as_micros();
|
||||
TIME_TO_POLL_HOSTS.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed);
|
||||
TIME_TO_POLL_HOSTS
|
||||
.store(duration_ms as u64, std::sync::atomic::Ordering::Relaxed);
|
||||
if let Some(tx) = &long_term_stats_tx {
|
||||
let result = tx.send(StatsMessage::ThroughputReady);
|
||||
if let Err(e) = result {
|
||||
warn!("Error sending message to stats collection system. {e:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
@ -74,8 +93,10 @@ type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String);
|
||||
|
||||
pub fn top_n(start: u32, end: u32) -> BusResponse {
|
||||
let mut full_list: Vec<TopList> = {
|
||||
let tp_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER.raw_data
|
||||
let tp_cycle =
|
||||
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
.filter(|v| !v.key().as_ip().is_loopback())
|
||||
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
|
||||
@ -119,8 +140,10 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
|
||||
|
||||
pub fn worst_n(start: u32, end: u32) -> BusResponse {
|
||||
let mut full_list: Vec<TopList> = {
|
||||
let tp_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER.raw_data
|
||||
let tp_cycle =
|
||||
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
.filter(|v| !v.key().as_ip().is_loopback())
|
||||
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
|
||||
@ -164,8 +187,10 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
|
||||
}
|
||||
pub fn best_n(start: u32, end: u32) -> BusResponse {
|
||||
let mut full_list: Vec<TopList> = {
|
||||
let tp_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER.raw_data
|
||||
let tp_cycle =
|
||||
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
.filter(|v| !v.key().as_ip().is_loopback())
|
||||
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
|
||||
@ -210,7 +235,8 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
|
||||
}
|
||||
|
||||
pub fn xdp_pping_compat() -> BusResponse {
|
||||
let raw_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let raw_cycle =
|
||||
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let result = THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
@ -249,7 +275,8 @@ pub fn xdp_pping_compat() -> BusResponse {
|
||||
|
||||
pub fn rtt_histogram() -> BusResponse {
|
||||
let mut result = vec![0; 20];
|
||||
let reader_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
let reader_cycle =
|
||||
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
for data in THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
@ -272,8 +299,10 @@ pub fn rtt_histogram() -> BusResponse {
|
||||
pub fn host_counts() -> BusResponse {
|
||||
let mut total = 0;
|
||||
let mut shaped = 0;
|
||||
let tp_cycle = THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER.raw_data
|
||||
let tp_cycle =
|
||||
THROUGHPUT_TRACKER.cycle.load(std::sync::atomic::Ordering::Relaxed);
|
||||
THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
.filter(|d| retire_check(tp_cycle, d.most_recent_cycle))
|
||||
.for_each(|d| {
|
||||
@ -301,7 +330,8 @@ pub fn all_unknown_ips() -> BusResponse {
|
||||
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
|
||||
|
||||
let mut full_list: Vec<FullList> = {
|
||||
THROUGHPUT_TRACKER.raw_data
|
||||
THROUGHPUT_TRACKER
|
||||
.raw_data
|
||||
.iter()
|
||||
.filter(|v| !v.key().as_ip().is_loopback())
|
||||
.filter(|d| d.tc_handle.as_u32() == 0)
|
||||
|
10
src/rust/lqstats/Cargo.toml
Normal file
10
src/rust/lqstats/Cargo.toml
Normal file
@ -0,0 +1,10 @@
|
||||
[package]
|
||||
name = "lqstats"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "GPL-2.0-only"
|
||||
|
||||
[dependencies]
|
||||
tokio = { version = "1", features = [ "rt", "macros", "net", "io-util", "time" ] }
|
||||
anyhow = "1"
|
||||
lqos_bus = { path = "../lqos_bus" }
|
17
src/rust/lqstats/src/main.rs
Normal file
17
src/rust/lqstats/src/main.rs
Normal file
@ -0,0 +1,17 @@
|
||||
use anyhow::Result;
|
||||
use lqos_bus::{bus_request, BusRequest, BusResponse, StatsRequest};
|
||||
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
pub async fn main() -> Result<()> {
|
||||
for resp in bus_request(vec![BusRequest::GetLongTermStats(StatsRequest::CurrentTotals)]).await? {
|
||||
if let BusResponse::LongTermTotals(stats) = resp {
|
||||
println!("{stats:?}");
|
||||
}
|
||||
}
|
||||
for resp in bus_request(vec![BusRequest::GetLongTermStats(StatsRequest::AllHosts)]).await? {
|
||||
if let BusResponse::LongTermHosts(stats) = resp {
|
||||
println!("{stats:?}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
Loading…
Reference in New Issue
Block a user