First try at a flow duration graph. May need some tweaking once I see it with real data.

This commit is contained in:
Herbert Wolverson 2024-07-26 09:04:50 -05:00
parent 726c4478f5
commit df783f15b8
7 changed files with 120 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import {TopTreeSankey} from "./top_tree_sankey";
import {Top10DownloadersVisual} from "./top10_downloads_graphic";
import {Worst10DownloadersVisual} from "./worst10_downloaders_graphic";
import {Worst10RetransmitsVisual} from "./worst10_retransmits_graphic";
import {FlowDurationDash} from "./flow_durations_dash";
export const DashletMenu = [
{ name: "Throughput Bits/Second", tag: "throughputBps", size: 3 },
@ -41,6 +42,7 @@ export const DashletMenu = [
{ name: "Top 10 Flows (total bytes)", tag: "top10flowsBytes", size: 6 },
{ name: "Top 10 Flows (rate)", tag: "top10flowsRate", size: 6 },
{ name: "Top 10 Endpoints by Country", tag: "top10endpointsCountry", size: 6 },
{ name: "Flow Duration", tag: "flowDuration", size: 6 },
{ name: "Ether Protocols", tag: "etherProtocols", size: 6 },
{ name: "IP Protocols", tag: "ipProtocols", size: 6 },
{ name: "CPU Utilization", tag: "cpu", size: 3 },
@ -75,6 +77,7 @@ export function widgetFactory(widgetName, count) {
case "top10endpointsCountry" : widget = new Top10EndpointsByCountry(count); break;
case "etherProtocols" : widget = new EtherProtocols(count); break;
case "ipProtocols" : widget = new IpProtocols(count); break;
case "flowDuration" : widget = new FlowDurationDash(count); break;
case "cpu" : widget = new CpuDash(count); break;
case "ram" : widget = new RamDash(count); break;
case "treeSummary" : widget = new TopTreeSummary(count); break;

View File

@ -0,0 +1,37 @@
import {BaseDashlet} from "./base_dashlet";
import {FlowDurationsGraph} from "../graphs/flow_durations_graph";
export class FlowDurationDash extends BaseDashlet{
constructor(slot) {
super(slot);
}
title() {
return "Flow Duration";
}
tooltip() {
return "<h5>Flow Duration</h5><p>How long do flows last in seconds? Most flows are very short, while some can last a very long time.</p>";
}
subscribeTo() {
return [ "FlowDurations" ];
}
buildContainer() {
let base = super.buildContainer();
base.appendChild(this.graphDiv());
return base;
}
setup() {
super.setup();
this.graph = new FlowDurationsGraph(this.graphDivId());
}
onMessage(msg) {
if (msg.event === "FlowDurations") {
this.graph.update(msg.data);
}
}
}

View File

@ -0,0 +1,40 @@
import {DashboardGraph} from "./dashboard_graph";
export class FlowDurationsGraph extends DashboardGraph {
constructor(id) {
super(id);
this.option = {
xAxis: {
type: 'category',
data: [],
name: "Seconds"
},
yAxis: {
type: 'value',
name: "Samples"
},
series: {
data: [],
type: 'bar',
}
};
this.option && this.chart.setOption(this.option);
}
update(data) {
this.chart.hideLoading();
let x = [];
let y = [];
data.forEach((r) => {
x.push(r.duration);
y.push(r.count);
});
this.option.xAxis.data = x;
this.option.series.data = y;
this.chart.setOption(this.option);
}
}

View File

@ -13,6 +13,7 @@ pub enum PublishedChannels {
TopFlowsBytes,
TopFlowsRate,
EndpointsByCountry,
FlowDurations,
EtherProtocols,
IpProtocols,
Cpu,

View File

@ -42,6 +42,7 @@ async fn one_second_cadence(channels: Arc<PubSub>) {
let mc = channels.clone(); spawn(async move { flow_endpoints::endpoints_by_country(mc).await });
let mc = channels.clone(); spawn(async move { flow_endpoints::ether_protocols(mc).await });
let mc = channels.clone(); spawn(async move { flow_endpoints::ip_protocols(mc).await });
let mc = channels.clone(); spawn(async move { flow_endpoints::flow_duration(mc).await });
let mc = channels.clone(); spawn(async move { tree_summary::tree_summary(mc).await });
let mc = channels.clone(); spawn(async move { network_tree::network_tree(mc).await });
let mc = channels.clone(); spawn(async move { circuit_capacity::circuit_capacity(mc).await });

View File

@ -7,6 +7,7 @@ use lqos_bus::BusResponse;
use crate::node_manager::ws::publish_subscribe::PubSub;
use crate::node_manager::ws::published_channels::PublishedChannels;
use crate::throughput_tracker;
use crate::throughput_tracker::flow_data::RECENT_FLOWS;
pub async fn endpoints_by_country(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::EndpointsByCountry).await {
@ -61,4 +62,19 @@ pub async fn ip_protocols(channels: Arc<PubSub>) {
).to_string();
channels.send(PublishedChannels::IpProtocols, message).await;
}
}
pub async fn flow_duration(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::FlowDurations).await {
return;
}
let data = RECENT_FLOWS.flow_duration_summary();
let message = json!(
{
"event": PublishedChannels::FlowDurations.to_string(),
"data": data,
}
).to_string();
channels.send(PublishedChannels::FlowDurations, message).await;
}

View File

@ -5,6 +5,9 @@ use lqos_bus::BusResponse;
use lqos_sys::flowbee_data::FlowbeeKey;
use once_cell::sync::Lazy;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use itertools::Itertools;
use serde::Serialize;
use lqos_utils::units::DownUpOrder;
pub struct TimeBuffer {
@ -16,6 +19,12 @@ struct TimeEntry {
data: (FlowbeeKey, FlowbeeLocalData, FlowAnalysis),
}
#[derive(Debug, Serialize)]
pub struct FlowDurationSummary {
count: usize,
duration: u64,
}
impl TimeBuffer {
fn new() -> Self {
Self {
@ -239,6 +248,19 @@ impl TimeBuffer {
results.truncate(10);
results
}
pub fn flow_duration_summary(&self) -> Vec<FlowDurationSummary> {
let buffer = self.buffer.lock().unwrap();
buffer
.iter()
.map(|f| Duration::from_nanos(f.data.1.last_seen - f.data.1.start_time)) // Duration in nanoseconds
.map(|nanos| nanos.as_secs())
.sorted()
.dedup_with_count() // Now we're (count, duration in seconds)
.map(|(count, duration)| FlowDurationSummary { count, duration })
.collect()
}
}
pub static RECENT_FLOWS: Lazy<TimeBuffer> = Lazy::new(|| TimeBuffer::new());