Merge branch 'develop' into per_flow

This commit is contained in:
Herbert Wolverson 2024-03-02 09:16:08 -06:00
commit 13f2fabe4c
9 changed files with 381 additions and 28 deletions

View File

@ -1,6 +1,6 @@
# Share your before and after
We ask that you please share an anonymized screenshot of your LibreQoS deployment before (monitor only mode) and after (queuing enabled) to our [Matrix Channel](https://matrix.to/#/#libreqos:matrix.org). This helps us gauge the impact of our software. It also makes us smile.
We ask that you please share an anonymized screenshot of your LibreQoS deployment before (monitor only mode) and after (queuing enabled) to the [LibreQoS Chat](https://chat.libreqos.io/join/fvu3cerayyaumo377xwvpev6/). This helps us gauge the impact of our software. It also makes us smile.
1. Enable monitor only mode
2. Klingon mode (Redact customer info)

View File

@ -25,7 +25,7 @@ from liblqos_python import is_lqosd_alive, clear_ip_mappings, delete_ip_mapping,
check_config, sqm, upstream_bandwidth_capacity_download_mbps, upstream_bandwidth_capacity_upload_mbps, \
interface_a, interface_b, enable_actual_shell_commands, use_bin_packing_to_balance_cpu, monitor_mode_only, \
run_shell_commands_as_sudo, generated_pn_download_mbps, generated_pn_upload_mbps, queues_available_override, \
on_a_stick
on_a_stick, get_tree_weights, get_weights
# Automatically account for TCP overhead of plans. For example a 100Mbps plan needs to be set to 109Mbps for the user to ever see that result on a speed test
# Does not apply to nodes of any sort, just endpoint devices
@ -513,6 +513,14 @@ def refreshShapers():
generatedPNs.append(genPNname)
if use_bin_packing_to_balance_cpu():
print("Using binpacking module to sort circuits by CPU core")
weights = get_weights()
if weights is not None and dictForCircuitsWithoutParentNodes is not None:
for circuitId in dictForCircuitsWithoutParentNodes:
for circuit in subscriberCircuits:
if 'idForCircuitsWithoutParentNodes' in circuit and circuit['idForCircuitsWithoutParentNodes'] == circuitId:
for w in weights:
if w.circuit_id == circuit['circuitID']:
dictForCircuitsWithoutParentNodes[circuitId] = w.weight
bins = binpacking.to_constant_bin_number(dictForCircuitsWithoutParentNodes, numberOfGeneratedPNs)
genPNcounter = 0
for binItem in bins:
@ -617,7 +625,22 @@ def refreshShapers():
minorByCPUpreloaded[x+1] = 3
def traverseNetwork(data, depth, major, minorByCPU, queue, parentClassID, upParentClassID, parentMaxDL, parentMaxUL):
for node in data:
#if data[node]['type'] == "virtual":
# print(node + " is a virtual node. Skipping.")
# if depth == 0:
# parentClassID = hex(major) + ':'
# upParentClassID = hex(major+stickOffset) + ':'
# data[node]['parentClassID'] = parentClassID
# data[node]['up_parentClassID'] = upParentClassID
# data[node]['classMajor'] = hex(major)
# data[node]['up_classMajor'] = hex(major + stickOffset)
# data[node]['classMinor'] = hex(minorByCPU[queue])
# data[node]['cpuNum'] = hex(queue-1)
# data[node]['up_cpuNum'] = hex(queue-1+stickOffset)
# traverseNetwork(data[node]['children'], depth, major, minorByCPU, queue, parentClassID, upParentClassID, parentMaxDL, parentMaxUL)
# continue
circuitsForThisNetworkNode = []
nodeClassID = hex(major) + ':' + hex(minorByCPU[queue])
upNodeClassID = hex(major+stickOffset) + ':' + hex(minorByCPU[queue])
data[node]['classid'] = nodeClassID
@ -710,6 +733,43 @@ def refreshShapers():
queue += 1
major += 1
return minorByCPU
# If we're in binpacking mode, we need to sort the network structure a bit
bins = []
if use_bin_packing_to_balance_cpu() and not on_a_stick():
print("BinPacking is enabled, so we're going to sort your network.")
cpuBin = {}
weights = get_tree_weights()
for w in weights:
cpuBin[w.name] = w.weight
bins = binpacking.to_constant_bin_number(cpuBin, queuesAvailable)
for x in range(queuesAvailable):
print("Bin " + str(x) + " = ", bins[x])
#print(network)
binnedNetwork = {}
for cpu in range(queuesAvailable):
cpuKey = "CpueQueue" + str(cpu)
binnedNetwork[cpuKey] = {
'downloadBandwidthMbps': generated_pn_download_mbps(),
'uploadBandwidthMbps': generated_pn_upload_mbps(),
'type': 'site',
'downloadBandwidthMbpsMin': generated_pn_download_mbps(),
'uploadBandwidthMbpsMin': generated_pn_upload_mbps(),
'children': {}
}
for node in network:
found = False
for bin in range(queuesAvailable):
if node in bins[bin]:
binnedNetwork["CpueQueue" + str(bin)]['children'][node] = network[node]
found = True
if found == False:
newQueueId = queuesAvailable-1
binnedNetwork["CpueQueue" + str(newQueueId)]['children'][node] = network[node]
#print("Binned network = ", binnedNetwork)
network = binnedNetwork
# Here is the actual call to the recursive traverseNetwork() function. finalMinor is not used.
minorByCPU = traverseNetwork(network, 0, major=1, minorByCPU=minorByCPUpreloaded, queue=1, parentClassID=None, upParentClassID=None, parentMaxDL=upstream_bandwidth_capacity_download_mbps(), parentMaxUL=upstream_bandwidth_capacity_upload_mbps())

View File

@ -319,7 +319,7 @@ class NetworkGraph:
data[node]['uploadBandwidthMbps'] = min(int(data[node]['uploadBandwidthMbps']),int(parentMaxUL))
if 'children' in data[node]:
inheritBandwidthMaxes(data[node]['children'], data[node]['downloadBandwidthMbps'], data[node]['uploadBandwidthMbps'])
inheritBandwidthMaxes(topLevelNode, parentMaxDL=upstream_bandwidth_capacity_download_mbps, parentMaxUL=upstream_bandwidth_capacity_upload_mbps)
inheritBandwidthMaxes(topLevelNode, parentMaxDL=upstream_bandwidth_capacity_download_mbps(), parentMaxUL=upstream_bandwidth_capacity_upload_mbps())
with open('network.json', 'w') as f:
json.dump(topLevelNode, f, indent=4)

View File

@ -459,7 +459,6 @@ def buildFullGraph():
if site['identification']['status'] == "disconnected":
print("WARNING: Site " + name + " is disconnected")
continue
node = NetworkNode(id=id, displayName=name, type=nodeType,
parentId=parent, download=download, upload=upload, address=address, customerName=customerName)

78
src/rust/Cargo.lock generated
View File

@ -41,6 +41,21 @@ dependencies = [
"alloc-no-stdlib",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anes"
version = "0.1.6"
@ -393,6 +408,20 @@ dependencies = [
"cpufeatures",
]
[[package]]
name = "chrono"
version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.0",
]
[[package]]
name = "ciborium"
version = "0.2.2"
@ -1244,6 +1273,29 @@ dependencies = [
"tokio-native-tls",
]
[[package]]
name = "iana-time-zone"
version = "0.1.60"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7ffbb5a1b541ea2561f8c41c087286cc091e21e556a4f09a8f6cbf17b69b141"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "idna"
version = "0.5.0"
@ -1603,11 +1655,14 @@ name = "lqos_python"
version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"lqos_bus",
"lqos_config",
"lqos_utils",
"nix",
"pyo3",
"reqwest",
"serde",
"sysinfo",
"tokio",
]
@ -2434,9 +2489,9 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "reqwest"
version = "0.11.23"
version = "0.11.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41"
checksum = "c6920094eb85afde5e4a138be3f2de8bbdf28000f0029e72c45025a56b042251"
dependencies = [
"base64",
"bytes",
@ -2456,9 +2511,11 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
@ -2624,6 +2681,15 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c"
dependencies = [
"base64",
]
[[package]]
name = "rustversion"
version = "1.0.14"
@ -2706,9 +2772,9 @@ checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0"
[[package]]
name = "serde"
version = "1.0.195"
version = "1.0.196"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02"
checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32"
dependencies = [
"serde_derive",
]
@ -2725,9 +2791,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.195"
version = "1.0.196"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c"
checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
dependencies = [
"proc-macro2",
"quote",

View File

@ -17,3 +17,6 @@ tokio = { version = "1", features = [ "full" ] }
anyhow = "1"
sysinfo = "0"
nix = "0"
reqwest = { version = "0.11.24", features = ["blocking"] }
serde = { version = "1.0.196", features = ["derive"] }
chrono = "0.4.33"

View File

@ -0,0 +1,199 @@
//! This module is responsible for getting the device weights from the Long Term Stats API
//! and the ShapedDevices.csv file. It will merge the two sources of weights and return the
//! result as a Vec<DeviceWeightResponse>.
//!
//! # Example
//!
//! ```python
//! from liblqos_python import get_weights;
//! weights = get_weights();
//! for w in weights:
//! print(w.circuit_id + " : " + str(w.weight));
//! ```
use anyhow::Result;
use lqos_config::{load_config, ConfigShapedDevices, ShapedDevice};
use pyo3::pyclass;
use serde::{Deserialize, Serialize};
const URL: &str = "https:/stats.libreqos.io/api/device_weights";
/// This struct is used to send a request to the Long Term Stats API
#[derive(Serialize, Deserialize)]
pub struct DeviceWeightRequest {
pub org_key: String,
pub node_id: String,
pub start: i64,
pub duration_seconds: i64,
pub percentile: f64,
}
/// This struct is used to receive a response from the Long Term Stats API
/// It contains the circuit_id and the weight of the device
#[derive(Serialize, Deserialize, Debug)]
#[pyclass]
pub struct DeviceWeightResponse {
#[pyo3(get)]
pub circuit_id: String,
#[pyo3(get)]
pub weight: i64,
}
/// This function is used to get the device weights from the Long Term Stats API
fn get_weights_from_lts(
org_key: &str,
node_id: &str,
start: i64,
duration_seconds: i64,
percentile: f64,
) -> Result<Vec<DeviceWeightResponse>> {
let request = DeviceWeightRequest {
org_key: org_key.to_string(),
node_id: node_id.to_string(),
start,
duration_seconds,
percentile,
};
// Make a BLOCKING reqwest call (we're not in an async context)
let response = reqwest::blocking::Client::new()
.post(URL)
.json(&request)
.send()?
.json::<Vec<DeviceWeightResponse>>()?;
Ok(response)
}
/// This function is used to get the device weights from the ShapedDevices.csv file
fn get_weights_from_shaped_devices() -> Result<Vec<DeviceWeightResponse>> {
let mut devices_list = ConfigShapedDevices::load()?.devices;
devices_list.sort_by(|a,b| a.circuit_id.cmp(&b.circuit_id));
let mut result = vec![];
let mut prev_id = String::new();
for device in devices_list {
let circuit_id = device.circuit_id;
if circuit_id == prev_id {
continue;
}
let weight = device.download_max_mbps as i64 / 2;
result.push(DeviceWeightResponse { circuit_id: circuit_id.clone(), weight });
prev_id = circuit_id.clone();
}
Ok(result)
}
/// This function is used to determine if we should use the Long Term Stats API to get the weights
fn use_lts_weights() -> bool {
let config = load_config().unwrap();
config.long_term_stats.gather_stats && config.long_term_stats.license_key.is_some()
}
/// This function is used to get the device weights from the Long Term Stats API and the ShapedDevices.csv file
/// It will merge the two sources of weights and return the result as a Vec<DeviceWeightResponse>.
///
/// It serves as the Rust-side implementation of the `get_weights` function in the Python module.
pub(crate) fn get_weights_rust() -> Result<Vec<DeviceWeightResponse>> {
let mut shaped_devices_weights = get_weights_from_shaped_devices()?;
if use_lts_weights() {
// This allows us to use Python printing
println!("Using LTS weights");
let config = load_config().unwrap();
let org_key = config.long_term_stats.license_key.unwrap();
let node_id = config.node_id;
// Get current local time as unix timestamp
let now = chrono::Utc::now().timestamp();
// Subtract 7 days to get the start time
let start = now - (60 * 60 * 24 * 7);
let duration_seconds = 60 * 60 * 24; // 1 day
let percentile = 0.95;
eprintln!("Getting weights from LTS");
let weights = get_weights_from_lts(&org_key, &node_id, start, duration_seconds, percentile);
if let Ok(weights) = weights {
eprintln!("Retrieved {} weights from LTS", weights.len());
// Merge them
for weight in weights.iter() {
if let Some(existing) = shaped_devices_weights.iter_mut().find(|d| d.circuit_id == weight.circuit_id) {
existing.weight = weight.weight;
}
}
} else {
eprintln!("Failed to get weights from LTS: {:?}", weights);
}
} else {
eprintln!("Not using LTS weights. Using weights from ShapedDevices.csv");
}
Ok(shaped_devices_weights)
}
fn recurse_weights(
device_list: &[ShapedDevice],
device_weights: &[DeviceWeightResponse],
network: &lqos_config::NetworkJson,
node_index: usize,
) -> Result<i64> {
let mut weight = 0;
let n = &network.nodes[node_index];
//println!(" Tower: {}", n.name);
device_list
.iter()
.filter(|d| d.parent_node == n.name)
.for_each(|d| {
if let Some(w) = device_weights.iter().find(|w| w.circuit_id == d.circuit_id) {
weight += w.weight;
}
});
//println!(" Weight: {}", weight);
for (i, n) in network.nodes
.iter()
.enumerate()
.filter(|(_i, n)| n.immediate_parent == Some(node_index))
{
//println!(" Child: {}", n.name);
weight += recurse_weights(device_list, device_weights, network, i)?;
}
Ok(weight)
}
#[pyclass]
pub struct NetworkNodeWeight {
#[pyo3(get)]
pub name: String,
#[pyo3(get)]
pub weight: i64,
}
/// Calculate the top-level network tree nodes and then
/// calculate the weights for each node
pub(crate) fn calculate_tree_weights() -> Result<Vec<NetworkNodeWeight>> {
let device_list = ConfigShapedDevices::load()?.devices;
let device_weights = get_weights_rust()?;
let network = lqos_config::NetworkJson::load()?;
let root_index = network.nodes.iter().position(|n| n.immediate_parent.is_none()).unwrap();
let mut result = Vec::new();
//println!("Root index is: {}", root_index);
// Find all network nodes one off the top
network
.nodes
.iter()
.enumerate()
.filter(|(_,n)| n.immediate_parent.is_some() && n.immediate_parent.unwrap() == root_index)
.for_each(|(idx, n)| {
//println!("Node: {} ", n.name);
let weight = recurse_weights(&device_list, &device_weights, &network, idx).unwrap();
//println!("Node: {} : {weight}", n.name);
result.push(NetworkNodeWeight { name: n.name.clone(), weight });
});
result.sort_by(|a,b| b.weight.cmp(&a.weight));
Ok(result)
}

View File

@ -14,6 +14,7 @@ mod blocking;
use anyhow::{Error, Result};
use blocking::run_query;
use sysinfo::System;
mod device_weights;
const LOCK_FILE: &str = "/run/lqos/libreqos.lock";
@ -24,6 +25,7 @@ fn liblqos_python(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyIpMapping>()?;
m.add_class::<BatchedCommands>()?;
m.add_class::<PyExceptionCpe>()?;
m.add_class::<device_weights::DeviceWeightResponse>()?;
m.add_wrapped(wrap_pyfunction!(is_lqosd_alive))?;
m.add_wrapped(wrap_pyfunction!(list_ip_mappings))?;
m.add_wrapped(wrap_pyfunction!(clear_ip_mappings))?;
@ -86,6 +88,8 @@ fn liblqos_python(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(influx_db_org))?;
m.add_wrapped(wrap_pyfunction!(influx_db_token))?;
m.add_wrapped(wrap_pyfunction!(influx_db_url))?;
m.add_wrapped(wrap_pyfunction!(get_weights))?;
m.add_wrapped(wrap_pyfunction!(get_tree_weights))?;
Ok(())
}
@ -637,4 +641,24 @@ fn influx_db_token() -> PyResult<String> {
fn influx_db_url() -> PyResult<String> {
let config = lqos_config::load_config().unwrap();
Ok(config.influxdb.url)
}
#[pyfunction]
pub fn get_weights() -> PyResult<Vec<device_weights::DeviceWeightResponse>> {
match device_weights::get_weights_rust() {
Ok(weights) => Ok(weights),
Err(e) => {
Err(PyOSError::new_err(e.to_string()))
}
}
}
#[pyfunction]
pub fn get_tree_weights() -> PyResult<Vec<device_weights::NetworkNodeWeight>> {
match device_weights::calculate_tree_weights() {
Ok(w) => Ok(w),
Err(e) => {
Err(PyOSError::new_err(e.to_string()))
}
}
}

View File

@ -208,26 +208,28 @@ pub fn attach_xdp_and_tc_to_interface(
// Attach to the ingress IF it is configured
if let Ok(etc) = lqos_config::load_config() {
if let Some(bridge) = &etc.bridge {
// Enable "promiscuous" mode on interfaces
info!("Enabling promiscuous mode on {}", &bridge.to_internet);
std::process::Command::new("/bin/ip")
.args(["link", "set", &bridge.to_internet, "promisc", "on"])
.output()?;
info!("Enabling promiscuous mode on {}", &bridge.to_network);
std::process::Command::new("/bin/ip")
.args(["link", "set", &bridge.to_network, "promisc", "on"])
.output()?;
if bridge.use_xdp_bridge {
// Enable "promiscuous" mode on interfaces
info!("Enabling promiscuous mode on {}", &bridge.to_internet);
std::process::Command::new("/bin/ip")
.args(["link", "set", &bridge.to_internet, "promisc", "on"])
.output()?;
info!("Enabling promiscuous mode on {}", &bridge.to_network);
std::process::Command::new("/bin/ip")
.args(["link", "set", &bridge.to_network, "promisc", "on"])
.output()?;
// Build the interface and vlan map entries
crate::bifrost_maps::clear_bifrost()?;
crate::bifrost_maps::map_multi_interface_mode(&bridge.to_internet, &bridge.to_network)?;
// Build the interface and vlan map entries
crate::bifrost_maps::clear_bifrost()?;
crate::bifrost_maps::map_multi_interface_mode(&bridge.to_internet, &bridge.to_network)?;
// Actually attach the TC ingress program
let error = unsafe {
bpf::tc_attach_ingress(interface_index as i32, false, skeleton)
};
if error != 0 {
return Err(Error::msg("Unable to attach TC Ingress to interface"));
// Actually attach the TC ingress program
let error = unsafe {
bpf::tc_attach_ingress(interface_index as i32, false, skeleton)
};
if error != 0 {
return Err(Error::msg("Unable to attach TC Ingress to interface"));
}
}
}