WIP - Use a UNIX stream socket rather than TCP for local bus communications.

This commit is contained in:
Herbert Wolverson
2023-01-18 21:21:42 +00:00
parent 0d81df6031
commit 6c52fceb7f
18 changed files with 236 additions and 265 deletions

10
src/rust/Cargo.lock generated
View File

@@ -253,6 +253,15 @@ version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba" checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "bus_benchmark"
version = "0.1.0"
dependencies = [
"anyhow",
"lqos_bus",
"tokio",
]
[[package]] [[package]]
name = "byteorder" name = "byteorder"
version = "1.4.3" version = "1.4.3"
@@ -1278,6 +1287,7 @@ dependencies = [
"cc", "cc",
"lqos_config", "lqos_config",
"serde", "serde",
"tokio",
] ]
[[package]] [[package]]

View File

@@ -21,4 +21,5 @@ members = [
"lqos_node_manager", # A lightweight web interface for management and local monitoring "lqos_node_manager", # A lightweight web interface for management and local monitoring
"lqos_python", # Python bindings for using the Rust bus directly "lqos_python", # Python bindings for using the Rust bus directly
"webusers", # CLI control for managing the web user list "webusers", # CLI control for managing the web user list
"bus_benchmark", # Simple tool to measure the overall latency of using our local socket connection
] ]

View File

@@ -0,0 +1,9 @@
[package]
name = "bus_benchmark"
version = "0.1.0"
edition = "2021"
[dependencies]
lqos_bus = { path = "../lqos_bus" }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
anyhow = "1"

View File

@@ -0,0 +1,22 @@
use std::time::Instant;
use anyhow::Result;
use lqos_bus::{bus_request, BusRequest};
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<()> {
const RUNS: usize = 100;
println!("Sending {RUNS} bus pings, please wait.");
let mut times = Vec::new();
for _ in 0 .. RUNS {
let now = Instant::now();
let responses = bus_request(vec![BusRequest::Ping]).await?;
let runtime = now.elapsed();
assert_eq!(responses.len(), 1);
times.push(runtime);
}
let sum_usec: u128 = times.iter().map(|t| t.as_nanos()).sum();
let avg_usec = sum_usec / RUNS as u128;
println!("Average bus time: {avg_usec} nanoseconds");
Ok(())
}

View File

@@ -12,6 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
bincode = "1" bincode = "1"
anyhow = "1" anyhow = "1"
lqos_config = { path = "../lqos_config" } lqos_config = { path = "../lqos_config" }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
[build-dependencies] [build-dependencies]
cc = "1.0" cc = "1.0"

View File

@@ -0,0 +1,28 @@
use tokio::{net::UnixStream, io::{AsyncWriteExt, AsyncReadExt}};
use crate::{BUS_SOCKET_PATH, BusSession, BusRequest, encode_request, decode_response, cookie_value, BusResponse};
use anyhow::{Result, Error};
/// Convenient wrapper for accessing the bus
///
/// ## Arguments
///
/// * `requests` a vector of `BusRequest` requests to make.
///
/// **Returns** Either an error, or a vector of `BusResponse` replies
pub async fn bus_request(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let mut stream = UnixStream::connect(BUS_SOCKET_PATH).await.unwrap();
let test = BusSession {
auth_cookie: 1234,
requests,
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf)?;
if reply.auth_cookie != cookie_value() {
return Err(Error::msg("Invalid reply cookie"));
}
Ok(reply.responses)
}

View File

@@ -2,17 +2,17 @@ mod reply;
mod request; mod request;
mod response; mod response;
mod session; mod session;
mod client;
use anyhow::Result; use anyhow::Result;
pub use reply::BusReply; pub use reply::BusReply;
pub use request::BusRequest; pub use request::BusRequest;
pub use response::BusResponse; pub use response::BusResponse;
pub use session::BusSession; pub use session::BusSession;
pub use client::bus_request;
/// The address to which `lqosd` should bind itself when listening for /// The local socket path to which `lqosd` will bind itself,
/// local bust requests. /// listening for requets.
/// pub const BUS_SOCKET_PATH: &str = "/tmp/lqos_bus";
/// This is typically `localhost` to minimize the exposed footprint.
pub const BUS_BIND_ADDRESS: &str = "127.0.0.1:9999";
/// Encodes a BusSession with `bincode`, providing a tight binary /// Encodes a BusSession with `bincode`, providing a tight binary
/// representation of the request object for TCP transmission. /// representation of the request object for TCP transmission.

View File

@@ -16,6 +16,6 @@ pub use ip_stats::{IpMapping, IpStats, XdpPpingResult};
mod tc_handle; mod tc_handle;
pub use bus::{ pub use bus::{
cookie_value, decode_request, decode_response, encode_request, encode_response, BusReply, cookie_value, decode_request, decode_response, encode_request, encode_response, BusReply,
BusRequest, BusResponse, BusSession, BUS_BIND_ADDRESS, BusRequest, BusResponse, BusSession, BUS_SOCKET_PATH, bus_request
}; };
pub use tc_handle::TcHandle; pub use tc_handle::TcHandle;

View File

@@ -1,14 +1,10 @@
use crate::{auth_guard::AuthGuard, cache_control::NoCache}; use crate::{auth_guard::AuthGuard, cache_control::NoCache};
use default_net::get_interfaces; use default_net::get_interfaces;
use lqos_bus::{decode_response, encode_request, BusRequest, BusSession, BUS_BIND_ADDRESS}; use lqos_bus::{BusRequest, bus_request};
use lqos_config::{EtcLqos, LibreQoSConfig, Tunables}; use lqos_config::{EtcLqos, LibreQoSConfig, Tunables};
use rocket::{ use rocket::{
fs::NamedFile, fs::NamedFile,
serde::json::Json, serde::json::Json,
tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
},
}; };
// Note that NoCache can be replaced with a cache option // Note that NoCache can be replaced with a cache option
@@ -63,18 +59,8 @@ pub async fn update_lqos_tuning(
} }
// Send the update to the server // Send the update to the server
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await.unwrap(); bus_request(vec![BusRequest::UpdateLqosDTuning(period, (*tuning).clone())]).await.unwrap();
let test = BusSession {
auth_cookie: 1234,
requests: vec![BusRequest::UpdateLqosDTuning(period, (*tuning).clone())],
};
let msg = encode_request(&test).unwrap();
stream.write(&msg).await.unwrap();
// Receive reply
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let _reply = decode_response(&buf).unwrap();
// For now, ignore the reply. // For now, ignore the reply.
Json("OK".to_string()) Json("OK".to_string())

View File

@@ -2,13 +2,11 @@ use crate::auth_guard::AuthGuard;
use crate::cache_control::NoCache; use crate::cache_control::NoCache;
use crate::tracker::SHAPED_DEVICES; use crate::tracker::SHAPED_DEVICES;
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, BUS_BIND_ADDRESS, BusRequest, BusResponse, bus_request,
}; };
use rocket::response::content::RawJson; use rocket::response::content::RawJson;
use rocket::serde::json::Json; use rocket::serde::json::Json;
use rocket::serde::Serialize; use rocket::serde::Serialize;
use rocket::tokio::io::{AsyncReadExt, AsyncWriteExt};
use rocket::tokio::net::TcpStream;
use std::net::IpAddr; use std::net::IpAddr;
#[derive(Serialize, Clone)] #[derive(Serialize, Clone)]
@@ -20,18 +18,7 @@ pub struct CircuitInfo {
#[get("/api/watch_circuit/<circuit_id>")] #[get("/api/watch_circuit/<circuit_id>")]
pub async fn watch_circuit(circuit_id: String, _auth: AuthGuard) -> NoCache<Json<String>> { pub async fn watch_circuit(circuit_id: String, _auth: AuthGuard) -> NoCache<Json<String>> {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await.unwrap(); bus_request(vec![BusRequest::WatchQueue(circuit_id)]).await.unwrap();
let test = BusSession {
auth_cookie: 1234,
requests: vec![BusRequest::WatchQueue(circuit_id)],
};
let msg = encode_request(&test).unwrap();
stream.write(&msg).await.unwrap();
// Receive reply
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let _reply = decode_response(&buf).unwrap();
NoCache::new(Json("OK".to_string())) NoCache::new(Json("OK".to_string()))
} }
@@ -70,19 +57,8 @@ pub async fn current_circuit_throughput(
// Get a list of host counts // Get a list of host counts
// This is really inefficient, but I'm struggling to find a better way. // This is really inefficient, but I'm struggling to find a better way.
// TODO: Fix me up // TODO: Fix me up
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await.unwrap();
let test = BusSession {
auth_cookie: 1234,
requests: vec![BusRequest::GetHostCounter],
};
let msg = encode_request(&test).unwrap();
stream.write(&msg).await.unwrap();
// Receive reply for msg in bus_request(vec![BusRequest::GetHostCounter]).await.unwrap().iter() {
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf).unwrap();
for msg in reply.responses.iter() {
match msg { match msg {
BusResponse::HostCounters(hosts) => { BusResponse::HostCounters(hosts) => {
let devices = SHAPED_DEVICES.read(); let devices = SHAPED_DEVICES.read();
@@ -110,20 +86,8 @@ pub async fn raw_queue_by_circuit(
circuit_id: String, circuit_id: String,
_auth: AuthGuard, _auth: AuthGuard,
) -> NoCache<RawJson<String>> { ) -> NoCache<RawJson<String>> {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await.unwrap(); let responses = bus_request(vec![BusRequest::GetRawQueueData(circuit_id)]).await.unwrap();
let test = BusSession { let result = match &responses[0] {
auth_cookie: 1234,
requests: vec![BusRequest::GetRawQueueData(circuit_id)],
};
let msg = encode_request(&test).unwrap();
stream.write(&msg).await.unwrap();
// Receive reply
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf).unwrap();
let result = match &reply.responses[0] {
BusResponse::RawQueueData(msg) => msg.clone(), BusResponse::RawQueueData(msg) => msg.clone(),
_ => "Unable to request queue".to_string(), _ => "Unable to request queue".to_string(),
}; };
@@ -133,20 +97,8 @@ pub async fn raw_queue_by_circuit(
#[cfg(feature = "equinix_tests")] #[cfg(feature = "equinix_tests")]
#[get("/api/run_btest")] #[get("/api/run_btest")]
pub async fn run_btest() -> NoCache<RawJson<String>> { pub async fn run_btest() -> NoCache<RawJson<String>> {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await.unwrap(); let responses = bus_request(vec![BusRequest::RequestLqosEquinixTest]).await.unwrap();
let test = BusSession { let result = match &responses[0] {
auth_cookie: 1234,
requests: vec![BusRequest::RequestLqosEquinixTest],
};
let msg = encode_request(&test).unwrap();
stream.write(&msg).await.unwrap();
// Receive reply
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf).unwrap();
let result = match &reply.responses[0] {
BusResponse::Ack => String::new(), BusResponse::Ack => String::new(),
_ => "Unable to request test".to_string(), _ => "Unable to request test".to_string(),
}; };

View File

@@ -3,13 +3,11 @@ use crate::cache_control::NoCache;
use crate::tracker::SHAPED_DEVICES; use crate::tracker::SHAPED_DEVICES;
use lazy_static::*; use lazy_static::*;
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, BUS_BIND_ADDRESS, BusRequest, BusResponse, bus_request,
}; };
use lqos_config::ShapedDevice; use lqos_config::ShapedDevice;
use parking_lot::RwLock; use parking_lot::RwLock;
use rocket::serde::json::Json; use rocket::serde::json::Json;
use rocket::tokio::io::{AsyncReadExt, AsyncWriteExt};
use rocket::tokio::net::TcpStream;
lazy_static! { lazy_static! {
static ref RELOAD_REQUIRED: RwLock<bool> = RwLock::new(false); static ref RELOAD_REQUIRED: RwLock<bool> = RwLock::new(false);
@@ -69,20 +67,8 @@ pub async fn reload_libreqos(auth: AuthGuard) -> NoCache<Json<String>> {
return NoCache::new(Json("Not authorized".to_string())); return NoCache::new(Json("Not authorized".to_string()));
} }
// Send request to lqosd // Send request to lqosd
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await.unwrap(); let responses = bus_request(vec![BusRequest::ReloadLibreQoS]).await.unwrap();
let test = BusSession { let result = match &responses[0] {
auth_cookie: 1234,
requests: vec![BusRequest::ReloadLibreQoS],
};
let msg = encode_request(&test).unwrap();
stream.write(&msg).await.unwrap();
// Receive reply
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf).unwrap();
let result = match &reply.responses[0] {
BusResponse::ReloadLibreQoS(msg) => msg.clone(), BusResponse::ReloadLibreQoS(msg) => msg.clone(),
_ => "Unable to reload LibreQoS".to_string(), _ => "Unable to reload LibreQoS".to_string(),
}; };

View File

@@ -4,12 +4,10 @@
use super::cache::*; use super::cache::*;
use anyhow::Result; use anyhow::Result;
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, IpStats, BUS_BIND_ADDRESS, BusRequest, BusResponse, IpStats, bus_request,
}; };
use lqos_config::ConfigShapedDevices; use lqos_config::ConfigShapedDevices;
use rocket::tokio::{ use rocket::tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
task::spawn_blocking, task::spawn_blocking,
}; };
use std::{net::IpAddr, time::Duration}; use std::{net::IpAddr, time::Duration};
@@ -71,27 +69,14 @@ fn watch_for_shaped_devices_changing() -> Result<()> {
/// caches. /// caches.
async fn get_data_from_server() -> Result<()> { async fn get_data_from_server() -> Result<()> {
// Send request to lqosd // Send request to lqosd
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await?; let requests = vec![
let test = BusSession { BusRequest::GetCurrentThroughput,
auth_cookie: 1234, BusRequest::GetTopNDownloaders(10),
requests: vec![ BusRequest::GetWorstRtt(10),
BusRequest::GetCurrentThroughput, BusRequest::RttHistogram,
BusRequest::GetTopNDownloaders(10), BusRequest::AllUnknownIps,
BusRequest::GetWorstRtt(10), ];
BusRequest::RttHistogram, for r in bus_request(requests).await?.iter() {
BusRequest::AllUnknownIps,
],
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
// Receive reply
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf)?;
// Process the reply
for r in reply.responses.iter() {
match r { match r {
BusResponse::CurrentThroughput { BusResponse::CurrentThroughput {
bits_per_second, bits_per_second,

View File

@@ -1,10 +1,6 @@
use anyhow::Result; use anyhow::Result;
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, BUS_BIND_ADDRESS, BusRequest, BusResponse, bus_request,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
}; };
pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> { pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
@@ -14,17 +10,7 @@ pub fn run_query(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
.build() .build()
.unwrap() .unwrap()
.block_on(async { .block_on(async {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await?; replies.extend_from_slice(&bus_request(requests).await?);
let test = BusSession {
auth_cookie: 1234,
requests: requests,
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await?;
let reply = decode_response(&buf)?;
replies.extend_from_slice(&reply.responses);
Ok(replies) Ok(replies)
}) })
} }

View File

@@ -4,12 +4,11 @@ mod lqos_daht_test;
mod program_control; mod program_control;
mod throughput_tracker; mod throughput_tracker;
mod tuning; mod tuning;
use crate::ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow}; mod unix_socket_server;
use crate::{ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow}, unix_socket_server::UnixSocketServer};
use anyhow::Result; use anyhow::Result;
use log::{info, warn}; use log::{info, warn};
use lqos_bus::{ use lqos_bus::{BusResponse, BusRequest};
cookie_value, decode_request, encode_response, BusReply, BusRequest, BUS_BIND_ADDRESS,
};
use lqos_config::LibreQoSConfig; use lqos_config::LibreQoSConfig;
use lqos_queue_tracker::{ use lqos_queue_tracker::{
add_watched_queue, get_raw_circuit_data, spawn_queue_monitor, spawn_queue_structure_monitor, add_watched_queue, get_raw_circuit_data, spawn_queue_monitor, spawn_queue_structure_monitor,
@@ -19,11 +18,7 @@ use signal_hook::{
consts::{SIGHUP, SIGINT, SIGTERM}, consts::{SIGHUP, SIGINT, SIGTERM},
iterator::Signals, iterator::Signals,
}; };
use tokio::{ use tokio::join;
io::{AsyncReadExt, AsyncWriteExt},
join,
net::{TcpListener, TcpStream},
};
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@@ -88,75 +83,52 @@ async fn main() -> Result<()> {
}); });
// Main bus listen loop // Main bus listen loop
let listener = TcpListener::bind(BUS_BIND_ADDRESS).await?; let server = UnixSocketServer::new().expect("Unable to spawn server");
warn!("Listening on: {}", BUS_BIND_ADDRESS); server.listen().await?;
loop { Ok(())
let (mut socket, _) = listener.accept().await?; }
tokio::spawn(async move {
let mut buf = vec![0; 1024];
let _ = socket async fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusResponse>) {
.read(&mut buf) for req in requests.iter() {
.await //println!("Request: {:?}", req);
.expect("failed to read data from socket"); responses.push(match req {
BusRequest::Ping => lqos_bus::BusResponse::Ack,
if let Ok(request) = decode_request(&buf) { BusRequest::GetCurrentThroughput => {
if request.auth_cookie == cookie_value() { throughput_tracker::current_throughput()
let mut response = BusReply { }
auth_cookie: request.auth_cookie, BusRequest::GetHostCounter => throughput_tracker::host_counters(),
responses: Vec::new(), BusRequest::GetTopNDownloaders(n) => throughput_tracker::top_n(*n),
}; BusRequest::GetWorstRtt(n) => throughput_tracker::worst_n(*n),
for req in request.requests.iter() { BusRequest::MapIpToFlow {
//println!("Request: {:?}", req); ip_address,
response.responses.push(match req { tc_handle,
BusRequest::Ping => lqos_bus::BusResponse::Ack, cpu,
BusRequest::GetCurrentThroughput => { upload,
throughput_tracker::current_throughput() } => map_ip_to_flow(ip_address, tc_handle, *cpu, *upload),
} BusRequest::DelIpFlow { ip_address, upload } => {
BusRequest::GetHostCounter => throughput_tracker::host_counters(), del_ip_flow(&ip_address, *upload)
BusRequest::GetTopNDownloaders(n) => throughput_tracker::top_n(*n), }
BusRequest::GetWorstRtt(n) => throughput_tracker::worst_n(*n), BusRequest::ClearIpFlow => clear_ip_flows(),
BusRequest::MapIpToFlow { BusRequest::ListIpFlow => list_mapped_ips(),
ip_address, BusRequest::XdpPping => throughput_tracker::xdp_pping_compat(),
tc_handle, BusRequest::RttHistogram => throughput_tracker::rtt_histogram(),
cpu, BusRequest::HostCounts => throughput_tracker::host_counts(),
upload, BusRequest::AllUnknownIps => throughput_tracker::all_unknown_ips(),
} => map_ip_to_flow(ip_address, tc_handle, *cpu, *upload), BusRequest::ReloadLibreQoS => program_control::reload_libre_qos(),
BusRequest::DelIpFlow { ip_address, upload } => { BusRequest::GetRawQueueData(circuit_id) => {
del_ip_flow(&ip_address, *upload) get_raw_circuit_data(&circuit_id)
} }
BusRequest::ClearIpFlow => clear_ip_flows(), BusRequest::WatchQueue(circuit_id) => {
BusRequest::ListIpFlow => list_mapped_ips(), add_watched_queue(&circuit_id);
BusRequest::XdpPping => throughput_tracker::xdp_pping_compat(), lqos_bus::BusResponse::Ack
BusRequest::RttHistogram => throughput_tracker::rtt_histogram(), }
BusRequest::HostCounts => throughput_tracker::host_counts(), BusRequest::UpdateLqosDTuning(..) => {
BusRequest::AllUnknownIps => throughput_tracker::all_unknown_ips(), tuning::tune_lqosd_from_bus(&req).await
BusRequest::ReloadLibreQoS => program_control::reload_libre_qos(), }
BusRequest::GetRawQueueData(circuit_id) => { #[cfg(feature = "equinix_tests")]
get_raw_circuit_data(&circuit_id) BusRequest::RequestLqosEquinixTest => {
} lqos_daht_test::lqos_daht_test().await
BusRequest::WatchQueue(circuit_id) => {
add_watched_queue(&circuit_id);
lqos_bus::BusResponse::Ack
}
BusRequest::UpdateLqosDTuning(..) => {
tuning::tune_lqosd_from_bus(&req).await
}
#[cfg(feature = "equinix_tests")]
BusRequest::RequestLqosEquinixTest => {
lqos_daht_test::lqos_daht_test().await
}
});
}
//println!("{:?}", response);
let _ = reply(&encode_response(&response).unwrap(), &mut socket).await;
}
} }
}); });
} }
} }
async fn reply(response: &[u8], socket: &mut TcpStream) -> Result<()> {
socket.write_all(&response).await?;
Ok(())
}

View File

@@ -0,0 +1,75 @@
use std::{fs::remove_file, ffi::CString};
use lqos_bus::{BUS_SOCKET_PATH, decode_request, cookie_value, BusReply, encode_response};
use anyhow::Result;
use nix::libc::mode_t;
use tokio::{net::{UnixListener, UnixStream}, io::{AsyncReadExt, AsyncWriteExt}};
use log::warn;
pub(crate) struct UnixSocketServer {}
impl UnixSocketServer {
pub(crate) fn new() -> Result<Self> {
Self::delete_local_socket()?;
Ok(Self {})
}
fn delete_local_socket() -> Result<()> {
let socket_path = std::path::Path::new(BUS_SOCKET_PATH);
if socket_path.exists() {
remove_file(socket_path)?;
}
Ok(())
}
fn make_socket_public() -> Result<()> {
let unix_path = CString::new(BUS_SOCKET_PATH)?;
unsafe {
nix::libc::chmod(unix_path.as_ptr(), mode_t::from_le(666));
}
Ok(())
}
pub(crate) async fn listen(&self) -> Result<()>
{
// Setup the listener and grant permissions to it
let listener = UnixListener::bind(BUS_SOCKET_PATH)?;
Self::make_socket_public()?;
warn!("Listening on: {}", BUS_SOCKET_PATH);
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
let _ = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");
if let Ok(request) = decode_request(&buf) {
if request.auth_cookie == cookie_value() {
let mut response = BusReply {
auth_cookie: request.auth_cookie,
responses: Vec::new(),
};
super::handle_bus_requests(&request.requests, &mut response.responses).await;
let _ = reply_unix(&encode_response(&response).unwrap(), &mut socket).await;
}
} else {
warn!("Invalid data on local socket");
}
});
}
//Ok(()) // unreachable
}
}
impl Drop for UnixSocketServer {
fn drop(&mut self) {
let _ = UnixSocketServer::delete_local_socket(); // Ignore result
}
}
async fn reply_unix(response: &[u8], socket: &mut UnixStream) -> Result<()> {
socket.write_all(&response).await?;
Ok(())
}

View File

@@ -1,13 +1,9 @@
use anyhow::Result; use anyhow::Result;
use crossterm::{event::KeyCode, terminal::enable_raw_mode}; use crossterm::{event::KeyCode, terminal::enable_raw_mode};
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, IpStats, BUS_BIND_ADDRESS, BusRequest, BusResponse, IpStats, bus_request,
}; };
use std::{io, time::Duration}; use std::{io, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
use tui::{ use tui::{
backend::CrosstermBackend, backend::CrosstermBackend,
layout::{Alignment, Constraint, Direction, Layout}, layout::{Alignment, Constraint, Direction, Layout},
@@ -26,22 +22,12 @@ async fn get_data(n_rows: u16) -> Result<DataResult> {
let mut result = DataResult { let mut result = DataResult {
totals: (0, 0, 0, 0), totals: (0, 0, 0, 0),
top: Vec::new(), top: Vec::new(),
}; };
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await?; let requests = vec![
let test = BusSession { BusRequest::GetCurrentThroughput,
auth_cookie: 1234, BusRequest::GetTopNDownloaders(n_rows as u32),
requests: vec![ ];
BusRequest::GetCurrentThroughput, for r in bus_request(requests).await? {
BusRequest::GetTopNDownloaders(n_rows as u32),
],
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf)?;
for r in reply.responses.iter() {
match r { match r {
BusResponse::CurrentThroughput { BusResponse::CurrentThroughput {
bits_per_second, bits_per_second,

View File

@@ -1,14 +1,9 @@
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use clap::{Parser, Subcommand}; use clap::{Parser, Subcommand};
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, IpMapping, TcHandle, BusRequest, BusResponse, IpMapping, TcHandle, bus_request,
BUS_BIND_ADDRESS,
}; };
use std::process::exit; use std::process::exit;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
#[derive(Parser)] #[derive(Parser)]
#[command()] #[command()]
@@ -53,17 +48,8 @@ enum Commands {
} }
async fn talk_to_server(command: BusRequest) -> Result<()> { async fn talk_to_server(command: BusRequest) -> Result<()> {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await?; let responses = bus_request(vec![command]).await?;
let test = BusSession { match &responses[0] {
auth_cookie: 1234,
requests: vec![command],
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf)?;
match &reply.responses[0] {
BusResponse::Ack => { BusResponse::Ack => {
println!("Success"); println!("Success");
Ok(()) Ok(())

View File

@@ -1,25 +1,11 @@
use anyhow::Result; use anyhow::Result;
use lqos_bus::{ use lqos_bus::{
decode_response, encode_request, BusRequest, BusResponse, BusSession, BUS_BIND_ADDRESS, BusRequest, BusResponse, bus_request,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
}; };
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<()> { pub async fn main() -> Result<()> {
let mut stream = TcpStream::connect(BUS_BIND_ADDRESS).await?; for resp in bus_request(vec![BusRequest::XdpPping]).await? {
let test = BusSession {
auth_cookie: 1234,
requests: vec![BusRequest::XdpPping],
};
let msg = encode_request(&test)?;
stream.write(&msg).await?;
let mut buf = Vec::new();
let _ = stream.read_to_end(&mut buf).await.unwrap();
let reply = decode_response(&buf)?;
for resp in reply.responses.iter() {
match resp { match resp {
BusResponse::XdpPping(lines) => { BusResponse::XdpPping(lines) => {
println!("["); println!("[");