Network funnel - work in progress

Step 1 of the network funnel

* network.json reader now tags throughput entries with their tree
  location and parents to whom data should be applied.
* Data flows "up the tree", giving totals all the way up.
* Simple network map page for displaying the data while it's worked
  on.
This commit is contained in:
Herbert Wolverson 2023-03-04 16:58:17 +00:00
parent 7e5b432253
commit f64862a8ff
16 changed files with 555 additions and 31 deletions

1
src/rust/Cargo.lock generated
View File

@ -1334,6 +1334,7 @@ dependencies = [
"ip_network_table",
"log",
"serde",
"serde_json",
"sha2",
"thiserror",
"toml 0.7.2",

View File

@ -111,6 +111,12 @@ pub enum BusRequest {
/// Request that the Rust side of things validate the CSV
ValidateShapedDevicesCsv,
/// Request details of part of the network tree
GetNetworkMap{
/// The parent of the map to retrieve
parent: usize
},
/// If running on Equinix (the `equinix_test` feature is enabled),
/// display a "run bandwidht test" link.
#[cfg(feature = "equinix_tests")]

View File

@ -68,4 +68,7 @@ pub enum BusResponse {
/// A string containing a JSON dump of a queue stats. Analagos to
/// the response from `tc show qdisc`.
RawQueueData(String),
/// Results from network map queries
NetworkMap(Vec<(usize, lqos_config::NetworkJsonNode)>),
}

View File

@ -7,6 +7,7 @@ edition = "2021"
thiserror = "1"
toml = "0"
serde = { version = "1.0", features = [ "derive" ] }
serde_json = "1"
csv = "1"
ip_network_table = "0"
ip_network = "0"

View File

@ -11,12 +11,14 @@ mod etc;
mod libre_qos_config;
mod program_control;
mod shaped_devices;
mod network_json;
pub use authentication::{UserRole, WebUsers};
pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables};
pub use libre_qos_config::LibreQoSConfig;
pub use program_control::load_libreqos;
pub use shaped_devices::{ConfigShapedDevices, ShapedDevice};
pub use network_json::{NetworkJson, NetworkJsonNode};
/// Used as a constant in determining buffer preallocation
pub const SUPPORTED_CUSTOMERS: usize = 16_000_000;

View File

@ -0,0 +1,230 @@
use crate::etc;
use log::{error, info, warn};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::{
fs,
path::{Path, PathBuf},
};
use thiserror::Error;
/// Describes a node in the network map tree.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NetworkJsonNode {
/// The node name, as it appears in `network.json`
pub name: String,
/// The maximum throughput allowed per `network.json` for this node
pub max_throughput: (u32, u32), // In mbps
/// Current throughput (in bytes/second) at this node
pub current_throughput: (u64, u64), // In bytes
/// Approximate RTTs reported for this level of the tree.
/// It's never going to be as statistically accurate as the actual
/// numbers, being based on medians.
pub rtts: Vec<f32>,
/// A list of indices in the `NetworkJson` vector of nodes
/// linking to parent nodes
pub parents: Vec<usize>,
/// The immediate parent node
pub immediate_parent: Option<usize>,
}
/// Holder for the network.json representation.
/// This is condensed into a single level vector with index-based referencing
/// for easy use in funnel calculations.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NetworkJson {
nodes: Vec<NetworkJsonNode>,
}
impl Default for NetworkJson {
fn default() -> Self {
Self::new()
}
}
impl NetworkJson {
/// Generates an empty network.json
pub fn new() -> Self {
Self { nodes: Vec::new() }
}
/// The path to the current `network.json` file, determined
/// by acquiring the prefix from the `/etc/lqos.conf` configuration
/// file.
pub fn path() -> Result<PathBuf, NetworkJsonError> {
let cfg =
etc::EtcLqos::load().map_err(|_| NetworkJsonError::ConfigLoadError)?;
let base_path = Path::new(&cfg.lqos_directory);
Ok(base_path.join("network.json"))
}
/// Does network.json exist?
pub fn exists() -> bool {
if let Ok(path) = Self::path() {
path.exists()
} else {
false
}
}
/// Attempt to load network.json from disk
pub fn load() -> Result<Self, NetworkJsonError> {
let mut nodes = vec![NetworkJsonNode {
name: "Root".to_string(),
max_throughput: (0, 0),
current_throughput: (0, 0),
parents: Vec::new(),
immediate_parent: None,
rtts: Vec::new(),
}];
if !Self::exists() {
return Err(NetworkJsonError::FileNotFound);
}
let path = Self::path()?;
let raw = fs::read_to_string(path)
.map_err(|_| NetworkJsonError::ConfigLoadError)?;
let json: Value = serde_json::from_str(&raw)
.map_err(|_| NetworkJsonError::ConfigLoadError)?;
// Start reading from the top. We are at the root node.
let parents = vec![0];
if let Value::Object(map) = &json {
for (key, value) in map.iter() {
if let Value::Object(inner_map) = value {
recurse_node(&mut nodes, key, inner_map, &parents, 0);
}
}
}
Ok(Self { nodes })
}
/// Find the index of a circuit_id
pub fn get_index_for_name(&self, name: &str) -> Option<usize> {
self.nodes.iter().position(|n| n.name == name)
}
/// Retrieve a cloned copy of a NetworkJsonNode entry, or None if there isn't
/// an entry at that index.
pub fn get_cloned_entry_by_index(
&self,
index: usize,
) -> Option<NetworkJsonNode> {
self.nodes.get(index).cloned()
}
/// Retrieve a cloned copy of all children with a parent containing a specific
/// node index.
pub fn get_cloned_children(&self, index: usize) -> Vec<(usize, NetworkJsonNode)> {
self
.nodes
.iter()
.enumerate()
.filter(|(_i,n)| n.immediate_parent == Some(index))
.map(|(i, n)| (i, n.clone()))
.collect()
}
/// Find a circuit_id, and if it exists return its list of parent nodes
/// as indices within the network_json layout.
pub fn get_parents_for_circuit_id(
&self,
circuit_id: &str,
) -> Option<Vec<usize>> {
self
.nodes
.iter()
.find(|n| n.name == circuit_id)
.map(|node| node.parents.clone())
}
/// Sets all current throughput values to zero
pub fn zero_throughput_and_rtt(&mut self) {
self.nodes.iter_mut().for_each(|n| {
n.current_throughput = (0, 0);
n.rtts.clear();
});
}
/// Add throughput numbers to node entries
pub fn add_throughput_cycle(
&mut self,
targets: &[usize],
bytes: (u64, u64),
median_rtt: f32,
) {
for idx in targets {
// Safety first: use "get" to ensure that the node exists
if let Some(node) = self.nodes.get_mut(*idx) {
node.current_throughput.0 += bytes.0;
node.current_throughput.1 += bytes.1;
if median_rtt > 0.0 {
node.rtts.push(median_rtt);
}
} else {
warn!("No network tree entry for index {idx}");
}
}
}
}
fn json_to_u32(val: Option<&Value>) -> u32 {
if let Some(val) = val {
if let Some(n) = val.as_u64() {
n as u32
} else {
0
}
} else {
0
}
}
fn recurse_node(
nodes: &mut Vec<NetworkJsonNode>,
name: &str,
json: &Map<String, Value>,
parents: &[usize],
immediate_parent: usize,
) {
info!("Mapping {name} from network.json");
let node = NetworkJsonNode {
parents: parents.to_vec(),
max_throughput: (
json_to_u32(json.get("downloadBandwidthMbps")),
json_to_u32(json.get("uploadBandwidthMbps")),
),
current_throughput: (0, 0),
name: name.to_string(),
immediate_parent: Some(immediate_parent),
rtts: Vec::new(),
};
let my_id = nodes.len();
nodes.push(node);
let mut parents = parents.to_vec();
parents.push(my_id);
// Recurse children
for (key, value) in json.iter() {
let key_str = key.as_str();
if key_str != "uploadBandwidthMbps" && key_str != "downloadBandwidthMbps" {
if let Value::Object(value) = value {
recurse_node(nodes, key, value, &parents, my_id);
}
}
}
}
#[derive(Error, Debug)]
pub enum NetworkJsonError {
#[error("Unable to find or load network.json")]
ConfigLoadError,
#[error("network.json not found or does not exist")]
FileNotFound,
}

View File

@ -10,6 +10,7 @@ use rocket_async_compression::Compression;
mod auth_guard;
mod config_control;
mod queue_info;
mod network_tree;
// Use JemAllocator only on supported platforms
#[cfg(any(target_arch = "x86", target_arch = "x86_64"))]
@ -38,6 +39,7 @@ fn rocket() -> _ {
static_pages::unknown_devices_page,
static_pages::circuit_queue,
config_control::config_page,
network_tree::tree_page,
// Our JS library
static_pages::lqos_js,
static_pages::lqos_css,
@ -77,6 +79,7 @@ fn rocket() -> _ {
auth_guard::admin_check,
static_pages::login_page,
auth_guard::username,
network_tree::tree_entry,
// Supporting files
static_pages::bootsrap_css,
static_pages::plotly_js,

View File

@ -0,0 +1,26 @@
use lqos_bus::{bus_request, BusRequest, BusResponse};
use lqos_config::NetworkJsonNode;
use rocket::{fs::NamedFile, serde::json::Json};
use crate::cache_control::NoCache;
// Note that NoCache can be replaced with a cache option
// once the design work is complete.
#[get("/tree")]
pub async fn tree_page<'a>() -> NoCache<Option<NamedFile>> {
NoCache::new(NamedFile::open("static/tree.html").await.ok())
}
#[get("/api/network_tree/<parent>")]
pub async fn tree_entry(
parent: usize,
) -> NoCache<Json<Vec<(usize, NetworkJsonNode)>>> {
let responses =
bus_request(vec![BusRequest::GetNetworkMap { parent }]).await.unwrap();
let result = match &responses[0] {
BusResponse::NetworkMap(nodes) => nodes.to_owned(),
_ => Vec::new(),
};
NoCache::new(Json(result))
}

View File

@ -26,9 +26,9 @@
<a class="nav-link active" aria-current="page" href="/"><i class="fa fa-home"></i> Dashboard</a>
</li>
<li class="nav-item" id="currentLogin"></li>
<!--<li class="nav-item">
<a class="nav-link" href="#"><i class="fa fa-globe"></i> Network Layout</a>
</li>-->
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Network Layout</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>
</li>

View File

@ -0,0 +1,129 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link href="/vendor/bootstrap.min.css" rel="stylesheet">
<link rel="stylesheet" href="/vendor/solid.min.css">
<link rel="stylesheet" href="/lqos.css">
<title>LibreQoS - Local Node Manager</title>
<script src="/lqos.js"></script>
<script src="/vendor/plotly-2.16.1.min.js"></script>
<script src="/vendor/jquery.min.js"></script>
<script defer src="/vendor/bootstrap.bundle.min.js"></script>
</head>
<body class="bg-secondary">
<!-- Navigation -->
<nav class="navbar navbar-expand-lg navbar-dark bg-dark">
<div class="container-fluid">
<a class="navbar-brand" href="/"><img src="/vendor/tinylogo.svg" alt="LibreQoS SVG Logo" width="25" height="25" />&nbsp;LibreQoS</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav me-auto mb-2 mb-lg-0">
<li class="nav-item">
<a class="nav-link active" aria-current="page" href="/"><i class="fa fa-home"></i> Dashboard</a>
</li>
<li class="nav-item" id="currentLogin"></li>
<li class="nav-item">
<a class="nav-link" href="/tree?parent=0"><i class="fa fa-globe"></i> Network Layout</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/shaped"><i class="fa fa-users"></i> Shaped Devices <span id="shapedCount" class="badge badge-pill badge-success green-badge">?</span></a>
</li>
<li class="nav-item">
<a class="nav-link" href="/unknown"><i class="fa fa-address-card"></i> Unknown IPs <span id="unshapedCount" class="badge badge-warning orange-badge">?</span></a>
</li>
</ul>
</div>
<ul class="navbar-nav ms-auto">
<li class="nav-item">
<a class="nav-link" href="#" id="startTest"><i class="fa fa-flag-checkered"></i> Run Bandwidth Test</a>
</li>
<li class="nav-item ms-auto">
<a class="nav-link" href="/config"><i class="fa fa-gear"></i> Configuration</a>
</li>
<li>
<a class="nav-link btn btn-small" href="#" id="btnReload"><i class="fa fa-refresh"></i> Reload LibreQoS</a>
</li>
</ul>
</div>
</nav>
<div id="container" class="pad4">
<div class="row top-shunt">
<div class="col-sm-12 bg-light center-txt">
THIS NODE
</div>
</div>
<div class="row">
<div class="col-sm-12 bg-light center-txt">
<div id="treeList"></div>
</div>
</div>
</div>
<footer>&copy; 2022-2023, LibreQoE LLC</footer>
<script>
let node = 0;
function getTree() {
$.get("/api/network_tree/" + node, (data) => {
//console.log(data);
let tbl = "<table class='table'>";
tbl += "<thead><th>Circuit</th><th>Limit</th><th>Download</th><th>Upload</th><th>RTT Latency</th></thead>";
for (let i=1; i<data.length; ++i) {
tbl += "<tr>";
tbl += "<td style='width: 20%'><a href='/tree?parent=" + encodeURI(data[i][0]) + "'>" + data[i][1].name + "</a></td>";
if (data[i][1].max_throughput[0]==0 && data[i][1].max_throughput[1] == 0) {
tbl += "<td>No Limit</td>";
} else {
let down = scaleNumber(data[i][1].max_throughput[0] * 1000000);
let up = scaleNumber(data[i][1].max_throughput[1] * 1000000);
tbl += "<td>" + down + " / " + up + "</td>";
}
let down = scaleNumber(data[i][1].current_throughput[0] * 8);
let up = scaleNumber(data[i][1].current_throughput[1] * 8);
tbl += "<td>" + down + "</td>";
tbl += "<td>" + up + "</td>";
let rtt = "-";
if (data[i][1].rtts.length > 0) {
let sum = 0;
for (let j=0; j<data[i][1].rtts.length; ++j) {
sum += data[i][1].rtts[j];
}
sum /= data[i][1].rtts.length;
rtt = sum.toFixed(2) + " ms";
}
tbl += "<td>" + rtt + "</td>";
tbl += "</tr>";
}
tbl += "</table>";
$("#treeList").html(tbl);
});
setTimeout(getTree, 1000);
}
function start() {
colorReloadButton();
updateHostCounts();
getTree();
}
const params = new Proxy(new URLSearchParams(window.location.search), {
get: (searchParams, prop) => searchParams.get(prop),
});
node = params.parent;
$(document).ready(start);
</script>
</body>
</html>

View File

@ -3,8 +3,8 @@ mod ip_mapping;
#[cfg(feature = "equinix_tests")]
mod lqos_daht_test;
mod program_control;
mod throughput_tracker;
mod shaped_devices_tracker;
mod throughput_tracker;
mod tuning;
mod validation;
use crate::{
@ -64,7 +64,11 @@ async fn main() -> Result<()> {
};
// Spawn tracking sub-systems
join!(spawn_queue_structure_monitor(), shaped_devices_tracker::shaped_devices_watcher());
join!(
spawn_queue_structure_monitor(),
shaped_devices_tracker::shaped_devices_watcher(),
shaped_devices_tracker::network_json_watcher()
);
throughput_tracker::spawn_throughput_monitor();
spawn_queue_monitor();
@ -156,7 +160,10 @@ fn handle_bus_requests(
BusRequest::RequestLqosEquinixTest => lqos_daht_test::lqos_daht_test(),
BusRequest::ValidateShapedDevicesCsv => {
validation::validate_shaped_devices_csv()
}
},
BusRequest::GetNetworkMap { parent } => {
shaped_devices_tracker::get_one_network_map_layer(*parent)
},
});
}
}

View File

@ -1,10 +1,13 @@
use anyhow::Result;
use log::{error, info, warn};
use lqos_bus::BusResponse;
use lqos_config::ConfigShapedDevices;
use lqos_utils::file_watcher::FileWatcher;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use tokio::task::spawn_blocking;
mod netjson;
pub use netjson::*;
pub static SHAPED_DEVICES: Lazy<RwLock<ConfigShapedDevices>> =
Lazy::new(|| RwLock::new(ConfigShapedDevices::default()));
@ -50,3 +53,14 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
info!("ShapedDevices watcher returned: {result:?}");
}
}
pub fn get_one_network_map_layer(parent_idx: usize) -> BusResponse {
let net_json = NETWORK_JSON.read();
if let Some(parent) = net_json.get_cloned_entry_by_index(parent_idx) {
let mut nodes = vec![(parent_idx, parent)];
nodes.extend_from_slice(&net_json.get_cloned_children(parent_idx));
BusResponse::NetworkMap(nodes)
} else {
BusResponse::Fail("No such node".to_string())
}
}

View File

@ -0,0 +1,47 @@
use log::{info, error, warn};
use lqos_config::NetworkJson;
use lqos_utils::file_watcher::FileWatcher;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use tokio::task::spawn_blocking;
use anyhow::Result;
pub static NETWORK_JSON: Lazy<RwLock<NetworkJson>> = Lazy::new(|| RwLock::new(NetworkJson::default()));
pub async fn network_json_watcher() {
spawn_blocking(|| {
info!("Watching for network.kson changes");
let _ = watch_for_network_json_changing();
});
}
/// Fires up a Linux file system watcher than notifies
/// when `network.json` changes, and triggers a reload.
fn watch_for_network_json_changing() -> Result<()> {
let watch_path = NetworkJson::path();
if watch_path.is_err() {
error!("Unable to generate path for network.json");
return Err(anyhow::Error::msg(
"Unable to create path for network.json",
));
}
let watch_path = watch_path.unwrap();
let mut watcher = FileWatcher::new("network.json", watch_path);
watcher.set_file_exists_callback(load_network_json);
watcher.set_file_created_callback(load_network_json);
watcher.set_file_changed_callback(load_network_json);
loop {
let result = watcher.watch();
info!("network.json watcher returned: {result:?}");
}
}
fn load_network_json() {
let njs = NetworkJson::load();
if let Ok(njs) = njs {
*NETWORK_JSON.write() = njs;
} else {
warn!("Unable to load network.json");
}
}

View File

@ -1,6 +1,6 @@
mod throughput_entry;
mod tracking_data;
use crate::throughput_tracker::tracking_data::ThroughputTracker;
use crate::{throughput_tracker::tracking_data::ThroughputTracker, shaped_devices_tracker::NETWORK_JSON};
use log::{info, warn};
use lqos_bus::{BusResponse, IpStats, TcHandle, XdpPpingResult};
use lqos_sys::XdpIpAddress;
@ -21,10 +21,11 @@ pub fn spawn_throughput_monitor() {
std::thread::spawn(move || {
periodic(interval_ms, "Throughput Monitor", &mut || {
let mut throughput = THROUGHPUT_TRACKER.write();
throughput.copy_previous_and_reset_rtt();
let mut net_json = NETWORK_JSON.write();
throughput.copy_previous_and_reset_rtt(&mut net_json);
throughput.apply_new_throughput_counters();
throughput.apply_rtt_data();
throughput.update_totals();
throughput.update_totals(&mut net_json);
throughput.next_cycle();
});
});

View File

@ -3,6 +3,7 @@ use lqos_bus::TcHandle;
#[derive(Debug)]
pub(crate) struct ThroughputEntry {
pub(crate) circuit_id: Option<String>,
pub(crate) network_json_parents: Option<Vec<usize>>,
pub(crate) first_cycle: u64,
pub(crate) most_recent_cycle: u64,
pub(crate) bytes: (u64, u64),

View File

@ -2,6 +2,7 @@ use crate::shaped_devices_tracker::SHAPED_DEVICES;
use super::{throughput_entry::ThroughputEntry, RETIRE_AFTER_SECONDS};
use lqos_bus::TcHandle;
use lqos_config::NetworkJson;
use lqos_sys::{rtt_for_each, throughput_for_each, XdpIpAddress};
use rayon::prelude::{IntoParallelRefMutIterator, ParallelIterator};
use std::collections::HashMap;
@ -28,7 +29,13 @@ impl ThroughputTracker {
}
}
pub(crate) fn copy_previous_and_reset_rtt(&mut self) {
pub(crate) fn copy_previous_and_reset_rtt(
&mut self,
netjson: &mut NetworkJson,
) {
// Zero the previous funnel hierarchy current numbers
netjson.zero_throughput_and_rtt();
// Copy previous byte/packet numbers and reset RTT data
// We're using Rayon's "par_iter_mut" to spread the operation across
// all CPU cores.
@ -65,9 +72,37 @@ impl ThroughputTracker {
circuit_id
}
pub(crate) fn get_node_name_for_circuit_id(
circuit_id: Option<String>,
) -> Option<String> {
if let Some(circuit_id) = circuit_id {
let shaped = SHAPED_DEVICES.read();
shaped
.devices
.iter()
.find(|d| d.circuit_id == circuit_id)
.map(|device| device.parent_node.clone())
} else {
None
}
}
pub(crate) fn lookup_network_parents(
circuit_id: Option<String>,
) -> Option<Vec<usize>> {
if let Some(parent) = Self::get_node_name_for_circuit_id(circuit_id) {
let lock = crate::shaped_devices_tracker::NETWORK_JSON.read();
lock.get_parents_for_circuit_id(&parent)
} else {
None
}
}
pub(crate) fn refresh_circuit_ids(&mut self) {
self.raw_data.par_iter_mut().for_each(|(ip, data)| {
data.circuit_id = Self::lookup_circuit_id(ip);
data.network_json_parents =
Self::lookup_network_parents(data.circuit_id.clone());
});
}
@ -94,8 +129,10 @@ impl ThroughputTracker {
entry.most_recent_cycle = cycle;
}
} else {
let circuit_id = Self::lookup_circuit_id(xdp_ip);
let mut entry = ThroughputEntry {
circuit_id: Self::lookup_circuit_id(xdp_ip),
circuit_id: circuit_id.clone(),
network_json_parents: Self::lookup_network_parents(circuit_id),
first_cycle: self.cycle,
most_recent_cycle: 0,
bytes: (0, 0),
@ -135,7 +172,7 @@ impl ThroughputTracker {
});
}
pub(crate) fn update_totals(&mut self) {
pub(crate) fn update_totals(&mut self, net_json: &mut NetworkJson) {
self.bytes_per_second = (0, 0);
self.packets_per_second = (0, 0);
self.shaped_bytes_per_second = (0, 0);
@ -149,27 +186,43 @@ impl ThroughputTracker {
v.packets.0.saturating_sub(v.prev_packets.0),
v.packets.1.saturating_sub(v.prev_packets.1),
v.tc_handle.as_u32() > 0,
&v.network_json_parents,
v.median_latency(),
)
})
.for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
self.bytes_per_second.0 =
self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0);
self.bytes_per_second.1 =
self.bytes_per_second.1.checked_add(bytes_up).unwrap_or(0);
self.packets_per_second.0 =
self.packets_per_second.0.checked_add(packets_down).unwrap_or(0);
self.packets_per_second.1 =
self.packets_per_second.1.checked_add(packets_up).unwrap_or(0);
if shaped {
self.shaped_bytes_per_second.0 = self
.shaped_bytes_per_second
.0
.checked_add(bytes_down)
.unwrap_or(0);
self.shaped_bytes_per_second.1 =
self.shaped_bytes_per_second.1.checked_add(bytes_up).unwrap_or(0);
}
});
.for_each(
|(bytes_down, bytes_up, packets_down, packets_up, shaped, parents, median_rtt)| {
self.bytes_per_second.0 =
self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0);
self.bytes_per_second.1 =
self.bytes_per_second.1.checked_add(bytes_up).unwrap_or(0);
self.packets_per_second.0 =
self.packets_per_second.0.checked_add(packets_down).unwrap_or(0);
self.packets_per_second.1 =
self.packets_per_second.1.checked_add(packets_up).unwrap_or(0);
if shaped {
self.shaped_bytes_per_second.0 = self
.shaped_bytes_per_second
.0
.checked_add(bytes_down)
.unwrap_or(0);
self.shaped_bytes_per_second.1 = self
.shaped_bytes_per_second
.1
.checked_add(bytes_up)
.unwrap_or(0);
}
// If we have parent node data, we apply it now
if let Some(parents) = parents {
net_json.add_throughput_cycle(
parents,
(self.bytes_per_second.0, self.bytes_per_second.1),
median_rtt,
)
}
},
);
}
pub(crate) fn next_cycle(&mut self) {