Make the Top 10 Flow by Bytes view a bit better. Pushing for testing.

This commit is contained in:
Herbert Wolverson
2024-07-18 11:31:41 -05:00
parent 280176a4c2
commit dd399244d5
6 changed files with 102 additions and 30 deletions

View File

@@ -177,4 +177,8 @@ pub struct FlowbeeSummaryData {
pub remote_asn_country: String,
/// Analysis
pub analysis: String,
/// Circuit ID
pub circuit_id: String,
/// Circuit Name
pub circuit_name: String,
}

View File

@@ -1,5 +1,7 @@
mod serializable;
mod shaped_device;
use std::net::IpAddr;
use crate::SUPPORTED_CUSTOMERS;
use csv::{QuoteStyle, ReaderBuilder, WriterBuilder};
use log::error;
@@ -7,6 +9,7 @@ use serializable::SerializableShapedDevice;
pub use shaped_device::ShapedDevice;
use std::path::{Path, PathBuf};
use thiserror::Error;
use lqos_utils::XdpIpAddress;
/// Provides handling of the `ShapedDevices.csv` file that maps
/// circuits to traffic shaping.
@@ -166,6 +169,21 @@ impl ConfigShapedDevices {
//println!("Would write to file: {}", csv);
Ok(())
}
/// Helper function to search for an XdpIpAddress and return a circuit id and name
/// if they exist.
pub fn get_circuit_id_and_name_from_ip(&self, ip: &XdpIpAddress) -> Option<(String, String)> {
let lookup = match ip.as_ip() {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
if let Some(c) = self.trie.longest_match(lookup) {
let device = &self.devices[*c.1];
return Some((device.circuit_id.clone(), device.circuit_name.clone()));
}
None
}
}
#[derive(Error, Debug)]

View File

@@ -1,10 +1,12 @@
import {BaseDashlet} from "./base_dashlet";
import {clearDashDiv, theading} from "../helpers/builders";
import {scaleNumber, scaleNanos} from "../helpers/scaling";
import {scaleNumber, scaleNanos, formatRetransmit} from "../helpers/scaling";
import {RttCache} from "../helpers/rtt_cache";
export class Top10FlowsBytes extends BaseDashlet {
constructor(slot) {
super(slot);
this.rttCache = new RttCache();
}
title() {
@@ -35,38 +37,43 @@ export class Top10FlowsBytes extends BaseDashlet {
let target = document.getElementById(this.id);
let t = document.createElement("table");
t.classList.add("table", "table-striped", "tiny");
t.classList.add("table", "table-sm", "small");
let th = document.createElement("thead");
th.classList.add("small");
th.appendChild(theading("IP/Circuit"));
th.appendChild(theading("Protocol"));
th.appendChild(theading("Local IP"));
th.appendChild(theading("Remote IP"));
th.appendChild(theading("DL ⬇️"));
th.appendChild(theading("UL ⬆️"));
th.appendChild(theading("Total"));
th.appendChild(theading("RTT"));
th.appendChild(theading("️️⬆ RTT"));
th.appendChild(theading("TCP Retransmits"));
th.appendChild(theading("RTT", 2));
th.appendChild(theading("TCP Retransmits", 2));
th.appendChild(theading("Remote ASN"));
th.appendChild(theading("Country"));
t.appendChild(th);
let tbody = document.createElement("tbody");
msg.data.forEach((r) => {
console.log(r);
let row = document.createElement("tr");
row.classList.add("small");
if (r.circuit_id !== "") {
let circuit = document.createElement("td");
let link = document.createElement("a");
link.href = "circuit.html?id=" + encodeURI(r.circuit_id);
link.innerText = r.circuit_name;
circuit.appendChild(link);
row.appendChild(circuit);
} else {
let localIp = document.createElement("td");
localIp.innerText = r.local_ip;
row.appendChild(localIp);
}
let proto = document.createElement("td");
proto.innerText = r.analysis;
row.appendChild(proto);
let localIp = document.createElement("td");
localIp.innerText = r.local_ip;
row.appendChild(localIp);
let remoteIp = document.createElement("td");
remoteIp.innerText = r.remote_ip;
row.appendChild(remoteIp);
let dl = document.createElement("td");
dl.innerText = scaleNumber(r.rate_estimate_bps.down);
row.appendChild(dl);
@@ -79,26 +86,37 @@ export class Top10FlowsBytes extends BaseDashlet {
total.innerText = scaleNumber(r.bytes_sent.down) + " / " + scaleNumber(r.bytes_sent.up);
row.appendChild(total);
if (r.rtt_nanos.length > 0) {
this.rttCache.set(r.remote_ip + r.analysis, r.rtt_nanos);
}
let rtt = this.rttCache.get(r.remote_ip + r.analysis);
if (rtt === 0) {
rtt = [0,0];
}
let rttD = document.createElement("td");
rttD.innerText = scaleNanos(r.rtt_nanos.down);
rttD.innerText = scaleNanos(rtt[0], 0);
row.appendChild(rttD);
let rttU = document.createElement("td");
rttU.innerText = scaleNanos(r.rtt_nanos.up);
rttU.innerText = scaleNanos(rtt[1], 0);
row.appendChild(rttU);
let tcp = document.createElement("td");
tcp.innerText = r.tcp_retransmits.down + " / " + r.tcp_retransmits.up;
row.appendChild(tcp);
let tcp1 = document.createElement("td");
tcp1.innerHTML = formatRetransmit(r.tcp_retransmits.down);
row.appendChild(tcp1);
let tcp2 = document.createElement("td");
tcp2.innerHTML = formatRetransmit(r.tcp_retransmits.up);
row.appendChild(tcp2);
let asn = document.createElement("td");
asn.innerText = r.remote_asn_name;
if (asn.innerText === "") {
asn.innerText = r.remote_ip;
}
row.appendChild(asn);
let country = document.createElement("td");
country.innerText = r.remote_asn_country;
row.appendChild(country);
t.appendChild(row);
});
t.appendChild(tbody);

View File

@@ -0,0 +1,16 @@
export class RttCache {
constructor() {
this.cache = {};
}
set(key, value) {
this.cache[key] = value;
}
get(key) {
if (this.cache[key] === undefined) {
return 0;
}
return this.cache[key];
}
}

View File

@@ -12,7 +12,7 @@ export function scaleNumber(n, fixed=2) {
}
export function scaleNanos(n, precision=2) {
if (n === 0) return "";
if (n === 0) return "-";
if (n > 60000000000) {
return (n / 60000000000).toFixed(precision) + "m";
}else if (n > 1000000000) {

View File

@@ -229,7 +229,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
full_list.sort_by(|a, b| b.1.down.cmp(&a.1.down));
let result = full_list
.iter()
.skip(start as usize)
//.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
@@ -281,7 +281,7 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
let result = full_list
.iter()
.skip(start as usize)
//.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
@@ -337,7 +337,7 @@ pub fn worst_n_retransmits(start: u32, end: u32) -> BusResponse {
});
let result = full_list
.iter()
.skip(start as usize)
//.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
@@ -390,7 +390,7 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
full_list.reverse();
let result = full_list
.iter()
.skip(start as usize)
//.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(
@@ -573,6 +573,8 @@ pub fn dump_active_flows() -> BusResponse {
let (remote_asn_name, remote_asn_country) =
get_asn_name_and_country(key.remote_ip.as_ip());
let (circuit_id, circuit_name) = (String::new(), String::new());
lqos_bus::FlowbeeSummaryData {
remote_ip: key.remote_ip.as_ip().to_string(),
local_ip: key.local_ip.as_ip().to_string(),
@@ -593,6 +595,8 @@ pub fn dump_active_flows() -> BusResponse {
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()),
circuit_id,
circuit_name,
}
})
.collect();
@@ -653,12 +657,17 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
}
}
let sd = SHAPED_DEVICES.read().unwrap();
let result = table
.iter()
.take(n as usize)
.map(|(ip, flow)| {
let (remote_asn_name, remote_asn_country) =
get_asn_name_and_country(ip.remote_ip.as_ip());
let (circuit_id, circuit_name) = sd.get_circuit_id_and_name_from_ip(&ip.local_ip).unwrap_or((String::new(), String::new()));
lqos_bus::FlowbeeSummaryData {
remote_ip: ip.remote_ip.as_ip().to_string(),
local_ip: ip.local_ip.as_ip().to_string(),
@@ -679,6 +688,8 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
last_seen: flow.0.last_seen,
start_time: flow.0.start_time,
rtt_nanos: DownUpOrder::new(flow.0.rtt[0].as_nanos(), flow.0.rtt[1].as_nanos()),
circuit_id,
circuit_name,
}
})
.collect();
@@ -691,6 +702,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
if let Ok(ip) = ip.parse::<IpAddr>() {
let ip = XdpIpAddress::from_ip(ip);
let lock = ALL_FLOWS.lock().unwrap();
let sd = SHAPED_DEVICES.read().unwrap();
let matching_flows: Vec<_> = lock
.iter()
.filter(|(key, _)| key.local_ip == ip)
@@ -698,6 +710,8 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
let (remote_asn_name, remote_asn_country) =
get_asn_name_and_country(key.remote_ip.as_ip());
let (circuit_id, circuit_name) = sd.get_circuit_id_and_name_from_ip(&key.local_ip).unwrap_or((String::new(), String::new()));
lqos_bus::FlowbeeSummaryData {
remote_ip: key.remote_ip.as_ip().to_string(),
local_ip: key.local_ip.as_ip().to_string(),
@@ -718,6 +732,8 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: DownUpOrder::new(row.0.rtt[0].as_nanos(), row.0.rtt[1].as_nanos()),
circuit_id,
circuit_name,
}
})
.collect();