Flows system moved over to the new regime.

This commit is contained in:
Herbert Wolverson 2024-03-21 13:15:59 -05:00
parent e6a30bc370
commit 205066b54d
4 changed files with 130 additions and 63 deletions

View File

@ -7,12 +7,13 @@ use tokio::sync::mpsc::Receiver;
use std::sync::atomic::Ordering;
pub mod cpu_ram;
pub mod top_hosts;
pub mod top_flows;
/// Communications with the bus via channels
pub enum BusMessage {
EnableTotalThroughput(std::sync::mpsc::Sender<BusResponse>),
DisableTotalThroughput,
EnableTopFlows(std::sync::mpsc::Sender<BusResponse>),
DisableTopFlows,
}
/// The main loop for the bus.
@ -34,7 +35,7 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
// Collection Settings
let mut collect_total_throughput = None;
let collect_top_downloaders = true;
let collect_top_flows = true;
let mut collect_top_flows = None;
let mut bus_client = BusClient::new().await?;
if !bus_client.is_connected() {
@ -51,6 +52,12 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
BusMessage::DisableTotalThroughput => {
collect_total_throughput = None;
}
BusMessage::EnableTopFlows(tx) => {
collect_top_flows = Some(tx);
}
BusMessage::DisableTopFlows => {
collect_top_flows = None;
}
}
}
@ -63,7 +70,7 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
if collect_top_downloaders {
commands.push(BusRequest::GetTopNDownloaders { start: 0, end: 100 });
}
if collect_top_flows {
if collect_top_flows.is_some() {
commands.push(BusRequest::TopFlows { flow_type: lqos_bus::TopFlowType::Bytes, n: 100 });
}
@ -72,11 +79,15 @@ async fn main_loop(mut rx: Receiver<BusMessage>) -> Result<()> {
match response {
BusResponse::CurrentThroughput { .. } => {
if let Some(tx) = &collect_total_throughput {
tx.send(response).unwrap(); // Ignoring the error, it's ok if the channel closed
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
BusResponse::TopDownloaders { .. } => top_hosts::top_n(&response).await,
BusResponse::TopFlows(..) => top_flows::top_flows(&response).await,
BusResponse::TopFlows(..) => {
if let Some(tx) = &collect_top_flows {
let _ = tx.send(response); // Ignoring the error, it's ok if the channel closed
}
}
_ => {}
}
}

View File

@ -1,12 +0,0 @@
use lqos_bus::{BusResponse, FlowbeeSummaryData};
use once_cell::sync::Lazy;
use std::sync::Mutex;
pub static TOP_FLOWS: Lazy<Mutex<Vec<FlowbeeSummaryData>>> = Lazy::new(|| Mutex::new(Vec::new()));
pub async fn top_flows(response: &BusResponse) {
if let BusResponse::TopFlows(stats) = response {
let mut top_hosts = TOP_FLOWS.lock().unwrap();
*top_hosts = stats.clone();
}
}

View File

@ -10,22 +10,26 @@ use ratatui::prelude::*;
use tokio::sync::mpsc::Sender;
use std::io::Stdout;
use self::top_flows::TopFlows;
pub struct TopUi {
show_cpus: bool,
show_throughput_sparkline: bool,
_bus_sender: Sender<BusMessage>,
bus_sender: Sender<BusMessage>,
sparkline: NetworkSparkline,
main_widget: MainWidget,
main_widget: Box<dyn TopWidget>,
}
impl TopUi {
/// Create a new TopUi instance. This will initialize the UI framework.
pub fn new(bus_sender: Sender<BusMessage>) -> Self {
let mut main_widget = Box::new(TopFlows::new(bus_sender.clone()));
main_widget.enable();
TopUi {
show_cpus: true,
show_throughput_sparkline: false,
main_widget: MainWidget::Hosts,
_bus_sender: bus_sender.clone(),
main_widget,
bus_sender: bus_sender.clone(),
sparkline: NetworkSparkline::new(bus_sender.clone()),
}
}
@ -42,8 +46,12 @@ impl TopUi {
self.sparkline.disable();
}
}
'h' => self.main_widget = MainWidget::Hosts,
'f' => self.main_widget = MainWidget::Flows,
//'h' => self.main_widget = MainWidget::Hosts,
'f' => {
self.main_widget.disable();
self.main_widget = Box::new(TopFlows::new(self.bus_sender.clone()));
self.main_widget.enable();
}
_ => {}
}
}
@ -93,13 +101,17 @@ impl TopUi {
}
// And finally the main panel
match self.main_widget {
self.main_widget.set_size(main_layout[final_region]);
self.main_widget.tick();
self.main_widget.render_to_frame(frame);
/*match self.main_widget {
MainWidget::Hosts => {
frame.render_widget(top_hosts::hosts(), main_layout[final_region]);
}
MainWidget::Flows => {
frame.render_widget(top_flows::flows(), main_layout[final_region]);
}
}
}*/
}
}

View File

@ -1,45 +1,101 @@
use super::TopWidget;
use lqos_bus::{BusResponse, FlowbeeSummaryData};
use lqos_utils::packet_scale::scale_bits;
use ratatui::{prelude::*, widgets::*};
use crate::bus::top_flows::TOP_FLOWS;
pub struct TopFlows {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
rx: std::sync::mpsc::Receiver<BusResponse>,
tx: std::sync::mpsc::Sender<BusResponse>,
size: Rect,
flows: Vec<FlowbeeSummaryData>,
}
pub fn flows() -> impl Widget {
let block = Block::default()
//.title("Top Downloaders")
.borders(Borders::NONE)
.style(Style::default().fg(Color::Green));
let mut rows = Vec::new();
let lock = TOP_FLOWS.lock().unwrap();
for flow in lock.iter() {
rows.push(
Row::new(vec![
Cell::from(Text::from(flow.local_ip.to_string())),
Cell::from(Text::from(flow.remote_ip.to_string())),
Cell::from(Text::from(flow.analysis.to_string())),
Cell::from(Text::from(scale_bits(flow.bytes_sent[0]))),
Cell::from(Text::from(scale_bits(flow.bytes_sent[1]))),
Cell::from(Text::from(format!("{}/{}", flow.tcp_retransmits[0], flow.tcp_retransmits[1]))),
Cell::from(Text::from(format!("{:.1}/{:.1}", flow.rtt_nanos[0] as f64 / 1000000. , flow.tcp_retransmits[1] as f64 / 1000000.))),
Cell::from(Text::from(flow.remote_asn_name.to_string())),
]).style(style::Style::default().fg(Color::White)),
);
impl TopWidget for TopFlows {
fn enable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::EnableTopFlows(self.tx.clone()))
.unwrap();
}
let header = Row::new(vec![
Cell::from(Text::from("Src IP")),
Cell::from(Text::from("Dst IP")),
Cell::from(Text::from("Type")),
Cell::from(Text::from("Upload")),
Cell::from(Text::from("Download")),
Cell::from(Text::from("Retransmits")),
Cell::from(Text::from("RTT (ms)")),
Cell::from(Text::from("ASN")),
]).style(style::Style::default().fg(Color::Yellow).bg(Color::Blue));
fn disable(&mut self) {
self.bus_link
.blocking_send(crate::bus::BusMessage::DisableTopFlows)
.unwrap();
}
Table::new(rows, [15, 15, 20, 14, 14, 10, 15, 20])
.block(block)
.header(header)
fn set_size(&mut self, size: Rect) {
self.size = size;
}
fn tick(&mut self) {
while let Ok(msg) = self.rx.try_recv() {
if let BusResponse::TopFlows(flows) = msg {
self.flows = flows;
}
}
}
fn render_to_frame(&mut self, frame: &mut Frame) {
let block = Block::default()
//.title("Top Downloaders")
.borders(Borders::NONE)
.style(Style::default().fg(Color::Green));
let mut rows = Vec::new();
for flow in self.flows.iter() {
rows.push(
Row::new(vec![
Cell::from(Text::from(flow.local_ip.to_string())),
Cell::from(Text::from(flow.remote_ip.to_string())),
Cell::from(Text::from(flow.analysis.to_string())),
Cell::from(Text::from(scale_bits(flow.bytes_sent[0]))),
Cell::from(Text::from(scale_bits(flow.bytes_sent[1]))),
Cell::from(Text::from(format!(
"{}/{}",
flow.tcp_retransmits[0], flow.tcp_retransmits[1]
))),
Cell::from(Text::from(format!(
"{:.1}/{:.1}",
flow.rtt_nanos[0] as f64 / 1000000.,
flow.tcp_retransmits[1] as f64 / 1000000.
))),
Cell::from(Text::from(flow.remote_asn_name.to_string())),
])
.style(style::Style::default().fg(Color::White)),
);
}
let header = Row::new(vec![
Cell::from(Text::from("Src IP")),
Cell::from(Text::from("Dst IP")),
Cell::from(Text::from("Type")),
Cell::from(Text::from("Upload")),
Cell::from(Text::from("Download")),
Cell::from(Text::from("Retransmits")),
Cell::from(Text::from("RTT (ms)")),
Cell::from(Text::from("ASN")),
])
.style(style::Style::default().fg(Color::Yellow).bg(Color::Blue));
let table = Table::new(rows, [15, 15, 20, 14, 14, 10, 15, 20])
.block(block)
.header(header);
frame.render_widget(table, self.size);
}
}
impl TopFlows {
pub fn new(bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>) -> Self {
let (tx, rx) = std::sync::mpsc::channel::<BusResponse>();
Self {
bus_link,
tx,
rx,
size: Rect::default(),
flows: Vec::new(),
}
}
}