reporting correction and better debug

This commit is contained in:
Christien Rioux 2024-03-13 19:18:57 -04:00
parent 39eee4708c
commit c9f0003150
10 changed files with 125 additions and 32 deletions

View File

@ -21,6 +21,36 @@ pub(crate) struct FanoutResult {
pub value_nodes: Vec<NodeRef>, pub value_nodes: Vec<NodeRef>,
} }
pub(crate) fn debug_fanout_result(result: &FanoutResult) -> String {
let kc = match result.kind {
FanoutResultKind::Timeout => "T",
FanoutResultKind::Finished => "F",
FanoutResultKind::Exhausted => "E",
};
format!("{}:{}", kc, result.value_nodes.len())
}
pub(crate) fn debug_fanout_results(results: &[FanoutResult]) -> String {
let mut col = 0;
let mut out = String::new();
let mut left = results.len();
for r in results {
if col == 0 {
out += " ";
}
let sr = debug_fanout_result(r);
out += &sr;
out += ",";
col += 1;
left -= 1;
if col == 32 && left != 0 {
col = 0;
out += "\n"
}
}
out
}
pub(crate) type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>; pub(crate) type FanoutCallReturnType = RPCNetworkResult<Vec<PeerInfo>>;
pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>; pub(crate) type FanoutNodeInfoFilter = Arc<dyn Fn(&[TypedKey], &NodeInfo) -> bool + Send + Sync>;

View File

@ -105,16 +105,16 @@ impl RPCProcessor {
let (seqs, peers, descriptor) = inspect_value_a.destructure(); let (seqs, peers, descriptor) = inspect_value_a.destructure();
if debug_target_enabled!("dht") { if debug_target_enabled!("dht") {
let debug_string_answer = format!( let debug_string_answer = format!(
"OUT <== InspectValueA({} {:?}{} peers={}) <= {}", "OUT <== InspectValueA({} {} peers={}) <= {} seqs:\n{}",
key, key,
seqs,
if descriptor.is_some() { if descriptor.is_some() {
" +desc" " +desc"
} else { } else {
"" ""
}, },
peers.len(), peers.len(),
dest dest,
debug_seqs(&seqs)
); );
log_dht!(debug "{}", debug_string_answer); log_dht!(debug "{}", debug_string_answer);

View File

@ -199,7 +199,7 @@ impl StorageManager {
kind, kind,
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}; };
log_dht!(debug "GetValue Fanout: {:?}", fanout_result); log_network_result!(debug "GetValue Fanout: {:?}", fanout_result);
Ok(OutboundGetValueResult { Ok(OutboundGetValueResult {
fanout_result, fanout_result,

View File

@ -15,8 +15,7 @@ impl DescriptorInfo {
subkeys: &ValueSubkeyRangeSet, subkeys: &ValueSubkeyRangeSet,
) -> VeilidAPIResult<Self> { ) -> VeilidAPIResult<Self> {
let schema = descriptor.schema().map_err(RPCError::invalid_format)?; let schema = descriptor.schema().map_err(RPCError::invalid_format)?;
let subkeys = subkeys.intersect(&ValueSubkeyRangeSet::single_range(0, schema.max_subkey())); let subkeys = schema.truncate_subkeys(subkeys, Some(MAX_INSPECT_VALUE_A_SEQS_LEN));
Ok(Self { Ok(Self {
descriptor, descriptor,
subkeys, subkeys,
@ -85,6 +84,7 @@ impl StorageManager {
// Make do-inspect-value answer context // Make do-inspect-value answer context
let opt_descriptor_info = if let Some(descriptor) = &local_inspect_result.opt_descriptor { let opt_descriptor_info = if let Some(descriptor) = &local_inspect_result.opt_descriptor {
// Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
Some(DescriptorInfo::new(descriptor.clone(), &subkeys)?) Some(DescriptorInfo::new(descriptor.clone(), &subkeys)?)
} else { } else {
None None
@ -127,6 +127,7 @@ impl StorageManager {
if let Some(descriptor) = answer.descriptor { if let Some(descriptor) = answer.descriptor {
let mut ctx = context.lock(); let mut ctx = context.lock();
if ctx.opt_descriptor_info.is_none() { if ctx.opt_descriptor_info.is_none() {
// Get the descriptor info. This also truncates the subkeys list to what can be returned from the network.
let descriptor_info = let descriptor_info =
match DescriptorInfo::new(Arc::new(descriptor.clone()), &subkeys) { match DescriptorInfo::new(Arc::new(descriptor.clone()), &subkeys) {
Ok(v) => v, Ok(v) => v,
@ -172,7 +173,11 @@ impl StorageManager {
.map(|s| SubkeySeqCount { .map(|s| SubkeySeqCount {
seq: *s, seq: *s,
// One node has shown us the newest sequence numbers so far // One node has shown us the newest sequence numbers so far
value_nodes: vec![next_node.clone()], value_nodes: if *s == ValueSeqNum::MAX {
vec![]
} else {
vec![next_node.clone()]
},
}) })
.collect(); .collect();
} else { } else {
@ -265,7 +270,7 @@ impl StorageManager {
let ctx = context.lock(); let ctx = context.lock();
let mut fanout_results = vec![]; let mut fanout_results = vec![];
for cs in &ctx.seqcounts { for cs in &ctx.seqcounts {
let has_consensus = cs.value_nodes.len() > consensus_count; let has_consensus = cs.value_nodes.len() >= consensus_count;
let fanout_result = FanoutResult { let fanout_result = FanoutResult {
kind: if has_consensus { kind: if has_consensus {
FanoutResultKind::Finished FanoutResultKind::Finished
@ -277,7 +282,7 @@ impl StorageManager {
fanout_results.push(fanout_result); fanout_results.push(fanout_result);
} }
log_dht!(debug "InspectValue Fanout ({:?}): {:?}", kind, fanout_results.iter().map(|fr| (fr.kind, fr.value_nodes.len())).collect::<Vec<_>>()); log_network_result!(debug "InspectValue Fanout ({:?}):\n{}", kind, debug_fanout_results(&fanout_results));
Ok(OutboundInspectValueResult { Ok(OutboundInspectValueResult {
fanout_results, fanout_results,

View File

@ -801,23 +801,9 @@ where
let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| { let Some((subkeys, opt_descriptor)) = self.with_record(key, |record| {
// Get number of subkeys from schema and ensure we are getting the // Get number of subkeys from schema and ensure we are getting the
// right number of sequence numbers betwen that and what we asked for // right number of sequence numbers betwen that and what we asked for
let in_schema_subkeys = subkeys.intersect(&ValueSubkeyRangeSet::single_range( let truncated_subkeys = record
0, .schema()
record.schema().max_subkey(), .truncate_subkeys(&subkeys, Some(MAX_INSPECT_VALUE_A_SEQS_LEN));
));
// Cap the number of total subkeys being inspected to the amount we can send across the wire
let truncated_subkeys = if let Some(nth_subkey) =
in_schema_subkeys.nth_subkey(MAX_INSPECT_VALUE_A_SEQS_LEN)
{
in_schema_subkeys.difference(&ValueSubkeyRangeSet::single_range(
nth_subkey,
ValueSubkey::MAX,
))
} else {
in_schema_subkeys
};
( (
truncated_subkeys, truncated_subkeys,
if want_descriptor { if want_descriptor {

View File

@ -183,7 +183,7 @@ impl StorageManager {
kind, kind,
value_nodes: ctx.value_nodes.clone(), value_nodes: ctx.value_nodes.clone(),
}; };
log_dht!(debug "SetValue Fanout: {:?}", fanout_result); log_network_result!(debug "SetValue Fanout: {:?}", fanout_result);
Ok(OutboundSetValueResult { Ok(OutboundSetValueResult {
fanout_result, fanout_result,

View File

@ -149,6 +149,9 @@ fn get_dht_schema(text: &str) -> Option<VeilidAPIResult<DHTSchema>> {
if text.is_empty() { if text.is_empty() {
return None; return None;
} }
if let Ok(n) = u16::from_str(text) {
return Some(DHTSchema::dflt(n));
}
Some(deserialize_json::<DHTSchema>(text)) Some(deserialize_json::<DHTSchema>(text))
} }
@ -1479,7 +1482,13 @@ impl VeilidAPI {
let mut dc = DEBUG_CACHE.lock(); let mut dc = DEBUG_CACHE.lock();
dc.opened_record_contexts.insert(*record.key(), rc); dc.opened_record_contexts.insert(*record.key(), rc);
Ok(format!("Created: {:?} : {:?}", record.key(), record)) Ok(format!(
"Created: {} {}:{}\n{:?}",
record.key(),
record.owner(),
record.owner_secret().unwrap(),
record
))
} }
async fn debug_record_open(&self, args: Vec<String>) -> VeilidAPIResult<String> { async fn debug_record_open(&self, args: Vec<String>) -> VeilidAPIResult<String> {
@ -1965,7 +1974,10 @@ record list <local|remote>
<addresstype> is: ipv4|ipv6 <addresstype> is: ipv4|ipv6
<routingdomain> is: public|local <routingdomain> is: public|local
<cryptokind> is: VLD0 <cryptokind> is: VLD0
<dhtschema> is: a json dht schema, default is '{"kind":"DFLT","o_cnt":1}' <dhtschema> is:
* a single-quoted json dht schema, or
* an integer number for a DFLT schema subkey count.
default is '{"kind":"DFLT","o_cnt":1}'
<scope> is: local, syncget, syncset, updateget, updateset <scope> is: local, syncget, syncset, updateget, updateset
<subkey> is: a number: 2 <subkey> is: a number: 2
<subkeys> is: <subkeys> is:

View File

@ -1,9 +1,7 @@
use super::*; use super::*;
/// DHT Record Report /// DHT Record Report
#[derive( #[derive(Default, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
Debug, Default, Clone, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize, JsonSchema,
)]
#[cfg_attr( #[cfg_attr(
target_arch = "wasm32", target_arch = "wasm32",
derive(Tsify), derive(Tsify),
@ -45,6 +43,17 @@ impl DHTRecordReport {
} }
} }
impl fmt::Debug for DHTRecordReport {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"DHTRecordReport {{\n subkeys: {:?}\n local_seqs:\n{}\n remote_seqs:\n{}\n}}\n",
&self.subkeys,
&debug_seqs(&self.local_seqs),
&debug_seqs(&self.network_seqs)
)
}
}
/// DHT Record Report Scope /// DHT Record Report Scope
#[derive( #[derive(
Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema, Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, JsonSchema,

View File

@ -18,3 +18,28 @@ pub type ValueSubkey = u32;
/// Value sequence number /// Value sequence number
#[cfg_attr(target_arch = "wasm32", declare)] #[cfg_attr(target_arch = "wasm32", declare)]
pub type ValueSeqNum = u32; pub type ValueSeqNum = u32;
pub(crate) fn debug_seqs(seqs: &[ValueSeqNum]) -> String {
let mut col = 0;
let mut out = String::new();
let mut left = seqs.len();
for s in seqs {
if col == 0 {
out += " ";
}
let sc = if *s == ValueSeqNum::MAX {
"-".to_owned()
} else {
s.to_string()
};
out += &sc;
out += ",";
col += 1;
left -= 1;
if col == 32 && left != 0 {
col = 0;
out += "\n"
}
}
out
}

View File

@ -75,6 +75,32 @@ impl DHTSchema {
DHTSchema::SMPL(s) => s.is_member(key), DHTSchema::SMPL(s) => s.is_member(key),
} }
} }
/// Truncate a subkey range set to the schema
/// Optionally also trim to maximum number of subkeys in the range
pub fn truncate_subkeys(
&self,
subkeys: &ValueSubkeyRangeSet,
opt_max_subkey_len: Option<usize>,
) -> ValueSubkeyRangeSet {
// Get number of subkeys from schema and trim to the bounds of the schema
let in_schema_subkeys =
subkeys.intersect(&ValueSubkeyRangeSet::single_range(0, self.max_subkey()));
// Cap the number of total subkeys being inspected to the amount we can send across the wire
if let Some(max_subkey_len) = opt_max_subkey_len {
if let Some(nth_subkey) = in_schema_subkeys.nth_subkey(max_subkey_len) {
in_schema_subkeys.difference(&ValueSubkeyRangeSet::single_range(
nth_subkey,
ValueSubkey::MAX,
))
} else {
in_schema_subkeys
}
} else {
in_schema_subkeys
}
}
} }
impl Default for DHTSchema { impl Default for DHTSchema {