From fc8b3ca9e458059bbd10492a25b00bb6c66619ee Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Fri, 20 Jan 2023 21:18:19 +0000 Subject: [PATCH] Allow paging on TOP-N queries Change BusRequest::GetTopNDownloaders and GetWorstRtt to accept a start and end field, rather than just "n_rows". --- src/rust/lqos_bus/src/bus/request.rs | 14 ++++++++++++-- .../lqos_node_manager/src/tracker/cache_manager.rs | 4 ++-- src/rust/lqosd/src/main.rs | 4 ++-- src/rust/lqosd/src/throughput_tracker/mod.rs | 10 ++++++---- src/rust/lqtop/src/main.rs | 2 +- 5 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index 8cbdabad..aa45e634 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -15,10 +15,20 @@ pub enum BusRequest { GetCurrentThroughput, /// Retrieve the top N downloads by bandwidth use. - GetTopNDownloaders(u32), + GetTopNDownloaders{ + /// First row to retrieve (usually 0 unless you are paging) + start: u32, + /// Last row to retrieve (10 for top-10 starting at 0) + end: u32 + }, /// Retrieves the TopN hosts with the worst RTT, sorted by RTT descending. - GetWorstRtt(u32), + GetWorstRtt{ + /// First row to retrieve (usually 0 unless you are paging) + start: u32, + /// Last row to retrieve (10 for top-10 starting at 0) + end: u32 + }, /// Retrieves current byte counters for all hosts. GetHostCounter, diff --git a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs index 69d6aa01..547c5138 100644 --- a/src/rust/lqos_node_manager/src/tracker/cache_manager.rs +++ b/src/rust/lqos_node_manager/src/tracker/cache_manager.rs @@ -71,8 +71,8 @@ async fn get_data_from_server(bus_client: &mut BusClient) -> Result<()> { // Send request to lqosd let requests = vec![ BusRequest::GetCurrentThroughput, - BusRequest::GetTopNDownloaders(10), - BusRequest::GetWorstRtt(10), + BusRequest::GetTopNDownloaders{start: 0, end: 10}, + BusRequest::GetWorstRtt{start: 0, end: 10}, BusRequest::RttHistogram, BusRequest::AllUnknownIps, ]; diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index e737e16a..10b9a1ea 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -118,8 +118,8 @@ fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec throughput_tracker::current_throughput() } BusRequest::GetHostCounter => throughput_tracker::host_counters(), - BusRequest::GetTopNDownloaders(n) => throughput_tracker::top_n(*n), - BusRequest::GetWorstRtt(n) => throughput_tracker::worst_n(*n), + BusRequest::GetTopNDownloaders{start, end} => throughput_tracker::top_n(*start, *end), + BusRequest::GetWorstRtt{start, end} => throughput_tracker::worst_n(*start, *end), BusRequest::MapIpToFlow { ip_address, tc_handle, diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index c395f079..5a6026c9 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -75,7 +75,7 @@ fn retire_check(cycle: u64, recent_cycle: u64) -> bool { cycle < recent_cycle + RETIRE_AFTER_SECONDS } -pub fn top_n(n: u32) -> BusResponse { +pub fn top_n(start: u32, end: u32) -> BusResponse { let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle)> = { let tp = THROUGHPUT_TRACKER.read(); tp.raw_data @@ -96,7 +96,8 @@ pub fn top_n(n: u32) -> BusResponse { full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0)); let result = full_list .iter() - .take(n as usize) + .skip(start as usize) + .take((end as usize) - (start as usize)) .map( |(ip, (bytes_dn, bytes_up), (packets_dn, packets_up), median_rtt, tc_handle)| IpStats { ip_address: ip.as_ip().to_string(), @@ -110,7 +111,7 @@ pub fn top_n(n: u32) -> BusResponse { BusResponse::TopDownloaders(result) } -pub fn worst_n(n: u32) -> BusResponse { +pub fn worst_n(start: u32, end: u32) -> BusResponse { let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle)> = { let tp = THROUGHPUT_TRACKER.read(); tp.raw_data @@ -131,7 +132,8 @@ pub fn worst_n(n: u32) -> BusResponse { full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap()); let result = full_list .iter() - .take(n as usize) + .skip(start as usize) + .take((end as usize) - (start as usize)) .map( |(ip, (bytes_dn, bytes_up), (packets_dn, packets_up), median_rtt, tc_handle)| IpStats { ip_address: ip.as_ip().to_string(), diff --git a/src/rust/lqtop/src/main.rs b/src/rust/lqtop/src/main.rs index d5c6a951..4f503523 100644 --- a/src/rust/lqtop/src/main.rs +++ b/src/rust/lqtop/src/main.rs @@ -23,7 +23,7 @@ async fn get_data(client: &mut BusClient, n_rows: u16) -> Result { let mut result = DataResult { totals: (0, 0, 0, 0), top: Vec::new() }; let requests = vec![ BusRequest::GetCurrentThroughput, - BusRequest::GetTopNDownloaders(n_rows as u32), + BusRequest::GetTopNDownloaders{start: 0, end: n_rows as u32}, ]; for r in client.request(requests).await? { match r {