Remove most of the Heimdall mode 1 path, cleaning up the execution path now that we have global flow tracking.

This commit is contained in:
Herbert Wolverson
2024-03-11 13:27:42 -05:00
parent 91a48bc275
commit 445cdcda81
16 changed files with 10 additions and 401 deletions

View File

@@ -133,10 +133,6 @@ pub enum BusRequest {
/// Obtain the lqosd statistics
GetLqosStats,
/// Tell me flow stats for a given IP address
#[deprecated(note = "Heimdall flows are being migrated to flows 2")]
GetFlowStats(String),
/// Tell Heimdall to hyper-focus on an IP address for a bit
GatherPacketData(String),

View File

@@ -1,6 +1,6 @@
use super::QueueStoreTransit;
use crate::{
ip_stats::{FlowbeeData, PacketHeader}, FlowTransport, IpMapping, IpStats, XdpPpingResult,
ip_stats::{FlowbeeData, PacketHeader}, IpMapping, IpStats, XdpPpingResult,
};
use lts_client::transport_data::{StatsTotals, StatsHost, StatsTreeNode};
use serde::{Deserialize, Serialize};
@@ -91,10 +91,6 @@ pub enum BusResponse {
tracked_flows: u64,
},
/// Flow Data
#[deprecated(note = "Being replaced by FlowbeeData")]
FlowData(Vec<(FlowTransport, Option<FlowTransport>)>),
/// The index of the new packet collection session
PacketCollectionSession {
/// The identifier of the capture session

View File

@@ -67,41 +67,6 @@ pub struct XdpPpingResult {
pub samples: u32,
}
/// Defines an IP protocol for display in the flow
/// tracking (Heimdall) system.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum FlowProto {
/// A TCP flow
TCP,
/// A UDP flow
UDP,
/// An ICMP flow
ICMP
}
/// Defines the display data for a flow in Heimdall.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct FlowTransport {
/// The Source IP address
pub src: String,
/// The Destination IP address
pub dst: String,
/// The flow protocol (see `FlowProto`)
pub proto: FlowProto,
/// The source port, which is overridden to ICMP code on ICMP flows.
pub src_port: u16,
/// The destination port, which isn't useful at all on ICMP flows.
pub dst_port: u16,
/// The number of bytes since we started tracking this flow.
pub bytes: u64,
/// The number of packets since we started tracking this flow.
pub packets: u64,
/// Detected DSCP code if any
pub dscp: u8,
/// Detected ECN bit status (0-3)
pub ecn: u8,
}
/// Extract the 6-bit DSCP and 2-bit ECN code from a TOS field
/// in an IP header.
pub fn tos_parser(tos: u8) -> (u8, u8) {

View File

@@ -13,7 +13,7 @@
mod bus;
mod ip_stats;
pub use ip_stats::{
tos_parser, FlowProto, FlowTransport, IpMapping, IpStats, PacketHeader,
tos_parser, IpMapping, IpStats, PacketHeader,
XdpPpingResult, FlowbeeData, FlowbeeProtocol
};
mod tc_handle;

View File

@@ -1,165 +0,0 @@
use crate::{timeline::expire_timeline, FLOW_EXPIRE_SECS};
use dashmap::DashMap;
use lqos_bus::{tos_parser, BusResponse, FlowTransport};
use lqos_sys::heimdall_data::{HeimdallKey, HeimdallData};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::{collections::HashSet, time::Duration};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FlowKey {
src: XdpIpAddress,
dst: XdpIpAddress,
proto: u8,
src_port: u16,
dst_port: u16,
}
#[derive(Clone, Debug, Default)]
struct FlowData {
last_seen: u64,
bytes: u64,
packets: u64,
tos: u8,
}
impl From<&HeimdallKey> for FlowKey {
fn from(value: &HeimdallKey) -> Self {
Self {
src: value.src_ip,
dst: value.dst_ip,
proto: value.ip_protocol,
src_port: value.src_port,
dst_port: value.dst_port,
}
}
}
static FLOW_DATA: Lazy<DashMap<FlowKey, FlowData>> = Lazy::new(DashMap::new);
/*pub(crate) fn record_flow(event: &HeimdallEvent) {
let key: FlowKey = event.into();
if let Some(mut data) = FLOW_DATA.get_mut(&key) {
data.last_seen = event.timestamp;
data.packets += 1;
data.bytes += event.size as u64;
data.tos = event.tos;
} else {
FLOW_DATA.insert(
key,
FlowData {
last_seen: event.timestamp,
bytes: event.size.into(),
packets: 1,
tos: event.tos,
},
);
}
}*/
/// Iterates through all throughput entries, and sends them in turn to `callback`.
/// This elides the need to clone or copy data.
fn heimdall_for_each(
callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]),
) {
/*if let Ok(heimdall) = BpfPerCpuMap::<HeimdallKey, HeimdallData>::from_path(
"/sys/fs/bpf/heimdall",
) {
heimdall.for_each(callback);
}*/
lqos_sys::iterate_heimdall(callback);
}
fn combine_flows(values: &[HeimdallData]) -> FlowData {
let mut result = FlowData::default();
let mut ls = 0;
values.iter().for_each(|v| {
result.bytes += v.bytes;
result.packets += v.packets;
result.tos += v.tos;
if v.last_seen > ls {
ls = v.last_seen;
}
});
result.last_seen = ls;
result
}
pub fn read_flows() {
heimdall_for_each(&mut |key, value| {
let flow_key = key.into();
let combined = combine_flows(value);
if let Some(mut flow) = FLOW_DATA.get_mut(&flow_key) {
flow.last_seen = combined.last_seen;
flow.bytes = combined.bytes;
flow.packets = combined.packets;
flow.tos = combined.tos;
} else {
FLOW_DATA.insert(flow_key, combined);
}
});
}
/// Expire flows that have not been seen in a while.
pub fn expire_heimdall_flows() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(FLOW_EXPIRE_SECS)).as_nanos() as u64;
FLOW_DATA.retain(|_k, v| v.last_seen > expire);
expire_timeline();
}
}
/// Get the flow stats for a given IP address.
pub fn get_flow_stats(ip: XdpIpAddress) -> BusResponse {
let mut result = Vec::new();
// Obtain all the flows
let mut all_flows = Vec::new();
for value in FLOW_DATA.iter() {
let key = value.key();
if key.src == ip || key.dst == ip {
let (dscp, ecn) = tos_parser(value.tos);
all_flows.push(FlowTransport {
src: key.src.as_ip().to_string(),
dst: key.dst.as_ip().to_string(),
src_port: key.src_port,
dst_port: key.dst_port,
proto: match key.proto {
6 => lqos_bus::FlowProto::TCP,
17 => lqos_bus::FlowProto::UDP,
_ => lqos_bus::FlowProto::ICMP,
},
bytes: value.bytes,
packets: value.packets,
dscp,
ecn,
});
}
}
// Turn them into reciprocal pairs
let mut done = HashSet::new();
for (i, flow) in all_flows.iter().enumerate() {
if !done.contains(&i) {
let flow_a = flow.clone();
let flow_b = if let Some(flow_b) = all_flows
.iter()
.position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port)
{
done.insert(flow_b);
Some(all_flows[flow_b].clone())
} else {
None
};
result.push((flow_a, flow_b));
}
}
result.sort_by(|a, b| b.0.bytes.cmp(&a.0.bytes));
BusResponse::FlowData(result)
}

View File

@@ -7,8 +7,6 @@ mod config;
pub mod perf_interface;
pub mod stats;
pub use config::{HeimdalConfig, HeimdallMode};
mod flows;
pub use flows::{expire_heimdall_flows, get_flow_stats};
mod timeline;
pub use timeline::{n_second_packet_dump, n_second_pcap, hyperfocus_on_target};
mod pcap;
@@ -16,7 +14,7 @@ mod watchlist;
use lqos_utils::fdtimer::periodic;
pub use watchlist::{heimdall_expire, heimdall_watch_ip, set_heimdall_mode};
use crate::flows::read_flows;
use crate::timeline::expire_timeline;
/// How long should Heimdall keep watching a flow after being requested
/// to do so? Setting this to a long period increases CPU load after the
@@ -24,9 +22,6 @@ use crate::flows::read_flows;
/// collections if the client hasn't maintained the 1s request cadence.
const EXPIRE_WATCHES_SECS: u64 = 5;
/// How long should Heimdall retain flow summary data?
const FLOW_EXPIRE_SECS: u64 = 10;
/// How long should Heimdall retain packet timeline data?
const TIMELINE_EXPIRE_SECS: u64 = 10;
@@ -48,9 +43,8 @@ pub async fn start_heimdall() {
std::thread::spawn(move || {
periodic(interval_ms, "Heimdall Packet Watcher", &mut || {
read_flows();
expire_heimdall_flows();
heimdall_expire();
expire_timeline();
});
});
}

View File

@@ -1,7 +1,7 @@
use crate::auth_guard::AuthGuard;
use crate::cache_control::NoCache;
use crate::tracker::{SHAPED_DEVICES, lookup_dns};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport, FlowbeeData, PacketHeader, QueueStoreTransit};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowbeeData, PacketHeader, QueueStoreTransit};
use rocket::fs::NamedFile;
use rocket::http::Status;
use rocket::response::content::RawJson;

View File

@@ -332,6 +332,7 @@ if (hdr->cwr) flags |= 128;
target = params.id;
$.get("/api/packet_dump/" + params.id, (data) => {
console.log(data);
data.sort((a,b) => a.timestamp - b.timestamp);
// Find the minimum timestamp

View File

@@ -60,33 +60,6 @@ struct heimdall_event {
__u8 dump[PACKET_OCTET_SIZE];
};
struct heimdall_key
{
struct in6_addr src;
struct in6_addr dst;
__u8 ip_protocol;
__u16 src_port;
__u16 dst_port;
__u8 pad;
};
struct heimdall_data {
__u64 last_seen;
__u64 bytes;
__u64 packets;
__u8 tos;
};
// Map for tracking flow information in-kernel for watched IPs
struct
{
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
__type(key, struct heimdall_key);
__type(value, struct heimdall_data);
__uint(max_entries, MAX_FLOWS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} heimdall SEC(".maps");
static __always_inline __u8 get_heimdall_mode()
{
__u32 index = 0;
@@ -122,43 +95,7 @@ static __always_inline bool is_heimdall_watching(struct dissector_t *dissector,
static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size, __u8 mode)
{
if (mode == 1) {
// Don't report any non-ICMP without ports
if (dissector->ip_protocol != 1 && (dissector->src_port == 0 || dissector->dst_port == 0))
return;
// Don't report ICMP with invalid numbers
if (dissector->ip_protocol == 1 && dissector->src_port > 18) return;
struct heimdall_key key = {0};
key.src = dissector->src_ip;
key.dst = dissector->dst_ip;
key.ip_protocol = dissector->ip_protocol;
key.src_port = bpf_ntohs(dissector->src_port);
key.dst_port = bpf_ntohs(dissector->dst_port);
struct heimdall_data *counter = (struct heimdall_data *)bpf_map_lookup_elem(&heimdall, &key);
if (counter)
{
counter->last_seen = dissector->now;
counter->packets += 1;
counter->bytes += size;
if (dissector->tos != 0)
{
counter->tos = dissector->tos;
}
}
else
{
struct heimdall_data counter = {0};
counter.last_seen = dissector->now;
counter.bytes = size;
counter.packets = 1;
counter.tos = dissector->tos;
if (bpf_map_update_elem(&heimdall, &key, &counter, BPF_NOEXIST) != 0)
{
bpf_debug("Failed to insert tracking");
}
//bpf_debug("Inserted tracking");
}
} else if (mode == 2) {
if (mode == 2) {
struct heimdall_event event = {0};
event.timetamp = dissector->now;
event.src = dissector->src_ip;

View File

@@ -443,34 +443,6 @@ int throughput_reader(struct bpf_iter__bpf_map_elem *ctx)
return 0;
}
SEC("iter/bpf_map_elem")
int heimdall_reader(struct bpf_iter__bpf_map_elem *ctx) {
// The sequence file
struct seq_file *seq = ctx->meta->seq;
void *counter = ctx->value;
struct heimdall_key *ip = ctx->key;
__u32 num_cpus = NUM_CPUS;
if (ctx->meta->seq_num == 0) {
bpf_seq_write(seq, &num_cpus, sizeof(__u32));
bpf_seq_write(seq, &num_cpus, sizeof(__u32)); // Repeat for padding
}
// Bail on end
if (counter == NULL || ip == NULL) {
return 0;
}
bpf_seq_write(seq, ip, sizeof(struct heimdall_key));
for (__u32 i=0; i<NUM_CPUS; i++) {
struct heimdall_data * content = counter+(i*sizeof(struct heimdall_data));
bpf_seq_write(seq, content, sizeof(struct heimdall_data));
}
//BPF_SEQ_PRINTF(seq, "%d %d\n", counter->download_bytes, counter->upload_bytes);
return 0;
}
SEC("iter/bpf_map_elem")
int flow_reader(struct bpf_iter__bpf_map_elem *ctx)
{

View File

@@ -1,5 +1,5 @@
use crate::{
bpf_map::BpfMap, flowbee_data::{FlowbeeData, FlowbeeKey}, heimdall_data::{HeimdallData, HeimdallKey}, kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter
bpf_map::BpfMap, flowbee_data::{FlowbeeData, FlowbeeKey}, kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter
};
use lqos_utils::XdpIpAddress;
use once_cell::sync::Lazy;
@@ -192,10 +192,6 @@ static mut MAP_TRAFFIC: Lazy<
Option<BpfMapIterator<XdpIpAddress, HostCounter>>,
> = Lazy::new(|| None);
static mut HEIMDALL_TRACKER: Lazy<
Option<BpfMapIterator<HeimdallKey, HeimdallData>>,
> = Lazy::new(|| None);
static mut FLOWBEE_TRACKER: Lazy<
Option<BpfMapIterator<FlowbeeKey, FlowbeeData>>,
> = Lazy::new(|| None);
@@ -223,32 +219,6 @@ pub unsafe fn iterate_throughput(
}
}
/// Iterate through the heimdall map and call the callback for each entry.
pub fn iterate_heimdall(
callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]),
) {
unsafe {
if HEIMDALL_TRACKER.is_none() {
let lock = BPF_SKELETON.lock().unwrap();
if let Some(skeleton) = lock.as_ref() {
let skeleton = skeleton.get_ptr();
if let Ok(iter) = {
BpfMapIterator::new(
(*skeleton).progs.heimdall_reader,
(*skeleton).maps.heimdall,
)
} {
*HEIMDALL_TRACKER = Some(iter);
}
}
}
if let Some(iter) = HEIMDALL_TRACKER.as_mut() {
let _ = iter.for_each_per_cpu(callback);
}
}
}
/// Iterate through the Flows 2 system tracker, retrieving all flows
pub fn iterate_flows(
callback: &mut dyn FnMut(&FlowbeeKey, &FlowbeeData)
@@ -281,13 +251,8 @@ pub fn iterate_flows(
pub fn end_flows(flows: &mut [FlowbeeKey]) -> anyhow::Result<()> {
let mut map = BpfMap::<FlowbeeKey, FlowbeeData>::from_path("/sys/fs/bpf/flowbee")?;
let mut dead_flow = FlowbeeData {
end_status: 2,
..Default::default()
};
for flow in flows {
map.insert_or_update(flow, &mut dead_flow)?;
map.delete(flow)?;
}
Ok(())

View File

@@ -1,33 +0,0 @@
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;
/// Representation of the eBPF `heimdall_key` type.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash, FromBytes)]
#[repr(C)]
pub struct HeimdallKey {
/// Mapped `XdpIpAddress` source for the flow.
pub src_ip: XdpIpAddress,
/// Mapped `XdpIpAddress` destination for the flow
pub dst_ip: XdpIpAddress,
/// IP protocol (see the Linux kernel!)
pub ip_protocol: u8,
/// Source port number, or ICMP type.
pub src_port: u16,
/// Destination port number.
pub dst_port: u16,
_padding: u8,
}
/// Mapped representation of the eBPF `heimdall_data` type.
#[derive(Debug, Clone, Default, FromBytes)]
#[repr(C)]
pub struct HeimdallData {
/// Last seen, in nanoseconds (since boot time).
pub last_seen: u64,
/// Number of bytes since the flow started being tracked
pub bytes: u64,
/// Number of packets since the flow started being tracked
pub packets: u64,
/// IP header TOS value
pub tos: u8,
}

View File

@@ -20,7 +20,6 @@ mod linux;
mod bpf_iterator;
/// Data shared between eBPF and Heimdall that needs local access
/// for map control.
pub mod heimdall_data;
pub mod flowbee_data;
pub use ip_mapping::{
@@ -30,4 +29,4 @@ pub use kernel_wrapper::LibreQoSKernels;
pub use linux::num_possible_cpus;
pub use lqos_kernel::max_tracked_ips;
pub use throughput::{throughput_for_each, HostCounter};
pub use bpf_iterator::{iterate_heimdall, iterate_flows, end_flows};
pub use bpf_iterator::{iterate_flows, end_flows};

View File

@@ -29,7 +29,6 @@ use signal_hook::{
iterator::Signals,
};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED};
use throughput_tracker::get_flow_stats;
use tokio::join;
mod stats;
@@ -197,7 +196,6 @@ fn handle_bus_requests(
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
}
}
BusRequest::GetFlowStats(ip) => get_flow_stats(ip),
BusRequest::GetPacketHeaderDump(id) => {
BusResponse::PacketDump(n_second_packet_dump(*id))
}

View File

@@ -1,14 +0,0 @@
use std::net::IpAddr;
use lqos_bus::BusResponse;
use lqos_heimdall::heimdall_watch_ip;
use lqos_utils::XdpIpAddress;
pub fn get_flow_stats(ip: &str) -> BusResponse {
let ip = ip.parse::<IpAddr>();
if let Ok(ip) = ip {
let ip = XdpIpAddress::from_ip(ip);
heimdall_watch_ip(ip);
return lqos_heimdall::get_flow_stats(ip);
}
BusResponse::Fail("No Stats or bad IP".to_string())
}

View File

@@ -1,5 +1,4 @@
pub mod flow_data;
mod heimdall_data;
mod throughput_entry;
mod tracking_data;
use std::net::IpAddr;
@@ -10,7 +9,6 @@ use crate::{
stats::TIME_TO_POLL_HOSTS,
throughput_tracker::tracking_data::ThroughputTracker,
};
pub use heimdall_data::get_flow_stats;
use log::{info, warn};
use lqos_bus::{BusResponse, FlowbeeProtocol, IpStats, TcHandle, TopFlowType, XdpPpingResult};
use lqos_sys::flowbee_data::{FlowbeeData, FlowbeeKey};