Further reduce write locking of NETWORK_JSON by utilizing interior mutability on the RTT tracker.

This commit is contained in:
Herbert Wolverson
2023-03-08 16:16:21 +00:00
parent 1376d7e812
commit b6e2091152
2 changed files with 20 additions and 13 deletions

View File

@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::{
fs,
path::{Path, PathBuf}, sync::atomic::AtomicU64,
path::{Path, PathBuf}, sync::{atomic::AtomicU64, RwLock},
};
use thiserror::Error;
@@ -23,7 +23,7 @@ pub struct NetworkJsonNode {
/// Approximate RTTs reported for this level of the tree.
/// It's never going to be as statistically accurate as the actual
/// numbers, being based on medians.
pub rtts: Vec<f32>,
pub rtts: RwLock<Vec<f32>>,
/// A list of indices in the `NetworkJson` vector of nodes
/// linking to parent nodes
@@ -44,7 +44,7 @@ impl NetworkJsonNode {
self.current_throughput.0.load(std::sync::atomic::Ordering::Relaxed),
self.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed),
),
rtts: self.rtts.clone(),
rtts: self.rtts.read().unwrap().clone(),
parents: self.parents.clone(),
immediate_parent: self.immediate_parent,
}
@@ -119,7 +119,7 @@ impl NetworkJson {
current_throughput: (AtomicU64::new(0), AtomicU64::new(0)),
parents: Vec::new(),
immediate_parent: None,
rtts: Vec::new(),
rtts: RwLock::new(Vec::new()),
}];
if !Self::exists() {
return Err(NetworkJsonError::FileNotFound);
@@ -186,11 +186,17 @@ impl NetworkJson {
}
/// Sets all current throughput values to zero
pub fn zero_throughput_and_rtt(&mut self) {
self.nodes.iter_mut().for_each(|n| {
/// Note that due to interior mutability, this does not require mutable
/// access.
pub fn zero_throughput_and_rtt(&self) {
self.nodes.iter().for_each(|n| {
n.current_throughput.0.store(0, std::sync::atomic::Ordering::Relaxed);
n.current_throughput.1.store(0, std::sync::atomic::Ordering::Relaxed);
n.rtts.clear();
let mut size = n.rtts.read().unwrap().len();
while size > 5 {
n.rtts.write().unwrap().remove(0);
size = n.rtts.read().unwrap().len();
}
});
}
@@ -213,12 +219,13 @@ impl NetworkJson {
}
}
/// Record RTT time in the tree
pub fn add_rtt_cycle(&mut self, targets: &[usize], rtt: f32) {
/// Record RTT time in the tree. Note that due to interior mutability,
/// this does not require mutable access.
pub fn add_rtt_cycle(&self, targets: &[usize], rtt: f32) {
for idx in targets {
// Safety first: use "get" to ensure that the node exists
if let Some(node) = self.nodes.get_mut(*idx) {
node.rtts.push(rtt);
if let Some(node) = self.nodes.get(*idx) {
node.rtts.write().unwrap().push(rtt);
} else {
warn!("No network tree entry for index {idx}");
}
@@ -262,7 +269,7 @@ fn recurse_node(
current_throughput: (AtomicU64::new(0), AtomicU64::new(0)),
name: name.to_string(),
immediate_parent: Some(immediate_parent),
rtts: Vec::new(),
rtts: RwLock::new(Vec::new()),
};
if node.name != "children" {

View File

@@ -26,7 +26,7 @@ pub fn spawn_throughput_monitor() {
periodic(interval_ms, "Throughput Monitor", &mut || {
let mut throughput = THROUGHPUT_TRACKER.write().unwrap();
{
let mut net_json = NETWORK_JSON.write().unwrap();
let net_json = NETWORK_JSON.read().unwrap();
net_json.zero_throughput_and_rtt();
} // Scope to end the lock
throughput.copy_previous_and_reset_rtt();