WIP - Compiles RTT data into the tracker, strong type RTTs to clarify the unit confusion. Web side is not done yet.

This commit is contained in:
Herbert Wolverson
2024-03-15 12:15:11 -05:00
parent 56b170f7e4
commit f44af37670
22 changed files with 473 additions and 269 deletions

10
src/rust/Cargo.lock generated
View File

@@ -1088,6 +1088,15 @@ dependencies = [
"slab",
]
[[package]]
name = "fxhash"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
dependencies = [
"byteorder",
]
[[package]]
name = "generator"
version = "0.7.5"
@@ -1743,6 +1752,7 @@ dependencies = [
"dashmap",
"env_logger",
"flate2",
"fxhash",
"ip_network",
"ip_network_table",
"itertools 0.12.1",

View File

@@ -1,6 +1,6 @@
use super::QueueStoreTransit;
use crate::{
ip_stats::{FlowbeeData, PacketHeader}, IpMapping, IpStats, XdpPpingResult,
ip_stats::{FlowbeeSummaryData, PacketHeader}, IpMapping, IpStats, XdpPpingResult,
};
use lts_client::transport_data::{StatsTotals, StatsHost, StatsTreeNode};
use serde::{Deserialize, Serialize};
@@ -115,16 +115,16 @@ pub enum BusResponse {
LongTermTree(Vec<StatsTreeNode>),
/// All Active Flows (Not Recommended - Debug Use)
AllActiveFlows(Vec<FlowbeeData>),
AllActiveFlows(Vec<FlowbeeSummaryData>),
/// Count active flows
CountActiveFlows(u64),
/// Top Flopws
TopFlows(Vec<FlowbeeData>),
TopFlows(Vec<FlowbeeSummaryData>),
/// Flows by IP
FlowsByIp(Vec<FlowbeeData>),
FlowsByIp(Vec<FlowbeeSummaryData>),
/// Current endpoints by country
CurrentEndpointsByCountry(Vec<(String, [u64; 2], [f32; 2])>),

View File

@@ -89,7 +89,7 @@ pub struct PacketHeader {
/// Destination IP
pub dst: String,
/// Source Port
pub src_port : u16,
pub src_port: u16,
/// Destination Port
pub dst_port: u16,
/// Ip Protocol (see Linux kernel docs)
@@ -118,7 +118,7 @@ pub enum FlowbeeProtocol {
/// UDP (type 17)
UDP,
/// ICMP (type 1)
ICMP
ICMP,
}
impl From<u8> for FlowbeeProtocol {
@@ -133,7 +133,7 @@ impl From<u8> for FlowbeeProtocol {
/// Flowbee: a complete flow data, combining key and data.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
pub struct FlowbeeData {
pub struct FlowbeeSummaryData {
/// Mapped `XdpIpAddress` source for the flow.
pub remote_ip: String,
/// Mapped `XdpIpAddress` destination for the flow
@@ -144,6 +144,11 @@ pub struct FlowbeeData {
pub dst_port: u16,
/// IP protocol (see the Linux kernel!)
pub ip_protocol: FlowbeeProtocol,
/// Padding to align the structure to 16 bytes.
/// Time (nanos) when the connection was established
pub start_time: u64,
/// Time (nanos) when the connection was last seen
pub last_seen: u64,
/// Bytes transmitted
pub bytes_sent: [u64; 2],
/// Packets transmitted
@@ -159,6 +164,8 @@ pub struct FlowbeeData {
pub tos: u8,
/// Raw TCP flags
pub flags: u8,
/// Recent RTT median
pub rtt_nanos: u64,
/// Remote ASN
pub remote_asn: u32,
/// Remote ASN Name

View File

@@ -14,7 +14,7 @@ mod bus;
mod ip_stats;
pub use ip_stats::{
tos_parser, IpMapping, IpStats, PacketHeader,
XdpPpingResult, FlowbeeData, FlowbeeProtocol
XdpPpingResult, FlowbeeSummaryData, FlowbeeProtocol
};
mod tc_handle;
pub use bus::{

View File

@@ -1,9 +1,9 @@
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeData};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeSummaryData};
use rocket::serde::json::Json;
use crate::cache_control::NoCache;
#[get("/api/flows/dump_all")]
pub async fn all_flows_debug_dump() -> NoCache<Json<Vec<FlowbeeData>>> {
pub async fn all_flows_debug_dump() -> NoCache<Json<Vec<FlowbeeSummaryData>>> {
let responses =
bus_request(vec![BusRequest::DumpActiveFlows]).await.unwrap();
let result = match &responses[0] {
@@ -27,7 +27,7 @@ pub async fn count_flows() -> NoCache<Json<u64>> {
}
#[get("/api/flows/top/<top_n>/<flow_type>")]
pub async fn top_5_flows(top_n: u32, flow_type: String) -> NoCache<Json<Vec<FlowbeeData>>> {
pub async fn top_5_flows(top_n: u32, flow_type: String) -> NoCache<Json<Vec<FlowbeeSummaryData>>> {
let flow_type = match flow_type.as_str() {
"rate" => lqos_bus::TopFlowType::RateEstimate,
"bytes" => lqos_bus::TopFlowType::Bytes,

View File

@@ -1,7 +1,7 @@
use crate::auth_guard::AuthGuard;
use crate::cache_control::NoCache;
use crate::tracker::{SHAPED_DEVICES, lookup_dns};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeData, PacketHeader, QueueStoreTransit};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeSummaryData, PacketHeader, QueueStoreTransit};
use rocket::fs::NamedFile;
use rocket::http::Status;
use rocket::response::content::RawJson;
@@ -107,7 +107,7 @@ pub async fn raw_queue_by_circuit(
}
#[get("/api/flows/<ip_list>")]
pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<FlowbeeData>>> {
pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<FlowbeeSummaryData>>> {
let mut result = Vec::new();
let request: Vec<BusRequest> = ip_list.split(',').map(|ip| BusRequest::FlowsByIp(ip.to_string())).collect();
let responses = bus_request(request).await.unwrap();

View File

@@ -304,25 +304,15 @@ static __always_inline void process_tcp(
(data->rate_estimate_bps[rate_index] > 0 ||
data->rate_estimate_bps[other_rate_index] > 0 )
) {
__u64 elapsed = dissector->now - data->ts_change_time[other_rate_index];
if (elapsed < TWO_SECONDS_IN_NANOS) {
struct flowbee_event event = {
.key = key,
.round_trip_time = dissector->now - data->ts_change_time[other_rate_index],
.effective_direction = direction
.round_trip_time = elapsed,
.effective_direction = rate_index
};
bpf_ringbuf_output(&flowbee_events, &event, sizeof(event), 0);
/*
__u64 elapsed = dissector->now - data->ts_change_time[other_rate_index];
__u16 rtt_in_ms10 = elapsed / MS_IN_NANOS_T10;
if (elapsed < TWO_SECONDS_IN_NANOS && rtt_in_ms10 > 0 && rtt_in_ms10 < 2000) {
//bpf_debug("[FLOWS][%d] RTT: %u", direction, rtt_in_ms10);
__u8 entry = data->rtt_index[rate_index];
if (entry < RTT_RING_SIZE)
data->rtt_ringbuffer[rate_index][entry] = rtt_in_ms10;
data->rtt_index[rate_index] = (entry + 1) % RTT_RING_SIZE;
}
//bpf_debug("[FLOWS][%d] RTT: %llu", direction, elapsed);
*/
}
data->ts_change_time[rate_index] = dissector->now;

View File

@@ -37,6 +37,7 @@ bincode = "1"
ip_network_table = "0"
ip_network = "0"
zerocopy = {version = "0.6.1", features = [ "simd" ] }
fxhash = "0.2.1"
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]

View File

@@ -1,6 +1,6 @@
use super::{get_asn_lat_lon, get_asn_name_and_country, FlowAnalysis};
use crate::throughput_tracker::flow_data::FlowbeeRecipient;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use crate::throughput_tracker::flow_data::{FlowbeeLocalData, FlowbeeRecipient};
use lqos_sys::flowbee_data::FlowbeeKey;
use once_cell::sync::Lazy;
use std::sync::{Arc, Mutex};
@@ -10,7 +10,7 @@ pub struct TimeBuffer {
struct TimeEntry {
time: u64,
data: (FlowbeeKey, FlowbeeData, FlowAnalysis),
data: (FlowbeeKey, FlowbeeLocalData, FlowAnalysis),
}
impl TimeBuffer {
@@ -150,7 +150,7 @@ impl FinishedFlowAnalysis {
}
impl FlowbeeRecipient for FinishedFlowAnalysis {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, analysis: FlowAnalysis) {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis) {
log::info!("Finished flow analysis");
RECENT_FLOWS.push(TimeEntry {
time: std::time::SystemTime::now()

View File

@@ -1,6 +1,68 @@
use std::{ffi::c_void, slice};
use zerocopy::FromBytes;
//! Connects to the flows.h "flowbee_events" ring buffer and processes the events.
use crate::throughput_tracker::flow_data::flow_analysis::rtt_types::RttData;
use fxhash::FxHashMap;
use lqos_sys::flowbee_data::FlowbeeKey;
use lqos_utils::unix_time::time_since_boot;
use once_cell::sync::Lazy;
use std::{
ffi::c_void,
slice,
sync::{atomic::AtomicU64, Mutex},
time::Duration,
};
use zerocopy::FromBytes;
static EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0);
const BUFFER_SIZE: usize = 1024;
struct RttBuffer {
index: usize,
buffer: [[RttData; BUFFER_SIZE]; 2],
last_seen: u64,
}
impl RttBuffer {
fn new(reading: u64, direction: u32, last_seen: u64) -> Self {
let empty = [RttData::from_nanos(0); BUFFER_SIZE];
let mut filled = [RttData::from_nanos(0); BUFFER_SIZE];
filled[0] = RttData::from_nanos(reading);
if direction == 0 {
Self {
index: 1,
buffer: [empty, filled],
last_seen,
}
} else {
Self {
index: 0,
buffer: [filled, empty],
last_seen,
}
}
}
fn push(&mut self, reading: u64, direction: u32, last_seen: u64) {
self.buffer[direction as usize][self.index] = RttData::from_nanos(reading);
self.index = (self.index + 1) % BUFFER_SIZE;
self.last_seen = last_seen;
}
fn median(&self) -> RttData {
let mut sorted = self.buffer[0].iter().filter(|x| x.as_nanos() > 0).collect::<Vec<_>>();
if sorted.is_empty() {
return RttData::from_nanos(0);
}
sorted.sort_unstable();
let mid = sorted.len() / 2;
*sorted[mid]
}
}
static FLOW_RTT: Lazy<Mutex<FxHashMap<FlowbeeKey, RttBuffer>>> =
Lazy::new(|| Mutex::new(FxHashMap::default()));
#[repr(C)]
#[derive(FromBytes, Debug, Clone, PartialEq, Eq, Hash)]
@@ -16,8 +78,6 @@ pub unsafe extern "C" fn flowbee_handle_events(
data: *mut c_void,
data_size: usize,
) -> i32 {
println!("Event received");
const EVENT_SIZE: usize = std::mem::size_of::<FlowbeeEvent>();
if data_size < EVENT_SIZE {
log::warn!("Warning: incoming data too small in Flowbee buffer");
@@ -27,26 +87,53 @@ pub unsafe extern "C" fn flowbee_handle_events(
let data_u8 = data as *const u8;
let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
if let Some(incoming) = FlowbeeEvent::read_from(data_slice) {
println!("RTT: {}, Direction: {}", incoming.rtt, incoming.effective_direction);
EVENT_COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let mut lock = FLOW_RTT.lock().unwrap();
if let Some(entry) = lock.get_mut(&incoming.key) {
entry.push(
incoming.rtt,
incoming.effective_direction,
since_boot.as_nanos() as u64,
);
} else {
lock.insert(
incoming.key,
RttBuffer::new(
incoming.rtt,
incoming.effective_direction,
since_boot.as_nanos() as u64,
),
);
}
}
} else {
log::error!("Failed to decode Flowbee Event");
}
/*const EVENT_SIZE: usize = std::mem::size_of::<HeimdallEvent>();
if data_size < EVENT_SIZE {
log::warn!("Warning: incoming data too small in Heimdall buffer");
return 0;
}
//COLLECTED_EVENTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let data_u8 = data as *const u8;
let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
if let Some(incoming) = HeimdallEvent::read_from(data_slice) {
store_on_timeline(incoming);
} else {
println!("Failed to decode");
}*/
0
}
pub fn get_flowbee_event_count_and_reset() -> u64 {
let count = EVENT_COUNT.swap(0, std::sync::atomic::Ordering::Relaxed);
EVENTS_PER_SECOND.store(count, std::sync::atomic::Ordering::Relaxed);
count
}
pub fn expire_rtt_flows() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(30)).as_nanos() as u64;
let mut lock = FLOW_RTT.lock().unwrap();
lock.retain(|_, v| v.last_seen > expire);
}
}
pub fn flowbee_rtt_map() -> FxHashMap<FlowbeeKey, RttData> {
let lock = FLOW_RTT.lock().unwrap();
lock.iter()
.map(|(k, v)| (k.clone(), v.median()))
.collect()
}

View File

@@ -10,6 +10,8 @@ pub use finished_flows::FinishedFlowAnalysis;
pub use finished_flows::RECENT_FLOWS;
mod kernel_ringbuffer;
pub use kernel_ringbuffer::*;
mod rtt_types;
pub use rtt_types::RttData;
static ANALYSIS: Lazy<FlowAnalysisSystem> = Lazy::new(|| FlowAnalysisSystem::new());

View File

@@ -0,0 +1,38 @@
//! Provides a set of types for representing round-trip time (RTT) data,
//! as produced by the eBPF system and consumed in different ways.
//!
//! Adopting strong-typing is an attempt to reduce confusion with
//! multipliers, divisors, etc. It is intended to become pervasive
//! throughout the system.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct RttData {
nanoseconds: u64,
}
#[allow(dead_code)]
impl RttData {
pub fn from_nanos(nanoseconds: u64) -> Self {
Self { nanoseconds }
}
pub fn as_nanos(&self) -> u64 {
self.nanoseconds
}
pub fn as_micros(&self) -> f64 {
self.nanoseconds as f64 / 1_000.0
}
pub fn as_millis(&self) -> f64 {
self.nanoseconds as f64 / 1_000_000.0
}
pub fn as_millis_times_100(&self) -> f64 {
self.nanoseconds as f64 / 10_000.0
}
pub fn as_seconds(&self) -> f64 {
self.nanoseconds as f64 / 1_000_000_000.0
}
}

View File

@@ -1,7 +1,7 @@
//! Provides a globally accessible vector of all flows. This is used to store
//! all flows for the purpose of tracking and data-services.
use super::flow_analysis::FlowAnalysis;
use super::{flow_analysis::FlowAnalysis, RttData};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use once_cell::sync::Lazy;
use std::{collections::HashMap, sync::Mutex};
@@ -9,5 +9,51 @@ use std::{collections::HashMap, sync::Mutex};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AsnId(pub u32);
pub static ALL_FLOWS: Lazy<Mutex<HashMap<FlowbeeKey, (FlowbeeData, FlowAnalysis)>>> =
pub static ALL_FLOWS: Lazy<Mutex<HashMap<FlowbeeKey, (FlowbeeLocalData, FlowAnalysis)>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
/// Condensed representation of the FlowbeeData type. This contains
/// only the information we want to keep locally for analysis purposes,
/// adds RTT data, and uses Rust-friendly typing.
#[derive(Debug, Clone)]
pub struct FlowbeeLocalData {
/// Time (nanos) when the connection was established
pub start_time: u64,
/// Time (nanos) when the connection was last seen
pub last_seen: u64,
/// Bytes transmitted
pub bytes_sent: [u64; 2],
/// Packets transmitted
pub packets_sent: [u64; 2],
/// Rate estimate
pub rate_estimate_bps: [u32; 2],
/// TCP Retransmission count (also counts duplicates)
pub tcp_retransmits: [u16; 2],
/// Has the connection ended?
/// 0 = Alive, 1 = FIN, 2 = RST
pub end_status: u8,
/// Raw IP TOS
pub tos: u8,
/// Raw TCP flags
pub flags: u8,
/// Recent RTT median
pub rtt: RttData,
}
impl From<&FlowbeeData> for FlowbeeLocalData {
fn from(data: &FlowbeeData) -> Self {
Self {
start_time: data.start_time,
last_seen: data.last_seen,
bytes_sent: data.bytes_sent,
packets_sent: data.packets_sent,
rate_estimate_bps: data.rate_estimate_bps,
tcp_retransmits: data.tcp_retransmits,
end_status: data.end_status,
tos: data.tos,
flags: data.flags,
rtt: RttData::from_nanos(0),
}
}
}

View File

@@ -7,22 +7,25 @@ mod netflow9;
mod flow_analysis;
use crate::throughput_tracker::flow_data::{flow_analysis::FinishedFlowAnalysis, netflow5::Netflow5, netflow9::Netflow9};
pub(crate) use flow_tracker::{ALL_FLOWS, AsnId};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
pub(crate) use flow_tracker::{ALL_FLOWS, AsnId, FlowbeeLocalData};
use lqos_sys::flowbee_data::FlowbeeKey;
use std::sync::{
mpsc::{channel, Sender},
Arc,
};
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country, FlowAnalysis, RECENT_FLOWS, flowbee_handle_events};
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country,
FlowAnalysis, RECENT_FLOWS, flowbee_handle_events, get_flowbee_event_count_and_reset,
expire_rtt_flows, flowbee_rtt_map, RttData,
};
trait FlowbeeRecipient {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, analysis: FlowAnalysis);
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, analysis: FlowAnalysis);
}
// Creates the netflow tracker and returns the sender
pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))> {
let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>();
pub fn setup_netflow_tracker() -> Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))> {
let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>();
let config = lqos_config::load_config().unwrap();
std::thread::spawn(move || {

View File

@@ -1,8 +1,8 @@
//! Support for the Netflow 5 protocol
//! Mostly taken from: https://netflow.caligare.com/netflow_v5.htm
mod protocol;
use super::{FlowAnalysis, FlowbeeRecipient};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use super::{FlowAnalysis, FlowbeeLocalData, FlowbeeRecipient};
use lqos_sys::flowbee_data::FlowbeeKey;
pub(crate) use protocol::*;
use std::{
net::UdpSocket,
@@ -13,7 +13,7 @@ pub(crate) struct Netflow5 {
socket: UdpSocket,
sequence: AtomicU32,
target: String,
send_queue: Mutex<Vec<(FlowbeeKey, FlowbeeData)>>,
send_queue: Mutex<Vec<(FlowbeeKey, FlowbeeLocalData)>>,
}
impl Netflow5 {
@@ -83,7 +83,7 @@ impl Netflow5 {
}
impl FlowbeeRecipient for Netflow5 {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, _analysis: FlowAnalysis) {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, _analysis: FlowAnalysis) {
let mut lock = self.send_queue.lock().unwrap();
lock.push((key, data));
}

View File

@@ -1,10 +1,12 @@
//! Definitions for the actual netflow 5 protocol
use std::net::IpAddr;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use lqos_sys::flowbee_data::FlowbeeKey;
use lqos_utils::unix_time::time_since_boot;
use nix::sys::time::TimeValLike;
use crate::throughput_tracker::flow_data::FlowbeeLocalData;
/// Standard Netflow 5 header
#[repr(C)]
pub(crate) struct Netflow5Header {
@@ -64,7 +66,7 @@ pub(crate) struct Netflow5Record {
}
/// Convert a Flowbee key and data to a pair of Netflow 5 records
pub(crate) fn to_netflow_5(key: &FlowbeeKey, data: &FlowbeeData) -> anyhow::Result<(Netflow5Record, Netflow5Record)> {
pub(crate) fn to_netflow_5(key: &FlowbeeKey, data: &FlowbeeLocalData) -> anyhow::Result<(Netflow5Record, Netflow5Record)> {
// TODO: Detect overflow
let local = key.local_ip.as_ip();
let remote = key.remote_ip.as_ip();

View File

@@ -1,18 +1,18 @@
use crate::throughput_tracker::flow_data::netflow9::protocol::{
header::Netflow9Header, template_ipv4::template_data_ipv4, template_ipv6::template_data_ipv6,
};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use lqos_sys::flowbee_data::FlowbeeKey;
use std::{net::UdpSocket, sync::{atomic::AtomicU32, Arc, Mutex}};
use self::protocol::to_netflow_9;
use super::{FlowAnalysis, FlowbeeRecipient};
use super::{FlowAnalysis, FlowbeeLocalData, FlowbeeRecipient};
mod protocol;
pub(crate) struct Netflow9 {
socket: UdpSocket,
sequence: AtomicU32,
target: String,
send_queue: Mutex<Vec<(FlowbeeKey, FlowbeeData)>>,
send_queue: Mutex<Vec<(FlowbeeKey, FlowbeeLocalData)>>,
}
impl Netflow9 {
@@ -66,7 +66,7 @@ impl Netflow9 {
}
impl FlowbeeRecipient for Netflow9 {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeData, _analysis: FlowAnalysis) {
fn enqueue(&self, key: FlowbeeKey, data: FlowbeeLocalData, _analysis: FlowAnalysis) {
let mut lock = self.send_queue.lock().unwrap();
lock.push((key, data));
}

View File

@@ -1,9 +1,9 @@
use std::net::IpAddr;
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use lqos_sys::flowbee_data::FlowbeeKey;
use crate::throughput_tracker::flow_data::FlowbeeLocalData;
use super::field_types::*;
pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: usize, key: &FlowbeeKey, data: &FlowbeeData) -> anyhow::Result<Vec<u8>> {
pub(crate) fn encode_fields_from_template(template: &[(u16, u16)], direction: usize, key: &FlowbeeKey, data: &FlowbeeLocalData) -> anyhow::Result<Vec<u8>> {
let src_port = if direction == 0 { key.src_port } else { key.dst_port };
let dst_port = if direction == 0 { key.dst_port } else { key.src_port };

View File

@@ -1,9 +1,11 @@
//! Protocol definitions for Netflow v9 Data.
//! Mostly derived from https://netflow.caligare.com/netflow_v9.htm
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use lqos_sys::flowbee_data::FlowbeeKey;
mod field_types;
use field_types::*;
use crate::throughput_tracker::flow_data::FlowbeeLocalData;
pub(crate) mod field_encoder;
pub(crate) mod header;
pub(crate) mod template_ipv4;
@@ -16,7 +18,7 @@ fn add_field(bytes: &mut Vec<u8>, field_type: u16, field_length: u16) {
pub(crate) fn to_netflow_9(
key: &FlowbeeKey,
data: &FlowbeeData,
data: &FlowbeeLocalData,
) -> anyhow::Result<(Vec<u8>, Vec<u8>)> {
if key.local_ip.is_v4() && key.remote_ip.is_v4() {
// Return IPv4 records
@@ -29,7 +31,7 @@ pub(crate) fn to_netflow_9(
}
}
fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result<Vec<u8>> {
fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeLocalData, direction: usize) -> anyhow::Result<Vec<u8>> {
let field_bytes = field_encoder::encode_fields_from_template(
&template_ipv4::FIELDS_IPV4,
direction,
@@ -63,7 +65,7 @@ fn ipv4_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow
Ok(bytes)
}
fn ipv6_record(key: &FlowbeeKey, data: &FlowbeeData, direction: usize) -> anyhow::Result<Vec<u8>> {
fn ipv6_record(key: &FlowbeeKey, data: &FlowbeeLocalData, direction: usize) -> anyhow::Result<Vec<u8>> {
let field_bytes = field_encoder::encode_fields_from_template(
&template_ipv6::FIELDS_IPV6,
direction,

View File

@@ -3,6 +3,7 @@ mod throughput_entry;
mod tracking_data;
use std::net::IpAddr;
use self::flow_data::{get_asn_name_and_country, FlowAnalysis, FlowbeeLocalData, ALL_FLOWS};
use crate::{
long_term_stats::get_network_tree,
shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES, STATS_NEEDS_NEW_SHAPED_DEVICES},
@@ -11,7 +12,7 @@ use crate::{
};
use log::{info, warn};
use lqos_bus::{BusResponse, FlowbeeProtocol, IpStats, TcHandle, TopFlowType, XdpPpingResult};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};
use lqos_sys::flowbee_data::FlowbeeKey;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use lts_client::collector::{HostSummary, StatsUpdateMessage, ThroughputSummary};
use once_cell::sync::Lazy;
@@ -19,7 +20,6 @@ use tokio::{
sync::mpsc::Sender,
time::{Duration, Instant},
};
use self::flow_data::{get_asn_name_and_country, FlowAnalysis, ALL_FLOWS};
const RETIRE_AFTER_SECONDS: u64 = 30;
@@ -34,7 +34,7 @@ pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> = Lazy::new(ThroughputTra
/// collection thread that there is fresh data.
pub async fn spawn_throughput_monitor(
long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) {
info!("Starting the bandwidth monitor thread.");
let interval_ms = 1000; // 1 second
@@ -49,7 +49,7 @@ pub async fn spawn_throughput_monitor(
async fn throughput_task(
interval_ms: u64,
long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) {
// Obtain the flow timeout from the config, default to 30 seconds
let timeout_seconds = if let Ok(config) = lqos_config::load_config() {
@@ -502,12 +502,13 @@ pub fn all_unknown_ips() -> BusResponse {
/// For debugging: dump all active flows!
pub fn dump_active_flows() -> BusResponse {
let lock = ALL_FLOWS.lock().unwrap();
let result: Vec<lqos_bus::FlowbeeData> = lock
let result: Vec<lqos_bus::FlowbeeSummaryData> = lock
.iter()
.map(|(key, row)| {
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip());
let (remote_asn_name, remote_asn_country) =
get_asn_name_and_country(key.remote_ip.as_ip());
lqos_bus::FlowbeeData {
lqos_bus::FlowbeeSummaryData {
remote_ip: key.remote_ip.as_ip().to_string(),
local_ip: key.local_ip.as_ip().to_string(),
src_port: key.src_port,
@@ -524,6 +525,9 @@ pub fn dump_active_flows() -> BusResponse {
remote_asn_name,
remote_asn_country,
analysis: row.1.protocol_analysis.to_string(),
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: row.0.rtt.as_nanos(),
}
})
.collect();
@@ -540,7 +544,7 @@ pub fn count_active_flows() -> BusResponse {
/// Top Flows Report
pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
let lock = ALL_FLOWS.lock().unwrap();
let mut table: Vec<(FlowbeeKey, (FlowbeeData, FlowAnalysis))> = lock
let mut table: Vec<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))> = lock
.iter()
.map(|(key, value)| (key.clone(), value.clone()))
.collect();
@@ -577,9 +581,9 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
}
TopFlowType::RoundTripTime => {
table.sort_by(|a, b| {
let a_total = 0.0;
let b_total = 0.0;
b_total.partial_cmp(&a_total).unwrap()
let a_total = a.1 .0.rtt;
let b_total = b.1 .0.rtt;
a_total.cmp(&b_total)
});
}
}
@@ -588,8 +592,9 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
.iter()
.take(n as usize)
.map(|(ip, flow)| {
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(ip.remote_ip.as_ip());
lqos_bus::FlowbeeData {
let (remote_asn_name, remote_asn_country) =
get_asn_name_and_country(ip.remote_ip.as_ip());
lqos_bus::FlowbeeSummaryData {
remote_ip: ip.remote_ip.as_ip().to_string(),
local_ip: ip.local_ip.as_ip().to_string(),
src_port: ip.src_port,
@@ -606,6 +611,9 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
remote_asn_name,
remote_asn_country,
analysis: flow.1.protocol_analysis.to_string(),
last_seen: flow.0.last_seen,
start_time: flow.0.start_time,
rtt_nanos: flow.0.rtt.as_nanos(),
}
})
.collect();
@@ -622,9 +630,10 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
.iter()
.filter(|(key, _)| key.local_ip == ip)
.map(|(key, row)| {
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip());
let (remote_asn_name, remote_asn_country) =
get_asn_name_and_country(key.remote_ip.as_ip());
lqos_bus::FlowbeeData {
lqos_bus::FlowbeeSummaryData {
remote_ip: key.remote_ip.as_ip().to_string(),
local_ip: key.local_ip.as_ip().to_string(),
src_port: key.src_port,
@@ -641,6 +650,9 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
remote_asn_name,
remote_asn_country,
analysis: row.1.protocol_analysis.to_string(),
last_seen: row.0.last_seen,
start_time: row.0.start_time,
rtt_nanos: row.0.rtt.as_nanos(),
}
})
.collect();

View File

@@ -1,9 +1,9 @@
use std::{sync::atomic::AtomicU64, time::Duration};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{flow_data::{FlowAnalysis, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use crate::{shaped_devices_tracker::{NETWORK_JSON, SHAPED_DEVICES}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}, throughput_tracker::flow_data::{expire_rtt_flows, flowbee_rtt_map}};
use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, FlowbeeLocalData, RttData, ALL_FLOWS}, throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
use lqos_sys::{flowbee_data::{FlowbeeData, FlowbeeKey}, iterate_flows, throughput_for_each};
use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
pub struct ThroughputTracker {
@@ -172,18 +172,21 @@ impl ThroughputTracker {
&self,
timeout_seconds: u64,
_netflow_enabled: bool,
sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeData, FlowAnalysis))>,
sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) {
//log::debug!("Flowbee events this second: {}", get_flowbee_event_count_and_reset());
let self_cycle = self.cycle.load(std::sync::atomic::Ordering::Relaxed);
if let Ok(now) = time_since_boot() {
let rtt_samples = flowbee_rtt_map();
get_flowbee_event_count_and_reset();
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(timeout_seconds)).as_nanos() as u64;
// Track the expired keys
let mut expired_keys = Vec::new();
let mut lock = ALL_FLOWS.lock().unwrap();
let mut all_flows_lock = ALL_FLOWS.lock().unwrap();
// Track through all the flows
iterate_flows(&mut |key, data| {
@@ -196,7 +199,7 @@ impl ThroughputTracker {
expired_keys.push(key.clone());
} else {
// We have a valid flow, so it needs to be tracked
if let Some(this_flow) = lock.get_mut(&key) {
if let Some(this_flow) = all_flows_lock.get_mut(&key) {
this_flow.0.last_seen = data.last_seen;
this_flow.0.bytes_sent = data.bytes_sent;
this_flow.0.packets_sent = data.packets_sent;
@@ -205,23 +208,23 @@ impl ThroughputTracker {
this_flow.0.end_status = data.end_status;
this_flow.0.tos = data.tos;
this_flow.0.flags = data.flags;
this_flow.0.rtt = rtt_samples.get(&key).copied().unwrap_or(RttData::from_nanos(0)).clone();
} else {
// Insert it into the map
let flow_analysis = FlowAnalysis::new(&key);
lock.insert(key.clone(), (data.clone(), flow_analysis));
all_flows_lock.insert(key.clone(), (data.into(), flow_analysis));
}
// TCP - we have RTT data? 6 is TCP
if key.ip_protocol == 6 && data.end_status == 0 {
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
/*for rtt in data.median_pair().iter() {
if *rtt > 0.0 {
println!("RTT: {rtt:?}");
if let Some(rtt) = rtt_samples.get(&key) {
if rtt.as_nanos() > 0 {
// Shift left
for i in 1..60 {
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
}
tracker.recent_rtt_data[0] = *rtt as u32;
tracker.recent_rtt_data[0] = rtt.as_millis_times_100() as u32;
tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
@@ -230,7 +233,7 @@ impl ThroughputTracker {
}
}
}
}*/
}
if data.end_status != 0 {
// The flow has ended. We need to remove it from the map.
@@ -244,11 +247,11 @@ impl ThroughputTracker {
if !expired_keys.is_empty() {
for key in expired_keys.iter() {
// Send it off to netperf for analysis if we are supporting doing so.
if let Some(d) = lock.get(&key) {
if let Some(d) = all_flows_lock.get(&key) {
let _ = sender.send((key.clone(), (d.0.clone(), d.1.clone())));
}
// Remove the flow from circulation
lock.remove(&key);
all_flows_lock.remove(&key);
}
let ret = lqos_sys::end_flows(&mut expired_keys);
@@ -258,7 +261,8 @@ impl ThroughputTracker {
}
// Cleaning run
lock.retain(|_k,v| v.0.last_seen >= expire);
all_flows_lock.retain(|_k,v| v.0.last_seen >= expire);
expire_rtt_flows();
}
}