Replace flow submission channel with a bounded crossbeam channel.

This commit is contained in:
Herbert Wolverson 2024-10-09 13:11:05 -05:00
parent e20f57e056
commit 2defbf7ce4
3 changed files with 8 additions and 10 deletions

View File

@ -9,13 +9,10 @@ mod flow_analysis;
use crate::throughput_tracker::flow_data::{flow_analysis::FinishedFlowAnalysis, netflow5::Netflow5, netflow9::Netflow9};
pub(crate) use flow_tracker::{ALL_FLOWS, AsnId, FlowbeeLocalData};
use lqos_sys::flowbee_data::FlowbeeKey;
use std::sync::{
mpsc::{channel, Sender},
Arc,
};
use std::sync::Arc;
use tracing::{debug, error, info};
use anyhow::Result;
use crossbeam_channel::Sender;
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country,
FlowAnalysis, RECENT_FLOWS, flowbee_handle_events, get_flowbee_event_count_and_reset,
expire_rtt_flows, flowbee_rtt_map, RttData, get_rtt_events_per_second, AsnListEntry,
@ -28,8 +25,9 @@ trait FlowbeeRecipient {
// Creates the netflow tracker and returns the sender
pub fn setup_netflow_tracker() -> Result<Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>> {
let (tx, rx) = channel::<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>();
let config = lqos_config::load_config().unwrap();
let (tx, rx) = crossbeam_channel::bounded::<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>(65535);
let config = lqos_config::load_config()
.inspect_err(|e| error!("Failed to load configuration: {e}"))?;
std::thread::Builder::new()
.name("Netflow Tracker".to_string())

View File

@ -37,7 +37,7 @@ pub static THROUGHPUT_TRACKER: Lazy<ThroughputTracker> = Lazy::new(ThroughputTra
/// collection thread that there is fresh data.
pub fn spawn_throughput_monitor(
long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) -> anyhow::Result<()> {
debug!("Starting the bandwidth monitor thread.");
std::thread::Builder::new()
@ -101,7 +101,7 @@ impl ThroughputTaskTimeMetrics {
fn throughput_task(
long_term_stats_tx: Sender<StatsUpdateMessage>,
netflow_sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
netflow_sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
) {
// Obtain the flow timeout from the config, default to 30 seconds
let timeout_seconds = if let Ok(config) = lqos_config::load_config() {

View File

@ -191,7 +191,7 @@ impl ThroughputTracker {
&self,
timeout_seconds: u64,
_netflow_enabled: bool,
sender: std::sync::mpsc::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
sender: crossbeam_channel::Sender<(FlowbeeKey, (FlowbeeLocalData, FlowAnalysis))>,
net_json_calc: &mut NetworkJsonCounting,
) {
//log::debug!("Flowbee events this second: {}", get_flowbee_event_count_and_reset());