Add a new api call - api/flows/dump_all - that lists all recent flows that have been collected. Intended for debugging.

This commit is contained in:
Herbert Wolverson 2024-02-27 16:22:22 -06:00
parent df2b9dfe32
commit e98a1864ad
11 changed files with 129 additions and 42 deletions

View File

@ -152,6 +152,10 @@ pub enum BusRequest {
/// display a "run bandwidht test" link.
#[cfg(feature = "equinix_tests")]
RequestLqosEquinixTest,
/// Request a dump of all active flows. This can be a lot of data.
/// so this is intended for debugging
DumpActiveFlows,
}
/// Specific requests from the long-term stats system

View File

@ -1,6 +1,6 @@
use super::QueueStoreTransit;
use crate::{
ip_stats::PacketHeader, FlowTransport, IpMapping, IpStats, XdpPpingResult,
ip_stats::{FlowbeeData, PacketHeader}, FlowTransport, IpMapping, IpStats, XdpPpingResult,
};
use lts_client::transport_data::{StatsTotals, StatsHost, StatsTreeNode};
use serde::{Deserialize, Serialize};
@ -116,4 +116,7 @@ pub enum BusResponse {
/// Long-term stats tree
LongTermTree(Vec<StatsTreeNode>),
/// All Active Flows (Not Recommended - Debug Use)
AllActiveFlows(Vec<FlowbeeData>),
}

View File

@ -143,4 +143,32 @@ pub struct PacketHeader {
pub tcp_tsval: u32,
/// TCP ECR val
pub tcp_tsecr: u32,
}
/// Flowbee: a complete flow data, combining key and data.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
pub struct FlowbeeData {
/// Mapped `XdpIpAddress` source for the flow.
pub remote_ip: String,
/// Mapped `XdpIpAddress` destination for the flow
pub local_ip: String,
/// Source port number, or ICMP type.
pub src_port: u16,
/// Destination port number.
pub dst_port: u16,
/// IP protocol (see the Linux kernel!)
pub ip_protocol: u8,
/// Bytes transmitted
pub bytes_sent: [u64; 2],
/// Packets transmitted
pub packets_sent: [u64; 2],
/// Rate estimate
pub rate_estimate_bps: [u64; 2],
/// Retry Counters
pub retries: [u32; 2],
/// Most recent RTT
pub last_rtt: [u64; 2],
/// Has the connection ended?
/// 0 = Alive, 1 = FIN, 2 = RST
pub end_status: u32,
}

View File

@ -14,7 +14,7 @@ mod bus;
mod ip_stats;
pub use ip_stats::{
tos_parser, FlowProto, FlowTransport, IpMapping, IpStats, PacketHeader,
XdpPpingResult,
XdpPpingResult, FlowbeeData
};
mod tc_handle;
pub use bus::{

View File

@ -0,0 +1,15 @@
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeData};
use rocket::serde::json::Json;
use crate::cache_control::NoCache;
#[get("/api/flows/dump_all")]
pub async fn all_flows_debug_dump() -> NoCache<Json<Vec<FlowbeeData>>> {
let responses =
bus_request(vec![BusRequest::DumpActiveFlows]).await.unwrap();
let result = match &responses[0] {
BusResponse::AllActiveFlows(flowbee) => flowbee.to_owned(),
_ => Vec::new(),
};
NoCache::new(Json(result))
}

View File

@ -12,6 +12,7 @@ mod config_control;
mod network_tree;
mod queue_info;
mod toasts;
mod flow_monitor;
// Use JemAllocator only on supported platforms
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
@ -109,6 +110,8 @@ fn rocket() -> _ {
// Front page toast checks
toasts::version_check,
toasts::stats_check,
// Flowbee System
flow_monitor::all_flows_debug_dump,
],
);

View File

@ -266,10 +266,11 @@ static __always_inline void process_tcp(
u_int32_t tsval = dissector->tsval;
u_int32_t tsecr = dissector->tsecr;
if (BITCHECK(DIS_TCP_ACK) && tsval != 0) {
//bpf_debug("[FLOWS][%d] TSVAL: %u, TSECR: %u", direction, tsval, tsecr);
if (direction == TO_INTERNET) {
if (tsval != data->tsval[0] || tsecr != data->tsecr[0]) {
if (tsval != data->tsval[0] && tsecr != data->tsecr[0]) {
if (tsval == data->tsecr[1]) {
if (tsval > data->tsecr[1]) {
__u64 elapsed = now - data->ts_change_time[1];
data->last_rtt[0] = elapsed;
//bpf_debug("[FLOWS][0] RTT: %llu", elapsed);
@ -280,9 +281,9 @@ static __always_inline void process_tcp(
data->tsecr[0] = tsecr;
}
} else {
if (tsval != data->tsval[1] || tsecr != data->tsecr[1]) {
if (tsval != data->tsval[1] && tsecr != data->tsecr[1]) {
if (tsval == data->tsecr[0]) {
if (tsval > data->tsecr[0]) {
__u64 elapsed = now - data->ts_change_time[0];
data->last_rtt[1] = elapsed;
//bpf_debug("[FLOWS][1] RTT: %llu", elapsed);

View File

@ -223,6 +223,9 @@ fn handle_bus_requests(
BusRequest::GetLongTermStats(StatsRequest::Tree) => {
long_term_stats::get_stats_tree()
}
BusRequest::DumpActiveFlows => {
throughput_tracker::dump_active_flows()
}
});
}
}

View File

@ -0,0 +1,7 @@
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use once_cell::sync::Lazy;
use std::sync::Mutex;
pub static ALL_FLOWS: Lazy<Mutex<Vec<(FlowbeeKey, FlowbeeData)>>> =
Lazy::new(|| Mutex::new(Vec::with_capacity(128_000)));

View File

@ -1,6 +1,7 @@
mod heimdall_data;
mod throughput_entry;
mod tracking_data;
pub mod flow_data;
use crate::{
shaped_devices_tracker::{NETWORK_JSON, STATS_NEEDS_NEW_SHAPED_DEVICES, SHAPED_DEVICES}, stats::TIME_TO_POLL_HOSTS,
throughput_tracker::tracking_data::ThroughputTracker, long_term_stats::get_network_tree,
@ -16,6 +17,8 @@ use tokio::{
time::{Duration, Instant},
};
use self::flow_data::ALL_FLOWS;
const RETIRE_AFTER_SECONDS: u64 = 30;
pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> = Lazy::new(ThroughputTracker::new);
@ -48,7 +51,7 @@ async fn throughput_task(interval_ms: u64, long_term_stats_tx: Sender<StatsUpdat
} // Scope to end the lock
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
THROUGHPUT_TRACKER.apply_new_throughput_counters();
THROUGHPUT_TRACKER.apply_rtt_data();
THROUGHPUT_TRACKER.apply_flow_data();
THROUGHPUT_TRACKER.update_totals();
THROUGHPUT_TRACKER.next_cycle();
let duration_ms = start.elapsed().as_micros();
@ -442,4 +445,28 @@ pub fn all_unknown_ips() -> BusResponse {
)
.collect();
BusResponse::AllUnknownIps(result)
}
/// For debugging: dump all active flows!
pub fn dump_active_flows() -> BusResponse {
let lock = ALL_FLOWS.lock().unwrap();
let mut result = Vec::with_capacity(lock.len());
for (ip, flow) in lock.iter() {
result.push(lqos_bus::FlowbeeData {
remote_ip: ip.remote_ip.as_ip().to_string(),
local_ip: ip.local_ip.as_ip().to_string(),
src_port: ip.src_port,
dst_port: ip.dst_port,
ip_protocol: ip.ip_protocol,
bytes_sent: flow.bytes_sent,
packets_sent: flow.packets_sent,
rate_estimate_bps: flow.rate_estimate_bps,
retries: flow.retries,
last_rtt: flow.last_rtt,
end_status: flow.end_status,
});
}
BusResponse::AllActiveFlows(result)
}

View File

@ -1,6 +1,6 @@
use std::{sync::atomic::AtomicU64, time::Duration};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use super::{flow_data::ALL_FLOWS, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
use lqos_sys::{iterate_flows, throughput_for_each};
@ -168,48 +168,44 @@ impl ThroughputTracker {
});
}
pub(crate) fn apply_rtt_data(&self) {
pub(crate) fn apply_flow_data(&self) {
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
/*rtt_for_each(&mut |ip, rtt| {
if rtt.has_fresh_data != 0 {
if let Some(mut tracker) = self.raw_data.get_mut(ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
}
}
}
});*/
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(60)).as_nanos() as u64;
iterate_flows(&mut |key, data| {
// 6 is TCP, not expired
if key.ip_protocol == 6 && data.last_seen > expire && (data.last_rtt[0] != 0 || data.last_rtt[1] != 0) {
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = u32::max(
(data.last_rtt[0] / 10000) as u32,
(data.last_rtt[1] / 10000) as u32,
);
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
if let Ok(mut flow_lock) = ALL_FLOWS.try_lock() {
flow_lock.clear(); // Remove all previous values
iterate_flows(&mut |key, data| {
if data.last_seen > expire {
// We have a valid flow, so it needs to be tracked
flow_lock.push((key.clone(), data.clone()));
// TCP - we have RTT data? 6 is TCP
if key.ip_protocol == 6 && (data.last_rtt[0] != 0 || data.last_rtt[1] != 0) {
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = u32::max(
(data.last_rtt[0] / 10000) as u32,
(data.last_rtt[1] / 10000) as u32,
);
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
}
}
}
}
}
});
});
} else {
log::warn!("Failed to lock ALL_FLOWS");
}
}
}