mirror of
https://github.com/LibreQoE/LibreQoS.git
synced 2025-02-25 18:55:32 -06:00
Move the socketserver into the bus crate also, makes more sense there.
This commit is contained in:
parent
2b573708ac
commit
da6a8f08a2
2
src/rust/Cargo.lock
generated
2
src/rust/Cargo.lock
generated
@ -1279,7 +1279,9 @@ dependencies = [
|
||||
"bincode",
|
||||
"cc",
|
||||
"criterion",
|
||||
"log",
|
||||
"lqos_config",
|
||||
"nix",
|
||||
"serde",
|
||||
"tokio",
|
||||
]
|
||||
|
@ -13,6 +13,8 @@ bincode = "1"
|
||||
anyhow = "1"
|
||||
lqos_config = { path = "../lqos_config" }
|
||||
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
|
||||
log = "0"
|
||||
nix = "0"
|
||||
|
||||
[build-dependencies]
|
||||
cc = "1.0"
|
||||
|
@ -3,12 +3,14 @@ mod request;
|
||||
mod response;
|
||||
mod session;
|
||||
mod client;
|
||||
mod unix_socket_server;
|
||||
use anyhow::Result;
|
||||
pub use reply::BusReply;
|
||||
pub use request::BusRequest;
|
||||
pub use response::BusResponse;
|
||||
pub use session::BusSession;
|
||||
pub use client::bus_request;
|
||||
pub use unix_socket_server::UnixSocketServer;
|
||||
|
||||
/// The local socket path to which `lqosd` will bind itself,
|
||||
/// listening for requets.
|
||||
|
@ -1,14 +1,14 @@
|
||||
use std::{fs::remove_file, ffi::CString};
|
||||
use lqos_bus::{BUS_SOCKET_PATH, decode_request, BusReply, encode_response};
|
||||
use crate::{BUS_SOCKET_PATH, decode_request, BusReply, encode_response, BusRequest, BusResponse};
|
||||
use anyhow::Result;
|
||||
use nix::libc::mode_t;
|
||||
use tokio::{net::{UnixListener, UnixStream}, io::{AsyncReadExt, AsyncWriteExt}};
|
||||
use log::warn;
|
||||
|
||||
pub(crate) struct UnixSocketServer {}
|
||||
pub struct UnixSocketServer {}
|
||||
|
||||
impl UnixSocketServer {
|
||||
pub(crate) fn new() -> Result<Self> {
|
||||
pub fn new() -> Result<Self> {
|
||||
Self::delete_local_socket()?;
|
||||
Ok(Self {})
|
||||
}
|
||||
@ -29,7 +29,7 @@ impl UnixSocketServer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn listen(&self) -> Result<()>
|
||||
pub async fn listen(&self, handle_bus_requests: fn(&[BusRequest], &mut Vec<BusResponse>)) -> Result<()>
|
||||
{
|
||||
// Setup the listener and grant permissions to it
|
||||
let listener = UnixListener::bind(BUS_SOCKET_PATH)?;
|
||||
@ -49,7 +49,7 @@ impl UnixSocketServer {
|
||||
let mut response = BusReply {
|
||||
responses: Vec::new(),
|
||||
};
|
||||
super::handle_bus_requests(&request.requests, &mut response.responses).await;
|
||||
handle_bus_requests(&request.requests, &mut response.responses);
|
||||
let _ = reply_unix(&encode_response(&response).unwrap(), &mut socket).await;
|
||||
} else {
|
||||
warn!("Invalid data on local socket");
|
@ -16,6 +16,7 @@ pub use ip_stats::{IpMapping, IpStats, XdpPpingResult};
|
||||
mod tc_handle;
|
||||
pub use bus::{
|
||||
decode_request, decode_response, encode_request, encode_response, BusReply,
|
||||
BusRequest, BusResponse, BusSession, BUS_SOCKET_PATH, bus_request
|
||||
BusRequest, BusResponse, BusSession, BUS_SOCKET_PATH, bus_request,
|
||||
UnixSocketServer,
|
||||
};
|
||||
pub use tc_handle::TcHandle;
|
||||
|
@ -4,11 +4,10 @@ mod lqos_daht_test;
|
||||
mod program_control;
|
||||
mod throughput_tracker;
|
||||
mod tuning;
|
||||
mod unix_socket_server;
|
||||
use crate::{ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow}, unix_socket_server::UnixSocketServer};
|
||||
use crate::{ip_mapping::{clear_ip_flows, del_ip_flow, list_mapped_ips, map_ip_to_flow}};
|
||||
use anyhow::Result;
|
||||
use log::{info, warn};
|
||||
use lqos_bus::{BusResponse, BusRequest};
|
||||
use lqos_bus::{BusResponse, BusRequest, UnixSocketServer};
|
||||
use lqos_config::LibreQoSConfig;
|
||||
use lqos_queue_tracker::{
|
||||
add_watched_queue, get_raw_circuit_data, spawn_queue_monitor, spawn_queue_structure_monitor,
|
||||
@ -84,11 +83,11 @@ async fn main() -> Result<()> {
|
||||
|
||||
// Main bus listen loop
|
||||
let server = UnixSocketServer::new().expect("Unable to spawn server");
|
||||
server.listen().await?;
|
||||
server.listen(handle_bus_requests).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusResponse>) {
|
||||
fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusResponse>) {
|
||||
for req in requests.iter() {
|
||||
//println!("Request: {:?}", req);
|
||||
responses.push(match req {
|
||||
@ -123,11 +122,19 @@ async fn handle_bus_requests(requests: &[BusRequest], responses: &mut Vec<BusRes
|
||||
lqos_bus::BusResponse::Ack
|
||||
}
|
||||
BusRequest::UpdateLqosDTuning(..) => {
|
||||
tuning::tune_lqosd_from_bus(&req).await
|
||||
let tokio_rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_io()
|
||||
.build()
|
||||
.unwrap();
|
||||
tokio_rt.block_on(tuning::tune_lqosd_from_bus(&req))
|
||||
}
|
||||
#[cfg(feature = "equinix_tests")]
|
||||
BusRequest::RequestLqosEquinixTest => {
|
||||
lqos_daht_test::lqos_daht_test().await
|
||||
let tokio_rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_io()
|
||||
.build()
|
||||
.unwrap();
|
||||
tokio_rt.block_on(lqos_daht_test::lqos_daht_test())
|
||||
}
|
||||
});
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user