clean up protect/refs

This commit is contained in:
Christien Rioux 2023-11-03 15:57:40 -04:00
parent 934cd93b4b
commit facb343160
19 changed files with 128 additions and 138 deletions

View File

@ -17,33 +17,22 @@ enum ConnectionManagerEvent {
pub(crate) struct ConnectionRefScope {
connection_manager: ConnectionManager,
descriptor: ConnectionDescriptor,
protect: bool,
}
impl ConnectionRefScope {
pub fn new(
connection_manager: ConnectionManager,
descriptor: ConnectionDescriptor,
protect: bool,
) -> Self {
connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef, protect);
pub fn new(connection_manager: ConnectionManager, descriptor: ConnectionDescriptor) -> Self {
connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef);
Self {
connection_manager,
descriptor,
protect,
}
}
}
impl Drop for ConnectionRefScope {
fn drop(&mut self) {
if !self.protect {
self.connection_manager.connection_ref(
self.descriptor,
ConnectionRefKind::RemoveRef,
false,
);
}
self.connection_manager
.connection_ref(self.descriptor, ConnectionRefKind::RemoveRef);
}
}
@ -169,6 +158,33 @@ impl ConnectionManager {
debug!("finished connection manager shutdown");
}
// Internal routine to see if we should keep this connection
// from being LRU removed. Used on our initiated relay connections.
fn should_protect_connection(&self, conn: &NetworkConnection) -> Option<NodeRef> {
let netman = self.network_manager();
let routing_table = netman.routing_table();
let remote_address = conn.connection_descriptor().remote_address().address();
let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else {
return None;
};
let Some(rn) = routing_table.relay_node(routing_domain) else {
return None;
};
let relay_nr = rn.filtered_clone(
NodeRefFilter::new()
.with_routing_domain(routing_domain)
.with_address_type(conn.connection_descriptor().address_type())
.with_protocol_type(conn.connection_descriptor().protocol_type()),
);
let dids = relay_nr.all_filtered_dial_info_details();
for did in dids {
if did.dial_info.address() == remote_address {
return Some(relay_nr);
}
}
None
}
// Internal routine to register new connection atomically.
// Registers connection in the connection table for later access
// and spawns a message processing loop for the connection
@ -193,9 +209,15 @@ impl ConnectionManager {
None => bail!("not creating connection because we are stopping"),
};
let conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
let mut conn = NetworkConnection::from_protocol(self.clone(), stop_token, prot_conn, id);
let handle = conn.get_handle();
// See if this should be a protected connection
if let Some(protect_nr) = self.should_protect_connection(&conn) {
log_net!(debug "== PROTECTING connection: {} -> {} for node {}", id, conn.debug_print(get_aligned_timestamp()), protect_nr);
conn.protect(protect_nr);
}
// Add to the connection table
match self.arc.connection_table.add_connection(conn) {
Ok(None) => {
@ -253,22 +275,13 @@ impl ConnectionManager {
}
// Protects a network connection if one already is established
fn connection_ref(
&self,
descriptor: ConnectionDescriptor,
kind: ConnectionRefKind,
protect: bool,
) {
fn connection_ref(&self, descriptor: ConnectionDescriptor, kind: ConnectionRefKind) {
self.arc
.connection_table
.ref_connection_by_descriptor(descriptor, kind, protect);
.ref_connection_by_descriptor(descriptor, kind);
}
pub fn connection_ref_scope(
&self,
descriptor: ConnectionDescriptor,
protect: bool,
) -> ConnectionRefScope {
ConnectionRefScope::new(self.clone(), descriptor, protect)
pub fn connection_ref_scope(&self, descriptor: ConnectionDescriptor) -> ConnectionRefScope {
ConnectionRefScope::new(self.clone(), descriptor)
}
/// Called when we want to create a new connection or get the current one that already exists
@ -446,6 +459,11 @@ impl ConnectionManager {
// Inform the processor of the event
if let Some(conn) = conn {
// If the connection closed while it was protected, report it on the node the connection was established on
// In-use connections will already get reported because they will cause a 'question_lost' stat on the remote node
if let Some(protect_nr) = conn.protected_node_ref() {
protect_nr.report_protected_connection_dropped();
}
let _ = sender.send_async(ConnectionManagerEvent::Dead(conn)).await;
}
}

View File

@ -195,7 +195,7 @@ impl ConnectionTable {
// Find a free connection to terminate to make room
let dead_k = {
let Some(lruk) = inner.conn_by_id[protocol_index].iter().find_map(|(k, v)| {
if !v.is_in_use() {
if !v.is_in_use() && v.protected_node_ref().is_none() {
Some(*k)
} else {
None
@ -254,7 +254,6 @@ impl ConnectionTable {
&self,
descriptor: ConnectionDescriptor,
ref_type: ConnectionRefKind,
protect: bool,
) -> bool {
if descriptor.protocol_type() == ProtocolType::UDP {
return false;
@ -269,11 +268,8 @@ impl ConnectionTable {
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap();
match ref_type {
ConnectionRefKind::AddRef => out.change_ref_count(true),
ConnectionRefKind::RemoveRef => out.change_ref_count(false),
}
if protect {
out.protect();
ConnectionRefKind::AddRef => out.add_ref(),
ConnectionRefKind::RemoveRef => out.remove_ref(),
}
true

View File

@ -135,7 +135,7 @@ impl DiscoveryContext {
async fn request_public_address(&self, node_ref: NodeRef) -> Option<SocketAddress> {
let rpc = self.unlocked_inner.routing_table.rpc_processor();
let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone()), false).await {
let res = network_result_value_or_log!(match rpc.rpc_call_status(Destination::direct(node_ref.clone())).await {
Ok(v) => v,
Err(e) => {
log_net!(error

View File

@ -27,8 +27,8 @@ impl RawTcpNetworkConnection {
instrument(level = "trace", err, skip(self))
)]
pub async fn close(&self) -> io::Result<NetworkResult<()>> {
// Make an attempt to flush the stream
self.stream.clone().close().await?;
let mut stream = self.stream.clone();
let _ = stream.close().await;
Ok(NetworkResult::value(()))
// // Then shut down the write side of the socket to effect a clean close

View File

@ -86,7 +86,7 @@ pub struct NetworkConnectionStats {
pub type NetworkConnectionId = AlignedU64;
#[derive(Debug)]
pub struct NetworkConnection {
pub(in crate::network_manager) struct NetworkConnection {
connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor,
processor: Option<MustJoinHandle<()>>,
@ -94,7 +94,7 @@ pub struct NetworkConnection {
stats: Arc<Mutex<NetworkConnectionStats>>,
sender: flume::Sender<(Option<Id>, Vec<u8>)>,
stop_source: Option<StopSource>,
protected: bool,
protected_nr: Option<NodeRef>,
ref_count: usize,
}
@ -123,7 +123,7 @@ impl NetworkConnection {
})),
sender,
stop_source: None,
protected: false,
protected_nr: None,
ref_count: 0,
}
}
@ -170,7 +170,7 @@ impl NetworkConnection {
stats,
sender,
stop_source: Some(stop_source),
protected: false,
protected_nr: None,
ref_count: 0,
}
}
@ -188,19 +188,23 @@ impl NetworkConnection {
}
pub fn is_in_use(&self) -> bool {
self.protected || self.ref_count > 0
self.ref_count > 0
}
pub fn protect(&mut self) {
self.protected = true;
pub fn protected_node_ref(&self) -> Option<NodeRef>{
self.protected_nr.clone()
}
pub fn change_ref_count(&mut self, add: bool) {
if add {
self.ref_count += 1;
} else {
self.ref_count -= 1;
}
pub fn protect(&mut self, protect_nr: NodeRef) {
self.protected_nr = Some(protect_nr);
}
pub fn add_ref(&mut self) {
self.ref_count += 1;
}
pub fn remove_ref(&mut self) {
self.ref_count -= 1;
}
pub fn close(&mut self) {
@ -373,7 +377,6 @@ impl NetworkConnection {
// Touch the LRU for this connection
connection_manager.touch_connection_by_id(connection_id);
RecvLoopAction::Recv
}
}
@ -439,13 +442,19 @@ impl NetworkConnection {
}
pub fn debug_print(&self, cur_ts: Timestamp) -> String {
format!("{} <- {} | {} | est {} sent {} rcvd {}",
format!("{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}",
self.descriptor.remote_address(),
self.descriptor.local().map(|x| x.to_string()).unwrap_or("---".to_owned()),
self.connection_id.as_u64(),
debug_duration(cur_ts.as_u64().saturating_sub(self.established_time.as_u64())),
self.stats().last_message_sent_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()),
self.stats().last_message_recv_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()),
self.ref_count,
if let Some(pnr) = &self.protected_nr {
format!(" PROTECTED:{}",pnr)
} else {
"".to_owned()
}
)
}
}

View File

@ -42,6 +42,7 @@ impl NetworkManager {
.self_stats
.transfer_stats_accounting
.add_up(bytes);
#[allow(clippy::unwrap_or_default)]
inner
.stats
.per_address_stats
@ -58,6 +59,7 @@ impl NetworkManager {
.self_stats
.transfer_stats_accounting
.add_down(bytes);
#[allow(clippy::unwrap_or_default)]
inner
.stats
.per_address_stats

View File

@ -192,7 +192,7 @@ impl NetworkManager {
let pait = inner
.public_address_inconsistencies_table
.entry(addr_proto_type_key)
.or_insert_with(HashMap::new);
.or_default();
for i in &inconsistencies {
pait.insert(*i, exp_ts);
}
@ -204,7 +204,7 @@ impl NetworkManager {
let pait = inner
.public_address_inconsistencies_table
.entry(addr_proto_type_key)
.or_insert_with(HashMap::new);
.or_default();
let exp_ts = get_aligned_timestamp()
+ PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US;
for i in inconsistencies {

View File

@ -321,6 +321,10 @@ pub(crate) trait NodeRefBase: Sized {
})
}
fn report_protected_connection_dropped(&self) {
self.stats_failed_to_send(get_aligned_timestamp(), false);
}
fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) {
self.operate_mut(|rti, e| {
rti.transfer_stats_accounting().add_up(bytes);

View File

@ -703,7 +703,7 @@ impl RouteSpecStore {
// Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest, true).await? {
let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v,
_ => {
// Did not error, but did not come back, just return false
@ -746,7 +746,7 @@ impl RouteSpecStore {
// Test with double-round trip ping to self
let rpc_processor = self.unlocked_inner.routing_table.rpc_processor();
let _res = match rpc_processor.rpc_call_status(dest, true).await? {
let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v,
_ => {
// Did not error, but did not come back, just return false

View File

@ -112,7 +112,7 @@ impl RoutingTable {
futurequeue.push_back(
async move {
rpc.rpc_call_status(Destination::direct(relay_nr_filtered), true)
rpc.rpc_call_status(Destination::direct(relay_nr_filtered))
.await
}
.instrument(Span::current())
@ -148,7 +148,7 @@ impl RoutingTable {
let rpc = rpc.clone();
log_rtab!("--> Validator ping to {:?}", nr);
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr), false).await }
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
);
@ -176,7 +176,7 @@ impl RoutingTable {
// Just do a single ping with the best protocol for all the nodes
futurequeue.push_back(
async move { rpc.rpc_call_status(Destination::direct(nr), false).await }
async move { rpc.rpc_call_status(Destination::direct(nr)).await }
.instrument(Span::current())
.boxed(),
);

View File

@ -1148,7 +1148,6 @@ impl RPCProcessor {
dest: Destination,
question: RPCQuestion,
context: Option<QuestionContext>,
protect: bool,
) -> RPCNetworkResult<WaitableReply> {
// Get sender peer info if we should send that
let spi = self.get_sender_peer_info(&dest);
@ -1228,10 +1227,7 @@ impl RPCProcessor {
let connection_ref_scope = self
.network_manager()
.connection_manager()
.connection_ref_scope(
send_data_method.connection_descriptor,
protect,
);
.connection_ref_scope(send_data_method.connection_descriptor);
// Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply {

View File

@ -21,7 +21,7 @@ impl RPCProcessor {
);
// Send the app call question
let waitable_reply = network_result_try!(self.question(dest, question, None, false).await?);
let waitable_reply = network_result_try!(self.question(dest, question, None).await?);
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -41,8 +41,7 @@ impl RPCProcessor {
let debug_string = format!("FindNode(node_id={}) => {}", node_id, dest);
// Send the find_node request
let waitable_reply =
network_result_try!(self.question(dest, find_node_q, None, false).await?);
let waitable_reply = network_result_try!(self.question(dest, find_node_q, None).await?);
// Wait for reply
let (msg, latency) = match self.wait_for_reply(waitable_reply, debug_string).await? {

View File

@ -78,7 +78,7 @@ impl RPCProcessor {
log_rpc!(debug "{}", debug_string);
let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context), false)
self.question(dest.clone(), question, Some(question_context))
.await?
);

View File

@ -92,7 +92,7 @@ impl RPCProcessor {
log_rpc!(debug "{}", debug_string);
let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context), false)
self.question(dest.clone(), question, Some(question_context))
.await?
);

View File

@ -22,7 +22,6 @@ impl RPCProcessor {
pub async fn rpc_call_status(
self,
dest: Destination,
protect: bool,
) -> RPCNetworkResult<Answer<Option<SenderInfo>>> {
let (opt_target_nr, routing_domain, node_status) = match dest.get_safety_selection() {
SafetySelection::Unsafe(_) => {
@ -109,7 +108,7 @@ impl RPCProcessor {
// Send the info request
let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None, protect).await?);
network_result_try!(self.question(dest.clone(), question, None).await?);
// Note what kind of ping this was and to what peer scope
let send_data_method = waitable_reply.send_data_method.clone();

View File

@ -930,7 +930,7 @@ impl VeilidAPI {
// Send a StatusQ
let out = match rpc
.rpc_call_status(dest, false)
.rpc_call_status(dest)
.await
.map_err(VeilidAPIError::internal)?
{

View File

@ -75,9 +75,7 @@ where
return Err(());
}
};
if maybe_jh.is_some() {
let mut jh = maybe_jh.unwrap();
if let Some(mut jh) = maybe_jh {
// See if we finished, if so, return the value of the last execution
if let Poll::Ready(r) = poll!(&mut jh) {
out = Some(r);
@ -110,8 +108,7 @@ where
return Err(());
}
};
if maybe_jh.is_some() {
let jh = maybe_jh.unwrap();
if let Some(jh) = maybe_jh {
// Wait for return value of the last execution
out = Some(jh.await);
// Task finished, unlock with nothing
@ -141,9 +138,7 @@ where
};
let mut run = true;
if maybe_jh.is_some() {
let mut jh = maybe_jh.unwrap();
if let Some(mut jh) = maybe_jh {
// See if we finished, if so, return the value of the last execution
if let Poll::Ready(r) = poll!(&mut jh) {
out = Some(r);
@ -183,8 +178,7 @@ where
}
};
let mut run = true;
if maybe_jh.is_some() {
let mut jh = maybe_jh.unwrap();
if let Some(mut jh) = maybe_jh {
// See if we finished, if so, return the value of the last execution
if let Poll::Ready(r) = poll!(&mut jh) {
out = Some(r);

View File

@ -73,8 +73,7 @@ pub async fn test_nothing() {
a.write_all(&outbuf).await.unwrap();
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
let mut inbuf: Vec<u8> = vec![0; outbuf.len()];
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
@ -88,8 +87,7 @@ pub async fn test_no_peek() {
a.write_all(&outbuf).await.unwrap();
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
let mut inbuf: Vec<u8> = vec![0; outbuf.len()];
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
@ -104,14 +102,12 @@ pub async fn test_peek_all_read() {
a.write_all(&outbuf).await.unwrap();
// peek everything
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len(), 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len()];
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read everything
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
let mut inbuf: Vec<u8> = vec![0; outbuf.len()];
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
@ -128,13 +124,11 @@ pub async fn test_peek_some_read() {
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 2, 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len() / 2];
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read everything
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
let mut inbuf: Vec<u8> = vec![0; outbuf.len()];
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
@ -151,20 +145,17 @@ pub async fn test_peek_some_peek_some_read() {
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len() / 4];
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(peeksize1 + 1, 0u8);
let mut peekbuf2: Vec<u8> = vec![0; peeksize1 + 1];
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read everything
let mut inbuf: Vec<u8> = Vec::new();
inbuf.resize(outbuf.len(), 0u8);
let mut inbuf: Vec<u8> = vec![0; outbuf.len()];
c.read_exact(&mut inbuf).await.unwrap();
assert_eq!(inbuf, outbuf);
@ -182,25 +173,21 @@ pub async fn test_peek_some_read_peek_some_read() {
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len() / 4];
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 - 1, 0u8);
let mut inbuf1: Vec<u8> = vec![0; peeksize1 - 1];
c.read_exact(&mut inbuf1).await.unwrap();
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(2, 0u8);
let mut peekbuf2: Vec<u8> = vec![0; 2];
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read partially
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(2, 0u8);
let mut inbuf2: Vec<u8> = vec![0; 2];
c.read_exact(&mut inbuf2).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
@ -219,25 +206,21 @@ pub async fn test_peek_some_read_peek_all_read() {
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len() / 4];
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 + 1, 0u8);
let mut inbuf1: Vec<u8> = vec![0; peeksize1 + 1];
c.read_exact(&mut inbuf1).await.unwrap();
// peek past end
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(outbuf.len(), 0u8);
let mut peekbuf2: Vec<u8> = vec![0; outbuf.len()];
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, outbuf.len() - (peeksize1 + 1));
// read remaining
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(peeksize2, 0u8);
let mut inbuf2: Vec<u8> = vec![0; peeksize2];
c.read_exact(&mut inbuf2).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
@ -259,29 +242,24 @@ pub async fn test_peek_some_read_peek_some_read_all_read() {
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len() / 4];
let peeksize1 = c.peek(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 - 1, 0u8);
let mut inbuf1: Vec<u8> = vec![0; peeksize1 - 1];
c.read_exact(&mut inbuf1).await.unwrap();
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(2, 0u8);
let mut peekbuf2: Vec<u8> = vec![0; 2];
let peeksize2 = c.peek(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read partially
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(1, 0u8);
let mut inbuf2: Vec<u8> = vec![0; 1];
c.read_exact(&mut inbuf2).await.unwrap();
// read remaining
let mut inbuf3: Vec<u8> = Vec::new();
inbuf3.resize(outbuf.len() - peeksize1, 0u8);
let mut inbuf3: Vec<u8> = vec![0; outbuf.len() - peeksize1];
c.read_exact(&mut inbuf3).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());
@ -304,29 +282,24 @@ pub async fn test_peek_exact_read_peek_exact_read_all_read() {
a.write_all(&outbuf).await.unwrap();
// peek partially
let mut peekbuf1: Vec<u8> = Vec::new();
peekbuf1.resize(outbuf.len() / 4, 0u8);
let mut peekbuf1: Vec<u8> = vec![0; outbuf.len() / 4];
let peeksize1 = c.peek_exact(&mut peekbuf1).await.unwrap();
assert_eq!(peeksize1, peekbuf1.len());
// read partially
let mut inbuf1: Vec<u8> = Vec::new();
inbuf1.resize(peeksize1 - 1, 0u8);
let mut inbuf1: Vec<u8> = vec![0; peeksize1 - 1];
c.read_exact(&mut inbuf1).await.unwrap();
// peek partially
let mut peekbuf2: Vec<u8> = Vec::new();
peekbuf2.resize(2, 0u8);
let mut peekbuf2: Vec<u8> = vec![0; 2];
let peeksize2 = c.peek_exact(&mut peekbuf2).await.unwrap();
assert_eq!(peeksize2, peekbuf2.len());
// read partially
let mut inbuf2: Vec<u8> = Vec::new();
inbuf2.resize(1, 0u8);
let mut inbuf2: Vec<u8> = vec![0; 1];
c.read_exact(&mut inbuf2).await.unwrap();
// read remaining
let mut inbuf3: Vec<u8> = Vec::new();
inbuf3.resize(outbuf.len() - peeksize1, 0u8);
let mut inbuf3: Vec<u8> = vec![0; outbuf.len() - peeksize1];
c.read_exact(&mut inbuf3).await.unwrap();
assert_eq!(peekbuf1, outbuf[0..peeksize1].to_vec());