diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index 4eba944b..2e52a0ce 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -9,6 +9,7 @@ use std::{ use tracing::{warn, error}; use zerocopy::FromBytes; use std::sync::OnceLock; +use once_cell::sync::Lazy; static EVENT_COUNT: AtomicU64 = AtomicU64::new(0); static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0); @@ -130,7 +131,7 @@ const EVENT_SIZE: usize = size_of::(); static FLOW_BYTES_SENDER: OnceLock> = OnceLock::new(); static FLOW_COMMAND_SENDER: OnceLock> = OnceLock::new(); -static FLOW_BYTES: crossbeam_queue::SegQueue<[u8; EVENT_SIZE]> = crossbeam_queue::SegQueue::new(); +static FLOW_BYTES: Lazy> = Lazy::new(|| crossbeam_queue::ArrayQueue::new(65536*2)); #[derive(Debug)] enum FlowCommands { @@ -260,9 +261,10 @@ pub unsafe extern "C" fn flowbee_handle_events( // Copy the bytes (to free the ringbuffer slot) let data_u8 = data as *const u8; let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); - FLOW_BYTES.push(data_slice.try_into().unwrap()); - if tx.try_send(()).is_err() { - warn!("Could not submit flow event - buffer full"); + if let Ok(_) = FLOW_BYTES.push(data_slice.try_into().unwrap()) { + if tx.try_send(()).is_err() { + warn!("Could not submit flow event - buffer full"); + } } } else { warn!("Flow ringbuffer data arrived before the actor is ready. Dropping it.");