First play with the idea of storing stats about each TCP/UDP/ICMP flow as it traverses the network.

This commit is contained in:
Herbert Wolverson 2023-03-10 21:33:01 +00:00
parent 9639ceeb6b
commit aecc5f0d95
14 changed files with 372 additions and 68 deletions

View File

@ -83,5 +83,7 @@ pub enum BusResponse {
time_to_poll_hosts: u64, time_to_poll_hosts: u64,
/// High traffic watermark /// High traffic watermark
high_watermark: (u64, u64), high_watermark: (u64, u64),
/// Number of flows tracked
tracked_flows: u64,
} }
} }

View File

@ -83,16 +83,18 @@ pub struct LqosStats {
pub bus_requests_since_start: u64, pub bus_requests_since_start: u64,
pub time_to_poll_hosts_us: u64, pub time_to_poll_hosts_us: u64,
pub high_watermark: (u64, u64), pub high_watermark: (u64, u64),
pub tracked_flows: u64,
} }
#[get("/api/stats")] #[get("/api/stats")]
pub async fn stats() -> NoCache<Json<LqosStats>> { pub async fn stats() -> NoCache<Json<LqosStats>> {
for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() { for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark } = msg { if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark, tracked_flows } = msg {
return NoCache::new(Json(LqosStats { return NoCache::new(Json(LqosStats {
bus_requests_since_start: bus_requests, bus_requests_since_start: bus_requests,
time_to_poll_hosts_us: time_to_poll_hosts, time_to_poll_hosts_us: time_to_poll_hosts,
high_watermark high_watermark,
tracked_flows,
})); }));
} }
} }

View File

@ -9,6 +9,11 @@
#include "../common/debug.h" #include "../common/debug.h"
#include "../common/ip_hash.h" #include "../common/ip_hash.h"
#include "../common/bifrost.h" #include "../common/bifrost.h"
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/tcp.h>
#include <linux/udp.h>
#include <linux/icmp.h>
// Packet dissector for XDP. We don't have any help from Linux at this // Packet dissector for XDP. We don't have any help from Linux at this
// point. // point.
@ -37,6 +42,11 @@ struct dissector_t
// Current VLAN tag. If there are multiple tags, it will be // Current VLAN tag. If there are multiple tags, it will be
// the INNER tag. // the INNER tag.
__be16 current_vlan; __be16 current_vlan;
// IP protocol from __UAPI_DEF_IN_IPPROTO
__u8 ip_protocol;
__u16 src_port;
__u16 dst_port;
__u8 tos;
}; };
// Representation of the VLAN header type. // Representation of the VLAN header type.
@ -63,18 +73,19 @@ struct pppoe_proto
#define PPP_IPV6 0x57 #define PPP_IPV6 0x57
// Representation of an MPLS label // Representation of an MPLS label
struct mpls_label { struct mpls_label
__be32 entry; {
__be32 entry;
}; };
#define MPLS_LS_LABEL_MASK 0xFFFFF000 #define MPLS_LS_LABEL_MASK 0xFFFFF000
#define MPLS_LS_LABEL_SHIFT 12 #define MPLS_LS_LABEL_SHIFT 12
#define MPLS_LS_TC_MASK 0x00000E00 #define MPLS_LS_TC_MASK 0x00000E00
#define MPLS_LS_TC_SHIFT 9 #define MPLS_LS_TC_SHIFT 9
#define MPLS_LS_S_MASK 0x00000100 #define MPLS_LS_S_MASK 0x00000100
#define MPLS_LS_S_SHIFT 8 #define MPLS_LS_S_SHIFT 8
#define MPLS_LS_TTL_MASK 0x000000FF #define MPLS_LS_TTL_MASK 0x000000FF
#define MPLS_LS_TTL_SHIFT 0 #define MPLS_LS_TTL_SHIFT 0
// Constructor for a dissector // Constructor for a dissector
// Connects XDP/TC SKB structure to a dissector structure. // Connects XDP/TC SKB structure to a dissector structure.
@ -84,9 +95,9 @@ struct mpls_label {
// //
// Returns TRUE if all is good, FALSE if the process cannot be completed // Returns TRUE if all is good, FALSE if the process cannot be completed
static __always_inline bool dissector_new( static __always_inline bool dissector_new(
struct xdp_md *ctx, struct xdp_md *ctx,
struct dissector_t *dissector struct dissector_t *dissector)
) { {
dissector->ctx = ctx; dissector->ctx = ctx;
dissector->start = (void *)(long)ctx->data; dissector->start = (void *)(long)ctx->data;
dissector->end = (void *)(long)ctx->data_end; dissector->end = (void *)(long)ctx->data_end;
@ -94,6 +105,10 @@ static __always_inline bool dissector_new(
dissector->l3offset = 0; dissector->l3offset = 0;
dissector->skb_len = dissector->end - dissector->start; dissector->skb_len = dissector->end - dissector->start;
dissector->current_vlan = 0; dissector->current_vlan = 0;
dissector->ip_protocol = 0;
dissector->src_port = 0;
dissector->dst_port = 0;
dissector->tos = 0;
// Check that there's room for an ethernet header // Check that there's room for an ethernet header
if SKB_OVERFLOW (dissector->start, dissector->end, ethhdr) if SKB_OVERFLOW (dissector->start, dissector->end, ethhdr)
@ -114,9 +129,9 @@ static __always_inline bool is_ip(__u16 eth_type)
// Locates the layer-3 offset, if present. Fast returns for various // Locates the layer-3 offset, if present. Fast returns for various
// common non-IP types. Will perform VLAN redirection if requested. // common non-IP types. Will perform VLAN redirection if requested.
static __always_inline bool dissector_find_l3_offset( static __always_inline bool dissector_find_l3_offset(
struct dissector_t *dissector, struct dissector_t *dissector,
bool vlan_redirect bool vlan_redirect)
) { {
if (dissector->ethernet_header == NULL) if (dissector->ethernet_header == NULL)
{ {
bpf_debug("Ethernet header is NULL, still called offset check."); bpf_debug("Ethernet header is NULL, still called offset check.");
@ -149,35 +164,34 @@ static __always_inline bool dissector_find_l3_offset(
case ETH_P_8021AD: case ETH_P_8021AD:
case ETH_P_8021Q: case ETH_P_8021Q:
{ {
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, vlan_hdr) offset, vlan_hdr)
{ {
return false; return false;
} }
struct vlan_hdr *vlan = (struct vlan_hdr *) struct vlan_hdr *vlan = (struct vlan_hdr *)(dissector->start + offset);
(dissector->start + offset);
dissector->current_vlan = vlan->h_vlan_TCI; dissector->current_vlan = vlan->h_vlan_TCI;
eth_type = bpf_ntohs(vlan->h_vlan_encapsulated_proto); eth_type = bpf_ntohs(vlan->h_vlan_encapsulated_proto);
offset += sizeof(struct vlan_hdr); offset += sizeof(struct vlan_hdr);
// VLAN Redirection is requested, so lookup a detination and // VLAN Redirection is requested, so lookup a detination and
// switch the VLAN tag if required // switch the VLAN tag if required
if (vlan_redirect) { if (vlan_redirect)
#ifdef VERBOSE {
bpf_debug("Searching for redirect %u:%u", #ifdef VERBOSE
dissector->ctx->ingress_ifindex, bpf_debug("Searching for redirect %u:%u",
bpf_ntohs(dissector->current_vlan) dissector->ctx->ingress_ifindex,
); bpf_ntohs(dissector->current_vlan));
#endif #endif
__u32 key = (dissector->ctx->ingress_ifindex << 16) | __u32 key = (dissector->ctx->ingress_ifindex << 16) |
bpf_ntohs(dissector->current_vlan); bpf_ntohs(dissector->current_vlan);
struct bifrost_vlan * vlan_info = NULL; struct bifrost_vlan *vlan_info = NULL;
vlan_info = bpf_map_lookup_elem(&bifrost_vlan_map, &key); vlan_info = bpf_map_lookup_elem(&bifrost_vlan_map, &key);
if (vlan_info) { if (vlan_info)
#ifdef VERBOSE {
bpf_debug("Redirect to VLAN %u", #ifdef VERBOSE
bpf_htons(vlan_info->redirect_to) bpf_debug("Redirect to VLAN %u",
); bpf_htons(vlan_info->redirect_to));
#endif #endif
vlan->h_vlan_TCI = bpf_htons(vlan_info->redirect_to); vlan->h_vlan_TCI = bpf_htons(vlan_info->redirect_to);
} }
} }
@ -187,13 +201,12 @@ static __always_inline bool dissector_find_l3_offset(
// Handle PPPoE // Handle PPPoE
case ETH_P_PPP_SES: case ETH_P_PPP_SES:
{ {
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end, if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, pppoe_proto) offset, pppoe_proto)
{ {
return false; return false;
} }
struct pppoe_proto *pppoe = (struct pppoe_proto *) struct pppoe_proto *pppoe = (struct pppoe_proto *)(dissector->start + offset);
(dissector->start + offset);
__u16 proto = bpf_ntohs(pppoe->proto); __u16 proto = bpf_ntohs(pppoe->proto);
switch (proto) switch (proto)
{ {
@ -212,31 +225,39 @@ static __always_inline bool dissector_find_l3_offset(
// WARNING/TODO: Here be dragons; this needs testing. // WARNING/TODO: Here be dragons; this needs testing.
case ETH_P_MPLS_UC: case ETH_P_MPLS_UC:
case ETH_P_MPLS_MC: { case ETH_P_MPLS_MC:
if SKB_OVERFLOW_OFFSET(dissector->start, dissector-> end, {
offset, mpls_label) if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, mpls_label)
{ {
return false; return false;
} }
struct mpls_label * mpls = (struct mpls_label *) struct mpls_label *mpls = (struct mpls_label *)(dissector->start + offset);
(dissector->start + offset);
// Are we at the bottom of the stack? // Are we at the bottom of the stack?
offset += 4; // 32-bits offset += 4; // 32-bits
if (mpls->entry & MPLS_LS_S_MASK) { if (mpls->entry & MPLS_LS_S_MASK)
{
// We've hit the bottom // We've hit the bottom
if SKB_OVERFLOW_OFFSET(dissector->start, dissector->end, if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, iphdr) offset, iphdr)
{ {
return false; return false;
} }
struct iphdr * iph = (struct iphdr *)(dissector->start + offset); struct iphdr *iph = (struct iphdr *)(dissector->start + offset);
switch (iph->version) { switch (iph->version)
case 4: eth_type = ETH_P_IP; break; {
case 6: eth_type = ETH_P_IPV6; break; case 4:
default: return false; eth_type = ETH_P_IP;
break;
case 6:
eth_type = ETH_P_IPV6;
break;
default:
return false;
} }
} }
} break; }
break;
// We found something we don't know how to handle - bail out // We found something we don't know how to handle - bail out
default: default:
@ -250,37 +271,125 @@ static __always_inline bool dissector_find_l3_offset(
return true; return true;
} }
static __always_inline struct tcphdr *get_tcp_header(struct dissector_t *dissector)
{
if (dissector->eth_type == ETH_P_IP)
{
return (struct tcphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
} else if (dissector->eth_type == ETH_P_IPV6) {
return (struct tcphdr *)(dissector->ip_header.ip6h + 1);
}
return NULL;
}
static __always_inline struct udphdr *get_udp_header(struct dissector_t *dissector)
{
if (dissector->eth_type == ETH_P_IP)
{
return (struct udphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
} else if (dissector->eth_type == ETH_P_IPV6) {
return (struct udphdr *)(dissector->ip_header.ip6h + 1);
}
return NULL;
}
static __always_inline struct icmphdr * get_icmp_header(struct dissector_t * dissector) {
if (dissector->eth_type == ETH_P_IP)
{
return (struct icmphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
} else if (dissector->eth_type == ETH_P_IPV6) {
return (struct icmphdr *)(dissector->ip_header.ip6h + 1);
}
return NULL;
}
static __always_inline void snoop(struct dissector_t *dissector)
{
switch (dissector->ip_protocol)
{
case IPPROTO_TCP:
{
struct tcphdr *hdr = get_tcp_header(dissector);
if (hdr != NULL)
{
if (hdr + sizeof(struct tcphdr) > dissector->end)
{
return;
}
dissector->src_port = hdr->source;
dissector->dst_port = hdr->dest;
}
}
break;
case IPPROTO_UDP:
{
struct udphdr *hdr = get_udp_header(dissector);
if (hdr != NULL)
{
if (hdr + sizeof(struct udphdr) > dissector->end)
{
return;
}
dissector->src_port = hdr->source;
dissector->dst_port = hdr->dest;
}
}
case IPPROTO_ICMP:
{
struct icmphdr *hdr = get_icmp_header(dissector);
if (hdr != NULL)
{
if (hdr + sizeof(struct icmphdr) > dissector->end)
{
return;
}
dissector->src_port = hdr->type;
dissector->dst_port = hdr->code;
}
} break;
}
}
// Searches for an IP header. // Searches for an IP header.
static __always_inline bool dissector_find_ip_header( static __always_inline bool dissector_find_ip_header(
struct dissector_t *dissector struct dissector_t *dissector)
) { {
switch (dissector->eth_type) switch (dissector->eth_type)
{ {
case ETH_P_IP: case ETH_P_IP:
{ {
if (dissector->start + dissector->l3offset + sizeof(struct iphdr) > if (dissector->start + dissector->l3offset + sizeof(struct iphdr) >
dissector->end) { dissector->end)
return false; {
return false;
} }
dissector->ip_header.iph = dissector->start + dissector->l3offset; dissector->ip_header.iph = dissector->start + dissector->l3offset;
if (dissector->ip_header.iph + 1 > dissector->end) if (dissector->ip_header.iph + 1 > dissector->end)
return false; return false;
encode_ipv4(dissector->ip_header.iph->saddr, &dissector->src_ip); encode_ipv4(dissector->ip_header.iph->saddr, &dissector->src_ip);
encode_ipv4(dissector->ip_header.iph->daddr, &dissector->dst_ip); encode_ipv4(dissector->ip_header.iph->daddr, &dissector->dst_ip);
dissector->ip_protocol = dissector->ip_header.iph->protocol;
dissector->tos = dissector->ip_header.iph->tos;
snoop(dissector);
return true; return true;
} }
break; break;
case ETH_P_IPV6: case ETH_P_IPV6:
{ {
if (dissector->start + dissector->l3offset + if (dissector->start + dissector->l3offset +
sizeof(struct ipv6hdr) > dissector->end) { sizeof(struct ipv6hdr) >
return false; dissector->end)
{
return false;
} }
dissector->ip_header.ip6h = dissector->start + dissector->l3offset; dissector->ip_header.ip6h = dissector->start + dissector->l3offset;
if (dissector->ip_header.iph + 1 > dissector->end) if (dissector->ip_header.iph + 1 > dissector->end)
return false; return false;
encode_ipv6(&dissector->ip_header.ip6h->saddr, &dissector->src_ip); encode_ipv6(&dissector->ip_header.ip6h->saddr, &dissector->src_ip);
encode_ipv6(&dissector->ip_header.ip6h->daddr, &dissector->dst_ip); encode_ipv6(&dissector->ip_header.ip6h->daddr, &dissector->dst_ip);
dissector->ip_protocol = dissector->ip_header.ip6h->nexthdr;
dissector->ip_header.ip6h->flow_lbl[0]; // Is this right?
snoop(dissector);
return true; return true;
} }
break; break;

View File

@ -0,0 +1,61 @@
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
#include <linux/if_ether.h>
#include <stdbool.h>
#include "maximums.h"
#include "debug.h"
#include "dissector.h"
struct palantir_key {
struct in6_addr src;
struct in6_addr dst;
__u8 ip_protocol;
__u16 src_port;
__u16 dst_port;
};
struct palantir_data {
__u64 last_seen;
__u64 bytes;
__u64 packets;
__u8 tos;
__u8 reserved[3];
};
struct
{
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
__type(key, struct palantir_key);
__type(value, struct palantir_data);
__uint(max_entries, MAX_FLOWS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} palantir SEC(".maps");
static __always_inline void update_palantir(struct dissector_t * dissector, __u32 size) {
struct palantir_key key = {0};
key.src = dissector->src_ip;
key.dst = dissector->dst_ip;
key.ip_protocol = dissector->ip_protocol;
key.src_port = dissector->src_port;
key.dst_port = dissector->dst_port;
struct palantir_data * counter = (struct palantir_data *)bpf_map_lookup_elem(&palantir, &key);
if (counter) {
counter->last_seen = bpf_ktime_get_boot_ns();
counter->packets += 1;
counter->bytes += size;
counter->tos = dissector->tos;
} else {
struct palantir_data counter = {0};
counter.last_seen = bpf_ktime_get_boot_ns();
counter.bytes = size;
counter.packets = 1;
counter.tos = dissector->tos;
counter.reserved[0] = 0;
counter.reserved[1] = 0;
counter.reserved[2] = 0;
if (bpf_map_update_elem(&palantir, &key, &counter, BPF_NOEXIST) != 0) {
bpf_debug("Failed to insert tracking");
}
}
}

View File

@ -17,6 +17,7 @@
#include "common/cpu_map.h" #include "common/cpu_map.h"
#include "common/tcp_rtt.h" #include "common/tcp_rtt.h"
#include "common/bifrost.h" #include "common/bifrost.h"
#include "common/palantir.h"
/* Theory of operation: /* Theory of operation:
1. (Packet arrives at interface) 1. (Packet arrives at interface)
@ -127,6 +128,7 @@ int xdp_prog(struct xdp_md *ctx)
ctx->data_end - ctx->data, // end - data = length ctx->data_end - ctx->data, // end - data = length
tc_handle tc_handle
); );
update_palantir(&dissector, ctx->data_end - ctx->data);
// Send on its way // Send on its way
if (tc_handle != 0) { if (tc_handle != 0) {

View File

@ -11,6 +11,7 @@ mod bpf_map;
mod bpf_per_cpu_map; mod bpf_per_cpu_map;
mod cpu_map; mod cpu_map;
mod ip_mapping; mod ip_mapping;
mod palantir_map;
mod kernel_wrapper; mod kernel_wrapper;
mod lqos_kernel; mod lqos_kernel;
mod tcp_rtt; mod tcp_rtt;
@ -26,3 +27,4 @@ pub use lqos_kernel::max_tracked_ips;
pub use tcp_rtt::{rtt_for_each, RttTrackingEntry}; pub use tcp_rtt::{rtt_for_each, RttTrackingEntry};
pub use throughput::{throughput_for_each, HostCounter}; pub use throughput::{throughput_for_each, HostCounter};
pub use xdp_ip_address::XdpIpAddress; pub use xdp_ip_address::XdpIpAddress;
pub use palantir_map::{palantir_for_each, PalantirKey, PalantirData};

View File

@ -74,6 +74,7 @@ pub fn unload_xdp_from_interface(interface_name: &str) -> Result<()> {
fn set_strict_mode() -> Result<()> { fn set_strict_mode() -> Result<()> {
let err = unsafe { libbpf_set_strict_mode(LIBBPF_STRICT_ALL) }; let err = unsafe { libbpf_set_strict_mode(LIBBPF_STRICT_ALL) };
#[cfg(release)]
unsafe { unsafe {
bpf::do_not_print(); bpf::do_not_print();
} }

View File

@ -0,0 +1,33 @@
use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress};
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct PalantirKey {
pub src_ip: XdpIpAddress,
pub dst_ip: XdpIpAddress,
pub ip_protocol: u8,
pub src_port: u16,
pub dst_port: u16,
}
#[derive(Debug, Clone, Default)]
#[repr(C)]
pub struct PalantirData {
pub last_seen: u64,
pub bytes: u64,
pub packets: u64,
pub tos: u8,
pub reserved: [u8; 3],
}
/// Iterates through all throughput entries, and sends them in turn to `callback`.
/// This elides the need to clone or copy data.
pub fn palantir_for_each(
callback: &mut dyn FnMut(&PalantirKey, &[PalantirData]),
) {
if let Ok(palantir) = BpfPerCpuMap::<PalantirKey, PalantirData>::from_path(
"/sys/fs/bpf/palantir",
) {
palantir.for_each(callback);
}
}

View File

@ -25,7 +25,7 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM}, consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals, iterator::Signals,
}; };
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}; use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED};
use tokio::join; use tokio::join;
mod stats; mod stats;
@ -185,7 +185,8 @@ fn handle_bus_requests(
high_watermark: ( high_watermark: (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed), HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
) ),
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
} }
} }
}); });

View File

@ -4,3 +4,4 @@ pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0);
pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0); pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0); pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0); pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0);
pub static FLOWS_TRACKED: AtomicU64 = AtomicU64::new(0);

View File

@ -1,5 +1,6 @@
mod throughput_entry; mod throughput_entry;
mod tracking_data; mod tracking_data;
mod palantir_data;
use crate::{ use crate::{
shaped_devices_tracker::NETWORK_JSON, shaped_devices_tracker::NETWORK_JSON,
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS, throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
@ -29,6 +30,7 @@ pub fn spawn_throughput_monitor() {
net_json.zero_throughput_and_rtt(); net_json.zero_throughput_and_rtt();
} // Scope to end the lock } // Scope to end the lock
THROUGHPUT_TRACKER.copy_previous_and_reset_rtt(); THROUGHPUT_TRACKER.copy_previous_and_reset_rtt();
THROUGHPUT_TRACKER.pantir_tracking();
THROUGHPUT_TRACKER.apply_new_throughput_counters(); THROUGHPUT_TRACKER.apply_new_throughput_counters();
THROUGHPUT_TRACKER.apply_rtt_data(); THROUGHPUT_TRACKER.apply_rtt_data();
THROUGHPUT_TRACKER.update_totals(); THROUGHPUT_TRACKER.update_totals();

View File

@ -0,0 +1,79 @@
use std::time::Duration;
use dashmap::DashMap;
use lqos_sys::{PalantirData, PalantirKey};
use lqos_utils::unix_time::time_since_boot;
use once_cell::sync::Lazy;
use crate::stats::FLOWS_TRACKED;
pub(crate) static PALANTIR: Lazy<PalantirMonitor> =
Lazy::new(PalantirMonitor::new);
pub(crate) struct PalantirMonitor {
pub(crate) data: DashMap<PalantirKey, FlowData>,
}
#[derive(Default)]
pub(crate) struct FlowData {
last_seen: u64,
bytes: u64,
packets: u64,
}
impl PalantirMonitor {
fn new() -> Self {
Self { data: DashMap::new() }
}
fn combine_flows(values: &[PalantirData]) -> FlowData {
let mut result = FlowData::default();
let mut ls = 0;
values.iter().for_each(|v| {
result.bytes += v.bytes;
result.packets += v.packets;
if v.last_seen > ls {
ls = v.last_seen;
}
});
result.last_seen = ls;
result
}
pub(crate) fn ingest(&self, key: &PalantirKey, values: &[PalantirData]) {
if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() {
let combined = Self::combine_flows(values);
if combined.last_seen > five_minutes_ago_nanoseconds {
if let Some(mut flow) = self.data.get_mut(key) {
// Update
flow.bytes += combined.bytes;
flow.packets += combined.packets;
flow.last_seen = combined.last_seen;
} else {
// Insert
self.data.insert(key.clone(), combined);
}
}
}
}
fn get_expire_time() -> Option<u64> {
let boot_time = time_since_boot();
if let Ok(boot_time) = boot_time {
let time_since_boot = Duration::from(boot_time);
let five_minutes_ago =
time_since_boot.saturating_sub(Duration::from_secs(300));
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos() as u64;
Some(five_minutes_ago_nanoseconds)
} else {
None
}
}
pub(crate) fn expire(&self) {
if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() {
self.data.retain(|_k, v| v.last_seen > five_minutes_ago_nanoseconds);
}
FLOWS_TRACKED.store(self.data.len() as u64, std::sync::atomic::Ordering::Relaxed);
}
}

View File

@ -1,9 +1,9 @@
use std::sync::atomic::AtomicU64; use std::sync::atomic::AtomicU64;
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}}; use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS, palantir_data::PALANTIR};
use dashmap::DashMap; use dashmap::DashMap;
use lqos_bus::TcHandle; use lqos_bus::TcHandle;
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress}; use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress, palantir_for_each};
pub struct ThroughputTracker { pub struct ThroughputTracker {
pub(crate) cycle: AtomicU64, pub(crate) cycle: AtomicU64,
@ -101,6 +101,14 @@ impl ThroughputTracker {
}); });
} }
pub(crate) fn pantir_tracking(&self) {
PALANTIR.expire();
palantir_for_each(&mut |key, values| {
PALANTIR.ingest(key, values);
});
//println!("Tracking {} flows", PALANTIR.data.len());
}
pub(crate) fn apply_new_throughput_counters( pub(crate) fn apply_new_throughput_counters(
&self, &self,
) { ) {

View File

@ -10,4 +10,5 @@ rm -v /sys/fs/bpf/map_ip_to_cpu_and_tc_recip
rm -v /sys/fs/bpf/map_txq_config rm -v /sys/fs/bpf/map_txq_config
rm -v /sys/fs/bpf/bifrost_interface_map rm -v /sys/fs/bpf/bifrost_interface_map
rm -v /sys/fs/bpf/bifrost_vlan_map rm -v /sys/fs/bpf/bifrost_vlan_map
rm -v /sys/fs/bpf/palantir