From 8efa2de82ce6923f23a716b000b9d31c68945340 Mon Sep 17 00:00:00 2001
From: Christien Rioux
Date: Thu, 23 Nov 2023 19:00:33 -0500
Subject: [PATCH] config -> record store

---
 .../src/storage_manager/record_store.rs       | 46 +++++++++++++++++--
 .../storage_manager/record_store_limits.rs    |  4 ++
 .../storage_manager/storage_manager_inner.rs  | 40 ++++++++++++++++
 .../src/storage_manager/watch_value.rs        |  9 +++-
 4 files changed, 94 insertions(+), 5 deletions(-)

diff --git a/veilid-core/src/storage_manager/record_store.rs b/veilid-core/src/storage_manager/record_store.rs
index c7a9da80..bc199b1c 100644
--- a/veilid-core/src/storage_manager/record_store.rs
+++ b/veilid-core/src/storage_manager/record_store.rs
@@ -13,15 +13,34 @@ struct DeadRecord<D>
 where
     D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
 {
-    // The key used in the record_index
+    /// The key used in the record_index
     key: RecordTableKey,
-    // The actual record
+    /// The actual record
     record: Record<D>,
-    // True if this record is accounted for in the total storage
-    // and needs to have the statistics updated or not when purged
+    /// True if this record is accounted for in the total storage
+    /// and needs to have the statistics updated or not when purged
     in_total_storage: bool,
 }
 
+/// An individual watch
+#[derive(Debug, Clone)]
+struct WatchedRecordWatch {
+    subkeys: ValueSubkeyRangeSet,
+    expiration: Timestamp,
+    count: u32,
+    target: Target,
+    opt_watcher: Option<CryptoKey>,
+}
+
+#[derive(Debug, Clone)]
+/// A record being watched for changes
+struct WatchedRecord {
+    /// Number of watchers that are anonymous
+    anon_count: usize,
+    /// The list of active watchers
+    watchers: Vec<WatchedRecordWatch>,
+}
+
 pub struct RecordStore<D>
 where
     D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
@@ -46,6 +65,8 @@ where
     dead_records: Vec<DeadRecord<D>>,
     /// The list of records that have changed since last flush to disk (optimization for batched writes)
     changed_records: HashSet<RecordTableKey>,
+    /// The list of records being watched for changes
+    watched_records: HashMap<RecordTableKey, WatchedRecord>,
 
     /// A mutex to ensure we handle this concurrently
     purge_dead_records_mutex: Arc<AsyncMutex<()>>,
@@ -93,6 +114,7 @@ where
             ),
             dead_records: Vec::new(),
             changed_records: HashSet::new(),
+            watched_records: HashMap::new(),
             purge_dead_records_mutex: Arc::new(AsyncMutex::new(())),
         }
     }
@@ -675,6 +697,22 @@ where
         Ok(())
     }
 
+    /// Add a record watch for changes
+    pub async fn watch_subkeys(
+        &mut self,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+        expiration: Timestamp,
+        count: u32,
+        target: Target,
+        opt_watcher: Option<CryptoKey>,
+    ) -> VeilidAPIResult<Option<Timestamp>> {
+
+        // If we have a watcher and it is in the record's schema
+        // then we have a guaranteed watch slot for it
+        xxx continue here
+    }
+
     /// LRU out some records until we reclaim the amount of space requested
     /// This will force a garbage collection of the space immediately
     /// If zero is passed in here, a garbage collection will be performed of dead records
diff --git a/veilid-core/src/storage_manager/record_store_limits.rs b/veilid-core/src/storage_manager/record_store_limits.rs
index 5dfb25d4..6e03c713 100644
--- a/veilid-core/src/storage_manager/record_store_limits.rs
+++ b/veilid-core/src/storage_manager/record_store_limits.rs
@@ -13,4 +13,8 @@ pub struct RecordStoreLimits {
     pub max_subkey_cache_memory_mb: Option<usize>,
     /// Limit on the amount of storage space to use for subkey data and record data
     pub max_storage_space_mb: Option<usize>,
+    /// Max number of anonymous watches
+    pub public_watch_limit: u32,
+    /// Max number of watches per schema member
+    pub member_watch_limit: u32,
 }
diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs
index bf78b593..09a935fb 100644
--- a/veilid-core/src/storage_manager/storage_manager_inner.rs
+++ b/veilid-core/src/storage_manager/storage_manager_inner.rs
@@ -39,6 +39,8 @@ fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
         max_records: None,
         max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize),
         max_storage_space_mb: None,
+        public_watch_limit: c.network.dht.public_watch_limit,
+        member_watch_limit: c.network.dht.member_watch_limit,
     }
 }
 
@@ -51,6 +53,8 @@ fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
         max_records: Some(c.network.dht.remote_max_records as usize),
         max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize),
         max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
+        public_watch_limit: c.network.dht.public_watch_limit,
+        member_watch_limit: c.network.dht.member_watch_limit,
     }
 }
 
@@ -505,6 +509,24 @@ impl StorageManagerInner {
         Ok(())
     }
 
+    pub async fn handle_watch_local_value(
+        &mut self,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+        expiration: Timestamp,
+        count: u32,
+        target: Target,
+        opt_watcher: Option<CryptoKey>,
+    ) -> VeilidAPIResult<Option<Timestamp>> {
+        // See if it's in the local record store
+        let Some(local_record_store) = self.local_record_store.as_mut() else {
+            apibail_not_initialized!();
+        };
+        local_record_store
+            .watch_subkeys(key, subkeys, expiration, count, target, opt_watcher)
+            .await
+    }
+
     pub async fn handle_get_remote_value(
         &mut self,
         key: TypedKey,
@@ -561,6 +583,24 @@ impl StorageManagerInner {
         Ok(())
     }
 
+    pub async fn handle_watch_remote_value(
+        &mut self,
+        key: TypedKey,
+        subkeys: ValueSubkeyRangeSet,
+        expiration: Timestamp,
+        count: u32,
+        target: Target,
+        opt_watcher: Option<CryptoKey>,
+    ) -> VeilidAPIResult<Option<Timestamp>> {
+        // See if it's in the remote record store
+        let Some(remote_record_store) = self.remote_record_store.as_mut() else {
+            apibail_not_initialized!();
+        };
+        remote_record_store
+            .watch_subkeys(key, subkeys, expiration, count, target, opt_watcher)
+            .await
+    }
+
     /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
     fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
     where
diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs
index db9e718c..d4982b22 100644
--- a/veilid-core/src/storage_manager/watch_value.rs
+++ b/veilid-core/src/storage_manager/watch_value.rs
@@ -181,7 +181,14 @@ impl StorageManager {
         let (_is_local, opt_expiration_ts) = {
             // See if the subkey we are watching has a local value
             let opt_expiration_ts = inner
-                .handle_watch_local_value(key, subkeys, expiration, count, target, opt_watcher)
+                .handle_watch_local_value(
+                    key,
+                    subkeys.clone(),
+                    expiration,
+                    count,
+                    target.clone(),
+                    opt_watcher,
+                )
                 .await?;
             if opt_expiration_ts.is_some() {
                 (true, opt_expiration_ts)
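
Note on the unfinished watch_subkeys body above: the patch stops at a placeholder ("xxx continue here"), but the new fields suggest the intended accounting — anonymous watches are capped by public_watch_limit, while watches from keys in the record's schema get per-member slots capped by member_watch_limit. The standalone Rust sketch below models only that admission logic. The type names (MemberKey, Watch) and the try_add helper are simplified, hypothetical stand-ins rather than the veilid-core types or API, and treating non-schema watchers as public is an assumption of this sketch.

// Standalone model of the watch-slot accounting suggested by the new
// RecordStoreLimits fields. Everything except the two limit parameters is a
// simplified stand-in (hypothetical names), not the veilid-core implementation.

type MemberKey = [u8; 32]; // stand-in for a watcher's public key

#[derive(Debug, Clone)]
struct Watch {
    expiration_ts: u64,             // when the watch lapses
    count: u32,                     // remaining change notifications
    opt_watcher: Option<MemberKey>, // None = anonymous watch
}

#[derive(Debug, Default)]
struct WatchedRecord {
    anon_count: usize,    // number of anonymous/public watches held
    watchers: Vec<Watch>, // all active watches on this record
}

impl WatchedRecord {
    /// Try to add a watch, enforcing the per-record limits.
    /// Returns false if no slot is available.
    fn try_add(
        &mut self,
        watch: Watch,
        schema_members: &[MemberKey],
        public_watch_limit: usize,
        member_watch_limit: usize,
    ) -> bool {
        match watch.opt_watcher {
            // Watcher is in the record's schema: it gets its own slot budget.
            Some(w) if schema_members.contains(&w) => {
                let used = self
                    .watchers
                    .iter()
                    .filter(|x| x.opt_watcher == Some(w))
                    .count();
                if used >= member_watch_limit {
                    return false;
                }
            }
            // Anonymous or non-member watcher: counted against the public limit
            // (treating non-members as public is an assumption of this sketch).
            _ => {
                if self.anon_count >= public_watch_limit {
                    return false;
                }
                self.anon_count += 1;
            }
        }
        self.watchers.push(watch);
        true
    }
}

fn main() {
    let member: MemberKey = [1u8; 32];
    let mut rec = WatchedRecord::default();
    let accepted = rec.try_add(
        Watch {
            expiration_ts: 0,
            count: 1,
            opt_watcher: Some(member),
        },
        &[member],
        1, // public_watch_limit
        2, // member_watch_limit
    );
    println!("watch accepted: {accepted}");
}

In the real record store, this per-record state would presumably live in the new watched_records map keyed by RecordTableKey, and watch_subkeys would return the granted expiration per its VeilidAPIResult<Option<Timestamp>> signature.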