mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2024-11-22 08:16:25 -06:00
The new Heimdall UI which requires explicitly starting capture sessions.
This commit is contained in:
parent
4d0fd00583
commit
98d7bb88e1
104
src/rust/Cargo.lock
generated
104
src/rust/Cargo.lock
generated
@ -733,18 +733,6 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "educe"
|
||||
version = "0.4.20"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cb0188e3c3ba8df5753894d54461f0e39bc91741dc5b22e1c46999ec2c71f4e4"
|
||||
dependencies = [
|
||||
"enum-ordinalize",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.8.1"
|
||||
@ -760,20 +748,6 @@ dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "enum-ordinalize"
|
||||
version = "3.1.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a62bb1df8b45ecb7ffa78dca1c17a438fb193eb083db0b1b494d2a61bcb5096a"
|
||||
dependencies = [
|
||||
"num-bigint",
|
||||
"num-traits",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustc_version",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.10.0"
|
||||
@ -1481,7 +1455,6 @@ dependencies = [
|
||||
"nix",
|
||||
"once_cell",
|
||||
"rocket",
|
||||
"rocket-download-response",
|
||||
"rocket_async_compression",
|
||||
"sysinfo",
|
||||
]
|
||||
@ -1658,16 +1631,6 @@ version = "0.3.16"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
|
||||
|
||||
[[package]]
|
||||
name = "mime_guess"
|
||||
version = "2.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef"
|
||||
dependencies = [
|
||||
"mime",
|
||||
"unicase",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "minimal-lexical"
|
||||
version = "0.2.1"
|
||||
@ -1774,27 +1737,6 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-bigint"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f93ab6289c7b344a8a9f60f88d80aa20032336fe78da341afc91c8a2341fc75f"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-integer"
|
||||
version = "0.1.45"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-traits"
|
||||
version = "0.2.15"
|
||||
@ -2277,19 +2219,6 @@ dependencies = [
|
||||
"yansi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rocket-download-response"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "98c0a60b1b9d018d3cd73b4b69a1f49656a4adbb1552a997e18de30d7413c6ce"
|
||||
dependencies = [
|
||||
"educe",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"rocket",
|
||||
"url-escape",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rocket_async_compression"
|
||||
version = "0.2.0"
|
||||
@ -2353,15 +2282,6 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
||||
|
||||
[[package]]
|
||||
name = "rustc_version"
|
||||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
|
||||
dependencies = [
|
||||
"semver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustix"
|
||||
version = "0.36.9"
|
||||
@ -2409,12 +2329,6 @@ version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "semver"
|
||||
version = "1.0.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.153"
|
||||
@ -3027,15 +2941,6 @@ dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-ident"
|
||||
version = "1.0.8"
|
||||
@ -3076,15 +2981,6 @@ dependencies = [
|
||||
"subtle",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "url-escape"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "44e0ce4d1246d075ca5abec4b41d33e87a6054d08e2366b63205665e950db218"
|
||||
dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "uuid"
|
||||
version = "1.3.0"
|
||||
|
@ -136,11 +136,14 @@ pub enum BusRequest {
|
||||
/// Tell me flow stats for a given IP address
|
||||
GetFlowStats(String),
|
||||
|
||||
/// Tell Heimdall to hyper-focus on an IP address for a bit
|
||||
GatherPacketData(String),
|
||||
|
||||
/// Give me a dump of the last 10 seconds of packet headers
|
||||
GetPacketHeaderDump(String),
|
||||
GetPacketHeaderDump(usize),
|
||||
|
||||
/// Give me a libpcap format packet dump (shortened) of the last 10 seconds
|
||||
GetPcapDump,
|
||||
GetPcapDump(usize),
|
||||
|
||||
/// If running on Equinix (the `equinix_test` feature is enabled),
|
||||
/// display a "run bandwidht test" link.
|
||||
|
@ -90,9 +90,12 @@ pub enum BusResponse {
|
||||
/// Flow Data
|
||||
FlowData(Vec<(FlowTransport, Option<FlowTransport>)>),
|
||||
|
||||
/// The index of the new packet collection session
|
||||
PacketCollectionSession(usize),
|
||||
|
||||
/// Packet header dump
|
||||
PacketDump(Vec<PacketHeader>),
|
||||
PacketDump(Option<Vec<PacketHeader>>),
|
||||
|
||||
/// Pcap format dump
|
||||
PcapDump(Vec<u8>),
|
||||
PcapDump(Option<String>),
|
||||
}
|
||||
|
@ -1,10 +1,43 @@
|
||||
use crate::{perf_interface::HeimdallEvent, timeline::expire_timeline, FLOW_EXPIRE_SECS};
|
||||
use crate::{timeline::expire_timeline, FLOW_EXPIRE_SECS};
|
||||
use dashmap::DashMap;
|
||||
use lqos_bus::{tos_parser, BusResponse, FlowTransport};
|
||||
use lqos_sys::bpf_per_cpu_map::BpfPerCpuMap;
|
||||
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::{collections::HashSet, time::Duration};
|
||||
|
||||
/// Representation of the eBPF `heimdall_key` type.
|
||||
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
|
||||
#[repr(C)]
|
||||
pub struct HeimdallKey {
|
||||
/// Mapped `XdpIpAddress` source for the flow.
|
||||
pub src_ip: XdpIpAddress,
|
||||
/// Mapped `XdpIpAddress` destination for the flow
|
||||
pub dst_ip: XdpIpAddress,
|
||||
/// IP protocol (see the Linux kernel!)
|
||||
pub ip_protocol: u8,
|
||||
/// Source port number, or ICMP type.
|
||||
pub src_port: u16,
|
||||
/// Destination port number.
|
||||
pub dst_port: u16,
|
||||
}
|
||||
|
||||
/// Mapped representation of the eBPF `heimdall_data` type.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[repr(C)]
|
||||
pub struct HeimdallData {
|
||||
/// Last seen, in nanoseconds (since boot time).
|
||||
pub last_seen: u64,
|
||||
/// Number of bytes since the flow started being tracked
|
||||
pub bytes: u64,
|
||||
/// Number of packets since the flow started being tracked
|
||||
pub packets: u64,
|
||||
/// IP header TOS value
|
||||
pub tos: u8,
|
||||
/// Reserved to pad the structure
|
||||
pub reserved: [u8; 3],
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||
struct FlowKey {
|
||||
src: XdpIpAddress,
|
||||
@ -14,7 +47,7 @@ struct FlowKey {
|
||||
dst_port: u16,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, Default)]
|
||||
struct FlowData {
|
||||
last_seen: u64,
|
||||
bytes: u64,
|
||||
@ -22,11 +55,11 @@ struct FlowData {
|
||||
tos: u8,
|
||||
}
|
||||
|
||||
impl From<&HeimdallEvent> for FlowKey {
|
||||
fn from(value: &HeimdallEvent) -> Self {
|
||||
impl From<&HeimdallKey> for FlowKey {
|
||||
fn from(value: &HeimdallKey) -> Self {
|
||||
Self {
|
||||
src: value.src,
|
||||
dst: value.dst,
|
||||
src: value.src_ip,
|
||||
dst: value.dst_ip,
|
||||
proto: value.ip_protocol,
|
||||
src_port: value.src_port,
|
||||
dst_port: value.dst_port,
|
||||
@ -36,7 +69,7 @@ impl From<&HeimdallEvent> for FlowKey {
|
||||
|
||||
static FLOW_DATA: Lazy<DashMap<FlowKey, FlowData>> = Lazy::new(DashMap::new);
|
||||
|
||||
pub(crate) fn record_flow(event: &HeimdallEvent) {
|
||||
/*pub(crate) fn record_flow(event: &HeimdallEvent) {
|
||||
let key: FlowKey = event.into();
|
||||
if let Some(mut data) = FLOW_DATA.get_mut(&key) {
|
||||
data.last_seen = event.timestamp;
|
||||
@ -54,6 +87,50 @@ pub(crate) fn record_flow(event: &HeimdallEvent) {
|
||||
},
|
||||
);
|
||||
}
|
||||
}*/
|
||||
|
||||
|
||||
/// Iterates through all throughput entries, and sends them in turn to `callback`.
|
||||
/// This elides the need to clone or copy data.
|
||||
fn heimdall_for_each(
|
||||
callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]),
|
||||
) {
|
||||
if let Ok(heimdall) = BpfPerCpuMap::<HeimdallKey, HeimdallData>::from_path(
|
||||
"/sys/fs/bpf/heimdall",
|
||||
) {
|
||||
heimdall.for_each(callback);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn combine_flows(values: &[HeimdallData]) -> FlowData {
|
||||
let mut result = FlowData::default();
|
||||
let mut ls = 0;
|
||||
values.iter().for_each(|v| {
|
||||
result.bytes += v.bytes;
|
||||
result.packets += v.packets;
|
||||
result.tos += v.tos;
|
||||
if v.last_seen > ls {
|
||||
ls = v.last_seen;
|
||||
}
|
||||
});
|
||||
result.last_seen = ls;
|
||||
result
|
||||
}
|
||||
|
||||
pub fn read_flows() {
|
||||
heimdall_for_each(&mut |key, value| {
|
||||
let flow_key = key.into();
|
||||
let combined = combine_flows(value);
|
||||
if let Some(mut flow) = FLOW_DATA.get_mut(&flow_key) {
|
||||
flow.last_seen = combined.last_seen;
|
||||
flow.bytes = combined.bytes;
|
||||
flow.packets = combined.packets;
|
||||
flow.tos = combined.tos;
|
||||
} else {
|
||||
FLOW_DATA.insert(flow_key, combined);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn expire_heimdall_flows() {
|
||||
|
@ -9,12 +9,14 @@ pub use config::{HeimdalConfig, HeimdallMode};
|
||||
mod flows;
|
||||
pub use flows::{expire_heimdall_flows, get_flow_stats};
|
||||
mod timeline;
|
||||
pub use timeline::{ten_second_packet_dump, ten_second_pcap};
|
||||
pub use timeline::{ten_second_packet_dump, ten_second_pcap, hyperfocus_on_target};
|
||||
mod pcap;
|
||||
mod watchlist;
|
||||
use lqos_utils::fdtimer::periodic;
|
||||
pub use watchlist::{heimdall_expire, heimdall_watch_ip, set_heimdall_mode};
|
||||
|
||||
use crate::flows::read_flows;
|
||||
|
||||
/// How long should Heimdall keep watching a flow after being requested
|
||||
/// to do so? Setting this to a long period increases CPU load after the
|
||||
/// client has stopped looking. Too short a delay will lead to missed
|
||||
@ -27,6 +29,9 @@ const FLOW_EXPIRE_SECS: u64 = 10;
|
||||
/// How long should Heimdall retain packet timeline data?
|
||||
const TIMELINE_EXPIRE_SECS: u64 = 10;
|
||||
|
||||
/// How long should an analysis session remain in memory?
|
||||
const SESSION_EXPIRE_SECONDS: u64 = 600;
|
||||
|
||||
/// Interface to running Heimdall (start this when lqosd starts)
|
||||
/// This is async to match the other spawning systems.
|
||||
pub async fn start_heimdall() {
|
||||
@ -42,6 +47,7 @@ pub async fn start_heimdall() {
|
||||
|
||||
std::thread::spawn(move || {
|
||||
periodic(interval_ms, "Heimdall Packet Watcher", &mut || {
|
||||
read_flows();
|
||||
expire_heimdall_flows();
|
||||
heimdall_expire();
|
||||
});
|
||||
|
@ -1,7 +1,7 @@
|
||||
use std::{ffi::c_void, slice};
|
||||
use lqos_utils::XdpIpAddress;
|
||||
use zerocopy::FromBytes;
|
||||
use crate::{flows::record_flow, timeline::store_on_timeline};
|
||||
use crate::timeline::store_on_timeline;
|
||||
|
||||
/// This constant MUST exactly match PACKET_OCTET_STATE in heimdall.h
|
||||
pub(crate) const PACKET_OCTET_SIZE: usize = 128;
|
||||
@ -60,7 +60,6 @@ pub unsafe extern "C" fn heimdall_handle_events(
|
||||
let data_slice : &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
|
||||
|
||||
if let Some(incoming) = HeimdallEvent::read_from(data_slice) {
|
||||
record_flow(&incoming);
|
||||
store_on_timeline(incoming);
|
||||
} else {
|
||||
println!("Failed to decode");
|
||||
|
@ -1,29 +1,42 @@
|
||||
use std::time::Duration;
|
||||
use dashmap::DashSet;
|
||||
use lqos_bus::{PacketHeader, tos_parser};
|
||||
use crate::{
|
||||
heimdall_watch_ip,
|
||||
pcap::{PcapFileHeader, PcapPacketHeader},
|
||||
perf_interface::{HeimdallEvent, PACKET_OCTET_SIZE},
|
||||
set_heimdall_mode, HeimdallMode, SESSION_EXPIRE_SECONDS,
|
||||
TIMELINE_EXPIRE_SECS,
|
||||
};
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use lqos_bus::{tos_parser, PacketHeader};
|
||||
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
|
||||
use once_cell::sync::Lazy;
|
||||
use std::{
|
||||
fs::{remove_file, File},
|
||||
io::Write,
|
||||
path::Path,
|
||||
sync::atomic::{AtomicBool, AtomicUsize},
|
||||
time::Duration,
|
||||
};
|
||||
use zerocopy::AsBytes;
|
||||
use crate::{perf_interface::{HeimdallEvent, PACKET_OCTET_SIZE}, pcap::{PcapFileHeader, PcapPacketHeader}, TIMELINE_EXPIRE_SECS};
|
||||
|
||||
impl HeimdallEvent {
|
||||
fn as_header(&self) -> PacketHeader {
|
||||
let (dscp, ecn) = tos_parser(self.tos);
|
||||
PacketHeader {
|
||||
timestamp: self.timestamp,
|
||||
src: self.src.as_ip().to_string(),
|
||||
dst: self.dst.as_ip().to_string(),
|
||||
src_port: self.src_port,
|
||||
dst_port: self.dst_port,
|
||||
ip_protocol: self.ip_protocol,
|
||||
ecn, dscp,
|
||||
size: self.size,
|
||||
tcp_flags: self.tcp_flags,
|
||||
tcp_window: self.tcp_window,
|
||||
tcp_tsecr: self.tcp_tsecr,
|
||||
tcp_tsval: self.tcp_tsval,
|
||||
}
|
||||
fn as_header(&self) -> PacketHeader {
|
||||
let (dscp, ecn) = tos_parser(self.tos);
|
||||
PacketHeader {
|
||||
timestamp: self.timestamp,
|
||||
src: self.src.as_ip().to_string(),
|
||||
dst: self.dst.as_ip().to_string(),
|
||||
src_port: self.src_port,
|
||||
dst_port: self.dst_port,
|
||||
ip_protocol: self.ip_protocol,
|
||||
ecn,
|
||||
dscp,
|
||||
size: self.size,
|
||||
tcp_flags: self.tcp_flags,
|
||||
tcp_window: self.tcp_window,
|
||||
tcp_tsecr: self.tcp_tsecr,
|
||||
tcp_tsval: self.tcp_tsval,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct Timeline {
|
||||
@ -45,34 +58,125 @@ pub(crate) fn store_on_timeline(event: HeimdallEvent) {
|
||||
pub(crate) fn expire_timeline() {
|
||||
if let Ok(now) = time_since_boot() {
|
||||
let since_boot = Duration::from(now);
|
||||
let expire = (since_boot - Duration::from_secs(TIMELINE_EXPIRE_SECS)).as_nanos() as u64;
|
||||
let expire = (since_boot - Duration::from_secs(TIMELINE_EXPIRE_SECS))
|
||||
.as_nanos() as u64;
|
||||
TIMELINE.data.retain(|v| v.timestamp > expire);
|
||||
FOCUS_SESSIONS.retain(|_, v| v.expire < since_boot.as_nanos() as u64);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ten_second_packet_dump(ip: XdpIpAddress) -> Vec<PacketHeader> {
|
||||
TIMELINE
|
||||
.data
|
||||
.iter()
|
||||
.filter(|e| e.src == ip || e.dst == ip)
|
||||
.map(|e| e.as_header())
|
||||
.collect()
|
||||
struct FocusSession {
|
||||
expire: u64,
|
||||
data: DashSet<HeimdallEvent>,
|
||||
dump_filename: Option<String>,
|
||||
}
|
||||
|
||||
pub fn ten_second_pcap() -> Vec<u8> {
|
||||
let mut bytes : Vec<u8> = Vec::new();
|
||||
let file_header = PcapFileHeader::new();
|
||||
bytes.extend(file_header.as_bytes());
|
||||
let mut packets: Vec<HeimdallEvent> = TIMELINE.data.iter().map(|e| e.clone()).collect();
|
||||
packets.sort_by(|a,b| a.timestamp.cmp(&b.timestamp));
|
||||
packets.iter().for_each(|p| {
|
||||
let packet_header = PcapPacketHeader::from_heimdall(p);
|
||||
bytes.extend(packet_header.as_bytes());
|
||||
if p.size < PACKET_OCTET_SIZE as u32 {
|
||||
bytes.extend(&p.packet_data[0 .. p.size as usize]);
|
||||
} else {
|
||||
bytes.extend(p.packet_data);
|
||||
impl Drop for FocusSession {
|
||||
fn drop(&mut self) {
|
||||
if let Some(df) = &self.dump_filename {
|
||||
let path = Path::new(df);
|
||||
if path.exists() {
|
||||
let _ = remove_file(path);
|
||||
}
|
||||
}
|
||||
});
|
||||
bytes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static HYPERFOCUSED: AtomicBool = AtomicBool::new(false);
|
||||
static FOCUS_SESSION_ID: AtomicUsize = AtomicUsize::new(0);
|
||||
static FOCUS_SESSIONS: Lazy<DashMap<usize, FocusSession>> =
|
||||
Lazy::new(DashMap::new);
|
||||
|
||||
/// Tell Heimdall to spend the next 10 seconds obsessing over an IP address,
|
||||
/// collecting full packet headers. This hurts your CPU, so use it sparingly.
|
||||
///
|
||||
/// This spawns a thread that keeps Heimdall in Analysis mode (saving packet
|
||||
/// data to userspace) for 10 seconds, before reverting to WatchOnly mode.
|
||||
///
|
||||
/// You can only do this on one target at a time.
|
||||
///
|
||||
/// ## Returns
|
||||
///
|
||||
/// * Either `None` or...
|
||||
/// * The id number of the collection session for analysis.
|
||||
pub fn hyperfocus_on_target(ip: XdpIpAddress) -> Option<usize> {
|
||||
if HYPERFOCUSED.compare_exchange(
|
||||
false,
|
||||
true,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
std::sync::atomic::Ordering::Relaxed,
|
||||
) == Ok(false)
|
||||
{
|
||||
let new_id =
|
||||
FOCUS_SESSION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
std::thread::spawn(move || {
|
||||
for _ in 0..10 {
|
||||
let _ = set_heimdall_mode(HeimdallMode::Analysis);
|
||||
heimdall_watch_ip(ip);
|
||||
std::thread::sleep(Duration::from_secs(1));
|
||||
}
|
||||
let _ = set_heimdall_mode(HeimdallMode::WatchOnly);
|
||||
|
||||
if let Ok(now) = time_since_boot() {
|
||||
let since_boot = Duration::from(now);
|
||||
let expire = (since_boot - Duration::from_secs(SESSION_EXPIRE_SECONDS))
|
||||
.as_nanos() as u64;
|
||||
FOCUS_SESSIONS.insert(
|
||||
new_id,
|
||||
FocusSession {
|
||||
expire,
|
||||
data: TIMELINE.data.clone(),
|
||||
dump_filename: None,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
HYPERFOCUSED.store(false, std::sync::atomic::Ordering::Relaxed);
|
||||
});
|
||||
Some(new_id)
|
||||
} else {
|
||||
log::warn!(
|
||||
"Heimdall was busy and won't start another collection session."
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ten_second_packet_dump(session_id: usize) -> Option<Vec<PacketHeader>> {
|
||||
if let Some(session) = FOCUS_SESSIONS.get(&session_id) {
|
||||
Some(session.data.iter().map(|e| e.as_header()).collect())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ten_second_pcap(session_id: usize) -> Option<String> {
|
||||
if let Some(mut session) = FOCUS_SESSIONS.get_mut(&session_id) {
|
||||
let filename = format!("/tmp/cap_sess_{session_id}");
|
||||
session.dump_filename = Some(filename.clone());
|
||||
let path = Path::new(&filename);
|
||||
let mut out = File::create(path).expect("Unable to create {filename}");
|
||||
out
|
||||
.write_all(PcapFileHeader::new().as_bytes())
|
||||
.expect("Unable to write to {filename}");
|
||||
|
||||
session
|
||||
.data
|
||||
.iter()
|
||||
.map(|e| (e.packet_data, e.size, PcapPacketHeader::from_heimdall(&e)))
|
||||
.for_each(
|
||||
|(data, size, p)| {
|
||||
out.write_all(p.as_bytes()).expect("Unable to write to {filename}");
|
||||
if size < PACKET_OCTET_SIZE as u32 {
|
||||
out.write_all(&data[0 .. size as usize]).unwrap();
|
||||
} else {
|
||||
out.write_all(&data).unwrap();
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
Some(filename)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
@ -42,12 +42,6 @@ impl HeimdallWatching {
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for HeimdallWatching {
|
||||
fn drop(&mut self) {
|
||||
self.stop_watching();
|
||||
}
|
||||
}
|
||||
|
||||
static HEIMDALL_WATCH_LIST: Lazy<DashMap<XdpIpAddress, HeimdallWatching>> =
|
||||
Lazy::new(DashMap::new);
|
||||
|
||||
@ -57,6 +51,9 @@ pub fn heimdall_expire() {
|
||||
if let Ok(now) = time_since_boot() {
|
||||
let now = Duration::from(now).as_nanos();
|
||||
HEIMDALL_WATCH_LIST.retain(|_k, v| {
|
||||
if v.expiration < now {
|
||||
v.stop_watching();
|
||||
}
|
||||
v.expiration > now
|
||||
});
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ sysinfo = "0"
|
||||
default-net = "0"
|
||||
nix = "0"
|
||||
once_cell = "1"
|
||||
rocket-download-response = "0.5"
|
||||
|
||||
# Support JemAlloc on supported platforms
|
||||
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
|
||||
|
@ -71,6 +71,7 @@ fn rocket() -> _ {
|
||||
queue_info::flow_stats,
|
||||
queue_info::packet_dump,
|
||||
queue_info::pcap,
|
||||
queue_info::request_analysis,
|
||||
config_control::get_nic_list,
|
||||
config_control::get_current_python_config,
|
||||
config_control::get_current_lqosd_config,
|
||||
|
@ -2,11 +2,11 @@ use crate::auth_guard::AuthGuard;
|
||||
use crate::cache_control::NoCache;
|
||||
use crate::tracker::SHAPED_DEVICES;
|
||||
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport, PacketHeader};
|
||||
use rocket::fs::NamedFile;
|
||||
use rocket::http::Status;
|
||||
use rocket::response::content::RawJson;
|
||||
use rocket::serde::json::Json;
|
||||
use rocket::serde::Serialize;
|
||||
use rocket_download_response::DownloadResponse;
|
||||
use std::net::IpAddr;
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
@ -114,26 +114,44 @@ pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<Json<Vec<(
|
||||
NoCache::new(Json(result))
|
||||
}
|
||||
|
||||
#[get("/api/packet_dump/<ip>")]
|
||||
pub async fn packet_dump(ip: String, _auth: AuthGuard) -> NoCache<Json<Vec<PacketHeader>>> {
|
||||
#[derive(Serialize, Clone)]
|
||||
#[serde(crate = "rocket::serde")]
|
||||
pub enum RequestAnalysisResult {
|
||||
Fail,
|
||||
Ok(usize)
|
||||
}
|
||||
|
||||
#[get("/api/request_analysis/<ip>")]
|
||||
pub async fn request_analysis(ip: String) -> NoCache<Json<RequestAnalysisResult>> {
|
||||
for r in bus_request(vec![BusRequest::GatherPacketData(ip)]).await.unwrap() {
|
||||
if let BusResponse::PacketCollectionSession(id) = r {
|
||||
return NoCache::new(Json(RequestAnalysisResult::Ok(id)));
|
||||
}
|
||||
}
|
||||
|
||||
NoCache::new(Json(RequestAnalysisResult::Fail))
|
||||
}
|
||||
|
||||
#[get("/api/packet_dump/<id>")]
|
||||
pub async fn packet_dump(id: usize, _auth: AuthGuard) -> NoCache<Json<Vec<PacketHeader>>> {
|
||||
let mut result = Vec::new();
|
||||
for r in bus_request(vec![BusRequest::GetPacketHeaderDump(ip)]).await.unwrap() {
|
||||
if let BusResponse::PacketDump(packets) = r {
|
||||
for r in bus_request(vec![BusRequest::GetPacketHeaderDump(id)]).await.unwrap() {
|
||||
if let BusResponse::PacketDump(Some(packets)) = r {
|
||||
result.extend(packets);
|
||||
}
|
||||
}
|
||||
NoCache::new(Json(result))
|
||||
}
|
||||
|
||||
#[get("/api/pcap")]
|
||||
pub async fn pcap() -> Result<DownloadResponse, Status> {
|
||||
for r in bus_request(vec![BusRequest::GetPcapDump]).await.unwrap() {
|
||||
if let BusResponse::PcapDump(bytes) = r {
|
||||
return Ok(DownloadResponse::from_vec(bytes, Some("capture.pcap"), None));
|
||||
#[get("/api/pcap/<id>/capture.pcap")]
|
||||
pub async fn pcap(id: usize) -> Result<NoCache<NamedFile>, Status> {
|
||||
for r in bus_request(vec![BusRequest::GetPcapDump(id)]).await.unwrap() {
|
||||
if let BusResponse::PcapDump(Some(filename)) = r {
|
||||
return Ok(NoCache::new(NamedFile::open(filename).await.unwrap()));
|
||||
}
|
||||
}
|
||||
|
||||
Err(Status::NoContent)
|
||||
Err(Status::NotFound)
|
||||
}
|
||||
|
||||
#[cfg(feature = "equinix_tests")]
|
||||
|
@ -194,6 +194,9 @@
|
||||
<div class="card bg-light">
|
||||
<div class="card-body">
|
||||
<h5 class="card-title"><i class="fa fa-bar-chart"></i> Flows (Last 30 Seconds)</h5>
|
||||
<p class="alert alert-warning" role="alert">
|
||||
<i class="fa fa-warning"></i> Gathering packet data can cause high CPU load during the 10 second capture window.
|
||||
</p>
|
||||
<div id="packetButtons"></div>
|
||||
<div id="flowList"></div>
|
||||
</div>
|
||||
@ -557,6 +560,37 @@
|
||||
}
|
||||
|
||||
var madeButtons = false;
|
||||
var analysisId = null;
|
||||
var analysisTimer = null;
|
||||
var analysisBtn = null;
|
||||
|
||||
function analyze(id) {
|
||||
if (analysisId != null) {
|
||||
alert("Heimdall says: 'STOP CLICKING ME'");
|
||||
return;
|
||||
}
|
||||
let ip = ips[id];
|
||||
$.get("/api/request_analysis/" + encodeURI(ip), (data) => {
|
||||
if (data == "Fail") {
|
||||
alert("Heimdall is busy serving other customers. Your desire is important to him, please try again later.")
|
||||
return;
|
||||
}
|
||||
analysisId = data.Ok;
|
||||
analysisBtn = "#dumpBtn_" + id;
|
||||
analysisTimer = 10;
|
||||
analyzeTick();
|
||||
});
|
||||
}
|
||||
|
||||
function analyzeTick() {
|
||||
$(analysisBtn).text("Gathering Data for " + analysisTimer + " more seconds");
|
||||
analysisTimer--;
|
||||
if (analysisTimer > -1) {
|
||||
setTimeout(analyzeTick, 1000);
|
||||
} else {
|
||||
window.location.href = "/ip_dump?id=" + analysisId;
|
||||
}
|
||||
}
|
||||
|
||||
function getFlows() {
|
||||
let ip_list = "";
|
||||
@ -564,11 +598,10 @@
|
||||
for (let i=0; i<ips.length; ++i) {
|
||||
ip_list += ips[i] + ",";
|
||||
if (circuit_info != null) {
|
||||
ip_btns += "<a href='/ip_dump?ip=" + ips[i] + "&dn=" + circuit_info.capacity[0] + "&up=" + circuit_info.capacity[1] + "' class='btn btn-info'>Packet Dump: " + ips[i] + "</a> "
|
||||
ip_btns += "<a id='dumpBtn_" + i + "' href='#' onclick='analyze(\"" + i + "\")' class='btn btn-info'><i class='fa fa-search'></i> Analyze: " + ips[i] + "</a> "
|
||||
}
|
||||
}
|
||||
if (!madeButtons && ips.length > 0 && circuit_info != null) {
|
||||
ip_btns += "<a href='/api/pcap' class='btn btn-warning'>Download PCAP Dump</a>";
|
||||
ip_btns += "<br />";
|
||||
madeButtons = true;
|
||||
$("#packetButtons").html(ip_btns);
|
||||
|
@ -130,7 +130,8 @@ if (hdr->cwr) flags |= 128;
|
||||
}
|
||||
|
||||
function paginator(active) {
|
||||
let paginator = "<a href='#' class='btn btn-info' onClick='zoomIn();'>Zoom In</a> ";
|
||||
let paginator = "<a href='/api/pcap/" + target + "/capture.pcap' class='btn btn-warning'>Download PCAP Dump</a> ";
|
||||
paginator += "<a href='#' class='btn btn-info' onClick='zoomIn();'>Zoom In</a> ";
|
||||
paginator += "<a href='#' class='btn btn-info' onClick='zoomOut();'>Zoom Out</a> (ℹ️ Or drag an area of the graph) <br />";
|
||||
|
||||
paginator += "<strong>Jump to page</strong>: ";
|
||||
@ -211,8 +212,8 @@ if (hdr->cwr) flags |= 128;
|
||||
capacity = [ capacity[0] / 1e9, capacity[1] / 1e9 ]; // Bytes per nanosecond
|
||||
|
||||
|
||||
target = params.ip;
|
||||
$.get("/api/packet_dump/" + params.ip, (data) => {
|
||||
target = params.id;
|
||||
$.get("/api/packet_dump/" + params.id, (data) => {
|
||||
data.sort((a,b) => a.timestamp - b.timestamp);
|
||||
let min_ts = null;
|
||||
for (let i=0; i<data.length; ++i) {
|
||||
|
@ -42,7 +42,8 @@ struct {
|
||||
__uint(max_entries, 256 * 1024 /* 256 KB */);
|
||||
} heimdall_events SEC(".maps");
|
||||
|
||||
// Basic event type to send to userspace
|
||||
// Basic event type to send to userspace when "hyperfocused" on a
|
||||
// data flow.
|
||||
struct heimdall_event {
|
||||
__u64 timetamp;
|
||||
struct in6_addr src;
|
||||
@ -59,6 +60,32 @@ struct heimdall_event {
|
||||
__u8 dump[PACKET_OCTET_SIZE];
|
||||
};
|
||||
|
||||
struct heimdall_key
|
||||
{
|
||||
struct in6_addr src;
|
||||
struct in6_addr dst;
|
||||
__u8 ip_protocol;
|
||||
__u16 src_port;
|
||||
__u16 dst_port;
|
||||
};
|
||||
|
||||
struct heimdall_data {
|
||||
__u64 last_seen;
|
||||
__u64 bytes;
|
||||
__u64 packets;
|
||||
__u8 tos;
|
||||
};
|
||||
|
||||
// Map for tracking flow information in-kernel for watched IPs
|
||||
struct
|
||||
{
|
||||
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
|
||||
__type(key, struct heimdall_key);
|
||||
__type(value, struct heimdall_data);
|
||||
__uint(max_entries, MAX_FLOWS);
|
||||
__uint(pinning, LIBBPF_PIN_BY_NAME);
|
||||
} heimdall SEC(".maps");
|
||||
|
||||
static __always_inline __u8 get_heimdall_mode()
|
||||
{
|
||||
__u32 index = 0;
|
||||
@ -94,22 +121,59 @@ static __always_inline bool is_heimdall_watching(struct dissector_t *dissector,
|
||||
|
||||
static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size, __u8 mode)
|
||||
{
|
||||
struct heimdall_event event = {0};
|
||||
event.timetamp = bpf_ktime_get_boot_ns();
|
||||
event.src = dissector->src_ip;
|
||||
event.dst = dissector->dst_ip;
|
||||
event.src_port = dissector->src_port;
|
||||
event.dst_port = dissector->dst_port;
|
||||
event.ip_protocol = dissector->ip_protocol;
|
||||
event.tos = dissector->tos;
|
||||
event.size = size;
|
||||
event.tcp_flags = dissector->tcp_flags;
|
||||
event.tcp_window = dissector->window;
|
||||
event.tsval = dissector->tsval;
|
||||
event.tsecr = dissector->tsecr;
|
||||
if (size > PACKET_OCTET_SIZE) size = PACKET_OCTET_SIZE;
|
||||
bpf_probe_read_kernel(&event.dump, size, dissector->start);
|
||||
bpf_ringbuf_output(&heimdall_events, &event, sizeof(event), 0);
|
||||
if (mode == 1) {
|
||||
// Don't report any non-ICMP without ports
|
||||
if (dissector->ip_protocol != 1 && (dissector->src_port == 0 || dissector->dst_port == 0))
|
||||
return;
|
||||
// Don't report ICMP with invalid numbers
|
||||
if (dissector->ip_protocol == 1 && dissector->src_port > 18) return;
|
||||
struct heimdall_key key = {0};
|
||||
key.src = dissector->src_ip;
|
||||
key.dst = dissector->dst_ip;
|
||||
key.ip_protocol = dissector->ip_protocol;
|
||||
key.src_port = bpf_ntohs(dissector->src_port);
|
||||
key.dst_port = bpf_ntohs(dissector->dst_port);
|
||||
struct heimdall_data *counter = (struct heimdall_data *)bpf_map_lookup_elem(&heimdall, &key);
|
||||
if (counter)
|
||||
{
|
||||
counter->last_seen = bpf_ktime_get_boot_ns();
|
||||
counter->packets += 1;
|
||||
counter->bytes += size;
|
||||
if (dissector->tos != 0)
|
||||
{
|
||||
counter->tos = dissector->tos;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
struct heimdall_data counter = {0};
|
||||
counter.last_seen = bpf_ktime_get_boot_ns();
|
||||
counter.bytes = size;
|
||||
counter.packets = 1;
|
||||
counter.tos = dissector->tos;
|
||||
if (bpf_map_update_elem(&heimdall, &key, &counter, BPF_NOEXIST) != 0)
|
||||
{
|
||||
bpf_debug("Failed to insert tracking");
|
||||
}
|
||||
}
|
||||
} else if (mode == 2) {
|
||||
struct heimdall_event event = {0};
|
||||
event.timetamp = bpf_ktime_get_boot_ns();
|
||||
event.src = dissector->src_ip;
|
||||
event.dst = dissector->dst_ip;
|
||||
event.src_port = dissector->src_port;
|
||||
event.dst_port = dissector->dst_port;
|
||||
event.ip_protocol = dissector->ip_protocol;
|
||||
event.tos = dissector->tos;
|
||||
event.size = size;
|
||||
event.tcp_flags = dissector->tcp_flags;
|
||||
event.tcp_window = dissector->window;
|
||||
event.tsval = dissector->tsval;
|
||||
event.tsecr = dissector->tsecr;
|
||||
if (size > PACKET_OCTET_SIZE) size = PACKET_OCTET_SIZE;
|
||||
bpf_probe_read_kernel(&event.dump, size, dissector->start);
|
||||
bpf_ringbuf_output(&heimdall_events, &event, sizeof(event), 0);
|
||||
}
|
||||
|
||||
// Commented out because we don't really care - some will be missed
|
||||
// during very heavy load.
|
||||
|
@ -16,7 +16,7 @@ use crate::num_possible_cpus;
|
||||
///
|
||||
/// `K` is the *key* type, indexing the map.
|
||||
/// `V` is the *value* type, and must exactly match the underlying C data type.
|
||||
pub(crate) struct BpfPerCpuMap<K, V> {
|
||||
pub struct BpfPerCpuMap<K, V> {
|
||||
fd: i32,
|
||||
_key_phantom: PhantomData<K>,
|
||||
_val_phantom: PhantomData<V>,
|
||||
@ -30,7 +30,7 @@ where
|
||||
/// Connect to a PER-CPU BPF map via a filename. Connects the internal
|
||||
/// file descriptor, which is held until the structure is
|
||||
/// dropped. The index of the CPU is *not* specified.
|
||||
pub(crate) fn from_path(filename: &str) -> Result<Self> {
|
||||
pub fn from_path(filename: &str) -> Result<Self> {
|
||||
let filename_c = CString::new(filename)?;
|
||||
let fd = unsafe { bpf_obj_get(filename_c.as_ptr()) };
|
||||
if fd < 0 {
|
||||
@ -43,7 +43,7 @@ where
|
||||
/// Instead of clonining into a vector
|
||||
/// and allocating, calls `callback` for each key/value slice
|
||||
/// with references to the data returned from C.
|
||||
pub(crate) fn for_each(&self, callback: &mut dyn FnMut(&K, &[V])) {
|
||||
pub fn for_each(&self, callback: &mut dyn FnMut(&K, &[V])) {
|
||||
let num_cpus = num_possible_cpus().unwrap();
|
||||
let mut prev_key: *mut K = null_mut();
|
||||
let mut key: K = K::default();
|
||||
|
@ -11,7 +11,10 @@ mod bifrost_maps;
|
||||
/// built-in, compiled eBPF programs. This is very-low level and should
|
||||
/// be handled with caution.
|
||||
pub mod bpf_map;
|
||||
mod bpf_per_cpu_map;
|
||||
/// Provides direct access to LibBPF functionality, as exposed by the
|
||||
/// built-in, compiled eBPF programs. This is very-low level and should
|
||||
/// be handled with caution.
|
||||
pub mod bpf_per_cpu_map;
|
||||
mod cpu_map;
|
||||
mod ip_mapping;
|
||||
mod kernel_wrapper;
|
||||
|
@ -24,7 +24,6 @@ use lqos_queue_tracker::{
|
||||
spawn_queue_structure_monitor,
|
||||
};
|
||||
use lqos_sys::LibreQoSKernels;
|
||||
use lqos_utils::XdpIpAddress;
|
||||
use signal_hook::{
|
||||
consts::{SIGHUP, SIGINT, SIGTERM},
|
||||
iterator::Signals,
|
||||
@ -197,18 +196,24 @@ fn handle_bus_requests(
|
||||
}
|
||||
}
|
||||
BusRequest::GetFlowStats(ip) => get_flow_stats(ip),
|
||||
BusRequest::GetPacketHeaderDump(ip) => {
|
||||
BusRequest::GetPacketHeaderDump(id) => {
|
||||
BusResponse::PacketDump(ten_second_packet_dump(*id))
|
||||
}
|
||||
BusRequest::GetPcapDump(id) => {
|
||||
BusResponse::PcapDump(lqos_heimdall::ten_second_pcap(*id))
|
||||
}
|
||||
BusRequest::GatherPacketData(ip) => {
|
||||
let ip = ip.parse::<IpAddr>();
|
||||
if let Ok(ip) = ip {
|
||||
let ip = XdpIpAddress::from_ip(ip);
|
||||
BusResponse::PacketDump(ten_second_packet_dump(ip))
|
||||
if let Some(id) = lqos_heimdall::hyperfocus_on_target(ip.into()) {
|
||||
BusResponse::PacketCollectionSession(id)
|
||||
} else {
|
||||
BusResponse::Fail("Busy".to_string())
|
||||
}
|
||||
} else {
|
||||
BusResponse::Fail("Invalid IP".to_string())
|
||||
}
|
||||
}
|
||||
BusRequest::GetPcapDump => {
|
||||
BusResponse::PcapDump(lqos_heimdall::ten_second_pcap())
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user