#229 - Run cargo fmt to format everything.

This commit is contained in:
Herbert Wolverson 2023-02-14 22:06:57 +00:00
parent 6fe97e97b6
commit 088f608614
34 changed files with 136 additions and 152 deletions

View File

@ -44,10 +44,8 @@ pub fn criterion_benchmark(c: &mut Criterion) {
}); });
// Enable the Tokio runtime to test round-trip // Enable the Tokio runtime to test round-trip
let tokio_rt = tokio::runtime::Builder::new_current_thread() let tokio_rt =
.enable_io() tokio::runtime::Builder::new_current_thread().enable_io().build().unwrap();
.build()
.unwrap();
c.bench_function("bus_ping_round_trip", |b| { c.bench_function("bus_ping_round_trip", |b| {
b.iter(|| { b.iter(|| {

View File

@ -1,5 +1,3 @@
fn main() { fn main() {
cc::Build::new() cc::Build::new().file("src/tc_handle_parser.c").compile("tc_handle_parse.o");
.file("src/tc_handle_parser.c")
.compile("tc_handle_parse.o");
} }

View File

@ -70,10 +70,8 @@ impl BusClient {
// Send with a timeout. If the timeout fails, then the stream went wrong // Send with a timeout. If the timeout fails, then the stream went wrong
if self.stream.is_some() { if self.stream.is_some() {
let timer = timeout( let timer =
self.timeout, timeout(self.timeout, Self::send(self.stream.as_mut().unwrap(), &msg));
Self::send(self.stream.as_mut().unwrap(), &msg),
);
let failed = let failed =
if let Ok(inner) = timer.await { inner.is_err() } else { false }; if let Ok(inner) = timer.await { inner.is_err() } else { false };
if failed { if failed {

View File

@ -51,7 +51,9 @@ impl UnixSocketServer {
fn path_permissions() -> Result<(), UnixSocketServerError> { fn path_permissions() -> Result<(), UnixSocketServerError> {
let unix_path = CString::new(BUS_SOCKET_DIRECTORY); let unix_path = CString::new(BUS_SOCKET_DIRECTORY);
if unix_path.is_err() { if unix_path.is_err() {
error!("Unable to create C-compatible path string. This should never happen."); error!(
"Unable to create C-compatible path string. This should never happen."
);
return Err(UnixSocketServerError::CString); return Err(UnixSocketServerError::CString);
} }
let unix_path = unix_path.unwrap(); let unix_path = unix_path.unwrap();

View File

@ -113,9 +113,7 @@ impl WebUsers {
if let Ok(users) = parse_result { if let Ok(users) = parse_result {
Ok(users) Ok(users)
} else { } else {
error!( error!("Unable to deserialize lqusers.toml. Error in next message.");
"Unable to deserialize lqusers.toml. Error in next message."
);
error!("{:?}", parse_result); error!("{:?}", parse_result);
Err(AuthenticationError::UnableToParse) Err(AuthenticationError::UnableToParse)
} }

View File

@ -28,7 +28,10 @@ fn working_directory() -> Result<PathBuf, ProgramControlError> {
pub fn load_libreqos() -> Result<String, ProgramControlError> { pub fn load_libreqos() -> Result<String, ProgramControlError> {
let path = path_to_libreqos()?; let path = path_to_libreqos()?;
if !path.exists() { if !path.exists() {
error!("Unable to locate LibreQoS.py. ({}) Check your configuration directory.", path.display()); error!(
"Unable to locate LibreQoS.py. ({}) Check your configuration directory.",
path.display()
);
return Err(ProgramControlError::LibreQosPyNotFound); return Err(ProgramControlError::LibreQosPyNotFound);
} }
if !Path::new(PYTHON_PATH).exists() { if !Path::new(PYTHON_PATH).exists() {

View File

@ -33,8 +33,8 @@ impl ConfigShapedDevices {
/// by acquiring the prefix from the `/etc/lqos.conf` configuration /// by acquiring the prefix from the `/etc/lqos.conf` configuration
/// file. /// file.
pub fn path() -> Result<PathBuf, ShapedDevicesError> { pub fn path() -> Result<PathBuf, ShapedDevicesError> {
let cfg = etc::EtcLqos::load() let cfg =
.map_err(|_| ShapedDevicesError::ConfigLoadError)?; etc::EtcLqos::load().map_err(|_| ShapedDevicesError::ConfigLoadError)?;
let base_path = Path::new(&cfg.lqos_directory); let base_path = Path::new(&cfg.lqos_directory);
Ok(base_path.join("ShapedDevices.csv")) Ok(base_path.join("ShapedDevices.csv"))
} }
@ -52,8 +52,10 @@ impl ConfigShapedDevices {
/// object containing the resulting data. /// object containing the resulting data.
pub fn load() -> Result<Self, ShapedDevicesError> { pub fn load() -> Result<Self, ShapedDevicesError> {
let final_path = ConfigShapedDevices::path()?; let final_path = ConfigShapedDevices::path()?;
let reader = let reader = ReaderBuilder::new()
ReaderBuilder::new().comment(Some(b'#')).trim(csv::Trim::All).from_path(final_path); .comment(Some(b'#'))
.trim(csv::Trim::All)
.from_path(final_path);
if reader.is_err() { if reader.is_err() {
error!("Unable to read ShapedDevices.csv"); error!("Unable to read ShapedDevices.csv");
return Err(ShapedDevicesError::OpenFail); return Err(ShapedDevicesError::OpenFail);
@ -143,8 +145,8 @@ impl ConfigShapedDevices {
/// Saves the current shaped devices list to `ShapedDevices.csv` /// Saves the current shaped devices list to `ShapedDevices.csv`
pub fn write_csv(&self, filename: &str) -> Result<(), ShapedDevicesError> { pub fn write_csv(&self, filename: &str) -> Result<(), ShapedDevicesError> {
let cfg = etc::EtcLqos::load() let cfg =
.map_err(|_| ShapedDevicesError::ConfigLoadError)?; etc::EtcLqos::load().map_err(|_| ShapedDevicesError::ConfigLoadError)?;
let base_path = Path::new(&cfg.lqos_directory); let base_path = Path::new(&cfg.lqos_directory);
let path = base_path.join(filename); let path = base_path.join(filename);
let csv = self.to_csv_string()?; let csv = self.to_csv_string()?;

View File

@ -54,7 +54,9 @@ pub fn shaped_devices_search(
#[get("/api/reload_required")] #[get("/api/reload_required")]
pub fn reload_required() -> NoCache<Json<bool>> { pub fn reload_required() -> NoCache<Json<bool>> {
NoCache::new(Json(RELOAD_REQUIRED.load(std::sync::atomic::Ordering::Relaxed))) NoCache::new(Json(
RELOAD_REQUIRED.load(std::sync::atomic::Ordering::Relaxed),
))
} }
#[get("/api/reload_libreqos")] #[get("/api/reload_libreqos")]
@ -63,8 +65,7 @@ pub async fn reload_libreqos(auth: AuthGuard) -> NoCache<Json<String>> {
return NoCache::new(Json("Not authorized".to_string())); return NoCache::new(Json("Not authorized".to_string()));
} }
// Send request to lqosd // Send request to lqosd
let responses = let responses = bus_request(vec![BusRequest::ReloadLibreQoS]).await.unwrap();
bus_request(vec![BusRequest::ReloadLibreQoS]).await.unwrap();
let result = match &responses[0] { let result = match &responses[0] {
BusResponse::ReloadLibreQoS(msg) => msg.clone(), BusResponse::ReloadLibreQoS(msg) => msg.clone(),
_ => "Unable to reload LibreQoS".to_string(), _ => "Unable to reload LibreQoS".to_string(),

View File

@ -68,9 +68,7 @@ pub async fn shaped_devices_add_page<'a>(
#[get("/vendor/bootstrap.min.css")] #[get("/vendor/bootstrap.min.css")]
pub async fn bootsrap_css<'a>() -> LongCache<Option<NamedFile>> { pub async fn bootsrap_css<'a>() -> LongCache<Option<NamedFile>> {
LongCache::new( LongCache::new(NamedFile::open("static/vendor/bootstrap.min.css").await.ok())
NamedFile::open("static/vendor/bootstrap.min.css").await.ok(),
)
} }
// Note that NoCache can be replaced with a cache option // Note that NoCache can be replaced with a cache option

View File

@ -1,10 +1,11 @@
use std::sync::atomic::{AtomicU64, AtomicU32, AtomicUsize};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize};
const MAX_CPUS_COUNTED: usize = 128; const MAX_CPUS_COUNTED: usize = 128;
/// Stores overall CPU usage /// Stores overall CPU usage
pub static CPU_USAGE: Lazy<[AtomicU32; MAX_CPUS_COUNTED]> = Lazy::new(build_empty_cpu_list); pub static CPU_USAGE: Lazy<[AtomicU32; MAX_CPUS_COUNTED]> =
Lazy::new(build_empty_cpu_list);
/// Total number of CPUs detected /// Total number of CPUs detected
pub static NUM_CPUS: AtomicUsize = AtomicUsize::new(0); pub static NUM_CPUS: AtomicUsize = AtomicUsize::new(0);
@ -16,9 +17,9 @@ pub static RAM_USED: AtomicU64 = AtomicU64::new(0);
pub static TOTAL_RAM: AtomicU64 = AtomicU64::new(0); pub static TOTAL_RAM: AtomicU64 = AtomicU64::new(0);
fn build_empty_cpu_list() -> [AtomicU32; MAX_CPUS_COUNTED] { fn build_empty_cpu_list() -> [AtomicU32; MAX_CPUS_COUNTED] {
let mut temp = Vec::with_capacity(MAX_CPUS_COUNTED); let mut temp = Vec::with_capacity(MAX_CPUS_COUNTED);
for _ in 0..MAX_CPUS_COUNTED { for _ in 0..MAX_CPUS_COUNTED {
temp.push(AtomicU32::new(0)); temp.push(AtomicU32::new(0));
} }
temp.try_into().expect("This should never happen, sizes are constant.") temp.try_into().expect("This should never happen, sizes are constant.")
} }

View File

@ -59,11 +59,16 @@ pub async fn update_tracking() {
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, cpu)| (i, cpu.cpu_usage() as u32)) // Always rounds down .map(|(i, cpu)| (i, cpu.cpu_usage() as u32)) // Always rounds down
.for_each(|(i, cpu)| CPU_USAGE[i].store(cpu, std::sync::atomic::Ordering::Relaxed)); .for_each(|(i, cpu)| {
CPU_USAGE[i].store(cpu, std::sync::atomic::Ordering::Relaxed)
});
NUM_CPUS.store(sys.cpus().len(), std::sync::atomic::Ordering::Relaxed); NUM_CPUS
RAM_USED.store(sys.used_memory(), std::sync::atomic::Ordering::Relaxed); .store(sys.cpus().len(), std::sync::atomic::Ordering::Relaxed);
TOTAL_RAM.store(sys.total_memory(), std::sync::atomic::Ordering::Relaxed); RAM_USED
.store(sys.used_memory(), std::sync::atomic::Ordering::Relaxed);
TOTAL_RAM
.store(sys.total_memory(), std::sync::atomic::Ordering::Relaxed);
let error = get_data_from_server().await; // Ignoring errors to keep running let error = get_data_from_server().await; // Ignoring errors to keep running
if let Err(error) = error { if let Err(error) = error {
error!("Error in usage update loop: {:?}", error); error!("Error in usage update loop: {:?}", error);

View File

@ -1,8 +1,9 @@
mod cache; mod cache;
mod cache_manager; mod cache_manager;
use self::cache::{ use self::cache::{
CPU_USAGE, CURRENT_THROUGHPUT, HOST_COUNTS, RAM_USED, TOTAL_RAM, RTT_HISTOGRAM, CPU_USAGE, CURRENT_THROUGHPUT, HOST_COUNTS, NUM_CPUS, RAM_USED,
THROUGHPUT_BUFFER, TOP_10_DOWNLOADERS, WORST_10_RTT, NUM_CPUS, RTT_HISTOGRAM, THROUGHPUT_BUFFER, TOP_10_DOWNLOADERS, TOTAL_RAM,
WORST_10_RTT,
}; };
use crate::{auth_guard::AuthGuard, tracker::cache::ThroughputPerSecond}; use crate::{auth_guard::AuthGuard, tracker::cache::ThroughputPerSecond};
pub use cache::{SHAPED_DEVICES, UNKNOWN_DEVICES}; pub use cache::{SHAPED_DEVICES, UNKNOWN_DEVICES};
@ -44,10 +45,8 @@ impl From<&IpStats> for IpStatsWithPlan {
}; };
let cfg = SHAPED_DEVICES.read(); let cfg = SHAPED_DEVICES.read();
if let Some((_, id)) = cfg.trie.longest_match(lookup) { if let Some((_, id)) = cfg.trie.longest_match(lookup) {
result.ip_address = format!( result.ip_address =
"{} ({})", format!("{} ({})", cfg.devices[*id].circuit_name, result.ip_address);
cfg.devices[*id].circuit_name, result.ip_address
);
result.plan.0 = cfg.devices[*id].download_max_mbps; result.plan.0 = cfg.devices[*id].download_max_mbps;
result.plan.1 = cfg.devices[*id].upload_max_mbps; result.plan.1 = cfg.devices[*id].upload_max_mbps;
result.circuit_id = cfg.devices[*id].circuit_id.clone(); result.circuit_id = cfg.devices[*id].circuit_id.clone();
@ -136,8 +135,8 @@ pub fn busy_quantile(_auth: AuthGuard) -> Json<Vec<(u32, u32)>> {
let (down, up) = (down * 8, up * 8); let (down, up) = (down * 8, up * 8);
//println!("{down_capacity}, {up_capacity}, {down}, {up}"); //println!("{down_capacity}, {up_capacity}, {down}, {up}");
let (down, up) = ( let (down, up) = (
if down_capacity > 0.0 { down as f64 / down_capacity } else { 0.0 }, if down_capacity > 0.0 { down as f64 / down_capacity } else { 0.0 },
if up_capacity > 0.0 { up as f64 / up_capacity } else { 0.0}, if up_capacity > 0.0 { up as f64 / up_capacity } else { 0.0 },
); );
let (down, up) = ((down * 10.0) as usize, (up * 10.0) as usize); let (down, up) = ((down * 10.0) as usize, (up * 10.0) as usize);
result[usize::min(9, down)].0 += 1; result[usize::min(9, down)].0 += 1;

View File

@ -3,11 +3,10 @@ use lqos_bus::{bus_request, BusRequest, BusResponse};
pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> { pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let mut replies = Vec::with_capacity(8); let mut replies = Vec::with_capacity(8);
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread().enable_all().build()?.block_on(
.enable_all() async {
.build()?
.block_on(async {
replies.extend_from_slice(&bus_request(requests).await?); replies.extend_from_slice(&bus_request(requests).await?);
Ok(replies) Ok(replies)
}) },
)
} }

View File

@ -1,10 +1,14 @@
use std::{path::Path, fs::{File, remove_file}, io::Write};
use lqos_bus::{BusRequest, BusResponse, TcHandle}; use lqos_bus::{BusRequest, BusResponse, TcHandle};
use nix::libc::getpid; use nix::libc::getpid;
use pyo3::{ use pyo3::{
exceptions::PyOSError, pyclass, pyfunction, pymodule, types::PyModule, exceptions::PyOSError, pyclass, pyfunction, pymodule, types::PyModule,
wrap_pyfunction, PyResult, Python, wrap_pyfunction, PyResult, Python,
}; };
use std::{
fs::{remove_file, File},
io::Write,
path::Path,
};
mod blocking; mod blocking;
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use blocking::run_query; use blocking::run_query;
@ -147,9 +151,7 @@ fn validate_shaped_devices() -> PyResult<String> {
for response in result.iter() { for response in result.iter() {
match response { match response {
BusResponse::Ack => return Ok("OK".to_string()), BusResponse::Ack => return Ok("OK".to_string()),
BusResponse::ShapedDevicesValidation(error) => { BusResponse::ShapedDevicesValidation(error) => return Ok(error.clone()),
return Ok(error.clone())
}
_ => {} _ => {}
} }
} }
@ -185,12 +187,12 @@ fn is_libre_already_running() -> PyResult<bool> {
#[pyfunction] #[pyfunction]
fn create_lock_file() -> PyResult<()> { fn create_lock_file() -> PyResult<()> {
let pid = unsafe { getpid() }; let pid = unsafe { getpid() };
let pid_format = format!("{pid}"); let pid_format = format!("{pid}");
{ {
if let Ok(mut f) = File::create(LOCK_FILE) { if let Ok(mut f) = File::create(LOCK_FILE) {
f.write_all(pid_format.as_bytes())?; f.write_all(pid_format.as_bytes())?;
}
} }
}
Ok(()) Ok(())
} }
@ -198,4 +200,4 @@ fn create_lock_file() -> PyResult<()> {
fn free_lock_file() -> PyResult<()> { fn free_lock_file() -> PyResult<()> {
let _ = remove_file(LOCK_FILE); // Ignore result let _ = remove_file(LOCK_FILE); // Ignore result
Ok(()) Ok(())
} }

View File

@ -66,7 +66,7 @@ fn setup_parent_htb(interface: &str) {
.output() .output()
.unwrap(); .unwrap();
#[rustfmt::skip] #[rustfmt::skip]
Command::new(SUDO) Command::new(SUDO)
.args([ .args([
TC, "qdisc", "add", "dev", interface, "parent", "0x1:1", "cake", TC, "qdisc", "add", "dev", interface, "parent", "0x1:1", "cake",

View File

@ -122,11 +122,7 @@ impl QueueNode {
grab_u64!(result.upload_bandwidth_mbps, key.as_str(), value); grab_u64!(result.upload_bandwidth_mbps, key.as_str(), value);
} }
"downloadBandwidthMbpsMin" | "minDownload" => { "downloadBandwidthMbpsMin" | "minDownload" => {
grab_u64!( grab_u64!(result.download_bandwidth_mbps_min, key.as_str(), value);
result.download_bandwidth_mbps_min,
key.as_str(),
value
);
} }
"uploadBandwidthMbpsMin" | "minUpload" => { "uploadBandwidthMbpsMin" | "minUpload" => {
grab_u64!(result.upload_bandwidth_mbps_min, key.as_str(), value); grab_u64!(result.upload_bandwidth_mbps_min, key.as_str(), value);

View File

@ -101,9 +101,7 @@ impl TcCake {
} }
"bytes" => result.bytes = value.as_u64().unwrap_or(0), "bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32, "packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"overlimits" => { "overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
result.overlimits = value.as_u64().unwrap_or(0) as u32
}
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32, "requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32, "backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32, "qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,
@ -176,14 +174,10 @@ impl TcCakeOptions {
result.ack_filter = result.ack_filter =
AckFilter::from_str(value.as_str().unwrap_or("")) AckFilter::from_str(value.as_str().unwrap_or(""))
} }
"split_gso" => { "split_gso" => result.split_gso = value.as_bool().unwrap_or(false),
result.split_gso = value.as_bool().unwrap_or(false)
}
"rtt" => result.rtt = value.as_u64().unwrap_or(0), "rtt" => result.rtt = value.as_u64().unwrap_or(0),
"raw" => result.raw = value.as_bool().unwrap_or(false), "raw" => result.raw = value.as_bool().unwrap_or(false),
"overhead" => { "overhead" => result.overhead = value.as_u64().unwrap_or(0) as u16,
result.overhead = value.as_u64().unwrap_or(0) as u16
}
"fwmark" => { "fwmark" => {
result.fwmark = value.as_str().unwrap_or("").to_string() result.fwmark = value.as_str().unwrap_or("").to_string()
} }

View File

@ -60,9 +60,7 @@ impl TcFqCodel {
"bytes" => result.bytes = value.as_u64().unwrap_or(0), "bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32, "packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32, "drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"overlimits" => { "overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
result.overlimits = value.as_u64().unwrap_or(0) as u32
}
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32, "requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32, "backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32, "qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,

View File

@ -48,9 +48,7 @@ impl TcHtb {
"bytes" => result.bytes = value.as_u64().unwrap_or(0), "bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32, "packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32, "drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"overlimits" => { "overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
result.overlimits = value.as_u64().unwrap_or(0) as u32
}
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32, "requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32, "backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32, "qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,

View File

@ -36,9 +36,7 @@ impl TcMultiQueue {
"bytes" => result.bytes = value.as_u64().unwrap_or(0), "bytes" => result.bytes = value.as_u64().unwrap_or(0),
"packets" => result.packets = value.as_u64().unwrap_or(0) as u32, "packets" => result.packets = value.as_u64().unwrap_or(0) as u32,
"drops" => result.drops = value.as_u64().unwrap_or(0) as u32, "drops" => result.drops = value.as_u64().unwrap_or(0) as u32,
"overlimits" => { "overlimits" => result.overlimits = value.as_u64().unwrap_or(0) as u32,
result.overlimits = value.as_u64().unwrap_or(0) as u32
}
"requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32, "requeues" => result.requeues = value.as_u64().unwrap_or(0) as u32,
"backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32, "backlog" => result.backlog = value.as_u64().unwrap_or(0) as u32,
"qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32, "qlen" => result.qlen = value.as_u64().unwrap_or(0) as u32,

View File

@ -40,10 +40,7 @@ fn track_queues() {
) )
} else { } else {
( (
read_named_queue_from_interface( read_named_queue_from_interface(&config.isp_interface, download_class),
&config.isp_interface,
download_class,
),
read_named_queue_from_interface( read_named_queue_from_interface(
&config.internet_interface, &config.internet_interface,
download_class, download_class,
@ -64,7 +61,12 @@ fn track_queues() {
QueueStore::new(download[0].clone(), upload[0].clone()), QueueStore::new(download[0].clone(), upload[0].clone()),
); );
} else { } else {
info!("No queue data returned for {}, {}/{} found.", circuit_id.to_string(), download.len(), upload.len()); info!(
"No queue data returned for {}, {}/{} found.",
circuit_id.to_string(),
download.len(),
upload.len()
);
info!("You probably want to run LibreQoS.py"); info!("You probably want to run LibreQoS.py");
} }
} }

View File

@ -37,9 +37,7 @@ pub fn add_watched_queue(circuit_id: &str) {
{ {
let read_lock = WATCHED_QUEUES.read(); let read_lock = WATCHED_QUEUES.read();
if read_lock.iter().any(|q| q.circuit_id == circuit_id) { if read_lock.iter().any(|q| q.circuit_id == circuit_id) {
warn!( warn!("Queue {circuit_id} is already being watched. Duplicate ignored.");
"Queue {circuit_id} is already being watched. Duplicate ignored."
);
return; // No duplicates, please return; // No duplicates, please
} }

View File

@ -1,7 +1,5 @@
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use libbpf_sys::{ use libbpf_sys::{bpf_map_update_elem, bpf_obj_get, libbpf_num_possible_cpus};
bpf_map_update_elem, bpf_obj_get, libbpf_num_possible_cpus,
};
use log::info; use log::info;
use std::{ffi::CString, os::raw::c_void}; use std::{ffi::CString, os::raw::c_void};

View File

@ -120,12 +120,9 @@ mod test {
#[test] #[test]
fn parse_ipv6_subnet() { fn parse_ipv6_subnet() {
let map = IpToMap::new( let map =
"dead:beef::/64", IpToMap::new("dead:beef::/64", TcHandle::from_string("1:2").unwrap(), 1)
TcHandle::from_string("1:2").unwrap(), .unwrap();
1,
)
.unwrap();
let rust_ip: IpAddr = "dead:beef::".parse().unwrap(); let rust_ip: IpAddr = "dead:beef::".parse().unwrap();
assert_eq!(rust_ip, map.subnet); assert_eq!(rust_ip, map.subnet);
assert_eq!(map.prefix, 64); assert_eq!(map.prefix, 64);

View File

@ -48,8 +48,7 @@ pub fn interface_name_to_index(interface_name: &str) -> Result<u32> {
pub fn unload_xdp_from_interface(interface_name: &str) -> Result<()> { pub fn unload_xdp_from_interface(interface_name: &str) -> Result<()> {
info!("Unloading XDP/TC"); info!("Unloading XDP/TC");
check_root()?; check_root()?;
let interface_index = let interface_index = interface_name_to_index(interface_name)?.try_into()?;
interface_name_to_index(interface_name)?.try_into()?;
unsafe { unsafe {
let err = bpf_xdp_attach(interface_index, -1, 1 << 0, std::ptr::null()); let err = bpf_xdp_attach(interface_index, -1, 1 << 0, std::ptr::null());
if err != 0 { if err != 0 {

View File

@ -57,9 +57,7 @@ impl XdpIpAddress {
&& self.0[11] == 0xFF && self.0[11] == 0xFF
{ {
// It's an IPv4 Address // It's an IPv4 Address
IpAddr::V4(Ipv4Addr::new( IpAddr::V4(Ipv4Addr::new(self.0[12], self.0[13], self.0[14], self.0[15]))
self.0[12], self.0[13], self.0[14], self.0[15],
))
} else { } else {
// It's an IPv6 address // It's an IPv6 address
IpAddr::V6(Ipv6Addr::new( IpAddr::V6(Ipv6Addr::new(

View File

@ -1,11 +1,14 @@
use log::{error, warn}; use log::{error, warn};
use nix::{sys::time::TimeSpec, time::{clock_gettime, ClockId}}; use nix::{
sys::time::TimeSpec,
time::{clock_gettime, ClockId},
};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error; use thiserror::Error;
/// Retrieves the current time, in seconds since the UNIX epoch. /// Retrieves the current time, in seconds since the UNIX epoch.
/// Otherwise known as "unix time". /// Otherwise known as "unix time".
/// ///
/// It can fail if the clock isn't ready. /// It can fail if the clock isn't ready.
pub fn unix_now() -> Result<u64, TimeError> { pub fn unix_now() -> Result<u64, TimeError> {
match SystemTime::now().duration_since(UNIX_EPOCH) { match SystemTime::now().duration_since(UNIX_EPOCH) {
@ -20,13 +23,13 @@ pub fn unix_now() -> Result<u64, TimeError> {
/// Return the time since boot, from the Linux kernel. /// Return the time since boot, from the Linux kernel.
/// Can fail if the clock isn't ready yet. /// Can fail if the clock isn't ready yet.
pub fn time_since_boot() -> Result<TimeSpec, TimeError> { pub fn time_since_boot() -> Result<TimeSpec, TimeError> {
match clock_gettime(ClockId::CLOCK_BOOTTIME) { match clock_gettime(ClockId::CLOCK_BOOTTIME) {
Ok(t) => Ok(t), Ok(t) => Ok(t),
Err(e) => { Err(e) => {
warn!("Clock not ready: {:?}", e); warn!("Clock not ready: {:?}", e);
Err(TimeError::ClockNotReady) Err(TimeError::ClockNotReady)
}
} }
}
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]

View File

@ -14,11 +14,7 @@ pub fn lqos_daht_test() -> BusResponse {
) == Ok(true) ) == Ok(true)
{ {
let result = Command::new("/bin/ssh") let result = Command::new("/bin/ssh")
.args([ .args(["-t", "lqtest@lqos.taht.net", "\"/home/lqtest/bin/v6vsv4.sh\""])
"-t",
"lqtest@lqos.taht.net",
"\"/home/lqtest/bin/v6vsv4.sh\"",
])
.output(); .output();
if result.is_err() { if result.is_err() {
log::warn!("Unable to call dtaht test: {:?}", result); log::warn!("Unable to call dtaht test: {:?}", result);

View File

@ -8,9 +8,7 @@ mod tuning;
mod validation; mod validation;
use crate::{ use crate::{
file_lock::FileLock, file_lock::FileLock,
ip_mapping::{ ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow},
clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow,
},
}; };
use anyhow::Result; use anyhow::Result;
use log::{info, warn}; use log::{info, warn};
@ -78,9 +76,9 @@ async fn main() -> Result<()> {
match sig { match sig {
SIGINT => warn!("Terminating on SIGINT"), SIGINT => warn!("Terminating on SIGINT"),
SIGTERM => warn!("Terminating on SIGTERM"), SIGTERM => warn!("Terminating on SIGTERM"),
_ => warn!( _ => {
"This should never happen - terminating on unknown signal" warn!("This should never happen - terminating on unknown signal")
), }
} }
std::mem::drop(kernels); std::mem::drop(kernels);
UnixSocketServer::signal_cleanup(); UnixSocketServer::signal_cleanup();

View File

@ -272,7 +272,6 @@ pub fn host_counts() -> BusResponse {
type FullList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, u64); type FullList = (XdpIpAddress, (u64, u64), (u64, u64), f32, TcHandle, u64);
pub fn all_unknown_ips() -> BusResponse { pub fn all_unknown_ips() -> BusResponse {
let boot_time = time_since_boot(); let boot_time = time_since_boot();
if boot_time.is_err() { if boot_time.is_err() {
warn!("The Linux system clock isn't available to provide time since boot, yet."); warn!("The Linux system clock isn't available to provide time since boot, yet.");
@ -281,7 +280,8 @@ pub fn all_unknown_ips() -> BusResponse {
} }
let boot_time = boot_time.unwrap(); let boot_time = boot_time.unwrap();
let time_since_boot = Duration::from(boot_time); let time_since_boot = Duration::from(boot_time);
let five_minutes_ago = time_since_boot.saturating_sub(Duration::from_secs(300)); let five_minutes_ago =
time_since_boot.saturating_sub(Duration::from_secs(300));
let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos(); let five_minutes_ago_nanoseconds = five_minutes_ago.as_nanos();
let mut full_list: Vec<FullList> = { let mut full_list: Vec<FullList> = {

View File

@ -120,7 +120,9 @@ impl ThroughputTracker {
self.packets_per_second = (0, 0); self.packets_per_second = (0, 0);
self.shaped_bytes_per_second = (0, 0); self.shaped_bytes_per_second = (0, 0);
self self
.raw_data.values().map(|v| { .raw_data
.values()
.map(|v| {
( (
v.bytes.0.saturating_sub(v.prev_bytes.0), v.bytes.0.saturating_sub(v.prev_bytes.0),
v.bytes.1.saturating_sub(v.prev_bytes.1), v.bytes.1.saturating_sub(v.prev_bytes.1),
@ -129,18 +131,25 @@ impl ThroughputTracker {
v.tc_handle.as_u32() > 0, v.tc_handle.as_u32() > 0,
) )
}) })
.for_each( .for_each(|(bytes_down, bytes_up, packets_down, packets_up, shaped)| {
|(bytes_down, bytes_up, packets_down, packets_up, shaped)| { self.bytes_per_second.0 =
self.bytes_per_second.0 = self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0); self.bytes_per_second.0.checked_add(bytes_down).unwrap_or(0);
self.bytes_per_second.1 = self.bytes_per_second.1.checked_add(bytes_up).unwrap_or(0); self.bytes_per_second.1 =
self.packets_per_second.0 = self.packets_per_second.0.checked_add(packets_down).unwrap_or(0); self.bytes_per_second.1.checked_add(bytes_up).unwrap_or(0);
self.packets_per_second.1 = self.packets_per_second.1.checked_add(packets_up).unwrap_or(0); self.packets_per_second.0 =
if shaped { self.packets_per_second.0.checked_add(packets_down).unwrap_or(0);
self.shaped_bytes_per_second.0 = self.shaped_bytes_per_second.0.checked_add(bytes_down).unwrap_or(0); self.packets_per_second.1 =
self.shaped_bytes_per_second.1 = self.shaped_bytes_per_second.1.checked_add(bytes_up).unwrap_or(0); self.packets_per_second.1.checked_add(packets_up).unwrap_or(0);
} if shaped {
}, self.shaped_bytes_per_second.0 = self
); .shaped_bytes_per_second
.0
.checked_add(bytes_down)
.unwrap_or(0);
self.shaped_bytes_per_second.1 =
self.shaped_bytes_per_second.1.checked_add(bytes_up).unwrap_or(0);
}
});
} }
pub(crate) fn next_cycle(&mut self) { pub(crate) fn next_cycle(&mut self) {

View File

@ -7,8 +7,7 @@ pub fn bpf_sysctls() {
} }
pub fn stop_irq_balance() { pub fn stop_irq_balance() {
let _ = let _ = Command::new("/bin/systemctl").args(["stop", "irqbalance"]).output();
Command::new("/bin/systemctl").args(["stop", "irqbalance"]).output();
} }
pub fn netdev_budget(usecs: u32, packets: u32) { pub fn netdev_budget(usecs: u32, packets: u32) {

View File

@ -62,10 +62,9 @@ fn draw_menu<'a>(is_connected: bool) -> Paragraph<'a> {
.0 .0
.push(Span::styled(" NOT CONNECTED ", Style::default().fg(Color::Red))) .push(Span::styled(" NOT CONNECTED ", Style::default().fg(Color::Red)))
} else { } else {
text.0.push(Span::styled( text
" CONNECTED ", .0
Style::default().fg(Color::Green), .push(Span::styled(" CONNECTED ", Style::default().fg(Color::Green)))
))
} }
let para = Paragraph::new(text) let para = Paragraph::new(text)
@ -321,7 +320,7 @@ pub async fn main() -> Result<()> {
code: KeyCode::Char('6'), code: KeyCode::Char('6'),
modifiers: KeyModifiers::NONE, modifiers: KeyModifiers::NONE,
.. ..
}) => break, // FIXME Just look at ipv6 }) => break, // FIXME Just look at ipv6
Event::Key(KeyEvent { Event::Key(KeyEvent {
code: KeyCode::Char('4'), code: KeyCode::Char('4'),
modifiers: KeyModifiers::NONE, modifiers: KeyModifiers::NONE,

View File

@ -1,5 +1,5 @@
max_width = 78 max_width = 79
tab_spaces = 2 tab_spaces = 2
use_small_heuristics = "Max" use_small_heuristics = "Max"
array_width = 78 array_width = 77