ISSUEE #486 - Replace THROUGHPUT_BUFFER's external lock that required async with interior mutability regular lock that doesn't. This eliminates the possibility of cross-task locking issues leading to a deadlock, and reduces the surface area of the lock period also. Also replace RwLock with Mutex, the simplicity is usually a net gain. This appears to have resolved the issue for me.

This commit is contained in:
Herbert Wolverson 2024-05-22 09:10:13 -05:00
parent a326dce33f
commit 667fec63e9
3 changed files with 29 additions and 19 deletions

View File

@ -1,14 +1,19 @@
use std::sync::Mutex;
use crate::tracker::ThroughputPerSecond;
use lqos_bus::{bus_request, BusRequest, BusResponse};
use once_cell::sync::Lazy;
use rocket::tokio::sync::RwLock;
pub static THROUGHPUT_BUFFER: Lazy<RwLock<TotalThroughput>> =
Lazy::new(|| RwLock::new(TotalThroughput::new()));
pub static THROUGHPUT_BUFFER: Lazy<TotalThroughput> =
Lazy::new(|| TotalThroughput::new());
/// Maintains an in-memory ringbuffer of the last 5 minutes of
/// throughput data.
pub struct TotalThroughput {
inner: Mutex<TotalThroughputInner>
}
struct TotalThroughputInner {
data: Vec<ThroughputPerSecond>,
head: usize,
prev_head: usize,
@ -18,14 +23,16 @@ impl TotalThroughput {
/// Create a new throughput ringbuffer system
pub fn new() -> Self {
Self {
data: vec![ThroughputPerSecond::default(); 300],
head: 0,
prev_head: 0,
inner: Mutex::new(TotalThroughputInner {
data: vec![ThroughputPerSecond::default(); 300],
head: 0,
prev_head: 0,
}),
}
}
/// Run once per second to update the ringbuffer with current data
pub async fn tick(&mut self) {
pub async fn tick(&self) {
if let Ok(messages) =
bus_request(vec![BusRequest::GetCurrentThroughput]).await
{
@ -36,12 +43,14 @@ impl TotalThroughput {
shaped_bits_per_second,
} = msg
{
self.data[self.head].bits_per_second = bits_per_second;
self.data[self.head].packets_per_second = packets_per_second;
self.data[self.head].shaped_bits_per_second = shaped_bits_per_second;
self.prev_head = self.head;
self.head += 1;
self.head %= 300;
let mut lock = self.inner.lock().unwrap();
let head = lock.head;
lock.data[head].bits_per_second = bits_per_second;
lock.data[head].packets_per_second = packets_per_second;
lock.data[head].shaped_bits_per_second = shaped_bits_per_second;
lock.prev_head = lock.head;
lock.head += 1;
lock.head %= 300;
}
}
}
@ -49,12 +58,14 @@ impl TotalThroughput {
/// Retrieve just the current throughput data (1 tick)
pub fn current(&self) -> ThroughputPerSecond {
self.data[self.prev_head]
let lock = self.inner.lock().unwrap();
lock.data[lock.prev_head]
}
/// Retrieve the head (0-299) and the full current throughput
/// buffer. Used to populate the dashboard the first time.
pub fn copy(&self) -> (usize, Vec<ThroughputPerSecond>) {
(self.head, self.data.clone())
let lock = self.inner.lock().unwrap();
(lock.head, lock.data.clone())
}
}

View File

@ -121,8 +121,7 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
pub async fn update_total_throughput_buffer() {
loop {
let now = Instant::now();
let mut lock = THROUGHPUT_BUFFER.write().await;
lock.tick().await;
THROUGHPUT_BUFFER.tick().await;
let wait_time = Duration::from_secs(1) - now.elapsed();
if wait_time.as_micros() > 0 {
rocket::tokio::time::sleep(Duration::from_secs(1)).await;

View File

@ -73,7 +73,7 @@ pub struct ThroughputPerSecond {
pub async fn current_throughput(
_auth: AuthGuard,
) -> NoCache<MsgPack<ThroughputPerSecond>> {
let result = THROUGHPUT_BUFFER.read().await.current();
let result = THROUGHPUT_BUFFER.current();
NoCache::new(MsgPack(result))
}
@ -81,7 +81,7 @@ pub async fn current_throughput(
pub async fn throughput_ring_buffer(
_auth: AuthGuard,
) -> NoCache<MsgPack<(usize, Vec<ThroughputPerSecond>)>> {
let result = THROUGHPUT_BUFFER.read().await.copy();
let result = THROUGHPUT_BUFFER.copy();
NoCache::new(MsgPack(result))
}