From eba349d22cc40db1f0b4ba058342c5f5839c5161 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sun, 28 Apr 2024 12:42:13 -0400 Subject: [PATCH] integration test for dht records --- veilid-core/src/storage_manager/debug.rs | 1 + .../tasks/offline_subkey_writes.rs | 6 ++ veilid-python/tests/api.py | 28 ++++---- veilid-python/tests/test_dht.py | 64 ++++++++++++++++++- veilid-python/veilid/api.py | 8 +-- veilid-python/veilid/json_api.py | 8 +-- 6 files changed, 92 insertions(+), 23 deletions(-) diff --git a/veilid-core/src/storage_manager/debug.rs b/veilid-core/src/storage_manager/debug.rs index 141fdf3a..fb165877 100644 --- a/veilid-core/src/storage_manager/debug.rs +++ b/veilid-core/src/storage_manager/debug.rs @@ -50,6 +50,7 @@ impl StorageManager { let reclaimed = local_record_store .reclaim_space(reclaim.unwrap_or(usize::MAX)) .await; + inner.offline_subkey_writes.clear(); format!("Local records purged: reclaimed {} bytes", reclaimed) } pub(crate) async fn purge_remote_records(&self, reclaim: Option) -> String { diff --git a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs index e0acb788..ced02735 100644 --- a/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs +++ b/veilid-core/src/storage_manager/tasks/offline_subkey_writes.rs @@ -40,14 +40,20 @@ impl StorageManager { }; let Ok(get_result) = get_result else { log_stor!(debug "Offline subkey write had no subkey result: {}:{}", key, subkey); + // drop this one + written_subkeys.insert(subkey); continue; }; let Some(value) = get_result.opt_value else { log_stor!(debug "Offline subkey write had no subkey value: {}:{}", key, subkey); + // drop this one + written_subkeys.insert(subkey); continue; }; let Some(descriptor) = get_result.opt_descriptor else { log_stor!(debug "Offline subkey write had no descriptor: {}:{}", key, subkey); + // drop this one + written_subkeys.insert(subkey); continue; }; log_stor!(debug "Offline subkey write: {}:{} len={}", key, subkey, value.value_data().data().len()); diff --git a/veilid-python/tests/api.py b/veilid-python/tests/api.py index 3bb86f33..fa7d1a9f 100644 --- a/veilid-python/tests/api.py +++ b/veilid-python/tests/api.py @@ -21,16 +21,16 @@ class VeilidTestConnectionError(Exception): @cache -def server_info() -> tuple[str, int]: +def server_info(subindex: int = 0) -> tuple[str, int]: """Return the hostname and port of the test server.""" VEILID_SERVER_NETWORK = os.getenv("VEILID_SERVER_NETWORK") if VEILID_SERVER_NETWORK is None: - return "localhost", 5959 + return "localhost", 5959 + subindex hostname, *rest = VEILID_SERVER_NETWORK.split(":") if rest: - return hostname, int(rest[0]) - return hostname, 5959 + return hostname, int(rest[0]) + subindex + return hostname, 5959 + subindex def ipc_path_exists(path: str) -> bool: """Determine if an IPC socket exists in a platform independent way.""" @@ -42,42 +42,42 @@ def ipc_path_exists(path: str) -> bool: return os.path.exists(path) @cache -def ipc_info() -> str: +def ipc_info(subindex: int = 0) -> str: """Return the path of the ipc socket of the test server.""" VEILID_SERVER_IPC = os.getenv("VEILID_SERVER_IPC") if VEILID_SERVER_IPC is not None: return VEILID_SERVER_IPC if os.name == 'nt': - return '\\\\.\\PIPE\\veilid-server\\0' + return f'\\\\.\\PIPE\\veilid-server\\{subindex}' - ipc_0_path = "/var/db/veilid-server/ipc/0" - if os.path.exists(ipc_0_path): - return ipc_0_path + ipc_path = f"/var/db/veilid-server/ipc/{subindex}" + if os.path.exists(ipc_path): + return ipc_path # hack to deal with rust's 'directories' crate case-inconsistency if sys.platform.startswith('darwin'): data_dir = appdirs.user_data_dir("org.Veilid.Veilid") else: data_dir = appdirs.user_data_dir("veilid","veilid") - ipc_0_path = os.path.join(data_dir, "ipc", "0") - return ipc_0_path + ipc_path = os.path.join(data_dir, "ipc", str(subindex)) + return ipc_path -async def api_connector(callback: Callable) -> _JsonVeilidAPI: +async def api_connector(callback: Callable, subindex: int = 0) -> _JsonVeilidAPI: """Return an API connection if possible. If the connection fails due to an inability to connect to the server's socket, raise an easy-to-catch VeilidTestConnectionError. """ - ipc_path = ipc_info() - hostname, port = server_info() + ipc_path = ipc_info(subindex) try: if ipc_path_exists(ipc_path): return await veilid.json_api_connect_ipc(ipc_path, callback) else: + hostname, port = server_info(subindex) return await veilid.json_api_connect(hostname, port, callback) except OSError as exc: # This is a little goofy. The underlying Python library handles diff --git a/veilid-python/tests/test_dht.py b/veilid-python/tests/test_dht.py index 58447c9e..e7aa01b2 100644 --- a/veilid-python/tests/test_dht.py +++ b/veilid-python/tests/test_dht.py @@ -4,6 +4,7 @@ import veilid import pytest import asyncio import json +import os from . import * from .api import VeilidTestConnectionError, api_connector @@ -340,4 +341,65 @@ async def test_inspect_dht_record(api_connection: veilid.VeilidAPI): assert rr2.network_seqs == [0, 0xFFFFFFFF] await rc.close_dht_record(rec.key) - await rc.delete_dht_record(rec.key) \ No newline at end of file + await rc.delete_dht_record(rec.key) + +@pytest.mark.skipif(os.getenv("INTEGRATION") != "1", reason="integration test requires two servers running") +@pytest.mark.asyncio +async def test_dht_integration_writer_reader(): + + async def null_update_callback(update: veilid.VeilidUpdate): + pass + + try: + api0 = await api_connector(null_update_callback, 0) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server 0.") + return + + try: + api1 = await api_connector(null_update_callback, 1) + except VeilidTestConnectionError: + pytest.skip("Unable to connect to veilid-server 1.") + return + + async with api0, api1: + # purge local and remote record stores to ensure we start fresh + await api0.debug("record purge local") + await api0.debug("record purge remote") + await api1.debug("record purge local") + await api1.debug("record purge remote") + + # make routing contexts + rc0 = await api0.new_routing_context() + rc1 = await api1.new_routing_context() + async with rc0, rc1: + + COUNT = 10 + TEST_DATA = b"test data" + + # write dht records on server 0 + records = [] + schema = veilid.DHTSchema.dflt(1) + print(f'writing {COUNT} records') + for n in range(COUNT): + desc = await rc0.create_dht_record(schema) + records.append(desc) + + await rc0.set_dht_value(desc.key, 0, TEST_DATA) + await rc0.close_dht_record(desc.key) + + print(f' {n}') + + # read dht records on server 1 + print(f'reading {COUNT} records') + n=0 + for desc0 in records: + desc1 = await rc1.open_dht_record(desc0.key) + vd1 = await rc1.get_dht_value(desc1.key, 0) + assert vd1.data == TEST_DATA + await rc1.close_dht_record(desc1.key) + + print(f' {n}') + n+=1 + + diff --git a/veilid-python/veilid/api.py b/veilid-python/veilid/api.py index 4f32e178..3d2e29c0 100644 --- a/veilid-python/veilid/api.py +++ b/veilid-python/veilid/api.py @@ -69,7 +69,7 @@ class RoutingContext(ABC): @abstractmethod async def get_dht_value( - self, key: types.TypedKey, subkey: types.ValueSubkey, force_refresh: bool + self, key: types.TypedKey, subkey: types.ValueSubkey, force_refresh: bool = False ) -> Optional[types.ValueData]: pass @@ -84,8 +84,8 @@ class RoutingContext(ABC): self, key: types.TypedKey, subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]], - expiration: types.Timestamp, - count: int, + expiration: types.Timestamp = 0, + count: int = 0xFFFFFFFF, ) -> types.Timestamp: pass @@ -102,7 +102,7 @@ class RoutingContext(ABC): self, key: types.TypedKey, subkeys: list[tuple[types.ValueSubkey, types.ValueSubkey]], - scope: types.DHTReportScope, + scope: types.DHTReportScope = types.DHTReportScope.LOCAL, ) -> types.DHTRecordReport: pass diff --git a/veilid-python/veilid/json_api.py b/veilid-python/veilid/json_api.py index 0ff6ee96..5745327e 100644 --- a/veilid-python/veilid/json_api.py +++ b/veilid-python/veilid/json_api.py @@ -619,7 +619,7 @@ class _JsonRoutingContext(RoutingContext): ) async def get_dht_value( - self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool + self, key: TypedKey, subkey: ValueSubkey, force_refresh: bool = False ) -> Optional[ValueData]: ret = raise_api_result( await self.api.send_ndjson_request( @@ -655,8 +655,8 @@ class _JsonRoutingContext(RoutingContext): self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], - expiration: Timestamp, - count: int, + expiration: Timestamp = 0, + count: int = 0xFFFFFFFF, ) -> Timestamp: return Timestamp( raise_api_result( @@ -691,7 +691,7 @@ class _JsonRoutingContext(RoutingContext): self, key: TypedKey, subkeys: list[tuple[ValueSubkey, ValueSubkey]], - scope: DHTReportScope, + scope: DHTReportScope = DHTReportScope.LOCAL, ) -> DHTRecordReport: return DHTRecordReport.from_json( raise_api_result(