diff --git a/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs index 44396aff..119050e6 100644 --- a/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs +++ b/src/rust/long_term_stats/lts_node/src/web/wss/queries/throughput/mod.rs @@ -6,17 +6,65 @@ use influxdb2::{Client, models::Query}; use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details}; use tracing::instrument; use wasm_pipe_types::{ThroughputHost, Throughput}; -use crate::web::wss::send_response; +use crate::web::wss::{send_response, influx_query_builder::InfluxQueryBuilder}; use self::throughput_row::{ThroughputRow, ThroughputRowBySite, ThroughputRowByCircuit}; use super::time_period::InfluxTimePeriod; mod throughput_row; pub use site_stack::send_site_stack_map; +fn add_by_direction(direction: &str, down: &mut Vec, up: &mut Vec, row: &ThroughputRow) { + match direction { + "down" => { + down.push(Throughput { + value: row.avg, + date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(), + l: row.min, + u: row.max - row.min, + }); + } + "up" => { + up.push(Throughput { + value: row.avg, + date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(), + l: row.min, + u: row.max - row.min, + }); + } + _ => {} + } +} + #[instrument(skip(cnn, socket, key, period))] pub async fn send_throughput_for_all_nodes(cnn: &Pool, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> { - let nodes = get_throughput_for_all_nodes(cnn, key, period).await?; + let node_status = pgdb::node_status(cnn, key).await?; + let mut nodes = Vec::::new(); + InfluxQueryBuilder::new(period.clone()) + .with_measurement("bits") + .with_fields(&["min", "max", "avg"]) + .with_groups(&["host_id", "direction", "_field"]) + .execute::(cnn, key) + .await? + .into_iter() + .for_each(|row| { + if let Some(node) = nodes.iter_mut().find(|n| n.node_id == row.host_id) { + add_by_direction(&row.direction, &mut node.down, &mut node.up, &row); + } else { + let mut down = Vec::new(); + let mut up = Vec::new(); + + add_by_direction(&row.direction, &mut down, &mut up, &row); + + let node_name = if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) { + node.node_name.clone() + } else { + row.host_id.clone() + }; + + nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up }); + } + }); send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes }).await; - Ok(()) + Ok(()) } #[instrument(skip(cnn, socket, key, period, site_name))]