Still a work in progress, needs work - but adds a persistent client option for the bus. It currently won't work well without a pause between request/replies - but it works well enough for lqtop. More improvements needed.

This commit is contained in:
Herbert Wolverson
2023-01-18 23:15:09 +00:00
parent da6a8f08a2
commit 3cb2799beb
8 changed files with 73 additions and 18 deletions

View File

@@ -9,6 +9,7 @@ use lqos_bus::*;
pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("encode_request", |b| {
let session_to_encode = BusSession {
persist: false,
requests: vec![BusRequest::Ping],
};
b.iter(|| {
@@ -19,6 +20,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("decode_request", |b| {
let session_to_encode = BusSession {
persist: false,
requests: vec![BusRequest::Ping],
};
let msg = encode_request(&session_to_encode).unwrap();
@@ -61,6 +63,14 @@ pub fn criterion_benchmark(c: &mut Criterion) {
black_box(result);
});
});
c.bench_function("bus_ping_with_persistence", |b| {
let mut client = tokio_rt.block_on(BusClient::new()).unwrap();
b.iter(|| {
let result = tokio_rt.block_on(client.request(vec![BusRequest::Ping])).unwrap();
black_box(result);
});
});
}
criterion_group!(benches, criterion_benchmark);

View File

@@ -12,6 +12,7 @@ use anyhow::Result;
pub async fn bus_request(requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let mut stream = UnixStream::connect(BUS_SOCKET_PATH).await.unwrap();
let test = BusSession {
persist: false,
requests,
};
let msg = encode_request(&test)?;

View File

@@ -3,6 +3,7 @@ mod request;
mod response;
mod session;
mod client;
mod persistent_client;
mod unix_socket_server;
use anyhow::Result;
pub use reply::BusReply;
@@ -11,6 +12,7 @@ pub use response::BusResponse;
pub use session::BusSession;
pub use client::bus_request;
pub use unix_socket_server::UnixSocketServer;
pub use persistent_client::BusClient;
/// The local socket path to which `lqosd` will bind itself,
/// listening for requets.
@@ -45,6 +47,7 @@ mod test {
#[test]
fn test_session_roundtrip() {
let session = BusSession {
persist: false,
requests: vec![BusRequest::Ping],
};

View File

@@ -0,0 +1,31 @@
use anyhow::Result;
use tokio::{net::UnixStream, io::{AsyncWriteExt, AsyncReadExt}};
use crate::{BUS_SOCKET_PATH, BusRequest, BusResponse, BusSession, encode_request, decode_response};
pub struct BusClient {
stream: UnixStream,
buffer: Vec<u8>,
}
impl BusClient {
pub async fn new() -> Result<Self> {
let stream = UnixStream::connect(BUS_SOCKET_PATH).await?;
Ok(Self {
stream,
buffer: vec![0u8; 10240],
})
}
pub async fn request(&mut self, requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let test = BusSession {
persist: true,
requests,
};
let msg = encode_request(&test)?;
self.stream.write(&msg).await?;
self.stream.read(&mut self.buffer).await.unwrap();
let reply = decode_response(&self.buffer)?;
Ok(reply.responses)
}
}

View File

@@ -6,6 +6,9 @@ use serde::{Deserialize, Serialize};
/// which serves as a sanity check that the connection is valid.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BusSession {
/// Should the session stick around after this request?
pub persist: bool,
/// A list of requests to include in this session.
pub requests: Vec<BusRequest>,
}

View File

@@ -38,21 +38,27 @@ impl UnixSocketServer {
loop {
let (mut socket, _) = listener.accept().await?;
tokio::spawn(async move {
let mut buf = vec![0; 1024];
loop {
let mut buf = vec![0; 1024];
let _ = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");
let _bytes_read = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");
if let Ok(request) = decode_request(&buf) {
let mut response = BusReply {
responses: Vec::new(),
};
handle_bus_requests(&request.requests, &mut response.responses);
let _ = reply_unix(&encode_response(&response).unwrap(), &mut socket).await;
} else {
warn!("Invalid data on local socket");
if let Ok(request) = decode_request(&buf) {
let mut response = BusReply {
responses: Vec::new(),
};
handle_bus_requests(&request.requests, &mut response.responses);
let _ = reply_unix(&encode_response(&response).unwrap(), &mut socket).await;
if !request.persist {
break;
}
} else {
warn!("Invalid data on local socket");
break;
}
}
});
}

View File

@@ -17,6 +17,6 @@ mod tc_handle;
pub use bus::{
decode_request, decode_response, encode_request, encode_response, BusReply,
BusRequest, BusResponse, BusSession, BUS_SOCKET_PATH, bus_request,
UnixSocketServer,
UnixSocketServer, BusClient,
};
pub use tc_handle::TcHandle;

View File

@@ -1,7 +1,7 @@
use anyhow::Result;
use crossterm::{event::KeyCode, terminal::enable_raw_mode};
use lqos_bus::{
BusRequest, BusResponse, IpStats, bus_request,
BusRequest, BusResponse, IpStats, bus_request, BusClient,
};
use std::{io, time::Duration};
use tui::{
@@ -18,7 +18,7 @@ struct DataResult {
top: Vec<IpStats>,
}
async fn get_data(n_rows: u16) -> Result<DataResult> {
async fn get_data(client: &mut BusClient, n_rows: u16) -> Result<DataResult> {
let mut result = DataResult {
totals: (0, 0, 0, 0),
top: Vec::new(),
@@ -27,7 +27,7 @@ async fn get_data(n_rows: u16) -> Result<DataResult> {
BusRequest::GetCurrentThroughput,
BusRequest::GetTopNDownloaders(n_rows as u32),
];
for r in bus_request(requests).await? {
for r in client.request(requests).await? {
match r {
BusResponse::CurrentThroughput {
bits_per_second,
@@ -167,6 +167,7 @@ fn draw_top_pane<'a>(
#[tokio::main(flavor = "current_thread")]
pub async fn main() -> Result<()> {
let mut bus_client = BusClient::new().await?;
let mut packets = (0, 0);
let mut bits = (0, 0);
let mut top = Vec::new();
@@ -179,7 +180,7 @@ pub async fn main() -> Result<()> {
let mut n_rows = 10;
loop {
if let Ok(result) = get_data(n_rows).await {
if let Ok(result) = get_data(&mut bus_client, n_rows).await {
let (bits_down, bits_up, packets_down, packets_up) = result.totals;
packets = (packets_down, packets_up);
bits = (bits_down, bits_up);