Merge pull request #483 from LibreQoE/lqtop_ratatui

Merge lqtop2 into develop. Extends lqtop to include the new features we display on the dashboard, switches to ratatui. 

Co-authored-by:  Dave Täht (dtaht)
This commit is contained in:
Herbert "TheBracket 2024-05-15 10:55:52 -05:00 committed by GitHub
commit ed403f7869
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 1838 additions and 641 deletions

522
src/rust/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -9,5 +9,8 @@ tokio = { version = "1", features = [ "full" ] }
lqos_bus = { path = "../lqos_bus" }
lqos_utils = { path = "../lqos_utils" }
anyhow = "1"
tui = "0.19"
crossterm = { version = "0", features = [ "serde" ] }
ratatui = "0.26.1"
crossterm = "0.27.0"
ctrlc = "3.4.4"
sysinfo = "0"
once_cell = "1.19.0"

View File

@ -0,0 +1,56 @@
//! Provides a sysinfo link for CPU and RAM tracking
use crate::ui_base::SHOULD_EXIT;
use once_cell::sync::Lazy;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize};
const MAX_CPUS_COUNTED: usize = 128;
/// Stores overall CPU usage
pub static CPU_USAGE: Lazy<[AtomicU32; MAX_CPUS_COUNTED]> = Lazy::new(build_empty_cpu_list);
/// Total number of CPUs detected
pub static NUM_CPUS: AtomicUsize = AtomicUsize::new(0);
/// Total RAM used (bytes)
pub static RAM_USED: AtomicU64 = AtomicU64::new(0);
/// Total RAM installed (bytes)
pub static TOTAL_RAM: AtomicU64 = AtomicU64::new(0);
pub async fn gather_sysinfo() {
use sysinfo::System;
let mut sys = System::new_all();
loop {
if SHOULD_EXIT.load(Ordering::Relaxed) {
break;
}
// Refresh system info
sys.refresh_cpu();
sys.refresh_memory();
sys.cpus()
.iter()
.enumerate()
.map(|(i, cpu)| (i, cpu.cpu_usage() as u32)) // Always rounds down
.for_each(|(i, cpu)| CPU_USAGE[i].store(cpu, std::sync::atomic::Ordering::Relaxed));
NUM_CPUS.store(sys.cpus().len(), std::sync::atomic::Ordering::Relaxed);
RAM_USED.store(sys.used_memory(), std::sync::atomic::Ordering::Relaxed);
TOTAL_RAM.store(sys.total_memory(), std::sync::atomic::Ordering::Relaxed);
// Sleep
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
fn build_empty_cpu_list() -> [AtomicU32; MAX_CPUS_COUNTED] {
let mut temp = Vec::with_capacity(MAX_CPUS_COUNTED);
for _ in 0..MAX_CPUS_COUNTED {
temp.push(AtomicU32::new(0));
}
temp.try_into()
.expect("This should never happen, sizes are constant.")
}

View File

@ -0,0 +1,132 @@
//! Handles the communication loop with lqosd.
use crate::ui_base::SHOULD_EXIT;
use anyhow::{bail, Result};
use lqos_bus::{BusClient, BusRequest, BusResponse};
use tokio::sync::mpsc::Receiver;
use std::sync::atomic::Ordering;
pub mod cpu_ram;
/// Communications with the bus via channels
pub enum BusMessage {
EnableTotalThroughput(std::sync::mpsc::Sender<BusResponse>),
DisableTotalThroughput,
EnableTopFlows(std::sync::mpsc::Sender<BusResponse>),
DisableTopFlows,
EnableTopHosts(std::sync::mpsc::Sender<BusResponse>),
DisableTopHosts,
EnableLatencyHistogram(std::sync::mpsc::Sender<BusResponse>),
DisableLatencyHistogram,
}
/// The main loop for the bus.
/// Spawns a separate task to handle the bus communication.
pub async fn bus_loop(rx: Receiver<BusMessage>) {
tokio::spawn(cpu_ram::gather_sysinfo());
main_loop_wrapper(rx).await;
}
async fn main_loop_wrapper(rx: Receiver<BusMessage>) {
let loop_result = main_loop(rx).await;
if let Err(e) = loop_result {
SHOULD_EXIT.store(true, Ordering::Relaxed);
panic!("Error in main loop: {}", e);
}
}
async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
// Collection Settings
let mut collect_total_throughput = None;
let mut collect_top_downloaders = None;
let mut collect_top_flows = None;
let mut collect_latency_histogram = None;
let mut bus_client = BusClient::new().await?;
if !bus_client.is_connected() {
bail!("Failed to connect to the bus");
}
loop {
// See if there are any messages
while let Ok(msg) = rx.try_recv() {
match msg {
BusMessage::EnableTotalThroughput(tx) => {
collect_total_throughput = Some(tx);
}
BusMessage::DisableTotalThroughput => {
collect_total_throughput = None;
}
BusMessage::EnableTopFlows(tx) => {
collect_top_flows = Some(tx);
}
BusMessage::DisableTopFlows => {
collect_top_flows = None;
}
BusMessage::EnableTopHosts(tx) => {
collect_top_downloaders = Some(tx);
}
BusMessage::DisableTopHosts => {
collect_top_downloaders = None;
}
BusMessage::EnableLatencyHistogram(tx) => {
collect_latency_histogram = Some(tx);
}
BusMessage::DisableLatencyHistogram => {
collect_latency_histogram = None;
}
}
}
// Perform actual bus collection
let mut commands: Vec<BusRequest> = Vec::new();
if collect_total_throughput.is_some() {
commands.push(BusRequest::GetCurrentThroughput);
}
if collect_top_downloaders.is_some() {
commands.push(BusRequest::GetTopNDownloaders { start: 0, end: 100 });
}
if collect_top_flows.is_some() {
commands.push(BusRequest::TopFlows { flow_type: lqos_bus::TopFlowType::Bytes, n: 100 });
}
if collect_latency_histogram.is_some() {
commands.push(BusRequest::RttHistogram);
}
// Send the requests and process replies
for response in bus_client.request(commands).await? {
match response {
BusResponse::CurrentThroughput { .. } => {
if let Some(tx) = &collect_total_throughput {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::TopDownloaders { .. } => {
if let Some(tx) = &collect_top_downloaders {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::TopFlows(..) => {
if let Some(tx) = &collect_top_flows {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::RttHistogram(..) => {
if let Some(tx) = &collect_latency_histogram {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
_ => {}
}
}
// Check if we should be quitting
if SHOULD_EXIT.load(Ordering::Relaxed) {
break;
}
// Sleep for one tick
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
Ok(())
}

View File

@ -1,447 +1,26 @@
mod bus;
mod top_level_ui;
mod ui_base;
use anyhow::Result;
use crossterm::{
event::{read, Event, KeyCode, KeyEvent, KeyModifiers},
terminal::enable_raw_mode,
};
use lqos_bus::{BusClient, BusRequest, BusResponse, IpStats};
use lqos_utils::packet_scale::{scale_bits, scale_packets};
use std::{io, time::Duration};
use tui::{
backend::CrosstermBackend,
layout::{Alignment, Constraint, Direction, Layout},
style::{Color, Style},
text::{Span, Spans},
widgets::{Block, BorderType, Cell, Paragraph, Row, Table},
Terminal,
};
use bus::BusMessage;
use ui_base::UiBase;
pub mod widgets;
struct DataResult {
totals: (u64, u64, u64, u64),
top: Vec<IpStats>,
}
async fn get_data(client: &mut BusClient, n_rows: u16) -> Result<DataResult> {
let mut result = DataResult { totals: (0, 0, 0, 0), top: Vec::new() };
let requests = vec![
BusRequest::GetCurrentThroughput,
BusRequest::GetTopNDownloaders { start: 0, end: n_rows as u32 },
];
for r in client.request(requests).await? {
match r {
BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second: _,
} => {
let tuple = (
bits_per_second.0,
bits_per_second.1,
packets_per_second.0,
packets_per_second.1,
);
result.totals = tuple;
}
BusResponse::TopDownloaders(top) => {
result.top = top.clone();
}
_ => {}
}
}
Ok(result)
}
fn draw_menu<'a>(is_connected: bool) -> Paragraph<'a> {
let mut text = Spans::from(vec![
Span::styled("Q", Style::default().fg(Color::White)),
Span::from("uit"),
]);
if !is_connected {
text
.0
.push(Span::styled(" NOT CONNECTED ", Style::default().fg(Color::Red)))
} else {
text
.0
.push(Span::styled(" CONNECTED ", Style::default().fg(Color::Green)))
}
let para = Paragraph::new(text)
.style(Style::default().fg(Color::White))
.alignment(Alignment::Center)
.block(
Block::default()
.style(Style::default().fg(Color::Green))
.border_type(BorderType::Plain)
.title("LibreQoS Monitor: "),
);
para
}
fn draw_pps<'a>(
packets_per_second: (u64, u64),
bits_per_second: (u64, u64),
) -> Spans<'a> {
Spans::from(vec![
Span::from("DOWN: "),
Span::from(scale_bits(bits_per_second.0)),
Span::from(" "),
Span::from(scale_bits(bits_per_second.1)),
Span::from(" "),
Span::from("UP: "),
Span::from(scale_packets(packets_per_second.0)),
Span::from(" "),
Span::from(scale_packets(packets_per_second.1)),
])
}
fn draw_top_pane<'a>(
top: &[IpStats],
packets_per_second: (u64, u64),
bits_per_second: (u64, u64),
) -> Table<'a> {
let rows: Vec<Row> = top
.iter()
.map(|stats| {
let color = if stats.bits_per_second.0 < 500 {
Color::DarkGray
} else if stats.tc_handle.as_u32() == 0 {
Color::Cyan
} else {
Color::LightGreen
};
Row::new(vec![
Cell::from(stats.ip_address.clone()),
Cell::from(format!("{:<13}", scale_bits(stats.bits_per_second.0))),
Cell::from(format!("{:<13}", scale_bits(stats.bits_per_second.1))),
Cell::from(format!(
"{:<13}",
scale_packets(stats.packets_per_second.0)
)),
Cell::from(format!(
"{:<13}",
scale_packets(stats.packets_per_second.1)
)),
Cell::from(format!(
"{:<10} ms",
format!("{:.2}", stats.median_tcp_rtt)
)),
Cell::from(format!("{:>7}", stats.tc_handle.to_string())),
])
.style(Style::default().fg(color))
})
.collect();
let header = Row::new(vec![
"Local IP",
"Download",
"Upload",
"Pkts Dn",
"Pkts Up",
"TCP RTT ms",
"Shaper",
])
.style(Style::default().fg(Color::Yellow));
Table::new(rows)
.header(header)
.block(
Block::default().title(draw_pps(packets_per_second, bits_per_second)),
)
.widths(&[
Constraint::Min(42),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(7),
])
}
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<()> {
let mut bus_client = BusClient::new().await?;
if !bus_client.is_connected() {
println!("ERROR: lqosd bus is not available");
std::process::exit(0);
}
let mut packets = (0, 0);
let mut bits = (0, 0);
let mut top = Vec::new();
// Initialize TUI
enable_raw_mode()?;
let stdout = io::stdout();
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
let mut n_rows = 33;
loop {
if let Ok(result) = get_data(&mut bus_client, n_rows).await {
let (bits_down, bits_up, packets_down, packets_up) = result.totals;
packets = (packets_down, packets_up);
bits = (bits_down, bits_up);
top = result.top;
}
//terminal.clear()?;
terminal.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(0)
.constraints(
[Constraint::Min(1), Constraint::Percentage(100)].as_ref(),
)
.split(f.size());
f.render_widget(draw_menu(bus_client.is_connected()), chunks[0]);
// NOTE: this is where the height of the main panel is calculated.
// Resize events are consumed by `tui`, so we never receive them.
n_rows = chunks[1].height;
f.render_widget(draw_top_pane(&top, packets, bits), chunks[1]);
//f.render_widget(bandwidth_chart(datasets.clone(), packets, bits, min, max), chunks[1]);
})?;
if crossterm::event::poll(Duration::from_secs(1)).unwrap() {
match read().unwrap() {
// FIXME - this needs to absorb multiple resize events. Presently,
// When I resize a terminal window, it is not getting one, either.
// How to then change n_rows from here is also on my mind
Event::Resize(width, height) => {
println!("New size = {width}x{height}")
}
Event::Key(KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::CONTROL,
..
}) => break,
Event::Key(KeyEvent {
code: KeyCode::Char('q'),
modifiers: KeyModifiers::NONE,
..
}) => break,
Event::Key(KeyEvent {
code: KeyCode::Char('Z'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // Disconnect from bus, suspend
// Event::Key(KeyEvent { escape should do something I don't know what.
// code: KeyCode::Char('ESC'),
// modifiers: KeyModifiers::CONTROL,}) => break,// go BACK?
//
Event::Key(KeyEvent {
code: KeyCode::Char('h'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into help
Event::Key(KeyEvent {
code: KeyCode::Char('n'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into next
// e.g. n_rows = screen size
// n_start = n_start + screen
// size
Event::Key(KeyEvent {
code: KeyCode::Char('p'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into prev
Event::Key(KeyEvent {
code: KeyCode::Char('?'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into help
Event::Key(KeyEvent {
code: KeyCode::Char('u'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into uploaders
Event::Key(KeyEvent {
code: KeyCode::Char('d'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into downloads
Event::Key(KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into cpu
Event::Key(KeyEvent {
code: KeyCode::Char('l'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME lag meter
Event::Key(KeyEvent {
code: KeyCode::Char('N'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into next panel
Event::Key(KeyEvent {
code: KeyCode::Char('P'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into prev panel
Event::Key(KeyEvent {
code: KeyCode::Char('b'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Best
Event::Key(KeyEvent {
code: KeyCode::Char('w'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Worst
Event::Key(KeyEvent {
code: KeyCode::Char('D'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Drops
Event::Key(KeyEvent {
code: KeyCode::Char('Q'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Queues
Event::Key(KeyEvent {
code: KeyCode::Char('W'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME (un)display wider stuff
Event::Key(KeyEvent {
code: KeyCode::Char('8'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter out fe80
Event::Key(KeyEvent {
code: KeyCode::Char('6'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Just look at ipv6
Event::Key(KeyEvent {
code: KeyCode::Char('4'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Just look at ipv4
Event::Key(KeyEvent {
code: KeyCode::Char('5'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME ipv4 + ipv6
Event::Key(KeyEvent {
code: KeyCode::Char('U'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME filter on Unshaped
Event::Key(KeyEvent {
code: KeyCode::Char('M'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME filter on My Network
Event::Key(KeyEvent {
code: KeyCode::Char('H'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Generate histogram
Event::Key(KeyEvent {
code: KeyCode::Char('T'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter Tin. This would require an argument BVIL<RET>
Event::Key(KeyEvent {
code: KeyCode::Char('O'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME "Odd" events - multicast, AI-assistance, people down?
Event::Key(KeyEvent {
code: KeyCode::Char('F'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter on "something*
Event::Key(KeyEvent {
code: KeyCode::Char('S'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter on Plan Speed
Event::Key(KeyEvent {
code: KeyCode::Char('z'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Zoom in
Event::Key(KeyEvent {
code: KeyCode::Char('Z'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Zoom out
// Now I am Dreaming
Event::Key(KeyEvent {
code: KeyCode::Char('C'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Capture what I am filtering on
Event::Key(KeyEvent {
code: KeyCode::Char('F'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // FIXME Freeze what I am filtering on
Event::Key(KeyEvent {
code: KeyCode::Char('S'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // FIXME Step through what I captured on
Event::Key(KeyEvent {
code: KeyCode::Char('R'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // FIXME Step backwards what I captured on
// Left and right cursors also
// Dreaming Less now
// Use TAB for autocompletion
// If I have moved into a panel, the following are ideas
Event::Key(KeyEvent {
code: KeyCode::Char('/'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Search for ip
Event::Key(KeyEvent {
code: KeyCode::Char('R'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Traceroute/MTR
Event::Key(KeyEvent {
code: KeyCode::Char('A'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Alert me on this selection
Event::Key(KeyEvent {
code: KeyCode::Char('K'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Kill Alert on this
Event::Key(KeyEvent {
code: KeyCode::Char('V'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME View Selected Alerts
Event::Key(KeyEvent {
code: KeyCode::Char('B'),
modifiers: KeyModifiers::NONE,
..
}) => break, // Launch Browser on this customer
Event::Key(KeyEvent {
code: KeyCode::Char('L'),
modifiers: KeyModifiers::NONE,
..
}) => break, // Log notebook on this set of filters
_ => println!("Not recognized"),
}
}
}
// Undo the crossterm stuff
terminal.clear()?;
terminal.show_cursor()?;
crossterm::terminal::disable_raw_mode()?;
Ok(())
fn main() -> Result<()> {
// Create an async channel for seinding data into the bus system.
let (tx, rx) = tokio::sync::mpsc::channel::<BusMessage>(100);
// Create a tokio runtime in a single thread
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move { bus::bus_loop(rx).await });
});
// Initialize the UI
let mut ui = UiBase::new(tx)?;
ui.event_loop()?;
// Return OK
Ok(())
}

View File

@ -0,0 +1,447 @@
use anyhow::Result;
use crossterm::{
event::{read, Event, KeyCode, KeyEvent, KeyModifiers},
terminal::enable_raw_mode,
};
use lqos_bus::{BusClient, BusRequest, BusResponse, IpStats};
use lqos_utils::packet_scale::{scale_bits, scale_packets};
use std::{io, time::Duration};
use tui::{
backend::CrosstermBackend,
layout::{Alignment, Constraint, Direction, Layout},
style::{Color, Style},
text::{Span, Spans},
widgets::{Block, BorderType, Cell, Paragraph, Row, Table},
Terminal,
};
struct DataResult {
totals: (u64, u64, u64, u64),
top: Vec<IpStats>,
}
async fn get_data(client: &mut BusClient, n_rows: u16) -> Result<DataResult> {
let mut result = DataResult { totals: (0, 0, 0, 0), top: Vec::new() };
let requests = vec![
BusRequest::GetCurrentThroughput,
BusRequest::GetTopNDownloaders { start: 0, end: n_rows as u32 },
];
for r in client.request(requests).await? {
match r {
BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second: _,
} => {
let tuple = (
bits_per_second.0,
bits_per_second.1,
packets_per_second.0,
packets_per_second.1,
);
result.totals = tuple;
}
BusResponse::TopDownloaders(top) => {
result.top = top.clone();
}
_ => {}
}
}
Ok(result)
}
fn draw_menu<'a>(is_connected: bool) -> Paragraph<'a> {
let mut text = Spans::from(vec![
Span::styled("Q", Style::default().fg(Color::White)),
Span::from("uit"),
]);
if !is_connected {
text
.0
.push(Span::styled(" NOT CONNECTED ", Style::default().fg(Color::Red)))
} else {
text
.0
.push(Span::styled(" CONNECTED ", Style::default().fg(Color::Green)))
}
let para = Paragraph::new(text)
.style(Style::default().fg(Color::White))
.alignment(Alignment::Center)
.block(
Block::default()
.style(Style::default().fg(Color::Green))
.border_type(BorderType::Plain)
.title("LibreQoS Monitor: "),
);
para
}
fn draw_pps<'a>(
packets_per_second: (u64, u64),
bits_per_second: (u64, u64),
) -> Spans<'a> {
Spans::from(vec![
Span::from("DOWN: "),
Span::from(scale_bits(bits_per_second.0)),
Span::from(" "),
Span::from(scale_bits(bits_per_second.1)),
Span::from(" "),
Span::from("UP: "),
Span::from(scale_packets(packets_per_second.0)),
Span::from(" "),
Span::from(scale_packets(packets_per_second.1)),
])
}
fn draw_top_pane<'a>(
top: &[IpStats],
packets_per_second: (u64, u64),
bits_per_second: (u64, u64),
) -> Table<'a> {
let rows: Vec<Row> = top
.iter()
.map(|stats| {
let color = if stats.bits_per_second.0 < 500 {
Color::DarkGray
} else if stats.tc_handle.as_u32() == 0 {
Color::Cyan
} else {
Color::LightGreen
};
Row::new(vec![
Cell::from(stats.ip_address.clone()),
Cell::from(format!("{:<13}", scale_bits(stats.bits_per_second.0))),
Cell::from(format!("{:<13}", scale_bits(stats.bits_per_second.1))),
Cell::from(format!(
"{:<13}",
scale_packets(stats.packets_per_second.0)
)),
Cell::from(format!(
"{:<13}",
scale_packets(stats.packets_per_second.1)
)),
Cell::from(format!(
"{:<10} ms",
format!("{:.2}", stats.median_tcp_rtt)
)),
Cell::from(format!("{:>7}", stats.tc_handle.to_string())),
])
.style(Style::default().fg(color))
})
.collect();
let header = Row::new(vec![
"Local IP",
"Download",
"Upload",
"Pkts Dn",
"Pkts Up",
"TCP RTT ms",
"Shaper",
])
.style(Style::default().fg(Color::Yellow));
Table::new(rows)
.header(header)
.block(
Block::default().title(draw_pps(packets_per_second, bits_per_second)),
)
.widths(&[
Constraint::Min(42),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(15),
Constraint::Length(7),
])
}
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<()> {
let mut bus_client = BusClient::new().await?;
if !bus_client.is_connected() {
println!("ERROR: lqosd bus is not available");
std::process::exit(0);
}
let mut packets = (0, 0);
let mut bits = (0, 0);
let mut top = Vec::new();
// Initialize TUI
enable_raw_mode()?;
let stdout = io::stdout();
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
terminal.clear()?;
let mut n_rows = 33;
loop {
if let Ok(result) = get_data(&mut bus_client, n_rows).await {
let (bits_down, bits_up, packets_down, packets_up) = result.totals;
packets = (packets_down, packets_up);
bits = (bits_down, bits_up);
top = result.top;
}
//terminal.clear()?;
terminal.draw(|f| {
let chunks = Layout::default()
.direction(Direction::Vertical)
.margin(0)
.constraints(
[Constraint::Min(1), Constraint::Percentage(100)].as_ref(),
)
.split(f.size());
f.render_widget(draw_menu(bus_client.is_connected()), chunks[0]);
// NOTE: this is where the height of the main panel is calculated.
// Resize events are consumed by `tui`, so we never receive them.
n_rows = chunks[1].height;
f.render_widget(draw_top_pane(&top, packets, bits), chunks[1]);
//f.render_widget(bandwidth_chart(datasets.clone(), packets, bits, min, max), chunks[1]);
})?;
if crossterm::event::poll(Duration::from_secs(1)).unwrap() {
match read().unwrap() {
// FIXME - this needs to absorb multiple resize events. Presently,
// When I resize a terminal window, it is not getting one, either.
// How to then change n_rows from here is also on my mind
Event::Resize(width, height) => {
println!("New size = {width}x{height}")
}
Event::Key(KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::CONTROL,
..
}) => break,
Event::Key(KeyEvent {
code: KeyCode::Char('q'),
modifiers: KeyModifiers::NONE,
..
}) => break,
Event::Key(KeyEvent {
code: KeyCode::Char('Z'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // Disconnect from bus, suspend
// Event::Key(KeyEvent { escape should do something I don't know what.
// code: KeyCode::Char('ESC'),
// modifiers: KeyModifiers::CONTROL,}) => break,// go BACK?
//
Event::Key(KeyEvent {
code: KeyCode::Char('h'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into help
Event::Key(KeyEvent {
code: KeyCode::Char('n'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into next
// e.g. n_rows = screen size
// n_start = n_start + screen
// size
Event::Key(KeyEvent {
code: KeyCode::Char('p'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into prev
Event::Key(KeyEvent {
code: KeyCode::Char('?'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into help
Event::Key(KeyEvent {
code: KeyCode::Char('u'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into uploaders
Event::Key(KeyEvent {
code: KeyCode::Char('d'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into downloads
Event::Key(KeyEvent {
code: KeyCode::Char('c'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into cpu
Event::Key(KeyEvent {
code: KeyCode::Char('l'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME lag meter
Event::Key(KeyEvent {
code: KeyCode::Char('N'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into next panel
Event::Key(KeyEvent {
code: KeyCode::Char('P'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME make into prev panel
Event::Key(KeyEvent {
code: KeyCode::Char('b'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Best
Event::Key(KeyEvent {
code: KeyCode::Char('w'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Worst
Event::Key(KeyEvent {
code: KeyCode::Char('D'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Drops
Event::Key(KeyEvent {
code: KeyCode::Char('Q'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Queues
Event::Key(KeyEvent {
code: KeyCode::Char('W'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME (un)display wider stuff
Event::Key(KeyEvent {
code: KeyCode::Char('8'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter out fe80
Event::Key(KeyEvent {
code: KeyCode::Char('6'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Just look at ipv6
Event::Key(KeyEvent {
code: KeyCode::Char('4'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Just look at ipv4
Event::Key(KeyEvent {
code: KeyCode::Char('5'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME ipv4 + ipv6
Event::Key(KeyEvent {
code: KeyCode::Char('U'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME filter on Unshaped
Event::Key(KeyEvent {
code: KeyCode::Char('M'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME filter on My Network
Event::Key(KeyEvent {
code: KeyCode::Char('H'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Generate histogram
Event::Key(KeyEvent {
code: KeyCode::Char('T'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter Tin. This would require an argument BVIL<RET>
Event::Key(KeyEvent {
code: KeyCode::Char('O'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME "Odd" events - multicast, AI-assistance, people down?
Event::Key(KeyEvent {
code: KeyCode::Char('F'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter on "something*
Event::Key(KeyEvent {
code: KeyCode::Char('S'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Filter on Plan Speed
Event::Key(KeyEvent {
code: KeyCode::Char('z'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Zoom in
Event::Key(KeyEvent {
code: KeyCode::Char('Z'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Zoom out
// Now I am Dreaming
Event::Key(KeyEvent {
code: KeyCode::Char('C'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Capture what I am filtering on
Event::Key(KeyEvent {
code: KeyCode::Char('F'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // FIXME Freeze what I am filtering on
Event::Key(KeyEvent {
code: KeyCode::Char('S'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // FIXME Step through what I captured on
Event::Key(KeyEvent {
code: KeyCode::Char('R'),
modifiers: KeyModifiers::CONTROL,
..
}) => break, // FIXME Step backwards what I captured on
// Left and right cursors also
// Dreaming Less now
// Use TAB for autocompletion
// If I have moved into a panel, the following are ideas
Event::Key(KeyEvent {
code: KeyCode::Char('/'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Search for ip
Event::Key(KeyEvent {
code: KeyCode::Char('R'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Traceroute/MTR
Event::Key(KeyEvent {
code: KeyCode::Char('A'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Alert me on this selection
Event::Key(KeyEvent {
code: KeyCode::Char('K'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME Kill Alert on this
Event::Key(KeyEvent {
code: KeyCode::Char('V'),
modifiers: KeyModifiers::NONE,
..
}) => break, // FIXME View Selected Alerts
Event::Key(KeyEvent {
code: KeyCode::Char('B'),
modifiers: KeyModifiers::NONE,
..
}) => break, // Launch Browser on this customer
Event::Key(KeyEvent {
code: KeyCode::Char('L'),
modifiers: KeyModifiers::NONE,
..
}) => break, // Log notebook on this set of filters
_ => println!("Not recognized"),
}
}
}
// Undo the crossterm stuff
terminal.clear()?;
terminal.show_cursor()?;
crossterm::terminal::disable_raw_mode()?;
Ok(())
}

View File

@ -0,0 +1,135 @@
//! Provides a basic system for the UI framework. Handles
//! rendering the basic layout, talking to the UI framework,
//! and event-loop events that aren't quitting the program.
//!
//! It's designed to be the manager from which specific UI
//! components are managed.
use crate::{bus::BusMessage, widgets::*};
use ratatui::prelude::*;
use tokio::sync::mpsc::Sender;
use std::io::Stdout;
use crate::widgets::help::help_display;
use crate::widgets::latency_histogram::LatencyHistogram;
use self::{top_flows::TopFlows, top_hosts::TopHosts};
pub struct TopUi {
show_cpus: bool,
show_throughput_sparkline: bool,
bus_sender: Sender<BusMessage>,
sparkline: NetworkSparkline,
main_widget: Box<dyn TopWidget>,
}
impl TopUi {
/// Create a new TopUi instance. This will initialize the UI framework.
pub fn new(bus_sender: Sender<BusMessage>) -> Self {
let mut main_widget = Box::new(TopHosts::new(bus_sender.clone()));
main_widget.enable();
TopUi {
show_cpus: true,
show_throughput_sparkline: false,
main_widget,
bus_sender: bus_sender.clone(),
sparkline: NetworkSparkline::new(bus_sender.clone()),
}
}
pub fn handle_keypress(&mut self, key: char) {
// Handle Mode Switches
match key {
'c' => self.show_cpus = !self.show_cpus,
'n' => {
self.show_throughput_sparkline = !self.show_throughput_sparkline;
if self.show_throughput_sparkline {
self.sparkline.enable();
} else {
self.sparkline.disable();
}
}
'h' => {
self.main_widget.disable();
self.main_widget = Box::new(TopHosts::new(self.bus_sender.clone()));
self.main_widget.enable();
}
'f' => {
self.main_widget.disable();
self.main_widget = Box::new(TopFlows::new(self.bus_sender.clone()));
self.main_widget.enable();
}
'l' => {
self.main_widget.disable();
self.main_widget = Box::new(LatencyHistogram::new(self.bus_sender.clone()));
self.main_widget.enable();
}
_ => {}
}
}
pub fn render(&mut self, terminal: &mut Terminal<CrosstermBackend<Stdout>>) {
terminal
.draw(|f| {
self.top_level_render(f);
})
.unwrap();
}
fn top_level_render(&mut self, frame: &mut Frame) {
let mut constraints = Vec::new();
let mut next_region = 0;
// Build the layout regions
let help_region = {
constraints.push(Constraint::Length(1));
next_region += 1;
next_region - 1
};
let cpu_region = if self.show_cpus {
constraints.push(Constraint::Length(1));
next_region += 1;
next_region - 1
} else {
next_region
};
let network_spark_region = if self.show_throughput_sparkline {
constraints.push(Constraint::Length(10));
next_region += 1;
next_region - 1
} else {
next_region
};
let final_region = constraints.len();
constraints.push(Constraint::Fill(1));
let main_layout = Layout::new(Direction::Vertical, constraints).split(frame.size());
// Add Widgets
frame.render_widget(help_display(), main_layout[help_region]);
if self.show_cpus {
frame.render_widget(cpu_display(), main_layout[cpu_region]);
}
if self.show_throughput_sparkline {
self.sparkline.set_size(main_layout[network_spark_region]);
self.sparkline.tick();
self.sparkline.render_to_frame(frame);
}
// And finally the main panel
self.main_widget.set_size(main_layout[final_region]);
self.main_widget.tick();
self.main_widget.render_to_frame(frame);
/*match self.main_widget {
MainWidget::Hosts => {
frame.render_widget(top_hosts::hosts(), main_layout[final_region]);
}
MainWidget::Flows => {
frame.render_widget(top_flows::flows(), main_layout[final_region]);
}
}*/
}
}

View File

@ -0,0 +1,103 @@
//! Provides a basic system for the UI framework.
//! Upon starting the program, it performs basic initialization.
//! It tracks "drop", so when the program exits, it can perform cleanup.
use crate::{bus::BusMessage, top_level_ui::TopUi};
use anyhow::Result;
use crossterm::{
event::{self, Event, KeyCode, KeyEventKind},
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen},
ExecutableCommand,
};
use ratatui::{backend::CrosstermBackend, Terminal};
use tokio::sync::mpsc::Sender;
use std::{
io::stdout,
sync::atomic::{AtomicBool, Ordering},
};
pub static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
pub struct UiBase {
ui: TopUi,
}
impl UiBase {
/// Create a new UiBase instance. This will initialize the UI framework.
pub fn new(tx: Sender<BusMessage>) -> Result<Self> {
// Crossterm mode setup
enable_raw_mode()?;
stdout().execute(EnterAlternateScreen)?;
// Panic handler because I hate missing error messages
let original_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic| {
UiBase::cleanup();
original_hook(panic);
}));
// Setup Control-C Handler for graceful shutdown
ctrlc::set_handler(move || {
Self::cleanup();
std::process::exit(0);
})
.unwrap();
// Return
Ok(UiBase {
ui: TopUi::new(tx.clone()),
})
}
pub fn quit_program(&self) {
SHOULD_EXIT.store(true, Ordering::Relaxed);
}
/// Set the should_exit flag to true, which will cause the event loop to exit.
pub fn event_loop(&mut self) -> Result<()> {
let mut terminal = Terminal::new(CrosstermBackend::new(stdout()))?;
while !SHOULD_EXIT.load(Ordering::Relaxed) {
if event::poll(std::time::Duration::from_millis(50))? {
// Retrieve the keypress information
if let Event::Key(key) = event::read()? {
// Key press (down) event
if key.kind == KeyEventKind::Press {
match key.code {
// Quit the program
KeyCode::Char('q') => {
self.quit_program();
}
_ => {
let char: Option<char> = match key.code {
KeyCode::Char(c) => Some(c),
_ => None,
};
if let Some(c) = char {
self.ui.handle_keypress(c);
}
}
}
}
}
}
// Perform rendering
self.ui.render(&mut terminal);
}
Ok(())
}
fn cleanup() {
disable_raw_mode().unwrap();
stdout()
.execute(crossterm::terminal::LeaveAlternateScreen)
.unwrap();
}
}
impl Drop for UiBase {
fn drop(&mut self) {
Self::cleanup();
}
}

View File

@ -0,0 +1,55 @@
use std::sync::atomic::Ordering;
use ratatui::{
style::{Color, Style},
text::Span,
widgets::{Block, Borders, Widget},
};
/// Used to display the CPU usage and RAM usage
pub fn cpu_display() -> impl Widget {
use crate::bus::cpu_ram::*;
let num_cpus = NUM_CPUS.load(Ordering::Relaxed);
let cpu_usage = CPU_USAGE
.iter()
.take(num_cpus)
.map(|x| x.load(Ordering::Relaxed))
.collect::<Vec<_>>();
let total_ram = TOTAL_RAM.load(Ordering::Relaxed);
let used_ram = RAM_USED.load(Ordering::Relaxed);
let ram_percent = 100.0 - ((used_ram as f64 / total_ram as f64) * 100.0);
let ram_color = if ram_percent < 10.0 {
Color::Red
} else if ram_percent < 25.0 {
Color::Yellow
} else {
Color::White
};
let mut span_buf = vec![
Span::styled(" [ RAM: ", Style::default().fg(Color::Green)),
Span::styled(
format!("{:.0}% ", ram_percent),
Style::default().fg(ram_color),
),
Span::styled("CPU: ", Style::default().fg(Color::Green)),
];
for cpu in cpu_usage {
let color = if cpu < 10 {
Color::White
} else if cpu < 25 {
Color::Yellow
} else {
Color::Red
};
span_buf.push(Span::styled(
format!("{}% ", cpu),
Style::default().fg(color),
));
}
span_buf.push(Span::styled(" ] ", Style::default().fg(Color::Green)));
Block::new().borders(Borders::TOP).title(span_buf)
}

View File

@ -0,0 +1,23 @@
use ratatui::prelude::*;
use ratatui::widgets::{Block, Borders};
fn keyhelp(key: char, action: &'static str, buf: &mut Vec<Span>) {
buf.push(Span::styled("[", Style::default().fg(Color::Green)));
buf.push(Span::styled(key.to_string(), Style::default().fg(Color::Green)));
buf.push(Span::styled("] ", Style::default().fg(Color::Green)));
buf.push(Span::styled(action, Style::default().fg(Color::Yellow)));
buf.push(Span::styled(" ", Style::default().fg(Color::Green)));
}
pub fn help_display() -> impl Widget {
let mut span_buf = vec![
Span::styled("LQTOP - ", Style::default().fg(Color::White)),
];
keyhelp('q', "Quit", &mut span_buf);
keyhelp('c', "CPUs", &mut span_buf);
keyhelp('n', "Network", &mut span_buf);
keyhelp('h', "Hosts", &mut span_buf);
keyhelp('f', "Flows", &mut span_buf);
keyhelp('l', "Latency Histogram", &mut span_buf);
Block::new().borders(Borders::NONE).title(span_buf)
}

View File

@ -0,0 +1,66 @@
use super::{table_helper::TableHelper, TopWidget};
use lqos_bus::{BusResponse, FlowbeeSummaryData};
use lqos_utils::packet_scale::scale_bits;
use ratatui::prelude::*;
pub struct LatencyHistogram {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: std::sync::mpsc::Receiver<BusResponse>,
tx: std::sync::mpsc::Sender<BusResponse>,
size: Rect,
histogram: Vec<u32>,
}
impl TopWidget for LatencyHistogram {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableLatencyHistogram(self.tx.clone()))
.unwrap();
}
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableLatencyHistogram)
.unwrap();
}
fn set_size(&mut self, size: Rect) {
self.size = size;
}
fn tick(&mut self) {
while let Ok(msg) = self.rx.try_recv() {
if let BusResponse::RttHistogram(histogram) = msg {
self.histogram = histogram;
}
}
}
fn render_to_frame(&mut self, frame: &mut Frame) {
let bars: Vec<(String, u64)> = self.histogram.iter()
.enumerate()
.map(|(i, v)| (i.to_string(), *v as u64))
.collect();
let bars_mangled: Vec<_> = bars.iter().map(|(s,n)| {
(s.as_str(), *n)
}).collect();
let bar = ratatui::widgets::BarChart::default()
.bar_width(5)
.bar_gap(1)
.data(&bars_mangled);
frame.render_widget(bar, self.size);
}
}
impl LatencyHistogram {
pub fn new(bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
Self {
bus_link,
tx,
rx,
size: Rect::default(),
histogram: Vec::new(),
}
}
}

View File

@ -0,0 +1,33 @@
mod stats_ringbuffer;
mod table_helper;
mod cpu;
pub use cpu::cpu_display;
mod network_sparkline;
pub use network_sparkline::*;
use ratatui::{layout::Rect, Frame};
pub mod top_hosts;
pub mod top_flows;
pub mod help;
pub mod latency_histogram;
pub enum MainWidget {
Hosts,
Flows,
}
pub trait TopWidget {
/// When the widget is enabled, this is called to setup the link to the bus
fn enable(&mut self);
/// When the widget is disabled, this is called to allow the widget to cleanup
fn disable(&mut self);
/// Receive the allocated size for the widget from the layout system
fn set_size(&mut self, size: Rect);
/// Perform a tick to update the widget
fn tick(&mut self);
/// Render the widget
fn render_to_frame(&mut self, frame: &mut Frame);
}

View File

@ -0,0 +1,175 @@
use super::{stats_ringbuffer::StatsRingBuffer, TopWidget};
use crate::bus::BusMessage;
use lqos_bus::BusResponse;
use lqos_utils::packet_scale::scale_bits;
use ratatui::{
prelude::*,
style::{Color, Style},
symbols,
widgets::*,
};
use std::sync::mpsc::{Receiver, Sender};
pub struct NetworkSparkline {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: Receiver<BusResponse>,
tx: Sender<BusResponse>,
throughput: StatsRingBuffer<CurrentThroughput, 200>,
current_throughput: CurrentThroughput,
render_size: Rect,
}
impl TopWidget for NetworkSparkline {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableTotalThroughput(
self.tx.clone(),
))
.unwrap();
}
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableTotalThroughput)
.unwrap();
}
fn set_size(&mut self, _size: Rect) {
self.render_size = _size;
}
fn tick(&mut self) {
while let Ok(msg) = self.rx.try_recv() {
if let BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
} = msg
{
self.throughput.push(CurrentThroughput {
bits_per_second,
_packets_per_second: packets_per_second,
shaped_bits_per_second,
});
self.current_throughput = CurrentThroughput {
bits_per_second,
_packets_per_second: packets_per_second,
shaped_bits_per_second,
};
}
}
}
fn render_to_frame(&mut self, frame: &mut Frame) {
let mut raw_data = self.throughput.get_values_in_order();
raw_data.reverse();
let bps_down: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.bits_per_second.1 as f64))
.collect();
let bps_up: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.bits_per_second.0 as f64))
.collect();
let shaped_down: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.1 as f64))
.collect();
let shaped_up: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.0 as f64))
.collect();
let (up, down) = self.current_throughput.bits_per_second;
let title = format!(
" [Throughput (Down: {} Up: {})]",
scale_bits(up),
scale_bits(down)
);
let block = Block::default()
.title(title)
.borders(Borders::ALL)
.style(Style::default().fg(Color::Green));
let datasets = vec![
Dataset::default()
.name("Throughput")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Cyan))
.data(&bps_down),
Dataset::default()
.name("Throughput")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::Cyan))
.data(&bps_up),
Dataset::default()
.name("Shaped")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::LightGreen))
.data(&shaped_down),
Dataset::default()
.name("Shaped")
.marker(symbols::Marker::Braille)
.style(Style::default().fg(Color::LightGreen))
.data(&shaped_up),
];
let bps_max = bps_down.iter().map(|(_, val)| *val).fold(0.0, f64::max);
let bps_min = bps_up.iter().map(|(_, val)| *val).fold(0.0, f64::min);
let shaped_max = shaped_down.iter().map(|(_, val)| *val).fold(0.0, f64::max);
let shaped_min = shaped_up.iter().map(|(_, val)| *val).fold(0.0, f64::min);
let max = f64::max(bps_max, shaped_max);
let min = f64::min(bps_min, shaped_min);
let chart = Chart::new(datasets)
.block(block)
.x_axis(
Axis::default()
.title("Time")
.style(Style::default().fg(Color::Gray))
.bounds([0.0, 80.0]),
)
.y_axis(
Axis::default()
.style(Style::default().fg(Color::Gray))
.bounds([min, max]),
);
frame.render_widget(chart, self.render_size);
}
}
impl NetworkSparkline {
pub fn new(bus_link: tokio::sync::mpsc::Sender<BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
NetworkSparkline {
bus_link,
rx,
tx,
throughput: StatsRingBuffer::new(),
render_size: Rect::default(),
current_throughput: CurrentThroughput::default(),
}
}
}
#[derive(Default, Copy, Clone)]
struct CurrentThroughput {
pub bits_per_second: (u64, u64),
pub _packets_per_second: (u64, u64),
pub shaped_bits_per_second: (u64, u64),
}

View File

@ -0,0 +1,35 @@
/// Generic RingBuffer Type for storing statistics
pub struct StatsRingBuffer<T, const N: usize> {
buffer: Vec<T>,
index: usize,
}
impl <T: Default + Clone, const N: usize> StatsRingBuffer<T, N> {
/// Creates an empty ringbuffer
pub fn new() -> Self {
Self {
buffer: vec![T::default(); N],
index: 0,
}
}
/// Adds an entry to the ringbuffer
pub fn push(&mut self, value: T) {
self.buffer[self.index] = value;
self.index = (self.index + 1) % N;
}
/// Retrieves a clone of the ringbuffer, in the order the values were added
pub fn get_values_in_order(&self) -> Vec<T> {
let mut result = Vec::with_capacity(N);
for i in self.index..N {
result.push(self.buffer[i].clone());
}
for i in 0..self.index {
result.push(self.buffer[i].clone());
}
result
}
}

View File

@ -0,0 +1,67 @@
use ratatui::{style::{Color, Style}, text::Text, widgets::{Block, Borders, Cell, Row, Table}};
/// A helper for building Ratatui tables
pub struct TableHelper<const N_COLS: usize> {
headers: [String; N_COLS],
rows: Vec<[String; N_COLS]>,
}
impl <const N: usize> TableHelper<N> {
pub fn new<S: ToString>(raw_headers: [S; N]) -> Self {
const ARRAY_REPEAT_VALUE: String = String::new();
let mut headers = [ARRAY_REPEAT_VALUE; N];
for i in 0..N {
headers[i] = raw_headers[i].to_string();
}
let headers = headers.try_into().unwrap();
Self {
headers,
rows: Vec::new(),
}
}
pub fn add_row(&mut self, row: [String; N]) {
self.rows.push(row);
}
pub fn to_table(&self) -> Table {
let header_cells: Vec<_> = self.headers.
iter()
.map(|h| Cell::from(Text::from(h.clone())))
.collect();
let rows: Vec<_> = self.rows
.iter()
.map(|row| {
let cells = row.iter()
.map(|cell| Cell::from(Text::from(cell.clone())))
.collect::<Vec<Cell>>();
Row::new(cells)
})
.collect();
let mut widths = [0u16; N];
for (i, heading) in self.headers.iter().enumerate() {
widths[i] = (heading.len() + 2) as u16;
}
for row in self.rows.iter() {
for (j, cell) in row.iter().enumerate() {
if cell.len() + 2 > widths[j] as usize {
widths[j] = (cell.len() + 2) as u16;
}
}
}
Table::new(rows, widths)
.header(Row::new(header_cells).style(Style::default().fg(Color::White).bg(Color::Blue)))
}
pub fn to_block(&self) -> Table {
let block = Block::default()
//.title("Top Downloaders")
.borders(Borders::NONE)
.style(Style::default().fg(Color::Green));
self.to_table().block(block)
}
}

View File

@ -0,0 +1,78 @@
use super::{table_helper::TableHelper, TopWidget};
use lqos_bus::{BusResponse, FlowbeeSummaryData};
use lqos_utils::packet_scale::scale_bits;
use ratatui::prelude::*;
pub struct TopFlows {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: std::sync::mpsc::Receiver<BusResponse>,
tx: std::sync::mpsc::Sender<BusResponse>,
size: Rect,
flows: Vec<FlowbeeSummaryData>,
}
impl TopWidget for TopFlows {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableTopFlows(self.tx.clone()))
.unwrap();
}
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableTopFlows)
.unwrap();
}
fn set_size(&mut self, size: Rect) {
self.size = size;
}
fn tick(&mut self) {
while let Ok(msg) = self.rx.try_recv() {
if let BusResponse::TopFlows(flows) = msg {
self.flows = flows;
}
}
}
fn render_to_frame(&mut self, frame: &mut Frame) {
let mut t = TableHelper::new([
"Src IP",
"Dst IP",
"Type",
"Upload",
"Download",
"Retransmits",
"RTT (ms)",
"ASN",
]);
for flow in self.flows.iter() {
t.add_row([
flow.local_ip.to_string(),
flow.remote_ip.to_string(),
flow.analysis.to_string(),
scale_bits(flow.bytes_sent[0]),
scale_bits(flow.bytes_sent[1]),
format!("{}/{}", flow.tcp_retransmits[0], flow.tcp_retransmits[1]),
format!("{:.1}/{:.1}", flow.rtt_nanos[0] as f64 / 1000000., flow.tcp_retransmits[1] as f64 / 1000000.),
flow.remote_asn_name.to_string(),
]);
}
let table = t.to_block();
frame.render_widget(table, self.size);
}
}
impl TopFlows {
pub fn new(bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
Self {
bus_link,
tx,
rx,
size: Rect::default(),
flows: Vec::new(),
}
}
}

View File

@ -0,0 +1,78 @@
use lqos_bus::{BusResponse, IpStats};
use lqos_utils::packet_scale::{scale_bits, scale_packets};
use ratatui::prelude::*;
use super::{table_helper::TableHelper, TopWidget};
pub struct TopHosts {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: std::sync::mpsc::Receiver<BusResponse>,
tx: std::sync::mpsc::Sender<BusResponse>,
size: Rect,
stats: Vec<IpStats>,
}
impl TopWidget for TopHosts {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableTopHosts(self.tx.clone()))
.unwrap();
}
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableTopHosts)
.unwrap();
}
fn set_size(&mut self, size: Rect) {
self.size = size;
}
fn tick(&mut self) {
while let Ok(response) = self.rx.try_recv() {
if let BusResponse::TopDownloaders(stats) = response {
self.stats = stats;
}
}
}
fn render_to_frame(&mut self, frame: &mut Frame) {
let mut t = TableHelper::new([
"IP Address",
"Down (bps)",
"Up (bps)",
"Down (pps)",
"Up (pps)",
"RTT",
"TC Handle",
]);
for host in self.stats.iter() {
t.add_row([
host.ip_address.to_string(),
scale_bits(host.bits_per_second.0),
scale_bits(host.bits_per_second.1),
scale_packets(host.packets_per_second.0),
scale_packets(host.packets_per_second.1),
format!("{:.2} ms", host.median_tcp_rtt),
host.tc_handle.to_string(),
]);
}
let block = t.to_block();
frame.render_widget(block, self.size);
}
}
impl TopHosts {
pub fn new(bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
Self {
bus_link,
rx,
tx,
size: Rect::default(),
stats: Vec::new(),
}
}
}