Use the new query builder for send_throughput_for_all_nodes and move to a grouped, efficient iterator based operation.

This commit is contained in:
Herbert Wolverson 2023-07-18 15:44:40 +00:00
parent 600d990345
commit ed916bff11

View File

@ -6,17 +6,65 @@ use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tracing::instrument;
use wasm_pipe_types::{ThroughputHost, Throughput};
use crate::web::wss::send_response;
use crate::web::wss::{send_response, influx_query_builder::InfluxQueryBuilder};
use self::throughput_row::{ThroughputRow, ThroughputRowBySite, ThroughputRowByCircuit};
use super::time_period::InfluxTimePeriod;
mod throughput_row;
pub use site_stack::send_site_stack_map;
fn add_by_direction(direction: &str, down: &mut Vec<Throughput>, up: &mut Vec<Throughput>, row: &ThroughputRow) {
match direction {
"down" => {
down.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
"up" => {
up.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
_ => {}
}
}
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes(cnn, key, period).await?;
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<ThroughputHost>::new();
InfluxQueryBuilder::new(period.clone())
.with_measurement("bits")
.with_fields(&["min", "max", "avg"])
.with_groups(&["host_id", "direction", "_field"])
.execute::<ThroughputRow>(cnn, key)
.await?
.into_iter()
.for_each(|row| {
if let Some(node) = nodes.iter_mut().find(|n| n.node_id == row.host_id) {
add_by_direction(&row.direction, &mut node.down, &mut node.up, &row);
} else {
let mut down = Vec::new();
let mut up = Vec::new();
add_by_direction(&row.direction, &mut down, &mut up, &row);
let node_name = if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up });
}
});
send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes }).await;
Ok(())
Ok(())
}
#[instrument(skip(cnn, socket, key, period, site_name))]