Merge pull request #486 from LibreQoE/xdp_hot_cache

Xdp hot cache
This commit is contained in:
Herbert "TheBracket"
2024-05-30 10:12:14 -05:00
committed by GitHub
9 changed files with 164 additions and 30 deletions

View File

@@ -1,14 +1,19 @@
use std::sync::Mutex;
use crate::tracker::ThroughputPerSecond;
use lqos_bus::{bus_request, BusRequest, BusResponse};
use once_cell::sync::Lazy;
use rocket::tokio::sync::RwLock;
pub static THROUGHPUT_BUFFER: Lazy<RwLock<TotalThroughput>> =
Lazy::new(|| RwLock::new(TotalThroughput::new()));
pub static THROUGHPUT_BUFFER: Lazy<TotalThroughput> =
Lazy::new(|| TotalThroughput::new());
/// Maintains an in-memory ringbuffer of the last 5 minutes of
/// throughput data.
pub struct TotalThroughput {
inner: Mutex<TotalThroughputInner>
}
struct TotalThroughputInner {
data: Vec<ThroughputPerSecond>,
head: usize,
prev_head: usize,
@@ -18,14 +23,16 @@ impl TotalThroughput {
/// Create a new throughput ringbuffer system
pub fn new() -> Self {
Self {
data: vec![ThroughputPerSecond::default(); 300],
head: 0,
prev_head: 0,
inner: Mutex::new(TotalThroughputInner {
data: vec![ThroughputPerSecond::default(); 300],
head: 0,
prev_head: 0,
}),
}
}
/// Run once per second to update the ringbuffer with current data
pub async fn tick(&mut self) {
pub async fn tick(&self) {
if let Ok(messages) =
bus_request(vec![BusRequest::GetCurrentThroughput]).await
{
@@ -36,12 +43,14 @@ impl TotalThroughput {
shaped_bits_per_second,
} = msg
{
self.data[self.head].bits_per_second = bits_per_second;
self.data[self.head].packets_per_second = packets_per_second;
self.data[self.head].shaped_bits_per_second = shaped_bits_per_second;
self.prev_head = self.head;
self.head += 1;
self.head %= 300;
let mut lock = self.inner.lock().unwrap();
let head = lock.head;
lock.data[head].bits_per_second = bits_per_second;
lock.data[head].packets_per_second = packets_per_second;
lock.data[head].shaped_bits_per_second = shaped_bits_per_second;
lock.prev_head = lock.head;
lock.head += 1;
lock.head %= 300;
}
}
}
@@ -49,12 +58,14 @@ impl TotalThroughput {
/// Retrieve just the current throughput data (1 tick)
pub fn current(&self) -> ThroughputPerSecond {
self.data[self.prev_head]
let lock = self.inner.lock().unwrap();
lock.data[lock.prev_head]
}
/// Retrieve the head (0-299) and the full current throughput
/// buffer. Used to populate the dashboard the first time.
pub fn copy(&self) -> (usize, Vec<ThroughputPerSecond>) {
(self.head, self.data.clone())
let lock = self.inner.lock().unwrap();
(lock.head, lock.data.clone())
}
}

View File

@@ -121,11 +121,10 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
pub async fn update_total_throughput_buffer() {
loop {
let now = Instant::now();
let mut lock = THROUGHPUT_BUFFER.write().await;
lock.tick().await;
let wait_time = Duration::from_secs(1) - now.elapsed();
if wait_time.as_micros() > 0 {
rocket::tokio::time::sleep(Duration::from_secs(1)).await;
THROUGHPUT_BUFFER.tick().await;
let elapsed = now.elapsed();
if elapsed < Duration::from_secs(1) {
rocket::tokio::time::sleep(Duration::from_secs(1) - elapsed).await;
}
}
}

View File

@@ -73,7 +73,7 @@ pub struct ThroughputPerSecond {
pub async fn current_throughput(
_auth: AuthGuard,
) -> NoCache<MsgPack<ThroughputPerSecond>> {
let result = THROUGHPUT_BUFFER.read().await.current();
let result = THROUGHPUT_BUFFER.current();
NoCache::new(MsgPack(result))
}
@@ -81,7 +81,7 @@ pub async fn current_throughput(
pub async fn throughput_ring_buffer(
_auth: AuthGuard,
) -> NoCache<MsgPack<(usize, Vec<ThroughputPerSecond>)>> {
let result = THROUGHPUT_BUFFER.read().await.copy();
let result = THROUGHPUT_BUFFER.copy();
NoCache::new(MsgPack(result))
}

View File

@@ -359,7 +359,14 @@ class MultiRingBuffer {
{x: x, y:this.data['shaped'].sortedY[1], name: 'Shaped Upload', type: 'scatter', fill: 'tozeroy', marker: {color: 'rgb(124,252,0)'}},
];
if (this.plotted == null) {
Plotly.newPlot(graph, graphData, { margin: { l:0,r:0,b:0,t:0,pad:4 }, yaxis: { automargin: true, title: "Traffic (bits)" }, xaxis: {automargin: true, title: "Time since now (seconds)"} }, { responsive: true });
Plotly.newPlot(
graph,
graphData,
{
margin: { l:0,r:0,b:0,t:0,pad:4 },
yaxis: { automargin: true, title: "Traffic (bits)", exponentformat: "SI" },
xaxis: {automargin: true, title: "Time since now (seconds)"}
}, { responsive: true });
this.plotted = true;
} else {
Plotly.redraw(graph, graphData);

View File

@@ -9,7 +9,6 @@
#include <linux/ip.h>
#include <linux/ipv6.h>
#include "maximums.h"
#include "debug.h"
#include "dissector.h"
// Data structure used for map_ip_hash
@@ -24,6 +23,23 @@ struct ip_hash_key {
struct in6_addr address; // An IPv6 address. IPv4 uses the last 32 bits.
};
// Hot cache for recent IP lookups, an attempt
// at a speed improvement predicated on the idea
// that LPM isn't the fastest
// The cache is optional. Define USE_HOTCACHE
// to enable it.
#define USE_HOTCACHE 1
#ifdef USE_HOTCACHE
struct {
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__uint(max_entries, HOT_CACHE_SIZE);
__type(key, struct in6_addr);
__type(value, struct ip_hash_info);
__uint(pinning, LIBBPF_PIN_BY_NAME);
} ip_to_cpu_and_tc_hotcache SEC(".maps");
#endif
// Map describing IP to CPU/TC mappings
struct {
__uint(type, BPF_MAP_TYPE_LPM_TRIE);
@@ -71,13 +87,57 @@ static __always_inline struct ip_hash_info * setup_lookup_key_and_tc_cpu(
struct dissector_t * dissector
)
{
lookup_key->prefixlen = 128;
lookup_key->address = (direction == 1) ? dissector->dst_ip :
struct ip_hash_info * ip_info;
lookup_key->address = (direction == 1) ? dissector->dst_ip :
dissector->src_ip;
struct ip_hash_info * ip_info = bpf_map_lookup_elem(
#ifdef USE_HOTCACHE
// Try a hot cache search
ip_info = bpf_map_lookup_elem(
&ip_to_cpu_and_tc_hotcache,
&lookup_key->address
);
if (ip_info) {
// Is it a negative hit?
if (ip_info->cpu == NEGATIVE_HIT) {
return NULL;
}
// We got a cache hit, so return
return ip_info;
}
#endif
lookup_key->prefixlen = 128;
ip_info = bpf_map_lookup_elem(
&map_ip_to_cpu_and_tc,
lookup_key
);
#ifdef USE_HOTCACHE
if (ip_info) {
// We found it, so add it to the cache
bpf_map_update_elem(
&ip_to_cpu_and_tc_hotcache,
&lookup_key->address,
ip_info,
BPF_NOEXIST
);
} else {
// Store a negative result. This is designed to alleviate the pain
// of repeatedly hitting queries for IPs that ARE NOT shaped.
struct ip_hash_info negative_hit = {
.cpu = NEGATIVE_HIT,
.tc_handle = NEGATIVE_HIT
};
bpf_map_update_elem(
&ip_to_cpu_and_tc_hotcache,
&lookup_key->address,
&negative_hit,
BPF_NOEXIST
);
}
#endif
return ip_info;
}
@@ -104,9 +164,10 @@ static __always_inline struct ip_hash_info * tc_setup_lookup_key_and_tc_cpu(
lookup_key->prefixlen = 128;
// Direction is reversed because we are operating on egress
if (direction < 3) {
lookup_key->address = (direction == 1) ? dissector->src_ip :
lookup_key->address = (direction == 1) ? dissector->src_ip :
dissector->dst_ip;
*out_effective_direction = direction;
struct ip_hash_info * ip_info = bpf_map_lookup_elem(
&map_ip_to_cpu_and_tc,
lookup_key

View File

@@ -14,3 +14,10 @@
// Maximum number of packet pairs to track per flow.
#define MAX_PACKETS MAX_FLOWS
// Hot Cache Size
#define HOT_CACHE_SIZE 32768
// Hot Cache Negative Hit Flag
// If you have 4294967294 CPUs, I love you.
#define NEGATIVE_HIT 4294967294

View File

@@ -21,6 +21,7 @@
#include "common/flows.h"
//#define VERBOSE 1
//#define TRACING 1
/* Theory of operation:
1. (Packet arrives at interface)
@@ -60,7 +61,6 @@ __be16 isp_vlan = 0;
#define round_up(x, y) ((((x) - 1) | __round_mask(x, y)) + 1)
#define ctx_ptr(ctx, mem) (void *)(unsigned long)ctx->mem
// Structure for passing metadata from XDP to TC
struct metadata_pass_t {
__u32 tc_handle; // The encoded TC handle
@@ -70,6 +70,9 @@ struct metadata_pass_t {
SEC("xdp")
int xdp_prog(struct xdp_md *ctx)
{
#ifdef TRACING
__u64 started = bpf_ktime_get_ns();
#endif
#ifdef VERBOSE
bpf_debug("XDP-RDR");
#endif
@@ -106,6 +109,7 @@ int xdp_prog(struct xdp_md *ctx)
// If the dissector is unable to figure out what's going on, bail
// out.
if (!dissector_new(ctx, &dissector)) return XDP_PASS;
// Note that this step rewrites the VLAN tag if redirection
// is requested.
if (!dissector_find_l3_offset(&dissector, vlan_redirect)) return XDP_PASS;
@@ -115,6 +119,7 @@ int xdp_prog(struct xdp_md *ctx)
internet_vlan,
&dissector
);
#ifdef VERBOSE
bpf_debug("(XDP) Effective direction: %d", effective_direction);
#endif
@@ -208,6 +213,14 @@ int xdp_prog(struct xdp_md *ctx)
#ifdef VERBOSE
bpf_debug("(XDP) Redirect result: %u", redirect_result);
#endif
#ifdef TRACING
{
__u64 now = bpf_ktime_get_ns();
bpf_debug("(XDP) Exit time: %u", now - started);
}
#endif
return redirect_result;
}
return XDP_PASS;
@@ -217,6 +230,9 @@ int xdp_prog(struct xdp_md *ctx)
SEC("tc")
int tc_iphash_to_cpu(struct __sk_buff *skb)
{
#ifdef TRACING
__u64 started = bpf_ktime_get_ns();
#endif
#ifdef VERBOSE
bpf_debug("TC-MAP");
#endif
@@ -271,6 +287,12 @@ int tc_iphash_to_cpu(struct __sk_buff *skb)
// We can short-circuit the redirect and bypass the second
// LPM lookup! Yay!
skb->priority = meta->tc_handle;
#ifdef TRACING
{
__u64 now = bpf_ktime_get_ns();
bpf_debug("(TC) Exit time: %u", now - started);
}
#endif
return TC_ACT_OK;
}
}
@@ -310,12 +332,24 @@ int tc_iphash_to_cpu(struct __sk_buff *skb)
bpf_debug("(TC) Mapped to TC handle %x", ip_info->tc_handle);
#endif
skb->priority = ip_info->tc_handle;
#ifdef TRACING
{
__u64 now = bpf_ktime_get_ns();
bpf_debug("(TC) Exit time: %u", now - started);
}
#endif
return TC_ACT_OK;
} else {
// We didn't find anything
#ifdef VERBOSE
bpf_debug("(TC) didn't map anything");
#endif
#ifdef TRACING
{
__u64 now = bpf_ktime_get_ns();
bpf_debug("(TC) Exit time: %u", now - started);
}
#endif
return TC_ACT_OK;
}

View File

@@ -36,6 +36,7 @@ pub fn add_ip_to_tc(
let mut value =
IpHashData { cpu: ip_to_add.cpu, tc_handle: ip_to_add.handle() };
bpf_map.insert_or_update(&mut key, &mut value)?;
clear_hot_cache()?;
Ok(())
}
@@ -56,6 +57,7 @@ pub fn del_ip_from_tc(address: &str, upload: bool) -> Result<()> {
let ip = XdpIpAddress::from_ip(ip);
let mut key = IpHashKey { prefixlen: ip_to_add.prefix, address: ip.0 };
bpf_map.delete(&mut key)?;
clear_hot_cache()?;
Ok(())
}
@@ -71,6 +73,8 @@ pub fn clear_ips_from_tc() -> Result<()> {
)?;
bpf_map.clear()?;
clear_hot_cache()?;
Ok(())
}
@@ -89,3 +93,12 @@ pub fn list_mapped_ips() -> Result<Vec<(IpHashKey, IpHashData)>> {
Ok(raw)
}
/// Clears the "hot cache", which should be done whenever you change the IP
/// mappings - because otherwise cached data will keep going to the previous
/// destinations.
///
/// Opens the pinned BPF map at `/sys/fs/bpf/ip_to_cpu_and_tc_hotcache`
/// (the LRU hash used as a fast-path cache in front of the LPM trie) and
/// removes every entry. Negative hits cached for unshaped IPs are flushed
/// as well, so the next lookup falls through to the authoritative map.
///
/// # Errors
/// Returns an error if the pinned map cannot be opened or cleared
/// (e.g. the BPF filesystem entry is missing or permissions are lacking).
fn clear_hot_cache() -> Result<()> {
let mut bpf_map = BpfMap::<XdpIpAddress, IpHashData>::from_path("/sys/fs/bpf/ip_to_cpu_and_tc_hotcache")?;
bpf_map.clear()?;
Ok(())
}

View File

@@ -13,4 +13,6 @@ rm -v /sys/fs/bpf/bifrost_vlan_map
rm -v /sys/fs/bpf/heimdall
rm -v /sys/fs/bpf/heimdall_config
rm -v /sys/fs/bpf/heimdall_watching
rm -v /sys/fs/bpf/flowbee
rm -v /sys/fs/bpf/flowbee
rm -v /sys/fs/bpf/ip_to_cpu_and_tc_hotcache