Use a static flow arena.

This commit is contained in:
Herbert Wolverson 2024-10-24 11:58:53 -05:00
parent 48d65be206
commit eeea4acf14

View File

@ -9,6 +9,7 @@ use std::{
use tracing::{warn, error};
use zerocopy::FromBytes;
use std::sync::OnceLock;
use once_cell::sync::Lazy;
static EVENT_COUNT: AtomicU64 = AtomicU64::new(0);
static EVENTS_PER_SECOND: AtomicU64 = AtomicU64::new(0);
@ -130,7 +131,7 @@ const EVENT_SIZE: usize = size_of::<FlowbeeEvent>();
static FLOW_BYTES_SENDER: OnceLock<crossbeam_channel::Sender<()>> = OnceLock::new();
static FLOW_COMMAND_SENDER: OnceLock<crossbeam_channel::Sender<FlowCommands>> = OnceLock::new();
static FLOW_BYTES: crossbeam_queue::SegQueue<[u8; EVENT_SIZE]> = crossbeam_queue::SegQueue::new();
static FLOW_BYTES: Lazy<crossbeam_queue::ArrayQueue<[u8; EVENT_SIZE]>> = Lazy::new(|| crossbeam_queue::ArrayQueue::new(65536*2));
#[derive(Debug)]
enum FlowCommands {
@ -260,9 +261,10 @@ pub unsafe extern "C" fn flowbee_handle_events(
// Copy the bytes (to free the ringbuffer slot)
let data_u8 = data as *const u8;
let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
FLOW_BYTES.push(data_slice.try_into().unwrap());
if tx.try_send(()).is_err() {
warn!("Could not submit flow event - buffer full");
if let Ok(_) = FLOW_BYTES.push(data_slice.try_into().unwrap()) {
if tx.try_send(()).is_err() {
warn!("Could not submit flow event - buffer full");
}
}
} else {
warn!("Flow ringbuffer data arrived before the actor is ready. Dropping it.");