Remove timeouts from the ticker system, because async code interrupting sync code that may hold a lock can cause problems when the Drop doesn't fire and free the lock properly.

This commit is contained in:
Herbert Wolverson 2024-07-16 14:26:52 -05:00
parent c5e25035ef
commit 6a34d895eb

View File

@ -1,11 +1,7 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::join;
use tokio::time::timeout;
use crate::node_manager::ws::publish_subscribe::PubSub;
mod cadence;
mod throughput;
mod rtt_histogram;
@ -31,51 +27,48 @@ pub(super) async fn channel_ticker(channels: Arc<PubSub>) {
async fn one_second_cadence(channels: Arc<PubSub>) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1));
let timeout_time = Duration::from_secs_f32(0.9);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await; // Once per second
let _ = join!(
timeout(timeout_time, cadence::cadence(channels.clone())),
timeout(timeout_time, throughput::throughput(channels.clone())),
timeout(timeout_time, rtt_histogram::rtt_histo(channels.clone())),
timeout(timeout_time, flow_counter::flow_count(channels.clone())),
timeout(timeout_time, top_10::top_10_downloaders(channels.clone())),
timeout(timeout_time, top_10::worst_10_downloaders(channels.clone())),
timeout(timeout_time, top_10::worst_10_retransmit(channels.clone())),
timeout(timeout_time, top_flows::top_flows_bytes(channels.clone())),
timeout(timeout_time, top_flows::top_flows_rate(channels.clone())),
timeout(timeout_time, flow_endpoints::endpoints_by_country(channels.clone())),
timeout(timeout_time, flow_endpoints::ether_protocols(channels.clone())),
timeout(timeout_time, flow_endpoints::ip_protocols(channels.clone())),
timeout(timeout_time, tree_summary::tree_summary(channels.clone())),
timeout(timeout_time, network_tree::network_tree(channels.clone())),
cadence::cadence(channels.clone()),
throughput::throughput(channels.clone()),
rtt_histogram::rtt_histo(channels.clone()),
flow_counter::flow_count(channels.clone()),
top_10::top_10_downloaders(channels.clone()),
top_10::worst_10_downloaders(channels.clone()),
top_10::worst_10_retransmit(channels.clone()),
top_flows::top_flows_bytes(channels.clone()),
top_flows::top_flows_rate(channels.clone()),
flow_endpoints::endpoints_by_country(channels.clone()),
flow_endpoints::ether_protocols(channels.clone()),
flow_endpoints::ip_protocols(channels.clone()),
tree_summary::tree_summary(channels.clone()),
network_tree::network_tree(channels.clone()),
);
}
}
async fn two_second_cadence(channels: Arc<PubSub>) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(2));
let timeout_time = Duration::from_secs_f32(1.9);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await; // Once per second
let _ = join!(
timeout(timeout_time, queue_stats_total::queue_stats_totals(channels.clone())),
timeout(timeout_time, network_tree::all_subscribers(channels.clone())),
queue_stats_total::queue_stats_totals(channels.clone()),
network_tree::all_subscribers(channels.clone()),
);
}
}
async fn five_second_cadence(channels: Arc<PubSub>) {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
let timeout_time = Duration::from_secs_f32(4.9);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
interval.tick().await; // Once per second
let _ = join!(
timeout(timeout_time, system_info::cpu_info(channels.clone())),
timeout(timeout_time, system_info::ram_info(channels.clone())),
system_info::cpu_info(channels.clone()),
system_info::ram_info(channels.clone()),
);
}
}