From 7cf9fba83e391a181c78d824cde447206793d4db Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 9 Jul 2024 11:05:52 -0500 Subject: [PATCH] Set 3 different tick cadences (1, 2 and 5 seconds) to reduce the weight of the heavier operations (such as browsing the whole tree), and avoid updating data that isn't updated that frequently (such as Cake stats). --- src/rust/lqosd/src/node_manager/ws/ticker.rs | 81 ++++++++++++++------ 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/src/rust/lqosd/src/node_manager/ws/ticker.rs b/src/rust/lqosd/src/node_manager/ws/ticker.rs index 9dacdf48..2a48e493 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker.rs @@ -1,3 +1,11 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::join; +use tokio::time::timeout; + +use crate::node_manager::ws::publish_subscribe::PubSub; + mod cadence; mod throughput; mod rtt_histogram; @@ -11,36 +19,61 @@ mod tree_summary; mod queue_stats_total; mod network_tree; -use std::sync::Arc; -use std::time::Duration; -use tokio::time::timeout; -use crate::node_manager::ws::publish_subscribe::PubSub; - /// Runs a periodic tick to feed data to the node manager. pub(super) async fn channel_ticker(channels: Arc) { + join!( + one_second_cadence(channels.clone()), + two_second_cadence(channels.clone()), + five_second_cadence(channels.clone()), + ); +} + +async fn one_second_cadence(channels: Arc) { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + let timeout_time = Duration::from_secs_f32(0.9); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; // Once per second - - let _ = tokio::join!( - timeout(Duration::from_secs_f32(0.9), cadence::cadence(channels.clone())), - timeout(Duration::from_secs_f32(0.9), throughput::throughput(channels.clone())), - timeout(Duration::from_secs_f32(0.9), rtt_histogram::rtt_histo(channels.clone())), - timeout(Duration::from_secs_f32(0.9), flow_counter::flow_count(channels.clone())), - timeout(Duration::from_secs_f32(0.9), top_10::top_10_downloaders(channels.clone())), - timeout(Duration::from_secs_f32(0.9), top_10::worst_10_downloaders(channels.clone())), - timeout(Duration::from_secs_f32(0.9), top_10::worst_10_retransmit(channels.clone())), - timeout(Duration::from_secs_f32(0.9), top_flows::top_flows_bytes(channels.clone())), - timeout(Duration::from_secs_f32(0.9), top_flows::top_flows_rate(channels.clone())), - timeout(Duration::from_secs_f32(0.9), flow_endpoints::endpoints_by_country(channels.clone())), - timeout(Duration::from_secs_f32(0.9), flow_endpoints::ether_protocols(channels.clone())), - timeout(Duration::from_secs_f32(0.9), flow_endpoints::ip_protocols(channels.clone())), - timeout(Duration::from_secs_f32(0.9), system_info::cpu_info(channels.clone())), - timeout(Duration::from_secs_f32(0.9), system_info::ram_info(channels.clone())), - timeout(Duration::from_secs_f32(0.9), tree_summary::tree_summary(channels.clone())), - timeout(Duration::from_secs_f32(0.9), queue_stats_total::queue_stats_totals(channels.clone())), - timeout(Duration::from_secs_f32(0.9), network_tree::network_tree(channels.clone())), + let _ = join!( + timeout(timeout_time, cadence::cadence(channels.clone())), + timeout(timeout_time, throughput::throughput(channels.clone())), + timeout(timeout_time, rtt_histogram::rtt_histo(channels.clone())), + timeout(timeout_time, flow_counter::flow_count(channels.clone())), + timeout(timeout_time, top_10::top_10_downloaders(channels.clone())), + timeout(timeout_time, top_10::worst_10_downloaders(channels.clone())), + timeout(timeout_time, top_10::worst_10_retransmit(channels.clone())), + timeout(timeout_time, top_flows::top_flows_bytes(channels.clone())), + timeout(timeout_time, top_flows::top_flows_rate(channels.clone())), + timeout(timeout_time, flow_endpoints::endpoints_by_country(channels.clone())), + timeout(timeout_time, flow_endpoints::ether_protocols(channels.clone())), + timeout(timeout_time, flow_endpoints::ip_protocols(channels.clone())), ); } +} + +async fn two_second_cadence(channels: Arc) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2)); + let timeout_time = Duration::from_secs_f32(1.9); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + interval.tick().await; // Once per second + let _ = join!( + timeout(timeout_time, tree_summary::tree_summary(channels.clone())), + timeout(timeout_time, queue_stats_total::queue_stats_totals(channels.clone())), + ); + } +} + +async fn five_second_cadence(channels: Arc) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + let timeout_time = Duration::from_secs_f32(4.9); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { + interval.tick().await; // Once per second + let _ = join!( + timeout(timeout_time, network_tree::network_tree(channels.clone())), + timeout(timeout_time, system_info::cpu_info(channels.clone())), + timeout(timeout_time, system_info::ram_info(channels.clone())), + ); + } } \ No newline at end of file