diff --git a/veilid-core/src/routing_table/bucket.rs b/veilid-core/src/routing_table/bucket.rs index 1b819d7d..2f367aa3 100644 --- a/veilid-core/src/routing_table/bucket.rs +++ b/veilid-core/src/routing_table/bucket.rs @@ -118,7 +118,7 @@ impl Bucket { pub(super) fn kick( &mut self, bucket_depth: usize, - closest_nodes: &BTreeSet, + exempt_peers: &BTreeSet, ) -> Option> { // Get number of entries to attempt to purge from bucket let bucket_len = self.entries.len(); @@ -171,8 +171,8 @@ impl Bucket { continue; } - // if this entry is one of our N closest entries, don't drop it - if closest_nodes.contains(&entry.0) { + // if this entry is one of our exempt entries, don't drop it + if exempt_peers.contains(&entry.0) { continue; } diff --git a/veilid-core/src/routing_table/bucket_entry.rs b/veilid-core/src/routing_table/bucket_entry.rs index c5dca3a4..5497f81f 100644 --- a/veilid-core/src/routing_table/bucket_entry.rs +++ b/veilid-core/src/routing_table/bucket_entry.rs @@ -166,12 +166,20 @@ impl BucketEntryInner { common_crypto_kinds(&self.validated_node_ids.kinds(), other) } - /// Capability check - pub fn has_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool { + /// All-of capability check + pub fn has_all_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool { let Some(ni) = self.node_info(routing_domain) else { return false; }; - ni.has_capabilities(capabilities) + ni.has_all_capabilities(capabilities) + } + + /// Any-of capability check + pub fn has_any_capabilities(&self, routing_domain: RoutingDomain, capabilities: &[Capability]) -> bool { + let Some(ni) = self.node_info(routing_domain) else { + return false; + }; + ni.has_any_capabilities(capabilities) } // Less is faster diff --git a/veilid-core/src/routing_table/debug.rs b/veilid-core/src/routing_table/debug.rs index 86341f98..3691d279 100644 --- a/veilid-core/src/routing_table/debug.rs +++ b/veilid-core/src/routing_table/debug.rs @@ -132,7 
+132,7 @@ impl RoutingTable { .entries() .filter(|e| { let cap_match = e.1.with(inner, |_rti, e| { - e.has_capabilities(RoutingDomain::PublicInternet, &capabilities) + e.has_all_capabilities(RoutingDomain::PublicInternet, &capabilities) }); let state = e.1.with(inner, |_rti, e| e.state(cur_ts)); state >= min_state && cap_match diff --git a/veilid-core/src/routing_table/find_peers.rs b/veilid-core/src/routing_table/find_peers.rs index 0d3a56e7..e2cc2970 100644 --- a/veilid-core/src/routing_table/find_peers.rs +++ b/veilid-core/src/routing_table/find_peers.rs @@ -34,12 +34,12 @@ impl RoutingTable { // Ensure capabilities are met match opt_entry { Some(entry) => entry.with(rti, |_rti, e| { - e.has_capabilities(RoutingDomain::PublicInternet, capabilities) + e.has_all_capabilities(RoutingDomain::PublicInternet, capabilities) }), None => own_peer_info .signed_node_info() .node_info() - .has_capabilities(capabilities), + .has_all_capabilities(capabilities), } }, ) as RoutingTableEntryFilter; @@ -98,7 +98,9 @@ impl RoutingTable { }; // Ensure only things that have a minimum set of capabilities are returned entry.with(rti, |rti, e| { - if !e.has_capabilities(RoutingDomain::PublicInternet, &required_capabilities) { + if !e + .has_all_capabilities(RoutingDomain::PublicInternet, &required_capabilities) + { return false; } // Ensure only things that are valid/signed in the PublicInternet domain are returned diff --git a/veilid-core/src/routing_table/mod.rs b/veilid-core/src/routing_table/mod.rs index faf54f4a..0418a23c 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -115,6 +115,8 @@ pub(crate) struct RoutingTableUnlockedInner { bootstrap_task: TickTask, /// Background process to ensure we have enough nodes in our routing table peer_minimum_refresh_task: TickTask, + /// Background process to ensure we have enough nodes close to our own in our routing table + closest_peers_refresh_task: TickTask, /// Background process to check 
nodes to see if they are still alive and for reliability ping_validator_task: TickTask, /// Background process to keep relays up @@ -223,6 +225,7 @@ impl RoutingTable { kick_buckets_task: TickTask::new(1), bootstrap_task: TickTask::new(1), peer_minimum_refresh_task: TickTask::new(1), + closest_peers_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), ping_validator_task: TickTask::new(1), relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), @@ -1029,18 +1032,21 @@ impl RoutingTable { out } + /// Finds nodes near a particular node id + /// Ensures all returned nodes have a set of capabilities enabled #[instrument(level = "trace", skip(self), err)] pub async fn find_node( &self, node_ref: NodeRef, node_id: TypedKey, + capabilities: Vec, ) -> EyreResult>> { let rpc_processor = self.rpc_processor(); let res = network_result_try!( rpc_processor .clone() - .rpc_call_find_node(Destination::direct(node_ref), node_id, vec![]) + .rpc_call_find_node(Destination::direct(node_ref), node_id, capabilities) .await? 
); @@ -1051,36 +1057,45 @@ impl RoutingTable { } /// Ask a remote node to list the nodes it has around the current node + /// Ensures all returned nodes have a set of capabilities enabled #[instrument(level = "trace", skip(self), err)] pub async fn find_self( &self, crypto_kind: CryptoKind, node_ref: NodeRef, + capabilities: Vec, ) -> EyreResult>> { let self_node_id = self.node_id(crypto_kind); - self.find_node(node_ref, self_node_id).await + self.find_node(node_ref, self_node_id, capabilities).await } /// Ask a remote node to list the nodes it has around itself + /// Ensures all returned nodes have a set of capabilities enabled #[instrument(level = "trace", skip(self), err)] pub async fn find_target( &self, crypto_kind: CryptoKind, node_ref: NodeRef, + capabilities: Vec, ) -> EyreResult>> { let Some(target_node_id) = node_ref.node_ids().get(crypto_kind) else { bail!("no target node ids for this crypto kind"); }; - self.find_node(node_ref, target_node_id).await + self.find_node(node_ref, target_node_id, capabilities).await } + /// Ask node to 'find node' on own node so we can get some more nodes near ourselves + /// and then contact those nodes to inform -them- that we exist #[instrument(level = "trace", skip(self))] - pub async fn reverse_find_node(&self, crypto_kind: CryptoKind, node_ref: NodeRef, wide: bool) { - // Ask node to 'find node' on own node so we can get some more nodes near ourselves - // and then contact those nodes to inform -them- that we exist - + pub async fn reverse_find_node( + &self, + crypto_kind: CryptoKind, + node_ref: NodeRef, + wide: bool, + capabilities: Vec, + ) { // Ask node for nodes closest to our own node - let closest_nodes = network_result_value_or_log!(match self.find_self(crypto_kind, node_ref.clone()).await { + let closest_nodes = network_result_value_or_log!(match self.find_self(crypto_kind, node_ref.clone(), capabilities.clone()).await { Err(e) => { log_rtab!(error "find_self failed for {:?}: {:?}", @@ -1096,7 +1111,7 @@ impl 
RoutingTable { // Ask each node near us to find us as well if wide { for closest_nr in closest_nodes { - network_result_value_or_log!(match self.find_self(crypto_kind, closest_nr.clone()).await { + network_result_value_or_log!(match self.find_self(crypto_kind, closest_nr.clone(), capabilities.clone()).await { Err(e) => { log_rtab!(error "find_self failed for {:?}: {:?}", diff --git a/veilid-core/src/routing_table/routing_table_inner.rs b/veilid-core/src/routing_table/routing_table_inner.rs index ef8a5cb4..bca042f7 100644 --- a/veilid-core/src/routing_table/routing_table_inner.rs +++ b/veilid-core/src/routing_table/routing_table_inner.rs @@ -397,11 +397,11 @@ impl RoutingTableInner { /// Attempt to settle buckets and remove entries down to the desired number /// which may not be possible due extant NodeRefs - pub fn kick_bucket(&mut self, bucket_index: BucketIndex, closest_nodes: &BTreeSet) { + pub fn kick_bucket(&mut self, bucket_index: BucketIndex, exempt_peers: &BTreeSet) { let bucket = self.get_bucket_mut(bucket_index); let bucket_depth = Self::bucket_depth(bucket_index); - if let Some(_dead_node_ids) = bucket.kick(bucket_depth, closest_nodes) { + if let Some(_dead_node_ids) = bucket.kick(bucket_depth, exempt_peers) { // Remove expired entries self.all_entries.remove_expired(); diff --git a/veilid-core/src/routing_table/tasks/bootstrap.rs b/veilid-core/src/routing_table/tasks/bootstrap.rs index 64f5a53c..a195a890 100644 --- a/veilid-core/src/routing_table/tasks/bootstrap.rs +++ b/veilid-core/src/routing_table/tasks/bootstrap.rs @@ -300,7 +300,7 @@ impl RoutingTable { // Need VALID signed peer info, so ask bootstrap to find_node of itself // which will ensure it has the bootstrap's signed peer info as part of the response - let _ = routing_table.find_target(crypto_kind, nr.clone()).await; + let _ = routing_table.find_target(crypto_kind, nr.clone(), vec![]).await; // Ensure we got the signed peer info if 
!nr.signed_node_info_has_valid_signature(RoutingDomain::PublicInternet) { @@ -311,7 +311,7 @@ impl RoutingTable { routing_table.network_manager().address_filter().set_dial_info_failed(bsdi); } else { // otherwise this bootstrap is valid, lets ask it to find ourselves now - routing_table.reverse_find_node(crypto_kind, nr, true).await + routing_table.reverse_find_node(crypto_kind, nr, true, vec![]).await } } .instrument(Span::current()), diff --git a/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs b/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs new file mode 100644 index 00000000..22f4e92b --- /dev/null +++ b/veilid-core/src/routing_table/tasks/closest_peers_refresh.rs @@ -0,0 +1,89 @@ +use super::*; + +/// How many nodes to consult for closest peers simultaneously +pub const CLOSEST_PEERS_REQUEST_COUNT: usize = 5; + +use futures_util::stream::{FuturesUnordered, StreamExt}; +use stop_token::future::FutureExt as StopFutureExt; + +impl RoutingTable { + /// Ask our closest peers to give us more peers close to ourselves. This will + /// assist with the DHT and other algorithms that utilize the distance metric. 
+ #[instrument(level = "trace", skip(self), err)] + pub(crate) async fn closest_peers_refresh_task_routine( + self, + stop_token: StopToken, + ) -> EyreResult<()> { + let mut unord = FuturesUnordered::new(); + + for crypto_kind in VALID_CRYPTO_KINDS { + // Get our node id for this cryptokind + let self_node_id = self.node_id(crypto_kind); + + let routing_table = self.clone(); + let mut filters = VecDeque::new(); + let filter = Box::new( + move |rti: &RoutingTableInner, opt_entry: Option>| { + // Exclude our own node + let Some(entry) = opt_entry else { + return false; + }; + + entry.with(rti, |_rti, e| { + // Keep only the entries that contain the crypto kind we're looking for + let compatible_crypto = e.crypto_kinds().contains(&crypto_kind); + if !compatible_crypto { + return false; + } + // Keep only the entries that participate in distance-metric relevant capabilities + // This would be better to be 'has_any_capabilities' but for now until our capnp gets + // this ability, it will do. + if !e.has_all_capabilities( + RoutingDomain::PublicInternet, + DISTANCE_METRIC_CAPABILITIES, + ) { + return false; + } + true + }) + }, + ) as RoutingTableEntryFilter; + filters.push_front(filter); + + let noderefs = routing_table + .find_preferred_closest_nodes( + CLOSEST_PEERS_REQUEST_COUNT, + self_node_id, + filters, + |_rti, entry: Option>| { + NodeRef::new(routing_table.clone(), entry.unwrap().clone(), None) + }, + ) + .unwrap(); + + for nr in noderefs { + let routing_table = self.clone(); + unord.push( + async move { + // This would be better if it were 'any' instead of 'all' capabilities + // but that requires extending the capnp to support it. 
+ routing_table + .reverse_find_node( + crypto_kind, + nr, + false, + DISTANCE_METRIC_CAPABILITIES.to_vec(), + ) + .await + } + .instrument(Span::current()), + ); + } + } + + // do closest peers search in parallel + while let Ok(Some(_)) = unord.next().timeout_at(stop_token.clone()).await {} + + Ok(()) + } +} diff --git a/veilid-core/src/routing_table/tasks/kick_buckets.rs b/veilid-core/src/routing_table/tasks/kick_buckets.rs index b6f0bed9..dd58472e 100644 --- a/veilid-core/src/routing_table/tasks/kick_buckets.rs +++ b/veilid-core/src/routing_table/tasks/kick_buckets.rs @@ -1,10 +1,10 @@ use super::*; /// How many 'reliable' nodes closest to our own node id to keep -const KEEP_N_CLOSEST_RELIABLE_ENTRIES_COUNT: usize = 16; +const KEEP_N_CLOSEST_RELIABLE_PEERS_COUNT: usize = 16; /// How many 'unreliable' nodes closest to our own node id to keep -const KEEP_N_CLOSEST_UNRELIABLE_ENTRIES_COUNT: usize = 8; +const KEEP_N_CLOSEST_UNRELIABLE_PEERS_COUNT: usize = 8; impl RoutingTable { // Kick the queued buckets in the routing table to free dead nodes if necessary @@ -22,8 +22,8 @@ impl RoutingTable { .collect(); let mut inner = self.inner.write(); - // Get our closest nodes for each crypto kind - let mut closest_nodes_by_kind = BTreeMap::>::new(); + // Get our exempt nodes for each crypto kind + let mut exempt_peers_by_kind = BTreeMap::>::new(); for kind in VALID_CRYPTO_KINDS { let our_node_id = self.node_id(kind); @@ -32,7 +32,7 @@ impl RoutingTable { }; let sort = make_closest_node_id_sort(self.crypto(), our_node_id); - let mut closest_nodes = BTreeSet::::new(); + let mut closest_peers = BTreeSet::::new(); let mut closest_unreliable_count = 0usize; let mut closest_reliable_count = 0usize; @@ -41,6 +41,17 @@ impl RoutingTable { let mut entries = bucket.entries().collect::>(); entries.sort_by(|a, b| sort(a.0, b.0)); for (key, entry) in entries { + // See if this entry is a distance-metric capability node + // If not, disqualify it from this closest_peers list + if 
!entry.with(&inner, |_rti, e| { + e.has_any_capabilities( + RoutingDomain::PublicInternet, + DISTANCE_METRIC_CAPABILITIES, + ) + }) { + continue; + } + let state = entry.with(&inner, |_rti, e| e.state(cur_ts)); match state { BucketEntryState::Dead => { @@ -48,32 +59,32 @@ impl RoutingTable { } BucketEntryState::Unreliable => { // Add to closest unreliable nodes list - if closest_unreliable_count < KEEP_N_CLOSEST_UNRELIABLE_ENTRIES_COUNT { - closest_nodes.insert(*key); + if closest_unreliable_count < KEEP_N_CLOSEST_UNRELIABLE_PEERS_COUNT { + closest_peers.insert(*key); closest_unreliable_count += 1; } } BucketEntryState::Reliable => { // Add to closest reliable nodes list - if closest_reliable_count < KEEP_N_CLOSEST_RELIABLE_ENTRIES_COUNT { - closest_nodes.insert(*key); + if closest_reliable_count < KEEP_N_CLOSEST_RELIABLE_PEERS_COUNT { + closest_peers.insert(*key); closest_reliable_count += 1; } } } - if closest_unreliable_count == KEEP_N_CLOSEST_UNRELIABLE_ENTRIES_COUNT - && closest_reliable_count == KEEP_N_CLOSEST_RELIABLE_ENTRIES_COUNT + if closest_unreliable_count == KEEP_N_CLOSEST_UNRELIABLE_PEERS_COUNT + && closest_reliable_count == KEEP_N_CLOSEST_RELIABLE_PEERS_COUNT { break 'outer; } } } - closest_nodes_by_kind.insert(kind, closest_nodes); + exempt_peers_by_kind.insert(kind, closest_peers); } for bucket_index in kick_queue { - inner.kick_bucket(bucket_index, &closest_nodes_by_kind[&bucket_index.0]); + inner.kick_bucket(bucket_index, &exempt_peers_by_kind[&bucket_index.0]); } Ok(()) } diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index 47369b62..b8e87121 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -1,4 +1,5 @@ pub mod bootstrap; +pub mod closest_peers_refresh; pub mod kick_buckets; pub mod peer_minimum_refresh; pub mod ping_validator; @@ -72,6 +73,23 @@ impl RoutingTable { }); } + // Set closest peers refresh tick task + { + let this = 
self.clone(); + self.unlocked_inner + .closest_peers_refresh_task + .set_routine(move |s, _l, _t| { + Box::pin( + this.clone() + .closest_peers_refresh_task_routine(s) + .instrument(trace_span!( + parent: None, + "closest peers refresh task routine" + )), + ) + }); + } + // Set ping validator tick task { let this = self.clone(); @@ -181,9 +199,23 @@ impl RoutingTable { // Run the relay management task self.unlocked_inner.relay_management_task.tick().await?; - // Run the private route management task - // If we don't know our network class then don't do this yet - if self.has_valid_network_class(RoutingDomain::PublicInternet) { + // Only perform these operations if we already have a valid network class + // and if we didn't need to bootstrap or perform a peer minimum refresh as these operations + // require having a suitably full routing table and guaranteed ability to contact other nodes + if !needs_bootstrap + && !needs_peer_minimum_refresh + && self.has_valid_network_class(RoutingDomain::PublicInternet) + { + // Run closest peers refresh task + // this will also inform other close nodes of -our- existence so we would + // much rather perform this action -after- we have a valid network class + // so our PeerInfo is valid when informing the other nodes of our existence. 
+ self.unlocked_inner + .closest_peers_refresh_task + .tick() + .await?; + + // Run the private route management task self.unlocked_inner .private_route_management_task .tick() diff --git a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs index 00581179..34ebfc25 100644 --- a/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs +++ b/veilid-core/src/routing_table/tasks/peer_minimum_refresh.rs @@ -80,7 +80,7 @@ impl RoutingTable { ord.push_back( async move { routing_table - .reverse_find_node(crypto_kind, nr, false) + .reverse_find_node(crypto_kind, nr, false, vec![]) .await } .instrument(Span::current()), diff --git a/veilid-core/src/routing_table/types/node_info.rs b/veilid-core/src/routing_table/types/node_info.rs index 5c9849b4..2a05084b 100644 --- a/veilid-core/src/routing_table/types/node_info.rs +++ b/veilid-core/src/routing_table/types/node_info.rs @@ -13,6 +13,8 @@ pub const CAP_APPMESSAGE: Capability = FourCC(*b"APPM"); #[cfg(feature = "unstable-blockstore")] pub const CAP_BLOCKSTORE: Capability = FourCC(*b"BLOC"); +pub const DISTANCE_METRIC_CAPABILITIES: &[Capability] = &[CAP_DHT, CAP_DHT_WATCH]; + #[derive(Clone, Default, PartialEq, Eq, Debug, Serialize, Deserialize)] pub struct NodeInfo { network_class: NetworkClass, @@ -152,7 +154,7 @@ impl NodeInfo { pub fn has_capability(&self, cap: Capability) -> bool { self.capabilities.contains(&cap) } - pub fn has_capabilities(&self, capabilities: &[Capability]) -> bool { + pub fn has_all_capabilities(&self, capabilities: &[Capability]) -> bool { for cap in capabilities { if !self.has_capability(*cap) { return false; @@ -160,6 +162,17 @@ impl NodeInfo { } true } + pub fn has_any_capabilities(&self, capabilities: &[Capability]) -> bool { + if capabilities.is_empty() { + return true; + } + for cap in capabilities { + if self.has_capability(*cap) { + return true; + } + } + false + } /// Can direct connections be made pub fn 
is_fully_direct_inbound(&self) -> bool { diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index c64c688a..22daf74c 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -66,7 +66,7 @@ pub(crate) fn empty_fanout_node_info_filter() -> FanoutNodeInfoFilter { } pub(crate) fn capability_fanout_node_info_filter(caps: Vec) -> FanoutNodeInfoFilter { - Arc::new(move |_, ni| ni.has_capabilities(&caps)) + Arc::new(move |_, ni| ni.has_all_capabilities(&caps)) } /// Contains the logic for generically searching the Veilid routing table for a set of nodes and applying an diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index 44567650..d942fec7 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -456,7 +456,7 @@ impl RPCProcessor { ) -> bool { let routing_table = self.routing_table(); routing_table.signed_node_info_is_valid_in_routing_domain(routing_domain, signed_node_info) - && signed_node_info.node_info().has_capabilities(capabilities) + && signed_node_info.node_info().has_all_capabilities(capabilities) } ////////////////////////////////////////////////////////////////////// diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 2de238e2..19fe8509 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -234,7 +234,7 @@ impl StorageManager { .into_iter() .filter(|x| { x.node_info(RoutingDomain::PublicInternet) - .map(|ni| ni.has_capabilities(&[CAP_DHT, CAP_DHT_WATCH])) + .map(|ni| ni.has_all_capabilities(&[CAP_DHT, CAP_DHT_WATCH])) .unwrap_or_default() }) .collect()