From aecc5f0d95d5d5d5043fc15ade3a3e418e2d4a25 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Fri, 10 Mar 2023 21:33:01 +0000 Subject: [PATCH] First play with the idea of storing stats about each TCP/UDP/ICMP flow as it traverses the network. --- src/rust/lqos_bus/src/bus/response.rs | 2 + .../lqos_node_manager/src/config_control.rs | 6 +- src/rust/lqos_sys/src/bpf/common/dissector.h | 233 +++++++++++++----- src/rust/lqos_sys/src/bpf/common/palantir.h | 61 +++++ src/rust/lqos_sys/src/bpf/lqos_kern.c | 2 + src/rust/lqos_sys/src/lib.rs | 2 + src/rust/lqos_sys/src/lqos_kernel.rs | 1 + src/rust/lqos_sys/src/palantir_map.rs | 33 +++ src/rust/lqosd/src/main.rs | 5 +- src/rust/lqosd/src/stats.rs | 1 + src/rust/lqosd/src/throughput_tracker/mod.rs | 2 + .../src/throughput_tracker/palantir_data.rs | 79 ++++++ .../src/throughput_tracker/tracking_data.rs | 12 +- src/rust/remove_pinned_maps.sh | 1 + 14 files changed, 372 insertions(+), 68 deletions(-) create mode 100644 src/rust/lqos_sys/src/bpf/common/palantir.h create mode 100644 src/rust/lqos_sys/src/palantir_map.rs create mode 100644 src/rust/lqosd/src/throughput_tracker/palantir_data.rs diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 88710cd8..86e5618e 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -83,5 +83,7 @@ pub enum BusResponse { time_to_poll_hosts: u64, /// High traffic watermark high_watermark: (u64, u64), + /// Number of flows tracked + tracked_flows: u64, } } diff --git a/src/rust/lqos_node_manager/src/config_control.rs b/src/rust/lqos_node_manager/src/config_control.rs index f3aedcc5..24602a09 100644 --- a/src/rust/lqos_node_manager/src/config_control.rs +++ b/src/rust/lqos_node_manager/src/config_control.rs @@ -83,16 +83,18 @@ pub struct LqosStats { pub bus_requests_since_start: u64, pub time_to_poll_hosts_us: u64, pub high_watermark: (u64, u64), + pub tracked_flows: u64, } #[get("/api/stats")] pub async fn stats() -> NoCache> { for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() { - if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark } = msg { + if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark, tracked_flows } = msg { return NoCache::new(Json(LqosStats { bus_requests_since_start: bus_requests, time_to_poll_hosts_us: time_to_poll_hosts, - high_watermark + high_watermark, + tracked_flows, })); } } diff --git a/src/rust/lqos_sys/src/bpf/common/dissector.h b/src/rust/lqos_sys/src/bpf/common/dissector.h index 1f607052..99b2aaf1 100644 --- a/src/rust/lqos_sys/src/bpf/common/dissector.h +++ b/src/rust/lqos_sys/src/bpf/common/dissector.h @@ -9,6 +9,11 @@ #include "../common/debug.h" #include "../common/ip_hash.h" #include "../common/bifrost.h" +#include +#include +#include +#include +#include // Packet dissector for XDP. We don't have any help from Linux at this // point. @@ -37,6 +42,11 @@ struct dissector_t // Current VLAN tag. If there are multiple tags, it will be // the INNER tag. __be16 current_vlan; + // IP protocol from __UAPI_DEF_IN_IPPROTO + __u8 ip_protocol; + __u16 src_port; + __u16 dst_port; + __u8 tos; }; // Representation of the VLAN header type. @@ -63,18 +73,19 @@ struct pppoe_proto #define PPP_IPV6 0x57 // Representation of an MPLS label -struct mpls_label { - __be32 entry; +struct mpls_label +{ + __be32 entry; }; -#define MPLS_LS_LABEL_MASK 0xFFFFF000 -#define MPLS_LS_LABEL_SHIFT 12 -#define MPLS_LS_TC_MASK 0x00000E00 -#define MPLS_LS_TC_SHIFT 9 -#define MPLS_LS_S_MASK 0x00000100 -#define MPLS_LS_S_SHIFT 8 -#define MPLS_LS_TTL_MASK 0x000000FF -#define MPLS_LS_TTL_SHIFT 0 +#define MPLS_LS_LABEL_MASK 0xFFFFF000 +#define MPLS_LS_LABEL_SHIFT 12 +#define MPLS_LS_TC_MASK 0x00000E00 +#define MPLS_LS_TC_SHIFT 9 +#define MPLS_LS_S_MASK 0x00000100 +#define MPLS_LS_S_SHIFT 8 +#define MPLS_LS_TTL_MASK 0x000000FF +#define MPLS_LS_TTL_SHIFT 0 // Constructor for a dissector // Connects XDP/TC SKB structure to a dissector structure. @@ -84,9 +95,9 @@ struct mpls_label { // // Returns TRUE if all is good, FALSE if the process cannot be completed static __always_inline bool dissector_new( - struct xdp_md *ctx, - struct dissector_t *dissector -) { + struct xdp_md *ctx, + struct dissector_t *dissector) +{ dissector->ctx = ctx; dissector->start = (void *)(long)ctx->data; dissector->end = (void *)(long)ctx->data_end; @@ -94,6 +105,10 @@ static __always_inline bool dissector_new( dissector->l3offset = 0; dissector->skb_len = dissector->end - dissector->start; dissector->current_vlan = 0; + dissector->ip_protocol = 0; + dissector->src_port = 0; + dissector->dst_port = 0; + dissector->tos = 0; // Check that there's room for an ethernet header if SKB_OVERFLOW (dissector->start, dissector->end, ethhdr) @@ -114,9 +129,9 @@ static __always_inline bool is_ip(__u16 eth_type) // Locates the layer-3 offset, if present. Fast returns for various // common non-IP types. Will perform VLAN redirection if requested. static __always_inline bool dissector_find_l3_offset( - struct dissector_t *dissector, - bool vlan_redirect -) { + struct dissector_t *dissector, + bool vlan_redirect) +{ if (dissector->ethernet_header == NULL) { bpf_debug("Ethernet header is NULL, still called offset check."); @@ -149,35 +164,34 @@ static __always_inline bool dissector_find_l3_offset( case ETH_P_8021AD: case ETH_P_8021Q: { - if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, - offset, vlan_hdr) + if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, + offset, vlan_hdr) { return false; } - struct vlan_hdr *vlan = (struct vlan_hdr *) - (dissector->start + offset); + struct vlan_hdr *vlan = (struct vlan_hdr *)(dissector->start + offset); dissector->current_vlan = vlan->h_vlan_TCI; eth_type = bpf_ntohs(vlan->h_vlan_encapsulated_proto); offset += sizeof(struct vlan_hdr); // VLAN Redirection is requested, so lookup a detination and // switch the VLAN tag if required - if (vlan_redirect) { - #ifdef VERBOSE - bpf_debug("Searching for redirect %u:%u", - dissector->ctx->ingress_ifindex, - bpf_ntohs(dissector->current_vlan) - ); - #endif - __u32 key = (dissector->ctx->ingress_ifindex << 16) | - bpf_ntohs(dissector->current_vlan); - struct bifrost_vlan * vlan_info = NULL; + if (vlan_redirect) + { +#ifdef VERBOSE + bpf_debug("Searching for redirect %u:%u", + dissector->ctx->ingress_ifindex, + bpf_ntohs(dissector->current_vlan)); +#endif + __u32 key = (dissector->ctx->ingress_ifindex << 16) | + bpf_ntohs(dissector->current_vlan); + struct bifrost_vlan *vlan_info = NULL; vlan_info = bpf_map_lookup_elem(&bifrost_vlan_map, &key); - if (vlan_info) { - #ifdef VERBOSE - bpf_debug("Redirect to VLAN %u", - bpf_htons(vlan_info->redirect_to) - ); - #endif + if (vlan_info) + { +#ifdef VERBOSE + bpf_debug("Redirect to VLAN %u", + bpf_htons(vlan_info->redirect_to)); +#endif vlan->h_vlan_TCI = bpf_htons(vlan_info->redirect_to); } } @@ -187,13 +201,12 @@ static __always_inline bool dissector_find_l3_offset( // Handle PPPoE case ETH_P_PPP_SES: { - if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, - offset, pppoe_proto) + if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, + offset, pppoe_proto) { return false; } - struct pppoe_proto *pppoe = (struct pppoe_proto *) - (dissector->start + offset); + struct pppoe_proto *pppoe = (struct pppoe_proto *)(dissector->start + offset); __u16 proto = bpf_ntohs(pppoe->proto); switch (proto) { @@ -212,31 +225,39 @@ static __always_inline bool dissector_find_l3_offset( // WARNING/TODO: Here be dragons; this needs testing. case ETH_P_MPLS_UC: - case ETH_P_MPLS_MC: { - if SKB_OVERFLOW_OFFSET(dissector->start, dissector-> end, - offset, mpls_label) + case ETH_P_MPLS_MC: + { + if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, + offset, mpls_label) { return false; } - struct mpls_label * mpls = (struct mpls_label *) - (dissector->start + offset); + struct mpls_label *mpls = (struct mpls_label *)(dissector->start + offset); // Are we at the bottom of the stack? offset += 4; // 32-bits - if (mpls->entry & MPLS_LS_S_MASK) { + if (mpls->entry & MPLS_LS_S_MASK) + { // We've hit the bottom - if SKB_OVERFLOW_OFFSET(dissector->start, dissector->end, - offset, iphdr) + if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, + offset, iphdr) { return false; } - struct iphdr * iph = (struct iphdr *)(dissector->start + offset); - switch (iph->version) { - case 4: eth_type = ETH_P_IP; break; - case 6: eth_type = ETH_P_IPV6; break; - default: return false; + struct iphdr *iph = (struct iphdr *)(dissector->start + offset); + switch (iph->version) + { + case 4: + eth_type = ETH_P_IP; + break; + case 6: + eth_type = ETH_P_IPV6; + break; + default: + return false; } } - } break; + } + break; // We found something we don't know how to handle - bail out default: @@ -250,37 +271,125 @@ static __always_inline bool dissector_find_l3_offset( return true; } +static __always_inline struct tcphdr *get_tcp_header(struct dissector_t *dissector) +{ + if (dissector->eth_type == ETH_P_IP) + { + return (struct tcphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4)); + } else if (dissector->eth_type == ETH_P_IPV6) { + return (struct tcphdr *)(dissector->ip_header.ip6h + 1); + } + return NULL; +} + +static __always_inline struct udphdr *get_udp_header(struct dissector_t *dissector) +{ + if (dissector->eth_type == ETH_P_IP) + { + return (struct udphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4)); + } else if (dissector->eth_type == ETH_P_IPV6) { + return (struct udphdr *)(dissector->ip_header.ip6h + 1); + } + return NULL; +} + +static __always_inline struct icmphdr * get_icmp_header(struct dissector_t * dissector) { + if (dissector->eth_type == ETH_P_IP) + { + return (struct icmphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4)); + } else if (dissector->eth_type == ETH_P_IPV6) { + return (struct icmphdr *)(dissector->ip_header.ip6h + 1); + } + return NULL; +} + +static __always_inline void snoop(struct dissector_t *dissector) +{ + switch (dissector->ip_protocol) + { + case IPPROTO_TCP: + { + struct tcphdr *hdr = get_tcp_header(dissector); + if (hdr != NULL) + { + if (hdr + sizeof(struct tcphdr) > dissector->end) + { + return; + } + dissector->src_port = hdr->source; + dissector->dst_port = hdr->dest; + } + } + break; + case IPPROTO_UDP: + { + struct udphdr *hdr = get_udp_header(dissector); + if (hdr != NULL) + { + if (hdr + sizeof(struct udphdr) > dissector->end) + { + return; + } + dissector->src_port = hdr->source; + dissector->dst_port = hdr->dest; + } + } + case IPPROTO_ICMP: + { + struct icmphdr *hdr = get_icmp_header(dissector); + if (hdr != NULL) + { + if (hdr + sizeof(struct icmphdr) > dissector->end) + { + return; + } + dissector->src_port = hdr->type; + dissector->dst_port = hdr->code; + } + } break; + } +} + // Searches for an IP header. static __always_inline bool dissector_find_ip_header( - struct dissector_t *dissector -) { + struct dissector_t *dissector) +{ switch (dissector->eth_type) { case ETH_P_IP: { - if (dissector->start + dissector->l3offset + sizeof(struct iphdr) > - dissector->end) { - return false; + if (dissector->start + dissector->l3offset + sizeof(struct iphdr) > + dissector->end) + { + return false; } dissector->ip_header.iph = dissector->start + dissector->l3offset; if (dissector->ip_header.iph + 1 > dissector->end) return false; encode_ipv4(dissector->ip_header.iph->saddr, &dissector->src_ip); encode_ipv4(dissector->ip_header.iph->daddr, &dissector->dst_ip); + dissector->ip_protocol = dissector->ip_header.iph->protocol; + dissector->tos = dissector->ip_header.iph->tos; + snoop(dissector); return true; } break; case ETH_P_IPV6: { - if (dissector->start + dissector->l3offset + - sizeof(struct ipv6hdr) > dissector->end) { - return false; + if (dissector->start + dissector->l3offset + + sizeof(struct ipv6hdr) > + dissector->end) + { + return false; } dissector->ip_header.ip6h = dissector->start + dissector->l3offset; if (dissector->ip_header.iph + 1 > dissector->end) return false; encode_ipv6(&dissector->ip_header.ip6h->saddr, &dissector->src_ip); encode_ipv6(&dissector->ip_header.ip6h->daddr, &dissector->dst_ip); + dissector->ip_protocol = dissector->ip_header.ip6h->nexthdr; + dissector->ip_header.ip6h->flow_lbl[0]; // Is this right? + snoop(dissector); return true; } break; diff --git a/src/rust/lqos_sys/src/bpf/common/palantir.h b/src/rust/lqos_sys/src/bpf/common/palantir.h new file mode 100644 index 00000000..3ced32aa --- /dev/null +++ b/src/rust/lqos_sys/src/bpf/common/palantir.h @@ -0,0 +1,61 @@ +#include +#include +#include +#include +#include +#include "maximums.h" +#include "debug.h" +#include "dissector.h" + +struct palantir_key { + struct in6_addr src; + struct in6_addr dst; + __u8 ip_protocol; + __u16 src_port; + __u16 dst_port; +}; + +struct palantir_data { + __u64 last_seen; + __u64 bytes; + __u64 packets; + __u8 tos; + __u8 reserved[3]; +}; + +struct +{ + __uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH); + __type(key, struct palantir_key); + __type(value, struct palantir_data); + __uint(max_entries, MAX_FLOWS); + __uint(pinning, LIBBPF_PIN_BY_NAME); +} palantir SEC(".maps"); + +static __always_inline void update_palantir(struct dissector_t * dissector, __u32 size) { + struct palantir_key key = {0}; + key.src = dissector->src_ip; + key.dst = dissector->dst_ip; + key.ip_protocol = dissector->ip_protocol; + key.src_port = dissector->src_port; + key.dst_port = dissector->dst_port; + struct palantir_data * counter = (struct palantir_data *)bpf_map_lookup_elem(&palantir, &key); + if (counter) { + counter->last_seen = bpf_ktime_get_boot_ns(); + counter->packets += 1; + counter->bytes += size; + counter->tos = dissector->tos; + } else { + struct palantir_data counter = {0}; + counter.last_seen = bpf_ktime_get_boot_ns(); + counter.bytes = size; + counter.packets = 1; + counter.tos = dissector->tos; + counter.reserved[0] = 0; + counter.reserved[1] = 0; + counter.reserved[2] = 0; + if (bpf_map_update_elem(&palantir, &key, &counter, BPF_NOEXIST) != 0) { + bpf_debug("Failed to insert tracking"); + } + } +} \ No newline at end of file diff --git a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c index 6654d48e..0ca71d1c 100644 --- a/src/rust/lqos_sys/src/bpf/lqos_kern.c +++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c @@ -17,6 +17,7 @@ #include "common/cpu_map.h" #include "common/tcp_rtt.h" #include "common/bifrost.h" +#include "common/palantir.h" /* Theory of operation: 1. (Packet arrives at interface) @@ -127,6 +128,7 @@ int xdp_prog(struct xdp_md *ctx) ctx->data_end - ctx->data, // end - data = length tc_handle ); + update_palantir(&dissector, ctx->data_end - ctx->data); // Send on its way if (tc_handle != 0) { diff --git a/src/rust/lqos_sys/src/lib.rs b/src/rust/lqos_sys/src/lib.rs index a241b398..2b149781 100644 --- a/src/rust/lqos_sys/src/lib.rs +++ b/src/rust/lqos_sys/src/lib.rs @@ -11,6 +11,7 @@ mod bpf_map; mod bpf_per_cpu_map; mod cpu_map; mod ip_mapping; +mod palantir_map; mod kernel_wrapper; mod lqos_kernel; mod tcp_rtt; @@ -26,3 +27,4 @@ pub use lqos_kernel::max_tracked_ips; pub use tcp_rtt::{rtt_for_each, RttTrackingEntry}; pub use throughput::{throughput_for_each, HostCounter}; pub use xdp_ip_address::XdpIpAddress; +pub use palantir_map::{palantir_for_each, PalantirKey, PalantirData}; diff --git a/src/rust/lqos_sys/src/lqos_kernel.rs b/src/rust/lqos_sys/src/lqos_kernel.rs index 5dcb27a2..12b5f1ae 100644 --- a/src/rust/lqos_sys/src/lqos_kernel.rs +++ b/src/rust/lqos_sys/src/lqos_kernel.rs @@ -74,6 +74,7 @@ pub fn unload_xdp_from_interface(interface_name: &str) -> Result<()> { fn set_strict_mode() -> Result<()> { let err = unsafe { libbpf_set_strict_mode(LIBBPF_STRICT_ALL) }; + #[cfg(release)] unsafe { bpf::do_not_print(); } diff --git a/src/rust/lqos_sys/src/palantir_map.rs b/src/rust/lqos_sys/src/palantir_map.rs new file mode 100644 index 00000000..4c295298 --- /dev/null +++ b/src/rust/lqos_sys/src/palantir_map.rs @@ -0,0 +1,33 @@ +use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress}; + +#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)] +#[repr(C)] +pub struct PalantirKey { + pub src_ip: XdpIpAddress, + pub dst_ip: XdpIpAddress, + pub ip_protocol: u8, + pub src_port: u16, + pub dst_port: u16, +} + +#[derive(Debug, Clone, Default)] +#[repr(C)] +pub struct PalantirData { + pub last_seen: u64, + pub bytes: u64, + pub packets: u64, + pub tos: u8, + pub reserved: [u8; 3], +} + +/// Iterates through all throughput entries, and sends them in turn to `callback`. +/// This elides the need to clone or copy data. +pub fn palantir_for_each( + callback: &mut dyn FnMut(&PalantirKey, &[PalantirData]), +) { + if let Ok(palantir) = BpfPerCpuMap::::from_path( + "/sys/fs/bpf/palantir", + ) { + palantir.for_each(callback); + } +} diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index d9d36825..672f5855 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -25,7 +25,7 @@ use signal_hook::{ consts::{SIGHUP, SIGINT, SIGTERM}, iterator::Signals, }; -use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}; +use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED}; use tokio::join; mod stats; @@ -185,7 +185,8 @@ fn handle_bus_requests( high_watermark: ( HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), - ) + ), + tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed), } } }); diff --git a/src/rust/lqosd/src/stats.rs b/src/rust/lqosd/src/stats.rs index 41fd0516..e3763b7c 100644 --- a/src/rust/lqosd/src/stats.rs +++ b/src/rust/lqosd/src/stats.rs @@ -4,3 +4,4 @@ pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0); pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0); pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0); pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0); +pub static FLOWS_TRACKED: AtomicU64 = AtomicU64::new(0); diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index aef0db2b..c657ec66 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -1,5 +1,6 @@ mod throughput_entry; mod tracking_data; +mod palantir_data; use crate::{ shaped_devices_tracker::NETWORK_JSON, throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS, @@ -29,6 +30,7 @@ pub fn spawn_throughput_monitor() { net_json.zero_throughput_and_rtt(); } // Scope to end the lock THROUGHPUT_TRACKER.copy_previous_and_reset_rtt(); + THROUGHPUT_TRACKER.pantir_tracking(); THROUGHPUT_TRACKER.apply_new_throughput_counters(); THROUGHPUT_TRACKER.apply_rtt_data(); THROUGHPUT_TRACKER.update_totals(); diff --git a/src/rust/lqosd/src/throughput_tracker/palantir_data.rs b/src/rust/lqosd/src/throughput_tracker/palantir_data.rs new file mode 100644 index 00000000..03606f7f --- /dev/null +++ b/src/rust/lqosd/src/throughput_tracker/palantir_data.rs @@ -0,0 +1,79 @@ +use std::time::Duration; + +use dashmap::DashMap; +use lqos_sys::{PalantirData, PalantirKey}; +use lqos_utils::unix_time::time_since_boot; +use once_cell::sync::Lazy; + +use crate::stats::FLOWS_TRACKED; + +pub(crate) static PALANTIR: Lazy = + Lazy::new(PalantirMonitor::new); + +pub(crate) struct PalantirMonitor { + pub(crate) data: DashMap, +} + +#[derive(Default)] +pub(crate) struct FlowData { + last_seen: u64, + bytes: u64, + packets: u64, +} + +impl PalantirMonitor { + fn new() -> Self { + Self { data: DashMap::new() } + } + + fn combine_flows(values: &[PalantirData]) -> FlowData { + let mut result = FlowData::default(); + let mut ls = 0; + values.iter().for_each(|v| { + result.bytes += v.bytes; + result.packets += v.packets; + if v.last_seen > ls { + ls = v.last_seen; + } + }); + result.last_seen = ls; + result + } + + pub(crate) fn ingest(&self, key: &PalantirKey, values: &[PalantirData]) { + if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() { + let combined = Self::combine_flows(values); + if combined.last_seen > five_minutes_ago_nanoseconds { + if let Some(mut flow) = self.data.get_mut(key) { + // Update + flow.bytes += combined.bytes; + flow.packets += combined.packets; + flow.last_seen = combined.last_seen; + } else { + // Insert + self.data.insert(key.clone(), combined); + } + } + } + } + + fn get_expire_time() -> Option { + let boot_time = time_since_boot(); + if let Ok(boot_time) = boot_time { + let time_since_boot = Duration::from(boot_time); + let five_minutes_ago = + time_since_boot.saturating_sub(Duration::from_secs(300)); + let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos() as u64; + Some(five_minutes_ago_nanoseconds) + } else { + None + } + } + + pub(crate) fn expire(&self) { + if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() { + self.data.retain(|_k, v| v.last_seen > five_minutes_ago_nanoseconds); + } + FLOWS_TRACKED.store(self.data.len() as u64, std::sync::atomic::Ordering::Relaxed); + } +} diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 8f81e7b4..ca5b335a 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -1,9 +1,9 @@ use std::sync::atomic::AtomicU64; use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}}; -use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; +use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS, palantir_data::PALANTIR}; use dashmap::DashMap; use lqos_bus::TcHandle; -use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress}; +use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress, palantir_for_each}; pub struct ThroughputTracker { pub(crate) cycle: AtomicU64, @@ -101,6 +101,14 @@ impl ThroughputTracker { }); } + pub(crate) fn pantir_tracking(&self) { + PALANTIR.expire(); + palantir_for_each(&mut |key, values| { + PALANTIR.ingest(key, values); + }); + //println!("Tracking {} flows", PALANTIR.data.len()); + } + pub(crate) fn apply_new_throughput_counters( &self, ) { diff --git a/src/rust/remove_pinned_maps.sh b/src/rust/remove_pinned_maps.sh index 3c349f0c..0bd26cf2 100755 --- a/src/rust/remove_pinned_maps.sh +++ b/src/rust/remove_pinned_maps.sh @@ -10,4 +10,5 @@ rm -v /sys/fs/bpf/map_ip_to_cpu_and_tc_recip rm -v /sys/fs/bpf/map_txq_config rm -v /sys/fs/bpf/bifrost_interface_map rm -v /sys/fs/bpf/bifrost_vlan_map +rm -v /sys/fs/bpf/palantir