From c1295a64616f257fc3fe8c930918a616183421d9 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Wed, 10 May 2023 19:44:10 +0000 Subject: [PATCH] Add child heat maps to site and ap --- .../lts_node/src/web/wss/mod.rs | 16 +- .../lts_node/src/web/wss/queries/mod.rs | 2 +- .../src/web/wss/queries/site_heat_map.rs | 182 +++++++++++++++++- src/rust/long_term_stats/pgdb/src/tree.rs | 15 ++ .../long_term_stats/site_build/src/ap/ap.ts | 2 + .../long_term_stats/site_build/src/bus.ts | 10 + .../site_build/src/components/site_heat.ts | 104 ++++++++++ .../site_build/src/site/site.ts | 2 + 8 files changed, 329 insertions(+), 4 deletions(-) create mode 100644 src/rust/long_term_stats/site_build/src/components/site_heat.ts diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs index f3069260..dd2e32bf 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/mod.rs @@ -2,7 +2,7 @@ use crate::web::wss::queries::{ omnisearch, root_heat_map, send_packets_for_all_nodes, send_packets_for_node, send_perf_for_node, send_rtt_for_all_nodes, send_rtt_for_all_nodes_site, send_rtt_for_node, send_site_info, send_site_parents, send_throughput_for_all_nodes, - send_throughput_for_all_nodes_by_site, send_throughput_for_node, site_tree::send_site_tree, + send_throughput_for_all_nodes_by_site, send_throughput_for_node, site_tree::send_site_tree, site_heat_map, }; use axum::{ extract::{ @@ -224,6 +224,20 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool) { log::info!("Throughput requested but no credentials provided"); } } + "siteHeat" => { + if let Some(credentials) = &credentials { + let _ = site_heat_map( + cnn.clone(), + &mut socket, + &credentials.license_key, + json.get("site_id").unwrap().as_str().unwrap(), + period, + ) + .await; + } else { + log::info!("Throughput requested but no credentials provided"); + } + } "siteTree" => { if let Some(credentials) = &credentials { send_site_tree( diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs index 668eec02..7872ea07 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/mod.rs @@ -16,6 +16,6 @@ pub use throughput::{ send_throughput_for_all_nodes, send_throughput_for_node, s pub use rtt::{ send_rtt_for_all_nodes, send_rtt_for_node, send_rtt_for_all_nodes_site }; pub use node_perf::send_perf_for_node; pub use search::omnisearch; -pub use site_heat_map::root_heat_map; +pub use site_heat_map::{root_heat_map, site_heat_map}; pub use site_info::send_site_info; pub use site_parents::send_site_parents; \ No newline at end of file diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/site_heat_map.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/site_heat_map.rs index a50369af..9c12f89e 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/queries/site_heat_map.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/site_heat_map.rs @@ -1,9 +1,10 @@ use super::time_period::InfluxTimePeriod; use crate::submissions::get_org_details; -use axum::extract::ws::{WebSocket, Message}; +use axum::extract::ws::{Message, WebSocket}; use chrono::{DateTime, FixedOffset, Utc}; use influxdb2::Client; use influxdb2::{models::Query, FromDataPoint}; +use pgdb::OrganizationDetails; use pgdb::sqlx::{query, Pool, Postgres, Row}; use serde::Serialize; use std::collections::HashMap; @@ -50,7 +51,7 @@ pub async fn root_heat_map( host_filter, period.aggregate_window() ); - println!("{qs}"); + //println!("{qs}"); let query = Query::new(qs); let rows = client.query::(Some(query)).await; @@ -81,6 +82,166 @@ pub async fn root_heat_map( Ok(()) } +async fn site_circuits_heat_map( + cnn: Pool, + key: &str, + site_name: &str, + period: InfluxTimePeriod, + sorter: &mut HashMap, f64)>>, + client: Client, + org: &OrganizationDetails, +) -> anyhow::Result<()> { + // Get sites where parent=site_id (for this setup) + let hosts: Vec<(String, String)> = + query("SELECT DISTINCT circuit_id, circuit_name FROM shaped_devices WHERE key=$1 AND parent_node=$2") + .bind(key) + .bind(site_name) + .fetch_all(&cnn) + .await? + .iter() + .map(|row| (row.try_get("circuit_id").unwrap(), row.try_get("circuit_name").unwrap())) + .collect(); + + let mut circuit_map = HashMap::new(); + for (id, name) in hosts.iter() { + circuit_map.insert(id, name); + } + let hosts = hosts.iter().map(|(id, _)| id).collect::>(); + + let mut host_filter = "filter(fn: (r) => ".to_string(); + for host in hosts.iter() { + host_filter += &format!("r[\"circuit_id\"] == \"{host}\" or "); + } + host_filter = host_filter[0..host_filter.len() - 4].to_string(); + host_filter += ")"; + + // Query influx for RTT averages + let qs = format!( + "from(bucket: \"{}\") + |> {} + |> filter(fn: (r) => r[\"_measurement\"] == \"rtt\") + |> filter(fn: (r) => r[\"organization_id\"] == \"{}\") + |> filter(fn: (r) => r[\"_field\"] == \"avg\") + |> {} + |> {} + |> yield(name: \"last\")", + org.influx_bucket, + period.range(), + org.key, + host_filter, + period.aggregate_window() + ); + //println!("{qs}\n\n"); + + let query = Query::new(qs); + let rows = client.query::(Some(query)).await; + match rows { + Err(e) => { + tracing::error!("Error querying InfluxDB: {}", e); + return Err(anyhow::Error::msg("Unable to query influx")); + } + Ok(rows) => { + for row in rows.iter() { + if let Some(name) = circuit_map.get(&row.circuit_id) { + if let Some(hat) = sorter.get_mut(*name) { + hat.push((row.time, row.avg)); + } else { + sorter.insert(name.to_string(), vec![(row.time, row.avg)]); + } + } + } + } + } + + Ok(()) +} + +pub async fn site_heat_map( + cnn: Pool, + socket: &mut WebSocket, + key: &str, + site_name: &str, + period: InfluxTimePeriod, +) -> anyhow::Result<()> { + if let Some(org) = get_org_details(cnn.clone(), key).await { + let influx_url = format!("http://{}:8086", org.influx_host); + let client = Client::new(influx_url, &org.influx_org, &org.influx_token); + + // Get the site index + let site_id = pgdb::get_site_id_from_name(cnn.clone(), key, site_name).await?; + + // Get sites where parent=site_id (for this setup) + let hosts: Vec = + query("SELECT DISTINCT site_name FROM site_tree WHERE key=$1 AND parent=$2") + .bind(key) + .bind(site_id) + .fetch_all(&cnn) + .await? + .iter() + .map(|row| row.try_get("site_name").unwrap()) + .collect(); + + let mut host_filter = "filter(fn: (r) => ".to_string(); + for host in hosts.iter() { + host_filter += &format!("r[\"node_name\"] == \"{host}\" or "); + } + host_filter = host_filter[0..host_filter.len() - 4].to_string(); + host_filter += ")"; + + if host_filter.ends_with("(r))") { + host_filter = "filter(fn: (r) => r[\"node_name\"] == \"bad_sheep_no_data\")".to_string(); + } + + // Query influx for RTT averages + let qs = format!( + "from(bucket: \"{}\") + |> {} + |> filter(fn: (r) => r[\"_measurement\"] == \"tree\") + |> filter(fn: (r) => r[\"organization_id\"] == \"{}\") + |> filter(fn: (r) => r[\"_field\"] == \"rtt_avg\") + |> {} + |> {} + |> yield(name: \"last\")", + org.influx_bucket, + period.range(), + org.key, + host_filter, + period.aggregate_window() + ); + //println!("{qs}\n\n"); + + let query = Query::new(qs); + let rows = client.query::(Some(query)).await; + match rows { + Err(e) => { + tracing::error!("Error querying InfluxDB: {}", e); + return Err(anyhow::Error::msg("Unable to query influx")); + } + Ok(rows) => { + let mut sorter: HashMap, f64)>> = HashMap::new(); + for row in rows.iter() { + if let Some(hat) = sorter.get_mut(&row.node_name) { + hat.push((row.time, row.rtt_avg)); + } else { + sorter.insert(row.node_name.clone(), vec![(row.time, row.rtt_avg)]); + } + } + + site_circuits_heat_map(cnn, key, site_name, period, &mut sorter, client, &org).await?; + + let msg = HeatMessage { + msg: "siteHeat".to_string(), + data: sorter, + }; + let json = serde_json::to_string(&msg).unwrap(); + socket.send(Message::Text(json)).await.unwrap(); + } + } + } + + Ok(()) +} + #[derive(Serialize)] struct HeatMessage { msg: String, @@ -103,3 +264,20 @@ impl Default for HeatRow { } } } + +#[derive(Debug, FromDataPoint)] +pub struct HeatCircuitRow { + pub circuit_id: String, + pub avg: f64, + pub time: DateTime, +} + +impl Default for HeatCircuitRow { + fn default() -> Self { + Self { + circuit_id: "".to_string(), + avg: 0.0, + time: DateTime::::MIN_UTC.into(), + } + } +} diff --git a/src/rust/long_term_stats/pgdb/src/tree.rs b/src/rust/long_term_stats/pgdb/src/tree.rs index fc9a8530..17f6fb1d 100644 --- a/src/rust/long_term_stats/pgdb/src/tree.rs +++ b/src/rust/long_term_stats/pgdb/src/tree.rs @@ -40,6 +40,21 @@ pub async fn get_site_info( .map_err(|e| StatsHostError::DatabaseError(e.to_string())) } +pub async fn get_site_id_from_name( + cnn: Pool, + key: &str, + site_name: &str, +) -> Result { + let site_id_db = sqlx::query("SELECT index FROM site_tree WHERE key = $1 AND site_name=$2") + .bind(key) + .bind(site_name) + .fetch_one(&cnn) + .await + .map_err(|e| StatsHostError::DatabaseError(e.to_string()))?; + let site_id: i32 = site_id_db.try_get("index").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?; + Ok(site_id) +} + pub async fn get_parent_list( cnn: Pool, key: &str, diff --git a/src/rust/long_term_stats/site_build/src/ap/ap.ts b/src/rust/long_term_stats/site_build/src/ap/ap.ts index 92be3fdc..4c08f44d 100644 --- a/src/rust/long_term_stats/site_build/src/ap/ap.ts +++ b/src/rust/long_term_stats/site_build/src/ap/ap.ts @@ -7,6 +7,7 @@ import { SiteInfo } from '../components/site_info'; import { RttChartSite } from '../components/rtt_site'; import { RttHistoSite } from '../components/rtt_histo_site'; import { SiteBreadcrumbs } from '../components/site_breadcrumbs'; +import { SiteHeat } from '../components/site_heat'; export class AccessPointPage implements Page { menu: MenuPage; @@ -26,6 +27,7 @@ export class AccessPointPage implements Page { new RttChartSite(siteId), new RttHistoSite(), new SiteBreadcrumbs(siteId), + new SiteHeat(siteId), ]; } diff --git a/src/rust/long_term_stats/site_build/src/bus.ts b/src/rust/long_term_stats/site_build/src/bus.ts index 58028e01..da49cf09 100644 --- a/src/rust/long_term_stats/site_build/src/bus.ts +++ b/src/rust/long_term_stats/site_build/src/bus.ts @@ -139,6 +139,16 @@ export class Bus { this.ws.send("{ \"msg\": \"siteRootHeat\", \"period\": \"" + window.graphPeriod + "\" }"); } + requestSiteHeat(site_id: string) { + let request = { + msg: "siteHeat", + period: window.graphPeriod, + site_id: decodeURI(site_id), + }; + let json = JSON.stringify(request); + this.ws.send(json); + } + sendSearch(term: string) { let request = { msg: "search", diff --git a/src/rust/long_term_stats/site_build/src/components/site_heat.ts b/src/rust/long_term_stats/site_build/src/components/site_heat.ts new file mode 100644 index 00000000..06b78010 --- /dev/null +++ b/src/rust/long_term_stats/site_build/src/components/site_heat.ts @@ -0,0 +1,104 @@ +import { Component } from "./component"; +import * as echarts from 'echarts'; + +export class SiteHeat implements Component { + div: HTMLElement; + myChart: echarts.ECharts; + counter: number = 0; + siteId: string; + + constructor(siteId: string) { + this.siteId = siteId; + this.div = document.getElementById("rootHeat") as HTMLElement; + this.myChart = echarts.init(this.div); + this.myChart.showLoading(); + } + + wireup(): void { + window.bus.requestSiteHeat(this.siteId); + } + + ontick(): void { + this.counter++; + if (this.counter % 10 == 0) + window.bus.requestSiteHeat(this.siteId); + } + + onmessage(event: any): void { + if (event.msg == "siteHeat") { + this.myChart.hideLoading(); + + let categories: string[] = []; + let x: string[] = []; + let first: boolean = true; + let count = 0; + let data: any[] = []; + let keys: string[] = []; + for (const key in event.data) { + keys.push(key); + } + keys = keys.sort().reverse(); + console.log(keys); + + for (let j=0; j + } + }, + }; + this.myChart.setOption(option); + } + } +} \ No newline at end of file diff --git a/src/rust/long_term_stats/site_build/src/site/site.ts b/src/rust/long_term_stats/site_build/src/site/site.ts index 9b5bb7b1..2c81b8c8 100644 --- a/src/rust/long_term_stats/site_build/src/site/site.ts +++ b/src/rust/long_term_stats/site_build/src/site/site.ts @@ -7,6 +7,7 @@ import { SiteInfo } from '../components/site_info'; import { RttChartSite } from '../components/rtt_site'; import { RttHistoSite } from '../components/rtt_histo_site'; import { SiteBreadcrumbs } from '../components/site_breadcrumbs'; +import { SiteHeat } from '../components/site_heat'; export class SitePage implements Page { menu: MenuPage; @@ -26,6 +27,7 @@ export class SitePage implements Page { new RttChartSite(siteId), new RttHistoSite(), new SiteBreadcrumbs(siteId), + new SiteHeat(siteId), ]; }