WIP: Site info page has a nicer RTT graph and shows oversubscription estimates.

This commit is contained in:
Herbert Wolverson 2023-07-28 21:30:55 +00:00
parent 9ca8ede727
commit 0d2f83985a
18 changed files with 435 additions and 194 deletions

View File

@ -196,4 +196,25 @@ impl InfluxQueryBuilder {
Err(Error::msg("Organization not found"))
}
}
#[instrument(skip(cnn, key, query))]
pub async fn raw<T>(cnn: &Pool<Postgres>, key: &str, query: String) -> Result<Vec<T>>
where T: FromMap + std::fmt::Debug
{
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
tracing::info!("{query}");
let query = Query::new(query);
let rows = client.query::<T>(Some(query)).await;
if let Ok(rows) = rows {
Ok(rows)
} else {
tracing::error!("InfluxDb query error: {rows:?}");
Err(Error::msg("Influx query error"))
}
} else {
Err(Error::msg("Organization not found"))
}
}
}

View File

@ -55,7 +55,7 @@ pub async fn send_rtt_histogram_for_all_nodes(
Ok(())
}
fn rtt_rows_to_result(rows: Vec<RttRow>, node_status: Vec<NodeStatus>) -> Vec<RttHost> {
pub(crate) fn rtt_rows_to_result(rows: Vec<RttRow>, node_status: Vec<NodeStatus>) -> Vec<RttHost> {
let mut result = Vec::<RttHost>::new();
for row in rows.into_iter() {
if let Some(host) = result.iter_mut().find(|h| h.node_id == row.host_id) {

View File

@ -0,0 +1,107 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response, influx_query_builder::InfluxQueryBuilder};
use axum::extract::ws::WebSocket;
use pgdb::{
sqlx::{Pool, Postgres},
NodeStatus
};
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost};
use super::rtt_row::{RttRow, RttHistoRow, RttSiteRow};
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_rtt_for_all_nodes_site(
cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_name: String, period: InfluxTimePeriod
) -> anyhow::Result<()> {
let rows = InfluxQueryBuilder::new(period.clone())
.with_measurement("tree")
.with_fields(&["rtt_avg", "rtt_min", "rtt_max"])
.with_filter(format!("r[\"node_name\"] == \"{}\"", site_name))
.with_groups(&["host_id", "_field"])
.execute::<RttSiteRow>(cnn, key)
.await?;
let node_status = pgdb::node_status(cnn, key).await?;
let nodes = rtt_rows_to_result(rows, node_status);
send_response(socket, wasm_pipe_types::WasmResponse::RttChartSite { nodes }).await;
Ok(())
}
fn rtt_rows_to_result(rows: Vec<RttSiteRow>, node_status: Vec<NodeStatus>) -> Vec<RttHost> {
let mut result = Vec::<RttHost>::new();
for row in rows.into_iter() {
if let Some(host) = result.iter_mut().find(|h| h.node_id == row.host_id) {
// We found one - add to it
host.rtt.push(Rtt {
value: f64::min(200.0, row.rtt_avg),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: f64::min(200.0, row.rtt_min),
u: f64::min(200.0, row.rtt_max) - f64::min(200.0, row.rtt_min),
});
} else {
let rtt = vec![Rtt {
value: f64::min(200.0, row.rtt_avg),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: f64::min(200.0, row.rtt_min),
u: f64::min(200.0, row.rtt_max) - f64::min(200.0, row.rtt_min),
}];
let node_name = node_status
.iter()
.filter(|n| n.node_id == row.host_id)
.map(|n| n.node_name.clone())
.next()
.unwrap_or("".to_string());
let new_host = RttHost {
node_id: row.host_id,
node_name,
rtt,
};
result.push(new_host);
}
}
result
}
const TREE_QUERY: &str = "
import \"join\"
import \"sql\"
sqlData =
sql.from(
driverName: \"postgres\",
dataSourceName: \"postgresql://license:license@127.0.0.1:5432/libreqos\",
query: \"WITH RECURSIVE children
(index, site_name, level, parent) AS (
SELECT index, site_name, 0, parent FROM site_tree WHERE key='%KEY%' and index = %SITE%
UNION ALL
SELECT
st.index,
st.site_name,
children.level + 1,
children.parent
FROM site_tree st, children
WHERE children.index = st.parent AND children.level < 2 AND key='%KEY%'
)
SELECT DISTINCT circuit_id FROM shaped_devices WHERE key='%KEY%'
AND parent_node IN (SELECT site_name FROM children);\",
)
bitsData = from(bucket: \"izones\")
|> range(start: -5m)
|> filter(fn: (r) => r[\"_measurement\"] == \"rtt\")
|> filter(fn: (r) => r[\"organization_id\"] == \"%KEY%\")
|> filter(fn: (r) => r[\"_field\"] == \"avg\" or r[\"_field\"] == \"max\" or r[\"_field\"] == \"min\")
|> filter(fn: (r) => r[\"_value\"] > 0 and r[\"circuit_id\"] != \"unknown\")
|> aggregateWindow(every: 10s, fn: mean, createEmpty: false)
|> group()
|> limit(n : 500)
join.inner(left: bitsData, right: sqlData, on: (l,r) => l.circuit_id == r.circuit_id, as: (l,r) => ({l with rightValue: r.circuit_id}))
|> drop(columns: [\"circuit_id\", \"ip\", \"organization_id\"])
|> group(columns: [\"_field\", \"host_id\"])
|> aggregateWindow(every: 10s, fn: median, createEmpty: false)
";

View File

@ -22,6 +22,23 @@ impl Default for RttRow {
}
}
#[derive(Debug, FromDataPoint)]
pub struct RttValue {
pub host_id: String,
pub avg: f64,
pub time: DateTime<FixedOffset>,
}
impl Default for RttValue {
fn default() -> Self {
Self {
host_id: "".to_string(),
avg: 0.0,
time: DateTime::<Utc>::MIN_UTC.into(),
}
}
}
#[derive(Debug, FromDataPoint, Default)]
pub struct RttHistoRow {
pub avg: f64,

View File

@ -1,9 +1,9 @@
use super::site_tree::tree_to_host;
use crate::web::wss::send_response;
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use serde::Serialize;
use wasm_pipe_types::{SiteTree, WasmResponse};
use crate::web::wss::send_response;
use super::site_tree::tree_to_host;
use wasm_pipe_types::{SiteTree, WasmResponse, SiteOversubscription};
#[derive(Serialize)]
struct SiteInfoMessage {
@ -11,10 +11,31 @@ struct SiteInfoMessage {
data: SiteTree,
}
pub async fn send_site_info(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
site_id: &str,
) {
tracing::error!("REQUESTING SITE INFO");
let (host, oversub) = tokio::join!(
pgdb::get_site_info(cnn, key, site_id),
pgdb::get_oversubscription(cnn, key, site_id)
);
pub async fn send_site_info(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: &str) {
if let Ok(host) = pgdb::get_site_info(cnn, key, site_id).await {
let host = tree_to_host(host);
send_response(socket, WasmResponse::SiteInfo { data: host }).await;
if let Ok(host) = host {
if let Ok(oversub) = oversub {
let host = tree_to_host(host);
let oversubscription = SiteOversubscription {
dlmax: oversub.dlmax,
dlmin: oversub.dlmin,
devicecount: oversub.devicecount,
};
send_response(socket, WasmResponse::SiteInfo { data: host, oversubscription }).await;
} else {
tracing::error!("{oversub:?}");
}
} else {
tracing::error!("{host:?}");
}
}
}

View File

@ -1,8 +1,8 @@
#![allow(dead_code)]
#[derive(Clone, Debug)]
pub struct InfluxTimePeriod {
start: String,
aggregate: String,
pub start: String,
pub aggregate: String,
sample: i32,
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,5 @@
use sqlx::{FromRow, Pool, Postgres, Row};
use crate::license::StatsHostError;
use sqlx::{FromRow, Pool, Postgres, Row};
#[derive(Debug, FromRow)]
pub struct TreeNode {
@ -54,7 +54,9 @@ pub async fn get_site_id_from_name(
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_id: i32 = site_id_db.try_get("index").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_id: i32 = site_id_db
.try_get("index")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
Ok(site_id)
}
@ -72,19 +74,29 @@ pub async fn get_parent_list(
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let mut site_id: i32 = site_id_db.try_get("index").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let mut site_id: i32 = site_id_db
.try_get("index")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
// Get the parent list
while site_id != 0 {
let parent_db = sqlx::query("SELECT site_name, parent, site_type FROM site_tree WHERE key = $1 AND index=$2")
.bind(key)
.bind(site_id)
.fetch_one(cnn)
.await
let parent_db = sqlx::query(
"SELECT site_name, parent, site_type FROM site_tree WHERE key = $1 AND index=$2",
)
.bind(key)
.bind(site_id)
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let parent: String = parent_db
.try_get("site_name")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_type: String = parent_db
.try_get("site_type")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
site_id = parent_db
.try_get("parent")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let parent: String = parent_db.try_get("site_name").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_type: String = parent_db.try_get("site_type").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
site_id = parent_db.try_get("parent").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
result.push((site_type, parent));
}
@ -105,33 +117,47 @@ pub async fn get_child_list(
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_id: i32 = site_id_db.try_get("index").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
// Add child sites
let child_sites = sqlx::query("SELECT site_name, parent, site_type FROM site_tree WHERE key=$1 AND parent=$2")
.bind(key)
.bind(site_id)
.fetch_all(cnn)
.await
let site_id: i32 = site_id_db
.try_get("index")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
// Add child sites
let child_sites = sqlx::query(
"SELECT site_name, parent, site_type FROM site_tree WHERE key=$1 AND parent=$2",
)
.bind(key)
.bind(site_id)
.fetch_all(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
for child in child_sites {
let child_name: String = child.try_get("site_name").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_type: String = child.try_get("site_type").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_name: String = child
.try_get("site_name")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_type: String = child
.try_get("site_type")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
result.push((child_type, child_name.clone(), child_name));
}
// Add child shaper nodes
let child_circuits = sqlx::query("SELECT circuit_id, circuit_name FROM shaped_devices WHERE key=$1 AND parent_node=$2")
.bind(key)
.bind(site_name)
.fetch_all(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_circuits = sqlx::query(
"SELECT circuit_id, circuit_name FROM shaped_devices WHERE key=$1 AND parent_node=$2",
)
.bind(key)
.bind(site_name)
.fetch_all(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
for child in child_circuits {
let child_name: String = child.try_get("circuit_name").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_id: String = child.try_get("circuit_id").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_name: String = child
.try_get("circuit_name")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let child_id: String = child
.try_get("circuit_id")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
result.push(("circuit".to_string(), child_id, child_name));
}
@ -148,13 +174,14 @@ pub async fn get_circuit_parent_list(
let mut result = Vec::new();
// Get the site name to start at
let site_name : String = sqlx::query("SELECT parent_node FROM shaped_devices WHERE key = $1 AND circuit_id= $2")
.bind(key)
.bind(circuit_id)
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?
.get(0);
let site_name: String =
sqlx::query("SELECT parent_node FROM shaped_devices WHERE key = $1 AND circuit_id= $2")
.bind(key)
.bind(circuit_id)
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?
.get(0);
// Get the site index
let site_id_db = sqlx::query("SELECT index FROM site_tree WHERE key = $1 AND site_name=$2")
@ -163,21 +190,81 @@ pub async fn get_circuit_parent_list(
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let mut site_id: i32 = site_id_db.try_get("index").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let mut site_id: i32 = site_id_db
.try_get("index")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
// Get the parent list
while site_id != 0 {
let parent_db = sqlx::query("SELECT site_name, parent, site_type FROM site_tree WHERE key = $1 AND index=$2")
.bind(key)
.bind(site_id)
.fetch_one(cnn)
.await
let parent_db = sqlx::query(
"SELECT site_name, parent, site_type FROM site_tree WHERE key = $1 AND index=$2",
)
.bind(key)
.bind(site_id)
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let parent: String = parent_db
.try_get("site_name")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_type: String = parent_db
.try_get("site_type")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
site_id = parent_db
.try_get("parent")
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let parent: String = parent_db.try_get("site_name").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_type: String = parent_db.try_get("site_type").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
site_id = parent_db.try_get("parent").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
result.push((site_type, parent));
}
Ok(result)
}
}
#[derive(Debug, FromRow)]
pub struct SiteOversubscription {
pub dlmax: i64,
pub dlmin: i64,
pub devicecount: i64,
}
pub async fn get_oversubscription(cnn: &Pool<Postgres>, key: &str, site_name: &str) -> Result<SiteOversubscription, StatsHostError> {
let site_id = get_site_id_from_name(cnn, key, site_name).await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
const SQL: &str = "WITH RECURSIVE children
(index, site_name, level, parent) AS (
SELECT index, site_name, 0, parent FROM site_tree WHERE key=$1 and index = $2
UNION ALL
SELECT
st.index,
st.site_name,
children.level + 1,
children.parent
FROM site_tree st, children
WHERE children.index = st.parent AND children.level < 2 AND key=$3
),
devices (circuit_id, download_max_mbps, download_min_mbps) AS (
SELECT DISTINCT
circuit_id,
download_max_mbps,
download_min_mbps
FROM shaped_devices WHERE key=$4
AND parent_node IN (SELECT site_name FROM children)
)
SELECT
SUM(download_max_mbps) AS dlmax,
SUM(download_min_mbps) AS dlmin,
COUNT(circuit_id) AS devicecount
FROM devices;";
let rows = sqlx::query_as::<_, SiteOversubscription>(SQL)
.bind(key)
.bind(site_id)
.bind(key)
.bind(key)
.fetch_one(cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
Ok(rows)
}

View File

@ -22,7 +22,7 @@ export class RttHistoSite implements Component {
}
onmessage(event: any): void {
if (event.msg == "RttChartSite") {
/*if (event.msg == "RttChartSite") {
//console.log(event);
this.download = [];
this.x = [];
@ -57,6 +57,6 @@ export class RttHistoSite implements Component {
option && this.myChart.setOption(option);
// this.chartMade = true;
}
}
}*/
}
}

View File

@ -49,29 +49,6 @@ export class RttChartSite implements Component {
}
if (first) first = false;
let min: echarts.SeriesOption = {
name: "L",
type: "line",
data: l,
symbol: 'none',
stack: 'confidence-band-' + node.node_id,
lineStyle: {
opacity: 0
},
};
let max: echarts.SeriesOption = {
name: "U",
type: "line",
data: u,
symbol: 'none',
stack: 'confidence-band-' + node.node_id,
lineStyle: {
opacity: 0
},
areaStyle: {
color: '#ccc'
},
};
let val: echarts.SeriesOption = {
name: node.node_name,
type: "line",
@ -79,8 +56,8 @@ export class RttChartSite implements Component {
symbol: 'none',
};
series.push(min);
series.push(max);
//series.push(min);
//series.push(max);
series.push(val);
}
@ -89,16 +66,11 @@ export class RttChartSite implements Component {
var option: echarts.EChartsOption;
this.myChart.setOption<echarts.EChartsOption>(
(option = {
title: { text: "TCP Round-Trip Time" },
title: { text: "Average TCP Round-Trip Time" },
tooltip: {
trigger: "axis",
formatter: function (params: any) {
let ret = "";
for (let i=0; i<params.length; i+=3) {
if (params[i+2].value > 0) {
ret += "<strong>" + params[i+2].seriesName + "</strong>: " + params[i+2].value.toFixed(1) + " ms<br/>";
}
}
let ret = params[0].value.toFixed(1) + " ms<br/>";
return ret;
}
},

View File

@ -5,7 +5,6 @@ import { request_site_info } from "../../wasm/wasm_pipe";
export class SiteInfo implements Component {
siteId: string;
count: number = 0;
constructor(siteId: string) {
this.siteId = siteId;
@ -16,10 +15,7 @@ export class SiteInfo implements Component {
}
ontick(): void {
this.count++;
if (this.count % 10 == 0) {
request_site_info(decodeURI(this.siteId));
}
request_site_info(decodeURI(this.siteId));
}
onmessage(event: any): void {
@ -33,6 +29,21 @@ export class SiteInfo implements Component {
html += "<tr><td>Current RTT:</td><td>" + event.SiteInfo.data.current_rtt / 100.0 + " ms</td></tr>";
html += "</table>";
div.innerHTML = html;
// Obersub
let dlmax = event.SiteInfo.oversubscription.dlmax * mbps_to_bps;
let dlmin = event.SiteInfo.oversubscription.dlmin * mbps_to_bps;
let maxover = (dlmax / (event.SiteInfo.data.max_down * mbps_to_bps) * 100.0).toFixed(1);
let minover = (dlmin / (event.SiteInfo.data.max_down * mbps_to_bps) * 100.0).toFixed(1);
div = document.getElementById("oversub") as HTMLDivElement;
html = "";
html += "<table class='table table-striped'>";
html += "<tr><td>Total Subscribers:</td><td>" + event.SiteInfo.oversubscription.devicecount + "</td></tr>";
html += "<tr><td>Total Download (Max):</td><td>" + scaleNumber(dlmax) + " (" + maxover + "%)</td></tr>";
html += "<tr><td>Total Download (Min):</td><td>" + scaleNumber(dlmin) + " (" + minover + "%)</td></tr>";
html += "</table>";
div.innerHTML = html;
}
}
}

View File

@ -5,7 +5,6 @@ import { Component } from '../components/component';
import { ThroughputSiteChart } from '../components/throughput_site';
import { SiteInfo } from '../components/site_info';
import { RttChartSite } from '../components/rtt_site';
import { RttHistoSite } from '../components/rtt_histo_site';
import { SiteBreadcrumbs } from '../components/site_breadcrumbs';
import { SiteHeat } from '../components/site_heat';
import { SiteStackChart } from '../components/site_stack';
@ -27,7 +26,6 @@ export class SitePage implements Page {
new SiteBreadcrumbs(siteId),
new ThroughputSiteChart(siteId),
new RttChartSite(siteId),
//new RttHistoSite(),
//new SiteHeat(siteId),
//new SiteStackChart(siteId),
];

View File

@ -14,7 +14,7 @@
<div class="col-6">
<div class="card">
<div class="card-body">
<div id="throughputChart" style="height: 250px"></div>
<div id="oversub"></div>
</div>
</div>
</div>
@ -24,7 +24,7 @@
<div class="col-6">
<div class="card">
<div class="card-body">
<div id="rttChart" style="height: 250px"></div>
<div id="throughputChart" style="height: 250px"></div>
</div>
</div>
</div>
@ -32,7 +32,7 @@
<div class="col-6">
<div class="card">
<div class="card-body">
<div id="rttHisto" style="height: 250px"></div>
<div id="rttChart" style="height: 250px"></div>
</div>
</div>
</div>

View File

@ -730,11 +730,11 @@ function __wbg_get_imports() {
const ret = getObject(arg0).now();
return ret;
};
imports.wbg.__wbindgen_closure_wrapper1481 = function(arg0, arg1, arg2) {
imports.wbg.__wbindgen_closure_wrapper1516 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_16);
return addHeapObject(ret);
};
imports.wbg.__wbindgen_closure_wrapper1483 = function(arg0, arg1, arg2) {
imports.wbg.__wbindgen_closure_wrapper1518 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_16);
return addHeapObject(ret);
};

View File

@ -52,7 +52,7 @@ pub enum WasmResponse {
SiteHeat { data: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>},
NodePerfChart { nodes: Vec<PerfHost> },
SiteTree { data: Vec<SiteTree> },
SiteInfo { data: SiteTree },
SiteInfo { data: SiteTree, oversubscription: SiteOversubscription },
SiteParents { data: Vec<(String, String)> },
SiteChildren { data: Vec<(String, String, String)> },
SearchResult { hits: Vec<SearchResult> },
@ -169,6 +169,13 @@ pub struct SiteTree {
pub current_rtt: i32,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SiteOversubscription {
pub dlmax: i64,
pub dlmin: i64,
pub devicecount: i64,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SearchResult {
pub name: String,