mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
Make channel creation by enum entry automatic (slightly slower start, but now I can't forget). Add worst 10 downloaders to the list of information channels.
This commit is contained in:
parent
854bdd2eae
commit
1a83857f98
1
src/rust/Cargo.lock
generated
1
src/rust/Cargo.lock
generated
@ -2054,6 +2054,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"signal-hook",
|
||||
"strum",
|
||||
"sysinfo",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
|
@ -40,6 +40,7 @@ zerocopy = {version = "0.6.1", features = [ "simd" ] }
|
||||
fxhash = "0.2.1"
|
||||
axum = { version = "0.7.5", features = ["ws"] }
|
||||
tower-http = { version = "0.5.2", features = ["fs"] }
|
||||
strum = { version = "0.26.3", features = ["derive"] }
|
||||
|
||||
# Support JemAlloc on supported platforms
|
||||
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
|
||||
|
@ -5,6 +5,7 @@ import {TrackedFlowsCount} from "./tracked_flow_count_dash";
|
||||
import {ThroughputRingDash} from "./throughput_ring_dash";
|
||||
import {RttHistoDash} from "./rtt_histo_dash";
|
||||
import {Top10Downloaders} from "./top10_downloaders";
|
||||
import {Worst10Downloaders} from "./worst10_downloaders";
|
||||
|
||||
export const DashletMenu = [
|
||||
{ name: "Throughput Bits/Second", tag: "throughputBps", size: 3 },
|
||||
@ -14,6 +15,7 @@ export const DashletMenu = [
|
||||
{ name: "Last 5 Minutes Throughput", tag: "throughputRing", size: 6 },
|
||||
{ name: "Round-Trip Time Histogram", tag: "rttHistogram", size: 6 },
|
||||
{ name: "Top 10 Downloaders", tag: "top10downloaders", size: 6 },
|
||||
{ name: "Worst 10 Round-Trip Time", tag: "worst10downloaders", size: 6 },
|
||||
];
|
||||
|
||||
export function widgetFactory(widgetName, count) {
|
||||
@ -26,6 +28,7 @@ export function widgetFactory(widgetName, count) {
|
||||
case "throughputRing": widget = new ThroughputRingDash(count); break;
|
||||
case "rttHistogram": widget = new RttHistoDash(count); break;
|
||||
case "top10downloaders":widget = new Top10Downloaders(count); break;
|
||||
case "worst10downloaders":widget = new Worst10Downloaders(count); break;
|
||||
default: {
|
||||
console.log("I don't know how to construct a widget of type [" + widgetName + "]");
|
||||
return null;
|
||||
|
@ -0,0 +1,90 @@
|
||||
import {BaseDashlet} from "./base_dashlet";
|
||||
import {RttHistogram} from "../graphs/rtt_histo";
|
||||
import {theading} from "../helpers/builders";
|
||||
import {scaleNumber, rttCircleSpan} from "../helpers/scaling";
|
||||
|
||||
export class Worst10Downloaders extends BaseDashlet {
|
||||
constructor(slot) {
|
||||
super(slot);
|
||||
}
|
||||
|
||||
title() {
|
||||
return "Worst 10 Round-Trip Time";
|
||||
}
|
||||
|
||||
subscribeTo() {
|
||||
return [ "worst10downloaders" ];
|
||||
}
|
||||
|
||||
buildContainer() {
|
||||
let base = super.buildContainer();
|
||||
base.style.height = "250px";
|
||||
base.style.overflow = "auto";
|
||||
return base;
|
||||
}
|
||||
|
||||
setup() {
|
||||
super.setup();
|
||||
}
|
||||
|
||||
onMessage(msg) {
|
||||
if (msg.event === "worst10downloaders") {
|
||||
let target = document.getElementById(this.id);
|
||||
|
||||
let t = document.createElement("table");
|
||||
t.classList.add("table", "table-striped", "tiny");
|
||||
|
||||
let th = document.createElement("thead");
|
||||
th.appendChild(theading(""));
|
||||
th.appendChild(theading("IP Address/Circuit"));
|
||||
th.appendChild(theading("DL ⬇️"));
|
||||
th.appendChild(theading("UL ⬆️"));
|
||||
th.appendChild(theading("RTT (ms)"));
|
||||
th.appendChild(theading("TCP Retransmits"));
|
||||
th.appendChild(theading("Shaped"));
|
||||
t.appendChild(th);
|
||||
|
||||
let tbody = document.createElement("tbody");
|
||||
msg.data.forEach((r) => {
|
||||
let row = document.createElement("tr");
|
||||
|
||||
let circle = document.createElement("td");
|
||||
circle.appendChild(rttCircleSpan(r.median_tcp_rtt));
|
||||
row.appendChild(circle);
|
||||
|
||||
let ip = document.createElement("td");
|
||||
ip.innerText = r.ip_address;
|
||||
row.append(ip);
|
||||
|
||||
let dl = document.createElement("td");
|
||||
dl.innerText = scaleNumber(r.bits_per_second[0]);
|
||||
row.append(dl);
|
||||
|
||||
let ul = document.createElement("td");
|
||||
ul.innerText = scaleNumber(r.bits_per_second[1]);
|
||||
row.append(ul);
|
||||
|
||||
let rtt = document.createElement("td");
|
||||
rtt.innerText = r.median_tcp_rtt.toFixed(2);
|
||||
row.append(rtt);
|
||||
|
||||
let tcp_xmit = document.createElement("td");
|
||||
tcp_xmit.innerText = r.tcp_retransmits[0] + " / " + r.tcp_retransmits[1];
|
||||
row.append(tcp_xmit);
|
||||
|
||||
let shaped = document.createElement("td");
|
||||
shaped.innerText = r.plan[0] + " / " + r.plan[1];
|
||||
row.append(shaped);
|
||||
|
||||
t.appendChild(row);
|
||||
});
|
||||
t.appendChild(tbody);
|
||||
|
||||
// Display it
|
||||
while (target.children.length > 1) {
|
||||
target.removeChild(target.lastChild);
|
||||
}
|
||||
target.appendChild(t);
|
||||
}
|
||||
}
|
||||
}
|
@ -6,6 +6,7 @@ mod publisher_channel;
|
||||
mod subscriber;
|
||||
|
||||
use std::sync::Arc;
|
||||
use strum::IntoEnumIterator;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::Mutex;
|
||||
use crate::node_manager::ws::publish_subscribe::publisher_channel::PublisherChannel;
|
||||
@ -25,12 +26,20 @@ impl PubSub {
|
||||
/// Constructs a new PubSub interface with a default set of
|
||||
/// channels.
|
||||
pub(super) fn new() -> Arc<Self> {
|
||||
let mut channels = Vec::new();
|
||||
for c in PublishedChannels::iter() {
|
||||
channels.push(
|
||||
PublisherChannel::new(c)
|
||||
);
|
||||
}
|
||||
|
||||
let channels = vec![
|
||||
PublisherChannel::new(PublishedChannels::Throughput),
|
||||
PublisherChannel::new(PublishedChannels::RttHistogram),
|
||||
PublisherChannel::new(PublishedChannels::FlowCount),
|
||||
PublisherChannel::new(PublishedChannels::Cadence),
|
||||
PublisherChannel::new(PublishedChannels::Top10Downloaders),
|
||||
PublisherChannel::new(PublishedChannels::Worst10Downloaders),
|
||||
];
|
||||
|
||||
let result = Self {
|
||||
@ -46,9 +55,11 @@ impl PubSub {
|
||||
let mut channels = self.channels.lock().await;
|
||||
if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) {
|
||||
channel.subscribe(sender).await;
|
||||
} else {
|
||||
log::warn!("Tried to subscribe to channel {:?}, which doesn't exist", channel);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Checks that a channel has anyone listening for it. If it doesn't,
|
||||
/// there's no point in using CPU to process it!
|
||||
pub(super) async fn is_channel_alive(&self, channel: PublishedChannels) -> bool {
|
||||
@ -59,7 +70,7 @@ impl PubSub {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub(super) async fn send(&self, channel: PublishedChannels, message: String) {
|
||||
let mut channels = self.channels.lock().await;
|
||||
if let Some(channel) = channels.iter_mut().find(|c| c.channel_type == channel) {
|
||||
|
@ -1,4 +1,6 @@
|
||||
#[derive(PartialEq, Clone, Copy, Debug)]
|
||||
use strum::EnumIter;
|
||||
|
||||
#[derive(PartialEq, Clone, Copy, Debug, EnumIter)]
|
||||
pub enum PublishedChannels {
|
||||
/// Provides a 1-second tick notification to the client
|
||||
Cadence,
|
||||
@ -6,6 +8,7 @@ pub enum PublishedChannels {
|
||||
RttHistogram,
|
||||
FlowCount,
|
||||
Top10Downloaders,
|
||||
Worst10Downloaders,
|
||||
}
|
||||
|
||||
impl PublishedChannels {
|
||||
@ -16,6 +19,7 @@ impl PublishedChannels {
|
||||
Self::FlowCount => "flowCount",
|
||||
Self::Cadence => "cadence",
|
||||
Self::Top10Downloaders => "top10downloaders",
|
||||
Self::Worst10Downloaders => "worst10downloaders",
|
||||
}
|
||||
}
|
||||
|
||||
@ -26,6 +30,7 @@ impl PublishedChannels {
|
||||
"flowCount" => Some(Self::FlowCount),
|
||||
"cadence" => Some(Self::Cadence),
|
||||
"top10downloaders" => Some(Self::Top10Downloaders),
|
||||
"worst10downloaders" => Some(Self::Worst10Downloaders),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ pub(super) async fn channel_ticker(channels: Arc<PubSub>) {
|
||||
rtt_histogram::rtt_histo(channels.clone()),
|
||||
flow_counter::flow_count(channels.clone()),
|
||||
top_10::top_10_downloaders(channels.clone()),
|
||||
top_10::worst_10_downloaders(channels.clone()),
|
||||
);
|
||||
}
|
||||
}
|
@ -4,13 +4,13 @@ use lqos_bus::BusResponse;
|
||||
use crate::node_manager::ws::publish_subscribe::PubSub;
|
||||
use crate::node_manager::ws::published_channels::PublishedChannels;
|
||||
use crate::node_manager::ws::ticker::ipstats_conversion::IpStatsWithPlan;
|
||||
use crate::throughput_tracker::top_n;
|
||||
use crate::throughput_tracker::{top_n, worst_n};
|
||||
|
||||
pub async fn top_10_downloaders(channels: Arc<PubSub>) {
|
||||
if !channels.is_channel_alive(PublishedChannels::Top10Downloaders).await {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if let BusResponse::TopDownloaders(top) = top_n(0, 10) {
|
||||
let result: Vec<IpStatsWithPlan> = top
|
||||
.iter()
|
||||
@ -25,4 +25,25 @@ pub async fn top_10_downloaders(channels: Arc<PubSub>) {
|
||||
).to_string();
|
||||
channels.send(PublishedChannels::Top10Downloaders, message).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn worst_10_downloaders(channels: Arc<PubSub>) {
|
||||
if !channels.is_channel_alive(PublishedChannels::Worst10Downloaders).await {
|
||||
return;
|
||||
}
|
||||
|
||||
if let BusResponse::WorstRtt(top) = worst_n(0, 10) {
|
||||
let result: Vec<IpStatsWithPlan> = top
|
||||
.iter()
|
||||
.map(|stat| stat.into())
|
||||
.collect();
|
||||
|
||||
let message = json!(
|
||||
{
|
||||
"event": "worst10downloaders",
|
||||
"data": result
|
||||
}
|
||||
).to_string();
|
||||
channels.send(PublishedChannels::Worst10Downloaders, message).await;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user