diff --git a/src/rust/lqos_bus/src/bus/mod.rs b/src/rust/lqos_bus/src/bus/mod.rs index cf74b2d5..7b6588d9 100644 --- a/src/rust/lqos_bus/src/bus/mod.rs +++ b/src/rust/lqos_bus/src/bus/mod.rs @@ -5,6 +5,7 @@ mod request; mod response; mod session; mod unix_socket_server; +mod queue_data; pub use client::bus_request; use log::error; pub use persistent_client::BusClient; @@ -14,6 +15,7 @@ pub use response::BusResponse; pub use session::BusSession; use thiserror::Error; pub use unix_socket_server::UnixSocketServer; +pub use queue_data::*; /// The local socket path to which `lqosd` will bind itself, /// listening for requets. diff --git a/src/rust/lqos_bus/src/bus/queue_data.rs b/src/rust/lqos_bus/src/bus/queue_data.rs new file mode 100644 index 00000000..42b74680 --- /dev/null +++ b/src/rust/lqos_bus/src/bus/queue_data.rs @@ -0,0 +1,103 @@ +use serde::{Serialize, Deserialize}; + + +/// Type used for *displaying* the queue store data. It deliberately +/// doesn't include data that we aren't going to display in a GUI. +#[allow(missing_docs)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +pub struct QueueStoreTransit { + pub history: Vec<(CakeDiffTransit, CakeDiffTransit)>, + pub history_head: usize, + //pub prev_download: Option, + //pub prev_upload: Option, + pub current_download: CakeTransit, + pub current_upload: CakeTransit, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +#[allow(missing_docs)] +pub struct CakeDiffTransit { + pub bytes: u64, + pub packets: u32, + pub qlen: u32, + pub tins: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +#[allow(missing_docs)] +pub struct CakeDiffTinTransit { + pub sent_bytes: u64, + pub backlog_bytes: u32, + pub drops: u32, + pub marks: u32, + pub avg_delay_us: u32, +} + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +#[allow(missing_docs)] +pub struct CakeTransit { + //pub handle: TcHandle, + //pub parent: TcHandle, + //pub bytes: u64, + //pub packets: u32, + //pub overlimits: u32, + //pub requeues: u32, + //pub backlog: u32, + //pub qlen: u32, + pub memory_used: u32, + //pub memory_limit: u32, + //pub capacity_estimate: u32, + //pub min_network_size: u16, + //pub max_network_size: u16, + //pub min_adj_size: u16, + //pub max_adj_size: u16, + //pub avg_hdr_offset: u16, + //pub tins: Vec, + //pub drops: u32, +} + +/* +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +#[allow(missing_docs)] +pub struct CakeOptionsTransit { + pub rtt: u64, + pub bandwidth: u8, + pub diffserv: u8, + pub flowmode: u8, + pub ack_filter: u8, + pub nat: bool, + pub wash: bool, + pub ingress: bool, + pub split_gso: bool, + pub raw: bool, + pub overhead: u16, + pub fwmark: TcHandle, +} + + +// Commented out data is collected but not used +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +#[allow(missing_docs)] +pub struct CakeTinTransit { + //pub threshold_rate: u64, + //pub sent_bytes: u64, + //pub backlog_bytes: u32, + //pub target_us: u32, + //pub interval_us: u32, + //pub peak_delay_us: u32, + //pub avg_delay_us: u32, + //pub base_delay_us: u32, + //pub sent_packets: u32, + //pub way_indirect_hits: u16, + //pub way_misses: u16, + //pub way_collisions: u16, + //pub drops: u32, + //pub ecn_marks: u32, + //pub ack_drops: u32, + //pub sparse_flows: u16, + //pub bulk_flows: u16, + //pub unresponsive_flows: u16, + //pub max_pkt_len: u16, + //pub flow_quantum: u16, +} +*/ \ No newline at end of file diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 5c416b80..e415f102 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -1,6 +1,7 @@ use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport, ip_stats::PacketHeader}; use serde::{Deserialize, Serialize}; use std::net::IpAddr; +use super::QueueStoreTransit; /// A `BusResponse` object represents a single /// reply generated from a `BusRequest`, and batched @@ -67,7 +68,7 @@ pub enum BusResponse { /// A string containing a JSON dump of a queue stats. Analagos to /// the response from `tc show qdisc`. - RawQueueData(String), + RawQueueData(Option>), /// Results from network map queries NetworkMap(Vec<(usize, lqos_config::NetworkJsonTransport)>), diff --git a/src/rust/lqos_bus/src/lib.rs b/src/rust/lqos_bus/src/lib.rs index 9c0efc6f..7b067111 100644 --- a/src/rust/lqos_bus/src/lib.rs +++ b/src/rust/lqos_bus/src/lib.rs @@ -12,11 +12,15 @@ #![warn(missing_docs)] mod bus; mod ip_stats; -pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport, tos_parser, PacketHeader}; +pub use ip_stats::{ + tos_parser, FlowProto, FlowTransport, IpMapping, IpStats, PacketHeader, + XdpPpingResult, +}; mod tc_handle; pub use bus::{ bus_request, decode_request, decode_response, encode_request, encode_response, BusClient, BusReply, BusRequest, BusResponse, BusSession, + CakeDiffTinTransit, CakeDiffTransit, CakeTransit, QueueStoreTransit, UnixSocketServer, BUS_SOCKET_PATH, }; pub use tc_handle::TcHandle; diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index d4fadb0b..773b0aad 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -179,6 +179,7 @@ impl NetworkJson { &self, circuit_id: &str, ) -> Option> { + //println!("Looking for parents of {circuit_id}"); self .nodes .iter() diff --git a/src/rust/lqos_node_manager/src/network_tree.rs b/src/rust/lqos_node_manager/src/network_tree.rs index 481df1c4..8383cbfe 100644 --- a/src/rust/lqos_node_manager/src/network_tree.rs +++ b/src/rust/lqos_node_manager/src/network_tree.rs @@ -106,7 +106,7 @@ pub async fn node_names( #[get("/api/funnel_for_queue/")] pub async fn funnel_for_queue( circuit_id: String, -) -> NoCache>> { +) -> NoCache>> { let mut result = Vec::new(); let target = SHAPED_DEVICES @@ -127,5 +127,5 @@ pub async fn funnel_for_queue( result.extend_from_slice(map); } } - NoCache::new(Json(result)) + NoCache::new(MsgPack(result)) } diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index 57e195a9..fe5e7d46 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -1,12 +1,13 @@ use crate::auth_guard::AuthGuard; use crate::cache_control::NoCache; use crate::tracker::SHAPED_DEVICES; -use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport, PacketHeader}; +use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport, PacketHeader, QueueStoreTransit}; use rocket::fs::NamedFile; use rocket::http::Status; use rocket::response::content::RawJson; use rocket::serde::json::Json; use rocket::serde::Serialize; +use rocket::serde::msgpack::MsgPack; use std::net::IpAddr; #[derive(Serialize, Clone)] @@ -30,7 +31,7 @@ pub async fn watch_circuit( pub async fn circuit_info( circuit_id: String, _auth: AuthGuard, -) -> NoCache> { +) -> NoCache> { if let Some(device) = SHAPED_DEVICES .read() .unwrap() @@ -45,13 +46,13 @@ pub async fn circuit_info( device.upload_max_mbps as u64 * 1_000_000, ), }; - NoCache::new(Json(result)) + NoCache::new(MsgPack(result)) } else { let result = CircuitInfo { name: "Nameless".to_string(), capacity: (1_000_000, 1_000_000), }; - NoCache::new(Json(result)) + NoCache::new(MsgPack(result)) } } @@ -59,7 +60,7 @@ pub async fn circuit_info( pub async fn current_circuit_throughput( circuit_id: String, _auth: AuthGuard, -) -> NoCache>> { +) -> NoCache>> { let mut result = Vec::new(); // Get a list of host counts // This is really inefficient, but I'm struggling to find a better way. @@ -84,25 +85,29 @@ pub async fn current_circuit_throughput( } } - NoCache::new(Json(result)) + NoCache::new(MsgPack(result)) } #[get("/api/raw_queue_by_circuit/")] pub async fn raw_queue_by_circuit( circuit_id: String, _auth: AuthGuard, -) -> NoCache> { +) -> NoCache> { + let responses = bus_request(vec![BusRequest::GetRawQueueData(circuit_id)]).await.unwrap(); + let result = match &responses[0] { - BusResponse::RawQueueData(msg) => msg.clone(), - _ => "Unable to request queue".to_string(), + BusResponse::RawQueueData(Some(msg)) => { + *msg.clone() + } + _ => QueueStoreTransit::default() }; - NoCache::new(RawJson(result)) + NoCache::new(MsgPack(result)) } #[get("/api/flows/")] -pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache)>>> { +pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache)>>> { let mut result = Vec::new(); let request: Vec = ip_list.split(',').map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect(); let responses = bus_request(request).await.unwrap(); @@ -111,7 +116,7 @@ pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache + @@ -9,15 +10,20 @@ LibreQoS - Local Node Manager - + + +