This should be a working, more resilient ticker system. I'd forgotten to await my futures.

This commit is contained in:
Herbert Wolverson 2024-07-18 13:35:21 -05:00
parent 0c74ee56c3
commit 0235b2e657

View File

@ -30,22 +30,22 @@ async fn one_second_cadence(channels: Arc<PubSub>) {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; // Once per second interval.tick().await; // Once per second
let mc = channels.clone(); spawn(async move { cadence::cadence(mc) }); let mc = channels.clone(); spawn(async move { cadence::cadence(mc).await });
let mc = channels.clone(); spawn(async move { throughput::throughput(mc) }); let mc = channels.clone(); spawn(async move { throughput::throughput(mc).await });
let mc = channels.clone(); spawn(async move { rtt_histogram::rtt_histo(mc) }); let mc = channels.clone(); spawn(async move { rtt_histogram::rtt_histo(mc).await });
let mc = channels.clone(); spawn(async move { flow_counter::flow_count(mc) }); let mc = channels.clone(); spawn(async move { flow_counter::flow_count(mc).await });
let mc = channels.clone(); spawn(async move { top_10::top_10_downloaders(mc) }); let mc = channels.clone(); spawn(async move { top_10::top_10_downloaders(mc).await });
let mc = channels.clone(); spawn(async move { top_10::worst_10_downloaders(mc) }); let mc = channels.clone(); spawn(async move { top_10::worst_10_downloaders(mc).await });
let mc = channels.clone(); spawn(async move { top_10::worst_10_retransmit(mc) }); let mc = channels.clone(); spawn(async move { top_10::worst_10_retransmit(mc).await });
let mc = channels.clone(); spawn(async move { top_flows::top_flows_bytes(mc) }); let mc = channels.clone(); spawn(async move { top_flows::top_flows_bytes(mc).await });
let mc = channels.clone(); spawn(async move { top_flows::top_flows_rate(mc) }); let mc = channels.clone(); spawn(async move { top_flows::top_flows_rate(mc).await });
let mc = channels.clone(); spawn(async move { flow_endpoints::endpoints_by_country(mc) }); let mc = channels.clone(); spawn(async move { flow_endpoints::endpoints_by_country(mc).await });
let mc = channels.clone(); spawn(async move { flow_endpoints::ether_protocols(mc) }); let mc = channels.clone(); spawn(async move { flow_endpoints::ether_protocols(mc).await });
let mc = channels.clone(); spawn(async move { flow_endpoints::ip_protocols(mc) }); let mc = channels.clone(); spawn(async move { flow_endpoints::ip_protocols(mc).await });
let mc = channels.clone(); spawn(async move { tree_summary::tree_summary(mc) }); let mc = channels.clone(); spawn(async move { tree_summary::tree_summary(mc).await });
let mc = channels.clone(); spawn(async move { network_tree::network_tree(mc) }); let mc = channels.clone(); spawn(async move { network_tree::network_tree(mc).await });
let mc = channels.clone(); spawn(async move { circuit_capacity::circuit_capacity(mc) }); let mc = channels.clone(); spawn(async move { circuit_capacity::circuit_capacity(mc).await });
let mc = channels.clone(); spawn(async move { tree_capacity::tree_capacity(mc) }); let mc = channels.clone(); spawn(async move { tree_capacity::tree_capacity(mc).await });
} }
} }
@ -54,8 +54,8 @@ async fn two_second_cadence(channels: Arc<PubSub>) {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; // Once per second interval.tick().await; // Once per second
let mc = channels.clone(); spawn(async move { queue_stats_total::queue_stats_totals(mc) }); let mc = channels.clone(); spawn(async move { queue_stats_total::queue_stats_totals(mc).await });
let mc = channels.clone(); spawn(async move { network_tree::all_subscribers(mc) }); let mc = channels.clone(); spawn(async move { network_tree::all_subscribers(mc).await });
} }
} }
@ -64,7 +64,7 @@ async fn five_second_cadence(channels: Arc<PubSub>) {
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop { loop {
interval.tick().await; // Once per second interval.tick().await; // Once per second
let mc = channels.clone(); spawn(async move { system_info::cpu_info(mc) }); let mc = channels.clone(); spawn(async move { system_info::cpu_info(mc).await });
let mc = channels.clone(); spawn(async move { system_info::ram_info(mc) }); let mc = channels.clone(); spawn(async move { system_info::ram_info(mc).await });
} }
} }