Switch to a lock free method of storing packets.

This commit is contained in:
Herbert Wolverson
2023-03-15 13:46:58 +00:00
parent f6d4542598
commit 70b4230339
2 changed files with 19 additions and 23 deletions

View File

@@ -4,7 +4,7 @@ use zerocopy::FromBytes;
use crate::{flows::record_flow, timeline::store_on_timeline}; use crate::{flows::record_flow, timeline::store_on_timeline};
#[derive(FromBytes, Debug, Clone)] #[derive(FromBytes, Debug, Clone, PartialEq, Eq, Hash)]
#[repr(C)] #[repr(C)]
pub struct HeimdallEvent { pub struct HeimdallEvent {
pub timestamp: u64, pub timestamp: u64,

View File

@@ -1,40 +1,39 @@
use std::{sync::RwLock, time::Duration}; use std::time::Duration;
use dashmap::DashSet;
use lqos_bus::PacketHeader; use lqos_bus::PacketHeader;
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use crate::perf_interface::HeimdallEvent; use crate::perf_interface::HeimdallEvent;
impl From<&HeimdallEvent> for PacketHeader { impl HeimdallEvent {
fn from(value: &HeimdallEvent) -> Self { fn as_header(&self) -> PacketHeader {
Self { PacketHeader {
timestamp: value.timestamp, timestamp: self.timestamp,
src: value.src.as_ip().to_string(), src: self.src.as_ip().to_string(),
dst: value.dst.as_ip().to_string(), dst: self.dst.as_ip().to_string(),
src_port: value.src_port, src_port: self.src_port,
dst_port: value.dst_port, dst_port: self.dst_port,
ip_protocol: value.ip_protocol, ip_protocol: self.ip_protocol,
tos: value.tos, tos: self.tos,
size: value.size, size: self.size,
} }
} }
} }
struct Timeline { struct Timeline {
data: RwLock<Vec<HeimdallEvent>>, data: DashSet<HeimdallEvent>,
} }
impl Timeline { impl Timeline {
fn new() -> Self { fn new() -> Self {
Self { data: RwLock::new(Vec::new()) } Self { data: DashSet::new() }
} }
} }
static TIMELINE: Lazy<Timeline> = Lazy::new(Timeline::new); static TIMELINE: Lazy<Timeline> = Lazy::new(Timeline::new);
pub(crate) fn store_on_timeline(event: HeimdallEvent) { pub(crate) fn store_on_timeline(event: HeimdallEvent) {
let mut lock = TIMELINE.data.write().unwrap(); TIMELINE.data.insert(event); // We're moving here deliberately
lock.push(event); // We're moving here deliberately
} }
pub(crate) fn expire_timeline() { pub(crate) fn expire_timeline() {
@@ -42,18 +41,15 @@ pub(crate) fn expire_timeline() {
let since_boot = Duration::from(now); let since_boot = Duration::from(now);
let ten_secs_ago = since_boot - Duration::from_secs(10); let ten_secs_ago = since_boot - Duration::from_secs(10);
let expire = ten_secs_ago.as_nanos() as u64; let expire = ten_secs_ago.as_nanos() as u64;
let mut lock = TIMELINE.data.write().unwrap(); TIMELINE.data.retain(|v| v.timestamp > expire);
lock.retain(|v| v.timestamp > expire);
} }
} }
pub fn ten_second_packet_dump(ip: XdpIpAddress) -> Vec<PacketHeader> { pub fn ten_second_packet_dump(ip: XdpIpAddress) -> Vec<PacketHeader> {
TIMELINE TIMELINE
.data .data
.read()
.unwrap()
.iter() .iter()
.filter(|e| e.src == ip || e.dst == ip) .filter(|e| e.src == ip || e.dst == ip)
.map(|e| e.into()) .map(|e| e.as_header())
.collect() .collect()
} }