config -> record store

This commit is contained in:
Christien Rioux 2023-11-23 19:00:33 -05:00
parent 077a1808a5
commit 8efa2de82c
4 changed files with 94 additions and 5 deletions

View File

@ -13,15 +13,34 @@ struct DeadRecord<D>
where where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
{ {
// The key used in the record_index /// The key used in the record_index
key: RecordTableKey, key: RecordTableKey,
// The actual record /// The actual record
record: Record<D>, record: Record<D>,
// True if this record is accounted for in the total storage /// True if this record is accounted for in the total storage
// and needs to have the statistics updated or not when purged /// and needs to have the statistics updated or not when purged
in_total_storage: bool, in_total_storage: bool,
} }
/// An individual watch
#[derive(Debug, Clone)]
struct WatchedRecordWatch {
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
target: Target,
opt_watcher: Option<CryptoKey>,
}
#[derive(Debug, Clone)]
/// A record being watched for changes
struct WatchedRecord {
/// Number of watchers that are anonymous
anon_count: usize,
/// The list of active watchers
watchers: Vec<WatchedRecordWatch>,
}
pub struct RecordStore<D> pub struct RecordStore<D>
where where
D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>, D: fmt::Debug + Clone + Serialize + for<'d> Deserialize<'d>,
@ -46,6 +65,8 @@ where
dead_records: Vec<DeadRecord<D>>, dead_records: Vec<DeadRecord<D>>,
/// The list of records that have changed since last flush to disk (optimization for batched writes) /// The list of records that have changed since last flush to disk (optimization for batched writes)
changed_records: HashSet<RecordTableKey>, changed_records: HashSet<RecordTableKey>,
/// The list of records being watched for changes
watched_records: HashMap<RecordTableKey, WatchedRecord>,
/// A mutex to ensure we handle this concurrently /// A mutex to ensure we handle this concurrently
purge_dead_records_mutex: Arc<AsyncMutex<()>>, purge_dead_records_mutex: Arc<AsyncMutex<()>>,
@ -93,6 +114,7 @@ where
), ),
dead_records: Vec::new(), dead_records: Vec::new(),
changed_records: HashSet::new(), changed_records: HashSet::new(),
watched_records: HashMap::new(),
purge_dead_records_mutex: Arc::new(AsyncMutex::new(())), purge_dead_records_mutex: Arc::new(AsyncMutex::new(())),
} }
} }
@ -675,6 +697,22 @@ where
Ok(()) Ok(())
} }
/// Add a record watch for changes
pub async fn watch_subkeys(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
target: Target,
opt_watcher: Option<CryptoKey>,
) -> VeilidAPIResult<Option<Timestamp>> {
// If we have a watcher and it is in the record's schema
// then we have a guaranteed watch slot for it
xxx continue here
}
/// LRU out some records until we reclaim the amount of space requested /// LRU out some records until we reclaim the amount of space requested
/// This will force a garbage collection of the space immediately /// This will force a garbage collection of the space immediately
/// If zero is passed in here, a garbage collection will be performed of dead records /// If zero is passed in here, a garbage collection will be performed of dead records

View File

@ -13,4 +13,8 @@ pub struct RecordStoreLimits {
pub max_subkey_cache_memory_mb: Option<usize>, pub max_subkey_cache_memory_mb: Option<usize>,
/// Limit on the amount of storage space to use for subkey data and record data /// Limit on the amount of storage space to use for subkey data and record data
pub max_storage_space_mb: Option<usize>, pub max_storage_space_mb: Option<usize>,
/// Max number of anonymous watches
pub public_watch_limit: u32,
/// Max number of watches per schema member
pub member_watch_limit: u32,
} }

View File

@ -39,6 +39,8 @@ fn local_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
max_records: None, max_records: None,
max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize), max_subkey_cache_memory_mb: Some(c.network.dht.local_max_subkey_cache_memory_mb as usize),
max_storage_space_mb: None, max_storage_space_mb: None,
public_watch_limit: c.network.dht.public_watch_limit,
member_watch_limit: c.network.dht.member_watch_limit,
} }
} }
@ -51,6 +53,8 @@ fn remote_limits_from_config(config: VeilidConfig) -> RecordStoreLimits {
max_records: Some(c.network.dht.remote_max_records as usize), max_records: Some(c.network.dht.remote_max_records as usize),
max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize), max_subkey_cache_memory_mb: Some(c.network.dht.remote_max_subkey_cache_memory_mb as usize),
max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize), max_storage_space_mb: Some(c.network.dht.remote_max_storage_space_mb as usize),
public_watch_limit: c.network.dht.public_watch_limit,
member_watch_limit: c.network.dht.member_watch_limit,
} }
} }
@ -505,6 +509,24 @@ impl StorageManagerInner {
Ok(()) Ok(())
} }
pub async fn handle_watch_local_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
target: Target,
opt_watcher: Option<CryptoKey>,
) -> VeilidAPIResult<Option<Timestamp>> {
// See if it's in the local record store
let Some(local_record_store) = self.local_record_store.as_mut() else {
apibail_not_initialized!();
};
local_record_store
.watch_subkeys(key, subkeys, expiration, count, target, opt_watcher)
.await
}
pub async fn handle_get_remote_value( pub async fn handle_get_remote_value(
&mut self, &mut self,
key: TypedKey, key: TypedKey,
@ -561,6 +583,24 @@ impl StorageManagerInner {
Ok(()) Ok(())
} }
pub async fn handle_watch_remote_value(
&mut self,
key: TypedKey,
subkeys: ValueSubkeyRangeSet,
expiration: Timestamp,
count: u32,
target: Target,
opt_watcher: Option<CryptoKey>,
) -> VeilidAPIResult<Option<Timestamp>> {
// See if it's in the remote record store
let Some(remote_record_store) = self.remote_record_store.as_mut() else {
apibail_not_initialized!();
};
remote_record_store
.watch_subkeys(key, subkeys, expiration, count, target, opt_watcher)
.await
}
/// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ] /// # DHT Key = Hash(ownerKeyKind) of: [ ownerKeyValue, schema ]
fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey fn get_key<D>(vcrypto: CryptoSystemVersion, record: &Record<D>) -> TypedKey
where where

View File

@ -181,7 +181,14 @@ impl StorageManager {
let (_is_local, opt_expiration_ts) = { let (_is_local, opt_expiration_ts) = {
// See if the subkey we are watching has a local value // See if the subkey we are watching has a local value
let opt_expiration_ts = inner let opt_expiration_ts = inner
.handle_watch_local_value(key, subkeys, expiration, count, target, opt_watcher) .handle_watch_local_value(
key,
subkeys.clone(),
expiration,
count,
target.clone(),
opt_watcher,
)
.await?; .await?;
if opt_expiration_ts.is_some() { if opt_expiration_ts.is_some() {
(true, opt_expiration_ts) (true, opt_expiration_ts)