diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index a5be155c..40656391 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -169,7 +169,7 @@ impl StorageManager { Ok(inner) } - fn online_writes_ready_inner(inner: &StorageManagerInner) -> Option { + fn online_ready_inner(inner: &StorageManagerInner) -> Option { if let Some(rpc_processor) = { inner.opt_rpc_processor.clone() } { if let Some(network_class) = rpc_processor .routing_table() @@ -193,7 +193,7 @@ impl StorageManager { async fn online_writes_ready(&self) -> EyreResult> { let inner = self.lock().await?; - Ok(Self::online_writes_ready_inner(&inner)) + Ok(Self::online_ready_inner(&inner)) } async fn has_offline_subkey_writes(&self) -> EyreResult { @@ -243,7 +243,7 @@ impl StorageManager { // No record yet, try to get it from the network // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = inner.opt_rpc_processor.clone() else { + let Some(rpc_processor) = Self::online_ready_inner(&inner) else { apibail_try_again!("offline, try again later"); }; @@ -293,7 +293,7 @@ impl StorageManager { pub async fn close_record(&self, key: TypedKey) -> VeilidAPIResult<()> { let (opt_opened_record, opt_rpc_processor) = { let mut inner = self.lock().await?; - (inner.close_record(key)?, inner.opt_rpc_processor.clone()) + (inner.close_record(key)?, Self::online_ready_inner(&inner)) }; // Send a one-time cancel request for the watch if we have one and we're online @@ -376,7 +376,7 @@ impl StorageManager { // Refresh if we can // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = inner.opt_rpc_processor.clone() else { + let Some(rpc_processor) = Self::online_ready_inner(&inner) else { // Return the existing value if we have one if we aren't online if let Some(last_subkey_result_value) = last_subkey_result.value { return Ok(Some(last_subkey_result_value.value_data().clone())); @@ -499,20 +499,19 @@ impl StorageManager { writer.secret, )?); + // Write the value locally first + log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); + inner + .handle_set_local_value( + key, + subkey, + signed_value_data.clone(), + WatchUpdateMode::NoUpdate, + ) + .await?; + // Get rpc processor and drop mutex so we don't block while getting the value from the network - let Some(rpc_processor) = Self::online_writes_ready_inner(&inner) else { - log_stor!(debug "Writing subkey locally: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); - - // Offline, just write it locally and return immediately - inner - .handle_set_local_value( - key, - subkey, - signed_value_data.clone(), - WatchUpdateMode::UpdateAll, - ) - .await?; - + let Some(rpc_processor) = Self::online_ready_inner(&inner) else { log_stor!(debug "Writing subkey offline: {}:{} len={}", key, subkey, signed_value_data.value_data().data().len() ); // Add to offline writes to flush inner @@ -547,18 +546,18 @@ impl StorageManager { let mut inner = self.lock().await?; inner.set_value_nodes(key, result.value_nodes)?; - // Whatever record we got back, store it locally, might be newer than the one we asked to save - inner - .handle_set_local_value( - key, - subkey, - result.signed_value_data.clone(), - WatchUpdateMode::UpdateAll, - ) - .await?; - // Return the new value if it differs from what was asked to set if result.signed_value_data.value_data() != signed_value_data.value_data() { + // Record the newer value and send and update since it is different than what we just set + inner + .handle_set_local_value( + key, + subkey, + result.signed_value_data.clone(), + WatchUpdateMode::UpdateAll, + ) + .await?; + return Ok(Some(result.signed_value_data.value_data().clone())); } @@ -596,7 +595,7 @@ impl StorageManager { }; // Get rpc processor and drop mutex so we don't block while requesting the watch from the network - let Some(rpc_processor) = inner.opt_rpc_processor.clone() else { + let Some(rpc_processor) = Self::online_ready_inner(&inner) else { apibail_try_again!("offline, try again later"); }; @@ -729,7 +728,7 @@ impl StorageManager { async fn send_value_change(&self, vc: ValueChangedInfo) -> VeilidAPIResult<()> { let rpc_processor = { let inner = self.inner.lock().await; - if let Some(rpc_processor) = &inner.opt_rpc_processor { + if let Some(rpc_processor) = Self::online_ready_inner(&inner) { rpc_processor.clone() } else { apibail_try_again!("network is not available"); @@ -745,10 +744,9 @@ impl StorageManager { .map_err(VeilidAPIError::from)?; network_result_value_or_log!(rpc_processor - .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, (*vc.value).clone()) - .await - .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] { - }); + .rpc_call_value_changed(dest, vc.key, vc.subkeys.clone(), vc.count, (*vc.value).clone()) + .await + .map_err(VeilidAPIError::from)? => [format!(": dest={:?} vc={:?}", dest, vc)] {}); Ok(()) }