Elide some write locks on NETWORK_JSON altogether by making

the throughput counts atomic with interior mutability.
This commit is contained in:
Herbert Wolverson 2023-03-08 15:58:52 +00:00
parent ee47549728
commit 1376d7e812
6 changed files with 66 additions and 26 deletions

View File

@ -70,7 +70,7 @@ pub enum BusResponse {
RawQueueData(String), RawQueueData(String),
/// Results from network map queries /// Results from network map queries
NetworkMap(Vec<(usize, lqos_config::NetworkJsonNode)>), NetworkMap(Vec<(usize, lqos_config::NetworkJsonTransport)>),
/// Named nodes from network.json /// Named nodes from network.json
NodeNames(Vec<(usize, String)>), NodeNames(Vec<(usize, String)>),

View File

@ -16,7 +16,7 @@ mod shaped_devices;
pub use authentication::{UserRole, WebUsers}; pub use authentication::{UserRole, WebUsers};
pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables}; pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables};
pub use libre_qos_config::LibreQoSConfig; pub use libre_qos_config::LibreQoSConfig;
pub use network_json::{NetworkJson, NetworkJsonNode}; pub use network_json::{NetworkJson, NetworkJsonNode, NetworkJsonTransport};
pub use program_control::load_libreqos; pub use program_control::load_libreqos;
pub use shaped_devices::{ConfigShapedDevices, ShapedDevice}; pub use shaped_devices::{ConfigShapedDevices, ShapedDevice};

View File

@ -4,12 +4,12 @@ use serde::{Deserialize, Serialize};
use serde_json::{Map, Value}; use serde_json::{Map, Value};
use std::{ use std::{
fs, fs,
path::{Path, PathBuf}, path::{Path, PathBuf}, sync::atomic::AtomicU64,
}; };
use thiserror::Error; use thiserror::Error;
/// Describes a node in the network map tree. /// Describes a node in the network map tree.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug)]
pub struct NetworkJsonNode { pub struct NetworkJsonNode {
/// The node name, as it appears in `network.json` /// The node name, as it appears in `network.json`
pub name: String, pub name: String,
@ -18,7 +18,7 @@ pub struct NetworkJsonNode {
pub max_throughput: (u32, u32), // In mbps pub max_throughput: (u32, u32), // In mbps
/// Current throughput (in bytes/second) at this node /// Current throughput (in bytes/second) at this node
pub current_throughput: (u64, u64), // In bytes pub current_throughput: (AtomicU64, AtomicU64), // In bytes
/// Approximate RTTs reported for this level of the tree. /// Approximate RTTs reported for this level of the tree.
/// It's never going to be as statistically accurate as the actual /// It's never going to be as statistically accurate as the actual
@ -33,10 +33,47 @@ pub struct NetworkJsonNode {
pub immediate_parent: Option<usize>, pub immediate_parent: Option<usize>,
} }
impl NetworkJsonNode {
/// Make a deep copy of a `NetworkJsonNode`, converting atomics
/// into concrete values.
pub fn clone_to_transit(&self) -> NetworkJsonTransport {
NetworkJsonTransport {
name: self.name.clone(),
max_throughput: self.max_throughput,
current_throughput: (
self.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed),
self.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed),
),
rtts: self.rtts.clone(),
parents: self.parents.clone(),
immediate_parent: self.immediate_parent,
}
}
}
/// A "transport-friendly" version of `NetworkJsonNode`. Designed
/// to be quickly cloned from original nodes and efficiently
/// transmitted/received.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NetworkJsonTransport {
/// Display name
pub name: String,
/// Max throughput for node (not clamped)
pub max_throughput: (u32, u32),
/// Current node throughput
pub current_throughput: (u64, u64),
/// Set of RTT data
pub rtts: Vec<f32>,
/// Node indices of parents
pub parents: Vec<usize>,
/// The immediate parent node in the tree
pub immediate_parent: Option<usize>,
}
/// Holder for the network.json representation. /// Holder for the network.json representation.
/// This is condensed into a single level vector with index-based referencing /// This is condensed into a single level vector with index-based referencing
/// for easy use in funnel calculations. /// for easy use in funnel calculations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug)]
pub struct NetworkJson { pub struct NetworkJson {
/// Nodes that make up the tree, flattened and referenced by index number. /// Nodes that make up the tree, flattened and referenced by index number.
/// TODO: We should add a primary key to nodes in network.json. /// TODO: We should add a primary key to nodes in network.json.
@ -79,7 +116,7 @@ impl NetworkJson {
let mut nodes = vec![NetworkJsonNode { let mut nodes = vec![NetworkJsonNode {
name: "Root".to_string(), name: "Root".to_string(),
max_throughput: (0, 0), max_throughput: (0, 0),
current_throughput: (0, 0), current_throughput: (AtomicU64::new(0), AtomicU64::new(0)),
parents: Vec::new(), parents: Vec::new(),
immediate_parent: None, immediate_parent: None,
rtts: Vec::new(), rtts: Vec::new(),
@ -116,8 +153,8 @@ impl NetworkJson {
pub fn get_cloned_entry_by_index( pub fn get_cloned_entry_by_index(
&self, &self,
index: usize, index: usize,
) -> Option<NetworkJsonNode> { ) -> Option<NetworkJsonTransport> {
self.nodes.get(index).cloned() self.nodes.get(index).map(|n| n.clone_to_transit())
} }
/// Retrieve a cloned copy of all children with a parent containing a specific /// Retrieve a cloned copy of all children with a parent containing a specific
@ -125,13 +162,13 @@ impl NetworkJson {
pub fn get_cloned_children( pub fn get_cloned_children(
&self, &self,
index: usize, index: usize,
) -> Vec<(usize, NetworkJsonNode)> { ) -> Vec<(usize, NetworkJsonTransport)> {
self self
.nodes .nodes
.iter() .iter()
.enumerate() .enumerate()
.filter(|(_i, n)| n.immediate_parent == Some(index)) .filter(|(_i, n)| n.immediate_parent == Some(index))
.map(|(i, n)| (i, n.clone())) .map(|(i, n)| (i, n.clone_to_transit()))
.collect() .collect()
} }
@ -151,22 +188,25 @@ impl NetworkJson {
/// Sets all current throughput values to zero /// Sets all current throughput values to zero
pub fn zero_throughput_and_rtt(&mut self) { pub fn zero_throughput_and_rtt(&mut self) {
self.nodes.iter_mut().for_each(|n| { self.nodes.iter_mut().for_each(|n| {
n.current_throughput = (0, 0); n.current_throughput.0.store(0, std::sync::atomic::Ordering::Relaxed);
n.current_throughput.1.store(0, std::sync::atomic::Ordering::Relaxed);
n.rtts.clear(); n.rtts.clear();
}); });
} }
/// Add throughput numbers to node entries /// Add throughput numbers to node entries. Note that this does *not* require
/// mutable access due to atomics and interior mutability - so it is safe to use
/// a read lock.
pub fn add_throughput_cycle( pub fn add_throughput_cycle(
&mut self, &self,
targets: &[usize], targets: &[usize],
bytes: (u64, u64), bytes: (u64, u64),
) { ) {
for idx in targets { for idx in targets {
// Safety first: use "get" to ensure that the node exists // Safety first: use "get" to ensure that the node exists
if let Some(node) = self.nodes.get_mut(*idx) { if let Some(node) = self.nodes.get(*idx) {
node.current_throughput.0 += bytes.0; node.current_throughput.0.fetch_add(bytes.0, std::sync::atomic::Ordering::Relaxed);
node.current_throughput.1 += bytes.1; node.current_throughput.1.fetch_add(bytes.1, std::sync::atomic::Ordering::Relaxed);
} else { } else {
warn!("No network tree entry for index {idx}"); warn!("No network tree entry for index {idx}");
} }
@ -219,7 +259,7 @@ fn recurse_node(
json_to_u32(json.get("downloadBandwidthMbps")), json_to_u32(json.get("downloadBandwidthMbps")),
json_to_u32(json.get("uploadBandwidthMbps")), json_to_u32(json.get("uploadBandwidthMbps")),
), ),
current_throughput: (0, 0), current_throughput: (AtomicU64::new(0), AtomicU64::new(0)),
name: name.to_string(), name: name.to_string(),
immediate_parent: Some(immediate_parent), immediate_parent: Some(immediate_parent),
rtts: Vec::new(), rtts: Vec::new(),

View File

@ -1,7 +1,7 @@
use std::net::IpAddr; use std::net::IpAddr;
use lqos_bus::{bus_request, BusRequest, BusResponse}; use lqos_bus::{bus_request, BusRequest, BusResponse};
use lqos_config::NetworkJsonNode; use lqos_config::NetworkJsonTransport;
use rocket::{ use rocket::{
fs::NamedFile, fs::NamedFile,
serde::{json::Json, Serialize}, serde::{json::Json, Serialize},
@ -19,7 +19,7 @@ pub async fn tree_page<'a>() -> NoCache<Option<NamedFile>> {
#[get("/api/network_tree/<parent>")] #[get("/api/network_tree/<parent>")]
pub async fn tree_entry( pub async fn tree_entry(
parent: usize, parent: usize,
) -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> { ) -> NoCache<Json<Vec<(usize, NetworkJsonTransport)>>> {
let responses = let responses =
bus_request(vec![BusRequest::GetNetworkMap { parent }]).await.unwrap(); bus_request(vec![BusRequest::GetNetworkMap { parent }]).await.unwrap();
let result = match &responses[0] { let result = match &responses[0] {
@ -32,7 +32,7 @@ pub async fn tree_entry(
#[get("/api/network_tree_summary")] #[get("/api/network_tree_summary")]
pub async fn network_tree_summary( pub async fn network_tree_summary(
) -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> { ) -> NoCache<Json<Vec<(usize, NetworkJsonTransport)>>> {
let responses = let responses =
bus_request(vec![BusRequest::TopMapQueues(4)]).await.unwrap(); bus_request(vec![BusRequest::TopMapQueues(4)]).await.unwrap();
let result = match &responses[0] { let result = match &responses[0] {
@ -106,7 +106,7 @@ pub async fn node_names(
#[get("/api/funnel_for_queue/<circuit_id>")] #[get("/api/funnel_for_queue/<circuit_id>")]
pub async fn funnel_for_queue( pub async fn funnel_for_queue(
circuit_id: String, circuit_id: String,
) -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> { ) -> NoCache<Json<Vec<(usize, NetworkJsonTransport)>>> {
let mut result = Vec::new(); let mut result = Vec::new();
let target = SHAPED_DEVICES let target = SHAPED_DEVICES

View File

@ -1,7 +1,7 @@
use anyhow::Result; use anyhow::Result;
use log::{error, info, warn}; use log::{error, info, warn};
use lqos_bus::BusResponse; use lqos_bus::BusResponse;
use lqos_config::{ConfigShapedDevices, NetworkJsonNode}; use lqos_config::{ConfigShapedDevices, NetworkJsonTransport};
use lqos_utils::file_watcher::FileWatcher; use lqos_utils::file_watcher::FileWatcher;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::sync::RwLock; use std::sync::RwLock;
@ -91,7 +91,7 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse {
nodes.push(( nodes.push((
0, 0,
NetworkJsonNode { NetworkJsonTransport {
name: "Others".into(), name: "Others".into(),
max_throughput: (0, 0), max_throughput: (0, 0),
current_throughput: other_bw, current_throughput: other_bw,
@ -124,7 +124,7 @@ pub fn get_funnel(circuit_id: &str) -> BusResponse {
// Reverse the scanning order and skip the last entry (the parent) // Reverse the scanning order and skip the last entry (the parent)
let mut result = Vec::new(); let mut result = Vec::new();
for idx in reader.nodes[index].parents.iter().rev().skip(1) { for idx in reader.nodes[index].parents.iter().rev().skip(1) {
result.push((*idx, reader.nodes[*idx].clone())); result.push((*idx, reader.nodes[*idx].clone_to_transit()));
} }
return BusResponse::NetworkMap(result); return BusResponse::NetworkMap(result);
} }

View File

@ -125,7 +125,7 @@ impl ThroughputTracker {
entry.most_recent_cycle = cycle; entry.most_recent_cycle = cycle;
if let Some(parents) = &entry.network_json_parents { if let Some(parents) = &entry.network_json_parents {
let mut net_json = NETWORK_JSON.write().unwrap(); let net_json = NETWORK_JSON.read().unwrap();
net_json.add_throughput_cycle( net_json.add_throughput_cycle(
parents, parents,
( (