Second try at an idiomatic Rust query builder for Influx. This time a bit more flexible.

This commit is contained in:
Herbert Wolverson 2023-08-11 15:13:37 +00:00
parent 8d06e7db5f
commit 18a211265a
3 changed files with 172 additions and 21 deletions

View File

@ -1,4 +1,8 @@
//! InfluxDB query builder and support code.
mod influx_query_builder;
pub use influx_query_builder::*;
mod time_period;
pub use time_period::*;
mod query_builder2;
pub use query_builder2::*;

View File

@ -0,0 +1,126 @@
use influxdb2::{Client, models::Query};
use influxdb2_structmap::FromMap;
use pgdb::{sqlx::{Pool, Postgres}, OrganizationDetails, organization_cache::get_org_details};
use super::InfluxTimePeriod;
pub struct QueryBuilder<'a> {
lines: Vec<String>,
period: Option<&'a InfluxTimePeriod>,
org: Option<OrganizationDetails>,
}
#[allow(dead_code)]
impl <'a> QueryBuilder <'a> {
/// Construct a new, completely empty query.
pub fn new() -> Self {
Self {
lines: Vec::new(),
period: None,
org: None,
}
}
pub fn with_period(mut self, period: &'a InfluxTimePeriod) -> Self {
self.period = Some(period);
self
}
pub fn with_org(mut self, org: OrganizationDetails) -> Self {
self.org = Some(org);
self
}
pub async fn derive_org(mut self, cnn: &Pool<Postgres>, key: &str) -> QueryBuilder<'a> {
let org = get_org_details(cnn, key).await;
self.org = org;
self
}
pub fn add_line(mut self, line: &str) -> Self {
self.lines.push(line.to_string());
self
}
pub fn add_lines(mut self, lines: &[&str]) -> Self {
for line in lines.iter() {
self.lines.push(line.to_string());
}
self
}
pub fn bucket(mut self) -> Self {
if let Some(org) = &self.org {
self.lines.push(format!("from(bucket: \"{}\")", org.influx_bucket));
} else {
tracing::warn!("No organization in query, cannot add bucket");
}
self
}
pub fn range(mut self) -> Self {
if let Some(period) = &self.period {
self.lines.push(format!("|> {}", period.range()));
} else {
tracing::warn!("No period in query, cannot add range");
}
self
}
pub fn filter(mut self, filter: &str) -> Self {
if !filter.is_empty() {
self.lines.push(format!("|> filter(fn: (r) => {})", filter));
}
self
}
pub fn filter_and(mut self, filters: &[&str]) -> Self {
let all_filters = filters.join(" and ");
self.lines.push(format!("|> filter(fn: (r) => {})", all_filters));
self
}
pub fn measure_field_org(mut self, measurement: &str, field: &str) -> Self {
if let Some(org) = &self.org {
self.lines.push(format!("|> filter(fn: (r) => r[\"_field\"] == \"{}\" and r[\"_measurement\"] == \"{}\" and r[\"organization_id\"] == \"{}\")", field, measurement, org.key));
} else {
tracing::warn!("No organization in query, cannot add measure_field_org");
}
self
}
pub fn aggregate_window(mut self) -> Self {
if let Some(period) = &self.period {
self.lines.push(format!("|> {}", period.aggregate_window()));
} else {
tracing::warn!("No period in query, cannot add aggregate_window");
}
self
}
pub fn group(mut self, columns: &[&str]) -> Self {
let group_by = columns.join(", ");
self.lines.push(format!("|> group(columns: [\"{}\"])", group_by));
self
}
pub async fn execute<T>(&self) -> anyhow::Result<Vec<T>>
where T: FromMap + std::fmt::Debug
{
let qs = self.lines.join("\n");
tracing::info!("Query:\n{}", qs);
if let Some(org) = &self.org {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let query = Query::new(qs.clone());
let rows = client.query::<T>(Some(query)).await;
if let Ok(rows) = rows {
Ok(rows)
} else {
tracing::error!("InfluxDb query error: {rows:?} for: {qs}");
anyhow::bail!("Influx query error");
}
} else {
anyhow::bail!("No organization in query, cannot execute");
}
}
}

View File

@ -1,3 +1,4 @@
use crate::web::wss::queries::{influx::InfluxTimePeriod, QueryBuilder};
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
@ -6,7 +7,6 @@ use pgdb::{
use tokio::sync::mpsc::Sender;
use tracing::{error, instrument};
use wasm_pipe_types::{SiteStackHost, WasmResponse};
use crate::web::wss::queries::influx::InfluxTimePeriod;
#[derive(Debug, influxdb2::FromDataPoint)]
pub struct SiteStackRow {
@ -93,28 +93,34 @@ async fn query_circuits_influx(
hosts: &[(String, String)],
host_filter: &str,
) -> anyhow::Result<Vec<SiteStackRow>> {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = influxdb2::Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!("from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_field\"] == \"max\" and r[\"_measurement\"] == \"host_bits\" and r[\"organization_id\"] == \"{}\")
|> {}
|> filter(fn: (r) => {} )
|> group(columns: [\"circuit_id\", \"_field\", \"direction\"])",
org.influx_bucket, period.range(), org.key, period.aggregate_window(), host_filter);
let query = influxdb2::models::Query::new(qs);
//let rows = client.query_raw(Some(query)).await;
let rows = client.query::<CircuitStackRow>(Some(query)).await?;
let rows = rows.into_iter().map(|row| {
SiteStackRow {
node_name: hosts.iter().find(|h| h.0 == row.circuit_id).unwrap().1.clone(),
if host_filter.is_empty() {
return Ok(Vec::new());
}
let rows = QueryBuilder::new()
.with_period(period)
.with_org(org.clone())
.bucket()
.range()
.measure_field_org("host_bits", "max")
.aggregate_window()
.filter(host_filter)
.group(&["circuit_id", "_field", "direction"])
.execute::<CircuitStackRow>()
.await?
.into_iter()
.map(|row| SiteStackRow {
node_name: hosts
.iter()
.find(|h| h.0 == row.circuit_id)
.unwrap()
.1
.clone(),
node_parents: "".to_string(),
bits_max: row.max / 8.0,
time: row.time,
direction: row.direction,
}
}).collect();
})
.collect();
Ok(rows)
}
@ -124,6 +130,21 @@ async fn query_site_stack_influx(
period: &InfluxTimePeriod,
site_index: i32,
) -> anyhow::Result<Vec<SiteStackRow>> {
Ok(QueryBuilder::new()
.add_line("import \"strings\"")
.with_period(period)
.with_org(org.clone())
.bucket()
.range()
.measure_field_org("tree", "bits_max")
.filter_and(&["exists r[\"node_parents\"]", "exists r[\"node_index\"]"])
.aggregate_window()
.filter(&format!("strings.hasSuffix(v: r[\"node_parents\"], suffix: \"S{}S\" + r[\"node_index\"] + \"S\")", site_index))
.group(&["node_name", "node_parents", "_field", "node_index", "direction"])
.execute::<SiteStackRow>()
.await?
)
/*
let influx_url = format!("http://{}:8086", org.influx_host);
let client = influxdb2::Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!("import \"strings\"
@ -139,7 +160,7 @@ async fn query_site_stack_influx(
let query = influxdb2::models::Query::new(qs);
//let rows = client.query_raw(Some(query)).await;
Ok(client.query::<SiteStackRow>(Some(query)).await?)
Ok(client.query::<SiteStackRow>(Some(query)).await?)*/
}
fn site_rows_to_hosts(rows: Vec<SiteStackRow>) -> Vec<SiteStackHost> {
@ -211,7 +232,7 @@ fn reduce_to_x_entries(result: &mut Vec<SiteStackHost>) {
}
});
});
result.truncate(MAX_HOSTS-1);
result.truncate(MAX_HOSTS - 1);
result.push(others);
}
}