more crypto support

This commit is contained in:
John Smith 2023-02-11 23:16:32 -05:00
parent 1ba0cdb9cf
commit 5fd0684ae7
5 changed files with 66 additions and 76 deletions

View File

@ -792,25 +792,15 @@ impl NetworkManager {
// Get node's min/max envelope version and see if we can send to it
// and if so, get the max version we can use
node_ref.envelope_support()
let envelope_version = if let Some(min_max_version) = {
#[allow(clippy::absurd_extreme_comparisons)]
if min_max_version.min > MAX_ENVELOPE_VERSION || min_max_version.max < MIN_ENVELOPE_VERSION
{
bail!(
"can't talk to this node {} because version is unsupported: ({},{})",
via_node_id,
min_max_version.min,
min_max_version.max
);
}
cmp::min(min_max_version.max, MAX_CRYPTO_VERSION)
} else {
MAX_CRYPTO_VERSION
let Some(envelope_version) = node_ref.envelope_support().iter().rev().find(|x| VALID_ENVELOPE_VERSIONS.contains(x)) else {
bail!(
"can't talk to this node {} because we dont support its envelope versions",
node_ref
);
};
// Build the envelope to send
let out = self.build_envelope(envelope_node_id, version, body)?;
let out = self.build_envelope(envelope_node_id, envelope_version, body)?;
// Send the envelope via whatever means necessary
self.send_data(node_ref.clone(), out).await
@ -1449,7 +1439,7 @@ impl NetworkManager {
// Cache the envelope information in the routing table
let source_noderef = match routing_table.register_node_with_existing_connection(
envelope.get_sender_id(),
TypedKey::new(envelope.get_crypto_kind(), envelope.get_sender_id()),
connection_descriptor,
ts,
) {

View File

@ -101,8 +101,7 @@ impl Bucket {
log_rtab!("Node added: {}", node_id);
// Add new entry
let entry = Arc::new(BucketEntry::new());
entry.with_mut_inner(|e| e.add_node_id(node_id));
let entry = Arc::new(BucketEntry::new(node_id));
self.entries.insert(node_id.key, entry.clone());
// This is now the newest bucket entry

View File

@ -129,6 +129,9 @@ impl BucketEntryInner {
pub fn add_node_id(&mut self, node_id: TypedKey) {
self.node_ids.add(node_id);
}
pub fn best_node_id(&self) -> TypedKey {
self.node_ids.best().unwrap()
}
// Less is faster
pub fn cmp_fastest(e1: &Self, e2: &Self) -> std::cmp::Ordering {
@ -237,22 +240,9 @@ impl BucketEntryInner {
}
}
// Update the protocol min/max version we have to use, to include relay requirements if needed
let mut version_range = VersionRange {
min: signed_node_info.node_info().min_version,
max: signed_node_info.node_info().max_version,
};
if let Some(relay_info) = signed_node_info.relay_info() {
version_range.min.max_assign(relay_info.min_version);
version_range.max.min_assign(relay_info.max_version);
}
if version_range.min <= version_range.max {
// Can be reached with at least one crypto version
self.min_max_version = Some(version_range);
} else {
// No valid crypto version in range
self.min_max_version = None;
}
// Update the envelope version support we have to use
let mut envelope_support = signed_node_info.node_info().envelope_support.clone();
self.set_envelope_support(envelope_support);
// Update the signed node info
*opt_current_sni = Some(Box::new(signed_node_info));
@ -756,39 +746,41 @@ pub struct BucketEntry {
}
impl BucketEntry {
pub(super) fn new() -> Self {
pub(super) fn new(first_node_id: TypedKey) -> Self {
let now = get_aligned_timestamp();
Self {
ref_count: AtomicU32::new(0),
inner: RwLock::new(BucketEntryInner {
node_ids: TypedKeySet::new(),
envelope_support: Vec::new(),
updated_since_last_network_change: false,
last_connections: BTreeMap::new(),
local_network: BucketEntryLocalNetwork {
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,
node_status: None,
},
public_internet: BucketEntryPublicInternet {
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,
node_status: None,
},
peer_stats: PeerStats {
time_added: now,
rpc_stats: RPCStats::default(),
latency: None,
transfer: TransferStatsDownUp::default(),
},
latency_stats_accounting: LatencyStatsAccounting::new(),
transfer_stats_accounting: TransferStatsAccounting::new(),
#[cfg(feature = "tracking")]
next_track_id: 0,
#[cfg(feature = "tracking")]
node_ref_tracks: HashMap::new(),
}),
}
let mut node_ids = TypedKeySet::new();
node_ids.add(first_node_id);
let inner = BucketEntryInner {
node_ids,
envelope_support: Vec::new(),
updated_since_last_network_change: false,
last_connections: BTreeMap::new(),
local_network: BucketEntryLocalNetwork {
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,
node_status: None,
},
public_internet: BucketEntryPublicInternet {
last_seen_our_node_info_ts: Timestamp::new(0u64),
signed_node_info: None,
node_status: None,
},
peer_stats: PeerStats {
time_added: now,
rpc_stats: RPCStats::default(),
latency: None,
transfer: TransferStatsDownUp::default(),
},
latency_stats_accounting: LatencyStatsAccounting::new(),
transfer_stats_accounting: TransferStatsAccounting::new(),
#[cfg(feature = "tracking")]
next_track_id: 0,
#[cfg(feature = "tracking")]
node_ref_tracks: HashMap::new(),
};
Self::new_with_inner(inner)
}
pub(super) fn new_with_inner(inner: BucketEntryInner) -> Self {

View File

@ -571,9 +571,14 @@ impl RoutingTable {
inner.get_all_nodes(self.clone(), cur_ts)
}
fn queue_bucket_kick(&self, node_id: TypedKey) {
let idx = self.unlocked_inner.find_bucket_index(node_id).unwrap();
self.unlocked_inner.kick_queue.lock().insert(idx);
fn queue_bucket_kicks(&self, node_ids: TypedKeySet) {
for node_id in node_ids.iter() {
let Some(x) = self.unlocked_inner.find_bucket_index(*node_id) else {
log_rtab!(error "find bucket index failed for nodeid {}", node_id);
continue;
};
self.unlocked_inner.kick_queue.lock().insert(x);
}
}
/// Resolve an existing routing table entry and return a reference to it

View File

@ -101,6 +101,9 @@ pub trait NodeRefBase: Sized {
fn node_ids(&self) -> TypedKeySet {
self.operate(|_rti, e| e.node_ids())
}
fn best_node_id(&self) -> TypedKey {
self.operate(|_rti, e| e.best_node_id())
}
fn has_updated_since_last_network_change(&self) -> bool {
self.operate(|_rti, e| e.has_updated_since_last_network_change())
}
@ -281,7 +284,7 @@ pub trait NodeRefBase: Sized {
fn set_last_connection(&self, connection_descriptor: ConnectionDescriptor, ts: Timestamp) {
self.operate_mut(|rti, e| {
e.set_last_connection(connection_descriptor, ts);
rti.touch_recent_peer(self.common().node_id, connection_descriptor);
rti.touch_recent_peer(e.best_node_id(), connection_descriptor);
})
}
@ -426,14 +429,14 @@ impl Clone for NodeRef {
impl fmt::Display for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.common.node_id.encode())
write!(f, "{}", self.common.entry.with_inner(|e| e.best_node_id()))
}
}
impl fmt::Debug for NodeRef {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeRef")
.field("node_id", &self.common.node_id)
.field("node_ids", &self.common.entry.with_inner(|e| e.node_ids()))
.field("filter", &self.common.filter)
.field("sequencing", &self.common.sequencing)
.finish()
@ -453,9 +456,10 @@ impl Drop for NodeRef {
.fetch_sub(1u32, Ordering::Relaxed)
- 1;
if new_ref_count == 0 {
self.common
.routing_table
.queue_bucket_kick(self.common.node_id);
// get node ids with inner unlocked because nothing could be referencing this entry now
// and we don't know when it will get dropped, possibly inside a lock
let node_ids = self.common().entry.with_inner(|e| e.node_ids());
self.common.routing_table.queue_bucket_kicks(node_ids);
}
}
}