Plumbing in place for Cake displays. Display the backlog and memory usage.

This commit is contained in:
Herbert Wolverson
2024-07-16 13:49:52 -05:00
parent 0693c71296
commit 822743605e
5 changed files with 241 additions and 1 deletions

View File

@@ -9,6 +9,7 @@ import {scaleNanos, scaleNumber} from "./helpers/scaling";
import {DevicePingHistogram} from "./graphs/device_ping_graph";
import {FlowsSankey} from "./graphs/flow_sankey";
import {subscribeWS} from "./pubsub/ws";
import {CakeBacklog} from "./graphs/cake_backlog";
const params = new Proxy(new URLSearchParams(window.location.search), {
get: (searchParams, prop) => searchParams.get(prop),
@@ -555,6 +556,22 @@ function onTreeEvent(msg) {
});
}
function subscribeToCake() {
let backlogGraph = new CakeBacklog("cakeBacklog");
channelLink = new DirectChannel({
CakeWatcher: {
circuit: circuit_id
}
}, (msg) => {
//console.log(msg);
// Cake Memory Usage
$("#cakeQueueMemory").text(scaleNumber(msg.current_download.memory_used) + " / " + scaleNumber(msg.current_upload.memory_used));
backlogGraph.update(msg);
backlogGraph.chart.resize();
});
}
function loadInitial() {
$.ajax({
type: "POST",
@@ -581,7 +598,8 @@ function loadInitial() {
connectPrivateChannel();
connectPingers(circuits);
connectFlowChannel();
initialFunnel(circuit.parent_node)
initialFunnel(circuit.parent_node);
subscribeToCake();
},
error: () => {
alert("Circuit with id " + circuit_id + " not found");

View File

@@ -0,0 +1,171 @@
import {DashboardGraph} from "./dashboard_graph";
import {scaleNumber} from "../helpers/scaling";
export class CakeBacklog extends DashboardGraph {
constructor(id) {
super(id);
let xaxis = [];
for (let i=0; i<600; i++) {
xaxis.push(i);
}
this.option = {
title: {
text: "Backlog",
},
legend: {
orient: "horizontal",
right: 10,
top: "bottom",
selectMode: false,
textStyle: {
color: '#aaa'
},
data: [
{
name: "Bulk",
icon: 'circle',
itemStyle: {
color: "gray"
}
}, {
name: "Best Effort",
icon: 'circle',
itemStyle: {
color: "green"
}
}, {
name: "RT Video",
icon: 'circle',
itemStyle: {
color: "orange"
}
}, {
name: "Voice",
icon: 'circle',
itemStyle: {
color: "yellow"
}
}
]
},
xAxis: {
type: 'category',
data: xaxis,
},
yAxis: {
type: 'value',
axisLabel: {
formatter: (val) => {
return scaleNumber(Math.abs(val), 0);
},
}
},
series: [
{
name: 'Bulk',
data: [],
type: 'line',
lineStyle: {
color: 'gray',
},
symbol: 'none',
},
{
name: 'Best Effort',
data: [],
type: 'line',
lineStyle: {
color: 'green',
},
symbol: 'none',
},
{
name: 'RT Video',
data: [],
type: 'line',
lineStyle: {
color: 'orange',
},
symbol: 'none',
},
{
name: 'Voice',
data: [],
type: 'line',
lineStyle: {
color: 'yellow',
},
symbol: 'none',
},
{
name: 'Bulk Up',
data: [],
type: 'line',
lineStyle: {
color: 'gray',
},
symbol: 'none',
},
{
name: 'Best Effort Up',
data: [],
type: 'line',
lineStyle: {
color: 'green',
},
symbol: 'none',
},
{
name: 'RT Video Up',
data: [],
type: 'line',
lineStyle: {
color: 'orange',
},
symbol: 'none',
},
{
name: 'RT Voice Up',
data: [],
type: 'line',
lineStyle: {
color: 'yellow',
},
symbol: 'none',
},
],
tooltip: {
trigger: 'item',
},
animation: false,
}
this.option && this.chart.setOption(this.option);
}
update(msg) {
this.chart.hideLoading();
for (let i=0; i<8; i++) {
this.option.series[i].data = [];
}
//console.log(msg);
for (let i=msg.history_head; i<600; i++) {
for (let j=0; j<4; j++) {
if (msg.history[i][0].tins[0] === undefined) continue;
this.option.series[j].data.push(msg.history[i][0].tins[j].backlog_bytes);
this.option.series[j+4].data.push(0 - msg.history[i][1].tins[j].backlog_bytes);
}
}
for (let i=0; i<msg.history_head; i++) {
for (let j=0; j<4; j++) {
if (msg.history[i][0].tins[0] === undefined) continue;
this.option.series[j].data.push(msg.history[i][0].tins[j].backlog_bytes);
this.option.series[j+4].data.push(0 - msg.history[i][1].tins[j].backlog_bytes);
}
}
this.chart.setOption(this.option);
}
}

View File

@@ -49,6 +49,11 @@
<i class="fa fa-tree"></i> Queue Tree
</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="cake-tab" data-bs-toggle="tab" data-bs-target="#cake" type="button" role="tab" aria-controls="contact" aria-selected="false">
<i class="fa fa-birthday-cake"></i> CAKE Shaper Overview
</button>
</li>
</ul>
<div class="tab-content" id="myTabContent">
<div class="tab-pane fade show active" id="devs" role="tabpanel" aria-labelledby="devs-tab">
@@ -77,6 +82,16 @@
</div>
</div>
</div>
<div class="tab-pane fade" id="cake" role="tabpanel" aria-labelledby="cake-tab">
<div class="row">
<div class="col-4">
<div id="cakeBacklog" style="height: 250px"></div>
</div>
<div class="col-3">
Queue Memory: <span id="cakeQueueMemory">?</span>
</div>
</div>
</div>
</div>

View File

@@ -1,12 +1,14 @@
mod circuit;
mod ping_monitor;
mod flows_by_circuit;
mod cake_watcher;
use axum::extract::WebSocketUpgrade;
use axum::extract::ws::{Message, WebSocket};
use axum::response::IntoResponse;
use serde::{Deserialize, Serialize};
use tokio::spawn;
use crate::node_manager::ws::single_user_channels::cake_watcher::cake_watcher;
use crate::node_manager::ws::single_user_channels::circuit::circuit_watcher;
use crate::node_manager::ws::single_user_channels::flows_by_circuit::flows_by_circuit;
use crate::node_manager::ws::single_user_channels::ping_monitor::ping_monitor;
@@ -16,6 +18,7 @@ enum PrivateChannel {
CircuitWatcher { circuit: String },
PingMonitor { ips: Vec<(String, String)> },
FlowsByCircuit { circuit: String },
CakeWatcher { circuit: String },
}
pub(super) async fn private_channel_ws_handler(
@@ -50,6 +53,9 @@ async fn handle_socket(mut socket: WebSocket) {
PrivateChannel::FlowsByCircuit { circuit } => {
spawn(flows_by_circuit(circuit, tx.clone()));
},
PrivateChannel::CakeWatcher { circuit } => {
spawn(cake_watcher(circuit, tx.clone()));
},
}
} else {
log::debug!("Failed to parse private message: {:?}", text);

View File

@@ -0,0 +1,30 @@
use lqos_bus::QueueStoreTransit;
use lqos_config::load_config;
use lqos_queue_tracker::{add_watched_queue, get_raw_circuit_data, still_watching};
pub(super) async fn cake_watcher(circuit_id: String, tx: tokio::sync::mpsc::Sender<String>) {
let interval_ms = if let Ok(config) = load_config() {
config.queue_check_period_ms
} else {
0
};
add_watched_queue(&circuit_id);
let mut ticker = tokio::time::interval(tokio::time::Duration::from_millis(interval_ms));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
still_watching(&circuit_id);
match get_raw_circuit_data(&circuit_id) {
lqos_bus::BusResponse::RawQueueData(Some(msg)) => {
let json = serde_json::to_string(&QueueStoreTransit::from(*msg)).unwrap();
let send_result = tx.send(json.to_string()).await;
if send_result.is_err() {
break;
}
}
_ => {}
}
}
}