diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js index 335ccf1d..901d4587 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js @@ -6,6 +6,7 @@ import {ThroughputRingDash} from "./throughput_ring_dash"; import {RttHistoDash} from "./rtt_histo_dash"; import {Top10Downloaders} from "./top10_downloaders"; import {Worst10Downloaders} from "./worst10_downloaders"; +import {Top10FlowsBytes} from "./top10flows"; export const DashletMenu = [ { name: "Throughput Bits/Second", tag: "throughputBps", size: 3 }, @@ -16,6 +17,7 @@ export const DashletMenu = [ { name: "Round-Trip Time Histogram", tag: "rttHistogram", size: 6 }, { name: "Top 10 Downloaders", tag: "top10downloaders", size: 6 }, { name: "Worst 10 Round-Trip Time", tag: "worst10downloaders", size: 6 }, + { name: "Top 10 Flows (total bytes)", tag: "top10flowsBytes", size: 6 }, ]; export function widgetFactory(widgetName, count) { @@ -29,6 +31,7 @@ export function widgetFactory(widgetName, count) { case "rttHistogram": widget = new RttHistoDash(count); break; case "top10downloaders":widget = new Top10Downloaders(count); break; case "worst10downloaders":widget = new Worst10Downloaders(count); break; + case "top10flowsBytes" : widget = new Top10FlowsBytes(count); break; default: { console.log("I don't know how to construct a widget of type [" + widgetName + "]"); return null; diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows.js new file mode 100644 index 00000000..a290590d --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows.js @@ -0,0 +1,109 @@ +import {BaseDashlet} from "./base_dashlet"; +import {theading} from "../helpers/builders"; +import {scaleNumber, scaleNanos} from "../helpers/scaling"; + +export class Top10FlowsBytes extends BaseDashlet { + constructor(slot) { + super(slot); + } + + title() { + return "Top 10 Flows (by total bytes)"; + } + + subscribeTo() { + return [ "topFlowsBytes" ]; + } + + buildContainer() { + let base = super.buildContainer(); + base.style.height = "250px"; + base.style.overflow = "auto"; + return base; + } + + setup() { + super.setup(); + } + + onMessage(msg) { + if (msg.event === "topFlowsBytes") { + let target = document.getElementById(this.id); + + let t = document.createElement("table"); + t.classList.add("table", "table-striped", "tiny"); + + let th = document.createElement("thead"); + th.appendChild(theading("Protocol")); + th.appendChild(theading("Local IP")); + th.appendChild(theading("Remote IP")); + th.appendChild(theading("DL ⬇️")); + th.appendChild(theading("UL ⬆️")); + th.appendChild(theading("Total")); + th.appendChild(theading("⬇ RTT")); + th.appendChild(theading("️️⬆ RTT")); + th.appendChild(theading("TCP Retransmits")); + th.appendChild(theading("Remote ASN")); + th.appendChild(theading("Country")); + t.appendChild(th); + + let tbody = document.createElement("tbody"); + msg.data.forEach((r) => { + let row = document.createElement("tr"); + + let proto = document.createElement("td"); + proto.innerText = r.analysis; + row.appendChild(proto); + + let localIp = document.createElement("td"); + localIp.innerText = r.local_ip; + row.appendChild(localIp); + + let remoteIp = document.createElement("td"); + remoteIp.innerText = r.remote_ip; + row.appendChild(remoteIp); + + let dl = document.createElement("td"); + dl.innerText = scaleNumber(r.rate_estimate_bps[0]); + row.appendChild(dl); + + let ul = document.createElement("td"); + ul.innerText = scaleNumber(r.rate_estimate_bps[1]); + row.appendChild(ul); + + let total = document.createElement("td"); + total.innerText = scaleNumber(r.bytes_sent[0]) + " / " + scaleNumber(r.bytes_sent[1]); + row.appendChild(total); + + let rttD = document.createElement("td"); + rttD.innerText = scaleNanos(r.rtt_nanos[0]); + row.appendChild(rttD); + + let rttU = document.createElement("td"); + rttU.innerText = scaleNanos(r.rtt_nanos[1]); + row.appendChild(rttU); + + let tcp = document.createElement("td"); + tcp.innerText = r.tcp_retransmits[0] + " / " + r.tcp_retransmits[1]; + row.appendChild(tcp); + + let asn = document.createElement("td"); + asn.innerText = r.remote_asn_name; + row.appendChild(asn); + + let country = document.createElement("td"); + country.innerText = r.remote_asn_country; + row.appendChild(country); + + t.appendChild(row); + }); + t.appendChild(tbody); + + // Display it + while (target.children.length > 1) { + target.removeChild(target.lastChild); + } + target.appendChild(t); + } + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js b/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js index c897e302..7735c7ca 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js @@ -11,6 +11,18 @@ export function scaleNumber(n, fixed=2) { return n; } +export function scaleNanos(n) { + if (n == 0) return ""; + if (n > 1000000000) { + return (n / 1000000000).toFixed(2) + "s"; + } else if (n > 1000000) { + return (n / 1000000).toFixed(2) + "ms"; + } else if (n > 1000) { + return (n / 1000).toFixed(2) + "µs"; + } + return n + "ns"; +} + export function colorRamp(n) { if (n <= 100) { return "#aaffaa"; diff --git a/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs b/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs index a77663fc..6287639b 100644 --- a/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs +++ b/src/rust/lqosd/src/node_manager/ws/publish_subscribe.rs @@ -33,15 +33,6 @@ impl PubSub { ); } - let channels = vec![ - PublisherChannel::new(PublishedChannels::Throughput), - PublisherChannel::new(PublishedChannels::RttHistogram), - PublisherChannel::new(PublishedChannels::FlowCount), - PublisherChannel::new(PublishedChannels::Cadence), - PublisherChannel::new(PublishedChannels::Top10Downloaders), - PublisherChannel::new(PublishedChannels::Worst10Downloaders), - ]; - let result = Self { channels: Mutex::new(channels), }; diff --git a/src/rust/lqosd/src/node_manager/ws/published_channels.rs b/src/rust/lqosd/src/node_manager/ws/published_channels.rs index 2cc52cf2..991a47ef 100644 --- a/src/rust/lqosd/src/node_manager/ws/published_channels.rs +++ b/src/rust/lqosd/src/node_manager/ws/published_channels.rs @@ -9,6 +9,7 @@ pub enum PublishedChannels { FlowCount, Top10Downloaders, Worst10Downloaders, + TopFlowsBytes, } impl PublishedChannels { @@ -20,6 +21,7 @@ impl PublishedChannels { Self::Cadence => "cadence", Self::Top10Downloaders => "top10downloaders", Self::Worst10Downloaders => "worst10downloaders", + Self::TopFlowsBytes => "topFlowsBytes", } } @@ -31,6 +33,7 @@ impl PublishedChannels { "cadence" => Some(Self::Cadence), "top10downloaders" => Some(Self::Top10Downloaders), "worst10downloaders" => Some(Self::Worst10Downloaders), + "topFlowsBytes" => Some(Self::TopFlowsBytes), _ => None, } } diff --git a/src/rust/lqosd/src/node_manager/ws/ticker.rs b/src/rust/lqosd/src/node_manager/ws/ticker.rs index d7c38fc3..986fb392 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker.rs @@ -4,6 +4,7 @@ mod rtt_histogram; mod flow_counter; mod top_10; mod ipstats_conversion; +mod top_flows; use std::sync::Arc; use crate::node_manager::ws::publish_subscribe::PubSub; @@ -21,6 +22,7 @@ pub(super) async fn channel_ticker(channels: Arc) { flow_counter::flow_count(channels.clone()), top_10::top_10_downloaders(channels.clone()), top_10::worst_10_downloaders(channels.clone()), + top_flows::top_flows_bytes(channels.clone()), ); } } \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/top_flows.rs b/src/rust/lqosd/src/node_manager/ws/ticker/top_flows.rs new file mode 100644 index 00000000..83f0cba5 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/ws/ticker/top_flows.rs @@ -0,0 +1,25 @@ +use std::sync::Arc; + +use serde_json::json; + +use lqos_bus::{BusResponse, TopFlowType}; + +use crate::node_manager::ws::publish_subscribe::PubSub; +use crate::node_manager::ws::published_channels::PublishedChannels; +use crate::throughput_tracker; + +pub async fn top_flows_bytes(channels: Arc) { + if !channels.is_channel_alive(PublishedChannels::TopFlowsBytes).await { + return; + } + + if let BusResponse::TopFlows(flows) = throughput_tracker::top_flows(10, TopFlowType::Bytes) { + let message = json!( + { + "event": "topFlowsBytes", + "data": flows, + } + ).to_string(); + channels.send(PublishedChannels::TopFlowsBytes, message).await; + } +} \ No newline at end of file