Finish RTT integration with multiple hosts.

Add a time period to graphs.
Add a drop-down selector for visible time period.
Remember your preferred setting from last time.
This commit is contained in:
Herbert Wolverson 2023-04-26 18:04:17 +00:00
parent beab18b9ec
commit 0d0b2d9b46
13 changed files with 338 additions and 50 deletions

View File

@ -1,15 +1,23 @@
use crate::web::wss::queries::{
send_packets_for_all_nodes, send_rtt_for_all_nodes, send_throughput_for_all_nodes,
};
use axum::{
extract::{ws::{WebSocket, WebSocketUpgrade}, State},
extract::{
ws::{WebSocket, WebSocketUpgrade},
State,
},
response::IntoResponse,
};
use pgdb::sqlx::{Pool, Postgres};
use serde_json::Value;
use crate::web::wss::queries::{send_packets_for_all_nodes, send_throughput_for_all_nodes, send_rtt_for_all_nodes};
mod login;
mod nodes;
mod queries;
pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<Pool<Postgres>>) -> impl IntoResponse {
pub async fn ws_handler(
ws: WebSocketUpgrade,
State(state): State<Pool<Postgres>>,
) -> impl IntoResponse {
ws.on_upgrade(move |sock| handle_socket(sock, state))
}
@ -31,15 +39,20 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
let _ = pgdb::refresh_token(cnn.clone(), &credentials.token).await;
}
let period =
queries::time_period::InfluxTimePeriod::new(json.get("period").cloned());
if let Some(Value::String(msg_type)) = json.get("msg") {
match msg_type.as_str() {
"login" => { // A full login request
"login" => {
// A full login request
let result = login::on_login(&json, &mut socket, cnn).await;
if let Some(result) = result {
credentials = Some(result);
}
}
"auth" => { // Login with just a token
"auth" => {
// Login with just a token
let result = login::on_token_auth(&json, &mut socket, cnn).await;
if let Some(result) = result {
credentials = Some(result);
@ -47,28 +60,51 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
}
"nodeStatus" => {
if let Some(credentials) = &credentials {
nodes::node_status(cnn.clone(), &mut socket, &credentials.license_key).await;
nodes::node_status(
cnn.clone(),
&mut socket,
&credentials.license_key,
)
.await;
} else {
log::info!("Node status requested but no credentials provided");
}
}
"packetChart" => {
if let Some(credentials) = &credentials {
let _ = send_packets_for_all_nodes(cnn.clone(), &mut socket, &credentials.license_key).await;
let _ = send_packets_for_all_nodes(
cnn.clone(),
&mut socket,
&credentials.license_key,
period,
)
.await;
} else {
log::info!("Throughput requested but no credentials provided");
}
}
"throughputChart" => {
if let Some(credentials) = &credentials {
let _ = send_throughput_for_all_nodes(cnn.clone(), &mut socket, &credentials.license_key).await;
let _ = send_throughput_for_all_nodes(
cnn.clone(),
&mut socket,
&credentials.license_key,
period,
)
.await;
} else {
log::info!("Throughput requested but no credentials provided");
}
}
"rttChart" => {
if let Some(credentials) = &credentials {
let _ = send_rtt_for_all_nodes(cnn.clone(), &mut socket, &credentials.license_key).await;
let _ = send_rtt_for_all_nodes(
cnn.clone(),
&mut socket,
&credentials.license_key,
period,
)
.await;
} else {
log::info!("Throughput requested but no credentials provided");
}

View File

@ -4,6 +4,7 @@
mod packet_counts;
mod throughput;
mod rtt;
pub mod time_period;
pub use packet_counts::send_packets_for_all_nodes;
pub use throughput::send_throughput_for_all_nodes;
pub use rtt::send_rtt_for_all_nodes;

View File

@ -8,8 +8,10 @@ use futures::future::join_all;
use influxdb2::{models::Query, Client};
use pgdb::sqlx::{Pool, Postgres};
pub async fn send_packets_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str) -> anyhow::Result<()> {
let nodes = get_packets_for_all_nodes(cnn, key).await?;
use super::time_period::InfluxTimePeriod;
pub async fn send_packets_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_packets_for_all_nodes(cnn, key, period).await?;
let chart = PacketChart { msg: "packetChart".to_string(), nodes };
let json = serde_json::to_string(&chart).unwrap();
@ -22,7 +24,7 @@ pub async fn send_packets_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSoc
/// # Arguments
/// * `cnn` - A connection pool to the database
/// * `key` - The organization's license key
pub async fn get_packets_for_all_nodes(cnn: Pool<Postgres>, key: &str) -> anyhow::Result<Vec<PacketHost>> {
pub async fn get_packets_for_all_nodes(cnn: Pool<Postgres>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<PacketHost>> {
let node_status = pgdb::node_status(cnn.clone(), key).await?;
let mut futures = Vec::new();
for node in node_status {
@ -31,6 +33,7 @@ pub async fn get_packets_for_all_nodes(cnn: Pool<Postgres>, key: &str) -> anyhow
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<PacketHost>> = join_all(futures).await
@ -50,6 +53,7 @@ pub async fn get_packets_for_node(
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<PacketHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
@ -57,13 +61,13 @@ pub async fn get_packets_for_node(
let qs = format!(
"from(bucket: \"{}\")
|> range(start: -5m)
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"packets\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> {}
|> yield(name: \"last\")",
org.influx_bucket, org.key, node_id
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let query = Query::new(qs);
@ -84,7 +88,7 @@ pub async fn get_packets_for_node(
for row in rows.iter().filter(|r| r.direction == "down") {
down.push(Packets {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
@ -94,7 +98,7 @@ pub async fn get_packets_for_node(
for row in rows.iter().filter(|r| r.direction == "up") {
up.push(Packets {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});

View File

@ -0,0 +1,100 @@
use axum::extract::ws::{WebSocket, Message};
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::sqlx::{Pool, Postgres};
use crate::submissions::get_org_details;
use self::{rtt_row::RttRow, rtt_host::{Rtt, RttHost, RttChart}};
use super::time_period::InfluxTimePeriod;
mod rtt_row;
mod rtt_host;
pub async fn send_rtt_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes(cnn, key, period).await?;
let mut histogram = vec![0; 20];
for node in nodes.iter() {
for rtt in node.rtt.iter() {
let bucket = usize::min(19, (rtt.value / 200.0) as usize);
histogram[bucket] += 1;
}
}
let chart = RttChart { msg: "rttChart".to_string(), nodes, histogram };
let json = serde_json::to_string(&chart).unwrap();
socket.send(Message::Text(json)).await.unwrap();
Ok(())
}
pub async fn get_rtt_for_all_nodes(cnn: Pool<Postgres>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> {
let node_status = pgdb::node_status(cnn.clone(), key).await?;
let mut futures = Vec::new();
for node in node_status {
futures.push(get_rtt_for_node(
cnn.clone(),
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<RttHost>> = join_all(futures).await
.into_iter().collect();
all_nodes
}
pub async fn get_rtt_for_node(
cnn: Pool<Postgres>,
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<RttHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"rtt\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let query = Query::new(qs);
let rows = client.query::<RttRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut rtt = Vec::new();
// Fill download
for row in rows.iter() {
rtt.push(Rtt {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
return Ok(RttHost{
node_id,
node_name,
rtt,
});
}
}
}
Err(anyhow::Error::msg("Unable to query influx"))
}

View File

@ -0,0 +1,23 @@
use serde::Serialize;
#[derive(Serialize, Debug)]
pub struct Rtt {
pub value: f64,
pub date: String,
pub l: f64,
pub u: f64,
}
#[derive(Serialize, Debug)]
pub struct RttHost {
pub node_id: String,
pub node_name: String,
pub rtt: Vec<Rtt>,
}
#[derive(Serialize, Debug)]
pub struct RttChart {
pub msg: String,
pub nodes: Vec<RttHost>,
pub histogram: Vec<u32>,
}

View File

@ -0,0 +1,23 @@
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::FromDataPoint;
#[derive(Debug, FromDataPoint)]
pub struct RttRow {
pub host_id: String,
pub min: f64,
pub max: f64,
pub avg: f64,
pub time: DateTime<FixedOffset>,
}
impl Default for RttRow {
fn default() -> Self {
Self {
host_id: "".to_string(),
min: 0.0,
max: 0.0,
avg: 0.0,
time: DateTime::<Utc>::MIN_UTC.into(),
}
}
}

View File

@ -4,11 +4,13 @@ use influxdb2::{Client, models::Query};
use pgdb::sqlx::{Pool, Postgres};
use crate::submissions::get_org_details;
use self::{throughput_host::{ThroughputHost, Throughput, ThroughputChart}, throughput_row::ThroughputRow};
use super::time_period::InfluxTimePeriod;
mod throughput_host;
mod throughput_row;
pub async fn send_throughput_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes(cnn, key).await?;
pub async fn send_throughput_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes(cnn, key, period).await?;
let chart = ThroughputChart { msg: "bitsChart".to_string(), nodes };
let json = serde_json::to_string(&chart).unwrap();
@ -16,7 +18,7 @@ pub async fn send_throughput_for_all_nodes(cnn: Pool<Postgres>, socket: &mut Web
Ok(())
}
pub async fn get_throughput_for_all_nodes(cnn: Pool<Postgres>, key: &str) -> anyhow::Result<Vec<ThroughputHost>> {
pub async fn get_throughput_for_all_nodes(cnn: Pool<Postgres>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<ThroughputHost>> {
let node_status = pgdb::node_status(cnn.clone(), key).await?;
let mut futures = Vec::new();
for node in node_status {
@ -25,6 +27,7 @@ pub async fn get_throughput_for_all_nodes(cnn: Pool<Postgres>, key: &str) -> any
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<ThroughputHost>> = join_all(futures).await
@ -37,6 +40,7 @@ pub async fn get_throughput_for_node(
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<ThroughputHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
@ -44,13 +48,13 @@ pub async fn get_throughput_for_node(
let qs = format!(
"from(bucket: \"{}\")
|> range(start: -5m)
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"bits\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> {}
|> yield(name: \"last\")",
org.influx_bucket, org.key, node_id
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let query = Query::new(qs);
@ -71,7 +75,7 @@ pub async fn get_throughput_for_node(
for row in rows.iter().filter(|r| r.direction == "down") {
down.push(Throughput {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
@ -81,7 +85,7 @@ pub async fn get_throughput_for_node(
for row in rows.iter().filter(|r| r.direction == "up") {
up.push(Throughput {
value: row.avg,
date: row.time.format("%H:%M:%S").to_string(),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});

View File

@ -0,0 +1,55 @@
use serde_json::Value;
#[derive(Clone)]
pub struct InfluxTimePeriod {
start: String,
aggregate: String,
}
impl InfluxTimePeriod {
pub fn new(period: Option<Value>) -> Self {
if let Some(period) = period {
let start = match period.as_str() {
Some("5m") => "-5m",
Some("15m") => "-15m",
Some("1h") => "-60m",
Some("6h") => "-360m",
Some("12h") => "-720m",
Some("24h") => "-1440m",
Some("7d") => "-10080m",
Some("28d") => "-40320m",
_ => "-5m",
};
let aggregate = match period.as_str() {
Some("5m") => "10s",
Some("15m") => "10s",
Some("1h") => "10s",
Some("6h") => "1m",
Some("12h") => "2m",
Some("24h") => "4m",
Some("7d") => "30m",
Some("28d") => "1h",
_ => "10s"
};
Self {
start: start.to_string(),
aggregate: aggregate.to_string(),
}
} else {
Self {
start: "-5m".to_string(),
aggregate: "10s".to_string(),
}
}
}
pub fn range(&self) -> String {
format!("range(start: {})", self.start)
}
pub fn aggregate_window(&self) -> String {
format!("aggregateWindow(every: {}, fn: mean, createEmpty: false)", self.aggregate)
}
}

View File

@ -1,4 +1,5 @@
import 'bootstrap/dist/css/bootstrap.css';
import 'bootstrap/dist/js/bootstrap.js';
import { SiteRouter } from './router';
import { Bus } from './bus';
import { Auth } from './auth';
@ -9,6 +10,8 @@ declare global {
bus: Bus;
auth: Auth;
login: any;
graphPeriod: string;
changeGraphPeriod: any;
}
}
@ -17,7 +20,20 @@ window.bus = new Bus();
window.router = new SiteRouter();
window.bus.connect();
window.router.initialRoute();
let graphPeriod = localStorage.getItem('graphPeriod');
if (!graphPeriod) {
graphPeriod = "5m";
localStorage.setItem('graphPeriod', graphPeriod);
}
window.graphPeriod = graphPeriod;
window.changeGraphPeriod = (period: string) => changeGraphPeriod(period);
window.setInterval(() => {
window.router.ontick();
}, 1000);
window.bus.updateConnected();
}, 1000);
function changeGraphPeriod(period: string) {
window.graphPeriod = period;
localStorage.setItem('graphPeriod', period);
}

View File

@ -3,24 +3,29 @@ import { SiteRouter } from "./router";
export class Bus {
ws: WebSocket;
connected: boolean;
constructor() {
this.connected = false;
}
updateConnected() {
let indicator = document.getElementById("connStatus");
if (indicator && this.connected) {
indicator.style.color = "green";
} else if (indicator) {
indicator.style.color = "red";
}
}
connect() {
this.ws = new WebSocket("ws://192.168.100.10:9127/ws");
this.ws.onopen = () => {
let indicator = document.getElementById("connStatus");
if (indicator) {
indicator.style.color = "green";
}
this.connected = true;
this.sendToken();
};
this.ws.onclose = (e) => {
let indicator = document.getElementById("connStatus");
if (indicator) {
indicator.style.color = "red";
}
this.connected = false;
console.log("close", e)
};
this.ws.onerror = (e) => { console.log("error", e) };
@ -53,15 +58,15 @@ export class Bus {
}
requestPacketChart() {
this.ws.send("{ \"msg\": \"packetChart\" }");
this.ws.send("{ \"msg\": \"packetChart\", \"period\": \"" + window.graphPeriod + "\" }");
}
requestThroughputChart() {
this.ws.send("{ \"msg\": \"throughputChart\" }");
this.ws.send("{ \"msg\": \"throughputChart\", \"period\": \"" + window.graphPeriod + "\" }");
}
requestRttChart() {
this.ws.send("{ \"msg\": \"rttChart\" }");
this.ws.send("{ \"msg\": \"rttChart\", \"period\": \"" + window.graphPeriod + "\" }");
}
}

View File

@ -12,7 +12,6 @@
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.0/css/all.min.css" />
</head>
<body>
<div id="connStatus" style="color: red"><i class="fa-sharp fa-solid fa-plug"></i></div>
<div id="main"></div>
<footer>Copyright &copy; 2023 LibreQoS</footer>
</body>

View File

@ -1,10 +1,38 @@
<main class="d-flex flex-nowrap">
<div class="d-flex flex-column flex-shrink-0 p-3 text-bg-dark" style="width: 280px">
<a href="/" class="d-flex align-items-center mb-3 mb-md-0 me-md-auto text-white text-decoration-none">
<div class="d-flex flex-nowrap">
<div class="d-flex flex-row flex-fill text-bg-dark" style="padding: 6px;">
<a href="#" class="d-flex align-items-center mb-3 mb-md-0 me-md-auto text-white text-decoration-none">
<!-- Logo -->
<span class="fs-4">LibreQoS</span>
</a>
<hr>
<div class="dropdown" style="padding: 6px;">
<button class="btn btn-secondary dropdown-toggle" type="button" data-bs-toggle="dropdown" aria-expanded="false">
Graph Period
</button>
<ul class="dropdown-menu">
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('5m')">5 minutes</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('15m')">15 minutes</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('1h')">1 hour</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('6h')">6 hours</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('12h')">12 hours</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('24h')">24 hours</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('7d')">7 days</a></li>
<li><a class="dropdown-item" href="#" onclick="window.changeGraphPeriod('28d')">28 days</a></li>
</ul>
</div>
<form class="d-flex" role="search">
<input class="form-control me-2" type="search" placeholder="Search" aria-label="Search">
<button class="btn btn-outline-success" type="submit">Search</button>
</form>
<div id="connStatus" class="fs-4" style="color: red; padding: 6px;"><i class="fa-sharp fa-solid fa-plug"></i>
</div>
</div>
</div>
<div class="d-flex flex-nowrap">
<div class="d-flex flex-column flex-shrink-0 p-3 text-bg-dark" style="width: 280px">
<ul class="nav nav-pills flex-column mb-auto">
<li class="nav-item">
<a href="#" class="nav-link text-white" aria-current="page" id="menuDash">
@ -25,6 +53,6 @@
<div class="b-example-divider b-example-vr"></div>
<div class="d-flex flex-row p-2 flex-fill" id="mainContent">
</div>
</main>
</div>

View File

@ -1,10 +1,4 @@
@import 'bootstrap/dist/css/bootstrap.css';
#connStatus {
position: fixed;
right: 0;
top: 0;
z-index: 1000;
}
.b-example-divider {
height: 3rem;