From 1376d7e812f1597259f4abd4bcf2561b5318988e Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Wed, 8 Mar 2023 15:58:52 +0000 Subject: [PATCH] Elide some write locks on NETWORK_JSON altogether by making the throughput counts atomic with interior mutability. --- src/rust/lqos_bus/src/bus/response.rs | 2 +- src/rust/lqos_config/src/lib.rs | 2 +- src/rust/lqos_config/src/network_json/mod.rs | 72 ++++++++++++++----- .../lqos_node_manager/src/network_tree.rs | 8 +-- .../lqosd/src/shaped_devices_tracker/mod.rs | 6 +- .../src/throughput_tracker/tracking_data.rs | 2 +- 6 files changed, 66 insertions(+), 26 deletions(-) diff --git a/src/rust/lqos_bus/src/bus/response.rs b/src/rust/lqos_bus/src/bus/response.rs index ec2bc9a1..78b8e7d3 100644 --- a/src/rust/lqos_bus/src/bus/response.rs +++ b/src/rust/lqos_bus/src/bus/response.rs @@ -70,7 +70,7 @@ pub enum BusResponse { RawQueueData(String), /// Results from network map queries - NetworkMap(Vec<(usize, lqos_config::NetworkJsonNode)>), + NetworkMap(Vec<(usize, lqos_config::NetworkJsonTransport)>), /// Named nodes from network.json NodeNames(Vec<(usize, String)>), diff --git a/src/rust/lqos_config/src/lib.rs b/src/rust/lqos_config/src/lib.rs index 0d456a06..65cced4e 100644 --- a/src/rust/lqos_config/src/lib.rs +++ b/src/rust/lqos_config/src/lib.rs @@ -16,7 +16,7 @@ mod shaped_devices; pub use authentication::{UserRole, WebUsers}; pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables}; pub use libre_qos_config::LibreQoSConfig; -pub use network_json::{NetworkJson, NetworkJsonNode}; +pub use network_json::{NetworkJson, NetworkJsonNode, NetworkJsonTransport}; pub use program_control::load_libreqos; pub use shaped_devices::{ConfigShapedDevices, ShapedDevice}; diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index c9b0f766..4f186231 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -4,12 +4,12 @@ use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use std::{ fs, - path::{Path, PathBuf}, + path::{Path, PathBuf}, sync::atomic::AtomicU64, }; use thiserror::Error; /// Describes a node in the network map tree. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug)] pub struct NetworkJsonNode { /// The node name, as it appears in `network.json` pub name: String, @@ -18,7 +18,7 @@ pub struct NetworkJsonNode { pub max_throughput: (u32, u32), // In mbps /// Current throughput (in bytes/second) at this node - pub current_throughput: (u64, u64), // In bytes + pub current_throughput: (AtomicU64, AtomicU64), // In bytes /// Approximate RTTs reported for this level of the tree. /// It's never going to be as statistically accurate as the actual @@ -33,10 +33,47 @@ pub struct NetworkJsonNode { pub immediate_parent: Option, } +impl NetworkJsonNode { + /// Make a deep copy of a `NetworkJsonNode`, converting atomics + /// into concrete values. + pub fn clone_to_transit(&self) -> NetworkJsonTransport { + NetworkJsonTransport { + name: self.name.clone(), + max_throughput: self.max_throughput, + current_throughput: ( + self.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed), + self.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed), + ), + rtts: self.rtts.clone(), + parents: self.parents.clone(), + immediate_parent: self.immediate_parent, + } + } +} + +/// A "transport-friendly" version of `NetworkJsonNode`. Designed +/// to be quickly cloned from original nodes and efficiently +/// transmitted/received. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct NetworkJsonTransport { + /// Display name + pub name: String, + /// Max throughput for node (not clamped) + pub max_throughput: (u32, u32), + /// Current node throughput + pub current_throughput: (u64, u64), + /// Set of RTT data + pub rtts: Vec, + /// Node indices of parents + pub parents: Vec, + /// The immediate parent node in the tree + pub immediate_parent: Option, +} + /// Holder for the network.json representation. /// This is condensed into a single level vector with index-based referencing /// for easy use in funnel calculations. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug)] pub struct NetworkJson { /// Nodes that make up the tree, flattened and referenced by index number. /// TODO: We should add a primary key to nodes in network.json. @@ -79,7 +116,7 @@ impl NetworkJson { let mut nodes = vec![NetworkJsonNode { name: "Root".to_string(), max_throughput: (0, 0), - current_throughput: (0, 0), + current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), parents: Vec::new(), immediate_parent: None, rtts: Vec::new(), @@ -116,8 +153,8 @@ impl NetworkJson { pub fn get_cloned_entry_by_index( &self, index: usize, - ) -> Option { - self.nodes.get(index).cloned() + ) -> Option { + self.nodes.get(index).map(|n| n.clone_to_transit()) } /// Retrieve a cloned copy of all children with a parent containing a specific @@ -125,13 +162,13 @@ impl NetworkJson { pub fn get_cloned_children( &self, index: usize, - ) -> Vec<(usize, NetworkJsonNode)> { + ) -> Vec<(usize, NetworkJsonTransport)> { self .nodes .iter() .enumerate() .filter(|(_i, n)| n.immediate_parent == Some(index)) - .map(|(i, n)| (i, n.clone())) + .map(|(i, n)| (i, n.clone_to_transit())) .collect() } @@ -151,22 +188,25 @@ impl NetworkJson { /// Sets all current throughput values to zero pub fn zero_throughput_and_rtt(&mut self) { self.nodes.iter_mut().for_each(|n| { - n.current_throughput = (0, 0); + n.current_throughput.0.store(0, std::sync::atomic::Ordering::Relaxed); + n.current_throughput.1.store(0, std::sync::atomic::Ordering::Relaxed); n.rtts.clear(); }); } - /// Add throughput numbers to node entries + /// Add throughput numbers to node entries. Note that this does *not* require + /// mutable access due to atomics and interior mutability - so it is safe to use + /// a read lock. pub fn add_throughput_cycle( - &mut self, + &self, targets: &[usize], bytes: (u64, u64), ) { for idx in targets { // Safety first: use "get" to ensure that the node exists - if let Some(node) = self.nodes.get_mut(*idx) { - node.current_throughput.0 += bytes.0; - node.current_throughput.1 += bytes.1; + if let Some(node) = self.nodes.get(*idx) { + node.current_throughput.0.fetch_add(bytes.0, std::sync::atomic::Ordering::Relaxed); + node.current_throughput.1.fetch_add(bytes.1, std::sync::atomic::Ordering::Relaxed); } else { warn!("No network tree entry for index {idx}"); } @@ -219,7 +259,7 @@ fn recurse_node( json_to_u32(json.get("downloadBandwidthMbps")), json_to_u32(json.get("uploadBandwidthMbps")), ), - current_throughput: (0, 0), + current_throughput: (AtomicU64::new(0), AtomicU64::new(0)), name: name.to_string(), immediate_parent: Some(immediate_parent), rtts: Vec::new(), diff --git a/src/rust/lqos_node_manager/src/network_tree.rs b/src/rust/lqos_node_manager/src/network_tree.rs index 350da873..aa010f28 100644 --- a/src/rust/lqos_node_manager/src/network_tree.rs +++ b/src/rust/lqos_node_manager/src/network_tree.rs @@ -1,7 +1,7 @@ use std::net::IpAddr; use lqos_bus::{bus_request, BusRequest, BusResponse}; -use lqos_config::NetworkJsonNode; +use lqos_config::NetworkJsonTransport; use rocket::{ fs::NamedFile, serde::{json::Json, Serialize}, @@ -19,7 +19,7 @@ pub async fn tree_page<'a>() -> NoCache> { #[get("/api/network_tree/")] pub async fn tree_entry( parent: usize, -) -> NoCache>> { +) -> NoCache>> { let responses = bus_request(vec![BusRequest::GetNetworkMap { parent }]).await.unwrap(); let result = match &responses[0] { @@ -32,7 +32,7 @@ pub async fn tree_entry( #[get("/api/network_tree_summary")] pub async fn network_tree_summary( -) -> NoCache>> { +) -> NoCache>> { let responses = bus_request(vec![BusRequest::TopMapQueues(4)]).await.unwrap(); let result = match &responses[0] { @@ -106,7 +106,7 @@ pub async fn node_names( #[get("/api/funnel_for_queue/")] pub async fn funnel_for_queue( circuit_id: String, -) -> NoCache>> { +) -> NoCache>> { let mut result = Vec::new(); let target = SHAPED_DEVICES diff --git a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs index bc4243f8..2aa4f95f 100644 --- a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs +++ b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs @@ -1,7 +1,7 @@ use anyhow::Result; use log::{error, info, warn}; use lqos_bus::BusResponse; -use lqos_config::{ConfigShapedDevices, NetworkJsonNode}; +use lqos_config::{ConfigShapedDevices, NetworkJsonTransport}; use lqos_utils::file_watcher::FileWatcher; use once_cell::sync::Lazy; use std::sync::RwLock; @@ -91,7 +91,7 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse { nodes.push(( 0, - NetworkJsonNode { + NetworkJsonTransport { name: "Others".into(), max_throughput: (0, 0), current_throughput: other_bw, @@ -124,7 +124,7 @@ pub fn get_funnel(circuit_id: &str) -> BusResponse { // Reverse the scanning order and skip the last entry (the parent) let mut result = Vec::new(); for idx in reader.nodes[index].parents.iter().rev().skip(1) { - result.push((*idx, reader.nodes[*idx].clone())); + result.push((*idx, reader.nodes[*idx].clone_to_transit())); } return BusResponse::NetworkMap(result); } diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 2aed2376..85f3c65e 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -125,7 +125,7 @@ impl ThroughputTracker { entry.most_recent_cycle = cycle; if let Some(parents) = &entry.network_json_parents { - let mut net_json = NETWORK_JSON.write().unwrap(); + let net_json = NETWORK_JSON.read().unwrap(); net_json.add_throughput_cycle( parents, (