diff --git a/src/rust/lqtop/src/bus/mod.rs b/src/rust/lqtop/src/bus/mod.rs index e91286a6..7e3c6760 100644 --- a/src/rust/lqtop/src/bus/mod.rs +++ b/src/rust/lqtop/src/bus/mod.rs @@ -7,12 +7,13 @@ use tokio::sync::mpsc::Receiver; use std::sync::atomic::Ordering; pub mod cpu_ram; pub mod top_hosts; -pub mod top_flows; /// Communications with the bus via channels pub enum BusMessage { EnableTotalThroughput(std::sync::mpsc::Sender), DisableTotalThroughput, + EnableTopFlows(std::sync::mpsc::Sender), + DisableTopFlows, } /// The main loop for the bus. @@ -34,7 +35,7 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { // Collection Settings let mut collect_total_throughput = None; let collect_top_downloaders = true; - let collect_top_flows = true; + let mut collect_top_flows = None; let mut bus_client = BusClient::new().await?; if !bus_client.is_connected() { @@ -51,6 +52,12 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { BusMessage::DisableTotalThroughput => { collect_total_throughput = None; } + BusMessage::EnableTopFlows(tx) => { + collect_top_flows = Some(tx); + } + BusMessage::DisableTopFlows => { + collect_top_flows = None; + } } } @@ -63,7 +70,7 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { if collect_top_downloaders { commands.push(BusRequest::GetTopNDownloaders { start: 0, end: 100 }); } - if collect_top_flows { + if collect_top_flows.is_some() { commands.push(BusRequest::TopFlows { flow_type: lqos_bus::TopFlowType::Bytes, n: 100 }); } @@ -72,11 +79,15 @@ async fn main_loop(mut rx: Receiver) -> Result<()> { match response { BusResponse::CurrentThroughput { .. } => { if let Some(tx) = &collect_total_throughput { - tx.send(response).unwrap(); // Ignoring the error, it's ok if the channel closed + let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed } } BusResponse::TopDownloaders { .. } => top_hosts::top_n(&response).await, - BusResponse::TopFlows(..) => top_flows::top_flows(&response).await, + BusResponse::TopFlows(..) => { + if let Some(tx) = &collect_top_flows { + let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed + } + } _ => {} } } diff --git a/src/rust/lqtop/src/bus/top_flows.rs b/src/rust/lqtop/src/bus/top_flows.rs deleted file mode 100644 index 52ec82e6..00000000 --- a/src/rust/lqtop/src/bus/top_flows.rs +++ /dev/null @@ -1,12 +0,0 @@ -use lqos_bus::{BusResponse, FlowbeeSummaryData}; -use once_cell::sync::Lazy; -use std::sync::Mutex; - -pub static TOP_FLOWS: Lazy>> = Lazy::new(|| Mutex::new(Vec::new())); - -pub async fn top_flows(response: &BusResponse) { - if let BusResponse::TopFlows(stats) = response { - let mut top_hosts = TOP_FLOWS.lock().unwrap(); - *top_hosts = stats.clone(); - } -} \ No newline at end of file diff --git a/src/rust/lqtop/src/top_level_ui.rs b/src/rust/lqtop/src/top_level_ui.rs index 78b370ac..ebca364d 100644 --- a/src/rust/lqtop/src/top_level_ui.rs +++ b/src/rust/lqtop/src/top_level_ui.rs @@ -10,22 +10,26 @@ use ratatui::prelude::*; use tokio::sync::mpsc::Sender; use std::io::Stdout; +use self::top_flows::TopFlows; + pub struct TopUi { show_cpus: bool, show_throughput_sparkline: bool, - _bus_sender: Sender, + bus_sender: Sender, sparkline: NetworkSparkline, - main_widget: MainWidget, + main_widget: Box, } impl TopUi { /// Create a new TopUi instance. This will initialize the UI framework. pub fn new(bus_sender: Sender) -> Self { + let mut main_widget = Box::new(TopFlows::new(bus_sender.clone())); + main_widget.enable(); TopUi { show_cpus: true, show_throughput_sparkline: false, - main_widget: MainWidget::Hosts, - _bus_sender: bus_sender.clone(), + main_widget, + bus_sender: bus_sender.clone(), sparkline: NetworkSparkline::new(bus_sender.clone()), } } @@ -42,8 +46,12 @@ impl TopUi { self.sparkline.disable(); } } - 'h' => self.main_widget = MainWidget::Hosts, - 'f' => self.main_widget = MainWidget::Flows, + //'h' => self.main_widget = MainWidget::Hosts, + 'f' => { + self.main_widget.disable(); + self.main_widget = Box::new(TopFlows::new(self.bus_sender.clone())); + self.main_widget.enable(); + } _ => {} } } @@ -93,13 +101,17 @@ impl TopUi { } // And finally the main panel - match self.main_widget { + self.main_widget.set_size(main_layout[final_region]); + self.main_widget.tick(); + self.main_widget.render_to_frame(frame); + + /*match self.main_widget { MainWidget::Hosts => { frame.render_widget(top_hosts::hosts(), main_layout[final_region]); } MainWidget::Flows => { frame.render_widget(top_flows::flows(), main_layout[final_region]); } - } + }*/ } } diff --git a/src/rust/lqtop/src/widgets/top_flows.rs b/src/rust/lqtop/src/widgets/top_flows.rs index 4e464a06..5cbb8288 100644 --- a/src/rust/lqtop/src/widgets/top_flows.rs +++ b/src/rust/lqtop/src/widgets/top_flows.rs @@ -1,45 +1,101 @@ +use super::TopWidget; +use lqos_bus::{BusResponse, FlowbeeSummaryData}; use lqos_utils::packet_scale::scale_bits; use ratatui::{prelude::*, widgets::*}; -use crate::bus::top_flows::TOP_FLOWS; +pub struct TopFlows { + bus_link: tokio::sync::mpsc::Sender, + rx: std::sync::mpsc::Receiver, + tx: std::sync::mpsc::Sender, + size: Rect, + flows: Vec, +} -pub fn flows() -> impl Widget { - let block = Block::default() - //.title("Top Downloaders") - .borders(Borders::NONE) - .style(Style::default().fg(Color::Green)); - - let mut rows = Vec::new(); - - let lock = TOP_FLOWS.lock().unwrap(); - for flow in lock.iter() { - - rows.push( - Row::new(vec![ - Cell::from(Text::from(flow.local_ip.to_string())), - Cell::from(Text::from(flow.remote_ip.to_string())), - Cell::from(Text::from(flow.analysis.to_string())), - Cell::from(Text::from(scale_bits(flow.bytes_sent[0]))), - Cell::from(Text::from(scale_bits(flow.bytes_sent[1]))), - Cell::from(Text::from(format!("{}/{}", flow.tcp_retransmits[0], flow.tcp_retransmits[1]))), - Cell::from(Text::from(format!("{:.1}/{:.1}", flow.rtt_nanos[0] as f64 / 1000000. , flow.tcp_retransmits[1] as f64 / 1000000.))), - Cell::from(Text::from(flow.remote_asn_name.to_string())), - ]).style(style::Style::default().fg(Color::White)), - ); +impl TopWidget for TopFlows { + fn enable(&mut self) { + self.bus_link + .blocking_send(crate::bus::BusMessage::EnableTopFlows(self.tx.clone())) + .unwrap(); } - let header = Row::new(vec![ - Cell::from(Text::from("Src IP")), - Cell::from(Text::from("Dst IP")), - Cell::from(Text::from("Type")), - Cell::from(Text::from("Upload")), - Cell::from(Text::from("Download")), - Cell::from(Text::from("Retransmits")), - Cell::from(Text::from("RTT (ms)")), - Cell::from(Text::from("ASN")), - ]).style(style::Style::default().fg(Color::Yellow).bg(Color::Blue)); + fn disable(&mut self) { + self.bus_link + .blocking_send(crate::bus::BusMessage::DisableTopFlows) + .unwrap(); + } - Table::new(rows, [15, 15, 20, 14, 14, 10, 15, 20]) - .block(block) - .header(header) -} \ No newline at end of file + fn set_size(&mut self, size: Rect) { + self.size = size; + } + + fn tick(&mut self) { + while let Ok(msg) = self.rx.try_recv() { + if let BusResponse::TopFlows(flows) = msg { + self.flows = flows; + } + } + } + + fn render_to_frame(&mut self, frame: &mut Frame) { + let block = Block::default() + //.title("Top Downloaders") + .borders(Borders::NONE) + .style(Style::default().fg(Color::Green)); + + let mut rows = Vec::new(); + + for flow in self.flows.iter() { + rows.push( + Row::new(vec![ + Cell::from(Text::from(flow.local_ip.to_string())), + Cell::from(Text::from(flow.remote_ip.to_string())), + Cell::from(Text::from(flow.analysis.to_string())), + Cell::from(Text::from(scale_bits(flow.bytes_sent[0]))), + Cell::from(Text::from(scale_bits(flow.bytes_sent[1]))), + Cell::from(Text::from(format!( + "{}/{}", + flow.tcp_retransmits[0], flow.tcp_retransmits[1] + ))), + Cell::from(Text::from(format!( + "{:.1}/{:.1}", + flow.rtt_nanos[0] as f64 / 1000000., + flow.tcp_retransmits[1] as f64 / 1000000. + ))), + Cell::from(Text::from(flow.remote_asn_name.to_string())), + ]) + .style(style::Style::default().fg(Color::White)), + ); + } + + let header = Row::new(vec![ + Cell::from(Text::from("Src IP")), + Cell::from(Text::from("Dst IP")), + Cell::from(Text::from("Type")), + Cell::from(Text::from("Upload")), + Cell::from(Text::from("Download")), + Cell::from(Text::from("Retransmits")), + Cell::from(Text::from("RTT (ms)")), + Cell::from(Text::from("ASN")), + ]) + .style(style::Style::default().fg(Color::Yellow).bg(Color::Blue)); + + let table = Table::new(rows, [15, 15, 20, 14, 14, 10, 15, 20]) + .block(block) + .header(header); + + frame.render_widget(table, self.size); + } +} + +impl TopFlows { + pub fn new(bus_link: tokio::sync::mpsc::Sender) -> Self { + let (tx, rx) = std::sync::mpsc::channel::(); + Self { + bus_link, + tx, + rx, + size: Rect::default(), + flows: Vec::new(), + } + } +}