From fad3e7858a7eb5a04ab0bb2598296c7a7249bea6 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Sat, 11 Mar 2023 14:05:34 +0000 Subject: [PATCH] Minimally working functionality --- src/rust/lqos_bus/src/bus/request.rs | 3 + src/rust/lqos_bus/src/bus/response.rs | 7 ++- src/rust/lqos_bus/src/ip_stats.rs | 17 ++++++ src/rust/lqos_bus/src/lib.rs | 2 +- src/rust/lqos_node_manager/src/main.rs | 1 + src/rust/lqos_node_manager/src/queue_info.rs | 18 +++++- .../static/circuit_queue.html | 45 ++++++++++++++ src/rust/lqos_sys/src/bpf/common/dissector.h | 24 +++++--- src/rust/lqos_sys/src/bpf/common/palantir.h | 11 ++-- src/rust/lqos_sys/src/bpf/lqos_kern.c | 12 +++- src/rust/lqosd/src/main.rs | 4 ++ src/rust/lqosd/src/throughput_tracker/mod.rs | 1 + .../src/throughput_tracker/palantir_data.rs | 61 +++++++++++++++---- 13 files changed, 177 insertions(+), 29 deletions(-) diff --git a/src/rust/lqos_bus/src/bus/request.rs b/src/rust/lqos_bus/src/bus/request.rs index 43839491..cbc9ad16 100644 --- a/src/rust/lqos_bus/src/bus/request.rs +++ b/src/rust/lqos_bus/src/bus/request.rs @@ -133,6 +133,9 @@ pub enum BusRequest { /// Obtain the lqosd statistics GetLqosStats, + /// Tell me flow stats for a given IP address + GetFlowStats(String), + /// If running on Equinix (the `equinix_test` feature is enabled), /// display a "run bandwidht test" link. #[cfg(feature = "equinix_tests")] diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index 86e5618e..e8e0134f 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -1,4 +1,4 @@ -use crate::{IpMapping, IpStats, XdpPpingResult}; +use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport}; use serde::{Deserialize, Serialize}; use std::net::IpAddr; @@ -85,5 +85,8 @@ pub enum BusResponse { high_watermark: (u64, u64), /// Number of flows tracked tracked_flows: u64, - } + }, + + /// Flow Data + FlowData(Vec), } diff --git a/src/rust/lqos_bus/src/ip_stats.rs b/src/rust/lqos_bus/src/ip_stats.rs index b2efa5df..1127c58d 100644 --- a/src/rust/lqos_bus/src/ip_stats.rs +++ b/src/rust/lqos_bus/src/ip_stats.rs @@ -66,3 +66,20 @@ pub struct XdpPpingResult { /// derived. If 0, the other values are invalid. pub samples: u32, } + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub enum FlowProto { + TCP, UDP, ICMP +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct FlowTransport { + pub src: String, + pub dst: String, + pub proto: FlowProto, + pub src_port: u16, + pub dst_port: u16, + pub bytes: u64, + pub packets: u64, + pub tos: u8, +} \ No newline at end of file diff --git a/src/rust/lqos_bus/src/lib.rs b/src/rust/lqos_bus/src/lib.rs index ecd0b468..31ae488f 100644 --- a/src/rust/lqos_bus/src/lib.rs +++ b/src/rust/lqos_bus/src/lib.rs @@ -12,7 +12,7 @@ #![warn(missing_docs)] mod bus; mod ip_stats; -pub use ip_stats::{IpMapping, IpStats, XdpPpingResult}; +pub use ip_stats::{IpMapping, IpStats, XdpPpingResult, FlowProto, FlowTransport}; mod tc_handle; pub use bus::{ bus_request, decode_request, decode_response, encode_request, diff --git a/src/rust/lqos_node_manager/src/main.rs b/src/rust/lqos_node_manager/src/main.rs index a54deeae..840effa9 100644 --- a/src/rust/lqos_node_manager/src/main.rs +++ b/src/rust/lqos_node_manager/src/main.rs @@ -67,6 +67,7 @@ fn rocket() -> _ { queue_info::circuit_info, queue_info::current_circuit_throughput, queue_info::watch_circuit, + queue_info::flow_stats, config_control::get_nic_list, config_control::get_current_python_config, config_control::get_current_lqosd_config, diff --git a/src/rust/lqos_node_manager/src/queue_info.rs b/src/rust/lqos_node_manager/src/queue_info.rs index 5047f1be..3c4867d1 100644 --- a/src/rust/lqos_node_manager/src/queue_info.rs +++ b/src/rust/lqos_node_manager/src/queue_info.rs @@ -1,7 +1,7 @@ use crate::auth_guard::AuthGuard; use crate::cache_control::NoCache; use crate::tracker::SHAPED_DEVICES; -use lqos_bus::{bus_request, BusRequest, BusResponse}; +use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport}; use rocket::response::content::RawJson; use rocket::serde::json::Json; use rocket::serde::Serialize; @@ -99,6 +99,22 @@ pub async fn raw_queue_by_circuit( NoCache::new(RawJson(result)) } +#[get("/api/flows/")] +pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache>> { + let mut result = Vec::new(); + let request: Vec = ip_list.split(",").map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect(); + let responses = bus_request(request).await.unwrap(); + for r in responses.iter() { + if let BusResponse::FlowData(flow) = r { + result.extend_from_slice(flow); + } + } + + + + NoCache::new(Json(result)) +} + #[cfg(feature = "equinix_tests")] #[get("/api/run_btest")] pub async fn run_btest() -> NoCache> { diff --git a/src/rust/lqos_node_manager/static/circuit_queue.html b/src/rust/lqos_node_manager/static/circuit_queue.html index ccc409ea..f76fab2a 100644 --- a/src/rust/lqos_node_manager/static/circuit_queue.html +++ b/src/rust/lqos_node_manager/static/circuit_queue.html @@ -68,6 +68,9 @@ +
@@ -184,6 +187,19 @@
+ +
+
+
+
+
+
Flows (Last 30 Seconds)
+
+
+
+
+
+
@@ -364,6 +380,8 @@ setTimeout(pollQueue, 1000); } + let ips = []; + function getThroughput() { const params = new Proxy(new URLSearchParams(window.location.search), { get: (searchParams, prop) => searchParams.get(prop), @@ -376,8 +394,10 @@ [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100], ]; + ips = []; for (let i=0; i { + let html = ""; + html += ""; + for (let i=0; i"; + html += ""; + html += ""; + html += ""; + html += ""; + html += ""; + } + html += "
ProtocolFlowPacketsBytes
" + data[i].src + ":" + data[i].src_port + "▶️" + data[i].dst + ":" + data[i].dst_port + "" + data[i].packets + "" + scaleNumber(data[i].bytes) + "" + data[i].tos + "
"; + $("#flowList").html(html); + }) + setTimeout(getFlows, 1000); + } + function start() { colorReloadButton(); updateHostCounts(); @@ -520,6 +564,7 @@ pollQueue(); getThroughput(); getFunnel(params.id); + getFlows(); }); } diff --git a/src/rust/lqos_sys/src/bpf/common/dissector.h b/src/rust/lqos_sys/src/bpf/common/dissector.h index 99b2aaf1..c451fe14 100644 --- a/src/rust/lqos_sys/src/bpf/common/dissector.h +++ b/src/rust/lqos_sys/src/bpf/common/dissector.h @@ -276,7 +276,9 @@ static __always_inline struct tcphdr *get_tcp_header(struct dissector_t *dissect if (dissector->eth_type == ETH_P_IP) { return (struct tcphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4)); - } else if (dissector->eth_type == ETH_P_IPV6) { + } + else if (dissector->eth_type == ETH_P_IPV6) + { return (struct tcphdr *)(dissector->ip_header.ip6h + 1); } return NULL; @@ -287,17 +289,22 @@ static __always_inline struct udphdr *get_udp_header(struct dissector_t *dissect if (dissector->eth_type == ETH_P_IP) { return (struct udphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4)); - } else if (dissector->eth_type == ETH_P_IPV6) { + } + else if (dissector->eth_type == ETH_P_IPV6) + { return (struct udphdr *)(dissector->ip_header.ip6h + 1); } return NULL; } -static __always_inline struct icmphdr * get_icmp_header(struct dissector_t * dissector) { +static __always_inline struct icmphdr *get_icmp_header(struct dissector_t *dissector) +{ if (dissector->eth_type == ETH_P_IP) { return (struct icmphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4)); - } else if (dissector->eth_type == ETH_P_IPV6) { + } + else if (dissector->eth_type == ETH_P_IPV6) + { return (struct icmphdr *)(dissector->ip_header.ip6h + 1); } return NULL; @@ -312,7 +319,7 @@ static __always_inline void snoop(struct dissector_t *dissector) struct tcphdr *hdr = get_tcp_header(dissector); if (hdr != NULL) { - if (hdr + sizeof(struct tcphdr) > dissector->end) + if (hdr + 1 > dissector->end) { return; } @@ -326,7 +333,7 @@ static __always_inline void snoop(struct dissector_t *dissector) struct udphdr *hdr = get_udp_header(dissector); if (hdr != NULL) { - if (hdr + sizeof(struct udphdr) > dissector->end) + if (hdr + 1 > dissector->end) { return; } @@ -339,14 +346,15 @@ static __always_inline void snoop(struct dissector_t *dissector) struct icmphdr *hdr = get_icmp_header(dissector); if (hdr != NULL) { - if (hdr + sizeof(struct icmphdr) > dissector->end) + if (hdr + 1 > dissector->end) { return; } dissector->src_port = hdr->type; dissector->dst_port = hdr->code; } - } break; + } + break; } } diff --git a/src/rust/lqos_sys/src/bpf/common/palantir.h b/src/rust/lqos_sys/src/bpf/common/palantir.h index 3ced32aa..49339551 100644 --- a/src/rust/lqos_sys/src/bpf/common/palantir.h +++ b/src/rust/lqos_sys/src/bpf/common/palantir.h @@ -32,19 +32,22 @@ struct __uint(pinning, LIBBPF_PIN_BY_NAME); } palantir SEC(".maps"); -static __always_inline void update_palantir(struct dissector_t * dissector, __u32 size) { +static __always_inline void update_palantir(struct dissector_t * dissector, __u32 size, int dir) { + if (dissector->src_port == 0 || dissector->dst_port == 0) return; struct palantir_key key = {0}; key.src = dissector->src_ip; key.dst = dissector->dst_ip; key.ip_protocol = dissector->ip_protocol; - key.src_port = dissector->src_port; - key.dst_port = dissector->dst_port; + key.src_port = bpf_ntohs(dissector->src_port); + key.dst_port = bpf_ntohs(dissector->dst_port); struct palantir_data * counter = (struct palantir_data *)bpf_map_lookup_elem(&palantir, &key); if (counter) { counter->last_seen = bpf_ktime_get_boot_ns(); counter->packets += 1; counter->bytes += size; - counter->tos = dissector->tos; + if (dissector->tos != 0) { + counter->tos = dissector->tos; + } } else { struct palantir_data counter = {0}; counter.last_seen = bpf_ktime_get_boot_ns(); diff --git a/src/rust/lqos_sys/src/bpf/lqos_kern.c b/src/rust/lqos_sys/src/bpf/lqos_kern.c index 0ca71d1c..26efa7e0 100644 --- a/src/rust/lqos_sys/src/bpf/lqos_kern.c +++ b/src/rust/lqos_sys/src/bpf/lqos_kern.c @@ -19,6 +19,8 @@ #include "common/bifrost.h" #include "common/palantir.h" +//#define VERBOSE 1 + /* Theory of operation: 1. (Packet arrives at interface) 2. XDP (ingress) starts @@ -73,7 +75,7 @@ int xdp_prog(struct xdp_md *ctx) &bifrost_interface_map, &my_interface ); - if (redirect_info) { + if (redirect_info && redirect_info->scan_vlans) { // If we have a redirect, mark it - the dissector will // apply it vlan_redirect = true; @@ -128,10 +130,16 @@ int xdp_prog(struct xdp_md *ctx) ctx->data_end - ctx->data, // end - data = length tc_handle ); - update_palantir(&dissector, ctx->data_end - ctx->data); + // Send on its way if (tc_handle != 0) { + // Send data to Palantir +#ifdef VERBOSE + bpf_debug("(XDP) Storing Palantir Data"); +#endif + update_palantir(&dissector, ctx->data_end - ctx->data, effective_direction); + // Handle CPU redirection if there is one specified __u32 *cpu_lookup; cpu_lookup = bpf_map_lookup_elem(&cpus_available, &cpu); diff --git a/src/rust/lqosd/src/main.rs b/src/rust/lqosd/src/main.rs index 672f5855..823ca7a1 100644 --- a/src/rust/lqosd/src/main.rs +++ b/src/rust/lqosd/src/main.rs @@ -26,6 +26,7 @@ use signal_hook::{ iterator::Signals, }; use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED}; +use throughput_tracker::get_flow_stats; use tokio::join; mod stats; @@ -189,6 +190,9 @@ fn handle_bus_requests( tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed), } } + BusRequest::GetFlowStats(ip) => { + get_flow_stats(ip) + } }); } } diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index c657ec66..0ce5da72 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -1,6 +1,7 @@ mod throughput_entry; mod tracking_data; mod palantir_data; +pub use palantir_data::get_flow_stats; use crate::{ shaped_devices_tracker::NETWORK_JSON, throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS, diff --git a/src/rust/lqosd/src/throughput_tracker/palantir_data.rs b/src/rust/lqosd/src/throughput_tracker/palantir_data.rs index 03606f7f..57ec7550 100644 --- a/src/rust/lqosd/src/throughput_tracker/palantir_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/palantir_data.rs @@ -1,7 +1,8 @@ -use std::time::Duration; +use std::{time::Duration, net::IpAddr}; use dashmap::DashMap; -use lqos_sys::{PalantirData, PalantirKey}; +use lqos_bus::{BusResponse, FlowTransport}; +use lqos_sys::{PalantirData, PalantirKey, XdpIpAddress}; use lqos_utils::unix_time::time_since_boot; use once_cell::sync::Lazy; @@ -19,6 +20,7 @@ pub(crate) struct FlowData { last_seen: u64, bytes: u64, packets: u64, + tos: u8, } impl PalantirMonitor { @@ -32,6 +34,7 @@ impl PalantirMonitor { values.iter().for_each(|v| { result.bytes += v.bytes; result.packets += v.packets; + result.tos += v.tos; if v.last_seen > ls { ls = v.last_seen; } @@ -41,14 +44,19 @@ impl PalantirMonitor { } pub(crate) fn ingest(&self, key: &PalantirKey, values: &[PalantirData]) { - if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() { + //println!("{key:?}"); + //println!("{values:?}"); + if let Some(expire_ns) = Self::get_expire_time() { let combined = Self::combine_flows(values); - if combined.last_seen > five_minutes_ago_nanoseconds { + if combined.last_seen > expire_ns { if let Some(mut flow) = self.data.get_mut(key) { // Update - flow.bytes += combined.bytes; - flow.packets += combined.packets; + flow.bytes = combined.bytes; + flow.packets = combined.packets; flow.last_seen = combined.last_seen; + if combined.tos != 0 { + flow.tos = combined.tos; + } } else { // Insert self.data.insert(key.clone(), combined); @@ -62,18 +70,49 @@ impl PalantirMonitor { if let Ok(boot_time) = boot_time { let time_since_boot = Duration::from(boot_time); let five_minutes_ago = - time_since_boot.saturating_sub(Duration::from_secs(300)); - let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos() as u64; - Some(five_minutes_ago_nanoseconds) + time_since_boot.saturating_sub(Duration::from_secs(30)); + let expire_ns = five_minutes_ago.as_nanos() as u64; + Some(expire_ns) } else { None } } pub(crate) fn expire(&self) { - if let Some(five_minutes_ago_nanoseconds) = Self::get_expire_time() { - self.data.retain(|_k, v| v.last_seen > five_minutes_ago_nanoseconds); + if let Some(expire_ns) = Self::get_expire_time() { + self.data.retain(|_k, v| v.last_seen > expire_ns); } FLOWS_TRACKED.store(self.data.len() as u64, std::sync::atomic::Ordering::Relaxed); } } + +pub fn get_flow_stats(ip: &str) -> BusResponse { + let ip = ip.parse::(); + if let Ok(ip) = ip { + let ip = XdpIpAddress::from_ip(ip); + let mut result = Vec::new(); + + for value in PALANTIR.data.iter() { + let key = value.key(); + if key.src_ip == ip || key.dst_ip == ip { + result.push(FlowTransport{ + src: key.src_ip.as_ip().to_string(), + dst: key.dst_ip.as_ip().to_string(), + src_port: key.src_port, + dst_port: key.dst_port, + proto: match key.ip_protocol { + 6 => lqos_bus::FlowProto::TCP, + 17 => lqos_bus::FlowProto::UDP, + _ => lqos_bus::FlowProto::ICMP, + }, + bytes: value.bytes, + packets: value.packets, + tos: value.tos, + }); + } + } + + return BusResponse::FlowData(result); + } + BusResponse::Fail("No Stats or bad IP".to_string()) +} \ No newline at end of file