Add child heat maps to site and ap

This commit is contained in:
Herbert Wolverson 2023-05-10 19:44:10 +00:00
parent 60224ba67b
commit c1295a6461
8 changed files with 329 additions and 4 deletions

View File

@ -2,7 +2,7 @@ use crate::web::wss::queries::{
omnisearch, root_heat_map, send_packets_for_all_nodes, send_packets_for_node,
send_perf_for_node, send_rtt_for_all_nodes, send_rtt_for_all_nodes_site, send_rtt_for_node,
send_site_info, send_site_parents, send_throughput_for_all_nodes,
send_throughput_for_all_nodes_by_site, send_throughput_for_node, site_tree::send_site_tree,
send_throughput_for_all_nodes_by_site, send_throughput_for_node, site_tree::send_site_tree, site_heat_map,
};
use axum::{
extract::{
@ -224,6 +224,20 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
log::info!("Throughput requested but no credentials provided");
}
}
"siteHeat" => {
if let Some(credentials) = &credentials {
let _ = site_heat_map(
cnn.clone(),
&mut socket,
&credentials.license_key,
json.get("site_id").unwrap().as_str().unwrap(),
period,
)
.await;
} else {
log::info!("Throughput requested but no credentials provided");
}
}
"siteTree" => {
if let Some(credentials) = &credentials {
send_site_tree(

View File

@ -16,6 +16,6 @@ pub use throughput::{ send_throughput_for_all_nodes, send_throughput_for_node, s
pub use rtt::{ send_rtt_for_all_nodes, send_rtt_for_node, send_rtt_for_all_nodes_site };
pub use node_perf::send_perf_for_node;
pub use search::omnisearch;
pub use site_heat_map::root_heat_map;
pub use site_heat_map::{root_heat_map, site_heat_map};
pub use site_info::send_site_info;
pub use site_parents::send_site_parents;

View File

@ -1,9 +1,10 @@
use super::time_period::InfluxTimePeriod;
use crate::submissions::get_org_details;
use axum::extract::ws::{WebSocket, Message};
use axum::extract::ws::{Message, WebSocket};
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::Client;
use influxdb2::{models::Query, FromDataPoint};
use pgdb::OrganizationDetails;
use pgdb::sqlx::{query, Pool, Postgres, Row};
use serde::Serialize;
use std::collections::HashMap;
@ -50,7 +51,7 @@ pub async fn root_heat_map(
host_filter,
period.aggregate_window()
);
println!("{qs}");
//println!("{qs}");
let query = Query::new(qs);
let rows = client.query::<HeatRow>(Some(query)).await;
@ -81,6 +82,166 @@ pub async fn root_heat_map(
Ok(())
}
async fn site_circuits_heat_map(
cnn: Pool<Postgres>,
key: &str,
site_name: &str,
period: InfluxTimePeriod,
sorter: &mut HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>,
client: Client,
org: &OrganizationDetails,
) -> anyhow::Result<()> {
// Get sites where parent=site_id (for this setup)
let hosts: Vec<(String, String)> =
query("SELECT DISTINCT circuit_id, circuit_name FROM shaped_devices WHERE key=$1 AND parent_node=$2")
.bind(key)
.bind(site_name)
.fetch_all(&cnn)
.await?
.iter()
.map(|row| (row.try_get("circuit_id").unwrap(), row.try_get("circuit_name").unwrap()))
.collect();
let mut circuit_map = HashMap::new();
for (id, name) in hosts.iter() {
circuit_map.insert(id, name);
}
let hosts = hosts.iter().map(|(id, _)| id).collect::<Vec<_>>();
let mut host_filter = "filter(fn: (r) => ".to_string();
for host in hosts.iter() {
host_filter += &format!("r[\"circuit_id\"] == \"{host}\" or ");
}
host_filter = host_filter[0..host_filter.len() - 4].to_string();
host_filter += ")";
// Query influx for RTT averages
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"rtt\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"avg\")
|> {}
|> {}
|> yield(name: \"last\")",
org.influx_bucket,
period.range(),
org.key,
host_filter,
period.aggregate_window()
);
//println!("{qs}\n\n");
let query = Query::new(qs);
let rows = client.query::<HeatCircuitRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
for row in rows.iter() {
if let Some(name) = circuit_map.get(&row.circuit_id) {
if let Some(hat) = sorter.get_mut(*name) {
hat.push((row.time, row.avg));
} else {
sorter.insert(name.to_string(), vec![(row.time, row.avg)]);
}
}
}
}
}
Ok(())
}
pub async fn site_heat_map(
cnn: Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
site_name: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
if let Some(org) = get_org_details(cnn.clone(), key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
// Get the site index
let site_id = pgdb::get_site_id_from_name(cnn.clone(), key, site_name).await?;
// Get sites where parent=site_id (for this setup)
let hosts: Vec<String> =
query("SELECT DISTINCT site_name FROM site_tree WHERE key=$1 AND parent=$2")
.bind(key)
.bind(site_id)
.fetch_all(&cnn)
.await?
.iter()
.map(|row| row.try_get("site_name").unwrap())
.collect();
let mut host_filter = "filter(fn: (r) => ".to_string();
for host in hosts.iter() {
host_filter += &format!("r[\"node_name\"] == \"{host}\" or ");
}
host_filter = host_filter[0..host_filter.len() - 4].to_string();
host_filter += ")";
if host_filter.ends_with("(r))") {
host_filter = "filter(fn: (r) => r[\"node_name\"] == \"bad_sheep_no_data\")".to_string();
}
// Query influx for RTT averages
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"tree\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"rtt_avg\")
|> {}
|> {}
|> yield(name: \"last\")",
org.influx_bucket,
period.range(),
org.key,
host_filter,
period.aggregate_window()
);
//println!("{qs}\n\n");
let query = Query::new(qs);
let rows = client.query::<HeatRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
let mut sorter: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>> = HashMap::new();
for row in rows.iter() {
if let Some(hat) = sorter.get_mut(&row.node_name) {
hat.push((row.time, row.rtt_avg));
} else {
sorter.insert(row.node_name.clone(), vec![(row.time, row.rtt_avg)]);
}
}
site_circuits_heat_map(cnn, key, site_name, period, &mut sorter, client, &org).await?;
let msg = HeatMessage {
msg: "siteHeat".to_string(),
data: sorter,
};
let json = serde_json::to_string(&msg).unwrap();
socket.send(Message::Text(json)).await.unwrap();
}
}
}
Ok(())
}
#[derive(Serialize)]
struct HeatMessage {
msg: String,
@ -103,3 +264,20 @@ impl Default for HeatRow {
}
}
}
#[derive(Debug, FromDataPoint)]
pub struct HeatCircuitRow {
pub circuit_id: String,
pub avg: f64,
pub time: DateTime<FixedOffset>,
}
impl Default for HeatCircuitRow {
fn default() -> Self {
Self {
circuit_id: "".to_string(),
avg: 0.0,
time: DateTime::<Utc>::MIN_UTC.into(),
}
}
}

View File

@ -40,6 +40,21 @@ pub async fn get_site_info(
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))
}
pub async fn get_site_id_from_name(
cnn: Pool<Postgres>,
key: &str,
site_name: &str,
) -> Result<i32, StatsHostError> {
let site_id_db = sqlx::query("SELECT index FROM site_tree WHERE key = $1 AND site_name=$2")
.bind(key)
.bind(site_name)
.fetch_one(&cnn)
.await
.map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
let site_id: i32 = site_id_db.try_get("index").map_err(|e| StatsHostError::DatabaseError(e.to_string()))?;
Ok(site_id)
}
pub async fn get_parent_list(
cnn: Pool<Postgres>,
key: &str,

View File

@ -7,6 +7,7 @@ import { SiteInfo } from '../components/site_info';
import { RttChartSite } from '../components/rtt_site';
import { RttHistoSite } from '../components/rtt_histo_site';
import { SiteBreadcrumbs } from '../components/site_breadcrumbs';
import { SiteHeat } from '../components/site_heat';
export class AccessPointPage implements Page {
menu: MenuPage;
@ -26,6 +27,7 @@ export class AccessPointPage implements Page {
new RttChartSite(siteId),
new RttHistoSite(),
new SiteBreadcrumbs(siteId),
new SiteHeat(siteId),
];
}

View File

@ -139,6 +139,16 @@ export class Bus {
this.ws.send("{ \"msg\": \"siteRootHeat\", \"period\": \"" + window.graphPeriod + "\" }");
}
requestSiteHeat(site_id: string) {
let request = {
msg: "siteHeat",
period: window.graphPeriod,
site_id: decodeURI(site_id),
};
let json = JSON.stringify(request);
this.ws.send(json);
}
sendSearch(term: string) {
let request = {
msg: "search",

View File

@ -0,0 +1,104 @@
import { Component } from "./component";
import * as echarts from 'echarts';
export class SiteHeat implements Component {
div: HTMLElement;
myChart: echarts.ECharts;
counter: number = 0;
siteId: string;
constructor(siteId: string) {
this.siteId = siteId;
this.div = document.getElementById("rootHeat") as HTMLElement;
this.myChart = echarts.init(this.div);
this.myChart.showLoading();
}
wireup(): void {
window.bus.requestSiteHeat(this.siteId);
}
ontick(): void {
this.counter++;
if (this.counter % 10 == 0)
window.bus.requestSiteHeat(this.siteId);
}
onmessage(event: any): void {
if (event.msg == "siteHeat") {
this.myChart.hideLoading();
let categories: string[] = [];
let x: string[] = [];
let first: boolean = true;
let count = 0;
let data: any[] = [];
let keys: string[] = [];
for (const key in event.data) {
keys.push(key);
}
keys = keys.sort().reverse();
console.log(keys);
for (let j=0; j<keys.length; j++) {
let key = keys[j];
categories.push(key);
// Push the X axis values
if (first) {
first = false;
for (let i=0; i<event.data[key].length; i++) {
x.push(event.data[key][i][0]);
}
}
// Create all the series entries for this category
for (let i=0; i<event.data[key].length; i++) {
data.push([i, count, event.data[key][i][1].toFixed(1)]);
}
count++;
}
let series: any[] = [];
let i = 0;
series.push({
name: categories[i],
type: 'heatmap',
data: data,
label: { show: true },
emphasis: {
itemStyle: {
shadowBlur: 10,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
})
//console.log(series);
let option = {
title: { text: "TCP Round-Trip Time by Site" },
tooltip: {
show: false,
},
grid: { height: '50%', top: '10%' },
xAxis: { type: 'category', data: x, splitArea: { show: true } },
yAxis: { type: 'category', data: categories, splitArea: { show: true } },
series: series,
visualMap: {
min: 0,
max: 200,
calculable: true,
type: 'continuous',
orient: 'horizontal',
left: 'center',
top: '2%',
inRange : {
color: ['#009000', 'yellow', '#DD2000' ] //From smaller to bigger value ->
}
},
};
this.myChart.setOption(option);
}
}
}

View File

@ -7,6 +7,7 @@ import { SiteInfo } from '../components/site_info';
import { RttChartSite } from '../components/rtt_site';
import { RttHistoSite } from '../components/rtt_histo_site';
import { SiteBreadcrumbs } from '../components/site_breadcrumbs';
import { SiteHeat } from '../components/site_heat';
export class SitePage implements Page {
menu: MenuPage;
@ -26,6 +27,7 @@ export class SitePage implements Page {
new RttChartSite(siteId),
new RttHistoSite(),
new SiteBreadcrumbs(siteId),
new SiteHeat(siteId),
];
}