And top hosts is ported to the new setup. I like this approach.

This commit is contained in:
Herbert Wolverson 2024-03-21 13:58:34 -05:00
parent 2cc5973ce7
commit cefda5e936
4 changed files with 93 additions and 68 deletions

View File

@ -6,7 +6,6 @@ use lqos_bus::{BusClient, BusRequest, BusResponse};
use tokio::sync::mpsc::Receiver;
use std::sync::atomic::Ordering;
pub mod cpu_ram;
pub mod top_hosts;
/// Communications with the bus via channels
pub enum BusMessage {
@ -14,6 +13,8 @@ pub enum BusMessage {
DisableTotalThroughput,
EnableTopFlows(std::sync::mpsc::Sender<BusResponse>),
DisableTopFlows,
EnableTopHosts(std::sync::mpsc::Sender<BusResponse>),
DisableTopHosts,
}
/// The main loop for the bus.
@ -34,7 +35,7 @@ async fn main_loop_wrapper(rx: Receiver<BusMessage>) {
async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
// Collection Settings
let mut collect_total_throughput = None;
let collect_top_downloaders = true;
let mut collect_top_downloaders = None;
let mut collect_top_flows = None;
let mut bus_client = BusClient::new().await?;
@ -58,6 +59,12 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
BusMessage::DisableTopFlows => {
collect_top_flows = None;
}
BusMessage::EnableTopHosts(tx) => {
collect_top_downloaders = Some(tx);
}
BusMessage::DisableTopHosts => {
collect_top_downloaders = None;
}
}
}
@ -67,7 +74,7 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
if collect_total_throughput.is_some() {
commands.push(BusRequest::GetCurrentThroughput);
}
if collect_top_downloaders {
if collect_top_downloaders.is_some() {
commands.push(BusRequest::GetTopNDownloaders { start: 0, end: 100 });
}
if collect_top_flows.is_some() {
@ -82,7 +89,11 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::TopDownloaders { .. } => top_hosts::top_n(&response).await,
BusResponse::TopDownloaders { .. } => {
if let Some(tx) = &collect_top_downloaders {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::TopFlows(..) => {
if let Some(tx) = &collect_top_flows {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed

View File

@ -1,12 +0,0 @@
use lqos_bus::{BusResponse, IpStats};
use once_cell::sync::Lazy;
use std::sync::Mutex;
pub static TOP_HOSTS: Lazy<Mutex<Vec<IpStats>>> = Lazy::new(|| Mutex::new(Vec::new()));
pub async fn top_n(response: &BusResponse) {
if let BusResponse::TopDownloaders(stats) = response {
let mut top_hosts = TOP_HOSTS.lock().unwrap();
*top_hosts = stats.clone();
}
}

View File

@ -10,7 +10,7 @@ use ratatui::prelude::*;
use tokio::sync::mpsc::Sender;
use std::io::Stdout;
use self::top_flows::TopFlows;
use self::{top_flows::TopFlows, top_hosts::TopHosts};
pub struct TopUi {
show_cpus: bool,
@ -23,7 +23,7 @@ pub struct TopUi {
impl TopUi {
/// Create a new TopUi instance. This will initialize the UI framework.
pub fn new(bus_sender: Sender<BusMessage>) -> Self {
let mut main_widget = Box::new(TopFlows::new(bus_sender.clone()));
let mut main_widget = Box::new(TopHosts::new(bus_sender.clone()));
main_widget.enable();
TopUi {
show_cpus: true,
@ -46,7 +46,11 @@ impl TopUi {
self.sparkline.disable();
}
}
//'h' => self.main_widget = MainWidget::Hosts,
'h' => {
self.main_widget.disable();
self.main_widget = Box::new(TopHosts::new(self.bus_sender.clone()));
self.main_widget.enable();
}
'f' => {
self.main_widget.disable();
self.main_widget = Box::new(TopFlows::new(self.bus_sender.clone()));

View File

@ -1,56 +1,78 @@
use lqos_bus::{BusResponse, IpStats};
use lqos_utils::packet_scale::{scale_bits, scale_packets};
use ratatui::{prelude::*, widgets::*};
use ratatui::prelude::*;
use super::{table_helper::TableHelper, TopWidget};
use crate::bus::top_hosts::TOP_HOSTS;
pub struct TopHosts {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: std::sync::mpsc::Receiver<BusResponse>,
tx: std::sync::mpsc::Sender<BusResponse>,
size: Rect,
stats: Vec<IpStats>,
}
pub fn hosts() -> impl Widget {
let block = Block::default()
//.title("Top Downloaders")
.borders(Borders::NONE)
.style(Style::default().fg(Color::Green));
let mut rows = Vec::new();
let lock = TOP_HOSTS.lock().unwrap();
for host in lock.iter() {
let color = if host.tc_handle.to_string() == "0:0" {
Color::White
} else {
Color::LightGreen
};
let bg_color = if host.median_tcp_rtt > 150.0 {
Color::Red
} else if host.median_tcp_rtt > 100.0 {
Color::Yellow
} else {
Color::Black
};
rows.push(
Row::new(vec![
Cell::from(Text::from(host.ip_address.to_string())),
Cell::from(Text::from(scale_bits(host.bits_per_second.0))),
Cell::from(Text::from(scale_bits(host.bits_per_second.1))),
Cell::from(Text::from(scale_packets(host.packets_per_second.0))),
Cell::from(Text::from(scale_packets(host.packets_per_second.1))),
Cell::from(Text::from(format!("{:.2} ms", host.median_tcp_rtt))),
Cell::from(Text::from(host.tc_handle.to_string())),
]).style(style::Style::default().fg(color).bg(bg_color)),
);
impl TopWidget for TopHosts {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableTopHosts(self.tx.clone()))
.unwrap();
}
let header = Row::new(vec![
Cell::from(Text::from("IP Address")),
Cell::from(Text::from("Down (bps)")),
Cell::from(Text::from("Up (bps)")),
Cell::from(Text::from("Down (pps)")),
Cell::from(Text::from("Up (pps)")),
Cell::from(Text::from("RTT")),
Cell::from(Text::from("TC Handle")),
]).style(style::Style::default().fg(Color::Yellow).bg(Color::Blue));
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableTopHosts)
.unwrap();
}
Table::new(rows, [20, 15, 15, 10, 10, 10, 10])
.block(block)
.header(header)
fn set_size(&mut self, size: Rect) {
self.size = size;
}
fn tick(&mut self) {
while let Ok(response) = self.rx.try_recv() {
if let BusResponse::TopDownloaders(stats) = response {
self.stats = stats;
}
}
}
fn render_to_frame(&mut self, frame: &mut Frame) {
let mut t = TableHelper::new([
"IP Address",
"Down (bps)",
"Up (bps)",
"Down (pps)",
"Up (pps)",
"RTT",
"TC Handle",
]);
for host in self.stats.iter() {
t.add_row([
host.ip_address.to_string(),
scale_bits(host.bits_per_second.0),
scale_bits(host.bits_per_second.1),
scale_packets(host.packets_per_second.0),
scale_packets(host.packets_per_second.1),
format!("{:.2} ms", host.median_tcp_rtt),
host.tc_handle.to_string(),
]);
}
let block = t.to_block();
frame.render_widget(block, self.size);
}
}
impl TopHosts {
pub fn new(bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
Self {
bus_link,
rx,
tx,
size: Rect::default(),
stats: Vec::new(),
}
}
}