Stop using sample - use a better window function instead - for site stacks.

Herbert Wolverson 2023-08-10 18:52:31 +00:00
parent 3ec62bc598
commit 1ec7c7a8ee
4 changed files with 43 additions and 23 deletions
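The gist of the change: the Flux sample() function keeps only every Nth record, so short traffic peaks can vanish entirely depending on where the samples land, while aggregateWindow() buckets the series into fixed time windows and applies an aggregate to each bucket. A minimal Flux sketch of the difference (the window size and aggregate function here are illustrative assumptions, not necessarily what period.aggregate_window() emits):

    // Before: keep one of every 10 records - peaks between samples are lost.
    |> sample(n: 10)

    // After: bucket into fixed windows and keep each window's max,
    // so short bursts still appear in the site stack.
    |> aggregateWindow(every: 1m, fn: max, createEmpty: false)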

View File

@@ -4,6 +4,10 @@ version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
+ [features]
+ default = []
+ tokio-console = ["console-subscriber"]
[dependencies]
tokio = { version = "1.25.0", features = ["full"] }
anyhow = "1"
@@ -26,6 +30,6 @@ chrono = "0"
miniz_oxide = "0.7.1"
tokio-util = { version = "0.7.8", features = ["io"] }
wasm_pipe_types = { path = "../wasm_pipe_types" }
- console-subscriber = "0.1.10"
+ console-subscriber = {version = "0.1.10", optional = true }
itertools = "0.11.0"
urlencoding = "2.1.3"
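Making console-subscriber optional keeps the tokio-console tooling out of default builds; the new [features] table wires the tokio-console feature to the optional dependency. A hedged usage note (standard Cargo flags; the tokio_unstable cfg is what the console-subscriber docs call for so Tokio emits task instrumentation):

    # Default build: console-subscriber is not compiled in.
    cargo build

    # Opt in to the Tokio console.
    RUSTFLAGS="--cfg tokio_unstable" cargo build --features tokio-console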

View File

@@ -2,8 +2,8 @@ mod web;
use tracing::{error, info};
use tracing_subscriber::fmt::format::FmtSpan;
- #[tokio::main]
- async fn main() -> anyhow::Result<()> {
+ #[cfg(not(feature="tokio-console"))]
+ fn set_console_logging() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
let subscriber = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
@@ -23,9 +23,29 @@ async fn main() -> anyhow::Result<()> {
// Set the subscriber as the default
tracing::subscriber::set_global_default(subscriber)?;
+ Ok(())
+ }
+ #[cfg(feature="tokio-console")]
+ fn set_tokio_console() {
// Initialize the Tokio Console subscription
- //console_subscriber::init();
+ console_subscriber::init();
+ }
+ #[cfg(not(feature="tokio-console"))]
+ fn setup_tracing() {
+ set_console_logging().unwrap();
+ }
+ #[cfg(feature="tokio-console")]
+ fn setup_tracing() {
+ set_tokio_console();
+ }
+ #[tokio::main]
+ async fn main() -> anyhow::Result<()> {
+ setup_tracing();
// Get the database connection pool
let pool = pgdb::get_connection_pool(5).await;
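Read together, the patch splits the old inline subscriber setup into two mutually exclusive, cfg-gated setup_tracing() definitions, so main() can call one name regardless of how the crate was built. A condensed view of the resulting shape (fmt-subscriber details elided):

    #[cfg(not(feature = "tokio-console"))]
    fn setup_tracing() {
        // RUST_LOG-driven fmt subscriber, as before.
        set_console_logging().unwrap();
    }

    #[cfg(feature = "tokio-console")]
    fn setup_tracing() {
        // console_subscriber::init() installs its own global default,
        // so the fmt subscriber must not also be set.
        set_tokio_console();
    }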

View File

@@ -41,7 +41,7 @@ pub async fn ws_handler(
ws.on_upgrade(move |sock| handle_socket(sock, state))
}
- #[instrument(skip(socket, cnn))]
+ #[instrument(skip(socket, cnn), name = "handle_wss")]
async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
tracing::info!("WebSocket Connected");
let credentials: Arc<Mutex<Option<login::LoginResult>>> = Arc::new(Mutex::new(None));
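#[instrument] names its generated span after the function by default, so this span would otherwise appear as "handle_socket"; the explicit name = "handle_wss" makes the WebSocket session easy to pick out in tokio-console or log output. The general pattern, with a hypothetical function for illustration:

    use tracing::instrument;

    #[instrument(skip(payload), name = "process_job")]
    async fn process(payload: Vec<u8>) {
        // Recorded under the span name "process_job" rather than "process".
        tracing::info!(len = payload.len(), "working");
    }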

View File

@@ -12,7 +12,7 @@ use wasm_pipe_types::{SiteStackHost, WasmResponse};
pub struct SiteStackRow {
pub node_name: String,
pub node_parents: String,
- pub bits_max: i64,
+ pub bits_max: f64,
pub time: chrono::DateTime<chrono::FixedOffset>,
pub direction: String,
}
@@ -22,7 +22,7 @@ impl Default for SiteStackRow {
Self {
node_name: "".to_string(),
node_parents: "".to_string(),
- bits_max: 0,
+ bits_max: 0.0,
time: chrono::DateTime::<chrono::Utc>::MIN_UTC.into(),
direction: "".to_string(),
}
@@ -32,7 +32,7 @@ impl Default for SiteStackRow {
#[derive(Debug, influxdb2::FromDataPoint)]
pub struct CircuitStackRow {
pub circuit_id: String,
- pub max: i64,
+ pub max: f64,
pub time: chrono::DateTime<chrono::FixedOffset>,
pub direction: String,
}
@@ -41,7 +41,7 @@ impl Default for CircuitStackRow {
fn default() -> Self {
Self {
circuit_id: "".to_string(),
- max: 0,
+ max: 0.0,
time: chrono::DateTime::<chrono::Utc>::MIN_UTC.into(),
direction: "".to_string(),
}
@@ -100,9 +100,8 @@ async fn query_circuits_influx(
|> filter(fn: (r) => r[\"_field\"] == \"max\" and r[\"_measurement\"] == \"host_bits\" and r[\"organization_id\"] == \"{}\")
|> {}
|> filter(fn: (r) => {} )
|> group(columns: [\"circuit_id\", \"_field\", \"direction\"])
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, period.sample(), host_filter);
|> group(columns: [\"circuit_id\", \"_field\", \"direction\"])",
org.influx_bucket, period.range(), org.key, period.aggregate_window(), host_filter);
let query = influxdb2::models::Query::new(qs);
//let rows = client.query_raw(Some(query)).await;
@@ -111,7 +110,7 @@ async fn query_circuits_influx(
SiteStackRow {
node_name: hosts.iter().find(|h| h.0 == row.circuit_id).unwrap().1.clone(),
node_parents: "".to_string(),
- bits_max: row.max / 8,
+ bits_max: row.max / 8.0,
time: row.time,
direction: row.direction,
}
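The i64 → f64 switch on max/bits_max does double duty: Flux float fields come back as floats (so an i64 target is a likely deserialization mismatch for the FromDataPoint derive - an inference from the types, not stated in the commit), and row.max / 8 was integer division, silently flooring each value. A two-line illustration:

    assert_eq!(1234_i64 / 8, 154);         // integer division floors
    assert_eq!(1234.0_f64 / 8.0, 154.25);  // f64 keeps the fraction until the final cast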
@@ -132,14 +131,11 @@ async fn query_site_stack_influx(
from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_field\"] == \"bits_max\" and r[\"_measurement\"] == \"tree\" and r[\"organization_id\"] == \"{}\")
- |> {}
|> filter(fn: (r) => exists r[\"node_parents\"] and exists r[\"node_index\"])
+ |> {}
|> filter(fn: (r) => strings.hasSuffix(v: r[\"node_parents\"], suffix: \"S{}S\" + r[\"node_index\"] + \"S\" ))
|> group(columns: [\"node_name\", \"node_parents\", \"_field\", \"node_index\", \"direction\"])
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, period.sample(), site_index);
//println!("{qs}");
|> group(columns: [\"node_name\", \"node_parents\", \"_field\", \"node_index\", \"direction\"])",
org.influx_bucket, period.range(), org.key, period.aggregate_window(), site_index);
let query = influxdb2::models::Query::new(qs);
//let rows = client.query_raw(Some(query)).await;
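Both queries now delegate windowing to a period.aggregate_window() helper in place of period.sample(). The helper's body is outside this diff; purely as an assumption, it presumably renders an aggregateWindow() stage from the requested period, along these lines:

    // Hypothetical sketch - the real type and fields are not shown in this diff.
    impl InfluxTimePeriod {
        pub fn aggregate_window(&self) -> String {
            // self.window assumed to hold a Flux duration such as "1m"
            format!(
                "aggregateWindow(every: {}, fn: max, createEmpty: false)",
                self.window
            )
        }
    }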
@@ -153,12 +149,12 @@ fn site_rows_to_hosts(rows: Vec<SiteStackRow>) -> Vec<SiteStackHost> {
if row.direction == "down" {
r.download.push((
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
- row.bits_max,
+ row.bits_max as i64,
));
} else {
r.upload.push((
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
- row.bits_max,
+ row.bits_max as i64,
));
}
} else if row.direction == "down" {
@@ -166,7 +162,7 @@ fn site_rows_to_hosts(rows: Vec<SiteStackRow>) -> Vec<SiteStackHost> {
node_name: row.node_name.clone(),
download: vec![(
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
- row.bits_max,
+ row.bits_max as i64,
)],
upload: vec![],
});
@@ -175,7 +171,7 @@ fn site_rows_to_hosts(rows: Vec<SiteStackRow>) -> Vec<SiteStackHost> {
node_name: row.node_name.clone(),
upload: vec![(
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
- row.bits_max,
+ row.bits_max as i64,
)],
download: vec![],
});
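The rows keep their f64 precision through the query and the / 8.0, and only collapse to i64 at the SiteStackHost boundary. In Rust, an f64-to-i64 "as" cast truncates toward zero and saturates at the integer bounds, which is harmless at bit-count magnitudes:

    assert_eq!(154.25_f64 as i64, 154);
    assert_eq!(-154.9_f64 as i64, -154); // truncates toward zero, not floor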