Merge pull request #538 from LibreQoE/flow_timeline_data

Flow timeline data
This commit is contained in:
Robert Chacón 2024-07-29 10:09:40 -06:00 committed by GitHub
commit ea88d50d51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 902 additions and 14 deletions

View File

@ -39,7 +39,7 @@ zerocopy = { workspace = true }
fxhash = "0.2.1"
axum = { version = "0.7.5", features = ["ws", "http2"] }
axum-extra = { version = "0.9.3", features = ["cookie", "cookie-private"] }
tower-http = { version = "0.5.2", features = ["fs"] }
tower-http = { version = "0.5.2", features = ["fs", "cors"] }
strum = { version = "0.26.3", features = ["derive"] }
default-net = { workspace = true }
surge-ping = "0.8.1"

View File

@ -1,6 +1,6 @@
#!/bin/bash
set -e
scripts=( index.js template.js login.js first-run.js shaped-devices.js tree.js help.js unknown-ips.js configuration.js circuit.js flow_map.js all_tree_sankey.js )
scripts=( index.js template.js login.js first-run.js shaped-devices.js tree.js help.js unknown-ips.js configuration.js circuit.js flow_map.js all_tree_sankey.js asn_explorer.js )
for script in "${scripts[@]}"
do
echo "Building {$script}"

View File

@ -98,7 +98,7 @@ function start() {
fontSize: 6,
color: "#999"
};
if (redact) label.fontSize = 0;
if (redact) label.backgroundColor = label.color;
nodes.push({
name: data[i][1].name,

View File

@ -0,0 +1,510 @@
import {clearDiv} from "./helpers/builders";
import {scaleNanos, scaleNumber} from "./helpers/scaling";
//const API_URL = "local-api/";
const API_URL = "local-api/";
const LIST_URL = API_URL + "asnList";
const FLOW_URL = API_URL + "flowTimeline/";
let asnList = [];
let countryList = [];
let protocolList = [];
let asnData = [];
let graphMinTime = Number.MAX_SAFE_INTEGER;
let graphMaxTime = Number.MIN_SAFE_INTEGER;
const itemsPerPage = 20;
let page = 0;
let renderMode = "asn";
let sortBy = "start";
let sortOptionsList = [
{ tag: "start", label: "Start Time" },
{ tag: "duration", label: "Duration" },
{ tag: "bytes", label: "Bytes" },
];
function unixTimeToDate(unixTime) {
return new Date(unixTime * 1000).toLocaleString();
}
function asnDropdown() {
$.get(LIST_URL, (data) => {
asnList = data;
// Sort data by row.count, descending
data.sort((a, b) => {
return b.count - a.count;
});
// Build the dropdown
let parentDiv = document.createElement("div");
parentDiv.classList.add("dropdown");
let button = document.createElement("button");
button.classList.add("btn", "btn-secondary", "dropdown-toggle", "btn-sm");
button.type = "button";
button.innerHTML = "Select ASN";
button.setAttribute("data-bs-toggle", "dropdown");
button.setAttribute("aria-expanded", "false");
parentDiv.appendChild(button);
let dropdownList = document.createElement("ul");
dropdownList.classList.add("dropdown-menu");
if (data.length === 0) {
data.push({asn: 0, name: "No data", count: 0});
}
// Add items
data.forEach((row) => {
let li = document.createElement("li");
li.innerHTML = "#" + row.asn + " " + row.name + " (" + row.count + ")";
li.classList.add("dropdown-item");
li.onclick = () => {
selectAsn(row.asn);
renderMode = "asn";
};
dropdownList.appendChild(li);
});
parentDiv.appendChild(dropdownList);
let target = document.getElementById("asnList");
clearDiv(target);
target.appendChild(parentDiv);
});
}
function countryDropdown() {
$.get(API_URL + "countryList", (data) => {
countryList = data;
// Sort data by row.count, descending
data.sort((a, b) => {
return b.count - a.count;
});
//console.log(data);
// Build the dropdown
let parentDiv = document.createElement("div");
parentDiv.classList.add("dropdown");
let button = document.createElement("button");
button.classList.add("btn", "btn-secondary", "dropdown-toggle", "btn-sm");
button.type = "button";
button.innerHTML = "Select Country";
button.setAttribute("data-bs-toggle", "dropdown");
button.setAttribute("aria-expanded", "false");
parentDiv.appendChild(button);
let dropdownList = document.createElement("ul");
dropdownList.classList.add("dropdown-menu");
// Add items
data.forEach((row) => {
let li = document.createElement("li");
li.innerHTML = "<img alt='" + row.iso_code + "' src='flags/" + row.iso_code.toLowerCase() + ".svg' height=12 width=12 />" + row.name + " (" + row.count + ")";
li.classList.add("dropdown-item");
li.onclick = () => {
selectCountry(row.iso_code);
renderMode = "country";
};
dropdownList.appendChild(li);
});
parentDiv.appendChild(dropdownList);
let target = document.getElementById("countryList");
clearDiv(target);
target.appendChild(parentDiv);
});
}
function protocolDropdown() {
$.get(API_URL + "protocolList", (data) => {
protocolList = data;
// Sort data by row.count, descending
data.sort((a, b) => {
return b.count - a.count;
});
//console.log(data);
// Build the dropdown
let parentDiv = document.createElement("div");
parentDiv.classList.add("dropdown");
let button = document.createElement("button");
button.classList.add("btn", "btn-secondary", "dropdown-toggle", "btn-sm");
button.type = "button";
button.innerHTML = "Select Protocol";
button.setAttribute("data-bs-toggle", "dropdown");
button.setAttribute("aria-expanded", "false");
parentDiv.appendChild(button);
let dropdownList = document.createElement("ul");
dropdownList.classList.add("dropdown-menu");
// Add items
data.forEach((row) => {
let li = document.createElement("li");
li.innerHTML = row.protocol + " (" + row.count + ")";
li.classList.add("dropdown-item");
li.onclick = () => {
selectProtocol(row.protocol);
renderMode = "protocol";
};
dropdownList.appendChild(li);
});
parentDiv.appendChild(dropdownList);
let target = document.getElementById("protocolList");
clearDiv(target);
target.appendChild(parentDiv);
});
}
function selectAsn(asn) {
$.get(FLOW_URL + encodeURI(asn), (data) => {
page = 0;
renderAsn(asn, data);
});
}
function selectCountry(country) {
let url = API_URL + "countryTimeline/" + encodeURI(country);
$.get(url, (data) => {
page = 0;
renderAsn(country, data);
});
}
function selectProtocol(protocol) {
let url = API_URL + "protocolTimeline/" + encodeURI(protocol.replace('/', '_'));
$.get(url, (data) => {
page = 0;
renderAsn(protocol, data);
});
}
function renderAsn(asn, data) {
let heading = document.createElement("h2");
if (renderMode === "asn") {
let targetAsn = asnList.find((row) => row.asn === asn);
if (targetAsn === undefined || targetAsn === null) {
console.error("Could not find ASN: " + asn);
return;
}
// Build the heading
heading.innerText = "ASN #" + asn.toFixed(0) + " (" + targetAsn.name + ")";
} else if (renderMode === "country") {
let targetCountry = countryList.find((row) => row.iso_code === asn);
if (targetCountry === undefined || targetCountry === null) {
console.error("Could not find country: " + asn);
return;
}
// Build the heading
heading.innerHTML = "<img alt='" + targetCountry.iso_code + "' src='flags/" + targetCountry.iso_code.toLowerCase() + ".svg' height=32 width=32 />" + targetCountry.name;
} else if (renderMode === "protocol") {
// Build the heading
heading.innerText = "Protocol: " + asn;
}
let target = document.getElementById("asnDetails");
// Get the flow data
asnData = data;
// Sort by the selected sort key
switch (sortBy) {
case "start": {
data.sort((a, b) => {
return a.start - b.start;
});
} break;
case "duration": {
data.sort((a, b) => {
return b.duration_nanos - a.duration_nanos;
});
} break;
case "bytes": {
data.sort((a, b) => {
return (b.total_bytes.down + b.total_bytes.up) - (a.total_bytes.down + a.total_bytes.up);
});
}
}
let div = document.createElement("div");
div.classList.add("row");
let minTime = Number.MAX_SAFE_INTEGER;
let maxTime = Number.MIN_SAFE_INTEGER;
// Calculate time overall
data.forEach((row) => {
// Update min/max time
if (row.start < minTime) {
minTime = row.start;
}
if (row.end > maxTime) {
maxTime = row.end;
}
});
// Store the global time range
graphMinTime = minTime;
graphMaxTime = maxTime;
// Header row (explain the columns)
let headerDiv = document.createElement("div");
headerDiv.classList.add("row");
let headerBytes = document.createElement("div");
headerBytes.classList.add("col-1", "text-secondary");
headerBytes.innerText = "Bytes";
headerDiv.appendChild(headerBytes);
let headerRtt = document.createElement("div");
headerRtt.classList.add("col-1", "text-secondary");
headerRtt.innerText = "RTT";
headerDiv.appendChild(headerRtt);
let headerClient = document.createElement("div");
headerClient.classList.add("col-1", "text-secondary");
headerClient.innerText = "Client";
headerDiv.appendChild(headerClient);
let headerRemote = document.createElement("div");
headerRemote.classList.add("col-1", "text-secondary");
headerRemote.innerText = "Remote";
headerDiv.appendChild(headerRemote);
let headerProtocol = document.createElement("div");
headerProtocol.classList.add("col-1", "text-secondary");
headerProtocol.innerText = "Protocol";
headerDiv.appendChild(headerProtocol);
let headerTime1 = document.createElement("div");
headerTime1.classList.add("col-4", "text-secondary");
headerTime1.innerText = unixTimeToDate(minTime);
headerDiv.appendChild(headerTime1);
let headerTime2 = document.createElement("div");
headerTime2.classList.add("col-3", "text-secondary", "text-end");
headerTime2.innerText = unixTimeToDate(maxTime);
headerDiv.appendChild(headerTime2);
let flowsDiv = document.createElement("div");
for (let i= page * itemsPerPage; i<(page+1) * itemsPerPage; i++) {
if (i >= data.length) break;
let row = data[i];
// Build the headings
let totalCol = document.createElement("div");
totalCol.classList.add("col-1", "text-secondary", "small");
totalCol.innerText = scaleNumber(row.total_bytes.down, 0) + " / " + scaleNumber(row.total_bytes.up);
div.appendChild(totalCol);
let rttCol = document.createElement("div");
rttCol.classList.add("col-1", "text-secondary", "small");
let rttDown = row.rtt[0] !== undefined ? scaleNanos(row.rtt[0].nanoseconds, 0) : "-";
let rttUp = row.rtt[1] !== undefined ? scaleNanos(row.rtt[1].nanoseconds, 0) : "-";
rttCol.innerText = rttDown + " / " + rttUp;
div.appendChild(rttCol);
let clientCol = document.createElement("div");
clientCol.classList.add("col-1", "text-secondary", "small");
if (row.circuit_id !== "") {
let clientLink = document.createElement("a");
clientLink.href = "/circuit/" + encodeURI(row.circuit_id);
clientLink.innerText = row.circuit_name;
clientLink.classList.add("redactable");
clientLink.style.textOverflow = "ellipsis";
clientCol.appendChild(clientLink);
} else {
clientCol.innerText = row.circuit_name;
}
div.appendChild(clientCol);
let remoteCol = document.createElement("div");
remoteCol.classList.add("col-1", "text-secondary", "small");
remoteCol.innerText = row.remote_ip;
div.appendChild(remoteCol);
let protocolCol = document.createElement("div");
protocolCol.classList.add("col-1", "text-secondary", "small");
protocolCol.innerText = row.protocol;
div.appendChild(protocolCol);
// Build a canvas div, we'll decorate this later
let canvasCol = document.createElement("div");
canvasCol.classList.add("col-7");
let canvas = document.createElement("canvas");
canvas.id = "flowCanvas" + i;
canvas.style.width = "100%";
canvas.style.height = "20px";
canvasCol.appendChild(canvas);
div.appendChild(canvasCol);
flowsDiv.appendChild(div);
}
// Apply the data to the page
clearDiv(target);
target.appendChild(heading);
let nextButton = document.createElement("button");
nextButton.classList.add("btn", "btn-secondary", "btn-sm", "ms-2");
nextButton.innerHTML = "<i class='fa fa-arrow-right'></i> Next";
nextButton.onclick = () => {
page++;
if (page * itemsPerPage >= data.length) page = Math.floor(data.length / itemsPerPage);
renderAsn(asn, data);
};
let prevButton = document.createElement("button");
nextButton.classList.add("btn", "btn-secondary", "btn-sm", "me-2");
prevButton.innerHTML = "<i class='fa fa-arrow-left'></i> Prev";
prevButton.onclick = () => {
page--;
if (page < 0) page = 0;
renderAsn(asn, data);
}
let paginator = document.createElement("span");
paginator.classList.add("text-secondary", "small", "ms-2", "me-2");
paginator.innerText = "Page " + (page + 1) + " of " + Math.ceil(data.length / itemsPerPage);
paginator.id = "paginator";
let sortOptions = document.createElement("span");
sortOptions.classList.add("text-secondary", "small", "ms-2", "me-2");
sortOptions.innerText = "Sort by: ";
let sortBox = document.createElement("select");
sortBox.classList.add("small");
sortBox.id = "sortBox";
sortOptionsList.forEach((option) => {
let opt = document.createElement("option");
opt.value = option.tag;
opt.innerText = option.label;
if (option.tag === sortBy) {
opt.selected = true;
}
sortBox.appendChild(opt);
});
sortBox.onchange = () => {
let sortBox = document.getElementById("sortBox");
sortBy = sortBox.value;
renderAsn(asn, data);
}
let controlDiv = document.createElement("div");
controlDiv.classList.add("mb-2");
controlDiv.appendChild(prevButton);
controlDiv.appendChild(paginator);
controlDiv.appendChild(nextButton);
controlDiv.appendChild(sortOptions);
controlDiv.appendChild(sortBox);
target.appendChild(controlDiv);
target.appendChild(headerDiv);
target.appendChild(flowsDiv);
// Wait for the page to render before drawing the graphs
requestAnimationFrame(() => {
setTimeout(() => {
drawTimeline();
});
});
}
function timeToX(time, width) {
let range = graphMaxTime - graphMinTime;
let offset = time - graphMinTime;
return (offset / range) * width;
}
function drawTimeline() {
var style = getComputedStyle(document.body)
let regionBg = style.getPropertyValue('--bs-tertiary-bg');
let lineColor = style.getPropertyValue('--bs-primary');
let axisColor = style.getPropertyValue('--bs-secondary');
for (let i=page * itemsPerPage; i<(page+1)*itemsPerPage; i++) {
let row = asnData[i];
//console.log(row);
let canvasId = "flowCanvas" + i;
// Get the canvas context
let canvas = document.getElementById(canvasId);
if (canvas === null) break;
const { width, height } = canvas.getBoundingClientRect();
canvas.width = width;
canvas.height = height;
let ctx = canvas.getContext("2d");
// Draw the background for the time period
ctx.fillStyle = regionBg;
ctx.fillRect(timeToX(row.start, width), 0, timeToX(row.end, width), height);
// Draw red lines for TCP retransmits
ctx.strokeStyle = "red";
row.retransmit_times_down.forEach((time) => {
// Start at y/2, end at y
ctx.beginPath();
ctx.moveTo(timeToX(time, width), height / 2);
ctx.lineTo(timeToX(time, width), height);
ctx.stroke();
});
row.retransmit_times_up.forEach((time) => {
// Start at 0, end at y/2
ctx.beginPath();
ctx.moveTo(timeToX(time, width), 0);
ctx.lineTo(timeToX(time, width), height / 2);
ctx.stroke();
});
// Draw a horizontal axis line the length of the canvas area at y/2
ctx.strokeStyle = axisColor;
ctx.beginPath();
ctx.moveTo(timeToX(row.start, width), height / 2);
ctx.lineTo(timeToX(row.end, width), height / 2);
ctx.stroke();
// Calculate maxThroughputUp and maxThroughputDown for this row
let maxThroughputDown = 0;
let maxThroughputUp = 0;
row.throughput.forEach((value) => {
if (value.down > maxThroughputDown) {
maxThroughputDown = value.down;
}
if (value.up > maxThroughputUp) {
maxThroughputUp = value.up;
}
});
// Draw a throughput down line. Y from y/2 to height, scaled to maxThroughputDown
ctx.strokeStyle = lineColor;
ctx.beginPath();
let numberOfSamples = row.throughput.length;
let startX = timeToX(row.start, width);
let endX = timeToX(row.end, width);
let sampleWidth = (endX - startX) / numberOfSamples;
let x = timeToX(row.start, width);
ctx.moveTo(x, height/2);
let trimmedHeight = height - 4;
row.throughput.forEach((value) => {
let downPercent = value.down / maxThroughputDown;
let y = (height/2) - (downPercent * (trimmedHeight / 2));
ctx.lineTo(x, y);
x += sampleWidth;
});
ctx.stroke();
x = timeToX(row.start, width);
ctx.moveTo(x, height/2);
row.throughput.forEach((value) => {
let upPercent = value.up / maxThroughputUp;
let y = (height/2) + (upPercent * (trimmedHeight / 2));
ctx.lineTo(x, y);
x += sampleWidth;
});
ctx.stroke();
}
}
asnDropdown();
countryDropdown();
protocolDropdown();

View File

@ -1,6 +1,5 @@
import {BaseDashlet} from "./base_dashlet";
import {clearDiv, simpleRowHtml, theading} from "../helpers/builders";
import {formatThroughput, formatRetransmit, formatCakeStat, lerpGreenToRedViaOrange} from "../helpers/scaling";
import {lerpGreenToRedViaOrange} from "../helpers/scaling";
import {DashboardGraph} from "../graphs/dashboard_graph";
import {isRedacted} from "../helpers/redact";
@ -92,7 +91,7 @@ export class TopTreeSankey extends BaseDashlet {
fontSize: 9,
color: "#999"
};
if (redact) label.fontSize = 0;
if (redact) label.backgroundColor = label.color;
let name = r[1].name;
let bytes = r[1].current_throughput[0];

View File

@ -1,5 +1,6 @@
import {DashboardGraph} from "./dashboard_graph";
import {lerpColor, lerpGreenToRedViaOrange, scaleNumber} from "../helpers/scaling";
import {isRedacted} from "../helpers/redact";
export class TopNSankey extends DashboardGraph {
constructor(id) {
@ -59,6 +60,7 @@ export class TopNSankey extends DashboardGraph {
fontSize: 9,
color: "#999"
};
if (isRedacted()) label.backgroundColor = label.color;
let name = r.ip_address+ " (" + scaleNumber(r.bits_per_second.down, 0) + ", " + r.tcp_retransmits.down + "/" + r.tcp_retransmits.up + ")";
let bytes = r.bits_per_second.down / 8;

View File

@ -13,10 +13,12 @@ mod circuit;
mod packet_analysis;
mod flow_map;
mod warnings;
mod flow_explorer;
use axum::Router;
use axum::routing::{get, post};
use crate::node_manager::auth::auth_layer;
use tower_http::cors::CorsLayer;
pub fn local_api() -> Router {
Router::new()
@ -48,5 +50,12 @@ pub fn local_api() -> Router {
.route("/pcapDump/:id", get(packet_analysis::pcap_dump))
.route("/flowMap", get(flow_map::flow_lat_lon))
.route("/globalWarnings", get(warnings::get_global_warnings))
.route("/asnList", get(flow_explorer::asn_list))
.route("/countryList", get(flow_explorer::country_list))
.route("/protocolList", get(flow_explorer::protocol_list))
.route("/flowTimeline/:asn_id", get(flow_explorer::flow_timeline))
.route("/countryTimeline/:iso_code", get(flow_explorer::country_timeline))
.route("/protocolTimeline/:protocol", get(flow_explorer::protocol_timeline))
.layer(CorsLayer::very_permissive())
.route_layer(axum::middleware::from_fn(auth_layer))
}

View File

@ -0,0 +1,117 @@
use std::time::Duration;
use axum::extract::Path;
use axum::Json;
use serde::Serialize;
use lqos_sys::flowbee_data::FlowbeeKey;
use lqos_utils::units::DownUpOrder;
use lqos_utils::unix_time::{time_since_boot, unix_now};
use crate::shaped_devices_tracker::SHAPED_DEVICES;
use crate::throughput_tracker::flow_data::{AsnListEntry, AsnCountryListEntry, AsnProtocolListEntry,
RECENT_FLOWS, RttData, FlowbeeLocalData, FlowAnalysis};
pub async fn asn_list() -> Json<Vec<AsnListEntry>> {
Json(RECENT_FLOWS.asn_list())
}
pub async fn country_list() -> Json<Vec<AsnCountryListEntry>> {
Json(RECENT_FLOWS.country_list())
}
pub async fn protocol_list() -> Json<Vec<AsnProtocolListEntry>> {
Json(RECENT_FLOWS.protocol_list())
}
#[derive(Serialize)]
pub struct FlowTimeline {
start: u64,
end: u64,
duration_nanos: u64,
throughput: Vec<DownUpOrder<u64>>,
tcp_retransmits: DownUpOrder<u16>,
rtt: [RttData; 2],
retransmit_times_down: Vec<u64>,
retransmit_times_up: Vec<u64>,
total_bytes: DownUpOrder<u64>,
protocol: String,
circuit_id: String,
circuit_name: String,
remote_ip: String,
}
pub async fn flow_timeline(Path(asn_id): Path<u32>) -> Json<Vec<FlowTimeline>> {
let time_since_boot = time_since_boot().unwrap();
let since_boot = Duration::from(time_since_boot);
let boot_time = unix_now().unwrap() - since_boot.as_secs();
let all_flows_for_asn = RECENT_FLOWS.all_flows_for_asn(asn_id);
let flows = all_flows_to_transport(boot_time, all_flows_for_asn);
Json(flows)
}
fn all_flows_to_transport(boot_time: u64, all_flows_for_asn: Vec<(FlowbeeKey, FlowbeeLocalData, FlowAnalysis)>) -> Vec<FlowTimeline> {
all_flows_for_asn
.iter()
.filter(|flow| {
// Total flow time > 2 seconds
flow.1.last_seen - flow.1.start_time > 2_000_000_000
})
.map(|flow| {
let (circuit_id, mut circuit_name) = {
let sd = SHAPED_DEVICES.read().unwrap();
sd.get_circuit_id_and_name_from_ip(&flow.0.local_ip).unwrap_or((String::new(), String::new()))
};
if circuit_name.is_empty() {
circuit_name = flow.0.local_ip.as_ip().to_string();
}
FlowTimeline {
start: boot_time + Duration::from_nanos(flow.1.start_time).as_secs(),
end: boot_time + Duration::from_nanos(flow.1.last_seen).as_secs(),
duration_nanos: flow.1.last_seen - flow.1.start_time,
tcp_retransmits: flow.1.tcp_retransmits.clone(),
throughput: flow.1.throughput_buffer.clone(),
rtt: flow.1.rtt.clone(),
retransmit_times_down: flow.1.retry_times_down
.iter()
.map(|t| boot_time + Duration::from_nanos(*t).as_secs())
.collect(),
retransmit_times_up: flow.1.retry_times_up
.iter()
.map(|t| boot_time + Duration::from_nanos(*t).as_secs())
.collect(),
total_bytes: flow.1.bytes_sent.clone(),
protocol: flow.2.protocol_analysis.to_string(),
circuit_id,
circuit_name,
remote_ip: flow.0.remote_ip.as_ip().to_string(),
}
})
.collect::<Vec<_>>()
}
pub async fn country_timeline(Path(iso_code): Path<String>) -> Json<Vec<FlowTimeline>> {
let time_since_boot = time_since_boot().unwrap();
let since_boot = Duration::from(time_since_boot);
let boot_time = unix_now().unwrap() - since_boot.as_secs();
let all_flows_for_asn = RECENT_FLOWS.all_flows_for_country(&iso_code);
let flows = all_flows_to_transport(boot_time, all_flows_for_asn);
Json(flows)
}
pub async fn protocol_timeline(Path(protocol_name): Path<String>) -> Json<Vec<FlowTimeline>> {
let protocol_name = protocol_name.replace("_", "/");
let time_since_boot = time_since_boot().unwrap();
let since_boot = Duration::from(time_since_boot);
let boot_time = unix_now().unwrap() - since_boot.as_secs();
let all_flows_for_asn = RECENT_FLOWS.all_flows_for_protocol(&protocol_name);
let flows = all_flows_to_transport(boot_time, all_flows_for_asn);
Json(flows)
}

View File

@ -0,0 +1,17 @@
<div class="row">
<div class="col-1">
<span id="asnList"><i class="fa fa-spin fa-spinner"></i> Loading, Please Wait (this can take a minute)</span>
</div>
<div class="col-1">
<span id="countryList"><i class="fa fa-spin fa-spinner"></i> Loading, Please Wait (this can take a minute)</span>
</div>
<div class="col-1 ms-4">
<span id="protocolList"><i class="fa fa-spin fa-spinner"></i> Loading, Please Wait (this can take a minute)</span>
</div>
</div>
<div class="row">
<div class="col-12" id="asnDetails">
</div>
</div>
<script src="asn_explorer.js"></script>

View File

@ -77,6 +77,12 @@
<i class="fa fa-fw fa-centerline fa-server nav-icon"></i> Tree Overview
</a>
</li>
<!-- ASN Explorer -->
<li class="nav-item">
<a class="nav-link" href="asn_explorer.html">
<i class="fa fa-fw fa-centerline fa-globe nav-icon"></i> ASN Explorer
</a>
</li>
<!-- Statistics -->
<li class="nav-item">
<a class="nav-link" id="lnkStats">

View File

@ -32,6 +32,7 @@ pub(super) fn static_routes() -> Result<Router> {
"index.html", "shaped_devices.html", "tree.html",
"help.html", "unknown_ips.html", "configuration.html",
"circuit.html", "flow_map.html", "all_tree_sankey.html",
"asn_explorer.html",
];
// Iterate through pages and construct the router

View File

@ -3,6 +3,7 @@
use std::{io::Read, net::IpAddr, path::Path};
use fxhash::FxHashMap;
use serde::Deserialize;
#[derive(Deserialize, Clone, Debug)]
@ -44,6 +45,7 @@ struct Geobin {
pub struct GeoTable {
asn_trie: ip_network_table::IpNetworkTable<AsnEncoded>,
geo_trie: ip_network_table::IpNetworkTable<GeoIpLocation>,
asn_lookup: FxHashMap<u32, String>,
}
impl GeoTable {
@ -78,10 +80,13 @@ impl GeoTable {
flate2::read::GzDecoder::new(file).read_to_end(&mut buffer)?;
let geobin: Geobin = bincode::deserialize(&buffer)?;
// Build the ASN trie
// Build the ASN trie and ASN lookup map
let mut asn_lookup = FxHashMap::default();
log::info!("Building ASN trie");
let mut asn_trie = ip_network_table::IpNetworkTable::<AsnEncoded>::new();
for entry in geobin.asn {
asn_lookup.insert(entry.asn, entry.organization.clone());
let (ip, prefix) = match entry.network {
IpAddr::V4(ip) => (ip.to_ipv6_mapped(), entry.prefix+96 ),
IpAddr::V6(ip) => (ip, entry.prefix),
@ -109,6 +114,7 @@ impl GeoTable {
Ok(Self {
asn_trie,
geo_trie,
asn_lookup,
})
}
@ -133,9 +139,9 @@ impl GeoTable {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
let mut owners = String::new();
let mut country = String::new();
let mut flag = String::new();
let mut owners = "Unknown".to_string();
let mut country = "Unknown".to_string();
let mut flag = "Unknown".to_string();
if let Some(matched) = self.asn_trie.longest_match(ip) {
log::debug!("Matched ASN: {:?}", matched.1.asn);
@ -168,6 +174,10 @@ impl GeoTable {
(0.0, 0.0)
}
pub fn find_name_by_id(&self, id: u32) -> String {
self.asn_lookup.get(&id).cloned().unwrap_or_else(|| "Unknown".to_string())
}
}
#[derive(Default)]

View File

@ -1,4 +1,4 @@
use super::{get_asn_lat_lon, get_asn_name_and_country, FlowAnalysis};
use super::{get_asn_lat_lon, get_asn_name_and_country, FlowAnalysis, get_asn_name_by_id};
use crate::throughput_tracker::flow_data::{FlowbeeLocalData, FlowbeeRecipient};
use fxhash::FxHashMap;
use lqos_bus::BusResponse;
@ -14,6 +14,7 @@ pub struct TimeBuffer {
buffer: Mutex<Vec<TimeEntry>>,
}
#[derive(Clone, Debug)]
struct TimeEntry {
time: u64,
data: (FlowbeeKey, FlowbeeLocalData, FlowAnalysis),
@ -25,6 +26,26 @@ pub struct FlowDurationSummary {
duration: u64,
}
#[derive(Debug, Serialize)]
pub struct AsnListEntry {
count: usize,
asn: u32,
name: String,
}
#[derive(Debug, Serialize)]
pub struct AsnCountryListEntry {
count: usize,
name: String,
iso_code: String,
}
#[derive(Debug, Serialize)]
pub struct AsnProtocolListEntry {
count: usize,
protocol: String,
}
impl TimeBuffer {
fn new() -> Self {
Self {
@ -261,6 +282,144 @@ impl TimeBuffer {
.map(|(count, duration)| FlowDurationSummary { count, duration })
.collect()
}
pub fn all_flows_for_asn(&self, id: u32) -> Vec<(FlowbeeKey, FlowbeeLocalData, FlowAnalysis)> {
let buffer = self.buffer.lock().unwrap();
buffer
.iter()
.filter(|flow| flow.data.2.asn_id.0 == id )
.map(|flow| flow.data.clone())
.collect()
}
pub fn all_flows_for_country(&self, iso_code: &str) -> Vec<(FlowbeeKey, FlowbeeLocalData, FlowAnalysis)> {
let buffer = self.buffer.lock().unwrap();
buffer
.iter()
.filter(|flow| {
let country = get_asn_name_and_country(flow.data.0.remote_ip.as_ip());
country.flag == iso_code
})
.map(|flow| flow.data.clone())
.collect()
}
pub fn all_flows_for_protocol(&self, protocol_name: &str) -> Vec<(FlowbeeKey, FlowbeeLocalData, FlowAnalysis)> {
let buffer = self.buffer.lock().unwrap();
buffer
.iter()
.filter(|flow| {
flow.data.2.protocol_analysis.to_string() == protocol_name
})
.map(|flow| flow.data.clone())
.collect()
}
/// Builds a list of all ASNs with recent data, and how many flows they have.
pub fn asn_list(&self) -> Vec<AsnListEntry> {
// 1: Clone: large operation, don't keep the buffer locked longer than we have to
let buffer = {
let buffer = self.buffer.lock().unwrap();
buffer.clone()
};
// Filter out short flows and reduce to the ASN ID# only
let mut buffer: Vec<_> = buffer
.into_iter()
.filter(|flow| {
// Total flow time > 3 seconds
flow.data.1.last_seen - flow.data.1.start_time > 3_000_000_000
})
.map(|flow| flow.data.2.asn_id.0)
.collect();
// Sort the buffer
buffer.sort_unstable();
// Deduplicate and count, decorate with name
buffer
.into_iter()
.sorted()
.dedup_with_count()
.map(|(count, asn)| AsnListEntry {
count,
asn,
name: get_asn_name_by_id(asn),
})
.collect()
}
/// Builds a list of ASNs by country with recent data, and how many flows they have.
pub fn country_list(&self) -> Vec<AsnCountryListEntry> {
// 1: Clone: large operation, don't keep the buffer locked longer than we have to
let buffer = {
let buffer = self.buffer.lock().unwrap();
buffer.clone()
};
// Filter out the short flows and get the country & flag
let mut buffer: Vec<(String, String)> = buffer
.into_iter()
.filter(|flow| {
// Total flow time > 3 seconds
flow.data.1.last_seen - flow.data.1.start_time > 3_000_000_000
})
.map(|flow| {
let country = get_asn_name_and_country(flow.data.0.remote_ip.as_ip());
(country.country, country.flag)
})
.collect();
// Sort the buffer
buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
// Deduplicate and count, decorate with name
buffer
.into_iter()
.sorted()
.dedup_with_count()
.map(|(count, asn)| AsnCountryListEntry {
count,
name: asn.0,
iso_code: asn.1,
})
.collect()
}
/// Builds a list of protocols with recent data, and how many flows they have.
pub fn protocol_list(&self) -> Vec<AsnProtocolListEntry> {
// 1: Clone: large operation, don't keep the buffer locked longer than we have to
let buffer = {
let buffer = self.buffer.lock().unwrap();
buffer.clone()
};
// Filter out the short flows and get the country & flag
let mut buffer: Vec<String> = buffer
.into_iter()
.filter(|flow| {
// Total flow time > 3 seconds
flow.data.1.last_seen - flow.data.1.start_time > 3_000_000_000
})
.map(|flow| {
flow.data.2.protocol_analysis.to_string()
})
.collect();
// Sort the buffer
buffer.sort_unstable_by(|a, b| a.cmp(&b));
// Deduplicate and count, decorate with name
buffer
.into_iter()
.sorted()
.dedup_with_count()
.map(|(count, protocol)| AsnProtocolListEntry {
count,
protocol,
})
.collect()
}
}
pub static RECENT_FLOWS: Lazy<TimeBuffer> = Lazy::new(|| TimeBuffer::new());
@ -281,8 +440,9 @@ impl FinishedFlowAnalysis {
}
impl FlowbeeRecipient for FinishedFlowAnalysis {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) {
fn enqueue(&self, key: FlowbeeKey, mut data: FlowbeeLocalData, analysis: FlowAnalysis) {
log::debug!("Finished flow analysis");
data.trim(); // Remove the trailing 30 seconds of zeroes
RECENT_FLOWS.push(TimeEntry {
time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)

View File

@ -14,6 +14,7 @@ mod kernel_ringbuffer;
pub use kernel_ringbuffer::*;
mod rtt_types;
pub use rtt_types::RttData;
pub use finished_flows::{AsnListEntry, AsnCountryListEntry, AsnProtocolListEntry};
use crate::throughput_tracker::flow_data::flow_analysis::asn::AsnNameCountryFlag;
static ANALYSIS: Lazy<FlowAnalysisSystem> = Lazy::new(|| FlowAnalysisSystem::new());
@ -97,4 +98,13 @@ pub fn get_asn_lat_lon(ip: IpAddr) -> (f64, f64) {
}
}
(0.0, 0.0)
}
pub fn get_asn_name_by_id(id: u32) -> String {
if let Ok(table_lock) = ANALYSIS.asn_table.lock() {
if let Some(table) = table_lock.as_ref() {
return table.find_name_by_id(id);
}
}
"Unknown".to_string()
}

View File

@ -41,6 +41,12 @@ pub struct FlowbeeLocalData {
pub flags: u8,
/// Recent RTT median
pub rtt: [RttData; 2],
/// Throughput Buffer
pub throughput_buffer: Vec<DownUpOrder<u64>>,
/// When did the retries happen? In nanoseconds since kernel boot
pub retry_times_down: Vec<u64>,
/// When did the retries happen? In nanoseconds since kernel boot
pub retry_times_up: Vec<u64>,
}
impl From<&FlowbeeData> for FlowbeeLocalData {
@ -56,6 +62,34 @@ impl From<&FlowbeeData> for FlowbeeLocalData {
tos: data.tos,
flags: data.flags,
rtt: [RttData::from_nanos(0); 2],
throughput_buffer: vec![ data.bytes_sent ],
retry_times_down: Vec::new(),
retry_times_up: Vec::new(),
}
}
}
impl FlowbeeLocalData {
pub fn trim(&mut self) {
// Find the point at which the throughput buffer starts being all zeroes
let mut last_start: Option<usize> = None;
let mut in_zero_run = false;
for (i, &value) in self.throughput_buffer.iter().enumerate() {
if value.down == 0 && value.up == 0 {
if !in_zero_run {
in_zero_run = true;
last_start = Some(i);
}
} else {
in_zero_run = false;
}
}
if let Some(start_index) = last_start {
// There's a run of zeroes terminating the throughput buffer
// That means we need to truncate the buffer
self.throughput_buffer.truncate(start_index);
}
}
}

View File

@ -15,10 +15,10 @@ use std::sync::{
};
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country,
FlowAnalysis, RECENT_FLOWS, flowbee_handle_events, get_flowbee_event_count_and_reset,
expire_rtt_flows, flowbee_rtt_map, RttData, get_rtt_events_per_second,
expire_rtt_flows, flowbee_rtt_map, RttData, get_rtt_events_per_second, AsnListEntry,
AsnCountryListEntry, AsnProtocolListEntry,
};
trait FlowbeeRecipient {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis);
}

View File

@ -226,6 +226,18 @@ impl ThroughputTracker {
} else {
// We have a valid flow, so it needs to be tracked
if let Some(this_flow) = all_flows_lock.get_mut(&key) {
// If retransmits have changed, add the time to the retry list
if data.tcp_retransmits.down != this_flow.0.tcp_retransmits.down {
this_flow.0.retry_times_down.push(data.last_seen);
}
if data.tcp_retransmits.up != this_flow.0.tcp_retransmits.up {
this_flow.0.retry_times_up.push(data.last_seen);
}
let change_since_last_time = data.bytes_sent.checked_sub_or_zero(this_flow.0.bytes_sent);
this_flow.0.throughput_buffer.push(change_since_last_time);
//println!("{change_since_last_time:?}");
this_flow.0.last_seen = data.last_seen;
this_flow.0.bytes_sent = data.bytes_sent;
this_flow.0.packets_sent = data.packets_sent;
@ -234,6 +246,7 @@ impl ThroughputTracker {
this_flow.0.end_status = data.end_status;
this_flow.0.tos = data.tos;
this_flow.0.flags = data.flags;
if let Some([up, down]) = rtt_samples.get(&key) {
if up.as_nanos() != 0 {
this_flow.0.rtt[0] = *up;