Merge pull request #384 from LibreQoE/long_term_stats

Long term stats > Develop
Robert Chacón 2023-08-07 17:50:04 -06:00 committed by GitHub
commit 9d7aae311f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1565 additions and 745 deletions

src/rust/Cargo.lock generated (216 lines changed)
View File

@ -166,7 +166,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -177,7 +177,7 @@ checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -311,7 +311,7 @@ dependencies = [
"regex",
"rustc-hash",
"shlex",
"syn 2.0.16",
"syn 2.0.27",
"which",
]
@ -529,7 +529,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -564,6 +564,42 @@ dependencies = [
"winapi",
]
[[package]]
name = "console-api"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e"
dependencies = [
"prost",
"prost-types",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4cf42660ac07fcebed809cfe561dd8730bcd35b075215e6479c516bcd0d11cb"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures",
"hdrhistogram",
"humantime",
"prost-types",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber 0.3.17",
]
[[package]]
name = "cookie"
version = "0.17.0"
@ -637,7 +673,7 @@ dependencies = [
"clap 3.2.25",
"criterion-plot",
"futures",
"itertools",
"itertools 0.10.5",
"lazy_static",
"num-traits",
"oorandom",
@ -659,7 +695,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
dependencies = [
"cast",
"itertools",
"itertools 0.10.5",
]
[[package]]
@ -871,7 +907,7 @@ dependencies = [
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -1191,7 +1227,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -1330,6 +1366,19 @@ dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "hdrhistogram"
version = "7.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8"
dependencies = [
"base64 0.13.1",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "headers"
version = "0.3.8"
@ -1482,6 +1531,18 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@ -1583,7 +1644,7 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16"
dependencies = [
"itertools",
"itertools 0.10.5",
"proc-macro2",
"quote",
"regex",
@ -1705,6 +1766,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.6"
@ -2143,9 +2213,11 @@ dependencies = [
"anyhow",
"axum",
"chrono",
"console-subscriber",
"futures",
"influxdb2",
"influxdb2-structmap",
"itertools 0.11.0",
"lqos_config",
"lts_client",
"miniz_oxide",
@ -2479,7 +2551,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -2615,7 +2687,7 @@ dependencies = [
"proc-macro2",
"proc-macro2-diagnostics",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -2665,7 +2737,7 @@ checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -2727,14 +2799,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "617feabb81566b593beb4886fb8c1f38064169dae4dccad0e3220160c3b37203"
dependencies = [
"proc-macro2",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
name = "proc-macro2"
version = "1.0.57"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4ec6d5fe0b140acb27c9a0444118cf55bfbb4e0b259739429abb4521dd67c16"
checksum = "18fb31db3f9bddb2ea821cde30a9f70117e3f119938b5ee630b7403aa6e2ead9"
dependencies = [
"unicode-ident",
]
@ -2747,11 +2819,43 @@ checksum = "606c4ba35817e2922a308af55ad51bab3645b59eae5c570d4a6cf07e36bd493b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
"version_check",
"yansi",
]
[[package]]
name = "prost"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-derive"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4"
dependencies = [
"anyhow",
"itertools 0.10.5",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "prost-types"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13"
dependencies = [
"prost",
]
[[package]]
name = "pyo3"
version = "0.18.3"
@ -2814,9 +2918,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.27"
version = "1.0.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500"
checksum = "50f3b39ccfb720540debaa0164757101c08ecb8d326b15358ce76a62c7e85965"
dependencies = [
"proc-macro2",
]
@ -2928,7 +3032,7 @@ checksum = "8d2275aab483050ab2a7364c1a46604865ee7d6906684e08db0f090acf74f9e7"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -3105,7 +3209,7 @@ dependencies = [
"proc-macro2",
"quote",
"rocket_http",
"syn 2.0.16",
"syn 2.0.27",
"unicode-xid",
]
@ -3289,7 +3393,7 @@ checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -3475,7 +3579,7 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c12bc9199d1db8234678b7051747c07f517cdcf019262d1847b94ec8b1aee3e"
dependencies = [
"itertools",
"itertools 0.10.5",
"nom",
"unicode_categories",
]
@ -3662,9 +3766,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.16"
version = "2.0.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6f671d4b5ffdb8eadec19c0ae67fe2639df8684bd7bc4b83d986b8db549cf01"
checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0"
dependencies = [
"proc-macro2",
"quote",
@ -3749,22 +3853,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -3856,9 +3960,20 @@ dependencies = [
"signal-hook-registry",
"socket2",
"tokio-macros",
"tracing",
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "2.1.0"
@ -3867,7 +3982,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -3959,6 +4074,34 @@ dependencies = [
"winnow",
]
[[package]]
name = "tonic"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-trait",
"axum",
"base64 0.21.0",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"tokio",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@ -3967,9 +4110,13 @@ checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project",
"pin-project-lite",
"rand",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
@ -4033,7 +4180,7 @@ checksum = "0f57e3ca2a01450b1a921183a9c9cbfda207fd822cef4ccb00a65402cbba7a74"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]
[[package]]
@ -4357,7 +4504,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
"wasm-bindgen-shared",
]
@ -4391,7 +4538,7 @@ checksum = "e128beba882dd1eb6200e1dc92ae6c5dbaa4311aa7bb211ca035779e5efc39f8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -4423,6 +4570,7 @@ dependencies = [
"miniz_oxide",
"serde_cbor",
"serde_json",
"thiserror",
"wasm-bindgen",
"wasm_pipe_types",
"web-sys",
@ -4798,5 +4946,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.16",
"syn 2.0.27",
]

View File

@ -17,7 +17,7 @@ pub async fn collect_tree(
totals: &Option<Vec<StatsTreeNode>>,
) -> anyhow::Result<()> {
if let Some(tree) = totals {
//println!("{tree:?}");
//info!("{tree:?}");
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(&influx_url, &org.influx_org, &org.influx_token);
let mut points: Vec<DataPoint> = Vec::new();
@ -31,12 +31,22 @@ pub async fn collect_tree(
.await?;
for node in tree.iter() {
let mut parents = format!("S{}S", node.parents.iter().map(|p| p.to_string()).collect::<Vec<String>>().join("S"));
if parents.is_empty() {
parents = "0S".to_string();
}
let my_id = node.index.to_string();
//let parent = node.immediate_parent.unwrap_or(0).to_string();
//warn!("{}: {}", node.name, parents);
//warn!("{parent}");
points.push(
DataPoint::builder("tree")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("direction", "down".to_string())
.tag("node_parents", parents.clone())
.tag("node_index", my_id.clone())
.timestamp(timestamp)
.field("bits_min", node.current_throughput.min.0 as i64)
.field("bits_max", node.current_throughput.max.0 as i64)
@ -49,6 +59,8 @@ pub async fn collect_tree(
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("direction", "up".to_string())
.tag("node_parents", parents.clone())
.tag("node_index", my_id.clone())
.timestamp(timestamp)
.field("bits_min", node.current_throughput.min.1 as i64)
.field("bits_max", node.current_throughput.max.1 as i64)
@ -60,6 +72,8 @@ pub async fn collect_tree(
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("node_parents", parents)
.tag("node_index", my_id.clone())
.timestamp(timestamp)
.field("rtt_min", node.rtt.min as i64 / 100)
.field("rtt_max", node.rtt.max as i64 / 100)
@ -92,13 +106,16 @@ pub async fn collect_tree(
error!("Error committing transaction: {}", e);
}
client
if let Err(e) = client
.write_with_precision(
&org.influx_bucket,
stream::iter(points),
influxdb2::api::write::TimestampPrecision::Seconds,
)
.await?;
.await {
error!("Error committing tree to Influx: {}", e);
}
info!("Wrote tree to InfluxDB");
}
Ok(())
}
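
A quick sketch of the node_parents tag built above (illustrative values, not taken from this commit): a node whose parents vector is [0, 3] is tagged "S0S3S".

// Illustrative only: mirrors the tag construction in collect_tree above.
fn parents_tag(parents: &[u32]) -> String {
    let mut tag = format!(
        "S{}S",
        parents.iter().map(|p| p.to_string()).collect::<Vec<String>>().join("S")
    );
    if tag.is_empty() {
        tag = "0S".to_string(); // same fallback as the diff
    }
    tag
}
// parents_tag(&[0, 3]) == "S0S3S"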

View File

@ -19,10 +19,12 @@ influxdb2-structmap = "0"
num-traits = "0"
futures = "0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-subscriber = { version = "0.3" }
tower = { version = "0.4", features = ["util"] }
tower-http = { version = "0.4.0", features = ["fs", "trace"] }
chrono = "0"
miniz_oxide = "0.7.1"
tokio-util = { version = "0.7.8", features = ["io"] }
wasm_pipe_types = { path = "../wasm_pipe_types" }
console-subscriber = "0.1.10"
itertools = "0.11.0"

View File

@ -10,4 +10,4 @@ cp ../../site_build/output/* .
cp ../../site_build/src/main.html .
cp ../../site_build/wasm/wasm_pipe_bg.wasm .
popd
RUST_LOG=info RUST_BACKTRACE=1 cargo run
RUST_LOG=info RUST_BACKTRACE=1 cargo run --release

View File

@ -1,10 +1,31 @@
mod web;
use tracing::{info, error};
use tracing::{error, info};
use tracing_subscriber::fmt::format::FmtSpan;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// install global collector configured based on RUST_LOG env var.
tracing_subscriber::fmt::init();
let subscriber = tracing_subscriber::fmt()
// Use a more compact, abbreviated log format
.compact()
// Display source code file paths
.with_file(true)
// Display source code line numbers
.with_line_number(true)
// Display the thread ID an event was recorded on
.with_thread_ids(true)
// Don't display the event's target (module path)
.with_target(false)
// Include per-span timings
.with_span_events(FmtSpan::CLOSE)
// Build the subscriber
.finish();
// Set the subscriber as the default
tracing::subscriber::set_global_default(subscriber)?;
// Initialize the Tokio Console subscription
//console_subscriber::init();
// Get the database connection pool
let pool = pgdb::get_connection_pool(5).await;

View File

@ -0,0 +1,199 @@
#![allow(dead_code)]
use influxdb2::{Client, models::Query};
use influxdb2_structmap::FromMap;
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details, OrganizationDetails};
use anyhow::{Result, Error};
use tracing::instrument;
use super::queries::time_period::InfluxTimePeriod;
#[derive(Debug)]
pub struct InfluxQueryBuilder {
imports: Vec<String>,
fields: Vec<String>,
period: InfluxTimePeriod,
measurement: Option<String>,
group_by: Vec<String>,
aggregate_window: bool,
yield_as: Option<String>,
host_id: Option<String>,
filters: Vec<String>,
sample_after_org: bool,
fill_empty: bool,
}
impl InfluxQueryBuilder {
pub fn new(period: InfluxTimePeriod) -> Self {
Self {
fields: Vec::new(),
imports: Vec::new(),
group_by: Vec::new(),
period,
measurement: None,
aggregate_window: true,
yield_as: Some("last".to_string()),
host_id: None,
filters: Vec::new(),
sample_after_org: false,
fill_empty: false,
}
}
pub fn with_measurement<S: ToString>(mut self, measurement: S) -> Self {
self.measurement = Some(measurement.to_string());
self
}
pub fn with_import<S: ToString>(mut self, import: S) -> Self {
self.imports.push(import.to_string());
self
}
pub fn with_field<S: ToString>(mut self, field: S) -> Self {
self.fields.push(field.to_string());
self
}
pub fn with_fields<S: ToString>(mut self, fields: &[S]) -> Self {
for field in fields.iter() {
self.fields.push(field.to_string());
}
self
}
pub fn with_group<S: ToString>(mut self, group: S) -> Self {
self.group_by.push(group.to_string());
self
}
pub fn with_groups<S: ToString>(mut self, group: &[S]) -> Self {
for group in group.iter() {
self.group_by.push(group.to_string());
}
self
}
pub fn with_host_id<S: ToString>(mut self, host_id: S) -> Self {
self.host_id = Some(host_id.to_string());
self
}
pub fn sample_no_window(mut self) -> Self {
self.aggregate_window = false;
self
}
pub fn with_filter<S: ToString>(mut self, filter: S) -> Self {
self.filters.push(filter.to_string());
self
}
pub fn sample_after_org(mut self) -> Self {
self.sample_after_org = true;
self
}
pub fn fill_empty(mut self) -> Self {
self.fill_empty = true;
self
}
fn build_query(&self, org: &OrganizationDetails) -> String {
let mut lines = Vec::<String>::with_capacity(10);
// Add any import stanzas
self.imports.iter().for_each(|i| lines.push(format!("import \"{i}\"")));
// Add the bucket
lines.push(format!("from(bucket: \"{}\")", org.influx_bucket));
// Add a range limit
lines.push(format!("|> {}", self.period.range()));
// Add the measurement filter
if let Some(measurement) = &self.measurement {
lines.push(format!("|> filter(fn: (r) => r[\"_measurement\"] == \"{}\")", measurement));
}
// Add fields filters
if !self.fields.is_empty() {
let mut fields = String::new();
for field in self.fields.iter() {
if !fields.is_empty() {
fields.push_str(" or ");
}
fields.push_str(&format!("r[\"_field\"] == \"{}\"", field));
}
lines.push(format!("|> filter(fn: (r) => {})", fields));
}
// Filter by organization id
lines.push(format!("|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")", org.key));
if self.sample_after_org {
lines.push(format!("|> {}", self.period.sample()));
}
// Filter by host_id
if let Some(host_id) = &self.host_id {
lines.push(format!("|> filter(fn: (r) => r[\"host_id\"] == \"{}\")", host_id));
}
// Add any other filters
for filter in self.filters.iter() {
lines.push(format!("|> filter(fn: (r) => {})", filter));
}
// Group by
if !self.group_by.is_empty() {
let mut group_by = String::new();
for group in self.group_by.iter() {
if !group_by.is_empty() {
group_by.push_str(", ");
}
group_by.push_str(&format!("\"{}\"", group));
}
lines.push(format!("|> group(columns: [{}])", group_by));
}
// Aggregate Window
if self.aggregate_window {
if self.fill_empty {
lines.push(format!("|> {}", self.period.aggregate_window_empty()));
} else {
lines.push(format!("|> {}", self.period.aggregate_window()));
}
} else {
lines.push(format!("|> {}", self.period.sample()));
}
// Yield as
if let Some(yield_as) = &self.yield_as {
lines.push(format!("|> yield(name: \"{}\")", yield_as));
}
// Combine
lines.join("\n")
}
#[instrument(skip(self, cnn, key))]
pub async fn execute<T>(&self, cnn: &Pool<Postgres>, key: &str) -> Result<Vec<T>>
where T: FromMap + std::fmt::Debug
{
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let query_string = self.build_query(&org);
tracing::info!("{query_string}");
let query = Query::new(query_string);
let rows = client.query::<T>(Some(query)).await;
if let Ok(rows) = rows {
Ok(rows)
} else {
tracing::error!("InfluxDb query error: {rows:?}");
Err(Error::msg("Influx query error"))
}
} else {
Err(Error::msg("Organization not found"))
}
}
}
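
A hedged usage sketch of the new builder (names and values are illustrative; the emitted Flux follows build_query above, and the exact range/aggregateWindow lines depend on the period):

// Illustrative only: a call shaped like the throughput query later in this commit,
//
//   InfluxQueryBuilder::new(period.clone())
//       .with_measurement("bits")
//       .with_fields(&["min", "max", "avg"])
//       .with_groups(&["host_id", "direction", "_field"])
//       .execute::<ThroughputRow>(cnn, key)
//       .await?
//
// builds Flux roughly like:
//
//   from(bucket: "<influx_bucket>")
//   |> <period.range()>
//   |> filter(fn: (r) => r["_measurement"] == "bits")
//   |> filter(fn: (r) => r["_field"] == "min" or r["_field"] == "max" or r["_field"] == "avg")
//   |> filter(fn: (r) => r["organization_id"] == "<org key>")
//   |> group(columns: ["host_id", "direction", "_field"])
//   |> aggregateWindow(every: <aggregate>, fn: mean, createEmpty: false)
//   |> yield(name: "last")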

View File

@ -1,6 +1,7 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use serde::Serialize;
use tracing::instrument;
use wasm_pipe_types::WasmResponse;
use super::send_response;
@ -13,6 +14,7 @@ pub struct LoginResult {
pub license_key: String,
}
#[instrument(skip(license, username, password, socket, cnn))]
pub async fn on_login(license: &str, username: &str, password: &str, socket: &mut WebSocket, cnn: Pool<Postgres>) -> Option<LoginResult> {
let login = pgdb::try_login(cnn, license, username, password).await;
if let Ok(login) = login {
@ -35,6 +37,7 @@ pub async fn on_login(license: &str, username: &str, password: &str, socket: &mu
None
}
#[instrument(skip(token_id, socket, cnn))]
pub async fn on_token_auth(token_id: &str, socket: &mut WebSocket, cnn: Pool<Postgres>) -> Option<LoginResult> {
let login = pgdb::token_to_credentials(cnn, token_id).await;
if let Ok(login) = login {

View File

@ -8,6 +8,7 @@ use crate::web::wss::{
omnisearch, root_heat_map, send_circuit_info, send_packets_for_all_nodes,
send_packets_for_node, send_perf_for_node, send_rtt_for_all_nodes,
send_rtt_for_all_nodes_circuit, send_rtt_for_all_nodes_site, send_rtt_for_node,
send_rtt_histogram_for_all_nodes,
send_site_info, send_site_parents, send_site_stack_map, send_throughput_for_all_nodes,
send_throughput_for_all_nodes_by_circuit, send_throughput_for_all_nodes_by_site,
send_throughput_for_node, site_heat_map,
@ -24,10 +25,12 @@ use axum::{
response::IntoResponse,
};
use pgdb::sqlx::{Pool, Postgres};
use tracing::instrument;
use wasm_pipe_types::{WasmRequest, WasmResponse};
mod login;
mod nodes;
mod queries;
mod influx_query_builder;
pub async fn ws_handler(
ws: WebSocketUpgrade,
@ -168,6 +171,15 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
)
.await;
}
(WasmRequest::RttHistogram { period }, Some(credentials)) => {
let _ = send_rtt_histogram_for_all_nodes(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::RttChartSite { period, site_id }, Some(credentials)) => {
let _ = send_rtt_for_all_nodes_site(
&cnn,
@ -310,6 +322,7 @@ fn serialize_response(response: WasmResponse) -> Vec<u8> {
miniz_oxide::deflate::compress_to_vec(&cbor, 8)
}
#[instrument(skip(socket, response))]
pub async fn send_response(socket: &mut WebSocket, response: WasmResponse) {
let serialized = serialize_response(response);
socket.send(Message::Binary(serialized)).await.unwrap();

View File

@ -1,5 +1,6 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use tracing::instrument;
use wasm_pipe_types::Node;
use crate::web::wss::send_response;
@ -12,6 +13,7 @@ fn convert(ns: pgdb::NodeStatus) -> Node {
}
}
#[instrument(skip(cnn, socket, key))]
pub async fn node_status(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str) {
tracing::info!("Fetching node status, {key}");
let nodes = pgdb::node_status(cnn, key).await;

View File

@ -2,6 +2,7 @@
//! then be used by the web server to respond to requests.
mod circuit_info;
pub mod ext_device;
mod node_perf;
mod packet_counts;
mod rtt;
@ -11,18 +12,19 @@ mod site_info;
mod site_parents;
pub mod site_tree;
mod throughput;
pub mod ext_device;
pub mod time_period;
pub use circuit_info::send_circuit_info;
pub use node_perf::send_perf_for_node;
pub use packet_counts::{send_packets_for_all_nodes, send_packets_for_node};
pub use rtt::{send_rtt_for_all_nodes, send_rtt_for_all_nodes_site, send_rtt_for_node, send_rtt_for_all_nodes_circuit};
pub use rtt::{
send_rtt_for_all_nodes, send_rtt_for_all_nodes_circuit, send_rtt_for_all_nodes_site,
send_rtt_for_node, send_rtt_histogram_for_all_nodes,
};
pub use search::omnisearch;
pub use site_heat_map::{root_heat_map, site_heat_map};
pub use site_info::send_site_info;
pub use site_parents::{send_site_parents, send_circuit_parents, send_root_parents};
pub use site_parents::{send_circuit_parents, send_root_parents, send_site_parents};
pub use throughput::{
send_throughput_for_all_nodes, send_throughput_for_all_nodes_by_circuit,
send_site_stack_map, send_throughput_for_all_nodes, send_throughput_for_all_nodes_by_circuit,
send_throughput_for_all_nodes_by_site, send_throughput_for_node,
send_site_stack_map,
};

View File

@ -2,24 +2,78 @@
mod packet_row;
use self::packet_row::PacketRow;
use super::time_period::InfluxTimePeriod;
use crate::web::wss::send_response;
use crate::web::wss::{influx_query_builder::InfluxQueryBuilder, send_response};
use axum::extract::ws::WebSocket;
use futures::future::join_all;
use influxdb2::{models::Query, Client};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use pgdb::sqlx::{Pool, Postgres};
use tracing::instrument;
use wasm_pipe_types::{PacketHost, Packets};
fn add_by_direction(direction: &str, down: &mut Vec<Packets>, up: &mut Vec<Packets>, row: &PacketRow) {
match direction {
"down" => {
down.push(Packets {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
"up" => {
up.push(Packets {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
_ => {}
}
}
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_packets_for_all_nodes(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let nodes = get_packets_for_all_nodes(cnn, key, period).await?;
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<PacketHost>::new();
InfluxQueryBuilder::new(period.clone())
.with_measurement("packets")
.with_fields(&["min", "max", "avg"])
.with_groups(&["host_id", "min", "max", "avg", "direction", "_field"])
.execute::<PacketRow>(cnn, key)
.await?
.into_iter()
.for_each(|row| {
if let Some(node) = nodes.iter_mut().find(|n| n.node_id == row.host_id) {
add_by_direction(&row.direction, &mut node.down, &mut node.up, &row);
} else {
let mut down = Vec::new();
let mut up = Vec::new();
add_by_direction(&row.direction, &mut down, &mut up, &row);
let node_name = if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
nodes.push(PacketHost {
node_id: row.host_id,
node_name,
down,
up,
});
}
});
send_response(socket, wasm_pipe_types::WasmResponse::PacketChart { nodes }).await;
Ok(())
}
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_packets_for_node(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
@ -39,31 +93,6 @@ pub async fn send_packets_for_node(
Ok(())
}
/// Requests packet-per-second data for all shaper nodes for a given organization
///
/// # Arguments
/// * `cnn` - A connection pool to the database
/// * `key` - The organization's license key
pub async fn get_packets_for_all_nodes(
cnn: &Pool<Postgres>,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<Vec<PacketHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
futures.push(get_packets_for_node(
cnn,
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<PacketHost>> = join_all(futures).await.into_iter().collect();
all_nodes
}
/// Requests packet-per-second data for a single shaper node.
///
/// # Arguments
@ -78,31 +107,19 @@ pub async fn get_packets_for_node(
node_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<PacketHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let rows = InfluxQueryBuilder::new(period.clone())
.with_measurement("packets")
.with_host_id(&node_id)
.with_field("min")
.with_field("max")
.with_field("avg")
.execute::<PacketRow>(cnn, key)
.await;
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"packets\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket,
period.range(),
org.key,
node_id,
period.aggregate_window()
);
let query = Query::new(qs);
let rows = client.query::<PacketRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB (packets by node): {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
Err(anyhow::Error::msg("Unable to query influx"))
}
Ok(rows) => {
// Parse and send the data
@ -131,14 +148,12 @@ pub async fn get_packets_for_node(
});
}
return Ok(PacketHost {
Ok(PacketHost {
node_id,
node_name,
down,
up,
});
})
}
}
}
Err(anyhow::Error::msg("Unable to query influx"))
}

View File

@ -1,30 +1,18 @@
mod per_node;
pub use per_node::*;
use axum::extract::ws::WebSocket;
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tracing::instrument;
use wasm_pipe_types::{RttHost, Rtt};
use crate::web::wss::{queries::rtt::rtt_row::RttCircuitRow, send_response};
use self::rtt_row::{RttRow, RttSiteRow};
use super::time_period::InfluxTimePeriod;
mod rtt_row;
pub async fn send_rtt_for_all_nodes(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes(cnn, key, period).await?;
let mut histogram = vec![0; 20];
for node in nodes.iter() {
for rtt in node.rtt.iter() {
let bucket = usize::min(19, (rtt.value / 10.0) as usize);
histogram[bucket] += 1;
}
}
let nodes = vec![RttHost { node_id: "".to_string(), node_name: "".to_string(), rtt: rtt_bucket_merge(&nodes) }];
send_response(socket, wasm_pipe_types::WasmResponse::RttChart { nodes, histogram }).await;
Ok(())
}
#[instrument(skip(cnn, socket, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_site(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes_site(cnn, key, &site_id, period).await?;
@ -40,6 +28,7 @@ pub async fn send_rtt_for_all_nodes_site(cnn: &Pool<Postgres>, socket: &mut WebS
Ok(())
}
#[instrument(skip(cnn, socket, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes_circuit(cnn, key, &site_id, period).await?;
@ -59,50 +48,18 @@ pub async fn send_rtt_for_node(cnn: &Pool<Postgres>, socket: &mut WebSocket, key
let node = get_rtt_for_node(cnn, key, node_id, node_name, period).await?;
let nodes = vec![node];
let mut histogram = vec![0; 20];
/*let mut histogram = vec![0; 20];
for node in nodes.iter() {
for rtt in node.rtt.iter() {
let bucket = usize::min(19, (rtt.value / 200.0) as usize);
histogram[bucket] += 1;
}
}
}*/
send_response(socket, wasm_pipe_types::WasmResponse::RttChart { nodes, histogram }).await;
send_response(socket, wasm_pipe_types::WasmResponse::RttChart { nodes }).await;
Ok(())
}
fn rtt_bucket_merge(rtt: &[RttHost]) -> Vec<Rtt> {
let mut entries: Vec<Rtt> = Vec::new();
for entry in rtt.iter() {
for entry in entry.rtt.iter() {
if let Some(e) = entries.iter().position(|d| d.date == entry.date) {
entries[e].l = f64::min(entries[e].l, entry.l);
entries[e].u = f64::max(entries[e].u, entry.u);
} else {
entries.push(entry.clone());
}
}
}
entries
}
pub async fn get_rtt_for_all_nodes(cnn: &Pool<Postgres>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
futures.push(get_rtt_for_node(
cnn,
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<RttHost>> = join_all(futures).await
.into_iter().collect();
all_nodes
}
pub async fn get_rtt_for_all_nodes_site(cnn: &Pool<Postgres>, key: &str, site_id: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<RttHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();

View File

@ -0,0 +1,93 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response, influx_query_builder::InfluxQueryBuilder};
use axum::extract::ws::WebSocket;
use pgdb::{
sqlx::{Pool, Postgres},
NodeStatus
};
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost};
use super::rtt_row::{RttRow, RttHistoRow};
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_rtt_for_all_nodes(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let rows = InfluxQueryBuilder::new(period.clone())
.with_measurement("rtt")
.with_fields(&["avg", "min", "max"])
.with_groups(&["host_id", "_field"])
.execute::<RttRow>(cnn, key)
.await?;
let node_status = pgdb::node_status(cnn, key).await?;
let nodes = rtt_rows_to_result(rows, node_status);
send_response(socket, wasm_pipe_types::WasmResponse::RttChart { nodes }).await;
Ok(())
}
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_rtt_histogram_for_all_nodes(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
let rows = InfluxQueryBuilder::new(period.clone())
.with_measurement("rtt")
.with_field("avg")
.sample_no_window()
.execute::<RttHistoRow>(cnn, key)
.await?;
let mut histo = vec![0u32; 20];
rows.iter().for_each(|row| {
let rtt = f64::min(row.avg, 200.);
let bucket = usize::min((rtt / 10.0) as usize, 19);
histo[bucket] += 1;
});
send_response(socket, wasm_pipe_types::WasmResponse::RttHistogram { histogram: histo }).await;
Ok(())
}
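
A small worked example of the bucketing above (numbers are illustrative):

// Illustrative only: 20 buckets of 10 ms, clamped at 200 ms, exactly as in the loop above.
fn rtt_bucket(avg_ms: f64) -> usize {
    let rtt = f64::min(avg_ms, 200.0);
    usize::min((rtt / 10.0) as usize, 19)
}
// rtt_bucket(37.4)  == 3   (the 30-40 ms bucket)
// rtt_bucket(212.0) == 19  (clamped; the last bucket collects everything >= 190 ms)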
fn rtt_rows_to_result(rows: Vec<RttRow>, node_status: Vec<NodeStatus>) -> Vec<RttHost> {
let mut result = Vec::<RttHost>::new();
for row in rows.into_iter() {
if let Some(host) = result.iter_mut().find(|h| h.node_id == row.host_id) {
// We found one - add to it
host.rtt.push(Rtt {
value: f64::min(200.0, row.avg),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: f64::min(200.0, row.min),
u: f64::min(200.0, row.max) - f64::min(200.0, row.min),
});
} else {
let rtt = vec![Rtt {
value: f64::min(200.0, row.avg),
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: f64::min(200.0, row.min),
u: f64::min(200.0, row.max) - f64::min(200.0, row.min),
}];
let node_name = node_status
.iter()
.filter(|n| n.node_id == row.host_id)
.map(|n| n.node_name.clone())
.next()
.unwrap_or("".to_string());
let new_host = RttHost {
node_id: row.host_id,
node_name,
rtt,
};
result.push(new_host);
}
}
result
}
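
And a sketch of the 200 ms clamp applied when rows are converted (values are made up):

// Illustrative only: mirrors rtt_rows_to_result above. A row with
//   avg = 250.0, min = 180.0, max = 260.0
// becomes Rtt { value: 200.0, l: 180.0, u: 20.0, .. }, because
//   value = min(200, avg), l = min(200, min), u = min(200, max) - min(200, min).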

View File

@ -22,6 +22,11 @@ impl Default for RttRow {
}
}
#[derive(Debug, FromDataPoint, Default)]
pub struct RttHistoRow {
pub avg: f64,
}
#[derive(Debug, FromDataPoint)]
pub struct RttSiteRow {
pub host_id: String,

View File

@ -1,4 +1,5 @@
use super::time_period::InfluxTimePeriod;
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use crate::web::wss::send_response;
use axum::extract::ws::WebSocket;
use chrono::{DateTime, FixedOffset, Utc};
@ -8,16 +9,50 @@ use pgdb::organization_cache::get_org_details;
use pgdb::sqlx::{query, Pool, Postgres, Row};
use pgdb::OrganizationDetails;
use serde::Serialize;
use tracing::instrument;
use std::collections::HashMap;
use wasm_pipe_types::WasmResponse;
use itertools::Itertools;
#[instrument(skip(cnn,socket,key,period))]
pub async fn root_heat_map(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
if let Some(org) = get_org_details(cnn, key).await {
let rows: Vec<HeatRow> = InfluxQueryBuilder::new(period.clone())
.with_import("strings")
.with_measurement("tree")
.with_fields(&["rtt_avg"])
.sample_after_org()
.with_filter("exists(r[\"node_parents\"])")
.with_filter("strings.hasSuffix(suffix: \"S0S\" + r[\"node_index\"] + \"S\", v: r[\"node_parents\"])")
.with_filter("r[\"_value\"] > 0.0")
.with_groups(&["_field", "node_name"])
.execute(cnn, key)
.await?;
let mut headings = rows.iter().map(|r| r.time).collect::<Vec<_>>();
headings.sort();
let headings: Vec<DateTime<FixedOffset>> = headings.iter().dedup().cloned().collect();
//println!("{headings:#?}");
let defaults = headings.iter().map(|h| (*h, 0.0)).collect::<Vec<_>>();
let mut sorter: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>> = HashMap::new();
for row in rows.into_iter() {
let entry = sorter.entry(row.node_name).or_insert(defaults.clone());
if let Some(idx) = headings.iter().position(|h| h == &row.time) {
entry[idx] = (row.time, row.rtt_avg);
}
}
//println!("{:?}", sorter);
send_response(socket, WasmResponse::RootHeat { data: sorter }).await;
/*if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
@ -46,6 +81,7 @@ pub async fn root_heat_map(
|> filter(fn: (r) => r[\"_measurement\"] == \"tree\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"_field\"] == \"rtt_avg\")
|> filter(fn: (r) => r[\"_value\"] > 0.0)
|> {}
|> {}
|> yield(name: \"last\")",
@ -55,7 +91,7 @@ pub async fn root_heat_map(
host_filter,
period.aggregate_window()
);
//println!("{qs}");
println!("{qs}");
let query = Query::new(qs);
let rows = client.query::<HeatRow>(Some(query)).await;
@ -76,7 +112,7 @@ pub async fn root_heat_map(
send_response(socket, WasmResponse::RootHeat { data: sorter }).await;
}
}
}
}*/
Ok(())
}

View File

@ -4,19 +4,70 @@ use axum::extract::ws::WebSocket;
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tracing::instrument;
use wasm_pipe_types::{ThroughputHost, Throughput};
use crate::web::wss::send_response;
use crate::web::wss::{send_response, influx_query_builder::InfluxQueryBuilder};
use self::throughput_row::{ThroughputRow, ThroughputRowBySite, ThroughputRowByCircuit};
use super::time_period::InfluxTimePeriod;
mod throughput_row;
pub use site_stack::send_site_stack_map;
fn add_by_direction(direction: &str, down: &mut Vec<Throughput>, up: &mut Vec<Throughput>, row: &ThroughputRow) {
match direction {
"down" => {
down.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
"up" => {
up.push(Throughput {
value: row.avg,
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
l: row.min,
u: row.max - row.min,
});
}
_ => {}
}
}
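
For illustration (made-up numbers), one row flows through add_by_direction like this:

// A ThroughputRow { direction: "down", avg: 1_000_000.0, min: 800_000.0, max: 1_200_000.0, .. }
// is pushed onto `down` as Throughput { value: 1_000_000.0, l: 800_000.0, u: 400_000.0, date: .. }:
// l carries the minimum and u the max-min spread.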
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes(cnn, key, period).await?;
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<ThroughputHost>::new();
InfluxQueryBuilder::new(period.clone())
.with_measurement("bits")
.with_fields(&["min", "max", "avg"])
.with_groups(&["host_id", "direction", "_field"])
.execute::<ThroughputRow>(cnn, key)
.await?
.into_iter()
.for_each(|row| {
if let Some(node) = nodes.iter_mut().find(|n| n.node_id == row.host_id) {
add_by_direction(&row.direction, &mut node.down, &mut node.up, &row);
} else {
let mut down = Vec::new();
let mut up = Vec::new();
add_by_direction(&row.direction, &mut down, &mut up, &row);
let node_name = if let Some(node) = node_status.iter().find(|n| n.node_id == row.host_id) {
node.node_name.clone()
} else {
row.host_id.clone()
};
nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up });
}
});
send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes }).await;
Ok(())
}
#[instrument(skip(cnn, socket, key, period, site_name))]
pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_name: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes_by_site(cnn, key, period, &site_name).await?;
@ -36,23 +87,6 @@ pub async fn send_throughput_for_node(cnn: &Pool<Postgres>, socket: &mut WebSock
Ok(())
}
pub async fn get_throughput_for_all_nodes(cnn: &Pool<Postgres>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<Vec<ThroughputHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();
for node in node_status {
futures.push(get_throughput_for_node(
cnn,
key,
node.node_id.to_string(),
node.node_name.to_string(),
period.clone(),
));
}
let all_nodes: anyhow::Result<Vec<ThroughputHost>> = join_all(futures).await
.into_iter().collect();
all_nodes
}
pub async fn get_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, key: &str, period: InfluxTimePeriod, site_name: &str) -> anyhow::Result<Vec<ThroughputHost>> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut futures = Vec::new();

View File

@ -1,10 +1,35 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response};
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres, Row};
use wasm_pipe_types::Throughput;
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
OrganizationDetails,
};
use tracing::{error, instrument};
use wasm_pipe_types::SiteStackHost;
use super::{get_throughput_for_all_nodes_by_circuit, get_throughput_for_all_nodes_by_site};
#[derive(Debug, influxdb2::FromDataPoint)]
pub struct SiteStackRow {
pub node_name: String,
pub node_parents: String,
pub bits_max: i64,
pub time: chrono::DateTime<chrono::FixedOffset>,
pub direction: String,
}
impl Default for SiteStackRow {
fn default() -> Self {
Self {
node_name: "".to_string(),
node_parents: "".to_string(),
bits_max: 0,
time: chrono::DateTime::<chrono::Utc>::MIN_UTC.into(),
direction: "".to_string(),
}
}
}
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_site_stack_map(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
@ -13,99 +38,125 @@ pub async fn send_site_stack_map(
site_id: String,
) -> anyhow::Result<()> {
let site_index = pgdb::get_site_id_from_name(cnn, key, &site_id).await?;
//println!("Site index: {site_index}");
let sites: Vec<String> =
pgdb::sqlx::query("SELECT DISTINCT site_name FROM site_tree WHERE key=$1 AND parent=$2")
.bind(key)
.bind(site_index)
.fetch_all(cnn)
.await?
.iter()
.map(|row| row.try_get("site_name").unwrap())
.collect();
//println!("{sites:?}");
if let Some(org) = get_org_details(cnn, key).await {
let rows = query_site_stack_influx(&org, &period, site_index).await;
match rows {
Err(e) => error!("Influxdb tree query error: {e}"),
Ok(rows) => {
let mut result = site_rows_to_hosts(rows);
reduce_to_x_entries(&mut result);
let circuits: Vec<(String, String)> =
pgdb::sqlx::query("SELECT DISTINCT circuit_id, circuit_name FROM shaped_devices WHERE key=$1 AND parent_node=$2")
.bind(key)
.bind(site_id)
.fetch_all(cnn)
.await?
.iter()
.map(|row| (row.try_get("circuit_id").unwrap(), row.try_get("circuit_name").unwrap()))
.collect();
//println!("{circuits:?}");
let mut result = Vec::new();
for site in sites.into_iter() {
let mut throughput =
get_throughput_for_all_nodes_by_site(cnn, key, period.clone(), &site).await?;
throughput
.iter_mut()
.for_each(|row| row.node_name = site.clone());
result.extend(throughput);
// Send the reply
send_response(
socket,
wasm_pipe_types::WasmResponse::SiteStack { nodes: result },
)
.await;
}
}
for circuit in circuits.into_iter() {
let mut throughput =
get_throughput_for_all_nodes_by_circuit(cnn, key, period.clone(), &circuit.0).await?;
throughput
.iter_mut()
.for_each(|row| row.node_name = circuit.1.clone());
result.extend(throughput);
}
//println!("{result:?}");
// Sort by total
Ok(())
}
#[instrument(skip(org, period, site_index))]
async fn query_site_stack_influx(
org: &OrganizationDetails,
period: &InfluxTimePeriod,
site_index: i32,
) -> anyhow::Result<Vec<SiteStackRow>> {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = influxdb2::Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!("import \"strings\"
from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_field\"] == \"bits_max\" and r[\"_measurement\"] == \"tree\" and r[\"organization_id\"] == \"{}\")
|> {}
|> filter(fn: (r) => exists r[\"node_parents\"] and exists r[\"node_index\"])
|> filter(fn: (r) => strings.hasSuffix(v: r[\"node_parents\"], suffix: \"S{}S\" + r[\"node_index\"] + \"S\" ))
|> group(columns: [\"node_name\", \"node_parents\", \"_field\", \"node_index\", \"direction\"])
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, period.sample(), site_index);
println!("{qs}");
let query = influxdb2::models::Query::new(qs);
//let rows = client.query_raw(Some(query)).await;
Ok(client.query::<SiteStackRow>(Some(query)).await?)
}
fn site_rows_to_hosts(rows: Vec<SiteStackRow>) -> Vec<SiteStackHost> {
let mut result: Vec<SiteStackHost> = Vec::new();
for row in rows.iter() {
if let Some(r) = result.iter_mut().find(|r| r.node_name == row.node_name) {
if row.direction == "down" {
r.download.push((
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
row.bits_max,
));
} else {
r.upload.push((
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
row.bits_max,
));
}
} else if row.direction == "down" {
result.push(SiteStackHost {
node_name: row.node_name.clone(),
download: vec![(
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
row.bits_max,
)],
upload: vec![],
});
} else {
result.push(SiteStackHost {
node_name: row.node_name.clone(),
upload: vec![(
row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
row.bits_max,
)],
download: vec![],
});
}
}
result
}
fn reduce_to_x_entries(result: &mut Vec<SiteStackHost>) {
// Sort descending by total
result.sort_by(|a, b| {
b.total()
.partial_cmp(&a.total())
.unwrap_or(std::cmp::Ordering::Equal)
});
// If there are more than 9 entries, create an "others" to handle the remainder
if result.len() > 9 {
let mut others = wasm_pipe_types::ThroughputHost {
node_id: "others".to_string(),
const MAX_HOSTS: usize = 8;
if result.len() > MAX_HOSTS {
let mut others = SiteStackHost {
node_name: "others".to_string(),
down: Vec::new(),
up: Vec::new(),
download: Vec::new(),
upload: Vec::new(),
};
result[0].down.iter().for_each(|x| {
others.down.push(Throughput {
value: 0.0,
date: x.date.clone(),
l: 0.0,
u: 0.0,
result[0].download.iter().for_each(|x| {
others.download.push((x.0.clone(), 0));
others.upload.push((x.0.clone(), 0));
});
result.iter().skip(MAX_HOSTS).for_each(|row| {
row.download.iter().enumerate().for_each(|(i, x)| {
if i < others.download.len() {
others.download[i].1 += x.1;
}
});
row.upload.iter().enumerate().for_each(|(i, x)| {
if i < others.upload.len() {
others.upload[i].1 += x.1;
}
});
});
result[0].up.iter().for_each(|x| {
others.up.push(Throughput {
value: 0.0,
date: x.date.clone(),
l: 0.0,
u: 0.0,
});
});
result.iter().skip(9).for_each(|row| {
row.down.iter().enumerate().for_each(|(i, x)| {
others.down[i].value += x.value;
});
row.up.iter().enumerate().for_each(|(i, x)| {
others.up[i].value += x.value;
});
});
result.truncate(9);
result.truncate(MAX_HOSTS-1);
result.push(others);
}
send_response(
socket,
wasm_pipe_types::WasmResponse::SiteStack { nodes: result },
)
.await;
Ok(())
}

View File

@ -1,7 +1,9 @@
#[derive(Clone)]
#![allow(dead_code)]
#[derive(Clone, Debug)]
pub struct InfluxTimePeriod {
start: String,
aggregate: String,
sample: i32,
}
impl InfluxTimePeriod {
@ -30,9 +32,23 @@ impl InfluxTimePeriod {
_ => "10s",
};
let sample = match period {
"5m" => 3,
"15m" => 10,
"1h" => 40,
"6h" => 100,
"12h" => 200,
"24h" => 400,
"7d" => 2100,
"28d" => 4400,
_ => 1
};
Self {
start: start.to_string(),
aggregate: aggregate.to_string(),
sample
}
}
@ -46,6 +62,24 @@ impl InfluxTimePeriod {
self.aggregate
)
}
pub fn aggregate_window_empty(&self) -> String {
format!(
"aggregateWindow(every: {}, fn: mean, createEmpty: true)",
self.aggregate
)
}
pub fn aggregate_window_fn(&self, mode: &str) -> String {
format!(
"aggregateWindow(every: {}, fn: {mode}, createEmpty: false)",
self.aggregate
)
}
pub fn sample(&self) -> String {
format!("sample(n: {}, pos: 1)", self.sample)
}
}
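
For reference, what the new helpers emit for a "1h" period (sample count 40 from the table above; 10s is the default aggregate arm shown here, the real value depends on the period):

// period.sample()                    -> "sample(n: 40, pos: 1)"
// period.aggregate_window_empty()    -> "aggregateWindow(every: 10s, fn: mean, createEmpty: true)"
// period.aggregate_window_fn("max")  -> "aggregateWindow(every: 10s, fn: max, createEmpty: false)"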
impl From<&String> for InfluxTimePeriod {

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -25,7 +25,7 @@ declare global {
window.auth = new Auth;
window.bus = new Bus();
window.router = new SiteRouter();
window.bus.connect();
//window.bus.connect();
window.router.initialRoute();
let graphPeriod = localStorage.getItem('graphPeriod');
if (!graphPeriod) {
@ -39,19 +39,25 @@ window.changeGraphPeriod = (period: string) => changeGraphPeriod(period);
window.setInterval(() => {
window.bus.updateConnected();
window.router.ontick();
let btn = document.getElementById("graphPeriodBtn") as HTMLButtonElement;
btn.innerText = window.graphPeriod;
}, 10000);
// Faster interval for tracking the WSS connection
window.setInterval(() => {
updateDisplayedInterval();
window.bus.updateConnected();
window.bus.sendQueue();
}, 500);
function updateDisplayedInterval() {
let btn = document.getElementById("graphPeriodBtn") as HTMLButtonElement | null;
if (!btn) {
return;
}
btn.innerText = window.graphPeriod;
}
function changeGraphPeriod(period: string) {
window.graphPeriod = period;
localStorage.setItem('graphPeriod', period);
let btn = document.getElementById("graphPeriodBtn") as HTMLButtonElement;
btn.innerText = period;
updateDisplayedInterval();
}

View File

@ -1,15 +1,14 @@
import { connect_wasm_pipe, is_wasm_connected, send_wss_queue } from "../wasm/wasm_pipe";
import { is_wasm_connected, send_wss_queue, initialize_wss } from "../wasm/wasm_pipe";
import { Auth } from "./auth";
import { SiteRouter } from "./router";
export class Bus {
ws: WebSocket;
connected: boolean;
constructor() {
const currentUrlWithoutAnchors = window.location.href.split('#')[0].replace("https://", "").replace("http://", "");
const url = "ws://" + currentUrlWithoutAnchors + "ws";
this.connected = false;
initialize_wss(url);
}
updateConnected() {
@ -19,7 +18,6 @@ export class Bus {
indicator.style.color = "green";
} else if (indicator) {
indicator.style.color = "red";
retryConnect();
}
}
@ -27,12 +25,6 @@ export class Bus {
send_wss_queue();
}
connect() {
const currentUrlWithoutAnchors = window.location.href.split('#')[0].replace("https://", "").replace("http://", "");
const url = "ws://" + currentUrlWithoutAnchors + "ws";
connect_wasm_pipe(url);
}
getToken(): string {
if (window.auth.hasCredentials && window.auth.token) {
return window.auth.token;
@ -128,12 +120,6 @@ export class Bus {
}
}
function retryConnect() {
if (!window.bus.connected) {
//window.bus.connect();
}
}
// WASM callback
export function onAuthFail() {
window.auth.hasCredentials = false;

View File

@ -5,7 +5,6 @@ import * as echarts from 'echarts';
export class RootHeat implements Component {
div: HTMLElement;
myChart: echarts.ECharts | null = null;
counter: number = 0;
constructor() {
this.div = document.getElementById("rootHeat") as HTMLElement;
@ -16,8 +15,6 @@ export class RootHeat implements Component {
}
ontick(): void {
this.counter++;
if (this.counter % 10 == 0)
request_root_heat(window.graphPeriod);
}

View File

@ -88,7 +88,18 @@ export class RttChart implements Component {
this.myChart.setOption<echarts.EChartsOption>(
(option = {
title: { text: "TCP Round-Trip Time" },
tooltip: { trigger: "axis" },
tooltip: {
trigger: "axis",
formatter: function (params: any) {
let ret = "";
for (let i=0; i<params.length; i+=3) {
if (params[i+2].value > 0) {
ret += "<strong>" + params[i+2].seriesName + "</strong>: " + params[i+2].value.toFixed(1) + " ms<br/>";
}
}
return ret;
}
},
legend: {
orient: "horizontal",
right: 10,

View File

@ -1,6 +1,7 @@
import { scaleNumber } from "../helpers";
import { Component } from "./component";
import * as echarts from 'echarts';
import { request_rtt_histogram } from "../../wasm/wasm_pipe";
export class RttHisto implements Component {
div: HTMLElement;
@ -16,18 +17,20 @@ export class RttHisto implements Component {
}
wireup(): void {
request_rtt_histogram(window.graphPeriod);
}
ontick(): void {
request_rtt_histogram(window.graphPeriod);
}
onmessage(event: any): void {
if (event.msg == "RttChart") {
if (event.msg == "RttHistogram") {
//console.log(event);
this.download = [];
this.x = [];
for (let i = 0; i < event.RttChart.histogram.length; i++) {
this.download.push(event.RttChart.histogram[i]);
for (let i = 0; i < event.RttHistogram.histogram.length; i++) {
this.download.push(event.RttHistogram.histogram[i]);
this.x.push(i * 10);
}

View File

@ -21,9 +21,10 @@ export class SiteHeat implements Component {
ontick(): void {
console.log("SiteHeat ontick");
this.counter++;
if (this.counter % 10 == 0)
if (this.counter == 0) {
request_site_heat(window.graphPeriod, this.siteId);
}
}
onmessage(event: any): void {
if (event.msg == "SiteHeat") {

View File

@ -37,19 +37,16 @@ export class SiteStackChart implements Component {
let legend: string[] = [];
for (let i = 0; i < event.SiteStack.nodes.length; i++) {
let node = event.SiteStack.nodes[i];
console.log(node);
if (node.node_name != "Root") {
legend.push(node.node_name);
//legend.push(node.node_name + " UL");
//console.log(node);
let d: number[] = [];
let u: number[] = [];
let l: number[] = [];
for (let j = 0; j < node.down.length; j++) {
if (first) x.push(node.down[j].date);
d.push(node.down[j].value * 8.0);
u.push(node.down[j].u * 8.0);
l.push(node.down[j].l * 8.0);
for (let j = 0; j < node.download.length; j++) {
if (first) x.push(node.download[j][0]);
d.push(node.download[j][1] * 8.0);
}
if (first) first = false;
@ -66,13 +63,11 @@ export class SiteStackChart implements Component {
// Do the same for upload
d = [];
u = [];
l = [];
for (let j = 0; j < node.down.length; j++) {
d.push(0.0 - (node.up[j].value * 8.0));
u.push(0.0 - (node.up[j].u * 8.0));
l.push(0.0 - (node.up[j].l * 8.0));
for (let j = 0; j < node.upload.length; j++) {
if (first) x.push(node.upload[j][0]);
d.push(0.0 - (node.upload[j][1] * 8.0));
}
if (first) first = false;
val = {
name: node.node_name,
@ -81,7 +76,6 @@ export class SiteStackChart implements Component {
symbol: 'none',
stack: 'upload',
areaStyle: {},
label: { show: false }
};
series.push(val);
@ -93,7 +87,7 @@ export class SiteStackChart implements Component {
var option: echarts.EChartsOption;
this.myChart.setOption<echarts.EChartsOption>(
(option = {
title: { text: "Child Node Throughput (Bits)" },
title: { text: "Child Node Throughput (Bits - Max)" },
tooltip: {
trigger: "axis",
formatter: function (params: any) {
@ -101,7 +95,7 @@ export class SiteStackChart implements Component {
let result = "";
for (let i = 0; i < params.length; i+=2) {
let siteName = params[i].seriesName;
siteName += " (⬇️" + scaleNumber(params[i].value) + " / ⬆️" + scaleNumber(0.0 - params[i+1].value) + ")";
siteName += " (⬇️" + scaleNumber(params[i].value) + " / ⬆️" + scaleNumber(Math.abs(params[i+1].value)) + ")";
result += `${siteName}<br />`;
}
return result;

View File

@ -23,13 +23,13 @@ export class DashboardPage implements Page {
}
this.components = [
new NodeStatus(),
new RootBreadcrumbs(),
new PacketsChart(),
new ThroughputChart(),
new RttChart(),
new RttHisto(),
new RootHeat(),
new SiteStackChart("root"),
new RootBreadcrumbs(),
];
}

View File

@ -3,7 +3,7 @@
/**
* @param {string} url
*/
export function connect_wasm_pipe(url: string): void;
export function initialize_wss(url: string): void;
/**
* @returns {boolean}
*/
@ -65,6 +65,10 @@ export function request_site_stack(period: string, site_id: string): void;
export function request_rtt_chart(period: string): void;
/**
* @param {string} period
*/
export function request_rtt_histogram(period: string): void;
/**
* @param {string} period
* @param {string} site_id
*/
export function request_rtt_chart_for_site(period: string, site_id: string): void;
@ -140,7 +144,7 @@ export type InitInput = RequestInfo | URL | Response | BufferSource | WebAssembl
export interface InitOutput {
readonly memory: WebAssembly.Memory;
readonly connect_wasm_pipe: (a: number, b: number) => void;
readonly initialize_wss: (a: number, b: number) => void;
readonly is_wasm_connected: () => number;
readonly send_wss_queue: () => void;
readonly send_token: (a: number, b: number) => void;
@ -154,6 +158,7 @@ export interface InitOutput {
readonly request_throughput_chart_for_circuit: (a: number, b: number, c: number, d: number) => void;
readonly request_site_stack: (a: number, b: number, c: number, d: number) => void;
readonly request_rtt_chart: (a: number, b: number) => void;
readonly request_rtt_histogram: (a: number, b: number) => void;
readonly request_rtt_chart_for_site: (a: number, b: number, c: number, d: number) => void;
readonly request_rtt_chart_for_node: (a: number, b: number, c: number, d: number, e: number, f: number) => void;
readonly request_rtt_chart_for_circuit: (a: number, b: number, c: number, d: number) => void;
@ -170,13 +175,12 @@ export interface InitOutput {
readonly request_ext_device_info: (a: number, b: number) => void;
readonly request_ext_snr_graph: (a: number, b: number, c: number, d: number) => void;
readonly request_ext_capacity_graph: (a: number, b: number, c: number, d: number) => void;
readonly __wbindgen_malloc: (a: number) => number;
readonly __wbindgen_realloc: (a: number, b: number, c: number) => number;
readonly __wbindgen_export_0: (a: number) => number;
readonly __wbindgen_export_1: (a: number, b: number, c: number) => number;
readonly __wbindgen_export_2: WebAssembly.Table;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__hb4b341652e081e3f: (a: number, b: number, c: number) => void;
readonly _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__ha318d2d73313995c: (a: number, b: number, c: number) => void;
readonly __wbindgen_free: (a: number, b: number) => void;
readonly __wbindgen_exn_store: (a: number) => void;
readonly __wbindgen_export_3: (a: number, b: number, c: number) => void;
readonly __wbindgen_export_4: (a: number, b: number) => void;
readonly __wbindgen_export_5: (a: number) => void;
}
export type SyncInitInput = BufferSource | WebAssembly.Module;

View File

@ -201,21 +201,16 @@ function makeMutClosure(arg0, arg1, dtor, f) {
return real;
}
function __wbg_adapter_12(arg0, arg1, arg2) {
wasm._dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__hb4b341652e081e3f(arg0, arg1, addHeapObject(arg2));
wasm.__wbindgen_export_3(arg0, arg1, addHeapObject(arg2));
}
function __wbg_adapter_15(arg0, arg1, arg2) {
wasm._dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__ha318d2d73313995c(arg0, arg1, addHeapObject(arg2));
}
function notDefined(what) { return () => { throw new Error(`${what} is not defined`); }; }
/**
* @param {string} url
*/
export function connect_wasm_pipe(url) {
const ptr0 = passStringToWasm0(url, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
export function initialize_wss(url) {
const ptr0 = passStringToWasm0(url, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.connect_wasm_pipe(ptr0, len0);
wasm.initialize_wss(ptr0, len0);
}
/**
@ -232,11 +227,12 @@ export function send_wss_queue() {
wasm.send_wss_queue();
}
function notDefined(what) { return () => { throw new Error(`${what} is not defined`); }; }
/**
* @param {string} token
*/
export function send_token(token) {
const ptr0 = passStringToWasm0(token, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(token, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.send_token(ptr0, len0);
}
@ -247,11 +243,11 @@ export function send_token(token) {
* @param {string} password
*/
export function send_login(license, username, password) {
const ptr0 = passStringToWasm0(license, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(license, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(username, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(username, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
const ptr2 = passStringToWasm0(password, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr2 = passStringToWasm0(password, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len2 = WASM_VECTOR_LEN;
wasm.send_login(ptr0, len0, ptr1, len1, ptr2, len2);
}
@ -266,7 +262,7 @@ export function request_node_status() {
* @param {string} period
*/
export function request_packet_chart(period) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_packet_chart(ptr0, len0);
}
@ -277,11 +273,11 @@ export function request_packet_chart(period) {
* @param {string} node_name
*/
export function request_packet_chart_for_node(period, node_id, node_name) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len2 = WASM_VECTOR_LEN;
wasm.request_packet_chart_for_node(ptr0, len0, ptr1, len1, ptr2, len2);
}
@ -290,7 +286,7 @@ export function request_packet_chart_for_node(period, node_id, node_name) {
* @param {string} period
*/
export function request_throughput_chart(period) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_throughput_chart(ptr0, len0);
}
@ -300,9 +296,9 @@ export function request_throughput_chart(period) {
* @param {string} site_id
*/
export function request_throughput_chart_for_site(period, site_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_throughput_chart_for_site(ptr0, len0, ptr1, len1);
}
@ -313,11 +309,11 @@ export function request_throughput_chart_for_site(period, site_id) {
* @param {string} node_name
*/
export function request_throughput_chart_for_node(period, node_id, node_name) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len2 = WASM_VECTOR_LEN;
wasm.request_throughput_chart_for_node(ptr0, len0, ptr1, len1, ptr2, len2);
}
@ -327,9 +323,9 @@ export function request_throughput_chart_for_node(period, node_id, node_name) {
* @param {string} circuit_id
*/
export function request_throughput_chart_for_circuit(period, circuit_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(circuit_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(circuit_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_throughput_chart_for_circuit(ptr0, len0, ptr1, len1);
}
@ -339,9 +335,9 @@ export function request_throughput_chart_for_circuit(period, circuit_id) {
* @param {string} site_id
*/
export function request_site_stack(period, site_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_site_stack(ptr0, len0, ptr1, len1);
}
@ -350,19 +346,28 @@ export function request_site_stack(period, site_id) {
* @param {string} period
*/
export function request_rtt_chart(period) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_rtt_chart(ptr0, len0);
}
/**
* @param {string} period
*/
export function request_rtt_histogram(period) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_rtt_histogram(ptr0, len0);
}
/**
* @param {string} period
* @param {string} site_id
*/
export function request_rtt_chart_for_site(period, site_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_rtt_chart_for_site(ptr0, len0, ptr1, len1);
}
@ -373,11 +378,11 @@ export function request_rtt_chart_for_site(period, site_id) {
* @param {string} node_name
*/
export function request_rtt_chart_for_node(period, node_id, node_name) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len2 = WASM_VECTOR_LEN;
wasm.request_rtt_chart_for_node(ptr0, len0, ptr1, len1, ptr2, len2);
}
@ -387,9 +392,9 @@ export function request_rtt_chart_for_node(period, node_id, node_name) {
* @param {string} circuit_id
*/
export function request_rtt_chart_for_circuit(period, circuit_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(circuit_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(circuit_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_rtt_chart_for_circuit(ptr0, len0, ptr1, len1);
}
@ -400,11 +405,11 @@ export function request_rtt_chart_for_circuit(period, circuit_id) {
* @param {string} node_name
*/
export function request_node_perf_chart(period, node_id, node_name) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(node_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr2 = passStringToWasm0(node_name, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len2 = WASM_VECTOR_LEN;
wasm.request_node_perf_chart(ptr0, len0, ptr1, len1, ptr2, len2);
}
@ -413,7 +418,7 @@ export function request_node_perf_chart(period, node_id, node_name) {
* @param {string} period
*/
export function request_root_heat(period) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_root_heat(ptr0, len0);
}
@ -423,9 +428,9 @@ export function request_root_heat(period) {
* @param {string} site_id
*/
export function request_site_heat(period, site_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(site_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_site_heat(ptr0, len0, ptr1, len1);
}
@ -434,7 +439,7 @@ export function request_site_heat(period, site_id) {
* @param {string} parent
*/
export function request_tree(parent) {
const ptr0 = passStringToWasm0(parent, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(parent, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_tree(ptr0, len0);
}
@ -443,7 +448,7 @@ export function request_tree(parent) {
* @param {string} site_id
*/
export function request_site_info(site_id) {
const ptr0 = passStringToWasm0(site_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(site_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_site_info(ptr0, len0);
}
@ -452,7 +457,7 @@ export function request_site_info(site_id) {
* @param {string} site_id
*/
export function request_site_parents(site_id) {
const ptr0 = passStringToWasm0(site_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(site_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_site_parents(ptr0, len0);
}
@ -461,7 +466,7 @@ export function request_site_parents(site_id) {
* @param {string} circuit_id
*/
export function request_circuit_parents(circuit_id) {
const ptr0 = passStringToWasm0(circuit_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(circuit_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_circuit_parents(ptr0, len0);
}
@ -476,7 +481,7 @@ export function request_root_parents() {
* @param {string} term
*/
export function request_search(term) {
const ptr0 = passStringToWasm0(term, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(term, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_search(ptr0, len0);
}
@ -485,7 +490,7 @@ export function request_search(term) {
* @param {string} circuit_id
*/
export function request_circuit_info(circuit_id) {
const ptr0 = passStringToWasm0(circuit_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(circuit_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_circuit_info(ptr0, len0);
}
@ -494,7 +499,7 @@ export function request_circuit_info(circuit_id) {
* @param {string} circuit_id
*/
export function request_ext_device_info(circuit_id) {
const ptr0 = passStringToWasm0(circuit_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(circuit_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
wasm.request_ext_device_info(ptr0, len0);
}
@ -504,9 +509,9 @@ export function request_ext_device_info(circuit_id) {
* @param {string} device_id
*/
export function request_ext_snr_graph(period, device_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(device_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(device_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_ext_snr_graph(ptr0, len0, ptr1, len1);
}
@ -516,9 +521,9 @@ export function request_ext_snr_graph(period, device_id) {
* @param {string} device_id
*/
export function request_ext_capacity_graph(period, device_id) {
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr0 = passStringToWasm0(period, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len0 = WASM_VECTOR_LEN;
const ptr1 = passStringToWasm0(device_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(device_id, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
wasm.request_ext_capacity_graph(ptr0, len0, ptr1, len1);
}
@ -527,7 +532,7 @@ function handleError(f, args) {
try {
return f.apply(this, args);
} catch (e) {
wasm.__wbindgen_exn_store(addHeapObject(e));
wasm.__wbindgen_export_5(addHeapObject(e));
}
}
@ -570,75 +575,39 @@ async function __wbg_load(module, imports) {
function __wbg_get_imports() {
const imports = {};
imports.wbg = {};
imports.wbg.__wbg_new_39e958ac9d5cae7d = function() { return handleError(function (arg0, arg1) {
const ret = new WebSocket(getStringFromWasm0(arg0, arg1));
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbindgen_string_new = function(arg0, arg1) {
const ret = getStringFromWasm0(arg0, arg1);
return addHeapObject(ret);
};
imports.wbg.__wbg_setbinaryType_2e2320b177c86b17 = function(arg0, arg1) {
getObject(arg0).binaryType = takeObject(arg1);
};
imports.wbg.__wbg_setonclose_6b22bc5d93628786 = function(arg0, arg1) {
getObject(arg0).onclose = getObject(arg1);
};
imports.wbg.__wbg_setonerror_9f7532626d7a9ce2 = function(arg0, arg1) {
getObject(arg0).onerror = getObject(arg1);
};
imports.wbg.__wbg_setonopen_6fd8b28538150568 = function(arg0, arg1) {
getObject(arg0).onopen = getObject(arg1);
};
imports.wbg.__wbg_setonmessage_493b82147081ec7e = function(arg0, arg1) {
getObject(arg0).onmessage = getObject(arg1);
};
imports.wbg.__wbg_log_cd48b3599daf93ee = function(arg0, arg1) {
console.log(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbg_windowbusgetToken_eab6ac8f06d69af2 = function(arg0) {
const ret = window.bus.getToken();
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
getInt32Memory0()[arg0 / 4 + 1] = len1;
getInt32Memory0()[arg0 / 4 + 0] = ptr1;
};
imports.wbg.__wbg_windowonAuthOk_9cd9fb8f74884ca4 = function(arg0, arg1, arg2, arg3, arg4, arg5) {
let deferred0_0;
let deferred0_1;
let deferred1_0;
let deferred1_1;
let deferred2_0;
let deferred2_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
deferred1_0 = arg2;
deferred1_1 = arg3;
deferred2_0 = arg4;
deferred2_1 = arg5;
window.onAuthOk(getStringFromWasm0(arg0, arg1), getStringFromWasm0(arg2, arg3), getStringFromWasm0(arg4, arg5));
} finally {
wasm.__wbindgen_free(deferred0_0, deferred0_1);
wasm.__wbindgen_free(deferred1_0, deferred1_1);
wasm.__wbindgen_free(deferred2_0, deferred2_1);
}
};
imports.wbg.__wbg_windowonAuthFail_ddfdfcd594ff15b8 = typeof window.onAuthFail == 'function' ? window.onAuthFail : notDefined('window.onAuthFail');
imports.wbg.__wbg_windowonMessage_5c5b80d5376153dc = function(arg0, arg1) {
let deferred0_0;
let deferred0_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
window.onMessage(getStringFromWasm0(arg0, arg1));
} finally {
wasm.__wbindgen_free(deferred0_0, deferred0_1);
}
};
imports.wbg.__wbg_setonopen_6fd8b28538150568 = function(arg0, arg1) {
getObject(arg0).onopen = getObject(arg1);
};
imports.wbg.__wbg_setonerror_9f7532626d7a9ce2 = function(arg0, arg1) {
getObject(arg0).onerror = getObject(arg1);
};
imports.wbg.__wbg_setonclose_6b22bc5d93628786 = function(arg0, arg1) {
getObject(arg0).onclose = getObject(arg1);
};
imports.wbg.__wbg_setonmessage_493b82147081ec7e = function(arg0, arg1) {
getObject(arg0).onmessage = getObject(arg1);
};
imports.wbg.__wbg_setbinaryType_2e2320b177c86b17 = function(arg0, arg1) {
getObject(arg0).binaryType = takeObject(arg1);
};
imports.wbg.__wbg_new_39e958ac9d5cae7d = function() { return handleError(function (arg0, arg1) {
const ret = new WebSocket(getStringFromWasm0(arg0, arg1));
return addHeapObject(ret);
}, arguments) };
imports.wbg.__wbg_send_737fddb36434277e = function() { return handleError(function (arg0, arg1, arg2) {
getObject(arg0).send(getArrayU8FromWasm0(arg1, arg2));
}, arguments) };
imports.wbg.__wbindgen_string_new = function(arg0, arg1) {
const ret = getStringFromWasm0(arg0, arg1);
return addHeapObject(ret);
};
imports.wbg.__wbg_data_ef47af9c565d228b = function(arg0) {
const ret = getObject(arg0).data;
return addHeapObject(ret);
@ -661,19 +630,59 @@ function __wbg_get_imports() {
const ret = getObject(arg0).length;
return ret;
};
imports.wbg.__wbg_set_4b3aa8445ac1e91c = function(arg0, arg1, arg2) {
getObject(arg0).set(getObject(arg1), arg2 >>> 0);
};
imports.wbg.__wbindgen_object_drop_ref = function(arg0) {
takeObject(arg0);
imports.wbg.__wbindgen_memory = function() {
const ret = wasm.memory;
return addHeapObject(ret);
};
imports.wbg.__wbg_buffer_fcbfb6d88b2732e9 = function(arg0) {
const ret = getObject(arg0).buffer;
return addHeapObject(ret);
};
imports.wbg.__wbg_set_4b3aa8445ac1e91c = function(arg0, arg1, arg2) {
getObject(arg0).set(getObject(arg1), arg2 >>> 0);
};
imports.wbg.__wbg_windowonMessage_5c5b80d5376153dc = function(arg0, arg1) {
let deferred0_0;
let deferred0_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
window.onMessage(getStringFromWasm0(arg0, arg1));
} finally {
wasm.__wbindgen_export_4(deferred0_0, deferred0_1);
}
};
imports.wbg.__wbg_windowonAuthOk_9cd9fb8f74884ca4 = function(arg0, arg1, arg2, arg3, arg4, arg5) {
let deferred0_0;
let deferred0_1;
let deferred1_0;
let deferred1_1;
let deferred2_0;
let deferred2_1;
try {
deferred0_0 = arg0;
deferred0_1 = arg1;
deferred1_0 = arg2;
deferred1_1 = arg3;
deferred2_0 = arg4;
deferred2_1 = arg5;
window.onAuthOk(getStringFromWasm0(arg0, arg1), getStringFromWasm0(arg2, arg3), getStringFromWasm0(arg4, arg5));
} finally {
wasm.__wbindgen_export_4(deferred0_0, deferred0_1);
wasm.__wbindgen_export_4(deferred1_0, deferred1_1);
wasm.__wbindgen_export_4(deferred2_0, deferred2_1);
}
};
imports.wbg.__wbg_windowonAuthFail_ddfdfcd594ff15b8 = typeof window.onAuthFail == 'function' ? window.onAuthFail : notDefined('window.onAuthFail');
imports.wbg.__wbg_send_737fddb36434277e = function() { return handleError(function (arg0, arg1, arg2) {
getObject(arg0).send(getArrayU8FromWasm0(arg1, arg2));
}, arguments) };
imports.wbg.__wbindgen_object_drop_ref = function(arg0) {
takeObject(arg0);
};
imports.wbg.__wbindgen_debug_string = function(arg0, arg1) {
const ret = debugString(getObject(arg1));
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const ptr1 = passStringToWasm0(ret, wasm.__wbindgen_export_0, wasm.__wbindgen_export_1);
const len1 = WASM_VECTOR_LEN;
getInt32Memory0()[arg0 / 4 + 1] = len1;
getInt32Memory0()[arg0 / 4 + 0] = ptr1;
@ -681,16 +690,12 @@ function __wbg_get_imports() {
imports.wbg.__wbindgen_throw = function(arg0, arg1) {
throw new Error(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbindgen_memory = function() {
const ret = wasm.memory;
imports.wbg.__wbindgen_closure_wrapper1471 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_12);
return addHeapObject(ret);
};
imports.wbg.__wbindgen_closure_wrapper440 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 89, __wbg_adapter_12);
return addHeapObject(ret);
};
imports.wbg.__wbindgen_closure_wrapper442 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 87, __wbg_adapter_15);
imports.wbg.__wbindgen_closure_wrapper1473 = function(arg0, arg1, arg2) {
const ret = makeMutClosure(arg0, arg1, 8, __wbg_adapter_12);
return addHeapObject(ret);
};

View File

@ -1,7 +1,7 @@
/* tslint:disable */
/* eslint-disable */
export const memory: WebAssembly.Memory;
export function connect_wasm_pipe(a: number, b: number): void;
export function initialize_wss(a: number, b: number): void;
export function is_wasm_connected(): number;
export function send_wss_queue(): void;
export function send_token(a: number, b: number): void;
@ -15,6 +15,7 @@ export function request_throughput_chart_for_node(a: number, b: number, c: numbe
export function request_throughput_chart_for_circuit(a: number, b: number, c: number, d: number): void;
export function request_site_stack(a: number, b: number, c: number, d: number): void;
export function request_rtt_chart(a: number, b: number): void;
export function request_rtt_histogram(a: number, b: number): void;
export function request_rtt_chart_for_site(a: number, b: number, c: number, d: number): void;
export function request_rtt_chart_for_node(a: number, b: number, c: number, d: number, e: number, f: number): void;
export function request_rtt_chart_for_circuit(a: number, b: number, c: number, d: number): void;
@ -31,10 +32,9 @@ export function request_circuit_info(a: number, b: number): void;
export function request_ext_device_info(a: number, b: number): void;
export function request_ext_snr_graph(a: number, b: number, c: number, d: number): void;
export function request_ext_capacity_graph(a: number, b: number, c: number, d: number): void;
export function __wbindgen_malloc(a: number): number;
export function __wbindgen_realloc(a: number, b: number, c: number): number;
export function __wbindgen_export_0(a: number): number;
export function __wbindgen_export_1(a: number, b: number, c: number): number;
export const __wbindgen_export_2: WebAssembly.Table;
export function _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__hb4b341652e081e3f(a: number, b: number, c: number): void;
export function _dyn_core__ops__function__FnMut__A____Output___R_as_wasm_bindgen__closure__WasmClosure___describe__invoke__ha318d2d73313995c(a: number, b: number, c: number): void;
export function __wbindgen_free(a: number, b: number): void;
export function __wbindgen_exn_store(a: number): void;
export function __wbindgen_export_3(a: number, b: number, c: number): void;
export function __wbindgen_export_4(a: number, b: number): void;
export function __wbindgen_export_5(a: number): void;

View File

@ -13,6 +13,7 @@ miniz_oxide = "0.7.1"
serde_cbor = "0" # For RFC8949/7409 format C binary objects
wasm_pipe_types = { path = "../wasm_pipe_types" }
serde_json = "1.0.96"
thiserror = "1.0.44"
[dependencies.web-sys]
version = "0.3.22"

View File

@ -1,5 +1,5 @@
#!/bin/bash
cargo build --target wasm32-unknown-unknown
wasm-bindgen --target web --out-dir staging/ ../../target/wasm32-unknown-unknown/debug/wasm_pipe.wasm
CARGO_PROFILE_RELEASE_OPT_LEVEL=z CARGO_PROFILE_RELEASE_LTO=true CARGO_PROFILE_RELEASE_STRIP=symbols CARGO_PROFILE_RELEASE_CODEGEN_UNITS=1 CARGO_PROFILE_RELEASE_INCREMENTAL=false cargo build --target wasm32-unknown-unknown --release
wasm-bindgen --target web --out-dir staging/ ../../target/wasm32-unknown-unknown/release/wasm_pipe.wasm
cp staging/* ../site_build/wasm
cp staging/wasm_pipe_bg.wasm ../lts_node/web

View File

@ -0,0 +1,227 @@
use std::collections::VecDeque;
use wasm_pipe_types::{WasmResponse, WasmRequest};
use web_sys::{WebSocket, BinaryType, ErrorEvent, MessageEvent};
use thiserror::Error;
use wasm_bindgen::prelude::*;
use crate::{message::{WsResponseMessage, WsRequestMessage}, onAuthOk, onAuthFail, onMessage, get_token};
use super::log;
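// The module-wide connection handle. It lives in a `static mut` and is only
// touched through `unsafe`, on the assumption that the WASM module runs
// single-threaded.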
static mut CONDUIT: Option<Conduit> = None;
#[wasm_bindgen]
pub fn initialize_wss(url: String) {
log(&format!("Initializing WSS to: {url}"));
unsafe {
if CONDUIT.is_none() {
CONDUIT = Some(Conduit::new(url));
if let Some(conduit) = &mut CONDUIT {
match conduit.connect() {
Ok(_) => log("Connection requested."),
Err(e) => log(&format!("Error connecting: {:?}", e)),
}
}
} else {
log("Conduit already initialized");
}
}
}
#[wasm_bindgen]
pub fn is_wasm_connected() -> bool {
unsafe {
if let Some(conduit) = &CONDUIT {
conduit.is_connected()
} else {
false
}
}
}
#[wasm_bindgen]
pub fn send_wss_queue() {
unsafe {
if let Some(conduit) = &mut CONDUIT {
conduit.send_queue();
} else {
log("No conduit");
}
}
}
pub fn send_message(msg: WasmRequest) {
unsafe {
if let Some(conduit) = &mut CONDUIT {
conduit.enqueue_raw(msg);
} else {
log("No conduit");
}
}
}
#[derive(Error, Debug)]
enum WebSocketError {
#[error("URL is empty")]
NoURL,
#[error("Already connected")]
AlreadyConnected,
#[error("WebSocket already exists")]
AlreadyExists,
#[error("WebSocket Creation Error")]
CreationError,
}
#[derive(PartialEq, Eq)]
enum ConnectionStatus {
New,
Connected,
}
/// Handles the WebSocket connection to the server.
struct Conduit {
status: ConnectionStatus,
socket: Option<WebSocket>,
url: String,
message_queue: VecDeque<Vec<u8>>,
}
impl Conduit {
fn new(url: String) -> Self {
Self {
status: ConnectionStatus::New,
socket: None,
url,
message_queue: VecDeque::new(),
}
}
fn connect(&mut self) -> Result<(), WebSocketError> {
// Precondition testing
if self.url.is_empty() { return Err(WebSocketError::NoURL); }
if self.status != ConnectionStatus::New { return Err(WebSocketError::AlreadyConnected); }
if self.socket.is_some() { return Err(WebSocketError::AlreadyExists); }
self.socket = Some(WebSocket::new(&self.url).map_err(|_| WebSocketError::CreationError)?);
if let Some(socket) = &mut self.socket {
socket.set_binary_type(BinaryType::Arraybuffer);
// Wire up on_close
let onclose_callback = Closure::<dyn FnMut(_)>::new(move |_e: ErrorEvent| {
on_close();
});
socket.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
onclose_callback.forget();
// Wire up on_error
let onerror_callback = Closure::<dyn FnMut(_)>::new(move |e: ErrorEvent| {
log(&format!("Error Received: {e:?}"));
on_error()
});
socket.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
onerror_callback.forget();
// Wire up on_open
let onopen_callback = Closure::<dyn FnMut(_)>::new(move |_e: ErrorEvent| {
log("Open Received");
on_open();
});
socket.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
onopen_callback.forget();
// Wire up on_message
let onmessage_callback = Closure::<dyn FnMut(_)>::new(move |e: MessageEvent| {
log("Message Received");
if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let response = WsResponseMessage::from_array_buffer(abuf);
match response {
Err(e) => log(&format!("Error parsing message: {:?}", e)),
Ok(WsResponseMessage(msg)) => {
match msg {
WasmResponse::AuthOk { token, name, license_key } => {
onAuthOk(token, name, license_key);
}
WasmResponse::AuthFail => {
onAuthFail();
}
_ => {
let json = serde_json::to_string(&msg).unwrap();
onMessage(json);
}
}
}
}
}
});
socket.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
onmessage_callback.forget();
}
Ok(())
}
fn is_connected(&self) -> bool {
self.status == ConnectionStatus::Connected
}
fn enqueue_raw(&mut self, message: WasmRequest) {
let msg = WsRequestMessage::new(message);
self.enqueue_message(msg);
}
fn enqueue_message(&mut self, message: WsRequestMessage) {
let serialized = message.serialize();
match serialized {
Ok(msg) => self.message_queue.push_back(msg),
Err(e) => log(&format!("Error enqueing message: {:?}", e)),
}
}
fn send_queue(&mut self) {
// Bail out if there's nothing to do
if self.message_queue.is_empty() {
return;
}
// Send queued messages
if let Some(ws) = &mut self.socket {
while let Some(msg) = self.message_queue.pop_front() {
log(&format!("Sending message: {msg:?}"));
if let Err(e) = ws.send_with_u8_array(&msg) {
log(&format!("Error sending message: {e:?}"));
self.status = ConnectionStatus::New;
break;
}
}
} else {
log("No WebSocket connection");
let _ = self.connect();
}
}
}
fn on_close() {
unsafe {
if let Some(conduit) = &mut CONDUIT {
conduit.socket = None;
conduit.status = ConnectionStatus::New;
}
}
}
fn on_error() {
unsafe {
if let Some(conduit) = &mut CONDUIT {
conduit.socket = None;
conduit.status = ConnectionStatus::New;
}
}
}
fn on_open() {
unsafe {
if let Some(conduit) = &mut CONDUIT {
conduit.status = ConnectionStatus::Connected;
conduit.enqueue_raw(WasmRequest::Auth { token: get_token() });
}
}
}
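For reference, a minimal standalone sketch of the queue-then-flush behaviour the new Conduit implements: requests are buffered while the socket is down and drained once a connection exists. The names, values, and the `QueueSketch` type are illustrative only and are not part of this crate's API.
use std::collections::VecDeque;
// Stand-in for Conduit's queue handling; illustrative names only.
struct QueueSketch {
    socket_open: bool,
    queue: VecDeque<Vec<u8>>,
}
impl QueueSketch {
    // Mirrors Conduit::enqueue_message(): push now, send on the next flush.
    fn enqueue(&mut self, msg: Vec<u8>) {
        self.queue.push_back(msg);
    }
    // Mirrors Conduit::send_queue(): bail if empty, drain only while a socket exists.
    fn flush(&mut self, mut send: impl FnMut(&[u8])) {
        if self.queue.is_empty() {
            return;
        }
        if !self.socket_open {
            // Conduit::send_queue() logs and re-attempts connect() here instead.
            return;
        }
        while let Some(msg) = self.queue.pop_front() {
            send(&msg);
        }
    }
}
fn main() {
    let mut conduit = QueueSketch { socket_open: false, queue: VecDeque::new() };
    conduit.enqueue(vec![1, 2, 3]); // buffered while the socket is down
    conduit.socket_open = true;     // on_open() marks the connection live in the real code
    conduit.flush(|m| println!("sending {} bytes", m.len()));
}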

View File

@ -1,8 +1,7 @@
use std::collections::VecDeque;
use wasm_bindgen::prelude::*;
use wasm_pipe_types::{WasmRequest, WasmResponse};
use web_sys::{BinaryType, ErrorEvent, MessageEvent, WebSocket};
use wasm_pipe_types::WasmRequest;
mod conduit;
mod message;
#[wasm_bindgen]
extern "C" {
@ -22,117 +21,11 @@ extern "C" {
fn onMessage(json: String);
}
static mut CONNECTED: bool = false;
static mut WS: Option<WebSocket> = None;
static mut QUEUE: VecDeque<Vec<u8>> = VecDeque::new();
static mut URL: String = String::new();
#[wasm_bindgen]
pub fn connect_wasm_pipe(url: String) {
unsafe {
if CONNECTED {
log("Already connected");
return;
}
if !url.is_empty() {
URL = url.clone();
}
WS = Some(WebSocket::new(&url).unwrap());
if let Some(ws) = &mut WS {
ws.set_binary_type(BinaryType::Arraybuffer);
ws.set_binary_type(BinaryType::Arraybuffer);
let onmessage_callback = Closure::<dyn FnMut(_)>::new(move |e: MessageEvent| {
log("Message Received");
if let Ok(abuf) = e.data().dyn_into::<js_sys::ArrayBuffer>() {
let array = js_sys::Uint8Array::new(&abuf);
//let len = array.byte_length() as usize;
let raw = array.to_vec();
let decompressed = miniz_oxide::inflate::decompress_to_vec(&raw).unwrap();
let msg: WasmResponse = serde_cbor::from_slice(&decompressed).unwrap();
//log(&format!("Message: {:?}", msg));
match msg {
WasmResponse::AuthOk { token, name, license_key } => {
onAuthOk(token, name, license_key);
}
WasmResponse::AuthFail => {
onAuthFail();
}
_ => {
let json = serde_json::to_string(&msg).unwrap();
onMessage(json);
}
}
}
});
let onerror_callback = Closure::<dyn FnMut(_)>::new(move |e: ErrorEvent| {
log(&format!("Error Received: {e:?}"));
CONNECTED = false;
});
let onclose_callback = Closure::<dyn FnMut(_)>::new(move |_e: ErrorEvent| {
log("Close Received");
CONNECTED = false;
});
let onopen_callback = Closure::<dyn FnMut(_)>::new(move |_e: ErrorEvent| {
log("Open Received");
CONNECTED = true;
let token = get_token();
send_token(token);
});
ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref()));
ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref()));
ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref()));
ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref()));
// Prevent closures from recursing
onopen_callback.forget();
onclose_callback.forget();
onerror_callback.forget();
onmessage_callback.forget();
}
}
}
#[wasm_bindgen]
pub fn is_wasm_connected() -> bool {
unsafe { CONNECTED && WS.is_some() }
}
#[wasm_bindgen]
pub fn send_wss_queue() {
//log("Call to send queue");
unsafe {
// Bail out if there's nothing to do
if QUEUE.is_empty() {
//log("Queue is empty");
return;
}
// Send queued messages
if let Some(ws) = &mut WS {
while let Some(msg) = QUEUE.pop_front() {
log(&format!("Sending message: {msg:?}"));
ws.send_with_u8_array(&msg).unwrap();
}
} else {
log("No WebSocket connection");
CONNECTED = false;
connect_wasm_pipe(String::new());
}
}
}
fn build_message(msg: WasmRequest) -> Vec<u8> {
let cbor = serde_cbor::to_vec(&msg).unwrap();
miniz_oxide::deflate::compress_to_vec(&cbor, 8)
}
pub use conduit::{initialize_wss, is_wasm_connected, send_wss_queue};
fn send_message(msg: WasmRequest) {
log(&format!("Sending message: {msg:?}"));
let msg = build_message(msg);
unsafe {
QUEUE.push_back(msg);
}
log(&format!("Enqueueing message: {msg:?}"));
conduit::send_message(msg);
}
#[wasm_bindgen]
@ -196,6 +89,11 @@ pub fn request_rtt_chart(period: String) {
send_message(WasmRequest::RttChart { period });
}
#[wasm_bindgen]
pub fn request_rtt_histogram(period: String) {
send_message(WasmRequest::RttHistogram { period });
}
#[wasm_bindgen]
pub fn request_rtt_chart_for_site(period: String, site_id: String) {
send_message(WasmRequest::RttChartSite { period, site_id });

View File

@ -0,0 +1,40 @@
use thiserror::Error;
use wasm_pipe_types::{WasmRequest, WasmResponse};
#[derive(Error, Debug)]
pub enum MessageError {
#[error("Unable to decompress stream")]
DecompressError,
#[error("Unable to de-serialize CBOR into native type")]
DeserializeError,
#[error("Unable to serialize CBOR from native type")]
SerializeError,
}
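/// A WasmResponse decoded from a received frame: the raw deflate stream is
/// inflated, then the CBOR payload is deserialized.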
pub struct WsResponseMessage(pub WasmResponse);
impl WsResponseMessage {
pub fn from_array_buffer(buffer: js_sys::ArrayBuffer) -> Result<Self, MessageError> {
// Convert the array buffer into a strongly typed buffer
let array = js_sys::Uint8Array::new(&buffer);
let raw = array.to_vec();
let decompressed = miniz_oxide::inflate::decompress_to_vec(&raw)
.map_err(|_| MessageError::DecompressError)?;
let msg: WasmResponse =
serde_cbor::from_slice(&decompressed).map_err(|_| MessageError::DeserializeError)?;
Ok(Self(msg))
}
}
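/// A WasmRequest wrapper that serializes to CBOR and then deflate-compresses
/// (level 8) for the wire.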
pub struct WsRequestMessage(pub WasmRequest);
impl WsRequestMessage {
pub fn new(msg: WasmRequest) -> Self {
Self(msg)
}
pub fn serialize(&self) -> Result<Vec<u8>, MessageError> {
let cbor = serde_cbor::to_vec(&self.0).map_err(|_| MessageError::SerializeError)?;
Ok(miniz_oxide::deflate::compress_to_vec(&cbor, 8))
}
}
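The wire format used by these wrappers is CBOR inside a raw deflate stream at compression level 8. Below is a small round-trip sketch of that framing, independent of the js_sys buffer handling; the `Ping` type and the "5m" value are placeholders, and it assumes serde with the derive feature plus the same serde_cbor and miniz_oxide crates already listed in this Cargo.toml.
use serde::{Deserialize, Serialize};
// Placeholder payload type; not part of wasm_pipe_types.
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Ping {
    period: String,
}
fn main() {
    let msg = Ping { period: "5m".into() };
    // Encode the way WsRequestMessage::serialize() does.
    let cbor = serde_cbor::to_vec(&msg).expect("CBOR encode");
    let wire = miniz_oxide::deflate::compress_to_vec(&cbor, 8);
    // Decode the way WsResponseMessage::from_array_buffer() does,
    // minus the js_sys::ArrayBuffer -> Vec<u8> copy.
    let raw = miniz_oxide::inflate::decompress_to_vec(&wire).expect("inflate");
    let back: Ping = serde_cbor::from_slice(&raw).expect("CBOR decode");
    assert_eq!(msg, back);
}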

View File

@ -14,6 +14,7 @@ pub enum WasmRequest {
ThroughputChartSite { period: String, site_id: String },
ThroughputChartCircuit { period: String, circuit_id: String },
RttChart { period: String },
RttHistogram { period: String },
RttChartSingle { period: String, node_id: String, node_name: String },
RttChartSite { period: String, site_id: String },
RttChartCircuit { period: String, circuit_id: String },
@ -42,10 +43,11 @@ pub enum WasmResponse {
NodeStatus { nodes: Vec<Node> },
PacketChart { nodes: Vec<PacketHost> },
BitsChart { nodes: Vec<ThroughputHost> },
RttChart { nodes: Vec<RttHost>, histogram: Vec<u32> },
RttChart { nodes: Vec<RttHost> },
RttHistogram { histogram: Vec<u32> },
RttChartSite { nodes: Vec<RttHost>, histogram: Vec<u32> },
RttChartCircuit { nodes: Vec<RttHost>, histogram: Vec<u32> },
SiteStack { nodes: Vec<ThroughputHost> },
SiteStack { nodes: Vec<SiteStackHost> },
RootHeat { data: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>},
SiteHeat { data: HashMap<String, Vec<(DateTime<FixedOffset>, f64)>>},
NodePerfChart { nodes: Vec<PerfHost> },
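The RTT responses are now split: RttChart carries only the per-node series, and the histogram arrives as its own RttHistogram message. A hedged sketch of consuming the two variants, assuming wasm_pipe_types is on the dependency path; the `handle` function and print statements are illustrative and not part of this codebase.
use wasm_pipe_types::WasmResponse;
fn handle(msg: &WasmResponse) {
    match msg {
        WasmResponse::RttChart { nodes } => {
            println!("RTT chart with {} node series", nodes.len());
        }
        WasmResponse::RttHistogram { histogram } => {
            println!("RTT histogram with {} buckets", histogram.len());
        }
        _ => { /* other responses handled elsewhere */ }
    }
}
fn main() {
    handle(&WasmResponse::RttHistogram { histogram: vec![0u32; 20] });
}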
@ -97,6 +99,19 @@ impl ThroughputHost {
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SiteStackHost {
pub node_name: String,
pub download: Vec<(String, i64)>,
pub upload: Vec<(String, i64)>,
}
impl SiteStackHost {
pub fn total(&self) -> i64 {
self.download.iter().map(|x| x.1).sum::<i64>()
}
}
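A short usage sketch of the new SiteStackHost shape, assuming wasm_pipe_types is available as a dependency. The node name, label strings (treated here as timestamps, which is an assumption), and byte counts are made-up sample data.
use wasm_pipe_types::SiteStackHost;
fn main() {
    // Each direction is a series of (label, value) pairs for one node.
    let host = SiteStackHost {
        node_name: "example-node".to_string(),
        download: vec![
            ("2023-08-07 17:00:00".to_string(), 1_000_000),
            ("2023-08-07 17:01:00".to_string(), 2_000_000),
        ],
        upload: vec![
            ("2023-08-07 17:00:00".to_string(), 250_000),
            ("2023-08-07 17:01:00".to_string(), 300_000),
        ],
    };
    // total() sums only the download column.
    assert_eq!(host.total(), 3_000_000);
}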
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Throughput {
pub value: f64,