Integrate the flow data into the moving average system for RTTs, giving a better spread of results.

This commit is contained in:
Herbert Wolverson
2024-02-27 14:54:29 -06:00
parent f33d22faa0
commit df2b9dfe32
9 changed files with 37 additions and 938 deletions

View File

@@ -1,6 +1,6 @@
use std::time::Instant; use std::time::Instant;
use lqos_sys::{rtt_for_each, throughput_for_each}; use lqos_sys::{iterate_flows, throughput_for_each};
fn main() { fn main() {
println!("LibreQoS Map Performance Tool"); println!("LibreQoS Map Performance Tool");
@@ -8,7 +8,7 @@ fn main() {
// Test the RTT map // Test the RTT map
let mut rtt_count = 0; let mut rtt_count = 0;
let now = Instant::now(); let now = Instant::now();
rtt_for_each(&mut |_rtt, _tracker| { iterate_flows(&mut |_rtt, _tracker| {
rtt_count += 1; rtt_count += 1;
}); });
let elapsed = now.elapsed(); let elapsed = now.elapsed();

View File

@@ -9,7 +9,7 @@
#define SECOND_IN_NANOS 1000000000 #define SECOND_IN_NANOS 1000000000
#define TIMESTAMP_INTERVAL_NANOS 2000000000 //#define TIMESTAMP_INTERVAL_NANOS 10000000
// Some helpers to make understanding direction easier // Some helpers to make understanding direction easier
// for readability. // for readability.
@@ -59,8 +59,6 @@ struct flow_data_t {
__u32 tsecr[2]; __u32 tsecr[2];
// When did the timestamp change? // When did the timestamp change?
__u64 ts_change_time[2]; __u64 ts_change_time[2];
// When should we calculate RTT (to avoid flooding)
__u64 ts_calc_time[2];
// Most recent RTT // Most recent RTT
__u64 last_rtt[2]; __u64 last_rtt[2];
// Has the connection ended? // Has the connection ended?
@@ -97,7 +95,6 @@ static __always_inline struct flow_data_t new_flow_data(
.tsval = { 0, 0 }, .tsval = { 0, 0 },
.tsecr = { 0, 0 }, .tsecr = { 0, 0 },
.ts_change_time = { 0, 0 }, .ts_change_time = { 0, 0 },
.ts_calc_time = { now, now }, // Get a first number quickly
.last_rtt = { 0, 0 }, .last_rtt = { 0, 0 },
.end_status = 0 .end_status = 0
}; };
@@ -220,7 +217,7 @@ static __always_inline void process_tcp(
u_int64_t now u_int64_t now
) { ) {
if ((BITCHECK(DIS_TCP_SYN) && !BITCHECK(DIS_TCP_ACK) && direction == TO_INTERNET) || if ((BITCHECK(DIS_TCP_SYN) && !BITCHECK(DIS_TCP_ACK) && direction == TO_INTERNET) ||
(BITCHECK(DIS_TCP_SYN) && BITCHECK(DIS_TCP_ACK) && direction == FROM_INTERNET)) { (BITCHECK(DIS_TCP_SYN) && !BITCHECK(DIS_TCP_ACK) && direction == FROM_INTERNET)) {
// A customer is requesting a new TCP connection. That means // A customer is requesting a new TCP connection. That means
// we need to start tracking this flow. // we need to start tracking this flow.
#ifdef VERBOSE #ifdef VERBOSE
@@ -273,11 +270,9 @@ static __always_inline void process_tcp(
if (tsval != data->tsval[0] || tsecr != data->tsecr[0]) { if (tsval != data->tsval[0] || tsecr != data->tsecr[0]) {
if (tsval == data->tsecr[1]) { if (tsval == data->tsecr[1]) {
if (now > data->ts_calc_time[0]) { __u64 elapsed = now - data->ts_change_time[1];
__u64 elapsed = now - data->ts_change_time[1]; data->last_rtt[0] = elapsed;
data->ts_calc_time[0] = now + TIMESTAMP_INTERVAL_NANOS; //bpf_debug("[FLOWS][0] RTT: %llu", elapsed);
data->last_rtt[0] = elapsed;
}
} }
data->ts_change_time[0] = now; data->ts_change_time[0] = now;
@@ -288,11 +283,9 @@ static __always_inline void process_tcp(
if (tsval != data->tsval[1] || tsecr != data->tsecr[1]) { if (tsval != data->tsval[1] || tsecr != data->tsecr[1]) {
if (tsval == data->tsecr[0]) { if (tsval == data->tsecr[0]) {
if (now > data->ts_calc_time[1]) { __u64 elapsed = now - data->ts_change_time[0];
__u64 elapsed = now - data->ts_change_time[0]; data->last_rtt[1] = elapsed;
data->ts_calc_time[1] = now + TIMESTAMP_INTERVAL_NANOS; //bpf_debug("[FLOWS][1] RTT: %llu", elapsed);
data->last_rtt[1] = elapsed;
}
} }
data->ts_change_time[1] = now; data->ts_change_time[1] = now;

View File

@@ -1,797 +0,0 @@
/* SPDX-License-Identifier: GPL-2.0 */
/*
Based on the GPLv2 xdp-pping project
(https://github.com/xdp-project/bpf-examples/tree/master/pping)
xdp_pping is based on the ideas in Dr. Kathleen Nichols' pping
utility: https://github.com/pollere/pping
and the papers around "Listening to Networks":
http://www.pollere.net/Pdfdocs/ListeningGoog.pdf
My modifications are Copyright 2022, Herbert Wolverson
(Bracket Productions)
*/
/* Shared structures between userspace and kernel space
*/
/* Implementation of pping inside the kernel
* classifier
*/
#ifndef __TC_CLASSIFY_KERN_PPING_H
#define __TC_CLASSIFY_KERN_PPING_H
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>
#include <linux/pkt_cls.h>
#include <linux/in.h>
#include <linux/in6.h>
#include <linux/if_ether.h>
#include <linux/ip.h>
#include <linux/ipv6.h>
#include <linux/tcp.h>
#include <bpf/bpf_endian.h>
#include <stdbool.h>
#include "tc_classify_kern_pping_common.h"
#include "maximums.h"
#include "debug.h"
#include "ip_hash.h"
#include "dissector_tc.h"
#include "tcp_opts.h"
#define MAX_MEMCMP_SIZE 128
struct parsing_context
{
struct tcphdr *tcp;
__u64 now;
struct tc_dissector_t * dissector;
struct in6_addr * active_host;
};
/* Event type recorded for a packet flow */
enum __attribute__((__packed__)) flow_event_type
{
FLOW_EVENT_NONE,
FLOW_EVENT_OPENING,
FLOW_EVENT_CLOSING,
FLOW_EVENT_CLOSING_BOTH
};
enum __attribute__((__packed__)) connection_state
{
CONNECTION_STATE_EMPTY,
CONNECTION_STATE_WAITOPEN,
CONNECTION_STATE_OPEN,
CONNECTION_STATE_CLOSED
};
struct flow_state
{
__u64 last_timestamp;
__u32 last_id;
__u32 outstanding_timestamps;
enum connection_state conn_state;
__u8 reserved[2];
};
/*
* Stores flowstate for both direction (src -> dst and dst -> src) of a flow
*
* Uses two named members instead of array of size 2 to avoid hassles with
* convincing verifier that member access is not out of bounds
*/
struct dual_flow_state
{
struct flow_state dir1;
struct flow_state dir2;
};
/*
* Struct filled in by parse_packet_id.
*
* Note: As long as parse_packet_id is successful, the flow-parts of pid
* and reply_pid should be valid, regardless of value for pid_valid and
* reply_pid valid. The *pid_valid members are there to indicate that the
* identifier part of *pid are valid and can be used for timestamping/lookup.
* The reason for not keeping the flow parts as an entirely separate members
* is to save some performance by avoid doing a copy for lookup/insertion
* in the packet_ts map.
*/
struct packet_info
{
__u64 time; // Arrival time of packet
//__u32 payload; // Size of packet data (excluding headers)
struct packet_id pid; // flow + identifier to timestamp (ex. TSval)
struct packet_id reply_pid; // rev. flow + identifier to match against (ex. TSecr)
//__u32 ingress_ifindex; // Interface packet arrived on (if is_ingress, otherwise not valid)
bool pid_flow_is_dfkey; // Used to determine which member of dualflow state to use for forward direction
bool pid_valid; // identifier can be used to timestamp packet
bool reply_pid_valid; // reply_identifier can be used to match packet
enum flow_event_type event_type; // flow event triggered by packet
};
/*
* Struct filled in by protocol id parsers (ex. parse_tcp_identifier)
*/
struct protocol_info
{
__u32 pid;
__u32 reply_pid;
bool pid_valid;
bool reply_pid_valid;
enum flow_event_type event_type;
};
/* Map Definitions */
struct
{
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, struct packet_id);
__type(value, __u64);
__uint(max_entries, MAX_PACKETS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
// __uint(map_flags, BPF_F_NO_PREALLOC);
} packet_ts SEC(".maps");
struct
{
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, struct network_tuple);
__type(value, struct dual_flow_state);
__uint(max_entries, MAX_FLOWS);
__uint(pinning, LIBBPF_PIN_BY_NAME);
// __uint(map_flags, BPF_F_NO_PREALLOC);
} flow_state SEC(".maps");
struct
{
__uint(type, BPF_MAP_TYPE_LRU_HASH);
__type(key, struct in6_addr); // Keyed to the IP address
__type(value, struct rotating_performance);
__uint(max_entries, IP_HASH_ENTRIES_MAX);
__uint(pinning, LIBBPF_PIN_BY_NAME);
// __uint(map_flags, BPF_F_NO_PREALLOC);
} rtt_tracker SEC(".maps");
// Mask for IPv6 flowlabel + traffic class - used in fib lookup
#define IPV6_FLOWINFO_MASK __cpu_to_be32(0x0FFFFFFF)
#ifndef AF_INET
#define AF_INET 2
#endif
#ifndef AF_INET6
#define AF_INET6 10
#endif
#define MAX_TCP_OPTIONS 10
/* Functions */
/*
* Convenience function for getting the corresponding reverse flow.
* PPing needs to keep track of flow in both directions, and sometimes
* also needs to reverse the flow to report the "correct" (consistent
* with Kathie's PPing) src and dest address.
*/
static __always_inline void reverse_flow(
struct network_tuple *dest,
struct network_tuple *src
) {
dest->ipv = src->ipv;
dest->proto = src->proto;
dest->saddr = src->daddr;
dest->daddr = src->saddr;
dest->reserved = 0;
}
/*
* Can't seem to get __builtin_memcmp to work, so hacking my own
*
* Based on https://githubhot.com/repo/iovisor/bcc/issues/3559,
* __builtin_memcmp should work constant size but I still get the "failed to
* find BTF for extern" error.
*/
static __always_inline int my_memcmp(
const void *s1_,
const void *s2_,
__u32 size
) {
const __u8 *s1 = (const __u8 *)s1_, *s2 = (const __u8 *)s2_;
int i;
for (i = 0; i < MAX_MEMCMP_SIZE && i < size; i++)
{
if (s1[i] != s2[i])
return s1[i] > s2[i] ? 1 : -1;
}
return 0;
}
static __always_inline bool is_dualflow_key(struct network_tuple *flow)
{
return my_memcmp(&flow->saddr, &flow->daddr, sizeof(flow->saddr)) <= 0;
}
static __always_inline struct flow_state *fstate_from_dfkey(
struct dual_flow_state *df_state,
bool is_dfkey
) {
if (!df_state) {
return (struct flow_state *)NULL;
}
return is_dfkey ? &df_state->dir1 : &df_state->dir2;
}
/*
* Attempts to fetch an identifier for TCP packets, based on the TCP timestamp
* option.
*
* Will use the TSval as pid and TSecr as reply_pid, and the TCP source and dest
* as port numbers.
*
* If successful, tcph, sport, dport and proto_info will be set
* appropriately and 0 will be returned.
* On failure -1 will be returned (and arguments will not be set).
*/
static __always_inline int parse_tcp_identifier(
struct parsing_context *context,
__u16 *sport,
__u16 *dport,
struct protocol_info *proto_info
) {
if (parse_tcp_ts(context->tcp, context->dissector->end, &proto_info->pid,
&proto_info->reply_pid) < 0) {
return -1; // Possible TODO, fall back on seq/ack instead
}
// Do not timestamp pure ACKs (no payload)
void *nh_pos = (context->tcp + 1) + (context->tcp->doff << 2);
proto_info->pid_valid = nh_pos - context->dissector->start < context->dissector->ctx->len || context->tcp->syn;
// Do not match on non-ACKs (TSecr not valid)
proto_info->reply_pid_valid = context->tcp->ack;
// Check if connection is opening/closing
if (context->tcp->rst)
{
proto_info->event_type = FLOW_EVENT_CLOSING_BOTH;
}
else if (context->tcp->fin)
{
proto_info->event_type = FLOW_EVENT_CLOSING;
}
else if (context->tcp->syn)
{
proto_info->event_type = FLOW_EVENT_OPENING;
}
else
{
proto_info->event_type = FLOW_EVENT_NONE;
}
*sport = bpf_ntohs(context->tcp->dest);
*dport = bpf_ntohs(context->tcp->source);
return 0;
}
/* This is a bit of a hackjob from the original */
/* This is a bit of a hackjob from the original.
 *
 * Fills in p_info (arrival time, flow tuple, TCP-timestamp identifiers and
 * flow event) from the parsing context. Returns 0 on success, -1 when the
 * packet is not IPv4/IPv6 or the TCP identifier cannot be parsed.
 */
static __always_inline int parse_packet_identifier(
    struct parsing_context *context,
    struct packet_info *p_info
) {
    p_info->time = context->now;
    if (context->dissector->eth_type == ETH_P_IP)
    {
        p_info->pid.flow.ipv = AF_INET;
        p_info->pid.flow.saddr.ip = context->dissector->src_ip;
        p_info->pid.flow.daddr.ip = context->dissector->dst_ip;
    }
    else if (context->dissector->eth_type == ETH_P_IPV6)
    {
        p_info->pid.flow.ipv = AF_INET6;
        p_info->pid.flow.saddr.ip = context->dissector->src_ip;
        p_info->pid.flow.daddr.ip = context->dissector->dst_ip;
    }
    else
    {
        bpf_debug("Unknown protocol");
        return -1;
    }
    //bpf_debug("IPs: %u %u", p_info->pid.flow.saddr.ip.in6_u.u6_addr32[3], p_info->pid.flow.daddr.ip.in6_u.u6_addr32[3]);

    struct protocol_info proto_info;
    int err = parse_tcp_identifier(context,
                                   &p_info->pid.flow.saddr.port,
                                   &p_info->pid.flow.daddr.port,
                                   &proto_info);
    if (err)
        return -1;
    //bpf_debug("Ports: %u %u", p_info->pid.flow.saddr.port, p_info->pid.flow.daddr.port);

    // Successfully parsed packet identifier - fill in remaining members and return
    p_info->pid.identifier = proto_info.pid;
    p_info->pid_valid = proto_info.pid_valid;
    p_info->reply_pid.identifier = proto_info.reply_pid;
    p_info->reply_pid_valid = proto_info.reply_pid_valid;
    p_info->event_type = proto_info.event_type;

    // BUGFIX: this guard previously tested `== AF_INET && == AF_INET6`,
    // which is always false, so the sanity check never fired. Bail out
    // when the address family is NEITHER of the two we support.
    if (p_info->pid.flow.ipv != AF_INET && p_info->pid.flow.ipv != AF_INET6) {
        bpf_debug("Unknown internal protocol");
        return -1;
    }

    // Canonical key ordering: true when saddr <= daddr, so both directions
    // of a flow map onto the same dual-flow state entry.
    p_info->pid_flow_is_dfkey = is_dualflow_key(&p_info->pid.flow);

    reverse_flow(&p_info->reply_pid.flow, &p_info->pid.flow);

    return 0;
}
static __always_inline struct network_tuple *
get_dualflow_key_from_packet(struct packet_info *p_info)
{
return p_info->pid_flow_is_dfkey ? &p_info->pid.flow : &p_info->reply_pid.flow;
}
/*
* Initializes an "empty" flow state based on the forward direction of the
* current packet
*/
static __always_inline void init_flowstate(struct flow_state *f_state,
struct packet_info *p_info)
{
f_state->conn_state = CONNECTION_STATE_WAITOPEN;
f_state->last_timestamp = p_info->time;
}
static __always_inline void init_empty_flowstate(struct flow_state *f_state)
{
f_state->conn_state = CONNECTION_STATE_EMPTY;
}
static __always_inline struct flow_state *
get_flowstate_from_packet(struct dual_flow_state *df_state,
struct packet_info *p_info)
{
return fstate_from_dfkey(df_state, p_info->pid_flow_is_dfkey);
}
static __always_inline struct flow_state *
get_reverse_flowstate_from_packet(struct dual_flow_state *df_state,
struct packet_info *p_info)
{
return fstate_from_dfkey(df_state, !p_info->pid_flow_is_dfkey);
}
/*
* Initialize a new (assumed 0-initialized) dual flow state based on the current
* packet.
*/
static __always_inline void init_dualflow_state(
struct dual_flow_state *df_state,
struct packet_info *p_info
) {
struct flow_state *fw_state =
get_flowstate_from_packet(df_state, p_info);
struct flow_state *rev_state =
get_reverse_flowstate_from_packet(df_state, p_info);
init_flowstate(fw_state, p_info);
init_empty_flowstate(rev_state);
}
static __always_inline struct dual_flow_state *
create_dualflow_state(
struct parsing_context *ctx,
struct packet_info *p_info,
bool *new_flow
) {
struct network_tuple *key = get_dualflow_key_from_packet(p_info);
struct dual_flow_state new_state = {0};
init_dualflow_state(&new_state, p_info);
//new_state.dir1.tc_handle.handle = ctx->tc_handle;
//new_state.dir2.tc_handle.handle = ctx->tc_handle;
if (bpf_map_update_elem(&flow_state, key, &new_state, BPF_NOEXIST) ==
0)
{
if (new_flow)
*new_flow = true;
}
else
{
return (struct dual_flow_state *)NULL;
}
return (struct dual_flow_state *)bpf_map_lookup_elem(&flow_state, key);
}
static __always_inline struct dual_flow_state *
lookup_or_create_dualflow_state(
struct parsing_context *ctx,
struct packet_info *p_info,
bool *new_flow
) {
struct dual_flow_state *df_state;
struct network_tuple *key = get_dualflow_key_from_packet(p_info);
df_state = (struct dual_flow_state *)bpf_map_lookup_elem(&flow_state, key);
if (df_state)
{
return df_state;
}
// Only try to create new state if we have a valid pid
if (!p_info->pid_valid || p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)
return (struct dual_flow_state *)NULL;
return create_dualflow_state(ctx, p_info, new_flow);
}
static __always_inline bool is_flowstate_active(struct flow_state *f_state)
{
return f_state->conn_state != CONNECTION_STATE_EMPTY &&
f_state->conn_state != CONNECTION_STATE_CLOSED;
}
static __always_inline void update_forward_flowstate(
struct packet_info *p_info,
struct flow_state *f_state,
bool *new_flow
) {
// "Create" flowstate if it's empty
if (f_state->conn_state == CONNECTION_STATE_EMPTY &&
p_info->pid_valid)
{
init_flowstate(f_state, p_info);
if (new_flow)
*new_flow = true;
}
}
static __always_inline void update_reverse_flowstate(
void *ctx,
struct packet_info *p_info,
struct flow_state *f_state
) {
if (!is_flowstate_active(f_state))
return;
// First time we see reply for flow?
if (f_state->conn_state == CONNECTION_STATE_WAITOPEN &&
p_info->event_type != FLOW_EVENT_CLOSING_BOTH)
{
f_state->conn_state = CONNECTION_STATE_OPEN;
}
}
static __always_inline bool is_new_identifier(
struct packet_id *pid,
struct flow_state *f_state
) {
if (pid->flow.proto == IPPROTO_TCP)
/* TCP timestamps should be monotonically non-decreasing
* Check that pid > last_ts (considering wrap around) by
* checking 0 < pid - last_ts < 2^31 as specified by
* RFC7323 Section 5.2*/
return pid->identifier - f_state->last_id > 0 &&
pid->identifier - f_state->last_id < 1UL << 31;
return pid->identifier != f_state->last_id;
}
static __always_inline bool is_rate_limited(__u64 now, __u64 last_ts)
{
if (now < last_ts)
return true;
// Static rate limit
//return now - last_ts < DELAY_BETWEEN_RTT_REPORTS_MS * NS_PER_MS;
return false; // Max firehose drinking speed
}
/*
* Attempt to create a timestamp-entry for packet p_info for flow in f_state
*/
static __always_inline void pping_timestamp_packet(
    struct flow_state *f_state,
    void *ctx,
    struct packet_info *p_info,
    bool new_flow
) {
    // Only timestamp packets on active flows carrying a usable identifier.
    if (!is_flowstate_active(f_state) || !p_info->pid_valid)
        return;
    // Check if identifier is new; an already-seen TSval must not be
    // re-timestamped (brand-new flows are exempt from this check).
    if (!new_flow && !is_new_identifier(&p_info->pid, f_state))
        return;
    f_state->last_id = p_info->pid.identifier;
    // Check rate-limit (currently a no-op unless now < last_ts; see
    // is_rate_limited above).
    if (!new_flow && is_rate_limited(p_info->time, f_state->last_timestamp))
        return;
    /*
     * Updates attempt at creating timestamp, even if creation of timestamp
     * fails (due to map being full). This should make the competition for
     * the next available map slot somewhat fairer between heavy and sparse
     * flows.
     */
    f_state->last_timestamp = p_info->time;
    // BPF_NOEXIST: never overwrite an outstanding timestamp for the same pid.
    if (bpf_map_update_elem(&packet_ts, &p_info->pid, &p_info->time,
                            BPF_NOEXIST) == 0)
        __sync_fetch_and_add(&f_state->outstanding_timestamps, 1);
}
/*
* Attempt to match packet in p_info with a timestamp from flow in f_state
*/
static __always_inline void pping_match_packet(struct flow_state *f_state,
struct packet_info *p_info,
struct in6_addr *active_host)
{
__u64 *p_ts;
if (!is_flowstate_active(f_state) || !p_info->reply_pid_valid)
return;
if (f_state->outstanding_timestamps == 0)
return;
p_ts = (__u64 *)bpf_map_lookup_elem(&packet_ts, &p_info->reply_pid);
if (!p_ts || p_info->time < *p_ts)
return;
__u64 rtt = (p_info->time - *p_ts) / NS_PER_MS_TIMES_100;
bpf_debug("RTT (from TC): %u", p_info->time - *p_ts);
// Delete timestamp entry as soon as RTT is calculated
if (bpf_map_delete_elem(&packet_ts, &p_info->reply_pid) == 0)
{
__sync_fetch_and_add(&f_state->outstanding_timestamps, -1);
}
// Update the most performance map to include this data
struct rotating_performance *perf =
(struct rotating_performance *)bpf_map_lookup_elem(
&rtt_tracker, active_host);
if (perf == NULL) return;
__sync_fetch_and_add(&perf->next_entry, 1);
__u32 next_entry = perf->next_entry;
if (next_entry < MAX_PERF_SECONDS) {
__sync_fetch_and_add(&perf->rtt[next_entry], rtt);
perf->has_fresh_data = 1;
}
}
static __always_inline void close_and_delete_flows(
void *ctx,
struct packet_info *p_info,
struct flow_state *fw_flow,
struct flow_state *rev_flow
) {
// Forward flow closing
if (p_info->event_type == FLOW_EVENT_CLOSING ||
p_info->event_type == FLOW_EVENT_CLOSING_BOTH)
{
fw_flow->conn_state = CONNECTION_STATE_CLOSED;
}
// Reverse flow closing
if (p_info->event_type == FLOW_EVENT_CLOSING_BOTH)
{
rev_flow->conn_state = CONNECTION_STATE_CLOSED;
}
// Delete flowstate entry if neither flow is open anymore
if (!is_flowstate_active(fw_flow) && !is_flowstate_active(rev_flow))
{
bpf_map_delete_elem(&flow_state, get_dualflow_key_from_packet(p_info));
}
}
/*
* Contains the actual pping logic that is applied after a packet has been
* parsed and deemed to contain some valid identifier.
* Looks up and updates flowstate (in both directions), tries to save a
* timestamp of the packet, tries to match packet against previous timestamps,
* calculates RTTs and pushes messages to userspace as appropriate.
*/
static __always_inline void pping_parsed_packet(
struct parsing_context *context,
struct packet_info *p_info
) {
struct dual_flow_state *df_state;
struct flow_state *fw_flow, *rev_flow;
bool new_flow = false;
df_state = lookup_or_create_dualflow_state(context, p_info, &new_flow);
if (!df_state)
{
// bpf_debug("No flow state - stop");
return;
}
fw_flow = get_flowstate_from_packet(df_state, p_info);
update_forward_flowstate(p_info, fw_flow, &new_flow);
pping_timestamp_packet(fw_flow, context, p_info, new_flow);
rev_flow = get_reverse_flowstate_from_packet(df_state, p_info);
update_reverse_flowstate(context, p_info, rev_flow);
pping_match_packet(rev_flow, p_info, context->active_host);
close_and_delete_flows(context, p_info, fw_flow, rev_flow);
}
/* Entry point for running pping in the tc context */
static __always_inline void tc_pping_start(struct parsing_context *context)
{
// Check to see if we can store perf info. Bail if we've hit the limit.
// Copying occurs because otherwise the validator complains.
struct rotating_performance *perf =
(struct rotating_performance *)bpf_map_lookup_elem(
&rtt_tracker, context->active_host);
if (perf) {
if (perf->next_entry >= MAX_PERF_SECONDS-1) {
//bpf_debug("Flow has max samples. Not sampling further until next reset.");
//for (int i=0; i<MAX_PERF_SECONDS; ++i) {
// bpf_debug("%u", perf->rtt[i]);
//}
if (context->now > perf->recycle_time) {
// If the time-to-live for the sample is exceeded, recycle it to be
// usable again.
//bpf_debug("Recycling flow, %u > %u", context->now, perf->recycle_time);
__builtin_memset(perf->rtt, 0, sizeof(__u32) * MAX_PERF_SECONDS);
perf->recycle_time = context->now + RECYCLE_RTT_INTERVAL;
perf->next_entry = 0;
perf->has_fresh_data = 0;
}
return;
}
}
// Populate the TCP Header
if (context->dissector->eth_type == ETH_P_IP)
{
// If its not TCP, stop
if (context->dissector->ip_header.iph + 1 > context->dissector->end)
return; // Stops the error checking from crashing
if (context->dissector->ip_header.iph->protocol != IPPROTO_TCP)
{
return;
}
context->tcp = (struct tcphdr *)((char *)context->dissector->ip_header.iph + (context->dissector->ip_header.iph->ihl * 4));
}
else if (context->dissector->eth_type == ETH_P_IPV6)
{
// If its not TCP, stop
if (context->dissector->ip_header.ip6h + 1 > context->dissector->end)
return; // Stops the error checking from crashing
if (context->dissector->ip_header.ip6h->nexthdr != IPPROTO_TCP)
{
return;
}
context->tcp = (struct tcphdr *)(context->dissector->ip_header.ip6h + 1);
}
else
{
bpf_debug("UNKNOWN PROTOCOL TYPE");
return;
}
// Bail out if the packet is incomplete
if (context->tcp + 1 > context->dissector->end)
{
return;
}
// If we didn't get a handle, make one
if (perf == NULL)
{
struct rotating_performance new_perf = {0};
new_perf.recycle_time = context->now + RECYCLE_RTT_INTERVAL;
new_perf.has_fresh_data = 0;
if (bpf_map_update_elem(&rtt_tracker, context->active_host, &new_perf, BPF_NOEXIST) != 0) return;
}
// Start the parsing process
struct packet_info p_info = {0};
if (parse_packet_identifier(context, &p_info) < 0)
{
//bpf_debug("Unable to parse packet identifier");
return;
}
pping_parsed_packet(context, &p_info);
}
#endif /* __TC_CLASSIFY_KERN_PPING_H */
/*
Understanding how this works (pseudocode):
1. Parsing context is passed into tc_pping_start
1. We lookup the rotating_performance map for the active host (local side).
1. If it exists, we check to see if we are in "next entry" time window yet.
2. If we are, and the current time exceeds the "recycle time", we reset the
performance map and set the "recycle time" to the current time plus the
recycle interval. We exit the function.
2. We then check to see if the packet is TCP. If it is not, we exit the function.
3. We then check to see if the packet is complete. If it is not, we exit the function.
4. We then parse the packet identifier. If we are unable to parse the packet identifier,
we exit the function. (the `parse_packet_identifier` function).
1. We set the packet time to the current time.
2. We set the flow type to either AF_INET or AF_INET6.
3. We set the source and destination IP addresses.
4. We call `parse_tcp_identifier` to parse the TCP identifier.
1. We use `parse_tcp_ts` to extract the TSval and TSecr from the TCP header.
These are stored in `proto_info.pid` and `proto_info.reply_pid`.
If we fail to parse the TCP identifier, we exit the function.
2. We set "pid_valid" to true if the next header position is less than the end of the packet
or if the packet is a SYN packet. (i.e. ignore packets with no payload).
3. We set "reply_pid_valid" to true if the packet is an ACK packet.
4. RST events are set to "FLOW_EVENT_CLOSING_BOTH", FIN events are set to "FLOW_EVENT_CLOSING",
and SYN events are set to "FLOW_EVENT_OPENING".
5. We set the source and destination ports.
5. If we failed to parse the TCP identifier, we exit the function.
6. We set "pid.identifier" to "proto_info.pid" and "reply_pid.identifier" to "proto_info.reply_pid".
7. We set "pid_valid" to "proto_info.pid_valid" and "reply_pid_valid" to "proto_info.reply_pid_valid".
8. We set "event_type" to "proto_info.event_type".
9. We bail if the protocol is not AF_INET or AF_INET6.
10. We set "pid_flow_is_dfkey" to "is_dualflow_key(&p_info->pid.flow)".
1. Compare the source and destination addresses and return true when it
encounters a packet with the source address less than the destination address.
2. This appears to be a way to sort the flow keys.
11. We call `reverse_flow` with the reply flow and the forward flow.
1.Reverse flow sets the destination to the source.
5. We then call pping_parsed_packet with the parsing context and the packet info.
1. We call `lookup_or_create_dualflow_state` and return it if we found one.
1. We call `get_dualflow_key_from_packet` to get the flow key from the packet.
1.
2. If `pid_valid` is false, or the event type is "FLOW_EVENT_CLOSING" or "FLOW_EVENT_CLOSING_BOTH",
we return NULL.
3. If we still haven't got a flow state, we call `create_dualflow_state` with the parsing context,
the packet info, and a pointer to new_flow.
1. We call `get_dualflow_key_from_packet` to get the flow key from the packet.
1. If "pid_flow_is_dfkey" we return pid.flow, otherwise reply_pid.flow.
2. We call `init_dualflow_state` with the new state and the packet info.
3. We create a new state in the flow state map (or return an existing one).
4. We set `fw_flow` with `get_flowstate_from_packet` and the packet info.
1. This in turns calls `fstate_from_dfkey` with the dual flow state and the packet info.
1. If the packet flow is the dual flow key, we return dir1, otherwise dir2.
5. We call `update_forward_flowstate` with the packet info.
1. If the connection state is empty and the packet identifier is valid, we call `init_flowstate`
with the flow state and the packet info.
1. `init_flowstate` sets the connection state to "WAITOPEN" and the last timestamp to the packet time.
6. We call `pping_timestamp_packet` with the forward flow, the parsing context, the packet info, and new_flow.
1. If the flow state is not active, or the packet identifier is not valid, we return.
2. If the flow state is not new and the identifier is not new, we return.
3. If the flow state is not new and the packet is rate limited, we return.
4. We set the last timestamp to the packet time.
7. We set `rev_flow` with `get_reverse_flowstate_from_packet` and the packet info.
1.
8. We call `update_reverse_flowstate` with the parsing context, the packet info, and the reverse flow.
1.
9. We call `pping_match_packet` with the reverse flow, the packet info, and the active host.
1. If the flow state is not active, or the reply packet identifier is not valid, we return.
2. If the flow state has no outstanding timestamps, we return.
3. We call `bpf_map_lookup_elem` with the packet timestamp map and the reply packet identifier.
1. If the lookup fails, or the packet time is less than the timestamp, we return.
4. We calculate the round trip time.
5. We call `bpf_map_delete_elem` with the packet timestamp map and the reply packet identifier.
1. If the delete is successful, we decrement the outstanding timestamps.
10. We call `close_and_delete_flows` with the parsing context, the packet info, the forward flow, and the reverse flow.
1.
*/

View File

@@ -15,7 +15,6 @@
#include "common/throughput.h" #include "common/throughput.h"
#include "common/lpm.h" #include "common/lpm.h"
#include "common/cpu_map.h" #include "common/cpu_map.h"
#include "common/tcp_rtt.h"
#include "common/bifrost.h" #include "common/bifrost.h"
#include "common/heimdall.h" #include "common/heimdall.h"
#include "common/flows.h" #include "common/flows.h"
@@ -228,14 +227,6 @@ int tc_iphash_to_cpu(struct __sk_buff *skb)
bpf_debug("(TC) effective direction: %d", effective_direction); bpf_debug("(TC) effective direction: %d", effective_direction);
#endif #endif
// Call pping to obtain RTT times
struct parsing_context context = {0};
context.now = bpf_ktime_get_ns();
context.tcp = NULL;
context.dissector = &dissector;
context.active_host = &lookup_key.address;
//tc_pping_start(&context); // Commented out for comparison
if (ip_info && ip_info->tc_handle != 0) { if (ip_info && ip_info->tc_handle != 0) {
// We found a matching mapped TC flow // We found a matching mapped TC flow
#ifdef VERBOSE #ifdef VERBOSE
@@ -375,25 +366,6 @@ int throughput_reader(struct bpf_iter__bpf_map_elem *ctx)
return 0; return 0;
} }
SEC("iter/bpf_map_elem")
int rtt_reader(struct bpf_iter__bpf_map_elem *ctx)
{
// The sequence file
struct seq_file *seq = ctx->meta->seq;
struct rotating_performance *counter = ctx->value;
struct in6_addr *ip = ctx->key;
// Bail on end
if (counter == NULL || ip == NULL) {
return 0;
}
//BPF_SEQ_PRINTF(seq, "%d %d\n", counter->next_entry, counter->rtt[0]);
bpf_seq_write(seq, ip, sizeof(struct in6_addr));
bpf_seq_write(seq, counter, sizeof(struct rotating_performance));
return 0;
}
SEC("iter/bpf_map_elem") SEC("iter/bpf_map_elem")
int heimdall_reader(struct bpf_iter__bpf_map_elem *ctx) { int heimdall_reader(struct bpf_iter__bpf_map_elem *ctx) {
// The sequence file // The sequence file

View File

@@ -1,5 +1,6 @@
use crate::{ use crate::{
flowbee_data::{FlowbeeData, FlowbeeKey}, heimdall_data::{HeimdallData, HeimdallKey}, kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter, RttTrackingEntry flowbee_data::{FlowbeeData, FlowbeeKey}, heimdall_data::{HeimdallData, HeimdallKey},
kernel_wrapper::BPF_SKELETON, lqos_kernel::bpf, HostCounter
}; };
use lqos_utils::XdpIpAddress; use lqos_utils::XdpIpAddress;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@@ -192,10 +193,6 @@ static mut MAP_TRAFFIC: Lazy<
Option<BpfMapIterator<XdpIpAddress, HostCounter>>, Option<BpfMapIterator<XdpIpAddress, HostCounter>>,
> = Lazy::new(|| None); > = Lazy::new(|| None);
static mut RTT_TRACKER: Lazy<
Option<BpfMapIterator<XdpIpAddress, RttTrackingEntry>>,
> = Lazy::new(|| None);
static mut HEIMDALL_TRACKER: Lazy< static mut HEIMDALL_TRACKER: Lazy<
Option<BpfMapIterator<HeimdallKey, HeimdallData>>, Option<BpfMapIterator<HeimdallKey, HeimdallData>>,
> = Lazy::new(|| None); > = Lazy::new(|| None);
@@ -227,35 +224,6 @@ pub unsafe fn iterate_throughput(
} }
} }
pub unsafe fn iterate_rtt(
callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry),
) {
if RTT_TRACKER.is_none() {
let lock = BPF_SKELETON.lock().unwrap();
if let Some(skeleton) = lock.as_ref() {
let skeleton = skeleton.get_ptr();
if let Ok(iter) = unsafe {
BpfMapIterator::new(
(*skeleton).progs.rtt_reader,
(*skeleton).maps.rtt_tracker,
)
} {
*RTT_TRACKER = Some(iter);
}
}
}
if let Some(iter) = RTT_TRACKER.as_mut() {
let _ = iter.for_each(callback);
}
// TEMPORARY
let mut callback = |key: &FlowbeeKey, data: &FlowbeeData| {
println!("{:?} {:?}", key, data);
};
iterate_flows(&mut callback);
}
/// Iterate through the heimdall map and call the callback for each entry. /// Iterate through the heimdall map and call the callback for each entry.
pub fn iterate_heimdall( pub fn iterate_heimdall(
callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]), callback: &mut dyn FnMut(&HeimdallKey, &[HeimdallData]),

View File

@@ -55,8 +55,6 @@ pub struct FlowbeeData {
pub tsecr: [u32; 2], pub tsecr: [u32; 2],
/// When did the timestamp change? /// When did the timestamp change?
pub ts_change_time: [u64; 2], pub ts_change_time: [u64; 2],
/// When should we calculate RTT (to avoid flooding)
pub ts_calc_time: [u64; 2],
/// Most recent RTT /// Most recent RTT
pub last_rtt: [u64; 2], pub last_rtt: [u64; 2],
/// Has the connection ended? /// Has the connection ended?

View File

@@ -15,7 +15,6 @@ mod cpu_map;
mod ip_mapping; mod ip_mapping;
mod kernel_wrapper; mod kernel_wrapper;
mod lqos_kernel; mod lqos_kernel;
mod tcp_rtt;
mod throughput; mod throughput;
mod linux; mod linux;
mod bpf_iterator; mod bpf_iterator;
@@ -30,6 +29,5 @@ pub use ip_mapping::{
pub use kernel_wrapper::LibreQoSKernels; pub use kernel_wrapper::LibreQoSKernels;
pub use linux::num_possible_cpus; pub use linux::num_possible_cpus;
pub use lqos_kernel::max_tracked_ips; pub use lqos_kernel::max_tracked_ips;
pub use tcp_rtt::{rtt_for_each, RttTrackingEntry};
pub use throughput::{throughput_for_each, HostCounter}; pub use throughput::{throughput_for_each, HostCounter};
pub use bpf_iterator::{iterate_heimdall, iterate_flows}; pub use bpf_iterator::{iterate_heimdall, iterate_flows};

View File

@@ -1,38 +0,0 @@
use lqos_utils::XdpIpAddress;
use zerocopy::FromBytes;
use crate::bpf_iterator::iterate_rtt;
/// Entry from the XDP rtt_tracker map.
#[repr(C)]
#[derive(Clone, Copy, Debug, FromBytes)]
pub struct RttTrackingEntry {
/// Array containing TCP round-trip times. Convert to an `f32` and divide by `100.0` for actual numbers.
pub rtt: [u32; 60],
/// Used internally by the XDP program to store the current position in the storage array. Do not modify.
next_entry: u32,
/// Used internally by the XDP program to determine when it is time to recycle and reuse a record. Do not modify.
recycle_time: u64,
/// Flag indicating that an entry has been updated recently (last 30 seconds by default).
pub has_fresh_data: u32,
}
impl Default for RttTrackingEntry {
fn default() -> Self {
Self { rtt: [0; 60], next_entry: 0, recycle_time: 0, has_fresh_data: 0 }
}
}
/// Queries the active XDP/TC programs for TCP round-trip time tracking
/// data (from the `rtt_tracker` pinned eBPF map).
///
/// Only IP addresses facing the ISP Network side are tracked.
///
/// Executes `callback` for each entry.
pub fn rtt_for_each(callback: &mut dyn FnMut(&XdpIpAddress, &RttTrackingEntry)) {
unsafe {
iterate_rtt(callback);
}
}

View File

@@ -1,10 +1,10 @@
use std::sync::atomic::AtomicU64; use std::{sync::atomic::AtomicU64, time::Duration};
use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}}; use crate::{shaped_devices_tracker::{SHAPED_DEVICES, NETWORK_JSON}, stats::{HIGH_WATERMARK_DOWN, HIGH_WATERMARK_UP}};
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS}; use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use dashmap::DashMap; use dashmap::DashMap;
use lqos_bus::TcHandle; use lqos_bus::TcHandle;
use lqos_sys::{iterate_flows, throughput_for_each}; use lqos_sys::{iterate_flows, throughput_for_each};
use lqos_utils::XdpIpAddress; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
pub struct ThroughputTracker { pub struct ThroughputTracker {
pub(crate) cycle: AtomicU64, pub(crate) cycle: AtomicU64,
@@ -185,27 +185,32 @@ impl ThroughputTracker {
} }
});*/ });*/
iterate_flows(&mut |key, data| { if let Ok(now) = time_since_boot() {
// 6 is TCP, not expired let since_boot = Duration::from(now);
if key.ip_protocol == 6 && data.end_status == 0 { let expire = (since_boot - Duration::from_secs(60)).as_nanos() as u64;
if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) { iterate_flows(&mut |key, data| {
let rtt_as_nanos = data.last_rtt[0]; // 6 is TCP, not expired
let data_as_ms_times_10 = rtt_as_nanos / 10000; if key.ip_protocol == 6 && data.last_seen > expire && (data.last_rtt[0] != 0 || data.last_rtt[1] != 0) {
// Shift left if let Some(mut tracker) = self.raw_data.get_mut(&key.local_ip) {
for i in 1..60 { // Shift left
tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1]; for i in 1..60 {
} tracker.recent_rtt_data[i] = tracker.recent_rtt_data[i - 1];
tracker.recent_rtt_data[0] = data_as_ms_times_10 as u32; }
tracker.last_fresh_rtt_data_cycle = self_cycle; tracker.recent_rtt_data[0] = u32::max(
if let Some(parents) = &tracker.network_json_parents { (data.last_rtt[0] / 10000) as u32,
let net_json = NETWORK_JSON.write().unwrap(); (data.last_rtt[1] / 10000) as u32,
if let Some(rtt) = tracker.median_latency() { );
net_json.add_rtt_cycle(parents, rtt); tracker.last_fresh_rtt_data_cycle = self_cycle;
if let Some(parents) = &tracker.network_json_parents {
let net_json = NETWORK_JSON.write().unwrap();
if let Some(rtt) = tracker.median_latency() {
net_json.add_rtt_cycle(parents, rtt);
}
} }
} }
} }
} });
}); }
} }
#[inline(always)] #[inline(always)]