From f84798885b9cd8d8eb34ae8ea2db1d9f0d1ef98d Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Wed, 28 Feb 2024 15:03:03 -0600 Subject: [PATCH] Add a stub for listing top flows by current bitrate. --- src/rust/Cargo.lock | 14 ++++++-- src/rust/lqos_bus/src/bus/request.rs | 3 ++ src/rust/lqos_bus/src/bus/response.rs | 3 ++ .../lqos_node_manager/src/flow_monitor.rs | 12 +++++++ src/rust/lqos_node_manager/src/main.rs | 1 + src/rust/lqosd/Cargo.toml | 1 + src/rust/lqosd/src/main.rs | 1 + src/rust/lqosd/src/throughput_tracker/mod.rs | 32 +++++++++++++++++++ 8 files changed, 65 insertions(+), 2 deletions(-) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 205f2544..9a9fff9c 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -556,7 +556,7 @@ dependencies = [ "criterion-plot", "futures", "is-terminal", - "itertools", + "itertools 0.10.5", "num-traits", "once_cell", "oorandom", @@ -578,7 +578,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" dependencies = [ "cast", - "itertools", + "itertools 0.10.5", ] [[package]] @@ -1354,6 +1354,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -1676,6 +1685,7 @@ dependencies = [ "anyhow", "dashmap", "env_logger", + "itertools 0.12.1", "jemallocator", "log", "lqos_bus", diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index b477d653..52d76607 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -159,6 +159,9 @@ pub enum BusRequest { /// Count the nubmer of active flows. CountActiveFlows, + + /// Top Flows Reports + TopFlows{ n: u32 }, } /// Specific requests from the long-term stats system diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index a9c30219..80c26bc1 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -122,4 +122,7 @@ pub enum BusResponse { /// Count active flows CountActiveFlows(u64), + + /// Top Flopws + TopFlows(Vec), } diff --git a/src/rust/lqos_node_manager/src/flow_monitor.rs b/src/rust/lqos_node_manager/src/flow_monitor.rs index e4dd1c56..61ed159f 100644 --- a/src/rust/lqos_node_manager/src/flow_monitor.rs +++ b/src/rust/lqos_node_manager/src/flow_monitor.rs @@ -23,5 +23,17 @@ pub async fn count_flows() -> NoCache> { _ => 0, }; + NoCache::new(Json(result)) +} + +#[get("/api/flows/top5")] +pub async fn top_5_flows() -> NoCache>> { + let responses = + bus_request(vec![BusRequest::TopFlows { n: 5 }]).await.unwrap(); + let result = match &responses[0] { + BusResponse::TopFlows(flowbee) => flowbee.to_owned(), + _ => Vec::new(), + }; + NoCache::new(Json(result)) } \ No newline at end of file diff --git a/src/rust/lqos_node_manager/src/main.rs b/src/rust/lqos_node_manager/src/main.rs index 2b8ba55d..d9d3bb4c 100644 --- a/src/rust/lqos_node_manager/src/main.rs +++ b/src/rust/lqos_node_manager/src/main.rs @@ -113,6 +113,7 @@ fn rocket() -> _ { // Flowbee System flow_monitor::all_flows_debug_dump, flow_monitor::count_flows, + flow_monitor::top_5_flows, ], ); diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index a5115991..10899c39 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -29,6 +29,7 @@ sysinfo = "0" dashmap = "5" num-traits = "0.2" thiserror = "1" +itertools = "0.12.1" # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index e4d455b1..114fad76 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -229,6 +229,7 @@ fn handle_bus_requests( BusRequest::CountActiveFlows => { throughput_tracker::count_active_flows() } + BusRequest::TopFlows { n } => throughput_tracker::top_flows(*n), }); } } diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index 2c83a6c1..72c3dc44 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -475,4 +475,36 @@ pub fn all_unknown_ips() -> BusResponse { pub fn count_active_flows() -> BusResponse { let lock = ALL_FLOWS.lock().unwrap(); BusResponse::CountActiveFlows(lock.len() as u64) + } + + /// Top Flows Report + pub fn top_flows(n: u32) -> BusResponse { + let lock = ALL_FLOWS.lock().unwrap(); + let mut table = lock.clone(); + table.sort_by(|a, b| { + let a_total = a.1.rate_estimate_bps[0] + a.1.rate_estimate_bps[1]; + let b_total = b.1.rate_estimate_bps[0] + b.1.rate_estimate_bps[1]; + b_total.cmp(&a_total) + }); + let result = table + .iter() + .take(n as usize) + .map(|(ip, flow)| { + lqos_bus::FlowbeeData { + remote_ip: ip.remote_ip.as_ip().to_string(), + local_ip: ip.local_ip.as_ip().to_string(), + src_port: ip.src_port, + dst_port: ip.dst_port, + ip_protocol: FlowbeeProtocol::from(ip.ip_protocol), + bytes_sent: flow.bytes_sent, + packets_sent: flow.packets_sent, + rate_estimate_bps: flow.rate_estimate_bps, + retries: flow.retries, + last_rtt: flow.last_rtt, + end_status: flow.end_status, + } + }) + .collect(); + + BusResponse::TopFlows(result) } \ No newline at end of file