From 0e97e6a868f35c38288f6dbdf094d3cd3a3d4570 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Fri, 27 Jan 2023 19:47:19 +0000 Subject: [PATCH] Attempting to resolve issues with lqos_node_manager not seeing statistics, while lqtop still works. 1) Add warning and error logging to lqos_node_manager if any part of the statistics gathering process fails. 2) (Hopefully temporarily) use the non-persistent bus client, again logging any issues. 3) Improve the statistics gathering timer code. --- .../src/tracker/cache_manager.rs | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs index 547c5138..bd66fe8a 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs @@ -4,13 +4,13 @@ use super::cache::*; use anyhow::Result; use lqos_bus::{ - BusRequest, BusResponse, IpStats, BusClient, + BusRequest, BusResponse, IpStats, bus_request, }; use lqos_config::ConfigShapedDevices; use rocket::tokio::{ task::spawn_blocking, }; -use std::{net::IpAddr, time::Duration}; +use std::{net::IpAddr, time::{Duration, Instant}}; /// Once per second, update CPU and RAM usage and ask /// `lqosd` for updated system statistics. @@ -26,8 +26,9 @@ pub async fn update_tracking() { spawn_blocking(|| { let _ = watch_for_shaped_devices_changing(); }); - let mut bus_client = BusClient::new().await.unwrap(); + //let mut bus_client = BusClient::new().await.unwrap(); loop { + let now = Instant::now(); //println!("Updating tracking data"); sys.refresh_cpu(); sys.refresh_memory(); @@ -42,8 +43,19 @@ pub async fn update_tracking() { mem_use[0] = sys.used_memory(); mem_use[1] = sys.total_memory(); } - let _ = get_data_from_server(&mut bus_client).await; // Ignoring errors to keep running - rocket::tokio::time::sleep(Duration::from_secs(1)).await; + let error = get_data_from_server().await; // Ignoring errors to keep running + if let Err(error) = error { + error!("Error in usage update loop: {:?}", error); + } + + let duration = now.elapsed(); + if duration.as_secs_f32() < 1.0 { + let wait_time = Duration::from_secs(1) - duration; + rocket::tokio::time::sleep(wait_time).await; + } else { + warn!("Gathering usage data exceeded one second, waiting for 1 second."); + rocket::tokio::time::sleep(Duration::from_secs(1)).await; + } } } @@ -67,7 +79,7 @@ fn watch_for_shaped_devices_changing() -> Result<()> { /// Requests data from `lqosd` and stores it in local /// caches. -async fn get_data_from_server(bus_client: &mut BusClient) -> Result<()> { +async fn get_data_from_server() -> Result<()> { // Send request to lqosd let requests = vec![ BusRequest::GetCurrentThroughput, @@ -77,7 +89,7 @@ async fn get_data_from_server(bus_client: &mut BusClient) -> Result<()> { BusRequest::AllUnknownIps, ]; - for r in bus_client.request(requests).await?.iter() { + for r in bus_request(requests).await?.iter() { match r { BusResponse::CurrentThroughput { bits_per_second,