Bus Client is more resilient to socket failure

* BusClient will survive creation without a successful connection.
* Attempting to use the BusClient without a connection will try to
  connect.
* Attempts to use the BusClient are gated by timeouts and error
  checking. Failures result in returning the client stream to
  "None" - triggering a reconnect attempt on the next use.
* Client programs have added the Tokio "time" feature, needed for
  timeout support.
* lqtop and lqos_node_manager have been tested to run without
  lqosd started, and with stop/start cycles triggering resumption.
This commit is contained in:
Herbert Wolverson 2023-01-20 15:20:21 +00:00
parent adb8c1f471
commit e64e41c96b
7 changed files with 112 additions and 34 deletions

View File

@ -12,7 +12,7 @@ serde = { version = "1.0", features = ["derive"] }
bincode = "1"
anyhow = "1"
lqos_config = { path = "../lqos_config" }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
log = "0"
nix = "0"

View File

@ -1,39 +1,116 @@
use anyhow::Result;
use tokio::{net::UnixStream, io::{AsyncWriteExt, AsyncReadExt}};
use crate::{BUS_SOCKET_PATH, BusRequest, BusResponse, BusSession, encode_request, decode_response};
use crate::{
decode_response, encode_request, BusRequest, BusResponse, BusSession,
BUS_SOCKET_PATH,
};
use anyhow::{Error, Result};
use std::time::Duration;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::UnixStream,
time::timeout,
};
/// Provides a lqosd bus client that persists between connections. Useful for when you are
/// going to be repeatedly polling the bus for data (e.g. `lqtop`) and want to avoid the
/// overhead of an individual connection.
pub struct BusClient {
stream: UnixStream,
buffer: Vec<u8>,
stream: Option<UnixStream>,
buffer: Vec<u8>,
timeout: Duration,
}
impl BusClient {
/// Instantiates a bus client, connecting to the bus stream and initializing
/// a buffer.
pub async fn new() -> Result<Self> {
let stream = UnixStream::connect(BUS_SOCKET_PATH).await?;
Ok(Self {
stream,
buffer: vec![0u8; 10240],
})
/// Instantiates a bus client, connecting to the bus stream and initializing
/// a buffer.
pub async fn new() -> Result<Self> {
Ok(Self {
stream: Self::connect().await,
buffer: vec![0u8; 10240],
timeout: Duration::from_millis(100),
})
}
async fn connect() -> Option<UnixStream> {
if let Ok(stream) = UnixStream::connect(BUS_SOCKET_PATH).await {
Some(stream)
} else {
None
}
}
/// Analagous to the singe-task `bus_request`, sends a request to the existing
/// bus connection.
pub async fn request(
&mut self,
requests: Vec<BusRequest>,
) -> Result<Vec<BusResponse>> {
if self.stream.is_none() {
self.stream = Self::connect().await;
}
/// Analagous to the singe-task `bus_request`, sends a request to the existing
/// bus connection.
pub async fn request(&mut self, requests: Vec<BusRequest>) -> Result<Vec<BusResponse>> {
let test = BusSession {
persist: true,
requests,
};
let msg = encode_request(&test)?;
self.stream.write(&msg).await?;
self.stream.read(&mut self.buffer).await.unwrap();
let reply = decode_response(&self.buffer)?;
self.buffer.iter_mut().for_each(|b| *b=0);
Ok(reply.responses)
// If the stream isn't writeable, bail out
if self.stream.is_some()
&& self.stream.as_ref().unwrap().writable().await.is_err()
{
// The stream has gone away
self.stream = None;
return Err(Error::msg("Stream not connected"));
}
// Encode the message
let message = BusSession { persist: true, requests };
let msg = encode_request(&message)?;
// Send with a timeout. If the timeout fails, then the stream went wrong
if self.stream.is_some() {
let timer = timeout(
self.timeout,
Self::send(self.stream.as_mut().unwrap(), &msg),
);
let failed = if let Ok(inner) = timer.await {
if inner.is_err() {
true
} else {
false
}
} else {
false
};
if failed {
self.stream = None;
return Err(Error::msg("Stream not connected"));
}
}
// Receive with a timeout. If the timeout fails, then something went wrong.
if self.stream.is_some() {
let timer = timeout(
self.timeout,
self.stream.as_mut().unwrap().read(&mut self.buffer),
);
let failed = if let Ok(inner) = timer.await {
if inner.is_err() {
true
} else {
false
}
} else {
false
};
if failed {
self.stream = None;
return Err(Error::msg("Stream not connected"));
}
}
let reply = decode_response(&self.buffer)?;
self.buffer.iter_mut().for_each(|b| *b = 0);
Ok(reply.responses)
}
async fn send(stream: &mut UnixStream, msg: &[u8]) -> Result<()> {
stream.write(&msg).await?;
Ok(())
}
}

View File

@ -76,8 +76,9 @@ async fn get_data_from_server(bus_client: &mut BusClient) -> Result<()> {
BusRequest::RttHistogram,
BusRequest::AllUnknownIps,
];
for r in lqos_bus::bus_request(requests).await?.iter() {
// for r in bus_client.request(requests).await?.iter() {
// for r in lqos_bus::bus_request(requests).await?.iter() {
println!("Tick");
for r in bus_client.request(requests).await?.iter() {
match r {
BusResponse::CurrentThroughput {
bits_per_second,

View File

@ -10,5 +10,5 @@ crate-type = ["cdylib"]
[dependencies]
pyo3 = "0.17"
lqos_bus = { path = "../lqos_bus" }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
anyhow = "1"

View File

@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
lqos_bus = { path = "../lqos_bus" }
anyhow = "1"
tui = "0.19"

View File

@ -5,6 +5,6 @@ edition = "2021"
[dependencies]
clap = { version = "4", features = ["derive"] }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
anyhow = "1"
lqos_bus = { path = "../lqos_bus" }

View File

@ -4,6 +4,6 @@ version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util" ] }
tokio = { version = "1.22", features = [ "rt", "macros", "net", "io-util", "time" ] }
anyhow = "1"
lqos_bus = { path = "../lqos_bus" }