More DownUp typing

This commit is contained in:
Herbert Wolverson 2024-07-02 12:04:28 -05:00
parent e1814cfc08
commit cb7a1fd640
12 changed files with 74 additions and 79 deletions

View File

@ -25,18 +25,18 @@ pub enum BusResponse {
/// Current throughput for the overall system.
CurrentThroughput {
/// In bps
bits_per_second: (u64, u64),
bits_per_second: DownUpOrder<u64>,
/// In pps
packets_per_second: (u64, u64),
packets_per_second: DownUpOrder<u64>,
/// How much of the response has been subject to the shaper?
shaped_bits_per_second: (u64, u64),
shaped_bits_per_second: DownUpOrder<u64>,
},
/// Provides a list of ALL mapped hosts traffic counters,
/// listing the IP Address and upload/download in a tuple.
HostCounters(Vec<(IpAddr, u64, u64)>),
HostCounters(Vec<(IpAddr, DownUpOrder<u64>)>),
/// Provides the Top N downloaders IP stats.
TopDownloaders(Vec<IpStats>),
@ -90,7 +90,7 @@ pub enum BusResponse {
/// Us to poll hosts
time_to_poll_hosts: u64,
/// High traffic watermark
high_watermark: (u64, u64),
high_watermark: DownUpOrder<u64>,
/// Number of flows tracked
tracked_flows: u64,
/// RTT events per second
@ -141,19 +141,19 @@ pub enum BusResponse {
/// Summary of Ether Protocol
EtherProtocols{
/// Number of IPv4 Bytes
v4_bytes: [u64; 2],
v4_bytes: DownUpOrder<u64>,
/// Number of IPv6 Bytes
v6_bytes: [u64; 2],
v6_bytes: DownUpOrder<u64>,
/// Number of IPv4 Packets
v4_packets: [u64; 2],
v4_packets: DownUpOrder<u64>,
/// Number of IPv6 Packets
v6_packets: [u64; 2],
v6_packets: DownUpOrder<u64>,
/// Number of IPv4 Flows
v4_rtt: [u64; 2],
v4_rtt: DownUpOrder<u64>,
/// Number of IPv6 Flows
v6_rtt: [u64; 2],
v6_rtt: DownUpOrder<u64>,
},
/// Summary of IP Protocols
IpProtocols(Vec<(String, (u64, u64))>),
IpProtocols(Vec<(String, DownUpOrder<u64>)>),
}

View File

@ -135,7 +135,7 @@ pub async fn stats() -> NoCache<Json<LqosStats>> {
return NoCache::new(Json(LqosStats {
bus_requests_since_start: bus_requests,
time_to_poll_hosts_us: time_to_poll_hosts,
high_watermark,
high_watermark: (high_watermark.down, high_watermark.up),
tracked_flows,
rtt_events_per_second,
}));

View File

@ -92,6 +92,10 @@ pub async fn flows_ip_protocol() -> NoCache<Json<Vec<(String, (u64, u64))>>> {
BusResponse::IpProtocols(ip_protocols) => ip_protocols.to_owned(),
_ => Vec::new(),
};
let result = result.
into_iter()
.map(|(name, bytes)| (name, (bytes.down, bytes.up)))
.collect();
NoCache::new(Json(result))
}

View File

@ -62,7 +62,7 @@ pub async fn tree_clients(
{
let devices = SHAPED_DEVICES.read().unwrap();
if let BusResponse::HostCounters(hosts) = msg {
for (ip, down, up) in hosts.iter() {
for (ip, bytes) in hosts.iter() {
let lookup = match ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => *ip,
@ -72,7 +72,7 @@ pub async fn tree_clients(
result.push(CircuitThroughput {
id: devices.devices[*c.1].circuit_id.clone(),
name: devices.devices[*c.1].circuit_name.clone(),
traffic: (*down, *up),
traffic: (bytes.down, bytes.up),
limit: (
devices.devices[*c.1].download_max_mbps as u64,
devices.devices[*c.1].upload_max_mbps as u64,

View File

@ -71,14 +71,14 @@ pub async fn current_circuit_throughput(
{
if let BusResponse::HostCounters(hosts) = msg {
let devices = SHAPED_DEVICES.read().unwrap();
for (ip, down, up) in hosts.iter() {
for (ip, bytes) in hosts.iter() {
let lookup = match ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => *ip,
};
if let Some(c) = devices.trie.longest_match(lookup) {
if devices.devices[*c.1].circuit_id == circuit_id {
result.push((ip.to_string(), *down, *up));
result.push((ip.to_string(), bytes.down, bytes.up));
}
}
}

View File

@ -45,9 +45,9 @@ impl TotalThroughput {
{
let mut lock = self.inner.lock().unwrap();
let head = lock.head;
lock.data[head].bits_per_second = bits_per_second;
lock.data[head].packets_per_second = packets_per_second;
lock.data[head].shaped_bits_per_second = shaped_bits_per_second;
lock.data[head].bits_per_second = (bits_per_second.down, bits_per_second.up);
lock.data[head].packets_per_second = (packets_per_second.down, packets_per_second.up);
lock.data[head].shaped_bits_per_second = (shaped_bits_per_second.down, shaped_bits_per_second.up);
lock.prev_head = lock.head;
lock.head += 1;
lock.head %= 300;

View File

@ -1,5 +1,6 @@
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering::Relaxed;
use crate::units::DownUpOrder;
#[derive(Debug)]
pub struct AtomicDownUp {
@ -47,4 +48,11 @@ impl AtomicDownUp {
pub fn set_up(&self, n: u64) {
self.up.store(n, Relaxed);
}
pub fn as_down_up(&self) -> DownUpOrder<u64> {
DownUpOrder::new(
self.get_down(),
self.get_up()
)
}
}

View File

@ -223,10 +223,7 @@ fn handle_bus_requests(
BusResponse::LqosdStats {
bus_requests: BUS_REQUESTS.load(std::sync::atomic::Ordering::Relaxed),
time_to_poll_hosts: TIME_TO_POLL_HOSTS.load(std::sync::atomic::Ordering::Relaxed),
high_watermark: (
HIGH_WATERMARK.get_down(),
HIGH_WATERMARK.get_up(),
),
high_watermark: HIGH_WATERMARK.as_down_up(),
tracked_flows: FLOWS_TRACKED.load(std::sync::atomic::Ordering::Relaxed),
rtt_events_per_second: get_rtt_events_per_second(),
}

View File

@ -157,10 +157,10 @@ impl TimeBuffer {
pub fn ether_protocol_summary(&self) -> BusResponse {
let buffer = self.buffer.lock().unwrap();
let mut v4_bytes_sent = [0,0];
let mut v4_packets_sent = [0,0];
let mut v6_bytes_sent = [0,0];
let mut v6_packets_sent = [0,0];
let mut v4_bytes_sent = DownUpOrder::zeroed();
let mut v4_packets_sent = DownUpOrder::zeroed();
let mut v6_bytes_sent = DownUpOrder::zeroed();
let mut v6_packets_sent = DownUpOrder::zeroed();
let mut v4_rtt = [Vec::new(), Vec::new()];
let mut v6_rtt = [Vec::new(), Vec::new()];
@ -170,10 +170,8 @@ impl TimeBuffer {
let (key, data, _analysis) = &v.data;
if key.local_ip.is_v4() {
// It's V4
v4_bytes_sent[0] += data.bytes_sent.down;
v4_bytes_sent[1] += data.bytes_sent.up;
v4_packets_sent[0] += data.packets_sent.down;
v4_packets_sent[1] += data.packets_sent.up;
v4_bytes_sent.checked_add(data.bytes_sent);
v4_packets_sent.checked_add(data.packets_sent);
if data.rtt[0].as_nanos() > 0 {
v4_rtt[0].push(data.rtt[0].as_nanos());
}
@ -182,10 +180,8 @@ impl TimeBuffer {
}
} else {
// It's V6
v6_bytes_sent[0] += data.bytes_sent.down;
v6_bytes_sent[1] += data.bytes_sent.up;
v6_packets_sent[0] += data.packets_sent.down;
v6_packets_sent[1] += data.packets_sent.up;
v6_bytes_sent.checked_add(data.bytes_sent);
v6_packets_sent.checked_add(data.packets_sent);
if data.rtt[0].as_nanos() > 0 {
v6_rtt[0].push(data.rtt[0].as_nanos());
}
@ -196,14 +192,14 @@ impl TimeBuffer {
}
});
let v4_rtt = [
let v4_rtt = DownUpOrder::new(
Self::median(&v4_rtt[0]),
Self::median(&v4_rtt[1]),
];
let v6_rtt = [
);
let v6_rtt = DownUpOrder::new(
Self::median(&v6_rtt[0]),
Self::median(&v6_rtt[1]),
];
);
BusResponse::EtherProtocols {
v4_bytes: v4_bytes_sent,
@ -215,7 +211,7 @@ impl TimeBuffer {
}
}
pub fn ip_protocol_summary(&self) -> Vec<(String, (u64, u64))> {
pub fn ip_protocol_summary(&self) -> Vec<(String, DownUpOrder<u64>)> {
let buffer = self.buffer.lock().unwrap();
let mut results = FxHashMap::default();
@ -225,13 +221,12 @@ impl TimeBuffer {
.for_each(|v| {
let (_key, data, analysis) = &v.data;
let proto = analysis.protocol_analysis.to_string();
let entry = results.entry(proto).or_insert((0, 0));
entry.0 += data.bytes_sent.down;
entry.1 += data.bytes_sent.up;
let entry = results.entry(proto).or_insert(DownUpOrder::zeroed());
entry.checked_add(data.bytes_sent);
});
let mut results = results.into_iter().collect::<Vec<(String, (u64, u64))>>();
results.sort_by(|a, b| b.1.1.cmp(&a.1.1));
let mut results = results.into_iter().collect::<Vec<(String, DownUpOrder<u64>)>>();
results.sort_by(|a, b| b.1.up.cmp(&a.1.up));
// Keep only the top 10
results.truncate(10);
results

View File

@ -152,8 +152,8 @@ async fn submit_throughput_stats(long_term_stats_tx: Sender<StatsUpdateMessage>)
let summary = Box::new((
ThroughputSummary {
bits_per_second,
shaped_bits_per_second,
bits_per_second: (bits_per_second.down, bits_per_second.up),
shaped_bits_per_second: (shaped_bits_per_second.down, shaped_bits_per_second.up),
packets_per_second,
hosts,
},
@ -188,7 +188,7 @@ pub fn host_counters() -> BusResponse {
let mut result = Vec::new();
THROUGHPUT_TRACKER.raw_data.iter().for_each(|v| {
let ip = v.key().as_ip();
result.push((ip, v.bytes_per_second.down, v.bytes_per_second.up));
result.push((ip, v.bytes_per_second));
});
BusResponse::HostCounters(result)
}

View File

@ -353,16 +353,16 @@ impl ThroughputTracker {
});
let current = self.bits_per_second();
if current.0 < 100000000000 && current.1 < 100000000000 {
if current.both_less_than(100000000000) {
let prev_max = (
HIGH_WATERMARK.get_down(),
HIGH_WATERMARK.get_up(),
);
if current.0 > prev_max.0 {
HIGH_WATERMARK.set_down(current.0);
if current.down > prev_max.0 {
HIGH_WATERMARK.set_down(current.down);
}
if current.1 > prev_max.1 {
HIGH_WATERMARK.set_up(current.1);
if current.up > prev_max.1 {
HIGH_WATERMARK.set_up(current.up);
}
}
}
@ -371,25 +371,16 @@ impl ThroughputTracker {
self.cycle.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
pub(crate) fn bits_per_second(&self) -> (u64, u64) {
(
self.bytes_per_second.get_down() * 8,
self.bytes_per_second.get_up() * 8,
)
pub(crate) fn bits_per_second(&self) -> DownUpOrder<u64> {
self.bytes_per_second.as_down_up().to_bits_from_bytes()
}
pub(crate) fn shaped_bits_per_second(&self) -> (u64, u64) {
(
self.shaped_bytes_per_second.get_down() * 8,
self.shaped_bytes_per_second.get_up() * 8
)
pub(crate) fn shaped_bits_per_second(&self) -> DownUpOrder<u64> {
self.shaped_bytes_per_second.as_down_up().to_bits_from_bytes()
}
pub(crate) fn packets_per_second(&self) -> (u64, u64) {
(
self.packets_per_second.get_down(),
self.packets_per_second.get_up(),
)
pub(crate) fn packets_per_second(&self) -> DownUpOrder<u64> {
self.packets_per_second.as_down_up()
}
#[allow(dead_code)]

View File

@ -9,6 +9,7 @@ use ratatui::{
widgets::*,
};
use std::sync::mpsc::{Receiver, Sender};
use lqos_utils::units::DownUpOrder;
pub struct NetworkSparkline {
bus_link: tokio::sync::mpsc::Sender<crate::bus::BusMessage>,
@ -67,32 +68,31 @@ impl TopWidget for NetworkSparkline {
let bps_down: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.bits_per_second.1 as f64))
.map(|(i, &val)| (i as f64, val.bits_per_second.up as f64))
.collect();
let bps_up: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.bits_per_second.0 as f64))
.map(|(i, &val)| (i as f64, val.bits_per_second.down as f64))
.collect();
let shaped_down: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.1 as f64))
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.up as f64))
.collect();
let shaped_up: Vec<(f64, f64)> = raw_data
.iter()
.enumerate()
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.0 as f64))
.map(|(i, &val)| (i as f64, val.shaped_bits_per_second.down as f64))
.collect();
let (up, down) = self.current_throughput.bits_per_second;
let title = format!(
" [Throughput (Down: {} Up: {})]",
scale_bits(up),
scale_bits(down)
scale_bits(self.current_throughput.bits_per_second.down),
scale_bits(self.current_throughput.bits_per_second.up)
);
let block = Block::default()
@ -169,7 +169,7 @@ impl NetworkSparkline {
#[derive(Default, Copy, Clone)]
struct CurrentThroughput {
pub bits_per_second: (u64, u64),
pub _packets_per_second: (u64, u64),
pub shaped_bits_per_second: (u64, u64),
pub bits_per_second: DownUpOrder<u64>,
pub _packets_per_second: DownUpOrder<u64>,
pub shaped_bits_per_second: DownUpOrder<u64>,
}