More flexible time period with a unit test, rearrange a bit for clarity.

This commit is contained in:
Herbert Wolverson
2023-08-11 13:40:12 +00:00
parent afcec77b06
commit 8d06e7db5f
14 changed files with 63 additions and 31 deletions

View File

@@ -1,5 +1,4 @@
use std::sync::Arc;
use crate::web::wss::{
nodes::node_status,
queries::{
@@ -15,7 +14,6 @@ use crate::web::wss::{
send_throughput_for_all_nodes_by_circuit, send_throughput_for_all_nodes_by_site,
send_throughput_for_node, site_heat_map,
site_tree::send_site_tree,
time_period::InfluxTimePeriod,
},
};
use axum::{
@@ -29,7 +27,7 @@ use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::{mpsc::Sender, Mutex};
use tracing::instrument;
use wasm_pipe_types::{WasmRequest, WasmResponse};
mod influx_query_builder;
use self::queries::InfluxTimePeriod;
mod login;
mod nodes;
mod queries;

View File

@@ -5,7 +5,7 @@ use influxdb2::{FromDataPoint, models::Query, Client};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::WasmResponse;
use super::time_period::InfluxTimePeriod;
use super::influx::InfluxTimePeriod;
#[tracing::instrument(skip(cnn, tx, key, circuit_id))]
pub async fn send_extended_device_info(

View File

@@ -4,7 +4,7 @@ use influxdb2_structmap::FromMap;
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details, OrganizationDetails};
use anyhow::{Result, Error};
use tracing::instrument;
use super::queries::time_period::InfluxTimePeriod;
use super::InfluxTimePeriod;
#[derive(Debug)]
pub struct InfluxQueryBuilder {

View File

@@ -0,0 +1,4 @@
mod influx_query_builder;
pub use influx_query_builder::*;
mod time_period;
pub use time_period::*;

View File

@@ -24,29 +24,44 @@ const fn aggregate_window(seconds: i32) -> i32 {
seconds / SAMPLES_PER_GRAPH
}
fn period_string_to_seconds(period: &str) -> i32 {
let last_char = period.chars().last().unwrap();
let number_part = &period[..period.len() - 1];
let number = number_part.parse::<i32>().unwrap_or(5);
let start_seconds = match last_char {
's' => number,
'm' => minutes_to_seconds(number),
'h' => hours_to_seconds(number),
'd' => days_to_seconds(number),
_ => {
tracing::warn!("Unknown time unit: {last_char}");
minutes_to_seconds(5)
}
};
start_seconds
}
impl InfluxTimePeriod {
pub fn new(period: &str) -> Self {
let start_seconds = match period {
"5m" => minutes_to_seconds(5),
"15m" => minutes_to_seconds(15),
"1h" => hours_to_seconds(1),
"6h" => hours_to_seconds(6),
"12h" => hours_to_seconds(12),
"24h" => hours_to_seconds(24),
"7d" => days_to_seconds(7),
"28d" => days_to_seconds(28),
let last_char = period.chars().last().unwrap();
let number_part = &period[..period.len() - 1];
let number = number_part.parse::<i32>().unwrap_or(5);
let start_seconds = match last_char {
's' => number,
'm' => minutes_to_seconds(number),
'h' => hours_to_seconds(number),
'd' => days_to_seconds(number),
_ => {
tracing::warn!("Unknown period: {}", period);
minutes_to_seconds(5)
tracing::warn!("Unknown time unit: {last_char}");
minutes_to_seconds(5)
}
};
let start = format!("-{}s", start_seconds);
let aggregate_seconds = aggregate_window(start_seconds);
let aggregate = format!("{}s", aggregate_seconds);
let sample = start_seconds / 100;
println!("Period: {period}, Seconds: {start_seconds}, AggSec: {aggregate_seconds}, Samples: {sample}");
Self {
start: start.to_string(),
aggregate: aggregate.to_string(),
@@ -88,4 +103,20 @@ impl From<&String> for InfluxTimePeriod {
fn from(period: &String) -> Self {
Self::new(period)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_period_to_seconds() {
assert_eq!(period_string_to_seconds("5s"), 5);
assert_eq!(period_string_to_seconds("5m"), 300);
assert_eq!(period_string_to_seconds("5h"), 18000);
assert_eq!(period_string_to_seconds("5d"), 432000);
// Test that an unknown returns the default
assert_eq!(period_string_to_seconds("5x"), 300);
}
}

View File

@@ -1,6 +1,8 @@
//! Provides pre-packaged queries for obtaining data, that will
//! then be used by the web server to respond to requests.
mod influx;
pub(crate) use influx::*;
mod circuit_info;
pub mod ext_device;
mod node_perf;
@@ -12,7 +14,6 @@ mod site_info;
mod site_parents;
pub mod site_tree;
mod throughput;
pub mod time_period;
pub use circuit_info::send_circuit_info;
pub use node_perf::send_perf_for_node;
pub use packet_counts::{send_packets_for_all_nodes, send_packets_for_node};

View File

@@ -3,7 +3,8 @@ use influxdb2::{Client, FromDataPoint, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{PerfHost, Perf, WasmResponse};
use super::time_period::InfluxTimePeriod;
use super::influx::InfluxTimePeriod;
#[derive(Debug, FromDataPoint)]
pub struct PerfRow {

View File

@@ -1,12 +1,11 @@
//! Packet-per-second data queries
mod packet_row;
use self::packet_row::PacketRow;
use super::time_period::InfluxTimePeriod;
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{PacketHost, Packets, WasmResponse};
use super::influx::{InfluxTimePeriod, InfluxQueryBuilder};
fn add_by_direction(direction: &str, down: &mut Vec<Packets>, up: &mut Vec<Packets>, row: &PacketRow) {
match direction {

View File

@@ -11,7 +11,7 @@ use tracing::instrument;
use wasm_pipe_types::{RttHost, Rtt, WasmResponse};
use crate::web::wss::queries::rtt::rtt_row::RttCircuitRow;
use self::rtt_row::RttRow;
use super::time_period::InfluxTimePeriod;
use super::influx::InfluxTimePeriod;
mod rtt_row;
#[instrument(skip(cnn, tx, key, site_id, period))]

View File

@@ -1,4 +1,3 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, influx_query_builder::InfluxQueryBuilder};
use pgdb::{
sqlx::{Pool, Postgres},
NodeStatus
@@ -6,7 +5,7 @@ use pgdb::{
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost, WasmResponse};
use crate::web::wss::queries::influx::{InfluxTimePeriod, InfluxQueryBuilder};
use super::rtt_row::{RttRow, RttHistoRow};
#[instrument(skip(cnn, tx, key, period))]

View File

@@ -1,4 +1,3 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, influx_query_builder::InfluxQueryBuilder};
use pgdb::{
sqlx::{Pool, Postgres},
NodeStatus
@@ -6,6 +5,7 @@ use pgdb::{
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost, WasmResponse};
use crate::web::wss::queries::influx::{InfluxTimePeriod, InfluxQueryBuilder};
use super::rtt_row::RttSiteRow;
#[instrument(skip(cnn, tx, key, period))]

View File

@@ -1,5 +1,3 @@
use super::time_period::InfluxTimePeriod;
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::FromDataPoint;
use pgdb::sqlx::{Pool, Postgres};
@@ -10,6 +8,8 @@ use std::collections::HashMap;
use wasm_pipe_types::WasmResponse;
use itertools::Itertools;
use super::influx::{InfluxTimePeriod, InfluxQueryBuilder};
fn headings_sorter<T: HeatMapData>(rows: Vec<T>) -> HashMap<String, Vec<(DateTime<FixedOffset>, f64)>> {
let mut headings = rows.iter().map(|r| r.time()).collect::<Vec<_>>();
headings.sort();

View File

@@ -6,11 +6,10 @@ use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{ThroughputHost, Throughput, WasmResponse};
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use self::throughput_row::{ThroughputRow, ThroughputRowBySite, ThroughputRowByCircuit};
use super::time_period::InfluxTimePeriod;
mod throughput_row;
pub use site_stack::send_site_stack_map;
use super::influx::{InfluxQueryBuilder, InfluxTimePeriod};
fn add_by_direction(direction: &str, down: &mut Vec<Throughput>, up: &mut Vec<Throughput>, row: &ThroughputRow) {
match direction {

View File

@@ -1,4 +1,3 @@
use crate::web::wss::queries::time_period::InfluxTimePeriod;
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
@@ -7,6 +6,7 @@ use pgdb::{
use tokio::sync::mpsc::Sender;
use tracing::{error, instrument};
use wasm_pipe_types::{SiteStackHost, WasmResponse};
use crate::web::wss::queries::influx::InfluxTimePeriod;
#[derive(Debug, influxdb2::FromDataPoint)]
pub struct SiteStackRow {