From 84a3709a59bfc029b932fe72f5860a3b2d687327 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Wed, 26 Apr 2023 16:09:30 +0000 Subject: [PATCH] Separate per-host throughput lines --- .../lts_node/src/web/wss/dashboard.rs | 129 ----------- .../lts_node/src/web/wss/mod.rs | 4 +- .../lts_node/src/web/wss/queries/mod.rs | 2 + .../src/web/wss/queries/packet_counts/mod.rs | 2 +- .../src/web/wss/queries/throughput/mod.rs | 100 +++++++++ .../wss/queries/throughput/throughput_host.rs | 23 ++ .../wss/queries/throughput/throughput_row.rs | 25 +++ .../site_build/src/components/throughput.ts | 200 ++++++++++-------- 8 files changed, 262 insertions(+), 223 deletions(-) create mode 100644 src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs create mode 100644 src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_host.rs create mode 100644 src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_row.rs diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/dashboard.rs b/src/rust/long_term_stats/lts_node/src/web/wss/dashboard.rs index a8611a1a..7cd4d480 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/dashboard.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/dashboard.rs @@ -71,135 +71,6 @@ struct RttChart { histo: Vec, } - -pub async fn packets(cnn: Pool, socket: &mut WebSocket, key: &str) { - if let Some(org) = get_org_details(cnn, key).await { - let influx_url = format!("http://{}:8086", org.influx_host); - let client = Client::new(influx_url, &org.influx_org, &org.influx_token); - - let qs = format!( - "from(bucket: \"{}\") - |> range(start: -5m) - |> filter(fn: (r) => r[\"_measurement\"] == \"packets\") - |> filter(fn: (r) => r[\"organization_id\"] == \"{}\") - |> aggregateWindow(every: 10s, fn: mean, createEmpty: false) - |> yield(name: \"last\")", - org.influx_bucket, org.key - ); - - let query = Query::new(qs); - let rows = client.query::(Some(query)).await; - match rows { - Err(e) => { - tracing::error!("Error querying InfluxDB: {}", e); - } - Ok(rows) => { - // Parse and send the data - //println!("{rows:?}"); - - let mut down = Vec::new(); - let mut up = Vec::new(); - - // Fill download - for row in rows - .iter() - .filter(|r| r.direction == "down") - { - down.push(Packets { - value: row.avg, - date: row.time.format("%H:%M:%S").to_string(), - l: row.min, - u: row.max - row.min, - }); - } - - // Fill upload - for row in rows - .iter() - .filter(|r| r.direction == "up") - { - up.push(Packets { - value: row.avg, - date: row.time.format("%H:%M:%S").to_string(), - l: row.min, - u: row.max - row.min, - }); - } - - - // Send it - let chart = PacketChart { msg: "packetChart".to_string(), down, up }; - let json = serde_json::to_string(&chart).unwrap(); - socket.send(Message::Text(json)).await.unwrap(); - } - } - } -} - -pub async fn bits(cnn: Pool, socket: &mut WebSocket, key: &str) { - if let Some(org) = get_org_details(cnn, key).await { - let influx_url = format!("http://{}:8086", org.influx_host); - let client = Client::new(influx_url, &org.influx_org, &org.influx_token); - - let qs = format!( - "from(bucket: \"{}\") - |> range(start: -5m) - |> filter(fn: (r) => r[\"_measurement\"] == \"bits\") - |> filter(fn: (r) => r[\"organization_id\"] == \"{}\") - |> aggregateWindow(every: 10s, fn: mean, createEmpty: false) - |> yield(name: \"last\")", - org.influx_bucket, org.key - ); - - let query = Query::new(qs); - let rows = client.query::(Some(query)).await; - match rows { - Err(e) => { - tracing::error!("Error querying InfluxDB: {}", e); - } - Ok(rows) => { - // Parse and send the data - //println!("{rows:?}"); - - let mut down = Vec::new(); - let mut up = Vec::new(); - - // Fill download - for row in rows - .iter() - .filter(|r| r.direction == "down") - { - down.push(Packets { - value: row.avg, - date: row.time.format("%H:%M:%S").to_string(), - l: row.min, - u: row.max - row.min, - }); - } - - // Fill upload - for row in rows - .iter() - .filter(|r| r.direction == "up") - { - up.push(Packets { - value: row.avg, - date: row.time.format("%H:%M:%S").to_string(), - l: row.min, - u: row.max - row.min, - }); - } - - - // Send it - let chart = PacketChart { msg: "bitsChart".to_string(), down, up }; - let json = serde_json::to_string(&chart).unwrap(); - socket.send(Message::Text(json)).await.unwrap(); - } - } - } -} - pub async fn rtt(cnn: Pool, socket: &mut WebSocket, key: &str) { if let Some(org) = get_org_details(cnn, key).await { let influx_url = format!("http://{}:8086", org.influx_host); diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs index d64ed95f..42491942 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs @@ -4,7 +4,7 @@ use axum::{ }; use pgdb::sqlx::{Pool, Postgres}; use serde_json::Value; -use crate::web::wss::queries::send_packets_for_all_nodes; +use crate::web::wss::queries::{send_packets_for_all_nodes, send_throughput_for_all_nodes}; mod login; mod nodes; mod dashboard; @@ -62,7 +62,7 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool) { } "throughputChart" => { if let Some(credentials) = &credentials { - dashboard::bits(cnn.clone(), &mut socket, &credentials.license_key).await; + let _ = send_throughput_for_all_nodes(cnn.clone(), &mut socket, &credentials.license_key).await; } else { log::info!("Throughput requested but no credentials provided"); } diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs index a34322f7..bd03e00a 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs @@ -2,4 +2,6 @@ //! then be used by the web server to respond to requests. mod packet_counts; +mod throughput; pub use packet_counts::send_packets_for_all_nodes; +pub use throughput::send_throughput_for_all_nodes; \ No newline at end of file diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/packet_counts/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/packet_counts/mod.rs index 9d793861..870f12be 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/queries/packet_counts/mod.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/packet_counts/mod.rs @@ -35,7 +35,7 @@ pub async fn get_packets_for_all_nodes(cnn: Pool, key: &str) -> anyhow } let all_nodes: anyhow::Result> = join_all(futures).await .into_iter().collect(); - Ok(all_nodes?) + all_nodes } /// Requests packet-per-second data for a single shaper node. diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs new file mode 100644 index 00000000..68d28795 --- /dev/null +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs @@ -0,0 +1,100 @@ +use axum::extract::ws::{WebSocket, Message}; +use futures::future::join_all; +use influxdb2::{Client, models::Query}; +use pgdb::sqlx::{Pool, Postgres}; +use crate::submissions::get_org_details; +use self::{throughput_host::{ThroughputHost, Throughput, ThroughputChart}, throughput_row::ThroughputRow}; +mod throughput_host; +mod throughput_row; + +pub async fn send_throughput_for_all_nodes(cnn: Pool, socket: &mut WebSocket, key: &str) -> anyhow::Result<()> { + let nodes = get_throughput_for_all_nodes(cnn, key).await?; + + let chart = ThroughputChart { msg: "bitsChart".to_string(), nodes }; + let json = serde_json::to_string(&chart).unwrap(); + socket.send(Message::Text(json)).await.unwrap(); + Ok(()) +} + +pub async fn get_throughput_for_all_nodes(cnn: Pool, key: &str) -> anyhow::Result> { + let node_status = pgdb::node_status(cnn.clone(), key).await?; + let mut futures = Vec::new(); + for node in node_status { + futures.push(get_throughput_for_node( + cnn.clone(), + key, + node.node_id.to_string(), + node.node_name.to_string(), + )); + } + let all_nodes: anyhow::Result> = join_all(futures).await + .into_iter().collect(); + all_nodes +} + +pub async fn get_throughput_for_node( + cnn: Pool, + key: &str, + node_id: String, + node_name: String, +) -> anyhow::Result { + if let Some(org) = get_org_details(cnn, key).await { + let influx_url = format!("http://{}:8086", org.influx_host); + let client = Client::new(influx_url, &org.influx_org, &org.influx_token); + + let qs = format!( + "from(bucket: \"{}\") + |> range(start: -5m) + |> filter(fn: (r) => r[\"_measurement\"] == \"bits\") + |> filter(fn: (r) => r[\"organization_id\"] == \"{}\") + |> filter(fn: (r) => r[\"host_id\"] == \"{}\") + |> aggregateWindow(every: 10s, fn: mean, createEmpty: false) + |> yield(name: \"last\")", + org.influx_bucket, org.key, node_id + ); + + let query = Query::new(qs); + let rows = client.query::(Some(query)).await; + match rows { + Err(e) => { + tracing::error!("Error querying InfluxDB: {}", e); + return Err(anyhow::Error::msg("Unable to query influx")); + } + Ok(rows) => { + // Parse and send the data + //println!("{rows:?}"); + + let mut down = Vec::new(); + let mut up = Vec::new(); + + // Fill download + for row in rows.iter().filter(|r| r.direction == "down") { + down.push(Throughput { + value: row.avg, + date: row.time.format("%H:%M:%S").to_string(), + l: row.min, + u: row.max - row.min, + }); + } + + // Fill upload + for row in rows.iter().filter(|r| r.direction == "up") { + up.push(Throughput { + value: row.avg, + date: row.time.format("%H:%M:%S").to_string(), + l: row.min, + u: row.max - row.min, + }); + } + + return Ok(ThroughputHost{ + node_id, + node_name, + down, + up, + }); + } + } + } + Err(anyhow::Error::msg("Unable to query influx")) +} diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_host.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_host.rs new file mode 100644 index 00000000..188d5b02 --- /dev/null +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_host.rs @@ -0,0 +1,23 @@ +use serde::Serialize; + +#[derive(Serialize, Debug)] +pub struct ThroughputHost { + pub node_id: String, + pub node_name: String, + pub down: Vec, + pub up: Vec, +} + +#[derive(Serialize, Debug)] +pub struct Throughput { + pub value: f64, + pub date: String, + pub l: f64, + pub u: f64, +} + +#[derive(Serialize, Debug)] +pub struct ThroughputChart { + pub msg: String, + pub nodes: Vec, +} \ No newline at end of file diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_row.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_row.rs new file mode 100644 index 00000000..e60d9bca --- /dev/null +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/throughput_row.rs @@ -0,0 +1,25 @@ +use chrono::{DateTime, FixedOffset, Utc}; +use influxdb2::FromDataPoint; + +#[derive(Debug, FromDataPoint)] +pub struct ThroughputRow { + pub direction: String, + pub host_id: String, + pub min: f64, + pub max: f64, + pub avg: f64, + pub time: DateTime, +} + +impl Default for ThroughputRow { + fn default() -> Self { + Self { + direction: "".to_string(), + host_id: "".to_string(), + min: 0.0, + max: 0.0, + avg: 0.0, + time: DateTime::::MIN_UTC.into(), + } + } +} \ No newline at end of file diff --git a/src/rust/long_term_stats/site_build/src/components/throughput.ts b/src/rust/long_term_stats/site_build/src/components/throughput.ts index 66d69671..8ae15816 100644 --- a/src/rust/long_term_stats/site_build/src/components/throughput.ts +++ b/src/rust/long_term_stats/site_build/src/components/throughput.ts @@ -5,13 +5,6 @@ import * as echarts from 'echarts'; export class ThroughputChart implements Component { div: HTMLElement; myChart: echarts.ECharts; - download: any; - downloadMin: any; - downloadMax: any; - upload: any; - uploadMin: any; - uploadMax: any; - x: any; chartMade: boolean = false; constructor() { @@ -29,22 +22,107 @@ export class ThroughputChart implements Component { onmessage(event: any): void { if (event.msg == "bitsChart") { - //console.log(event); - this.download = []; - this.downloadMin = []; - this.downloadMax = []; - this.upload = []; - this.uploadMin = []; - this.uploadMax = []; - this.x = []; - for (let i = 0; i < event.down.length; i++) { - this.download.push(event.down[i].value); - this.downloadMin.push(event.down[i].l); - this.downloadMax.push(event.down[i].u); - this.upload.push(0.0 - event.up[i].value); - this.uploadMin.push(0.0 - event.up[i].l); - this.uploadMax.push(0.0 - event.up[i].u); - this.x.push(event.down[i].date); + let series: echarts.SeriesOption[] = []; + + // Iterate all provides nodes and create a set of series for each, + // providing upload and download banding per node. + let x: any[] = []; + let first = true; + let legend: string[] = []; + for (let i=0; i( (option = { title: { text: "Bits" }, + legend: { + orient: "horizontal", + right: 10, + top: "bottom", + data: legend, + }, xAxis: { type: 'category', - data: this.x, + data: x, }, yAxis: { type: 'value', @@ -65,73 +149,7 @@ export class ThroughputChart implements Component { } } }, - series: [ - { - name: "L", - type: "line", - data: this.downloadMin, - symbol: 'none', - stack: 'confidence-band', - lineStyle: { - opacity: 0 - }, - }, - { - name: "U", - type: "line", - data: this.downloadMax, - symbol: 'none', - stack: 'confidence-band', - lineStyle: { - opacity: 0 - }, - areaStyle: { - color: '#ccc' - }, - }, - { - name: "Download", - type: "line", - data: this.download, - symbol: 'none', - itemStyle: { - color: '#333' - }, - }, - // Upload - { - name: "LU", - type: "line", - data: this.uploadMin, - symbol: 'none', - stack: 'confidence-band', - lineStyle: { - opacity: 0 - }, - }, - { - name: "UU", - type: "line", - data: this.uploadMax, - symbol: 'none', - stack: 'confidence-band', - lineStyle: { - opacity: 0 - }, - areaStyle: { - color: '#ccc' - }, - }, - { - name: "Upload", - type: "line", - data: this.upload, - symbol: 'none', - itemStyle: { - color: '#333' - }, - }, - ] + series: series }) ); option && this.myChart.setOption(option);