From f44af376707e3db8767399906bee251ae75b2d20 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Fri, 15 Mar 2024 12:15:11 -0500 Subject: [PATCH] WIP - Compiles RTT data into the tracker, strong type RTTs to clarify the unit confusion. Web side is not done yet. --- src/rust/Cargo.lock | 10 + src/rust/lqos_bus/src/bus/response.rs | 8 +- src/rust/lqos_bus/src/ip_stats.rs | 225 +++++++++--------- src/rust/lqos_bus/src/lib.rs | 2 +- .../lqos_node_manager/src/flow_monitor.rs | 6 +- src/rust/lqos_node_manager/src/queue_info.rs | 4 +- src/rust/lqos_sys/src/bpf/common/flows.h | 24 +- src/rust/lqos_sys/src/flowbee_data.rs | 102 ++++---- src/rust/lqosd/Cargo.toml | 1 + .../flow_data/flow_analysis/finished_flows.rs | 8 +- .../flow_analysis/kernel_ringbuffer.rs | 129 ++++++++-- .../flow_data/flow_analysis/mod.rs | 2 + .../flow_data/flow_analysis/rtt_types.rs | 38 +++ .../flow_data/flow_tracker.rs | 50 +++- .../src/throughput_tracker/flow_data/mod.rs | 15 +- .../flow_data/netflow5/mod.rs | 8 +- .../flow_data/netflow5/protocol.rs | 6 +- .../flow_data/netflow9/mod.rs | 8 +- .../netflow9/protocol/field_encoder.rs | 6 +- .../flow_data/netflow9/protocol/mod.rs | 10 +- src/rust/lqosd/src/throughput_tracker/mod.rs | 44 ++-- .../src/throughput_tracker/tracking_data.rs | 36 +-- 22 files changed, 473 insertions(+), 269 deletions(-) create mode 100644 src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/rtt_types.rs diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index fc39abf8..066aac92 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1088,6 +1088,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generator" version = "0.7.5" @@ -1743,6 +1752,7 @@ dependencies = [ "dashmap", "env_logger", "flate2", + "fxhash", "ip_network", 
"ip_network_table", "itertools 0.12.1", diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index b600237e..c2b87983 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -1,6 +1,6 @@ use super::QueueStoreTransit; use crate::{ - ip_stats::{FlowbeeData, PacketHeader}, IpMapping, IpStats, XdpPpingResult, + ip_stats::{FlowbeeSummaryData, PacketHeader}, IpMapping, IpStats, XdpPpingResult, }; use lts_client::transport_data::{StatsTotals, StatsHost, StatsTreeNode}; use serde::{Deserialize, Serialize}; @@ -115,16 +115,16 @@ pub enum BusResponse { LongTermTree(Vec), /// All Active Flows (Not Recommended - Debug Use) - AllActiveFlows(Vec), + AllActiveFlows(Vec), /// Count active flows CountActiveFlows(u64), /// Top Flopws - TopFlows(Vec), + TopFlows(Vec), /// Flows by IP - FlowsByIp(Vec), + FlowsByIp(Vec), /// Current endpoints by country CurrentEndpointsByCountry(Vec<(String, [u64; 2], [f32; 2])>), diff --git a/src/rust/lqos_bus/src/ip_stats.rs b/src/rust/lqos_bus/src/ip_stats.rs index 3f8982a3..d357d6c7 100644 --- a/src/rust/lqos_bus/src/ip_stats.rs +++ b/src/rust/lqos_bus/src/ip_stats.rs @@ -5,120 +5,120 @@ use serde::{Deserialize, Serialize}; /// with a host. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct IpStats { - /// The host's IP address, as detected by the XDP program. - pub ip_address: String, + /// The host's IP address, as detected by the XDP program. + pub ip_address: String, - /// The host's mapped circuit ID - pub circuit_id: String, + /// The host's mapped circuit ID + pub circuit_id: String, - /// The current bits-per-second passing through this host. Tuple - /// 0 is download, tuple 1 is upload. - pub bits_per_second: (u64, u64), + /// The current bits-per-second passing through this host. Tuple + /// 0 is download, tuple 1 is upload. + pub bits_per_second: (u64, u64), - /// The current packets-per-second passing through this host. 
Tuple - /// 0 is download, tuple 1 is upload. - pub packets_per_second: (u64, u64), + /// The current packets-per-second passing through this host. Tuple + /// 0 is download, tuple 1 is upload. + pub packets_per_second: (u64, u64), - /// Median TCP round-trip-time for this host at the current time. - pub median_tcp_rtt: f32, + /// Median TCP round-trip-time for this host at the current time. + pub median_tcp_rtt: f32, - /// Associated TC traffic control handle. - pub tc_handle: TcHandle, + /// Associated TC traffic control handle. + pub tc_handle: TcHandle, } /// Represents an IP Mapping in the XDP IP to TC/CPU mapping system. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct IpMapping { - /// The mapped IP address. May be IPv4, or IPv6. - pub ip_address: String, + /// The mapped IP address. May be IPv4, or IPv6. + pub ip_address: String, - /// The CIDR prefix length of the host. Equivalent to the CIDR value - /// after the /. e.g. `/24`. - pub prefix_length: u32, + /// The CIDR prefix length of the host. Equivalent to the CIDR value + /// after the /. e.g. `/24`. + pub prefix_length: u32, - /// The current TC traffic control handle. - pub tc_handle: TcHandle, + /// The current TC traffic control handle. + pub tc_handle: TcHandle, - /// The CPU index associated with this IP mapping. - pub cpu: u32, + /// The CPU index associated with this IP mapping. + pub cpu: u32, } /// Provided for backwards compatibility with `xdp_pping`, with the intent /// to retire it eventually. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct XdpPpingResult { - /// The TC handle in text format. e.g. "1:12" - pub tc: String, + /// The TC handle in text format. e.g. "1:12" + pub tc: String, - /// The average (mean) RTT value for the current sample. - pub avg: f32, + /// The average (mean) RTT value for the current sample. + pub avg: f32, - /// The minimum RTT value for the current sample. 
- pub min: f32, + /// The minimum RTT value for the current sample. + pub min: f32, - /// The maximum RTT value for the current sample. - pub max: f32, + /// The maximum RTT value for the current sample. + pub max: f32, - /// The median RTT value for the current sample. - pub median: f32, + /// The median RTT value for the current sample. + pub median: f32, - /// The number of samples from which these values were - /// derived. If 0, the other values are invalid. - pub samples: u32, + /// The number of samples from which these values were + /// derived. If 0, the other values are invalid. + pub samples: u32, } /// Extract the 6-bit DSCP and 2-bit ECN code from a TOS field /// in an IP header. pub fn tos_parser(tos: u8) -> (u8, u8) { - // Format: 2 bits of ECN, 6 bits of DSCP - const ECN: u8 = 0b00000011; - const DSCP: u8 = 0b11111100; + // Format: 2 bits of ECN, 6 bits of DSCP + const ECN: u8 = 0b00000011; + const DSCP: u8 = 0b11111100; - let ecn = tos & ECN; - let dscp = (tos & DSCP) >> 2; - (dscp, ecn) + let ecn = tos & ECN; + let dscp = (tos & DSCP) >> 2; + (dscp, ecn) } /// Packet header dump #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] pub struct PacketHeader { - /// Timestamp (ns since boot) - pub timestamp: u64, - /// Source IP - pub src: String, - /// Destination IP - pub dst: String, - /// Source Port - pub src_port : u16, - /// Destination Port - pub dst_port: u16, - /// Ip Protocol (see Linux kernel docs) - pub ip_protocol: u8, - /// ECN Flag - pub ecn: u8, - /// DSCP code - pub dscp: u8, - /// Packet Size - pub size: u32, - /// TCP Flag Bitset - pub tcp_flags: u8, - /// TCP Window Size - pub tcp_window: u16, - /// TCP TSVal - pub tcp_tsval: u32, - /// TCP ECR val - pub tcp_tsecr: u32, + /// Timestamp (ns since boot) + pub timestamp: u64, + /// Source IP + pub src: String, + /// Destination IP + pub dst: String, + /// Source Port + pub src_port: u16, + /// Destination Port + pub dst_port: u16, + /// Ip Protocol (see Linux kernel docs) + pub 
ip_protocol: u8, + /// ECN Flag + pub ecn: u8, + /// DSCP code + pub dscp: u8, + /// Packet Size + pub size: u32, + /// TCP Flag Bitset + pub tcp_flags: u8, + /// TCP Window Size + pub tcp_window: u16, + /// TCP TSVal + pub tcp_tsval: u32, + /// TCP ECR val + pub tcp_tsecr: u32, } /// Flowbee protocol enumeration #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] pub enum FlowbeeProtocol { - /// TCP (type 6) - TCP, - /// UDP (type 17) - UDP, - /// ICMP (type 1) - ICMP + /// TCP (type 6) + TCP, + /// UDP (type 17) + UDP, + /// ICMP (type 1) + ICMP, } impl From for FlowbeeProtocol { @@ -133,38 +133,45 @@ impl From for FlowbeeProtocol { /// Flowbee: a complete flow data, combining key and data. #[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] -pub struct FlowbeeData { - /// Mapped `XdpIpAddress` source for the flow. - pub remote_ip: String, - /// Mapped `XdpIpAddress` destination for the flow - pub local_ip: String, - /// Source port number, or ICMP type. - pub src_port: u16, - /// Destination port number. - pub dst_port: u16, - /// IP protocol (see the Linux kernel!) - pub ip_protocol: FlowbeeProtocol, - /// Bytes transmitted - pub bytes_sent: [u64; 2], - /// Packets transmitted - pub packets_sent: [u64; 2], - /// Rate estimate - pub rate_estimate_bps: [u32; 2], - /// TCP Retransmission count (also counts duplicates) - pub tcp_retransmits: [u16; 2], - /// Has the connection ended? - /// 0 = Alive, 1 = FIN, 2 = RST - pub end_status: u8, - /// Raw IP TOS - pub tos: u8, - /// Raw TCP flags - pub flags: u8, - /// Remote ASN - pub remote_asn: u32, - /// Remote ASN Name - pub remote_asn_name: String, - /// Remote ASN Country - pub remote_asn_country: String, - /// Analysis - pub analysis: String, -} \ No newline at end of file +pub struct FlowbeeSummaryData { + /// Mapped `XdpIpAddress` source for the flow. + pub remote_ip: String, + /// Mapped `XdpIpAddress` destination for the flow + pub local_ip: String, + /// Source port number, or ICMP type. 
+ pub src_port: u16, + /// Destination port number. + pub dst_port: u16, + /// IP protocol (see the Linux kernel!) + pub ip_protocol: FlowbeeProtocol, + /// Flow start timestamp. + /// Time (nanos) when the connection was established + pub start_time: u64, + /// Time (nanos) when the connection was last seen + pub last_seen: u64, + /// Bytes transmitted + pub bytes_sent: [u64; 2], + /// Packets transmitted + pub packets_sent: [u64; 2], + /// Rate estimate + pub rate_estimate_bps: [u32; 2], + /// TCP Retransmission count (also counts duplicates) + pub tcp_retransmits: [u16; 2], + /// Has the connection ended? + /// 0 = Alive, 1 = FIN, 2 = RST + pub end_status: u8, + /// Raw IP TOS + pub tos: u8, + /// Raw TCP flags + pub flags: u8, + /// Recent RTT median + pub rtt_nanos: u64, + /// Remote ASN + pub remote_asn: u32, + /// Remote ASN Name + pub remote_asn_name: String, + /// Remote ASN Country + pub remote_asn_country: String, + /// Analysis + pub analysis: String, +} diff --git a/src/rust/lqos_bus/src/lib.rs b/src/rust/lqos_bus/src/lib.rs index 9a60fd75..da420dca 100644 --- a/src/rust/lqos_bus/src/lib.rs +++ b/src/rust/lqos_bus/src/lib.rs @@ -14,7 +14,7 @@ mod bus; mod ip_stats; pub use ip_stats::{ tos_parser, IpMapping, IpStats, PacketHeader, - XdpPpingResult, FlowbeeData, FlowbeeProtocol + XdpPpingResult, FlowbeeSummaryData, FlowbeeProtocol }; mod tc_handle; pub use bus::{ diff --git a/src/rust/lqos_node_manager/src/flow_monitor.rs b/src/rust/lqos_node_manager/src/flow_monitor.rs index b3c422bc..1ed1b900 100644 --- a/src/rust/lqos_node_manager/src/flow_monitor.rs +++ b/src/rust/lqos_node_manager/src/flow_monitor.rs @@ -1,9 +1,9 @@ -use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeData}; +use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeSummaryData}; use rocket::serde::json::Json; use crate::cache_control::NoCache; #[get("/api/flows/dump_all")] -pub async fn all_flows_debug_dump() -> NoCache>> { +pub async fn
all_flows_debug_dump() -> NoCache>> { let responses = bus_request(vec![BusRequest::DumpActiveFlows]).await.unwrap(); let result = match &responses[0] { @@ -27,7 +27,7 @@ pub async fn count_flows() -> NoCache> { } #[get("/api/flows/top//")] -pub async fn top_5_flows(top_n: u32, flow_type: String) -> NoCache>> { +pub async fn top_5_flows(top_n: u32, flow_type: String) -> NoCache>> { let flow_type = match flow_type.as_str() { "rate" => lqos_bus::TopFlowType::RateEstimate, "bytes" => lqos_bus::TopFlowType::Bytes, diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index c48ed138..89a77bff 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -1,7 +1,7 @@ use crate::auth_guard::AuthGuard; use crate::cache_control::NoCache; use crate::tracker::{SHAPED_DEVICES, lookup_dns}; -use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeData, PacketHeader, QueueStoreTransit}; +use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeSummaryData, PacketHeader, QueueStoreTransit}; use rocket::fs::NamedFile; use rocket::http::Status; use rocket::response::content::RawJson; @@ -107,7 +107,7 @@ pub async fn raw_queue_by_circuit( } #[get("/api/flows/")] -pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache>> { +pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache>> { let mut result = Vec::new(); let request: Vec = ip_list.split(',').map(|ip| BusRequest::FlowsByIp(ip.to_string())).collect(); let responses = bus_request(request).await.unwrap(); diff --git a/src/rust/lqos_sys/src/bpf/common/flows.h b/src/rust/lqos_sys/src/bpf/common/flows.h index 8601417c..4c283863 100644 --- a/src/rust/lqos_sys/src/bpf/common/flows.h +++ b/src/rust/lqos_sys/src/bpf/common/flows.h @@ -304,25 +304,15 @@ static __always_inline void process_tcp( (data->rate_estimate_bps[rate_index] > 0 || data->rate_estimate_bps[other_rate_index] > 0 ) ) { - struct 
flowbee_event event = { - .key = key, - .round_trip_time = dissector->now - data->ts_change_time[other_rate_index], - .effective_direction = direction - }; - bpf_ringbuf_output(&flowbee_events, &event, sizeof(event), 0); - /* __u64 elapsed = dissector->now - data->ts_change_time[other_rate_index]; - __u16 rtt_in_ms10 = elapsed / MS_IN_NANOS_T10; - - if (elapsed < TWO_SECONDS_IN_NANOS && rtt_in_ms10 > 0 && rtt_in_ms10 < 2000) { - //bpf_debug("[FLOWS][%d] RTT: %u", direction, rtt_in_ms10); - __u8 entry = data->rtt_index[rate_index]; - if (entry < RTT_RING_SIZE) - data->rtt_ringbuffer[rate_index][entry] = rtt_in_ms10; - data->rtt_index[rate_index] = (entry + 1) % RTT_RING_SIZE; + if (elapsed < TWO_SECONDS_IN_NANOS) { + struct flowbee_event event = { + .key = key, + .round_trip_time = elapsed, + .effective_direction = rate_index + }; + bpf_ringbuf_output(&flowbee_events, &event, sizeof(event), 0); } - //bpf_debug("[FLOWS][%d] RTT: %llu", direction, elapsed); - */ } data->ts_change_time[rate_index] = dissector->now; diff --git a/src/rust/lqos_sys/src/flowbee_data.rs b/src/rust/lqos_sys/src/flowbee_data.rs index fe292a90..b04fae24 100644 --- a/src/rust/lqos_sys/src/flowbee_data.rs +++ b/src/rust/lqos_sys/src/flowbee_data.rs @@ -7,61 +7,61 @@ use zerocopy::FromBytes; #[derive(Debug, Clone, Default, PartialEq, Eq, Hash, FromBytes)] #[repr(C)] pub struct FlowbeeKey { - /// Mapped `XdpIpAddress` source for the flow. - pub remote_ip: XdpIpAddress, - /// Mapped `XdpIpAddress` destination for the flow - pub local_ip: XdpIpAddress, - /// Source port number, or ICMP type. - pub src_port: u16, - /// Destination port number. - pub dst_port: u16, - /// IP protocol (see the Linux kernel!) - pub ip_protocol: u8, - /// Padding to align the structure to 16 bytes. - padding: u8, - padding1: u8, - padding2: u8, + /// Mapped `XdpIpAddress` source for the flow. 
+ pub remote_ip: XdpIpAddress, + /// Mapped `XdpIpAddress` destination for the flow + pub local_ip: XdpIpAddress, + /// Source port number, or ICMP type. + pub src_port: u16, + /// Destination port number. + pub dst_port: u16, + /// IP protocol (see the Linux kernel!) + pub ip_protocol: u8, + /// Padding to align the structure to 16 bytes. + padding: u8, + padding1: u8, + padding2: u8, } /// Mapped representation of the eBPF `flow_data_t` type. #[derive(Debug, Clone, Default, FromBytes)] #[repr(C)] pub struct FlowbeeData { - /// Time (nanos) when the connection was established - pub start_time: u64, - /// Time (nanos) when the connection was last seen - pub last_seen: u64, - /// Bytes transmitted - pub bytes_sent: [u64; 2], - /// Packets transmitted - pub packets_sent: [u64; 2], - /// Clock for the next rate estimate - pub next_count_time: [u64; 2], - /// Clock for the previous rate estimate - pub last_count_time: [u64; 2], - /// Bytes at the next rate estimate - pub next_count_bytes: [u64; 2], - /// Rate estimate - pub rate_estimate_bps: [u32; 2], - /// Sequence number of the last packet - pub last_sequence: [u32; 2], - /// Acknowledgement number of the last packet - pub last_ack: [u32; 2], - /// TCP Retransmission count (also counts duplicates) - pub tcp_retransmits: [u16; 2], - /// Timestamp values - pub tsval: [u32; 2], - /// Timestamp echo values - pub tsecr: [u32; 2], - /// When did the timestamp change? - pub ts_change_time: [u64; 2], - /// Has the connection ended? - /// 0 = Alive, 1 = FIN, 2 = RST - pub end_status: u8, - /// Raw IP TOS - pub tos: u8, - /// Raw TCP flags - pub flags: u8, - /// Padding. 
- pub padding: u8, + /// Time (nanos) when the connection was established + pub start_time: u64, + /// Time (nanos) when the connection was last seen + pub last_seen: u64, + /// Bytes transmitted + pub bytes_sent: [u64; 2], + /// Packets transmitted + pub packets_sent: [u64; 2], + /// Clock for the next rate estimate + pub next_count_time: [u64; 2], + /// Clock for the previous rate estimate + pub last_count_time: [u64; 2], + /// Bytes at the next rate estimate + pub next_count_bytes: [u64; 2], + /// Rate estimate + pub rate_estimate_bps: [u32; 2], + /// Sequence number of the last packet + pub last_sequence: [u32; 2], + /// Acknowledgement number of the last packet + pub last_ack: [u32; 2], + /// TCP Retransmission count (also counts duplicates) + pub tcp_retransmits: [u16; 2], + /// Timestamp values + pub tsval: [u32; 2], + /// Timestamp echo values + pub tsecr: [u32; 2], + /// When did the timestamp change? + pub ts_change_time: [u64; 2], + /// Has the connection ended? + /// 0 = Alive, 1 = FIN, 2 = RST + pub end_status: u8, + /// Raw IP TOS + pub tos: u8, + /// Raw TCP flags + pub flags: u8, + /// Padding. 
+ pub padding: u8, } diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index 3048a87b..9e3654d2 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -37,6 +37,7 @@ bincode = "1" ip_network_table = "0" ip_network = "0" zerocopy = {version = "0.6.1", features = [ "simd" ] } +fxhash = "0.2.1" # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs index d10ce66a..2981b2de 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs @@ -1,6 +1,6 @@ use super::{get_asn_lat_lon, get_asn_name_and_country, FlowAnalysis}; -use crate::throughput_tracker::flow_data::FlowbeeRecipient; -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use crate::throughput_tracker::flow_data::{FlowbeeLocalData, FlowbeeRecipient}; +use lqos_sys::flowbee_data::FlowbeeKey; use once_cell::sync::Lazy; use std::sync::{Arc, Mutex}; @@ -10,7 +10,7 @@ pub struct TimeBuffer { struct TimeEntry { time: u64, - data: (FlowbeeKey, FlowbeeData, FlowAnalysis), + data: (FlowbeeKey, FlowbeeLocalData, FlowAnalysis), } impl TimeBuffer { @@ -150,7 +150,7 @@ impl FinishedFlowAnalysis { } impl FlowbeeRecipient for FinishedFlowAnalysis { - fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, analysis: FlowAnalysis) { + fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) { log::info!("Finished flow analysis"); RECENT_FLOWS.push(TimeEntry { time: std::time::SystemTime::now() diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index c824f7c9..c24a1af5 100644 --- 
a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -1,6 +1,68 @@ -use std::{ffi::c_void, slice}; -use zerocopy::FromBytes; +//! Connects to the flows.h "flowbee_events" ring buffer and processes the events. +use crate::throughput_tracker::flow_data::flow_analysis::rtt_types::RttData; +use fxhash::FxHashMap; use lqos_sys::flowbee_data::FlowbeeKey; +use lqos_utils::unix_time::time_since_boot; +use once_cell::sync::Lazy; +use std::{ + ffi::c_void, + slice, + sync::{atomic::AtomicU64, Mutex}, + time::Duration, +}; +use zerocopy::FromBytes; + +static EVENT_COUNT: AtomicU64 = AtomicU64::new(0); +static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0); + +const BUFFER_SIZE: usize = 1024; + +struct RttBuffer { + index: usize, + buffer: [[RttData; BUFFER_SIZE]; 2], + last_seen: u64, +} + +impl RttBuffer { + fn new(reading: u64, direction: u32, last_seen: u64) -> Self { + let empty = [RttData::from_nanos(0); BUFFER_SIZE]; + let mut filled = [RttData::from_nanos(0); BUFFER_SIZE]; + filled[0] = RttData::from_nanos(reading); + + if direction == 0 { + Self { + index: 1, + buffer: [empty, filled], + last_seen, + } + } else { + Self { + index: 0, + buffer: [filled, empty], + last_seen, + } + } + } + + fn push(&mut self, reading: u64, direction: u32, last_seen: u64) { + self.buffer[direction as usize][self.index] = RttData::from_nanos(reading); + self.index = (self.index + 1) % BUFFER_SIZE; + self.last_seen = last_seen; + } + + fn median(&self) -> RttData { + let mut sorted = self.buffer[0].iter().filter(|x| x.as_nanos() > 0).collect::>(); + if sorted.is_empty() { + return RttData::from_nanos(0); + } + sorted.sort_unstable(); + let mid = sorted.len() / 2; + *sorted[mid] + } +} + +static FLOW_RTT: Lazy>> = + Lazy::new(|| Mutex::new(FxHashMap::default())); #[repr(C)] #[derive(FromBytes, Debug, Clone, PartialEq, Eq, Hash)] @@ -16,8 +78,6 @@ pub unsafe extern 
"C" fn flowbee_handle_events( data: *mut c_void, data_size: usize, ) -> i32 { - println!("Event received"); - const EVENT_SIZE: usize = std::mem::size_of::(); if data_size < EVENT_SIZE { log::warn!("Warning: incoming data too small in Flowbee buffer"); @@ -27,26 +87,53 @@ pub unsafe extern "C" fn flowbee_handle_events( let data_u8 = data as *const u8; let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); if let Some(incoming) = FlowbeeEvent::read_from(data_slice) { - println!("RTT: {}, Direction: {}", incoming.rtt, incoming.effective_direction); + EVENT_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + + if let Ok(now) = time_since_boot() { + let since_boot = Duration::from(now); + let mut lock = FLOW_RTT.lock().unwrap(); + if let Some(entry) = lock.get_mut(&incoming.key) { + entry.push( + incoming.rtt, + incoming.effective_direction, + since_boot.as_nanos() as u64, + ); + } else { + lock.insert( + incoming.key, + RttBuffer::new( + incoming.rtt, + incoming.effective_direction, + since_boot.as_nanos() as u64, + ), + ); + } + } } else { log::error!("Failed to decode Flowbee Event"); } - /*const EVENT_SIZE: usize = std::mem::size_of::(); - if data_size < EVENT_SIZE { - log::warn!("Warning: incoming data too small in Heimdall buffer"); - return 0; - } - - //COLLECTED_EVENTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let data_u8 = data as *const u8; - let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); - - if let Some(incoming) = HeimdallEvent::read_from(data_slice) { - store_on_timeline(incoming); - } else { - println!("Failed to decode"); - }*/ - 0 } + +pub fn get_flowbee_event_count_and_reset() -> u64 { + let count = EVENT_COUNT.swap(0, std::sync::atomic::Ordering::Relaxed); + EVENTS_PER_SECOND.store(count, std::sync::atomic::Ordering::Relaxed); + count +} + +pub fn expire_rtt_flows() { + if let Ok(now) = time_since_boot() { + let since_boot = Duration::from(now); + let expire = (since_boot - 
Duration::from_secs(30)).as_nanos() as u64; + let mut lock = FLOW_RTT.lock().unwrap(); + lock.retain(|_, v| v.last_seen > expire); + } +} + +pub fn flowbee_rtt_map() -> FxHashMap { + let lock = FLOW_RTT.lock().unwrap(); + lock.iter() + .map(|(k, v)| (k.clone(), v.median())) + .collect() +} diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs index 5ed32d29..5926de55 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs @@ -10,6 +10,8 @@ pub use finished_flows::FinishedFlowAnalysis; pub use finished_flows::RECENT_FLOWS; mod kernel_ringbuffer; pub use kernel_ringbuffer::*; +mod rtt_types; +pub use rtt_types::RttData; static ANALYSIS: Lazy = Lazy::new(|| FlowAnalysisSystem::new()); diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/rtt_types.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/rtt_types.rs new file mode 100644 index 00000000..29eed212 --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/rtt_types.rs @@ -0,0 +1,38 @@ +//! Provides a set of types for representing round-trip time (RTT) data, +//! as produced by the eBPF system and consumed in different ways. +//! +//! Adopting strong-typing is an attempt to reduce confusion with +//! multipliers, divisors, etc. It is intended to become pervasive +//! throughout the system. 
+ +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct RttData { + nanoseconds: u64, +} + +#[allow(dead_code)] +impl RttData { + pub fn from_nanos(nanoseconds: u64) -> Self { + Self { nanoseconds } + } + + pub fn as_nanos(&self) -> u64 { + self.nanoseconds + } + + pub fn as_micros(&self) -> f64 { + self.nanoseconds as f64 / 1_000.0 + } + + pub fn as_millis(&self) -> f64 { + self.nanoseconds as f64 / 1_000_000.0 + } + + pub fn as_millis_times_100(&self) -> f64 { + self.nanoseconds as f64 / 10_000.0 + } + + pub fn as_seconds(&self) -> f64 { + self.nanoseconds as f64 / 1_000_000_000.0 + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs index ad057f36..e2ecdcee 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_tracker.rs @@ -1,7 +1,7 @@ //! Provides a globally accessible vector of all flows. This is used to store //! all flows for the purpose of tracking and data-services. -use super::flow_analysis::FlowAnalysis; +use super::{flow_analysis::FlowAnalysis, RttData}; use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; use once_cell::sync::Lazy; use std::{collections::HashMap, sync::Mutex}; @@ -9,5 +9,51 @@ use std::{collections::HashMap, sync::Mutex}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct AsnId(pub u32); -pub static ALL_FLOWS: Lazy>> = +pub static ALL_FLOWS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); + + +/// Condensed representation of the FlowbeeData type. This contains +/// only the information we want to keep locally for analysis purposes, +/// adds RTT data, and uses Rust-friendly typing. 
+#[derive(Debug, Clone)] +pub struct FlowbeeLocalData { + /// Time (nanos) when the connection was established + pub start_time: u64, + /// Time (nanos) when the connection was last seen + pub last_seen: u64, + /// Bytes transmitted + pub bytes_sent: [u64; 2], + /// Packets transmitted + pub packets_sent: [u64; 2], + /// Rate estimate + pub rate_estimate_bps: [u32; 2], + /// TCP Retransmission count (also counts duplicates) + pub tcp_retransmits: [u16; 2], + /// Has the connection ended? + /// 0 = Alive, 1 = FIN, 2 = RST + pub end_status: u8, + /// Raw IP TOS + pub tos: u8, + /// Raw TCP flags + pub flags: u8, + /// Recent RTT median + pub rtt: RttData, +} + +impl From<&FlowbeeData> for FlowbeeLocalData { + fn from(data: &FlowbeeData) -> Self { + Self { + start_time: data.start_time, + last_seen: data.last_seen, + bytes_sent: data.bytes_sent, + packets_sent: data.packets_sent, + rate_estimate_bps: data.rate_estimate_bps, + tcp_retransmits: data.tcp_retransmits, + end_status: data.end_status, + tos: data.tos, + flags: data.flags, + rtt: RttData::from_nanos(0), + } + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs index 0992e72e..3fd8a088 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs @@ -7,22 +7,25 @@ mod netflow9; mod flow_analysis; use crate::throughput_tracker::flow_data::{flow_analysis::FinishedFlowAnalysis, netflow5::Netflow5, netflow9::Netflow9}; -pub(crate) use flow_tracker::{ALL_FLOWS, AsnId}; -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +pub(crate) use flow_tracker::{ALL_FLOWS, AsnId, FlowbeeLocalData}; +use lqos_sys::flowbee_data::FlowbeeKey; use std::sync::{ mpsc::{channel, Sender}, Arc, }; -pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country, FlowAnalysis, RECENT_FLOWS, flowbee_handle_events}; +pub(crate) use 
flow_analysis::{setup_flow_analysis, get_asn_name_and_country, + FlowAnalysis, RECENT_FLOWS, flowbee_handle_events, get_flowbee_event_count_and_reset, + expire_rtt_flows, flowbee_rtt_map, RttData, +}; trait FlowbeeRecipient { - fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, analysis: FlowAnalysis); + fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis); } // Creates the netflow tracker and returns the sender -pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))> { - let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>(); +pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))> { + let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>(); let config = lqos_config::load_config().unwrap(); std::thread::spawn(move || { diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/mod.rs index ac8fc6f6..29a934a9 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/mod.rs @@ -1,8 +1,8 @@ //! Support for the Netflow 5 protocol //! 
Mostly taken from: https://netflow.caligare.com/netflow_v5.htm mod protocol; -use super::{FlowAnalysis, FlowbeeRecipient}; -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use super::{FlowAnalysis, FlowbeeLocalData, FlowbeeRecipient}; +use lqos_sys::flowbee_data::FlowbeeKey; pub(crate) use protocol::*; use std::{ net::UdpSocket, @@ -13,7 +13,7 @@ pub(crate) struct Netflow5 { socket: UdpSocket, sequence: AtomicU32, target: String, - send_queue: Mutex>, + send_queue: Mutex>, } impl Netflow5 { @@ -83,7 +83,7 @@ impl Netflow5 { } impl FlowbeeRecipient for Netflow5 { - fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, _analysis: FlowAnalysis) { + fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, _analysis: FlowAnalysis) { let mut lock = self.send_queue.lock().unwrap(); lock.push((key, data)); } diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs index 98d6a2d6..f2567c75 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow5/protocol.rs @@ -1,10 +1,12 @@ //! 
Definitions for the actual netflow 5 protocol use std::net::IpAddr; -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use lqos_sys::flowbee_data::FlowbeeKey; use lqos_utils::unix_time::time_since_boot; use nix::sys::time::TimeValLike; +use crate::throughput_tracker::flow_data::FlowbeeLocalData; + /// Standard Netflow 5 header #[repr(C)] pub(crate) struct Netflow5Header { @@ -64,7 +66,7 @@ pub(crate) struct Netflow5Record { } /// Convert a Flowbee key and data to a pair of Netflow 5 records -pub(crate) fn to_netflow_5(key: &FlowbeeKey, data: &FlowbeeData) -> anyhow::Result<(Netflow5Record, Netflow5Record)> { +pub(crate) fn to_netflow_5(key: &FlowbeeKey, data: &FlowbeeLocalData) -> anyhow::Result<(Netflow5Record, Netflow5Record)> { // TODO: Detect overflow let local = key.local_ip.as_ip(); let remote = key.remote_ip.as_ip(); diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs index 3366d213..2ac7703e 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/mod.rs @@ -1,18 +1,18 @@ use crate::throughput_tracker::flow_data::netflow9::protocol::{ header::Netflow9Header, template_ipv4::template_data_ipv4, template_ipv6::template_data_ipv6, }; -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use lqos_sys::flowbee_data::FlowbeeKey; use std::{net::UdpSocket, sync::{atomic::AtomicU32, Arc, Mutex}}; use self::protocol::to_netflow_9; -use super::{FlowAnalysis, FlowbeeRecipient}; +use super::{FlowAnalysis, FlowbeeLocalData, FlowbeeRecipient}; mod protocol; pub(crate) struct Netflow9 { socket: UdpSocket, sequence: AtomicU32, target: String, - send_queue: Mutex>, + send_queue: Mutex>, } impl Netflow9 { @@ -66,7 +66,7 @@ impl Netflow9 { } impl FlowbeeRecipient for Netflow9 { - fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, _analysis: FlowAnalysis) { + fn enqueue(&self, key: FlowbeeKey, 
data: FlowbeeLocalData, _analysis: FlowAnalysis) { let mut lock = self.send_queue.lock().unwrap(); lock.push((key, data)); } diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs index 2d5054fa..c248a622 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/field_encoder.rs @@ -1,9 +1,9 @@ use std::net::IpAddr; - -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use lqos_sys::flowbee_data::FlowbeeKey; +use crate::throughput_tracker::flow_data::FlowbeeLocalData; use super::field_types::*; -pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: usize, key: &FlowbeeKey, data: &FlowbeeData) -> anyhow::Result> { +pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: usize, key: &FlowbeeKey, data: &FlowbeeLocalData) -> anyhow::Result> { let src_port = if direction == 0 { key.src_port } else { key.dst_port }; let dst_port = if direction == 0 { key.dst_port } else { key.src_port }; diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs index 97772b94..a3781b67 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/netflow9/protocol/mod.rs @@ -1,9 +1,11 @@ //! Protocol definitions for Netflow v9 Data. //! 
Mostly derived from https://netflow.caligare.com/netflow_v9.htm -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use lqos_sys::flowbee_data::FlowbeeKey; mod field_types; use field_types::*; + +use crate::throughput_tracker::flow_data::FlowbeeLocalData; pub(crate) mod field_encoder; pub(crate) mod header; pub(crate) mod template_ipv4; @@ -16,7 +18,7 @@ fn add_field(bytes: &mut Vec, field_type: u16, field_length: u16) { pub(crate) fn to_netflow_9( key: &FlowbeeKey, - data: &FlowbeeData, + data: &FlowbeeLocalData, ) -> anyhow::Result<(Vec, Vec)> { if key.local_ip.is_v4() && key.remote_ip.is_v4() { // Return IPv4 records @@ -29,7 +31,7 @@ pub(crate) fn to_netflow_9( } } -fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result> { +fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeLocalData, direction: usize) -> anyhow::Result> { let field_bytes = field_encoder::encode_fields_from_template( &template_ipv4::FIELDS_IPV4, direction, @@ -63,7 +65,7 @@ fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow Ok(bytes) } -fn ipv6_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result> { +fn ipv6_record(key: &FlowbeeKey, data: &FlowbeeLocalData, direction: usize) -> anyhow::Result> { let field_bytes = field_encoder::encode_fields_from_template( &template_ipv6::FIELDS_IPV6, direction, diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index d426c5df..5e485fcd 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -3,6 +3,7 @@ mod throughput_entry; mod tracking_data; use std::net::IpAddr; +use self::flow_data::{get_asn_name_and_country, FlowAnalysis, FlowbeeLocalData, ALL_FLOWS}; use crate::{ long_term_stats::get_network_tree, shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES, STATS_NEEDS_NEW_SHAPED_DEVICES}, @@ -11,7 +12,7 @@ use crate::{ }; use log::{info, warn}; use 
lqos_bus::{BusResponse, FlowbeeProtocol, IpStats, TcHandle, TopFlowType, XdpPpingResult}; -use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey}; +use lqos_sys::flowbee_data::FlowbeeKey; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; use lts_client::collector::{HostSummary, StatsUpdateMessage, ThroughputSummary}; use once_cell::sync::Lazy; @@ -19,7 +20,6 @@ use tokio::{ sync::mpsc::Sender, time::{Duration, Instant}, }; -use self::flow_data::{get_asn_name_and_country, FlowAnalysis, ALL_FLOWS}; const RETIRE_AFTER_SECONDS: u64 = 30; @@ -34,7 +34,7 @@ pub static THROUGHPUT_TRACKER: Lazy = Lazy::new(ThroughputTra /// collection thread that there is fresh data. pub async fn spawn_throughput_monitor( long_term_stats_tx: Sender, - netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>, + netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) { info!("Starting the bandwidth monitor thread."); let interval_ms = 1000; // 1 second @@ -49,7 +49,7 @@ pub async fn spawn_throughput_monitor( async fn throughput_task( interval_ms: u64, long_term_stats_tx: Sender, - netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>, + netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) { // Obtain the flow timeout from the config, default to 30 seconds let timeout_seconds = if let Ok(config) = lqos_config::load_config() { @@ -502,12 +502,13 @@ pub fn all_unknown_ips() -> BusResponse { /// For debugging: dump all active flows! 
pub fn dump_active_flows() -> BusResponse { let lock = ALL_FLOWS.lock().unwrap(); - let result: Vec = lock + let result: Vec = lock .iter() .map(|(key, row)| { - let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); + let (remote_asn_name, remote_asn_country) = + get_asn_name_and_country(key.remote_ip.as_ip()); - lqos_bus::FlowbeeData { + lqos_bus::FlowbeeSummaryData { remote_ip: key.remote_ip.as_ip().to_string(), local_ip: key.local_ip.as_ip().to_string(), src_port: key.src_port, @@ -524,6 +525,9 @@ pub fn dump_active_flows() -> BusResponse { remote_asn_name, remote_asn_country, analysis: row.1.protocol_analysis.to_string(), + last_seen: row.0.last_seen, + start_time: row.0.start_time, + rtt_nanos: row.0.rtt.as_nanos(), } }) .collect(); @@ -540,7 +544,7 @@ pub fn count_active_flows() -> BusResponse { /// Top Flows Report pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { let lock = ALL_FLOWS.lock().unwrap(); - let mut table: Vec<(FlowbeeKey, (FlowbeeData, FlowAnalysis))> = lock + let mut table: Vec<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))> = lock .iter() .map(|(key, value)| (key.clone(), value.clone())) .collect(); @@ -577,9 +581,9 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { } TopFlowType::RoundTripTime => { table.sort_by(|a, b| { - let a_total = 0.0; - let b_total = 0.0; - b_total.partial_cmp(&a_total).unwrap() + let a_total = a.1 .0.rtt; + let b_total = b.1 .0.rtt; + a_total.cmp(&b_total) }); } } @@ -588,8 +592,9 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { .iter() .take(n as usize) .map(|(ip, flow)| { - let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(ip.remote_ip.as_ip()); - lqos_bus::FlowbeeData { + let (remote_asn_name, remote_asn_country) = + get_asn_name_and_country(ip.remote_ip.as_ip()); + lqos_bus::FlowbeeSummaryData { remote_ip: ip.remote_ip.as_ip().to_string(), local_ip: ip.local_ip.as_ip().to_string(), src_port: ip.src_port, 
@@ -606,6 +611,9 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { remote_asn_name, remote_asn_country, analysis: flow.1.protocol_analysis.to_string(), + last_seen: flow.0.last_seen, + start_time: flow.0.start_time, + rtt_nanos: flow.0.rtt.as_nanos(), } }) .collect(); @@ -622,9 +630,10 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { .iter() .filter(|(key, _)| key.local_ip == ip) .map(|(key, row)| { - let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); - - lqos_bus::FlowbeeData { + let (remote_asn_name, remote_asn_country) = + get_asn_name_and_country(key.remote_ip.as_ip()); + + lqos_bus::FlowbeeSummaryData { remote_ip: key.remote_ip.as_ip().to_string(), local_ip: key.local_ip.as_ip().to_string(), src_port: key.src_port, @@ -641,6 +650,9 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { remote_asn_name, remote_asn_country, analysis: row.1.protocol_analysis.to_string(), + last_seen: row.0.last_seen, + start_time: row.0.start_time, + rtt_nanos: row.0.rtt.as_nanos(), } }) .collect(); diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index a5b79805..841c852f 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -1,9 +1,9 @@ use std::{sync::atomic::AtomicU64, time::Duration}; -use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}}; -use super::{flow_data::{FlowAnalysis, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; +use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}}; +use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use 
dashmap::DashMap; use lqos_bus::TcHandle; -use lqos_sys::{flowbee_data::{FlowbeeData, FlowbeeKey}, iterate_flows, throughput_for_each}; +use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each}; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; pub struct ThroughputTracker { @@ -172,18 +172,21 @@ impl ThroughputTracker { &self, timeout_seconds: u64, _netflow_enabled: bool, - sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>, + sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>, ) { + //log::debug!("Flowbee events this second: {}", get_flowbee_event_count_and_reset()); let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed); if let Ok(now) = time_since_boot() { + let rtt_samples = flowbee_rtt_map(); + get_flowbee_event_count_and_reset(); let since_boot = Duration::from(now); let expire = (since_boot - Duration::from_secs(timeout_seconds)).as_nanos() as u64; // Track the expired keys let mut expired_keys = Vec::new(); - let mut lock = ALL_FLOWS.lock().unwrap(); + let mut all_flows_lock = ALL_FLOWS.lock().unwrap(); // Track through all the flows iterate_flows(&mut |key, data| { @@ -196,7 +199,7 @@ impl ThroughputTracker { expired_keys.push(key.clone()); } else { // We have a valid flow, so it needs to be tracked - if let Some(this_flow) = lock.get_mut(&key) { + if let Some(this_flow) = all_flows_lock.get_mut(&key) { this_flow.0.last_seen = data.last_seen; this_flow.0.bytes_sent = data.bytes_sent; this_flow.0.packets_sent = data.packets_sent; @@ -204,24 +207,24 @@ impl ThroughputTracker { this_flow.0.tcp_retransmits = data.tcp_retransmits; this_flow.0.end_status = data.end_status; this_flow.0.tos = data.tos; - this_flow.0.flags = data.flags; + this_flow.0.flags = data.flags; + this_flow.0.rtt = rtt_samples.get(&key).copied().unwrap_or(RttData::from_nanos(0)).clone(); } else { // Insert it into the map let flow_analysis = FlowAnalysis::new(&key); - lock.insert(key.clone(), 
(data.clone(), flow_analysis)); + all_flows_lock.insert(key.clone(), (data.into(), flow_analysis)); } // TCP - we have RTT data? 6 is TCP if key.ip_protocol == 6 && data.end_status == 0 { if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) { - /*for rtt in data.median_pair().iter() { - if *rtt > 0.0 { - println!("RTT: {rtt:?}"); + if let Some(rtt) = rtt_samples.get(&key) { + if rtt.as_nanos() > 0 { // Shift left for i in 1..60 { tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1]; } - tracker.recent_rtt_data[0] = *rtt as u32; + tracker.recent_rtt_data[0] = rtt.as_millis_times_100() as u32; tracker.last_fresh_rtt_data_cycle = self_cycle; if let Some(parents) = &tracker.network_json_parents { let net_json = NETWORK_JSON.write().unwrap(); @@ -230,7 +233,7 @@ impl ThroughputTracker { } } } - }*/ + } if data.end_status != 0 { // The flow has ended. We need to remove it from the map. @@ -244,11 +247,11 @@ impl ThroughputTracker { if !expired_keys.is_empty() { for key in expired_keys.iter() { // Send it off to netperf for analysis if we are supporting doing so. - if let Some(d) = lock.get(&key) { + if let Some(d) = all_flows_lock.get(&key) { let _ = sender.send((key.clone(), (d.0.clone(), d.1.clone()))); } // Remove the flow from circulation - lock.remove(&key); + all_flows_lock.remove(&key); } let ret = lqos_sys::end_flows(&mut expired_keys); @@ -258,7 +261,8 @@ impl ThroughputTracker { } // Cleaning run - lock.retain(|_k,v| v.0.last_seen >= expire); + all_flows_lock.retain(|_k,v| v.0.last_seen >= expire); + expire_rtt_flows(); } }