From 1a83857f9861ad5d3764581aa20233535fd39520 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Wed, 26 Jun 2024 15:08:17 -0500 Subject: [PATCH] Make channel creation by enum entry automatic (slightly slower start, but now I can't forget). Add worst 10 downloaders to the list of information channels. --- src/rust/Cargo.lock | 1 + src/rust/lqosd/Cargo.toml | 1 + .../js_build/src/dashlets/dashlet_index.js | 3 + .../src/dashlets/worst10_downloaders.js | 90 +++++++++++++++++++ .../src/node_manager/ws/publish_subscribe.rs | 15 +++- .../src/node_manager/ws/published_channels.rs | 7 +- src/rust/lqosd/src/node_manager/ws/ticker.rs | 1 + .../src/node_manager/ws/ticker/top_10.rs | 25 +++++- 8 files changed, 138 insertions(+), 5 deletions(-) create mode 100644 src/rust/lqosd/src/node_manager/js_build/src/dashlets/worst10_downloaders.js diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 2beb613f..c72b2a2b 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -2054,6 +2054,7 @@ dependencies = [ "serde", "serde_json", "signal-hook", + "strum", "sysinfo", "thiserror", "tokio", diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index b3c3c8db..a69f47f8 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -40,6 +40,7 @@ zerocopy = {version = "0.6.1", features = [ "simd" ] } fxhash = "0.2.1" axum = { version = "0.7.5", features = ["ws"] } tower-http = { version = "0.5.2", features = ["fs"] } +strum = { version = "0.26.3", features = ["derive"] } # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js index 28e3809d..335ccf1d 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js @@ -5,6 +5,7 @@ import {TrackedFlowsCount} from "./tracked_flow_count_dash"; import {ThroughputRingDash} from "./throughput_ring_dash"; import {RttHistoDash} from "./rtt_histo_dash"; import {Top10Downloaders} from "./top10_downloaders"; +import {Worst10Downloaders} from "./worst10_downloaders"; export const DashletMenu = [ { name: "Throughput Bits/Second", tag: "throughputBps", size: 3 }, @@ -14,6 +15,7 @@ export const DashletMenu = [ { name: "Last 5 Minutes Throughput", tag: "throughputRing", size: 6 }, { name: "Round-Trip Time Histogram", tag: "rttHistogram", size: 6 }, { name: "Top 10 Downloaders", tag: "top10downloaders", size: 6 }, + { name: "Worst 10 Round-Trip Time", tag: "worst10downloaders", size: 6 }, ]; export function widgetFactory(widgetName, count) { @@ -26,6 +28,7 @@ export function widgetFactory(widgetName, count) { case "throughputRing": widget = new ThroughputRingDash(count); break; case "rttHistogram": widget = new RttHistoDash(count); break; case "top10downloaders":widget = new Top10Downloaders(count); break; + case "worst10downloaders":widget = new Worst10Downloaders(count); break; default: { console.log("I don't know how to construct a widget of type [" + widgetName + "]"); return null; diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/worst10_downloaders.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/worst10_downloaders.js new file mode 100644 index 00000000..08775943 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/worst10_downloaders.js @@ -0,0 +1,90 @@ +import {BaseDashlet} from "./base_dashlet"; +import {RttHistogram} from "../graphs/rtt_histo"; +import {theading} from "../helpers/builders"; +import {scaleNumber, rttCircleSpan} from "../helpers/scaling"; + +export class Worst10Downloaders extends BaseDashlet { + constructor(slot) { + super(slot); + } + + title() { + return "Worst 10 Round-Trip Time"; + } + + subscribeTo() { + return [ "worst10downloaders" ]; + } + + buildContainer() { + let base = super.buildContainer(); + base.style.height = "250px"; + base.style.overflow = "auto"; + return base; + } + + setup() { + super.setup(); + } + + onMessage(msg) { + if (msg.event === "worst10downloaders") { + let target = document.getElementById(this.id); + + let t = document.createElement("table"); + t.classList.add("table", "table-striped", "tiny"); + + let th = document.createElement("thead"); + th.appendChild(theading("")); + th.appendChild(theading("IP Address/Circuit")); + th.appendChild(theading("DL ⬇️")); + th.appendChild(theading("UL ⬆️")); + th.appendChild(theading("RTT (ms)")); + th.appendChild(theading("TCP Retransmits")); + th.appendChild(theading("Shaped")); + t.appendChild(th); + + let tbody = document.createElement("tbody"); + msg.data.forEach((r) => { + let row = document.createElement("tr"); + + let circle = document.createElement("td"); + circle.appendChild(rttCircleSpan(r.median_tcp_rtt)); + row.appendChild(circle); + + let ip = document.createElement("td"); + ip.innerText = r.ip_address; + row.append(ip); + + let dl = document.createElement("td"); + dl.innerText = scaleNumber(r.bits_per_second[0]); + row.append(dl); + + let ul = document.createElement("td"); + ul.innerText = scaleNumber(r.bits_per_second[1]); + row.append(ul); + + let rtt = document.createElement("td"); + rtt.innerText = r.median_tcp_rtt.toFixed(2); + row.append(rtt); + + let tcp_xmit = document.createElement("td"); + tcp_xmit.innerText = r.tcp_retransmits[0] + " / " + r.tcp_retransmits[1]; + row.append(tcp_xmit); + + let shaped = document.createElement("td"); + shaped.innerText = r.plan[0] + " / " + r.plan[1]; + row.append(shaped); + + t.appendChild(row); + }); + t.appendChild(tbody); + + // Display it + while (target.children.length > 1) { + target.removeChild(target.lastChild); + } + target.appendChild(t); + } + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs b/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs index cad437cc..a77663fc 100644 --- a/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs +++ b/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs @@ -6,6 +6,7 @@ mod publisher_channel; mod subscriber; use std::sync::Arc; +use strum::IntoEnumIterator; use tokio::sync::mpsc::Sender; use tokio::sync::Mutex; use crate::node_manager::ws::publish_subscribe::publisher_channel::PublisherChannel; @@ -25,12 +26,20 @@ impl PubSub { /// Constructs a new PubSub interface with a default set of /// channels. pub(super) fn new() -> Arc { + let mut channels = Vec::new(); + for c in PublishedChannels::iter() { + channels.push( + PublisherChannel::new(c) + ); + } + let channels = vec![ PublisherChannel::new(PublishedChannels::Throughput), PublisherChannel::new(PublishedChannels::RttHistogram), PublisherChannel::new(PublishedChannels::FlowCount), PublisherChannel::new(PublishedChannels::Cadence), PublisherChannel::new(PublishedChannels::Top10Downloaders), + PublisherChannel::new(PublishedChannels::Worst10Downloaders), ]; let result = Self { @@ -46,9 +55,11 @@ impl PubSub { let mut channels = self.channels.lock().await; if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) { channel.subscribe(sender).await; + } else { + log::warn!("Tried to subscribe to channel {:?}, which doesn't exist", channel); } } - + /// Checks that a channel has anyone listening for it. If it doesn't, /// there's no point in using CPU to process it! pub(super) async fn is_channel_alive(&self, channel: PublishedChannels) -> bool { @@ -59,7 +70,7 @@ impl PubSub { false } } - + pub(super) async fn send(&self, channel: PublishedChannels, message: String) { let mut channels = self.channels.lock().await; if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) { diff --git a/src/rust/lqosd/src/node_manager/ws/published_channels.rs b/src/rust/lqosd/src/node_manager/ws/published_channels.rs index a50f4ad4..2cc52cf2 100644 --- a/src/rust/lqosd/src/node_manager/ws/published_channels.rs +++ b/src/rust/lqosd/src/node_manager/ws/published_channels.rs @@ -1,4 +1,6 @@ -#[derive(PartialEq, Clone, Copy, Debug)] +use strum::EnumIter; + +#[derive(PartialEq, Clone, Copy, Debug, EnumIter)] pub enum PublishedChannels { /// Provides a 1-second tick notification to the client Cadence, @@ -6,6 +8,7 @@ pub enum PublishedChannels { RttHistogram, FlowCount, Top10Downloaders, + Worst10Downloaders, } impl PublishedChannels { @@ -16,6 +19,7 @@ impl PublishedChannels { Self::FlowCount => "flowCount", Self::Cadence => "cadence", Self::Top10Downloaders => "top10downloaders", + Self::Worst10Downloaders => "worst10downloaders", } } @@ -26,6 +30,7 @@ impl PublishedChannels { "flowCount" => Some(Self::FlowCount), "cadence" => Some(Self::Cadence), "top10downloaders" => Some(Self::Top10Downloaders), + "worst10downloaders" => Some(Self::Worst10Downloaders), _ => None, } } diff --git a/src/rust/lqosd/src/node_manager/ws/ticker.rs b/src/rust/lqosd/src/node_manager/ws/ticker.rs index 25009091..d7c38fc3 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker.rs @@ -20,6 +20,7 @@ pub(super) async fn channel_ticker(channels: Arc) { rtt_histogram::rtt_histo(channels.clone()), flow_counter::flow_count(channels.clone()), top_10::top_10_downloaders(channels.clone()), + top_10::worst_10_downloaders(channels.clone()), ); } } \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/top_10.rs b/src/rust/lqosd/src/node_manager/ws/ticker/top_10.rs index 78a924ef..6a09d36d 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker/top_10.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker/top_10.rs @@ -4,13 +4,13 @@ use lqos_bus::BusResponse; use crate::node_manager::ws::publish_subscribe::PubSub; use crate::node_manager::ws::published_channels::PublishedChannels; use crate::node_manager::ws::ticker::ipstats_conversion::IpStatsWithPlan; -use crate::throughput_tracker::top_n; +use crate::throughput_tracker::{top_n, worst_n}; pub async fn top_10_downloaders(channels: Arc) { if !channels.is_channel_alive(PublishedChannels::Top10Downloaders).await { return; } - + if let BusResponse::TopDownloaders(top) = top_n(0, 10) { let result: Vec = top .iter() @@ -25,4 +25,25 @@ pub async fn top_10_downloaders(channels: Arc) { ).to_string(); channels.send(PublishedChannels::Top10Downloaders, message).await; } +} + +pub async fn worst_10_downloaders(channels: Arc) { + if !channels.is_channel_alive(PublishedChannels::Worst10Downloaders).await { + return; + } + + if let BusResponse::WorstRtt(top) = worst_n(0, 10) { + let result: Vec = top + .iter() + .map(|stat| stat.into()) + .collect(); + + let message = json!( + { + "event": "worst10downloaders", + "data": result + } + ).to_string(); + channels.send(PublishedChannels::Worst10Downloaders, message).await; + } } \ No newline at end of file