More flexible API for 'top N' calculations based on flow buffer.

This commit is contained in:
Herbert Wolverson 2024-02-29 09:10:31 -06:00
parent 0659cda225
commit 8f343b7c3b
6 changed files with 78 additions and 14 deletions

View File

@ -10,7 +10,7 @@ pub use client::bus_request;
use log::error;
pub use persistent_client::BusClient;
pub use reply::BusReply;
pub use request::{BusRequest, StatsRequest};
pub use request::{BusRequest, StatsRequest, TopFlowType};
pub use response::BusResponse;
pub use session::BusSession;
use thiserror::Error;

View File

@ -161,7 +161,27 @@ pub enum BusRequest {
CountActiveFlows,
/// Top Flows Reports
TopFlows{ n: u32 },
TopFlows{
/// The type of top report to request
flow_type: TopFlowType,
/// The number of flows to return
n: u32
},
}
/// Defines the type of "top" flow being requested
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Copy)]
pub enum TopFlowType {
/// Top flows by current estimated bandwidth use
RateEstimate,
/// Top flows by total bytes transferred
Bytes,
/// Top flows by total packets transferred
Packets,
/// Top flows by total drops
Drops,
/// Top flows by round-trip time estimate
RoundTripTime,
}
/// Specific requests from the long-term stats system

View File

@ -21,7 +21,7 @@ pub use bus::{
bus_request, decode_request, decode_response, encode_request,
encode_response, BusClient, BusReply, BusRequest, BusResponse, BusSession,
CakeDiffTinTransit, CakeDiffTransit, CakeTransit, QueueStoreTransit,
UnixSocketServer, BUS_SOCKET_PATH, StatsRequest
UnixSocketServer, BUS_SOCKET_PATH, StatsRequest, TopFlowType
};
pub use tc_handle::TcHandle;

View File

@ -26,10 +26,19 @@ pub async fn count_flows() -> NoCache<Json<u64>> {
NoCache::new(Json(result))
}
#[get("/api/flows/top5")]
pub async fn top_5_flows() -> NoCache<Json<Vec<FlowbeeData>>> {
#[get("/api/flows/top/<top_n>/<flow_type>")]
pub async fn top_5_flows(top_n: u32, flow_type: String) -> NoCache<Json<Vec<FlowbeeData>>> {
let flow_type = match flow_type.as_str() {
"rate" => lqos_bus::TopFlowType::RateEstimate,
"bytes" => lqos_bus::TopFlowType::Bytes,
"packets" => lqos_bus::TopFlowType::Packets,
"drops" => lqos_bus::TopFlowType::Drops,
"rtt" => lqos_bus::TopFlowType::RoundTripTime,
_ => lqos_bus::TopFlowType::RateEstimate,
};
let responses =
bus_request(vec![BusRequest::TopFlows { n: 5 }]).await.unwrap();
bus_request(vec![BusRequest::TopFlows { n: top_n, flow_type }]).await.unwrap();
let result = match &responses[0] {
BusResponse::TopFlows(flowbee) => flowbee.to_owned(),
_ => Vec::new(),

View File

@ -229,7 +229,7 @@ fn handle_bus_requests(
BusRequest::CountActiveFlows => {
throughput_tracker::count_active_flows()
}
BusRequest::TopFlows { n } => throughput_tracker::top_flows(*n),
BusRequest::TopFlows { n, flow_type } => throughput_tracker::top_flows(*n, *flow_type),
});
}
}

View File

@ -8,7 +8,7 @@ use crate::{
};
pub use heimdall_data::get_flow_stats;
use log::{info, warn};
use lqos_bus::{BusResponse, FlowbeeProtocol, IpStats, TcHandle, XdpPpingResult};
use lqos_bus::{BusResponse, FlowbeeProtocol, IpStats, TcHandle, TopFlowType, XdpPpingResult};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use lts_client::collector::{StatsUpdateMessage, ThroughputSummary, HostSummary};
use once_cell::sync::Lazy;
@ -478,14 +478,49 @@ pub fn all_unknown_ips() -> BusResponse {
}
/// Top Flows Report
pub fn top_flows(n: u32) -> BusResponse {
pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
let lock = ALL_FLOWS.lock().unwrap();
let mut table = lock.clone();
table.sort_by(|a, b| {
let a_total = a.1.rate_estimate_bps[0] + a.1.rate_estimate_bps[1];
let b_total = b.1.rate_estimate_bps[0] + b.1.rate_estimate_bps[1];
b_total.cmp(&a_total)
});
match flow_type {
TopFlowType::RateEstimate => {
table.sort_by(|a, b| {
let a_total = a.1.rate_estimate_bps[0] + a.1.rate_estimate_bps[1];
let b_total = b.1.rate_estimate_bps[0] + b.1.rate_estimate_bps[1];
b_total.cmp(&a_total)
});
}
TopFlowType::Bytes => {
table.sort_by(|a, b| {
let a_total = a.1.bytes_sent[0] + a.1.bytes_sent[1];
let b_total = b.1.bytes_sent[0] + b.1.bytes_sent[1];
b_total.cmp(&a_total)
});
}
TopFlowType::Packets => {
table.sort_by(|a, b| {
let a_total = a.1.packets_sent[0] + a.1.packets_sent[1];
let b_total = b.1.packets_sent[0] + b.1.packets_sent[1];
b_total.cmp(&a_total)
});
}
TopFlowType::Drops => {
table.sort_by(|a, b| {
let a_total = a.1.retries[0] + a.1.retries[1];
let b_total = b.1.retries[0] + b.1.retries[1];
b_total.cmp(&a_total)
});
}
TopFlowType::RoundTripTime => {
table.sort_by(|a, b| {
let a_total = a.1.last_rtt[0] + a.1.last_rtt[1];
let b_total = b.1.last_rtt[0] + b.1.last_rtt[1];
b_total.cmp(&a_total)
});
}
}
let result = table
.iter()
.take(n as usize)