diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index 36ee21b7..d9c62019 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -291,40 +291,42 @@ impl StorageManager { /// Close an opened local record pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { - let (opened_record, opt_rpc_processor) = { + let (opt_opened_record, opt_rpc_processor) = { let mut inner = self.lock().await?; (inner.close_record(key)?, inner.opt_rpc_processor.clone()) }; // Send a one-time cancel request for the watch if we have one and we're online - if let Some(active_watch) = opened_record.active_watch() { - if let Some(rpc_processor) = opt_rpc_processor { - // Use the safety selection we opened the record with - // Use the writer we opened with as the 'watcher' as well - let opt_owvresult = self - .outbound_watch_value( - rpc_processor, - key, - ValueSubkeyRangeSet::full(), - Timestamp::new(0), - 0, - opened_record.safety_selection(), - opened_record.writer().cloned(), - Some(active_watch.watch_node), - ) - .await?; - if let Some(owvresult) = opt_owvresult { - if owvresult.expiration_ts.as_u64() != 0 { - log_stor!(debug - "close record watch cancel got unexpected expiration: {}", - owvresult.expiration_ts - ); + if let Some(opened_record) = opt_opened_record { + if let Some(active_watch) = opened_record.active_watch() { + if let Some(rpc_processor) = opt_rpc_processor { + // Use the safety selection we opened the record with + // Use the writer we opened with as the 'watcher' as well + let opt_owvresult = self + .outbound_watch_value( + rpc_processor, + key, + ValueSubkeyRangeSet::full(), + Timestamp::new(0), + 0, + opened_record.safety_selection(), + opened_record.writer().cloned(), + Some(active_watch.watch_node), + ) + .await?; + if let Some(owvresult) = opt_owvresult { + if owvresult.expiration_ts.as_u64() != 0 { + log_stor!(debug + "close record watch cancel got unexpected expiration: {}", + owvresult.expiration_ts + ); + } + } else { + log_stor!(debug "close record watch cancel unsuccessful"); } } else { - log_stor!(debug "close record watch cancel unsuccessful"); + log_stor!(debug "skipping last-ditch watch cancel because we are offline"); } - } else { - log_stor!(debug "skipping last-ditch watch cancel because we are offline"); } } @@ -336,6 +338,7 @@ impl StorageManager { // Ensure the record is closed self.close_record(key).await?; + // Get record from the local store let mut inner = self.lock().await?; let Some(local_record_store) = inner.local_record_store.as_mut() else { apibail_not_initialized!(); @@ -526,8 +529,11 @@ impl StorageManager { ) .await?; - // Whatever record we got back, store it locally, might be newer than the one we asked to save + // Keep the list of nodes that returned a value for later reference let mut inner = self.lock().await?; + inner.set_value_nodes(key, result.value_nodes)?; + + // Whatever record we got back, store it locally, might be newer than the one we asked to save inner .handle_set_local_value(key, subkey, result.signed_value_data.clone()) .await?; diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index 198a4628..90c3cfd1 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -480,11 +480,15 @@ impl StorageManagerInner { Ok(()) } - pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult { - let Some(opened_record) = self.opened_records.remove(&key) else { - apibail_generic!("record not open"); + pub fn close_record(&mut self, key: TypedKey) -> VeilidAPIResult> { + let Some(local_record_store) = self.local_record_store.as_mut() else { + apibail_not_initialized!(); }; - Ok(opened_record) + if local_record_store.peek_record(key, |_| {}).is_none() { + return Err(VeilidAPIError::key_not_found(key)); + } + + Ok(self.opened_records.remove(&key)) } pub(super) async fn handle_get_local_value( diff --git a/veilid-core/src/storage_manager/types/record_data.rs b/veilid-core/src/storage_manager/types/record_data.rs index 86c9f9b7..69b47837 100644 --- a/veilid-core/src/storage_manager/types/record_data.rs +++ b/veilid-core/src/storage_manager/types/record_data.rs @@ -16,7 +16,6 @@ impl RecordData { self.signed_value_data.data_size() } pub fn total_size(&self) -> usize { - (mem::size_of::() - mem::size_of::()) - + self.signed_value_data.total_size() + mem::size_of::() + self.signed_value_data.total_size() } } diff --git a/veilid-core/src/tests/common/test_veilid_config.rs b/veilid-core/src/tests/common/test_veilid_config.rs index 8f16f206..43515cbd 100644 --- a/veilid-core/src/tests/common/test_veilid_config.rs +++ b/veilid-core/src/tests/common/test_veilid_config.rs @@ -229,6 +229,9 @@ pub fn config_callback(key: String) -> ConfigCallbackReturn { "network.dht.remote_max_records" => Ok(Box::new(4096u32)), "network.dht.remote_max_subkey_cache_memory_mb" => Ok(Box::new(64u32)), "network.dht.remote_max_storage_space_mb" => Ok(Box::new(64u32)), + "network.dht.public_watch_limit" => Ok(Box::new(32u32)), + "network.dht.member_watch_limit" => Ok(Box::new(8u32)), + "network.dht.max_watch_expiration_ms" => Ok(Box::new(600_000u32)), "network.upnp" => Ok(Box::new(false)), "network.detect_address_changes" => Ok(Box::new(true)), "network.restricted_nat_retries" => Ok(Box::new(0u32)),