First iteration of a Python module for direct bus interaction

* Adds the `lqos_python` module to the Rust build system.
* `lqos_python` exposes an API for calling bus commands
  directly, instead of via a helper. Currently, "is alive"
  for testing the daemon, and IP mappings are exported.
* Update the build script to build the .so file and put
  it where Python can find it.
* Exclude the binary from Git.
* Update `LibreQoS.py` to use bus commands for clearing,
  deleting IP mappings. Adding is not yet covered, due
  to the way commands are batched - and that not yet being
  supported by the library.
* Add a check to `LibreQoS.py` that the `lqosd` daemon
  is running and accepting connections. Abort if it isn't,
  since nothing else will work anyway.

Signed-off-by: Herbert Wolverson <herberticus@gmail.com>
This commit is contained in:
Herbert Wolverson 2023-01-04 19:39:29 +00:00
parent bd5ace9fbf
commit b01b9520a5
8 changed files with 278 additions and 7 deletions

1
.gitignore vendored
View File

@ -47,6 +47,7 @@ src/queuingStructure.json
src/tinsStats.json
src/linux_tc.txt
src/lastRun.txt
src/liblqos_python.so
# Ignore Rust build artifacts
src/rust/target

View File

@ -24,6 +24,8 @@ from ispConfig import sqm, upstreamBandwidthCapacityDownloadMbps, upstreamBandwi
runShellCommandsAsSudo, generatedPNDownloadMbps, generatedPNUploadMbps, queuesAvailableOverride, \
OnAStick
from liblqos_python import is_lqosd_alive, clear_ip_mappings, delete_ip_mapping
# Automatically account for TCP overhead of plans. For example a 100Mbps plan needs to be set to 109Mbps for the user to ever see that result on a speed test
# Does not apply to nodes of any sort, just endpoint devices
tcpOverheadFactor = 1.09
@ -82,10 +84,8 @@ def tearDown(interfaceA, interfaceB):
# Full teardown of everything for exiting LibreQoS
if enableActualShellCommands:
# Clear IP filters and remove xdp program from interfaces
result = os.system('./bin/xdp_iphash_to_cpu_cmdline clear')
# The daemon is controling this now, let's not confuse things
#shell('ip link set dev ' + interfaceA + ' xdp off')
#shell('ip link set dev ' + interfaceB + ' xdp off')
#result = os.system('./bin/xdp_iphash_to_cpu_cmdline clear')
clear_ip_mappings() # Use the bus
clearPriorSettings(interfaceA, interfaceB)
def findQueuesAvailable():
@ -751,7 +751,8 @@ def refreshShapers():
xdpStartTime = datetime.now()
if enableActualShellCommands:
# Here we use os.system for the command, because otherwise it sometimes gltiches out with Popen in shell()
result = os.system('./bin/xdp_iphash_to_cpu_cmdline clear')
#result = os.system('./bin/xdp_iphash_to_cpu_cmdline clear')
clear_ip_mappings() # Use the bus
# Set up XDP-CPUMAP-TC
logging.info("# XDP Setup")
# Commented out - the daemon does this
@ -908,9 +909,11 @@ def refreshShapersUpdateOnly():
def removeDeviceIPsFromFilter(circuit):
for device in circuit['devices']:
for ipv4 in device['ipv4s']:
shell('./bin/xdp_iphash_to_cpu_cmdline del ip ' + str(ipv4))
#shell('./bin/xdp_iphash_to_cpu_cmdline del ip ' + str(ipv4))
delete_ip_mapping(str(ipv4))
for ipv6 in device['ipv6s']:
shell('./bin/xdp_iphash_to_cpu_cmdline del ip ' + str(ipv6))
#shell('./bin/xdp_iphash_to_cpu_cmdline del ip ' + str(ipv6))
delete_ip_mapping(str(ipv6))
def addDeviceIPsToFilter(circuit, cpuNumHex):
@ -1215,6 +1218,12 @@ def refreshShapersUpdateOnly():
print("refreshShapersUpdateOnly completed on " + datetime.now().strftime("%d/%m/%Y %H:%M:%S"))
if __name__ == '__main__':
if is_lqosd_alive:
print("lqosd is running")
else:
print("ERROR: lqosd is not running. Aborting")
os.exit()
parser = argparse.ArgumentParser()
parser.add_argument(
'-d', '--debug',

View File

@ -23,6 +23,17 @@ do
cp target/release/$prog ../bin
done
popd
# Copy the node manager's static web content
cp -R rust/lqos_node_manager/static/* bin/static
# Copy Rocket.toml to tell the node manager where to listen
cp rust/lqos_node_manager/Rocket.toml bin/
# Copy the Python library for LibreQoS.py et al.
pushd rust/lqos_python
cargo build --release
popd
cp rust/target/release/liblqos_python.so .
echo "Don't forget to setup /etc/lqos!"

88
src/rust/Cargo.lock generated
View File

@ -971,6 +971,12 @@ dependencies = [
"serde",
]
[[package]]
name = "indoc"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da2d6f23ffea9d7e76c53eee25dfb67bcd8fde7f1198b0855350698c9f07c780"
[[package]]
name = "inlinable_string"
version = "0.1.15"
@ -1206,6 +1212,16 @@ dependencies = [
"sysinfo",
]
[[package]]
name = "lqos_python"
version = "0.1.0"
dependencies = [
"anyhow",
"lqos_bus",
"pyo3",
"tokio",
]
[[package]]
name = "lqos_rs"
version = "0.1.0"
@ -1637,6 +1653,66 @@ dependencies = [
"yansi",
]
[[package]]
name = "pyo3"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "268be0c73583c183f2b14052337465768c07726936a260f480f0857cb95ba543"
dependencies = [
"cfg-if 1.0.0",
"indoc",
"libc",
"memoffset 0.6.5",
"parking_lot 0.12.1",
"pyo3-build-config",
"pyo3-ffi",
"pyo3-macros",
"unindent",
]
[[package]]
name = "pyo3-build-config"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28fcd1e73f06ec85bf3280c48c67e731d8290ad3d730f8be9dc07946923005c8"
dependencies = [
"once_cell",
"target-lexicon",
]
[[package]]
name = "pyo3-ffi"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f6cb136e222e49115b3c51c32792886defbfb0adead26a688142b346a0b9ffc"
dependencies = [
"libc",
"pyo3-build-config",
]
[[package]]
name = "pyo3-macros"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94144a1266e236b1c932682136dc35a9dee8d3589728f68130c7c3861ef96b28"
dependencies = [
"proc-macro2",
"pyo3-macros-backend",
"quote",
"syn",
]
[[package]]
name = "pyo3-macros-backend"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8df9be978a2d2f0cdebabb03206ed73b11314701a5bfe71b0d753b81997777f"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -2133,6 +2209,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "target-lexicon"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9410d0f6853b1d94f0e519fb95df60f29d2c1eff2d921ffdf01a4c8a3b54f12d"
[[package]]
name = "tempfile"
version = "3.3.0"
@ -2402,6 +2484,12 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "unindent"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1766d682d402817b5ac4490b3c3002d91dfa0d22812f341609f97b08757359c"
[[package]]
name = "universal-hash"
version = "0.5.0"

View File

@ -19,4 +19,5 @@ members = [
"xdp_iphash_to_cpu_cmdline", # Rust port of the C xdp_iphash_to_cpu_cmdline tool, for compatibility
"xdp_pping", # Rust port of cpumap's `xdp_pping` tool, for compatibility
"lqos_node_manager", # A lightweight web interface for management and local monitoring
"lqos_python", # Python bindings for using the Rust bus directly
]

View File

@ -0,0 +1,14 @@
[package]
name = "lqos_python"
version = "0.1.0"
edition = "2021"
[lib]
name = "lqos_python"
crate-type = ["cdylib"]
[dependencies]
pyo3 = "0.17"
lqos_bus = { path = "../lqos_bus" }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
anyhow = "1"

View File

@ -0,0 +1,25 @@
use lqos_bus::{BusRequest, BusResponse, BUS_BIND_ADDRESS, BusSession, encode_request, decode_response};
use anyhow::Result;
use tokio::{net::TcpStream, io::{AsyncWriteExt, AsyncReadExt}};
pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let mut replies = Vec::new();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await?;
let test = BusSession {
auth_cookie: 1234,
requests: requests,
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await?;
let reply = decode_response(&buf)?;
replies.extend_from_slice(&reply.responses);
Ok(replies)
})
}

View File

@ -0,0 +1,122 @@
use lqos_bus::{BusRequest, BusResponse, TcHandle};
use pyo3::{
exceptions::PyOSError, pyclass, pyfunction, pymodule, types::PyModule, wrap_pyfunction,
PyResult, Python,
};
mod blocking;
use anyhow::{Error, Result};
use blocking::run_query;
/// Defines the Python module exports.
/// All exported functions have to be listed here.
#[pymodule]
fn liblqos_python(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PyIpMapping>()?;
m.add_wrapped(wrap_pyfunction!(is_lqosd_alive))?;
m.add_wrapped(wrap_pyfunction!(list_ip_mappings))?;
m.add_wrapped(wrap_pyfunction!(clear_ip_mappings))?;
m.add_wrapped(wrap_pyfunction!(delete_ip_mapping))?;
m.add_wrapped(wrap_pyfunction!(add_ip_mapping))?;
Ok(())
}
/// Check that `lqosd` is running.
///
/// Returns true if it is running, false otherwise.
#[pyfunction]
fn is_lqosd_alive(_py: Python) -> PyResult<bool> {
if let Ok(reply) = run_query(vec![BusRequest::Ping]) {
for resp in reply.iter() {
match resp {
BusResponse::Ack => return Ok(true),
_ => {}
}
}
}
Ok(false)
}
/// Provides a representation of an IP address mapping
/// Available through python by field name.
#[pyclass]
pub struct PyIpMapping {
#[pyo3(get)]
pub ip_address: String,
#[pyo3(get)]
pub prefix_length: u32,
#[pyo3(get)]
pub tc_handle: (u16, u16),
#[pyo3(get)]
pub cpu: u32,
}
/// Returns a list of all IP mappings
#[pyfunction]
fn list_ip_mappings(_py: Python) -> PyResult<Vec<PyIpMapping>> {
let mut result = Vec::new();
if let Ok(reply) = run_query(vec![BusRequest::ListIpFlow]) {
for resp in reply.iter() {
match resp {
BusResponse::MappedIps(map) => {
for mapping in map.iter() {
result.push(PyIpMapping {
ip_address: mapping.ip_address.clone(),
prefix_length: mapping.prefix_length,
tc_handle: mapping.tc_handle.get_major_minor(),
cpu: mapping.cpu,
});
}
}
_ => {}
}
}
}
Ok(result)
}
/// Clear all IP address to TC/CPU mappings
#[pyfunction]
fn clear_ip_mappings(_py: Python) -> PyResult<()> {
run_query(vec![BusRequest::ClearIpFlow]).unwrap();
Ok(())
}
/// Deletes an IP to CPU/TC mapping.
///
/// ## Arguments
///
/// * `ip_address`: The IP address to unmap.
/// * `upload`: `true` if this needs to be applied to the upload map (for a split/stick setup)
#[pyfunction]
fn delete_ip_mapping(_py: Python, ip_address: String, upload: bool) -> PyResult<()> {
run_query(vec![BusRequest::DelIpFlow { ip_address, upload }]).unwrap();
Ok(())
}
/// Internal function
/// Converts IP address arguments into an IP mapping request.
fn parse_add_ip(ip: &str, classid: &str, cpu: &str, upload: bool) -> Result<BusRequest> {
if !classid.contains(":") {
return Err(Error::msg(
"Class id must be in the format (major):(minor), e.g. 1:12",
));
}
Ok(BusRequest::MapIpToFlow {
ip_address: ip.to_string(),
tc_handle: TcHandle::from_string(classid)?,
cpu: u32::from_str_radix(&cpu.replace("0x", ""), 16)?, // Force HEX representation
upload,
})
}
/// Adds an IP address mapping
#[pyfunction]
fn add_ip_mapping(ip: String, classid: String, cpu: String, upload: bool) -> PyResult<()> {
let request = parse_add_ip(&ip, &classid, &cpu, upload);
if let Ok(request) = request {
run_query(vec![request]).unwrap();
Ok(())
} else {
Err(PyOSError::new_err(request.err().unwrap().to_string()))
}
}