turn off linger

This commit is contained in:
Christien Rioux 2023-10-23 20:17:21 -04:00
parent 2c4883ee85
commit 23f47be357
8 changed files with 103 additions and 86 deletions

View File

@ -132,33 +132,33 @@ impl Network {
} }
}; };
#[cfg(all(feature = "rt-async-std", unix))] // #[cfg(all(feature = "rt-async-std", unix))]
{ // {
// async-std does not directly support linger on TcpStream yet // // async-std does not directly support linger on TcpStream yet
use std::os::fd::{AsRawFd, FromRawFd}; // use std::os::fd::{AsRawFd, FromRawFd};
if let Err(e) = unsafe { socket2::Socket::from_raw_fd(tcp_stream.as_raw_fd()) } // if let Err(e) = unsafe { socket2::Socket::from_raw_fd(tcp_stream.as_raw_fd()) }
.set_linger(Some(core::time::Duration::from_secs(0))) // .set_linger(Some(core::time::Duration::from_secs(0)))
{ // {
log_net!(debug "Couldn't set TCP linger: {}", e); // log_net!(debug "Couldn't set TCP linger: {}", e);
return; // return;
} // }
} // }
#[cfg(all(feature = "rt-async-std", windows))] // #[cfg(all(feature = "rt-async-std", windows))]
{ // {
// async-std does not directly support linger on TcpStream yet // // async-std does not directly support linger on TcpStream yet
use std::os::windows::io::{AsRawSocket, FromRawSocket}; // use std::os::windows::io::{AsRawSocket, FromRawSocket};
if let Err(e) = unsafe { socket2::Socket::from_raw_socket(tcp_stream.as_raw_socket()) } // if let Err(e) = unsafe { socket2::Socket::from_raw_socket(tcp_stream.as_raw_socket()) }
.set_linger(Some(core::time::Duration::from_secs(0))) // .set_linger(Some(core::time::Duration::from_secs(0)))
{ // {
log_net!(debug "Couldn't set TCP linger: {}", e); // log_net!(debug "Couldn't set TCP linger: {}", e);
return; // return;
} // }
} // }
#[cfg(not(feature = "rt-async-std"))] // #[cfg(not(feature = "rt-async-std"))]
if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) { // if let Err(e) = tcp_stream.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(debug "Couldn't set TCP linger: {}", e); // log_net!(debug "Couldn't set TCP linger: {}", e);
return; // return;
} // }
if let Err(e) = tcp_stream.set_nodelay(true) { if let Err(e) = tcp_stream.set_nodelay(true) {
log_net!(debug "Couldn't set TCP nodelay: {}", e); log_net!(debug "Couldn't set TCP nodelay: {}", e);
return; return;

View File

@ -55,15 +55,15 @@ impl ProtocolNetworkConnection {
} }
} }
// pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// match self { match self {
// Self::Dummy(d) => d.close(), Self::Dummy(d) => d.close(),
// Self::RawTcp(t) => t.close().await, Self::RawTcp(t) => t.close().await,
// Self::WsAccepted(w) => w.close().await, Self::WsAccepted(w) => w.close().await,
// Self::Ws(w) => w.close().await, Self::Ws(w) => w.close().await,
// Self::Wss(w) => w.close().await, Self::Wss(w) => w.close().await,
// } }
// } }
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
match self { match self {

View File

@ -112,9 +112,9 @@ pub fn new_unbound_tcp_socket(domain: Domain) -> io::Result<Socket> {
#[instrument(level = "trace", ret)] #[instrument(level = "trace", ret)]
pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> { pub fn new_unbound_shared_tcp_socket(domain: Domain) -> io::Result<Socket> {
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { // if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(error "Couldn't set TCP linger: {}", e); // log_net!(error "Couldn't set TCP linger: {}", e);
} // }
if let Err(e) = socket.set_nodelay(true) { if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e); log_net!(error "Couldn't set TCP nodelay: {}", e);
} }
@ -148,9 +148,9 @@ pub fn new_bound_first_tcp_socket(local_address: SocketAddr) -> io::Result<Socke
let domain = Domain::for_address(local_address); let domain = Domain::for_address(local_address);
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?; let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) { // if let Err(e) = socket.set_linger(Some(core::time::Duration::from_secs(0))) {
log_net!(error "Couldn't set TCP linger: {}", e); // log_net!(error "Couldn't set TCP linger: {}", e);
} // }
if let Err(e) = socket.set_nodelay(true) { if let Err(e) = socket.set_nodelay(true) {
log_net!(error "Couldn't set TCP nodelay: {}", e); log_net!(error "Couldn't set TCP nodelay: {}", e);
} }

View File

@ -22,25 +22,30 @@ impl RawTcpNetworkConnection {
self.descriptor self.descriptor
} }
// #[instrument(level = "trace", err, skip(self))] #[cfg_attr(
// pub async fn close(&mut self) -> io::Result<NetworkResult<()>> { feature = "verbose-tracing",
// // Make an attempt to flush the stream instrument(level = "trace", err, skip(self))
// self.stream.clone().close().await?; )]
// // Then shut down the write side of the socket to effect a clean close pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// cfg_if! { // Make an attempt to flush the stream
// if #[cfg(feature="rt-async-std")] { self.stream.clone().close().await?;
// self.tcp_stream Ok(NetworkResult::value(()))
// .shutdown(async_std::net::Shutdown::Write)
// } else if #[cfg(feature="rt-tokio")] { // // Then shut down the write side of the socket to effect a clean close
// use tokio::io::AsyncWriteExt; // cfg_if! {
// self.tcp_stream.get_mut() // if #[cfg(feature="rt-async-std")] {
// .shutdown() // self.tcp_stream
// .await // .shutdown(async_std::net::Shutdown::Write)
// } else { // } else if #[cfg(feature="rt-tokio")] {
// compile_error!("needs executor implementation") // use tokio::io::AsyncWriteExt;
// } // self.tcp_stream.get_mut()
// } // .shutdown()
// } // .await
// } else {
// compile_error!("needs executor implementation")
// }
// }
}
async fn send_internal( async fn send_internal(
stream: &mut AsyncPeekStream, stream: &mut AsyncPeekStream,

View File

@ -77,15 +77,24 @@ where
self.descriptor self.descriptor
} }
// #[instrument(level = "trace", err, skip(self))] #[cfg_attr(
// pub async fn close(&self) -> io::Result<()> { feature = "verbose-tracing",
// // Make an attempt to flush the stream instrument(level = "trace", err, skip(self))
// self.stream.clone().close().await.map_err(to_io_error_other)?; )]
// // Then forcibly close the socket pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// self.tcp_stream // Make an attempt to flush the stream
// .shutdown(Shutdown::Both) self.stream
// .map_err(to_io_error_other) .clone()
// } .close()
.await
.map_err(to_io_error_other)?;
// // Then forcibly close the socket
// self.tcp_stream
// .shutdown(Shutdown::Both)
// .map_err(to_io_error_other)
Ok(NetworkResult::value(()))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {

View File

@ -54,9 +54,9 @@ impl DummyNetworkConnection {
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn descriptor(&self) -> ConnectionDescriptor {
self.descriptor self.descriptor
} }
// pub fn close(&self) -> io::Result<()> { pub fn close(&self) -> io::Result<NetworkResult<()>> {
// Ok(()) Ok(NetworkResult::Value(()))
// } }
pub fn send(&self, _message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub fn send(&self, _message: Vec<u8>) -> io::Result<NetworkResult<()>> {
Ok(NetworkResult::Value(())) Ok(NetworkResult::Value(()))
} }

View File

@ -41,13 +41,12 @@ impl ProtocolNetworkConnection {
Self::Ws(w) => w.descriptor(), Self::Ws(w) => w.descriptor(),
} }
} }
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// pub async fn close(&self) -> io::Result<NetworkResult<()>> { match self {
// match self { Self::Dummy(d) => d.close(),
// Self::Dummy(d) => d.close(), Self::Ws(w) => w.close().await,
// Self::Ws(w) => w.close().await, }
// } }
// }
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {
match self { match self {
Self::Dummy(d) => d.send(message), Self::Dummy(d) => d.send(message),

View File

@ -5,7 +5,7 @@ use std::io;
use ws_stream_wasm::*; use ws_stream_wasm::*;
struct WebsocketNetworkConnectionInner { struct WebsocketNetworkConnectionInner {
_ws_meta: WsMeta, ws_meta: WsMeta,
ws_stream: CloneStream<WsStream>, ws_stream: CloneStream<WsStream>,
} }
@ -49,7 +49,7 @@ impl WebsocketNetworkConnection {
Self { Self {
descriptor, descriptor,
inner: Arc::new(WebsocketNetworkConnectionInner { inner: Arc::new(WebsocketNetworkConnectionInner {
_ws_meta: ws_meta, ws_meta,
ws_stream: CloneStream::new(ws_stream), ws_stream: CloneStream::new(ws_stream),
}), }),
} }
@ -59,10 +59,14 @@ impl WebsocketNetworkConnection {
self.descriptor self.descriptor
} }
// #[instrument(level = "trace", err, skip(self))] #[cfg_attr(
// pub async fn close(&self) -> io::Result<()> { feature = "verbose-tracing",
// self.inner.ws_meta.close().await.map_err(to_io).map(drop) instrument(level = "trace", err, skip(self))
// } )]
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
let _ = self.inner.ws_meta.close().await.map_err(to_io)?;
Ok(NetworkResult::value(()))
}
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, message), fields(network_result, message.len = message.len())))]
pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> { pub async fn send(&self, message: Vec<u8>) -> io::Result<NetworkResult<()>> {