Merge pull request #461 from LibreQoE/circuit_weights

Circuit weights
This commit is contained in:
Robert Chacón 2024-03-01 11:21:57 -07:00 committed by GitHub
commit 5ff0737a76
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 331 additions and 12 deletions

View File

@ -25,7 +25,7 @@ from liblqos_python import is_lqosd_alive, clear_ip_mappings, delete_ip_mapping,
check_config, sqm, upstream_bandwidth_capacity_download_mbps, upstream_bandwidth_capacity_upload_mbps, \
interface_a, interface_b, enable_actual_shell_commands, use_bin_packing_to_balance_cpu, monitor_mode_only, \
run_shell_commands_as_sudo, generated_pn_download_mbps, generated_pn_upload_mbps, queues_available_override, \
on_a_stick
on_a_stick, get_tree_weights, get_weights
# Automatically account for TCP overhead of plans. For example a 100Mbps plan needs to be set to 109Mbps for the user to ever see that result on a speed test
# Does not apply to nodes of any sort, just endpoint devices
@ -513,6 +513,13 @@ def refreshShapers():
generatedPNs.append(genPNname)
if use_bin_packing_to_balance_cpu():
print("Using binpacking module to sort circuits by CPU core")
weights = get_weights()
for circuitId in dictForCircuitsWithoutParentNodes:
for circuit in subscriberCircuits:
if circuit['idForCircuitsWithoutParentNodes'] == circuitId:
for w in weights:
if w.circuit_id == circuit['circuitID']:
dictForCircuitsWithoutParentNodes[circuitId] = w.weight
bins = binpacking.to_constant_bin_number(dictForCircuitsWithoutParentNodes, numberOfGeneratedPNs)
genPNcounter = 0
for binItem in bins:
@ -615,9 +622,16 @@ def refreshShapers():
# Track minor counter by CPU. This way we can have > 32000 hosts (htb has u16 limit to minor handle)
for x in range(queuesAvailable):
minorByCPUpreloaded[x+1] = 3
def traverseNetwork(data, depth, major, minorByCPU, queue, parentClassID, upParentClassID, parentMaxDL, parentMaxUL):
def traverseNetwork(data, depth, major, minorByCPU, queue, parentClassID, upParentClassID, parentMaxDL, parentMaxUL, bins):
for node in data:
circuitsForThisNetworkNode = []
circuitsForThisNetworkNode = []
if depth==0 and use_bin_packing_to_balance_cpu():
# Lookup the "major" number from the bins
for x in range(queuesAvailable):
if node in bins[x]:
major = x+1
print("Assigned major " + str(major) + " to node " + node)
nodeClassID = hex(major) + ':' + hex(minorByCPU[queue])
upNodeClassID = hex(major+stickOffset) + ':' + hex(minorByCPU[queue])
data[node]['classid'] = nodeClassID
@ -700,7 +714,7 @@ def refreshShapers():
if 'children' in data[node]:
# We need to keep tabs on the minor counter, because we can't have repeating class IDs. Here, we bring back the minor counter from the recursive function
minorByCPU[queue] = minorByCPU[queue] + 1
minorByCPU = traverseNetwork(data[node]['children'], depth+1, major, minorByCPU, queue, nodeClassID, upNodeClassID, data[node]['downloadBandwidthMbps'], data[node]['uploadBandwidthMbps'])
minorByCPU = traverseNetwork(data[node]['children'], depth+1, major, minorByCPU, queue, nodeClassID, upNodeClassID, data[node]['downloadBandwidthMbps'], data[node]['uploadBandwidthMbps'], bins)
# If top level node, increment to next queue / cpu core
if depth == 0:
if queue >= queuesAvailable:
@ -710,8 +724,21 @@ def refreshShapers():
queue += 1
major += 1
return minorByCPU
# If we're in binpacking mode, we need to sort the network structure a bit
bins = []
if use_bin_packing_to_balance_cpu():
print("BinPacking is enabled, so we're going to sort your network.")
cpuBin = {}
weights = get_tree_weights()
for w in weights:
cpuBin[w.name] = w.weight
bins = binpacking.to_constant_bin_number(cpuBin, queuesAvailable)
#for x in range(queuesAvailable):
# print("Bin " + str(x) + " = ", bins[x])
# Here is the actual call to the recursive traverseNetwork() function. finalMinor is not used.
minorByCPU = traverseNetwork(network, 0, major=1, minorByCPU=minorByCPUpreloaded, queue=1, parentClassID=None, upParentClassID=None, parentMaxDL=upstream_bandwidth_capacity_download_mbps(), parentMaxUL=upstream_bandwidth_capacity_upload_mbps())
minorByCPU = traverseNetwork(network, 0, major=1, minorByCPU=minorByCPUpreloaded, queue=1, parentClassID=None, upParentClassID=None, parentMaxDL=upstream_bandwidth_capacity_download_mbps(), parentMaxUL=upstream_bandwidth_capacity_upload_mbps(), bins=bins)
linuxTCcommands = []
devicesShaped = []

View File

@ -319,7 +319,7 @@ class NetworkGraph:
data[node]['uploadBandwidthMbps'] = min(int(data[node]['uploadBandwidthMbps']),int(parentMaxUL))
if 'children' in data[node]:
inheritBandwidthMaxes(data[node]['children'], data[node]['downloadBandwidthMbps'], data[node]['uploadBandwidthMbps'])
inheritBandwidthMaxes(topLevelNode, parentMaxDL=upstream_bandwidth_capacity_download_mbps, parentMaxUL=upstream_bandwidth_capacity_upload_mbps)
inheritBandwidthMaxes(topLevelNode, parentMaxDL=upstream_bandwidth_capacity_download_mbps(), parentMaxUL=upstream_bandwidth_capacity_upload_mbps())
with open('network.json', 'w') as f:
json.dump(topLevelNode, f, indent=4)

78
src/rust/Cargo.lock generated
View File

@ -41,6 +41,21 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anes"
version = "0.1.6"
@ -393,6 +408,20 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "chrono"
version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.0",
]
[[package]]
name = "ciborium"
version = "0.2.2"
@ -1244,6 +1273,29 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "idna"
version = "0.5.0"
@ -1594,11 +1646,14 @@ name = "lqos_python"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"lqos_bus",
"lqos_config",
"lqos_utils",
"nix",
"pyo3",
"reqwest",
"serde",
"sysinfo",
"tokio",
]
@ -2424,9 +2479,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "reqwest"
version = "0.11.23"
version = "0.11.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41"
checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251"
dependencies = [
"base64",
"bytes",
@ -2446,9 +2501,11 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
@ -2614,6 +2671,15 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64",
]
[[package]]
name = "rustversion"
version = "1.0.14"
@ -2696,9 +2762,9 @@ checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
[[package]]
name = "serde"
version = "1.0.195"
version = "1.0.196"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02"
checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
dependencies = [
"serde_derive",
]
@ -2715,9 +2781,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.195"
version = "1.0.196"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
dependencies = [
"proc-macro2",
"quote",

View File

@ -17,3 +17,6 @@ tokio = { version = "1", features = [ "full" ] }
anyhow = "1"
sysinfo = "0"
nix = "0"
reqwest = { version = "0.11.24", features = ["blocking"] }
serde = { version = "1.0.196", features = ["derive"] }
chrono = "0.4.33"

View File

@ -0,0 +1,199 @@
//! This module is responsible for getting the device weights from the Long Term Stats API
//! and the ShapedDevices.csv file. It will merge the two sources of weights and return the
//! result as a Vec<DeviceWeightResponse>.
//!
//! # Example
//!
//! ```python
//! from liblqos_python import get_weights;
//! weights = get_weights();
//! for w in weights:
//! print(w.circuit_id + " : " + str(w.weight));
//! ```
use anyhow::Result;
use lqos_config::{load_config, ConfigShapedDevices, ShapedDevice};
use pyo3::pyclass;
use serde::{Deserialize, Serialize};
const URL: &str = "https:/stats.libreqos.io/api/device_weights";
/// This struct is used to send a request to the Long Term Stats API
#[derive(Serialize, Deserialize)]
pub struct DeviceWeightRequest {
pub org_key: String,
pub node_id: String,
pub start: i64,
pub duration_seconds: i64,
pub percentile: f64,
}
/// This struct is used to receive a response from the Long Term Stats API
/// It contains the circuit_id and the weight of the device
#[derive(Serialize, Deserialize, Debug)]
#[pyclass]
pub struct DeviceWeightResponse {
#[pyo3(get)]
pub circuit_id: String,
#[pyo3(get)]
pub weight: i64,
}
/// This function is used to get the device weights from the Long Term Stats API
fn get_weights_from_lts(
org_key: &str,
node_id: &str,
start: i64,
duration_seconds: i64,
percentile: f64,
) -> Result<Vec<DeviceWeightResponse>> {
let request = DeviceWeightRequest {
org_key: org_key.to_string(),
node_id: node_id.to_string(),
start,
duration_seconds,
percentile,
};
// Make a BLOCKING reqwest call (we're not in an async context)
let response = reqwest::blocking::Client::new()
.post(URL)
.json(&request)
.send()?
.json::<Vec<DeviceWeightResponse>>()?;
Ok(response)
}
/// This function is used to get the device weights from the ShapedDevices.csv file
fn get_weights_from_shaped_devices() -> Result<Vec<DeviceWeightResponse>> {
let mut devices_list = ConfigShapedDevices::load()?.devices;
devices_list.sort_by(|a,b| a.circuit_id.cmp(&b.circuit_id));
let mut result = vec![];
let mut prev_id = String::new();
for device in devices_list {
let circuit_id = device.circuit_id;
if circuit_id == prev_id {
continue;
}
let weight = device.download_max_mbps as i64 / 2;
result.push(DeviceWeightResponse { circuit_id: circuit_id.clone(), weight });
prev_id = circuit_id.clone();
}
Ok(result)
}
/// This function is used to determine if we should use the Long Term Stats API to get the weights
fn use_lts_weights() -> bool {
let config = load_config().unwrap();
config.long_term_stats.gather_stats && config.long_term_stats.license_key.is_some()
}
/// This function is used to get the device weights from the Long Term Stats API and the ShapedDevices.csv file
/// It will merge the two sources of weights and return the result as a Vec<DeviceWeightResponse>.
///
/// It serves as the Rust-side implementation of the `get_weights` function in the Python module.
pub(crate) fn get_weights_rust() -> Result<Vec<DeviceWeightResponse>> {
let mut shaped_devices_weights = get_weights_from_shaped_devices()?;
if use_lts_weights() {
// This allows us to use Python printing
println!("Using LTS weights");
let config = load_config().unwrap();
let org_key = config.long_term_stats.license_key.unwrap();
let node_id = config.node_id;
// Get current local time as unix timestamp
let now = chrono::Utc::now().timestamp();
// Subtract 7 days to get the start time
let start = now - (60 * 60 * 24 * 7);
let duration_seconds = 60 * 60 * 24; // 1 day
let percentile = 0.95;
eprintln!("Getting weights from LTS");
let weights = get_weights_from_lts(&org_key, &node_id, start, duration_seconds, percentile);
if let Ok(weights) = weights {
eprintln!("Retrieved {} weights from LTS", weights.len());
// Merge them
for weight in weights.iter() {
if let Some(existing) = shaped_devices_weights.iter_mut().find(|d| d.circuit_id == weight.circuit_id) {
existing.weight = weight.weight;
}
}
} else {
eprintln!("Failed to get weights from LTS: {:?}", weights);
}
} else {
eprintln!("Not using LTS weights. Using weights from ShapedDevices.csv");
}
Ok(shaped_devices_weights)
}
fn recurse_weights(
device_list: &[ShapedDevice],
device_weights: &[DeviceWeightResponse],
network: &lqos_config::NetworkJson,
node_index: usize,
) -> Result<i64> {
let mut weight = 0;
let n = &network.nodes[node_index];
//println!(" Tower: {}", n.name);
device_list
.iter()
.filter(|d| d.parent_node == n.name)
.for_each(|d| {
if let Some(w) = device_weights.iter().find(|w| w.circuit_id == d.circuit_id) {
weight += w.weight;
}
});
//println!(" Weight: {}", weight);
for (i, n) in network.nodes
.iter()
.enumerate()
.filter(|(_i, n)| n.immediate_parent == Some(node_index))
{
//println!(" Child: {}", n.name);
weight += recurse_weights(device_list, device_weights, network, i)?;
}
Ok(weight)
}
#[pyclass]
pub struct NetworkNodeWeight {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub weight: i64,
}
/// Calculate the top-level network tree nodes and then
/// calculate the weights for each node
pub(crate) fn calculate_tree_weights() -> Result<Vec<NetworkNodeWeight>> {
let device_list = ConfigShapedDevices::load()?.devices;
let device_weights = get_weights_rust()?;
let network = lqos_config::NetworkJson::load()?;
let root_index = network.nodes.iter().position(|n| n.immediate_parent.is_none()).unwrap();
let mut result = Vec::new();
//println!("Root index is: {}", root_index);
// Find all network nodes one off the top
network
.nodes
.iter()
.enumerate()
.filter(|(_,n)| n.immediate_parent.is_some() && n.immediate_parent.unwrap() == root_index)
.for_each(|(idx, n)| {
//println!("Node: {} ", n.name);
let weight = recurse_weights(&device_list, &device_weights, &network, idx).unwrap();
//println!("Node: {} : {weight}", n.name);
result.push(NetworkNodeWeight { name: n.name.clone(), weight });
});
result.sort_by(|a,b| b.weight.cmp(&a.weight));
Ok(result)
}

View File

@ -14,6 +14,7 @@ mod blocking;
use anyhow::{Error, Result};
use blocking::run_query;
use sysinfo::System;
mod device_weights;
const LOCK_FILE: &str = "/run/lqos/libreqos.lock";
@ -24,6 +25,7 @@ fn liblqos_python(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyIpMapping>()?;
m.add_class::<BatchedCommands>()?;
m.add_class::<PyExceptionCpe>()?;
m.add_class::<device_weights::DeviceWeightResponse>()?;
m.add_wrapped(wrap_pyfunction!(is_lqosd_alive))?;
m.add_wrapped(wrap_pyfunction!(list_ip_mappings))?;
m.add_wrapped(wrap_pyfunction!(clear_ip_mappings))?;
@ -86,6 +88,8 @@ fn liblqos_python(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(influx_db_org))?;
m.add_wrapped(wrap_pyfunction!(influx_db_token))?;
m.add_wrapped(wrap_pyfunction!(influx_db_url))?;
m.add_wrapped(wrap_pyfunction!(get_weights))?;
m.add_wrapped(wrap_pyfunction!(get_tree_weights))?;
Ok(())
}
@ -637,4 +641,24 @@ fn influx_db_token() -> PyResult<String> {
fn influx_db_url() -> PyResult<String> {
let config = lqos_config::load_config().unwrap();
Ok(config.influxdb.url)
}
#[pyfunction]
pub fn get_weights() -> PyResult<Vec<device_weights::DeviceWeightResponse>> {
match device_weights::get_weights_rust() {
Ok(weights) => Ok(weights),
Err(e) => {
Err(PyOSError::new_err(e.to_string()))
}
}
}
#[pyfunction]
pub fn get_tree_weights() -> PyResult<Vec<device_weights::NetworkNodeWeight>> {
match device_weights::calculate_tree_weights() {
Ok(w) => Ok(w),
Err(e) => {
Err(PyOSError::new_err(e.to_string()))
}
}
}