diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index faf62304..07d0776e 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -718,6 +718,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.20" @@ -834,12 +843,11 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ "cfg-if", - "crossbeam-utils", "hashbrown", "lock_api", "once_cell", @@ -1859,6 +1867,7 @@ dependencies = [ "axum-extra", "bincode", "crossbeam-channel", + "crossbeam-queue", "csv", "dashmap", "default-net", diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index abbf62a3..35c19420 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -71,6 +71,8 @@ num-traits = "0.2.19" clap = { version = "4", features = ["derive"] } timerfd = "1.6" crossbeam-channel = { version = "0.5" } +crossbeam-queue = "0.3.11" +arc-swap = "1.7.1" # May have to change this one for ARM? jemallocator = "0.5" diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index 6ff0b7ea..04f4ce2f 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -47,7 +47,8 @@ rand = "0.8.5" mime_guess = "2.0.4" timerfd = { workspace = true } crossbeam-channel = { workspace = true } -arc-swap = "1.7.1" +arc-swap = { workspace = true } +crossbeam-queue = { workspace = true } # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs index c1d62a95..4eba944b 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/kernel_ringbuffer.rs @@ -128,8 +128,9 @@ pub struct FlowActor {} const EVENT_SIZE: usize = size_of::(); -static FLOW_BYTES_SENDER: OnceLock>> = OnceLock::new(); +static FLOW_BYTES_SENDER: OnceLock> = OnceLock::new(); static FLOW_COMMAND_SENDER: OnceLock> = OnceLock::new(); +static FLOW_BYTES: crossbeam_queue::SegQueue<[u8; EVENT_SIZE]> = crossbeam_queue::SegQueue::new(); #[derive(Debug)] enum FlowCommands { @@ -139,7 +140,7 @@ enum FlowCommands { impl FlowActor { pub fn start() -> anyhow::Result<()> { - let (tx, rx) = crossbeam_channel::bounded::>(65536); + let (tx, rx) = crossbeam_channel::bounded::<()>(65536); // Placeholder for when you need to read the flow system. let (cmd_tx, cmd_rx) = crossbeam_channel::bounded::(16); @@ -181,8 +182,10 @@ impl FlowActor { } // A flow event arrives recv(rx) -> msg => { - if let Ok(msg) = msg { - FlowActor::receive_flow(&mut flows, msg.as_slice()); + if let Ok(_) = msg { + while let Some(msg) = FLOW_BYTES.pop() { + FlowActor::receive_flow(&mut flows, msg.as_slice()); + } } } } @@ -257,8 +260,8 @@ pub unsafe extern "C" fn flowbee_handle_events( // Copy the bytes (to free the ringbuffer slot) let data_u8 = data as *const u8; let data_slice: &[u8] = slice::from_raw_parts(data_u8, EVENT_SIZE); - let target: Box<[u8; EVENT_SIZE]> = Box::new(data_slice.try_into().unwrap()); - if tx.try_send(target).is_err() { + FLOW_BYTES.push(data_slice.try_into().unwrap()); + if tx.try_send(()).is_err() { warn!("Could not submit flow event - buffer full"); } } else {