Considerably cleaned up flow tracking system.

This commit is contained in:
Herbert Wolverson
2024-02-26 20:36:37 -06:00
parent 23487f3328
commit 0fd6b29e6c
4 changed files with 395 additions and 41 deletions

View File

@@ -123,7 +123,7 @@ fn main() {
.header(&wrapper_target)
// Tell cargo to invalidate the built crate whenever any of the
// included header files changed.
.parse_callbacks(Box::new(bindgen::CargoCallbacks))
.parse_callbacks(Box::new(bindgen::CargoCallbacks::new()))
// Finish the builder and generate the bindings.
.generate()
// Unwrap the Result and panic on failure.

View File

@@ -6,6 +6,14 @@
#include "debug.h"
#define SECOND_IN_NANOS 1000000000
#define TIMESTAMP_INTERVAL_NANOS 2000000000
// Some helpers to make understanding direction easier
// for readability.
#define TO_INTERNET 2
#define FROM_INTERNET 1
#define TO_LOCAL 1
#define FROM_LOCAL 2
// Defines a TCP connection flow key
struct tcp_flow_key_t {
@@ -17,23 +25,35 @@ struct tcp_flow_key_t {
// TCP connection flow entry
struct tcp_flow_data_t {
// Time (nanos) when the connection was established
__u64 start_time;
__u64 last_seen_a;
__u64 last_seen_b;
__u64 bytes_sent;
__u64 bytes_received;
__u32 time_a;
__u32 time_b;
__u64 last_rtt;
__u64 packets_sent;
__u64 packets_received;
__u64 retries_a;
__u64 retries_b;
__u64 last_count_time;
__u64 next_count_time;
__u64 next_count_bytes;
__u64 rate_estimate;
// Time (nanos) when the connection was last seen
__u64 last_seen;
// Bytes transmitted
__u64 bytes_sent[2];
// Packets transmitted
__u64 packets_sent[2];
// Clock for the next rate estimate
__u64 next_count_time[2];
// Clock for the previous rate estimate
__u64 last_count_time[2];
// Bytes at the next rate estimate
__u64 next_count_bytes[2];
// Rate estimate
__u64 rate_estimate[2];
// Sequence number of the last packet
__u32 last_sequence[2];
// Acknowledgement number of the last packet
__u32 last_ack[2];
// Retry Counters
__u32 retries[2];
// Timestamp values
__u32 tsval[2];
__u32 tsecr[2];
__u64 ts_change_time[2];
__u64 ts_calc_time[2];
// Most recent RTT
__u64 last_rtt[2];
};
// Map for tracking TCP flow progress.
@@ -47,23 +67,24 @@ struct
__uint(pinning, LIBBPF_PIN_BY_NAME);
} flowbee SEC(".maps");
static __always_inline struct tcp_flow_key_t build_flow_key(
static __always_inline struct tcp_flow_key_t build_tcp_flow_key(
struct dissector_t *dissector, // The packet dissector from the previous step
struct tcphdr *tcp, // The TCP header
u_int8_t direction // The direction of the packet (1 = to internet, 2 = to local network)
) {
if (direction == 1) {
if (direction == FROM_INTERNET) {
return (struct tcp_flow_key_t) {
.src = dissector->src_ip,
.dst = dissector->dst_ip,
.src_port = dissector->src_port,
.dst_port = dissector->dst_port
.src_port = tcp->source,
.dst_port = tcp->dest,
};
} else {
return (struct tcp_flow_key_t) {
.src = dissector->dst_ip,
.dst = dissector->src_ip,
.src_port = dissector->dst_port,
.dst_port = dissector->src_port
.src_port = tcp->dest,
.dst_port = tcp->source,
};
}
}
@@ -84,8 +105,8 @@ static __always_inline bool get_timestamps(
u_int8_t *pos = (u_int8_t *)(tcp + 1); // Current pos in TCP options
u_int8_t len;
// This 8 should be 10, but we're running out of space
for (u_int8_t i = 0; i<8; i++) {
// This should be 10, but we're running out of space
for (u_int8_t i = 0; i<6; i++) {
if (pos + 2 > dissector->end) {
return false;
}
@@ -110,9 +131,253 @@ static __always_inline bool get_timestamps(
return false;
}
// Handle Per-Flow ICMP Analysis
// Stub: ICMP packets are dispatched here by track_flows(), but no per-flow
// state is recorded yet. The parameters mirror process_tcp/process_udp so
// the dispatch sites stay uniform when this is implemented.
static __always_inline void process_icmp(
    struct dissector_t *dissector, // The packet dissector from the previous step
    u_int8_t direction,            // TO_INTERNET / FROM_INTERNET packet direction
    struct icmphdr *icmp           // Bounds-checked ICMP header (see track_flows)
) {
}
// Handle Per-Flow UDP Analysis
// Stub: UDP packets are dispatched here by track_flows(), but no per-flow
// state is recorded yet. The parameters mirror process_tcp/process_icmp so
// the dispatch sites stay uniform when this is implemented.
static __always_inline void process_udp(
    struct dissector_t *dissector, // The packet dissector from the previous step
    u_int8_t direction,            // TO_INTERNET / FROM_INTERNET packet direction
    struct udphdr *udp             // Bounds-checked UDP header (see track_flows)
) {
}
// Handle Per-Flow TCP Analysis
//
// Tracks a bidirectional TCP flow in the `flowbee` map. The flow key is
// normalized by build_tcp_flow_key() so both directions hash to the same
// entry; within tcp_flow_data_t, index [0] of each per-direction array is
// the TO_INTERNET side and index [1] the FROM_INTERNET side.
//
// Per packet this updates byte/packet counters, a once-per-second rate
// estimate, retransmission counters (via sequence regression), and an RTT
// estimate derived from TCP timestamp (TSval/TSecr) echoes. FIN/RST tears
// the flow entry down.
static __always_inline void process_tcp(
    struct dissector_t *dissector, // The packet dissector from the previous step
    u_int8_t direction,            // TO_INTERNET / FROM_INTERNET
    struct tcphdr *tcp,            // TCP header, already bounds-checked by the caller
    u_int64_t now                  // Current time (nanos) from bpf_ktime_get_ns()
) {
    // A bare SYN heading to the internet, or a SYN-ACK arriving from it,
    // marks a new connection: create (or re-create) the tracking entry.
    // NOTE(review): BPF_ANY means the SYN-ACK overwrites the entry made by
    // the initial SYN, resetting start_time — confirm this is intended.
    if ((tcp->syn && !tcp->ack && direction == TO_INTERNET) || (tcp->syn && tcp->ack && direction == FROM_INTERNET)) {
        bpf_debug("[FLOWS] New TCP Connection Detected (%u)", direction);
        struct tcp_flow_key_t key = build_tcp_flow_key(dissector, tcp, direction);
        struct tcp_flow_data_t data = {
            .start_time = now,
            .bytes_sent = { 0, 0 },
            .packets_sent = { 0, 0 },
            .next_count_time = { now + SECOND_IN_NANOS, now + SECOND_IN_NANOS },
            .last_count_time = { now, now },
            .next_count_bytes = { dissector->skb_len, dissector->skb_len },
            .rate_estimate = { 0, 0 },
            .last_sequence = { 0, 0 },
            .last_ack = { 0, 0 },
            .retries = { 0, 0 },
            .tsval = { 0, 0 },
            .tsecr = { 0, 0 },
            .ts_change_time = { 0, 0 },
            .ts_calc_time = { now + TIMESTAMP_INTERVAL_NANOS, now + TIMESTAMP_INTERVAL_NANOS },
            .last_rtt = { 0, 0 }
        };
        if (bpf_map_update_elem(&flowbee, &key, &data, BPF_ANY) != 0) {
            bpf_debug("[FLOWS] Failed to add new flow to map");
        }
        return;
    }

    // Build the flow key and bail if this isn't a flow we're tracking.
    struct tcp_flow_key_t key = build_tcp_flow_key(dissector, tcp, direction);
    struct tcp_flow_data_t *data = bpf_map_lookup_elem(&flowbee, &key);
    if (data == NULL) {
        return;
    }

    // Update last seen to now
    data->last_seen = now;

    // Update bytes/packets sent and refresh the once-per-second rate
    // estimate for whichever direction this packet travels.
    if (direction == TO_INTERNET) {
        data->bytes_sent[0] += dissector->skb_len;
        data->packets_sent[0]++;
        if (now > data->next_count_time[0]) {
            // Bits transferred since the last estimate, over elapsed seconds.
            __u64 bits = (data->bytes_sent[0] - data->next_count_bytes[0]) * 8;
            __u64 time = (now - data->last_count_time[0]) / SECOND_IN_NANOS;
            // time >= 1 by construction, but guard the division anyway.
            if (time > 0) {
                data->rate_estimate[0] = bits / time;
            }
            data->next_count_time[0] = now + SECOND_IN_NANOS;
            data->next_count_bytes[0] = data->bytes_sent[0];
            data->last_count_time[0] = now;
        }
    } else {
        data->bytes_sent[1] += dissector->skb_len;
        data->packets_sent[1]++;
        if (now > data->next_count_time[1]) {
            __u64 bits = (data->bytes_sent[1] - data->next_count_bytes[1]) * 8;
            __u64 time = (now - data->last_count_time[1]) / SECOND_IN_NANOS;
            if (time > 0) {
                data->rate_estimate[1] = bits / time;
            }
            data->next_count_time[1] = now + SECOND_IN_NANOS;
            data->next_count_bytes[1] = data->bytes_sent[1];
            data->last_count_time[1] = now;
        }
    }

    // Sequence and Acknowledgement numbers. A sequence number that moves
    // backwards indicates a retransmission (or out-of-order delivery).
    // Serial-number arithmetic (signed difference) keeps the comparison
    // correct across 32-bit sequence wraparound; a plain `<` would flag a
    // spurious retransmission every time the sequence space wraps.
    __u32 sequence = bpf_ntohl(tcp->seq);
    __u32 ack_seq = bpf_ntohl(tcp->ack_seq);
    if (direction == TO_INTERNET) {
        if (data->last_sequence[0] != 0 && (__s32)(sequence - data->last_sequence[0]) < 0) {
            data->retries[0]++;
        }
        data->last_sequence[0] = sequence;
        data->last_ack[0] = ack_seq;
    } else {
        if (data->last_sequence[1] != 0 && (__s32)(sequence - data->last_sequence[1]) < 0) {
            data->retries[1]++;
        }
        data->last_sequence[1] = sequence;
        data->last_ack[1] = ack_seq;
    }

    // Timestamps to calculate RTT. When one direction's TSval changes, and
    // the other direction later echoes it in TSecr, the elapsed time between
    // those two events approximates the round-trip time.
    u_int32_t tsval = 0;
    u_int32_t tsecr = 0;
    // End of the TCP header including options: doff counts 32-bit words
    // from the start of the header. (The previous expression,
    // `(tcp + 1) + (tcp->doff << 2)`, scaled the offset by
    // sizeof(struct tcphdr) and overshot the real end of the options.)
    void *end_opts = (void *)tcp + (tcp->doff << 2);
    if (tcp->ack && get_timestamps(&tsval, &tsecr, tcp, dissector, end_opts)) {
        if (direction == TO_INTERNET) {
            if (tsval != data->tsval[0] || tsecr != data->tsecr[0]) {
                if (tsval == data->tsecr[1]) {
                    // Our new TSval matches the peer's last echo: the gap
                    // since the peer's TSval changed is one round trip.
                    __u64 elapsed = now - data->ts_change_time[1];
                    data->last_rtt[0] = elapsed;
                    // TODO: Do something with the RTT
                }
                data->ts_change_time[0] = now;
                data->tsval[0] = tsval;
                data->tsecr[0] = tsecr;
            }
        } else {
            if (tsval != data->tsval[1] || tsecr != data->tsecr[1]) {
                if (tsval == data->tsecr[0]) {
                    __u64 elapsed = now - data->ts_change_time[0];
                    data->last_rtt[1] = elapsed;
                    // TODO: Do something with the RTT
                }
                data->ts_change_time[1] = now;
                data->tsval[1] = tsval;
                data->tsecr[1] = tsecr;
            }
        }
    }

    // Has the connection ended? If so, log a summary and drop the entry.
    if (tcp->fin || tcp->rst) {
        __u64 lifetime = now - data->start_time;
        bpf_debug("[FLOWS] TCP Connection Ended [%d / %d]. Lasted %u nanos.", data->bytes_sent[0], data->bytes_sent[1], lifetime);
        bpf_debug("[FLOWS] Rate Estimate (Mbps): %u / %u", data->rate_estimate[0] / 1000000, data->rate_estimate[1] / 1000000);
        bpf_debug("[FLOWS] Retries: %u / %u", data->retries[0], data->retries[1]);
        bpf_debug("[FLOWS] RTT: %u / %u (nanos)", data->last_rtt[0], data->last_rtt[1]);
        bpf_map_delete_elem(&flowbee, &key);
    }
}
// Note that this duplicates a lot of what we do for "snoop" - we're hoping
// to replace both it and the old RTT system.
//
// Entry point for per-flow tracking: timestamps the packet once, then
// dispatches to the protocol-specific handler. Each handler only runs when
// the full protocol header fits inside the packet bounds — that check also
// keeps the BPF verifier happy.
static __always_inline void track_flows(
    struct dissector_t *dissector, // The packet dissector from the previous step
    u_int8_t direction // The direction of the packet (1 = to internet, 2 = to local network)
) {
    __u64 now = bpf_ktime_get_ns();
    if (dissector->ip_protocol == IPPROTO_TCP) {
        struct tcphdr *hdr = get_tcp_header(dissector);
        // Reject missing or truncated headers (short-circuit guards the
        // pointer arithmetic when hdr is NULL).
        if (hdr == NULL || hdr + 1 >= dissector->end) {
            return;
        }
        process_tcp(dissector, direction, hdr, now);
    } else if (dissector->ip_protocol == IPPROTO_UDP) {
        struct udphdr *hdr = get_udp_header(dissector);
        if (hdr == NULL || hdr + 1 >= dissector->end) {
            return;
        }
        bpf_debug("[FLOWS] UDP packet detected");
        process_udp(dissector, direction, hdr);
    } else if (dissector->ip_protocol == IPPROTO_ICMP) {
        struct icmphdr *hdr = get_icmp_header(dissector);
        if (hdr == NULL || hdr + 1 >= dissector->end) {
            return;
        }
        bpf_debug("[FLOWS] ICMP packet detected");
        process_icmp(dissector, direction, hdr);
    } else {
        bpf_debug("[FLOWS] Unsupported protocol: %d", dissector->ip_protocol);
    }
}
/*static __always_inline void track_flows(
struct dissector_t *dissector, // The packet dissector from the previous step
u_int8_t direction // The direction of the packet (1 = to internet, 2 = to local network)
) {
struct tcphdr * tcp = get_tcp_header(dissector);
if (tcp == NULL) {
@@ -175,39 +440,49 @@ static __always_inline void track_flows(
//bpf_debug("Dir: %d, Sent/Received: [%d]/[%d]", direction, data->bytes_sent, data->bytes_received);
// Parse the TCP options
__u32 tsval = 0;
__u32 tsecr = 0;
//__u32 tsval = 0;
//__u32 tsecr = 0;
void *end_opts = (tcp + 1) + (tcp->doff << 2);
bool has_data = end_opts - dissector->start < dissector->skb_len;
get_timestamps(&tsval, &tsecr, tcp, dissector, end_opts);
//if (get_timestamps(&tsval, &tsecr, tcp, dissector, end_opts)) {
//bpf_debug("[%d] => TSVal %u TSecr %u", direction, tsval, tsecr);
//bpf_debug("[%d] => Seq %u AckSeq %u", direction, tcp->seq, tcp->ack_seq);
//}
if ( tcp->ack && has_data) {
//bpf_debug("Direction %d", direction);
//bpf_debug("to 192.168.66.%d => SEQ %d <-> %d", dissector->dst_ip.in6_u.u6_addr8[15], bpf_ntohs(tcp->seq), bpf_ntohs(tcp->ack_seq));
//bpf_debug("Direction %d", direction);
__u32 sequence = bpf_ntohl(tcp->seq);
__u32 ack_seq = bpf_ntohl(tcp->ack_seq);
if (direction == 1) {
// Going TO the Internet. We're acknowledging a packet.
// We don't need to record an RTT measurement and check for issues.
//bpf_debug("%d / %d", data->time_a, data->time_b);
bpf_debug("%u, A: %u / B: %u", sequence, data->time_a, data->time_b);
bpf_debug("%u", ack_seq);
if (now > data->next_count_time) {
// Calculate the rate estimate
__u64 bytes = data->bytes_sent + data->bytes_received - data->next_count_bytes;
__u64 time = now - data->last_count_time;
data->rate_estimate = ((bytes * SECOND_IN_NANOS / time)*8)/1000000;
data->rate_estimate = ((bytes * SECOND_IN_NANOS / time)*8)/1048576;
data->next_count_time = now + SECOND_IN_NANOS;
data->next_count_bytes = data->bytes_sent + data->bytes_received;
data->last_count_time = now;
bpf_debug("Rate estimate: %u mbits/sec", data->rate_estimate);
bpf_debug("[1] Rate estimate: %u mbits/sec", data->rate_estimate);
if (data->rate_estimate > 5) {
if (data->rate_estimate > 5 && tcp->ack_seq >= data->time_a) {
__u64 rtt = now - last_seen;
bpf_debug("RTT: %d nanos", rtt);
bpf_debug("RTT: %d nanos (%u - %u)", rtt, tcp->ack_seq, data->time_a);
data->last_rtt = rtt;
}
}
if (data->rate_estimate > 5 && ack_seq >= data->time_b) {
__u64 rtt = now - last_seen;
bpf_debug("[1] RTT: %d nanos (%u - %u)", rtt, sequence, data->time_b);
data->last_rtt = rtt;
}
if (data->time_a != 0 && sequence < data->time_a) {
// This is a retransmission
//bpf_debug("DIR 1 Retransmission (or out of order) detected");
@@ -229,11 +504,11 @@ static __always_inline void track_flows(
data->next_count_time = now + SECOND_IN_NANOS;
data->next_count_bytes = data->bytes_sent + data->bytes_received;
data->last_count_time = now;
bpf_debug("Rate estimate: %u mbits/sec", data->rate_estimate);
bpf_debug("[2] Rate estimate: %u mbits/sec", data->rate_estimate);
if (data->rate_estimate > 5) {
if (data->rate_estimate > 5 && tcp->ack_seq >= data->time_b) {
__u64 rtt = now - last_seen;
bpf_debug("RTT: %d nanos", rtt);
bpf_debug("[2] RTT: %d nanos", rtt);
data->last_rtt = rtt;
}
}
@@ -276,4 +551,4 @@ static __always_inline void track_flows(
// /TODO
bpf_map_delete_elem(&flowbee, &key);
}
}
}*/

View File

@@ -553,6 +553,7 @@ static __always_inline void pping_match_packet(struct flow_state *f_state,
return;
__u64 rtt = (p_info->time - *p_ts) / NS_PER_MS_TIMES_100;
bpf_debug("RTT (from TC): %u", p_info->time - *p_ts);
// Delete timestamp entry as soon as RTT is calculated
if (bpf_map_delete_elem(&packet_ts, &p_info->reply_pid) == 0)
@@ -715,4 +716,82 @@ static __always_inline void tc_pping_start(struct parsing_context *context)
pping_parsed_packet(context, &p_info);
}
#endif /* __TC_CLASSIFY_KERN_PPING_H */
#endif /* __TC_CLASSIFY_KERN_PPING_H */
/*
Understanding how this works (pseudocode):
1. Parsing context is passed into tc_pping_start
1. We lookup the rotating_performance map for the active host (local side).
1. If it exists, we check to see if we are in "next entry" time window yet.
2. If we are, and the current time exceeds the "recycle time", we reset the
performance map and set the "recycle time" to the current time plus the
recycle interval. We exit the function.
2. We then check to see if the packet is TCP. If it is not, we exit the function.
3. We then check to see if the packet is complete. If it is not, we exit the function.
4. We then parse the packet identifier. If we are unable to parse the packet identifier,
we exit the function. (the `parse_packet_identifier` function).
1. We set the packet time to the current time.
2. We set the flow type to either AF_INET or AF_INET6.
3. We set the source and destination IP addresses.
4. We call `parse_tcp_identifier` to parse the TCP identifier.
1. We use `parse_tcp_ts` to extract the TSval and TSecr from the TCP header.
These are stored in `proto_info.pid` and `proto_info.reply_pid`.
If we fail to parse the TCP identifier, we exit the function.
2. We set "pid_valid" to true if the next header position is less than the end of the packet
or if the packet is a SYN packet. (i.e. ignore packets with no payload).
3. We set "reply_pid_valid" to true if the packet is an ACK packet.
4. RST events are set to "FLOW_EVENT_CLOSING_BOTH", FIN events are set to "FLOW_EVENT_CLOSING",
and SYN events are set to "FLOW_EVENT_OPENING".
5. We set the source and destination ports.
5. If we failed to parse the TCP identifier, we exit the function.
6. We set "pid.identifier" to "proto_info.pid" and "reply_pid.identifier" to "proto_info.reply_pid".
7. We set "pid_valid" to "proto_info.pid_valid" and "reply_pid_valid" to "proto_info.reply_pid_valid".
8. We set "event_type" to "proto_info.event_type".
9. We bail if the protocol is not AF_INET or AF_INET6.
10. We set "pid_flow_is_dfkey" to "is_dualflow_key(&p_info->pid.flow)".
1. Compare the source and destination addresses and return true when it
encounters a packet with the source address less than the destination address.
2. This appears to be a way to sort the flow keys.
11. We call `reverse_flow` with the reply flow and the forward flow.
1. Reverse flow sets the destination to the source.
5. We then call pping_parsed_packet with the parsing context and the packet info.
1. We call `lookup_or_create_dualflow_state` and return it if we found one.
1. We call `get_dualflow_key_from_packet` to get the flow key from the packet.
1.
2. If `pid_valid` is false, or the event type is "FLOW_EVENT_CLOSING" or "FLOW_EVENT_CLOSING_BOTH",
we return NULL.
3. If we still haven't got a flow state, we call `create_dualflow_state` with the parsing context,
the packet info, and a pointer to new_flow.
1. We call `get_dualflow_key_from_packet` to get the flow key from the packet.
1. If "pid_flow_is_dfkey" we return pid.flow, otherwise reply_pid.flow.
2. We call `init_dualflow_state` with the new state and the packet info.
3. We create a new state in the flow state map (or return an existing one).
4. We set `fw_flow` with `get_flowstate_from_packet` and the packet info.
1. This in turns calls `fstate_from_dfkey` with the dual flow state and the packet info.
1. If the packet flow is the dual flow key, we return dir1, otherwise dir2.
5. We call `update_forward_flowstate` with the packet info.
1. If the connection state is empty and the packet identifier is valid, we call `init_flowstate`
with the flow state and the packet info.
1. `init_flowstate` sets the connection state to "WAITOPEN" and the last timestamp to the packet time.
6. We call `pping_timestamp_packet` with the forward flow, the parsing context, the packet info, and new_flow.
1. If the flow state is not active, or the packet identifier is not valid, we return.
2. If the flow state is not new and the identifier is not new, we return.
3. If the flow state is not new and the packet is rate limited, we return.
4. We set the last timestamp to the packet time.
7. We set `rev_flow` with `get_reverse_flowstate_from_packet` and the packet info.
1.
8. We call `update_reverse_flowstate` with the parsing context, the packet info, and the reverse flow.
1.
9. We call `pping_match_packet` with the reverse flow, the packet info, and the active host.
1. If the flow state is not active, or the reply packet identifier is not valid, we return.
2. If the flow state has no outstanding timestamps, we return.
3. We call `bpf_map_lookup_elem` with the packet timestamp map and the reply packet identifier.
1. If the lookup fails, or the packet time is less than the timestamp, we return.
4. We calculate the round trip time.
5. We call `bpf_map_delete_elem` with the packet timestamp map and the reply packet identifier.
1. If the delete is successful, we decrement the outstanding timestamps.
10. We call `close_and_delete_flows` with the parsing context, the packet info, the forward flow, and the reverse flow.
1.
*/

View File

@@ -233,7 +233,7 @@ int tc_iphash_to_cpu(struct __sk_buff *skb)
context.tcp = NULL;
context.dissector = &dissector;
context.active_host = &lookup_key.address;
tc_pping_start(&context);
//tc_pping_start(&context); // Commented out for comparison
if (ip_info && ip_info->tc_handle != 0) {
// We found a matching mapped TC flow