WIP: RTT chart for site display is optimized

This commit is contained in:
Herbert Wolverson 2023-07-28 16:33:28 +00:00
parent 3fb812878b
commit 67dcc9f30b
9 changed files with 132 additions and 221 deletions

View File

@ -182,6 +182,7 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
.await; .await;
} }
(WasmRequest::RttChartSite { period, site_id }, Some(credentials)) => { (WasmRequest::RttChartSite { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_rtt_for_all_nodes_site( let _ = send_rtt_for_all_nodes_site(
&cnn, &cnn,
wss, wss,

View File

@ -1,5 +1,7 @@
mod per_node; mod per_node;
pub use per_node::*; pub use per_node::*;
mod per_site;
pub use per_site::*;
use axum::extract::ws::WebSocket; use axum::extract::ws::WebSocket;
use futures::future::join_all; use futures::future::join_all;
@ -8,26 +10,10 @@ use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tracing::instrument; use tracing::instrument;
use wasm_pipe_types::{RttHost, Rtt}; use wasm_pipe_types::{RttHost, Rtt};
use crate::web::wss::{queries::rtt::rtt_row::RttCircuitRow, send_response}; use crate::web::wss::{queries::rtt::rtt_row::RttCircuitRow, send_response};
use self::rtt_row::{RttRow, RttSiteRow}; use self::rtt_row::RttRow;
use super::time_period::InfluxTimePeriod; use super::time_period::InfluxTimePeriod;
mod rtt_row; mod rtt_row;
#[instrument(skip(cnn, socket, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_site(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes_site(cnn, key, &site_id, period).await?;
let mut histogram = vec![0; 20];
for node in nodes.iter() {
for rtt in node.rtt.iter() {
let bucket = usize::min(19, (rtt.value / 200.0) as usize);
histogram[bucket] += 1;
}
}
send_response(socket, wasm_pipe_types::WasmResponse::RttChartSite { nodes, histogram }).await;
Ok(())
}
#[instrument(skip(cnn, socket, key, site_id, period))] #[instrument(skip(cnn, socket, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> { pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes_circuit(cnn, key, &site_id, period).await?; let nodes = get_rtt_for_all_nodes_circuit(cnn, key, &site_id, period).await?;
@ -60,24 +46,6 @@ pub async fn send_rtt_for_node(cnn: &Pool<Postgres>, socket: &mut WebSocket, key
Ok(()) Ok(())
} }
pub async fn get_rtt_for_all_nodes_site(cnn: &Pool<Postgres>, key: &str, site_id: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
futures.push(get_rtt_for_node_site(
cnn,
key,
node.node_id.to_string(),
node.node_name.to_string(),
site_id.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<RttHost>> = join_all(futures).await
.into_iter().collect();
all_nodes
}
pub async fn get_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, key: &str, circuit_id: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> { pub async fn get_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, key: &str, circuit_id: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> {
let node_status = pgdb::node_status(cnn, key).await?; let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new(); let mut futures = Vec::new();
@ -152,65 +120,6 @@ pub async fn get_rtt_for_node(
Err(anyhow::Error::msg("Unable to query influx")) Err(anyhow::Error::msg("Unable to query influx"))
} }
pub async fn get_rtt_for_node_site(
cnn: &Pool<Postgres>,
key: &str,
node_id: String,
node_name: String,
site_id: String,
period: InfluxTimePeriod,
) -> anyhow::Result<RttHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"tree\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> filter(fn: (r) => r[\"node_name\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"rtt_avg\" or r[\"_field\"] == \"rtt_max\" or r[\"_field\"] == \"rtt_min\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, site_id, period.aggregate_window()
);
let query = Query::new(qs);
let rows = client.query::<RttSiteRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (rtt node site): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut rtt = Vec::new();
// Fill download
for row in rows.iter() {
rtt.push(Rtt {
value: row.rtt_avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.rtt_min,
u: row.rtt_max - row.rtt_min,
});
}
return Ok(RttHost{
node_id,
node_name,
rtt,
});
}
}
}
Err(anyhow::Error::msg("Unable to query influx"))
}
pub async fn get_rtt_for_node_circuit( pub async fn get_rtt_for_node_circuit(
cnn: &Pool<Postgres>, cnn: &Pool<Postgres>,
key: &str, key: &str,

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -17,6 +17,7 @@ export class RttChartSite implements Component {
} }
wireup(): void { wireup(): void {
request_rtt_chart_for_site(window.graphPeriod, this.siteId);
} }
ontick(): void { ontick(): void {

View File

@ -26,7 +26,7 @@ export class SitePage implements Page {
new SiteInfo(siteId), new SiteInfo(siteId),
new SiteBreadcrumbs(siteId), new SiteBreadcrumbs(siteId),
new ThroughputSiteChart(siteId), new ThroughputSiteChart(siteId),
//new RttChartSite(siteId), new RttChartSite(siteId),
//new RttHistoSite(), //new RttHistoSite(),
//new SiteHeat(siteId), //new SiteHeat(siteId),
//new SiteStackChart(siteId), //new SiteStackChart(siteId),

View File

@ -45,7 +45,7 @@ pub enum WasmResponse {
BitsChart { nodes: Vec<ThroughputHost> }, BitsChart { nodes: Vec<ThroughputHost> },
RttChart { nodes: Vec<RttHost> }, RttChart { nodes: Vec<RttHost> },
RttHistogram { histogram: Vec<u32> }, RttHistogram { histogram: Vec<u32> },
RttChartSite { nodes: Vec<RttHost>, histogram: Vec<u32> }, RttChartSite { nodes: Vec<RttHost> },
RttChartCircuit { nodes: Vec<RttHost>, histogram: Vec<u32> }, RttChartCircuit { nodes: Vec<RttHost>, histogram: Vec<u32> },
SiteStack { nodes: Vec<SiteStackHost> }, SiteStack { nodes: Vec<SiteStackHost> },
RootHeat { data: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>}, RootHeat { data: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>},