Checkpoint before I rearrange some things for efficiency.

This commit is contained in:
Herbert Wolverson 2023-05-01 13:22:30 +00:00
parent 72424fd433
commit ac304b436b
7 changed files with 157 additions and 4 deletions

View File

@ -1,5 +1,5 @@
use crate::web::wss::queries::{
send_packets_for_all_nodes, send_rtt_for_all_nodes, send_throughput_for_all_nodes, send_packets_for_node, send_throughput_for_node, send_rtt_for_node, send_perf_for_node, omnisearch,
send_packets_for_all_nodes, send_rtt_for_all_nodes, send_throughput_for_all_nodes, send_packets_for_node, send_throughput_for_node, send_rtt_for_node, send_perf_for_node, omnisearch, root_heat_map,
};
use axum::{
extract::{
@ -179,6 +179,19 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
).await;
}
}
"siteRootHeat" => {
if let Some(credentials) = &credentials {
let _ = root_heat_map(
cnn.clone(),
&mut socket,
&credentials.license_key,
period,
)
.await;
} else {
log::info!("Throughput requested but no credentials provided");
}
}
_ => {
log::warn!("Unknown message type: {msg_type}");
}

View File

@ -6,9 +6,11 @@ mod throughput;
mod rtt;
mod node_perf;
mod search;
mod site_heat_map;
pub mod time_period;
pub use packet_counts::{ send_packets_for_all_nodes, send_packets_for_node };
pub use throughput::{ send_throughput_for_all_nodes, send_throughput_for_node };
pub use rtt::{ send_rtt_for_all_nodes, send_rtt_for_node };
pub use node_perf::send_perf_for_node;
pub use search::omnisearch;
pub use site_heat_map::root_heat_map;

View File

@ -0,0 +1,105 @@
use super::time_period::InfluxTimePeriod;
use crate::submissions::get_org_details;
use axum::extract::ws::{WebSocket, Message};
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::Client;
use influxdb2::{models::Query, FromDataPoint};
use pgdb::sqlx::{query, Pool, Postgres, Row};
use serde::Serialize;
use std::collections::HashMap;
pub async fn root_heat_map(
cnn: Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
if let Some(org) = get_org_details(cnn.clone(), key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
// Get sites where parent=0 (for this setup)
let hosts: Vec<String> = query("SELECT DISTINCT site_name FROM site_tree WHERE key=$1 AND parent=0 AND site_type='site'")
.bind(key)
.fetch_all(&cnn)
.await?
.iter()
.map(|row| row.try_get("site_name").unwrap())
.collect();
let mut host_filter = "filter(fn: (r) => ".to_string();
for host in hosts.iter() {
host_filter += &format!("r[\"node_name\"] == \"{host}\" or ");
}
host_filter = host_filter[0..host_filter.len() - 4].to_string();
host_filter += ")";
// Query influx for RTT averages
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"tree\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"rtt_avg\")
|> {}
|> {}
|> yield(name: \"last\")",
org.influx_bucket,
period.range(),
org.key,
host_filter,
period.aggregate_window()
);
println!("{qs}");
let query = Query::new(qs);
let rows = client.query::<HeatRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
let mut sorter: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>> = HashMap::new();
for row in rows.iter() {
if let Some(hat) = sorter.get_mut(&row.node_name) {
hat.push((row.time, row.rtt_avg));
} else {
sorter.insert(row.node_name.clone(), vec![(row.time, row.rtt_avg)]);
}
}
let msg = HeatMessage {
msg: "rootHeat".to_string(),
data: sorter,
};
let json = serde_json::to_string(&msg).unwrap();
socket.send(Message::Text(json)).await.unwrap();
}
}
}
Ok(())
}
#[derive(Serialize)]
struct HeatMessage {
msg: String,
data: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>,
}
#[derive(Debug, FromDataPoint)]
pub struct HeatRow {
pub node_name: String,
pub rtt_avg: f64,
pub time: DateTime<FixedOffset>,
}
impl Default for HeatRow {
fn default() -> Self {
Self {
node_name: "".to_string(),
rtt_avg: 0.0,
time: DateTime::<Utc>::MIN_UTC.into(),
}
}
}

View File

@ -115,6 +115,10 @@ export class Bus {
this.ws.send(json);
}
requestSiteRootHeat() {
this.ws.send("{ \"msg\": \"siteRootHeat\", \"period\": \"" + window.graphPeriod + "\" }");
}
sendSearch(term: string) {
let request = {
msg: "search",

View File

@ -0,0 +1,17 @@
import { Component } from "./component";
export class RootHeat implements Component {
wireup(): void {
}
ontick(): void {
window.bus.requestSiteRootHeat();
}
onmessage(event: any): void {
if (event.msg == "rootHeat") {
console.log(event);
}
}
}

View File

@ -7,6 +7,7 @@ import { PacketsChart } from '../components/packets';
import { ThroughputChart } from '../components/throughput';
import { RttChart } from '../components/rtt_graph';
import { RttHisto } from '../components/rtt_histo';
import { RootHeat } from '../components/root_heat';
export class DashboardPage implements Page {
menu: MenuPage;
@ -24,6 +25,7 @@ export class DashboardPage implements Page {
new ThroughputChart(),
new RttChart(),
new RttHisto(),
new RootHeat(),
];
}

View File

@ -39,4 +39,14 @@
</div>
</div>
</div>
<div class="row">
<div class="col-12">
<div class="card">
<div class="card-body">
<div id="rootHeat" style="height: 250px"></div>
</div>
</div>
</div>
</div>
</div>