ISSUE #209 - Full error pass on lqos_queue_tracker module

* Replace every value unwrap with unwrap_or to not panic.
* Replace Anyhow errors with specific errors and log entries.
This commit is contained in:
Herbert Wolverson
2023-01-31 21:31:03 +00:00
parent 982e7314c1
commit 74101655d8
13 changed files with 438 additions and 179 deletions

1
src/rust/Cargo.lock generated
View File

@@ -1370,7 +1370,6 @@ dependencies = [
name = "lqos_queue_tracker"
version = "0.1.0"
dependencies = [
"anyhow",
"criterion",
"lazy_static",
"log",

View File

@@ -110,7 +110,7 @@ impl ConfigShapedDevices {
for d in self
.devices
.iter()
.map(|d| SerializableShapedDevice::from(d))
.map(SerializableShapedDevice::from)
{
if writer.serialize(&d).is_err() {
error!("Unable to serialize record, {:?}", d);

View File

@@ -4,7 +4,6 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
thiserror = "1"
serde = "1"
serde_json = "1"

View File

@@ -1,6 +1,7 @@
use crate::queue_types::QueueType;
use anyhow::Result;
use serde::Serialize;
use thiserror::Error;
use log::error;
#[derive(Debug, Clone, Serialize)]
pub enum QueueDiff {
@@ -12,13 +13,19 @@ pub enum QueueDiff {
// ClsAct,
}
pub(crate) fn make_queue_diff(previous: &QueueType, current: &QueueType) -> Result<QueueDiff> {
pub(crate) fn make_queue_diff(previous: &QueueType, current: &QueueType) -> Result<QueueDiff, QueueDiffError> {
match previous {
QueueType::Cake(..) => match current {
QueueType::Cake(..) => Ok(cake_diff(previous, current)?),
_ => Err(anyhow::Error::msg("Not implemented")),
_ => {
error!("Queue diffs are not implemented for Cake to {:?}", current);
Err(QueueDiffError::NotImplemented)
}
},
_ => Err(anyhow::Error::msg("Not implemented")),
_ => {
error!("Queue diffs are not implemented for {:?}", current);
Err(QueueDiffError::NotImplemented)
}
}
}
@@ -39,7 +46,7 @@ pub struct CakeDiffTin {
pub avg_delay_us: u32,
}
fn cake_diff(previous: &QueueType, current: &QueueType) -> Result<QueueDiff> {
fn cake_diff(previous: &QueueType, current: &QueueType) -> Result<QueueDiff, QueueDiffError> {
// TODO: Wrapping Handler
if let QueueType::Cake(prev) = previous {
if let QueueType::Cake(new) = current {
@@ -63,5 +70,11 @@ fn cake_diff(previous: &QueueType, current: &QueueType) -> Result<QueueDiff> {
}));
}
}
Err(anyhow::Error::msg("Not implemented"))
Err(QueueDiffError::NotImplemented)
}
#[derive(Debug, Error)]
pub enum QueueDiffError {
#[error("Not implemented")]
NotImplemented,
}

View File

@@ -1,18 +1,60 @@
mod queing_structure_json_monitor;
mod queue_network;
mod queue_node;
use anyhow::Result;
pub use queing_structure_json_monitor::spawn_queue_structure_monitor;
pub(crate) use queing_structure_json_monitor::QUEUE_STRUCTURE;
use queue_network::QueueNetwork;
use queue_node::QueueNode;
use thiserror::Error;
use log::error;
fn read_hex_string(s: &str) -> Result<u32> {
Ok(u32::from_str_radix(&s.replace("0x", ""), 16)?)
fn read_hex_string(s: &str) -> Result<u32, HexParseError> {
let result = u32::from_str_radix(&s.replace("0x", ""), 16);
match result {
Ok(data) => Ok(data),
Err(e) => {
error!("Unable to convert {s} to a u32");
error!("{:?}", e);
Err(HexParseError::ParseError)
}
}
}
pub(crate) fn read_queueing_structure() -> Result<Vec<QueueNode>> {
pub(crate) fn read_queueing_structure() -> Result<Vec<QueueNode>, QueueStructureError> {
// Note: the ? is allowed because the sub-types return a QueueStructureError and handle logging.
let network = QueueNetwork::from_json()?;
let flattened = network.to_flat();
Ok(flattened)
}
#[derive(Error, Debug)]
pub enum HexParseError {
#[error("Unable to decode string into valid hex")]
ParseError
}
#[derive(Error, Debug)]
pub enum QueueStructureError {
#[error("Unable to parse node structure from JSON")]
JsonKeyUnparseable(String),
#[error("unable to parse u64")]
U64Parse(String),
#[error("Unable to retrieve string from JSON")]
StringParse(String),
#[error("Unable to convert string to TC Handle")]
TcHandle(String),
#[error("Unable to convert string to u32 via hex")]
HexParse(String),
#[error("Error reading child circuit")]
Circuit,
#[error("Error reading child device")]
Device,
#[error("Error reading child's children")]
Children,
#[error("Unable to read configuration from /etc/lqos.conf")]
LqosConf,
#[error("Unable to access queueingStructure.json")]
FileNotFound,
#[error("Unable to read JSON")]
JSON,
}

View File

@@ -1,16 +1,21 @@
use super::queue_node::QueueNode;
use anyhow::{Error, Result};
use super::{queue_node::QueueNode, QueueStructureError};
use lqos_config::EtcLqos;
use serde_json::Value;
use std::path::{Path, PathBuf};
use log::error;
pub struct QueueNetwork {
pub(crate) cpu_node: Vec<QueueNode>,
}
impl QueueNetwork {
pub fn path() -> Result<PathBuf> {
let cfg = EtcLqos::load()?;
pub fn path() -> Result<PathBuf, QueueStructureError> {
let cfg = EtcLqos::load();
if cfg.is_err() {
error!("unable to read /etc/lqos.conf");
return Err(QueueStructureError::LqosConf);
}
let cfg = cfg.unwrap();
let base_path = Path::new(&cfg.lqos_directory);
Ok(base_path.join("queuingStructure.json"))
}
@@ -23,18 +28,17 @@ impl QueueNetwork {
}
}
pub(crate) fn from_json() -> Result<Self> {
pub(crate) fn from_json() -> Result<Self, QueueStructureError> {
let path = QueueNetwork::path()?;
if !QueueNetwork::exists() {
return Err(Error::msg(
"queueStructure.json does not exist yet. Try running LibreQoS?",
));
error!("queueStructure.json does not exist yet. Try running LibreQoS?");
return Err(QueueStructureError::FileNotFound);
}
let raw_string = std::fs::read_to_string(path)?;
let raw_string = std::fs::read_to_string(path).map_err(|_| QueueStructureError::FileNotFound)?;
let mut result = Self {
cpu_node: Vec::new(),
};
let json: Value = serde_json::from_str(&raw_string)?;
let json: Value = serde_json::from_str(&raw_string).map_err(|_| QueueStructureError::FileNotFound)?;
if let Value::Object(map) = &json {
if let Some(network) = map.get("Network") {
if let Value::Object(map) = network {
@@ -42,13 +46,16 @@ impl QueueNetwork {
result.cpu_node.push(QueueNode::from_json(&key, value)?);
}
} else {
return Err(Error::msg("Unable to parse network object structure"));
error!("Unable to parse JSON for queueStructure");
return Err(QueueStructureError::JSON);
}
} else {
return Err(Error::msg("Network entry not found"));
error!("Unable to parse JSON for queueStructure");
return Err(QueueStructureError::JSON);
}
} else {
return Err(Error::msg("Unable to parse queueStructure.json"));
error!("Unable to parse JSON for queueStructure");
return Err(QueueStructureError::JSON);
}
Ok(result)

View File

@@ -1,7 +1,7 @@
use super::read_hex_string;
use anyhow::{Error, Result};
use super::{read_hex_string, QueueStructureError};
use lqos_bus::TcHandle;
use serde_json::Value;
use log::error;
#[derive(Default, Clone, Debug)]
pub struct QueueNode {
@@ -30,82 +30,185 @@ pub struct QueueNode {
pub children: Vec<QueueNode>,
}
/// Provides a convenient wrapper that attempts to decode a u64 from a JSON
/// value, and returns an error if decoding fails.
macro_rules! grab_u64 {
($target: expr, $key: expr, $value: expr) => {
let tmp = $value.as_u64().ok_or(QueueStructureError::U64Parse(format!("{} => {:?}", $key, $value)));
if tmp.is_err() {
error!("Error decoding JSON. Key: {}, Value: {:?} is not readily convertible to a u64.", $key, $value);
return Err(tmp.unwrap_err());
} else {
$target = tmp.unwrap();
}
};
}
/// Provides a macro to safely unwrap TC Handles and issue an error if they didn't parse
/// correctly.
macro_rules! grab_tc_handle {
($target: expr, $key: expr, $value: expr) => {
let s = $value.as_str();
if s.is_none() {
error!("Unable to parse {:?} as a string from JSON", s);
return Err(QueueStructureError::StringParse(format!("{:?}", $value)));
}
let s = s.unwrap();
let tmp = TcHandle::from_string(s);
if tmp.is_err() {
error!("Unable to parse {:?} as a TC Handle", s);
return Err(QueueStructureError::TcHandle(format!("{:?}", tmp)));
}
$target = tmp.unwrap();
};
}
/// Macro to convert hex strings (e.g. 0xff) to a u32
macro_rules! grab_hex {
($target: expr, $key: expr, $value: expr) => {
let s = $value.as_str();
if s.is_none() {
error!("Unable to parse {:?} as a string from JSON", $value);
return Err(QueueStructureError::StringParse(format!("{:?}", s)));
}
let s = s.unwrap();
let tmp = read_hex_string(s);
if tmp.is_err() {
error!("Unable to parse {:?} as a hex string", $value);
return Err(QueueStructureError::HexParse(format!("{:?}", tmp)));
}
$target = tmp.unwrap();
};
}
/// Macro to extract an option<string>
macro_rules! grab_string_option {
($target: expr, $key: expr, $value: expr) => {
let s = $value.as_str();
if s.is_none() {
error!("Unable to parse {:?} as a string from JSON", $value);
return Err(QueueStructureError::StringParse(format!("{:?}", s)));
}
$target = Some(s.unwrap().to_string());
};
}
/// Macro to extract a string
macro_rules! grab_string {
($target: expr, $key: expr, $value: expr) => {
let s = $value.as_str();
if s.is_none() {
error!("Unable to parse {:?} as a string from JSON", $value);
return Err(QueueStructureError::StringParse(format!("{:?}", s)));
}
$target = s.unwrap().to_string();
};
}
impl QueueNode {
pub(crate) fn from_json(key: &str, value: &Value) -> Result<Self> {
pub(crate) fn from_json(key: &str, value: &Value) -> Result<Self, QueueStructureError> {
let mut result = Self::default();
if let Value::Object(map) = value {
for (key, value) in map.iter() {
match key.as_str() {
"downloadBandwidthMbps" | "maxDownload" => {
result.download_bandwidth_mbps = value.as_u64().unwrap()
grab_u64!(result.download_bandwidth_mbps, key.as_str(), value);
}
"uploadBandwidthMbps" | "maxUpload" => {
result.upload_bandwidth_mbps = value.as_u64().unwrap()
grab_u64!(result.upload_bandwidth_mbps, key.as_str(), value);
}
"downloadBandwidthMbpsMin" | "minDownload" => {
result.download_bandwidth_mbps_min = value.as_u64().unwrap()
grab_u64!(result.download_bandwidth_mbps_min, key.as_str(), value);
}
"uploadBandwidthMbpsMin" | "minUpload" => {
result.upload_bandwidth_mbps_min = value.as_u64().unwrap()
grab_u64!(result.upload_bandwidth_mbps_min, key.as_str(), value);
}
"classid" => {
result.class_id =
TcHandle::from_string(&value.as_str().unwrap().to_string())?
grab_tc_handle!(result.class_id, key.as_str(), value);
}
"up_classid" => {
result.up_class_id =
TcHandle::from_string(value.as_str().unwrap().to_string())?
grab_tc_handle!(result.up_class_id, key.as_str(), value);
}
"classMajor" => {
grab_hex!(result.class_major, key.as_str(), value);
}
"classMajor" => result.class_major = read_hex_string(value.as_str().unwrap())?,
"up_classMajor" => {
result.up_class_major = read_hex_string(value.as_str().unwrap())?
grab_hex!(result.up_class_major, key.as_str(), value);
}
"classMinor" => {
grab_hex!(result.class_minor, key.as_str(), value);
}
"cpuNum" => {
grab_hex!(result.cpu_num, key.as_str(), value);
}
"up_cpuNum" => {
grab_hex!(result.up_cpu_num, key.as_str(), value);
}
"classMinor" => result.class_minor = read_hex_string(value.as_str().unwrap())?,
"cpuNum" => result.cpu_num = read_hex_string(value.as_str().unwrap())?,
"up_cpuNum" => result.up_cpu_num = read_hex_string(value.as_str().unwrap())?,
"parentClassID" => {
result.parent_class_id =
TcHandle::from_string(value.as_str().unwrap().to_string())?
grab_tc_handle!(result.parent_class_id, key.as_str(), value);
}
"up_parentClassID" => {
result.up_parent_class_id =
TcHandle::from_string(value.as_str().unwrap().to_string())?
grab_tc_handle!(result.up_parent_class_id, key.as_str(), value);
}
"circuitId" | "circuitID" => {
result.circuit_id = Some(value.as_str().unwrap().to_string())
grab_string_option!(result.circuit_id, key.as_str(), value);
}
"circuitName" => {
result.circuit_name = Some(value.as_str().unwrap().to_string())
grab_string_option!(result.circuit_name, key.as_str(), value);
}
"parentNode" | "ParentNode" => {
result.parent_node = Some(value.as_str().unwrap().to_string())
grab_string_option!(result.parent_node, key.as_str(), value);
}
"comment" => {
grab_string!(result.comment, key.as_str(), value);
}
"comment" => result.comment = value.as_str().unwrap().to_string(),
"deviceId" | "deviceID" => {
result.device_id = Some(value.as_str().unwrap().to_string())
grab_string_option!(result.device_id, key.as_str(), value);
}
"deviceName" => {
grab_string_option!(result.device_name, key.as_str(), value);
}
"mac" => {
grab_string_option!(result.mac, key.as_str(), value);
}
"deviceName" => result.device_name = Some(value.as_str().unwrap().to_string()),
"mac" => result.mac = Some(value.as_str().unwrap().to_string()),
"ipv4s" => {} // Ignore
"ipv6s" => {}
"circuits" => {
if let Value::Array(array) = value {
for c in array.iter() {
result.circuits.push(QueueNode::from_json(key, c)?);
let n = QueueNode::from_json(key, c);
if n.is_err() {
error!("Unable to read circuit children");
error!("{:?}", n);
return Err(QueueStructureError::Circuit);
}
result.circuits.push(n.unwrap());
}
}
}
"devices" => {
if let Value::Array(array) = value {
for c in array.iter() {
result.devices.push(QueueNode::from_json(key, c)?);
let n = QueueNode::from_json(key, c);
if n.is_err() {
error!("Unable to read device children");
error!("{:?}", n);
return Err(QueueStructureError::Device);
}
result.devices.push(n.unwrap());
}
}
}
"children" => {
if let Value::Object(map) = value {
for (key, c) in map.iter() {
result.circuits.push(QueueNode::from_json(key, c)?);
let n = QueueNode::from_json(key, c);
if n.is_err() {
error!("Unable to read children. Don't worry, we all feel that way sometimes.");
error!("{:?}", n);
return Err(QueueStructureError::Children);
}
result.circuits.push(n.unwrap());
}
} else {
log::warn!("Children was not an object");
@@ -116,8 +219,9 @@ impl QueueNode {
}
}
} else {
return Err(Error::msg(format!(
"Unable to parse node structure for [{key}]"
error!("Unable to parse node structure for [{key}]");
return Err(QueueStructureError::JsonKeyUnparseable(format!(
"{key}"
)));
}
Ok(result)

View File

@@ -2,9 +2,10 @@ mod tc_cake;
mod tc_fq_codel;
mod tc_htb;
mod tc_mq;
use anyhow::{Error, Result};
use serde::Serialize;
use serde_json::Value;
use thiserror::Error;
use log::warn;
#[derive(Debug, Clone, Serialize)]
pub enum QueueType {
@@ -16,22 +17,25 @@ pub enum QueueType {
}
impl QueueType {
fn parse(kind: &str, map: &serde_json::Map<std::string::String, Value>) -> Result<QueueType> {
fn parse(kind: &str, map: &serde_json::Map<std::string::String, Value>) -> Result<QueueType, QDiscError> {
match kind {
"mq" => Ok(QueueType::Mq(tc_mq::TcMultiQueue::from_json(map)?)),
"htb" => Ok(QueueType::Htb(tc_htb::TcHtb::from_json(map)?)),
"fq_codel" => Ok(QueueType::FqCodel(tc_fq_codel::TcFqCodel::from_json(map)?)),
"cake" => Ok(QueueType::Cake(tc_cake::TcCake::from_json(map)?)),
"clsact" => Ok(QueueType::ClsAct),
_ => Err(Error::msg(format!("Unknown queue kind: {kind}"))),
_ => {
warn!("I don't know how to parse qdisc type {kind}");
Err(QDiscError::UnknownQdisc(format!("Unknown queue kind: {kind}")))
}
}
}
}
/// Separated into a separate function for cleaner benchmark code
pub fn deserialize_tc_tree(json: &str) -> Result<Vec<QueueType>> {
pub fn deserialize_tc_tree(json: &str) -> Result<Vec<QueueType>, QDiscError> {
let mut result = Vec::new();
let json: Value = serde_json::from_str(json)?;
let json: Value = serde_json::from_str(json).map_err(|_| QDiscError::Json(json.to_string()))?;
if let Value::Array(array) = &json {
for entry in array.iter() {
match entry {
@@ -47,8 +51,48 @@ pub fn deserialize_tc_tree(json: &str) -> Result<Vec<QueueType>> {
}
}
} else {
return Err(Error::msg("Unable to parse TC data array"));
warn!("Failed to parse TC queue stats data array.");
return Err(QDiscError::ArrayInvalid);
}
Ok(result)
}
#[derive(Error, Debug)]
pub enum QDiscError {
#[error("Unknown queue kind")]
UnknownQdisc(String),
#[error("Error parsing queue information JSON")]
Json(String),
#[error("Unable to parse TC data array")]
ArrayInvalid,
#[error("Unable to parse Cake Tin options")]
CakeTin,
#[error("Unable to parse Cake options")]
CakeOpts,
#[error("Unable to parse HTB options")]
HtbOpts,
#[error("Unable to parse fq_codel options")]
CodelOpts,
}
/// Used to extract TC handles without unwrapping.
/// Sets a default value if none can be extracted, rather than
/// bailing on the entire parse run.
#[macro_export]
macro_rules! parse_tc_handle {
($target: expr, $value: expr) => {
let s = $value.as_str();
if let Some(s) = s {
if let Ok(handle) = TcHandle::from_string(s) {
$target = handle;
} else {
info_once!("Unable to extract TC handle from string");
$target = TcHandle::default();
}
} else {
info_once!("Unable to extract string for TC handle");
$target = TcHandle::default();
}
};
}

View File

@@ -1,9 +1,11 @@
use anyhow::{Error, Result};
use lqos_bus::TcHandle;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use lqos_utils::{string_table_enum, dashy_table_enum};
use log_once::info_once;
use log::warn;
use crate::parse_tc_handle;
use super::QDiscError;
string_table_enum!(DiffServ, besteffort, diffserv3, diffserv4, diffserv8, precedence);
dashy_table_enum!(AckFilter, none, ack_filter, ack_filter_aggressive);
@@ -74,27 +76,31 @@ pub struct TcCakeTin {
}
impl TcCake {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self> {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self, QDiscError> {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"handle" => result.handle = TcHandle::from_string(value.as_str().unwrap())?,
"parent" => result.parent = TcHandle::from_string(value.as_str().unwrap())?,
"bytes" => result.bytes = value.as_u64().unwrap(),
"packets" => result.packets = value.as_u64().unwrap() as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap() as u32,
"requeues" => result.requeues = value.as_u64().unwrap() as u32,
"backlog" => result.backlog = value.as_u64().unwrap() as u32,
"qlen" => result.qlen = value.as_u64().unwrap() as u32,
"memory_used" => result.memory_used = value.as_u64().unwrap() as u32,
"memory_limit" => result.memory_limit = value.as_u64().unwrap() as u32,
"capacity_estimate" => result.capacity_estimate = value.as_u64().unwrap() as u32,
"min_network_size" => result.min_network_size = value.as_u64().unwrap() as u16,
"max_network_size" => result.max_network_size = value.as_u64().unwrap() as u16,
"min_adj_size" => result.min_adj_size = value.as_u64().unwrap() as u16,
"max_adj_size" => result.max_adj_size = value.as_u64().unwrap() as u16,
"avg_hdr_offset" => result.avg_hdr_offset = value.as_u64().unwrap() as u16,
"drops" => result.drops = value.as_u64().unwrap() as u32,
"handle" => {
parse_tc_handle!(result.handle, value);
}
"parent" => {
parse_tc_handle!(result.parent, value);
}
"bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,
"memory_used" => result.memory_used = value.as_u64().unwrap_or(0) as u32,
"memory_limit" => result.memory_limit = value.as_u64().unwrap_or(0) as u32,
"capacity_estimate" => result.capacity_estimate = value.as_u64().unwrap_or(0) as u32,
"min_network_size" => result.min_network_size = value.as_u64().unwrap_or(0) as u16,
"max_network_size" => result.max_network_size = value.as_u64().unwrap_or(0) as u16,
"min_adj_size" => result.min_adj_size = value.as_u64().unwrap_or(0) as u16,
"max_adj_size" => result.max_adj_size = value.as_u64().unwrap_or(0) as u16,
"avg_hdr_offset" => result.avg_hdr_offset = value.as_u64().unwrap_or(0) as u16,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"options" => result.options = TcCakeOptions::from_json(value)?,
"tins" => match value {
Value::Array(array) => {
@@ -115,24 +121,24 @@ impl TcCake {
}
impl TcCakeOptions {
fn from_json(value: &Value) -> Result<Self> {
fn from_json(value: &Value) -> Result<Self, QDiscError> {
match value {
Value::Object(map) => {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"bandwidth" => result.bandwidth = BandWidth::from_str(value.as_str().unwrap()),
"diffserv" => result.diffserv = DiffServ::from_str(value.as_str().unwrap()),
"flowmode" => result.flowmode = FlowMode::from_str(value.as_str().unwrap()),
"nat" => result.nat = value.as_bool().unwrap(),
"wash" => result.wash = value.as_bool().unwrap(),
"ingress" => result.ingress = value.as_bool().unwrap(),
"ack-filter" => result.ack_filter = AckFilter::from_str(value.as_str().unwrap()),
"split_gso" => result.split_gso = value.as_bool().unwrap(),
"rtt" => result.rtt = value.as_u64().unwrap(),
"raw" => result.raw = value.as_bool().unwrap(),
"overhead" => result.overhead = value.as_u64().unwrap() as u16,
"fwmark" => result.fwmark = value.as_str().unwrap().to_string(),
"bandwidth" => result.bandwidth = BandWidth::from_str(value.as_str().unwrap_or("")),
"diffserv" => result.diffserv = DiffServ::from_str(value.as_str().unwrap_or("")),
"flowmode" => result.flowmode = FlowMode::from_str(value.as_str().unwrap_or("")),
"nat" => result.nat = value.as_bool().unwrap_or(false),
"wash" => result.wash = value.as_bool().unwrap_or(false),
"ingress" => result.ingress = value.as_bool().unwrap_or(false),
"ack-filter" => result.ack_filter = AckFilter::from_str(value.as_str().unwrap_or("")),
"split_gso" => result.split_gso = value.as_bool().unwrap_or(false),
"rtt" => result.rtt = value.as_u64().unwrap_or(0),
"raw" => result.raw = value.as_bool().unwrap_or(false),
"overhead" => result.overhead = value.as_u64().unwrap_or(0) as u16,
"fwmark" => result.fwmark = value.as_str().unwrap_or("").to_string(),
_ => {
info_once!("Unknown entry in tc-cake-options json decoder: {key}");
}
@@ -140,42 +146,42 @@ impl TcCakeOptions {
}
Ok(result)
}
_ => Err(Error::msg("Unable to parse cake options")),
_ => Err(QDiscError::CakeOpts),
}
}
}
impl TcCakeTin {
fn from_json(value: &Value) -> Result<Self> {
fn from_json(value: &Value) -> Result<Self, QDiscError> {
match value {
Value::Object(map) => {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"threshold_rate" => result.threshold_rate = value.as_u64().unwrap(),
"sent_bytes" => result.sent_bytes = value.as_u64().unwrap(),
"backlog_bytes" => result.backlog_bytes = value.as_u64().unwrap() as u32,
"target_us" => result.target_us = value.as_u64().unwrap() as u32,
"interval_us" => result.interval_us = value.as_u64().unwrap() as u32,
"peak_delay_us" => result.peak_delay_us = value.as_u64().unwrap() as u32,
"avg_delay_us" => result.avg_delay_us = value.as_u64().unwrap() as u32,
"base_delay_us" => result.base_delay_us = value.as_u64().unwrap() as u32,
"sent_packets" => result.sent_packets = value.as_u64().unwrap() as u32,
"threshold_rate" => result.threshold_rate = value.as_u64().unwrap_or(0),
"sent_bytes" => result.sent_bytes = value.as_u64().unwrap_or(0),
"backlog_bytes" => result.backlog_bytes = value.as_u64().unwrap_or(0) as u32,
"target_us" => result.target_us = value.as_u64().unwrap_or(0) as u32,
"interval_us" => result.interval_us = value.as_u64().unwrap_or(0) as u32,
"peak_delay_us" => result.peak_delay_us = value.as_u64().unwrap_or(0) as u32,
"avg_delay_us" => result.avg_delay_us = value.as_u64().unwrap_or(0) as u32,
"base_delay_us" => result.base_delay_us = value.as_u64().unwrap_or(0) as u32,
"sent_packets" => result.sent_packets = value.as_u64().unwrap_or(0) as u32,
"way_indirect_hits" => {
result.way_indirect_hits = value.as_u64().unwrap() as u16
result.way_indirect_hits = value.as_u64().unwrap_or(0) as u16
}
"way_misses" => result.way_misses = value.as_u64().unwrap() as u16,
"way_collisions" => result.way_collisions = value.as_u64().unwrap() as u16,
"drops" => result.drops = value.as_u64().unwrap() as u32,
"ecn_mark" => result.ecn_marks = value.as_u64().unwrap() as u32,
"ack_drops" => result.ack_drops = value.as_u64().unwrap() as u32,
"sparse_flows" => result.sparse_flows = value.as_u64().unwrap() as u16,
"bulk_flows" => result.bulk_flows = value.as_u64().unwrap() as u16,
"way_misses" => result.way_misses = value.as_u64().unwrap_or(0) as u16,
"way_collisions" => result.way_collisions = value.as_u64().unwrap_or(0) as u16,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"ecn_mark" => result.ecn_marks = value.as_u64().unwrap_or(0) as u32,
"ack_drops" => result.ack_drops = value.as_u64().unwrap_or(0) as u32,
"sparse_flows" => result.sparse_flows = value.as_u64().unwrap_or(0) as u16,
"bulk_flows" => result.bulk_flows = value.as_u64().unwrap_or(0) as u16,
"unresponsive_flows" => {
result.unresponsive_flows = value.as_u64().unwrap() as u16
result.unresponsive_flows = value.as_u64().unwrap_or(0) as u16
}
"max_pkt_len" => result.max_pkt_len = value.as_u64().unwrap() as u16,
"flow_quantum" => result.flow_quantum = value.as_u64().unwrap() as u16,
"max_pkt_len" => result.max_pkt_len = value.as_u64().unwrap_or(0) as u16,
"flow_quantum" => result.flow_quantum = value.as_u64().unwrap_or(0) as u16,
_ => {
info_once!("Unknown entry in tc-cake-tin json decoder: {key}");
}
@@ -183,7 +189,10 @@ impl TcCakeTin {
}
Ok(result)
}
_ => Err(Error::msg("Unable to parse cake tin options")),
_ => {
warn!("Unable to parse cake tin");
Err(QDiscError::CakeTin)
}
}
}
}

View File

@@ -5,11 +5,12 @@
"ecn_mark":0,"new_flows_len":0,"old_flows_len":0},
*/
use anyhow::{Error, Result};
use lqos_bus::TcHandle;
use serde::Serialize;
use serde_json::Value;
use log_once::info_once;
use crate::parse_tc_handle;
use super::QDiscError;
#[derive(Default, Clone, Debug, Serialize)]
pub struct TcFqCodel {
@@ -44,25 +45,29 @@ struct TcFqCodelOptions {
}
impl TcFqCodel {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self> {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self, QDiscError> {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"handle" => result.handle = TcHandle::from_string(value.as_str().unwrap())?,
"parent" => result.parent = TcHandle::from_string(value.as_str().unwrap())?,
"bytes" => result.bytes = value.as_u64().unwrap(),
"packets" => result.packets = value.as_u64().unwrap() as u32,
"drops" => result.drops = value.as_u64().unwrap() as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap() as u32,
"requeues" => result.requeues = value.as_u64().unwrap() as u32,
"backlog" => result.backlog = value.as_u64().unwrap() as u32,
"qlen" => result.qlen = value.as_u64().unwrap() as u32,
"maxpacket" => result.maxpacket = value.as_u64().unwrap() as u16,
"drop_overlimit" => result.drop_overlimit = value.as_u64().unwrap() as u32,
"new_flow_count" => result.new_flow_count = value.as_u64().unwrap() as u32,
"ecn_mark" => result.ecn_mark = value.as_u64().unwrap() as u32,
"new_flows_len" => result.new_flows_len = value.as_u64().unwrap() as u16,
"old_flows_len" => result.old_flows_len = value.as_u64().unwrap() as u16,
"handle" => {
parse_tc_handle!(result.handle, value);
}
"parent" => {
parse_tc_handle!(result.parent, value);
}
"bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,
"maxpacket" => result.maxpacket = value.as_u64().unwrap_or(0) as u16,
"drop_overlimit" => result.drop_overlimit = value.as_u64().unwrap_or(0) as u32,
"new_flow_count" => result.new_flow_count = value.as_u64().unwrap_or(0) as u32,
"ecn_mark" => result.ecn_mark = value.as_u64().unwrap_or(0) as u32,
"new_flows_len" => result.new_flows_len = value.as_u64().unwrap_or(0) as u16,
"old_flows_len" => result.old_flows_len = value.as_u64().unwrap_or(0) as u16,
"options" => result.options = TcFqCodelOptions::from_json(value)?,
"kind" => {}
_ => {
@@ -75,20 +80,20 @@ impl TcFqCodel {
}
impl TcFqCodelOptions {
fn from_json(value: &Value) -> Result<Self> {
fn from_json(value: &Value) -> Result<Self, QDiscError> {
match value {
Value::Object(map) => {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"limit" => result.limit = value.as_u64().unwrap() as u32,
"flows" => result.flows = value.as_u64().unwrap() as u16,
"quantum" => result.quantum = value.as_u64().unwrap() as u16,
"target" => result.target = value.as_u64().unwrap(),
"interval" => result.interval = value.as_u64().unwrap(),
"memory_limit" => result.memory_limit = value.as_u64().unwrap() as u32,
"ecn" => result.ecn = value.as_bool().unwrap(),
"drop_batch" => result.drop_batch = value.as_u64().unwrap() as u16,
"limit" => result.limit = value.as_u64().unwrap_or(0) as u32,
"flows" => result.flows = value.as_u64().unwrap_or(0) as u16,
"quantum" => result.quantum = value.as_u64().unwrap_or(0) as u16,
"target" => result.target = value.as_u64().unwrap_or(0),
"interval" => result.interval = value.as_u64().unwrap_or(0),
"memory_limit" => result.memory_limit = value.as_u64().unwrap_or(0) as u32,
"ecn" => result.ecn = value.as_bool().unwrap_or(false),
"drop_batch" => result.drop_batch = value.as_u64().unwrap_or(0) as u16,
_ => {
info_once!("Unknown entry in tc-codel-options json decoder: {key}");
}
@@ -96,7 +101,7 @@ impl TcFqCodelOptions {
}
Ok(result)
}
_ => Err(Error::msg("Unable to parse fq_codel options")),
_ => Err(QDiscError::CodelOpts),
}
}
}

View File

@@ -3,11 +3,12 @@
"bytes":1920791512305,"packets":1466145855,"drops":32136937,"overlimits":2627500070,"requeues":1224,"backlog":0,"qlen":0}
*/
use anyhow::{Error, Result};
use lqos_bus::TcHandle;
use serde::Serialize;
use serde_json::Value;
use log_once::info_once;
use crate::parse_tc_handle;
use super::QDiscError;
#[derive(Default, Clone, Debug, Serialize)]
pub struct TcHtb {
@@ -32,19 +33,23 @@ struct TcHtbOptions {
}
impl TcHtb {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self> {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self, QDiscError> {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"handle" => result.handle = TcHandle::from_string(value.as_str().unwrap())?,
"parent" => result.parent = TcHandle::from_string(value.as_str().unwrap())?,
"bytes" => result.bytes = value.as_u64().unwrap(),
"packets" => result.packets = value.as_u64().unwrap() as u32,
"drops" => result.drops = value.as_u64().unwrap() as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap() as u32,
"requeues" => result.requeues = value.as_u64().unwrap() as u32,
"backlog" => result.backlog = value.as_u64().unwrap() as u32,
"qlen" => result.qlen = value.as_u64().unwrap() as u32,
"handle" => {
parse_tc_handle!(result.handle, value);
}
"parent" => {
parse_tc_handle!(result.parent, value);
}
"bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,
"options" => result.options = TcHtbOptions::from_json(value)?,
"kind" => {}
_ => {
@@ -57,20 +62,20 @@ impl TcHtb {
}
impl TcHtbOptions {
fn from_json(value: &Value) -> Result<Self> {
fn from_json(value: &Value) -> Result<Self, QDiscError> {
match value {
Value::Object(map) => {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"r2q" => result.r2q = value.as_u64().unwrap() as u32,
"r2q" => result.r2q = value.as_u64().unwrap_or(0) as u32,
"default" => {
result.default = TcHandle::from_string(value.as_str().unwrap())?
parse_tc_handle!(result.default, value);
}
"direct_packets_stat" => {
result.direct_packets_stat = value.as_u64().unwrap() as u32
result.direct_packets_stat = value.as_u64().unwrap_or(0) as u32
}
"direct_qlen" => result.direct_qlen = value.as_u64().unwrap() as u32,
"direct_qlen" => result.direct_qlen = value.as_u64().unwrap_or(0) as u32,
_ => {
info_once!("Unknown entry in tc-HTB json decoder: {key}");
}
@@ -78,7 +83,7 @@ impl TcHtbOptions {
}
Ok(result)
}
_ => Err(Error::msg("Unable to parse HTB options")),
_ => Err(QDiscError::HtbOpts),
}
}
}

View File

@@ -2,11 +2,12 @@
{"kind":"mq","handle":"7fff:","root":true,"options":{},"bytes":0,"packets":0,"drops":0,"overlimits":0,"requeues":0,"backlog":0,"qlen":0}
*/
use anyhow::Result;
use lqos_bus::TcHandle;
use serde::Serialize;
use serde_json::Value;
use log_once::info_once;
use crate::parse_tc_handle;
use super::QDiscError;
#[derive(Default, Clone, Debug, Serialize)]
pub struct TcMultiQueue {
@@ -22,19 +23,21 @@ pub struct TcMultiQueue {
}
impl TcMultiQueue {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self> {
pub(crate) fn from_json(map: &serde_json::Map<std::string::String, Value>) -> Result<Self, QDiscError> {
let mut result = Self::default();
for (key, value) in map.iter() {
match key.as_str() {
"handle" => result.handle = TcHandle::from_string(value.as_str().unwrap())?,
"root" => result.root = value.as_bool().unwrap(),
"bytes" => result.bytes = value.as_u64().unwrap(),
"packets" => result.packets = value.as_u64().unwrap() as u32,
"drops" => result.drops = value.as_u64().unwrap() as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap() as u32,
"requeues" => result.requeues = value.as_u64().unwrap() as u32,
"backlog" => result.backlog = value.as_u64().unwrap() as u32,
"qlen" => result.qlen = value.as_u64().unwrap() as u32,
"handle" => {
parse_tc_handle!(result.handle, value);
}
"root" => result.root = value.as_bool().unwrap_or(false),
"bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,
"kind" => {}
"options" => {}
_ => {

View File

@@ -1,14 +1,15 @@
use crate::{deserialize_tc_tree, queue_types::QueueType};
use anyhow::Result;
use lqos_bus::TcHandle;
use std::process::Command;
use thiserror::Error;
use log::error;
const TC: &str = "/sbin/tc";
pub fn read_named_queue_from_interface(
interface: &str,
tc_handle: TcHandle,
) -> Result<Vec<QueueType>> {
) -> Result<Vec<QueueType>, QueueReaderError> {
let command_output = Command::new(TC)
.args([
"-s",
@@ -20,9 +21,37 @@ pub fn read_named_queue_from_interface(
"parent",
&tc_handle.to_string(),
])
.output()?;
.output();
let json = String::from_utf8(command_output.stdout)?;
if command_output.is_err() {
error!("Failed to call process tc -s -j qdisc show dev {interface} parent {}", &tc_handle.to_string());
error!("{:?}", command_output);
return Err(QueueReaderError::CommandError);
}
let command_output = command_output.unwrap();
let json = String::from_utf8(command_output.stdout);
if json.is_err() {
error!("Failed to convert byte stream to UTF-8 string");
error!("{:?}", json);
return Err(QueueReaderError::Utf8Error);
}
let json = json.unwrap();
let result = deserialize_tc_tree(&json);
Ok(result?)
if result.is_err() {
error!("Failed to deserialize TC tree result.");
error!("{:?}", result);
return Err(QueueReaderError::Deserialization);
}
Ok(result.unwrap())
}
#[derive(Error, Debug)]
pub enum QueueReaderError {
#[error("Subprocess call failed")]
CommandError,
#[error("Failed to convert bytes to valid UTF-8")]
Utf8Error,
#[error("Deserialization Error")]
Deserialization,
}