WIP: the new wss system for collecting data is proven to function, now fleshing it out.

This commit is contained in:
Herbert Wolverson 2025-01-29 09:23:04 -06:00
parent 009a764114
commit fdce3029a8
13 changed files with 585 additions and 309 deletions

16
src/rust/Cargo.lock generated
View File

@ -1143,6 +1143,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
[[package]]
name = "futures-macro"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.87",
]
[[package]]
name = "futures-sink"
version = "0.3.31"
@ -1163,6 +1174,7 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
@ -2074,6 +2086,7 @@ dependencies = [
"default-net",
"dryoc 0.6.2",
"flate2",
"futures-util",
"fxhash",
"ip_network",
"ip_network_table",
@ -2103,6 +2116,7 @@ dependencies = [
"sysinfo",
"timerfd",
"tokio",
"tokio-tungstenite",
"tower-http",
"tracing",
"tracing-subscriber",
@ -3652,7 +3666,9 @@ checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9"
dependencies = [
"futures-util",
"log",
"native-tls",
"tokio",
"tokio-native-tls",
"tungstenite",
]

View File

@ -53,6 +53,7 @@ uuid = { version = "1.10.0", features = ["v4", "fast-rng", "serde"] }
dryoc = { version ="0.6.0", features = ["serde"] }
miniz_oxide = "0.8.0"
tungstenite = { version = "0", features = [ "native-tls" ] } # For WebSockets
tokio-tungstenite = { version = "0", features = [ "native-tls" ] }
native-tls = "0.2"
ureq = { version = "2.12.1", features = ["json", "native-tls"] }
@ -65,3 +66,4 @@ allocative_derive = "0.3.3"
#[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
#jemallocator = { workspace = true }
mimalloc = { workspace = true }
futures-util = "0.3.31"

View File

@ -21,6 +21,7 @@ export class ThroughputRingBufferGraphTimescale extends DashboardGraph {
let seconds = periodNameToSeconds(period);
console.log("Requesting Insight History Data");
$.get("local-api/ltsThroughput/" + seconds, (data) => {
console.log("Received Insight History Data", data);
let shaperDown = new MinMaxSeries("Down", 1);
let shaperUp = new MinMaxSeries(" Up", 1);
data.forEach((r) => {

View File

@ -22,7 +22,7 @@ use crate::node_manager::auth::auth_layer;
use tower_http::cors::CorsLayer;
use crate::node_manager::shaper_queries_actor::ShaperQueryCommand;
pub fn local_api(shaper_query: crossbeam_channel::Sender<ShaperQueryCommand>) -> Router {
pub fn local_api(shaper_query: tokio::sync::mpsc::Sender<ShaperQueryCommand>) -> Router {
Router::new()
.route("/dashletThemes", get(dashboard_themes::list_themes))
.route("/dashletSave", post(dashboard_themes::save_theme))

View File

@ -2,12 +2,14 @@ use axum::extract::Path;
use axum::http::StatusCode;
use axum::{Extension, Json};
use serde::{Deserialize, Serialize};
use tracing::info;
use tracing::log::warn;
use lqos_config::load_config;
use crate::node_manager::local_api::lts::rest_client::lts_query;
use crate::node_manager::shaper_queries_actor::ShaperQueryCommand;
#[derive(Serialize, Deserialize, Copy, Clone)]
#[derive(Debug)]
pub struct ThroughputData {
time: i64, // Unix timestamp
max_down: i64,
@ -98,24 +100,27 @@ pub async fn last_24_hours()-> Result<Json<Vec<ThroughputData>>, StatusCode> {
}
pub async fn throughput_period(
Extension(shaper_query): Extension<crossbeam_channel::Sender<ShaperQueryCommand>>,
Extension(shaper_query): Extension<tokio::sync::mpsc::Sender<ShaperQueryCommand>>,
Path(seconds): Path<i32>,
)-> Result<Json<Vec<ThroughputData>>, StatusCode> {
info!("Requesting throughput data for {} seconds", seconds);
let (tx, rx) = tokio::sync::oneshot::channel();
shaper_query.send(ShaperQueryCommand::ShaperThroughput { seconds, reply: tx }).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
shaper_query.send(ShaperQueryCommand::ShaperThroughput { seconds, reply: tx }).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
info!("Sent throughput request. Awaiting reply.");
let throughput = rx.await.map_err(|e| {
warn!("Error getting total throughput: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
info!("Received throughput data.");
Ok(Json(throughput))
}
pub async fn packets_period(
Extension(shaper_query): Extension<crossbeam_channel::Sender<ShaperQueryCommand>>,
Extension(shaper_query): Extension<tokio::sync::mpsc::Sender<ShaperQueryCommand>>,
Path(seconds): Path<i32>,
)-> Result<Json<Vec<FullPacketData>>, StatusCode> {
let (tx, rx) = tokio::sync::oneshot::channel();
shaper_query.send(ShaperQueryCommand::ShaperPackets { seconds, reply: tx }).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
shaper_query.send(ShaperQueryCommand::ShaperPackets { seconds, reply: tx }).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let throughput = rx.await.map_err(|e| {
warn!("Error getting total throughput: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
@ -124,11 +129,11 @@ pub async fn packets_period(
}
pub async fn percent_shaped_period(
Extension(shaper_query): Extension<crossbeam_channel::Sender<ShaperQueryCommand>>,
Extension(shaper_query): Extension<tokio::sync::mpsc::Sender<ShaperQueryCommand>>,
Path(seconds): Path<i32>,
)-> Result<Json<Vec<PercentShapedWeb>>, StatusCode> {
let (tx, rx) = tokio::sync::oneshot::channel();
shaper_query.send(ShaperQueryCommand::ShaperPercent { seconds, reply: tx }).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
shaper_query.send(ShaperQueryCommand::ShaperPercent { seconds, reply: tx }).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let throughput = rx.await.map_err(|e| {
warn!("Error getting total throughput: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR
@ -137,11 +142,11 @@ pub async fn percent_shaped_period(
}
pub async fn percent_flows_period(
Extension(shaper_query): Extension<crossbeam_channel::Sender<ShaperQueryCommand>>,
Extension(shaper_query): Extension<tokio::sync::mpsc::Sender<ShaperQueryCommand>>,
Path(seconds): Path<i32>,
)-> Result<Json<Vec<FlowCountViewWeb>>, StatusCode> {
let (tx, rx) = tokio::sync::oneshot::channel();
shaper_query.send(ShaperQueryCommand::ShaperFlows { seconds, reply: tx }).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
shaper_query.send(ShaperQueryCommand::ShaperFlows { seconds, reply: tx }).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let throughput = rx.await.map_err(|e| {
warn!("Error getting total throughput: {:?}", e);
StatusCode::INTERNAL_SERVER_ERROR

View File

@ -37,7 +37,7 @@ pub async fn spawn_webserver(
let listener = TcpListener::bind(&listen_address).await?;
// Setup shaper queries
let shaper_tx = shaper_queries_actor();
let shaper_tx = shaper_queries_actor().await;
// Construct the router from parts
let router = Router::new()

View File

@ -1,135 +1,20 @@
mod ws;
mod timed_cache;
mod queries;
mod caches;
mod commands;
mod ws_message;
mod remote_insight;
use std::time::Duration;
use tracing::{info, warn};
use crate::node_manager::local_api::lts::{FlowCountViewWeb, FullPacketData, PercentShapedWeb, ThroughputData};
use crate::node_manager::shaper_queries_actor::timed_cache::TimedCache;
pub use crate::node_manager::shaper_queries_actor::commands::ShaperQueryCommand;
use crate::node_manager::shaper_queries_actor::queries::shaper_queries;
pub enum ShaperQueryCommand {
ShaperThroughput { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<ThroughputData>> },
ShaperPackets { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<FullPacketData>> },
ShaperPercent { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<PercentShapedWeb>> },
ShaperFlows { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<FlowCountViewWeb>> },
}
pub fn shaper_queries_actor() -> crossbeam_channel::Sender<ShaperQueryCommand> {
let (tx, rx) = crossbeam_channel::bounded(128);
let _ = std::thread::Builder::new().name("shaper_queries_actor".to_string()).spawn(move || {
shaper_queries(rx);
});
pub async fn shaper_queries_actor() -> tokio::sync::mpsc::Sender<ShaperQueryCommand> {
let (tx, rx) = tokio::sync::mpsc::channel(128);
tokio::spawn(shaper_queries(rx));
tx
}
fn shaper_queries(rx: crossbeam_channel::Receiver<ShaperQueryCommand>) {
info!("Starting the shaper query actor.");
let mut caches = Caches::new();
while let Ok(command) = rx.recv() {
caches.cleanup();
match command {
ShaperQueryCommand::ShaperThroughput { seconds, reply } => {
if let Some(result) = caches.throughput.get(&seconds) {
info!("Cache hit for {seconds} seconds throughput");
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.throughput.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
ShaperQueryCommand::ShaperPackets { seconds, reply } => {
if let Some(result) = caches.packets.get(&seconds) {
info!("Cache hit for {seconds} seconds packets");
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.packets.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
ShaperQueryCommand::ShaperPercent { seconds, reply } => {
if let Some(result) = caches.percent_shaped.get(&seconds) {
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.percent_shaped.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
ShaperQueryCommand::ShaperFlows { seconds, reply } => {
if let Some(result) = caches.flows.get(&seconds) {
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.flows.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
}
}
warn!("Shaper query actor closing.")
}
const CACHE_DURATION: Duration = Duration::from_secs(60 * 5);
struct Caches {
throughput: TimedCache<i32, Vec<ThroughputData>>,
packets: TimedCache<i32, Vec<FullPacketData>>,
percent_shaped: TimedCache<i32, Vec<PercentShapedWeb>>,
flows: TimedCache<i32, Vec<FlowCountViewWeb>>,
}
impl Caches {
fn new() -> Self {
Self {
throughput: TimedCache::new(CACHE_DURATION),
packets: TimedCache::new(CACHE_DURATION),
percent_shaped: TimedCache::new(CACHE_DURATION),
flows: TimedCache::new(CACHE_DURATION),
}
}
fn cleanup(&mut self) {
self.throughput.cleanup();
}
}

View File

@ -0,0 +1,87 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::de::DeserializeOwned;
use tokio::sync::Mutex;
use tracing::{info, warn};
use crate::node_manager::local_api::lts::{FlowCountViewWeb, FullPacketData, PercentShapedWeb, ThroughputData};
use crate::node_manager::shaper_queries_actor::timed_cache::TimedCache;
const CACHE_DURATION: Duration = Duration::from_secs(60 * 5);
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum CacheType {
Throughput,
Packets,
PercentShaped,
Flows,
}
impl CacheType {
fn from_str(tag: &str) -> Self {
match tag {
"throughput" => Self::Throughput,
"packets" => Self::Packets,
"percent_shaped" => Self::PercentShaped,
"flows" => Self::Flows,
_ => panic!(),
}
}
}
pub struct Caches {
on_update: tokio::sync::broadcast::Sender<CacheType>,
cache: Mutex<HashMap<(CacheType, i32), (Instant, Vec<u8>)>>,
}
impl Caches {
pub fn new() -> (Arc<Self>, tokio::sync::broadcast::Receiver<CacheType>) {
let (tx, rx) = tokio::sync::broadcast::channel(32);
(Arc::new(Self {
on_update: tx,
cache: Mutex::new(HashMap::new()),
}), rx)
}
pub async fn cleanup(&self) {
let now = Instant::now();
let mut cache = self.cache.lock().await;
cache.retain(|(_, _), (time, _)| now.duration_since(*time) < CACHE_DURATION);
}
pub async fn store(&self, tag: String, seconds: i32, data: Vec<u8>) {
let mut cache = self.cache.lock().await;
let tag = match tag.as_str() {
"throughput" => CacheType::Throughput,
"packets" => CacheType::Packets,
"percent_shaped" => CacheType::PercentShaped,
"flows" => CacheType::Flows,
_ => return,
};
cache.insert((tag, seconds), (Instant::now(), data));
let _ = self.on_update.send(tag);
}
pub async fn get<T: Cacheable + DeserializeOwned>(&self, seconds: i32) -> Option<Vec<T>> {
let cache = self.cache.lock().await;
let tag = T::tag();
let (_, data) = cache.get(&(tag, seconds))?;
info!("Cache hit for {} seconds {:?}. Length: {}", seconds, tag, data.len());
let deserialized = serde_cbor::from_slice(&data);
if let Err(e) = deserialized {
warn!("Failed to deserialize cache: {:?}", e);
return None;
}
Some(deserialized.unwrap())
}
}
pub trait Cacheable {
fn tag() -> CacheType;
}
impl Cacheable for ThroughputData {
fn tag() -> CacheType {
CacheType::Throughput
}
}

View File

@ -0,0 +1,8 @@
use crate::node_manager::local_api::lts::{FlowCountViewWeb, FullPacketData, PercentShapedWeb, ThroughputData};
pub enum ShaperQueryCommand {
ShaperThroughput { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<ThroughputData>> },
ShaperPackets { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<FullPacketData>> },
ShaperPercent { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<PercentShapedWeb>> },
ShaperFlows { seconds: i32, reply: tokio::sync::oneshot::Sender<Vec<FlowCountViewWeb>> },
}

View File

@ -0,0 +1,147 @@
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::time::error::Elapsed;
use tokio::time::timeout;
use tracing::{info, warn};
use crate::node_manager::local_api::lts::ThroughputData;
use crate::node_manager::shaper_queries_actor::{remote_insight, ShaperQueryCommand};
use crate::node_manager::shaper_queries_actor::caches::{CacheType, Caches};
pub async fn shaper_queries(mut rx: tokio::sync::mpsc::Receiver<ShaperQueryCommand>) {
info!("Starting the shaper query actor.");
// Initialize the cache system
let (mut caches, mut broadcast_rx) = Caches::new();
let mut remote_insight = remote_insight::RemoteInsight::new(caches.clone());
while let Some(command) = rx.recv().await {
caches.cleanup().await;
match command {
ShaperQueryCommand::ShaperThroughput { seconds, reply } => {
// Check for cache hit
if let Some(result) = caches.get::<ThroughputData>(seconds).await {
info!("Cache hit for {seconds} seconds throughput");
let _ = reply.send(result.clone());
return;
}
info!("Cache miss for {seconds} seconds throughput");
info!("Requesting {seconds} seconds throughput from remote insight.");
remote_insight.command(remote_insight::RemoteInsightCommand::ShaperThroughput { seconds }).await;
// Tokio timer that ticks in 30 seconds
let my_caches = caches.clone();
let mut my_broadcast_rx = broadcast_rx.resubscribe();
tokio::spawn(async move {
let mut timer = tokio::time::interval(Duration::from_secs(30));
let mut timer_count = 0;
loop {
tokio::select! {
_ = timer.tick() => {
// Timed out
timer_count += 1;
if timer_count > 1 {
info!("Timeout for {seconds} seconds throughput");
let _ = reply.send(vec![]);
break;
}
}
Ok(CacheType::Throughput) = my_broadcast_rx.recv() => {
// Cache updated
info!("Cache updated for {seconds} seconds throughput");
if let Some(result) = my_caches.get(seconds).await {
info!("Sending {seconds} seconds throughput");
if let Err(e) = reply.send(result) {
warn!("Failed to send {seconds} seconds throughput. Oneshot died?");
}
info!("Sent");
break;
}
}
}
}
});
}
_ => {}
/*ShaperQueryCommand::ShaperThroughput { seconds, reply } => {
if let Some(result) = caches.throughput.get(&seconds) {
info!("Cache hit for {seconds} seconds throughput");
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.throughput.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
ShaperQueryCommand::ShaperPackets { seconds, reply } => {
if let Some(result) = caches.packets.get(&seconds) {
info!("Cache hit for {seconds} seconds packets");
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.packets.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
ShaperQueryCommand::ShaperPercent { seconds, reply } => {
if let Some(result) = caches.percent_shaped.get(&seconds) {
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.percent_shaped.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}
ShaperQueryCommand::ShaperFlows { seconds, reply } => {
if let Some(result) = caches.flows.get(&seconds) {
let _ = reply.send(result.clone());
} else {
// Get the data
let result = ws::get_remote_data(&mut caches, seconds);
// Return from the cache once more
if result.is_ok() {
let Some(result) = caches.flows.get(&seconds) else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
return;
};
let _ = reply.send(result.clone());
} else {
warn!("Failed to get data for {seconds} seconds: {result:?}");
}
}
}*/
}
}
warn!("Shaper query actor closing.")
}

View File

@ -0,0 +1,269 @@
use crate::node_manager::shaper_queries_actor::caches::Caches;
use crate::node_manager::shaper_queries_actor::ws_message::WsMessage;
use anyhow::{anyhow, bail};
use futures_util::stream::{SplitSink, StreamExt};
use futures_util::SinkExt;
use lqos_config::load_config;
use std::net::ToSocketAddrs;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::select;
use tokio::time::timeout;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{info, warn};
const TCP_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Clone, Debug)]
pub enum RemoteInsightCommand {
Ping,
ShaperThroughput { seconds: i32 },
}
pub struct RemoteInsight {
tx: Option<tokio::sync::mpsc::Sender<RemoteInsightCommand>>,
caches: Arc<Caches>,
}
impl RemoteInsight {
pub fn new(caches: Arc<Caches>) -> Self {
Self { tx: None, caches }
}
async fn connect(&mut self) {
let (tx, rx) = tokio::sync::mpsc::channel(128);
let (ready_tx, ready_rx) = tokio::sync::oneshot::channel();
tokio::spawn(run_remote_insight(rx, ready_tx, self.caches.clone()));
let _ = ready_rx.await;
self.tx = Some(tx);
info!("Connected to remote insight (processor layer)");
}
pub async fn command(&mut self, command: RemoteInsightCommand)
{
if self.tx.is_none() {
self.connect().await;
}
let mut failed = false;
if let Some(tx) = self.tx.as_ref() {
let ping = tx.send(RemoteInsightCommand::Ping).await;
if ping.is_err() {
failed = true;
}
let result = tx.send(command.clone()).await;
if result.is_err() {
failed = true;
}
}
if failed {
self.tx = None;
}
}
}
async fn run_remote_insight(
mut command: tokio::sync::mpsc::Receiver<RemoteInsightCommand>,
ready: tokio::sync::oneshot::Sender<()>,
caches: Arc<Caches>,
) -> anyhow::Result<()> {
let mut socket = connect().await?;
let (mut write, mut read) = socket.split();
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
// Negotiation
info!("Waiting for IdentifyYourself");
let msg = read.next().await;
let Some(Ok(msg)) = msg else {
warn!("Failed to read from shaper query server");
bail!("Failed to read from shaper query server");
};
let tungstenite::Message::Binary(payload) = msg else {
warn!("Failed to read from shaper query server");
bail!("Failed to read from shaper query server");
};
let message = WsMessage::from_bytes(&payload)?;
match message {
WsMessage::IdentifyYourself => {
info!("Sending Hello");
send_hello(&mut write).await?;
}
_ => {
warn!("Unexpected message from shaper query server");
bail!("Unexpected message from shaper query server");
}
}
// Wait for a TokenInvalid or TokenValid
info!("Waiting for token response");
let msg = read.next().await;
let Some(Ok(msg)) = msg else {
warn!("Failed to read from shaper query server");
bail!("Failed to read from shaper query server");
};
let tungstenite::Message::Binary(payload) = msg else {
warn!("Failed to read from shaper query server");
bail!("Failed to read from shaper query server");
};
let message = WsMessage::from_bytes(&payload)?;
match message {
WsMessage::TokenAccepted => {
info!("Token accepted");
}
WsMessage::InvalidToken => {
warn!("Invalid token");
bail!("Invalid token");
}
_ => {
warn!("Unexpected message from shaper query server");
bail!("Unexpected message from shaper query server");
}
}
// Ready
info!("Ready to receive commands");
ready.send(()).map_err(|_| anyhow!("Failed to send ready message"))?;
let timeout = Duration::from_secs(60);
let mut ticker = tokio::time::interval(timeout);
let mut timeout_count = 0;
loop {
select! {
_ = ticker.tick() => {
info!("Shaper WSS timeout reached");
timeout_count += 1;
if timeout_count > 1 {
warn!("Too many timeouts, closing connection");
break;
}
}
command = command.recv() => {
info!("Received command: {command:?}");
match command {
None => break,
Some(RemoteInsightCommand::Ping) => {
// Do nothing - this ensures the channel is alive
}
Some(RemoteInsightCommand::ShaperThroughput { seconds }) => {
let msg = WsMessage::ShaperThroughput { seconds }.to_bytes()?;
tx.send(tungstenite::Message::Binary(msg)).await?;
}
}
}
msg = read.next() => {
let Some(Ok(msg)) = msg else {
warn!("Failed to read from shaper query server");
break;
};
match msg {
tungstenite::Message::Ping(_) => {
write.send(tokio_tungstenite::tungstenite::Message::Pong(vec![])).await?;
}
tungstenite::Message::Pong(_) => {
// Ignore
}
tungstenite::Message::Close(_) => {
info!("Shaper query server closed the connection");
break;
}
tungstenite::Message::Frame(_) => {
warn!("Received a frame message from shaper query server");
}
tungstenite::Message::Text(_) => {
warn!("Received a text message from shaper query server");
}
tungstenite::Message::Binary(bytes) => {
let message = WsMessage::from_bytes(&bytes)?;
match message {
WsMessage::IdentifyYourself => {
warn!("Unexpected IdentifyYourself")
//send_hello(tx.clone()).await?;
}
WsMessage::TokenAccepted => {
info!("Token accepted");
}
WsMessage::InvalidToken => {
warn!("Invalid token");
break;
}
WsMessage::Tick => {
info!("Tick");
}
WsMessage::QueryResult {tag, seconds, data} => {
info!("Query result: {tag} {seconds}, length: {}", data.len());
caches.store(tag, seconds, data).await;
}
_ => unimplemented!()
}
}
}
}
Some(to_send) = rx.recv() => {
write.send(to_send).await?;
}
}
}
Ok(())
}
async fn connect() -> anyhow::Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
let remote_host = crate::lts2_sys::lts2_client::get_remote_host();
let target = format!("wss://{}:443/shaper_api/shaperWs", remote_host);
info!("Connecting to shaper query server: {target}");
let mut addresses = format!("{}:443", remote_host).to_socket_addrs()?;
let addr = addresses.next().ok_or_else(|| anyhow!("Failed to resolve remote host"))?;
// TCP Stream
let Ok(Ok(stream)) = timeout(TCP_TIMEOUT, TcpStream::connect(&addr)).await else {
warn!("Failed to connect to shaper query server: {remote_host}");
bail!("Failed to connect to shaper query server");
};
// Native TLS
info!("Connected to shaper query server: {remote_host}");
let Ok(connector) = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build() else
{
warn!("Failed to create TLS connector");
bail!("Failed to create TLS connector");
};
let t_connector = tokio_tungstenite::Connector::NativeTls(connector);
// Tungstenite Client
info!("Connecting tungstenite client to shaper query server: {target}");
let result = tokio_tungstenite::client_async_tls_with_config(target, stream, None, Some(t_connector)).await;
if result.is_err() {
bail!("Failed to connect to shaper query server. {result:?}");
}
if result.is_err() {
bail!("Failed to connect to shaper query server. {result:?}");
}
let Ok((socket, _response)) = result else {
warn!("Failed to connect to shaper query server");
bail!("Failed to connect to shaper query server");
};
info!("Connected");
Ok(socket)
}
async fn send_hello(write: &mut SplitSink<WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, tungstenite::Message>) -> anyhow::Result<()> {
let config = load_config()?;
let Some(license_key) = &config.long_term_stats.license_key else {
warn!("No license key found in config");
bail!("No license key found in config");
};
let msg = WsMessage::Hello {
license_key: license_key.to_string(),
node_id: config.node_id.to_string(),
}.to_bytes()?;
//tx.send(tungstenite::Message::Binary(msg)).await?;
write.send(tungstenite::Message::Binary(msg)).await?;
Ok(())
}

View File

@ -1,173 +0,0 @@
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
use std::time::Duration;
use anyhow::{anyhow, bail};
use native_tls::TlsStream;
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
use tungstenite::{Message, WebSocket};
use lqos_config::load_config;
use crate::node_manager::shaper_queries_actor::Caches;
pub fn get_remote_data(caches: &mut Caches, seconds: i32) -> anyhow::Result<()> {
info!("Getting remote data for {seconds} seconds");
let config = load_config()?;
let Some(license_key) = &config.long_term_stats.license_key else {
warn!("No license key found in config");
bail!("No license key found in config");
};
let mut socket = connect_shaper_socket()?;
info!("Saying hello");
send_hello(&mut socket, license_key.as_str(), &config.node_id)?;
info!("Authorized");
request_graphs(&mut socket, seconds)?;
for _ in 0 .. 4 {
let response = socket.read()?;
let reply = WsMessage::from_bytes(&response.into_data())?;
let WsMessage::QueryResult { tag, data } = reply else {
bail!("Failed to get data from shaper query server");
};
match tag.as_str() {
"throughput" => {
let throughput = serde_cbor::from_slice(&data)?;
caches.throughput.insert(seconds, throughput);
}
"packets" => {
let packets = serde_cbor::from_slice(&data)?;
caches.packets.insert(seconds, packets);
}
"percent" => {
let percent = serde_cbor::from_slice(&data)?;
caches.percent_shaped.insert(seconds, percent);
}
"flows" => {
let flows = serde_cbor::from_slice(&data)?;
caches.flows.insert(seconds, flows);
}
_ => {
warn!("Unknown tag received from shaper query server: {tag}");
}
}
}
Ok(())
}
#[derive(Serialize, Deserialize, Debug)]
enum WsMessage {
// Requests
IdentifyYourself,
InvalidToken,
TokenAccepted,
ShaperThroughput { seconds: i32 },
ShaperPackets { seconds: i32 },
ShaperPercent { seconds: i32 },
ShaperFlows { seconds: i32 },
// Responses
Hello { license_key: String, node_id: String },
QueryResult { tag: String, data: Vec<u8> },
Tick,
}
type Wss = WebSocket<TlsStream<TcpStream>>;
fn connect_shaper_socket() -> anyhow::Result<Wss> {
let remote_host = crate::lts2_sys::lts2_client::get_remote_host();
let target = format!("wss://{}:443/shaper_api/shaperWs", remote_host);
info!("Connecting to shaper query server: {target}");
let mut addresses = format!("{}:443", remote_host).to_socket_addrs()?;
let addr = addresses.next().ok_or_else(|| anyhow!("Failed to resolve remote host"))?;
let Ok(stream) = TcpStream::connect_timeout(&addr, Duration::from_secs(10)) else {
warn!("Failed to connect to shaper query server: {remote_host}");
bail!("Failed to connect to shaper query server");
};
info!("Connected to shaper query server: {remote_host}");
let Ok(connector) = native_tls::TlsConnector::builder()
.danger_accept_invalid_certs(true)
.danger_accept_invalid_hostnames(true)
.build() else
{
warn!("Failed to create TLS connector");
bail!("Failed to create TLS connector");
};
info!("Connecting TLS stream to shaper query server: {remote_host}");
let result = connector.connect(&format!("{}", remote_host), stream);
if result.is_err() {
warn!("Failed to connect TLS stream to shaper query server: {result:?}");
bail!("Failed to connect TLS stream to shaper query server: {result:?}");
}
let Ok(tls_stream) = result else {
warn!("Failed to connect TLS stream to shaper query server");
bail!("Failed to connect TLS stream to shaper query server");
};
info!("Connecting tungstenite client to shaper query server: {target}");
let result = tungstenite::client(target, tls_stream);
if result.is_err() {
bail!("Failed to connect to shaper query server. {result:?}");
}
let Ok((mut socket, _response)) = result else {
warn!("Failed to connect to shaper query server");
bail!("Failed to connect to shaper query server");
};
info!("Connected");
let Ok(msg) = socket.read() else {
warn!("Failed to read from shaper query server");
bail!("Failed to read from shaper query server");
};
let reply = WsMessage::from_bytes(&msg.into_data())?;
let WsMessage::IdentifyYourself = reply else {
warn!("Failed to identify with shaper query server. Got: {reply:?}");
bail!("Failed to identify with shaper query server");
};
Ok(socket)
}
fn send_hello(socket: &mut Wss, license_key: &str, node_id: &str) -> anyhow::Result<()> {
let msg = WsMessage::Hello {
license_key: license_key.to_string(),
node_id: node_id.to_string(),
}.to_bytes()?;
socket.send(Message::Binary(msg))?;
let response = socket.read()?;
let reply = WsMessage::from_bytes(&response.into_data())?;
let WsMessage::TokenAccepted = reply else {
warn!("Failed to authenticate with shaper query server. Got: {reply:?}");
bail!("Failed to authenticate with shaper query server");
};
Ok(())
}
fn close(socket: &mut Wss) -> anyhow::Result<()> {
// Close the socket
socket.send(Message::Close(None))?;
Ok(())
}
fn request_graphs(socket: &mut Wss, seconds: i32) -> anyhow::Result<()> {
info!("Requesting throughput for {seconds} seconds");
let msg = WsMessage::ShaperThroughput { seconds }.to_bytes()?;
socket.send(Message::Binary(msg))?;
let msg = WsMessage::ShaperPackets { seconds }.to_bytes()?;
socket.send(Message::Binary(msg))?;
let msg = WsMessage::ShaperPercent { seconds }.to_bytes()?;
socket.send(Message::Binary(msg))?;
let msg = WsMessage::ShaperFlows { seconds }.to_bytes()?;
socket.send(Message::Binary(msg))?;
Ok(())
}
impl WsMessage {
fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let raw_bytes = serde_cbor::to_vec(self)?;
Ok(raw_bytes)
}
fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
Ok(serde_cbor::from_slice(&bytes)?)
}
}

View File

@ -0,0 +1,29 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub enum WsMessage {
// Requests
IdentifyYourself,
InvalidToken,
TokenAccepted,
ShaperThroughput { seconds: i32 },
ShaperPackets { seconds: i32 },
ShaperPercent { seconds: i32 },
ShaperFlows { seconds: i32 },
// Responses
Hello { license_key: String, node_id: String },
QueryResult { tag: String, seconds: i32, data: Vec<u8> },
Tick,
}
impl WsMessage {
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let raw_bytes = serde_cbor::to_vec(self)?;
Ok(raw_bytes)
}
pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
Ok(serde_cbor::from_slice(&bytes)?)
}
}