From 1c1fda7434bdd458085bda161740caaa8b14f847 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 9 Jul 2024 14:41:41 -0500 Subject: [PATCH] Cadence issue. Sometimes, polling the NETWORK_JSON tree would give invalid results (or all zeroes) - if the tree was in the process of being updated. This was particularly pronounced on some systems is the cadence of the refresh ticks and the update ticks aligned too well - you'd always see zeroes, or flashing to zero in network maps. This commit adds a Futex (via the `aromic_wait`) package, and makes `nodes` private - accessible from outside only via a function that waits for the update cycle to complete. --- src/rust/Cargo.lock | 26 +++++++++++++++ src/rust/lqos_config/Cargo.toml | 3 +- src/rust/lqos_config/src/network_json/mod.rs | 32 +++++++++++++++++-- src/rust/lqos_python/src/device_weights.rs | 8 ++--- .../src/sanity_checks/shaped_devices.rs | 4 +-- src/rust/lqosd/src/anonymous_usage/mod.rs | 2 +- src/rust/lqosd/src/long_term_stats/mod.rs | 2 +- .../node_manager/local_api/network_tree.rs | 2 +- .../node_manager/ws/ticker/network_tree.rs | 2 +- .../lqosd/src/shaped_devices_tracker/mod.rs | 6 ++-- .../src/throughput_tracker/tracking_data.rs | 2 ++ 11 files changed, 72 insertions(+), 17 deletions(-) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index c799a958..4846f472 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -232,6 +232,16 @@ dependencies = [ "bytemuck", ] +[[package]] +name = "atomic-wait" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a55b94919229f2c42292fd71ffa4b75e83193bffdd77b1e858cd55fd2d0b0ea8" +dependencies = [ + "libc", + "windows-sys 0.42.0", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1946,6 +1956,7 @@ dependencies = [ name = "lqos_config" version = "0.1.0" dependencies = [ + "atomic-wait", "csv", "dashmap", "ip_network", @@ -4397,6 +4408,21 @@ dependencies = [ "windows-targets 0.52.5", ] +[[package]] +name = "windows-sys" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a3e1820f08b8513f676f7ab6c1f99ff312fb97b553d30ff4dd86f9f15728aa7" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/src/rust/lqos_config/Cargo.toml b/src/rust/lqos_config/Cargo.toml index 60cb2719..0d28c270 100644 --- a/src/rust/lqos_config/Cargo.toml +++ b/src/rust/lqos_config/Cargo.toml @@ -19,4 +19,5 @@ dashmap = "5" pyo3 = "0.20" toml = "0.8.8" once_cell = "1.19.0" -lqos_utils = { path = "../lqos_utils" } \ No newline at end of file +lqos_utils = { path = "../lqos_utils" } +atomic-wait = "1.1.0" diff --git a/src/rust/lqos_config/src/network_json/mod.rs b/src/rust/lqos_config/src/network_json/mod.rs index 2ef24265..9d377575 100644 --- a/src/rust/lqos_config/src/network_json/mod.rs +++ b/src/rust/lqos_config/src/network_json/mod.rs @@ -6,6 +6,8 @@ use std::{ fs, path::{Path, PathBuf}, sync::atomic::AtomicU64, }; +use std::sync::atomic::AtomicU32; +use std::sync::atomic::Ordering::SeqCst; use thiserror::Error; use lqos_utils::units::{AtomicDownUp, DownUpOrder}; @@ -112,7 +114,14 @@ pub struct NetworkJsonTransport { pub struct NetworkJson { /// Nodes that make up the tree, flattened and referenced by index number. /// TODO: We should add a primary key to nodes in network.json. - pub nodes: Vec, + /// + /// Note that `nodes` is *private* now. This is intentional - direct + /// modification via this module is permitted, but external access was + /// running into timing issues and reading data mid-update. The locking + /// setup makes it hard to performantly lock the whole structure - so we + /// have a messy "busy" compromise. + nodes: Vec, + busy: AtomicU32, } impl Default for NetworkJson { @@ -124,7 +133,7 @@ impl Default for NetworkJson { impl NetworkJson { /// Generates an empty network.json pub fn new() -> Self { - Self { nodes: Vec::new() } + Self { nodes: Vec::new(), busy: AtomicU32::new(0) } } /// The path to the current `network.json` file, determined @@ -179,7 +188,7 @@ impl NetworkJson { } } - Ok(Self { nodes }) + Ok(Self { nodes, busy: AtomicU32::new(0) }) } /// Find the index of a circuit_id @@ -225,10 +234,21 @@ impl NetworkJson { .map(|node| node.parents.clone()) } + /// Obtains a reference to nodes once we're sure that + /// doing so will provide valid data. + pub fn get_nodes_when_ready(&self) -> &Vec { + //log::warn!("Awaiting the network tree"); + atomic_wait::wait(&self.busy, 1); + //log::warn!("Acquired"); + &self.nodes + } + /// Sets all current throughput values to zero /// Note that due to interior mutability, this does not require mutable /// access. pub fn zero_throughput_and_rtt(&self) { + //log::warn!("Locking network tree for throughput cycle"); + self.busy.store(1, SeqCst); self.nodes.iter().for_each(|n| { n.current_throughput.set_to_zero(); n.current_tcp_retransmits.set_to_zero(); @@ -238,6 +258,12 @@ impl NetworkJson { }); } + pub fn cycle_complete(&self) { + //log::warn!("Unlocking network tree"); + self.busy.store(0, SeqCst); + atomic_wait::wake_all(&self.busy); + } + /// Add throughput numbers to node entries. Note that this does *not* require /// mutable access due to atomics and interior mutability - so it is safe to use /// a read lock. diff --git a/src/rust/lqos_python/src/device_weights.rs b/src/rust/lqos_python/src/device_weights.rs index 2656d927..49206fd1 100644 --- a/src/rust/lqos_python/src/device_weights.rs +++ b/src/rust/lqos_python/src/device_weights.rs @@ -139,7 +139,7 @@ fn recurse_weights( node_index: usize, ) -> Result { let mut weight = 0; - let n = &network.nodes[node_index]; + let n = &network.get_nodes_when_ready()[node_index]; //println!(" Tower: {}", n.name); device_list @@ -152,7 +152,7 @@ fn recurse_weights( }); //println!(" Weight: {}", weight); - for (i, n) in network.nodes + for (i, n) in network.get_nodes_when_ready() .iter() .enumerate() .filter(|(_i, n)| n.immediate_parent == Some(node_index)) @@ -177,13 +177,13 @@ pub(crate) fn calculate_tree_weights() -> Result> { let device_list = ConfigShapedDevices::load()?.devices; let device_weights = get_weights_rust()?; let network = lqos_config::NetworkJson::load()?; - let root_index = network.nodes.iter().position(|n| n.immediate_parent.is_none()).unwrap(); + let root_index = network.get_nodes_when_ready().iter().position(|n| n.immediate_parent.is_none()).unwrap(); let mut result = Vec::new(); //println!("Root index is: {}", root_index); // Find all network nodes one off the top network - .nodes + .get_nodes_when_ready() .iter() .enumerate() .filter(|(_,n)| n.immediate_parent.is_some() && n.immediate_parent.unwrap() == root_index) diff --git a/src/rust/lqos_support_tool/src/sanity_checks/shaped_devices.rs b/src/rust/lqos_support_tool/src/sanity_checks/shaped_devices.rs index edc387e9..91995d3a 100644 --- a/src/rust/lqos_support_tool/src/sanity_checks/shaped_devices.rs +++ b/src/rust/lqos_support_tool/src/sanity_checks/shaped_devices.rs @@ -42,7 +42,7 @@ pub fn can_we_read_shaped_devices(results: &mut Vec) { pub fn parent_check(results: &mut Vec) { if let Ok(net_json) = lqos_config::NetworkJson::load() { - if net_json.nodes.len() < 2 { + if net_json.get_nodes_when_ready().len() < 2 { results.push(SanityCheck{ name: "Flat Network - Skipping Parent Check".to_string(), success: true, @@ -53,7 +53,7 @@ pub fn parent_check(results: &mut Vec) { if let Ok(shaped_devices) = lqos_config::ConfigShapedDevices::load() { for sd in shaped_devices.devices.iter() { - if !net_json.nodes.iter().any(|n| n.name == sd.parent_node) { + if !net_json.get_nodes_when_ready().iter().any(|n| n.name == sd.parent_node) { results.push(SanityCheck{ name: "Shaped Device Invalid Parent".to_string(), success: false, diff --git a/src/rust/lqosd/src/anonymous_usage/mod.rs b/src/rust/lqosd/src/anonymous_usage/mod.rs index fbbb44c1..19331886 100644 --- a/src/rust/lqosd/src/anonymous_usage/mod.rs +++ b/src/rust/lqosd/src/anonymous_usage/mod.rs @@ -71,7 +71,7 @@ fn anonymous_usage_dump() -> anyhow::Result<()> { data.git_hash = env!("GIT_HASH").to_string(); data.shaped_device_count = SHAPED_DEVICES.read().unwrap().devices.len(); - data.net_json_len = NETWORK_JSON.read().unwrap().nodes.len(); + data.net_json_len = NETWORK_JSON.read().unwrap().get_nodes_when_ready().len(); data.high_watermark_bps = ( HIGH_WATERMARK.get_down(), diff --git a/src/rust/lqosd/src/long_term_stats/mod.rs b/src/rust/lqosd/src/long_term_stats/mod.rs index 8628e0db..c87bb789 100644 --- a/src/rust/lqosd/src/long_term_stats/mod.rs +++ b/src/rust/lqosd/src/long_term_stats/mod.rs @@ -8,7 +8,7 @@ use lts_client::{ pub(crate) fn get_network_tree() -> Vec<(usize, NetworkTreeEntry)> { if let Ok(reader) = NETWORK_JSON.read() { let result = reader - .nodes + .get_nodes_when_ready() .iter() .enumerate() .map(|(idx, n)| (idx, n.into())) diff --git a/src/rust/lqosd/src/node_manager/local_api/network_tree.rs b/src/rust/lqosd/src/node_manager/local_api/network_tree.rs index 763c5523..2eaaac3c 100644 --- a/src/rust/lqosd/src/node_manager/local_api/network_tree.rs +++ b/src/rust/lqosd/src/node_manager/local_api/network_tree.rs @@ -10,7 +10,7 @@ pub async fn get_network_tree( ) -> Json> { let net_json = NETWORK_JSON.read().unwrap(); let result: Vec<(usize, NetworkJsonTransport)> = net_json - .nodes + .get_nodes_when_ready() .iter() .enumerate() .map(|(i, n) | (i, n.clone_to_transit())) diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs b/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs index a3f59e2a..c5047038 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs @@ -14,7 +14,7 @@ pub async fn network_tree(channels: Arc) { let data: Vec<(usize, NetworkJsonTransport)> = spawn_blocking(|| { let net_json = NETWORK_JSON.read().unwrap(); net_json - .nodes + .get_nodes_when_ready() .iter() .enumerate() .map(|(i, n) | (i, n.clone_to_transit())) diff --git a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs index 9a440afc..738c10c0 100644 --- a/src/rust/lqosd/src/shaped_devices_tracker/mod.rs +++ b/src/rust/lqosd/src/shaped_devices_tracker/mod.rs @@ -123,7 +123,7 @@ pub fn map_node_names(nodes: &[usize]) -> BusResponse { let mut result = Vec::new(); let reader = NETWORK_JSON.read().unwrap(); nodes.iter().for_each(|id| { - if let Some(node) = reader.nodes.get(*id) { + if let Some(node) = reader.get_nodes_when_ready().get(*id) { result.push((*id, node.name.clone())); } }); @@ -135,8 +135,8 @@ pub fn get_funnel(circuit_id: &str) -> BusResponse { if let Some(index) = reader.get_index_for_name(circuit_id) { // Reverse the scanning order and skip the last entry (the parent) let mut result = Vec::new(); - for idx in reader.nodes[index].parents.iter().rev().skip(1) { - result.push((*idx, reader.nodes[*idx].clone_to_transit())); + for idx in reader.get_nodes_when_ready()[index].parents.iter().rev().skip(1) { + result.push((*idx, reader.get_nodes_when_ready()[*idx].clone_to_transit())); } return BusResponse::NetworkMap(result); } diff --git a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs index 59234871..eebb1c8b 100644 --- a/src/rust/lqosd/src/throughput_tracker/tracking_data.rs +++ b/src/rust/lqosd/src/throughput_tracker/tracking_data.rs @@ -398,6 +398,8 @@ impl ThroughputTracker { pub(crate) fn next_cycle(&self) { self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let lock = NETWORK_JSON.read().unwrap(); + lock.cycle_complete(); } pub(crate) fn bits_per_second(&self) -> DownUpOrder {