Flow counting

This commit is contained in:
Herbert Wolverson
2024-06-21 14:35:09 -05:00
parent f5cef3bbfb
commit 3a73eec975
4 changed files with 108 additions and 1 deletions

View File

@@ -0,0 +1,78 @@
import {DashboardGraph} from "./dashboard_graph";
import {scaleNumber} from "../scaling";
const RING_SIZE = 60 * 5; // 5 Minutes
export class FlowCountGraph extends DashboardGraph {
constructor(id) {
super(id);
this.ringbuffer = new RingBuffer(RING_SIZE);
let xaxis = [];
for (let i=0; i<RING_SIZE; i++) {
xaxis.push(i);
}
this.option = {
xAxis: {
type: 'category',
data: xaxis,
},
yAxis: {
type: 'value',
axisLabel: {
formatter: (val) => {
return scaleNumber(Math.abs(val));
},
}
},
series: {
name: 'flows',
data: [],
type: 'line',
},
tooltip: {
trigger: 'item',
},
}
this.option && this.chart.setOption(this.option);
}
update(shaped, unshaped) {
this.chart.hideLoading();
this.ringbuffer.push(shaped, unshaped);
this.option.series.data = this.ringbuffer.series();
this.chart.setOption(this.option);
}
}
class RingBuffer {
constructor(size) {
this.size = size;
let data = [];
for (let i=0; i<size; i++) {
data.push(0);
}
this.head = 0;
this.data = data;
}
push(flows) {
this.data[this.head] = flows;
this.head += 1;
this.head %= this.size;
}
series() {
let result = [];
for (let i=this.head; i<this.size; i++) {
result.push(this.data[i]);
}
for (let i=0; i<this.head; i++) {
result.push(this.data[i]);
}
return result;
}
}

View File

@@ -4,12 +4,14 @@ import {PacketsPerSecondBar} from "./graphs/packets_bar";
import {ShapedUnshapedPie} from "./graphs/shaped_unshaped_pie";
import {ThroughputRingBufferGraph} from "./graphs/throughput_ring_graph";
import {RttHistogram} from "./graphs/rtt_histo";
import {FlowCountGraph} from "./graphs/flows_graph";
let tpBits = null;
let tpPackets = null;
let tpShaped = null;
let tpRing = null;
let rttHisto = null;
let tpFlows = null;
function onMessage(msg) {
switch (msg.event) {
@@ -21,6 +23,8 @@ function onMessage(msg) {
tpRing = new ThroughputRingBufferGraph("tpRing");
} else if (msg.channel === "rtt") {
rttHisto = new RttHistogram("rttHisto");
} else if (msg.channel === "flows") {
tpFlows = new FlowCountGraph("tpFlows");
}
}
break;
@@ -35,9 +39,12 @@ function onMessage(msg) {
break;
case "histogram": {
rttHisto.update(msg.data);
}
case "flows": {
tpFlows.update(msg.data);
}
break;
}
}
subscribeWS(["throughput", "rtt"], onMessage);
subscribeWS(["throughput", "rtt", "flows"], onMessage);

View File

@@ -14,6 +14,11 @@
<div id="tpShaped" class="dashgraph">
</div>
</div>
<div class="col-3 dashbox">
<h5 class="dashbox-title">Tracked Flows</h5>
<div id="tpFlows" class="dashgraph">
</div>
</div>
</div>
<div class="row">

View File

@@ -7,6 +7,7 @@ use tokio::sync::{mpsc::Sender, Mutex};
use lqos_bus::BusResponse;
use crate::throughput_tracker::{rtt_histogram, THROUGHPUT_TRACKER};
use crate::throughput_tracker::flow_data::ALL_FLOWS;
pub fn websocket_router() -> Router {
let channels = PubSub::new();
@@ -20,6 +21,7 @@ pub fn websocket_router() -> Router {
enum Channel {
Throughput,
Rtt,
Flows,
}
impl Channel {
@@ -27,6 +29,7 @@ impl Channel {
match self {
Channel::Throughput => "throughput",
Channel::Rtt => "rtt",
Channel::Flows => "flows",
}
}
}
@@ -76,6 +79,19 @@ async fn channel_ticker(channels: Arc<PubSub>) {
).to_string();
channels.send_and_clean(Channel::Rtt, rtt_histo).await;
}
// Flows
let active_flows = {
let lock = ALL_FLOWS.lock().unwrap();
lock.len() as u64
};
let active_flows = json!(
{
"event": "flows",
"data": active_flows,
}
).to_string();
channels.send_and_clean(Channel::Flows, active_flows).await;
}
}
@@ -106,6 +122,7 @@ impl PubSub {
match channel.as_str() {
"throughput" => self.do_subscribe(Channel::Throughput, tx).await,
"rtt" => self.do_subscribe(Channel::Rtt, tx).await,
"flows" => self.do_subscribe(Channel::Flows, tx).await,
_ => log::warn!("Unknown channel: {}", channel),
}
}