Replace the current flow processing setup with a crossbeam-queue SeqQueue setup to better reuse memory and avoid thrashing.

This commit is contained in:
Herbert Wolverson 2024-10-24 11:38:24 -05:00
parent cd58411390
commit 48d65be206
4 changed files with 25 additions and 10 deletions

15
src/rust/Cargo.lock generated
View File

@ -718,6 +718,15 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.20"
@ -834,12 +843,11 @@ dependencies = [
[[package]]
name = "dashmap"
version = "6.1.0"
version = "5.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown",
"lock_api",
"once_cell",
@ -1859,6 +1867,7 @@ dependencies = [
"axum-extra",
"bincode",
"crossbeam-channel",
"crossbeam-queue",
"csv",
"dashmap",
"default-net",

View File

@ -71,6 +71,8 @@ num-traits = "0.2.19"
clap = { version = "4", features = ["derive"] }
timerfd = "1.6"
crossbeam-channel = { version = "0.5" }
crossbeam-queue = "0.3.11"
arc-swap = "1.7.1"
# May have to change this one for ARM?
jemallocator = "0.5"

View File

@ -47,7 +47,8 @@ rand = "0.8.5"
mime_guess = "2.0.4"
timerfd = { workspace = true }
crossbeam-channel = { workspace = true }
arc-swap = "1.7.1"
arc-swap = { workspace = true }
crossbeam-queue = { workspace = true }
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]

View File

@ -128,8 +128,9 @@ pub struct FlowActor {}
const EVENT_SIZE: usize = size_of::<FlowbeeEvent>();
static FLOW_BYTES_SENDER: OnceLock<crossbeam_channel::Sender<Box<[u8; EVENT_SIZE]>>> = OnceLock::new();
static FLOW_BYTES_SENDER: OnceLock<crossbeam_channel::Sender<()>> = OnceLock::new();
static FLOW_COMMAND_SENDER: OnceLock<crossbeam_channel::Sender<FlowCommands>> = OnceLock::new();
static FLOW_BYTES: crossbeam_queue::SegQueue<[u8; EVENT_SIZE]> = crossbeam_queue::SegQueue::new();
#[derive(Debug)]
enum FlowCommands {
@ -139,7 +140,7 @@ enum FlowCommands {
impl FlowActor {
pub fn start() -> anyhow::Result<()> {
let (tx, rx) = crossbeam_channel::bounded::<Box<[u8; EVENT_SIZE]>>(65536);
let (tx, rx) = crossbeam_channel::bounded::<()>(65536);
// Placeholder for when you need to read the flow system.
let (cmd_tx, cmd_rx) = crossbeam_channel::bounded::<FlowCommands>(16);
@ -181,8 +182,10 @@ impl FlowActor {
}
// A flow event arrives
recv(rx) -> msg => {
if let Ok(msg) = msg {
FlowActor::receive_flow(&mut flows, msg.as_slice());
if let Ok(_) = msg {
while let Some(msg) = FLOW_BYTES.pop() {
FlowActor::receive_flow(&mut flows, msg.as_slice());
}
}
}
}
@ -257,8 +260,8 @@ pub unsafe extern "C" fn flowbee_handle_events(
// Copy the bytes (to free the ringbuffer slot)
let data_u8 = data as *const u8;
let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE);
let target: Box<[u8; EVENT_SIZE]> = Box::new(data_slice.try_into().unwrap());
if tx.try_send(target).is_err() {
FLOW_BYTES.push(data_slice.try_into().unwrap());
if tx.try_send(()).is_err() {
warn!("Could not submit flow event - buffer full");
}
} else {