diff --git a/src/rust/lqosd/src/node_manager/js_build/src/circuit.js b/src/rust/lqosd/src/node_manager/js_build/src/circuit.js index 2b3c9ace..2ed3e188 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/circuit.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/circuit.js @@ -9,6 +9,7 @@ import {scaleNanos, scaleNumber} from "./helpers/scaling"; import {DevicePingHistogram} from "./graphs/device_ping_graph"; import {FlowsSankey} from "./graphs/flow_sankey"; import {subscribeWS} from "./pubsub/ws"; +import {CakeBacklog} from "./graphs/cake_backlog"; const params = new Proxy(new URLSearchParams(window.location.search), { get: (searchParams, prop) => searchParams.get(prop), @@ -555,6 +556,22 @@ function onTreeEvent(msg) { }); } +function subscribeToCake() { + let backlogGraph = new CakeBacklog("cakeBacklog"); + channelLink = new DirectChannel({ + CakeWatcher: { + circuit: circuit_id + } + }, (msg) => { + //console.log(msg); + + // Cake Memory Usage + $("#cakeQueueMemory").text(scaleNumber(msg.current_download.memory_used) + " / " + scaleNumber(msg.current_upload.memory_used)); + backlogGraph.update(msg); + backlogGraph.chart.resize(); + }); +} + function loadInitial() { $.ajax({ type: "POST", @@ -581,7 +598,8 @@ function loadInitial() { connectPrivateChannel(); connectPingers(circuits); connectFlowChannel(); - initialFunnel(circuit.parent_node) + initialFunnel(circuit.parent_node); + subscribeToCake(); }, error: () => { alert("Circuit with id " + circuit_id + " not found"); diff --git a/src/rust/lqosd/src/node_manager/js_build/src/graphs/cake_backlog.js b/src/rust/lqosd/src/node_manager/js_build/src/graphs/cake_backlog.js new file mode 100644 index 00000000..bde21d17 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/graphs/cake_backlog.js @@ -0,0 +1,171 @@ +import {DashboardGraph} from "./dashboard_graph"; +import {scaleNumber} from "../helpers/scaling"; + +export class CakeBacklog extends DashboardGraph { + constructor(id) { + super(id); + + let xaxis = []; + for (let i=0; i<600; i++) { + xaxis.push(i); + } + + this.option = { + title: { + text: "Backlog", + }, + legend: { + orient: "horizontal", + right: 10, + top: "bottom", + selectMode: false, + textStyle: { + color: '#aaa' + }, + data: [ + { + name: "Bulk", + icon: 'circle', + itemStyle: { + color: "gray" + } + }, { + name: "Best Effort", + icon: 'circle', + itemStyle: { + color: "green" + } + }, { + name: "RT Video", + icon: 'circle', + itemStyle: { + color: "orange" + } + }, { + name: "Voice", + icon: 'circle', + itemStyle: { + color: "yellow" + } + } + ] + }, + xAxis: { + type: 'category', + data: xaxis, + }, + yAxis: { + type: 'value', + axisLabel: { + formatter: (val) => { + return scaleNumber(Math.abs(val), 0); + }, + } + }, + series: [ + { + name: 'Bulk', + data: [], + type: 'line', + lineStyle: { + color: 'gray', + }, + symbol: 'none', + }, + { + name: 'Best Effort', + data: [], + type: 'line', + lineStyle: { + color: 'green', + }, + symbol: 'none', + }, + { + name: 'RT Video', + data: [], + type: 'line', + lineStyle: { + color: 'orange', + }, + symbol: 'none', + }, + { + name: 'Voice', + data: [], + type: 'line', + lineStyle: { + color: 'yellow', + }, + symbol: 'none', + }, + { + name: 'Bulk Up', + data: [], + type: 'line', + lineStyle: { + color: 'gray', + }, + symbol: 'none', + }, + { + name: 'Best Effort Up', + data: [], + type: 'line', + lineStyle: { + color: 'green', + }, + symbol: 'none', + }, + { + name: 'RT Video Up', + data: [], + type: 'line', + lineStyle: { + color: 'orange', + }, + symbol: 'none', + }, + { + name: 'RT Voice Up', + data: [], + type: 'line', + lineStyle: { + color: 'yellow', + }, + symbol: 'none', + }, + ], + tooltip: { + trigger: 'item', + }, + animation: false, + } + this.option && this.chart.setOption(this.option); + } + + update(msg) { + this.chart.hideLoading(); + + for (let i=0; i<8; i++) { + this.option.series[i].data = []; + } + //console.log(msg); + for (let i=msg.history_head; i<600; i++) { + for (let j=0; j<4; j++) { + if (msg.history[i][0].tins[0] === undefined) continue; + this.option.series[j].data.push(msg.history[i][0].tins[j].backlog_bytes); + this.option.series[j+4].data.push(0 - msg.history[i][1].tins[j].backlog_bytes); + } + } + for (let i=0; i Queue Tree +
@@ -77,6 +82,16 @@
+
+
+
+
+
+
+ Queue Memory: ? +
+
+
diff --git a/src/rust/lqosd/src/node_manager/ws/single_user_channels.rs b/src/rust/lqosd/src/node_manager/ws/single_user_channels.rs index 8065ebac..d8b4ae1e 100644 --- a/src/rust/lqosd/src/node_manager/ws/single_user_channels.rs +++ b/src/rust/lqosd/src/node_manager/ws/single_user_channels.rs @@ -1,12 +1,14 @@ mod circuit; mod ping_monitor; mod flows_by_circuit; +mod cake_watcher; use axum::extract::WebSocketUpgrade; use axum::extract::ws::{Message, WebSocket}; use axum::response::IntoResponse; use serde::{Deserialize, Serialize}; use tokio::spawn; +use crate::node_manager::ws::single_user_channels::cake_watcher::cake_watcher; use crate::node_manager::ws::single_user_channels::circuit::circuit_watcher; use crate::node_manager::ws::single_user_channels::flows_by_circuit::flows_by_circuit; use crate::node_manager::ws::single_user_channels::ping_monitor::ping_monitor; @@ -16,6 +18,7 @@ enum PrivateChannel { CircuitWatcher { circuit: String }, PingMonitor { ips: Vec<(String, String)> }, FlowsByCircuit { circuit: String }, + CakeWatcher { circuit: String }, } pub(super) async fn private_channel_ws_handler( @@ -50,6 +53,9 @@ async fn handle_socket(mut socket: WebSocket) { PrivateChannel::FlowsByCircuit { circuit } => { spawn(flows_by_circuit(circuit, tx.clone())); }, + PrivateChannel::CakeWatcher { circuit } => { + spawn(cake_watcher(circuit, tx.clone())); + }, } } else { log::debug!("Failed to parse private message: {:?}", text); diff --git a/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs b/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs new file mode 100644 index 00000000..4b33f1cb --- /dev/null +++ b/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs @@ -0,0 +1,30 @@ +use lqos_bus::QueueStoreTransit; +use lqos_config::load_config; +use lqos_queue_tracker::{add_watched_queue, get_raw_circuit_data, still_watching}; + +pub(super) async fn cake_watcher(circuit_id: String, tx: tokio::sync::mpsc::Sender) { + let interval_ms = if let Ok(config) = load_config() { + config.queue_check_period_ms + } else { + 0 + }; + add_watched_queue(&circuit_id); + + let mut ticker = tokio::time::interval(tokio::time::Duration::from_millis(interval_ms)); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + ticker.tick().await; + still_watching(&circuit_id); + + match get_raw_circuit_data(&circuit_id) { + lqos_bus::BusResponse::RawQueueData(Some(msg)) => { + let json = serde_json::to_string(&QueueStoreTransit::from(*msg)).unwrap(); + let send_result = tx.send(json.to_string()).await; + if send_result.is_err() { + break; + } + } + _ => {} + } + } +} \ No newline at end of file