Start working on a better layout for widgets that retain local state, and eliminate the need for messy synchronized global variables.

This commit is contained in:
Herbert Wolverson 2024-03-21 12:44:30 -05:00
parent a4aa1f63fa
commit e6a30bc370
8 changed files with 230 additions and 211 deletions

View File

@ -3,30 +3,36 @@
use crate::ui_base::SHOULD_EXIT;
use anyhow::{bail, Result};
use lqos_bus::{BusClient, BusRequest, BusResponse};
use tokio::sync::mpsc::Receiver;
use std::sync::atomic::Ordering;
pub mod cpu_ram;
pub mod throughput;
pub mod top_hosts;
pub mod top_flows;
/// The main loop for the bus.
/// Spawns a separate task to handle the bus communication.
pub async fn bus_loop() {
tokio::spawn(cpu_ram::gather_sysinfo());
main_loop_wrapper().await;
/// Communications with the bus via channels
pub enum BusMessage {
EnableTotalThroughput(std::sync::mpsc::Sender<BusResponse>),
DisableTotalThroughput,
}
async fn main_loop_wrapper() {
let loop_result = main_loop().await;
/// The main loop for the bus.
/// Spawns a separate task to handle the bus communication.
pub async fn bus_loop(rx: Receiver<BusMessage>) {
tokio::spawn(cpu_ram::gather_sysinfo());
main_loop_wrapper(rx).await;
}
async fn main_loop_wrapper(rx: Receiver<BusMessage>) {
let loop_result = main_loop(rx).await;
if let Err(e) = loop_result {
eprintln!("Error in main loop: {}", e);
SHOULD_EXIT.store(true, Ordering::Relaxed);
panic!("Error in main loop: {}", e);
}
}
async fn main_loop() -> Result<()> {
async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
// Collection Settings
let collect_total_throughput = true;
let mut collect_total_throughput = None;
let collect_top_downloaders = true;
let collect_top_flows = true;
@ -36,10 +42,22 @@ async fn main_loop() -> Result<()> {
}
loop {
// See if there are any messages
while let Ok(msg) = rx.try_recv() {
match msg {
BusMessage::EnableTotalThroughput(tx) => {
collect_total_throughput = Some(tx);
}
BusMessage::DisableTotalThroughput => {
collect_total_throughput = None;
}
}
}
// Perform actual bus collection
let mut commands: Vec<BusRequest> = Vec::new();
if collect_total_throughput {
if collect_total_throughput.is_some() {
commands.push(BusRequest::GetCurrentThroughput);
}
if collect_top_downloaders {
@ -52,7 +70,11 @@ async fn main_loop() -> Result<()> {
// Send the requests and process replies
for response in bus_client.request(commands).await? {
match response {
BusResponse::CurrentThroughput { .. } => throughput::throughput(&response).await,
BusResponse::CurrentThroughput { .. } => {
if let Some(tx) = &collect_total_throughput {
tx.send(response).unwrap(); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::TopDownloaders { .. } => top_hosts::top_n(&response).await,
BusResponse::TopFlows(..) => top_flows::top_flows(&response).await,
_ => {}

View File

@ -1,116 +0,0 @@
use lqos_bus::BusResponse;
use once_cell::sync::Lazy;
use std::sync::Mutex;
pub static THROUGHPUT_RING: Lazy<Mutex<ThroughputRingbuffer>> =
Lazy::new(|| Mutex::new(ThroughputRingbuffer::default()));
const RINGBUFFER_SIZE: usize = 80;
pub static CURRENT_THROUGHPUT: Lazy<Mutex<CurrentThroughput>> =
Lazy::new(|| Mutex::new(CurrentThroughput::default()));
#[derive(Default, Copy, Clone)]
pub struct CurrentThroughput {
pub bits_per_second: (u64, u64),
pub packets_per_second: (u64, u64),
pub shaped_bits_per_second: (u64, u64),
}
pub struct ThroughputRingbuffer {
current_index: usize,
pub ringbuffer: [CurrentThroughput; RINGBUFFER_SIZE],
}
impl ThroughputRingbuffer {
fn push(&mut self, current: CurrentThroughput) {
self.ringbuffer[self.current_index] = current;
self.current_index = (self.current_index + 1) % RINGBUFFER_SIZE;
}
pub fn bits_per_second_vec_up(&self) -> Vec<u64> {
let mut result = Vec::with_capacity(RINGBUFFER_SIZE);
for i in self.current_index..RINGBUFFER_SIZE {
result.push(self.ringbuffer[i].bits_per_second.0);
}
for i in 0..self.current_index {
result.push(self.ringbuffer[i].bits_per_second.0);
}
result
}
pub fn bits_per_second_vec_down(&self) -> Vec<u64> {
let mut result = Vec::with_capacity(RINGBUFFER_SIZE);
for i in self.current_index..RINGBUFFER_SIZE {
result.push(self.ringbuffer[i].bits_per_second.1);
}
for i in 0..self.current_index {
result.push(self.ringbuffer[i].bits_per_second.1);
}
result
}
pub fn shaped_bits_per_second_vec_up(&self) -> Vec<u64> {
let mut result = Vec::with_capacity(RINGBUFFER_SIZE);
for i in self.current_index..RINGBUFFER_SIZE {
result.push(self.ringbuffer[i].shaped_bits_per_second.0);
}
for i in 0..self.current_index {
result.push(self.ringbuffer[i].shaped_bits_per_second.0);
}
result
}
pub fn shaped_bits_per_second_vec_down(&self) -> Vec<u64> {
let mut result = Vec::with_capacity(RINGBUFFER_SIZE);
for i in self.current_index..RINGBUFFER_SIZE {
result.push(self.ringbuffer[i].shaped_bits_per_second.1);
}
for i in 0..self.current_index {
result.push(self.ringbuffer[i].shaped_bits_per_second.1);
}
result
}
}
impl Default for ThroughputRingbuffer {
fn default() -> Self {
let mut ringbuffer = [CurrentThroughput::default(); RINGBUFFER_SIZE];
for i in 0..RINGBUFFER_SIZE {
ringbuffer[i].bits_per_second = (0, 0);
ringbuffer[i].packets_per_second = (0, 0);
ringbuffer[i].shaped_bits_per_second = (0, 0);
}
ThroughputRingbuffer {
current_index: 0,
ringbuffer,
}
}
}
pub async fn throughput(response: &BusResponse) {
if let BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
} = response
{
let mut rb = THROUGHPUT_RING.lock().unwrap();
rb.push(CurrentThroughput {
bits_per_second: *bits_per_second,
packets_per_second: *packets_per_second,
shaped_bits_per_second: *shaped_bits_per_second,
});
let mut current = CURRENT_THROUGHPUT.lock().unwrap();
current.bits_per_second = *bits_per_second;
current.packets_per_second = *packets_per_second;
current.shaped_bits_per_second = *shaped_bits_per_second;
}
}

View File

@ -2,19 +2,23 @@ mod bus;
mod top_level_ui;
mod ui_base;
use anyhow::Result;
use bus::BusMessage;
use ui_base::UiBase;
pub mod widgets;
fn main() -> Result<()> {
// Create an async channel for seinding data into the bus system.
let (tx, rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
// Create a tokio runtime in a single thread
std::thread::spawn(|| {
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { bus::bus_loop().await });
rt.block_on(async move { bus::bus_loop(rx).await });
});
// Initialize the UI
let mut ui = UiBase::new()?;
let mut ui = UiBase::new(tx)?;
ui.event_loop()?;
// Return OK

View File

@ -5,23 +5,28 @@
//! It's designed to be the manager from which specific UI
//! components are managed.
use crate::widgets::*;
use crate::{bus::BusMessage, widgets::*};
use ratatui::prelude::*;
use tokio::sync::mpsc::Sender;
use std::io::Stdout;
pub struct TopUi {
show_cpus: bool,
show_throughput_sparkline: bool,
_bus_sender: Sender<BusMessage>,
sparkline: NetworkSparkline,
main_widget: MainWidget,
}
impl TopUi {
/// Create a new TopUi instance. This will initialize the UI framework.
pub fn new() -> Self {
pub fn new(bus_sender: Sender<BusMessage>) -> Self {
TopUi {
show_cpus: true,
show_throughput_sparkline: true,
show_throughput_sparkline: false,
main_widget: MainWidget::Hosts,
_bus_sender: bus_sender.clone(),
sparkline: NetworkSparkline::new(bus_sender.clone()),
}
}
@ -29,7 +34,14 @@ impl TopUi {
// Handle Mode Switches
match key {
'c' => self.show_cpus = !self.show_cpus,
'n' => self.show_throughput_sparkline = !self.show_throughput_sparkline,
'n' => {
self.show_throughput_sparkline = !self.show_throughput_sparkline;
if self.show_throughput_sparkline {
self.sparkline.enable();
} else {
self.sparkline.disable();
}
}
'h' => self.main_widget = MainWidget::Hosts,
'f' => self.main_widget = MainWidget::Flows,
_ => {}
@ -44,7 +56,7 @@ impl TopUi {
.unwrap();
}
fn top_level_render(&self, frame: &mut Frame) {
fn top_level_render(&mut self, frame: &mut Frame) {
let mut constraints = Vec::new();
let mut next_region = 0;
@ -75,9 +87,9 @@ impl TopUi {
frame.render_widget(cpu_display(), main_layout[cpu_region]);
}
if self.show_throughput_sparkline {
let nspark = NetworkSparkline::new();
let render = nspark.render();
frame.render_widget(render, main_layout[network_spark_region]);
self.sparkline.set_size(main_layout[network_spark_region]);
self.sparkline.tick();
self.sparkline.render_to_frame(frame);
}
// And finally the main panel

View File

@ -2,7 +2,7 @@
//! Upon starting the program, it performs basic initialization.
//! It tracks "drop", so when the program exits, it can perform cleanup.
use crate::top_level_ui::TopUi;
use crate::{bus::BusMessage, top_level_ui::TopUi};
use anyhow::Result;
use crossterm::{
event::{self, Event, KeyCode, KeyEventKind},
@ -10,6 +10,7 @@ use crossterm::{
ExecutableCommand,
};
use ratatui::{backend::CrosstermBackend, Terminal};
use tokio::sync::mpsc::Sender;
use std::{
io::stdout,
sync::atomic::{AtomicBool, Ordering},
@ -23,7 +24,7 @@ pub struct UiBase {
impl UiBase {
/// Create a new UiBase instance. This will initialize the UI framework.
pub fn new() -> Result<Self> {
pub fn new(tx: Sender<BusMessage>) -> Result<Self> {
// Crossterm mode setup
enable_raw_mode()?;
stdout().execute(EnterAlternateScreen)?;
@ -45,7 +46,7 @@ impl UiBase {
// Return
Ok(UiBase {
ui: TopUi::new(),
ui: TopUi::new(tx.clone()),
})
}

View File

@ -1,7 +1,9 @@
mod stats_ringbuffer;
mod cpu;
pub use cpu::cpu_display;
mod network_sparkline;
pub use network_sparkline::*;
use ratatui::{layout::Rect, Frame};
pub mod top_hosts;
pub mod top_flows;
@ -9,3 +11,20 @@ pub enum MainWidget {
Hosts,
Flows,
}
pub trait TopWidget {
/// When the widget is enabled, this is called to setup the link to the bus
fn enable(&mut self);
/// When the widget is disabled, this is called to allow the widget to cleanup
fn disable(&mut self);
/// Receive the allocated size for the widget from the layout system
fn set_size(&mut self, size: Rect);
/// Perform a tick to update the widget
fn tick(&mut self);
/// Render the widget
fn render_to_frame(&mut self, frame: &mut Frame);
}

View File

@ -1,64 +1,94 @@
use crate::bus::throughput::{CURRENT_THROUGHPUT, THROUGHPUT_RING};
use super::{stats_ringbuffer::StatsRingBuffer, TopWidget};
use crate::bus::BusMessage;
use lqos_bus::BusResponse;
use lqos_utils::packet_scale::scale_bits;
use ratatui::{
prelude::*,
style::{Color, Style},
symbols,
widgets::{Axis, Block, Borders, Chart, Dataset, Widget},
widgets::*,
};
use std::sync::mpsc::{Receiver, Sender};
pub struct NetworkSparkline {
bps_down: Vec<(f64, f64)>,
bps_up: Vec<(f64, f64)>,
shaped_down: Vec<(f64, f64)>,
shaped_up: Vec<(f64, f64)>,
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: Receiver<BusResponse>,
tx: Sender<BusResponse>,
throughput: StatsRingBuffer<CurrentThroughput, 200>,
current_throughput: CurrentThroughput,
render_size: Rect,
}
impl NetworkSparkline {
pub fn new() -> Self {
let raw_data = THROUGHPUT_RING.lock().unwrap().bits_per_second_vec_down();
let bps_down = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val as f64))
.collect();
impl TopWidget for NetworkSparkline {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableTotalThroughput(
self.tx.clone(),
))
.unwrap();
}
let raw_data = THROUGHPUT_RING.lock().unwrap().bits_per_second_vec_up();
let bps_up = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, 0.0 - val as f64))
.collect();
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableTotalThroughput)
.unwrap();
}
let raw_data = THROUGHPUT_RING
.lock()
.unwrap()
.shaped_bits_per_second_vec_down();
let shaped_down = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val as f64))
.collect();
fn set_size(&mut self, _size: Rect) {
self.render_size = _size;
}
let raw_data = THROUGHPUT_RING
.lock()
.unwrap()
.shaped_bits_per_second_vec_up();
let shaped_up = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, 0.0 - val as f64))
.collect();
NetworkSparkline {
bps_down,
bps_up,
shaped_down,
shaped_up,
fn tick(&mut self) {
while let Ok(msg) = self.rx.try_recv() {
if let BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
} = msg
{
self.throughput.push(CurrentThroughput {
bits_per_second,
_packets_per_second: packets_per_second,
shaped_bits_per_second,
});
self.current_throughput = CurrentThroughput {
bits_per_second,
_packets_per_second: packets_per_second,
shaped_bits_per_second,
};
}
}
}
pub fn render(&self) -> impl Widget + '_ {
let (up, down) = CURRENT_THROUGHPUT.lock().unwrap().bits_per_second;
fn render_to_frame(&mut self, frame: &mut Frame) {
let mut raw_data = self.throughput.get_values_in_order();
raw_data.reverse();
let bps_down: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.bits_per_second.1 as f64))
.collect();
let bps_up: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.bits_per_second.0 as f64))
.collect();
let shaped_down: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.1 as f64))
.collect();
let shaped_up: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.0 as f64))
.collect();
let (up, down) = self.current_throughput.bits_per_second;
let title = format!(
" [Throughput (Down: {} Up: {})]",
scale_bits(up),
@ -75,48 +105,36 @@ impl NetworkSparkline {
.name("Throughput")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Cyan))
.data(&self.bps_down),
.data(&bps_down),
Dataset::default()
.name("Throughput")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Cyan))
.data(&self.bps_up),
.data(&bps_up),
Dataset::default()
.name("Shaped")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::LightGreen))
.data(&self.shaped_down),
.data(&shaped_down),
Dataset::default()
.name("Shaped")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::LightGreen))
.data(&self.shaped_up),
.data(&shaped_up),
];
let bps_max = self
.bps_down
.iter()
.map(|(_, val)| *val)
.fold(0.0, f64::max);
let bps_max = bps_down.iter().map(|(_, val)| *val).fold(0.0, f64::max);
let bps_min = self.bps_up.iter().map(|(_, val)| *val).fold(0.0, f64::min);
let bps_min = bps_up.iter().map(|(_, val)| *val).fold(0.0, f64::min);
let shaped_max = self
.shaped_down
.iter()
.map(|(_, val)| *val)
.fold(0.0, f64::max);
let shaped_max = shaped_down.iter().map(|(_, val)| *val).fold(0.0, f64::max);
let shaped_min = self
.shaped_up
.iter()
.map(|(_, val)| *val)
.fold(0.0, f64::min);
let shaped_min = shaped_up.iter().map(|(_, val)| *val).fold(0.0, f64::min);
let max = f64::max(bps_max, shaped_max);
let min = f64::min(bps_min, shaped_min);
Chart::new(datasets)
let chart = Chart::new(datasets)
.block(block)
.x_axis(
Axis::default()
@ -128,6 +146,30 @@ impl NetworkSparkline {
Axis::default()
.style(Style::default().fg(Color::Gray))
.bounds([min, max]),
)
);
frame.render_widget(chart, self.render_size);
}
}
impl NetworkSparkline {
pub fn new(bus_link: tokio::sync::mpsc::Sender<BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
NetworkSparkline {
bus_link,
rx,
tx,
throughput: StatsRingBuffer::new(),
render_size: Rect::default(),
current_throughput: CurrentThroughput::default(),
}
}
}
#[derive(Default, Copy, Clone)]
struct CurrentThroughput {
pub bits_per_second: (u64, u64),
pub _packets_per_second: (u64, u64),
pub shaped_bits_per_second: (u64, u64),
}

View File

@ -0,0 +1,35 @@
/// Generic RingBuffer Type for storing statistics
pub struct StatsRingBuffer<T, const N: usize> {
buffer: Vec<T>,
index: usize,
}
impl <T: Default + Clone, const N: usize> StatsRingBuffer<T, N> {
/// Creates an empty ringbuffer
pub fn new() -> Self {
Self {
buffer: vec![T::default(); N],
index: 0,
}
}
/// Adds an entry to the ringbuffer
pub fn push(&mut self, value: T) {
self.buffer[self.index] = value;
self.index = (self.index + 1) % N;
}
/// Retrieves a clone of the ringbuffer, in the order the values were added
pub fn get_values_in_order(&self) -> Vec<T> {
let mut result = Vec::with_capacity(N);
for i in self.index..N {
result.push(self.buffer[i].clone());
}
for i in 0..self.index {
result.push(self.buffer[i].clone());
}
result
}
}