Continue cleaning up flow display system.

This commit is contained in:
Herbert Wolverson 2023-03-13 17:04:18 +00:00
parent 64641c6849
commit 4372b501c9
6 changed files with 89 additions and 18 deletions

View File

@ -88,5 +88,5 @@ pub enum BusResponse {
},
/// Flow Data
FlowData(Vec<FlowTransport>),
FlowData(Vec<(FlowTransport, Option<FlowTransport>)>),
}

View File

@ -81,5 +81,16 @@ pub struct FlowTransport {
pub dst_port: u16,
pub bytes: u64,
pub packets: u64,
pub tos: u8,
pub dscp: u8,
pub congestion: bool,
}
pub fn tos_parser(tos: u8) -> (u8, bool) {
// Format: 2 bits of ECN, 6 bits of DSCP
const ECN: u8 = 0b00000011;
const DSCP: u8 = 0b11111100;
let ecn = tos & ECN;
let dscp = (tos & DSCP) >> 2;
(dscp, ecn == 3)
}

View File

@ -12,7 +12,7 @@
#![warn(missing_docs)]
mod bus;
mod ip_stats;
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport};
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport, tos_parser};
mod tc_handle;
pub use bus::{
bus_request, decode_request, decode_response, encode_request,

View File

@ -100,7 +100,7 @@ pub async fn raw_queue_by_circuit(
}
#[get("/api/flows/<ip_list>")]
pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<FlowTransport>>> {
pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<(FlowTransport, Option<FlowTransport>)>>> {
let mut result = Vec::new();
let request: Vec<BusRequest> = ip_list.split(',').map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect();
let responses = bus_request(request).await.unwrap();

View File

@ -537,15 +537,56 @@
}
ip_list = ip_list.substring(0, ip_list.length-1);
$.get("/api/flows/" + ip_list, (data) => {
//console.log(data);
let html = "<table class='table table-striped'>";
html += "<thead><th>Protocol</th><th>Flow</th><th>Packets</th><th>Bytes</th></thead><tbody>";
html += "<thead>";
html += "<th>Protocol</th>";
html += "<th>Source</th>";
html += "<th>Source Port</th>";
html += "<th>Dest</th>";
html += "<th>Dest Port</th>";
html += "<th>Packets In</th>";
html += "<th>Packets Out</th>";
html += "<th>Bytes In</th>";
html += "<th>Bytes Out</th>";
html += "<th>DSCP In</th>";
html += "<th>DSCP Out</th>";
html += "<th>ECN In</th>";
html += "<th>ECN Out</th>";
html += "</thead>";
for (let i=0; i<data.length; i++) {
let rpackets = "-";
let rbytes = "-";
let rdscp = "-";
let rcongestion = false;
if (data[i][1] != null) {
rpackets = data[i][1].packets;
rbytes = scaleNumber(data[i][1].bytes);
rdscp = "0x" + data[i][1].dscp.toString(16);
rcongestion = data[i][1].congestion;
}
html += "<tr>";
html += "<td>" + data[i].proto + "</td>";
html += "<td>" + data[i].src + ":" + data[i].src_port + "▶️" + data[i].dst + ":" + data[i].dst_port + "</td>";
html += "<td>" + data[i].packets + "</td>";
html += "<td>" + scaleNumber(data[i].bytes) + "</td>";
html += "<td>" + data[i].tos + "</td>";
html += "<td>" + data[i][0].proto + "</td>";
html += "<td>" + data[i][0].src + "</td>";
html += "<td>" + data[i][0].src_port + "</td>";
html += "<td>" + data[i][0].dst + "</td>";
html += "<td>" + data[i][0].dst_port + "</td>";
html += "<td>" + data[i][0].packets + "</td>";
html += "<td>" + rpackets + "</td>";
html += "<td>" + scaleNumber(data[i][0].bytes) + "</td>";
html += "<td>" + rbytes + "</td>";
html += "<td>0x" + data[i][0].dscp.toString(16) + "</td>";
html += "<td>" + rdscp + "</td>";
if (data[i][0].congestion) {
html += "<td></td>";
} else {
html += "<td>-</td>";
}
if (rcongestion) {
html += "<td></td>";
} else {
html += "<td>-</td>";
}
html += "</tr>";
}
html += "</tbody></table>";

View File

@ -1,7 +1,7 @@
use std::{time::Duration, net::IpAddr};
use std::{time::Duration, net::IpAddr, collections::HashSet};
use dashmap::DashMap;
use lqos_bus::{BusResponse, FlowTransport};
use lqos_bus::{BusResponse, FlowTransport, tos_parser};
use lqos_sys::{HeimdallData, HeimdallKey, XdpIpAddress, heimdall_watch_ip};
use lqos_utils::unix_time::time_since_boot;
use once_cell::sync::Lazy;
@ -54,9 +54,7 @@ impl PalantirMonitor {
flow.bytes = combined.bytes;
flow.packets = combined.packets;
flow.last_seen = combined.last_seen;
if combined.tos != 0 {
flow.tos = combined.tos;
}
} else {
// Insert
self.data.insert(key.clone(), combined);
@ -93,10 +91,13 @@ pub fn get_flow_stats(ip: &str) -> BusResponse {
heimdall_watch_ip(ip);
let mut result = Vec::new();
// Obtain all the flows
let mut all_flows = Vec::new();
for value in HEIMDALL.data.iter() {
let key = value.key();
if key.src_ip == ip || key.dst_ip == ip {
result.push(FlowTransport{
let (dscp, congestion) = tos_parser(value.tos);
all_flows.push(FlowTransport{
src: key.src_ip.as_ip().to_string(),
dst: key.dst_ip.as_ip().to_string(),
src_port: key.src_port,
@ -108,11 +109,29 @@ pub fn get_flow_stats(ip: &str) -> BusResponse {
},
bytes: value.bytes,
packets: value.packets,
tos: value.tos,
dscp,
congestion
});
}
}
// Turn them into reciprocal pairs
let mut done = HashSet::new();
for (i,flow) in all_flows.iter().enumerate() {
if !done.contains(&i) {
let flow_a = flow.clone();
let flow_b = if let Some(flow_b) = all_flows.iter().position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port) {
done.insert(flow_b);
Some(all_flows[flow_b].clone())
} else {
None
};
result.push((flow_a, flow_b));
}
}
return BusResponse::FlowData(result);
}
BusResponse::Fail("No Stats or bad IP".to_string())