A bit of a hybrid commit, sorry. Provides various default pre-allocations that seem reasonable, and fixup on bus permissions.

This commit is contained in:
Herbert Wolverson 2023-01-20 19:28:54 +00:00
parent 287ddb6e10
commit 922ddd602f
12 changed files with 42 additions and 26 deletions

1
src/rust/Cargo.lock generated
View File

@ -1308,6 +1308,7 @@ dependencies = [
"criterion", "criterion",
"log", "log",
"lqos_config", "lqos_config",
"lqos_utils",
"nix", "nix",
"serde", "serde",
"tokio", "tokio",

View File

@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
bincode = "1" bincode = "1"
anyhow = "1" anyhow = "1"
lqos_config = { path = "../lqos_config" } lqos_config = { path = "../lqos_config" }
lqos_utils = { path = "../lqos_utils" }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] } tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
log = "0" log = "0"
nix = "0" nix = "0"

View File

@ -1,6 +1,7 @@
use tokio::{net::UnixStream, io::{AsyncWriteExt, AsyncReadExt}}; use tokio::{net::UnixStream, io::{AsyncWriteExt, AsyncReadExt}};
use crate::{BUS_SOCKET_PATH, BusSession, BusRequest, encode_request, decode_response, BusResponse}; use crate::{BUS_SOCKET_PATH, BusSession, BusRequest, encode_request, decode_response, BusResponse};
use anyhow::Result; use anyhow::Result;
use super::PREALLOCATE_CLIENT_BUFFER_BYTES;
/// Convenient wrapper for accessing the bus /// Convenient wrapper for accessing the bus
/// ///
@ -17,7 +18,7 @@ pub async fn bus_request(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>>
}; };
let msg = encode_request(&test)?; let msg = encode_request(&test)?;
stream.write(&msg).await?; stream.write(&msg).await?;
let mut buf = Vec::new(); let mut buf = Vec::with_capacity(PREALLOCATE_CLIENT_BUFFER_BYTES);
let _ = stream.read_to_end(&mut buf).await.unwrap(); let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf)?; let reply = decode_response(&buf)?;

View File

@ -20,7 +20,9 @@ pub const BUS_SOCKET_PATH: &str = "/run/lqos/bus";
/// The directory containing the bus socket. Used for ensuring /// The directory containing the bus socket. Used for ensuring
/// that the directory exists. /// that the directory exists.
pub(crate) const BUS_SOCKET_DIRECTORY: &str = "/run/lqos/."; pub(crate) const BUS_SOCKET_DIRECTORY: &str = "/run/lqos";
const PREALLOCATE_CLIENT_BUFFER_BYTES: usize = 10240;
/// Encodes a BusSession with `bincode`, providing a tight binary /// Encodes a BusSession with `bincode`, providing a tight binary
/// representation of the request object for TCP transmission. /// representation of the request object for TCP transmission.

View File

@ -9,6 +9,7 @@ use tokio::{
net::UnixStream, net::UnixStream,
time::timeout, time::timeout,
}; };
use super::PREALLOCATE_CLIENT_BUFFER_BYTES;
/// Provides a lqosd bus client that persists between connections. Useful for when you are /// Provides a lqosd bus client that persists between connections. Useful for when you are
/// going to be repeatedly polling the bus for data (e.g. `lqtop`) and want to avoid the /// going to be repeatedly polling the bus for data (e.g. `lqtop`) and want to avoid the
@ -25,7 +26,7 @@ impl BusClient {
pub async fn new() -> Result<Self> { pub async fn new() -> Result<Self> {
Ok(Self { Ok(Self {
stream: Self::connect().await, stream: Self::connect().await,
buffer: vec![0u8; 10240], buffer: vec![0u8; PREALLOCATE_CLIENT_BUFFER_BYTES],
timeout: Duration::from_millis(100), timeout: Duration::from_millis(100),
}) })
} }

View File

@ -1,7 +1,6 @@
use std::{fs::remove_file, ffi::CString}; use std::{fs::remove_file, ffi::CString};
use crate::{BUS_SOCKET_PATH, decode_request, BusReply, encode_response, BusRequest, BusResponse}; use crate::{BUS_SOCKET_PATH, decode_request, BusReply, encode_response, BusRequest, BusResponse};
use anyhow::Result; use anyhow::Result;
use nix::libc::mode_t;
use tokio::{net::{UnixListener, UnixStream}, io::{AsyncReadExt, AsyncWriteExt}}; use tokio::{net::{UnixListener, UnixStream}, io::{AsyncReadExt, AsyncWriteExt}};
use log::warn; use log::warn;
@ -15,8 +14,9 @@ impl UnixSocketServer {
/// Creates a new `UnixSocketServer`. Will delete any pre-existing /// Creates a new `UnixSocketServer`. Will delete any pre-existing
/// socket file. /// socket file.
pub fn new() -> Result<Self> { pub fn new() -> Result<Self> {
Self::check_directory()?;
Self::delete_local_socket()?; Self::delete_local_socket()?;
Self::check_directory()?;
Self::path_permissions()?;
Ok(Self {}) Ok(Self {})
} }
@ -32,15 +32,19 @@ impl UnixSocketServer {
if dir_path.exists() && dir_path.is_dir() { if dir_path.exists() && dir_path.is_dir() {
Ok(()) Ok(())
} else { } else {
std::fs::create_dir(dir_path)?; std::fs::create_dir(dir_path)?;
let unix_path = CString::new(BUS_SOCKET_DIRECTORY)?;
unsafe {
nix::libc::chmod(unix_path.as_ptr(), mode_t::from_le(666));
}
Ok(()) Ok(())
} }
} }
fn path_permissions() -> Result<()> {
let unix_path = CString::new(BUS_SOCKET_DIRECTORY)?;
unsafe {
nix::libc::chmod(unix_path.as_ptr(), 777);
}
Ok(())
}
fn delete_local_socket() -> Result<()> { fn delete_local_socket() -> Result<()> {
let socket_path = std::path::Path::new(BUS_SOCKET_PATH); let socket_path = std::path::Path::new(BUS_SOCKET_PATH);
if socket_path.exists() { if socket_path.exists() {
@ -50,10 +54,7 @@ impl UnixSocketServer {
} }
fn make_socket_public() -> Result<()> { fn make_socket_public() -> Result<()> {
let unix_path = CString::new(BUS_SOCKET_PATH)?; lqos_utils::run_success!("/bin/chmod", "-R", "a+rwx", BUS_SOCKET_DIRECTORY);
unsafe {
nix::libc::chmod(unix_path.as_ptr(), mode_t::from_le(666));
}
Ok(()) Ok(())
} }
@ -78,7 +79,7 @@ impl UnixSocketServer {
if let Ok(request) = decode_request(&buf) { if let Ok(request) = decode_request(&buf) {
let mut response = BusReply { let mut response = BusReply {
responses: Vec::new(), responses: Vec::with_capacity(8),
}; };
handle_bus_requests(&request.requests, &mut response.responses); handle_bus_requests(&request.requests, &mut response.responses);
let _ = reply_unix(&encode_response(&response).unwrap(), &mut socket).await; let _ = reply_unix(&encode_response(&response).unwrap(), &mut socket).await;

View File

@ -17,3 +17,6 @@ pub use etc::{BridgeConfig, BridgeInterface, BridgeVlan, EtcLqos, Tunables};
pub use libre_qos_config::LibreQoSConfig; pub use libre_qos_config::LibreQoSConfig;
pub use program_control::load_libreqos; pub use program_control::load_libreqos;
pub use shaped_devices::{ConfigShapedDevices, ShapedDevice}; pub use shaped_devices::{ConfigShapedDevices, ShapedDevice};
/// Used as a constant in determining buffer preallocation
pub const SUPPORTED_CUSTOMERS: usize = 16_000_000;

View File

@ -1,6 +1,6 @@
mod serializable; mod serializable;
mod shaped_device; mod shaped_device;
use crate::etc; use crate::{etc, SUPPORTED_CUSTOMERS};
use anyhow::Result; use anyhow::Result;
use csv::{QuoteStyle, WriterBuilder, ReaderBuilder}; use csv::{QuoteStyle, WriterBuilder, ReaderBuilder};
use serializable::SerializableShapedDevice; use serializable::SerializableShapedDevice;
@ -38,7 +38,7 @@ impl ConfigShapedDevices {
// Example: StringRecord(["1", "968 Circle St., Gurnee, IL 60031", "1", "Device 1", "", "", "192.168.101.2", "", "25", "5", "10000", "10000", ""]) // Example: StringRecord(["1", "968 Circle St., Gurnee, IL 60031", "1", "Device 1", "", "", "192.168.101.2", "", "25", "5", "10000", "10000", ""])
let mut devices = Vec::new(); let mut devices = Vec::with_capacity(SUPPORTED_CUSTOMERS);
for result in reader.records() { for result in reader.records() {
if let Ok(result) = result { if let Ok(result) = result {
if let Ok(device) = ShapedDevice::from_csv(&result) { if let Ok(device) = ShapedDevice::from_csv(&result) {

View File

@ -3,7 +3,7 @@ use parking_lot::RwLock;
lazy_static! { lazy_static! {
/// Global storage of current CPU usage /// Global storage of current CPU usage
pub static ref CPU_USAGE : RwLock<Vec<f32>> = RwLock::new(Vec::new()); pub static ref CPU_USAGE : RwLock<Vec<f32>> = RwLock::new(Vec::with_capacity(128));
} }
lazy_static! { lazy_static! {

View File

@ -3,15 +3,15 @@ use lqos_bus::IpStats;
use parking_lot::RwLock; use parking_lot::RwLock;
lazy_static! { lazy_static! {
pub static ref TOP_10_DOWNLOADERS: RwLock<Vec<IpStats>> = RwLock::new(Vec::new()); pub static ref TOP_10_DOWNLOADERS: RwLock<Vec<IpStats>> = RwLock::new(Vec::with_capacity(10));
} }
lazy_static! { lazy_static! {
pub static ref WORST_10_RTT: RwLock<Vec<IpStats>> = RwLock::new(Vec::new()); pub static ref WORST_10_RTT: RwLock<Vec<IpStats>> = RwLock::new(Vec::with_capacity(10));
} }
lazy_static! { lazy_static! {
pub static ref RTT_HISTOGRAM: RwLock<Vec<u32>> = RwLock::new(Vec::new()); pub static ref RTT_HISTOGRAM: RwLock<Vec<u32>> = RwLock::new(Vec::with_capacity(100));
} }
lazy_static! { lazy_static! {

View File

@ -4,7 +4,7 @@ use lqos_bus::{
}; };
pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> { pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let mut replies = Vec::new(); let mut replies = Vec::with_capacity(8);
tokio::runtime::Builder::new_current_thread() tokio::runtime::Builder::new_current_thread()
.enable_all() .enable_all()
.build() .build()

View File

@ -5,7 +5,7 @@ use sysinfo::{System, SystemExt, Pid, ProcessExt};
const LOCK_PATH: &str = "/run/lqos/lqosd.lock"; const LOCK_PATH: &str = "/run/lqos/lqosd.lock";
const LOCK_DIR: &str = "/run/lqos"; const LOCK_DIR: &str = "/run/lqos";
const LOCK_DIR_PERMS: &str = "/run/lqos/."; const LOCK_DIR_PERMS: &str = "/run/lqos";
pub struct FileLock {} pub struct FileLock {}
@ -46,8 +46,14 @@ impl FileLock {
fn create_lock() -> Result<()> { fn create_lock() -> Result<()> {
let pid = unsafe { getpid() }; let pid = unsafe { getpid() };
let pid_format = format!("{pid}"); let pid_format = format!("{pid}");
let mut f = File::create(LOCK_PATH)?; {
f.write_all(pid_format.as_bytes())?; let mut f = File::create(LOCK_PATH)?;
f.write_all(pid_format.as_bytes())?;
}
let unix_path = CString::new(LOCK_PATH)?;
unsafe {
nix::libc::chmod(unix_path.as_ptr(), mode_t::from_le(666));
}
Ok(()) Ok(())
} }
@ -59,7 +65,7 @@ impl FileLock {
std::fs::create_dir(dir_path)?; std::fs::create_dir(dir_path)?;
let unix_path = CString::new(LOCK_DIR_PERMS)?; let unix_path = CString::new(LOCK_DIR_PERMS)?;
unsafe { unsafe {
nix::libc::chmod(unix_path.as_ptr(), mode_t::from_le(666)); nix::libc::chmod(unix_path.as_ptr(), 777);
} }
Ok(()) Ok(())
} }