diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index e8e0134f..319eb7b1 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -88,5 +88,5 @@ pub enum BusResponse { }, /// Flow Data - FlowData(Vec), + FlowData(Vec<(FlowTransport, Option)>), } diff --git a/src/rust/lqos_bus/src/ip_stats.rs b/src/rust/lqos_bus/src/ip_stats.rs index 1127c58d..f7e66c5a 100644 --- a/src/rust/lqos_bus/src/ip_stats.rs +++ b/src/rust/lqos_bus/src/ip_stats.rs @@ -81,5 +81,16 @@ pub struct FlowTransport { pub dst_port: u16, pub bytes: u64, pub packets: u64, - pub tos: u8, + pub dscp: u8, + pub congestion: bool, +} + +pub fn tos_parser(tos: u8) -> (u8, bool) { + // Format: 2 bits of ECN, 6 bits of DSCP + const ECN: u8 = 0b00000011; + const DSCP: u8 = 0b11111100; + + let ecn = tos & ECN; + let dscp = (tos & DSCP) >> 2; + (dscp, ecn == 3) } \ No newline at end of file diff --git a/src/rust/lqos_bus/src/lib.rs b/src/rust/lqos_bus/src/lib.rs index 31ae488f..d6e0dc87 100644 --- a/src/rust/lqos_bus/src/lib.rs +++ b/src/rust/lqos_bus/src/lib.rs @@ -12,7 +12,7 @@ #![warn(missing_docs)] mod bus; mod ip_stats; -pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport}; +pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport, tos_parser}; mod tc_handle; pub use bus::{ bus_request, decode_request, decode_response, encode_request, diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index 9a3f782a..90c2d85e 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -100,7 +100,7 @@ pub async fn raw_queue_by_circuit( } #[get("/api/flows/")] -pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache>> { +pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache)>>> { let mut result = Vec::new(); let request: Vec = ip_list.split(',').map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect(); let responses = bus_request(request).await.unwrap(); diff --git a/src/rust/lqos_node_manager/static/circuit_queue.html b/src/rust/lqos_node_manager/static/circuit_queue.html index f76fab2a..9a059b30 100644 --- a/src/rust/lqos_node_manager/static/circuit_queue.html +++ b/src/rust/lqos_node_manager/static/circuit_queue.html @@ -537,15 +537,56 @@ } ip_list = ip_list.substring(0, ip_list.length-1); $.get("/api/flows/" + ip_list, (data) => { + //console.log(data); let html = ""; - html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; for (let i=0; i"; - html += ""; - html += ""; - html += ""; - html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + if (data[i][0].congestion) { + html += ""; + } else { + html += ""; + } + if (rcongestion) { + html += ""; + } else { + html += ""; + } html += ""; } html += "
ProtocolFlowPacketsBytes
ProtocolSourceSource PortDestDest PortPackets InPackets OutBytes InBytes OutDSCP InDSCP OutECN InECN Out
" + data[i].src + ":" + data[i].src_port + "▶️" + data[i].dst + ":" + data[i].dst_port + "" + data[i].packets + "" + scaleNumber(data[i].bytes) + "" + data[i].tos + "" + data[i][0].proto + "" + data[i][0].src + "" + data[i][0].src_port + "" + data[i][0].dst + "" + data[i][0].dst_port + "" + data[i][0].packets + "" + rpackets + "" + scaleNumber(data[i][0].bytes) + "" + rbytes + "0x" + data[i][0].dscp.toString(16) + "" + rdscp + "--
"; diff --git a/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs b/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs index 88af373c..d095731e 100644 --- a/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/heimdall_data.rs @@ -1,7 +1,7 @@ -use std::{time::Duration, net::IpAddr}; +use std::{time::Duration, net::IpAddr, collections::HashSet}; use dashmap::DashMap; -use lqos_bus::{BusResponse, FlowTransport}; +use lqos_bus::{BusResponse, FlowTransport, tos_parser}; use lqos_sys::{HeimdallData, HeimdallKey, XdpIpAddress, heimdall_watch_ip}; use lqos_utils::unix_time::time_since_boot; use once_cell::sync::Lazy; @@ -54,9 +54,7 @@ impl PalantirMonitor { flow.bytes = combined.bytes; flow.packets = combined.packets; flow.last_seen = combined.last_seen; - if combined.tos != 0 { - flow.tos = combined.tos; - } + flow.tos = combined.tos; } else { // Insert self.data.insert(key.clone(), combined); @@ -93,10 +91,13 @@ pub fn get_flow_stats(ip: &str) -> BusResponse { heimdall_watch_ip(ip); let mut result = Vec::new(); + // Obtain all the flows + let mut all_flows = Vec::new(); for value in HEIMDALL.data.iter() { let key = value.key(); - if key.src_ip == ip || key.dst_ip == ip { - result.push(FlowTransport{ + if key.src_ip == ip || key.dst_ip == ip { + let (dscp, congestion) = tos_parser(value.tos); + all_flows.push(FlowTransport{ src: key.src_ip.as_ip().to_string(), dst: key.dst_ip.as_ip().to_string(), src_port: key.src_port, @@ -108,11 +109,29 @@ pub fn get_flow_stats(ip: &str) -> BusResponse { }, bytes: value.bytes, packets: value.packets, - tos: value.tos, + dscp, + congestion }); } } + // Turn them into reciprocal pairs + let mut done = HashSet::new(); + for (i,flow) in all_flows.iter().enumerate() { + if !done.contains(&i) { + let flow_a = flow.clone(); + let flow_b = if let Some(flow_b) = all_flows.iter().position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port) { + done.insert(flow_b); + Some(all_flows[flow_b].clone()) + } else { + None + }; + + result.push((flow_a, flow_b)); + } + } + + return BusResponse::FlowData(result); } BusResponse::Fail("No Stats or bad IP".to_string())