Refactor the ticker engine to be less error-prone while adding new tickers.

This commit is contained in:
Herbert Wolverson 2024-06-27 11:16:03 -05:00
parent 385642f6cc
commit 85961af1a9
19 changed files with 40 additions and 68 deletions

View File

@ -11,7 +11,7 @@ export class RttHistoDash extends BaseDashlet{
}
subscribeTo() {
return [ "rttHistogram" ];
return [ "RttHistogram" ];
}
buildContainer() {
@ -26,7 +26,7 @@ export class RttHistoDash extends BaseDashlet{
}
onMessage(msg) {
if (msg.event === "rttHistogram") {
if (msg.event === "RttHistogram") {
this.graph.update(msg.data);
}
}

View File

@ -7,7 +7,7 @@ export class ShapedUnshapedDash extends BaseDashlet{
}
subscribeTo() {
return [ "throughput" ];
return [ "Throughput" ];
}
buildContainer() {
@ -22,7 +22,7 @@ export class ShapedUnshapedDash extends BaseDashlet{
}
onMessage(msg) {
if (msg.event === "throughput") {
if (msg.event === "Throughput") {
let shaped = msg.data.shaped_bps[0] + msg.data.shaped_bps[1];
let unshaped = msg.data.bps[0] + msg.data.bps[1];
this.graph.update(shaped, unshaped);

View File

@ -7,7 +7,7 @@ export class ThroughputBpsDash extends BaseDashlet{
}
subscribeTo() {
return [ "throughput" ];
return [ "Throughput" ];
}
buildContainer() {
@ -22,7 +22,7 @@ export class ThroughputBpsDash extends BaseDashlet{
}
onMessage(msg) {
if (msg.event === "throughput") {
if (msg.event === "Throughput") {
this.graph.update(msg.data.bps[0], msg.data.bps[1], msg.data.max[0], msg.data.max[1]);
}
}

View File

@ -7,7 +7,7 @@ export class ThroughputPpsDash extends BaseDashlet{
}
subscribeTo() {
return [ "throughput" ];
return [ "Throughput" ];
}
buildContainer() {
@ -22,7 +22,7 @@ export class ThroughputPpsDash extends BaseDashlet{
}
onMessage(msg) {
if (msg.event === "throughput") {
if (msg.event === "Throughput") {
this.graph.update(msg.data.pps[0], msg.data.pps[1]);
}
}

View File

@ -11,7 +11,7 @@ export class ThroughputRingDash extends BaseDashlet{
}
subscribeTo() {
return [ "throughput" ];
return [ "Throughput" ];
}
buildContainer() {
@ -26,7 +26,7 @@ export class ThroughputRingDash extends BaseDashlet{
}
onMessage(msg) {
if (msg.event === "throughput") {
if (msg.event === "Throughput") {
let shaped = msg.data.shaped_bps[0] + msg.data.shaped_bps[1];
let unshaped = msg.data.bps[0] + msg.data.bps[1];
this.graph.update(msg.data.shaped_bps, msg.data.bps);

View File

@ -13,7 +13,7 @@ export class Top10Downloaders extends BaseDashlet {
}
subscribeTo() {
return [ "top10downloaders" ];
return [ "TopDownloads" ];
}
buildContainer() {
@ -28,7 +28,7 @@ export class Top10Downloaders extends BaseDashlet {
}
onMessage(msg) {
if (msg.event === "top10downloaders") {
if (msg.event === "TopDownloads") {
let target = document.getElementById(this.id);
let t = document.createElement("table");

View File

@ -12,7 +12,7 @@ export class Top10FlowsBytes extends BaseDashlet {
}
subscribeTo() {
return [ "topFlowsBytes" ];
return [ "TopFlowsBytes" ];
}
buildContainer() {
@ -27,7 +27,7 @@ export class Top10FlowsBytes extends BaseDashlet {
}
onMessage(msg) {
if (msg.event === "topFlowsBytes") {
if (msg.event === "TopFlowsBytes") {
let target = document.getElementById(this.id);
let t = document.createElement("table");

View File

@ -12,7 +12,7 @@ export class Top10FlowsRate extends BaseDashlet {
}
subscribeTo() {
return [ "topFlowsRate" ];
return [ "TopFlowsRate" ];
}
buildContainer() {
@ -27,7 +27,7 @@ export class Top10FlowsRate extends BaseDashlet {
}
onMessage(msg) {
if (msg.event === "topFlowsRate") {
if (msg.event === "TopFlowsRate") {
let target = document.getElementById(this.id);
let t = document.createElement("table");

View File

@ -7,7 +7,7 @@ export class TrackedFlowsCount extends BaseDashlet{
}
subscribeTo() {
return [ "flowCount" ];
return [ "FlowCount" ];
}
buildContainer() {
@ -22,7 +22,7 @@ export class TrackedFlowsCount extends BaseDashlet{
}
onMessage(msg) {
if (msg.event === "flowCount") {
if (msg.event === "FlowCount") {
this.graph.update(msg.data);
}
}

View File

@ -13,7 +13,7 @@ export class Worst10Downloaders extends BaseDashlet {
}
subscribeTo() {
return [ "worst10downloaders" ];
return [ "WorstRTT" ];
}
buildContainer() {
@ -28,7 +28,7 @@ export class Worst10Downloaders extends BaseDashlet {
}
onMessage(msg) {
if (msg.event === "worst10downloaders") {
if (msg.event === "WorstRTT") {
let target = document.getElementById(this.id);
let t = document.createElement("table");

View File

@ -2,6 +2,7 @@ mod publish_subscribe;
mod published_channels;
mod ticker;
use std::str::FromStr;
use std::sync::Arc;
use axum::{extract::{ws::{Message, WebSocket}, WebSocketUpgrade}, response::IntoResponse, routing::get, Extension, Router};
use serde::Deserialize;

View File

@ -28,7 +28,7 @@ impl PublisherChannel {
let welcome = json!(
{
"event" : "join",
"channel" : self.channel_type.as_str(),
"channel" : self.channel_type.to_string(),
}
).to_string();
let _ = sender.send(welcome).await;

View File

@ -1,43 +1,14 @@
use strum::EnumIter;
use strum::{Display, EnumIter, EnumString};
#[derive(PartialEq, Clone, Copy, Debug, EnumIter)]
#[derive(PartialEq, Clone, Copy, Debug, EnumIter, Display, EnumString)]
pub enum PublishedChannels {
/// Provides a 1-second tick notification to the client
Cadence,
Throughput,
RttHistogram,
FlowCount,
Top10Downloaders,
Worst10Downloaders,
TopDownloads,
WorstRTT,
TopFlowsBytes,
TopFlowsRate,
}
impl PublishedChannels {
pub(super) fn as_str(&self) -> &'static str {
match self {
Self::Throughput => "throughput",
Self::RttHistogram => "rttHistogram",
Self::FlowCount => "flowCount",
Self::Cadence => "cadence",
Self::Top10Downloaders => "top10downloaders",
Self::Worst10Downloaders => "worst10downloaders",
Self::TopFlowsBytes => "topFlowsBytes",
Self::TopFlowsRate => "topFlowsRate",
}
}
pub(super) fn from_str(s: &str) -> Option<Self> {
match s {
"throughput" => Some(Self::Throughput),
"rttHistogram" => Some(Self::RttHistogram),
"flowCount" => Some(Self::FlowCount),
"cadence" => Some(Self::Cadence),
"top10downloaders" => Some(Self::Top10Downloaders),
"worst10downloaders" => Some(Self::Worst10Downloaders),
"topFlowsBytes" => Some(Self::TopFlowsBytes),
"topFlowsRate" => Some(Self::TopFlowsRate),
_ => None,
}
}
}

View File

@ -10,7 +10,7 @@ pub async fn cadence(channels: Arc<PubSub>) {
let message = json!(
{
"event": "tick"
"event": PublishedChannels::Cadence.to_string(),
}
).to_string();
channels.send(PublishedChannels::Cadence, message).await;

View File

@ -15,7 +15,7 @@ pub async fn flow_count(channels: Arc<PubSub>) {
};
let active_flows = json!(
{
"event": "flowCount",
"event": PublishedChannels::FlowCount.to_string(),
"data": active_flows,
}
).to_string();

View File

@ -14,7 +14,7 @@ pub async fn rtt_histo(channels: Arc<PubSub>) {
if let BusResponse::RttHistogram(data) = &histo {
let rtt_histo = json!(
{
"event": "rttHistogram",
"event": PublishedChannels::RttHistogram.to_string(),
"data": data,
}
).to_string();

View File

@ -27,7 +27,7 @@ pub async fn throughput(channels: Arc<PubSub>) {
};
let bps = json!(
{
"event" : "throughput",
"event" : PublishedChannels::Throughput.to_string(),
"data": {
"bps": bits_per_second,
"pps": packets_per_second,

View File

@ -7,7 +7,7 @@ use crate::node_manager::ws::ticker::ipstats_conversion::IpStatsWithPlan;
use crate::throughput_tracker::{top_n, worst_n};
pub async fn top_10_downloaders(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::Top10Downloaders).await {
if !channels.is_channel_alive(PublishedChannels::TopDownloads).await {
return;
}
@ -19,16 +19,16 @@ pub async fn top_10_downloaders(channels: Arc<PubSub>) {
let message = json!(
{
"event": "top10downloaders",
"event": PublishedChannels::TopDownloads.to_string(),
"data": result
}
).to_string();
channels.send(PublishedChannels::Top10Downloaders, message).await;
channels.send(PublishedChannels::TopDownloads, message).await;
}
}
pub async fn worst_10_downloaders(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::Worst10Downloaders).await {
if !channels.is_channel_alive(PublishedChannels::WorstRTT).await {
return;
}
@ -40,10 +40,10 @@ pub async fn worst_10_downloaders(channels: Arc<PubSub>) {
let message = json!(
{
"event": "worst10downloaders",
"event": PublishedChannels::WorstRTT.to_string(),
"data": result
}
).to_string();
channels.send(PublishedChannels::Worst10Downloaders, message).await;
channels.send(PublishedChannels::WorstRTT, message).await;
}
}

View File

@ -16,7 +16,7 @@ pub async fn top_flows_bytes(channels: Arc<PubSub>) {
if let BusResponse::TopFlows(flows) = throughput_tracker::top_flows(10, TopFlowType::Bytes) {
let message = json!(
{
"event": "topFlowsBytes",
"event": PublishedChannels::TopFlowsBytes.to_string(),
"data": flows,
}
).to_string();
@ -25,17 +25,17 @@ pub async fn top_flows_bytes(channels: Arc<PubSub>) {
}
pub async fn top_flows_rate(channels: Arc<PubSub>) {
if !channels.is_channel_alive(PublishedChannels::TopFlowsBytes).await {
if !channels.is_channel_alive(PublishedChannels::TopFlowsRate).await {
return;
}
if let BusResponse::TopFlows(flows) = throughput_tracker::top_flows(10, TopFlowType::RateEstimate) {
let message = json!(
{
"event": "topFlowsRate",
"event": PublishedChannels::TopFlowsRate.to_string(),
"data": flows,
}
).to_string();
channels.send(PublishedChannels::TopFlowsBytes, message).await;
channels.send(PublishedChannels::TopFlowsRate, message).await;
}
}