Merge pull request #319 from LibreQoE/issue_317_five_minute_reset

Last 5 minutes of throughput data are stored server-side on node_manager
This commit is contained in:
Robert Chacón 2023-03-30 18:26:16 -06:00 committed by GitHub
commit d253e1f518
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 119 additions and 24 deletions

View File

@ -22,13 +22,17 @@ static GLOBAL: Jemalloc = Jemalloc;
#[launch]
fn rocket() -> _ {
//tracker::SHAPED_DEVICES.read().write_csv("ShapedDeviceWriteTest.csv").unwrap();
let server = rocket::build()
.attach(AdHoc::on_liftoff("Poll lqosd", |_| {
Box::pin(async move {
rocket::tokio::spawn(tracker::update_tracking());
})
}))
.attach(AdHoc::on_liftoff("Poll throughput", |_| {
Box::pin(async move {
rocket::tokio::spawn(tracker::update_total_throughput_buffer());
})
}))
.register("/", catchers![static_pages::login])
.mount(
"/",
@ -47,6 +51,7 @@ fn rocket() -> _ {
static_pages::klingon,
// API calls
tracker::current_throughput,
tracker::throughput_ring_buffer,
tracker::cpu_usage,
tracker::ram_usage,
tracker::top_10_downloaders,

View File

@ -4,6 +4,8 @@
mod cpu_ram;
mod shaped_devices;
mod throughput;
pub use cpu_ram::*;
pub use shaped_devices::*;
pub use throughput::THROUGHPUT_BUFFER;

View File

@ -0,0 +1,60 @@
use crate::tracker::ThroughputPerSecond;
use lqos_bus::{bus_request, BusRequest, BusResponse};
use once_cell::sync::Lazy;
use rocket::tokio::sync::RwLock;
pub static THROUGHPUT_BUFFER: Lazy<RwLock<TotalThroughput>> =
Lazy::new(|| RwLock::new(TotalThroughput::new()));
/// Maintains an in-memory ringbuffer of the last 5 minutes of
/// throughput data.
pub struct TotalThroughput {
data: Vec<ThroughputPerSecond>,
head: usize,
prev_head: usize,
}
impl TotalThroughput {
/// Create a new throughput ringbuffer system
pub fn new() -> Self {
Self {
data: vec![ThroughputPerSecond::default(); 300],
head: 0,
prev_head: 0,
}
}
/// Run once per second to update the ringbuffer with current data
pub async fn tick(&mut self) {
if let Ok(messages) =
bus_request(vec![BusRequest::GetCurrentThroughput]).await
{
for msg in messages {
if let BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
} = msg
{
self.data[self.head].bits_per_second = bits_per_second;
self.data[self.head].packets_per_second = packets_per_second;
self.data[self.head].shaped_bits_per_second = shaped_bits_per_second;
self.prev_head = self.head;
self.head += 1;
self.head %= 300;
}
}
}
}
/// Retrieve just the current throughput data (1 tick)
pub fn current(&self) -> ThroughputPerSecond {
self.data[self.prev_head]
}
/// Retrieve the head (0-299) and the full current throughput
/// buffer. Used to populate the dashboard the first time.
pub fn copy(&self) -> (usize, Vec<ThroughputPerSecond>) {
(self.head, self.data.clone())
}
}

View File

@ -9,8 +9,8 @@ use nix::sys::{
time::{TimeSpec, TimeValLike},
timerfd::{ClockId, Expiration, TimerFd, TimerFlags, TimerSetTimeFlags},
};
use rocket::tokio::task::spawn_blocking;
use std::{sync::atomic::AtomicBool};
use rocket::tokio::{task::spawn_blocking, time::Instant};
use std::{sync::atomic::AtomicBool, time::Duration};
/// Once per second, update CPU and RAM usage and ask
/// `lqosd` for updated system statistics.
@ -118,3 +118,16 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
info!("ShapedDevices watcher returned: {result:?}");
}
}
/// Fires once per second and updates the global traffic ringbuffer.
pub async fn update_total_throughput_buffer() {
loop {
let now = Instant::now();
let mut lock = THROUGHPUT_BUFFER.write().await;
lock.tick().await;
let wait_time = Duration::from_secs(1) - now.elapsed();
if wait_time.as_micros() > 0 {
rocket::tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}

View File

@ -3,11 +3,11 @@ mod cache_manager;
use std::net::IpAddr;
use self::cache::{
CPU_USAGE, NUM_CPUS, RAM_USED, TOTAL_RAM,
CPU_USAGE, NUM_CPUS, RAM_USED, TOTAL_RAM, THROUGHPUT_BUFFER,
};
use crate::{auth_guard::AuthGuard, cache_control::NoCache};
pub use cache::SHAPED_DEVICES;
pub use cache_manager::update_tracking;
pub use cache_manager::{update_tracking, update_total_throughput_buffer};
use lqos_bus::{bus_request, BusRequest, BusResponse, IpStats, TcHandle};
use rocket::serde::{Deserialize, Serialize, msgpack::MsgPack};
@ -70,23 +70,15 @@ pub struct ThroughputPerSecond {
pub async fn current_throughput(
_auth: AuthGuard,
) -> NoCache<MsgPack<ThroughputPerSecond>> {
let mut result = ThroughputPerSecond::default();
if let Ok(messages) =
bus_request(vec![BusRequest::GetCurrentThroughput]).await
{
for msg in messages {
if let BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
} = msg
{
result.bits_per_second = bits_per_second;
result.packets_per_second = packets_per_second;
result.shaped_bits_per_second = shaped_bits_per_second;
}
}
}
let result = THROUGHPUT_BUFFER.read().await.current();
NoCache::new(MsgPack(result))
}
#[get("/api/throughput_ring_buffer")]
pub async fn throughput_ring_buffer(
_auth: AuthGuard,
) -> NoCache<MsgPack<(usize, Vec<ThroughputPerSecond>)>> {
let result = THROUGHPUT_BUFFER.read().await.copy();
NoCache::new(MsgPack(result))
}

View File

@ -168,6 +168,29 @@
<script>
var throughput = new MultiRingBuffer(300);
// Loads the complete ringbuffer for initial display
function fillCurrentThroughput() {
msgPackGet("/api/throughput_ring_buffer", (tp) => {
console.log(tp);
const bits = 0;
const packets = 1;
const shaped = 2;
let head = tp[0];
for (let i=head; i<300; ++i) {
throughput.push("pps", tp[1][i][packets][0], tp[1][i][packets][1]);
throughput.push("total", tp[1][i][bits][0], tp[1][i][bits][1]);
throughput.push("shaped", tp[1][i][shaped][0], tp[1][i][shaped][1]);
}
for (let i=0; i<head; ++i) {
throughput.push("pps", tp[1][i][packets][0], tp[1][i][packets][1]);
throughput.push("total", tp[1][i][bits][0], tp[1][i][bits][1]);
throughput.push("shaped", tp[1][i][shaped][0], tp[1][i][shaped][1]);
}
throughput.plotTotalThroughput("tpGraph");
});
}
function updateCurrentThroughput() {
msgPackGet("/api/current_throughput", (tp) => {
const bits = 0;
@ -178,7 +201,7 @@
$("#bpsDown").text(scaleNumber(tp[bits][0]));
$("#bpsUp").text(scaleNumber(tp[bits][1]));
throughput.push("pps", tp[1][0], tp[packets][1]);
throughput.push("pps", tp[packets][0], tp[packets][1]);
throughput.push("total", tp[bits][0], tp[bits][1]);
throughput.push("shaped", tp[shaped][0], tp[shaped][1]);
throughput.plotTotalThroughput("tpGraph");
@ -319,7 +342,7 @@
}
colorReloadButton();
updateCurrentThroughput();
fillCurrentThroughput();
updateCpu();
updateRam();
updateTop10();