Separate per-host throughput lines

This commit is contained in:
Herbert Wolverson 2023-04-26 16:09:30 +00:00
parent 1c22faa0f0
commit 84a3709a59
8 changed files with 262 additions and 223 deletions

View File

@ -71,135 +71,6 @@ struct RttChart {
histo: Vec<u64>,
}
pub async fn packets(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str) {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> range(start: -5m)
|> filter(fn: (r) => r[\"_measurement\"] == \"packets\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> yield(name: \"last\")",
org.influx_bucket, org.key
);
let query = Query::new(qs);
let rows = client.query::<BitsAndPackets>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut down = Vec::new();
let mut up = Vec::new();
// Fill download
for row in rows
.iter()
.filter(|r| r.direction == "down")
{
down.push(Packets {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
// Fill upload
for row in rows
.iter()
.filter(|r| r.direction == "up")
{
up.push(Packets {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
// Send it
let chart = PacketChart { msg: "packetChart".to_string(), down, up };
let json = serde_json::to_string(&chart).unwrap();
socket.send(Message::Text(json)).await.unwrap();
}
}
}
}
pub async fn bits(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str) {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> range(start: -5m)
|> filter(fn: (r) => r[\"_measurement\"] == \"bits\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> yield(name: \"last\")",
org.influx_bucket, org.key
);
let query = Query::new(qs);
let rows = client.query::<BitsAndPackets>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut down = Vec::new();
let mut up = Vec::new();
// Fill download
for row in rows
.iter()
.filter(|r| r.direction == "down")
{
down.push(Packets {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
// Fill upload
for row in rows
.iter()
.filter(|r| r.direction == "up")
{
up.push(Packets {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
// Send it
let chart = PacketChart { msg: "bitsChart".to_string(), down, up };
let json = serde_json::to_string(&chart).unwrap();
socket.send(Message::Text(json)).await.unwrap();
}
}
}
}
pub async fn rtt(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str) {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);

View File

@ -4,7 +4,7 @@ use axum::{
};
use pgdb::sqlx::{Pool, Postgres};
use serde_json::Value;
use crate::web::wss::queries::send_packets_for_all_nodes;
use crate::web::wss::queries::{send_packets_for_all_nodes, send_throughput_for_all_nodes};
mod login;
mod nodes;
mod dashboard;
@ -62,7 +62,7 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
}
"throughputChart" => {
if let Some(credentials) = &credentials {
dashboard::bits(cnn.clone(), &mut socket, &credentials.license_key).await;
let _ = send_throughput_for_all_nodes(cnn.clone(), &mut socket, &credentials.license_key).await;
} else {
log::info!("Throughput requested but no credentials provided");
}

View File

@ -2,4 +2,6 @@
//! then be used by the web server to respond to requests.
// One submodule per chart query type.
mod packet_counts;
mod throughput;
// Re-export the per-chart entry points consumed by the websocket handler.
pub use packet_counts::send_packets_for_all_nodes;
pub use throughput::send_throughput_for_all_nodes;

View File

@ -35,7 +35,7 @@ pub async fn get_packets_for_all_nodes(cnn: Pool<Postgres>, key: &str) -> anyhow
}
let all_nodes: anyhow::Result<Vec<PacketHost>> = join_all(futures).await
.into_iter().collect();
Ok(all_nodes?)
all_nodes
}
/// Requests packet-per-second data for a single shaper node.

View File

@ -0,0 +1,100 @@
use axum::extract::ws::{WebSocket, Message};
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::sqlx::{Pool, Postgres};
use crate::submissions::get_org_details;
use self::{throughput_host::{ThroughputHost, Throughput, ThroughputChart}, throughput_row::ThroughputRow};
mod throughput_host;
mod throughput_row;
/// Gathers per-node throughput history and pushes it to the websocket as a
/// single `bitsChart` message containing one entry per shaper node.
///
/// # Errors
/// Propagates failures from the per-node queries, from JSON serialization,
/// and from the websocket send - the original `.unwrap()` calls would have
/// panicked the socket task instead of returning the error.
pub async fn send_throughput_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str) -> anyhow::Result<()> {
    let nodes = get_throughput_for_all_nodes(cnn, key).await?;
    let chart = ThroughputChart { msg: "bitsChart".to_string(), nodes };
    let json = serde_json::to_string(&chart)?;
    socket.send(Message::Text(json)).await?;
    Ok(())
}
/// Queries throughput history for every shaper node belonging to `key`,
/// running the per-node InfluxDB queries concurrently.
///
/// # Errors
/// Fails if the node list cannot be loaded, or if any per-node query fails.
pub async fn get_throughput_for_all_nodes(cnn: Pool<Postgres>, key: &str) -> anyhow::Result<Vec<ThroughputHost>> {
    let node_status = pgdb::node_status(cnn.clone(), key).await?;
    // Build one future per node, then await them all at once.
    let queries: Vec<_> = node_status
        .iter()
        .map(|node| {
            get_throughput_for_node(
                cnn.clone(),
                key,
                node.node_id.to_string(),
                node.node_name.to_string(),
            )
        })
        .collect();
    // Collecting into Result<Vec<_>> short-circuits on the first failure.
    join_all(queries).await.into_iter().collect()
}
pub async fn get_throughput_for_node(
cnn: Pool<Postgres>,
key: &str,
node_id: String,
node_name: String,
) -> anyhow::Result<ThroughputHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> range(start: -5m)
|> filter(fn: (r) => r[\"_measurement\"] == \"bits\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> yield(name: \"last\")",
org.influx_bucket, org.key, node_id
);
let query = Query::new(qs);
let rows = client.query::<ThroughputRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut down = Vec::new();
let mut up = Vec::new();
// Fill download
for row in rows.iter().filter(|r| r.direction == "down") {
down.push(Throughput {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
// Fill upload
for row in rows.iter().filter(|r| r.direction == "up") {
up.push(Throughput {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
return Ok(ThroughputHost{
node_id,
node_name,
down,
up,
});
}
}
}
Err(anyhow::Error::msg("Unable to query influx"))
}

View File

@ -0,0 +1,23 @@
use serde::Serialize;
/// Throughput history for a single shaper node, serialized into the
/// websocket `bitsChart` payload.
#[derive(Serialize, Debug)]
pub struct ThroughputHost {
// Shaper node identifier (used by the chart for series stacking keys).
pub node_id: String,
// Human-readable node name (used for chart legend labels).
pub node_name: String,
// Download samples, oldest first.
pub down: Vec<Throughput>,
// Upload samples, oldest first.
pub up: Vec<Throughput>,
}
/// One aggregated throughput sample with its confidence band.
#[derive(Serialize, Debug)]
pub struct Throughput {
// Mean value for the aggregation window.
pub value: f64,
// Sample timestamp formatted as HH:MM:SS.
pub date: String,
// Lower bound of the band (window minimum).
pub l: f64,
// Band height (window max minus min), stacked on top of `l` by the chart.
pub u: f64,
}
/// Top-level websocket message wrapping every node's throughput series.
#[derive(Serialize, Debug)]
pub struct ThroughputChart {
// Message discriminator; the client dispatches on "bitsChart".
pub msg: String,
// One entry per shaper node.
pub nodes: Vec<ThroughputHost>,
}

View File

@ -0,0 +1,25 @@
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::FromDataPoint;
/// One row returned by the per-node throughput Flux query; field names
/// must match the InfluxDB column names for `FromDataPoint` mapping.
#[derive(Debug, FromDataPoint)]
pub struct ThroughputRow {
    // Traffic direction tag: "down" or "up".
    pub direction: String,
    // Shaper node (host) identifier tag.
    pub host_id: String,
    // Window minimum (bits/s).
    pub min: f64,
    // Window maximum (bits/s).
    pub max: f64,
    // Window mean (bits/s).
    pub avg: f64,
    // Window timestamp.
    pub time: DateTime<FixedOffset>,
}

impl Default for ThroughputRow {
    /// Required by `FromDataPoint`; provides empty/zero placeholders.
    fn default() -> Self {
        Self {
            direction: String::new(),
            host_id: String::new(),
            min: 0.0,
            max: 0.0,
            avg: 0.0,
            // Earliest representable instant as a neutral placeholder time.
            time: DateTime::<Utc>::MIN_UTC.into(),
        }
    }
}

View File

@ -5,13 +5,6 @@ import * as echarts from 'echarts';
export class ThroughputChart implements Component {
div: HTMLElement;
myChart: echarts.ECharts;
download: any;
downloadMin: any;
downloadMax: any;
upload: any;
uploadMin: any;
uploadMax: any;
x: any;
chartMade: boolean = false;
constructor() {
@ -29,22 +22,107 @@ export class ThroughputChart implements Component {
onmessage(event: any): void {
if (event.msg == "bitsChart") {
//console.log(event);
this.download = [];
this.downloadMin = [];
this.downloadMax = [];
this.upload = [];
this.uploadMin = [];
this.uploadMax = [];
this.x = [];
for (let i = 0; i < event.down.length; i++) {
this.download.push(event.down[i].value);
this.downloadMin.push(event.down[i].l);
this.downloadMax.push(event.down[i].u);
this.upload.push(0.0 - event.up[i].value);
this.uploadMin.push(0.0 - event.up[i].l);
this.uploadMax.push(0.0 - event.up[i].u);
this.x.push(event.down[i].date);
let series: echarts.SeriesOption[] = [];
// Iterate all provides nodes and create a set of series for each,
// providing upload and download banding per node.
let x: any[] = [];
let first = true;
let legend: string[] = [];
for (let i=0; i<event.nodes.length; i++) {
let node = event.nodes[i];
legend.push(node.node_name + " DL");
legend.push(node.node_name + " UL");
//console.log(node);
let d: number[] = [];
let u: number[] = [];
let l: number[] = [];
for (let j=0; j<node.down.length; j++) {
if (first) x.push(node.down[j].date);
d.push(node.down[j].value);
u.push(node.down[j].u);
l.push(node.down[j].l);
}
if (first) first = false;
let min: echarts.SeriesOption = {
name: "L",
type: "line",
data: l,
symbol: 'none',
stack: 'confidence-band-' + node.node_id,
lineStyle: {
opacity: 0
},
};
let max: echarts.SeriesOption = {
name: "U",
type: "line",
data: u,
symbol: 'none',
stack: 'confidence-band' + node.node_id,
lineStyle: {
opacity: 0
},
areaStyle: {
color: '#ccc'
},
};
let val: echarts.SeriesOption = {
name: node.node_name + " DL",
type: "line",
data: d,
symbol: 'none',
};
series.push(min);
series.push(max);
series.push(val);
// Do the same for upload
d = [];
u = [];
l = [];
for (let j=0; j<node.down.length; j++) {
d.push(0.0 - node.up[j].value);
u.push(0.0 - node.up[j].u);
l.push(0.0 - node.up[j].l);
}
min = {
name: "L",
type: "line",
data: l,
symbol: 'none',
stack: 'confidence-band-' + node.node_id,
lineStyle: {
opacity: 0
},
};
max = {
name: "U",
type: "line",
data: u,
symbol: 'none',
stack: 'confidence-band' + node.node_id,
lineStyle: {
opacity: 0
},
areaStyle: {
color: '#ccc'
},
};
val = {
name: node.node_name + " UL",
type: "line",
data: d,
symbol: 'none',
};
series.push(min);
series.push(max);
series.push(val);
}
if (!this.chartMade) {
@ -53,9 +131,15 @@ export class ThroughputChart implements Component {
this.myChart.setOption<echarts.EChartsOption>(
(option = {
title: { text: "Bits" },
legend: {
orient: "horizontal",
right: 10,
top: "bottom",
data: legend,
},
xAxis: {
type: 'category',
data: this.x,
data: x,
},
yAxis: {
type: 'value',
@ -65,73 +149,7 @@ export class ThroughputChart implements Component {
}
}
},
series: [
{
name: "L",
type: "line",
data: this.downloadMin,
symbol: 'none',
stack: 'confidence-band',
lineStyle: {
opacity: 0
},
},
{
name: "U",
type: "line",
data: this.downloadMax,
symbol: 'none',
stack: 'confidence-band',
lineStyle: {
opacity: 0
},
areaStyle: {
color: '#ccc'
},
},
{
name: "Download",
type: "line",
data: this.download,
symbol: 'none',
itemStyle: {
color: '#333'
},
},
// Upload
{
name: "LU",
type: "line",
data: this.uploadMin,
symbol: 'none',
stack: 'confidence-band',
lineStyle: {
opacity: 0
},
},
{
name: "UU",
type: "line",
data: this.uploadMax,
symbol: 'none',
stack: 'confidence-band',
lineStyle: {
opacity: 0
},
areaStyle: {
color: '#ccc'
},
},
{
name: "Upload",
type: "line",
data: this.upload,
symbol: 'none',
itemStyle: {
color: '#333'
},
},
]
series: series
})
);
option && this.myChart.setOption(option);