mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
VERY work in progress - do not try to use this. The good: creates a ringbuffer and successfully sends events to userspace. The bad: it's currently watching all packets, eats a lot of CPU and has a horrible lack of abstractions.
This commit is contained in:
parent
e97fb4ac11
commit
c70d01b3f6
34
src/rust/Cargo.lock
generated
34
src/rust/Cargo.lock
generated
@ -1428,6 +1428,15 @@ dependencies = [
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lqos_heimdall"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"log",
|
||||
"lqos_utils",
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lqos_node_manager"
|
||||
version = "0.1.0"
|
||||
@ -1502,6 +1511,7 @@ dependencies = [
|
||||
"log",
|
||||
"lqos_bus",
|
||||
"lqos_config",
|
||||
"lqos_heimdall",
|
||||
"lqos_utils",
|
||||
"nix",
|
||||
"once_cell",
|
||||
@ -1512,11 +1522,13 @@ dependencies = [
|
||||
name = "lqos_utils"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"log",
|
||||
"nix",
|
||||
"notify",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1530,6 +1542,7 @@ dependencies = [
|
||||
"log",
|
||||
"lqos_bus",
|
||||
"lqos_config",
|
||||
"lqos_heimdall",
|
||||
"lqos_queue_tracker",
|
||||
"lqos_sys",
|
||||
"lqos_utils",
|
||||
@ -3287,3 +3300,24 @@ name = "yansi"
|
||||
version = "0.5.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy"
|
||||
version = "0.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"zerocopy-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zerocopy-derive"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
@ -24,4 +24,5 @@ members = [
|
||||
"lqos_utils", # A collection of macros and helpers we find useful
|
||||
"lqos_setup", # A quick CLI setup for first-time users
|
||||
"lqos_anonymous_stats_server", # The server for gathering anonymous usage data.
|
||||
"lqos_heimdall", # Library for managing Heimdall flow watching
|
||||
]
|
||||
|
9
src/rust/lqos_heimdall/Cargo.toml
Normal file
9
src/rust/lqos_heimdall/Cargo.toml
Normal file
@ -0,0 +1,9 @@
|
||||
[package]
|
||||
name = "lqos_heimdall"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
lqos_utils = { path = "../lqos_utils" }
|
||||
log = "0"
|
||||
zerocopy = {version = "0.6.1", features = [ "simd" ] }
|
19
src/rust/lqos_heimdall/src/config.rs
Normal file
19
src/rust/lqos_heimdall/src/config.rs
Normal file
@ -0,0 +1,19 @@
|
||||
/// Currently unused, represents the current operation mode of the Heimdall
|
||||
/// sub-system. Defaults to 1.
|
||||
#[repr(u8)]
|
||||
pub enum HeimdallMode {
|
||||
/// Do not monitor
|
||||
Off = 0,
|
||||
/// Only look at flows on hosts we are watching via the circuit monitor
|
||||
WatchOnly = 1,
|
||||
/// Capture everything (this may set your CPU on fire)
|
||||
Analysis = 2,
|
||||
}
|
||||
|
||||
/// Configuration options passed to Heimdall
|
||||
#[derive(Default, Clone)]
|
||||
#[repr(C)]
|
||||
pub struct HeimdalConfig {
|
||||
/// Current operation mode
|
||||
pub mode: u32,
|
||||
}
|
8
src/rust/lqos_heimdall/src/lib.rs
Normal file
8
src/rust/lqos_heimdall/src/lib.rs
Normal file
@ -0,0 +1,8 @@
|
||||
//! Provides an interface to the Heimdall packet watching
|
||||
//! system. Heimdall watches traffic flows, and is notified
|
||||
//! about their contents via the eBPF Perf system.
|
||||
|
||||
mod config;
|
||||
pub mod perf_interface;
|
||||
pub mod stats;
|
||||
pub use config::{HeimdallMode, HeimdalConfig};
|
48
src/rust/lqos_heimdall/src/perf_interface.rs
Normal file
48
src/rust/lqos_heimdall/src/perf_interface.rs
Normal file
@ -0,0 +1,48 @@
|
||||
use std::{ffi::c_void, slice};
|
||||
use lqos_utils::XdpIpAddress;
|
||||
use zerocopy::FromBytes;
|
||||
|
||||
#[derive(FromBytes, Debug)]
|
||||
#[repr(C)]
|
||||
struct HeimdallEvent {
|
||||
timestamp: u64,
|
||||
src: XdpIpAddress,
|
||||
dst: XdpIpAddress,
|
||||
src_port : u16,
|
||||
dst_port: u16,
|
||||
ip_protocol: u8,
|
||||
tos: u8,
|
||||
size: u32,
|
||||
}
|
||||
|
||||
/// Callback for the Heimdall Perf map system. Called whenever Heimdall has
|
||||
/// events for the system to read.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// This function is inherently unsafe, because it interfaces directly with
|
||||
/// C and the Linux-kernel eBPF system.
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn heimdall_handle_events(
|
||||
_ctx: *mut c_void,
|
||||
data: *mut c_void,
|
||||
data_size: usize,
|
||||
) -> i32 {
|
||||
const EVENT_SIZE: usize = std::mem::size_of::<HeimdallEvent>();
|
||||
if data_size < EVENT_SIZE {
|
||||
log::warn!("Warning: incoming data too small in Heimdall buffer");
|
||||
return 0;
|
||||
}
|
||||
|
||||
//COLLECTED_EVENTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
let data_u8 = data as *const u8;
|
||||
let data_slice : &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
|
||||
|
||||
if let Some(incoming) = HeimdallEvent::read_from(data_slice) {
|
||||
//println!("{incoming:?}");
|
||||
} else {
|
||||
println!("Failed to decode");
|
||||
}
|
||||
|
||||
0
|
||||
}
|
6
src/rust/lqos_heimdall/src/stats.rs
Normal file
6
src/rust/lqos_heimdall/src/stats.rs
Normal file
@ -0,0 +1,6 @@
|
||||
//! Count statistics
|
||||
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
/// Perf event counter
|
||||
pub static COLLECTED_EVENTS: AtomicU64 = AtomicU64::new(0);
|
@ -15,6 +15,7 @@ lqos_utils = { path = "../lqos_utils" }
|
||||
once_cell = "1"
|
||||
dashmap = "5"
|
||||
thiserror = "1"
|
||||
lqos_heimdall = { path = "../lqos_heimdall" }
|
||||
|
||||
[build-dependencies]
|
||||
bindgen = "0"
|
||||
|
@ -13,6 +13,7 @@ struct heimdall_config_t
|
||||
__u32 monitor_mode; // 0 = Off, 1 = Targets only, 2 = Analysis Mode
|
||||
};
|
||||
|
||||
// Pinned map containing the Heimdall config
|
||||
struct
|
||||
{
|
||||
__uint(type, BPF_MAP_TYPE_ARRAY);
|
||||
@ -22,6 +23,8 @@ struct
|
||||
__uint(pinning, LIBBPF_PIN_BY_NAME);
|
||||
} heimdall_config SEC(".maps");
|
||||
|
||||
// Pinned map containing the IP addresses (in packed IPv6 format)
|
||||
// currently being watched by the Heimdall system.
|
||||
struct
|
||||
{
|
||||
__uint(type, BPF_MAP_TYPE_HASH);
|
||||
@ -31,39 +34,24 @@ struct
|
||||
__uint(pinning, LIBBPF_PIN_BY_NAME);
|
||||
} heimdall_watching SEC(".maps");
|
||||
|
||||
struct heimdall_key
|
||||
{
|
||||
// Perf map for communicating with userspace
|
||||
struct {
|
||||
__uint(type, BPF_MAP_TYPE_RINGBUF);
|
||||
__uint(max_entries, 256 * 1024 /* 256 KB */);
|
||||
} heimdall_events SEC(".maps");
|
||||
|
||||
// Basic event type to send to userspace
|
||||
struct heimdall_event {
|
||||
__u64 timetamp;
|
||||
struct in6_addr src;
|
||||
struct in6_addr dst;
|
||||
__u8 ip_protocol;
|
||||
__u16 src_port;
|
||||
__u16 dst_port;
|
||||
};
|
||||
|
||||
struct heimdall_data
|
||||
{
|
||||
__u64 last_seen;
|
||||
__u64 bytes;
|
||||
__u64 packets;
|
||||
__u8 ip_protocol;
|
||||
__u8 tos;
|
||||
__u8 reserved[3];
|
||||
__u32 size;
|
||||
};
|
||||
|
||||
struct
|
||||
{
|
||||
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
|
||||
__type(key, struct heimdall_key);
|
||||
__type(value, struct heimdall_data);
|
||||
__uint(max_entries, MAX_FLOWS);
|
||||
__uint(pinning, LIBBPF_PIN_BY_NAME);
|
||||
} heimdall SEC(".maps");
|
||||
|
||||
struct {
|
||||
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
|
||||
__uint(key_size, sizeof(__u32));
|
||||
__uint(value_size, sizeof(__u32));
|
||||
} heimdall_events SEC(".maps");
|
||||
|
||||
static __always_inline __u8 get_heimdall_mode()
|
||||
{
|
||||
__u32 index = 0;
|
||||
@ -91,46 +79,17 @@ static __always_inline bool is_heimdall_watching(struct dissector_t *dissector)
|
||||
|
||||
static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size, int dir)
|
||||
{
|
||||
__u32 e = 1;
|
||||
if (bpf_perf_event_output(dissector->ctx, &heimdall_events, BPF_F_CURRENT_CPU, &e, sizeof(e)) != 0) {
|
||||
bpf_debug("Failed to send perf event");
|
||||
}
|
||||
|
||||
// Don't report any non-ICMP without ports
|
||||
if (dissector->ip_protocol != 1 && (dissector->src_port == 0 || dissector->dst_port == 0))
|
||||
return;
|
||||
// Don't report ICMP with invalid numbers
|
||||
if (dissector->ip_protocol == 1 && dissector->src_port > 18) return;
|
||||
struct heimdall_key key = {0};
|
||||
key.src = dissector->src_ip;
|
||||
key.dst = dissector->dst_ip;
|
||||
key.ip_protocol = dissector->ip_protocol;
|
||||
key.src_port = bpf_ntohs(dissector->src_port);
|
||||
key.dst_port = bpf_ntohs(dissector->dst_port);
|
||||
struct heimdall_data *counter = (struct heimdall_data *)bpf_map_lookup_elem(&heimdall, &key);
|
||||
if (counter)
|
||||
{
|
||||
counter->last_seen = bpf_ktime_get_boot_ns();
|
||||
counter->packets += 1;
|
||||
counter->bytes += size;
|
||||
if (dissector->tos != 0)
|
||||
{
|
||||
counter->tos = dissector->tos;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
struct heimdall_data counter = {0};
|
||||
counter.last_seen = bpf_ktime_get_boot_ns();
|
||||
counter.bytes = size;
|
||||
counter.packets = 1;
|
||||
counter.tos = dissector->tos;
|
||||
counter.reserved[0] = 0;
|
||||
counter.reserved[1] = 0;
|
||||
counter.reserved[2] = 0;
|
||||
if (bpf_map_update_elem(&heimdall, &key, &counter, BPF_NOEXIST) != 0)
|
||||
{
|
||||
bpf_debug("Failed to insert tracking");
|
||||
}
|
||||
struct heimdall_event event = {0};
|
||||
event.timetamp = bpf_ktime_get_boot_ns();
|
||||
event.src = dissector->src_ip;
|
||||
event.dst = dissector->dst_ip;
|
||||
event.src_port = dissector->src_port;
|
||||
event.dst_port = dissector->dst_port;
|
||||
event.ip_protocol = dissector->ip_protocol;
|
||||
event.tos = dissector->tos;
|
||||
event.size = size;
|
||||
long err = bpf_ringbuf_output(&heimdall_events, &event, sizeof(event), 0);
|
||||
if (err != 0) {
|
||||
bpf_debug("Failed to send perf event %d", err);
|
||||
}
|
||||
}
|
@ -135,12 +135,14 @@ int xdp_prog(struct xdp_md *ctx)
|
||||
|
||||
|
||||
// Send on its way
|
||||
update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction);
|
||||
if (tc_handle != 0) {
|
||||
// Send data to Heimdall
|
||||
if (heimdall_mode == 2 || (heimdall_mode==1 && is_heimdall_watching(&dissector))) {
|
||||
#ifdef VERBOSE
|
||||
bpf_debug("(XDP) Storing Heimdall Data");
|
||||
#endif update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction);
|
||||
bpf_debug("(XDP) Storing Heimdall Data");
|
||||
#endif
|
||||
//update_heimdall(&dissector, ctx->data_end - ctx->data, effective_direction);
|
||||
}
|
||||
|
||||
// Handle CPU redirection if there is one specified
|
||||
|
@ -1,71 +1,9 @@
|
||||
use std::{time::Duration, ffi::c_void};
|
||||
use std::time::Duration;
|
||||
use dashmap::DashMap;
|
||||
use lqos_utils::unix_time::time_since_boot;
|
||||
use lqos_heimdall::{HeimdallMode, HeimdalConfig};
|
||||
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress, bpf_map::BpfMap};
|
||||
|
||||
/// Representation of the eBPF `heimdall_key` type.
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
|
||||
#[repr(C)]
|
||||
pub struct HeimdallKey {
|
||||
/// Mapped `XdpIpAddress` source for the flow.
|
||||
pub src_ip: XdpIpAddress,
|
||||
/// Mapped `XdpIpAddress` destination for the flow
|
||||
pub dst_ip: XdpIpAddress,
|
||||
/// IP protocol (see the Linux kernel!)
|
||||
pub ip_protocol: u8,
|
||||
/// Source port number, or ICMP type.
|
||||
pub src_port: u16,
|
||||
/// Destination port number.
|
||||
pub dst_port: u16,
|
||||
}
|
||||
|
||||
/// Mapped representation of the eBPF `heimdall_data` type.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[repr(C)]
|
||||
pub struct HeimdallData {
|
||||
/// Last seen, in nanoseconds (since boot time).
|
||||
pub last_seen: u64,
|
||||
/// Number of bytes since the flow started being tracked
|
||||
pub bytes: u64,
|
||||
/// Number of packets since the flow started being tracked
|
||||
pub packets: u64,
|
||||
/// IP header TOS value
|
||||
pub tos: u8,
|
||||
/// Reserved to pad the structure
|
||||
pub reserved: [u8; 3],
|
||||
}
|
||||
|
||||
/// Iterates through all throughput entries, and sends them in turn to `callback`.
|
||||
/// This elides the need to clone or copy data.
|
||||
pub fn heimdall_for_each(
|
||||
callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]),
|
||||
) {
|
||||
if let Ok(heimdall) = BpfPerCpuMap::<HeimdallKey, HeimdallData>::from_path(
|
||||
"/sys/fs/bpf/heimdall",
|
||||
) {
|
||||
heimdall.for_each(callback);
|
||||
}
|
||||
}
|
||||
|
||||
/// Currently unused, represents the current operation mode of the Heimdall
|
||||
/// sub-system. Defaults to 1.
|
||||
#[repr(u8)]
|
||||
pub enum HeimdallMode {
|
||||
/// Do not monitor
|
||||
Off = 0,
|
||||
/// Only look at flows on hosts we are watching via the circuit monitor
|
||||
WatchOnly = 1,
|
||||
/// Capture everything (this may set your CPU on fire)
|
||||
Analysis = 2,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
#[repr(C)]
|
||||
struct HeimdalConfig {
|
||||
mode: u32,
|
||||
}
|
||||
use crate::{bpf_map::BpfMap};
|
||||
|
||||
/// Change the eBPF Heimdall System mode.
|
||||
pub fn set_heimdall_mode(mode: HeimdallMode) -> anyhow::Result<()> {
|
||||
@ -130,13 +68,3 @@ pub fn heimdall_watch_ip(ip: XdpIpAddress) {
|
||||
HEIMDALL_WATCH_LIST.insert(ip, h);
|
||||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn missed_events(ctx: *mut c_void, cpu: i32, lost_count: u64) {
|
||||
log::warn!("Missed {lost_count} Heimdall events on {cpu}");
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
pub unsafe extern "C" fn handle_events(ctx: *mut c_void, cpu: i32, data: *mut c_void, data_size: u32) {
|
||||
//log::info!("Received a callback on {cpu}");
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
use crate::{bpf_map::BpfMap, XdpIpAddress};
|
||||
use crate::bpf_map::BpfMap;
|
||||
use anyhow::Result;
|
||||
use lqos_bus::TcHandle;
|
||||
use lqos_utils::XdpIpAddress;
|
||||
use std::net::IpAddr;
|
||||
mod ip_hash_data;
|
||||
mod ip_hash_key;
|
||||
|
@ -16,12 +16,10 @@ mod kernel_wrapper;
|
||||
mod lqos_kernel;
|
||||
mod tcp_rtt;
|
||||
mod throughput;
|
||||
mod xdp_ip_address;
|
||||
mod linux;
|
||||
|
||||
pub use heimdall_map::{
|
||||
heimdall_expire, heimdall_for_each, heimdall_watch_ip, set_heimdall_mode,
|
||||
HeimdallData, HeimdallKey, HeimdallMode,
|
||||
heimdall_expire, heimdall_watch_ip, set_heimdall_mode
|
||||
};
|
||||
pub use ip_mapping::{
|
||||
add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips,
|
||||
@ -31,4 +29,3 @@ pub use linux::num_possible_cpus;
|
||||
pub use lqos_kernel::max_tracked_ips;
|
||||
pub use tcp_rtt::{rtt_for_each, RttTrackingEntry};
|
||||
pub use throughput::{throughput_for_each, HostCounter};
|
||||
pub use xdp_ip_address::XdpIpAddress;
|
||||
|
@ -11,8 +11,9 @@ use libbpf_sys::{
|
||||
XDP_FLAGS_UPDATE_IF_NOEXIST,
|
||||
};
|
||||
use log::{info, warn};
|
||||
use lqos_heimdall::perf_interface::heimdall_handle_events;
|
||||
use nix::libc::{geteuid, if_nametoindex};
|
||||
use std::{ffi::{CString, c_void}, process::Command, thread::Thread};
|
||||
use std::{ffi::{CString, c_void}, process::Command};
|
||||
|
||||
pub(crate) mod bpf {
|
||||
#![allow(warnings, unused)]
|
||||
@ -165,15 +166,12 @@ pub fn attach_xdp_and_tc_to_interface(
|
||||
log::error!("Unable to load Heimdall Events FD");
|
||||
return Err(anyhow::Error::msg("Unable to load Heimdall Events FD"));
|
||||
}
|
||||
let opts: *const bpf::perf_buffer_opts = std::ptr::null();
|
||||
let opts: *const bpf::ring_buffer_opts = std::ptr::null();
|
||||
let heimdall_perf_buffer = unsafe {
|
||||
bpf::perf_buffer__new(
|
||||
bpf::ring_buffer__new(
|
||||
heimdall_events_fd,
|
||||
8,
|
||||
Some(crate::heimdall_map::handle_events),
|
||||
Some(crate::heimdall_map::missed_events),
|
||||
opts as *mut c_void,
|
||||
opts)
|
||||
Some(heimdall_handle_events),
|
||||
opts as *mut c_void, opts)
|
||||
};
|
||||
if unsafe { bpf::libbpf_get_error(heimdall_perf_buffer as *mut c_void) != 0 } {
|
||||
log::error!("Failed to create Heimdall event buffer");
|
||||
@ -286,7 +284,7 @@ unsafe fn try_xdp_attach(
|
||||
|
||||
// Handle type used to wrap *mut bpf::perf_buffer and indicate
|
||||
// that it can be moved. Really unsafe code in theory.
|
||||
struct PerfBufferHandle(*mut bpf::perf_buffer);
|
||||
struct PerfBufferHandle(*mut bpf::ring_buffer);
|
||||
unsafe impl Send for PerfBufferHandle {}
|
||||
unsafe impl Sync for PerfBufferHandle {}
|
||||
|
||||
@ -294,7 +292,7 @@ unsafe impl Sync for PerfBufferHandle {}
|
||||
fn poll_perf_events(heimdall_perf_buffer: PerfBufferHandle) {
|
||||
let heimdall_perf_buffer = heimdall_perf_buffer.0;
|
||||
loop {
|
||||
let err = unsafe { bpf::perf_buffer__poll(heimdall_perf_buffer, 100) };
|
||||
let err = unsafe { bpf::ring_buffer__poll(heimdall_perf_buffer, 100) };
|
||||
if err < 0 {
|
||||
log::error!("Error polling perfbuffer");
|
||||
}
|
||||
|
@ -1,4 +1,6 @@
|
||||
use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress};
|
||||
use lqos_utils::XdpIpAddress;
|
||||
|
||||
use crate::{bpf_per_cpu_map::BpfPerCpuMap};
|
||||
|
||||
/// Representation of the XDP map from map_traffic
|
||||
#[repr(C)]
|
||||
|
@ -9,3 +9,5 @@ nix = "0"
|
||||
log = "0"
|
||||
notify = { version = "5.0.0", default-features = false } # Not using crossbeam because of Tokio
|
||||
thiserror = "1"
|
||||
byteorder = "1.4"
|
||||
zerocopy = { version = "0.6.1", features = ["simd"] }
|
||||
|
@ -5,3 +5,6 @@ pub mod hex_string;
|
||||
pub mod packet_scale;
|
||||
mod string_table_enum;
|
||||
pub mod unix_time;
|
||||
mod xdp_ip_address;
|
||||
|
||||
pub use xdp_ip_address::XdpIpAddress;
|
@ -1,11 +1,12 @@
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use zerocopy::FromBytes;
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
|
||||
|
||||
/// XdpIpAddress provides helpful conversion between the XDP program's
|
||||
/// native storage of IP addresses in `[u8; 16]` blocks of bytes and
|
||||
/// Rust `IpAddr` types.
|
||||
#[repr(C)]
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
|
||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FromBytes)]
|
||||
pub struct XdpIpAddress(pub [u8; 16]);
|
||||
|
||||
impl Default for XdpIpAddress {
|
@ -13,6 +13,7 @@ lqos_config = { path = "../lqos_config" }
|
||||
lqos_sys = { path = "../lqos_sys" }
|
||||
lqos_queue_tracker = { path = "../lqos_queue_tracker" }
|
||||
lqos_utils = { path = "../lqos_utils" }
|
||||
lqos_heimdall = { path = "../lqos_heimdall" }
|
||||
tokio = { version = "1", features = [ "full", "parking_lot" ] }
|
||||
once_cell = "1.17.1"
|
||||
lqos_bus = { path = "../lqos_bus" }
|
||||
|
@ -1,6 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use lqos_bus::{BusResponse, IpMapping, TcHandle};
|
||||
use lqos_sys::XdpIpAddress;
|
||||
use lqos_utils::XdpIpAddress;
|
||||
|
||||
fn expect_ack(result: Result<()>) -> BusResponse {
|
||||
if result.is_ok() {
|
||||
|
@ -16,6 +16,7 @@ use anyhow::Result;
|
||||
use log::{info, warn};
|
||||
use lqos_bus::{BusRequest, BusResponse, UnixSocketServer};
|
||||
use lqos_config::LibreQoSConfig;
|
||||
use lqos_heimdall::HeimdallMode;
|
||||
use lqos_queue_tracker::{
|
||||
add_watched_queue, get_raw_circuit_data, spawn_queue_monitor,
|
||||
spawn_queue_structure_monitor,
|
||||
@ -66,7 +67,7 @@ async fn main() -> Result<()> {
|
||||
} else {
|
||||
LibreQoSKernels::new(&config.internet_interface, &config.isp_interface)?
|
||||
};
|
||||
set_heimdall_mode(lqos_sys::HeimdallMode::WatchOnly)?; // TODO: Set by config
|
||||
set_heimdall_mode(HeimdallMode::WatchOnly)?; // TODO: Set by config
|
||||
|
||||
// Spawn tracking sub-systems
|
||||
join!(
|
||||
|
@ -1,141 +1,5 @@
|
||||
use std::{time::Duration, net::IpAddr, collections::HashSet};
|
||||
use lqos_bus::BusResponse;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use lqos_bus::{BusResponse, FlowTransport, tos_parser};
|
||||
use lqos_sys::{HeimdallData, HeimdallKey, XdpIpAddress, heimdall_watch_ip};
|
||||
use lqos_utils::unix_time::time_since_boot;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::stats::FLOWS_TRACKED;
|
||||
|
||||
pub(crate) static HEIMDALL: Lazy<PalantirMonitor> =
|
||||
Lazy::new(PalantirMonitor::new);
|
||||
|
||||
pub(crate) struct PalantirMonitor {
|
||||
pub(crate) data: DashMap<HeimdallKey, FlowData>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct FlowData {
|
||||
last_seen: u64,
|
||||
bytes: u64,
|
||||
packets: u64,
|
||||
tos: u8,
|
||||
}
|
||||
|
||||
impl PalantirMonitor {
|
||||
fn new() -> Self {
|
||||
Self { data: DashMap::new() }
|
||||
}
|
||||
|
||||
fn combine_flows(values: &[HeimdallData]) -> FlowData {
|
||||
let mut result = FlowData::default();
|
||||
let mut ls = 0;
|
||||
values.iter().for_each(|v| {
|
||||
result.bytes += v.bytes;
|
||||
result.packets += v.packets;
|
||||
result.tos += v.tos;
|
||||
if v.last_seen > ls {
|
||||
ls = v.last_seen;
|
||||
}
|
||||
});
|
||||
result.last_seen = ls;
|
||||
result
|
||||
}
|
||||
|
||||
pub(crate) fn ingest(&self, key: &HeimdallKey, values: &[HeimdallData]) {
|
||||
//println!("{key:?}");
|
||||
//println!("{values:?}");
|
||||
if let Some(expire_ns) = Self::get_expire_time() {
|
||||
let combined = Self::combine_flows(values);
|
||||
if combined.last_seen > expire_ns {
|
||||
if let Some(mut flow) = self.data.get_mut(key) {
|
||||
// Update
|
||||
flow.bytes = combined.bytes;
|
||||
flow.packets = combined.packets;
|
||||
flow.last_seen = combined.last_seen;
|
||||
flow.tos = combined.tos;
|
||||
} else {
|
||||
// Insert
|
||||
self.data.insert(key.clone(), combined);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_expire_time() -> Option<u64> {
|
||||
let boot_time = time_since_boot();
|
||||
if let Ok(boot_time) = boot_time {
|
||||
let time_since_boot = Duration::from(boot_time);
|
||||
let five_minutes_ago =
|
||||
time_since_boot.saturating_sub(Duration::from_secs(30));
|
||||
let expire_ns = five_minutes_ago.as_nanos() as u64;
|
||||
Some(expire_ns)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn expire(&self) {
|
||||
if let Some(expire_ns) = Self::get_expire_time() {
|
||||
self.data.retain(|_k, v| v.last_seen > expire_ns);
|
||||
}
|
||||
FLOWS_TRACKED.store(self.data.len() as u64, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_flow_stats(ip: &str) -> BusResponse {
|
||||
let ip = ip.parse::<IpAddr>();
|
||||
if let Ok(ip) = ip {
|
||||
let ip = XdpIpAddress::from_ip(ip);
|
||||
heimdall_watch_ip(ip);
|
||||
let mut result = Vec::new();
|
||||
|
||||
// Obtain all the flows
|
||||
let mut all_flows = Vec::new();
|
||||
for value in HEIMDALL.data.iter() {
|
||||
let key = value.key();
|
||||
if key.src_ip == ip || key.dst_ip == ip {
|
||||
let (dscp, ecn) = tos_parser(value.tos);
|
||||
all_flows.push(FlowTransport{
|
||||
src: key.src_ip.as_ip().to_string(),
|
||||
dst: key.dst_ip.as_ip().to_string(),
|
||||
src_port: key.src_port,
|
||||
dst_port: key.dst_port,
|
||||
proto: match key.ip_protocol {
|
||||
6 => lqos_bus::FlowProto::TCP,
|
||||
17 => lqos_bus::FlowProto::UDP,
|
||||
_ => lqos_bus::FlowProto::ICMP,
|
||||
},
|
||||
bytes: value.bytes,
|
||||
packets: value.packets,
|
||||
dscp,
|
||||
ecn
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Turn them into reciprocal pairs
|
||||
let mut done = HashSet::new();
|
||||
for (i,flow) in all_flows.iter().enumerate() {
|
||||
if !done.contains(&i) {
|
||||
let flow_a = flow.clone();
|
||||
let flow_b = if let Some(flow_b) = all_flows.iter().position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port) {
|
||||
done.insert(flow_b);
|
||||
Some(all_flows[flow_b].clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
result.push((flow_a, flow_b));
|
||||
}
|
||||
}
|
||||
|
||||
result.sort_by(|a,b| {
|
||||
b.0.bytes.cmp(&a.0.bytes)
|
||||
});
|
||||
|
||||
return BusResponse::FlowData(result);
|
||||
}
|
||||
pub fn get_flow_stats(_ip: &str) -> BusResponse {
|
||||
BusResponse::Fail("No Stats or bad IP".to_string())
|
||||
}
|
@ -8,8 +8,8 @@ use crate::{
|
||||
};
|
||||
use log::{info, warn};
|
||||
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
|
||||
use lqos_sys::{XdpIpAddress, heimdall_expire};
|
||||
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot};
|
||||
use lqos_sys::heimdall_expire;
|
||||
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::time::Duration;
|
||||
|
||||
@ -31,7 +31,6 @@ pub fn spawn_throughput_monitor() {
|
||||
net_json.zero_throughput_and_rtt();
|
||||
} // Scope to end the lock
|
||||
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
|
||||
THROUGHPUT_TRACKER.pantir_tracking();
|
||||
THROUGHPUT_TRACKER.apply_new_throughput_counters();
|
||||
THROUGHPUT_TRACKER.apply_rtt_data();
|
||||
THROUGHPUT_TRACKER.update_totals();
|
||||
|
@ -3,7 +3,8 @@ use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH
|
||||
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS, heimdall_data::HEIMDALL};
|
||||
use dashmap::DashMap;
|
||||
use lqos_bus::TcHandle;
|
||||
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress, heimdall_for_each};
|
||||
use lqos_sys::{rtt_for_each, throughput_for_each};
|
||||
use lqos_utils::XdpIpAddress;
|
||||
|
||||
pub struct ThroughputTracker {
|
||||
pub(crate) cycle: AtomicU64,
|
||||
@ -101,14 +102,6 @@ impl ThroughputTracker {
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn pantir_tracking(&self) {
|
||||
HEIMDALL.expire();
|
||||
heimdall_for_each(&mut |key, values| {
|
||||
HEIMDALL.ingest(key, values);
|
||||
});
|
||||
//println!("Tracking {} flows", HEIMDALL.data.len());
|
||||
}
|
||||
|
||||
pub(crate) fn apply_new_throughput_counters(
|
||||
&self,
|
||||
) {
|
||||
|
Loading…
Reference in New Issue
Block a user