veilid/veilid-python/tests/test_dht.py

83 lines
3.0 KiB
Python
Raw Normal View History

2023-06-19 21:35:49 -05:00
# Routing context veilid tests
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
import veilid
import pytest
import asyncio
import json
from . import *
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
##################################################################
BOGUS_KEY = veilid.TypedKey.from_value(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.PublicKey.from_bytes(b' '))
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_get_dht_value_unopened(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
out = await rc.get_dht_value(BOGUS_KEY, veilid.ValueSubkey(0), False)
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_open_dht_record_nonexistent_no_writer(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
out = await rc.open_dht_record(BOGUS_KEY, None)
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_close_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
await rc.close_dht_record(BOGUS_KEY)
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_delete_dht_record_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
with pytest.raises(veilid.VeilidAPIError):
await rc.delete_dht_record(BOGUS_KEY)
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_create_delete_dht_record_simple(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)
2023-06-18 17:47:39 -05:00
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_get_dht_value_nonexistent(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
assert await rc.get_dht_value(rec.key, 0, False) == None
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)
2023-06-19 21:35:49 -05:00
@pytest.mark.asyncio
async def test_set_get_dht_value(api_connection: veilid.VeilidAPI):
rc = await api_connection.new_routing_context()
async with rc:
rec = await rc.create_dht_record(veilid.CryptoKind.CRYPTO_KIND_VLD0, veilid.DHTSchema.dflt(1))
2023-06-19 21:35:49 -05:00
vd = await rc.set_dht_value(rec.key, 0, b"BLAH BLAH BLAH")
assert vd != None
2023-06-19 21:35:49 -05:00
vd2 = await rc.get_dht_value(rec.key, 0, False)
assert vd2 != None
2023-06-26 20:29:02 -05:00
vd3 = await rc.get_dht_value(rec.key, 0, True)
assert vd3 != None
2023-06-20 22:46:39 -05:00
print("vd: {}", vd.__dict__)
print("vd2: {}", vd2.__dict__)
2023-06-26 20:29:02 -05:00
print("vd3: {}", vd3.__dict__)
2023-06-20 22:46:39 -05:00
2023-06-19 21:35:49 -05:00
assert vd == vd2
2023-06-26 20:29:02 -05:00
assert vd2 == vd3
2023-06-19 21:35:49 -05:00
await rc.close_dht_record(rec.key)
await rc.delete_dht_record(rec.key)