mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2024-11-22 08:16:25 -06:00
Rework the ticker system for improved resilience.
This commit is contained in:
parent
3e7e42f5e5
commit
45e4295055
@ -1,6 +1,6 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::join;
|
use tokio::spawn;
|
||||||
use crate::node_manager::ws::publish_subscribe::PubSub;
|
use crate::node_manager::ws::publish_subscribe::PubSub;
|
||||||
mod cadence;
|
mod cadence;
|
||||||
mod throughput;
|
mod throughput;
|
||||||
@ -21,11 +21,9 @@ pub use network_tree::{Circuit, all_circuits};
|
|||||||
|
|
||||||
/// Runs a periodic tick to feed data to the node manager.
|
/// Runs a periodic tick to feed data to the node manager.
|
||||||
pub(super) async fn channel_ticker(channels: Arc<PubSub>) {
|
pub(super) async fn channel_ticker(channels: Arc<PubSub>) {
|
||||||
join!(
|
spawn(async { one_second_cadence(channels.clone()) });
|
||||||
one_second_cadence(channels.clone()),
|
spawn(async { two_second_cadence(channels.clone()) });
|
||||||
two_second_cadence(channels.clone()),
|
spawn(async { five_second_cadence(channels.clone()) });
|
||||||
five_second_cadence(channels.clone()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn one_second_cadence(channels: Arc<PubSub>) {
|
async fn one_second_cadence(channels: Arc<PubSub>) {
|
||||||
@ -33,24 +31,22 @@ async fn one_second_cadence(channels: Arc<PubSub>) {
|
|||||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await; // Once per second
|
interval.tick().await; // Once per second
|
||||||
let _ = join!(
|
spawn(async { cadence::cadence(channels.clone()) });
|
||||||
cadence::cadence(channels.clone()),
|
spawn(async { throughput::throughput(channels.clone()) });
|
||||||
throughput::throughput(channels.clone()),
|
spawn(async { rtt_histogram::rtt_histo(channels.clone()) });
|
||||||
rtt_histogram::rtt_histo(channels.clone()),
|
spawn(async { flow_counter::flow_count(channels.clone()) });
|
||||||
flow_counter::flow_count(channels.clone()),
|
spawn(async { top_10::top_10_downloaders(channels.clone()) });
|
||||||
top_10::top_10_downloaders(channels.clone()),
|
spawn(async { top_10::worst_10_downloaders(channels.clone()) });
|
||||||
top_10::worst_10_downloaders(channels.clone()),
|
spawn(async { top_10::worst_10_retransmit(channels.clone()) });
|
||||||
top_10::worst_10_retransmit(channels.clone()),
|
spawn(async { top_flows::top_flows_bytes(channels.clone()) });
|
||||||
top_flows::top_flows_bytes(channels.clone()),
|
spawn(async { top_flows::top_flows_rate(channels.clone()) });
|
||||||
top_flows::top_flows_rate(channels.clone()),
|
spawn(async { flow_endpoints::endpoints_by_country(channels.clone()) });
|
||||||
flow_endpoints::endpoints_by_country(channels.clone()),
|
spawn(async { flow_endpoints::ether_protocols(channels.clone()) });
|
||||||
flow_endpoints::ether_protocols(channels.clone()),
|
spawn(async { flow_endpoints::ip_protocols(channels.clone()) });
|
||||||
flow_endpoints::ip_protocols(channels.clone()),
|
spawn(async { tree_summary::tree_summary(channels.clone()) });
|
||||||
tree_summary::tree_summary(channels.clone()),
|
spawn(async { network_tree::network_tree(channels.clone()) });
|
||||||
network_tree::network_tree(channels.clone()),
|
spawn(async { circuit_capacity::circuit_capacity(channels.clone()) });
|
||||||
circuit_capacity::circuit_capacity(channels.clone()),
|
spawn(async { tree_capacity::tree_capacity(channels.clone()) });
|
||||||
tree_capacity::tree_capacity(channels.clone()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -59,10 +55,8 @@ async fn two_second_cadence(channels: Arc<PubSub>) {
|
|||||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await; // Once per second
|
interval.tick().await; // Once per second
|
||||||
let _ = join!(
|
spawn(async { queue_stats_total::queue_stats_totals(channels.clone()) });
|
||||||
queue_stats_total::queue_stats_totals(channels.clone()),
|
spawn(async { network_tree::all_subscribers(channels.clone()) });
|
||||||
network_tree::all_subscribers(channels.clone()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,9 +65,7 @@ async fn five_second_cadence(channels: Arc<PubSub>) {
|
|||||||
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await; // Once per second
|
interval.tick().await; // Once per second
|
||||||
let _ = join!(
|
spawn(async { system_info::cpu_info(channels.clone()) });
|
||||||
system_info::cpu_info(channels.clone()),
|
spawn(async { system_info::ram_info(channels.clone()) });
|
||||||
system_info::ram_info(channels.clone()),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user