No more naked queries — all InfluxDB queries now go through a query builder

This commit is contained in:
Herbert Wolverson
2023-08-11 16:42:32 +00:00
parent 0ea3838f38
commit 19cf79b1b2
7 changed files with 394 additions and 406 deletions

View File

@@ -364,7 +364,7 @@ async fn handle_socket_message(
tx,
&credentials.license_key,
device_id,
InfluxTimePeriod::new(period),
&InfluxTimePeriod::new(period),
)
.await;
}
@@ -374,7 +374,7 @@ async fn handle_socket_message(
tx,
&credentials.license_key,
device_id,
InfluxTimePeriod::new(period),
&InfluxTimePeriod::new(period),
)
.await;
}

View File

@@ -1,11 +1,11 @@
use std::collections::HashSet;
use axum::extract::ws::WebSocket;
use chrono::{DateTime, FixedOffset};
use influxdb2::{FromDataPoint, models::Query, Client};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use influxdb2::FromDataPoint;
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::WasmResponse;
use super::influx::InfluxTimePeriod;
use wasm_pipe_types::{WasmResponse, SignalNoiseChartExt, CapacityChartExt};
use super::{influx::InfluxTimePeriod, QueryBuilder};
#[tracing::instrument(skip(cnn, tx, key, circuit_id))]
pub async fn send_extended_device_info(
@@ -73,39 +73,29 @@ pub async fn send_extended_device_snr_graph(
tx: Sender<WasmResponse>,
key: &str,
device_id: &str,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<()> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"device_ext\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"device_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"noise_floor\" or r[\"_field\"] == \"rx_signal\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, device_id, period.aggregate_window()
);
//println!("{qs}");
let query = Query::new(qs);
let rows = client.query::<SnrRow>(Some(query)).await?;
let mut sn = Vec::new();
rows.iter().for_each(|row| {
let snr = wasm_pipe_types::SignalNoiseChartExt {
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("device_ext", &["noise_floor", "rx_signal"])
.filter(&format!("r[\"device_id\"] == \"{}\"", device_id))
.aggregate_window()
.execute::<SnrRow>()
.await?
.into_iter()
.map(|row| {
wasm_pipe_types::SignalNoiseChartExt {
noise: row.noise_floor,
signal: row.rx_signal,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
};
sn.push(snr);
});
tx.send(WasmResponse::DeviceExtSnr { data: sn, device_id: device_id.to_string() }).await?;
}
}
})
.collect::<Vec<SignalNoiseChartExt>>();
tx.send(WasmResponse::DeviceExtSnr { data: rows, device_id: device_id.to_string() }).await?;
Ok(())
}
@@ -138,39 +128,29 @@ pub async fn send_extended_device_capacity_graph(
tx: Sender<WasmResponse>,
key: &str,
device_id: &str,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<()> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"device_ext\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"device_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"dl_capacity\" or r[\"_field\"] == \"ul_capacity\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, device_id, period.aggregate_window()
);
//println!("{qs}");
let query = Query::new(qs);
let rows = client.query::<CapacityRow>(Some(query)).await?;
let mut sn = Vec::new();
rows.iter().for_each(|row| {
let snr = wasm_pipe_types::CapacityChartExt {
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("device_ext", &["dl_capacity", "ul_capacity"])
.filter(&format!("r[\"device_id\"] == \"{}\"", device_id))
.aggregate_window()
.execute::<CapacityRow>()
.await?
.into_iter()
.map(|row| {
wasm_pipe_types::CapacityChartExt {
dl: row.dl_capacity,
ul: row.ul_capacity,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
};
sn.push(snr);
});
tx.send(WasmResponse::DeviceExtCapacity { data: sn, device_id: device_id.to_string() }).await?;
}
}
})
.collect::<Vec<CapacityChartExt>>();
tx.send(WasmResponse::DeviceExtCapacity { data: rows, device_id: device_id.to_string() }).await?;
Ok(())
}

View File

@@ -1,7 +1,11 @@
use influxdb2::{Client, models::Query};
use influxdb2_structmap::FromMap;
use pgdb::{sqlx::{Pool, Postgres}, OrganizationDetails, organization_cache::get_org_details};
use super::InfluxTimePeriod;
use influxdb2::{models::Query, Client};
use influxdb2_structmap::FromMap;
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
OrganizationDetails,
};
pub struct QueryBuilder<'a> {
lines: Vec<String>,
@@ -10,13 +14,13 @@ pub struct QueryBuilder<'a> {
}
#[allow(dead_code)]
impl <'a> QueryBuilder <'a> {
impl<'a> QueryBuilder<'a> {
/// Construct a new, completely empty query.
pub fn new() -> Self {
Self {
lines: Vec::new(),
period: None,
org: None,
org: None,
}
}
@@ -50,7 +54,8 @@ impl <'a> QueryBuilder <'a> {
pub fn bucket(mut self) -> Self {
if let Some(org) = &self.org {
self.lines.push(format!("from(bucket: \"{}\")", org.influx_bucket));
self.lines
.push(format!("from(bucket: \"{}\")", org.influx_bucket));
} else {
tracing::warn!("No organization in query, cannot add bucket");
}
@@ -75,7 +80,8 @@ impl <'a> QueryBuilder <'a> {
pub fn filter_and(mut self, filters: &[&str]) -> Self {
let all_filters = filters.join(" and ");
self.lines.push(format!("|> filter(fn: (r) => {})", all_filters));
self.lines
.push(format!("|> filter(fn: (r) => {})", all_filters));
self
}
@@ -88,6 +94,19 @@ impl <'a> QueryBuilder <'a> {
self
}
pub fn measure_fields_org(mut self, measurement: &str, fields: &[&str]) -> Self {
if let Some(org) = &self.org {
let mut filters = Vec::new();
for field in fields.iter() {
filters.push(format!("r[\"_field\"] == \"{}\"", field));
}
self.lines.push(format!("|> filter(fn: (r) => r[\"_measurement\"] == \"{}\" and r[\"organization_id\"] == \"{}\" and ({}))", measurement, org.key, filters.join(" or ")));
} else {
tracing::warn!("No organization in query, cannot add measure_field_org");
}
self
}
pub fn aggregate_window(mut self) -> Self {
if let Some(period) = &self.period {
self.lines.push(format!("|> {}", period.aggregate_window()));
@@ -99,12 +118,22 @@ impl <'a> QueryBuilder <'a> {
pub fn group(mut self, columns: &[&str]) -> Self {
let group_by = columns.join(", ");
self.lines.push(format!("|> group(columns: [\"{}\"])", group_by));
self.lines
.push(format!("|> group(columns: [\"{}\"])", group_by));
self
}
pub fn with_host_id(mut self, host_id: &str) -> Self {
self.lines.push(format!(
"|> filter(fn: (r) => r[\"host_id\"] == \"{}\")",
host_id
));
self
}
pub async fn execute<T>(&self) -> anyhow::Result<Vec<T>>
where T: FromMap + std::fmt::Debug
where
T: FromMap + std::fmt::Debug,
{
let qs = self.lines.join("\n");
tracing::info!("Query:\n{}", qs);
@@ -123,4 +152,4 @@ impl <'a> QueryBuilder <'a> {
anyhow::bail!("No organization in query, cannot execute");
}
}
}
}

View File

@@ -1,10 +1,9 @@
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::{Client, FromDataPoint, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use influxdb2::FromDataPoint;
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{PerfHost, Perf, WasmResponse};
use super::influx::InfluxTimePeriod;
use wasm_pipe_types::{Perf, PerfHost, WasmResponse};
use super::{influx::InfluxTimePeriod, QueryBuilder};
#[derive(Debug, FromDataPoint)]
pub struct PerfRow {
@@ -35,8 +34,9 @@ pub async fn send_perf_for_node(
node_id: String,
node_name: String,
) -> anyhow::Result<()> {
let node = get_perf_for_node(cnn, key, node_id, node_name, period).await?;
tx.send(WasmResponse::NodePerfChart { nodes: vec![node] }).await?;
let node = get_perf_for_node(cnn, key, node_id, node_name, &period).await?;
tx.send(WasmResponse::NodePerfChart { nodes: vec![node] })
.await?;
Ok(())
}
@@ -45,53 +45,35 @@ pub async fn get_perf_for_node(
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<PerfHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("perf", &["cpu", "cpu_max", "ram"])
.filter(&format!("r[\"host_id\"] == \"{}\"", node_id))
.aggregate_window()
.execute::<PerfRow>()
.await?;
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"perf\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let mut stats = Vec::new();
let query = Query::new(qs);
let rows = client.query::<PerfRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (node-perf): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut stats = Vec::new();
// Fill download
for row in rows.iter() {
stats.push(Perf {
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
cpu: row.cpu,
cpu_max: row.cpu_max,
ram: row.ram,
});
}
return Ok(PerfHost{
node_id,
node_name,
stats,
});
}
}
// Fill download
for row in rows.iter() {
stats.push(Perf {
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
cpu: row.cpu,
cpu_max: row.cpu_max,
ram: row.ram,
});
}
Err(anyhow::Error::msg("Unable to query influx"))
Ok(PerfHost {
node_id,
node_name,
stats,
})
}

View File

@@ -5,7 +5,7 @@ use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{PacketHost, Packets, WasmResponse};
use super::influx::{InfluxTimePeriod, InfluxQueryBuilder};
use super::{influx::{InfluxTimePeriod, InfluxQueryBuilder}, QueryBuilder};
fn add_by_direction(direction: &str, down: &mut Vec<Packets>, up: &mut Vec<Packets>, row: &PacketRow) {
match direction {
@@ -102,15 +102,20 @@ pub async fn get_packets_for_node(
node_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<PacketHost> {
let rows = InfluxQueryBuilder::new(period.clone())
.with_measurement("packets")
let rows = QueryBuilder::new()
.with_period(&period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("packets", &["min", "max", "avg"])
.with_host_id(&node_id)
.with_field("min")
.with_field("max")
.with_field("avg")
.execute::<PacketRow>(cnn, key)
.aggregate_window()
.execute::<PacketRow>()
.await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (packets by node): {}", e);

View File

@@ -3,19 +3,28 @@ pub use per_node::*;
mod per_site;
pub use per_site::*;
use self::rtt_row::RttRow;
use super::{influx::InfluxTimePeriod, QueryBuilder};
use crate::web::wss::queries::rtt::rtt_row::RttCircuitRow;
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use influxdb2::{models::Query, Client};
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{RttHost, Rtt, WasmResponse};
use crate::web::wss::queries::rtt::rtt_row::RttCircuitRow;
use self::rtt_row::RttRow;
use super::influx::InfluxTimePeriod;
use wasm_pipe_types::{Rtt, RttHost, WasmResponse};
mod rtt_row;
#[instrument(skip(cnn, tx, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
pub async fn send_rtt_for_all_nodes_circuit(
cnn: &Pool<Postgres>,
tx: Sender<WasmResponse>,
key: &str,
site_id: String,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes_circuit(cnn, key, &site_id, period).await?;
let mut histogram = vec![0; 20];
@@ -26,27 +35,32 @@ pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, tx: Sender<Was
}
}
tx.send(WasmResponse::RttChartCircuit { nodes, histogram }).await?;
tx.send(WasmResponse::RttChartCircuit { nodes, histogram })
.await?;
Ok(())
}
pub async fn send_rtt_for_node(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, period: InfluxTimePeriod, node_id: String, node_name: String) -> anyhow::Result<()> {
let node = get_rtt_for_node(cnn, key, node_id, node_name, period).await?;
pub async fn send_rtt_for_node(
cnn: &Pool<Postgres>,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
node_id: String,
node_name: String,
) -> anyhow::Result<()> {
let node = get_rtt_for_node(cnn, key, node_id, node_name, &period).await?;
let nodes = vec![node];
/*let mut histogram = vec![0; 20];
for node in nodes.iter() {
for rtt in node.rtt.iter() {
let bucket = usize::min(19, (rtt.value / 200.0) as usize);
histogram[bucket] += 1;
}
}*/
tx.send(WasmResponse::RttChart { nodes }).await?;
Ok(())
}
pub async fn get_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, key: &str, circuit_id: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> {
pub async fn get_rtt_for_all_nodes_circuit(
cnn: &Pool<Postgres>,
key: &str,
circuit_id: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<Vec<RttHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
@@ -56,11 +70,10 @@ pub async fn get_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, key: &str, circ
node.node_id.to_string(),
node.node_name.to_string(),
circuit_id.to_string(),
period.clone(),
&period,
));
}
let all_nodes: anyhow::Result<Vec<RttHost>> = join_all(futures).await
.into_iter().collect();
let all_nodes: anyhow::Result<Vec<RttHost>> = join_all(futures).await.into_iter().collect();
all_nodes
}
@@ -69,55 +82,37 @@ pub async fn get_rtt_for_node(
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<RttHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("rtt", &["avg", "min", "max"])
.filter(&format!("r[\"host_id\"] == \"{}\"", node_id))
.aggregate_window()
.execute::<RttRow>()
.await?;
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"rtt\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let mut rtt = Vec::new();
let query = Query::new(qs);
let rows = client.query::<RttRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (rtt node): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut rtt = Vec::new();
// Fill RTT
for row in rows.iter() {
rtt.push(Rtt {
value: f64::min(200.0, row.avg),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: f64::min(200.0, row.min),
u: f64::min(200.0, row.max) - f64::min(200.0, row.min),
});
}
return Ok(RttHost{
node_id,
node_name,
rtt,
});
}
}
// Fill RTT
for row in rows.iter() {
rtt.push(Rtt {
value: f64::min(200.0, row.avg),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: f64::min(200.0, row.min),
u: f64::min(200.0, row.max) - f64::min(200.0, row.min),
});
}
Err(anyhow::Error::msg("Unable to query influx"))
Ok(RttHost {
node_id,
node_name,
rtt,
})
}
pub async fn get_rtt_for_node_circuit(
@@ -126,55 +121,36 @@ pub async fn get_rtt_for_node_circuit(
node_id: String,
node_name: String,
circuit_id: String,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<RttHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("rtt", &["avg", "min", "max"])
.filter(&format!("r[\"host_id\"] == \"{}\"", node_id))
.filter(&format!("r[\"circuit_id\"] == \"{}\"", circuit_id))
.aggregate_window()
.execute::<RttCircuitRow>()
.await?;
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"rtt\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> filter(fn: (r) => r[\"circuit_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"avg\" or r[\"_field\"] == \"max\" or r[\"_field\"] == \"min\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, circuit_id, period.aggregate_window()
);
//log::warn!("{qs}");
let query = Query::new(qs);
let rows = client.query::<RttCircuitRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (rtt_node_circuit): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut rtt = Vec::new();
let mut rtt = Vec::new();
// Fill download
for row in rows.iter() {
rtt.push(Rtt {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
return Ok(RttHost{
node_id,
node_name,
rtt,
});
}
}
// Fill download
for row in rows.iter() {
rtt.push(Rtt {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
Err(anyhow::Error::msg("Unable to query influx"))
Ok(RttHost {
node_id,
node_name,
rtt,
})
}

View File

@@ -1,17 +1,24 @@
use std::collections::HashMap;
mod site_stack;
use self::throughput_row::{ThroughputRow, ThroughputRowByCircuit, ThroughputRowBySite};
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{ThroughputHost, Throughput, WasmResponse};
use self::throughput_row::{ThroughputRow, ThroughputRowBySite, ThroughputRowByCircuit};
use wasm_pipe_types::{Throughput, ThroughputHost, WasmResponse};
mod throughput_row;
use super::{
influx::{InfluxQueryBuilder, InfluxTimePeriod},
QueryBuilder,
};
pub use site_stack::send_site_stack_map;
use super::influx::{InfluxQueryBuilder, InfluxTimePeriod};
fn add_by_direction(direction: &str, down: &mut Vec<Throughput>, up: &mut Vec<Throughput>, row: &ThroughputRow) {
fn add_by_direction(
direction: &str,
down: &mut Vec<Throughput>,
up: &mut Vec<Throughput>,
row: &ThroughputRow,
) {
match direction {
"down" => {
down.push(Throughput {
@@ -33,7 +40,12 @@ fn add_by_direction(direction: &str, down: &mut Vec<Throughput>, up: &mut Vec<Th
}
}
fn add_by_direction_site(direction: &str, down: &mut Vec<Throughput>, up: &mut Vec<Throughput>, row: &ThroughputRowBySite) {
fn add_by_direction_site(
direction: &str,
down: &mut Vec<Throughput>,
up: &mut Vec<Throughput>,
row: &ThroughputRowBySite,
) {
match direction {
"down" => {
down.push(Throughput {
@@ -56,7 +68,12 @@ fn add_by_direction_site(direction: &str, down: &mut Vec<Throughput>, up: &mut V
}
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
pub async fn send_throughput_for_all_nodes(
cnn: &Pool<Postgres>,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<ThroughputHost>::new();
InfluxQueryBuilder::new(period.clone())
@@ -75,21 +92,33 @@ pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, tx: Sender<Wasm
add_by_direction(&row.direction, &mut down, &mut up, &row);
let node_name = if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
let node_name =
if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up });
nodes.push(ThroughputHost {
node_id: row.host_id,
node_name,
down,
up,
});
}
});
tx.send(WasmResponse::BitsChart { nodes }).await?;
Ok(())
Ok(())
}
#[instrument(skip(cnn, tx, key, period, site_name))]
pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, site_name: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
pub async fn send_throughput_for_all_nodes_by_site(
cnn: &Pool<Postgres>,
tx: Sender<WasmResponse>,
key: &str,
site_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<ThroughputHost>::new();
InfluxQueryBuilder::new(period.clone())
@@ -109,13 +138,19 @@ pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, tx: Sen
add_by_direction_site(&row.direction, &mut down, &mut up, &row);
let node_name = if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
let node_name =
if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up });
nodes.push(ThroughputHost {
node_id: row.host_id,
node_name,
down,
up,
});
}
});
tx.send(WasmResponse::BitsChart { nodes }).await?;
@@ -130,19 +165,38 @@ pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, socket:
Ok(())
}*/
pub async fn send_throughput_for_all_nodes_by_circuit(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, circuit_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
pub async fn send_throughput_for_all_nodes_by_circuit(
cnn: &Pool<Postgres>,
tx: Sender<WasmResponse>,
key: &str,
circuit_id: String,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes_by_circuit(cnn, key, period, &circuit_id).await?;
tx.send(WasmResponse::BitsChart { nodes }).await?;
Ok(())
}
pub async fn send_throughput_for_node(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, period: InfluxTimePeriod, node_id: String, node_name: String) -> anyhow::Result<()> {
let node = get_throughput_for_node(cnn, key, node_id, node_name, period).await?;
tx.send(WasmResponse::BitsChart { nodes: vec![node] }).await?;
pub async fn send_throughput_for_node(
cnn: &Pool<Postgres>,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
node_id: String,
node_name: String,
) -> anyhow::Result<()> {
let node = get_throughput_for_node(cnn, key, node_id, node_name, &period).await?;
tx.send(WasmResponse::BitsChart { nodes: vec![node] })
.await?;
Ok(())
}
pub async fn get_throughput_for_all_nodes_by_circuit(cnn: &Pool<Postgres>, key: &str, period: InfluxTimePeriod, circuit_id: &str) -> anyhow::Result<Vec<ThroughputHost>> {
pub async fn get_throughput_for_all_nodes_by_circuit(
cnn: &Pool<Postgres>,
key: &str,
period: InfluxTimePeriod,
circuit_id: &str,
) -> anyhow::Result<Vec<ThroughputHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
@@ -152,7 +206,7 @@ pub async fn get_throughput_for_all_nodes_by_circuit(cnn: &Pool<Postgres>, key:
node.node_id.to_string(),
node.node_name.to_string(),
circuit_id.to_string(),
period.clone(),
&period,
));
}
let mut all_nodes = Vec::new();
@@ -167,67 +221,48 @@ pub async fn get_throughput_for_node(
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<ThroughputHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("bits", &["avg", "min", "max"])
.aggregate_window()
.execute::<ThroughputRow>()
.await?;
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"bits\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let mut down = Vec::new();
let mut up = Vec::new();
let query = Query::new(qs);
let rows = client.query::<ThroughputRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (throughput node): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut down = Vec::new();
let mut up = Vec::new();
// Fill download
for row in rows.iter().filter(|r| r.direction == "down") {
down.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
// Fill upload
for row in rows.iter().filter(|r| r.direction == "up") {
up.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
return Ok(ThroughputHost{
node_id,
node_name,
down,
up,
});
}
}
// Fill download
for row in rows.iter().filter(|r| r.direction == "down") {
down.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
Err(anyhow::Error::msg("Unable to query influx"))
// Fill upload
for row in rows.iter().filter(|r| r.direction == "up") {
up.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
Ok(ThroughputHost {
node_id,
node_name,
down,
up,
})
}
pub async fn get_throughput_for_node_by_circuit(
@@ -236,83 +271,64 @@ pub async fn get_throughput_for_node_by_circuit(
node_id: String,
node_name: String,
circuit_id: String,
period: InfluxTimePeriod,
period: &InfluxTimePeriod,
) -> anyhow::Result<Vec<ThroughputHost>> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let rows = QueryBuilder::new()
.with_period(period)
.derive_org(cnn, key)
.await
.bucket()
.range()
.measure_fields_org("host_bits", &["avg", "min", "max"])
.with_host_id(&node_id)
.filter(&format!("r[\"circuit_id\"] == \"{}\"", circuit_id))
.aggregate_window()
.execute::<ThroughputRowByCircuit>()
.await?;
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"host_bits\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> filter(fn: (r) => r[\"circuit_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"avg\" or r[\"_field\"] == \"max\" or r[\"_field\"] == \"min\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, circuit_id, period.aggregate_window()
);
let mut sorter: HashMap<String, (Vec<Throughput>, Vec<Throughput>)> =
HashMap::new();
let query = Query::new(qs);
//println!("{:?}", query);
let rows = client.query::<ThroughputRowByCircuit>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!(" (throughput circuit): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut sorter: HashMap<String, (Vec<Throughput>, Vec<Throughput>)> = HashMap::new();
// Fill download
for row in rows.iter().filter(|r| r.direction == "down") {
let tp = Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
};
if let Some(hat) = sorter.get_mut(&row.ip) {
hat.0.push(tp);
} else {
sorter.insert(row.ip.clone(), (vec![tp], Vec::new()));
}
}
// Fill upload
for row in rows.iter().filter(|r| r.direction == "up") {
let tp = Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
};
if let Some(hat) = sorter.get_mut(&row.ip) {
hat.1.push(tp);
} else {
sorter.insert(row.ip.clone(), (Vec::new(), vec![tp]));
}
}
let mut result = Vec::new();
for (ip, (down, up)) in sorter.iter() {
result.push(ThroughputHost{
node_id: node_id.clone(),
node_name: format!("{ip} {node_name}"),
down: down.clone(),
up: up.clone(),
});
}
return Ok(result);
}
// Fill download
for row in rows.iter().filter(|r| r.direction == "down") {
let tp = Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
};
if let Some(hat) = sorter.get_mut(&row.ip) {
hat.0.push(tp);
} else {
sorter.insert(row.ip.clone(), (vec![tp], Vec::new()));
}
}
Err(anyhow::Error::msg("Unable to query influx"))
// Fill upload
for row in rows.iter().filter(|r| r.direction == "up") {
let tp = Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
};
if let Some(hat) = sorter.get_mut(&row.ip) {
hat.1.push(tp);
} else {
sorter.insert(row.ip.clone(), (Vec::new(), vec![tp]));
}
}
let mut result = Vec::new();
for (ip, (down, up)) in sorter.iter() {
result.push(ThroughputHost {
node_id: node_id.clone(),
node_name: format!("{ip} {node_name}"),
down: down.clone(),
up: up.clone(),
});
}
Ok(result)
}