Add a stub for listing top flows by current bitrate.

This commit is contained in:
Herbert Wolverson
2024-02-28 15:03:03 -06:00
parent 7516715874
commit f84798885b
8 changed files with 65 additions and 2 deletions

14
src/rust/Cargo.lock generated
View File

@@ -556,7 +556,7 @@ dependencies = [
"criterion-plot",
"futures",
"is-terminal",
"itertools",
"itertools 0.10.5",
"num-traits",
"once_cell",
"oorandom",
@@ -578,7 +578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
dependencies = [
"cast",
"itertools",
"itertools 0.10.5",
]
[[package]]
@@ -1354,6 +1354,15 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.10"
@@ -1676,6 +1685,7 @@ dependencies = [
"anyhow",
"dashmap",
"env_logger",
"itertools 0.12.1",
"jemallocator",
"log",
"lqos_bus",

View File

@@ -159,6 +159,9 @@ pub enum BusRequest {
/// Count the nubmer of active flows.
CountActiveFlows,
/// Top Flows Reports
TopFlows{ n: u32 },
}
/// Specific requests from the long-term stats system

View File

@@ -122,4 +122,7 @@ pub enum BusResponse {
/// Count active flows
CountActiveFlows(u64),
/// Top Flopws
TopFlows(Vec<FlowbeeData>),
}

View File

@@ -23,5 +23,17 @@ pub async fn count_flows() -> NoCache<Json<u64>> {
_ => 0,
};
NoCache::new(Json(result))
}
#[get("/api/flows/top5")]
pub async fn top_5_flows() -> NoCache<Json<Vec<FlowbeeData>>> {
let responses =
bus_request(vec![BusRequest::TopFlows { n: 5 }]).await.unwrap();
let result = match &responses[0] {
BusResponse::TopFlows(flowbee) => flowbee.to_owned(),
_ => Vec::new(),
};
NoCache::new(Json(result))
}

View File

@@ -113,6 +113,7 @@ fn rocket() -> _ {
// Flowbee System
flow_monitor::all_flows_debug_dump,
flow_monitor::count_flows,
flow_monitor::top_5_flows,
],
);

View File

@@ -29,6 +29,7 @@ sysinfo = "0"
dashmap = "5"
num-traits = "0.2"
thiserror = "1"
itertools = "0.12.1"
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]

View File

@@ -229,6 +229,7 @@ fn handle_bus_requests(
BusRequest::CountActiveFlows => {
throughput_tracker::count_active_flows()
}
BusRequest::TopFlows { n } => throughput_tracker::top_flows(*n),
});
}
}

View File

@@ -475,4 +475,36 @@ pub fn all_unknown_ips() -> BusResponse {
pub fn count_active_flows() -> BusResponse {
let lock = ALL_FLOWS.lock().unwrap();
BusResponse::CountActiveFlows(lock.len() as u64)
}
/// Top Flows Report
pub fn top_flows(n: u32) -> BusResponse {
let lock = ALL_FLOWS.lock().unwrap();
let mut table = lock.clone();
table.sort_by(|a, b| {
let a_total = a.1.rate_estimate_bps[0] + a.1.rate_estimate_bps[1];
let b_total = b.1.rate_estimate_bps[0] + b.1.rate_estimate_bps[1];
b_total.cmp(&a_total)
});
let result = table
.iter()
.take(n as usize)
.map(|(ip, flow)| {
lqos_bus::FlowbeeData {
remote_ip: ip.remote_ip.as_ip().to_string(),
local_ip: ip.local_ip.as_ip().to_string(),
src_port: ip.src_port,
dst_port: ip.dst_port,
ip_protocol: FlowbeeProtocol::from(ip.ip_protocol),
bytes_sent: flow.bytes_sent,
packets_sent: flow.packets_sent,
rate_estimate_bps: flow.rate_estimate_bps,
retries: flow.retries,
last_rtt: flow.last_rtt,
end_status: flow.end_status,
}
})
.collect();
BusResponse::TopFlows(result)
}