diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index 7d16b086..94dfda66 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -288,7 +288,11 @@ where } // Wait for them to complete timeout(timeout_ms, async { - while let Some(_) = unord.next().await {} + while let Some(_) = unord.next().await { + if self.clone().evaluate_done() { + break; + } + } }) .await .into_timeout_or() diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 47998eff..2bb7d1c7 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -13,7 +13,6 @@ struct OutboundGetValueContext { } impl StorageManager { - /// Perform a 'get value' query on the network pub async fn outbound_get_value( &self, @@ -74,15 +73,14 @@ impl StorageManager { if let Some(descriptor) = gva.answer.descriptor { let mut ctx = context.lock(); if ctx.descriptor.is_none() && ctx.schema.is_none() { - ctx.schema = - Some(descriptor.schema().map_err(RPCError::invalid_format)?); + ctx.schema = Some(descriptor.schema().map_err(RPCError::invalid_format)?); ctx.descriptor = Some(descriptor); } } // Keep the value if we got one and it is newer and it passes schema validation if let Some(value) = gva.answer.value { - log_stor!(debug "Got value back: len={}", value.value_data().data().len()); + log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); let mut ctx = context.lock(); // Ensure we have a schema and descriptor @@ -126,8 +124,7 @@ impl StorageManager { } else { // If the sequence number is older, ignore it } - } - else { + } else { // If we have no prior value, keep it ctx.value = Some(value); // One node has shown us this value so far @@ -136,7 +133,7 @@ impl StorageManager { } // Return peers if we have some - #[cfg(feature="network-result-extra")] + #[cfg(feature = "network-result-extra")] log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); Ok(Some(gva.answer.peers)) @@ -147,7 +144,8 @@ impl StorageManager { let check_done = |_closest_nodes: &[NodeRef]| { // If we have reached sufficient consensus, return done let ctx = context.lock(); - if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count { + if ctx.value.is_some() && ctx.descriptor.is_some() && ctx.value_count >= consensus_count + { return Some(()); } None @@ -167,14 +165,31 @@ impl StorageManager { match fanout_call.run().await { // If we don't finish in the timeout (too much time passed checking for consensus) - TimeoutOr::Timeout | + TimeoutOr::Timeout => { + log_stor!(debug "GetValue Fanout Timeout"); + // Return the best answer we've got + let ctx = context.lock(); + Ok(SubkeyResult { + value: ctx.value.clone(), + descriptor: ctx.descriptor.clone(), + }) + } // If we finished with consensus (enough nodes returning the same value) - TimeoutOr::Value(Ok(Some(()))) | + TimeoutOr::Value(Ok(Some(()))) => { + log_stor!(debug "GetValue Fanout Consensus"); + // Return the best answer we've got + let ctx = context.lock(); + Ok(SubkeyResult { + value: ctx.value.clone(), + descriptor: ctx.descriptor.clone(), + }) + } // If we finished without consensus (ran out of nodes before getting consensus) TimeoutOr::Value(Ok(None)) => { // Return the best answer we've got let ctx = context.lock(); - Ok(SubkeyResult{ + log_stor!(debug "GetValue Fanout No Consensus: {}", ctx.value_count); + Ok(SubkeyResult { value: ctx.value.clone(), descriptor: ctx.descriptor.clone(), }) @@ -182,22 +197,31 @@ impl StorageManager { // Failed TimeoutOr::Value(Err(e)) => { // If we finished with an error, return that + log_stor!(debug "GetValue Fanout Error: {}", e); Err(e.into()) } } } /// Handle a recieved 'Get Value' query - pub async fn inbound_get_value(&self, key: TypedKey, subkey: ValueSubkey, want_descriptor: bool) -> VeilidAPIResult> { + pub async fn inbound_get_value( + &self, + key: TypedKey, + subkey: ValueSubkey, + want_descriptor: bool, + ) -> VeilidAPIResult> { let mut inner = self.lock().await?; - let res = match inner.handle_get_remote_value(key, subkey, want_descriptor).await { + let res = match inner + .handle_get_remote_value(key, subkey, want_descriptor) + .await + { Ok(res) => res, Err(VeilidAPIError::Internal { message }) => { apibail_internal!(message); - }, + } Err(e) => { return Ok(NetworkResult::invalid_message(e)); - }, + } }; Ok(NetworkResult::value(res)) } diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 25a08170..09f655aa 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -11,7 +11,6 @@ struct OutboundSetValueContext { } impl StorageManager { - /// Perform a 'set value' query on the network pub async fn outbound_set_value( &self, @@ -49,7 +48,6 @@ impl StorageManager { let context = context.clone(); let descriptor = descriptor.clone(); async move { - let send_descriptor = true; // xxx check if next_node needs the descriptor or not // get most recent value to send @@ -81,6 +79,7 @@ impl StorageManager { // Keep the value if we got one and it is newer and it passes schema validation if let Some(value) = sva.answer.value { + log_stor!(debug "Got value back: len={} seq={}", value.value_data().data().len(), value.value_data().seq()); // Validate with schema if !ctx.schema.check_subkey_value_data( @@ -101,14 +100,12 @@ impl StorageManager { // One node has shown us this value so far ctx.value_count = 1; } else { - // If the sequence number is older, or an equal sequence number, + // If the sequence number is older, or an equal sequence number, // node should have not returned a value here. // Skip this node and it's closer list because it is misbehaving return Ok(None); } - } - else - { + } else { // It was set on this node and no newer value was found and returned, // so increase our consensus count ctx.value_count += 1; @@ -116,7 +113,7 @@ impl StorageManager { } // Return peers if we have some - #[cfg(feature="network-result-extra")] + #[cfg(feature = "network-result-extra")] log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len()); Ok(Some(sva.answer.peers)) @@ -147,18 +144,30 @@ impl StorageManager { match fanout_call.run().await { // If we don't finish in the timeout (too much time passed checking for consensus) - TimeoutOr::Timeout | + TimeoutOr::Timeout => { + log_stor!(debug "SetValue Fanout Timeout"); + // Return the best answer we've got + let ctx = context.lock(); + Ok(ctx.value.clone()) + } // If we finished with consensus (enough nodes returning the same value) - TimeoutOr::Value(Ok(Some(()))) | + TimeoutOr::Value(Ok(Some(()))) => { + log_stor!(debug "SetValue Fanout Consensus"); + // Return the best answer we've got + let ctx = context.lock(); + Ok(ctx.value.clone()) + } // If we finished without consensus (ran out of nodes before getting consensus) TimeoutOr::Value(Ok(None)) => { // Return the best answer we've got let ctx = context.lock(); + log_stor!(debug "SetValue Fanout No Consensus: {}", ctx.value_count); Ok(ctx.value.clone()) } // Failed TimeoutOr::Value(Err(e)) => { // If we finished with an error, return that + log_stor!(debug "SetValue Fanout Error: {}", e); Err(e.into()) } } @@ -167,7 +176,13 @@ impl StorageManager { /// Handle a recieved 'Set Value' query /// Returns a None if the value passed in was set /// Returns a Some(current value) if the value was older and the current value was kept - pub async fn inbound_set_value(&self, key: TypedKey, subkey: ValueSubkey, value: SignedValueData, descriptor: Option) -> VeilidAPIResult>> { + pub async fn inbound_set_value( + &self, + key: TypedKey, + subkey: ValueSubkey, + value: SignedValueData, + descriptor: Option, + ) -> VeilidAPIResult>> { let mut inner = self.lock().await?; // See if this is a remote or local value @@ -198,19 +213,23 @@ impl StorageManager { if let Some(descriptor) = descriptor { // Descriptor must match last one if it is provided if descriptor.cmp_no_sig(&last_descriptor) != cmp::Ordering::Equal { - return Ok(NetworkResult::invalid_message("setvalue descriptor does not match last descriptor")); + return Ok(NetworkResult::invalid_message( + "setvalue descriptor does not match last descriptor", + )); } } else { // Descriptor was not provided always go with last descriptor } last_descriptor - } + } None => { if let Some(descriptor) = descriptor { descriptor } else { // No descriptor - return Ok(NetworkResult::invalid_message("descriptor must be provided")); + return Ok(NetworkResult::invalid_message( + "descriptor must be provided", + )); } } }; @@ -228,16 +247,18 @@ impl StorageManager { let res = if is_local { inner.handle_set_local_value(key, subkey, value).await } else { - inner.handle_set_remote_value(key, subkey, value, actual_descriptor).await + inner + .handle_set_remote_value(key, subkey, value, actual_descriptor) + .await }; - match res { - Ok(()) => {}, + match res { + Ok(()) => {} Err(VeilidAPIError::Internal { message }) => { apibail_internal!(message); - }, + } Err(e) => { return Ok(NetworkResult::invalid_message(e)); - }, + } } Ok(NetworkResult::value(None)) } diff --git a/veilid-flutter/lib/default_config.dart b/veilid-flutter/lib/default_config.dart index 9b877dbc..5ee45061 100644 --- a/veilid-flutter/lib/default_config.dart +++ b/veilid-flutter/lib/default_config.dart @@ -117,15 +117,15 @@ Future getDefaultVeilidConfig(String programName) async => ), dht: VeilidConfigDHT( resolveNodeTimeoutMs: 10000, - resolveNodeCount: 20, - resolveNodeFanout: 3, + resolveNodeCount: 1, + resolveNodeFanout: 4, maxFindNodeCount: 20, getValueTimeoutMs: 10000, - getValueCount: 20, - getValueFanout: 3, + getValueCount: 3, + getValueFanout: 4, setValueTimeoutMs: 10000, - setValueCount: 20, - setValueFanout: 5, + setValueCount: 4, + setValueFanout: 6, minPeerCount: 20, minPeerRefreshTimeMs: 60000, validateDialInfoReceiptTimeMs: 2000,