Merge pull request #299 from LibreQoE/heimdall

Heimdall to main
This commit is contained in:
Herbert "TheBracket 2023-03-25 10:08:13 -05:00 committed by GitHub
commit 05186030e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
87 changed files with 3025 additions and 654 deletions

View File

@ -4,6 +4,7 @@
# Where is LibreQoS installed?
lqos_directory = '/opt/libreqos/src'
queue_check_period_ms = 1000
packet_capture_time = 10 # Number of seconds to capture packets in an analysis session
[usage_stats]
send_anonymous = true

43
src/rust/Cargo.lock generated
View File

@ -1428,6 +1428,21 @@ dependencies = [
"uuid",
]
[[package]]
name = "lqos_heimdall"
version = "0.1.0"
dependencies = [
"anyhow",
"dashmap",
"log",
"lqos_bus",
"lqos_config",
"lqos_sys",
"lqos_utils",
"once_cell",
"zerocopy",
]
[[package]]
name = "lqos_node_manager"
version = "0.1.0"
@ -1497,22 +1512,28 @@ dependencies = [
"anyhow",
"bindgen",
"byteorder",
"dashmap",
"libbpf-sys",
"log",
"lqos_bus",
"lqos_config",
"lqos_utils",
"nix",
"once_cell",
"thiserror",
]
[[package]]
name = "lqos_utils"
version = "0.1.0"
dependencies = [
"byteorder",
"log",
"nix",
"notify",
"serde",
"thiserror",
"zerocopy",
]
[[package]]
@ -1526,6 +1547,7 @@ dependencies = [
"log",
"lqos_bus",
"lqos_config",
"lqos_heimdall",
"lqos_queue_tracker",
"lqos_sys",
"lqos_utils",
@ -3283,3 +3305,24 @@ name = "yansi"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
[[package]]
name = "zerocopy"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "332f188cc1bcf1fe1064b8c58d150f497e697f49774aa846f2dc949d9a25f236"
dependencies = [
"byteorder",
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6505e6815af7de1746a08f69c69606bb45695a17149517680f3b2149713b19a3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

View File

@ -2,6 +2,7 @@
name = "lqos_rs"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
@ -24,4 +25,5 @@ members = [
"lqos_utils", # A collection of macros and helpers we find useful
"lqos_setup", # A quick CLI setup for first-time users
"lqos_anonymous_stats_server", # The server for gathering anonymous usage data.
"lqos_heimdall", # Library for managing Heimdall flow watching
]

24
src/rust/check_licenses.sh Executable file
View File

@ -0,0 +1,24 @@
#!/bin/bash
# Checks all Rust projects for accidental inclusion of something GPL-3
# licensed.
PROJECTS="lqos_anonymous_stats_server lqos_bus lqos_config lqos_heimdall lqos_node_manager lqos_python lqos_queue_tracker lqos_setup lqos_sys lqos_utils lqosd lqusers xdp_iphash_to_cpu_cmdline xdp_pping"
TOOL="cargo license --help"
# Check that the tool exists
if ! $TOOL &> /dev/null
then
echo "Cargo License Tool not Found. Installing it."
cargo install cargo-license
fi
# Check every project
for project in $PROJECTS
do
pushd $project > /dev/null
if cargo license | grep "GPL-3"; then
echo "Warning: GPL3 detected in dependencies for $project"
fi
popd > /dev/null
done

View File

@ -2,6 +2,7 @@
name = "lqos_anonymous_stats_server"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
tokio = { version = "1.25.0", features = ["full"] }

View File

@ -2,6 +2,7 @@
name = "lqos_bus"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[features]
default = ["equinix_tests"]

View File

@ -5,6 +5,7 @@ mod request;
mod response;
mod session;
mod unix_socket_server;
mod queue_data;
pub use client::bus_request;
use log::error;
pub use persistent_client::BusClient;
@ -14,6 +15,7 @@ pub use response::BusResponse;
pub use session::BusSession;
use thiserror::Error;
pub use unix_socket_server::UnixSocketServer;
pub use queue_data::*;
/// The local socket path to which `lqosd` will bind itself,
/// listening for requets.

View File

@ -0,0 +1,103 @@
use serde::{Serialize, Deserialize};
/// Type used for *displaying* the queue store data. It deliberately
/// doesn't include data that we aren't going to display in a GUI.
#[allow(missing_docs)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
pub struct QueueStoreTransit {
pub history: Vec<(CakeDiffTransit, CakeDiffTransit)>,
pub history_head: usize,
//pub prev_download: Option<CakeTransit>,
//pub prev_upload: Option<CakeTransit>,
pub current_download: CakeTransit,
pub current_upload: CakeTransit,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[allow(missing_docs)]
pub struct CakeDiffTransit {
pub bytes: u64,
pub packets: u32,
pub qlen: u32,
pub tins: Vec<CakeDiffTinTransit>,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[allow(missing_docs)]
pub struct CakeDiffTinTransit {
pub sent_bytes: u64,
pub backlog_bytes: u32,
pub drops: u32,
pub marks: u32,
pub avg_delay_us: u32,
}
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[allow(missing_docs)]
pub struct CakeTransit {
//pub handle: TcHandle,
//pub parent: TcHandle,
//pub bytes: u64,
//pub packets: u32,
//pub overlimits: u32,
//pub requeues: u32,
//pub backlog: u32,
//pub qlen: u32,
pub memory_used: u32,
//pub memory_limit: u32,
//pub capacity_estimate: u32,
//pub min_network_size: u16,
//pub max_network_size: u16,
//pub min_adj_size: u16,
//pub max_adj_size: u16,
//pub avg_hdr_offset: u16,
//pub tins: Vec<CakeTinTransit>,
//pub drops: u32,
}
/*
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[allow(missing_docs)]
pub struct CakeOptionsTransit {
pub rtt: u64,
pub bandwidth: u8,
pub diffserv: u8,
pub flowmode: u8,
pub ack_filter: u8,
pub nat: bool,
pub wash: bool,
pub ingress: bool,
pub split_gso: bool,
pub raw: bool,
pub overhead: u16,
pub fwmark: TcHandle,
}
// Commented out data is collected but not used
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[allow(missing_docs)]
pub struct CakeTinTransit {
//pub threshold_rate: u64,
//pub sent_bytes: u64,
//pub backlog_bytes: u32,
//pub target_us: u32,
//pub interval_us: u32,
//pub peak_delay_us: u32,
//pub avg_delay_us: u32,
//pub base_delay_us: u32,
//pub sent_packets: u32,
//pub way_indirect_hits: u16,
//pub way_misses: u16,
//pub way_collisions: u16,
//pub drops: u32,
//pub ecn_marks: u32,
//pub ack_drops: u32,
//pub sparse_flows: u16,
//pub bulk_flows: u16,
//pub unresponsive_flows: u16,
//pub max_pkt_len: u16,
//pub flow_quantum: u16,
}
*/

View File

@ -133,6 +133,18 @@ pub enum BusRequest {
/// Obtain the lqosd statistics
GetLqosStats,
/// Tell me flow stats for a given IP address
GetFlowStats(String),
/// Tell Heimdall to hyper-focus on an IP address for a bit
GatherPacketData(String),
/// Give me a dump of the last 10 seconds of packet headers
GetPacketHeaderDump(usize),
/// Give me a libpcap format packet dump (shortened) of the last 10 seconds
GetPcapDump(usize),
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidht test" link.
#[cfg(feature = "equinix_tests")]

View File

@ -1,6 +1,7 @@
use crate::{IpMapping, IpStats, XdpPpingResult};
use crate::{IpMapping, IpStats, XdpPpingResult, FlowTransport, ip_stats::PacketHeader};
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use super::QueueStoreTransit;
/// A `BusResponse` object represents a single
/// reply generated from a `BusRequest`, and batched
@ -67,7 +68,7 @@ pub enum BusResponse {
/// A string containing a JSON dump of a queue stats. Analagos to
/// the response from `tc show qdisc`.
RawQueueData(String),
RawQueueData(Option<Box<QueueStoreTransit>>),
/// Results from network map queries
NetworkMap(Vec<(usize, lqos_config::NetworkJsonTransport)>),
@ -83,5 +84,24 @@ pub enum BusResponse {
time_to_poll_hosts: u64,
/// High traffic watermark
high_watermark: (u64, u64),
}
/// Number of flows tracked
tracked_flows: u64,
},
/// Flow Data
FlowData(Vec<(FlowTransport, Option<FlowTransport>)>),
/// The index of the new packet collection session
PacketCollectionSession{
/// The identifier of the capture session
session_id: usize,
/// Number of seconds for which data will be captured
countdown: usize
},
/// Packet header dump
PacketDump(Option<Vec<PacketHeader>>),
/// Pcap format dump
PcapDump(Option<String>),
}

View File

@ -66,3 +66,81 @@ pub struct XdpPpingResult {
/// derived. If 0, the other values are invalid.
pub samples: u32,
}
/// Defines an IP protocol for display in the flow
/// tracking (Heimdall) system.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum FlowProto {
/// A TCP flow
TCP,
/// A UDP flow
UDP,
/// An ICMP flow
ICMP
}
/// Defines the display data for a flow in Heimdall.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct FlowTransport {
/// The Source IP address
pub src: String,
/// The Destination IP address
pub dst: String,
/// The flow protocol (see `FlowProto`)
pub proto: FlowProto,
/// The source port, which is overridden to ICMP code on ICMP flows.
pub src_port: u16,
/// The destination port, which isn't useful at all on ICMP flows.
pub dst_port: u16,
/// The number of bytes since we started tracking this flow.
pub bytes: u64,
/// The number of packets since we started tracking this flow.
pub packets: u64,
/// Detected DSCP code if any
pub dscp: u8,
/// Detected ECN bit status (0-3)
pub ecn: u8,
}
/// Extract the 6-bit DSCP and 2-bit ECN code from a TOS field
/// in an IP header.
pub fn tos_parser(tos: u8) -> (u8, u8) {
// Format: 2 bits of ECN, 6 bits of DSCP
const ECN: u8 = 0b00000011;
const DSCP: u8 = 0b11111100;
let ecn = tos & ECN;
let dscp = (tos & DSCP) >> 2;
(dscp, ecn)
}
/// Packet header dump
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
pub struct PacketHeader {
/// Timestamp (ns since boot)
pub timestamp: u64,
/// Source IP
pub src: String,
/// Destination IP
pub dst: String,
/// Source Port
pub src_port : u16,
/// Destination Port
pub dst_port: u16,
/// Ip Protocol (see Linux kernel docs)
pub ip_protocol: u8,
/// ECN Flag
pub ecn: u8,
/// DSCP code
pub dscp: u8,
/// Packet Size
pub size: u32,
/// TCP Flag Bitset
pub tcp_flags: u8,
/// TCP Window Size
pub tcp_window: u16,
/// TCP TSVal
pub tcp_tsval: u32,
/// TCP ECR val
pub tcp_tsecr: u32,
}

View File

@ -12,11 +12,15 @@
#![warn(missing_docs)]
mod bus;
mod ip_stats;
pub use ip_stats::{IpMapping, IpStats, XdpPpingResult};
pub use ip_stats::{
tos_parser, FlowProto, FlowTransport, IpMapping, IpStats, PacketHeader,
XdpPpingResult,
};
mod tc_handle;
pub use bus::{
bus_request, decode_request, decode_response, encode_request,
encode_response, BusClient, BusReply, BusRequest, BusResponse, BusSession,
CakeDiffTinTransit, CakeDiffTransit, CakeTransit, QueueStoreTransit,
UnixSocketServer, BUS_SOCKET_PATH,
};
pub use tc_handle::TcHandle;

View File

@ -34,8 +34,13 @@ impl TcHandle {
"none" => Ok(Self(TC_H_UNSPEC)),
_ => {
if !handle.contains(':') {
error!("Unable to parse TC handle {handle}. Must contain a colon.");
return Err(TcHandleParseError::InvalidInput(handle.to_string()));
if let Ok(major) = read_hex_string(handle) {
let minor = 0;
return Ok(Self((major << 16) | minor));
} else {
error!("Unable to parse TC handle {handle}. Must contain a colon.");
return Err(TcHandleParseError::InvalidInput(handle.to_string()));
}
}
let parts: Vec<&str> = handle.split(':').collect();
let major = read_hex_string(parts[0]).map_err(|_| TcHandleParseError::InvalidInput(handle.to_string()))?;

View File

@ -2,6 +2,7 @@
name = "lqos_config"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
thiserror = "1"

View File

@ -30,6 +30,11 @@ pub struct EtcLqos {
/// If present, defined anonymous usage stat sending
pub usage_stats: Option<UsageStats>,
/// Defines for how many seconds a libpcap compatible capture should
/// run. Short times are good, there's a real performance penalty to
/// capturing high-throughput streams. Defaults to 10 seconds.
pub packet_capture_time: Option<usize>,
}
/// Represents a set of `sysctl` and `ethtool` tweaks that may be

View File

@ -179,6 +179,7 @@ impl NetworkJson {
&self,
circuit_id: &str,
) -> Option<Vec<usize>> {
//println!("Looking for parents of {circuit_id}");
self
.nodes
.iter()

View File

@ -0,0 +1,16 @@
[package]
name = "lqos_heimdall"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
lqos_utils = { path = "../lqos_utils" }
lqos_bus = { path = "../lqos_bus" }
lqos_sys = { path = "../lqos_sys" }
lqos_config = { path = "../lqos_config" }
log = "0"
zerocopy = {version = "0.6.1", features = [ "simd" ] }
once_cell = "1.17.1"
dashmap = "5.4.0"
anyhow = "1"

View File

@ -0,0 +1,19 @@
/// Currently unused, represents the current operation mode of the Heimdall
/// sub-system. Defaults to 1.
#[repr(u8)]
pub enum HeimdallMode {
/// Do not monitor
Off = 0,
/// Only look at flows on hosts we are watching via the circuit monitor
WatchOnly = 1,
/// Capture detailed packet data from flows
Analysis = 2,
}
/// Configuration options passed to Heimdall
#[derive(Default, Clone)]
#[repr(C)]
pub struct HeimdalConfig {
/// Current operation mode
pub mode: u32,
}

View File

@ -0,0 +1,194 @@
use crate::{timeline::expire_timeline, FLOW_EXPIRE_SECS};
use dashmap::DashMap;
use lqos_bus::{tos_parser, BusResponse, FlowTransport};
use lqos_sys::bpf_per_cpu_map::BpfPerCpuMap;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::{collections::HashSet, time::Duration};
/// Representation of the eBPF `heimdall_key` type.
#[derive(Debug, Clone, Default, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct HeimdallKey {
/// Mapped `XdpIpAddress` source for the flow.
pub src_ip: XdpIpAddress,
/// Mapped `XdpIpAddress` destination for the flow
pub dst_ip: XdpIpAddress,
/// IP protocol (see the Linux kernel!)
pub ip_protocol: u8,
/// Source port number, or ICMP type.
pub src_port: u16,
/// Destination port number.
pub dst_port: u16,
}
/// Mapped representation of the eBPF `heimdall_data` type.
#[derive(Debug, Clone, Default)]
#[repr(C)]
pub struct HeimdallData {
/// Last seen, in nanoseconds (since boot time).
pub last_seen: u64,
/// Number of bytes since the flow started being tracked
pub bytes: u64,
/// Number of packets since the flow started being tracked
pub packets: u64,
/// IP header TOS value
pub tos: u8,
/// Reserved to pad the structure
pub reserved: [u8; 3],
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FlowKey {
src: XdpIpAddress,
dst: XdpIpAddress,
proto: u8,
src_port: u16,
dst_port: u16,
}
#[derive(Clone, Debug, Default)]
struct FlowData {
last_seen: u64,
bytes: u64,
packets: u64,
tos: u8,
}
impl From<&HeimdallKey> for FlowKey {
fn from(value: &HeimdallKey) -> Self {
Self {
src: value.src_ip,
dst: value.dst_ip,
proto: value.ip_protocol,
src_port: value.src_port,
dst_port: value.dst_port,
}
}
}
static FLOW_DATA: Lazy<DashMap<FlowKey, FlowData>> = Lazy::new(DashMap::new);
/*pub(crate) fn record_flow(event: &HeimdallEvent) {
let key: FlowKey = event.into();
if let Some(mut data) = FLOW_DATA.get_mut(&key) {
data.last_seen = event.timestamp;
data.packets += 1;
data.bytes += event.size as u64;
data.tos = event.tos;
} else {
FLOW_DATA.insert(
key,
FlowData {
last_seen: event.timestamp,
bytes: event.size.into(),
packets: 1,
tos: event.tos,
},
);
}
}*/
/// Iterates through all throughput entries, and sends them in turn to `callback`.
/// This elides the need to clone or copy data.
fn heimdall_for_each(
callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]),
) {
if let Ok(heimdall) = BpfPerCpuMap::<HeimdallKey, HeimdallData>::from_path(
"/sys/fs/bpf/heimdall",
) {
heimdall.for_each(callback);
}
}
fn combine_flows(values: &[HeimdallData]) -> FlowData {
let mut result = FlowData::default();
let mut ls = 0;
values.iter().for_each(|v| {
result.bytes += v.bytes;
result.packets += v.packets;
result.tos += v.tos;
if v.last_seen > ls {
ls = v.last_seen;
}
});
result.last_seen = ls;
result
}
pub fn read_flows() {
heimdall_for_each(&mut |key, value| {
let flow_key = key.into();
let combined = combine_flows(value);
if let Some(mut flow) = FLOW_DATA.get_mut(&flow_key) {
flow.last_seen = combined.last_seen;
flow.bytes = combined.bytes;
flow.packets = combined.packets;
flow.tos = combined.tos;
} else {
FLOW_DATA.insert(flow_key, combined);
}
});
}
pub fn expire_heimdall_flows() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(FLOW_EXPIRE_SECS)).as_nanos() as u64;
FLOW_DATA.retain(|_k, v| v.last_seen > expire);
expire_timeline();
}
}
pub fn get_flow_stats(ip: XdpIpAddress) -> BusResponse {
let mut result = Vec::new();
// Obtain all the flows
let mut all_flows = Vec::new();
for value in FLOW_DATA.iter() {
let key = value.key();
if key.src == ip || key.dst == ip {
let (dscp, ecn) = tos_parser(value.tos);
all_flows.push(FlowTransport {
src: key.src.as_ip().to_string(),
dst: key.dst.as_ip().to_string(),
src_port: key.src_port,
dst_port: key.dst_port,
proto: match key.proto {
6 => lqos_bus::FlowProto::TCP,
17 => lqos_bus::FlowProto::UDP,
_ => lqos_bus::FlowProto::ICMP,
},
bytes: value.bytes,
packets: value.packets,
dscp,
ecn,
});
}
}
// Turn them into reciprocal pairs
let mut done = HashSet::new();
for (i, flow) in all_flows.iter().enumerate() {
if !done.contains(&i) {
let flow_a = flow.clone();
let flow_b = if let Some(flow_b) = all_flows
.iter()
.position(|f| f.src == flow_a.dst && f.src_port == flow_a.dst_port)
{
done.insert(flow_b);
Some(all_flows[flow_b].clone())
} else {
None
};
result.push((flow_a, flow_b));
}
}
result.sort_by(|a, b| b.0.bytes.cmp(&a.0.bytes));
BusResponse::FlowData(result)
}

View File

@ -0,0 +1,55 @@
//! Provides an interface to the Heimdall packet watching
//! system. Heimdall watches traffic flows, and is notified
//! about their contents via the eBPF Perf system.
mod config;
pub mod perf_interface;
pub mod stats;
pub use config::{HeimdalConfig, HeimdallMode};
mod flows;
pub use flows::{expire_heimdall_flows, get_flow_stats};
mod timeline;
pub use timeline::{n_second_packet_dump, n_second_pcap, hyperfocus_on_target};
mod pcap;
mod watchlist;
use lqos_utils::fdtimer::periodic;
pub use watchlist::{heimdall_expire, heimdall_watch_ip, set_heimdall_mode};
use crate::flows::read_flows;
/// How long should Heimdall keep watching a flow after being requested
/// to do so? Setting this to a long period increases CPU load after the
/// client has stopped looking. Too short a delay will lead to missed
/// collections if the client hasn't maintained the 1s request cadence.
const EXPIRE_WATCHES_SECS: u64 = 5;
/// How long should Heimdall retain flow summary data?
const FLOW_EXPIRE_SECS: u64 = 10;
/// How long should Heimdall retain packet timeline data?
const TIMELINE_EXPIRE_SECS: u64 = 10;
/// How long should an analysis session remain in memory?
const SESSION_EXPIRE_SECONDS: u64 = 600;
/// Interface to running Heimdall (start this when lqosd starts)
/// This is async to match the other spawning systems.
pub async fn start_heimdall() {
if set_heimdall_mode(HeimdallMode::WatchOnly).is_err() {
log::error!(
"Unable to set Heimdall Mode. Packet watching will be unavailable."
);
return;
}
let interval_ms = 1000; // 1 second
log::info!("Heimdall check period set to {interval_ms} ms.");
std::thread::spawn(move || {
periodic(interval_ms, "Heimdall Packet Watcher", &mut || {
read_flows();
expire_heimdall_flows();
heimdall_expire();
});
});
}

View File

@ -0,0 +1,50 @@
use std::time::Duration;
use zerocopy::AsBytes;
use crate::perf_interface::{HeimdallEvent, PACKET_OCTET_SIZE};
#[derive(AsBytes)]
#[repr(C)]
pub(crate) struct PcapFileHeader {
magic: u32,
version_major: u16,
version_minor: u16,
thiszone: i32,
sigfigs: u32,
snaplen: u32,
link_type: u32,
}
impl PcapFileHeader {
pub(crate) fn new() -> Self {
Self {
magic: 0xa1b2c3d4,
version_major: 2,
version_minor: 4,
thiszone: 0,
sigfigs: 0,
snaplen: PACKET_OCTET_SIZE as u32,
link_type: 1,
}
}
}
#[derive(AsBytes)]
#[repr(C)]
pub(crate) struct PcapPacketHeader {
ts_sec: u32,
ts_usec: u32,
inc_len: u32, // Octets included
orig_len: u32, // Length the packet used to be
}
impl PcapPacketHeader {
pub(crate) fn from_heimdall(event: &HeimdallEvent) -> Self {
let timestamp_nanos = Duration::from_nanos(event.timestamp);
Self {
ts_sec: timestamp_nanos.as_secs() as u32,
ts_usec: timestamp_nanos.subsec_micros(),
inc_len: u32::min(PACKET_OCTET_SIZE as u32, event.size),
orig_len: event.size
}
}
}

View File

@ -0,0 +1,69 @@
use std::{ffi::c_void, slice};
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;
use crate::timeline::store_on_timeline;
/// This constant MUST exactly match PACKET_OCTET_STATE in heimdall.h
pub(crate) const PACKET_OCTET_SIZE: usize = 128;
#[derive(FromBytes, Debug, Clone, PartialEq, Eq, Hash)]
#[repr(C)]
pub struct HeimdallEvent {
pub timestamp: u64,
pub src: XdpIpAddress,
pub dst: XdpIpAddress,
pub src_port : u16,
pub dst_port: u16,
pub ip_protocol: u8,
pub tos: u8,
pub size: u32,
pub tcp_flags: u8,
pub tcp_window: u16,
pub tcp_tsval: u32,
pub tcp_tsecr: u32,
pub packet_data: [u8; PACKET_OCTET_SIZE],
}
/*
Snippet for tcp_flags decoding
if (hdr->fin) flags |= 1;
if (hdr->syn) flags |= 2;
if (hdr->rst) flags |= 4;
if (hdr->psh) flags |= 8;
if (hdr->ack) flags |= 16;
if (hdr->urg) flags |= 32;
if (hdr->ece) flags |= 64;
if (hdr->cwr) flags |= 128;
*/
/// Callback for the Heimdall Perf map system. Called whenever Heimdall has
/// events for the system to read.
///
/// # Safety
///
/// This function is inherently unsafe, because it interfaces directly with
/// C and the Linux-kernel eBPF system.
#[no_mangle]
pub unsafe extern "C" fn heimdall_handle_events(
_ctx: *mut c_void,
data: *mut c_void,
data_size: usize,
) -> i32 {
const EVENT_SIZE: usize = std::mem::size_of::<HeimdallEvent>();
if data_size < EVENT_SIZE {
log::warn!("Warning: incoming data too small in Heimdall buffer");
return 0;
}
//COLLECTED_EVENTS.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let data_u8 = data as *const u8;
let data_slice : &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
if let Some(incoming) = HeimdallEvent::read_from(data_slice) {
store_on_timeline(incoming);
} else {
println!("Failed to decode");
}
0
}

View File

@ -0,0 +1,6 @@
//! Count statistics
use std::sync::atomic::AtomicU64;
/// Perf event counter
pub static COLLECTED_EVENTS: AtomicU64 = AtomicU64::new(0);

View File

@ -0,0 +1,190 @@
use crate::{
heimdall_watch_ip,
pcap::{PcapFileHeader, PcapPacketHeader},
perf_interface::{HeimdallEvent, PACKET_OCTET_SIZE},
set_heimdall_mode, HeimdallMode, SESSION_EXPIRE_SECONDS,
TIMELINE_EXPIRE_SECS,
};
use dashmap::{DashMap, DashSet};
use lqos_bus::{tos_parser, PacketHeader};
use lqos_config::EtcLqos;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::{
fs::{remove_file, File},
io::Write,
path::Path,
sync::atomic::{AtomicBool, AtomicUsize},
time::Duration,
};
use zerocopy::AsBytes;
impl HeimdallEvent {
fn as_header(&self) -> PacketHeader {
let (dscp, ecn) = tos_parser(self.tos);
PacketHeader {
timestamp: self.timestamp,
src: self.src.as_ip().to_string(),
dst: self.dst.as_ip().to_string(),
src_port: self.src_port,
dst_port: self.dst_port,
ip_protocol: self.ip_protocol,
ecn,
dscp,
size: self.size,
tcp_flags: self.tcp_flags,
tcp_window: self.tcp_window,
tcp_tsecr: self.tcp_tsecr,
tcp_tsval: self.tcp_tsval,
}
}
}
struct Timeline {
data: DashSet<HeimdallEvent>,
}
impl Timeline {
fn new() -> Self {
Self { data: DashSet::new() }
}
}
static TIMELINE: Lazy<Timeline> = Lazy::new(Timeline::new);
pub(crate) fn store_on_timeline(event: HeimdallEvent) {
TIMELINE.data.insert(event); // We're moving here deliberately
}
pub(crate) fn expire_timeline() {
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(TIMELINE_EXPIRE_SECS))
.as_nanos() as u64;
TIMELINE.data.retain(|v| v.timestamp > expire);
FOCUS_SESSIONS.retain(|_, v| v.expire < since_boot.as_nanos() as u64);
}
}
struct FocusSession {
expire: u64,
data: DashSet<HeimdallEvent>,
dump_filename: Option<String>,
}
impl Drop for FocusSession {
fn drop(&mut self) {
if let Some(df) = &self.dump_filename {
let path = Path::new(df);
if path.exists() {
let _ = remove_file(path);
}
}
}
}
static HYPERFOCUSED: AtomicBool = AtomicBool::new(false);
static FOCUS_SESSION_ID: AtomicUsize = AtomicUsize::new(0);
static FOCUS_SESSIONS: Lazy<DashMap<usize, FocusSession>> =
Lazy::new(DashMap::new);
/// Tell Heimdall to spend the next 10 seconds obsessing over an IP address,
/// collecting full packet headers. This hurts your CPU, so use it sparingly.
///
/// This spawns a thread that keeps Heimdall in Analysis mode (saving packet
/// data to userspace) for 10 seconds, before reverting to WatchOnly mode.
///
/// You can only do this on one target at a time.
///
/// ## Returns
///
/// * Either `None` or...
/// * The id number of the collection session for analysis.
pub fn hyperfocus_on_target(ip: XdpIpAddress) -> Option<(usize, usize)> {
if HYPERFOCUSED.compare_exchange(
false,
true,
std::sync::atomic::Ordering::Relaxed,
std::sync::atomic::Ordering::Relaxed,
) == Ok(false)
{
// If explicitly set, obtain the capture time. Otherwise, default to
// a reasonable 10 seconds.
let capture_time = if let Ok(cfg) = EtcLqos::load() {
cfg.packet_capture_time.unwrap_or(10)
} else {
10
};
let new_id =
FOCUS_SESSION_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
std::thread::spawn(move || {
for _ in 0..capture_time {
let _ = set_heimdall_mode(HeimdallMode::Analysis);
heimdall_watch_ip(ip);
std::thread::sleep(Duration::from_secs(1));
}
let _ = set_heimdall_mode(HeimdallMode::WatchOnly);
if let Ok(now) = time_since_boot() {
let since_boot = Duration::from(now);
let expire = (since_boot - Duration::from_secs(SESSION_EXPIRE_SECONDS))
.as_nanos() as u64;
FOCUS_SESSIONS.insert(
new_id,
FocusSession {
expire,
data: TIMELINE.data.clone(),
dump_filename: None,
},
);
}
HYPERFOCUSED.store(false, std::sync::atomic::Ordering::Relaxed);
});
Some((new_id, capture_time))
} else {
log::warn!(
"Heimdall was busy and won't start another collection session."
);
None
}
}
pub fn n_second_packet_dump(session_id: usize) -> Option<Vec<PacketHeader>> {
if let Some(session) = FOCUS_SESSIONS.get(&session_id) {
Some(session.data.iter().map(|e| e.as_header()).collect())
} else {
None
}
}
pub fn n_second_pcap(session_id: usize) -> Option<String> {
if let Some(mut session) = FOCUS_SESSIONS.get_mut(&session_id) {
let filename = format!("/tmp/cap_sess_{session_id}");
session.dump_filename = Some(filename.clone());
let path = Path::new(&filename);
let mut out = File::create(path).expect("Unable to create {filename}");
out
.write_all(PcapFileHeader::new().as_bytes())
.expect("Unable to write to {filename}");
session
.data
.iter()
.map(|e| (e.packet_data, e.size, PcapPacketHeader::from_heimdall(&e)))
.for_each(
|(data, size, p)| {
out.write_all(p.as_bytes()).expect("Unable to write to {filename}");
if size < PACKET_OCTET_SIZE as u32 {
out.write_all(&data[0 .. size as usize]).unwrap();
} else {
out.write_all(&data).unwrap();
}
},
);
Some(filename)
} else {
None
}
}

View File

@ -0,0 +1,76 @@
use crate::{HeimdalConfig, HeimdallMode, EXPIRE_WATCHES_SECS};
use dashmap::DashMap;
use lqos_sys::bpf_map::BpfMap;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::time::Duration;
const HEIMDALL_CFG_PATH: &str = "/sys/fs/bpf/heimdall_config";
const HEIMDALL_WATCH_PATH: &str = "/sys/fs/bpf/heimdall_watching";
/// Change the eBPF Heimdall System mode.
pub fn set_heimdall_mode(mode: HeimdallMode) -> anyhow::Result<()> {
let mut map = BpfMap::<u32, HeimdalConfig>::from_path(HEIMDALL_CFG_PATH)?;
map.insert_or_update(&mut 0, &mut HeimdalConfig { mode: mode as u32 })?;
Ok(())
}
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct HeimdallWatching {
expiration: u128,
ip_address: XdpIpAddress,
}
impl HeimdallWatching {
pub fn new(mut ip: XdpIpAddress) -> anyhow::Result<Self> {
let now = time_since_boot()?;
let expire =
Duration::from(now) + Duration::from_secs(EXPIRE_WATCHES_SECS);
let mut map =
BpfMap::<XdpIpAddress, u32>::from_path(HEIMDALL_WATCH_PATH).unwrap();
let _ = map.insert(&mut ip, &mut 1);
Ok(Self { ip_address: ip, expiration: expire.as_nanos() })
}
fn stop_watching(&mut self) {
log::info!("Heimdall stopped watching {}", self.ip_address.as_ip().to_string());
let mut map =
BpfMap::<XdpIpAddress, u32>::from_path(HEIMDALL_WATCH_PATH).unwrap();
map.delete(&mut self.ip_address).unwrap();
}
}
static HEIMDALL_WATCH_LIST: Lazy<DashMap<XdpIpAddress, HeimdallWatching>> =
Lazy::new(DashMap::new);
/// Run this periodically (once per second) to expire any watched traffic
/// flows that haven't received traffic in the last 30 seconds.
pub fn heimdall_expire() {
if let Ok(now) = time_since_boot() {
let now = Duration::from(now).as_nanos();
HEIMDALL_WATCH_LIST.retain(|_k, v| {
if v.expiration < now {
v.stop_watching();
}
v.expiration > now
});
}
}
/// Instruct Heimdall to start watching an IP address.
/// You want to call this when you refresh a flow; it will auto-expire
/// in 30 seconds.
pub fn heimdall_watch_ip(ip: XdpIpAddress) {
if let Some(mut watch) = HEIMDALL_WATCH_LIST.get_mut(&ip) {
if let Ok(now) = time_since_boot() {
let expire =
Duration::from(now) + Duration::from_secs(EXPIRE_WATCHES_SECS);
watch.expiration = expire.as_nanos();
}
} else if let Ok(h) = HeimdallWatching::new(ip) {
log::info!("Heimdall is watching {}", ip.as_ip().to_string());
HEIMDALL_WATCH_LIST.insert(ip, h);
}
}

View File

@ -2,6 +2,7 @@
name = "lqos_node_manager"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[features]
default = ["equinix_tests"]
@ -21,4 +22,4 @@ once_cell = "1"
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]
jemallocator = "0.5"
jemallocator = "0.5"

View File

@ -83,16 +83,18 @@ pub struct LqosStats {
pub bus_requests_since_start: u64,
pub time_to_poll_hosts_us: u64,
pub high_watermark: (u64, u64),
pub tracked_flows: u64,
}
#[get("/api/stats")]
pub async fn stats() -> NoCache<Json<LqosStats>> {
for msg in bus_request(vec![BusRequest::GetLqosStats]).await.unwrap() {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark } = msg {
if let BusResponse::LqosdStats { bus_requests, time_to_poll_hosts, high_watermark, tracked_flows } = msg {
return NoCache::new(Json(LqosStats {
bus_requests_since_start: bus_requests,
time_to_poll_hosts_us: time_to_poll_hosts,
high_watermark
high_watermark,
tracked_flows,
}));
}
}

View File

@ -40,6 +40,7 @@ fn rocket() -> _ {
static_pages::circuit_queue,
config_control::config_page,
network_tree::tree_page,
static_pages::ip_dump,
// Our JS library
static_pages::lqos_js,
static_pages::lqos_css,
@ -67,6 +68,10 @@ fn rocket() -> _ {
queue_info::circuit_info,
queue_info::current_circuit_throughput,
queue_info::watch_circuit,
queue_info::flow_stats,
queue_info::packet_dump,
queue_info::pcap,
queue_info::request_analysis,
config_control::get_nic_list,
config_control::get_current_python_config,
config_control::get_current_lqosd_config,

View File

@ -106,7 +106,7 @@ pub async fn node_names(
#[get("/api/funnel_for_queue/<circuit_id>")]
pub async fn funnel_for_queue(
circuit_id: String,
) -> NoCache<Json<Vec<(usize, NetworkJsonTransport)>>> {
) -> NoCache<MsgPack<Vec<(usize, NetworkJsonTransport)>>> {
let mut result = Vec::new();
let target = SHAPED_DEVICES
@ -127,5 +127,5 @@ pub async fn funnel_for_queue(
result.extend_from_slice(map);
}
}
NoCache::new(Json(result))
NoCache::new(MsgPack(result))
}

View File

@ -1,10 +1,13 @@
use crate::auth_guard::AuthGuard;
use crate::cache_control::NoCache;
use crate::tracker::SHAPED_DEVICES;
use lqos_bus::{bus_request, BusRequest, BusResponse};
use lqos_bus::{bus_request, BusRequest, BusResponse, FlowTransport, PacketHeader, QueueStoreTransit};
use rocket::fs::NamedFile;
use rocket::http::Status;
use rocket::response::content::RawJson;
use rocket::serde::json::Json;
use rocket::serde::Serialize;
use rocket::serde::msgpack::MsgPack;
use std::net::IpAddr;
#[derive(Serialize, Clone)]
@ -28,7 +31,7 @@ pub async fn watch_circuit(
pub async fn circuit_info(
circuit_id: String,
_auth: AuthGuard,
) -> NoCache<Json<CircuitInfo>> {
) -> NoCache<MsgPack<CircuitInfo>> {
if let Some(device) = SHAPED_DEVICES
.read()
.unwrap()
@ -43,13 +46,13 @@ pub async fn circuit_info(
device.upload_max_mbps as u64 * 1_000_000,
),
};
NoCache::new(Json(result))
NoCache::new(MsgPack(result))
} else {
let result = CircuitInfo {
name: "Nameless".to_string(),
capacity: (1_000_000, 1_000_000),
};
NoCache::new(Json(result))
NoCache::new(MsgPack(result))
}
}
@ -57,7 +60,7 @@ pub async fn circuit_info(
pub async fn current_circuit_throughput(
circuit_id: String,
_auth: AuthGuard,
) -> NoCache<Json<Vec<(String, u64, u64)>>> {
) -> NoCache<MsgPack<Vec<(String, u64, u64)>>> {
let mut result = Vec::new();
// Get a list of host counts
// This is really inefficient, but I'm struggling to find a better way.
@ -82,21 +85,81 @@ pub async fn current_circuit_throughput(
}
}
NoCache::new(Json(result))
NoCache::new(MsgPack(result))
}
#[get("/api/raw_queue_by_circuit/<circuit_id>")]
pub async fn raw_queue_by_circuit(
circuit_id: String,
_auth: AuthGuard,
) -> NoCache<RawJson<String>> {
) -> NoCache<MsgPack<QueueStoreTransit>> {
let responses =
bus_request(vec![BusRequest::GetRawQueueData(circuit_id)]).await.unwrap();
let result = match &responses[0] {
BusResponse::RawQueueData(msg) => msg.clone(),
_ => "Unable to request queue".to_string(),
BusResponse::RawQueueData(Some(msg)) => {
*msg.clone()
}
_ => QueueStoreTransit::default()
};
NoCache::new(RawJson(result))
NoCache::new(MsgPack(result))
}
#[get("/api/flows/<ip_list>")]
pub async fn flow_stats(ip_list: String, _auth: AuthGuard) -> NoCache<MsgPack<Vec<(FlowTransport, Option<FlowTransport>)>>> {
let mut result = Vec::new();
let request: Vec<BusRequest> = ip_list.split(',').map(|ip| BusRequest::GetFlowStats(ip.to_string())).collect();
let responses = bus_request(request).await.unwrap();
for r in responses.iter() {
if let BusResponse::FlowData(flow) = r {
result.extend_from_slice(flow);
}
}
NoCache::new(MsgPack(result))
}
#[derive(Serialize, Clone)]
#[serde(crate = "rocket::serde")]
pub enum RequestAnalysisResult {
Fail,
Ok{ session_id: usize, countdown: usize }
}
#[get("/api/request_analysis/<ip>")]
pub async fn request_analysis(ip: String) -> NoCache<Json<RequestAnalysisResult>> {
for r in bus_request(vec![BusRequest::GatherPacketData(ip)]).await.unwrap() {
if let BusResponse::PacketCollectionSession{session_id, countdown} = r {
return NoCache::new(Json(RequestAnalysisResult::Ok{session_id, countdown}));
}
}
NoCache::new(Json(RequestAnalysisResult::Fail))
}
#[get("/api/packet_dump/<id>")]
pub async fn packet_dump(id: usize, _auth: AuthGuard) -> NoCache<Json<Vec<PacketHeader>>> {
let mut result = Vec::new();
for r in bus_request(vec![BusRequest::GetPacketHeaderDump(id)]).await.unwrap() {
if let BusResponse::PacketDump(Some(packets)) = r {
result.extend(packets);
}
}
NoCache::new(Json(result))
}
#[allow(unused_variables)]
#[get("/api/pcap/<id>/<filename>")]
pub async fn pcap(id: usize, filename: String) -> Result<NoCache<NamedFile>, Status> {
// The unusued _filename parameter is there to allow the changing of the
// filename on the client side. See Github issue 291.
for r in bus_request(vec![BusRequest::GetPcapDump(id)]).await.unwrap() {
if let BusResponse::PcapDump(Some(filename)) = r {
return Ok(NoCache::new(NamedFile::open(filename).await.unwrap()));
}
}
Err(Status::NotFound)
}
#[cfg(feature = "equinix_tests")]

View File

@ -48,6 +48,15 @@ pub async fn circuit_queue<'a>(
NoCache::new(NamedFile::open("static/circuit_queue.html").await.ok())
}
// Note that NoCache can be replaced with a cache option
// once the design work is complete.
#[get("/ip_dump")]
pub async fn ip_dump<'a>(
_auth: AuthGuard,
) -> NoCache<Option<NamedFile>> {
NoCache::new(NamedFile::open("static/ip_dump.html").await.ok())
}
// Note that NoCache can be replaced with a cache option
// once the design work is complete.
#[get("/unknown")]

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Tree</a>
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link" aria-current="page" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>

View File

@ -0,0 +1,240 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href="/vendor/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="/vendor/solid.min.css">
<link rel="stylesheet" href="/lqos.css">
<title>LibreQoS - Local Node Manager</title>
<script src="/lqos.js"></script>
<script src="/vendor/plotly-2.16.1.min.js"></script>
<script src="/vendor/jquery.min.js"></script><script src="/vendor/msgpack.min.js"></script>
<script defer src="/vendor/bootstrap.bundle.min.js"></script>
</head>
<body class="bg-secondary">
<!-- Navigation -->
<nav class="navbar navbar-expand-lg navbar-dark bg-dark">
<div class="container-fluid">
<a class="navbar-brand" href="/"><img src="/vendor/tinylogo.svg" alt="LibreQoS SVG Logo" width="25" height="25" />&nbsp;LibreQoS</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link" aria-current="page" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>
</li>
<li class="nav-item">
<a class="nav-link" href="/unknown"><i class="fa fa-address-card"></i> Unknown IPs <span id="unshapedCount" class="badge badge-warning orange-badge">?</span></a>
</li>
</ul>
</div>
<ul class="navbar-nav ms-auto">
<li class="nav-item" id="currentLogin"></li>
<li class="nav-item">
<a class="nav-link" href="#" id="startTest"><i class="fa fa-flag-checkered"></i> Run Bandwidth Test</a>
</li>
<li class="nav-item ms-auto">
<a class="nav-link" href="/config"><i class="fa fa-gear"></i> Configuration</a>
</li>
<li>
<a class="nav-link btn btn-small black-txt" href="#" id="btnReload"><i class="fa fa-refresh"></i> Reload LibreQoS</a>
</li>
</ul>
</div>
</nav>
<div id="container" class="pad4">
<div class="row">
<div class="col-sm-12">
<div class="card bg-light">
<div class="card-body">
<h5 class="card-title"><i class="fa fa-users"></i> Packet Dump</h5>
<div id="pages"></div>
<div id="graph"></div>
<div id="dump">Please Wait... this may take a second.</div>
</div>
</div>
</div>
</div>
</div>
<footer>&copy; 2022-2023, LibreQoE LLC</footer>
<script>
var packets = [];
var pages = 0;
var PAGE_SIZE = 1000;
var target = "";
var current_page = 0;
var capacity = [];
function proto(n) {
switch (n) {
case 6: return "TCP"
case 17: return "UDP"
default: return "ICMP"
}
}
/*
Snippet for tcp_flags decoding
if (hdr->fin) flags |= 1;
if (hdr->syn) flags |= 2;
if (hdr->rst) flags |= 4;
if (hdr->psh) flags |= 8;
if (hdr->ack) flags |= 16;
if (hdr->urg) flags |= 32;
if (hdr->ece) flags |= 64;
if (hdr->cwr) flags |= 128;
*/
function tcp_flags(n) {
let result = "";
if (n & 1) result += "FIN-";
if (n & 2) result += "SYN-";
if (n & 4) result += "RST-";
if (n & 8) result += "PSH-";
if (n & 16) result += "ACK-";
if (n & 32) result += "URG-";
if (n & 64) result += "ECE-";
if (n & 128) result += "CWR-";
return result;
}
function zoomIn() {
PAGE_SIZE /= 2;
current_page /= 2;
pages = packets.length / PAGE_SIZE;
viewPage(current_page);
}
function zoomOut() {
PAGE_SIZE *= 2;
current_page *= 2;
pages = packets.length / PAGE_SIZE;
viewPage(current_page);
}
function paginator(active) {
let paginator = "<a href='/api/pcap/" + target + "/capture-" + circuit_id + "-" + starting_timestamp + ".pcap' class='btn btn-warning'>Download PCAP Dump</a> ";
paginator += "<a href='#' class='btn btn-info' onClick='zoomIn();'>Zoom In</a> ";
paginator += "<a href='#' class='btn btn-info' onClick='zoomOut();'>Zoom Out</a> ( Or drag an area of the graph) <br />";
paginator += "<strong>Jump to page</strong>: ";
for (let i=0; i<pages; i++) {
if (i == active) {
paginator += " " + i + " ";
} else {
paginator += "<a href='#' onclick='viewPage(" + i + ");'>" + i + "</a> ";
}
}
$("#pages").html(paginator);
}
function viewPage(n) {
let start = n * PAGE_SIZE;
let end = Math.min(start + PAGE_SIZE, packets.length);
if (start > packets.length) {
console.log("OOps");
}
let html = "<table class='table table-striped'>";
html += "<thead><th>Time (nanos)</th><th>Proto</th><th>TCP Flags</th><th>Sequence</th><th>Window</th><th>Flow</th><th>Bytes</th><th>ECN</th><th>DSCP</th></thead>";
let x_axis = [];
let y1_axis = [];
let y2_axis = [];
for (let i=start; i<end; ++i) {
html += "<tr>";
html += "<td>" + packets[i].timestamp + "</td>";
html += "<td>" + proto(packets[i].ip_protocol) + "</td>";
if (packets[i].ip_protocol == 6) {
html += "<td>" + tcp_flags(packets[i].tcp_flags) + "</td>";
html += "<td>" + packets[i].tcp_tsval + "/" + packets[i].tcp_tsecr + "</td>";
html += "<td>" + packets[i].tcp_window + "</td>";
} else {
html += "<td></td><td></td><td></td>";
}
if (packets[i].ip_protocol != 1) {
html += "<td>" + packets[i].src + ":" + packets[i].src_port + " -> " + packets[i].dst + ":" + packets[i].dst_port + "</td>";
} else {
html += "<td>" + packets[i].src + " -> " + packets[i].dst + "</td>";
}
html += "<td>" + packets[i].size + "</td>";
html += "<td>" + ecn(packets[i].ecn) + "</td>";
html += "<td>0x" + packets[i].dscp.toString(16) + "</td>";
html += "</tr>";
x_axis.push(packets[i].timestamp);
if (packets[i].src == target) {
y1_axis.push(packets[i].size);
y2_axis.push(0);
} else {
y1_axis.push(0);
y2_axis.push(0.0 - packets[i].size);
}
}
html += "</table>";
$("#dump").html(html);
paginator(n);
// Make the graph
let graph = document.getElementById("graph");
let data = [
{x: x_axis, y:y1_axis, name: 'Download', type: 'scatter', mode: 'markers', error_x: { type: 'percent', value: capacity[0], symetric: false, valueminus: 0 }},
{x: x_axis, y:y2_axis, name: 'Upload', type: 'scatter', mode: 'markers', error_x: { type: 'percent', value: capacity[1], symetric: false, valueminus: 0 }},
];
Plotly.newPlot(graph, data, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true, title: 'Bytes' }, xaxis: {automargin: true, title: "Nanoseconds"} }, { responsive: true });
}
let circuit_id = null;
let starting_timestamp = null;
function start() {
colorReloadButton();
updateHostCounts();
const params = new Proxy(new URLSearchParams(window.location.search), {
get: (searchParams, prop) => searchParams.get(prop),
});
circuit_id = params.circuit_id;
capacity = [ params.dn, params.up ]; // Bits per second
capacity = [ capacity[0] / 8, capacity[1] / 8 ]; // Bytes per second
capacity = [ capacity[0] / 1e9, capacity[1] / 1e9 ]; // Bytes per nanosecond
target = params.id;
$.get("/api/packet_dump/" + params.id, (data) => {
data.sort((a,b) => a.timestamp - b.timestamp);
let min_ts = null;
for (let i=0; i<data.length; ++i) {
if (min_ts == null || min_ts > data[i].timestamp) {
min_ts = data[i].timestamp;
}
}
for (let i=0; i<data.length; ++i) {
data[i].timestamp -= min_ts;
}
packets = data;
pages = Math.ceil((packets.length / PAGE_SIZE));
starting_timestamp = min_ts;
paginator(0);
viewPage(0);
});
}
$(document).ready(start);
</script>
</body>
</html>

View File

@ -36,6 +36,49 @@ const IpStats = {
"plan": 6,
}
const FlowTrans = {
"src": 0,
"dst": 1,
"proto": 2,
"src_port": 3,
"dst_port": 4,
"bytes": 5,
"packets": 6,
"dscp": 7,
"ecn": 8
}
const CircuitInfo = {
"name" : 0,
"capacity" : 1,
}
const QD = { // Queue data
"history": 0,
"history_head": 1,
"current_download": 2,
"current_upload": 3,
}
const CT = { // Cake transit
"memory_used": 0,
}
const CDT = { // Cake Diff Transit
"bytes": 0,
"packets": 1,
"qlen": 2,
"tins": 3,
}
const CDTT = { // Cake Diff Tin Transit
"sent_bytes": 0,
"backlog_bytes": 1,
"drops": 2,
"marks": 3,
"avg_delay_us": 4,
}
function metaverse_color_ramp(n) {
if (n <= 9) {
return "#32b08c";
@ -271,18 +314,23 @@ class MultiRingBuffer {
plotTotalThroughput(target_div) {
let graph = document.getElementById(target_div);
let total = this.data['total'].sortedY();
let shaped = this.data['shaped'].sortedY();
this.data['total'].prepare();
this.data['shaped'].prepare();
let x = this.data['total'].x_axis;
let data = [
{x: x, y:total.down, name: 'Download', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:total.up, name: 'Upload', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:shaped.down, name: 'Shaped Download', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
{x: x, y:shaped.up, name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
let graphData = [
{x: x, y:this.data['total'].sortedY[0], name: 'Download', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:this.data['total'].sortedY[1], name: 'Upload', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:this.data['shaped'].sortedY[0], name: 'Shaped Download', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
{x: x, y:this.data['shaped'].sortedY[1], name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
];
Plotly.newPlot(graph, data, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true }, xaxis: {automargin: true, title: "Time since now (seconds)"} }, { responsive: true });
if (this.plotted == null) {
Plotly.newPlot(graph, graphData, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true, title: "Traffic (bits)" }, xaxis: {automargin: true, title: "Time since now (seconds)"} }, { responsive: true });
this.plotted = true;
} else {
Plotly.redraw(graph, graphData);
}
}
}
@ -293,10 +341,13 @@ class RingBuffer {
this.download = [];
this.upload = [];
this.x_axis = [];
this.sortedY = [ [], [] ];
for (var i = 0; i < capacity; ++i) {
this.download.push(0.0);
this.upload.push(0.0);
this.x_axis.push(capacity - i);
this.sortedY[0].push(0);
this.sortedY[1].push(0);
}
}
@ -307,27 +358,25 @@ class RingBuffer {
this.head %= this.capacity;
}
sortedY() {
let result = {
down: [],
up: [],
};
prepare() {
let counter = 0;
for (let i=this.head; i<this.capacity; i++) {
result.down.push(this.download[i]);
result.up.push(this.upload[i]);
this.sortedY[0][counter] = this.download[i];
this.sortedY[1][counter] = this.upload[i];
counter++;
}
for (let i=0; i < this.head; i++) {
result.down.push(this.download[i]);
result.up.push(this.upload[i]);
this.sortedY[0][counter] = this.download[i];
this.sortedY[1][counter] = this.upload[i];
counter++;
}
return result;
}
toScatterGraphData() {
let y = this.sortedY();
this.prepare();
let GraphData = [
{ x: this.x_axis, y: y.down, name: 'Download', type: 'scatter' },
{ x: this.x_axis, y: y.up, name: 'Upload', type: 'scatter' },
{ x: this.x_axis, y: this.sortedY[0], name: 'Download', type: 'scatter' },
{ x: this.x_axis, y: this.sortedY[1], name: 'Upload', type: 'scatter' },
];
return GraphData;
}
@ -366,6 +415,36 @@ class RttHistogram {
{ x: this.x, y: this.entries, type: 'bar', marker: { color: this.x, colorscale: 'RdBu' } }
]
let graph = document.getElementById(target_div);
Plotly.newPlot(graph, gData, { margin: { l: 0, r: 0, b: 35, t: 0 }, xaxis: { title: 'TCP Round-Trip Time (ms)' } }, { responsive: true });
if (this.plotted == null) {
Plotly.newPlot(graph, gData, { margin: { l: 40, r: 0, b: 35, t: 0 }, yaxis: { title: "# Hosts" }, xaxis: { title: 'TCP Round-Trip Time (ms)' } }, { responsive: true });
this.plotted = true;
} else {
Plotly.redraw(graph, gData);
}
}
}
function ecn(n) {
switch (n) {
case 0: return "-";
case 1: return "L4S";
case 2: return "ECT0";
case 3: return "CE";
default: return "???";
}
}
function zip(a, b) {
let zipped = [];
for (let i=0; i<a.length; ++i) {
zipped.push(a[i]);
zipped.push(b[i]);
}
return zipped;
}
function zero_to_null(array) {
for (let i=0; i<array.length; ++i) {
if (array[i] == 0) array[i] = null;
}
}

View File

@ -28,7 +28,7 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Tree</a>
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span
@ -110,7 +110,7 @@
<div class="col-sm-4">
<div class="card bg-light">
<div class="card-body">
<h5 class="card-title"><i class="fa fa-hourglass"></i> Last 5 Minutes</h5>
<h5 class="card-title"><i class="fa fa-dashboard"></i> Last 5 Minutes</h5>
<div id="tpGraph" class="graph98 graph150"></div>
</div>
</div>
@ -130,7 +130,7 @@
<div class="col-sm-4">
<div class="card bg-light">
<div class="card-body">
<h5 class="card-title"><i class="fa fa-globe"></i> Site Funnel</h5>
<h5 class="card-title"><i class="fa fa-tree"></i> Network Tree</h5>
<div id="siteFunnel" class="graph98 graph150"></div>
</div>
</div>
@ -190,12 +190,13 @@
msgPackGet("/api/network_tree_summary/", (data) => {
let table = "<table class='table' style='font-size: 8pt;'>";
for (let i = 0; i < data.length; ++i) {
let id = data[i][0];
let name = data[i][1][NetTrans.name];
if (name.length > 20) {
name = name.substring(0, 20) + "...";
}
table += "<tr>";
table += "<td class='redact'>" + redactText(name) + "</td>";
table += "<td class='redact'><a href='/tree?parent=" + id + "'>" + redactText(name) + "</a></td>";
table += "<td>" + scaleNumber(data[i][1][NetTrans.current_throughput][0] * 8) + "</td>";
table += "<td>" + scaleNumber(data[i][1][NetTrans.current_throughput][1] * 8) + "</td>";
table += "</tr>";

View File

@ -23,7 +23,7 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Tree</a>
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link active" aria-current="page" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>

View File

@ -23,7 +23,7 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Tree</a>
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link active" aria-current="page" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>

View File

@ -28,7 +28,7 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link active" href="/tree?parent=0"><i class="fa fa-globe"></i> Tree</a>
<a class="nav-link active" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span
@ -87,7 +87,7 @@
<div class="col-sm-4">
<div class="card bg-light">
<div class="card-body">
<h5 class="card-title"><i class="fa fa-globe"></i> <span id="nodeName"
<h5 class="card-title"><i class="fa fa-tree"></i> <span id="nodeName"
style="font-weight: bold;" class='redact'></span></h5>
<strong>DL Limit</strong>: <span id="nodeDL"></span><br />
<strong>UL Limit</strong>: <span id="nodeUL"></span><br />
@ -103,7 +103,7 @@
<div class="col-sm-6">
<div class="card bg-light">
<div class="card-body">
<h5 class="card-title"><i class="fa fa-globe"></i> Child Nodes</h5>
<h5 class="card-title"><i class="fa fa-tree"></i> Child Nodes</h5>
<div id="treeList"></div>
</div>
</div>
@ -146,7 +146,7 @@
for (let i = 0; i < data.length; ++i) {
let nodeDL = scaleNumber(data[i][Circuit.limit][0] * 1000000);
let nodeUL = scaleNumber(data[i][Circuit.limit[1]] * 1000000);
let nodeUL = scaleNumber(data[i][Circuit.limit][1] * 1000000);
if (nodeDL == "0") nodeDL = "Unlimited";
if (nodeUL == "0") nodeUL = "Unlimited";
tbl += "<tr>";

View File

@ -23,7 +23,7 @@
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Tree</a>
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-tree"></i> Tree</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>

View File

@ -2,6 +2,7 @@
name = "lqos_python"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[lib]
name = "lqos_python"

View File

@ -2,6 +2,7 @@
name = "lqos_queue_tracker"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
thiserror = "1"

View File

@ -1,15 +1,15 @@
use crate::{circuit_to_queue::CIRCUIT_TO_QUEUE, still_watching};
use crate::{
circuit_to_queue::CIRCUIT_TO_QUEUE, queue_store::QueueStore, still_watching,
};
use lqos_bus::BusResponse;
pub fn get_raw_circuit_data(circuit_id: &str) -> BusResponse {
still_watching(circuit_id);
if let Some(circuit) = CIRCUIT_TO_QUEUE.get(circuit_id) {
if let Ok(json) = serde_json::to_string(circuit.value()) {
BusResponse::RawQueueData(json)
} else {
BusResponse::RawQueueData(String::new())
}
let cv: QueueStore = circuit.value().clone();
let transit = Box::new(cv.into());
BusResponse::RawQueueData(Some(transit))
} else {
BusResponse::RawQueueData(String::new())
BusResponse::RawQueueData(None)
}
}

View File

@ -1,11 +1,16 @@
use crate::{
queue_diff::{make_queue_diff, QueueDiff},
queue_types::QueueType,
queue_diff::{make_queue_diff, CakeDiffTin, QueueDiff},
queue_types::{
QueueType,
},
NUM_QUEUE_HISTORY,
};
use lqos_bus::{
CakeDiffTinTransit, CakeDiffTransit, CakeTransit, QueueStoreTransit,
};
use serde::Serialize;
#[derive(Debug, Serialize)]
#[derive(Debug, Serialize, Clone)]
pub struct QueueStore {
history: Vec<(QueueDiff, QueueDiff)>,
history_head: usize,
@ -50,3 +55,134 @@ impl QueueStore {
}
}
}
// Note: I'm overriding the warning because the "from only" behaviour
// is actually what we want here.
#[allow(clippy::from_over_into)]
impl Into<QueueStoreTransit> for QueueStore {
fn into(self) -> QueueStoreTransit {
QueueStoreTransit {
history: self
.history
.iter()
.cloned()
.map(|(a, b)| (a.into(), b.into()))
.collect(),
history_head: self.history_head,
//prev_download: self.prev_download.map(|d| d.into()),
//prev_upload: self.prev_upload.map(|u| u.into()),
current_download: self.current_download.into(),
current_upload: self.current_upload.into(),
}
}
}
#[allow(clippy::from_over_into)]
impl Into<CakeDiffTransit> for QueueDiff {
fn into(self) -> CakeDiffTransit {
if let QueueDiff::Cake(c) = &self {
CakeDiffTransit {
bytes: c.bytes,
packets: c.packets,
qlen: c.qlen,
tins: c.tins.iter().cloned().map(|t| t.into()).collect(),
}
} else {
CakeDiffTransit::default()
}
}
}
#[allow(clippy::from_over_into)]
impl Into<CakeDiffTinTransit> for CakeDiffTin {
fn into(self) -> CakeDiffTinTransit {
CakeDiffTinTransit {
sent_bytes: self.sent_bytes,
backlog_bytes: self.backlog_bytes,
drops: self.drops,
marks: self.marks,
avg_delay_us: self.avg_delay_us,
}
}
}
#[allow(clippy::from_over_into)]
impl Into<CakeTransit> for QueueType {
fn into(self) -> CakeTransit {
if let QueueType::Cake(c) = self {
CakeTransit {
//handle: c.handle,
//parent: c.parent,
//options: c.options.into(),
//bytes: c.bytes,
//packets: c.packets,
//overlimits: c.overlimits,
//requeues: c.requeues,
//backlog: c.backlog,
//qlen: c.qlen,
memory_used: c.memory_used,
//memory_limit: c.memory_limit,
//capacity_estimate: c.capacity_estimate,
//min_network_size: c.min_network_size,
//max_network_size: c.max_network_size,
//min_adj_size: c.min_adj_size,
//max_adj_size: c.max_adj_size,
//avg_hdr_offset: c.avg_hdr_offset,
//tins: c.tins.iter().cloned().map(|t| t.into()).collect(),
//drops: c.drops,
}
} else {
CakeTransit::default()
}
}
}
/*
#[allow(clippy::from_over_into)]
impl Into<CakeOptionsTransit> for TcCakeOptions {
fn into(self) -> CakeOptionsTransit {
CakeOptionsTransit {
rtt: self.rtt,
bandwidth: self.bandwidth as u8,
diffserv: self.diffserv as u8,
flowmode: self.flowmode as u8,
ack_filter: self.ack_filter as u8,
nat: self.nat,
wash: self.wash,
ingress: self.ingress,
split_gso: self.split_gso,
raw: self.raw,
overhead: self.overhead,
fwmark: self.fwmark,
}
}
}
#[allow(clippy::from_over_into)]
impl Into<CakeTinTransit> for TcCakeTin {
fn into(self) -> CakeTinTransit {
CakeTinTransit {
//threshold_rate: self.threshold_rate,
//sent_bytes: self.sent_bytes,
//backlog_bytes: self.backlog_bytes,
//target_us: self.target_us,
//interval_us: self.interval_us,
//peak_delay_us: self.peak_delay_us,
//avg_delay_us: self.avg_delay_us,
//base_delay_us: self.base_delay_us,
//sent_packets: self.sent_packets,
//way_indirect_hits: self.way_indirect_hits,
//way_misses: self.way_misses,
//way_collisions: self.way_collisions,
//drops: self.drops,
//ecn_marks: self.ecn_marks,
//ack_drops: self.ack_drops,
//sparse_flows: self.sparse_flows,
//bulk_flows: self.bulk_flows,
//unresponsive_flows: self.unresponsive_flows,
//max_pkt_len: self.max_pkt_len,
//flow_quantum: self.flow_quantum,
}
}
}
*/

View File

@ -1,4 +1,4 @@
mod tc_cake;
pub(crate) mod tc_cake;
mod tc_fq_codel;
mod tc_htb;
mod tc_mq;

View File

@ -27,63 +27,63 @@ string_table_enum!(BandWidth, unlimited); // in the present implementation with
pub struct TcCake {
pub(crate) handle: TcHandle,
pub(crate) parent: TcHandle,
options: TcCakeOptions,
pub(crate) options: TcCakeOptions,
pub(crate) bytes: u64,
pub(crate) packets: u32,
overlimits: u32,
requeues: u32,
pub(crate) overlimits: u32,
pub(crate) requeues: u32,
pub(crate) backlog: u32,
pub(crate) qlen: u32,
memory_used: u32,
memory_limit: u32,
capacity_estimate: u32,
min_network_size: u16,
max_network_size: u16,
min_adj_size: u16,
max_adj_size: u16,
avg_hdr_offset: u16,
pub(crate) memory_used: u32,
pub(crate) memory_limit: u32,
pub(crate) capacity_estimate: u32,
pub(crate) min_network_size: u16,
pub(crate) max_network_size: u16,
pub(crate) min_adj_size: u16,
pub(crate) max_adj_size: u16,
pub(crate) avg_hdr_offset: u16,
pub(crate) tins: Vec<TcCakeTin>,
pub(crate) drops: u32,
}
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
struct TcCakeOptions {
rtt: u64,
bandwidth: BandWidth,
diffserv: DiffServ,
flowmode: FlowMode,
ack_filter: AckFilter,
nat: bool,
wash: bool,
ingress: bool,
split_gso: bool,
raw: bool,
overhead: u16,
fwmark: String,
pub(crate) struct TcCakeOptions {
pub(crate) rtt: u64,
pub(crate) bandwidth: BandWidth,
pub(crate) diffserv: DiffServ,
pub(crate) flowmode: FlowMode,
pub(crate) ack_filter: AckFilter,
pub(crate) nat: bool,
pub(crate) wash: bool,
pub(crate) ingress: bool,
pub(crate) split_gso: bool,
pub(crate) raw: bool,
pub(crate) overhead: u16,
pub(crate) fwmark: TcHandle,
}
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct TcCakeTin {
threshold_rate: u64,
pub(crate) struct TcCakeTin {
pub(crate) threshold_rate: u64,
pub(crate) sent_bytes: u64,
pub(crate) backlog_bytes: u32,
target_us: u32,
interval_us: u32,
peak_delay_us: u32,
pub(crate) target_us: u32,
pub(crate) interval_us: u32,
pub(crate) peak_delay_us: u32,
pub(crate) avg_delay_us: u32,
base_delay_us: u32,
sent_packets: u32,
way_indirect_hits: u16,
way_misses: u16,
way_collisions: u16,
pub(crate) base_delay_us: u32,
pub(crate) sent_packets: u32,
pub(crate) way_indirect_hits: u16,
pub(crate) way_misses: u16,
pub(crate) way_collisions: u16,
pub(crate) drops: u32,
pub(crate) ecn_marks: u32,
ack_drops: u32,
sparse_flows: u16,
bulk_flows: u16,
unresponsive_flows: u16,
max_pkt_len: u16,
flow_quantum: u16,
pub(crate) ack_drops: u32,
pub(crate) sparse_flows: u16,
pub(crate) bulk_flows: u16,
pub(crate) unresponsive_flows: u16,
pub(crate) max_pkt_len: u16,
pub(crate) flow_quantum: u16,
}
impl TcCake {
@ -179,7 +179,7 @@ impl TcCakeOptions {
"raw" => result.raw = value.as_bool().unwrap_or(false),
"overhead" => result.overhead = value.as_u64().unwrap_or(0) as u16,
"fwmark" => {
result.fwmark = value.as_str().unwrap_or("").to_string()
parse_tc_handle!(result.fwmark, value);
}
_ => {
info_once!(

View File

@ -2,7 +2,7 @@ use crate::{
circuit_to_queue::CIRCUIT_TO_QUEUE, interval::QUEUE_MONITOR_INTERVAL,
queue_store::QueueStore, tracking::reader::read_named_queue_from_interface,
};
use log::{info, warn};
use log::info;
use lqos_config::LibreQoSConfig;
use lqos_utils::fdtimer::periodic;
mod reader;
@ -18,7 +18,7 @@ fn track_queues() {
}
let config = LibreQoSConfig::load();
if config.is_err() {
warn!("Unable to read LibreQoS config. Skipping queue collection cycle.");
//warn!("Unable to read LibreQoS config. Skipping queue collection cycle.");
return;
}
let config = config.unwrap();
@ -49,7 +49,9 @@ fn track_queues() {
if let Ok(download) = download {
if let Ok(upload) = upload {
if let Some(mut circuit) = CIRCUIT_TO_QUEUE.get_mut(circuit_id) {
circuit.update(&download[0], &upload[0]);
if !download.is_empty() && !upload.is_empty() {
circuit.update(&download[0], &upload[0]);
}
} else {
// It's new: insert it
if !download.is_empty() && !upload.is_empty() {

View File

@ -2,6 +2,7 @@ use crate::queue_structure::QUEUE_STRUCTURE;
use dashmap::DashMap;
use log::{info, warn};
use lqos_bus::TcHandle;
use lqos_sys::num_possible_cpus;
use lqos_utils::unix_time::unix_now;
use once_cell::sync::Lazy;
@ -32,7 +33,7 @@ pub fn expiration_in_the_future() -> u64 {
pub fn add_watched_queue(circuit_id: &str) {
//info!("Watching queue {circuit_id}");
let max = unsafe { lqos_sys::libbpf_num_possible_cpus() } * 2;
let max = num_possible_cpus().unwrap() * 2;
{
if WATCHED_QUEUES.contains_key(circuit_id) {
warn!("Queue {circuit_id} is already being watched. Duplicate ignored.");
@ -59,7 +60,7 @@ pub fn add_watched_queue(circuit_id: &str) {
};
WATCHED_QUEUES.insert(circuit.circuit_id.as_ref().unwrap().clone(), new_watch);
//info!("Added {circuit_id} to watched queues. Now watching {} queues.", WATCHED_QUEUES.read().len());
//info!("Added {circuit_id} to watched queues. Now watching {} queues.", WATCHED_QUEUES.len());
} else {
warn!("No circuit ID of {circuit_id}");
}

View File

@ -2,6 +2,7 @@
name = "lqos_setup"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
colored = "2"

View File

@ -118,7 +118,7 @@ fn get_bandwidth(up: bool) -> u32 {
const ETC_LQOS_CONF: &str = "lqos_directory = '/opt/libreqos/src'
queue_check_period_ms = 1000
node_id = {NODE_ID}
node_id = \"{NODE_ID}\"
[tuning]
stop_irq_balance = true

View File

@ -2,6 +2,7 @@
name = "lqos_sys"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
nix = "0"
@ -11,6 +12,10 @@ byteorder = "1.4"
lqos_bus = { path = "../lqos_bus" }
lqos_config = { path = "../lqos_config" }
log = "0"
lqos_utils = { path = "../lqos_utils" }
once_cell = "1"
dashmap = "5"
thiserror = "1"
[build-dependencies]
bindgen = "0"

View File

@ -9,6 +9,12 @@
#include "../common/debug.h"
#include "../common/ip_hash.h"
#include "../common/bifrost.h"
#include "../common/tcp_opts.h"
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/tcp.h>
#include <linux/udp.h>
#include <linux/icmp.h>
// Packet dissector for XDP. We don't have any help from Linux at this
// point.
@ -37,6 +43,15 @@ struct dissector_t
// Current VLAN tag. If there are multiple tags, it will be
// the INNER tag.
__be16 current_vlan;
// IP protocol from __UAPI_DEF_IN_IPPROTO
__u8 ip_protocol;
__u16 src_port;
__u16 dst_port;
__u8 tos;
__u8 tcp_flags;
__u16 window;
__u32 tsval;
__u32 tsecr;
};
// Representation of the VLAN header type.
@ -63,18 +78,19 @@ struct pppoe_proto
#define PPP_IPV6 0x57
// Representation of an MPLS label
struct mpls_label {
__be32 entry;
struct mpls_label
{
__be32 entry;
};
#define MPLS_LS_LABEL_MASK 0xFFFFF000
#define MPLS_LS_LABEL_SHIFT 12
#define MPLS_LS_TC_MASK 0x00000E00
#define MPLS_LS_TC_SHIFT 9
#define MPLS_LS_S_MASK 0x00000100
#define MPLS_LS_S_SHIFT 8
#define MPLS_LS_TTL_MASK 0x000000FF
#define MPLS_LS_TTL_SHIFT 0
#define MPLS_LS_LABEL_MASK 0xFFFFF000
#define MPLS_LS_LABEL_SHIFT 12
#define MPLS_LS_TC_MASK 0x00000E00
#define MPLS_LS_TC_SHIFT 9
#define MPLS_LS_S_MASK 0x00000100
#define MPLS_LS_S_SHIFT 8
#define MPLS_LS_TTL_MASK 0x000000FF
#define MPLS_LS_TTL_SHIFT 0
// Constructor for a dissector
// Connects XDP/TC SKB structure to a dissector structure.
@ -84,9 +100,9 @@ struct mpls_label {
//
// Returns TRUE if all is good, FALSE if the process cannot be completed
static __always_inline bool dissector_new(
struct xdp_md *ctx,
struct dissector_t *dissector
) {
struct xdp_md *ctx,
struct dissector_t *dissector)
{
dissector->ctx = ctx;
dissector->start = (void *)(long)ctx->data;
dissector->end = (void *)(long)ctx->data_end;
@ -94,6 +110,10 @@ static __always_inline bool dissector_new(
dissector->l3offset = 0;
dissector->skb_len = dissector->end - dissector->start;
dissector->current_vlan = 0;
dissector->ip_protocol = 0;
dissector->src_port = 0;
dissector->dst_port = 0;
dissector->tos = 0;
// Check that there's room for an ethernet header
if SKB_OVERFLOW (dissector->start, dissector->end, ethhdr)
@ -114,9 +134,9 @@ static __always_inline bool is_ip(__u16 eth_type)
// Locates the layer-3 offset, if present. Fast returns for various
// common non-IP types. Will perform VLAN redirection if requested.
static __always_inline bool dissector_find_l3_offset(
struct dissector_t *dissector,
bool vlan_redirect
) {
struct dissector_t *dissector,
bool vlan_redirect)
{
if (dissector->ethernet_header == NULL)
{
bpf_debug("Ethernet header is NULL, still called offset check.");
@ -149,35 +169,34 @@ static __always_inline bool dissector_find_l3_offset(
case ETH_P_8021AD:
case ETH_P_8021Q:
{
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, vlan_hdr)
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, vlan_hdr)
{
return false;
}
struct vlan_hdr *vlan = (struct vlan_hdr *)
(dissector->start + offset);
struct vlan_hdr *vlan = (struct vlan_hdr *)(dissector->start + offset);
dissector->current_vlan = vlan->h_vlan_TCI;
eth_type = bpf_ntohs(vlan->h_vlan_encapsulated_proto);
offset += sizeof(struct vlan_hdr);
// VLAN Redirection is requested, so lookup a detination and
// switch the VLAN tag if required
if (vlan_redirect) {
#ifdef VERBOSE
bpf_debug("Searching for redirect %u:%u",
dissector->ctx->ingress_ifindex,
bpf_ntohs(dissector->current_vlan)
);
#endif
__u32 key = (dissector->ctx->ingress_ifindex << 16) |
bpf_ntohs(dissector->current_vlan);
struct bifrost_vlan * vlan_info = NULL;
if (vlan_redirect)
{
#ifdef VERBOSE
bpf_debug("Searching for redirect %u:%u",
dissector->ctx->ingress_ifindex,
bpf_ntohs(dissector->current_vlan));
#endif
__u32 key = (dissector->ctx->ingress_ifindex << 16) |
bpf_ntohs(dissector->current_vlan);
struct bifrost_vlan *vlan_info = NULL;
vlan_info = bpf_map_lookup_elem(&bifrost_vlan_map, &key);
if (vlan_info) {
#ifdef VERBOSE
bpf_debug("Redirect to VLAN %u",
bpf_htons(vlan_info->redirect_to)
);
#endif
if (vlan_info)
{
#ifdef VERBOSE
bpf_debug("Redirect to VLAN %u",
bpf_htons(vlan_info->redirect_to));
#endif
vlan->h_vlan_TCI = bpf_htons(vlan_info->redirect_to);
}
}
@ -187,13 +206,12 @@ static __always_inline bool dissector_find_l3_offset(
// Handle PPPoE
case ETH_P_PPP_SES:
{
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, pppoe_proto)
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, pppoe_proto)
{
return false;
}
struct pppoe_proto *pppoe = (struct pppoe_proto *)
(dissector->start + offset);
struct pppoe_proto *pppoe = (struct pppoe_proto *)(dissector->start + offset);
__u16 proto = bpf_ntohs(pppoe->proto);
switch (proto)
{
@ -212,31 +230,39 @@ static __always_inline bool dissector_find_l3_offset(
// WARNING/TODO: Here be dragons; this needs testing.
case ETH_P_MPLS_UC:
case ETH_P_MPLS_MC: {
if SKB_OVERFLOW_OFFSET(dissector->start, dissector-> end,
offset, mpls_label)
case ETH_P_MPLS_MC:
{
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, mpls_label)
{
return false;
}
struct mpls_label * mpls = (struct mpls_label *)
(dissector->start + offset);
struct mpls_label *mpls = (struct mpls_label *)(dissector->start + offset);
// Are we at the bottom of the stack?
offset += 4; // 32-bits
if (mpls->entry & MPLS_LS_S_MASK) {
if (mpls->entry & MPLS_LS_S_MASK)
{
// We've hit the bottom
if SKB_OVERFLOW_OFFSET(dissector->start, dissector->end,
offset, iphdr)
if SKB_OVERFLOW_OFFSET (dissector->start, dissector->end,
offset, iphdr)
{
return false;
}
struct iphdr * iph = (struct iphdr *)(dissector->start + offset);
switch (iph->version) {
case 4: eth_type = ETH_P_IP; break;
case 6: eth_type = ETH_P_IPV6; break;
default: return false;
struct iphdr *iph = (struct iphdr *)(dissector->start + offset);
switch (iph->version)
{
case 4:
eth_type = ETH_P_IP;
break;
case 6:
eth_type = ETH_P_IPV6;
break;
default:
return false;
}
}
} break;
}
break;
// We found something we don't know how to handle - bail out
default:
@ -250,37 +276,148 @@ static __always_inline bool dissector_find_l3_offset(
return true;
}
static __always_inline struct tcphdr *get_tcp_header(struct dissector_t *dissector)
{
if (dissector->eth_type == ETH_P_IP)
{
return (struct tcphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
}
else if (dissector->eth_type == ETH_P_IPV6)
{
return (struct tcphdr *)(dissector->ip_header.ip6h + 1);
}
return NULL;
}
static __always_inline struct udphdr *get_udp_header(struct dissector_t *dissector)
{
if (dissector->eth_type == ETH_P_IP)
{
return (struct udphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
}
else if (dissector->eth_type == ETH_P_IPV6)
{
return (struct udphdr *)(dissector->ip_header.ip6h + 1);
}
return NULL;
}
static __always_inline struct icmphdr *get_icmp_header(struct dissector_t *dissector)
{
if (dissector->eth_type == ETH_P_IP)
{
return (struct icmphdr *)((char *)dissector->ip_header.iph + (dissector->ip_header.iph->ihl * 4));
}
else if (dissector->eth_type == ETH_P_IPV6)
{
return (struct icmphdr *)(dissector->ip_header.ip6h + 1);
}
return NULL;
}
static __always_inline void snoop(struct dissector_t *dissector)
{
switch (dissector->ip_protocol)
{
case IPPROTO_TCP:
{
struct tcphdr *hdr = get_tcp_header(dissector);
if (hdr != NULL)
{
if (hdr + 1 > dissector->end)
{
return;
}
dissector->src_port = hdr->source;
dissector->dst_port = hdr->dest;
__u8 flags = 0;
if (hdr->fin) flags |= 1;
if (hdr->syn) flags |= 2;
if (hdr->rst) flags |= 4;
if (hdr->psh) flags |= 8;
if (hdr->ack) flags |= 16;
if (hdr->urg) flags |= 32;
if (hdr->ece) flags |= 64;
if (hdr->cwr) flags |= 128;
dissector->tcp_flags = flags;
dissector->window = hdr->window;
parse_tcp_ts(hdr, dissector->end, &dissector->tsval, &dissector->tsecr);
}
}
break;
case IPPROTO_UDP:
{
struct udphdr *hdr = get_udp_header(dissector);
if (hdr != NULL)
{
if (hdr + 1 > dissector->end)
{
return;
}
dissector->src_port = hdr->source;
dissector->dst_port = hdr->dest;
}
}
case IPPROTO_ICMP:
{
struct icmphdr *hdr = get_icmp_header(dissector);
if (hdr != NULL)
{
if ((char *)hdr + sizeof(struct icmphdr) > dissector->end)
{
return;
}
dissector->ip_protocol = 1;
dissector->src_port = bpf_ntohs(hdr->type);
dissector->dst_port = bpf_ntohs(hdr->type);
}
}
break;
}
}
// Searches for an IP header.
static __always_inline bool dissector_find_ip_header(
struct dissector_t *dissector
) {
struct dissector_t *dissector)
{
switch (dissector->eth_type)
{
case ETH_P_IP:
{
if (dissector->start + dissector->l3offset + sizeof(struct iphdr) >
dissector->end) {
return false;
if (dissector->start + dissector->l3offset + sizeof(struct iphdr) >
dissector->end)
{
return false;
}
dissector->ip_header.iph = dissector->start + dissector->l3offset;
if (dissector->ip_header.iph + 1 > dissector->end)
return false;
encode_ipv4(dissector->ip_header.iph->saddr, &dissector->src_ip);
encode_ipv4(dissector->ip_header.iph->daddr, &dissector->dst_ip);
dissector->ip_protocol = dissector->ip_header.iph->protocol;
dissector->tos = dissector->ip_header.iph->tos;
snoop(dissector);
return true;
}
break;
case ETH_P_IPV6:
{
if (dissector->start + dissector->l3offset +
sizeof(struct ipv6hdr) > dissector->end) {
return false;
if (dissector->start + dissector->l3offset +
sizeof(struct ipv6hdr) >
dissector->end)
{
return false;
}
dissector->ip_header.ip6h = dissector->start + dissector->l3offset;
if (dissector->ip_header.iph + 1 > dissector->end)
return false;
encode_ipv6(&dissector->ip_header.ip6h->saddr, &dissector->src_ip);
encode_ipv6(&dissector->ip_header.ip6h->daddr, &dissector->dst_ip);
dissector->ip_protocol = dissector->ip_header.ip6h->nexthdr;
dissector->ip_header.ip6h->flow_lbl[0]; // Is this right?
snoop(dissector);
return true;
}
break;

View File

@ -0,0 +1,183 @@
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <bpf/bpf_endian.h>
#include <linux/if_ether.h>
#include <stdbool.h>
#include "maximums.h"
#include "debug.h"
#include "dissector.h"
#define PACKET_OCTET_SIZE 128
// Array containing one element, the Heimdall configuration
struct heimdall_config_t
{
__u32 monitor_mode; // 0 = Off, 1 = Targets only, 2 = Analysis Mode
};
// Pinned map containing the Heimdall config
struct
{
__uint(type, BPF_MAP_TYPE_ARRAY);
__type(key, __u32);
__type(value, struct heimdall_config_t);
__uint(max_entries, 2);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} heimdall_config SEC(".maps");
// Pinned map containing the IP addresses (in packed IPv6 format)
// currently being watched by the Heimdall system.
struct
{
__uint(type, BPF_MAP_TYPE_HASH);
__type(key, struct in6_addr);
__type(value, __u32);
__uint(max_entries, 64);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} heimdall_watching SEC(".maps");
// Perf map for communicating with userspace
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
__uint(max_entries, 256 * 1024 /* 256 KB */);
} heimdall_events SEC(".maps");
// Basic event type to send to userspace when "hyperfocused" on a
// data flow.
struct heimdall_event {
__u64 timetamp;
struct in6_addr src;
struct in6_addr dst;
__u16 src_port;
__u16 dst_port;
__u8 ip_protocol;
__u8 tos;
__u32 size;
__u8 tcp_flags;
__u16 tcp_window;
__u32 tsval;
__u32 tsecr;
__u8 dump[PACKET_OCTET_SIZE];
};
struct heimdall_key
{
struct in6_addr src;
struct in6_addr dst;
__u8 ip_protocol;
__u16 src_port;
__u16 dst_port;
};
struct heimdall_data {
__u64 last_seen;
__u64 bytes;
__u64 packets;
__u8 tos;
};
// Map for tracking flow information in-kernel for watched IPs
struct
{
__uint(type, BPF_MAP_TYPE_LRU_PERCPU_HASH);
__type(key, struct heimdall_key);
__type(value, struct heimdall_data);
__uint(max_entries, MAX_FLOWS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} heimdall SEC(".maps");
static __always_inline __u8 get_heimdall_mode()
{
__u32 index = 0;
struct heimdall_config_t *cfg = (struct heimdall_config_t *)bpf_map_lookup_elem(&heimdall_config, &index);
if (cfg)
{
#ifdef VERBOSE
bpf_debug("Heimdall Mode: %d", cfg->monitor_mode);
#endif
return cfg->monitor_mode;
}
else
{
return 0;
}
}
static __always_inline bool is_heimdall_watching(struct dissector_t *dissector, int effective_direction)
{
if (effective_direction == 2) {
__u32 * watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->src_ip);
if (watching) {
return true;
}
} else {
__u32 * watching = (__u32 *)bpf_map_lookup_elem(&heimdall_watching, &dissector->dst_ip);
if (watching) {
return true;
}
}
return false;
}
static __always_inline void update_heimdall(struct dissector_t *dissector, __u32 size, __u8 mode)
{
if (mode == 1) {
// Don't report any non-ICMP without ports
if (dissector->ip_protocol != 1 && (dissector->src_port == 0 || dissector->dst_port == 0))
return;
// Don't report ICMP with invalid numbers
if (dissector->ip_protocol == 1 && dissector->src_port > 18) return;
struct heimdall_key key = {0};
key.src = dissector->src_ip;
key.dst = dissector->dst_ip;
key.ip_protocol = dissector->ip_protocol;
key.src_port = bpf_ntohs(dissector->src_port);
key.dst_port = bpf_ntohs(dissector->dst_port);
struct heimdall_data *counter = (struct heimdall_data *)bpf_map_lookup_elem(&heimdall, &key);
if (counter)
{
counter->last_seen = bpf_ktime_get_boot_ns();
counter->packets += 1;
counter->bytes += size;
if (dissector->tos != 0)
{
counter->tos = dissector->tos;
}
}
else
{
struct heimdall_data counter = {0};
counter.last_seen = bpf_ktime_get_boot_ns();
counter.bytes = size;
counter.packets = 1;
counter.tos = dissector->tos;
if (bpf_map_update_elem(&heimdall, &key, &counter, BPF_NOEXIST) != 0)
{
bpf_debug("Failed to insert tracking");
}
}
} else if (mode == 2) {
struct heimdall_event event = {0};
event.timetamp = bpf_ktime_get_boot_ns();
event.src = dissector->src_ip;
event.dst = dissector->dst_ip;
event.src_port = dissector->src_port;
event.dst_port = dissector->dst_port;
event.ip_protocol = dissector->ip_protocol;
event.tos = dissector->tos;
event.size = size;
event.tcp_flags = dissector->tcp_flags;
event.tcp_window = dissector->window;
event.tsval = dissector->tsval;
event.tsecr = dissector->tsecr;
//if (size > PACKET_OCTET_SIZE) size = PACKET_OCTET_SIZE;
bpf_probe_read_kernel(&event.dump, PACKET_OCTET_SIZE, dissector->start);
bpf_ringbuf_output(&heimdall_events, &event, sizeof(event), 0);
}
// Commented out because we don't really care - some will be missed
// during very heavy load.
//if (err != 0) {
// bpf_debug("Failed to send perf event %d", err);
//}
}

View File

@ -0,0 +1,75 @@
#pragma once
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <linux/pkt_cls.h>
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/ipv6.h>
#include <linux/tcp.h>
#include <bpf/bpf_endian.h>
#include <stdbool.h>
#define MAX_TCP_OPTIONS 10
/*
* Parses the TSval and TSecr values from the TCP options field. If sucessful
* the TSval and TSecr values will be stored at tsval and tsecr (in network
* byte order).
* Returns 0 if sucessful and -1 on failure
*/
static __always_inline int parse_tcp_ts(
struct tcphdr *tcph,
void *data_end,
__u32 *tsval,
__u32 *tsecr
) {
int len = tcph->doff << 2;
void *opt_end = (void *)tcph + len;
__u8 *pos = (__u8 *)(tcph + 1); // Current pos in TCP options
__u8 i, opt;
volatile __u8
opt_size; // Seems to ensure it's always read of from stack as u8
if (tcph + 1 > data_end || len <= sizeof(struct tcphdr))
return -1;
#pragma unroll // temporary solution until we can identify why the non-unrolled loop gets stuck in an infinite loop
for (i = 0; i < MAX_TCP_OPTIONS; i++)
{
if (pos + 1 > opt_end || pos + 1 > data_end)
return -1;
opt = *pos;
if (opt == 0) // Reached end of TCP options
return -1;
if (opt == 1)
{ // TCP NOP option - advance one byte
pos++;
continue;
}
// Option > 1, should have option size
if (pos + 2 > opt_end || pos + 2 > data_end)
return -1;
opt_size = *(pos + 1);
if (opt_size < 2) // Stop parsing options if opt_size has an invalid value
return -1;
// Option-kind is TCP timestap (yey!)
if (opt == 8 && opt_size == 10)
{
if (pos + 10 > opt_end || pos + 10 > data_end)
return -1;
*tsval = bpf_ntohl(*(__u32 *)(pos + 2));
*tsecr = bpf_ntohl(*(__u32 *)(pos + 6));
return 0;
}
// Some other TCP option - advance option-length bytes
pos += opt_size;
}
return -1;
}

View File

@ -36,6 +36,7 @@ My modifications are Copyright 2022, Herbert Wolverson
#include "debug.h"
#include "ip_hash.h"
#include "dissector_tc.h"
#include "tcp_opts.h"
#define MAX_MEMCMP_SIZE 128
@ -226,66 +227,6 @@ static __always_inline struct flow_state *fstate_from_dfkey(
return is_dfkey ? &df_state->dir1 : &df_state->dir2;
}
/*
* Parses the TSval and TSecr values from the TCP options field. If sucessful
* the TSval and TSecr values will be stored at tsval and tsecr (in network
* byte order).
* Returns 0 if sucessful and -1 on failure
*/
static __always_inline int parse_tcp_ts(
struct tcphdr *tcph,
void *data_end,
__u32 *tsval,
__u32 *tsecr
) {
int len = tcph->doff << 2;
void *opt_end = (void *)tcph + len;
__u8 *pos = (__u8 *)(tcph + 1); // Current pos in TCP options
__u8 i, opt;
volatile __u8
opt_size; // Seems to ensure it's always read of from stack as u8
if (tcph + 1 > data_end || len <= sizeof(struct tcphdr))
return -1;
#pragma unroll // temporary solution until we can identify why the non-unrolled loop gets stuck in an infinite loop
for (i = 0; i < MAX_TCP_OPTIONS; i++)
{
if (pos + 1 > opt_end || pos + 1 > data_end)
return -1;
opt = *pos;
if (opt == 0) // Reached end of TCP options
return -1;
if (opt == 1)
{ // TCP NOP option - advance one byte
pos++;
continue;
}
// Option > 1, should have option size
if (pos + 2 > opt_end || pos + 2 > data_end)
return -1;
opt_size = *(pos + 1);
if (opt_size < 2) // Stop parsing options if opt_size has an invalid value
return -1;
// Option-kind is TCP timestap (yey!)
if (opt == 8 && opt_size == 10)
{
if (pos + 10 > opt_end || pos + 10 > data_end)
return -1;
*tsval = bpf_ntohl(*(__u32 *)(pos + 2));
*tsecr = bpf_ntohl(*(__u32 *)(pos + 6));
return 0;
}
// Some other TCP option - advance option-length bytes
pos += opt_size;
}
return -1;
}
/*
* Attempts to fetch an identifier for TCP packets, based on the TCP timestamp
* option.

View File

@ -17,6 +17,9 @@
#include "common/cpu_map.h"
#include "common/tcp_rtt.h"
#include "common/bifrost.h"
#include "common/heimdall.h"
//#define VERBOSE 1
/* Theory of operation:
1. (Packet arrives at interface)
@ -128,8 +131,18 @@ int xdp_prog(struct xdp_md *ctx)
tc_handle
);
// Send on its way
if (tc_handle != 0) {
// Send data to Heimdall
__u8 heimdall_mode = get_heimdall_mode();
if (heimdall_mode > 0 && is_heimdall_watching(&dissector, effective_direction)) {
#ifdef VERBOSE
bpf_debug("(XDP) Storing Heimdall Data");
#endif
update_heimdall(&dissector, ctx->data_end - ctx->data, heimdall_mode);
}
// Handle CPU redirection if there is one specified
__u32 *cpu_lookup;
cpu_lookup = bpf_map_lookup_elem(&cpus_available, &cpu);

View File

@ -246,70 +246,3 @@ int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj)
out:
return err;
}
/*******************************/
static inline unsigned int bpf_num_possible_cpus(void)
{
static const char *fcpu = "/sys/devices/system/cpu/possible";
unsigned int start, end, possible_cpus = 0;
char buff[128];
FILE *fp;
int n;
fp = fopen(fcpu, "r");
if (!fp) {
printf("Failed to open %s: '%s'!\n", fcpu, strerror(errno));
exit(1);
}
while (fgets(buff, sizeof(buff), fp)) {
n = sscanf(buff, "%u-%u", &start, &end);
if (n == 0) {
printf("Failed to retrieve # possible CPUs!\n");
exit(1);
} else if (n == 1) {
end = start;
}
possible_cpus = start == 0 ? end + 1 : 0;
break;
}
fclose(fp);
return possible_cpus;
}
struct txq_config {
/* lookup key: __u32 cpu; */
__u16 queue_mapping;
__u16 htb_major;
};
bool map_txq_config_base_setup(int map_fd) {
unsigned int possible_cpus = bpf_num_possible_cpus();
struct txq_config txq_cfg;
__u32 cpu;
int err;
if (map_fd < 0) {
fprintf(stderr, "ERR: (bad map_fd:%d) "
"cannot proceed without access to txq_config map\n",
map_fd);
return false;
}
for (cpu = 0; cpu < possible_cpus; cpu++) {
txq_cfg.queue_mapping = cpu + 1;
txq_cfg.htb_major = cpu + 1;
err = bpf_map_update_elem(map_fd, &cpu, &txq_cfg, 0);
if (err) {
fprintf(stderr,
"ERR: %s() updating cpu-key:%d err(%d):%s\n",
__func__, cpu, errno, strerror(errno));
return false;
}
}
return true;
}

View File

@ -8,5 +8,4 @@ extern int tc_detach_egress(int ifindex, bool verbose, bool flush_hook, const ch
extern int tc_attach_ingress(int ifindex, bool verbose, struct lqos_kern *obj);
extern int tc_detach_ingress(int ifindex, bool verbose, bool flush_hook, const char * ifname);
extern __u64 max_tracker_ips();
extern bool map_txq_config_base_setup(int map_fd);
extern void do_not_print();

View File

@ -15,7 +15,7 @@ use std::{
///
/// `K` is the *key* type, indexing the map.
/// `V` is the *value* type, and must exactly match the underlying C data type.
pub(crate) struct BpfMap<K, V> {
pub struct BpfMap<K, V> {
fd: i32,
_key_phantom: PhantomData<K>,
_val_phantom: PhantomData<V>,
@ -29,7 +29,7 @@ where
/// Connect to a BPF map via a filename. Connects the internal
/// file descriptor, which is held until the structure is
/// dropped.
pub(crate) fn from_path(filename: &str) -> Result<Self> {
pub fn from_path(filename: &str) -> Result<Self> {
let filename_c = CString::new(filename)?;
let fd = unsafe { bpf_obj_get(filename_c.as_ptr()) };
if fd < 0 {
@ -43,7 +43,7 @@ where
/// to a vector. Each entry contains a `key, value` tuple.
///
/// This has performance issues due to excessive cloning
pub(crate) fn dump_vec(&self) -> Vec<(K, V)> {
pub fn dump_vec(&self) -> Vec<(K, V)> {
let mut result = Vec::new();
let mut prev_key: *mut K = null_mut();
@ -74,7 +74,7 @@ where
/// Iterates the undlering BPF map, and sends references to the
/// results directly to a callback
pub(crate) fn for_each(&self, callback: &mut dyn FnMut(&K, &V)) {
pub fn for_each(&self, callback: &mut dyn FnMut(&K, &V)) {
let mut prev_key: *mut K = null_mut();
let mut key: K = K::default();
let key_ptr: *mut K = &mut key;
@ -112,7 +112,7 @@ where
///
/// Returns Ok if insertion succeeded, a generic error (no details yet)
/// if it fails.
pub(crate) fn insert(&mut self, key: &mut K, value: &mut V) -> Result<()> {
pub fn insert(&mut self, key: &mut K, value: &mut V) -> Result<()> {
let key_ptr: *mut K = key;
let val_ptr: *mut V = value;
let err = unsafe {
@ -130,9 +130,7 @@ where
}
}
/// Inserts an entry into a BPF map, or updates an existing entry with
/// the same key.
///
/// Inserts an entry into a BPF map.
/// Use this sparingly, because it briefly pauses XDP access to the
/// underlying map (through internal locking we can't reach from
/// userland).
@ -144,7 +142,7 @@ where
///
/// Returns Ok if insertion succeeded, a generic error (no details yet)
/// if it fails.
pub(crate) fn insert_or_update(&mut self, key: &mut K, value: &mut V) -> Result<()> {
pub fn insert_or_update(&mut self, key: &mut K, value: &mut V) -> Result<()> {
let key_ptr: *mut K = key;
let val_ptr: *mut V = value;
let err = unsafe {
@ -152,7 +150,7 @@ where
self.fd,
key_ptr as *mut c_void,
val_ptr as *mut c_void,
BPF_NOEXIST.into(),
0,
)
};
if err != 0 {
@ -171,7 +169,7 @@ where
/// * `key` - the key to delete.
///
/// Return `Ok` if deletion succeeded.
pub(crate) fn delete(&mut self, key: &mut K) -> Result<()> {
pub fn delete(&mut self, key: &mut K) -> Result<()> {
let key_ptr: *mut K = key;
let err = unsafe { bpf_map_delete_elem(self.fd, key_ptr as *mut c_void) };
if err != 0 {
@ -191,7 +189,7 @@ where
/// heavy load, it WILL eventually terminate - but it might
/// take a very long time. Only use this for cleaning up
/// sparsely allocated map data.
pub(crate) fn clear(&mut self) -> Result<()> {
pub fn clear(&mut self) -> Result<()> {
loop {
let mut key = K::default();
let mut prev_key: *mut K = null_mut();
@ -225,7 +223,15 @@ where
Ok(())
}
pub(crate) fn clear_no_repeat(&mut self) -> Result<()> {
/// Delete all entries in the underlying eBPF map.
/// Use this sparingly, it locks the underlying map. Under
/// heavy load, it WILL eventually terminate - but it might
/// take a very long time. Only use this for cleaning up
/// sparsely allocated map data.
///
/// This version skips the "did it really clear?" repeat
/// found in the main version.
pub fn clear_no_repeat(&mut self) -> Result<()> {
let mut key = K::default();
let mut prev_key: *mut K = null_mut();
unsafe {

View File

@ -1,7 +1,6 @@
use anyhow::{Error, Result};
use libbpf_sys::{
bpf_map_get_next_key, bpf_map_lookup_elem, bpf_obj_get,
libbpf_num_possible_cpus,
};
use std::fmt::Debug;
use std::{
@ -10,12 +9,14 @@ use std::{
ptr::null_mut,
};
use crate::num_possible_cpus;
/// Represents an underlying BPF map, accessed via the filesystem.
/// `BpfMap` *only* talks to PER-CPU variants of maps.
///
/// `K` is the *key* type, indexing the map.
/// `V` is the *value* type, and must exactly match the underlying C data type.
pub(crate) struct BpfPerCpuMap<K, V> {
pub struct BpfPerCpuMap<K, V> {
fd: i32,
_key_phantom: PhantomData<K>,
_val_phantom: PhantomData<V>,
@ -29,7 +30,7 @@ where
/// Connect to a PER-CPU BPF map via a filename. Connects the internal
/// file descriptor, which is held until the structure is
/// dropped. The index of the CPU is *not* specified.
pub(crate) fn from_path(filename: &str) -> Result<Self> {
pub fn from_path(filename: &str) -> Result<Self> {
let filename_c = CString::new(filename)?;
let fd = unsafe { bpf_obj_get(filename_c.as_ptr()) };
if fd < 0 {
@ -42,8 +43,8 @@ where
/// Instead of clonining into a vector
/// and allocating, calls `callback` for each key/value slice
/// with references to the data returned from C.
pub(crate) fn for_each(&self, callback: &mut dyn FnMut(&K, &[V])) {
let num_cpus = unsafe { libbpf_num_possible_cpus() };
pub fn for_each(&self, callback: &mut dyn FnMut(&K, &[V])) {
let num_cpus = num_possible_cpus().unwrap();
let mut prev_key: *mut K = null_mut();
let mut key: K = K::default();
let key_ptr: *mut K = &mut key;

View File

@ -1,7 +1,8 @@
use anyhow::{Error, Result};
use libbpf_sys::{bpf_map_update_elem, bpf_obj_get, libbpf_num_possible_cpus};
use libbpf_sys::{bpf_map_update_elem, bpf_obj_get};
use log::info;
use std::{ffi::CString, os::raw::c_void};
use crate::{num_possible_cpus, linux::map_txq_config_base_setup};
//* Provides an interface for querying the number of CPUs eBPF can
//* see, and marking CPUs as available. Currently marks ALL eBPF
@ -33,7 +34,7 @@ impl CpuMapping {
}
pub(crate) fn mark_cpus_available(&self) -> Result<()> {
let cpu_count = unsafe { libbpf_num_possible_cpus() } as u32;
let cpu_count = num_possible_cpus()?;
let queue_size = 2048u32;
let val_ptr: *const u32 = &queue_size;
@ -70,14 +71,7 @@ impl CpuMapping {
}
pub(crate) fn setup_base_txq_config(&self) -> Result<()> {
use crate::lqos_kernel::bpf::map_txq_config_base_setup;
// Should we shell out to the C and do it the easy way?
let result = unsafe { map_txq_config_base_setup(self.fd_txq_config) };
if !result {
Err(Error::msg("Unable to setup TXQ map"))
} else {
Ok(())
}
Ok(map_txq_config_base_setup(self.fd_txq_config)?)
}
}

View File

@ -1,6 +1,7 @@
use crate::{bpf_map::BpfMap, XdpIpAddress};
use crate::bpf_map::BpfMap;
use anyhow::Result;
use lqos_bus::TcHandle;
use lqos_utils::XdpIpAddress;
use std::net::IpAddr;
mod ip_hash_data;
mod ip_hash_key;

View File

@ -1,6 +1,6 @@
use crate::lqos_kernel::{
attach_xdp_and_tc_to_interface, unload_xdp_from_interface,
InterfaceDirection,
InterfaceDirection, bpf::ring_buffer_sample_fn,
};
/// A wrapper-type that stores the interfaces to which the XDP and TC programs should
@ -23,7 +23,9 @@ impl LibreQoSKernels {
///
/// * `to_internet` - the name of the Internet-facing interface (e.g. `eth1`).
/// * `to_isp` - the name of the ISP-network facing interface (e.g. `eth2`).
pub fn new<S: ToString>(to_internet: S, to_isp: S) -> anyhow::Result<Self> {
/// * `heimdall_event_handler` - C function pointer to the ringbuffer
/// event handler exported by Heimdall.
pub fn new<S: ToString>(to_internet: S, to_isp: S, heimdall_event_handler: ring_buffer_sample_fn) -> anyhow::Result<Self> {
let kernel = Self {
to_internet: to_internet.to_string(),
to_isp: to_isp.to_string(),
@ -32,10 +34,12 @@ impl LibreQoSKernels {
attach_xdp_and_tc_to_interface(
&kernel.to_internet,
InterfaceDirection::Internet,
heimdall_event_handler,
)?;
attach_xdp_and_tc_to_interface(
&kernel.to_isp,
InterfaceDirection::IspNetwork,
heimdall_event_handler,
)?;
Ok(kernel)
}
@ -55,6 +59,7 @@ impl LibreQoSKernels {
stick_interface: S,
internet_vlan: u16,
isp_vlan: u16,
heimdall_event_handler: ring_buffer_sample_fn,
) -> anyhow::Result<Self> {
let kernel = Self {
to_internet: stick_interface.to_string(),
@ -64,6 +69,7 @@ impl LibreQoSKernels {
attach_xdp_and_tc_to_interface(
&kernel.to_internet,
InterfaceDirection::OnAStick(internet_vlan, isp_vlan),
heimdall_event_handler,
)?;
Ok(kernel)

View File

@ -7,22 +7,27 @@
//! and statically embeds the result in this crate.
mod bifrost_maps;
mod bpf_map;
mod bpf_per_cpu_map;
/// Provides direct access to LibBPF functionality, as exposed by the
/// built-in, compiled eBPF programs. This is very-low level and should
/// be handled with caution.
pub mod bpf_map;
/// Provides direct access to LibBPF functionality, as exposed by the
/// built-in, compiled eBPF programs. This is very-low level and should
/// be handled with caution.
pub mod bpf_per_cpu_map;
mod cpu_map;
mod ip_mapping;
mod kernel_wrapper;
mod lqos_kernel;
mod tcp_rtt;
mod throughput;
mod xdp_ip_address;
mod linux;
pub use ip_mapping::{
add_ip_to_tc, clear_ips_from_tc, del_ip_from_tc, list_mapped_ips,
};
pub use kernel_wrapper::LibreQoSKernels;
pub use libbpf_sys::libbpf_num_possible_cpus;
pub use linux::num_possible_cpus;
pub use lqos_kernel::max_tracked_ips;
pub use tcp_rtt::{rtt_for_each, RttTrackingEntry};
pub use throughput::{throughput_for_each, HostCounter};
pub use xdp_ip_address::XdpIpAddress;

View File

@ -0,0 +1,6 @@
//! Ports of C code that is very Linux specific.
mod possible_cpus;
mod txq_base_setup;
pub use possible_cpus::num_possible_cpus;
pub(crate) use txq_base_setup::*;

View File

@ -0,0 +1,79 @@
use std::{fs::read_to_string, path::Path};
use log::error;
use thiserror::Error;
const POSSIBLE_CPUS_PATH: &str = "/sys/devices/system/cpu/possible";
/// Query the number of available CPUs from `/sys/devices/system/cpu/possible`,
/// and return the last digit (it will be formatted 0-3 or similar) plus one.
/// So on a 16 CPU system, `0-15` will return `16`.
pub fn num_possible_cpus() -> Result<u32, PossibleCpuError> {
let path = Path::new(POSSIBLE_CPUS_PATH);
if !path.exists() {
error!("Unable to read /sys/devices/system/cpu/possible");
return Err(PossibleCpuError::FileNotFound);
};
let file_contents = read_to_string(path);
if file_contents.is_err() {
error!("Unable to read contents of /sys/devices/system/cpu/possible");
error!("{file_contents:?}");
return Err(PossibleCpuError::UnableToRead);
}
let file_contents = file_contents.unwrap();
parse_cpu_string(&file_contents)
}
fn parse_cpu_string(possible_cpus: &str) -> Result<u32, PossibleCpuError> {
if let Some(last_digit) = possible_cpus.trim().split('-').last() {
if let Ok(n) = last_digit.parse::<u32>() {
Ok(n + 1)
} else {
error!("Unable to parse /sys/devices/system/cpu/possible");
error!("{possible_cpus}");
Err(PossibleCpuError::ParseError)
}
} else {
error!("Unable to parse /sys/devices/system/cpu/possible");
error!("{possible_cpus}");
Err(PossibleCpuError::ParseError)
}
}
#[derive(Error, Debug, Clone, PartialEq)]
pub enum PossibleCpuError {
#[error("Unable to access /sys/devices/system/cpu/possible")]
FileNotFound,
#[error("Unable to read /sys/devices/system/cpu/possible")]
UnableToRead,
#[error("Unable to parse contents of /sys/devices/system/cpu/possible")]
ParseError,
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_unable_to_parse() {
assert_eq!(parse_cpu_string("blah").err().unwrap(), PossibleCpuError::ParseError);
}
#[test]
fn test_four_cpus() {
assert_eq!(4, parse_cpu_string("0-3").unwrap());
}
#[test]
fn test_sixteen_cpus() {
assert_eq!(16, parse_cpu_string("0-15").unwrap());
}
#[test]
fn test_againt_c() {
let cpu_count = unsafe { libbpf_sys::libbpf_num_possible_cpus() } as u32;
assert_eq!(cpu_count, num_possible_cpus().unwrap());
}
}

View File

@ -0,0 +1,51 @@
use std::ffi::c_void;
use libbpf_sys::bpf_map_update_elem;
use log::error;
use thiserror::Error;
use crate::num_possible_cpus;
#[derive(Default)]
#[repr(C)]
struct TxqConfig {
/* lookup key: __u32 cpu; */
queue_mapping: u16,
htb_major: u16,
}
pub fn map_txq_config_base_setup(map_fd: i32) -> Result<(), MapTxqConfigError> {
let possible_cpus = num_possible_cpus().map_err(|_| MapTxqConfigError::NumCpusError)?;
if map_fd < 0 {
error!("ERR: (bad map_fd:{map_fd}) cannot proceed without access to txq_config map");
return Err(MapTxqConfigError::BadMapFd);
}
let mut txq_cfg = TxqConfig::default();
for cpu in 0 .. possible_cpus {
let cpu_u16: u16 = cpu as u16;
txq_cfg.queue_mapping = cpu_u16 + 1;
txq_cfg.htb_major = cpu_u16 + 1;
let key_ptr: *const u32 = &cpu;
let val_ptr: *const TxqConfig = &txq_cfg;
let err = unsafe {
bpf_map_update_elem(map_fd, key_ptr as *const c_void, val_ptr as *mut c_void, 0)
};
if err != 0 {
error!("Unable to update TXQ map");
return Err(MapTxqConfigError::BpfMapUpdateFail);
}
}
Ok(())
}
#[derive(Error, Debug)]
pub enum MapTxqConfigError {
#[error("Unable to determine number of CPUs")]
NumCpusError,
#[error("Bad Mapped File Descriptor")]
BadMapFd,
#[error("Unable to insert into map")]
BpfMapUpdateFail,
}

View File

@ -12,7 +12,7 @@ use libbpf_sys::{
};
use log::{info, warn};
use nix::libc::{geteuid, if_nametoindex};
use std::{ffi::CString, process::Command};
use std::{ffi::{CString, c_void}, process::Command};
pub(crate) mod bpf {
#![allow(warnings, unused)]
@ -74,6 +74,7 @@ pub fn unload_xdp_from_interface(interface_name: &str) -> Result<()> {
fn set_strict_mode() -> Result<()> {
let err = unsafe { libbpf_set_strict_mode(LIBBPF_STRICT_ALL) };
#[cfg(not(debug_assertions))]
unsafe {
bpf::do_not_print();
}
@ -112,6 +113,7 @@ pub enum InterfaceDirection {
pub fn attach_xdp_and_tc_to_interface(
interface_name: &str,
direction: InterfaceDirection,
heimdall_event_handler: bpf::ring_buffer_sample_fn,
) -> Result<()> {
check_root()?;
// Check the interface is valid
@ -156,6 +158,28 @@ pub fn attach_xdp_and_tc_to_interface(
)
}; // Ignoring error, because it's ok to not have something to detach
// Find the heimdall_events perf map by name
let heimdall_events_name = CString::new("heimdall_events").unwrap();
let heimdall_events_map = unsafe { bpf::bpf_object__find_map_by_name((*skeleton).obj, heimdall_events_name.as_ptr()) };
let heimdall_events_fd = unsafe { bpf::bpf_map__fd(heimdall_events_map) };
if heimdall_events_fd < 0 {
log::error!("Unable to load Heimdall Events FD");
return Err(anyhow::Error::msg("Unable to load Heimdall Events FD"));
}
let opts: *const bpf::ring_buffer_opts = std::ptr::null();
let heimdall_perf_buffer = unsafe {
bpf::ring_buffer__new(
heimdall_events_fd,
heimdall_event_handler,
opts as *mut c_void, opts)
};
if unsafe { bpf::libbpf_get_error(heimdall_perf_buffer as *mut c_void) != 0 } {
log::error!("Failed to create Heimdall event buffer");
return Err(anyhow::Error::msg("Failed to create Heimdall event buffer"));
}
let handle = PerfBufferHandle(heimdall_perf_buffer);
std::thread::spawn(|| poll_perf_events(handle));
// Remove any previous entry
let _r = Command::new("tc")
.args(["qdisc", "del", "dev", interface_name, "clsact"])
@ -257,3 +281,20 @@ unsafe fn try_xdp_attach(
}
Ok(())
}
// Handle type used to wrap *mut bpf::perf_buffer and indicate
// that it can be moved. Really unsafe code in theory.
struct PerfBufferHandle(*mut bpf::ring_buffer);
unsafe impl Send for PerfBufferHandle {}
unsafe impl Sync for PerfBufferHandle {}
/// Run this in a thread, or doom will surely hit you
fn poll_perf_events(heimdall_perf_buffer: PerfBufferHandle) {
let heimdall_perf_buffer = heimdall_perf_buffer.0;
loop {
let err = unsafe { bpf::ring_buffer__poll(heimdall_perf_buffer, 100) };
if err < 0 {
log::error!("Error polling perfbuffer");
}
}
}

View File

@ -1,4 +1,6 @@
use crate::{bpf_per_cpu_map::BpfPerCpuMap, XdpIpAddress};
use lqos_utils::XdpIpAddress;
use crate::{bpf_per_cpu_map::BpfPerCpuMap};
/// Representation of the XDP map from map_traffic
#[repr(C)]

View File

@ -2,6 +2,7 @@
name = "lqos_utils"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
@ -9,3 +10,5 @@ nix = "0"
log = "0"
notify = { version = "5.0.0", default-features = false } # Not using crossbeam because of Tokio
thiserror = "1"
byteorder = "1.4"
zerocopy = { version = "0.6.1", features = ["simd"] }

View File

@ -5,3 +5,6 @@ pub mod hex_string;
pub mod packet_scale;
mod string_table_enum;
pub mod unix_time;
mod xdp_ip_address;
pub use xdp_ip_address::XdpIpAddress;

View File

@ -3,7 +3,7 @@ macro_rules! string_table_enum {
($enum_name: ident, $($option:ident),*) => {
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[allow(non_camel_case_types)]
enum $enum_name {
pub(crate) enum $enum_name {
$($option, )*
Unknown
}
@ -41,7 +41,7 @@ macro_rules! dashy_table_enum {
($enum_name: ident, $($option:ident),*) => {
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
#[allow(non_camel_case_types)]
enum $enum_name {
pub(crate) enum $enum_name {
$($option, )*
Unknown
}

View File

@ -1,11 +1,12 @@
use byteorder::{BigEndian, ByteOrder};
use zerocopy::FromBytes;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
/// XdpIpAddress provides helpful conversion between the XDP program's
/// native storage of IP addresses in `[u8; 16]` blocks of bytes and
/// Rust `IpAddr` types.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, FromBytes)]
pub struct XdpIpAddress(pub [u8; 16]);
impl Default for XdpIpAddress {

View File

@ -2,6 +2,7 @@
name = "lqosd"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[features]
default = ["equinix_tests"]
@ -13,6 +14,7 @@ lqos_config = { path = "../lqos_config" }
lqos_sys = { path = "../lqos_sys" }
lqos_queue_tracker = { path = "../lqos_queue_tracker" }
lqos_utils = { path = "../lqos_utils" }
lqos_heimdall = { path = "../lqos_heimdall" }
tokio = { version = "1", features = [ "full", "parking_lot" ] }
once_cell = "1.17.1"
lqos_bus = { path = "../lqos_bus" }

View File

@ -3,7 +3,7 @@ mod version;
use std::{time::Duration, net::TcpStream, io::Write};
use lqos_bus::anonymous::{AnonymousUsageV1, build_stats};
use lqos_config::{EtcLqos, LibreQoSConfig};
use lqos_sys::libbpf_num_possible_cpus;
use lqos_sys::num_possible_cpus;
use sysinfo::{System, SystemExt, CpuExt};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
@ -36,7 +36,7 @@ fn anonymous_usage_dump() -> anyhow::Result<()> {
if let Some(kernel) = sys.kernel_version() {
data.kernel_version = kernel;
}
data.usable_cores = unsafe { libbpf_num_possible_cpus() } as u32;
data.usable_cores = num_possible_cpus().unwrap_or(0);
let cpu = sys.cpus().first();
if let Some(cpu) = cpu {
data.cpu_brand = cpu.brand().to_string();

View File

@ -1,6 +1,6 @@
use anyhow::Result;
use lqos_bus::{BusResponse, IpMapping, TcHandle};
use lqos_sys::XdpIpAddress;
use lqos_utils::XdpIpAddress;
fn expect_ack(result: Result<()>) -> BusResponse {
if result.is_ok() {

View File

@ -8,6 +8,8 @@ mod throughput_tracker;
mod anonymous_usage;
mod tuning;
mod validation;
use std::net::IpAddr;
use crate::{
file_lock::FileLock,
ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow},
@ -16,6 +18,7 @@ use anyhow::Result;
use log::{info, warn};
use lqos_bus::{BusRequest, BusResponse, UnixSocketServer};
use lqos_config::LibreQoSConfig;
use lqos_heimdall::{n_second_packet_dump, perf_interface::heimdall_handle_events, start_heimdall};
use lqos_queue_tracker::{
add_watched_queue, get_raw_circuit_data, spawn_queue_monitor,
spawn_queue_structure_monitor,
@ -25,7 +28,8 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals,
};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP};
use stats::{BUS_REQUESTS, TIME_TO_POLL_HOSTS, HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP, FLOWS_TRACKED};
use throughput_tracker::get_flow_stats;
use tokio::join;
mod stats;
@ -61,13 +65,15 @@ async fn main() -> Result<()> {
&config.internet_interface,
config.stick_vlans.1,
config.stick_vlans.0,
Some(heimdall_handle_events),
)?
} else {
LibreQoSKernels::new(&config.internet_interface, &config.isp_interface)?
LibreQoSKernels::new(&config.internet_interface, &config.isp_interface, Some(heimdall_handle_events))?
};
// Spawn tracking sub-systems
join!(
start_heimdall(),
spawn_queue_structure_monitor(),
shaped_devices_tracker::shaped_devices_watcher(),
shaped_devices_tracker::network_json_watcher(),
@ -185,7 +191,27 @@ fn handle_bus_requests(
high_watermark: (
HIGH_WATERMARK_DOWN.load(std::sync::atomic::Ordering::Relaxed),
HIGH_WATERMARK_UP.load(std::sync::atomic::Ordering::Relaxed),
)
),
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
}
}
BusRequest::GetFlowStats(ip) => get_flow_stats(ip),
BusRequest::GetPacketHeaderDump(id) => {
BusResponse::PacketDump(n_second_packet_dump(*id))
}
BusRequest::GetPcapDump(id) => {
BusResponse::PcapDump(lqos_heimdall::n_second_pcap(*id))
}
BusRequest::GatherPacketData(ip) => {
let ip = ip.parse::<IpAddr>();
if let Ok(ip) = ip {
if let Some((session_id, countdown)) = lqos_heimdall::hyperfocus_on_target(ip.into()) {
BusResponse::PacketCollectionSession{session_id, countdown}
} else {
BusResponse::Fail("Busy".to_string())
}
} else {
BusResponse::Fail("Invalid IP".to_string())
}
}
});

View File

@ -41,6 +41,9 @@ fn load_network_json() {
if let Ok(njs) = njs {
let mut write_lock = NETWORK_JSON.write().unwrap();
*write_lock = njs;
std::mem::drop(write_lock);
crate::throughput_tracker::THROUGHPUT_TRACKER
.refresh_circuit_ids();
} else {
warn!("Unable to load network.json");
}

View File

@ -4,3 +4,4 @@ pub static BUS_REQUESTS: AtomicU64 = AtomicU64::new(0);
pub static TIME_TO_POLL_HOSTS: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_DOWN: AtomicU64 = AtomicU64::new(0);
pub static HIGH_WATERMARK_UP: AtomicU64 = AtomicU64::new(0);
pub static FLOWS_TRACKED: AtomicU64 = AtomicU64::new(0);

View File

@ -0,0 +1,14 @@
use std::net::IpAddr;
use lqos_bus::BusResponse;
use lqos_heimdall::heimdall_watch_ip;
use lqos_utils::XdpIpAddress;
pub fn get_flow_stats(ip: &str) -> BusResponse {
let ip = ip.parse::<IpAddr>();
if let Ok(ip) = ip {
let ip = XdpIpAddress::from_ip(ip);
heimdall_watch_ip(ip);
return lqos_heimdall::get_flow_stats(ip);
}
BusResponse::Fail("No Stats or bad IP".to_string())
}

View File

@ -1,13 +1,14 @@
mod throughput_entry;
mod tracking_data;
mod heimdall_data;
pub use heimdall_data::get_flow_stats;
use crate::{
shaped_devices_tracker::NETWORK_JSON,
throughput_tracker::tracking_data::ThroughputTracker, stats::TIME_TO_POLL_HOSTS,
};
use log::{info, warn};
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
use lqos_sys::XdpIpAddress;
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot};
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy;
use std::time::Duration;

View File

@ -3,7 +3,8 @@ use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap;
use lqos_bus::TcHandle;
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress};
use lqos_sys::{rtt_for_each, throughput_for_each};
use lqos_utils::XdpIpAddress;
pub struct ThroughputTracker {
pub(crate) cycle: AtomicU64,

View File

@ -2,6 +2,7 @@
name = "lqtop"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
tokio = { version = "1", features = [ "rt", "macros", "net", "io-util", "time" ] }

View File

@ -2,6 +2,7 @@
name = "lqusers"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
clap = { version = "4", features = ["derive"] }

View File

@ -10,4 +10,6 @@ rm -v /sys/fs/bpf/map_ip_to_cpu_and_tc_recip
rm -v /sys/fs/bpf/map_txq_config
rm -v /sys/fs/bpf/bifrost_interface_map
rm -v /sys/fs/bpf/bifrost_vlan_map
rm -v /sys/fs/bpf/heimdall
rm -v /sys/fs/bpf/heimdall_config
rm -v /sys/fs/bpf/heimdall_watching

View File

@ -2,6 +2,7 @@
name = "xdp_iphash_to_cpu_cmdline"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
clap = { version = "4", features = ["derive"] }

View File

@ -2,6 +2,7 @@
name = "xdp_pping"
version = "0.1.0"
edition = "2021"
license = "GPL-2.0-only"
[dependencies]
tokio = { version = "1", features = [ "rt", "macros", "net", "io-util", "time" ] }