Refactor the entire WSS back-end to be a better async citizen

This commit is contained in:
Herbert Wolverson 2023-08-09 19:54:00 +00:00
parent 8bd14c14f6
commit 32233ca36d
17 changed files with 463 additions and 418 deletions

View File

@ -1,12 +1,10 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use serde::Serialize;
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::WasmResponse;
use super::send_response;
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
pub struct LoginResult {
pub msg: String,
pub token: String,
@ -14,8 +12,8 @@ pub struct LoginResult {
pub license_key: String,
}
#[instrument(skip(license, username, password, socket, cnn))]
pub async fn on_login(license: &str, username: &str, password: &str, socket: &mut WebSocket, cnn: Pool<Postgres>) -> Option<LoginResult> {
#[instrument(skip(license, username, password, tx, cnn))]
pub async fn on_login(license: &str, username: &str, password: &str, tx: Sender<WasmResponse>, cnn: Pool<Postgres>) -> Option<LoginResult> {
let login = pgdb::try_login(cnn, license, username, password).await;
if let Ok(login) = login {
let lr = WasmResponse::LoginOk {
@ -23,7 +21,7 @@ pub async fn on_login(license: &str, username: &str, password: &str, socket: &mu
name: login.name.clone(),
license_key: license.to_string(),
};
send_response(socket, lr).await;
tx.send(lr).await.unwrap();
return Some(LoginResult {
msg: "Login Ok".to_string(),
token: login.token.to_string(),
@ -32,13 +30,13 @@ pub async fn on_login(license: &str, username: &str, password: &str, socket: &mu
});
} else {
let lr = WasmResponse::LoginFail;
send_response(socket, lr).await;
tx.send(lr).await.unwrap();
}
None
}
#[instrument(skip(token_id, socket, cnn))]
pub async fn on_token_auth(token_id: &str, socket: &mut WebSocket, cnn: Pool<Postgres>) -> Option<LoginResult> {
#[instrument(skip(token_id, tx, cnn))]
pub async fn on_token_auth(token_id: &str, tx: Sender<WasmResponse>, cnn: Pool<Postgres>) -> Option<LoginResult> {
let login = pgdb::token_to_credentials(cnn, token_id).await;
if let Ok(login) = login {
let lr = WasmResponse::AuthOk {
@ -46,7 +44,7 @@ pub async fn on_token_auth(token_id: &str, socket: &mut WebSocket, cnn: Pool<Pos
name: login.name.clone(),
license_key: login.license.clone(),
};
send_response(socket, lr).await;
tx.send(lr).await.unwrap();
return Some(LoginResult {
msg: "Login Ok".to_string(),
token: login.token.to_string(),
@ -54,7 +52,7 @@ pub async fn on_token_auth(token_id: &str, socket: &mut WebSocket, cnn: Pool<Pos
license_key: login.license.to_string(),
});
} else {
send_response(socket, WasmResponse::AuthFail).await;
tx.send(WasmResponse::AuthFail).await.unwrap();
}
None
}

View File

@ -1,3 +1,5 @@
use std::sync::Arc;
use crate::web::wss::{
nodes::node_status,
queries::{
@ -5,16 +7,15 @@ use crate::web::wss::{
send_extended_device_capacity_graph, send_extended_device_info,
send_extended_device_snr_graph,
},
omnisearch, root_heat_map, send_circuit_info, send_packets_for_all_nodes,
send_packets_for_node, send_perf_for_node, send_rtt_for_all_nodes,
send_rtt_for_all_nodes_circuit, send_rtt_for_all_nodes_site, send_rtt_for_node,
send_rtt_histogram_for_all_nodes,
send_site_info, send_site_parents, send_site_stack_map, send_throughput_for_all_nodes,
omnisearch, root_heat_map, send_circuit_info, send_circuit_parents,
send_packets_for_all_nodes, send_packets_for_node, send_perf_for_node, send_root_parents,
send_rtt_for_all_nodes, send_rtt_for_all_nodes_circuit, send_rtt_for_all_nodes_site,
send_rtt_for_node, send_rtt_histogram_for_all_nodes, send_site_info, send_site_parents,
send_site_stack_map, send_throughput_for_all_nodes,
send_throughput_for_all_nodes_by_circuit, send_throughput_for_all_nodes_by_site,
send_throughput_for_node, site_heat_map,
site_tree::send_site_tree,
time_period::InfluxTimePeriod,
send_circuit_parents, send_root_parents,
},
};
use axum::{
@ -25,12 +26,13 @@ use axum::{
response::IntoResponse,
};
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::{mpsc::Sender, Mutex};
use tracing::instrument;
use wasm_pipe_types::{WasmRequest, WasmResponse};
mod influx_query_builder;
mod login;
mod nodes;
mod queries;
mod influx_query_builder;
pub async fn ws_handler(
ws: WebSocketUpgrade,
@ -39,292 +41,354 @@ pub async fn ws_handler(
ws.on_upgrade(move |sock| handle_socket(sock, state))
}
#[instrument(skip(socket, cnn))]
async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
tracing::info!("WebSocket Connected");
let mut credentials: Option<login::LoginResult> = None;
while let Some(msg) = socket.recv().await {
let cnn = cnn.clone();
let msg = msg.unwrap();
let credentials: Arc<Mutex<Option<login::LoginResult>>> = Arc::new(Mutex::new(None));
// Get the binary message and decompress it
tracing::info!("Received a message: {:?}", msg);
let raw = msg.into_data();
let uncompressed = miniz_oxide::inflate::decompress_to_vec(&raw).unwrap();
let msg = lts_client::cbor::from_slice::<WasmRequest>(&uncompressed).unwrap();
tracing::info!("{msg:?}");
// Setup the send/receive channel
let (tx, mut rx) = tokio::sync::mpsc::channel::<WasmResponse>(10);
// Update the token credentials (if there are any)
if let Some(credentials) = &credentials {
let _ = pgdb::refresh_token(cnn.clone(), &credentials.token).await;
loop {
tokio::select! {
msg = socket.recv() => {
match msg {
Some(msg) => {
tokio::spawn(
handle_socket_message(msg.unwrap(), cnn.clone(), credentials.clone(), tx.clone())
);
}
None => {
tracing::info!("WebSocket Disconnected");
break;
}
}
},
msg = rx.recv() => {
match msg {
Some(msg) => {
let serialized = serialize_response(msg);
socket.send(Message::Binary(serialized)).await.unwrap();
}
None => {
tracing::info!("WebSocket Disconnected");
break;
}
}
},
}
}
}
// Handle the message by type
let matcher = (&msg, &mut credentials);
let wss = &mut socket;
match matcher {
// Handle login with just a token
(WasmRequest::Auth { token }, _) => {
let result = login::on_token_auth(token, &mut socket, cnn).await;
if let Some(result) = result {
credentials = Some(result);
}
}
// Handle login with a username and password
(
WasmRequest::Login {
license,
username,
password,
},
_,
) => {
let result = login::on_login(license, username, password, &mut socket, cnn).await;
if let Some(result) = result {
credentials = Some(result);
}
}
// Node status for dashboard
(WasmRequest::GetNodeStatus, Some(credentials)) => {
node_status(&cnn, wss, &credentials.license_key).await;
}
// Packet chart for dashboard
(WasmRequest::PacketChart { period }, Some(credentials)) => {
let _ =
send_packets_for_all_nodes(&cnn, wss, &credentials.license_key, period.into())
.await;
}
// Packet chart for individual node
(
WasmRequest::PacketChartSingle {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_packets_for_node(
&cnn,
wss,
&credentials.license_key,
period.into(),
node_id,
node_name,
)
.await;
}
// Throughput chart for the dashboard
(WasmRequest::ThroughputChart { period }, Some(credentials)) => {
let _ = send_throughput_for_all_nodes(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
// Throughput chart for a single shaper node
(
WasmRequest::ThroughputChartSingle {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_throughput_for_node(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
node_id.to_string(),
node_name.to_string(),
)
.await;
}
(WasmRequest::ThroughputChartSite { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_throughput_for_all_nodes_by_site(
&cnn,
wss,
&credentials.license_key,
site_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::ThroughputChartCircuit { period, circuit_id }, Some(credentials)) => {
let _ = send_throughput_for_all_nodes_by_circuit(
&cnn,
wss,
&credentials.license_key,
circuit_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
// Rtt Chart
(WasmRequest::RttChart { period }, Some(credentials)) => {
let _ = send_rtt_for_all_nodes(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::RttHistogram { period }, Some(credentials)) => {
let _ = send_rtt_histogram_for_all_nodes(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::RttChartSite { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_rtt_for_all_nodes_site(
&cnn,
wss,
&credentials.license_key,
site_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
(
WasmRequest::RttChartSingle {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_rtt_for_node(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
node_id.to_string(),
node_name.to_string(),
)
.await;
}
(WasmRequest::RttChartCircuit { period, circuit_id }, Some(credentials)) => {
let _ = send_rtt_for_all_nodes_circuit(
&cnn,
wss,
&credentials.license_key,
circuit_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
// Site Stack
(WasmRequest::SiteStack { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_site_stack_map(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
site_id.to_string(),
)
.await;
}
(WasmRequest::RootHeat { period }, Some(credentials)) => {
let _ = root_heat_map(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::SiteHeat { period, site_id }, Some(credentials)) => {
let _ = site_heat_map(
&cnn,
wss,
&credentials.license_key,
site_id,
InfluxTimePeriod::new(period),
)
.await;
}
(
WasmRequest::NodePerfChart {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_perf_for_node(
&cnn,
wss,
&credentials.license_key,
InfluxTimePeriod::new(period),
node_id.to_string(),
node_name.to_string(),
)
.await;
}
(WasmRequest::Tree { parent }, Some(credentials)) => {
send_site_tree(&cnn, wss, &credentials.license_key, parent).await;
}
(WasmRequest::SiteInfo { site_id }, Some(credentials)) => {
send_site_info(&cnn, wss, &credentials.license_key, site_id).await;
}
(WasmRequest::SiteParents { site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
send_site_parents(&cnn, wss, &credentials.license_key, &site_id).await;
}
(WasmRequest::CircuitParents { circuit_id }, Some(credentials)) => {
let circuit_id = urlencoding::decode(circuit_id).unwrap();
send_circuit_parents(&cnn, wss, &credentials.license_key, &circuit_id).await;
}
(WasmRequest::RootParents, Some(credentials)) => {
send_root_parents(&cnn, wss, &credentials.license_key).await;
}
(WasmRequest::Search { term }, Some(credentials)) => {
let _ = omnisearch(&cnn, wss, &credentials.license_key, term).await;
}
(WasmRequest::CircuitInfo { circuit_id }, Some(credentials)) => {
send_circuit_info(&cnn, wss, &credentials.license_key, circuit_id).await;
}
(WasmRequest::ExtendedDeviceInfo { circuit_id }, Some(credentials)) => {
send_extended_device_info(&cnn, wss, &credentials.license_key, circuit_id).await;
}
(WasmRequest::SignalNoiseChartExt { period, device_id }, Some(credentials)) => {
let _ = send_extended_device_snr_graph(
&cnn,
wss,
&credentials.license_key,
device_id,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::DeviceCapacityChartExt { period, device_id }, Some(credentials)) => {
let _ = send_extended_device_capacity_graph(
&cnn,
wss,
&credentials.license_key,
device_id,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::ApSignalExt { period, site_name }, Some(credentials)) => {
#[instrument(skip(credentials, cnn))]
async fn update_token_credentials(
credentials: Arc<Mutex<Option<login::LoginResult>>>,
cnn: Pool<Postgres>,
) {
let mut credentials = credentials.lock().await;
if let Some(credentials) = &mut *credentials {
let _ = pgdb::refresh_token(cnn, &credentials.token).await;
}
}
async fn set_credentials(
credentials: Arc<Mutex<Option<login::LoginResult>>>,
result: login::LoginResult,
) {
let mut credentials = credentials.lock().await;
*credentials = Some(result);
}
fn extract_message(msg: Message) -> WasmRequest {
let raw = msg.into_data();
let uncompressed = miniz_oxide::inflate::decompress_to_vec(&raw).unwrap();
lts_client::cbor::from_slice::<WasmRequest>(&uncompressed).unwrap()
}
async fn handle_auth_message(
msg: &WasmRequest,
credentials: Arc<Mutex<Option<login::LoginResult>>>,
tx: Sender<WasmResponse>,
cnn: Pool<Postgres>,
) {
match msg {
// Handle login with just a token
WasmRequest::Auth { token } => {
let result = login::on_token_auth(token, tx, cnn).await;
if let Some(result) = result {
set_credentials(credentials, result).await;
}
(WasmRequest::ApCapacityExt { period, site_name }, Some(credentials)) => {
}
(_, None) => {
tracing::error!("No credentials");
}
// Handle a full login
WasmRequest::Login { license, username, password } => {
let result = login::on_login(license, username, password, tx, cnn).await;
if let Some(result) = result {
set_credentials(credentials, result).await;
}
}
_ => {}
}
}
async fn handle_socket_message(
msg: Message,
cnn: Pool<Postgres>,
credentials: Arc<Mutex<Option<login::LoginResult>>>,
tx: Sender<WasmResponse>,
) {
// Get the binary message and decompress it
let msg = extract_message(msg);
update_token_credentials(credentials.clone(), cnn.clone()).await;
// Handle the message by type
handle_auth_message(&msg, credentials.clone(), tx.clone(), cnn.clone()).await;
let my_credentials = {
let lock = credentials.lock().await;
lock.clone()
};
let matcher = (&msg, &my_credentials);
match matcher {
// Node status for dashboard
(WasmRequest::GetNodeStatus, Some(credentials)) => {
node_status(&cnn, tx, &credentials.license_key).await;
}
// Packet chart for dashboard
(WasmRequest::PacketChart { period }, Some(credentials)) => {
let _ =
send_packets_for_all_nodes(&cnn, tx, &credentials.license_key, period.into()).await;
}
// Packet chart for individual node
(
WasmRequest::PacketChartSingle {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_packets_for_node(
&cnn,
tx,
&credentials.license_key,
period.into(),
node_id,
node_name,
)
.await;
}
// Throughput chart for the dashboard
(WasmRequest::ThroughputChart { period }, Some(credentials)) => {
let _ = send_throughput_for_all_nodes(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
// Throughput chart for a single shaper node
(
WasmRequest::ThroughputChartSingle {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_throughput_for_node(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
node_id.to_string(),
node_name.to_string(),
)
.await;
}
(WasmRequest::ThroughputChartSite { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_throughput_for_all_nodes_by_site(
&cnn,
tx,
&credentials.license_key,
site_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::ThroughputChartCircuit { period, circuit_id }, Some(credentials)) => {
let _ = send_throughput_for_all_nodes_by_circuit(
&cnn,
tx,
&credentials.license_key,
circuit_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
// Rtt Chart
(WasmRequest::RttChart { period }, Some(credentials)) => {
let _ = send_rtt_for_all_nodes(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::RttHistogram { period }, Some(credentials)) => {
let _ = send_rtt_histogram_for_all_nodes(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::RttChartSite { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_rtt_for_all_nodes_site(
&cnn,
tx,
&credentials.license_key,
site_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
(
WasmRequest::RttChartSingle {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_rtt_for_node(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
node_id.to_string(),
node_name.to_string(),
)
.await;
}
(WasmRequest::RttChartCircuit { period, circuit_id }, Some(credentials)) => {
let _ = send_rtt_for_all_nodes_circuit(
&cnn,
tx,
&credentials.license_key,
circuit_id.to_string(),
InfluxTimePeriod::new(period),
)
.await;
}
// Site Stack
(WasmRequest::SiteStack { period, site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
let _ = send_site_stack_map(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
site_id.to_string(),
)
.await;
}
(WasmRequest::RootHeat { period }, Some(credentials)) => {
let _ = root_heat_map(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::SiteHeat { period, site_id }, Some(credentials)) => {
let _ = site_heat_map(
&cnn,
tx,
&credentials.license_key,
site_id,
InfluxTimePeriod::new(period),
)
.await;
}
(
WasmRequest::NodePerfChart {
period,
node_id,
node_name,
},
Some(credentials),
) => {
let _ = send_perf_for_node(
&cnn,
tx,
&credentials.license_key,
InfluxTimePeriod::new(period),
node_id.to_string(),
node_name.to_string(),
)
.await;
}
(WasmRequest::Tree { parent }, Some(credentials)) => {
send_site_tree(&cnn, tx, &credentials.license_key, parent).await;
}
(WasmRequest::SiteInfo { site_id }, Some(credentials)) => {
send_site_info(&cnn, tx, &credentials.license_key, site_id).await;
}
(WasmRequest::SiteParents { site_id }, Some(credentials)) => {
let site_id = urlencoding::decode(site_id).unwrap();
send_site_parents(&cnn, tx, &credentials.license_key, &site_id).await;
}
(WasmRequest::CircuitParents { circuit_id }, Some(credentials)) => {
let circuit_id = urlencoding::decode(circuit_id).unwrap();
send_circuit_parents(&cnn, tx, &credentials.license_key, &circuit_id).await;
}
(WasmRequest::RootParents, Some(credentials)) => {
send_root_parents(&cnn, tx, &credentials.license_key).await;
}
(WasmRequest::Search { term }, Some(credentials)) => {
let _ = omnisearch(&cnn, tx, &credentials.license_key, term).await;
}
(WasmRequest::CircuitInfo { circuit_id }, Some(credentials)) => {
send_circuit_info(&cnn, tx, &credentials.license_key, circuit_id).await;
}
(WasmRequest::ExtendedDeviceInfo { circuit_id }, Some(credentials)) => {
send_extended_device_info(&cnn, tx, &credentials.license_key, circuit_id).await;
}
(WasmRequest::SignalNoiseChartExt { period, device_id }, Some(credentials)) => {
let _ = send_extended_device_snr_graph(
&cnn,
tx,
&credentials.license_key,
device_id,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::DeviceCapacityChartExt { period, device_id }, Some(credentials)) => {
let _ = send_extended_device_capacity_graph(
&cnn,
tx,
&credentials.license_key,
device_id,
InfluxTimePeriod::new(period),
)
.await;
}
(WasmRequest::ApSignalExt { period, site_name }, Some(credentials)) => {}
(WasmRequest::ApCapacityExt { period, site_name }, Some(credentials)) => {}
(_, None) => {
tracing::error!("No credentials");
}
_ => {
let error = format!("Unknown message: {msg:?}");
tracing::error!(error);
}
}
}
@ -332,9 +396,3 @@ fn serialize_response(response: WasmResponse) -> Vec<u8> {
let cbor = lts_client::cbor::to_vec(&response).unwrap();
miniz_oxide::deflate::compress_to_vec(&cbor, 8)
}
#[instrument(skip(socket, response))]
pub async fn send_response(socket: &mut WebSocket, response: WasmResponse) {
let serialized = serialize_response(response);
socket.send(Message::Binary(serialized)).await.unwrap();
}

View File

@ -1,9 +1,7 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::Node;
use crate::web::wss::send_response;
use wasm_pipe_types::{Node, WasmResponse};
fn convert(ns: pgdb::NodeStatus) -> Node {
Node {
@ -13,14 +11,14 @@ fn convert(ns: pgdb::NodeStatus) -> Node {
}
}
#[instrument(skip(cnn, socket, key))]
pub async fn node_status(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str) {
#[instrument(skip(cnn, tx, key))]
pub async fn node_status(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str) {
tracing::info!("Fetching node status, {key}");
let nodes = pgdb::node_status(cnn, key).await;
match nodes {
Ok(nodes) => {
let nodes: Vec<Node> = nodes.into_iter().map(convert).collect();
send_response(socket, wasm_pipe_types::WasmResponse::NodeStatus { nodes }).await;
tx.send(wasm_pipe_types::WasmResponse::NodeStatus { nodes }).await.unwrap();
},
Err(e) => {
tracing::error!("Unable to obtain node status: {}", e);

View File

@ -1,8 +1,6 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use wasm_pipe_types::CircuitList;
use crate::web::wss::send_response;
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{CircuitList, WasmResponse};
fn from(circuit: pgdb::CircuitInfo) -> CircuitList {
CircuitList {
@ -21,9 +19,10 @@ fn from(circuit: pgdb::CircuitInfo) -> CircuitList {
}
}
pub async fn send_circuit_info(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, circuit_id: &str) {
#[tracing::instrument(skip(cnn, tx, key, circuit_id))]
pub async fn send_circuit_info(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, circuit_id: &str) {
if let Ok(hosts) = pgdb::get_circuit_info(cnn, key, circuit_id).await {
let hosts = hosts.into_iter().map(from).collect::<Vec<_>>();
send_response(socket, wasm_pipe_types::WasmResponse::CircuitInfo { data: hosts }).await;
tx.send(WasmResponse::CircuitInfo { data: hosts }).await.unwrap();
}
}

View File

@ -3,12 +3,14 @@ use axum::extract::ws::WebSocket;
use chrono::{DateTime, FixedOffset};
use influxdb2::{FromDataPoint, models::Query, Client};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use crate::web::wss::send_response;
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::WasmResponse;
use super::time_period::InfluxTimePeriod;
#[tracing::instrument(skip(cnn, tx, key, circuit_id))]
pub async fn send_extended_device_info(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
circuit_id: &str,
) {
@ -60,14 +62,15 @@ pub async fn send_extended_device_info(
// If there is any, send it
println!("{extended_data:?}");
if !extended_data.is_empty() {
send_response(socket, wasm_pipe_types::WasmResponse::DeviceExt { data: extended_data }).await;
tx.send(WasmResponse::DeviceExt { data: extended_data }).await.unwrap();
}
}
}
#[tracing::instrument(skip(cnn, tx, key, device_id, period))]
pub async fn send_extended_device_snr_graph(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
device_id: &str,
period: InfluxTimePeriod,
@ -101,7 +104,7 @@ pub async fn send_extended_device_snr_graph(
};
sn.push(snr);
});
send_response(socket, wasm_pipe_types::WasmResponse::DeviceExtSnr { data: sn, device_id: device_id.to_string() }).await;
tx.send(WasmResponse::DeviceExtSnr { data: sn, device_id: device_id.to_string() }).await?;
}
Ok(())
}
@ -129,9 +132,10 @@ pub struct SnrRow {
pub time: DateTime<FixedOffset>,
}
#[tracing::instrument(skip(cnn, tx, key, device_id, period))]
pub async fn send_extended_device_capacity_graph(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
device_id: &str,
period: InfluxTimePeriod,
@ -165,7 +169,7 @@ pub async fn send_extended_device_capacity_graph(
};
sn.push(snr);
});
send_response(socket, wasm_pipe_types::WasmResponse::DeviceExtCapacity { data: sn, device_id: device_id.to_string() }).await;
tx.send(WasmResponse::DeviceExtCapacity { data: sn, device_id: device_id.to_string() }).await;
}
Ok(())
}

View File

@ -1,9 +1,8 @@
use axum::extract::ws::WebSocket;
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::{Client, FromDataPoint, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use wasm_pipe_types::{PerfHost, Perf};
use crate::web::wss::send_response;
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{PerfHost, Perf, WasmResponse};
use super::time_period::InfluxTimePeriod;
#[derive(Debug, FromDataPoint)]
@ -29,14 +28,14 @@ impl Default for PerfRow {
pub async fn send_perf_for_node(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
node_id: String,
node_name: String,
) -> anyhow::Result<()> {
let node = get_perf_for_node(cnn, key, node_id, node_name, period).await?;
send_response(socket, wasm_pipe_types::WasmResponse::NodePerfChart { nodes: vec![node] }).await;
tx.send(WasmResponse::NodePerfChart { nodes: vec![node] }).await?;
Ok(())
}

View File

@ -2,11 +2,11 @@
mod packet_row;
use self::packet_row::PacketRow;
use super::time_period::InfluxTimePeriod;
use crate::web::wss::{influx_query_builder::InfluxQueryBuilder, send_response};
use axum::extract::ws::WebSocket;
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{PacketHost, Packets};
use wasm_pipe_types::{PacketHost, Packets, WasmResponse};
fn add_by_direction(direction: &str, down: &mut Vec<Packets>, up: &mut Vec<Packets>, row: &PacketRow) {
match direction {
@ -30,10 +30,10 @@ fn add_by_direction(direction: &str, down: &mut Vec<Packets>, up: &mut Vec<Packe
}
}
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_packets_for_all_nodes(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
@ -69,14 +69,14 @@ pub async fn send_packets_for_all_nodes(
});
}
});
send_response(socket, wasm_pipe_types::WasmResponse::PacketChart { nodes }).await;
tx.send(wasm_pipe_types::WasmResponse::PacketChart { nodes }).await?;
Ok(())
}
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_packets_for_node(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
node_id: &str,
@ -85,11 +85,7 @@ pub async fn send_packets_for_node(
let node =
get_packets_for_node(cnn, key, node_id.to_string(), node_name.to_string(), period).await?;
send_response(
socket,
wasm_pipe_types::WasmResponse::PacketChart { nodes: vec![node] },
)
.await;
tx.send(wasm_pipe_types::WasmResponse::PacketChart { nodes: vec![node] }).await?;
Ok(())
}

View File

@ -3,19 +3,19 @@ pub use per_node::*;
mod per_site;
pub use per_site::*;
use axum::extract::ws::WebSocket;
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{RttHost, Rtt};
use crate::web::wss::{queries::rtt::rtt_row::RttCircuitRow, send_response};
use wasm_pipe_types::{RttHost, Rtt, WasmResponse};
use crate::web::wss::queries::rtt::rtt_row::RttCircuitRow;
use self::rtt_row::RttRow;
use super::time_period::InfluxTimePeriod;
mod rtt_row;
#[instrument(skip(cnn, socket, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
#[instrument(skip(cnn, tx, key, site_id, period))]
pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, site_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_rtt_for_all_nodes_circuit(cnn, key, &site_id, period).await?;
let mut histogram = vec![0; 20];
@ -26,11 +26,11 @@ pub async fn send_rtt_for_all_nodes_circuit(cnn: &Pool<Postgres>, socket: &mut W
}
}
send_response(socket, wasm_pipe_types::WasmResponse::RttChartCircuit { nodes, histogram }).await;
tx.send(WasmResponse::RttChartCircuit { nodes, histogram }).await?;
Ok(())
}
pub async fn send_rtt_for_node(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod, node_id: String, node_name: String) -> anyhow::Result<()> {
pub async fn send_rtt_for_node(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, period: InfluxTimePeriod, node_id: String, node_name: String) -> anyhow::Result<()> {
let node = get_rtt_for_node(cnn, key, node_id, node_name, period).await?;
let nodes = vec![node];
@ -42,7 +42,7 @@ pub async fn send_rtt_for_node(cnn: &Pool<Postgres>, socket: &mut WebSocket, key
}
}*/
send_response(socket, wasm_pipe_types::WasmResponse::RttChart { nodes }).await;
tx.send(WasmResponse::RttChart { nodes }).await?;
Ok(())
}

View File

@ -1,18 +1,18 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response, influx_query_builder::InfluxQueryBuilder};
use axum::extract::ws::WebSocket;
use crate::web::wss::{queries::time_period::InfluxTimePeriod, influx_query_builder::InfluxQueryBuilder};
use pgdb::{
sqlx::{Pool, Postgres},
NodeStatus
};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost};
use wasm_pipe_types::{Rtt, RttHost, WasmResponse};
use super::rtt_row::{RttRow, RttHistoRow};
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_rtt_for_all_nodes(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
@ -24,15 +24,15 @@ pub async fn send_rtt_for_all_nodes(
.await?;
let node_status = pgdb::node_status(cnn, key).await?;
let nodes = rtt_rows_to_result(rows, node_status);
send_response(socket, wasm_pipe_types::WasmResponse::RttChart { nodes }).await;
tx.send(WasmResponse::RttChart { nodes }).await?;
Ok(())
}
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_rtt_histogram_for_all_nodes(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
@ -50,7 +50,7 @@ pub async fn send_rtt_histogram_for_all_nodes(
histo[bucket] += 1;
});
send_response(socket, wasm_pipe_types::WasmResponse::RttHistogram { histogram: histo }).await;
tx.send(WasmResponse::RttHistogram { histogram: histo }).await?;
Ok(())
}

View File

@ -1,16 +1,16 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response, influx_query_builder::InfluxQueryBuilder};
use axum::extract::ws::WebSocket;
use crate::web::wss::{queries::time_period::InfluxTimePeriod, influx_query_builder::InfluxQueryBuilder};
use pgdb::{
sqlx::{Pool, Postgres},
NodeStatus
};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{Rtt, RttHost};
use wasm_pipe_types::{Rtt, RttHost, WasmResponse};
use super::rtt_row::RttSiteRow;
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_rtt_for_all_nodes_site(
cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_name: String, period: InfluxTimePeriod
cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, site_name: String, period: InfluxTimePeriod
) -> anyhow::Result<()> {
let rows = InfluxQueryBuilder::new(period.clone())
.with_measurement("tree")
@ -21,7 +21,7 @@ pub async fn send_rtt_for_all_nodes_site(
.await?;
let node_status = pgdb::node_status(cnn, key).await?;
let nodes = rtt_rows_to_result(rows, node_status);
send_response(socket, wasm_pipe_types::WasmResponse::RttChartSite { nodes }).await;
tx.send(WasmResponse::RttChartSite { nodes }).await?;
Ok(())
}

View File

@ -1,12 +1,11 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use wasm_pipe_types::SearchResult;
use crate::web::wss::send_response;
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{SearchResult, WasmResponse};
#[tracing::instrument(skip(cnn, tx, key, term))]
pub async fn omnisearch(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
term: &str,
) -> anyhow::Result<()> {
@ -25,7 +24,7 @@ pub async fn omnisearch(
hits.dedup_by(|a,b| a.name == b.name && a.url == b.url);
hits.sort_by(|a,b| a.score.partial_cmp(&b.score).unwrap());
send_response(socket, wasm_pipe_types::WasmResponse::SearchResult { hits }).await;
tx.send(WasmResponse::SearchResult { hits }).await?;
Ok(())
}

View File

@ -1,11 +1,10 @@
use super::time_period::InfluxTimePeriod;
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use crate::web::wss::send_response;
use axum::extract::ws::WebSocket;
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::FromDataPoint;
use pgdb::sqlx::{Pool, Postgres};
use serde::Serialize;
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use std::collections::HashMap;
use wasm_pipe_types::WasmResponse;
@ -27,10 +26,10 @@ fn headings_sorter<T: HeatMapData>(rows: Vec<T>) -> HashMap<String, Vec<(DateTim
sorter
}
#[instrument(skip(cnn,socket,key,period))]
#[instrument(skip(cnn,tx,key,period))]
pub async fn root_heat_map(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
) -> anyhow::Result<()> {
@ -47,7 +46,7 @@ pub async fn root_heat_map(
.await?;
let sorter = headings_sorter(rows);
send_response(socket, WasmResponse::RootHeat { data: sorter }).await;
tx.send(WasmResponse::RootHeat { data: sorter }).await?;
Ok(())
}
@ -82,10 +81,10 @@ async fn site_circuits_heat_map(
Ok(rows)
}
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn site_heat_map(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
site_name: &str,
period: InfluxTimePeriod,
@ -118,7 +117,7 @@ pub async fn site_heat_map(
});
let sorter = headings_sorter(rows);
send_response(socket, WasmResponse::SiteHeat { data: sorter }).await;
tx.send(WasmResponse::SiteHeat { data: sorter }).await?;
Ok(())
}

View File

@ -1,8 +1,7 @@
use super::site_tree::tree_to_host;
use crate::web::wss::send_response;
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use serde::Serialize;
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{SiteTree, WasmResponse, SiteOversubscription};
#[derive(Serialize)]
@ -13,7 +12,7 @@ struct SiteInfoMessage {
pub async fn send_site_info(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
site_id: &str,
) {
@ -30,7 +29,7 @@ pub async fn send_site_info(
dlmin: oversub.dlmin,
devicecount: oversub.devicecount,
};
send_response(socket, WasmResponse::SiteInfo { data: host, oversubscription }).await;
tx.send(WasmResponse::SiteInfo { data: host, oversubscription }).await.unwrap();
} else {
tracing::error!("{oversub:?}");
}

View File

@ -1,21 +1,21 @@
use axum::extract::ws::WebSocket;
use pgdb::sqlx::{Pool, Postgres};
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::WasmResponse;
use crate::web::wss::send_response;
#[tracing::instrument(skip(cnn, tx, key, site_name))]
pub async fn send_site_parents(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
site_name: &str,
) {
if let Ok(parents) = pgdb::get_parent_list(cnn, key, site_name).await {
send_response(socket, wasm_pipe_types::WasmResponse::SiteParents { data: parents }).await;
tx.send(WasmResponse::SiteParents { data: parents }).await.unwrap();
}
let child_result = pgdb::get_child_list(cnn, key, site_name).await;
if let Ok(children) = child_result {
send_response(socket, wasm_pipe_types::WasmResponse::SiteChildren { data: children }).await;
tx.send(WasmResponse::SiteChildren { data: children }).await.unwrap();
} else {
tracing::error!("Error getting children: {:?}", child_result);
}
@ -23,24 +23,24 @@ pub async fn send_site_parents(
pub async fn send_circuit_parents(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
circuit_id: &str,
) {
if let Ok(parents) = pgdb::get_circuit_parent_list(cnn, key, circuit_id).await {
send_response(socket, wasm_pipe_types::WasmResponse::SiteParents { data: parents }).await;
tx.send(WasmResponse::SiteParents { data: parents }).await.unwrap();
}
}
pub async fn send_root_parents(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
) {
let site_name = "Root";
let child_result = pgdb::get_child_list(cnn, key, site_name).await;
if let Ok(children) = child_result {
send_response(socket, wasm_pipe_types::WasmResponse::SiteChildren { data: children }).await;
tx.send(WasmResponse::SiteChildren { data: children }).await.unwrap();
} else {
tracing::error!("Error getting children: {:?}", child_result);
}

View File

@ -1,19 +1,19 @@
use axum::extract::ws::WebSocket;
use pgdb::{
sqlx::{Pool, Postgres},
TreeNode,
};
use wasm_pipe_types::SiteTree;
use crate::web::wss::send_response;
use tokio::sync::mpsc::Sender;
use wasm_pipe_types::{SiteTree, WasmResponse};
pub async fn send_site_tree(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, parent: &str) {
#[tracing::instrument(skip(cnn, tx, key, parent))]
pub async fn send_site_tree(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, parent: &str) {
let tree = pgdb::get_site_tree(cnn, key, parent).await.unwrap();
let tree = tree
.into_iter()
.map(tree_to_host)
.collect::<Vec<SiteTree>>();
send_response(socket, wasm_pipe_types::WasmResponse::SiteTree { data: tree }).await;
tx.send(WasmResponse::SiteTree { data: tree }).await.unwrap();
}
pub(crate) fn tree_to_host(row: TreeNode) -> SiteTree {

View File

@ -1,12 +1,12 @@
use std::collections::HashMap;
mod site_stack;
use axum::extract::ws::WebSocket;
use futures::future::join_all;
use influxdb2::{Client, models::Query};
use pgdb::{sqlx::{Pool, Postgres}, organization_cache::get_org_details};
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use wasm_pipe_types::{ThroughputHost, Throughput};
use crate::web::wss::{send_response, influx_query_builder::InfluxQueryBuilder};
use wasm_pipe_types::{ThroughputHost, Throughput, WasmResponse};
use crate::web::wss::influx_query_builder::InfluxQueryBuilder;
use self::throughput_row::{ThroughputRow, ThroughputRowBySite, ThroughputRowByCircuit};
use super::time_period::InfluxTimePeriod;
mod throughput_row;
@ -56,8 +56,8 @@ fn add_by_direction_site(direction: &str, down: &mut Vec<Throughput>, up: &mut V
}
}
#[instrument(skip(cnn, socket, key, period))]
pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<ThroughputHost>::new();
InfluxQueryBuilder::new(period.clone())
@ -85,12 +85,12 @@ pub async fn send_throughput_for_all_nodes(cnn: &Pool<Postgres>, socket: &mut We
nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up });
}
});
send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes }).await;
tx.send(WasmResponse::BitsChart { nodes }).await?;
Ok(())
}
#[instrument(skip(cnn, socket, key, period, site_name))]
pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, site_name: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
#[instrument(skip(cnn, tx, key, period, site_name))]
pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, site_name: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let node_status = pgdb::node_status(cnn, key).await?;
let mut nodes = Vec::<ThroughputHost>::new();
InfluxQueryBuilder::new(period.clone())
@ -119,7 +119,7 @@ pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, socket:
nodes.push(ThroughputHost { node_id: row.host_id, node_name, down, up });
}
});
send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes }).await;
tx.send(WasmResponse::BitsChart { nodes }).await?;
Ok(())
}
@ -131,15 +131,15 @@ pub async fn send_throughput_for_all_nodes_by_site(cnn: &Pool<Postgres>, socket:
Ok(())
}*/
pub async fn send_throughput_for_all_nodes_by_circuit(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, circuit_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
pub async fn send_throughput_for_all_nodes_by_circuit(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, circuit_id: String, period: InfluxTimePeriod) -> anyhow::Result<()> {
let nodes = get_throughput_for_all_nodes_by_circuit(cnn, key, period, &circuit_id).await?;
send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes }).await;
tx.send(WasmResponse::BitsChart { nodes }).await?;
Ok(())
}
pub async fn send_throughput_for_node(cnn: &Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod, node_id: String, node_name: String) -> anyhow::Result<()> {
pub async fn send_throughput_for_node(cnn: &Pool<Postgres>, tx: Sender<WasmResponse>, key: &str, period: InfluxTimePeriod, node_id: String, node_name: String) -> anyhow::Result<()> {
let node = get_throughput_for_node(cnn, key, node_id, node_name, period).await?;
send_response(socket, wasm_pipe_types::WasmResponse::BitsChart { nodes: vec![node] }).await;
tx.send(WasmResponse::BitsChart { nodes: vec![node] }).await?;
Ok(())
}

View File

@ -1,12 +1,12 @@
use crate::web::wss::{queries::time_period::InfluxTimePeriod, send_response};
use axum::extract::ws::WebSocket;
use crate::web::wss::queries::time_period::InfluxTimePeriod;
use pgdb::{
organization_cache::get_org_details,
sqlx::{Pool, Postgres},
OrganizationDetails,
};
use tokio::sync::mpsc::Sender;
use tracing::{error, instrument};
use wasm_pipe_types::SiteStackHost;
use wasm_pipe_types::{SiteStackHost, WasmResponse};
#[derive(Debug, influxdb2::FromDataPoint)]
pub struct SiteStackRow {
@ -48,10 +48,10 @@ impl Default for CircuitStackRow {
}
}
#[instrument(skip(cnn, socket, key, period))]
#[instrument(skip(cnn, tx, key, period))]
pub async fn send_site_stack_map(
cnn: &Pool<Postgres>,
socket: &mut WebSocket,
tx: Sender<WasmResponse>,
key: &str,
period: InfluxTimePeriod,
site_id: String,
@ -78,11 +78,7 @@ pub async fn send_site_stack_map(
reduce_to_x_entries(&mut result);
// Send the reply
send_response(
socket,
wasm_pipe_types::WasmResponse::SiteStack { nodes: result },
)
.await;
tx.send(WasmResponse::SiteStack { nodes: result }).await?;
}
}
}