Cadence issue. Sometimes, polling the NETWORK_JSON tree would give invalid results (or all zeroes) - if the tree was in the process of being updated. This was particularly pronounced on some systems is the cadence of the refresh ticks and the update ticks aligned too well - you'd always see zeroes, or flashing to zero in network maps.

This commit adds a Futex (via the `aromic_wait`) package, and makes `nodes` private - accessible from outside only via a function that waits for the update cycle to complete.
This commit is contained in:
Herbert Wolverson 2024-07-09 14:41:41 -05:00
parent 7cf9fba83e
commit 1c1fda7434
11 changed files with 72 additions and 17 deletions

26
src/rust/Cargo.lock generated
View File

@ -232,6 +232,16 @@ dependencies = [
"bytemuck",
]
[[package]]
name = "atomic-wait"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a55b94919229f2c42292fd71ffa4b75e83193bffdd77b1e858cd55fd2d0b0ea8"
dependencies = [
"libc",
"windows-sys 0.42.0",
]
[[package]]
name = "atomic-waker"
version = "1.1.2"
@ -1946,6 +1956,7 @@ dependencies = [
name = "lqos_config"
version = "0.1.0"
dependencies = [
"atomic-wait",
"csv",
"dashmap",
"ip_network",
@ -4397,6 +4408,21 @@ dependencies = [
"windows-targets 0.52.5",
]
[[package]]
name = "windows-sys"
version = "0.42.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.45.0"

View File

@ -20,3 +20,4 @@ pyo3 = "0.20"
toml = "0.8.8"
once_cell = "1.19.0"
lqos_utils = { path = "../lqos_utils" }
atomic-wait = "1.1.0"

View File

@ -6,6 +6,8 @@ use std::{
fs,
path::{Path, PathBuf}, sync::atomic::AtomicU64,
};
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering::SeqCst;
use thiserror::Error;
use lqos_utils::units::{AtomicDownUp, DownUpOrder};
@ -112,7 +114,14 @@ pub struct NetworkJsonTransport {
pub struct NetworkJson {
/// Nodes that make up the tree, flattened and referenced by index number.
/// TODO: We should add a primary key to nodes in network.json.
pub nodes: Vec<NetworkJsonNode>,
///
/// Note that `nodes` is *private* now. This is intentional - direct
/// modification via this module is permitted, but external access was
/// running into timing issues and reading data mid-update. The locking
/// setup makes it hard to performantly lock the whole structure - so we
/// have a messy "busy" compromise.
nodes: Vec<NetworkJsonNode>,
busy: AtomicU32,
}
impl Default for NetworkJson {
@ -124,7 +133,7 @@ impl Default for NetworkJson {
impl NetworkJson {
/// Generates an empty network.json
pub fn new() -> Self {
Self { nodes: Vec::new() }
Self { nodes: Vec::new(), busy: AtomicU32::new(0) }
}
/// The path to the current `network.json` file, determined
@ -179,7 +188,7 @@ impl NetworkJson {
}
}
Ok(Self { nodes })
Ok(Self { nodes, busy: AtomicU32::new(0) })
}
/// Find the index of a circuit_id
@ -225,10 +234,21 @@ impl NetworkJson {
.map(|node| node.parents.clone())
}
/// Obtains a reference to nodes once we're sure that
/// doing so will provide valid data.
pub fn get_nodes_when_ready(&self) -> &Vec<NetworkJsonNode> {
//log::warn!("Awaiting the network tree");
atomic_wait::wait(&self.busy, 1);
//log::warn!("Acquired");
&self.nodes
}
/// Sets all current throughput values to zero
/// Note that due to interior mutability, this does not require mutable
/// access.
pub fn zero_throughput_and_rtt(&self) {
//log::warn!("Locking network tree for throughput cycle");
self.busy.store(1, SeqCst);
self.nodes.iter().for_each(|n| {
n.current_throughput.set_to_zero();
n.current_tcp_retransmits.set_to_zero();
@ -238,6 +258,12 @@ impl NetworkJson {
});
}
pub fn cycle_complete(&self) {
//log::warn!("Unlocking network tree");
self.busy.store(0, SeqCst);
atomic_wait::wake_all(&self.busy);
}
/// Add throughput numbers to node entries. Note that this does *not* require
/// mutable access due to atomics and interior mutability - so it is safe to use
/// a read lock.

View File

@ -139,7 +139,7 @@ fn recurse_weights(
node_index: usize,
) -> Result<i64> {
let mut weight = 0;
let n = &network.nodes[node_index];
let n = &network.get_nodes_when_ready()[node_index];
//println!(" Tower: {}", n.name);
device_list
@ -152,7 +152,7 @@ fn recurse_weights(
});
//println!(" Weight: {}", weight);
for (i, n) in network.nodes
for (i, n) in network.get_nodes_when_ready()
.iter()
.enumerate()
.filter(|(_i, n)| n.immediate_parent == Some(node_index))
@ -177,13 +177,13 @@ pub(crate) fn calculate_tree_weights() -> Result<Vec<NetworkNodeWeight>> {
let device_list = ConfigShapedDevices::load()?.devices;
let device_weights = get_weights_rust()?;
let network = lqos_config::NetworkJson::load()?;
let root_index = network.nodes.iter().position(|n| n.immediate_parent.is_none()).unwrap();
let root_index = network.get_nodes_when_ready().iter().position(|n| n.immediate_parent.is_none()).unwrap();
let mut result = Vec::new();
//println!("Root index is: {}", root_index);
// Find all network nodes one off the top
network
.nodes
.get_nodes_when_ready()
.iter()
.enumerate()
.filter(|(_,n)| n.immediate_parent.is_some() && n.immediate_parent.unwrap() == root_index)

View File

@ -42,7 +42,7 @@ pub fn can_we_read_shaped_devices(results: &mut Vec<SanityCheck>) {
pub fn parent_check(results: &mut Vec<SanityCheck>) {
if let Ok(net_json) = lqos_config::NetworkJson::load() {
if net_json.nodes.len() < 2 {
if net_json.get_nodes_when_ready().len() < 2 {
results.push(SanityCheck{
name: "Flat Network - Skipping Parent Check".to_string(),
success: true,
@ -53,7 +53,7 @@ pub fn parent_check(results: &mut Vec<SanityCheck>) {
if let Ok(shaped_devices) = lqos_config::ConfigShapedDevices::load() {
for sd in shaped_devices.devices.iter() {
if !net_json.nodes.iter().any(|n| n.name == sd.parent_node) {
if !net_json.get_nodes_when_ready().iter().any(|n| n.name == sd.parent_node) {
results.push(SanityCheck{
name: "Shaped Device Invalid Parent".to_string(),
success: false,

View File

@ -71,7 +71,7 @@ fn anonymous_usage_dump() -> anyhow::Result<()> {
data.git_hash = env!("GIT_HASH").to_string();
data.shaped_device_count = SHAPED_DEVICES.read().unwrap().devices.len();
data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len();
data.net_json_len = NETWORK_JSON.read().unwrap().get_nodes_when_ready().len();
data.high_watermark_bps = (
HIGH_WATERMARK.get_down(),

View File

@ -8,7 +8,7 @@ use lts_client::{
pub(crate) fn get_network_tree() -> Vec<(usize, NetworkTreeEntry)> {
if let Ok(reader) = NETWORK_JSON.read() {
let result = reader
.nodes
.get_nodes_when_ready()
.iter()
.enumerate()
.map(|(idx, n)| (idx, n.into()))

View File

@ -10,7 +10,7 @@ pub async fn get_network_tree(
) -> Json<Vec<(usize, NetworkJsonTransport)>> {
let net_json = NETWORK_JSON.read().unwrap();
let result: Vec<(usize, NetworkJsonTransport)> = net_json
.nodes
.get_nodes_when_ready()
.iter()
.enumerate()
.map(|(i, n) | (i, n.clone_to_transit()))

View File

@ -14,7 +14,7 @@ pub async fn network_tree(channels: Arc<PubSub>) {
let data: Vec<(usize, NetworkJsonTransport)> = spawn_blocking(|| {
let net_json = NETWORK_JSON.read().unwrap();
net_json
.nodes
.get_nodes_when_ready()
.iter()
.enumerate()
.map(|(i, n) | (i, n.clone_to_transit()))

View File

@ -123,7 +123,7 @@ pub fn map_node_names(nodes: &[usize]) -> BusResponse {
let mut result = Vec::new();
let reader = NETWORK_JSON.read().unwrap();
nodes.iter().for_each(|id| {
if let Some(node) = reader.nodes.get(*id) {
if let Some(node) = reader.get_nodes_when_ready().get(*id) {
result.push((*id, node.name.clone()));
}
});
@ -135,8 +135,8 @@ pub fn get_funnel(circuit_id: &str) -> BusResponse {
if let Some(index) = reader.get_index_for_name(circuit_id) {
// Reverse the scanning order and skip the last entry (the parent)
let mut result = Vec::new();
for idx in reader.nodes[index].parents.iter().rev().skip(1) {
result.push((*idx, reader.nodes[*idx].clone_to_transit()));
for idx in reader.get_nodes_when_ready()[index].parents.iter().rev().skip(1) {
result.push((*idx, reader.get_nodes_when_ready()[*idx].clone_to_transit()));
}
return BusResponse::NetworkMap(result);
}

View File

@ -398,6 +398,8 @@ impl ThroughputTracker {
pub(crate) fn next_cycle(&self) {
self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let lock = NETWORK_JSON.read().unwrap();
lock.cycle_complete();
}
pub(crate) fn bits_per_second(&self) -> DownUpOrder<u64> {