From bac12131c6f8a6012d52f1a562e0924033bf700d Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 27 Apr 2024 22:05:19 -0400 Subject: [PATCH] fix bug where messages sent to a private route without a safety route would not receive replies fix verbose-tracing feature flag improve route allocation to avoid co-located nodes --- .../src/routing_table/route_spec_store/mod.rs | 42 ++++++++- veilid-core/src/rpc_processor/mod.rs | 12 ++- .../src/rpc_processor/rpc_inspect_value.rs | 86 ++++++++----------- .../src/rpc_processor/rpc_value_changed.rs | 7 +- 4 files changed, 90 insertions(+), 57 deletions(-) diff --git a/veilid-core/src/routing_table/route_spec_store/mod.rs b/veilid-core/src/routing_table/route_spec_store/mod.rs index d051122e..248db49f 100644 --- a/veilid-core/src/routing_table/route_spec_store/mod.rs +++ b/veilid-core/src/routing_table/route_spec_store/mod.rs @@ -215,6 +215,13 @@ impl RouteSpecStore { ) -> VeilidAPIResult { use core::cmp::Ordering; + let ip6_prefix_size = rti + .unlocked_inner + .config + .get() + .network + .max_connections_per_ip6_prefix_size as usize; + if hop_count < 1 { apibail_invalid_argument!( "Not allocating route less than one hop in length", @@ -286,6 +293,39 @@ impl RouteSpecStore { return false; }; + // Exclude nodes on our same ipblock, or their relay is on our same ipblock + // or our relay is on their ipblock, or their relay is on our relays same ipblock + + // our node vs their node + if our_peer_info + .signed_node_info() + .node_info() + .node_is_on_same_ipblock(sni.node_info(), ip6_prefix_size) + { + return false; + } + if let Some(rni) = sni.relay_info() { + // our node vs their relay + if our_peer_info + .signed_node_info() + .node_info() + .node_is_on_same_ipblock(rni, ip6_prefix_size) + { + return false; + } + if let Some(our_rni) = our_peer_info.signed_node_info().relay_info() { + // our relay vs their relay + if our_rni.node_is_on_same_ipblock(rni, ip6_prefix_size) { + return false; + } + } + } else if let Some(our_rni) = our_peer_info.signed_node_info().relay_info() { + // our relay vs their node + if our_rni.node_is_on_same_ipblock(sni.node_info(), ip6_prefix_size) { + return false; + } + } + // Relay check let relay_ids = sni.relay_ids(); if !relay_ids.is_empty() { @@ -1527,7 +1567,7 @@ impl RouteSpecStore { /// Returns a route set id #[cfg_attr( feature = "verbose-tracing", - instrument(level = "trace", skip(self, blob), ret, err) + instrument(level = "trace", skip(self), ret, err) )] pub fn add_remote_private_route( &self, diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index bbfa4e1d..8049781f 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -619,9 +619,15 @@ impl RPCProcessor { // Ensure the reply comes over the private route that was requested if let Some(reply_private_route) = waitable_reply.reply_private_route { match &rpcreader.header.detail { - RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => { - return Err(RPCError::protocol("should have received reply over private route")); - } + RPCMessageHeaderDetail::Direct(_) => { + return Err(RPCError::protocol("should have received reply over private route or stub")); + }, + RPCMessageHeaderDetail::SafetyRouted(sr) => { + let node_id = self.routing_table.node_id(sr.direct.envelope.get_crypto_kind()); + if node_id.value != reply_private_route { + return Err(RPCError::protocol("should have received reply from safety route to a stub")); + } + }, RPCMessageHeaderDetail::PrivateRouted(pr) => { if pr.private_route != reply_private_route { return Err(RPCError::protocol("received reply over the wrong private route")); diff --git a/veilid-core/src/rpc_processor/rpc_inspect_value.rs b/veilid-core/src/rpc_processor/rpc_inspect_value.rs index 64188109..f1c23a6d 100644 --- a/veilid-core/src/rpc_processor/rpc_inspect_value.rs +++ b/veilid-core/src/rpc_processor/rpc_inspect_value.rs @@ -22,9 +22,7 @@ impl RPCProcessor { #[cfg_attr( feature = "verbose-tracing", instrument(level = "trace", skip(self, last_descriptor), - fields(ret.value.data.len, - ret.seqs, - ret.peers.len, + fields(ret.peers.len, ret.latency ),err) )] @@ -64,7 +62,8 @@ impl RPCProcessor { ); // Send the inspectvalue question - let inspect_value_q = RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?; + let inspect_value_q = + RPCOperationInspectValueQ::new(key, subkeys.clone(), last_descriptor.is_none())?; let question = RPCQuestion::new( network_result_try!(self.get_destination_respond_to(&dest)?), RPCQuestionDetail::InspectValueQ(Box::new(inspect_value_q)), @@ -107,19 +106,18 @@ impl RPCProcessor { let debug_string_answer = format!( "OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}", key, - if descriptor.is_some() { - " +desc" - } else { - "" - }, + if descriptor.is_some() { " +desc" } else { "" }, peers.len(), dest, debug_seqs(&seqs) ); log_dht!(debug "{}", debug_string_answer); - - let peer_ids:Vec = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect(); + + let peer_ids: Vec = peers + .iter() + .filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())) + .collect(); log_dht!(debug "Peers: {:#?}", peer_ids); } @@ -140,8 +138,6 @@ impl RPCProcessor { #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.latency", latency.as_u64()); #[cfg(feature = "verbose-tracing")] - tracing::Span::current().record("ret.seqs", seqs); - #[cfg(feature = "verbose-tracing")] tracing::Span::current().record("ret.peers.len", peers.len()); Ok(NetworkResult::value(Answer::new( @@ -158,11 +154,7 @@ impl RPCProcessor { //////////////////////////////////////////////////////////////////////////////////////////////// #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), ret, err))] - pub(crate) async fn process_inspect_value_q( - &self, - msg: RPCMessage, - ) -> RPCNetworkResult<()> { - + pub(crate) async fn process_inspect_value_q(&self, msg: RPCMessage) -> RPCNetworkResult<()> { // Ensure this never came over a private route, safety route is okay though match &msg.header.detail { RPCMessageHeaderDetail::Direct(_) | RPCMessageHeaderDetail::SafetyRouted(_) => {} @@ -175,14 +167,8 @@ impl RPCProcessor { // Ignore if disabled let routing_table = self.routing_table(); let opi = routing_table.get_own_peer_info(msg.header.routing_domain()); - if !opi - .signed_node_info() - .node_info() - .has_capability(CAP_DHT) - { - return Ok(NetworkResult::service_unavailable( - "dht is not available", - )); + if !opi.signed_node_info().node_info().has_capability(CAP_DHT) { + return Ok(NetworkResult::service_unavailable("dht is not available")); } // Get the question @@ -200,18 +186,16 @@ impl RPCProcessor { // Get the nodes that we know about that are closer to the the key than our own node let routing_table = self.routing_table(); - let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT])); + let closer_to_key_peers = network_result_try!( + routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT]) + ); if debug_target_enabled!("dht") { let debug_string = format!( "IN <=== InspectValueQ({} {}{}) <== {}", key, subkeys, - if want_descriptor { - " +wantdesc" - } else { - "" - }, + if want_descriptor { " +wantdesc" } else { "" }, msg.header.direct_sender_node_id() ); @@ -223,20 +207,21 @@ impl RPCProcessor { let c = self.config.get(); c.network.dht.set_value_count as usize }; - let (inspect_result_seqs, inspect_result_descriptor) = if closer_to_key_peers.len() >= set_value_count { - // Not close enough - (Vec::new(), None) - } else { - // Close enough, lets get it + let (inspect_result_seqs, inspect_result_descriptor) = + if closer_to_key_peers.len() >= set_value_count { + // Not close enough + (Vec::new(), None) + } else { + // Close enough, lets get it - // See if we have this record ourselves - let storage_manager = self.storage_manager(); - let inspect_result = network_result_try!(storage_manager - .inbound_inspect_value(key, subkeys, want_descriptor) - .await - .map_err(RPCError::internal)?); - (inspect_result.seqs, inspect_result.opt_descriptor) - }; + // See if we have this record ourselves + let storage_manager = self.storage_manager(); + let inspect_result = network_result_try!(storage_manager + .inbound_inspect_value(key, subkeys, want_descriptor) + .await + .map_err(RPCError::internal)?); + (inspect_result.seqs, inspect_result.opt_descriptor) + }; if debug_target_enabled!("dht") { let debug_string_answer = format!( @@ -251,10 +236,10 @@ impl RPCProcessor { closer_to_key_peers.len(), msg.header.direct_sender_node_id() ); - + log_dht!(debug "{}", debug_string_answer); } - + // Make InspectValue answer let inspect_value_a = RPCOperationInspectValueA::new( inspect_result_seqs, @@ -263,7 +248,10 @@ impl RPCProcessor { )?; // Send InspectValue answer - self.answer(msg, RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a)))) - .await + self.answer( + msg, + RPCAnswer::new(RPCAnswerDetail::InspectValueA(Box::new(inspect_value_a))), + ) + .await } } diff --git a/veilid-core/src/rpc_processor/rpc_value_changed.rs b/veilid-core/src/rpc_processor/rpc_value_changed.rs index 0fe59913..9a7b5b49 100644 --- a/veilid-core/src/rpc_processor/rpc_value_changed.rs +++ b/veilid-core/src/rpc_processor/rpc_value_changed.rs @@ -1,12 +1,11 @@ use super::*; impl RPCProcessor { - #[cfg_attr(feature="verbose-tracing", instrument(level = "trace", skip(self, msg), fields(msg.operation.op_id), err))] - // Sends a high level app message - // Can be sent via all methods including relays and routes + // Sends a dht value change notification + // Can be sent via all methods including relays and routes but never over a safety route #[cfg_attr( feature = "verbose-tracing", - instrument(level = "trace", skip(self, message), fields(message.len = message.len()), err) + instrument(level = "trace", skip(self, value), err) )] pub async fn rpc_call_value_changed( self,