WIP: You can now view the last 10 seconds of flow data in the API.

This commit is contained in:
Herbert Wolverson 2023-03-14 19:16:04 +00:00
parent 8f3a8a78eb
commit 7180eb48dc
19 changed files with 269 additions and 28 deletions

3
src/rust/Cargo.lock generated
View File

@ -1432,8 +1432,11 @@ dependencies = [
name = "lqos_heimdall"
version = "0.1.0"
dependencies = [
"dashmap",
"log",
"lqos_bus",
"lqos_utils",
"once_cell",
"zerocopy",
]

View File

@ -136,6 +136,9 @@ pub enum BusRequest {
/// Tell me flow stats for a given IP address
GetFlowStats(String),
/// Give me a dump of the last 10 seconds of packet headers
GetPacketHeaderDump(String),
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidht test" link.
#[cfg(feature = "equinix_tests")]

View File

@ -1,4 +1,4 @@
use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport};
use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport, ip_stats::PacketHeader};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
@ -89,4 +89,7 @@ pub enum BusResponse {
/// Flow Data
FlowData(Vec<(FlowTransport, Option<FlowTransport>)>),
/// Packet header dump
PacketDump(Vec<PacketHeader>),
}

View File

@ -112,4 +112,17 @@ pub fn tos_parser(tos: u8) -> (u8, u8) {
let ecn = tos & ECN;
let dscp = (tos & DSCP) >> 2;
(dscp, ecn)
}
/// Packet header dump
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
pub struct PacketHeader {
pub timestamp: u64,
pub src: String,
pub dst: String,
pub src_port : u16,
pub dst_port: u16,
pub ip_protocol: u8,
pub tos: u8,
pub size: u32,
}

View File

@ -12,7 +12,7 @@
#![warn(missing_docs)]
mod bus;
mod ip_stats;
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport, tos_parser};
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport, tos_parser, PacketHeader};
mod tc_handle;
pub use bus::{
bus_request, decode_request, decode_response, encode_request,

View File

@ -5,5 +5,8 @@ edition = "2021"
[dependencies]
lqos_utils = { path = "../lqos_utils" }
lqos_bus = { path = "../lqos_bus" }
log = "0"
zerocopy = {version = "0.6.1", features = [ "simd" ] }
once_cell = "1.17.1"
dashmap = "5.4.0"

View File

@ -0,0 +1,118 @@
use crate::{perf_interface::HeimdallEvent, timeline::expire_timeline};
use dashmap::DashMap;
use lqos_bus::{tos_parser, BusResponse, FlowTransport};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::{collections::HashSet, time::Duration};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FlowKey {
src: XdpIpAddress,
dst: XdpIpAddress,
proto: u8,
src_port: u16,
dst_port: u16,
}
#[derive(Clone, Debug)]
struct FlowData {
last_seen: u64,
bytes: u64,
packets: u64,
tos: u8,
}
impl From<&HeimdallEvent> for FlowKey {
fn from(value: &HeimdallEvent) -> Self {
Self {
src: value.src,
dst: value.dst,
proto: value.ip_protocol,
src_port: value.src_port,
dst_port: value.dst_port,
}
}
}
static FLOW_DATA: Lazy<DashMap<FlowKey, FlowData>> = Lazy::new(DashMap::new);
pub(crate) fn record_flow(event: &HeimdallEvent) {
let key: FlowKey = event.into();
if let Some(mut data) = FLOW_DATA.get_mut(&key) {
data.last_seen = event.timestamp;
data.packets += 1;
data.bytes += event.size as u64;
data.tos = event.tos;
} else {
FLOW_DATA.insert(
key,
FlowData {
last_seen: event.timestamp,
bytes: event.size.into(),
packets: 1,
tos: event.tos,
},
);
}
}
pub fn expire_heimdall_flows() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let thirty_secs_ago = since_boot - Duration::from_secs(30);
let expire = thirty_secs_ago.as_nanos() as u64;
FLOW_DATA.retain(|_k, v| v.last_seen > expire);
expire_timeline();
}
}
pub fn get_flow_stats(ip: XdpIpAddress) -> BusResponse {
let mut result = Vec::new();
// Obtain all the flows
let mut all_flows = Vec::new();
for value in FLOW_DATA.iter() {
let key = value.key();
if key.src == ip || key.dst == ip {
let (dscp, ecn) = tos_parser(value.tos);
all_flows.push(FlowTransport {
src: key.src.as_ip().to_string(),
dst: key.dst.as_ip().to_string(),
src_port: key.src_port,
dst_port: key.dst_port,
proto: match key.proto {
6 => lqos_bus::FlowProto::TCP,
17 => lqos_bus::FlowProto::UDP,
_ => lqos_bus::FlowProto::ICMP,
},
bytes: value.bytes,
packets: value.packets,
dscp,
ecn,
});
}
}
// Turn them into reciprocal pairs
let mut done = HashSet::new();
for (i, flow) in all_flows.iter().enumerate() {
if !done.contains(&i) {
let flow_a = flow.clone();
let flow_b = if let Some(flow_b) = all_flows
.iter()
.position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port)
{
done.insert(flow_b);
Some(all_flows[flow_b].clone())
} else {
None
};
result.push((flow_a, flow_b));
}
}
result.sort_by(|a, b| b.0.bytes.cmp(&a.0.bytes));
BusResponse::FlowData(result)
}

View File

@ -6,3 +6,7 @@ mod config;
pub mod perf_interface;
pub mod stats;
pub use config::{HeimdallMode, HeimdalConfig};
mod flows;
pub use flows::{expire_heimdall_flows, get_flow_stats};
mod timeline;
pub use timeline::ten_second_packet_dump;

View File

@ -2,17 +2,19 @@ use std::{ffi::c_void, slice};
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;
#[derive(FromBytes, Debug)]
use crate::{flows::record_flow, timeline::store_on_timeline};
#[derive(FromBytes, Debug, Clone)]
#[repr(C)]
struct HeimdallEvent {
timestamp: u64,
src: XdpIpAddress,
dst: XdpIpAddress,
src_port : u16,
dst_port: u16,
ip_protocol: u8,
tos: u8,
size: u32,
pub struct HeimdallEvent {
pub timestamp: u64,
pub src: XdpIpAddress,
pub dst: XdpIpAddress,
pub src_port : u16,
pub dst_port: u16,
pub ip_protocol: u8,
pub tos: u8,
pub size: u32,
}
/// Callback for the Heimdall Perf map system. Called whenever Heimdall has
@ -39,7 +41,8 @@ pub unsafe extern "C" fn heimdall_handle_events(
let data_slice : &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
if let Some(incoming) = HeimdallEvent::read_from(data_slice) {
//println!("{incoming:?}");
record_flow(&incoming);
store_on_timeline(incoming);
} else {
println!("Failed to decode");
}

View File

@ -0,0 +1,59 @@
use std::{sync::RwLock, time::Duration};
use lqos_bus::PacketHeader;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use crate::perf_interface::HeimdallEvent;
impl From<&HeimdallEvent> for PacketHeader {
fn from(value: &HeimdallEvent) -> Self {
Self {
timestamp: value.timestamp,
src: value.src.as_ip().to_string(),
dst: value.dst.as_ip().to_string(),
src_port: value.src_port,
dst_port: value.dst_port,
ip_protocol: value.ip_protocol,
tos: value.tos,
size: value.size,
}
}
}
struct Timeline {
data: RwLock<Vec<HeimdallEvent>>,
}
impl Timeline {
fn new() -> Self {
Self { data: RwLock::new(Vec::new()) }
}
}
static TIMELINE: Lazy<Timeline> = Lazy::new(Timeline::new);
pub(crate) fn store_on_timeline(event: HeimdallEvent) {
let mut lock = TIMELINE.data.write().unwrap();
lock.push(event); // We're moving here deliberately
}
pub(crate) fn expire_timeline() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let ten_secs_ago = since_boot - Duration::from_secs(10);
let expire = ten_secs_ago.as_nanos() as u64;
let mut lock = TIMELINE.data.write().unwrap();
lock.retain(|v| v.timestamp > expire);
}
}
pub fn ten_second_packet_dump(ip: XdpIpAddress) -> Vec<PacketHeader> {
TIMELINE
.data
.read()
.unwrap()
.iter()
.filter(|e| e.src == ip || e.dst == ip)
.map(|e| e.into())
.collect()
}

View File

@ -68,6 +68,7 @@ fn rocket() -> _ {
queue_info::current_circuit_throughput,
queue_info::watch_circuit,
queue_info::flow_stats,
queue_info::packet_dump,
config_control::get_nic_list,
config_control::get_current_python_config,
config_control::get_current_lqosd_config,

View File

@ -1,7 +1,7 @@
use crate::auth_guard::AuthGuard;
use crate::cache_control::NoCache;
use crate::tracker::SHAPED_DEVICES;
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport, PacketHeader};
use rocket::response::content::RawJson;
use rocket::serde::json::Json;
use rocket::serde::Serialize;
@ -109,9 +109,17 @@ pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<(
result.extend_from_slice(flow);
}
}
NoCache::new(Json(result))
}
#[get("/api/packet_dump/<ip>")]
pub async fn packet_dump(ip: String, _auth: AuthGuard) -> NoCache<Json<Vec<PacketHeader>>> {
let mut result = Vec::new();
for r in bus_request(vec![BusRequest::GetPacketHeaderDump(ip)]).await.unwrap() {
if let BusResponse::PacketDump(packets) = r {
result.extend(packets);
}
}
NoCache::new(Json(result))
}

View File

@ -69,11 +69,14 @@ static __always_inline __u8 get_heimdall_mode()
static __always_inline bool is_heimdall_watching(struct dissector_t *dissector)
{
__u32 *watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->src_ip);
if (watching)
if (watching) {
return true;
}
watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->dst_ip);
if (watching)
if (watching) {
return true;
}
return false;
}

View File

@ -135,14 +135,13 @@ int xdp_prog(struct xdp_md *ctx)
// Send on its way
update_heimdall(&dissector, ctx->data_end - ctx->data);
if (tc_handle != 0) {
// Send data to Heimdall
if (heimdall_mode == 2 || (heimdall_mode==1 && is_heimdall_watching(&dissector))) {
#ifdef VERBOSE
bpf_debug("(XDP) Storing Heimdall Data");
#endif
//update_heimdall(&dissector, ctx->data_end - ctx->data);
update_heimdall(&dissector, ctx->data_end - ctx->data);
}
// Handle CPU redirection if there is one specified

View File

@ -3,7 +3,7 @@ mod version;
use std::{time::Duration, net::TcpStream, io::Write};
use lqos_bus::anonymous::{AnonymousUsageV1, build_stats};
use lqos_config::{EtcLqos, LibreQoSConfig};
use lqos_sys::libbpf_num_possible_cpus;
use lqos_sys::num_possible_cpus;
use sysinfo::{System, SystemExt, CpuExt};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
@ -36,7 +36,7 @@ fn anonymous_usage_dump() -> anyhow::Result<()> {
if let Some(kernel) = sys.kernel_version() {
data.kernel_version = kernel;
}
data.usable_cores = unsafe { libbpf_num_possible_cpus() } as u32;
data.usable_cores = num_possible_cpus().unwrap_or(0);
let cpu = sys.cpus().first();
if let Some(cpu) = cpu {
data.cpu_brand = cpu.brand().to_string();

View File

@ -8,6 +8,8 @@ mod throughput_tracker;
mod anonymous_usage;
mod tuning;
mod validation;
use std::net::IpAddr;
use crate::{
file_lock::FileLock,
ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow},
@ -16,12 +18,13 @@ use anyhow::Result;
use log::{info, warn};
use lqos_bus::{BusRequest, BusResponse, UnixSocketServer};
use lqos_config::LibreQoSConfig;
use lqos_heimdall::HeimdallMode;
use lqos_heimdall::{ten_second_packet_dump, HeimdallMode};
use lqos_queue_tracker::{
add_watched_queue, get_raw_circuit_data, spawn_queue_monitor,
spawn_queue_structure_monitor,
};
use lqos_sys::{LibreQoSKernels, set_heimdall_mode};
use lqos_sys::{set_heimdall_mode, LibreQoSKernels};
use lqos_utils::XdpIpAddress;
use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals,
@ -192,8 +195,15 @@ fn handle_bus_requests(
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
}
}
BusRequest::GetFlowStats(ip) => {
get_flow_stats(ip)
BusRequest::GetFlowStats(ip) => get_flow_stats(ip),
BusRequest::GetPacketHeaderDump(ip) => {
let ip = ip.parse::<IpAddr>();
if let Ok(ip) = ip {
let ip = XdpIpAddress::from_ip(ip);
BusResponse::PacketDump(ten_second_packet_dump(ip))
} else {
BusResponse::Fail("Invalid IP".to_string())
}
}
});
}

View File

@ -1,5 +1,14 @@
use std::net::IpAddr;
use lqos_bus::BusResponse;
use lqos_sys::heimdall_watch_ip;
use lqos_utils::XdpIpAddress;
pub fn get_flow_stats(_ip: &str) -> BusResponse {
pub fn get_flow_stats(ip: &str) -> BusResponse {
let ip = ip.parse::<IpAddr>();
if let Ok(ip) = ip {
let ip = XdpIpAddress::from_ip(ip);
heimdall_watch_ip(ip);
return lqos_heimdall::get_flow_stats(ip);
}
BusResponse::Fail("No Stats or bad IP".to_string())
}

View File

@ -2,6 +2,7 @@ mod throughput_entry;
mod tracking_data;
mod heimdall_data;
pub use heimdall_data::get_flow_stats;
use lqos_heimdall::expire_heimdall_flows;
use crate::{
shaped_devices_tracker::NETWORK_JSON,
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
@ -30,6 +31,7 @@ pub fn spawn_throughput_monitor() {
let net_json = NETWORK_JSON.read().unwrap();
net_json.zero_throughput_and_rtt();
} // Scope to end the lock
expire_heimdall_flows();
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
THROUGHPUT_TRACKER.apply_new_throughput_counters();
THROUGHPUT_TRACKER.apply_rtt_data();

View File

@ -1,6 +1,6 @@
use std::sync::atomic::AtomicU64;
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS, heimdall_data::HEIMDALL};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
use lqos_sys::{rtt_for_each, throughput_for_each};