From 45e4295055f14f4905c8f4d1411a4cd3312dc2eb Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Thu, 18 Jul 2024 12:54:21 -0500 Subject: [PATCH] Rework the ticker system for improved resilience. --- src/rust/lqosd/src/node_manager/ws/ticker.rs | 56 +++++++++----------- 1 file changed, 24 insertions(+), 32 deletions(-) diff --git a/src/rust/lqosd/src/node_manager/ws/ticker.rs b/src/rust/lqosd/src/node_manager/ws/ticker.rs index 2bc0579f..cacfc02f 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use tokio::join; +use tokio::spawn; use crate::node_manager::ws::publish_subscribe::PubSub; mod cadence; mod throughput; @@ -21,11 +21,9 @@ pub use network_tree::{Circuit, all_circuits}; /// Runs a periodic tick to feed data to the node manager. pub(super) async fn channel_ticker(channels: Arc) { - join!( - one_second_cadence(channels.clone()), - two_second_cadence(channels.clone()), - five_second_cadence(channels.clone()), - ); + spawn(async { one_second_cadence(channels.clone()) }); + spawn(async { two_second_cadence(channels.clone()) }); + spawn(async { five_second_cadence(channels.clone()) }); } async fn one_second_cadence(channels: Arc) { @@ -33,24 +31,22 @@ async fn one_second_cadence(channels: Arc) { interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; // Once per second - let _ = join!( - cadence::cadence(channels.clone()), - throughput::throughput(channels.clone()), - rtt_histogram::rtt_histo(channels.clone()), - flow_counter::flow_count(channels.clone()), - top_10::top_10_downloaders(channels.clone()), - top_10::worst_10_downloaders(channels.clone()), - top_10::worst_10_retransmit(channels.clone()), - top_flows::top_flows_bytes(channels.clone()), - top_flows::top_flows_rate(channels.clone()), - flow_endpoints::endpoints_by_country(channels.clone()), - flow_endpoints::ether_protocols(channels.clone()), - flow_endpoints::ip_protocols(channels.clone()), - tree_summary::tree_summary(channels.clone()), - network_tree::network_tree(channels.clone()), - circuit_capacity::circuit_capacity(channels.clone()), - tree_capacity::tree_capacity(channels.clone()), - ); + spawn(async { cadence::cadence(channels.clone()) }); + spawn(async { throughput::throughput(channels.clone()) }); + spawn(async { rtt_histogram::rtt_histo(channels.clone()) }); + spawn(async { flow_counter::flow_count(channels.clone()) }); + spawn(async { top_10::top_10_downloaders(channels.clone()) }); + spawn(async { top_10::worst_10_downloaders(channels.clone()) }); + spawn(async { top_10::worst_10_retransmit(channels.clone()) }); + spawn(async { top_flows::top_flows_bytes(channels.clone()) }); + spawn(async { top_flows::top_flows_rate(channels.clone()) }); + spawn(async { flow_endpoints::endpoints_by_country(channels.clone()) }); + spawn(async { flow_endpoints::ether_protocols(channels.clone()) }); + spawn(async { flow_endpoints::ip_protocols(channels.clone()) }); + spawn(async { tree_summary::tree_summary(channels.clone()) }); + spawn(async { network_tree::network_tree(channels.clone()) }); + spawn(async { circuit_capacity::circuit_capacity(channels.clone()) }); + spawn(async { tree_capacity::tree_capacity(channels.clone()) }); } } @@ -59,10 +55,8 @@ async fn two_second_cadence(channels: Arc) { interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; // Once per second - let _ = join!( - queue_stats_total::queue_stats_totals(channels.clone()), - network_tree::all_subscribers(channels.clone()), - ); + spawn(async { queue_stats_total::queue_stats_totals(channels.clone()) }); + spawn(async { network_tree::all_subscribers(channels.clone()) }); } } @@ -71,9 +65,7 @@ async fn five_second_cadence(channels: Arc) { interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; // Once per second - let _ = join!( - system_info::cpu_info(channels.clone()), - system_info::ram_info(channels.clone()), - ); + spawn(async { system_info::cpu_info(channels.clone()) }); + spawn(async { system_info::ram_info(channels.clone()) }); } } \ No newline at end of file