This commit is contained in:
Herbert Wolverson 2023-07-20 14:21:45 +00:00
parent 185c948112
commit abc1c9a211
3 changed files with 3 additions and 45 deletions

View File

@ -10,4 +10,4 @@ cp ../../site_build/output/* .
cp ../../site_build/src/main.html .
cp ../../site_build/wasm/wasm_pipe_bg.wasm .
popd
RUST_LOG=info RUST_BACKTRACE=1 cargo run --release
RUST_LOG=info RUST_BACKTRACE=1 cargo run

View File

@ -1,11 +1,10 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response, influx_query_builder::InfluxQueryBuilder};
use axum::extract::ws::WebSocket;
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
NodeStatus, OrganizationDetails,
NodeStatus
};
use tracing::{error, instrument};
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost};
use super::rtt_row::RttRow;
@ -30,30 +29,6 @@ pub async fn send_rtt_for_all_nodes(
Ok(())
}
#[instrument(skip(org, period))]
async fn query_rtt_all_nodes(
org: &OrganizationDetails,
period: &InfluxTimePeriod,
) -> anyhow::Result<Vec<RttRow>> {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = influxdb2::Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!("from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"rtt\")
|> filter(fn: (r) => r[\"_field\"] == \"avg\" or r[\"_field\"] == \"min\" or r[\"_field\"] == \"max\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> group(columns: [\"host_id\", \"_field\"])
|> {}
|> yield(name: \"last\")
",
org.influx_bucket, period.range(), org.key, period.aggregate_window()
);
//println!("{qs}");
let query = influxdb2::models::Query::new(qs);
Ok(client.query::<RttRow>(Some(query)).await?)
}
fn rtt_rows_to_result(rows: Vec<RttRow>, node_status: Vec<NodeStatus>) -> Vec<RttHost> {
let mut result = Vec::<RttHost>::new();
for row in rows.into_iter() {

View File

@ -87,23 +87,6 @@ pub async fn send_throughput_for_node(cnn: &Pool<Postgres>, socket: &mut WebSock
Ok(())
}
pub async fn get_throughput_for_all_nodes(cnn: &Pool<Postgres>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<ThroughputHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
futures.push(get_throughput_for_node(
cnn,
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<ThroughputHost>> = join_all(futures).await
.into_iter().collect();
all_nodes
}
pub async fn get_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, key: &str, period: InfluxTimePeriod, site_name: &str) -> anyhow::Result<Vec<ThroughputHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();