mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
First refactor towards the "funnel" - shaped devices in lqosd
ShapedDevices.csv is now monitored in lqosd. This brings some advantages: * The Tracked Devices list now knows the circuit id association for every tracked IP. * The associations auto-update after a ShapedDevices reload. * The webserver is no longer doing Trie lookups to figure out what name to display. Moving forwards, this will allow for stats gathering to group IPs by circuit, and allow calculation of the "funnel".
This commit is contained in:
parent
42c2c63f55
commit
7e5b432253
@ -8,6 +8,9 @@ pub struct IpStats {
|
||||
/// The host's IP address, as detected by the XDP program.
|
||||
pub ip_address: String,
|
||||
|
||||
/// The host's mapped circuit ID
|
||||
pub circuit_id: String,
|
||||
|
||||
/// The current bits-per-second passing through this host. Tuple
|
||||
/// 0 is download, tuple 1 is upload.
|
||||
pub bits_per_second: (u64, u64),
|
||||
|
@ -13,7 +13,6 @@ use lqos_bus::{IpStats, TcHandle};
|
||||
use lqos_config::LibreQoSConfig;
|
||||
use parking_lot::Mutex;
|
||||
use rocket::serde::{json::Json, Deserialize, Serialize};
|
||||
use std::net::IpAddr;
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug)]
|
||||
#[serde(crate = "rocket::serde")]
|
||||
@ -35,23 +34,28 @@ impl From<&IpStats> for IpStatsWithPlan {
|
||||
packets_per_second: i.packets_per_second,
|
||||
median_tcp_rtt: i.median_tcp_rtt,
|
||||
tc_handle: i.tc_handle,
|
||||
circuit_id: String::new(),
|
||||
circuit_id: i.circuit_id.clone(),
|
||||
plan: (0, 0),
|
||||
};
|
||||
if let Ok(ip) = result.ip_address.parse::<IpAddr>() {
|
||||
let lookup = match ip {
|
||||
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
|
||||
IpAddr::V6(ip) => ip,
|
||||
};
|
||||
let cfg = SHAPED_DEVICES.read();
|
||||
if let Some((_, id)) = cfg.trie.longest_match(lookup) {
|
||||
|
||||
if !result.circuit_id.is_empty() {
|
||||
if let Some(circuit) = SHAPED_DEVICES
|
||||
.read()
|
||||
.devices
|
||||
.iter()
|
||||
.find(|sd| sd.circuit_id == result.circuit_id)
|
||||
{
|
||||
let name = if circuit.circuit_name.len() > 20 {
|
||||
&circuit.circuit_name[0..20]
|
||||
} else {
|
||||
&circuit.circuit_name
|
||||
};
|
||||
result.ip_address =
|
||||
format!("{} ({})", cfg.devices[*id].circuit_name, result.ip_address);
|
||||
result.plan.0 = cfg.devices[*id].download_max_mbps;
|
||||
result.plan.1 = cfg.devices[*id].upload_max_mbps;
|
||||
result.circuit_id = cfg.devices[*id].circuit_id.clone();
|
||||
format!("{} ({})", name, result.ip_address);
|
||||
result.plan = (circuit.download_max_mbps, circuit.download_min_mbps);
|
||||
}
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
@ -41,9 +41,8 @@ impl XdpIpAddress {
|
||||
result
|
||||
}
|
||||
|
||||
/// Converts an `XdpIpAddress` type to a Rust `IpAddr` type
|
||||
pub fn as_ip(&self) -> IpAddr {
|
||||
if self.0[0] == 0xFF
|
||||
fn is_v4(&self) -> bool {
|
||||
self.0[0] == 0xFF
|
||||
&& self.0[1] == 0xFF
|
||||
&& self.0[2] == 0xFF
|
||||
&& self.0[3] == 0xFF
|
||||
@ -55,6 +54,31 @@ impl XdpIpAddress {
|
||||
&& self.0[9] == 0xFF
|
||||
&& self.0[10] == 0xFF
|
||||
&& self.0[11] == 0xFF
|
||||
}
|
||||
|
||||
/// Convers an `XdpIpAddress` type to a Rust `IpAddr` type, using
|
||||
/// the in-build mapped function for squishing IPv4 into IPv6
|
||||
pub fn as_ipv6(&self) -> Ipv6Addr {
|
||||
if self.is_v4()
|
||||
{
|
||||
Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15]).to_ipv6_mapped()
|
||||
} else {
|
||||
Ipv6Addr::new(
|
||||
BigEndian::read_u16(&self.0[0..2]),
|
||||
BigEndian::read_u16(&self.0[2..4]),
|
||||
BigEndian::read_u16(&self.0[4..6]),
|
||||
BigEndian::read_u16(&self.0[6..8]),
|
||||
BigEndian::read_u16(&self.0[8..10]),
|
||||
BigEndian::read_u16(&self.0[10..12]),
|
||||
BigEndian::read_u16(&self.0[12..14]),
|
||||
BigEndian::read_u16(&self.0[14..]),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts an `XdpIpAddress` type to a Rust `IpAddr` type
|
||||
pub fn as_ip(&self) -> IpAddr {
|
||||
if self.is_v4()
|
||||
{
|
||||
// It's an IPv4 Address
|
||||
IpAddr::V4(Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15]))
|
||||
|
@ -4,6 +4,7 @@ mod ip_mapping;
|
||||
mod lqos_daht_test;
|
||||
mod program_control;
|
||||
mod throughput_tracker;
|
||||
mod shaped_devices_tracker;
|
||||
mod tuning;
|
||||
mod validation;
|
||||
use crate::{
|
||||
@ -63,7 +64,7 @@ async fn main() -> Result<()> {
|
||||
};
|
||||
|
||||
// Spawn tracking sub-systems
|
||||
join!(spawn_queue_structure_monitor(),);
|
||||
join!(spawn_queue_structure_monitor(), shaped_devices_tracker::shaped_devices_watcher());
|
||||
throughput_tracker::spawn_throughput_monitor();
|
||||
spawn_queue_monitor();
|
||||
|
||||
|
52
src/rust/lqosd/src/shaped_devices_tracker/mod.rs
Normal file
52
src/rust/lqosd/src/shaped_devices_tracker/mod.rs
Normal file
@ -0,0 +1,52 @@
|
||||
use anyhow::Result;
|
||||
use log::{error, info, warn};
|
||||
use lqos_config::ConfigShapedDevices;
|
||||
use lqos_utils::file_watcher::FileWatcher;
|
||||
use once_cell::sync::Lazy;
|
||||
use parking_lot::RwLock;
|
||||
use tokio::task::spawn_blocking;
|
||||
|
||||
pub static SHAPED_DEVICES: Lazy<RwLock<ConfigShapedDevices>> =
|
||||
Lazy::new(|| RwLock::new(ConfigShapedDevices::default()));
|
||||
|
||||
fn load_shaped_devices() {
|
||||
info!("ShapedDevices.csv has changed. Attempting to load it.");
|
||||
let shaped_devices = ConfigShapedDevices::load();
|
||||
if let Ok(new_file) = shaped_devices {
|
||||
info!("ShapedDevices.csv loaded");
|
||||
*SHAPED_DEVICES.write() = new_file;
|
||||
crate::throughput_tracker::THROUGHPUT_TRACKER.write().refresh_circuit_ids();
|
||||
} else {
|
||||
warn!("ShapedDevices.csv failed to load, see previous error messages. Reverting to empty set.");
|
||||
*SHAPED_DEVICES.write() = ConfigShapedDevices::default();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shaped_devices_watcher() {
|
||||
spawn_blocking(|| {
|
||||
info!("Watching for ShapedDevices.csv changes");
|
||||
let _ = watch_for_shaped_devices_changing();
|
||||
});
|
||||
}
|
||||
|
||||
/// Fires up a Linux file system watcher than notifies
|
||||
/// when `ShapedDevices.csv` changes, and triggers a reload.
|
||||
fn watch_for_shaped_devices_changing() -> Result<()> {
|
||||
let watch_path = ConfigShapedDevices::path();
|
||||
if watch_path.is_err() {
|
||||
error!("Unable to generate path for ShapedDevices.csv");
|
||||
return Err(anyhow::Error::msg(
|
||||
"Unable to create path for ShapedDevices.csv",
|
||||
));
|
||||
}
|
||||
let watch_path = watch_path.unwrap();
|
||||
|
||||
let mut watcher = FileWatcher::new("ShapedDevices.csv", watch_path);
|
||||
watcher.set_file_exists_callback(load_shaped_devices);
|
||||
watcher.set_file_created_callback(load_shaped_devices);
|
||||
watcher.set_file_changed_callback(load_shaped_devices);
|
||||
loop {
|
||||
let result = watcher.watch();
|
||||
info!("ShapedDevices watcher returned: {result:?}");
|
||||
}
|
||||
}
|
@ -11,7 +11,7 @@ use std::time::Duration;
|
||||
|
||||
const RETIRE_AFTER_SECONDS: u64 = 30;
|
||||
|
||||
static THROUGHPUT_TRACKER: Lazy<RwLock<ThroughputTracker>> = Lazy::new(|| RwLock::new(ThroughputTracker::new()));
|
||||
pub static THROUGHPUT_TRACKER: Lazy<RwLock<ThroughputTracker>> = Lazy::new(|| RwLock::new(ThroughputTracker::new()));
|
||||
|
||||
pub fn spawn_throughput_monitor() {
|
||||
info!("Starting the bandwidth monitor thread.");
|
||||
@ -62,7 +62,7 @@ fn retire_check(cycle: u64, recent_cycle: u64) -> bool {
|
||||
cycle < recent_cycle + RETIRE_AFTER_SECONDS
|
||||
}
|
||||
|
||||
type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle);
|
||||
type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String);
|
||||
|
||||
pub fn top_n(start: u32, end: u32) -> BusResponse {
|
||||
let mut full_list: Vec<TopList> = {
|
||||
@ -78,6 +78,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
|
||||
te.packets_per_second,
|
||||
te.median_latency(),
|
||||
te.tc_handle,
|
||||
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
@ -94,8 +95,10 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
|
||||
(packets_dn, packets_up),
|
||||
median_rtt,
|
||||
tc_handle,
|
||||
circuit_id,
|
||||
)| IpStats {
|
||||
ip_address: ip.as_ip().to_string(),
|
||||
circuit_id: circuit_id.clone(),
|
||||
bits_per_second: (bytes_dn * 8, bytes_up * 8),
|
||||
packets_per_second: (*packets_dn, *packets_up),
|
||||
median_tcp_rtt: *median_rtt,
|
||||
@ -121,6 +124,7 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
|
||||
te.packets_per_second,
|
||||
te.median_latency(),
|
||||
te.tc_handle,
|
||||
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
@ -137,8 +141,10 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
|
||||
(packets_dn, packets_up),
|
||||
median_rtt,
|
||||
tc_handle,
|
||||
circuit_id,
|
||||
)| IpStats {
|
||||
ip_address: ip.as_ip().to_string(),
|
||||
circuit_id: circuit_id.clone(),
|
||||
bits_per_second: (bytes_dn * 8, bytes_up * 8),
|
||||
packets_per_second: (*packets_dn, *packets_up),
|
||||
median_tcp_rtt: *median_rtt,
|
||||
@ -163,6 +169,7 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
|
||||
te.packets_per_second,
|
||||
te.median_latency(),
|
||||
te.tc_handle,
|
||||
te.circuit_id.as_ref().unwrap_or(&String::new()).clone(),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
@ -180,8 +187,10 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
|
||||
(packets_dn, packets_up),
|
||||
median_rtt,
|
||||
tc_handle,
|
||||
circuit_id,
|
||||
)| IpStats {
|
||||
ip_address: ip.as_ip().to_string(),
|
||||
circuit_id: circuit_id.clone(),
|
||||
bits_per_second: (bytes_dn * 8, bytes_up * 8),
|
||||
packets_per_second: (*packets_dn, *packets_up),
|
||||
median_tcp_rtt: *median_rtt,
|
||||
@ -315,6 +324,7 @@ pub fn all_unknown_ips() -> BusResponse {
|
||||
_last_seen,
|
||||
)| IpStats {
|
||||
ip_address: ip.as_ip().to_string(),
|
||||
circuit_id: String::new(),
|
||||
bits_per_second: (bytes_dn * 8, bytes_up * 8),
|
||||
packets_per_second: (*packets_dn, *packets_up),
|
||||
median_tcp_rtt: *median_rtt,
|
||||
|
@ -2,6 +2,7 @@ use lqos_bus::TcHandle;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ThroughputEntry {
|
||||
pub(crate) circuit_id: Option<String>,
|
||||
pub(crate) first_cycle: u64,
|
||||
pub(crate) most_recent_cycle: u64,
|
||||
pub(crate) bytes: (u64, u64),
|
||||
|
@ -1,3 +1,5 @@
|
||||
use crate::shaped_devices_tracker::SHAPED_DEVICES;
|
||||
|
||||
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
|
||||
use lqos_bus::TcHandle;
|
||||
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress};
|
||||
@ -52,6 +54,23 @@ impl ThroughputTracker {
|
||||
});
|
||||
}
|
||||
|
||||
fn lookup_circuit_id(xdp_ip: &XdpIpAddress) -> Option<String> {
|
||||
let mut circuit_id = None;
|
||||
let lookup = xdp_ip.as_ipv6();
|
||||
let cfg = SHAPED_DEVICES.read();
|
||||
if let Some((_, id)) = cfg.trie.longest_match(lookup) {
|
||||
circuit_id = Some(cfg.devices[*id].circuit_id.clone());
|
||||
}
|
||||
//println!("{lookup:?} Found circuit_id: {circuit_id:?}");
|
||||
circuit_id
|
||||
}
|
||||
|
||||
pub(crate) fn refresh_circuit_ids(&mut self) {
|
||||
self.raw_data.par_iter_mut().for_each(|(ip, data)| {
|
||||
data.circuit_id = Self::lookup_circuit_id(ip);
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn apply_new_throughput_counters(&mut self) {
|
||||
let cycle = self.cycle;
|
||||
let raw_data = &mut self.raw_data;
|
||||
@ -76,6 +95,7 @@ impl ThroughputTracker {
|
||||
}
|
||||
} else {
|
||||
let mut entry = ThroughputEntry {
|
||||
circuit_id: Self::lookup_circuit_id(xdp_ip),
|
||||
first_cycle: self.cycle,
|
||||
most_recent_cycle: 0,
|
||||
bytes: (0, 0),
|
||||
|
Loading…
Reference in New Issue
Block a user