From e615d2693460a81ff83b606bedec163e69f34c0e Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Mon, 13 May 2024 15:00:07 -0500 Subject: [PATCH] Add lqtop latency histo --- src/rust/lqtop/src/bus/mod.rs | 17 +++++ src/rust/lqtop/src/top_level_ui.rs | 6 ++ src/rust/lqtop/src/widgets/help.rs | 1 + .../lqtop/src/widgets/latency_histogram.rs | 66 +++++++++++++++++++ src/rust/lqtop/src/widgets/mod.rs | 1 + 5 files changed, 91 insertions(+) create mode 100644 src/rust/lqtop/src/widgets/latency_histogram.rs diff --git a/src/rust/lqtop/src/bus/mod.rs b/src/rust/lqtop/src/bus/mod.rs index 5701f665..16a40398 100644 --- a/src/rust/lqtop/src/bus/mod.rs +++ b/src/rust/lqtop/src/bus/mod.rs @@ -15,6 +15,8 @@ pub enum BusMessage { DisableTopFlows, EnableTopHosts(std::sync::mpsc::Sender), DisableTopHosts, + EnableLatencyHistogram(std::sync::mpsc::Sender), + DisableLatencyHistogram, } /// The main loop for the bus. @@ -37,6 +39,7 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { let mut collect_total_throughput = None; let mut collect_top_downloaders = None; let mut collect_top_flows = None; + let mut collect_latency_histogram = None; let mut bus_client = BusClient::new().await?; if !bus_client.is_connected() { @@ -65,6 +68,12 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { BusMessage::DisableTopHosts => { collect_top_downloaders = None; } + BusMessage::EnableLatencyHistogram(tx) => { + collect_latency_histogram = Some(tx); + } + BusMessage::DisableLatencyHistogram => { + collect_latency_histogram = None; + } } } @@ -80,6 +89,9 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { if collect_top_flows.is_some() { commands.push(BusRequest::TopFlows { flow_type: lqos_bus::TopFlowType::Bytes, n: 100 }); } + if collect_latency_histogram.is_some() { + commands.push(BusRequest::RttHistogram); + } // Send the requests and process replies for response in bus_client.request(commands).await? { @@ -99,6 +111,11 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed } } + BusResponse::RttHistogram(..) => { + if let Some(tx) = &collect_latency_histogram { + let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed + } + } _ => {} } } diff --git a/src/rust/lqtop/src/top_level_ui.rs b/src/rust/lqtop/src/top_level_ui.rs index e3eb030b..0793db2a 100644 --- a/src/rust/lqtop/src/top_level_ui.rs +++ b/src/rust/lqtop/src/top_level_ui.rs @@ -10,6 +10,7 @@ use ratatui::prelude::*; use tokio::sync::mpsc::Sender; use std::io::Stdout; use crate::widgets::help::help_display; +use crate::widgets::latency_histogram::LatencyHistogram; use self::{top_flows::TopFlows, top_hosts::TopHosts}; @@ -57,6 +58,11 @@ impl TopUi { self.main_widget = Box::new(TopFlows::new(self.bus_sender.clone())); self.main_widget.enable(); } + 'l' => { + self.main_widget.disable(); + self.main_widget = Box::new(LatencyHistogram::new(self.bus_sender.clone())); + self.main_widget.enable(); + } _ => {} } } diff --git a/src/rust/lqtop/src/widgets/help.rs b/src/rust/lqtop/src/widgets/help.rs index ea140dfe..6e7c5c6b 100644 --- a/src/rust/lqtop/src/widgets/help.rs +++ b/src/rust/lqtop/src/widgets/help.rs @@ -18,5 +18,6 @@ pub fn help_display() -> impl Widget { keyhelp('n', "Network", &mut span_buf); keyhelp('h', "Hosts", &mut span_buf); keyhelp('f', "Flows", &mut span_buf); + keyhelp('l', "Latency Histogram", &mut span_buf); Block::new().borders(Borders::NONE).title(span_buf) } \ No newline at end of file diff --git a/src/rust/lqtop/src/widgets/latency_histogram.rs b/src/rust/lqtop/src/widgets/latency_histogram.rs new file mode 100644 index 00000000..d46c5324 --- /dev/null +++ b/src/rust/lqtop/src/widgets/latency_histogram.rs @@ -0,0 +1,66 @@ +use super::{table_helper::TableHelper, TopWidget}; +use lqos_bus::{BusResponse, FlowbeeSummaryData}; +use lqos_utils::packet_scale::scale_bits; +use ratatui::prelude::*; + +pub struct LatencyHistogram { + bus_link: tokio::sync::mpsc::Sender, + rx: std::sync::mpsc::Receiver, + tx: std::sync::mpsc::Sender, + size: Rect, + histogram: Vec, +} + +impl TopWidget for LatencyHistogram { + fn enable(&mut self) { + self.bus_link + .blocking_send(crate::bus::BusMessage::EnableLatencyHistogram(self.tx.clone())) + .unwrap(); + } + + fn disable(&mut self) { + self.bus_link + .blocking_send(crate::bus::BusMessage::DisableLatencyHistogram) + .unwrap(); + } + + fn set_size(&mut self, size: Rect) { + self.size = size; + } + + fn tick(&mut self) { + while let Ok(msg) = self.rx.try_recv() { + if let BusResponse::RttHistogram(histogram) = msg { + self.histogram = histogram; + } + } + } + + fn render_to_frame(&mut self, frame: &mut Frame) { + let bars: Vec<(String, u64)> = self.histogram.iter() + .enumerate() + .map(|(i, v)| (i.to_string(), *v as u64)) + .collect(); + let bars_mangled: Vec<_> = bars.iter().map(|(s,n)| { + (s.as_str(), *n) + }).collect(); + let bar = ratatui::widgets::BarChart::default() + .bar_width(5) + .bar_gap(1) + .data(&bars_mangled); + frame.render_widget(bar, self.size); + } +} + +impl LatencyHistogram { + pub fn new(bus_link: tokio::sync::mpsc::Sender) -> Self { + let (tx, rx) = std::sync::mpsc::channel::(); + Self { + bus_link, + tx, + rx, + size: Rect::default(), + histogram: Vec::new(), + } + } +} diff --git a/src/rust/lqtop/src/widgets/mod.rs b/src/rust/lqtop/src/widgets/mod.rs index b15c380f..37d9e6dd 100644 --- a/src/rust/lqtop/src/widgets/mod.rs +++ b/src/rust/lqtop/src/widgets/mod.rs @@ -8,6 +8,7 @@ use ratatui::{layout::Rect, Frame}; pub mod top_hosts; pub mod top_flows; pub mod help; +pub mod latency_histogram; pub enum MainWidget { Hosts,