Add batching to LibreQoS.py python XDP mapping system

* Add a new Python-exported class ('BatchedCommands') to the
  Python-Rust library.
* Replace direct calls to xdp_iphash_to_cpu_cmdline with batched
  commands.
* Execute the batch as a single submission and obtain the command
  count from it (see the usage sketch below).
Author: Herbert Wolverson
Date:   2023-03-03 16:52:34 +00:00
parent 24722aa608
commit 99dc9fc066
3 changed files with 68 additions and 17 deletions
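In brief, the new pattern is: queue every mapping, then submit once over the bus. A minimal sketch of the Python-side usage this commit introduces (the IP, classid, and CPU values below are made up, and submit() needs a running lqosd to answer the bus request):

    from liblqos_python import BatchedCommands

    batch = BatchedCommands()
    # Queue one downstream mapping: (ip, classid, cpu, upload).
    # classid and cpu are strings, parsed on the Rust side.
    batch.add_ip_mapping("100.64.0.2", "0x2:5", "2", False)
    print(batch.length(), "commands queued")   # count before submit()
    batch.submit()   # one bus transaction instead of one process per IP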

LibreQoS.py

@@ -25,7 +25,7 @@ from ispConfig import sqm, upstreamBandwidthCapacityDownloadMbps, upstreamBandwi
     OnAStick
 from liblqos_python import is_lqosd_alive, clear_ip_mappings, delete_ip_mapping, validate_shaped_devices, \
-    is_libre_already_running, create_lock_file, free_lock_file, add_ip_mapping
+    is_libre_already_running, create_lock_file, free_lock_file, add_ip_mapping, BatchedCommands
 # Automatically account for TCP overhead of plans. For example a 100Mbps plan needs to be set to 109Mbps for the user to ever see that result on a speed test
 # Does not apply to nodes of any sort, just endpoint devices
@@ -406,7 +406,8 @@ def refreshShapers():
     # Starting
     print("refreshShapers starting at " + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
+    # Create a single batch of xdp update commands to execute together
+    ipMapBatch = BatchedCommands()
     # Warn user if enableActualShellCommands is False, because that would mean no actual commands are executing
     if enableActualShellCommands == False:
@@ -694,7 +695,6 @@ def refreshShapers():
     minorByCPU = traverseNetwork(network, 0, major=1, minorByCPU=minorByCPUpreloaded, queue=1, parentClassID=None, upParentClassID=None, parentMaxDL=upstreamBandwidthCapacityDownloadMbps, parentMaxUL=upstreamBandwidthCapacityUploadMbps)
     linuxTCcommands = []
-    xdpCPUmapCommands = []
     devicesShaped = []
     # Root HTB Setup
     # Create MQ qdisc for each CPU core / rx-tx queue. Generate commands to create corresponding HTB and leaf classes. Prepare commands for execution later
@@ -775,14 +775,18 @@ def refreshShapers():
     for device in circuit['devices']:
         if device['ipv4s']:
             for ipv4 in device['ipv4s']:
-                xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv4) + ' --cpu ' + data[node]['cpuNum'] + ' --classid ' + circuit['classid'])
+                ipMapBatch.add_ip_mapping(str(ipv4), circuit['classid'], data[node]['cpuNum'], False)
+                #xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv4) + ' --cpu ' + data[node]['cpuNum'] + ' --classid ' + circuit['classid'])
                 if OnAStick:
-                    xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv4) + ' --cpu ' + data[node]['up_cpuNum'] + ' --classid ' + circuit['up_classid'] + ' --upload 1')
+                    ipMapBatch.add_ip_mapping(str(ipv4), circuit['up_classid'], data[node]['up_cpuNum'], True)
+                    #xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv4) + ' --cpu ' + data[node]['up_cpuNum'] + ' --classid ' + circuit['up_classid'] + ' --upload 1')
         if device['ipv6s']:
             for ipv6 in device['ipv6s']:
-                xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv6) + ' --cpu ' + data[node]['cpuNum'] + ' --classid ' + circuit['classid'])
+                ipMapBatch.add_ip_mapping(str(ipv6), circuit['classid'], data[node]['cpuNum'], False)
+                #xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv6) + ' --cpu ' + data[node]['cpuNum'] + ' --classid ' + circuit['classid'])
                 if OnAStick:
-                    xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv6) + ' --cpu ' + data[node]['up_cpuNum'] + ' --classid ' + circuit['up_classid'] + ' --upload 1')
+                    ipMapBatch.add_ip_mapping(str(ipv6), circuit['up_classid'], data[node]['up_cpuNum'], True)
+                    #xdpCPUmapCommands.append('./bin/xdp_iphash_to_cpu_cmdline add --ip ' + str(ipv6) + ' --cpu ' + data[node]['up_cpuNum'] + ' --classid ' + circuit['up_classid'] + ' --upload 1')
         if device['deviceName'] not in devicesShaped:
             devicesShaped.append(device['deviceName'])
 # Recursive call this function for children nodes attached to this node
@@ -849,15 +853,18 @@ def refreshShapers():
     # Execute actual XDP-CPUMAP-TC filter commands
     xdpFilterStartTime = datetime.now()
     print("Executing XDP-CPUMAP-TC IP filter commands")
+    numXdpCommands = ipMapBatch.length();
     if enableActualShellCommands:
-        for command in xdpCPUmapCommands:
-            logging.info(command)
-            commands = command.split(' ')
-            proc = subprocess.Popen(commands, stdout=subprocess.DEVNULL)
+        ipMapBatch.submit()
+        #for command in xdpCPUmapCommands:
+        #    logging.info(command)
+        #    commands = command.split(' ')
+        #    proc = subprocess.Popen(commands, stdout=subprocess.DEVNULL)
     else:
-        for command in xdpCPUmapCommands:
-            logging.info(command)
+        ipMapBatch.log()
+        #for command in xdpCPUmapCommands:
+        #    logging.info(command)
-    print("Executed " + str(len(xdpCPUmapCommands)) + " XDP-CPUMAP-TC IP filter commands")
+    print("Executed " + str(numXdpCommands) + " XDP-CPUMAP-TC IP filter commands")
     #print(xdpCPUmapCommands)
     xdpFilterEndTime = datetime.now()
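One ordering detail worth noting: the count is captured via length() before submit(), because submit() drains the queued requests on the Rust side (it also returns the drained count, so either source would work). Sketched in the same shape as the hunk above:

    numXdpCommands = ipMapBatch.length()   # read before the batch is drained
    if enableActualShellCommands:
        ipMapBatch.submit()                # one bus round-trip for all mappings
    else:
        ipMapBatch.log()                   # dry run: print each queued request
    print("Executed " + str(numXdpCommands) + " XDP-CPUMAP-TC IP filter commands")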
@@ -903,7 +910,7 @@ def refreshShapers():
     print("Queue and IP filter reload completed in " + "{:g}".format(round(reloadTimeSeconds,1)) + " seconds")
     print("\tTC commands: \t" + "{:g}".format(round(tcTimeSeconds,1)) + " seconds")
     print("\tXDP setup: \t " + "{:g}".format(round(xdpSetupTimeSeconds,1)) + " seconds")
-    print("\tXDP filters: \t " + "{:g}".format(round(xdpFilterTimeSeconds,1)) + " seconds")
+    print("\tXDP filters: \t " + "{:g}".format(round(xdpFilterTimeSeconds,4)) + " seconds")
     # Done
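The extra decimals in the last hunk matter presumably because the batched filter pass can finish in well under a second, and at one decimal place a fast run rounds to zero. For example, with a hypothetical timing:

    t = 0.0234                            # seconds, made-up fast run
    print("{:g}".format(round(t, 1)))     # prints 0  (the old format)
    print("{:g}".format(round(t, 4)))     # prints 0.0234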

Rust bus server (UnixSocketServer)

@@ -12,6 +12,8 @@ use tokio::{
 use super::BUS_SOCKET_DIRECTORY;
 
+const READ_BUFFER_SIZE: usize = 20480;
+
 /// Implements a Tokio-friendly server using Unix Sockets and the bus protocol.
 /// Requests are handled and then forwarded to the handler.
 pub struct UnixSocketServer {}
@@ -111,7 +113,7 @@ impl UnixSocketServer {
         let (mut socket, _) = ret.unwrap();
         tokio::spawn(async move {
             loop {
-                let mut buf = vec![0; 4096];
+                let mut buf = vec![0; READ_BUFFER_SIZE];
                 let bytes_read = socket.read(&mut buf).await;
                 if bytes_read.is_err() {

Rust Python bindings (liblqos_python)

@@ -3,7 +3,7 @@ use lqos_utils::hex_string::read_hex_string;
 use nix::libc::getpid;
 use pyo3::{
     exceptions::PyOSError, pyclass, pyfunction, pymodule, types::PyModule,
-    wrap_pyfunction, PyResult, Python,
+    wrap_pyfunction, PyResult, Python, pymethods,
 };
 use std::{
     fs::{remove_file, File},
@@ -22,6 +22,7 @@ const LOCK_FILE: &str = "/run/lqos/libreqos.lock";
 #[pymodule]
 fn liblqos_python(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<PyIpMapping>()?;
+    m.add_class::<BatchedCommands>()?;
     m.add_wrapped(wrap_pyfunction!(is_lqosd_alive))?;
     m.add_wrapped(wrap_pyfunction!(list_ip_mappings))?;
     m.add_wrapped(wrap_pyfunction!(clear_ip_mappings))?;
@@ -145,6 +146,47 @@ fn add_ip_mapping(
     }
 }
+
+#[pyclass]
+pub struct BatchedCommands {
+    batch: Vec<BusRequest>,
+}
+
+#[pymethods]
+impl BatchedCommands {
+    #[new]
+    pub fn new() -> PyResult<Self> {
+        Ok(Self { batch: Vec::new() })
+    }
+
+    pub fn add_ip_mapping(&mut self, ip: String, classid: String, cpu: String, upload: bool) -> PyResult<()> {
+        let request = parse_add_ip(&ip, &classid, &cpu, upload);
+        if let Ok(request) = request {
+            self.batch.push(request);
+            Ok(())
+        } else {
+            Err(PyOSError::new_err(request.err().unwrap().to_string()))
+        }
+    }
+
+    pub fn length(&self) -> PyResult<usize> {
+        Ok(self.batch.len())
+    }
+
+    pub fn log(&self) -> PyResult<()> {
+        self.batch.iter().for_each(|c| println!("{c:?}"));
+        Ok(())
+    }
+
+    pub fn submit(&mut self) -> PyResult<usize> {
+        // We're draining the request list out, which is a move that
+        // *should* be elided by the optimizing compiler.
+        let len = self.batch.len();
+        let batch: Vec<BusRequest> = self.batch.drain(0..).collect();
+        run_query(batch).unwrap();
+        Ok(len)
+    }
+}
+
 /// Requests Rust-side validation of `ShapedDevices.csv`
 #[pyfunction]
 fn validate_shaped_devices() -> PyResult<String> {
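Finally, the error path: add_ip_mapping() returns a PyOSError when parse_add_ip rejects its input, which surfaces in Python as OSError, and the failed request never enters the batch. A hypothetical sketch:

    from liblqos_python import BatchedCommands

    batch = BatchedCommands()
    try:
        batch.add_ip_mapping("not-an-ip", "0x2:5", "2", False)
    except OSError as e:
        print("rejected:", e)     # parse_add_ip failed on the Rust side
    print(batch.length())         # 0 - nothing was queued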