Needs a LOT of cleaning, but marks/drops are being calculated and applied to the tree.

This commit is contained in:
Herbert Wolverson 2024-07-07 21:45:18 -05:00
parent a019890b31
commit ebb95ce82e
8 changed files with 98 additions and 12 deletions

View File

@ -24,6 +24,12 @@ pub struct NetworkJsonNode {
/// Current TCP Retransmits /// Current TCP Retransmits
pub current_tcp_retransmits: AtomicDownUp, // In retries pub current_tcp_retransmits: AtomicDownUp, // In retries
/// Current Cake Marks
pub current_marks: AtomicDownUp,
/// Current Cake Drops
pub current_drops: AtomicDownUp,
/// Approximate RTTs reported for this level of the tree. /// Approximate RTTs reported for this level of the tree.
/// It's never going to be as statistically accurate as the actual /// It's never going to be as statistically accurate as the actual
/// numbers, being based on medians. /// numbers, being based on medians.
@ -55,6 +61,14 @@ impl NetworkJsonNode {
self.current_tcp_retransmits.get_down(), self.current_tcp_retransmits.get_down(),
self.current_tcp_retransmits.get_up(), self.current_tcp_retransmits.get_up(),
), ),
current_marks: (
self.current_marks.get_down(),
self.current_marks.get_up(),
),
current_drops: (
self.current_drops.get_down(),
self.current_drops.get_up(),
),
rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(), rtts: self.rtts.iter().map(|n| *n as f32 / 100.0).collect(),
parents: self.parents.clone(), parents: self.parents.clone(),
immediate_parent: self.immediate_parent, immediate_parent: self.immediate_parent,
@ -76,6 +90,10 @@ pub struct NetworkJsonTransport {
pub current_throughput: (u64, u64), pub current_throughput: (u64, u64),
/// Current count of TCP retransmits /// Current count of TCP retransmits
pub current_retransmits: (u64, u64), pub current_retransmits: (u64, u64),
/// Cake marks
pub current_marks: (u64, u64),
/// Cake drops
pub current_drops: (u64, u64),
/// Set of RTT data /// Set of RTT data
pub rtts: Vec<f32>, pub rtts: Vec<f32>,
/// Node indices of parents /// Node indices of parents
@ -135,6 +153,8 @@ impl NetworkJson {
max_throughput: (0, 0), max_throughput: (0, 0),
current_throughput: AtomicDownUp::zeroed(), current_throughput: AtomicDownUp::zeroed(),
current_tcp_retransmits: AtomicDownUp::zeroed(), current_tcp_retransmits: AtomicDownUp::zeroed(),
current_drops: AtomicDownUp::zeroed(),
current_marks: AtomicDownUp::zeroed(),
parents: Vec::new(), parents: Vec::new(),
immediate_parent: None, immediate_parent: None,
rtts: DashSet::new(), rtts: DashSet::new(),
@ -213,6 +233,8 @@ impl NetworkJson {
n.current_throughput.set_to_zero(); n.current_throughput.set_to_zero();
n.current_tcp_retransmits.set_to_zero(); n.current_tcp_retransmits.set_to_zero();
n.rtts.clear(); n.rtts.clear();
n.current_drops.set_to_zero();
n.current_marks.set_to_zero();
}); });
} }
@ -257,6 +279,18 @@ impl NetworkJson {
} }
} }
} }
/// Accumulates one cycle's worth of Cake mark/drop counters onto every
/// network-tree node listed in `targets` (a circuit's parent chain).
///
/// Indices that don't resolve to a node are logged and skipped rather
/// than panicking, since the tree may have been rebuilt since the
/// indices were captured.
pub fn add_queue_cycle(&self, targets: &[usize], marks: &DownUpOrder<u64>, drops: &DownUpOrder<u64>) {
    for idx in targets.iter() {
        // Bounds-checked lookup: never index directly into the node list.
        match self.nodes.get(*idx) {
            Some(node) => {
                node.current_marks.checked_add(*marks);
                node.current_drops.checked_add(*drops);
            }
            None => warn!("No network tree entry for index {idx}"),
        }
    }
}
} }
fn json_to_u32(val: Option<&Value>) -> u32 { fn json_to_u32(val: Option<&Value>) -> u32 {
@ -294,6 +328,8 @@ fn recurse_node(
), ),
current_throughput: AtomicDownUp::zeroed(), current_throughput: AtomicDownUp::zeroed(),
current_tcp_retransmits: AtomicDownUp::zeroed(), current_tcp_retransmits: AtomicDownUp::zeroed(),
current_drops: AtomicDownUp::zeroed(),
current_marks: AtomicDownUp::zeroed(),
name: name.to_string(), name: name.to_string(),
immediate_parent: Some(immediate_parent), immediate_parent: Some(immediate_parent),
rtts: DashSet::new(), rtts: DashSet::new(),

View File

@ -9,10 +9,10 @@ pub static ALL_QUEUE_SUMMARY: Lazy<AllQueueData> = Lazy::new(|| AllQueueData::ne
#[derive(Debug)] #[derive(Debug)]
pub struct QueueData { pub struct QueueData {
pub drops: DownUpOrder<u32>, pub drops: DownUpOrder<u64>,
pub marks: DownUpOrder<u32>, pub marks: DownUpOrder<u64>,
pub prev_drops: DownUpOrder<u32>, pub prev_drops: DownUpOrder<u64>,
pub prev_marks: DownUpOrder<u32>, pub prev_marks: DownUpOrder<u64>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -57,6 +57,7 @@ impl AllQueueData {
}; };
new_record.drops.down = dl.drops; new_record.drops.down = dl.drops;
new_record.marks.down = dl.marks; new_record.marks.down = dl.marks;
println!("Inserting for circuit_id: {}", dl.circuit_id);
lock.insert(dl.circuit_id.clone(), new_record); lock.insert(dl.circuit_id.clone(), new_record);
} }
} }
@ -77,10 +78,24 @@ impl AllQueueData {
}; };
new_record.drops.up = ul.drops; new_record.drops.up = ul.drops;
new_record.marks.up = ul.marks; new_record.marks.up = ul.marks;
println!("Inserting for circuit_id: {}", ul.circuit_id);
lock.insert(ul.circuit_id.clone(), new_record); lock.insert(ul.circuit_id.clone(), new_record);
} }
} }
println!("{:?}", lock); //println!("{:?}", lock);
}
pub fn iterate_queues(&self, f: impl Fn(&str, &DownUpOrder<u64>, &DownUpOrder<u64>)) {
let lock = self.data.lock().unwrap();
for (circuit_id, q) in lock.iter() {
println!("Checking for change in {}", circuit_id);
if q.drops > q.prev_drops || q.marks > q.prev_marks {
println!("Change detected");
let drops = q.drops.checked_sub_or_zero(q.prev_drops);
let marks = q.marks.checked_sub_or_zero(q.prev_marks);
f(circuit_id, &drops, &marks);
}
}
} }
} }

View File

@ -86,8 +86,8 @@ fn track_queues() {
struct TrackedQueue { struct TrackedQueue {
circuit_id: String, circuit_id: String,
drops: u32, drops: u64,
marks: u32, marks: u64,
} }
fn connect_queues_to_circuit(structure: &[QueueNode], queues: &[QueueType]) -> Vec<TrackedQueue> { fn connect_queues_to_circuit(structure: &[QueueNode], queues: &[QueueType]) -> Vec<TrackedQueue> {
@ -102,8 +102,8 @@ fn connect_queues_to_circuit(structure: &[QueueNode], queues: &[QueueType]) -> V
if (cake.drops > 0 || marks > 0) { if (cake.drops > 0 || marks > 0) {
return Some(TrackedQueue { return Some(TrackedQueue {
circuit_id: circuit_id.clone(), circuit_id: circuit_id.clone(),
drops: cake.drops, drops: cake.drops as u64,
marks, marks: marks as u64,
}) })
} }
} }
@ -126,8 +126,8 @@ fn connect_queues_to_circuit_up(structure: &[QueueNode], queues: &[QueueType]) -
if (cake.drops > 0 || marks > 0) { if (cake.drops > 0 || marks > 0) {
return Some(TrackedQueue { return Some(TrackedQueue {
circuit_id: circuit_id.clone(), circuit_id: circuit_id.clone(),
drops: cake.drops, drops: cake.drops as u64,
marks, marks: marks as u64,
}) })
} }
} }

View File

@ -5,7 +5,7 @@ use crate::units::UpDownOrder;
/// Provides strong download/upload separation for /// Provides strong download/upload separation for
/// stored statistics to eliminate confusion. /// stored statistics to eliminate confusion.
#[repr(C)] #[repr(C)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default)] #[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, FromBytes, Default, Ord, PartialOrd)]
pub struct DownUpOrder<T> { pub struct DownUpOrder<T> {
pub down: T, pub down: T,
pub up: T, pub up: T,

View File

@ -38,6 +38,8 @@ export class TopTreeSummary extends BaseDashlet {
th.appendChild(theading("DL ⬇️")); th.appendChild(theading("DL ⬇️"));
th.appendChild(theading("UL ⬆️")); th.appendChild(theading("UL ⬆️"));
th.appendChild(theading("TCP Re-xmit")); th.appendChild(theading("TCP Re-xmit"));
th.appendChild(theading("Cake Marks"));
th.appendChild(theading("Cake Drops"));
t.appendChild(th); t.appendChild(th);
let tbody = document.createElement("tbody"); let tbody = document.createElement("tbody");
@ -47,6 +49,8 @@ export class TopTreeSummary extends BaseDashlet {
row.appendChild(simpleRow(scaleNumber(r[1].current_throughput[0] * 8))); row.appendChild(simpleRow(scaleNumber(r[1].current_throughput[0] * 8)));
row.appendChild(simpleRow(scaleNumber(r[1].current_throughput[1] * 8))); row.appendChild(simpleRow(scaleNumber(r[1].current_throughput[1] * 8)));
row.appendChild(simpleRow(r[1].current_retransmits[0] + " / " + r[1].current_retransmits[1])) row.appendChild(simpleRow(r[1].current_retransmits[0] + " / " + r[1].current_retransmits[1]))
row.appendChild(simpleRow(r[1].current_marks[0] + " / " + r[1].current_marks[1]));
row.appendChild(simpleRow(r[1].current_drops[0] + " / " + r[1].current_drops[1]));
t.appendChild(row); t.appendChild(row);
}); });

View File

@ -84,11 +84,17 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse {
if nodes.len() > n_queues { if nodes.len() > n_queues {
let mut other_bw = (0, 0); let mut other_bw = (0, 0);
let mut other_xmit = (0, 0); let mut other_xmit = (0, 0);
let mut other_marks = (0, 0);
let mut other_drops = (0, 0);
nodes.drain(n_queues..).for_each(|n| { nodes.drain(n_queues..).for_each(|n| {
other_bw.0 += n.1.current_throughput.0; other_bw.0 += n.1.current_throughput.0;
other_bw.1 += n.1.current_throughput.1; other_bw.1 += n.1.current_throughput.1;
other_xmit.0 += n.1.current_retransmits.0; other_xmit.0 += n.1.current_retransmits.0;
other_xmit.1 += n.1.current_retransmits.1; other_xmit.1 += n.1.current_retransmits.1;
other_marks.0 += n.1.current_marks.0;
other_marks.1 += n.1.current_marks.1;
other_drops.0 += n.1.current_drops.0;
other_drops.1 += n.1.current_drops.1;
}); });
nodes.push(( nodes.push((
@ -98,6 +104,8 @@ pub fn get_top_n_root_queues(n_queues: usize) -> BusResponse {
max_throughput: (0, 0), max_throughput: (0, 0),
current_throughput: other_bw, current_throughput: other_bw,
current_retransmits: other_xmit, current_retransmits: other_xmit,
current_marks: other_marks,
current_drops: other_drops,
rtts: Vec::new(), rtts: Vec::new(),
parents: Vec::new(), parents: Vec::new(),
immediate_parent: None, immediate_parent: None,

View File

@ -92,6 +92,7 @@ async fn throughput_task(
netflow_enabled, netflow_enabled,
my_netflow_sender.clone(), my_netflow_sender.clone(),
); );
THROUGHPUT_TRACKER.apply_queue_stats();
THROUGHPUT_TRACKER.update_totals(); THROUGHPUT_TRACKER.update_totals();
THROUGHPUT_TRACKER.next_cycle(); THROUGHPUT_TRACKER.next_cycle();
let duration_ms = start.elapsed().as_micros(); let duration_ms = start.elapsed().as_micros();

View File

@ -4,6 +4,7 @@ use super::{flow_data::{get_flowbee_event_count_and_reset, FlowAnalysis, Flowbee
use dashmap::DashMap; use dashmap::DashMap;
use fxhash::FxHashMap; use fxhash::FxHashMap;
use lqos_bus::TcHandle; use lqos_bus::TcHandle;
use lqos_queue_tracker::ALL_QUEUE_SUMMARY;
use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each}; use lqos_sys::{flowbee_data::FlowbeeKey, iterate_flows, throughput_for_each};
use lqos_utils::{unix_time::time_since_boot, XdpIpAddress}; use lqos_utils::{unix_time::time_since_boot, XdpIpAddress};
use lqos_utils::units::{AtomicDownUp, DownUpOrder}; use lqos_utils::units::{AtomicDownUp, DownUpOrder};
@ -162,6 +163,27 @@ impl ThroughputTracker {
}); });
} }
/// Pulls per-circuit Cake mark/drop deltas from the queue tracker and
/// applies them to every ancestor node in the network tree.
///
/// BUG FIX: `add_queue_cycle` takes its counters in (marks, drops)
/// order, but the original call passed (drops, marks) — swapping the
/// two statistics in the tree. The call below matches the declared
/// parameter order.
pub(crate) fn apply_queue_stats(&self) {
    // iterate_queues yields (circuit_id, drops-delta, marks-delta) for
    // circuits that changed this cycle.
    ALL_QUEUE_SUMMARY.iterate_queues(|circuit_id, drops, marks| {
        // Find the tracked host that belongs to this circuit.
        let matching = self
            .raw_data
            .iter()
            .find(|v| v.circuit_id.as_deref() == Some(circuit_id));
        if let Some(entry) = matching {
            // Walk the deltas up through the circuit's parent chain.
            if let Some(parents) = &entry.network_json_parents {
                let net_json = NETWORK_JSON.read().unwrap();
                // Argument order is (targets, marks, drops).
                net_json.add_queue_cycle(parents, marks, drops);
            }
        }
    });
}
pub(crate) fn apply_flow_data( pub(crate) fn apply_flow_data(
&self, &self,
timeout_seconds: u64, timeout_seconds: u64,