Rework the ticker system for improved resilience. (Fixes previous commit that was pushed too early by mistake)

This commit is contained in:
Herbert Wolverson 2024-07-18 12:57:59 -05:00
parent 45e4295055
commit a6c527bdb0

View File

@ -21,9 +21,9 @@ pub use network_tree::{Circuit, all_circuits};
/// Runs a periodic tick to feed data to the node manager. /// Runs a periodic tick to feed data to the node manager.
pub(super) async fn channel_ticker(channels: Arc<PubSub>) { pub(super) async fn channel_ticker(channels: Arc<PubSub>) {
spawn(async { one_second_cadence(channels.clone()) }); let mc = channels.clone(); spawn(async move { one_second_cadence(mc) });
spawn(async { two_second_cadence(channels.clone()) }); let mc = channels.clone(); spawn(async move { two_second_cadence(mc) });
spawn(async { five_second_cadence(channels.clone()) }); let mc = channels.clone(); spawn(async move { five_second_cadence(mc) });
} }
async fn one_second_cadence(channels: Arc<PubSub>) { async fn one_second_cadence(channels: Arc<PubSub>) {
@ -31,22 +31,22 @@ async fn one_second_cadence(channels: Arc<PubSub>) {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; // Once per second interval.tick().await; // Once per second
spawn(async { cadence::cadence(channels.clone()) }); let mc = channels.clone(); spawn(async move { cadence::cadence(mc) });
spawn(async { throughput::throughput(channels.clone()) }); let mc = channels.clone(); spawn(async move { throughput::throughput(mc) });
spawn(async { rtt_histogram::rtt_histo(channels.clone()) }); let mc = channels.clone(); spawn(async move { rtt_histogram::rtt_histo(mc) });
spawn(async { flow_counter::flow_count(channels.clone()) }); let mc = channels.clone(); spawn(async move { flow_counter::flow_count(mc) });
spawn(async { top_10::top_10_downloaders(channels.clone()) }); let mc = channels.clone(); spawn(async move { top_10::top_10_downloaders(mc) });
spawn(async { top_10::worst_10_downloaders(channels.clone()) }); let mc = channels.clone(); spawn(async move { top_10::worst_10_downloaders(mc) });
spawn(async { top_10::worst_10_retransmit(channels.clone()) }); let mc = channels.clone(); spawn(async move { top_10::worst_10_retransmit(mc) });
spawn(async { top_flows::top_flows_bytes(channels.clone()) }); let mc = channels.clone(); spawn(async move { top_flows::top_flows_bytes(mc) });
spawn(async { top_flows::top_flows_rate(channels.clone()) }); let mc = channels.clone(); spawn(async move { top_flows::top_flows_rate(mc) });
spawn(async { flow_endpoints::endpoints_by_country(channels.clone()) }); let mc = channels.clone(); spawn(async move { flow_endpoints::endpoints_by_country(mc) });
spawn(async { flow_endpoints::ether_protocols(channels.clone()) }); let mc = channels.clone(); spawn(async move { flow_endpoints::ether_protocols(mc) });
spawn(async { flow_endpoints::ip_protocols(channels.clone()) }); let mc = channels.clone(); spawn(async move { flow_endpoints::ip_protocols(mc) });
spawn(async { tree_summary::tree_summary(channels.clone()) }); let mc = channels.clone(); spawn(async move { tree_summary::tree_summary(mc) });
spawn(async { network_tree::network_tree(channels.clone()) }); let mc = channels.clone(); spawn(async move { network_tree::network_tree(mc) });
spawn(async { circuit_capacity::circuit_capacity(channels.clone()) }); let mc = channels.clone(); spawn(async move { circuit_capacity::circuit_capacity(mc) });
spawn(async { tree_capacity::tree_capacity(channels.clone()) }); let mc = channels.clone(); spawn(async move { tree_capacity::tree_capacity(mc) });
} }
} }
@ -55,8 +55,8 @@ async fn two_second_cadence(channels: Arc<PubSub>) {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; // Once per second interval.tick().await; // Once per second
spawn(async { queue_stats_total::queue_stats_totals(channels.clone()) }); let mc = channels.clone(); spawn(async move { queue_stats_total::queue_stats_totals(mc) });
spawn(async { network_tree::all_subscribers(channels.clone()) }); let mc = channels.clone(); spawn(async move { network_tree::all_subscribers(mc) });
} }
} }
@ -65,7 +65,7 @@ async fn five_second_cadence(channels: Arc<PubSub>) {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; // Once per second interval.tick().await; // Once per second
spawn(async { system_info::cpu_info(channels.clone()) }); let mc = channels.clone(); spawn(async move { system_info::cpu_info(mc) });
spawn(async { system_info::ram_info(channels.clone()) }); let mc = channels.clone(); spawn(async move { system_info::ram_info(mc) });
} }
} }