Allow paging on TOP-N queries

Change BusRequest::GetTopNDownloaders and GetWorstRtt to accept
a start and end field, rather than just "n_rows".
This commit is contained in:
Herbert Wolverson
2023-01-20 21:18:19 +00:00
parent f742acafda
commit fc8b3ca9e4
5 changed files with 23 additions and 11 deletions

View File

@@ -15,10 +15,20 @@ pub enum BusRequest {
GetCurrentThroughput,
/// Retrieve the top N downloads by bandwidth use.
GetTopNDownloaders(u32),
GetTopNDownloaders{
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32
},
/// Retrieves the TopN hosts with the worst RTT, sorted by RTT descending.
GetWorstRtt(u32),
GetWorstRtt{
/// First row to retrieve (usually 0 unless you are paging)
start: u32,
/// Last row to retrieve (10 for top-10 starting at 0)
end: u32
},
/// Retrieves current byte counters for all hosts.
GetHostCounter,

View File

@@ -71,8 +71,8 @@ async fn get_data_from_server(bus_client: &mut BusClient) -> Result<()> {
// Send request to lqosd
let requests = vec![
BusRequest::GetCurrentThroughput,
BusRequest::GetTopNDownloaders(10),
BusRequest::GetWorstRtt(10),
BusRequest::GetTopNDownloaders{start: 0, end: 10},
BusRequest::GetWorstRtt{start: 0, end: 10},
BusRequest::RttHistogram,
BusRequest::AllUnknownIps,
];

View File

@@ -118,8 +118,8 @@ fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusResponse>
throughput_tracker::current_throughput()
}
BusRequest::GetHostCounter => throughput_tracker::host_counters(),
BusRequest::GetTopNDownloaders(n) => throughput_tracker::top_n(*n),
BusRequest::GetWorstRtt(n) => throughput_tracker::worst_n(*n),
BusRequest::GetTopNDownloaders{start, end} => throughput_tracker::top_n(*start, *end),
BusRequest::GetWorstRtt{start, end} => throughput_tracker::worst_n(*start, *end),
BusRequest::MapIpToFlow {
ip_address,
tc_handle,

View File

@@ -75,7 +75,7 @@ fn retire_check(cycle: u64, recent_cycle: u64) -> bool {
cycle < recent_cycle + RETIRE_AFTER_SECONDS
}
pub fn top_n(n: u32) -> BusResponse {
pub fn top_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
@@ -96,7 +96,8 @@ pub fn top_n(n: u32) -> BusResponse {
full_list.sort_by(|a, b| b.1 .0.cmp(&a.1 .0));
let result = full_list
.iter()
.take(n as usize)
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(ip, (bytes_dn, bytes_up), (packets_dn, packets_up), median_rtt, tc_handle)| IpStats {
ip_address: ip.as_ip().to_string(),
@@ -110,7 +111,7 @@ pub fn top_n(n: u32) -> BusResponse {
BusResponse::TopDownloaders(result)
}
pub fn worst_n(n: u32) -> BusResponse {
pub fn worst_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<(XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle)> = {
let tp = THROUGHPUT_TRACKER.read();
tp.raw_data
@@ -131,7 +132,8 @@ pub fn worst_n(n: u32) -> BusResponse {
full_list.sort_by(|a, b| b.3.partial_cmp(&a.3).unwrap());
let result = full_list
.iter()
.take(n as usize)
.skip(start as usize)
.take((end as usize) - (start as usize))
.map(
|(ip, (bytes_dn, bytes_up), (packets_dn, packets_up), median_rtt, tc_handle)| IpStats {
ip_address: ip.as_ip().to_string(),

View File

@@ -23,7 +23,7 @@ async fn get_data(client: &mut BusClient, n_rows: u16) -> Result<DataResult> {
let mut result = DataResult { totals: (0, 0, 0, 0), top: Vec::new() };
let requests = vec![
BusRequest::GetCurrentThroughput,
BusRequest::GetTopNDownloaders(n_rows as u32),
BusRequest::GetTopNDownloaders{start: 0, end: n_rows as u32},
];
for r in client.request(requests).await? {
match r {