Large batch of improvements:

* The JavaScript RingBuffer structure updated correctly.
* Replaced the funnel graph with text - easier to read.
* Discovered that the current "parking_lot" could become unstable
  under very heavy load, and only with "fat" LTO. Since it's
  no longer recommended (recent change), removed it.
* Replaced the "lazy_static" macro suite with the newly recommended
  "once_cell" system. Less code.
* Full source format.
* Update some dependency versions.
This commit is contained in:
Herbert Wolverson 2023-03-07 21:37:23 +00:00
parent 9fa1318350
commit 67cc8d8e99
37 changed files with 510 additions and 483 deletions

24
src/rust/Cargo.lock generated
View File

@ -587,9 +587,9 @@ dependencies = [
[[package]]
name = "csv"
version = "1.2.0"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af91f40b7355f82b0a891f50e70399475945bb0b0da4f1700ce60761c9d3e359"
checksum = "0b015497079b9a9d69c02ad25de6c0a6edef051ea6360a327d0bd05802ef64ad"
dependencies = [
"csv-core",
"itoa",
@ -1116,9 +1116,9 @@ dependencies = [
[[package]]
name = "io-lifetimes"
version = "1.0.5"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1abeb7a0dd0f8181267ff8adc397075586500b81b28a73e8a0208b00fc170fb3"
checksum = "cfa919a82ea574332e2de6e74b4c36e74d41982b335080fa59d4ef31be20fdf3"
dependencies = [
"libc",
"windows-sys 0.45.0",
@ -1348,13 +1348,11 @@ dependencies = [
"anyhow",
"default-net",
"jemallocator",
"lazy_static",
"lqos_bus",
"lqos_config",
"lqos_utils",
"nix",
"once_cell",
"parking_lot",
"rocket",
"rocket_async_compression",
"sysinfo",
@ -1378,15 +1376,13 @@ name = "lqos_queue_tracker"
version = "0.1.0"
dependencies = [
"criterion",
"lazy_static",
"log",
"log-once",
"lqos_bus",
"lqos_config",
"lqos_sys",
"lqos_utils",
"parking_lot",
"rayon",
"once_cell",
"serde",
"serde_json",
"thiserror",
@ -1445,8 +1441,6 @@ dependencies = [
"lqos_utils",
"nix",
"once_cell",
"parking_lot",
"rayon",
"serde",
"serde_json",
"signal-hook",
@ -2200,18 +2194,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.152"
version = "1.0.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
checksum = "3a382c72b4ba118526e187430bb4963cd6d55051ebf13d9b25574d379cc98d20"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.152"
version = "1.0.153"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
checksum = "1ef476a5790f0f6decbc66726b6e5d63680ed518283e64c7df415989d880954f"
dependencies = [
"proc-macro2",
"quote",

View File

@ -114,7 +114,7 @@ pub enum BusRequest {
/// Request details of part of the network tree
GetNetworkMap {
/// The parent of the map to retrieve
parent: usize
parent: usize,
},
/// Retrieves the top N queues from the root level, and summarizes
@ -124,6 +124,12 @@ pub enum BusRequest {
/// Retrieve node names from network.json
GetNodeNamesFromIds(Vec<usize>),
/// Retrieve stats for all queues above a named circuit id
GetFunnel {
/// Circuit being analyzed, as the named circuit id
target: String,
},
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidht test" link.
#[cfg(feature = "equinix_tests")]

View File

@ -9,16 +9,16 @@
mod authentication;
mod etc;
mod libre_qos_config;
mod network_json;
mod program_control;
mod shaped_devices;
mod network_json;
pub use authentication::{UserRole, WebUsers};
pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables};
pub use libre_qos_config::LibreQoSConfig;
pub use network_json::{NetworkJson, NetworkJsonNode};
pub use program_control::load_libreqos;
pub use shaped_devices::{ConfigShapedDevices, ShapedDevice};
pub use network_json::{NetworkJson, NetworkJsonNode};
/// Used as a constant in determining buffer preallocation
pub const SUPPORTED_CUSTOMERS: usize = 16_000_000;

View File

@ -122,7 +122,10 @@ impl NetworkJson {
/// Retrieve a cloned copy of all children with a parent containing a specific
/// node index.
pub fn get_cloned_children(&self, index: usize) -> Vec<(usize, NetworkJsonNode)> {
pub fn get_cloned_children(
&self,
index: usize,
) -> Vec<(usize, NetworkJsonNode)> {
self
.nodes
.iter()
@ -158,16 +161,24 @@ impl NetworkJson {
&mut self,
targets: &[usize],
bytes: (u64, u64),
median_rtt: f32,
) {
for idx in targets {
// Safety first: use "get" to ensure that the node exists
if let Some(node) = self.nodes.get_mut(*idx) {
node.current_throughput.0 += bytes.0;
node.current_throughput.1 += bytes.1;
if median_rtt > 0.0 {
node.rtts.push(median_rtt);
} else {
warn!("No network tree entry for index {idx}");
}
}
}
/// Record RTT time in the tree
pub fn add_rtt_cycle(&mut self, targets: &[usize], rtt: f32) {
for idx in targets {
// Safety first: use "get" to ensure that the node exists
if let Some(node) = self.nodes.get_mut(*idx) {
node.rtts.push(rtt);
} else {
warn!("No network tree entry for index {idx}");
}
@ -195,14 +206,13 @@ fn recurse_node(
immediate_parent: usize,
) {
info!("Mapping {name} from network.json");
/*let my_id = if name != "children" {
let mut parents = parents.to_vec();
let my_id = if name != "children" {
parents.push(nodes.len());
nodes.len()
} else {
nodes.len() - 1
};*/
let my_id = nodes.len();
let mut parents = parents.to_vec();
parents.push(my_id);
};
let node = NetworkJsonNode {
parents: parents.to_vec(),
max_throughput: (
@ -215,9 +225,9 @@ fn recurse_node(
rtts: Vec::new(),
};
//if node.name != "children" {
if node.name != "children" {
nodes.push(node);
//}
}
// Recurse children
for (key, value) in json.iter() {

View File

@ -10,8 +10,6 @@ equinix_tests = []
[dependencies]
rocket = { version = "0.5.0-rc.2", features = [ "json", "msgpack", "uuid" ] }
rocket_async_compression = "0.2.0"
lazy_static = "1.4"
parking_lot = "0.12"
lqos_bus = { path = "../lqos_bus" }
lqos_config = { path = "../lqos_config" }
lqos_utils = { path = "../lqos_utils" }

View File

@ -1,7 +1,8 @@
use std::sync::Mutex;
use anyhow::Error;
use lazy_static::*;
use lqos_config::{UserRole, WebUsers};
use parking_lot::Mutex;
use once_cell::sync::Lazy;
use rocket::serde::{json::Json, Deserialize, Serialize};
use rocket::{
http::{Cookie, CookieJar, Status},
@ -9,9 +10,8 @@ use rocket::{
Request,
};
lazy_static! {
static ref WEB_USERS: Mutex<Option<WebUsers>> = Mutex::new(None);
}
static WEB_USERS: Lazy<Mutex<Option<WebUsers>>> =
Lazy::new(|| Mutex::new(None));
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthGuard {
@ -27,7 +27,7 @@ impl<'r> FromRequest<'r> for AuthGuard {
async fn from_request(
request: &'r Request<'_>,
) -> Outcome<Self, Self::Error> {
let mut lock = WEB_USERS.lock();
let mut lock = WEB_USERS.lock().unwrap();
if lock.is_none() {
if WebUsers::does_users_file_exist().unwrap() {
*lock = Some(WebUsers::load_or_create().unwrap());
@ -82,7 +82,7 @@ pub fn create_first_user(
if WebUsers::does_users_file_exist().unwrap() {
return Json("ERROR".to_string());
}
let mut lock = WEB_USERS.lock();
let mut lock = WEB_USERS.lock().unwrap();
let mut users = WebUsers::load_or_create().unwrap();
users.allow_anonymous(info.allow_anonymous).unwrap();
let token = users
@ -102,7 +102,7 @@ pub struct LoginAttempt {
#[post("/api/login", data = "<info>")]
pub fn login(cookies: &CookieJar, info: Json<LoginAttempt>) -> Json<String> {
let mut lock = WEB_USERS.lock();
let mut lock = WEB_USERS.lock().unwrap();
if lock.is_none() && WebUsers::does_users_file_exist().unwrap() {
*lock = Some(WebUsers::load_or_create().unwrap());
}
@ -126,7 +126,7 @@ pub fn admin_check(auth: AuthGuard) -> Json<bool> {
#[get("/api/username")]
pub fn username(_auth: AuthGuard, cookies: &CookieJar) -> Json<String> {
if let Some(token) = cookies.get("User-Token") {
let lock = WEB_USERS.lock();
let lock = WEB_USERS.lock().unwrap();
if let Some(users) = &*lock {
return Json(users.get_username(token.value()));
}

View File

@ -9,8 +9,8 @@ mod unknown_devices;
use rocket_async_compression::Compression;
mod auth_guard;
mod config_control;
mod queue_info;
mod network_tree;
mod queue_info;
// Use JemAllocator only on supported platforms
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
@ -81,6 +81,7 @@ fn rocket() -> _ {
network_tree::tree_clients,
network_tree::network_tree_summary,
network_tree::node_names,
network_tree::funnel_for_queue,
// Supporting files
static_pages::bootsrap_css,
static_pages::plotly_js,

View File

@ -2,7 +2,10 @@ use std::net::IpAddr;
use lqos_bus::{bus_request, BusRequest, BusResponse};
use lqos_config::NetworkJsonNode;
use rocket::{fs::NamedFile, serde::{json::Json, Serialize}};
use rocket::{
fs::NamedFile,
serde::{json::Json, Serialize},
};
use crate::{cache_control::NoCache, tracker::SHAPED_DEVICES};
@ -28,7 +31,8 @@ pub async fn tree_entry(
}
#[get("/api/network_tree_summary")]
pub async fn network_tree_summary() -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> {
pub async fn network_tree_summary(
) -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> {
let responses =
bus_request(vec![BusRequest::TopMapQueues(4)]).await.unwrap();
let result = match &responses[0] {
@ -55,7 +59,7 @@ pub async fn tree_clients(
for msg in
bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter()
{
let devices = SHAPED_DEVICES.read();
let devices = SHAPED_DEVICES.read().unwrap();
if let BusResponse::HostCounters(hosts) = msg {
for (ip, down, up) in hosts.iter() {
let lookup = match ip {
@ -71,7 +75,7 @@ pub async fn tree_clients(
limit: (
devices.devices[*c.1].download_max_mbps as u64,
devices.devices[*c.1].upload_max_mbps as u64,
)
),
});
}
}
@ -82,10 +86,14 @@ pub async fn tree_clients(
}
#[post("/api/node_names", data = "<nodes>")]
pub async fn node_names(nodes: Json<Vec<usize>>) -> NoCache<Json<Vec<(usize, String)>>> {
pub async fn node_names(
nodes: Json<Vec<usize>>,
) -> NoCache<Json<Vec<(usize, String)>>> {
let mut result = Vec::new();
for msg in
bus_request(vec![BusRequest::GetNodeNamesFromIds(nodes.0)]).await.unwrap().iter()
for msg in bus_request(vec![BusRequest::GetNodeNamesFromIds(nodes.0)])
.await
.unwrap()
.iter()
{
if let BusResponse::NodeNames(map) = msg {
result.extend_from_slice(map);
@ -94,3 +102,30 @@ pub async fn node_names(nodes: Json<Vec<usize>>) -> NoCache<Json<Vec<(usize, Str
NoCache::new(Json(result))
}
#[get("/api/funnel_for_queue/<circuit_id>")]
pub async fn funnel_for_queue(
circuit_id: String,
) -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> {
let mut result = Vec::new();
let target = SHAPED_DEVICES
.read()
.unwrap()
.devices
.iter()
.find(|d| d.circuit_id == circuit_id)
.as_ref()
.unwrap()
.parent_node
.clone();
for msg in
bus_request(vec![BusRequest::GetFunnel { target }]).await.unwrap().iter()
{
if let BusResponse::NetworkMap(map) = msg {
result.extend_from_slice(map);
}
}
NoCache::new(Json(result))
}

View File

@ -29,8 +29,12 @@ pub async fn circuit_info(
circuit_id: String,
_auth: AuthGuard,
) -> NoCache<Json<CircuitInfo>> {
if let Some(device) =
SHAPED_DEVICES.read().devices.iter().find(|d| d.circuit_id == circuit_id)
if let Some(device) = SHAPED_DEVICES
.read()
.unwrap()
.devices
.iter()
.find(|d| d.circuit_id == circuit_id)
{
let result = CircuitInfo {
name: device.circuit_name.clone(),
@ -63,7 +67,7 @@ pub async fn current_circuit_throughput(
bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter()
{
if let BusResponse::HostCounters(hosts) = msg {
let devices = SHAPED_DEVICES.read();
let devices = SHAPED_DEVICES.read().unwrap();
for (ip, down, up) in hosts.iter() {
let lookup = match ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),

View File

@ -13,12 +13,12 @@ static RELOAD_REQUIRED: AtomicBool = AtomicBool::new(false);
pub fn all_shaped_devices(
_auth: AuthGuard,
) -> NoCache<Json<Vec<ShapedDevice>>> {
NoCache::new(Json(SHAPED_DEVICES.read().devices.clone()))
NoCache::new(Json(SHAPED_DEVICES.read().unwrap().devices.clone()))
}
#[get("/api/shaped_devices_count")]
pub fn shaped_devices_count(_auth: AuthGuard) -> NoCache<Json<usize>> {
NoCache::new(Json(SHAPED_DEVICES.read().devices.len()))
NoCache::new(Json(SHAPED_DEVICES.read().unwrap().devices.len()))
}
#[get("/api/shaped_devices_range/<start>/<end>")]
@ -27,7 +27,7 @@ pub fn shaped_devices_range(
end: usize,
_auth: AuthGuard,
) -> NoCache<Json<Vec<ShapedDevice>>> {
let reader = SHAPED_DEVICES.read();
let reader = SHAPED_DEVICES.read().unwrap();
let result: Vec<ShapedDevice> =
reader.devices.iter().skip(start).take(end).cloned().collect();
NoCache::new(Json(result))
@ -39,7 +39,7 @@ pub fn shaped_devices_search(
_auth: AuthGuard,
) -> NoCache<Json<Vec<ShapedDevice>>> {
let term = term.trim().to_lowercase();
let reader = SHAPED_DEVICES.read();
let reader = SHAPED_DEVICES.read().unwrap();
let result: Vec<ShapedDevice> = reader
.devices
.iter()

View File

@ -1,22 +1,15 @@
use lazy_static::*;
use lqos_bus::IpStats;
use parking_lot::RwLock;
use once_cell::sync::Lazy;
use std::sync::RwLock;
lazy_static! {
pub static ref TOP_10_DOWNLOADERS: RwLock<Vec<IpStats>> =
RwLock::new(Vec::with_capacity(10));
}
pub static TOP_10_DOWNLOADERS: Lazy<RwLock<Vec<IpStats>>> =
Lazy::new(|| RwLock::new(Vec::with_capacity(10)));
lazy_static! {
pub static ref WORST_10_RTT: RwLock<Vec<IpStats>> =
RwLock::new(Vec::with_capacity(10));
}
pub static WORST_10_RTT: Lazy<RwLock<Vec<IpStats>>> =
Lazy::new(|| RwLock::new(Vec::with_capacity(10)));
lazy_static! {
pub static ref RTT_HISTOGRAM: RwLock<Vec<u32>> =
RwLock::new(Vec::with_capacity(100));
}
pub static RTT_HISTOGRAM: Lazy<RwLock<Vec<u32>>> =
Lazy::new(|| RwLock::new(Vec::with_capacity(100)));
lazy_static! {
pub static ref HOST_COUNTS: RwLock<(u32, u32)> = RwLock::new((0, 0));
}
pub static HOST_COUNTS: Lazy<RwLock<(u32, u32)>> =
Lazy::new(|| RwLock::new((0, 0)));

View File

@ -1,18 +1,16 @@
use lazy_static::*;
use lqos_bus::IpStats;
use lqos_config::ConfigShapedDevices;
use parking_lot::RwLock;
use once_cell::sync::Lazy;
use std::sync::RwLock;
lazy_static! {
/// Global storage of the shaped devices csv data.
/// Updated by the file system watcher whenever
/// the underlying file changes.
pub static ref SHAPED_DEVICES : RwLock<ConfigShapedDevices> = RwLock::new(ConfigShapedDevices::default());
}
pub static SHAPED_DEVICES: Lazy<RwLock<ConfigShapedDevices>> =
Lazy::new(|| RwLock::new(ConfigShapedDevices::default()));
lazy_static! {
/// Global storage of the shaped devices csv data.
/// Updated by the file system watcher whenever
/// the underlying file changes.
pub static ref UNKNOWN_DEVICES : RwLock<Vec<IpStats>> = RwLock::new(Vec::new());
}
pub static UNKNOWN_DEVICES: Lazy<RwLock<Vec<IpStats>>> =
Lazy::new(|| RwLock::new(Vec::new()));

View File

@ -95,10 +95,10 @@ fn load_shaped_devices() {
let shaped_devices = ConfigShapedDevices::load();
if let Ok(new_file) = shaped_devices {
info!("ShapedDevices.csv loaded");
*SHAPED_DEVICES.write() = new_file;
*SHAPED_DEVICES.write().unwrap() = new_file;
} else {
warn!("ShapedDevices.csv failed to load, see previous error messages. Reverting to empty set.");
*SHAPED_DEVICES.write() = ConfigShapedDevices::default();
*SHAPED_DEVICES.write().unwrap() = ConfigShapedDevices::default();
}
}
@ -138,17 +138,17 @@ async fn get_data_from_server() -> Result<()> {
for r in bus_request(requests).await?.iter() {
match r {
BusResponse::TopDownloaders(stats) => {
*TOP_10_DOWNLOADERS.write() = stats.clone();
*TOP_10_DOWNLOADERS.write().unwrap() = stats.clone();
}
BusResponse::WorstRtt(stats) => {
*WORST_10_RTT.write() = stats.clone();
*WORST_10_RTT.write().unwrap() = stats.clone();
}
BusResponse::RttHistogram(stats) => {
*RTT_HISTOGRAM.write() = stats.clone();
*RTT_HISTOGRAM.write().unwrap() = stats.clone();
}
BusResponse::AllUnknownIps(unknowns) => {
*HOST_COUNTS.write() = (unknowns.len() as u32, 0);
let cfg = SHAPED_DEVICES.read();
*HOST_COUNTS.write().unwrap() = (unknowns.len() as u32, 0);
let cfg = SHAPED_DEVICES.read().unwrap();
let really_unknown: Vec<IpStats> = unknowns
.iter()
.filter(|ip| {
@ -164,8 +164,8 @@ async fn get_data_from_server() -> Result<()> {
})
.cloned()
.collect();
*HOST_COUNTS.write() = (really_unknown.len() as u32, 0);
*UNKNOWN_DEVICES.write() = really_unknown;
*HOST_COUNTS.write().unwrap() = (really_unknown.len() as u32, 0);
*UNKNOWN_DEVICES.write().unwrap() = really_unknown;
}
BusResponse::NotReadyYet => {
warn!("Host system isn't ready to answer all queries yet.");

View File

@ -1,17 +1,13 @@
mod cache;
mod cache_manager;
use self::cache::{
CPU_USAGE, HOST_COUNTS, NUM_CPUS, RAM_USED,
RTT_HISTOGRAM, TOP_10_DOWNLOADERS, TOTAL_RAM,
WORST_10_RTT,
CPU_USAGE, HOST_COUNTS, NUM_CPUS, RAM_USED, RTT_HISTOGRAM,
TOP_10_DOWNLOADERS, TOTAL_RAM, WORST_10_RTT,
};
use crate::auth_guard::AuthGuard;
pub use cache::{SHAPED_DEVICES, UNKNOWN_DEVICES};
pub use cache_manager::update_tracking;
use lazy_static::lazy_static;
use lqos_bus::{IpStats, TcHandle, bus_request, BusRequest, BusResponse};
use lqos_config::LibreQoSConfig;
use parking_lot::Mutex;
use lqos_bus::{bus_request, BusRequest, BusResponse, IpStats, TcHandle};
use rocket::serde::{json::Json, Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -41,6 +37,7 @@ impl From<&IpStats> for IpStatsWithPlan {
if !result.circuit_id.is_empty() {
if let Some(circuit) = SHAPED_DEVICES
.read()
.unwrap()
.devices
.iter()
.find(|sd| sd.circuit_id == result.circuit_id)
@ -50,8 +47,7 @@ impl From<&IpStats> for IpStatsWithPlan {
} else {
&circuit.circuit_name
};
result.ip_address =
format!("{} ({})", name, result.ip_address);
result.ip_address = format!("{} ({})", name, result.ip_address);
result.plan = (circuit.download_max_mbps, circuit.download_min_mbps);
}
}
@ -70,15 +66,20 @@ pub struct ThroughputPerSecond {
}
#[get("/api/current_throughput")]
pub async fn current_throughput(_auth: AuthGuard) -> Json<ThroughputPerSecond> {
pub async fn current_throughput(
_auth: AuthGuard,
) -> Json<ThroughputPerSecond> {
let mut result = ThroughputPerSecond::default();
if let Ok(messages) = bus_request(vec![BusRequest::GetCurrentThroughput]).await {
if let Ok(messages) =
bus_request(vec![BusRequest::GetCurrentThroughput]).await
{
for msg in messages {
if let BusResponse::CurrentThroughput {
bits_per_second,
packets_per_second,
shaped_bits_per_second,
} = msg {
} = msg
{
result.bits_per_second = bits_per_second;
result.packets_per_second = packets_per_second;
result.shaped_bits_per_second = shaped_bits_per_second;
@ -109,33 +110,30 @@ pub fn ram_usage(_auth: AuthGuard) -> Json<Vec<u64>> {
#[get("/api/top_10_downloaders")]
pub fn top_10_downloaders(_auth: AuthGuard) -> Json<Vec<IpStatsWithPlan>> {
let tt: Vec<IpStatsWithPlan> =
TOP_10_DOWNLOADERS.read().iter().map(|tt| tt.into()).collect();
TOP_10_DOWNLOADERS.read().unwrap().iter().map(|tt| tt.into()).collect();
Json(tt)
}
#[get("/api/worst_10_rtt")]
pub fn worst_10_rtt(_auth: AuthGuard) -> Json<Vec<IpStatsWithPlan>> {
let tt: Vec<IpStatsWithPlan> =
WORST_10_RTT.read().iter().map(|tt| tt.into()).collect();
WORST_10_RTT.read().unwrap().iter().map(|tt| tt.into()).collect();
Json(tt)
}
#[get("/api/rtt_histogram")]
pub fn rtt_histogram(_auth: AuthGuard) -> Json<Vec<u32>> {
Json(RTT_HISTOGRAM.read().clone())
Json(RTT_HISTOGRAM.read().unwrap().clone())
}
#[get("/api/host_counts")]
pub fn host_counts(_auth: AuthGuard) -> Json<(u32, u32)> {
let shaped_reader = SHAPED_DEVICES.read();
let shaped_reader = SHAPED_DEVICES.read().unwrap();
let n_devices = shaped_reader.devices.len();
let host_counts = HOST_COUNTS.read();
let host_counts = HOST_COUNTS.read().unwrap();
let unknown = host_counts.0 - host_counts.1;
Json((n_devices as u32, unknown))
}
lazy_static! {
static ref CONFIG: Mutex<LibreQoSConfig> =
Mutex::new(lqos_config::LibreQoSConfig::load().unwrap());
}
//static CONFIG: Lazy<Mutex<LibreQoSConfig>> =
// Lazy::new(|| Mutex::new(lqos_config::LibreQoSConfig::load().unwrap()));

View File

@ -6,12 +6,12 @@ use rocket::serde::json::Json;
#[get("/api/all_unknown_devices")]
pub fn all_unknown_devices(_auth: AuthGuard) -> NoCache<Json<Vec<IpStats>>> {
NoCache::new(Json(UNKNOWN_DEVICES.read().clone()))
NoCache::new(Json(UNKNOWN_DEVICES.read().unwrap().clone()))
}
#[get("/api/unknown_devices_count")]
pub fn unknown_devices_count(_auth: AuthGuard) -> NoCache<Json<usize>> {
NoCache::new(Json(UNKNOWN_DEVICES.read().len()))
NoCache::new(Json(UNKNOWN_DEVICES.read().unwrap().len()))
}
#[get("/api/unknown_devices_range/<start>/<end>")]
@ -20,7 +20,7 @@ pub fn unknown_devices_range(
end: usize,
_auth: AuthGuard,
) -> NoCache<Json<Vec<IpStats>>> {
let reader = UNKNOWN_DEVICES.read();
let reader = UNKNOWN_DEVICES.read().unwrap();
let result: Vec<IpStats> =
reader.iter().skip(start).take(end).cloned().collect();
NoCache::new(Json(result))
@ -29,7 +29,7 @@ pub fn unknown_devices_range(
#[get("/api/unknown_devices_csv")]
pub fn unknown_devices_csv(_auth: AuthGuard) -> NoCache<String> {
let mut result = String::new();
let reader = UNKNOWN_DEVICES.read();
let reader = UNKNOWN_DEVICES.read().unwrap();
for unknown in reader.iter() {
result += &format!("{}\n", unknown.ip_address);

View File

@ -68,6 +68,9 @@
<li class="nav-item" role="presentation">
<button class="nav-link" id="pills-tins-tab" data-bs-toggle="pill" data-bs-target="#pills-tins" type="button" role="tab" aria-controls="pills-profile" aria-selected="false">All Tins</button>
</li>
<li class="nav-item" role="presentation">
<button class="nav-link" id="pills-funnel-tab" data-bs-toggle="pill" data-bs-target="#pills-funnel" type="button" role="tab" aria-controls="pills-funnel" aria-selected="false">Queue Funnel</button>
</li>
</ul>
</div>
<div class="col-sm-2">
@ -139,7 +142,7 @@
</div>
</div>
<div class="tab-pane fade" id="pills-tins" role="tabpanel" aria-labelledby="pills-tins-tab" tabindex="0">
<div class="tab-pane fade" id="pills-tins" role="tabpanel" aria-labelledby="pills-tins-tab" tabindex="1">
<div class="row" class="mtop4">
<div class="col-sm-6">
<div class="card bg-light">
@ -181,6 +184,9 @@
</div>
</div>
</div>
<div class="tab-pane fade" id="pills-funnel" role="tabpanel" aria-labelledby="pills-funnel-tab" tabindex="2">
</div>
</div>
</div>
@ -450,6 +456,63 @@
setTimeout(getThroughput, 1000);
}
let funnels = new MultiRingBuffer(300);
let rtts = {};
let circuitId = "";
function getFunnel(c) {
circuitId = encodeURI(c);
$.get("/api/funnel_for_queue/" + circuitId, (data) => {
let html = "";
for (let i=0; i<data.length; ++i) {
funnels.push(data[i][0], data[i][1].current_throughput[0]*8, data[i][1].current_throughput[1]*8);
rtts[data[i][0]] = new RttHistogram();
let row = "<div class='row row220'>";
row += "<div class='col-sm-6'>";
row += "<div class='card bg-light'>";
row += "<h5 class='card-title'><i class='fa fa-hourglass'></i> <a href='/tree?parent=" + data[i][0] + "'>" + data[i][1].name + " Throughput</a></h5>";
row += "<div id='tp" + data[i][0] + "' class='graph98 graph150'></div>";
row += "</div>";
row += "</div>";
row += "<div class='col-sm-6'>";
row += "<div class='card bg-light'>";
row += "<h5 class='card-title'><i class='fa fa-bar-chart'></i> " + data[i][1].name + " TCP RTT</h5>";
row += "<div id='rtt" + data[i][0] + "' class='graph98 graph150'></div>";
row += "</div>";
row += "</div>";
row += "</div>";
html += row;
}
$("#pills-funnel").html(html);
setTimeout(plotFunnels, 1000);
});
}
function plotFunnels() {
$.get("/api/funnel_for_queue/" + encodeURI(circuitId), (data) => {
for (let i=0; i<data.length; ++i) {
funnels.push(data[i][0], data[i][1].current_throughput[0]*8, data[i][1].current_throughput[1]*8);
for (const [k, v] of Object.entries(funnels.data)) {
let target_div = "tp" + k;
let graphData = v.toScatterGraphData();
let graph = document.getElementById(target_div);
Plotly.newPlot(graph, graphData, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true }, xaxis: {automargin: true, title: "Time since now (seconds)"} }, { responsive: true });
}
rtts[data[i][0]].clear();
for (let j=0; j<data[i][1].rtts.length; j++) {
rtts[data[i][0]].push(data[i][1].rtts[j]);
}
rtts[data[i][0]].plot("rtt" + data[i][0]);
}
});
setTimeout(plotFunnels, 1000);
}
function start() {
colorReloadButton();
updateHostCounts();
@ -459,6 +522,7 @@
$.get("/api/watch_circuit/" + params.id, () => {
pollQueue();
getThroughput();
getFunnel(params.id);
});
}

View File

@ -190,16 +190,6 @@ const reloadModal = `
</div>
</div>`;
function yValsRingSort(y, head, capacity) {
let result = [];
for (let i=0; i<head; ++i)
result.push(y[i]);
for (let i=head; i<capacity; ++i) {
result.push(y[i])
}
return result;
}
// MultiRingBuffer provides an interface for storing multiple ring-buffers
// of performance data, with a view to them ending up on the same graph.
class MultiRingBuffer {
@ -219,16 +209,13 @@ class MultiRingBuffer {
let graphData = [];
for (const [k, v] of Object.entries(this.data)) {
if (k != rootName) {
let total = v.download.reduce((a, b) => a + b) +
v.upload.reduce((a, b) => a + b);
if (total > 0) {
let dn = { x: v.x_axis, y: yValsRingSort(v.download, v.head, v.capacity), name: k + "_DL", type: 'scatter', stackgroup: 'dn' };
let up = { x: v.x_axis, y: yValsRingSort(v.upload, v.head, v.capacity), name: k + "_UL", type: 'scatter', stackgroup: 'up' };
let y = v.sortedY;
let dn = { x: v.x_axis, y: y.down, name: k + "_DL", type: 'scatter', stackgroup: 'dn' };
let up = { x: v.x_axis, y: y.up, name: k + "_UL", type: 'scatter', stackgroup: 'up' };
graphData.push(dn);
graphData.push(up);
}
}
}
let graph = document.getElementById(target_div);
Plotly.newPlot(
@ -246,17 +233,16 @@ class MultiRingBuffer {
plotTotalThroughput(target_div) {
let graph = document.getElementById(target_div);
let totalDown = yValsRingSort(this.data['total'].download, this.data['total'].head, this.data['total'].capacity);
let totalUp = yValsRingSort(this.data['total'].upload, this.data['total'].head, this.data['total'].capacity);
let shapedDown = yValsRingSort(this.data['shaped'].download, this.data['shaped'].head, this.data['shaped'].capacity);
let shapedUp = yValsRingSort(this.data['shaped'].upload, this.data['shaped'].head, this.data['shaped'].capacity);
let total = this.data['total'].sortedY();
let shaped = this.data['shaped'].sortedY();
let x = this.data['total'].x_axis;
let data = [
{x: x, y:totalDown, name: 'Download', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:totalUp, name: 'Upload', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:shapedDown, name: 'Shaped Download', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
{x: x, y:shapedUp, name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
{x: x, y:total.down, name: 'Download', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:total.up, name: 'Upload', type: 'scatter', marker: {color: 'rgb(255,160,122)'}},
{x: x, y:shaped.down, name: 'Shaped Download', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
{x: x, y:shaped.up, name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
];
Plotly.newPlot(graph, data, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true }, xaxis: {automargin: true, title: "Time since now (seconds)"} }, { responsive: true });
}
@ -272,7 +258,7 @@ class RingBuffer {
for (var i = 0; i < capacity; ++i) {
this.download.push(0.0);
this.upload.push(0.0);
this.x_axis.push(0-i);
this.x_axis.push(i);
}
}
@ -283,10 +269,27 @@ class RingBuffer {
this.head %= this.capacity;
}
sortedY() {
let result = {
down: [],
up: [],
};
for (let i=this.head; i<this.capacity; i++) {
result.down.push(this.download[i]);
result.up.push(this.upload[i]);
}
for (let i=0; i < this.head; i++) {
result.down.push(this.download[i]);
result.up.push(this.upload[i]);
}
return result;
}
toScatterGraphData() {
let y = this.sortedY();
let GraphData = [
{ x: this.x_axis, y: this.download, name: 'Download', type: 'scatter' },
{ x: this.x_axis, y: this.upload, name: 'Upload', type: 'scatter' },
{ x: this.x_axis, y: y.down, name: 'Download', type: 'scatter' },
{ x: this.x_axis, y: y.up, name: 'Upload', type: 'scatter' },
];
return GraphData;
}
@ -316,6 +319,10 @@ class RttHistogram {
this.entries[band] += 1;
}
pushBand(band, n) {
this.entries[band] += n;
}
plot(target_div) {
let gData = [
{ x: this.x, y: this.entries, type: 'bar', marker: { color: this.x, colorscale: 'RdBu' } }

View File

@ -168,7 +168,7 @@
<footer>&copy; 2022-2023, LibreQoE LLC</footer>
<script>
let throughput = new MultiRingBuffer(300);
var throughput = new MultiRingBuffer(300);
function updateCurrentThroughput() {
$.get("/api/current_throughput", (tp) => {
@ -181,21 +181,28 @@
throughput.push("total", tp.bits_per_second[0], tp.bits_per_second[1]);
throughput.push("shaped", tp.shaped_bits_per_second[0], tp.shaped_bits_per_second[1]);
throughput.plotTotalThroughput("tpGraph");
setTimeout(updateCurrentThroughput, 1000);
});
}
let funnelData = new MultiRingBuffer(300);
var funnelData = new MultiRingBuffer(300);
function updateSiteFunnel() {
$.get("/api/network_tree_summary/", (data) => {
let table = "<table class='table' style='font-size: 8pt;'>";
for (let i = 0; i < data.length; ++i) {
funnelData.push(data[i][1].name, data[i][1].current_throughput[0] * 8, data[i][1].current_throughput[1] * 8);
let name = data[i][1].name;
if (name.length > 20) {
name = name.substring(0, 20) + "...";
}
funnelData.plotStackedBars("siteFunnel", "");
table += "<tr>";
table += "<td class='redact'>" + redactText(name) + "</td>";
table += "<td>" + scaleNumber(data[i][1].current_throughput[0] * 8) + "</td>";
table += "<td>" + scaleNumber(data[i][1].current_throughput[1] * 8) + "</td>";
table += "</tr>";
}
table += "</table>";
$("#siteFunnel").html(table);
});
setTimeout(updateSiteFunnel, 1000);
}
function updateCpu() {
@ -216,7 +223,6 @@
yaxis: { automargin: true, autorange: false, range: [0.0, 100.0] },
},
{ responsive: true });
setTimeout(updateCpu, 2000);
});
}
@ -229,7 +235,6 @@
type: 'pie'
}];
Plotly.newPlot(graph, data, { margin: { l: 0, r: 0, b: 0, t: 12 }, showlegend: false }, { responsive: true });
setTimeout(updateRam, 30000);
});
}
@ -262,30 +267,48 @@
function updateTop10() {
$.get("/api/top_10_downloaders", (tt) => {
updateNTable('#top10dl', tt);
setTimeout(updateTop10, 5000);
});
}
function updateWorst10() {
$.get("/api/worst_10_rtt", (tt) => {
updateNTable('#worstRtt', tt);
setTimeout(updateWorst10, 5000);
});
}
let rttGraph = new RttHistogram();
var rttGraph = new RttHistogram();
function updateHistogram() {
$.get("/api/rtt_histogram", (rtt) => {
rttGraph.clear();
for (let i = 0; i < rtt.length; i++) {
rttGraph.push(rtt[i]);
rttGraph.pushBand(i, rtt[i]);
}
rttGraph.plot("rttHistogram");
setTimeout(updateHistogram, 5000);
});
}
var tickCount = 0;
function OneSecondCadence() {
updateCurrentThroughput();
updateSiteFunnel();
if (tickCount % 5 == 0) {
updateHistogram();
updateWorst10();
updateTop10();
}
if (tickCount % 10 == 0) {
updateCpu();
updateRam();
}
tickCount++;
setTimeout(OneSecondCadence, 1000);
}
function start() {
if (isRedacted()) {
//console.log("Redacting");
@ -302,6 +325,7 @@
updateHistogram();
updateHostCounts();
updateSiteFunnel();
OneSecondCadence();
}
$(document).ready(start);

View File

@ -65,6 +65,7 @@
<div class="row mbot8 row220">
<!-- 5 minutes of throughput -->
<!--
<div class="col-sm-4">
<div class="card bg-light">
<div class="card-body">
@ -73,6 +74,7 @@
</div>
</div>
</div>
-->
<!-- RTT Histogram -->
<div class="col-sm-4">
@ -258,7 +260,7 @@
$("#treeList").html(tbl);
// Build the stacked chart
buffers.plotStackedBars("tpGraph", rootName);
//buffers.plotStackedBars("tpGraph", rootName);
// Build the RTT histo
rtt_histo.plot("rttHistogram");

View File

@ -2,8 +2,8 @@ use lqos_bus::{BusRequest, BusResponse, TcHandle};
use lqos_utils::hex_string::read_hex_string;
use nix::libc::getpid;
use pyo3::{
exceptions::PyOSError, pyclass, pyfunction, pymodule, types::PyModule,
wrap_pyfunction, PyResult, Python, pymethods,
exceptions::PyOSError, pyclass, pyfunction, pymethods, pymodule,
types::PyModule, wrap_pyfunction, PyResult, Python,
};
use std::{
fs::{remove_file, File},
@ -158,7 +158,13 @@ impl BatchedCommands {
Ok(Self { batch: Vec::new() })
}
pub fn add_ip_mapping(&mut self, ip: String, classid: String, cpu: String, upload: bool) -> PyResult<()> {
pub fn add_ip_mapping(
&mut self,
ip: String,
classid: String,
cpu: String,
upload: bool,
) -> PyResult<()> {
let request = parse_add_ip(&ip, &classid, &cpu, upload);
if let Ok(request) = request {
self.batch.push(request);

View File

@ -13,10 +13,8 @@ lqos_sys = { path = "../lqos_sys" }
lqos_utils = { path = "../lqos_utils" }
log = "0"
log-once = "0.4.0"
lazy_static = "1.4"
parking_lot = "0"
tokio = { version = "1", features = [ "full", "parking_lot" ] }
rayon = "1"
once_cell = "1"
[dev-dependencies]
criterion = { version = "0", features = [ "html_reports"] }

View File

@ -3,7 +3,7 @@ use lqos_bus::BusResponse;
pub fn get_raw_circuit_data(circuit_id: &str) -> BusResponse {
still_watching(circuit_id);
let reader = CIRCUIT_TO_QUEUE.read();
let reader = CIRCUIT_TO_QUEUE.read().unwrap();
if let Some(circuit) = reader.get(circuit_id) {
if let Ok(json) = serde_json::to_string(circuit) {
BusResponse::RawQueueData(json)

View File

@ -1,9 +1,8 @@
use crate::queue_store::QueueStore;
use lazy_static::*;
use parking_lot::RwLock;
use std::collections::HashMap;
use once_cell::sync::Lazy;
lazy_static! {
pub(crate) static ref CIRCUIT_TO_QUEUE: RwLock<HashMap<String, QueueStore>> =
RwLock::new(HashMap::new());
}
use crate::queue_store::QueueStore;
use std::collections::HashMap;
use std::sync::RwLock;
pub(crate) static CIRCUIT_TO_QUEUE: Lazy<RwLock<HashMap<String, QueueStore>>> =
Lazy::new(|| RwLock::new(HashMap::new()));

View File

@ -1,10 +1,6 @@
use lazy_static::*;
use std::sync::atomic::AtomicU64;
lazy_static! {
pub(crate) static ref QUEUE_MONITOR_INTERVAL: AtomicU64 =
AtomicU64::new(1000);
}
pub(crate) static QUEUE_MONITOR_INTERVAL: AtomicU64 = AtomicU64::new(1000);
pub fn set_queue_refresh_interval(interval_ms: u64) {
QUEUE_MONITOR_INTERVAL

View File

@ -1,19 +1,16 @@
use std::sync::RwLock;
use crate::queue_structure::{
queue_network::QueueNetwork, queue_node::QueueNode, read_queueing_structure,
};
use lazy_static::*;
use log::{error, info};
use lqos_utils::file_watcher::FileWatcher;
use parking_lot::RwLock;
use once_cell::sync::Lazy;
use thiserror::Error;
use tokio::task::spawn_blocking;
lazy_static! {
/// Global storage of the shaped devices csv data.
/// Updated by the file system watcher whenever
/// the underlying file changes.
pub(crate) static ref QUEUE_STRUCTURE : RwLock<QueueStructure> = RwLock::new(QueueStructure::new());
}
pub(crate) static QUEUE_STRUCTURE: Lazy<RwLock<QueueStructure>> =
Lazy::new(|| RwLock::new(QueueStructure::new()));
#[derive(Clone)]
pub(crate) struct QueueStructure {
@ -48,7 +45,7 @@ pub async fn spawn_queue_structure_monitor() {
fn update_queue_structure() {
info!("queueingStructure.json reloaded");
QUEUE_STRUCTURE.write().update();
QUEUE_STRUCTURE.write().unwrap().update();
}
/// Fires up a Linux file system watcher than notifies

View File

@ -1,7 +1,7 @@
use lqos_utils::hex_string::read_hex_string;
use super::QueueStructureError;
use log::error;
use lqos_bus::TcHandle;
use lqos_utils::hex_string::read_hex_string;
use serde_json::Value;
#[derive(Default, Clone, Debug)]

View File

@ -5,7 +5,6 @@ use crate::{
use log::{info, warn};
use lqos_config::LibreQoSConfig;
use lqos_utils::fdtimer::periodic;
use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
mod reader;
mod watched_queues;
use self::watched_queues::expire_watched_queues;
@ -13,7 +12,7 @@ use watched_queues::WATCHED_QUEUES;
pub use watched_queues::{add_watched_queue, still_watching};
fn track_queues() {
let mut watching = WATCHED_QUEUES.write();
let mut watching = WATCHED_QUEUES.write().unwrap();
if watching.is_empty() {
//info!("No queues marked for read.");
return; // There's nothing to do - bail out fast
@ -24,7 +23,7 @@ fn track_queues() {
return;
}
let config = config.unwrap();
watching.par_iter_mut().for_each(|q| {
watching.iter_mut().for_each(|q| {
let (circuit_id, download_class, upload_class) = q.get();
let (download, upload) = if config.on_a_stick_mode {
@ -50,7 +49,7 @@ fn track_queues() {
if let Ok(download) = download {
if let Ok(upload) = upload {
let mut mapping = CIRCUIT_TO_QUEUE.write();
let mut mapping = CIRCUIT_TO_QUEUE.write().unwrap();
if let Some(circuit) = mapping.get_mut(circuit_id) {
circuit.update(&download[0], &upload[0]);
} else {

View File

@ -1,14 +1,12 @@
use crate::queue_structure::QUEUE_STRUCTURE;
use lazy_static::*;
use log::{info, warn};
use lqos_bus::TcHandle;
use lqos_utils::unix_time::unix_now;
use parking_lot::RwLock;
use once_cell::sync::Lazy;
use std::sync::RwLock;
lazy_static! {
pub(crate) static ref WATCHED_QUEUES: RwLock<Vec<WatchedQueue>> =
RwLock::new(Vec::new());
}
pub(crate) static WATCHED_QUEUES: Lazy<RwLock<Vec<WatchedQueue>>> =
Lazy::new(|| RwLock::new(Vec::new()));
pub(crate) struct WatchedQueue {
circuit_id: String,
@ -35,7 +33,7 @@ pub fn add_watched_queue(circuit_id: &str) {
//info!("Watching queue {circuit_id}");
let max = unsafe { lqos_sys::libbpf_num_possible_cpus() } * 2;
{
let read_lock = WATCHED_QUEUES.read();
let read_lock = WATCHED_QUEUES.read().unwrap();
if read_lock.iter().any(|q| q.circuit_id == circuit_id) {
warn!("Queue {circuit_id} is already being watched. Duplicate ignored.");
return; // No duplicates, please
@ -49,7 +47,7 @@ pub fn add_watched_queue(circuit_id: &str) {
}
}
if let Some(queues) = &QUEUE_STRUCTURE.read().maybe_queues {
if let Some(queues) = &QUEUE_STRUCTURE.read().unwrap().maybe_queues {
if let Some(circuit) = queues.iter().find(|c| {
c.circuit_id.is_some() && c.circuit_id.as_ref().unwrap() == circuit_id
}) {
@ -60,7 +58,7 @@ pub fn add_watched_queue(circuit_id: &str) {
upload_class: circuit.up_class_id,
};
WATCHED_QUEUES.write().push(new_watch);
WATCHED_QUEUES.write().unwrap().push(new_watch);
//info!("Added {circuit_id} to watched queues. Now watching {} queues.", WATCHED_QUEUES.read().len());
} else {
warn!("No circuit ID of {circuit_id}");
@ -71,13 +69,13 @@ pub fn add_watched_queue(circuit_id: &str) {
}
pub(crate) fn expire_watched_queues() {
let mut lock = WATCHED_QUEUES.write();
let mut lock = WATCHED_QUEUES.write().unwrap();
let now = unix_now().unwrap_or(0);
lock.retain(|w| w.expires_unix_time > now);
}
pub fn still_watching(circuit_id: &str) {
let mut lock = WATCHED_QUEUES.write();
let mut lock = WATCHED_QUEUES.write().unwrap();
if let Some(q) = lock.iter_mut().find(|q| q.circuit_id == circuit_id) {
//info!("Still watching circuit: {circuit_id}");
q.refresh_timer();

View File

@ -59,9 +59,9 @@ impl XdpIpAddress {
/// Convers an `XdpIpAddress` type to a Rust `IpAddr` type, using
/// the in-build mapped function for squishing IPv4 into IPv6
pub fn as_ipv6(&self) -> Ipv6Addr {
if self.is_v4()
{
Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15]).to_ipv6_mapped()
if self.is_v4() {
Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15])
.to_ipv6_mapped()
} else {
Ipv6Addr::new(
BigEndian::read_u16(&self.0[0..2]),
@ -78,8 +78,7 @@ impl XdpIpAddress {
/// Converts an `XdpIpAddress` type to a Rust `IpAddr` type
pub fn as_ip(&self) -> IpAddr {
if self.is_v4()
{
if self.is_v4() {
// It's an IPv4 Address
IpAddr::V4(Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15]))
} else {

View File

@ -1,7 +1,7 @@
mod commands;
pub mod fdtimer;
pub mod file_watcher;
pub mod hex_string;
pub mod packet_scale;
mod string_table_enum;
pub mod unix_time;
pub mod hex_string;

View File

@ -15,7 +15,6 @@ lqos_queue_tracker = { path = "../lqos_queue_tracker" }
lqos_utils = { path = "../lqos_utils" }
tokio = { version = "1", features = [ "full", "parking_lot" ] }
once_cell = "1.17.1"
parking_lot = "0.12"
lqos_bus = { path = "../lqos_bus" }
signal-hook = "0.3"
serde_json = "1"
@ -23,7 +22,6 @@ serde = { version = "1.0", features = ["derive"] }
env_logger = "0"
log = "0"
nix = "0"
rayon = "1"
sysinfo = "0"
# Support JemAlloc on supported platforms

View File

@ -170,6 +170,9 @@ fn handle_bus_requests(
BusRequest::GetNodeNamesFromIds(nodes) => {
shaped_devices_tracker::map_node_names(nodes)
}
BusRequest::GetFunnel { target: parent } => {
shaped_devices_tracker::get_funnel(parent)
}
});
}
}

View File

@ -4,7 +4,7 @@ use lqos_bus::BusResponse;
use lqos_config::{ConfigShapedDevices, NetworkJsonNode};
use lqos_utils::file_watcher::FileWatcher;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use std::sync::RwLock;
use tokio::task::spawn_blocking;
mod netjson;
pub use netjson::*;
@ -17,11 +17,14 @@ fn load_shaped_devices() {
let shaped_devices = ConfigShapedDevices::load();
if let Ok(new_file) = shaped_devices {
info!("ShapedDevices.csv loaded");
*SHAPED_DEVICES.write() = new_file;
crate::throughput_tracker::THROUGHPUT_TRACKER.write().refresh_circuit_ids();
*SHAPED_DEVICES.write().unwrap() = new_file;
crate::throughput_tracker::THROUGHPUT_TRACKER
.write()
.unwrap()
.refresh_circuit_ids();
} else {
warn!("ShapedDevices.csv failed to load, see previous error messages. Reverting to empty set.");
*SHAPED_DEVICES.write() = ConfigShapedDevices::default();
*SHAPED_DEVICES.write().unwrap() = ConfigShapedDevices::default();
}
}
@ -55,7 +58,7 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
}
pub fn get_one_network_map_layer(parent_idx: usize) -> BusResponse {
let net_json = NETWORK_JSON.read();
let net_json = NETWORK_JSON.read().unwrap();
if let Some(parent) = net_json.get_cloned_entry_by_index(parent_idx) {
let mut nodes = vec![(parent_idx, parent)];
nodes.extend_from_slice(&net_json.get_cloned_children(parent_idx));
@ -66,7 +69,7 @@ pub fn get_one_network_map_layer(parent_idx: usize) -> BusResponse {
}
pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse {
let net_json = NETWORK_JSON.read();
let net_json = NETWORK_JSON.read().unwrap();
if let Some(parent) = net_json.get_cloned_entry_by_index(0) {
let mut nodes = vec![(0, parent)];
nodes.extend_from_slice(&net_json.get_cloned_children(0));
@ -86,14 +89,17 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse {
other_bw.1 += n.1.current_throughput.1;
});
nodes.push((0, NetworkJsonNode{
nodes.push((
0,
NetworkJsonNode {
name: "Others".into(),
max_throughput: (0, 0),
current_throughput: other_bw,
rtts: Vec::new(),
parents: Vec::new(),
immediate_parent: None,
}));
},
));
}
BusResponse::NetworkMap(nodes)
} else {
@ -103,14 +109,25 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse {
pub fn map_node_names(nodes: &[usize]) -> BusResponse {
let mut result = Vec::new();
let reader = NETWORK_JSON.read();
let reader = NETWORK_JSON.read().unwrap();
nodes.iter().for_each(|id| {
if let Some(node) = reader.nodes.get(*id) {
result.push((
*id,
node.name.clone(),
));
result.push((*id, node.name.clone()));
}
});
BusResponse::NodeNames(result)
}
pub fn get_funnel(circuit_id: &str) -> BusResponse {
let reader = NETWORK_JSON.read().unwrap();
if let Some(index) = reader.get_index_for_name(circuit_id) {
// Reverse the scanning order and skip the last entry (the parent)
let mut result = Vec::new();
for idx in reader.nodes[index].parents.iter().rev().skip(1) {
result.push((*idx, reader.nodes[*idx].clone()));
}
return BusResponse::NetworkMap(result);
}
BusResponse::Fail("Unknown Node".into())
}

View File

@ -1,12 +1,13 @@
use log::{info, error, warn};
use anyhow::Result;
use log::{error, info, warn};
use lqos_config::NetworkJson;
use lqos_utils::file_watcher::FileWatcher;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use std::sync::RwLock;
use tokio::task::spawn_blocking;
use anyhow::Result;
pub static NETWORK_JSON: Lazy<RwLock<NetworkJson>> = Lazy::new(|| RwLock::new(NetworkJson::default()));
pub static NETWORK_JSON: Lazy<RwLock<NetworkJson>> =
Lazy::new(|| RwLock::new(NetworkJson::default()));
pub async fn network_json_watcher() {
spawn_blocking(|| {
@ -21,9 +22,7 @@ fn watch_for_network_json_changing() -> Result<()> {
let watch_path = NetworkJson::path();
if watch_path.is_err() {
error!("Unable to generate path for network.json");
return Err(anyhow::Error::msg(
"Unable to create path for network.json",
));
return Err(anyhow::Error::msg("Unable to create path for network.json"));
}
let watch_path = watch_path.unwrap();
@ -40,7 +39,7 @@ fn watch_for_network_json_changing() -> Result<()> {
fn load_network_json() {
let njs = NetworkJson::load();
if let Ok(njs) = njs {
*NETWORK_JSON.write() = njs;
*NETWORK_JSON.write().unwrap() = njs;
} else {
warn!("Unable to load network.json");
}

View File

@ -1,17 +1,21 @@
mod throughput_entry;
mod tracking_data;
use crate::{throughput_tracker::tracking_data::ThroughputTracker, shaped_devices_tracker::NETWORK_JSON};
use crate::{
shaped_devices_tracker::NETWORK_JSON,
throughput_tracker::tracking_data::ThroughputTracker,
};
use log::{info, warn};
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
use lqos_sys::XdpIpAddress;
use lqos_utils::{fdtimer::periodic, unix_time::time_since_boot};
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use std::sync::RwLock;
use std::time::Duration;
const RETIRE_AFTER_SECONDS: u64 = 30;
pub static THROUGHPUT_TRACKER: Lazy<RwLock<ThroughputTracker>> = Lazy::new(|| RwLock::new(ThroughputTracker::new()));
pub static THROUGHPUT_TRACKER: Lazy<RwLock<ThroughputTracker>> =
Lazy::new(|| RwLock::new(ThroughputTracker::new()));
pub fn spawn_throughput_monitor() {
info!("Starting the bandwidth monitor thread.");
@ -20,12 +24,13 @@ pub fn spawn_throughput_monitor() {
std::thread::spawn(move || {
periodic(interval_ms, "Throughput Monitor", &mut || {
let mut throughput = THROUGHPUT_TRACKER.write();
let mut net_json = NETWORK_JSON.write();
throughput.copy_previous_and_reset_rtt(&mut net_json);
throughput.apply_new_throughput_counters();
throughput.apply_rtt_data();
throughput.update_totals(&mut net_json);
let mut throughput = THROUGHPUT_TRACKER.write().unwrap();
let mut net_json = NETWORK_JSON.write().unwrap();
net_json.zero_throughput_and_rtt();
throughput.copy_previous_and_reset_rtt();
throughput.apply_new_throughput_counters(&mut net_json);
throughput.apply_rtt_data(&mut net_json);
throughput.update_totals();
throughput.next_cycle();
});
});
@ -33,7 +38,7 @@ pub fn spawn_throughput_monitor() {
pub fn current_throughput() -> BusResponse {
let (bits_per_second, packets_per_second, shaped_bits_per_second) = {
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
(
tp.bits_per_second(),
tp.packets_per_second(),
@ -49,7 +54,7 @@ pub fn current_throughput() -> BusResponse {
pub fn host_counters() -> BusResponse {
let mut result = Vec::new();
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data.iter().for_each(|(k, v)| {
let ip = k.as_ip();
let (down, up) = v.bytes_per_second;
@ -67,7 +72,7 @@ type TopList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, String);
pub fn top_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
@ -112,7 +117,7 @@ pub fn top_n(start: u32, end: u32) -> BusResponse {
pub fn worst_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
@ -157,7 +162,7 @@ pub fn worst_n(start: u32, end: u32) -> BusResponse {
}
pub fn best_n(start: u32, end: u32) -> BusResponse {
let mut full_list: Vec<TopList> = {
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())
@ -203,7 +208,7 @@ pub fn best_n(start: u32, end: u32) -> BusResponse {
}
pub fn xdp_pping_compat() -> BusResponse {
let raw = THROUGHPUT_TRACKER.read();
let raw = THROUGHPUT_TRACKER.read().unwrap();
let result = raw
.raw_data
.iter()
@ -242,7 +247,7 @@ pub fn xdp_pping_compat() -> BusResponse {
pub fn rtt_histogram() -> BusResponse {
let mut result = vec![0; 20];
let reader = THROUGHPUT_TRACKER.read();
let reader = THROUGHPUT_TRACKER.read().unwrap();
for (_, data) in reader
.raw_data
.iter()
@ -265,7 +270,7 @@ pub fn rtt_histogram() -> BusResponse {
pub fn host_counts() -> BusResponse {
let mut total = 0;
let mut shaped = 0;
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(_, d)| retire_check(tp.cycle, d.most_recent_cycle))
@ -294,7 +299,7 @@ pub fn all_unknown_ips() -> BusResponse {
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
let mut full_list: Vec<FullList> = {
let tp = THROUGHPUT_TRACKER.read();
let tp = THROUGHPUT_TRACKER.read().unwrap();
tp.raw_data
.iter()
.filter(|(ip, _)| !ip.as_ip().is_loopback())

View File

@ -4,7 +4,6 @@ use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use lqos_bus::TcHandle;
use lqos_config::NetworkJson;
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress};
use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
use std::collections::HashMap;
pub struct ThroughputTracker {
@ -29,17 +28,11 @@ impl ThroughputTracker {
}
}
pub(crate) fn copy_previous_and_reset_rtt(
&mut self,
netjson: &mut NetworkJson,
) {
// Zero the previous funnel hierarchy current numbers
netjson.zero_throughput_and_rtt();
pub(crate) fn copy_previous_and_reset_rtt(&mut self) {
// Copy previous byte/packet numbers and reset RTT data
// We're using Rayon's "par_iter_mut" to spread the operation across
// all CPU cores.
self.raw_data.par_iter_mut().for_each(|(_k, v)| {
self.raw_data.iter_mut().for_each(|(_k, v)| {
if v.first_cycle < self.cycle {
v.bytes_per_second.0 =
u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
@ -64,7 +57,7 @@ impl ThroughputTracker {
fn lookup_circuit_id(xdp_ip: &XdpIpAddress) -> Option<String> {
let mut circuit_id = None;
let lookup = xdp_ip.as_ipv6();
let cfg = SHAPED_DEVICES.read();
let cfg = SHAPED_DEVICES.read().unwrap();
if let Some((_, id)) = cfg.trie.longest_match(lookup) {
circuit_id = Some(cfg.devices[*id].circuit_id.clone());
}
@ -76,12 +69,14 @@ impl ThroughputTracker {
circuit_id: Option<String>,
) -> Option<String> {
if let Some(circuit_id) = circuit_id {
let shaped = SHAPED_DEVICES.read();
shaped
let shaped = SHAPED_DEVICES.read().unwrap();
let parent_name = shaped
.devices
.iter()
.find(|d| d.circuit_id == circuit_id)
.map(|device| device.parent_node.clone())
.map(|device| device.parent_node.clone());
//println!("{parent_name:?}");
parent_name
} else {
None
}
@ -91,7 +86,7 @@ impl ThroughputTracker {
circuit_id: Option<String>,
) -> Option<Vec<usize>> {
if let Some(parent) = Self::get_node_name_for_circuit_id(circuit_id) {
let lock = crate::shaped_devices_tracker::NETWORK_JSON.read();
let lock = crate::shaped_devices_tracker::NETWORK_JSON.read().unwrap();
lock.get_parents_for_circuit_id(&parent)
} else {
None
@ -99,14 +94,17 @@ impl ThroughputTracker {
}
pub(crate) fn refresh_circuit_ids(&mut self) {
self.raw_data.par_iter_mut().for_each(|(ip, data)| {
self.raw_data.iter_mut().for_each(|(ip, data)| {
data.circuit_id = Self::lookup_circuit_id(ip);
data.network_json_parents =
Self::lookup_network_parents(data.circuit_id.clone());
});
}
pub(crate) fn apply_new_throughput_counters(&mut self) {
pub(crate) fn apply_new_throughput_counters(
&mut self,
net_json: &mut NetworkJson,
) {
let cycle = self.cycle;
let raw_data = &mut self.raw_data;
throughput_for_each(&mut |xdp_ip, counts| {
@ -127,6 +125,16 @@ impl ThroughputTracker {
}
if entry.packets != entry.prev_packets {
entry.most_recent_cycle = cycle;
if let Some(parents) = &entry.network_json_parents {
net_json.add_throughput_cycle(
parents,
(
entry.bytes.0 - entry.prev_bytes.0,
entry.bytes.1 - entry.prev_bytes.1,
),
);
}
}
} else {
let circuit_id = Self::lookup_circuit_id(xdp_ip);
@ -160,19 +168,22 @@ impl ThroughputTracker {
});
}
pub(crate) fn apply_rtt_data(&mut self) {
pub(crate) fn apply_rtt_data(&mut self, net_json: &mut NetworkJson) {
rtt_for_each(&mut |raw_ip, rtt| {
if rtt.has_fresh_data != 0 {
let ip = XdpIpAddress(*raw_ip);
if let Some(tracker) = self.raw_data.get_mut(&ip) {
tracker.recent_rtt_data = rtt.rtt;
tracker.last_fresh_rtt_data_cycle = self.cycle;
if let Some(parents) = &tracker.network_json_parents {
net_json.add_rtt_cycle(parents, tracker.median_latency());
}
}
}
});
}
pub(crate) fn update_totals(&mut self, net_json: &mut NetworkJson) {
pub(crate) fn update_totals(&mut self) {
self.bytes_per_second = (0, 0);
self.packets_per_second = (0, 0);
self.shaped_bytes_per_second = (0, 0);
@ -186,12 +197,9 @@ impl ThroughputTracker {
v.packets.0.saturating_sub(v.prev_packets.0),
v.packets.1.saturating_sub(v.prev_packets.1),
v.tc_handle.as_u32() > 0,
&v.network_json_parents,
v.median_latency(),
)
})
.for_each(
|(bytes_down, bytes_up, packets_down, packets_up, shaped, parents, median_rtt)| {
.for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
self.bytes_per_second.0 =
self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0);
self.bytes_per_second.1 =
@ -206,145 +214,16 @@ impl ThroughputTracker {
.0
.checked_add(bytes_down)
.unwrap_or(0);
self.shaped_bytes_per_second.1 = self
.shaped_bytes_per_second
.1
.checked_add(bytes_up)
.unwrap_or(0);
self.shaped_bytes_per_second.1 =
self.shaped_bytes_per_second.1.checked_add(bytes_up).unwrap_or(0);
}
// If we have parent node data, we apply it now
if let Some(parents) = parents {
net_json.add_throughput_cycle(
parents,
(self.bytes_per_second.0, self.bytes_per_second.1),
median_rtt,
)
}
},
);
});
}
pub(crate) fn next_cycle(&mut self) {
self.cycle += 1;
}
// pub(crate) fn tick(
// &mut self,
// value_dump: &[(XdpIpAddress, Vec<HostCounter>)],
// rtt: Result<Vec<([u8; 16], RttTrackingEntry)>>,
// ) -> Result<()> {
// // Copy previous byte/packet numbers and reset RTT data
// self.raw_data.iter_mut().for_each(|(_k, v)| {
// if v.first_cycle < self.cycle {
// v.bytes_per_second.0 = u64::checked_sub(v.bytes.0, v.prev_bytes.0).unwrap_or(0);
// v.bytes_per_second.1 = u64::checked_sub(v.bytes.1, v.prev_bytes.1).unwrap_or(0);
// v.packets_per_second.0 =
// u64::checked_sub(v.packets.0, v.prev_packets.0).unwrap_or(0);
// v.packets_per_second.1 =
// u64::checked_sub(v.packets.1, v.prev_packets.1).unwrap_or(0);
// v.prev_bytes = v.bytes;
// v.prev_packets = v.packets;
// }
// // Roll out stale RTT data
// if self.cycle > RETIRE_AFTER_SECONDS
// && v.last_fresh_rtt_data_cycle < self.cycle - RETIRE_AFTER_SECONDS
// {
// v.recent_rtt_data = [0; 60];
// }
// });
// value_dump.iter().for_each(|(xdp_ip, counts)| {
// if let Some(entry) = self.raw_data.get_mut(xdp_ip) {
// entry.bytes = (0, 0);
// entry.packets = (0, 0);
// for c in counts {
// entry.bytes.0 += c.download_bytes;
// entry.bytes.1 += c.upload_bytes;
// entry.packets.0 += c.download_packets;
// entry.packets.1 += c.upload_packets;
// if c.tc_handle != 0 {
// entry.tc_handle = TcHandle::from_u32(c.tc_handle);
// }
// if c.last_seen != 0 {
// entry.last_seen = c.last_seen;
// }
// }
// if entry.packets != entry.prev_packets {
// entry.most_recent_cycle = self.cycle;
// }
// } else {
// let mut entry = ThroughputEntry {
// first_cycle: self.cycle,
// most_recent_cycle: 0,
// bytes: (0, 0),
// packets: (0, 0),
// prev_bytes: (0, 0),
// prev_packets: (0, 0),
// bytes_per_second: (0, 0),
// packets_per_second: (0, 0),
// tc_handle: TcHandle::zero(),
// recent_rtt_data: [0; 60],
// last_fresh_rtt_data_cycle: 0,
// last_seen: 0,
// };
// for c in counts {
// entry.bytes.0 += c.download_bytes;
// entry.bytes.1 += c.upload_bytes;
// entry.packets.0 += c.download_packets;
// entry.packets.1 += c.upload_packets;
// if c.tc_handle != 0 {
// entry.tc_handle = TcHandle::from_u32(c.tc_handle);
// }
// }
// self.raw_data.insert(*xdp_ip, entry);
// }
// });
// // Apply RTT data
// if let Ok(rtt_dump) = rtt {
// for (raw_ip, rtt) in rtt_dump {
// if rtt.has_fresh_data != 0 {
// let ip = XdpIpAddress(raw_ip);
// if let Some(tracker) = self.raw_data.get_mut(&ip) {
// tracker.recent_rtt_data = rtt.rtt;
// tracker.last_fresh_rtt_data_cycle = self.cycle;
// }
// }
// }
// }
// // Update totals
// self.bytes_per_second = (0, 0);
// self.packets_per_second = (0, 0);
// self.shaped_bytes_per_second = (0, 0);
// self.raw_data
// .iter()
// .map(|(_k, v)| {
// (
// v.bytes.0 - v.prev_bytes.0,
// v.bytes.1 - v.prev_bytes.1,
// v.packets.0 - v.prev_packets.0,
// v.packets.1 - v.prev_packets.1,
// v.tc_handle.as_u32() > 0,
// )
// })
// .for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
// self.bytes_per_second.0 += bytes_down;
// self.bytes_per_second.1 += bytes_up;
// self.packets_per_second.0 += packets_down;
// self.packets_per_second.1 += packets_up;
// if shaped {
// self.shaped_bytes_per_second.0 += bytes_down;
// self.shaped_bytes_per_second.1 += bytes_up;
// }
// });
// // Onto the next cycle
// self.cycle += 1;
// Ok(())
// }
pub(crate) fn bits_per_second(&self) -> (u64, u64) {
(self.bytes_per_second.0 * 8, self.bytes_per_second.1 * 8)
}