From dd399244d571d6bb53a957f81e0450242cbcef0b Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Thu, 18 Jul 2024 11:31:41 -0500 Subject: [PATCH] Make the Top 10 Flow by Bytes view a bit better. Pushing for testing. --- src/rust/lqos_bus/src/ip_stats.rs | 4 ++ .../lqos_config/src/shaped_devices/mod.rs | 18 +++++ .../js_build/src/dashlets/top10flows_bytes.js | 68 ++++++++++++------- .../js_build/src/helpers/rtt_cache.js | 16 +++++ .../js_build/src/helpers/scaling.js | 2 +- src/rust/lqosd/src/throughput_tracker/mod.rs | 24 +++++-- 6 files changed, 102 insertions(+), 30 deletions(-) create mode 100644 src/rust/lqosd/src/node_manager/js_build/src/helpers/rtt_cache.js diff --git a/src/rust/lqos_bus/src/ip_stats.rs b/src/rust/lqos_bus/src/ip_stats.rs index 81a65180..f5f8bf83 100644 --- a/src/rust/lqos_bus/src/ip_stats.rs +++ b/src/rust/lqos_bus/src/ip_stats.rs @@ -177,4 +177,8 @@ pub struct FlowbeeSummaryData { pub remote_asn_country: String, /// Analysis pub analysis: String, + /// Circuit ID + pub circuit_id: String, + /// Circuit Name + pub circuit_name: String, } diff --git a/src/rust/lqos_config/src/shaped_devices/mod.rs b/src/rust/lqos_config/src/shaped_devices/mod.rs index 38e519cb..2ae77ff1 100644 --- a/src/rust/lqos_config/src/shaped_devices/mod.rs +++ b/src/rust/lqos_config/src/shaped_devices/mod.rs @@ -1,5 +1,7 @@ mod serializable; mod shaped_device; + +use std::net::IpAddr; use crate::SUPPORTED_CUSTOMERS; use csv::{QuoteStyle, ReaderBuilder, WriterBuilder}; use log::error; @@ -7,6 +9,7 @@ use serializable::SerializableShapedDevice; pub use shaped_device::ShapedDevice; use std::path::{Path, PathBuf}; use thiserror::Error; +use lqos_utils::XdpIpAddress; /// Provides handling of the `ShapedDevices.csv` file that maps /// circuits to traffic shaping. @@ -166,6 +169,21 @@ impl ConfigShapedDevices { //println!("Would write to file: {}", csv); Ok(()) } + + /// Helper function to search for an XdpIpAddress and return a circuit id and name + /// if they exist. + pub fn get_circuit_id_and_name_from_ip(&self, ip: &XdpIpAddress) -> Option<(String, String)> { + let lookup = match ip.as_ip() { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + if let Some(c) = self.trie.longest_match(lookup) { + let device = &self.devices[*c.1]; + return Some((device.circuit_id.clone(), device.circuit_name.clone())); + } + + None + } } #[derive(Error, Debug)] diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows_bytes.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows_bytes.js index a76fc9af..6b5ec209 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows_bytes.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/top10flows_bytes.js @@ -1,10 +1,12 @@ import {BaseDashlet} from "./base_dashlet"; import {clearDashDiv, theading} from "../helpers/builders"; -import {scaleNumber, scaleNanos} from "../helpers/scaling"; +import {scaleNumber, scaleNanos, formatRetransmit} from "../helpers/scaling"; +import {RttCache} from "../helpers/rtt_cache"; export class Top10FlowsBytes extends BaseDashlet { constructor(slot) { super(slot); + this.rttCache = new RttCache(); } title() { @@ -35,38 +37,43 @@ export class Top10FlowsBytes extends BaseDashlet { let target = document.getElementById(this.id); let t = document.createElement("table"); - t.classList.add("table", "table-striped", "tiny"); + t.classList.add("table", "table-sm", "small"); let th = document.createElement("thead"); + th.classList.add("small"); + th.appendChild(theading("IP/Circuit")); th.appendChild(theading("Protocol")); - th.appendChild(theading("Local IP")); - th.appendChild(theading("Remote IP")); th.appendChild(theading("DL ⬇️")); th.appendChild(theading("UL ⬆️")); th.appendChild(theading("Total")); - th.appendChild(theading("⬇ RTT")); - th.appendChild(theading("️️⬆ RTT")); - th.appendChild(theading("TCP Retransmits")); + th.appendChild(theading("RTT", 2)); + th.appendChild(theading("TCP Retransmits", 2)); th.appendChild(theading("Remote ASN")); - th.appendChild(theading("Country")); t.appendChild(th); let tbody = document.createElement("tbody"); msg.data.forEach((r) => { + console.log(r); let row = document.createElement("tr"); + row.classList.add("small"); + + if (r.circuit_id !== "") { + let circuit = document.createElement("td"); + let link = document.createElement("a"); + link.href = "circuit.html?id=" + encodeURI(r.circuit_id); + link.innerText = r.circuit_name; + circuit.appendChild(link); + row.appendChild(circuit); + } else { + let localIp = document.createElement("td"); + localIp.innerText = r.local_ip; + row.appendChild(localIp); + } let proto = document.createElement("td"); proto.innerText = r.analysis; row.appendChild(proto); - let localIp = document.createElement("td"); - localIp.innerText = r.local_ip; - row.appendChild(localIp); - - let remoteIp = document.createElement("td"); - remoteIp.innerText = r.remote_ip; - row.appendChild(remoteIp); - let dl = document.createElement("td"); dl.innerText = scaleNumber(r.rate_estimate_bps.down); row.appendChild(dl); @@ -79,26 +86,37 @@ export class Top10FlowsBytes extends BaseDashlet { total.innerText = scaleNumber(r.bytes_sent.down) + " / " + scaleNumber(r.bytes_sent.up); row.appendChild(total); + if (r.rtt_nanos.length > 0) { + this.rttCache.set(r.remote_ip + r.analysis, r.rtt_nanos); + } + let rtt = this.rttCache.get(r.remote_ip + r.analysis); + if (rtt === 0) { + rtt = [0,0]; + } + let rttD = document.createElement("td"); - rttD.innerText = scaleNanos(r.rtt_nanos.down); + rttD.innerText = scaleNanos(rtt[0], 0); row.appendChild(rttD); let rttU = document.createElement("td"); - rttU.innerText = scaleNanos(r.rtt_nanos.up); + rttU.innerText = scaleNanos(rtt[1], 0); row.appendChild(rttU); - let tcp = document.createElement("td"); - tcp.innerText = r.tcp_retransmits.down + " / " + r.tcp_retransmits.up; - row.appendChild(tcp); + let tcp1 = document.createElement("td"); + tcp1.innerHTML = formatRetransmit(r.tcp_retransmits.down); + row.appendChild(tcp1); + + let tcp2 = document.createElement("td"); + tcp2.innerHTML = formatRetransmit(r.tcp_retransmits.up); + row.appendChild(tcp2); let asn = document.createElement("td"); asn.innerText = r.remote_asn_name; + if (asn.innerText === "") { + asn.innerText = r.remote_ip; + } row.appendChild(asn); - let country = document.createElement("td"); - country.innerText = r.remote_asn_country; - row.appendChild(country); - t.appendChild(row); }); t.appendChild(tbody); diff --git a/src/rust/lqosd/src/node_manager/js_build/src/helpers/rtt_cache.js b/src/rust/lqosd/src/node_manager/js_build/src/helpers/rtt_cache.js new file mode 100644 index 00000000..d918595f --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/helpers/rtt_cache.js @@ -0,0 +1,16 @@ +export class RttCache { + constructor() { + this.cache = {}; + } + + set(key, value) { + this.cache[key] = value; + } + + get(key) { + if (this.cache[key] === undefined) { + return 0; + } + return this.cache[key]; + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js b/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js index f45e61ec..3f9a6f94 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/helpers/scaling.js @@ -12,7 +12,7 @@ export function scaleNumber(n, fixed=2) { } export function scaleNanos(n, precision=2) { - if (n === 0) return ""; + if (n === 0) return "-"; if (n > 60000000000) { return (n / 60000000000).toFixed(precision) + "m"; }else if (n > 1000000000) { diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 76209f39..707f6042 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -229,7 +229,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse { full_list.sort_by(|a, b| b.1.down.cmp(&a.1.down)); let result = full_list .iter() - .skip(start as usize) + //.skip(start as usize) .take((end as usize) - (start as usize)) .map( |( @@ -281,7 +281,7 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse { full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap()); let result = full_list .iter() - .skip(start as usize) + //.skip(start as usize) .take((end as usize) - (start as usize)) .map( |( @@ -337,7 +337,7 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse { }); let result = full_list .iter() - .skip(start as usize) + //.skip(start as usize) .take((end as usize) - (start as usize)) .map( |( @@ -390,7 +390,7 @@ pub fn best_n(start: u32, end: u32) -> BusResponse { full_list.reverse(); let result = full_list .iter() - .skip(start as usize) + //.skip(start as usize) .take((end as usize) - (start as usize)) .map( |( @@ -573,6 +573,8 @@ pub fn dump_active_flows() -> BusResponse { let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); + let (circuit_id, circuit_name) = (String::new(), String::new()); + lqos_bus::FlowbeeSummaryData { remote_ip: key.remote_ip.as_ip().to_string(), local_ip: key.local_ip.as_ip().to_string(), @@ -593,6 +595,8 @@ pub fn dump_active_flows() -> BusResponse { last_seen: row.0.last_seen, start_time: row.0.start_time, rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()), + circuit_id, + circuit_name, } }) .collect(); @@ -653,12 +657,17 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { } } + let sd = SHAPED_DEVICES.read().unwrap(); + let result = table .iter() .take(n as usize) .map(|(ip, flow)| { let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(ip.remote_ip.as_ip()); + + let (circuit_id, circuit_name) = sd.get_circuit_id_and_name_from_ip(&ip.local_ip).unwrap_or((String::new(), String::new())); + lqos_bus::FlowbeeSummaryData { remote_ip: ip.remote_ip.as_ip().to_string(), local_ip: ip.local_ip.as_ip().to_string(), @@ -679,6 +688,8 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { last_seen: flow.0.last_seen, start_time: flow.0.start_time, rtt_nanos: DownUpOrder::new(flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()), + circuit_id, + circuit_name, } }) .collect(); @@ -691,6 +702,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { if let Ok(ip) = ip.parse::() { let ip = XdpIpAddress::from_ip(ip); let lock = ALL_FLOWS.lock().unwrap(); + let sd = SHAPED_DEVICES.read().unwrap(); let matching_flows: Vec<_> = lock .iter() .filter(|(key, _)| key.local_ip == ip) @@ -698,6 +710,8 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); + let (circuit_id, circuit_name) = sd.get_circuit_id_and_name_from_ip(&key.local_ip).unwrap_or((String::new(), String::new())); + lqos_bus::FlowbeeSummaryData { remote_ip: key.remote_ip.as_ip().to_string(), local_ip: key.local_ip.as_ip().to_string(), @@ -718,6 +732,8 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { last_seen: row.0.last_seen, start_time: row.0.start_time, rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()), + circuit_id, + circuit_name, } }) .collect();