refactor for flows

This commit is contained in:
Christien Rioux 2023-11-04 14:08:39 -04:00
parent facb343160
commit 0640342556
32 changed files with 468 additions and 424 deletions

View File

@ -2,8 +2,8 @@ use super::*;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct ConnectionHandle { pub struct ConnectionHandle {
_id: NetworkConnectionId, connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor, flow: Flow,
channel: flume::Sender<(Option<Id>, Vec<u8>)>, channel: flume::Sender<(Option<Id>, Vec<u8>)>,
} }
@ -15,23 +15,32 @@ pub enum ConnectionHandleSendResult {
impl ConnectionHandle { impl ConnectionHandle {
pub(super) fn new( pub(super) fn new(
id: NetworkConnectionId, connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor, flow: Flow,
channel: flume::Sender<(Option<Id>, Vec<u8>)>, channel: flume::Sender<(Option<Id>, Vec<u8>)>,
) -> Self { ) -> Self {
Self { Self {
_id: id, connection_id,
descriptor, flow,
channel, channel,
} }
} }
// pub fn connection_id(&self) -> NetworkConnectionId { #[allow(dead_code)]
// self.id pub fn connection_id(&self) -> NetworkConnectionId {
// } self.connection_id
}
pub fn connection_descriptor(&self) -> ConnectionDescriptor { #[allow(dead_code)]
self.descriptor pub fn flow(&self) -> Flow {
self.flow
}
pub fn unique_flow(&self) -> UniqueFlow {
UniqueFlow {
flow: self.flow,
connection_id: Some(self.connection_id),
}
} }
// #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))] // #[cfg_attr(feature="verbose-tracing", instrument(level="trace", skip(self, message), fields(message.len = message.len())))]
@ -57,7 +66,7 @@ impl ConnectionHandle {
impl PartialEq for ConnectionHandle { impl PartialEq for ConnectionHandle {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.descriptor == other.descriptor self.connection_id == other.connection_id && self.flow == other.flow
} }
} }

View File

@ -16,23 +16,25 @@ enum ConnectionManagerEvent {
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct ConnectionRefScope { pub(crate) struct ConnectionRefScope {
connection_manager: ConnectionManager, connection_manager: ConnectionManager,
descriptor: ConnectionDescriptor, id: NetworkConnectionId,
} }
impl ConnectionRefScope { impl ConnectionRefScope {
pub fn new(connection_manager: ConnectionManager, descriptor: ConnectionDescriptor) -> Self { pub fn try_new(connection_manager: ConnectionManager, id: NetworkConnectionId) -> Option<Self> {
connection_manager.connection_ref(descriptor, ConnectionRefKind::AddRef); if !connection_manager.connection_ref(id, ConnectionRefKind::AddRef) {
Self { return None;
connection_manager,
descriptor,
} }
Some(Self {
connection_manager,
id,
})
} }
} }
impl Drop for ConnectionRefScope { impl Drop for ConnectionRefScope {
fn drop(&mut self) { fn drop(&mut self) {
self.connection_manager self.connection_manager
.connection_ref(self.descriptor, ConnectionRefKind::RemoveRef); .connection_ref(self.id, ConnectionRefKind::RemoveRef);
} }
} }
@ -163,7 +165,7 @@ impl ConnectionManager {
fn should_protect_connection(&self, conn: &NetworkConnection) -> Option<NodeRef> { fn should_protect_connection(&self, conn: &NetworkConnection) -> Option<NodeRef> {
let netman = self.network_manager(); let netman = self.network_manager();
let routing_table = netman.routing_table(); let routing_table = netman.routing_table();
let remote_address = conn.connection_descriptor().remote_address().address(); let remote_address = conn.flow().remote_address().address();
let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else { let Some(routing_domain) = routing_table.routing_domain_for_address(remote_address) else {
return None; return None;
}; };
@ -173,8 +175,8 @@ impl ConnectionManager {
let relay_nr = rn.filtered_clone( let relay_nr = rn.filtered_clone(
NodeRefFilter::new() NodeRefFilter::new()
.with_routing_domain(routing_domain) .with_routing_domain(routing_domain)
.with_address_type(conn.connection_descriptor().address_type()) .with_address_type(conn.flow().address_type())
.with_protocol_type(conn.connection_descriptor().protocol_type()), .with_protocol_type(conn.flow().protocol_type()),
); );
let dids = relay_nr.all_filtered_dial_info_details(); let dids = relay_nr.all_filtered_dial_info_details();
for did in dids { for did in dids {
@ -231,7 +233,7 @@ impl ConnectionManager {
} }
Err(ConnectionTableAddError::AddressFilter(conn, e)) => { Err(ConnectionTableAddError::AddressFilter(conn, e)) => {
// Connection filtered // Connection filtered
let desc = conn.connection_descriptor(); let desc = conn.flow();
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!( return Ok(NetworkResult::no_connection_other(format!(
"connection filtered: {:?} ({})", "connection filtered: {:?} ({})",
@ -240,7 +242,7 @@ impl ConnectionManager {
} }
Err(ConnectionTableAddError::AlreadyExists(conn)) => { Err(ConnectionTableAddError::AlreadyExists(conn)) => {
// Connection already exists // Connection already exists
let desc = conn.connection_descriptor(); let desc = conn.flow();
log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp())); log_net!(debug "== Connection already exists: {:?}", conn.debug_print(get_aligned_timestamp()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!( return Ok(NetworkResult::no_connection_other(format!(
@ -250,7 +252,7 @@ impl ConnectionManager {
} }
Err(ConnectionTableAddError::TableFull(conn)) => { Err(ConnectionTableAddError::TableFull(conn)) => {
// Connection table is full // Connection table is full
let desc = conn.connection_descriptor(); let desc = conn.flow();
log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp())); log_net!(debug "== Connection table full: {:?}", conn.debug_print(get_aligned_timestamp()));
let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn)); let _ = inner.sender.send(ConnectionManagerEvent::Dead(conn));
return Ok(NetworkResult::no_connection_other(format!( return Ok(NetworkResult::no_connection_other(format!(
@ -263,10 +265,8 @@ impl ConnectionManager {
} }
// Returns a network connection if one already is established // Returns a network connection if one already is established
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<ConnectionHandle> { pub fn get_connection(&self, flow: Flow) -> Option<ConnectionHandle> {
self.arc self.arc.connection_table.peek_connection_by_flow(flow)
.connection_table
.peek_connection_by_descriptor(descriptor)
} }
// Returns a network connection if one already is established // Returns a network connection if one already is established
@ -275,13 +275,11 @@ impl ConnectionManager {
} }
// Protects a network connection if one already is established // Protects a network connection if one already is established
fn connection_ref(&self, descriptor: ConnectionDescriptor, kind: ConnectionRefKind) { fn connection_ref(&self, id: NetworkConnectionId, kind: ConnectionRefKind) -> bool {
self.arc self.arc.connection_table.ref_connection_by_id(id, kind)
.connection_table
.ref_connection_by_descriptor(descriptor, kind);
} }
pub fn connection_ref_scope(&self, descriptor: ConnectionDescriptor) -> ConnectionRefScope { pub fn try_connection_ref_scope(&self, id: NetworkConnectionId) -> Option<ConnectionRefScope> {
ConnectionRefScope::new(self.clone(), descriptor) ConnectionRefScope::try_new(self.clone(), id)
} }
/// Called when we want to create a new connection or get the current one that already exists /// Called when we want to create a new connection or get the current one that already exists
@ -385,14 +383,22 @@ impl ConnectionManager {
// Process async commands // Process async commands
while let Ok(Ok(event)) = receiver.recv_async().timeout_at(stop_token.clone()).await { while let Ok(Ok(event)) = receiver.recv_async().timeout_at(stop_token.clone()).await {
match event { match event {
ConnectionManagerEvent::Accepted(conn) => { ConnectionManagerEvent::Accepted(prot_conn) => {
// Async lock on the remote address for atomicity per remote
let _lock_guard = self
.arc
.address_lock_table
.lock_tag(prot_conn.flow().remote_address().socket_addr())
.await;
let mut inner = self.arc.inner.lock(); let mut inner = self.arc.inner.lock();
match &mut *inner { match &mut *inner {
Some(inner) => { Some(inner) => {
// Register the connection // Register the connection
// We don't care if this fails, since nobody here asked for the inbound connection. // We don't care if this fails, since nobody here asked for the inbound connection.
// If it does, we just drop the connection // If it does, we just drop the connection
let _ = self.on_new_protocol_network_connection(inner, conn);
let _ = self.on_new_protocol_network_connection(inner, prot_conn);
} }
None => { None => {
// If this somehow happens, we're shutting down // If this somehow happens, we're shutting down
@ -400,6 +406,12 @@ impl ConnectionManager {
}; };
} }
ConnectionManagerEvent::Dead(mut conn) => { ConnectionManagerEvent::Dead(mut conn) => {
let _lock_guard = self
.arc
.address_lock_table
.lock_tag(conn.flow().remote_address().socket_addr())
.await;
conn.close(); conn.close();
conn.await; conn.await;
} }

View File

@ -38,7 +38,7 @@ struct ConnectionTableInner {
max_connections: Vec<usize>, max_connections: Vec<usize>,
conn_by_id: Vec<LruCache<NetworkConnectionId, NetworkConnection>>, conn_by_id: Vec<LruCache<NetworkConnectionId, NetworkConnection>>,
protocol_index_by_id: BTreeMap<NetworkConnectionId, usize>, protocol_index_by_id: BTreeMap<NetworkConnectionId, usize>,
id_by_descriptor: BTreeMap<ConnectionDescriptor, NetworkConnectionId>, id_by_flow: BTreeMap<Flow, NetworkConnectionId>,
ids_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnectionId>>, ids_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnectionId>>,
address_filter: AddressFilter, address_filter: AddressFilter,
} }
@ -67,7 +67,7 @@ impl ConnectionTable {
LruCache::new_unbounded(), LruCache::new_unbounded(),
], ],
protocol_index_by_id: BTreeMap::new(), protocol_index_by_id: BTreeMap::new(),
id_by_descriptor: BTreeMap::new(), id_by_flow: BTreeMap::new(),
ids_by_remote: BTreeMap::new(), ids_by_remote: BTreeMap::new(),
address_filter, address_filter,
})), })),
@ -105,7 +105,7 @@ impl ConnectionTable {
} }
} }
inner.protocol_index_by_id.clear(); inner.protocol_index_by_id.clear();
inner.id_by_descriptor.clear(); inner.id_by_flow.clear();
inner.ids_by_remote.clear(); inner.ids_by_remote.clear();
unord unord
}; };
@ -151,14 +151,14 @@ impl ConnectionTable {
) -> Result<Option<NetworkConnection>, ConnectionTableAddError> { ) -> Result<Option<NetworkConnection>, ConnectionTableAddError> {
// Get indices for network connection table // Get indices for network connection table
let id = network_connection.connection_id(); let id = network_connection.connection_id();
let descriptor = network_connection.connection_descriptor(); let flow = network_connection.flow();
let protocol_index = Self::protocol_to_index(descriptor.protocol_type()); let protocol_index = Self::protocol_to_index(flow.protocol_type());
let remote = descriptor.remote(); let remote = flow.remote();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
// Two connections to the same descriptor should be rejected (soft rejection) // Two connections to the same flow should be rejected (soft rejection)
if inner.id_by_descriptor.contains_key(&descriptor) { if inner.id_by_flow.contains_key(&flow) {
return Err(ConnectionTableAddError::already_exists(network_connection)); return Err(ConnectionTableAddError::already_exists(network_connection));
} }
@ -169,14 +169,14 @@ impl ConnectionTable {
if inner.protocol_index_by_id.get(&id).is_some() { if inner.protocol_index_by_id.get(&id).is_some() {
panic!("duplicate id to protocol index: {:#?}", network_connection); panic!("duplicate id to protocol index: {:#?}", network_connection);
} }
if let Some(ids) = inner.ids_by_remote.get(&descriptor.remote()) { if let Some(ids) = inner.ids_by_remote.get(&flow.remote()) {
if ids.contains(&id) { if ids.contains(&id) {
panic!("duplicate id by remote: {:#?}", network_connection); panic!("duplicate id by remote: {:#?}", network_connection);
} }
} }
// Filter by ip for connection limits // Filter by ip for connection limits
let ip_addr = descriptor.remote_address().ip_addr(); let ip_addr = flow.remote_address().ip_addr();
match inner.address_filter.add_connection(ip_addr) { match inner.address_filter.add_connection(ip_addr) {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
@ -217,12 +217,26 @@ impl ConnectionTable {
// add connection records // add connection records
inner.protocol_index_by_id.insert(id, protocol_index); inner.protocol_index_by_id.insert(id, protocol_index);
inner.id_by_descriptor.insert(descriptor, id); inner.id_by_flow.insert(flow, id);
inner.ids_by_remote.entry(remote).or_default().push(id); inner.ids_by_remote.entry(remote).or_default().push(id);
Ok(out_conn) Ok(out_conn)
} }
//#[instrument(level = "trace", skip(self), ret)]
pub fn peek_connection_by_flow(&self, flow: Flow) -> Option<ConnectionHandle> {
if flow.protocol_type() == ProtocolType::UDP {
return None;
}
let inner = self.inner.lock();
let id = *inner.id_by_flow.get(&flow)?;
let protocol_index = Self::protocol_to_index(flow.protocol_type());
let out = inner.conn_by_id[protocol_index].peek(&id).unwrap();
Some(out.get_handle())
}
//#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
pub fn touch_connection_by_id(&self, id: NetworkConnectionId) { pub fn touch_connection_by_id(&self, id: NetworkConnectionId) {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -233,45 +247,21 @@ impl ConnectionTable {
} }
//#[instrument(level = "trace", skip(self), ret)] //#[instrument(level = "trace", skip(self), ret)]
pub fn peek_connection_by_descriptor( pub fn ref_connection_by_id(
&self, &self,
descriptor: ConnectionDescriptor, id: NetworkConnectionId,
) -> Option<ConnectionHandle> {
if descriptor.protocol_type() == ProtocolType::UDP {
return None;
}
let inner = self.inner.lock();
let id = *inner.id_by_descriptor.get(&descriptor)?;
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let out = inner.conn_by_id[protocol_index].peek(&id).unwrap();
Some(out.get_handle())
}
//#[instrument(level = "trace", skip(self), ret)]
pub fn ref_connection_by_descriptor(
&self,
descriptor: ConnectionDescriptor,
ref_type: ConnectionRefKind, ref_type: ConnectionRefKind,
) -> bool { ) -> bool {
if descriptor.protocol_type() == ProtocolType::UDP {
return false;
}
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let Some(protocol_index) = inner.protocol_index_by_id.get(&id).copied() else {
let Some(id) = inner.id_by_descriptor.get(&descriptor).copied() else { // Sometimes network connections die before we can ref/unref them
log_net!(error "failed to ref descriptor: {:?} ({:?})", descriptor, ref_type);
return false; return false;
}; };
let protocol_index = Self::protocol_to_index(descriptor.protocol_type());
let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap(); let out = inner.conn_by_id[protocol_index].get_mut(&id).unwrap();
match ref_type { match ref_type {
ConnectionRefKind::AddRef => out.add_ref(), ConnectionRefKind::AddRef => out.add_ref(),
ConnectionRefKind::RemoveRef => out.remove_ref(), ConnectionRefKind::RemoveRef => out.remove_ref(),
} }
true true
} }
@ -299,7 +289,7 @@ impl ConnectionTable {
if let Some(best_port) = best_port { if let Some(best_port) = best_port {
for id in all_ids_by_remote { for id in all_ids_by_remote {
let nc = inner.conn_by_id[protocol_index].peek(id).unwrap(); let nc = inner.conn_by_id[protocol_index].peek(id).unwrap();
if let Some(local_addr) = nc.connection_descriptor().local() { if let Some(local_addr) = nc.flow().local() {
if local_addr.port() == best_port { if local_addr.port() == best_port {
let nc = inner.conn_by_id[protocol_index].get(id).unwrap(); let nc = inner.conn_by_id[protocol_index].get(id).unwrap();
return Some(nc.get_handle()); return Some(nc.get_handle());
@ -326,13 +316,13 @@ impl ConnectionTable {
// pub fn drain_filter<F>(&self, mut filter: F) -> Vec<NetworkConnection> // pub fn drain_filter<F>(&self, mut filter: F) -> Vec<NetworkConnection>
// where // where
// F: FnMut(ConnectionDescriptor) -> bool, // F: FnMut(Flow) -> bool,
// { // {
// let mut inner = self.inner.lock(); // let mut inner = self.inner.lock();
// let mut filtered_ids = Vec::new(); // let mut filtered_ids = Vec::new();
// for cbi in &mut inner.conn_by_id { // for cbi in &mut inner.conn_by_id {
// for (id, conn) in cbi { // for (id, conn) in cbi {
// if filter(conn.connection_descriptor()) { // if filter(conn.flow()) {
// filtered_ids.push(*id); // filtered_ids.push(*id);
// } // }
// } // }
@ -359,11 +349,11 @@ impl ConnectionTable {
let protocol_index = inner.protocol_index_by_id.remove(&id).unwrap(); let protocol_index = inner.protocol_index_by_id.remove(&id).unwrap();
// conn_by_id // conn_by_id
let conn = inner.conn_by_id[protocol_index].remove(&id).unwrap(); let conn = inner.conn_by_id[protocol_index].remove(&id).unwrap();
// id_by_descriptor // id_by_flow
let descriptor = conn.connection_descriptor(); let flow = conn.flow();
inner.id_by_descriptor.remove(&descriptor).unwrap(); inner.id_by_flow.remove(&flow).unwrap();
// ids_by_remote // ids_by_remote
let remote = descriptor.remote(); let remote = flow.remote();
let ids = inner.ids_by_remote.get_mut(&remote).unwrap(); let ids = inner.ids_by_remote.get_mut(&remote).unwrap();
for (n, elem) in ids.iter().enumerate() { for (n, elem) in ids.iter().enumerate() {
if *elem == id { if *elem == id {

View File

@ -3,10 +3,7 @@ use super::*;
impl NetworkManager { impl NetworkManager {
// Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism) // Direct bootstrap request handler (separate fallback mechanism from cheaper TXT bootstrap mechanism)
#[instrument(level = "trace", skip(self), ret, err)] #[instrument(level = "trace", skip(self), ret, err)]
pub(crate) async fn handle_boot_request( pub(crate) async fn handle_boot_request(&self, flow: Flow) -> EyreResult<NetworkResult<()>> {
&self,
descriptor: ConnectionDescriptor,
) -> EyreResult<NetworkResult<()>> {
let routing_table = self.routing_table(); let routing_table = self.routing_table();
// Get a bunch of nodes with the various // Get a bunch of nodes with the various
@ -22,14 +19,14 @@ impl NetworkManager {
// Reply with a chunk of signed routing table // Reply with a chunk of signed routing table
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, json_bytes) .send_data_to_existing_flow(flow, json_bytes)
.await? .await?
{ {
None => { SendDataToExistingFlowResult::Sent(_) => {
// Bootstrap reply was sent // Bootstrap reply was sent
Ok(NetworkResult::value(())) Ok(NetworkResult::value(()))
} }
Some(_) => Ok(NetworkResult::no_connection_other( SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other(
"bootstrap reply could not be sent", "bootstrap reply could not be sent",
)), )),
} }

View File

@ -95,8 +95,8 @@ pub(crate) struct SendDataMethod {
pub contact_method: NodeContactMethod, pub contact_method: NodeContactMethod,
/// Pre-relayed contact method /// Pre-relayed contact method
pub opt_relayed_contact_method: Option<NodeContactMethod>, pub opt_relayed_contact_method: Option<NodeContactMethod>,
/// The connection used to send the data /// The specific flow used to send the data
pub connection_descriptor: ConnectionDescriptor, pub unique_flow: UniqueFlow,
} }
/// Mechanism required to contact another node /// Mechanism required to contact another node
@ -128,6 +128,11 @@ struct NodeContactMethodCacheKey {
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] #[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct PublicAddressCheckCacheKey(ProtocolType, AddressType); struct PublicAddressCheckCacheKey(ProtocolType, AddressType);
enum SendDataToExistingFlowResult {
Sent(UniqueFlow),
NotSent(Vec<u8>),
}
// The mutable state of the network manager // The mutable state of the network manager
struct NetworkManagerInner { struct NetworkManagerInner {
stats: NetworkManagerStats, stats: NetworkManagerStats,
@ -661,7 +666,7 @@ impl NetworkManager {
#[instrument(level = "trace", skip(self), err)] #[instrument(level = "trace", skip(self), err)]
pub async fn handle_signal( pub async fn handle_signal(
&self, &self,
signal_connection_descriptor: ConnectionDescriptor, signal_flow: Flow,
signal_info: SignalInfo, signal_info: SignalInfo,
) -> EyreResult<NetworkResult<()>> { ) -> EyreResult<NetworkResult<()>> {
match signal_info { match signal_info {
@ -685,7 +690,7 @@ impl NetworkManager {
}; };
// Restrict reverse connection to same sequencing requirement as inbound signal // Restrict reverse connection to same sequencing requirement as inbound signal
if signal_connection_descriptor.protocol_type().is_ordered() { if signal_flow.protocol_type().is_ordered() {
peer_nr.set_sequencing(Sequencing::EnsureOrdered); peer_nr.set_sequencing(Sequencing::EnsureOrdered);
} }
@ -730,7 +735,7 @@ impl NetworkManager {
// Do our half of the hole punch by sending an empty packet // Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole // Both sides will do this and then the receipt will get sent over the punched hole
let connection_descriptor = network_result_try!( let unique_flow = network_result_try!(
self.net() self.net()
.send_data_to_dial_info( .send_data_to_dial_info(
hole_punch_dial_info_detail.dial_info.clone(), hole_punch_dial_info_detail.dial_info.clone(),
@ -742,7 +747,7 @@ impl NetworkManager {
// XXX: do we need a delay here? or another hole punch packet? // XXX: do we need a delay here? or another hole punch packet?
// Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch // Set the hole punch as our 'last connection' to ensure we return the receipt over the direct hole punch
peer_nr.set_last_connection(connection_descriptor, get_aligned_timestamp()); peer_nr.set_last_flow(unique_flow.flow, get_aligned_timestamp());
// Return the receipt using the same dial info send the receipt to it // Return the receipt using the same dial info send the receipt to it
rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt) rpc.rpc_call_return_receipt(Destination::direct(peer_nr), receipt)
@ -867,28 +872,20 @@ impl NetworkManager {
// network protocol handler. Processes the envelope, authenticates and decrypts the RPC message // network protocol handler. Processes the envelope, authenticates and decrypts the RPC message
// and passes it to the RPC handler // and passes it to the RPC handler
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", ret, err, skip(self, data), fields(data.len = data.len())))]
async fn on_recv_envelope( async fn on_recv_envelope(&self, data: &mut [u8], flow: Flow) -> EyreResult<bool> {
&self,
data: &mut [u8],
connection_descriptor: ConnectionDescriptor,
) -> EyreResult<bool> {
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
let root = span!( let root = span!(
parent: None, parent: None,
Level::TRACE, Level::TRACE,
"on_recv_envelope", "on_recv_envelope",
"data.len" = data.len(), "data.len" = data.len(),
"descriptor" = ?connection_descriptor "flow" = ?flow
); );
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
let _root_enter = root.enter(); let _root_enter = root.enter();
log_net!( log_net!("envelope of {} bytes received from {:?}", data.len(), flow);
"envelope of {} bytes received from {:?}", let remote_addr = flow.remote_address().ip_addr();
data.len(),
connection_descriptor
);
let remote_addr = connection_descriptor.remote_address().ip_addr();
// Network accounting // Network accounting
self.stats_packet_rcvd(remote_addr, ByteCount::new(data.len() as u64)); self.stats_packet_rcvd(remote_addr, ByteCount::new(data.len() as u64));
@ -910,18 +907,18 @@ impl NetworkManager {
// Get the routing domain for this data // Get the routing domain for this data
let routing_domain = match self let routing_domain = match self
.routing_table() .routing_table()
.routing_domain_for_address(connection_descriptor.remote_address().address()) .routing_domain_for_address(flow.remote_address().address())
{ {
Some(rd) => rd, Some(rd) => rd,
None => { None => {
log_net!(debug "no routing domain for envelope received from {:?}", connection_descriptor); log_net!(debug "no routing domain for envelope received from {:?}", flow);
return Ok(false); return Ok(false);
} }
}; };
// Is this a direct bootstrap request instead of an envelope? // Is this a direct bootstrap request instead of an envelope?
if data[0..4] == *BOOT_MAGIC { if data[0..4] == *BOOT_MAGIC {
network_result_value_or_log!(self.handle_boot_request(connection_descriptor).await? => [ format!(": connection_descriptor={:?}", connection_descriptor) ] {}); network_result_value_or_log!(self.handle_boot_request(flow).await? => [ format!(": flow={:?}", flow) ] {});
return Ok(true); return Ok(true);
} }
@ -968,7 +965,7 @@ impl NetworkManager {
log_net!(debug log_net!(debug
"Timestamp behind: {}ms ({})", "Timestamp behind: {}ms ({})",
timestamp_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64, timestamp_to_secs(ts.saturating_sub(ets).as_u64()) * 1000f64,
connection_descriptor.remote() flow.remote()
); );
return Ok(false); return Ok(false);
} }
@ -978,7 +975,7 @@ impl NetworkManager {
log_net!(debug log_net!(debug
"Timestamp ahead: {}ms ({})", "Timestamp ahead: {}ms ({})",
timestamp_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64, timestamp_to_secs(ets.saturating_sub(ts).as_u64()) * 1000f64,
connection_descriptor.remote() flow.remote()
); );
return Ok(false); return Ok(false);
} }
@ -1033,7 +1030,7 @@ impl NetworkManager {
if let Some(mut relay_nr) = some_relay_nr { if let Some(mut relay_nr) = some_relay_nr {
// Ensure the protocol used to forward is of the same sequencing requirement // Ensure the protocol used to forward is of the same sequencing requirement
// Address type is allowed to change if connectivity is better // Address type is allowed to change if connectivity is better
if connection_descriptor.protocol_type().is_ordered() { if flow.protocol_type().is_ordered() {
relay_nr.set_sequencing(Sequencing::EnsureOrdered); relay_nr.set_sequencing(Sequencing::EnsureOrdered);
}; };
@ -1080,7 +1077,7 @@ impl NetworkManager {
// Cache the envelope information in the routing table // Cache the envelope information in the routing table
let source_noderef = match routing_table.register_node_with_existing_connection( let source_noderef = match routing_table.register_node_with_existing_connection(
envelope.get_sender_typed_id(), envelope.get_sender_typed_id(),
connection_descriptor, flow,
ts, ts,
) { ) {
Ok(v) => v, Ok(v) => v,
@ -1093,13 +1090,7 @@ impl NetworkManager {
source_noderef.add_envelope_version(envelope.get_version()); source_noderef.add_envelope_version(envelope.get_version());
// Pass message to RPC system // Pass message to RPC system
rpc.enqueue_direct_message( rpc.enqueue_direct_message(envelope, source_noderef, flow, routing_domain, body)?;
envelope,
source_noderef,
connection_descriptor,
routing_domain,
body,
)?;
// Inform caller that we dealt with the envelope locally // Inform caller that we dealt with the envelope locally
Ok(true) Ok(true)

View File

@ -578,75 +578,79 @@ impl Network {
} }
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_existing_connection( pub async fn send_data_to_existing_flow(
&self, &self,
descriptor: ConnectionDescriptor, flow: Flow,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<Option<Vec<u8>>> { ) -> EyreResult<SendDataToExistingFlowResult> {
let data_len = data.len(); let data_len = data.len();
// Handle connectionless protocol // Handle connectionless protocol
if descriptor.protocol_type() == ProtocolType::UDP { if flow.protocol_type() == ProtocolType::UDP {
// send over the best udp socket we have bound since UDP is not connection oriented // send over the best udp socket we have bound since UDP is not connection oriented
let peer_socket_addr = descriptor.remote().socket_addr(); let peer_socket_addr = flow.remote().socket_addr();
if let Some(ph) = self.find_best_udp_protocol_handler( if let Some(ph) = self.find_best_udp_protocol_handler(
&peer_socket_addr, &peer_socket_addr,
&descriptor.local().map(|sa| sa.socket_addr()), &flow.local().map(|sa| sa.socket_addr()),
) { ) {
network_result_value_or_log!(ph.clone() network_result_value_or_log!(ph.clone()
.send_message(data.clone(), peer_socket_addr) .send_message(data.clone(), peer_socket_addr)
.await .await
.wrap_err("sending data to existing connection")? => [ format!(": data.len={}, descriptor={:?}", data.len(), descriptor) ] .wrap_err("sending data to existing connection")? => [ format!(": data.len={}, flow={:?}", data.len(), flow) ]
{ return Ok(Some(data)); } ); { return Ok(SendDataToExistingFlowResult::NotSent(data)); } );
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(peer_socket_addr.ip(), ByteCount::new(data_len as u64)); .stats_packet_sent(peer_socket_addr.ip(), ByteCount::new(data_len as u64));
// Data was consumed // Data was consumed
return Ok(None); let unique_flow = UniqueFlow {
flow,
connection_id: None,
};
return Ok(SendDataToExistingFlowResult::Sent(unique_flow));
} }
} }
// Handle connection-oriented protocols // Handle connection-oriented protocols
// Try to send to the exact existing connection if one exists // Try to send to the exact existing connection if one exists
if let Some(conn) = self.connection_manager().get_connection(descriptor) { if let Some(conn) = self.connection_manager().get_connection(flow) {
// connection exists, send over it // connection exists, send over it
match conn.send_async(data).await { match conn.send_async(data).await {
ConnectionHandleSendResult::Sent => { ConnectionHandleSendResult::Sent => {
// Network accounting // Network accounting
self.network_manager().stats_packet_sent( self.network_manager().stats_packet_sent(
descriptor.remote().socket_addr().ip(), flow.remote().socket_addr().ip(),
ByteCount::new(data_len as u64), ByteCount::new(data_len as u64),
); );
// Data was consumed // Data was consumed
return Ok(None); return Ok(SendDataToExistingFlowResult::Sent(conn.unique_flow()));
} }
ConnectionHandleSendResult::NotSent(data) => { ConnectionHandleSendResult::NotSent(data) => {
// Couldn't send // Couldn't send
// Pass the data back out so we don't own it any more // Pass the data back out so we don't own it any more
return Ok(Some(data)); return Ok(SendDataToExistingFlowResult::NotSent(data));
} }
} }
} }
// Connection didn't exist // Connection didn't exist
// Pass the data back out so we don't own it any more // Pass the data back out so we don't own it any more
Ok(Some(data)) Ok(SendDataToExistingFlowResult::NotSent(data))
} }
// Send data directly to a dial info, possibly without knowing which node it is going to // Send data directly to a dial info, possibly without knowing which node it is going to
// Returns a descriptor for the connection used to send the data // Returns a flow for the connection used to send the data
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_dial_info( pub async fn send_data_to_dial_info(
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<UniqueFlow>> {
self.record_dial_info_failure(dial_info.clone(), async move { self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len(); let data_len = data.len();
let connection_descriptor; let unique_flow;
if dial_info.protocol_type() == ProtocolType::UDP { if dial_info.protocol_type() == ProtocolType::UDP {
// Handle connectionless protocol // Handle connectionless protocol
let peer_socket_addr = dial_info.to_socket_addr(); let peer_socket_addr = dial_info.to_socket_addr();
@ -658,10 +662,14 @@ impl Network {
)); ));
} }
}; };
connection_descriptor = network_result_try!(ph let flow = network_result_try!(ph
.send_message(data, peer_socket_addr) .send_message(data, peer_socket_addr)
.await .await
.wrap_err("failed to send data to dial info")?); .wrap_err("failed to send data to dial info")?);
unique_flow = UniqueFlow {
flow,
connection_id: None,
};
} else { } else {
// Handle connection-oriented protocols // Handle connection-oriented protocols
let conn = network_result_try!( let conn = network_result_try!(
@ -676,14 +684,14 @@ impl Network {
"failed to send", "failed to send",
))); )));
} }
connection_descriptor = conn.connection_descriptor(); unique_flow = conn.unique_flow();
} }
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64)); .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(connection_descriptor)) Ok(NetworkResult::value(unique_flow))
}) })
.await .await
} }

View File

@ -65,16 +65,16 @@ impl Network {
.timeout_at(stop_token.clone()) .timeout_at(stop_token.clone())
.await .await
{ {
Ok(Ok((size, descriptor))) => { Ok(Ok((size, flow))) => {
// Network accounting // Network accounting
network_manager.stats_packet_rcvd( network_manager.stats_packet_rcvd(
descriptor.remote_address().ip_addr(), flow.remote_address().ip_addr(),
ByteCount::new(size as u64), ByteCount::new(size as u64),
); );
// Pass it up for processing // Pass it up for processing
if let Err(e) = network_manager if let Err(e) = network_manager
.on_recv_envelope(&mut data[..size], descriptor) .on_recv_envelope(&mut data[..size], flow)
.await .await
{ {
log_net!(debug "failed to process received udp envelope: {}", e); log_net!(debug "failed to process received udp envelope: {}", e);

View File

@ -45,13 +45,13 @@ impl ProtocolNetworkConnection {
} }
} }
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn flow(&self) -> Flow {
match self { match self {
// Self::Dummy(d) => d.descriptor(), // Self::Dummy(d) => d.flow(),
Self::RawTcp(t) => t.descriptor(), Self::RawTcp(t) => t.flow(),
Self::WsAccepted(w) => w.descriptor(), Self::WsAccepted(w) => w.flow(),
Self::Ws(w) => w.descriptor(), Self::Ws(w) => w.flow(),
Self::Wss(w) => w.descriptor(), Self::Wss(w) => w.flow(),
} }
} }

View File

@ -3,7 +3,7 @@ use futures_util::{AsyncReadExt, AsyncWriteExt};
use sockets::*; use sockets::*;
pub struct RawTcpNetworkConnection { pub struct RawTcpNetworkConnection {
descriptor: ConnectionDescriptor, flow: Flow,
stream: AsyncPeekStream, stream: AsyncPeekStream,
} }
@ -14,12 +14,12 @@ impl fmt::Debug for RawTcpNetworkConnection {
} }
impl RawTcpNetworkConnection { impl RawTcpNetworkConnection {
pub fn new(descriptor: ConnectionDescriptor, stream: AsyncPeekStream) -> Self { pub fn new(flow: Flow, stream: AsyncPeekStream) -> Self {
Self { descriptor, stream } Self { flow, stream }
} }
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn flow(&self) -> Flow {
self.descriptor self.flow
} }
#[cfg_attr( #[cfg_attr(
@ -152,7 +152,7 @@ impl RawTcpProtocolHandler {
ProtocolType::TCP, ProtocolType::TCP,
); );
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(
ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), Flow::new(peer_addr, SocketAddress::from_socket_addr(local_addr)),
ps, ps,
)); ));
@ -186,7 +186,7 @@ impl RawTcpProtocolHandler {
// Wrap the stream in a network connection and return it // Wrap the stream in a network connection and return it
let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new( let conn = ProtocolNetworkConnection::RawTcp(RawTcpNetworkConnection::new(
ConnectionDescriptor::new( Flow::new(
PeerAddress::new( PeerAddress::new(
SocketAddress::from_socket_addr(socket_addr), SocketAddress::from_socket_addr(socket_addr),
ProtocolType::TCP, ProtocolType::TCP,

View File

@ -17,9 +17,9 @@ impl RawUdpProtocolHandler {
} }
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.descriptor)))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.len, ret.flow)))]
pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, ConnectionDescriptor)> { pub async fn recv_message(&self, data: &mut [u8]) -> io::Result<(usize, Flow)> {
let (message_len, descriptor) = loop { let (message_len, flow) = loop {
// Get a packet // Get a packet
let (size, remote_addr) = network_result_value_or_log!(self.socket.recv_from(data).await.into_network_result()? => continue); let (size, remote_addr) = network_result_value_or_log!(self.socket.recv_from(data).await.into_network_result()? => continue);
@ -64,33 +64,33 @@ impl RawUdpProtocolHandler {
// Copy assemble message out if we got one // Copy assemble message out if we got one
data[0..message.len()].copy_from_slice(&message); data[0..message.len()].copy_from_slice(&message);
// Return a connection descriptor and the amount of data in the message // Return a flow and the amount of data in the message
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
SocketAddress::from_socket_addr(remote_addr), SocketAddress::from_socket_addr(remote_addr),
ProtocolType::UDP, ProtocolType::UDP,
); );
let local_socket_addr = self.socket.local_addr()?; let local_socket_addr = self.socket.local_addr()?;
let descriptor = ConnectionDescriptor::new( let flow = Flow::new(
peer_addr, peer_addr,
SocketAddress::from_socket_addr(local_socket_addr), SocketAddress::from_socket_addr(local_socket_addr),
); );
break (message.len(), descriptor); break (message.len(), flow);
}; };
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.len", message_len); tracing::Span::current().record("ret.len", message_len);
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.descriptor", format!("{:?}", descriptor).as_str()); tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
Ok((message_len, descriptor)) Ok((message_len, flow))
} }
#[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.descriptor)))] #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", err, skip(self, data), fields(data.len = data.len(), ret.flow)))]
pub async fn send_message( pub async fn send_message(
&self, &self,
data: Vec<u8>, data: Vec<u8>,
remote_addr: SocketAddr, remote_addr: SocketAddr,
) -> io::Result<NetworkResult<ConnectionDescriptor>> { ) -> io::Result<NetworkResult<Flow>> {
if data.len() > MAX_MESSAGE_SIZE { if data.len() > MAX_MESSAGE_SIZE {
bail_io_error_other!("sending too large UDP message"); bail_io_error_other!("sending too large UDP message");
} }
@ -121,21 +121,21 @@ impl RawUdpProtocolHandler {
.await? .await?
); );
// Return a connection descriptor for the sent message // Return a flow for the sent message
let peer_addr = PeerAddress::new( let peer_addr = PeerAddress::new(
SocketAddress::from_socket_addr(remote_addr), SocketAddress::from_socket_addr(remote_addr),
ProtocolType::UDP, ProtocolType::UDP,
); );
let local_socket_addr = self.socket.local_addr()?; let local_socket_addr = self.socket.local_addr()?;
let descriptor = ConnectionDescriptor::new( let flow = Flow::new(
peer_addr, peer_addr,
SocketAddress::from_socket_addr(local_socket_addr), SocketAddress::from_socket_addr(local_socket_addr),
); );
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
tracing::Span::current().record("ret.descriptor", format!("{:?}", descriptor).as_str()); tracing::Span::current().record("ret.flow", format!("{:?}", flow).as_str());
Ok(NetworkResult::value(descriptor)) Ok(NetworkResult::value(flow))
} }
#[instrument(level = "trace", err)] #[instrument(level = "trace", err)]

View File

@ -54,7 +54,7 @@ pub struct WebsocketNetworkConnection<T>
where where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static, T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
descriptor: ConnectionDescriptor, flow: Flow,
stream: CloneStream<WebSocketStream<T>>, stream: CloneStream<WebSocketStream<T>>,
} }
@ -71,15 +71,15 @@ impl<T> WebsocketNetworkConnection<T>
where where
T: AsyncRead + AsyncWrite + Send + Unpin + 'static, T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
{ {
pub fn new(descriptor: ConnectionDescriptor, stream: WebSocketStream<T>) -> Self { pub fn new(flow: Flow, stream: WebSocketStream<T>) -> Self {
Self { Self {
descriptor, flow,
stream: CloneStream::new(stream), stream: CloneStream::new(stream),
} }
} }
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn flow(&self) -> Flow {
self.descriptor self.flow
} }
#[cfg_attr( #[cfg_attr(
@ -286,7 +286,7 @@ impl WebsocketProtocolHandler {
PeerAddress::new(SocketAddress::from_socket_addr(socket_addr), protocol_type); PeerAddress::new(SocketAddress::from_socket_addr(socket_addr), protocol_type);
let conn = ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new( let conn = ProtocolNetworkConnection::WsAccepted(WebsocketNetworkConnection::new(
ConnectionDescriptor::new(peer_addr, SocketAddress::from_socket_addr(local_addr)), Flow::new(peer_addr, SocketAddress::from_socket_addr(local_addr)),
ws_stream, ws_stream,
)); ));
@ -335,8 +335,8 @@ impl WebsocketProtocolHandler {
#[cfg(feature = "rt-tokio")] #[cfg(feature = "rt-tokio")]
let tcp_stream = tcp_stream.compat(); let tcp_stream = tcp_stream.compat();
// Make our connection descriptor // Make our flow
let descriptor = ConnectionDescriptor::new( let flow = Flow::new(
dial_info.peer_address(), dial_info.peer_address(),
SocketAddress::from_socket_addr(actual_local_addr), SocketAddress::from_socket_addr(actual_local_addr),
); );
@ -350,14 +350,14 @@ impl WebsocketProtocolHandler {
.map_err(to_io_error_other)?; .map_err(to_io_error_other)?;
Ok(NetworkResult::Value(ProtocolNetworkConnection::Wss( Ok(NetworkResult::Value(ProtocolNetworkConnection::Wss(
WebsocketNetworkConnection::new(descriptor, ws_stream), WebsocketNetworkConnection::new(flow, ws_stream),
))) )))
} else { } else {
let (ws_stream, _response) = client_async(request, tcp_stream) let (ws_stream, _response) = client_async(request, tcp_stream)
.await .await
.map_err(to_io_error_other)?; .map_err(to_io_error_other)?;
Ok(NetworkResult::Value(ProtocolNetworkConnection::Ws( Ok(NetworkResult::Value(ProtocolNetworkConnection::Ws(
WebsocketNetworkConnection::new(descriptor, ws_stream), WebsocketNetworkConnection::new(flow, ws_stream),
))) )))
} }
} }

View File

@ -47,12 +47,12 @@ cfg_if::cfg_if! {
// #[derive(Debug)] // #[derive(Debug)]
// pub struct DummyNetworkConnection { // pub struct DummyNetworkConnection {
// descriptor: ConnectionDescriptor, // flow: Flow,
// } // }
// impl DummyNetworkConnection { // impl DummyNetworkConnection {
// pub fn descriptor(&self) -> ConnectionDescriptor { // pub fn flow(&self) -> Flow {
// self.descriptor // self.flow
// } // }
// pub fn close(&self) -> io::Result<NetworkResult<()>> { // pub fn close(&self) -> io::Result<NetworkResult<()>> {
// Ok(NetworkResult::Value(())) // Ok(NetworkResult::Value(()))
@ -83,12 +83,10 @@ pub struct NetworkConnectionStats {
} }
pub type NetworkConnectionId = AlignedU64;
#[derive(Debug)] #[derive(Debug)]
pub(in crate::network_manager) struct NetworkConnection { pub(in crate::network_manager) struct NetworkConnection {
connection_id: NetworkConnectionId, connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor, flow: Flow,
processor: Option<MustJoinHandle<()>>, processor: Option<MustJoinHandle<()>>,
established_time: Timestamp, established_time: Timestamp,
stats: Arc<Mutex<NetworkConnectionStats>>, stats: Arc<Mutex<NetworkConnectionStats>>,
@ -108,13 +106,13 @@ impl Drop for NetworkConnection {
impl NetworkConnection { impl NetworkConnection {
pub(super) fn dummy(id: NetworkConnectionId, descriptor: ConnectionDescriptor) -> Self { pub(super) fn dummy(id: NetworkConnectionId, flow: Flow) -> Self {
// Create handle for sending (dummy is immediately disconnected) // Create handle for sending (dummy is immediately disconnected)
let (sender, _receiver) = flume::bounded(get_concurrency() as usize); let (sender, _receiver) = flume::bounded(get_concurrency() as usize);
Self { Self {
connection_id: id, connection_id: id,
descriptor, flow,
processor: None, processor: None,
established_time: get_aligned_timestamp(), established_time: get_aligned_timestamp(),
stats: Arc::new(Mutex::new(NetworkConnectionStats { stats: Arc::new(Mutex::new(NetworkConnectionStats {
@ -134,8 +132,8 @@ impl NetworkConnection {
protocol_connection: ProtocolNetworkConnection, protocol_connection: ProtocolNetworkConnection,
connection_id: NetworkConnectionId, connection_id: NetworkConnectionId,
) -> Self { ) -> Self {
// Get descriptor // Get flow
let descriptor = protocol_connection.descriptor(); let flow = protocol_connection.flow();
// Create handle for sending // Create handle for sending
let (sender, receiver) = flume::bounded(get_concurrency() as usize); let (sender, receiver) = flume::bounded(get_concurrency() as usize);
@ -155,7 +153,7 @@ impl NetworkConnection {
local_stop_token, local_stop_token,
manager_stop_token, manager_stop_token,
connection_id, connection_id,
descriptor, flow,
receiver, receiver,
protocol_connection, protocol_connection,
stats.clone(), stats.clone(),
@ -164,7 +162,7 @@ impl NetworkConnection {
// Return the connection // Return the connection
Self { Self {
connection_id, connection_id,
descriptor, flow,
processor: Some(processor), processor: Some(processor),
established_time: get_aligned_timestamp(), established_time: get_aligned_timestamp(),
stats, stats,
@ -179,12 +177,20 @@ impl NetworkConnection {
self.connection_id self.connection_id
} }
pub fn connection_descriptor(&self) -> ConnectionDescriptor { pub fn flow(&self) -> Flow {
self.descriptor self.flow
}
#[allow(dead_code)]
pub fn unique_flow(&self) -> UniqueFlow {
UniqueFlow {
flow: self.flow,
connection_id: Some(self.connection_id),
}
} }
pub fn get_handle(&self) -> ConnectionHandle { pub fn get_handle(&self) -> ConnectionHandle {
ConnectionHandle::new(self.connection_id, self.descriptor, self.sender.clone()) ConnectionHandle::new(self.connection_id, self.flow, self.sender.clone())
} }
pub fn is_in_use(&self) -> bool { pub fn is_in_use(&self) -> bool {
@ -264,7 +270,7 @@ impl NetworkConnection {
local_stop_token: StopToken, local_stop_token: StopToken,
manager_stop_token: StopToken, manager_stop_token: StopToken,
connection_id: NetworkConnectionId, connection_id: NetworkConnectionId,
descriptor: ConnectionDescriptor, flow: Flow,
receiver: flume::Receiver<(Option<Id>, Vec<u8>)>, receiver: flume::Receiver<(Option<Id>, Vec<u8>)>,
protocol_connection: ProtocolNetworkConnection, protocol_connection: ProtocolNetworkConnection,
stats: Arc<Mutex<NetworkConnectionStats>>, stats: Arc<Mutex<NetworkConnectionStats>>,
@ -272,7 +278,7 @@ impl NetworkConnection {
Box::pin(async move { Box::pin(async move {
log_net!( log_net!(
"== Starting process_connection loop for id={}, {:?}", connection_id, "== Starting process_connection loop for id={}, {:?}", connection_id,
descriptor flow
); );
let network_manager = connection_manager.network_manager(); let network_manager = connection_manager.network_manager();
@ -286,7 +292,7 @@ impl NetworkConnection {
let new_timer = || { let new_timer = || {
sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async { sleep(connection_manager.connection_inactivity_timeout_ms()).then(|_| async {
// timeout // timeout
log_net!("== Connection timeout on {:?}", descriptor); log_net!("== Connection timeout on {:?}", flow);
RecvLoopAction::Timeout RecvLoopAction::Timeout
}) })
}; };
@ -341,7 +347,7 @@ impl NetworkConnection {
.then(|res| async { .then(|res| async {
match res { match res {
Ok(v) => { Ok(v) => {
let peer_address = protocol_connection.descriptor().remote(); let peer_address = protocol_connection.flow().remote();
// Check to see if it is punished // Check to see if it is punished
if address_filter.is_ip_addr_punished(peer_address.socket_addr().ip()) { if address_filter.is_ip_addr_punished(peer_address.socket_addr().ip()) {
@ -367,7 +373,7 @@ impl NetworkConnection {
// Pass received messages up to the network manager for processing // Pass received messages up to the network manager for processing
if let Err(e) = network_manager if let Err(e) = network_manager
.on_recv_envelope(message.as_mut_slice(), descriptor) .on_recv_envelope(message.as_mut_slice(), flow)
.await .await
{ {
log_net!(debug "failed to process received envelope: {}", e); log_net!(debug "failed to process received envelope: {}", e);
@ -424,8 +430,8 @@ impl NetworkConnection {
} }
log_net!( log_net!(
"== Connection loop finished descriptor={:?}", "== Connection loop finished flow={:?}",
descriptor flow
); );
// Let the connection manager know the receive loop exited // Let the connection manager know the receive loop exited
@ -443,8 +449,8 @@ impl NetworkConnection {
pub fn debug_print(&self, cur_ts: Timestamp) -> String { pub fn debug_print(&self, cur_ts: Timestamp) -> String {
format!("{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}", format!("{} <- {} | {} | est {} sent {} rcvd {} refcount {}{}",
self.descriptor.remote_address(), self.flow.remote_address(),
self.descriptor.local().map(|x| x.to_string()).unwrap_or("---".to_owned()), self.flow.local().map(|x| x.to_string()).unwrap_or("---".to_owned()),
self.connection_id.as_u64(), self.connection_id.as_u64(),
debug_duration(cur_ts.as_u64().saturating_sub(self.established_time.as_u64())), debug_duration(cur_ts.as_u64().saturating_sub(self.established_time.as_u64())),
self.stats().last_message_sent_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()), self.stats().last_message_sent_time.map(|ts| debug_duration(cur_ts.as_u64().saturating_sub(ts.as_u64())) ).unwrap_or("---".to_owned()),

View File

@ -3,10 +3,10 @@ use super::*;
impl NetworkManager { impl NetworkManager {
/// Send raw data to a node /// Send raw data to a node
/// ///
/// We may not have dial info for a node, but have an existing connection for it /// We may not have dial info for a node, but have an existing flow for it
/// because an inbound connection happened first, and no FindNodeQ has happened to that /// because an inbound flow happened first, and no FindNodeQ has happened to that
/// node yet to discover its dial info. The existing connection should be tried first /// node yet to discover its dial info. The existing flow should be tried first
/// in this case, if it matches the node ref's filters and no more permissive connection /// in this case, if it matches the node ref's filters and no more permissive flow
/// could be established. /// could be established.
/// ///
/// Sending to a node requires determining a NetworkClass compatible contact method /// Sending to a node requires determining a NetworkClass compatible contact method
@ -19,27 +19,26 @@ impl NetworkManager {
let this = self.clone(); let this = self.clone();
Box::pin( Box::pin(
async move { async move {
// First try to send data to the last flow we've seen this peer on
// First try to send data to the last socket we've seen this peer on let data = if let Some(flow) = destination_node_ref.last_flow() {
let data = if let Some(connection_descriptor) = destination_node_ref.last_connection() {
match this match this
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
{ {
None => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last flow since we just sent to it
destination_node_ref destination_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp()); .set_last_flow(unique_flow.flow, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataMethod { return Ok(NetworkResult::value(SendDataMethod {
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
connection_descriptor, unique_flow,
})); }));
} }
Some(data) => { SendDataToExistingFlowResult::NotSent(data) => {
// Couldn't send data to existing connection // Couldn't send data to existing flow
// so pass the data back out // so pass the data back out
data data
} }
@ -135,30 +134,32 @@ impl NetworkManager {
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataMethod>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last connection we've seen this peer on // First try to send data to the last connection we've seen this peer on
let Some(connection_descriptor) = target_node_ref.last_connection() else { let Some(flow) = target_node_ref.last_flow() else {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
format!("should have found an existing connection: {}", target_node_ref) format!("should have found an existing connection: {}", target_node_ref)
)); ));
}; };
if self let unique_flow = match self
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
.is_some()
{ {
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
SendDataToExistingFlowResult::NotSent(_) => {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
"failed to send to existing connection", "failed to send to existing flow",
)); ));
} }
};
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); target_node_ref.set_last_flow(flow, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataMethod{ Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
connection_descriptor unique_flow
})) }))
} }
@ -169,30 +170,32 @@ impl NetworkManager {
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataMethod>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// Try to send data to the last socket we've seen this peer on // Try to send data to the last socket we've seen this peer on
let Some(connection_descriptor) = target_node_ref.last_connection() else { let Some(flow) = target_node_ref.last_flow() else {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
format!("Node is not reachable and has no existing connection: {}", target_node_ref) format!("Node is not reachable and has no existing connection: {}", target_node_ref)
)); ));
}; };
if self let unique_flow = match self
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
.is_some()
{ {
SendDataToExistingFlowResult::Sent(unique_flow) => unique_flow,
SendDataToExistingFlowResult::NotSent(_) => {
return Ok(NetworkResult::no_connection_other( return Ok(NetworkResult::no_connection_other(
format!("failed to send to unreachable node over existing connection: {:?}", connection_descriptor) format!("failed to send to unreachable node over existing connection: {:?}", flow)
)); ));
} }
};
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); target_node_ref.set_last_flow(flow, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
connection_descriptor,
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
unique_flow,
})) }))
} }
@ -204,24 +207,24 @@ impl NetworkManager {
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataMethod>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { let data = if let Some(flow) = target_node_ref.last_flow() {
match self match self
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
{ {
None => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref target_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp()); .set_last_flow(flow, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataMethod{ return Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
connection_descriptor unique_flow
})); }));
} }
Some(data) => { SendDataToExistingFlowResult::NotSent(data) => {
// Couldn't send data to existing connection // Couldn't send data to existing connection
// so pass the data back out // so pass the data back out
data data
@ -232,14 +235,14 @@ impl NetworkManager {
data data
}; };
let connection_descriptor = network_result_try!( let unique_flow = network_result_try!(
self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data) self.do_reverse_connect(relay_nr.clone(), target_node_ref.clone(), data)
.await? .await?
); );
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
connection_descriptor,
contact_method: NodeContactMethod::SignalReverse(relay_nr, target_node_ref), contact_method: NodeContactMethod::SignalReverse(relay_nr, target_node_ref),
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
unique_flow,
})) }))
} }
@ -251,24 +254,24 @@ impl NetworkManager {
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<SendDataMethod>> { ) -> EyreResult<NetworkResult<SendDataMethod>> {
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = target_node_ref.last_connection() { let data = if let Some(flow) = target_node_ref.last_flow() {
match self match self
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
{ {
None => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
target_node_ref target_node_ref
.set_last_connection(connection_descriptor, get_aligned_timestamp()); .set_last_flow(flow, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataMethod{ return Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
connection_descriptor unique_flow
})); }));
} }
Some(data) => { SendDataToExistingFlowResult::NotSent(data) => {
// Couldn't send data to existing connection // Couldn't send data to existing connection
// so pass the data back out // so pass the data back out
data data
@ -279,12 +282,12 @@ impl NetworkManager {
data data
}; };
let connection_descriptor = let unique_flow =
network_result_try!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data).await?); network_result_try!(self.do_hole_punch(relay_nr.clone(), target_node_ref.clone(), data).await?);
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
connection_descriptor,
contact_method: NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref), contact_method: NodeContactMethod::SignalHolePunch(relay_nr, target_node_ref),
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
unique_flow,
})) }))
} }
@ -299,31 +302,31 @@ impl NetworkManager {
let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter())); let node_ref = node_ref.filtered_clone(NodeRefFilter::from(dial_info.make_filter()));
// First try to send data to the last socket we've seen this peer on // First try to send data to the last socket we've seen this peer on
let data = if let Some(connection_descriptor) = node_ref.last_connection() { let data = if let Some(flow) = node_ref.last_flow() {
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
debug!( debug!(
"ExistingConnection: {:?} for {:?}", "ExistingConnection: {:?} for {:?}",
connection_descriptor, node_ref flow, node_ref
); );
match self match self
.net() .net()
.send_data_to_existing_connection(connection_descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
{ {
None => { SendDataToExistingFlowResult::Sent(unique_flow) => {
// Update timestamp for this last connection since we just sent to it // Update timestamp for this last connection since we just sent to it
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); node_ref.set_last_flow(flow, get_aligned_timestamp());
return Ok(NetworkResult::value(SendDataMethod{ return Ok(NetworkResult::value(SendDataMethod{
contact_method: NodeContactMethod::Existing, contact_method: NodeContactMethod::Existing,
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
connection_descriptor unique_flow
})); }));
} }
Some(d) => { SendDataToExistingFlowResult::NotSent(d) => {
// Connection couldn't send, kill it // Connection couldn't send, kill it
node_ref.clear_last_connection(connection_descriptor); node_ref.clear_last_connection(flow);
d d
} }
} }
@ -332,16 +335,16 @@ impl NetworkManager {
}; };
// New direct connection was necessary for this dial info // New direct connection was necessary for this dial info
let connection_descriptor = let unique_flow =
network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?); network_result_try!(self.net().send_data_to_dial_info(dial_info.clone(), data).await?);
// If we connected to this node directly, save off the last connection so we can use it again // If we connected to this node directly, save off the last connection so we can use it again
node_ref.set_last_connection(connection_descriptor, get_aligned_timestamp()); node_ref.set_last_flow(unique_flow.flow, get_aligned_timestamp());
Ok(NetworkResult::value(SendDataMethod { Ok(NetworkResult::value(SendDataMethod {
connection_descriptor,
contact_method: NodeContactMethod::Direct(dial_info), contact_method: NodeContactMethod::Direct(dial_info),
opt_relayed_contact_method: None, opt_relayed_contact_method: None,
unique_flow,
})) }))
} }
@ -528,7 +531,7 @@ impl NetworkManager {
relay_nr: NodeRef, relay_nr: NodeRef,
target_nr: NodeRef, target_nr: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<UniqueFlow>> {
// Build a return receipt for the signal // Build a return receipt for the signal
let receipt_timeout = ms_to_us( let receipt_timeout = ms_to_us(
self.unlocked_inner self.unlocked_inner
@ -592,14 +595,14 @@ impl NetworkManager {
} }
// And now use the existing connection to send over // And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() { if let Some(flow) = inbound_nr.last_flow() {
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
{ {
None => Ok(NetworkResult::value(descriptor)), SendDataToExistingFlowResult::Sent(unique_flow) => Ok(NetworkResult::value(unique_flow)),
Some(_) => Ok(NetworkResult::no_connection_other( SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other(
"unable to send over reverse connection", "unable to send over reverse connection",
)), )),
} }
@ -620,7 +623,7 @@ impl NetworkManager {
relay_nr: NodeRef, relay_nr: NodeRef,
target_nr: NodeRef, target_nr: NodeRef,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<UniqueFlow>> {
// Ensure we are filtered down to UDP (the only hole punch protocol supported today) // Ensure we are filtered down to UDP (the only hole punch protocol supported today)
assert!(target_nr assert!(target_nr
.filter_ref() .filter_ref()
@ -660,7 +663,7 @@ impl NetworkManager {
// Do our half of the hole punch by sending an empty packet // Do our half of the hole punch by sending an empty packet
// Both sides will do this and then the receipt will get sent over the punched hole // Both sides will do this and then the receipt will get sent over the punched hole
// Don't bother storing the returned connection descriptor as the 'last connection' because the other side of the hole // Don't bother storing the returned flow as the 'last flow' because the other side of the hole
// punch should come through and create a real 'last connection' for us if this succeeds // punch should come through and create a real 'last connection' for us if this succeeds
network_result_try!( network_result_try!(
self.net() self.net()
@ -710,14 +713,14 @@ impl NetworkManager {
} }
// And now use the existing connection to send over // And now use the existing connection to send over
if let Some(descriptor) = inbound_nr.last_connection() { if let Some(flow) = inbound_nr.last_flow() {
match self match self
.net() .net()
.send_data_to_existing_connection(descriptor, data) .send_data_to_existing_flow(flow, data)
.await? .await?
{ {
None => Ok(NetworkResult::value(descriptor)), SendDataToExistingFlowResult::Sent(unique_flow) => Ok(NetworkResult::value(unique_flow)),
Some(_) => Ok(NetworkResult::no_connection_other( SendDataToExistingFlowResult::NotSent(_) => Ok(NetworkResult::no_connection_other(
"unable to send over hole punch", "unable to send over hole punch",
)), )),
} }

View File

@ -31,7 +31,7 @@ impl NetworkManager {
pub fn report_local_network_socket_address( pub fn report_local_network_socket_address(
&self, &self,
_socket_address: SocketAddress, _socket_address: SocketAddress,
_connection_descriptor: ConnectionDescriptor, _flow: Flow,
_reporting_peer: NodeRef, _reporting_peer: NodeRef,
) { ) {
// XXX: Nothing here yet. // XXX: Nothing here yet.
@ -43,11 +43,11 @@ impl NetworkManager {
pub fn report_public_internet_socket_address( pub fn report_public_internet_socket_address(
&self, &self,
socket_address: SocketAddress, // the socket address as seen by the remote peer socket_address: SocketAddress, // the socket address as seen by the remote peer
connection_descriptor: ConnectionDescriptor, // the connection descriptor used flow: Flow, // the flow used
reporting_peer: NodeRef, // the peer's noderef reporting the socket address reporting_peer: NodeRef, // the peer's noderef reporting the socket address
) { ) {
#[cfg(feature = "network-result-extra")] #[cfg(feature = "network-result-extra")]
debug!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); debug!("report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
// Ignore these reports if we are currently detecting public dial info // Ignore these reports if we are currently detecting public dial info
let net = self.net(); let net = self.net();
@ -77,10 +77,7 @@ impl NetworkManager {
}); });
// Get the ip(block) this report is coming from // Get the ip(block) this report is coming from
let reporting_ipblock = ip_to_ipblock( let reporting_ipblock = ip_to_ipblock(ip6_prefix_size, flow.remote_address().ip_addr());
ip6_prefix_size,
connection_descriptor.remote_address().ip_addr(),
);
// Reject public address reports from nodes that we know are behind symmetric nat or // Reject public address reports from nodes that we know are behind symmetric nat or
// nodes that must be using a relay for everything // nodes that must be using a relay for everything
@ -105,10 +102,8 @@ impl NetworkManager {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
let inner = &mut *inner; let inner = &mut *inner;
let addr_proto_type_key = PublicAddressCheckCacheKey( let addr_proto_type_key =
connection_descriptor.protocol_type(), PublicAddressCheckCacheKey(flow.protocol_type(), flow.address_type());
connection_descriptor.address_type(),
);
if inner if inner
.public_address_inconsistencies_table .public_address_inconsistencies_table
.get(&addr_proto_type_key) .get(&addr_proto_type_key)
@ -136,7 +131,7 @@ impl NetworkManager {
NetworkClass::InboundCapable NetworkClass::InboundCapable
) { ) {
// Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed // Get the dial info filter for this connection so we can check if we have any public dialinfo that may have changed
let dial_info_filter = connection_descriptor.make_dial_info_filter(); let dial_info_filter = flow.make_dial_info_filter();
// Get current external ip/port from registered global dialinfo // Get current external ip/port from registered global dialinfo
let current_addresses: BTreeSet<SocketAddress> = routing_table let current_addresses: BTreeSet<SocketAddress> = routing_table
@ -267,7 +262,7 @@ impl NetworkManager {
net.set_needs_public_dial_info_check(bad_public_address_detection_punishment); net.set_needs_public_dial_info_check(bad_public_address_detection_punishment);
} else { } else {
warn!("Public address may have changed. Restarting the server may be required."); warn!("Public address may have changed. Restarting the server may be required.");
warn!("report_global_socket_address\nsocket_address: {:#?}\nconnection_descriptor: {:#?}\nreporting_peer: {:#?}", socket_address, connection_descriptor, reporting_peer); warn!("report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
warn!( warn!(
"public_address_check_cache: {:#?}", "public_address_check_cache: {:#?}",
inner.public_address_check_cache inner.public_address_check_cache

View File

@ -9,12 +9,12 @@ pub async fn test_add_get_remove() {
let address_filter = AddressFilter::new(config.clone(), mock_routing_table()); let address_filter = AddressFilter::new(config.clone(), mock_routing_table());
let table = ConnectionTable::new(config, address_filter); let table = ConnectionTable::new(config, address_filter);
let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new( let a1 = Flow::new_no_local(PeerAddress::new(
SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080), SocketAddress::new(Address::IPV4(Ipv4Addr::new(192, 168, 0, 1)), 8080),
ProtocolType::TCP, ProtocolType::TCP,
)); ));
let a2 = a1; let a2 = a1;
let a3 = ConnectionDescriptor::new( let a3 = Flow::new(
PeerAddress::new( PeerAddress::new(
SocketAddress::new(Address::IPV6(Ipv6Addr::new(191, 0, 0, 0, 0, 0, 0, 1)), 8090), SocketAddress::new(Address::IPV6(Ipv6Addr::new(191, 0, 0, 0, 0, 0, 0, 1)), 8090),
ProtocolType::TCP, ProtocolType::TCP,
@ -26,7 +26,7 @@ pub async fn test_add_get_remove() {
0, 0,
))), ))),
); );
let a4 = ConnectionDescriptor::new( let a4 = Flow::new(
PeerAddress::new( PeerAddress::new(
SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090), SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090),
ProtocolType::TCP, ProtocolType::TCP,
@ -38,7 +38,7 @@ pub async fn test_add_get_remove() {
0, 0,
))), ))),
); );
let a5 = ConnectionDescriptor::new( let a5 = Flow::new(
PeerAddress::new( PeerAddress::new(
SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090), SocketAddress::new(Address::IPV6(Ipv6Addr::new(192, 0, 0, 0, 0, 0, 0, 1)), 8090),
ProtocolType::WSS, ProtocolType::WSS,
@ -59,12 +59,12 @@ pub async fn test_add_get_remove() {
let c4 = NetworkConnection::dummy(4.into(), a4); let c4 = NetworkConnection::dummy(4.into(), a4);
let c5 = NetworkConnection::dummy(5.into(), a5); let c5 = NetworkConnection::dummy(5.into(), a5);
assert_eq!(a1, c2.connection_descriptor()); assert_eq!(a1, c2.flow());
assert_ne!(a3, c4.connection_descriptor()); assert_ne!(a3, c4.flow());
assert_ne!(a4, c5.connection_descriptor()); assert_ne!(a4, c5.flow());
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
assert_eq!(table.peek_connection_by_descriptor(a1), None); assert_eq!(table.peek_connection_by_flow(a1), None);
table.add_connection(c1).unwrap(); table.add_connection(c1).unwrap();
assert!(table.add_connection(c1b).is_err()); assert!(table.add_connection(c1b).is_err());
@ -72,26 +72,26 @@ pub async fn test_add_get_remove() {
assert!(table.remove_connection_by_id(4.into()).is_none()); assert!(table.remove_connection_by_id(4.into()).is_none());
assert!(table.remove_connection_by_id(5.into()).is_none()); assert!(table.remove_connection_by_id(5.into()).is_none());
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone()));
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone()));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_err!(table.add_connection(c2)); assert_err!(table.add_connection(c2));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone()));
assert_eq!(table.peek_connection_by_descriptor(a1), Some(c1h.clone())); assert_eq!(table.peek_connection_by_flow(a1), Some(c1h.clone()));
assert_eq!(table.connection_count(), 1); assert_eq!(table.connection_count(), 1);
assert_eq!( assert_eq!(
table table
.remove_connection_by_id(1.into()) .remove_connection_by_id(1.into())
.map(|c| c.connection_descriptor()) .map(|c| c.flow())
.unwrap(), .unwrap(),
a1 a1
); );
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
assert!(table.remove_connection_by_id(2.into()).is_none()); assert!(table.remove_connection_by_id(2.into()).is_none());
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
assert_eq!(table.peek_connection_by_descriptor(a2), None); assert_eq!(table.peek_connection_by_flow(a2), None);
assert_eq!(table.peek_connection_by_descriptor(a1), None); assert_eq!(table.peek_connection_by_flow(a1), None);
assert_eq!(table.connection_count(), 0); assert_eq!(table.connection_count(), 0);
let c1 = NetworkConnection::dummy(6.into(), a1); let c1 = NetworkConnection::dummy(6.into(), a1);
table.add_connection(c1).unwrap(); table.add_connection(c1).unwrap();
@ -103,21 +103,21 @@ pub async fn test_add_get_remove() {
assert_eq!( assert_eq!(
table table
.remove_connection_by_id(6.into()) .remove_connection_by_id(6.into())
.map(|c| c.connection_descriptor()) .map(|c| c.flow())
.unwrap(), .unwrap(),
a2 a2
); );
assert_eq!( assert_eq!(
table table
.remove_connection_by_id(3.into()) .remove_connection_by_id(3.into())
.map(|c| c.connection_descriptor()) .map(|c| c.flow())
.unwrap(), .unwrap(),
a3 a3
); );
assert_eq!( assert_eq!(
table table
.remove_connection_by_id(4.into()) .remove_connection_by_id(4.into())
.map(|c| c.connection_descriptor()) .map(|c| c.flow())
.unwrap(), .unwrap(),
a4 a4
); );

View File

@ -97,8 +97,8 @@ impl From<AddressType> for DialInfoFilter {
} }
} }
impl From<ConnectionDescriptor> for DialInfoFilter { impl From<Flow> for DialInfoFilter {
fn from(other: ConnectionDescriptor) -> Self { fn from(other: Flow) -> Self {
Self { Self {
protocol_type_set: ProtocolTypeSet::from(other.protocol_type()), protocol_type_set: ProtocolTypeSet::from(other.protocol_type()),
address_type_set: AddressTypeSet::from(other.address_type()), address_type_set: AddressTypeSet::from(other.address_type()),

View File

@ -3,17 +3,21 @@ use super::*;
/// Represents the 5-tuple of an established connection /// Represents the 5-tuple of an established connection
/// Not used to specify connections to create, that is reserved for DialInfo /// Not used to specify connections to create, that is reserved for DialInfo
/// ///
/// ConnectionDescriptors should never be from unspecified local addresses for connection oriented protocols /// Abstracts both connections to 'connection oriented' protocols (TCP/WS/WSS), but also datagram protocols (UDP)
///
/// Flows should never be from UNSPECIFIED local addresses for connection oriented protocols
/// If the medium does not allow local addresses, None should have been used or 'new_no_local' /// If the medium does not allow local addresses, None should have been used or 'new_no_local'
/// If we are specifying only a port, then the socket's 'local_address()' should have been used, since an /// If we are specifying only a port, then the socket's 'local_address()' should have been used, since an
/// established connection is always from a real address to another real address. /// established connection is always from a real address to another real address.
///
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] #[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct ConnectionDescriptor { pub struct Flow {
remote: PeerAddress, remote: PeerAddress,
local: Option<SocketAddress>, local: Option<SocketAddress>,
} }
impl ConnectionDescriptor { impl Flow {
pub fn new(remote: PeerAddress, local: SocketAddress) -> Self { pub fn new(remote: PeerAddress, local: SocketAddress) -> Self {
assert!(!remote.protocol_type().is_ordered() || !local.address().is_unspecified()); assert!(!remote.protocol_type().is_ordered() || !local.address().is_unspecified());
@ -50,7 +54,7 @@ impl ConnectionDescriptor {
} }
} }
impl MatchesDialInfoFilter for ConnectionDescriptor { impl MatchesDialInfoFilter for Flow {
fn matches_filter(&self, filter: &DialInfoFilter) -> bool { fn matches_filter(&self, filter: &DialInfoFilter) -> bool {
if !filter.protocol_type_set.contains(self.protocol_type()) { if !filter.protocol_type_set.contains(self.protocol_type()) {
return false; return false;
@ -61,3 +65,14 @@ impl MatchesDialInfoFilter for ConnectionDescriptor {
true true
} }
} }
/// UniqueFlow is a record a specific flow that may or may not currently exist
/// The NetworkConnectionId associated with each flow may represent a low level network connection
/// and will be unique with high probability per low-level connection
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct UniqueFlow {
pub flow: Flow,
pub connection_id: Option<NetworkConnectionId>,
}
pub type NetworkConnectionId = AlignedU64;

View File

@ -1,9 +1,9 @@
mod address; mod address;
mod address_type; mod address_type;
mod connection_descriptor;
mod dial_info; mod dial_info;
mod dial_info_class; mod dial_info_class;
mod dial_info_filter; mod dial_info_filter;
mod flow;
mod low_level_protocol_type; mod low_level_protocol_type;
mod network_class; mod network_class;
mod peer_address; mod peer_address;
@ -15,10 +15,10 @@ use super::*;
pub use address::*; pub use address::*;
pub use address_type::*; pub use address_type::*;
pub use connection_descriptor::*;
pub use dial_info::*; pub use dial_info::*;
pub use dial_info_class::*; pub use dial_info_class::*;
pub use dial_info_filter::*; pub use dial_info_filter::*;
pub use flow::*;
pub use low_level_protocol_type::*; pub use low_level_protocol_type::*;
pub use network_class::*; pub use network_class::*;
pub use peer_address::*; pub use peer_address::*;

View File

@ -248,11 +248,11 @@ impl Network {
#[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))]
pub async fn send_data_to_existing_connection( pub async fn send_data_to_existing_connection(
&self, &self,
descriptor: ConnectionDescriptor, flow: Flow,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<Option<Vec<u8>>> { ) -> EyreResult<Option<Vec<u8>>> {
let data_len = data.len(); let data_len = data.len();
match descriptor.protocol_type() { match flow.protocol_type() {
ProtocolType::UDP => { ProtocolType::UDP => {
bail!("no support for UDP protocol") bail!("no support for UDP protocol")
} }
@ -265,13 +265,13 @@ impl Network {
// Handle connection-oriented protocols // Handle connection-oriented protocols
// Try to send to the exact existing connection if one exists // Try to send to the exact existing connection if one exists
if let Some(conn) = self.connection_manager().get_connection(descriptor) { if let Some(conn) = self.connection_manager().get_connection(flow) {
// connection exists, send over it // connection exists, send over it
match conn.send_async(data).await { match conn.send_async(data).await {
ConnectionHandleSendResult::Sent => { ConnectionHandleSendResult::Sent => {
// Network accounting // Network accounting
self.network_manager().stats_packet_sent( self.network_manager().stats_packet_sent(
descriptor.remote().socket_addr().ip(), flow.remote().socket_addr().ip(),
ByteCount::new(data_len as u64), ByteCount::new(data_len as u64),
); );
@ -295,7 +295,7 @@ impl Network {
&self, &self,
dial_info: DialInfo, dial_info: DialInfo,
data: Vec<u8>, data: Vec<u8>,
) -> EyreResult<NetworkResult<ConnectionDescriptor>> { ) -> EyreResult<NetworkResult<Flow>> {
self.record_dial_info_failure(dial_info.clone(), async move { self.record_dial_info_failure(dial_info.clone(), async move {
let data_len = data.len(); let data_len = data.len();
if dial_info.protocol_type() == ProtocolType::UDP { if dial_info.protocol_type() == ProtocolType::UDP {
@ -318,13 +318,13 @@ impl Network {
"failed to send", "failed to send",
))); )));
} }
let connection_descriptor = conn.connection_descriptor(); let flow = conn.flow();
// Network accounting // Network accounting
self.network_manager() self.network_manager()
.stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64)); .stats_packet_sent(dial_info.ip_addr(), ByteCount::new(data_len as u64));
Ok(NetworkResult::value(connection_descriptor)) Ok(NetworkResult::value(flow))
}) })
.await .await
} }

View File

@ -35,10 +35,10 @@ impl ProtocolNetworkConnection {
} }
} }
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn flow(&self) -> Flow {
match self { match self {
// Self::Dummy(d) => d.descriptor(), // Self::Dummy(d) => d.flow(),
Self::Ws(w) => w.descriptor(), Self::Ws(w) => w.flow(),
} }
} }
pub async fn close(&self) -> io::Result<NetworkResult<()>> { pub async fn close(&self) -> io::Result<NetworkResult<()>> {

View File

@ -34,7 +34,7 @@ fn to_io(err: WsErr) -> io::Error {
#[derive(Clone)] #[derive(Clone)]
pub struct WebsocketNetworkConnection { pub struct WebsocketNetworkConnection {
descriptor: ConnectionDescriptor, flow: Flow,
inner: Arc<WebsocketNetworkConnectionInner>, inner: Arc<WebsocketNetworkConnectionInner>,
} }
@ -45,9 +45,9 @@ impl fmt::Debug for WebsocketNetworkConnection {
} }
impl WebsocketNetworkConnection { impl WebsocketNetworkConnection {
pub fn new(descriptor: ConnectionDescriptor, ws_meta: WsMeta, ws_stream: WsStream) -> Self { pub fn new(flow: Flow, ws_meta: WsMeta, ws_stream: WsStream) -> Self {
Self { Self {
descriptor, flow,
inner: Arc::new(WebsocketNetworkConnectionInner { inner: Arc::new(WebsocketNetworkConnectionInner {
ws_meta, ws_meta,
ws_stream: CloneStream::new(ws_stream), ws_stream: CloneStream::new(ws_stream),
@ -55,8 +55,8 @@ impl WebsocketNetworkConnection {
} }
} }
pub fn descriptor(&self) -> ConnectionDescriptor { pub fn flow(&self) -> Flow {
self.descriptor self.flow
} }
#[cfg_attr( #[cfg_attr(
@ -147,9 +147,9 @@ impl WebsocketProtocolHandler {
.into_network_result()) .into_network_result())
.into_network_result()?); .into_network_result()?);
// Make our connection descriptor // Make our flow
let wnc = WebsocketNetworkConnection::new( let wnc = WebsocketNetworkConnection::new(
ConnectionDescriptor::new_no_local(dial_info.peer_address()), Flow::new_no_local(dial_info.peer_address()),
wsmeta, wsmeta,
wsio, wsio,
); );

View File

@ -36,7 +36,7 @@ pub(crate) enum BucketEntryState {
} }
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)] #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub(crate) struct LastConnectionKey(ProtocolType, AddressType); pub(crate) struct LastFlowKey(ProtocolType, AddressType);
/// Bucket entry information specific to the LocalNetwork RoutingDomain /// Bucket entry information specific to the LocalNetwork RoutingDomain
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -75,9 +75,9 @@ pub(crate) struct BucketEntryInner {
/// has the same timestamp, because if we change our own IP address or network class it may be possible for nodes that were /// has the same timestamp, because if we change our own IP address or network class it may be possible for nodes that were
/// unreachable may now be reachable with the same SignedNodeInfo/DialInfo /// unreachable may now be reachable with the same SignedNodeInfo/DialInfo
updated_since_last_network_change: bool, updated_since_last_network_change: bool,
/// The last connection descriptors used to contact this node, per protocol type /// The last flows used to contact this node, per protocol type
#[serde(skip)] #[serde(skip)]
last_connections: BTreeMap<LastConnectionKey, (ConnectionDescriptor, Timestamp)>, last_flows: BTreeMap<LastFlowKey, (Flow, Timestamp)>,
/// The node info for this entry on the publicinternet routing domain /// The node info for this entry on the publicinternet routing domain
public_internet: BucketEntryPublicInternet, public_internet: BucketEntryPublicInternet,
/// The node info for this entry on the localnetwork routing domain /// The node info for this entry on the localnetwork routing domain
@ -304,7 +304,7 @@ impl BucketEntryInner {
// The latest connection would have been the one we got the new node info // The latest connection would have been the one we got the new node info
// over so that connection is still valid. // over so that connection is still valid.
if node_info_changed { if node_info_changed {
self.clear_last_connections_except_latest(); self.clear_last_flows_except_latest();
} }
} }
@ -333,7 +333,7 @@ impl BucketEntryInner {
} }
// Check connections // Check connections
let last_connections = self.last_connections( let last_connections = self.last_flows(
rti, rti,
true, true,
NodeRefFilter::from(routing_domain), NodeRefFilter::from(routing_domain),
@ -387,7 +387,7 @@ impl BucketEntryInner {
} }
// Check connections // Check connections
let mut best_routing_domain: Option<RoutingDomain> = None; let mut best_routing_domain: Option<RoutingDomain> = None;
let last_connections = self.last_connections( let last_connections = self.last_flows(
rti, rti,
true, true,
NodeRefFilter::from(routing_domain_set), NodeRefFilter::from(routing_domain_set),
@ -408,77 +408,77 @@ impl BucketEntryInner {
best_routing_domain best_routing_domain
} }
fn descriptor_to_key(&self, last_connection: ConnectionDescriptor) -> LastConnectionKey { fn flow_to_key(&self, last_flow: Flow) -> LastFlowKey {
LastConnectionKey( LastFlowKey(
last_connection.protocol_type(), last_flow.protocol_type(),
last_connection.address_type(), last_flow.address_type(),
) )
} }
// Stores a connection descriptor in this entry's table of last connections // Stores a flow in this entry's table of last flows
pub fn set_last_connection(&mut self, last_connection: ConnectionDescriptor, timestamp: Timestamp) { pub fn set_last_flow(&mut self, last_flow: Flow, timestamp: Timestamp) {
if self.is_punished { if self.is_punished {
// Don't record connection if this entry is currently punished // Don't record connection if this entry is currently punished
return; return;
} }
let key = self.descriptor_to_key(last_connection); let key = self.flow_to_key(last_flow);
self.last_connections self.last_flows
.insert(key, (last_connection, timestamp)); .insert(key, (last_flow, timestamp));
} }
// Removes a connection descriptor in this entry's table of last connections // Removes a flow in this entry's table of last flows
pub fn clear_last_connection(&mut self, last_connection: ConnectionDescriptor) { pub fn remove_last_flow(&mut self, last_flow: Flow) {
let key = self.descriptor_to_key(last_connection); let key = self.flow_to_key(last_flow);
self.last_connections self.last_flows
.remove(&key); .remove(&key);
} }
// Clears the table of last connections to ensure we create new ones and drop any existing ones // Clears the table of last flows to ensure we create new ones and drop any existing ones
pub fn clear_last_connections(&mut self) { pub fn clear_last_flows(&mut self) {
self.last_connections.clear(); self.last_flows.clear();
} }
// Clears the table of last connections except the most recent one // Clears the table of last flows except the most recent one
pub fn clear_last_connections_except_latest(&mut self) { pub fn clear_last_flows_except_latest(&mut self) {
if self.last_connections.is_empty() { if self.last_flows.is_empty() {
// No last_connections // No last_connections
return; return;
} }
let mut dead_keys = Vec::with_capacity(self.last_connections.len()-1); let mut dead_keys = Vec::with_capacity(self.last_flows.len()-1);
let mut most_recent_connection = None; let mut most_recent_flow = None;
let mut most_recent_connection_time = 0u64; let mut most_recent_flow_time = 0u64;
for (k, v) in &self.last_connections { for (k, v) in &self.last_flows {
let lct = v.1.as_u64(); let lct = v.1.as_u64();
if lct > most_recent_connection_time { if lct > most_recent_flow_time {
most_recent_connection = Some(k); most_recent_flow = Some(k);
most_recent_connection_time = lct; most_recent_flow_time = lct;
} }
} }
let Some(most_recent_connection) = most_recent_connection else { let Some(most_recent_flow) = most_recent_flow else {
return; return;
}; };
for k in self.last_connections.keys() { for k in self.last_flows.keys() {
if k != most_recent_connection { if k != most_recent_flow {
dead_keys.push(k.clone()); dead_keys.push(k.clone());
} }
} }
for dk in dead_keys { for dk in dead_keys {
self.last_connections.remove(&dk); self.last_flows.remove(&dk);
} }
} }
// Gets all the 'last connections' that match a particular filter, and their accompanying timestamps of last use // Gets all the 'last flows' that match a particular filter, and their accompanying timestamps of last use
pub(super) fn last_connections( pub(super) fn last_flows(
&self, &self,
rti: &RoutingTableInner, rti: &RoutingTableInner,
only_live: bool, only_live: bool,
filter: NodeRefFilter, filter: NodeRefFilter,
) -> Vec<(ConnectionDescriptor, Timestamp)> { ) -> Vec<(Flow, Timestamp)> {
let connection_manager = let connection_manager =
rti.unlocked_inner.network_manager.connection_manager(); rti.unlocked_inner.network_manager.connection_manager();
let mut out: Vec<(ConnectionDescriptor, Timestamp)> = self let mut out: Vec<(Flow, Timestamp)> = self
.last_connections .last_flows
.iter() .iter()
.filter_map(|(k, v)| { .filter_map(|(k, v)| {
let include = { let include = {
@ -564,7 +564,7 @@ impl BucketEntryInner {
pub fn set_punished(&mut self, punished: bool) { pub fn set_punished(&mut self, punished: bool) {
self.is_punished = punished; self.is_punished = punished;
if punished { if punished {
self.clear_last_connections(); self.clear_last_flows();
} }
} }
@ -845,7 +845,7 @@ impl BucketEntry {
unsupported_node_ids: TypedKeyGroup::new(), unsupported_node_ids: TypedKeyGroup::new(),
envelope_support: Vec::new(), envelope_support: Vec::new(),
updated_since_last_network_change: false, updated_since_last_network_change: false,
last_connections: BTreeMap::new(), last_flows: BTreeMap::new(),
local_network: BucketEntryLocalNetwork { local_network: BucketEntryLocalNetwork {
last_seen_our_node_info_ts: Timestamp::new(0u64), last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None, signed_node_info: None,

View File

@ -90,7 +90,7 @@ pub type BucketIndex = (CryptoKind, usize);
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(crate) struct RecentPeersEntry { pub(crate) struct RecentPeersEntry {
pub last_connection: ConnectionDescriptor, pub last_connection: Flow,
} }
pub(crate) struct RoutingTableUnlockedInner { pub(crate) struct RoutingTableUnlockedInner {
@ -662,13 +662,13 @@ impl RoutingTable {
pub fn register_node_with_existing_connection( pub fn register_node_with_existing_connection(
&self, &self,
node_id: TypedKey, node_id: TypedKey,
descriptor: ConnectionDescriptor, flow: Flow,
timestamp: Timestamp, timestamp: Timestamp,
) -> EyreResult<NodeRef> { ) -> EyreResult<NodeRef> {
self.inner.write().register_node_with_existing_connection( self.inner.write().register_node_with_existing_connection(
self.clone(), self.clone(),
node_id, node_id,
descriptor, flow,
timestamp, timestamp,
) )
} }
@ -698,7 +698,7 @@ impl RoutingTable {
for e in &recent_peers { for e in &recent_peers {
let mut dead = true; let mut dead = true;
if let Ok(Some(nr)) = self.lookup_node_ref(*e) { if let Ok(Some(nr)) = self.lookup_node_ref(*e) {
if let Some(last_connection) = nr.last_connection() { if let Some(last_connection) = nr.last_flow() {
out.push((*e, RecentPeersEntry { last_connection })); out.push((*e, RecentPeersEntry { last_connection }));
dead = false; dead = false;
} }

View File

@ -273,13 +273,13 @@ pub(crate) trait NodeRefBase: Sized {
/// Get the most recent 'last connection' to this node /// Get the most recent 'last connection' to this node
/// Filtered first and then sorted by ordering preference and then by most recent /// Filtered first and then sorted by ordering preference and then by most recent
fn last_connection(&self) -> Option<ConnectionDescriptor> { fn last_flow(&self) -> Option<Flow> {
self.operate(|rti, e| { self.operate(|rti, e| {
// apply sequencing to filter and get sort // apply sequencing to filter and get sort
let sequencing = self.common().sequencing; let sequencing = self.common().sequencing;
let filter = self.common().filter.unwrap_or_default(); let filter = self.common().filter.unwrap_or_default();
let (ordered, filter) = filter.with_sequencing(sequencing); let (ordered, filter) = filter.with_sequencing(sequencing);
let mut last_connections = e.last_connections(rti, true, filter); let mut last_connections = e.last_flows(rti, true, filter);
if ordered { if ordered {
last_connections.sort_by(|a, b| { last_connections.sort_by(|a, b| {
@ -292,19 +292,19 @@ pub(crate) trait NodeRefBase: Sized {
} }
fn clear_last_connections(&self) { fn clear_last_connections(&self) {
self.operate_mut(|_rti, e| e.clear_last_connections()) self.operate_mut(|_rti, e| e.clear_last_flows())
} }
fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: Timestamp) { fn set_last_flow(&self, flow: Flow, ts: Timestamp) {
self.operate_mut(|rti, e| { self.operate_mut(|rti, e| {
e.set_last_connection(connection_descriptor, ts); e.set_last_flow(flow, ts);
rti.touch_recent_peer(e.best_node_id(), connection_descriptor); rti.touch_recent_peer(e.best_node_id(), flow);
}) })
} }
fn clear_last_connection(&self, connection_descriptor: ConnectionDescriptor) { fn clear_last_connection(&self, flow: Flow) {
self.operate_mut(|_rti, e| { self.operate_mut(|_rti, e| {
e.clear_last_connection(connection_descriptor); e.remove_last_flow(flow);
}) })
} }
@ -325,6 +325,10 @@ pub(crate) trait NodeRefBase: Sized {
self.stats_failed_to_send(get_aligned_timestamp(), false); self.stats_failed_to_send(get_aligned_timestamp(), false);
} }
fn report_failed_route_test(&self) {
self.stats_failed_to_send(get_aligned_timestamp(), false);
}
fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) { fn stats_question_sent(&self, ts: Timestamp, bytes: Timestamp, expects_answer: bool) {
self.operate_mut(|rti, e| { self.operate_mut(|rti, e| {
rti.transfer_stats_accounting().add_up(bytes); rti.transfer_stats_accounting().add_up(bytes);

View File

@ -112,8 +112,8 @@ impl From<AddressType> for NodeRefFilter {
} }
} }
impl From<ConnectionDescriptor> for NodeRefFilter { impl From<Flow> for NodeRefFilter {
fn from(other: ConnectionDescriptor) -> Self { fn from(other: Flow) -> Self {
Self { Self {
routing_domain_set: RoutingDomainSet::all(), routing_domain_set: RoutingDomainSet::all(),
dial_info_filter: DialInfoFilter::from(other), dial_info_filter: DialInfoFilter::from(other),

View File

@ -661,9 +661,9 @@ impl RouteSpecStore {
)] )]
async fn test_allocated_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> { async fn test_allocated_route(&self, private_route_id: RouteId) -> VeilidAPIResult<bool> {
// Make loopback route to test with // Make loopback route to test with
let dest = { let (dest, hops) = {
// Get the best private route for this id // Get the best allocated route for this id
let (key, hop_count) = { let (key, hops) = {
let inner = &mut *self.inner.lock(); let inner = &mut *self.inner.lock();
let Some(rssd) = inner.content.get_detail(&private_route_id) else { let Some(rssd) = inner.content.get_detail(&private_route_id) else {
apibail_invalid_argument!( apibail_invalid_argument!(
@ -675,9 +675,10 @@ impl RouteSpecStore {
let Some(key) = rssd.get_best_route_set_key() else { let Some(key) = rssd.get_best_route_set_key() else {
apibail_internal!("no best key to test allocated route"); apibail_internal!("no best key to test allocated route");
}; };
// Match the private route's hop length for safety route length // Get the hops so we can match the route's hop length for safety
let hop_count = rssd.hop_count(); // route length as well as marking nodes as unreliable if this fails
(key, hop_count) let hops = rssd.hops_node_refs();
(key, hops)
}; };
// Get the private route to send to // Get the private route to send to
@ -686,6 +687,8 @@ impl RouteSpecStore {
let stability = Stability::Reliable; let stability = Stability::Reliable;
// Routes should test with the most likely to succeed sequencing they are capable of // Routes should test with the most likely to succeed sequencing they are capable of
let sequencing = Sequencing::PreferOrdered; let sequencing = Sequencing::PreferOrdered;
// Hop count for safety spec should match the private route spec
let hop_count = hops.len();
let safety_spec = SafetySpec { let safety_spec = SafetySpec {
preferred_route: Some(private_route_id), preferred_route: Some(private_route_id),
@ -695,10 +698,13 @@ impl RouteSpecStore {
}; };
let safety_selection = SafetySelection::Safe(safety_spec); let safety_selection = SafetySelection::Safe(safety_spec);
(
Destination::PrivateRoute { Destination::PrivateRoute {
private_route, private_route,
safety_selection, safety_selection,
} },
hops,
)
}; };
// Test with double-round trip ping to self // Test with double-round trip ping to self
@ -706,7 +712,12 @@ impl RouteSpecStore {
let _res = match rpc_processor.rpc_call_status(dest).await? { let _res = match rpc_processor.rpc_call_status(dest).await? {
NetworkResult::Value(v) => v, NetworkResult::Value(v) => v,
_ => { _ => {
// Did not error, but did not come back, just return false // Did not error, but did not come back, mark the nodes as failed to send, and then return false
// This will prevent those node from immediately being included in the next allocated route,
// avoiding the same route being constructed to replace this one when it is removed.
for hop in hops {
hop.report_failed_route_test();
}
return Ok(false); return Ok(false);
} }
}; };

View File

@ -86,6 +86,9 @@ impl RouteSetSpecDetail {
pub fn hop_count(&self) -> usize { pub fn hop_count(&self) -> usize {
self.hop_node_refs.len() self.hop_node_refs.len()
} }
pub fn hops_node_refs(&self) -> Vec<NodeRef> {
self.hop_node_refs.clone()
}
pub fn hop_node_ref(&self, idx: usize) -> Option<NodeRef> { pub fn hop_node_ref(&self, idx: usize) -> Option<NodeRef> {
self.hop_node_refs.get(idx).cloned() self.hop_node_refs.get(idx).cloned()
} }

View File

@ -378,7 +378,7 @@ impl RoutingTableInner {
for bucket in &self.buckets[&ck] { for bucket in &self.buckets[&ck] {
for entry in bucket.entries() { for entry in bucket.entries() {
entry.1.with_mut_inner(|e| { entry.1.with_mut_inner(|e| {
e.clear_last_connections(); e.clear_last_flows();
}); });
} }
} }
@ -853,7 +853,7 @@ impl RoutingTableInner {
&mut self, &mut self,
outer_self: RoutingTable, outer_self: RoutingTable,
node_id: TypedKey, node_id: TypedKey,
descriptor: ConnectionDescriptor, flow: Flow,
timestamp: Timestamp, timestamp: Timestamp,
) -> EyreResult<NodeRef> { ) -> EyreResult<NodeRef> {
let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| { let nr = self.create_node_ref(outer_self, &TypedKeyGroup::from(node_id), |_rti, e| {
@ -861,8 +861,7 @@ impl RoutingTableInner {
e.touch_last_seen(timestamp); e.touch_last_seen(timestamp);
})?; })?;
// set the most recent node address for connection finding and udp replies // set the most recent node address for connection finding and udp replies
nr.locked_mut(self) nr.locked_mut(self).set_last_flow(flow, timestamp);
.set_last_connection(descriptor, timestamp);
Ok(nr) Ok(nr)
} }
@ -912,7 +911,7 @@ impl RoutingTableInner {
} }
} }
pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: ConnectionDescriptor) { pub fn touch_recent_peer(&mut self, node_id: TypedKey, last_connection: Flow) {
self.recent_peers self.recent_peers
.insert(node_id, RecentPeersEntry { last_connection }); .insert(node_id, RecentPeersEntry { last_connection });
} }

View File

@ -54,8 +54,8 @@ struct RPCMessageHeaderDetailDirect {
envelope: Envelope, envelope: Envelope,
/// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it /// The noderef of the peer that sent the message (not the original sender). Ensures node doesn't get evicted from routing table until we're done with it
peer_noderef: NodeRef, peer_noderef: NodeRef,
/// The connection from the peer sent the message (not the original sender) /// The flow from the peer sent the message (not the original sender)
connection_descriptor: ConnectionDescriptor, flow: Flow,
/// The routing domain the message was sent through /// The routing domain the message was sent through
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
} }
@ -189,7 +189,7 @@ struct WaitableReply {
safety_route: Option<PublicKey>, safety_route: Option<PublicKey>,
remote_private_route: Option<PublicKey>, remote_private_route: Option<PublicKey>,
reply_private_route: Option<PublicKey>, reply_private_route: Option<PublicKey>,
_connection_ref_scope: ConnectionRefScope, _opt_connection_ref_scope: Option<ConnectionRefScope>,
} }
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
@ -1158,7 +1158,7 @@ impl RPCProcessor {
// Log rpc send // Log rpc send
#[cfg(feature = "verbose-tracing")] #[cfg(feature = "verbose-tracing")]
debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest, protect); debug!(target: "rpc_message", dir = "send", kind = "question", op_id = op_id.as_u64(), desc = operation.kind().desc(), ?dest);
// Produce rendered operation // Produce rendered operation
let RenderedOperation { let RenderedOperation {
@ -1224,10 +1224,10 @@ impl RPCProcessor {
// Ref the connection so it doesn't go away until we're done with the waitable reply // Ref the connection so it doesn't go away until we're done with the waitable reply
let connection_ref_scope = self let opt_connection_ref_scope = send_data_method.unique_flow.connection_id.and_then(|id| self
.network_manager() .network_manager()
.connection_manager() .connection_manager()
.connection_ref_scope(send_data_method.connection_descriptor); .try_connection_ref_scope(id));
// Pass back waitable reply completion // Pass back waitable reply completion
Ok(NetworkResult::value(WaitableReply { Ok(NetworkResult::value(WaitableReply {
@ -1239,7 +1239,7 @@ impl RPCProcessor {
safety_route, safety_route,
remote_private_route, remote_private_route,
reply_private_route, reply_private_route,
_connection_ref_scope: connection_ref_scope, _opt_connection_ref_scope: opt_connection_ref_scope,
})) }))
} }
@ -1660,7 +1660,7 @@ impl RPCProcessor {
&self, &self,
envelope: Envelope, envelope: Envelope,
peer_noderef: NodeRef, peer_noderef: NodeRef,
connection_descriptor: ConnectionDescriptor, flow: Flow,
routing_domain: RoutingDomain, routing_domain: RoutingDomain,
body: Vec<u8>, body: Vec<u8>,
) -> EyreResult<()> { ) -> EyreResult<()> {
@ -1668,7 +1668,7 @@ impl RPCProcessor {
detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect { detail: RPCMessageHeaderDetail::Direct(RPCMessageHeaderDetailDirect {
envelope, envelope,
peer_noderef, peer_noderef,
connection_descriptor, flow,
routing_domain, routing_domain,
}), }),
timestamp: get_aligned_timestamp(), timestamp: get_aligned_timestamp(),

View File

@ -49,8 +49,8 @@ impl RPCProcessor {
// Can't allow anything other than direct packets here, as handling reverse connections // Can't allow anything other than direct packets here, as handling reverse connections
// or anything like via signals over private routes would deanonymize the route // or anything like via signals over private routes would deanonymize the route
let connection_descriptor = match &msg.header.detail { let flow = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(d) => d.connection_descriptor, RPCMessageHeaderDetail::Direct(d) => d.flow,
RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => { RPCMessageHeaderDetail::SafetyRouted(_) | RPCMessageHeaderDetail::PrivateRouted(_) => {
return Ok(NetworkResult::invalid_message("signal must be direct")); return Ok(NetworkResult::invalid_message("signal must be direct"));
} }
@ -70,7 +70,7 @@ impl RPCProcessor {
let network_manager = self.network_manager(); let network_manager = self.network_manager();
let signal_info = signal.destructure(); let signal_info = signal.destructure();
network_manager network_manager
.handle_signal(connection_descriptor, signal_info) .handle_signal(flow, signal_info)
.await .await
.map_err(RPCError::network) .map_err(RPCError::network)
} }

View File

@ -162,13 +162,13 @@ impl RPCProcessor {
.network_manager() .network_manager()
.report_public_internet_socket_address( .report_public_internet_socket_address(
sender_info.socket_address, sender_info.socket_address,
send_data_method.connection_descriptor, send_data_method.unique_flow.flow,
target, target,
), ),
RoutingDomain::LocalNetwork => { RoutingDomain::LocalNetwork => {
self.network_manager().report_local_network_socket_address( self.network_manager().report_local_network_socket_address(
sender_info.socket_address, sender_info.socket_address,
send_data_method.connection_descriptor, send_data_method.unique_flow.flow,
target, target,
) )
} }
@ -208,7 +208,7 @@ impl RPCProcessor {
let (node_status, sender_info) = match &msg.header.detail { let (node_status, sender_info) = match &msg.header.detail {
RPCMessageHeaderDetail::Direct(detail) => { RPCMessageHeaderDetail::Direct(detail) => {
let connection_descriptor = detail.connection_descriptor; let flow = detail.flow;
let routing_domain = detail.routing_domain; let routing_domain = detail.routing_domain;
// Ensure the node status from the question is the kind for the routing domain we received the request in // Ensure the node status from the question is the kind for the routing domain we received the request in
@ -222,7 +222,7 @@ impl RPCProcessor {
// Get the peer address in the returned sender info // Get the peer address in the returned sender info
let sender_info = SenderInfo { let sender_info = SenderInfo {
socket_address: *connection_descriptor.remote_address(), socket_address: *flow.remote_address(),
}; };
// Make status answer // Make status answer

View File

@ -34,6 +34,7 @@ rt-tokio = [
] ]
tracking = ["veilid-core/tracking"] tracking = ["veilid-core/tracking"]
network-result-extra = ["veilid-core/network-result-extra"] network-result-extra = ["veilid-core/network-result-extra"]
verbose-tracing = ["veilid-core/verbose-tracing"]
[dependencies] [dependencies]
veilid-core = { path = "../veilid-core", default-features = false } veilid-core = { path = "../veilid-core", default-features = false }