mirror of https://gitlab.com/veilid/veilid.git
synced 2025-02-25 18:55:38 -06:00

major logging cleanup

parent 6455aff14a
commit fdc3de906f
@@ -47,12 +47,10 @@ enable-crypto-none = []
 # Debugging and testing features
 verbose-tracing = []
 tracking = []
-debug-dht = []
 crypto-test = ["enable-crypto-vld0", "enable-crypto-none"]
 crypto-test-none = ["enable-crypto-none"]
 veilid_core_android_tests = ["dep:paranoid-android"]
 veilid_core_ios_tests = ["dep:tracing-oslog"]
-network-result-extra = ["veilid-tools/network-result-extra"]

 ### DEPENDENCIES

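The two features removed here, debug-dht and network-result-extra, used to decide at compile time whether DHT and network-result diagnostics existed at all. With the target-scoped macros added later in this diff (log_dht!, log_network_result!), that decision moves to the log filter at runtime. A minimal consumer-side sketch, assuming the tracing and tracing-subscriber crates (env-filter feature); the filter string is illustrative and not taken from this repository:

use tracing_subscriber::EnvFilter;

fn main() {
    // Default to info, but open the "dht" and "network_result" targets
    // up to debug, roughly the choice the removed Cargo features used
    // to make at build time.
    let filter = EnvFilter::new("info,dht=debug,network_result=debug");
    tracing_subscriber::fmt().with_env_filter(filter).init();

    tracing::debug!(target: "dht", "value lookup finished");
}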
@@ -203,7 +203,7 @@ impl AttachmentManager {

 #[instrument(level = "debug", skip(self))]
 async fn attachment_maintainer(self) {
-debug!("attachment starting");
+log_net!(debug "attachment starting");
 self.update_attaching_detaching_state(AttachmentState::Attaching);

 let netman = self.network_manager();

@@ -217,7 +217,7 @@ impl AttachmentManager {
 break;
 }

-debug!("started maintaining peers");
+log_net!(debug "started maintaining peers");
 while self.inner.lock().maintain_peers {
 // tick network manager
 if let Err(err) = netman.tick().await {

@@ -241,32 +241,31 @@ impl AttachmentManager {
 // sleep should be at the end in case maintain_peers changes state
 sleep(1000).await;
 }
-debug!("stopped maintaining peers");
+log_net!(debug "stopped maintaining peers");

 if !restart {
 self.update_attaching_detaching_state(AttachmentState::Detaching);
-debug!("attachment stopping");
+log_net!(debug "attachment stopping");
 }

-debug!("stopping network");
+log_net!(debug "stopping network");
 netman.shutdown().await;

 if !restart {
 break;
 }

-debug!("completely restarting attachment");
+log_net!(debug "completely restarting attachment");
 // chill out for a second first, give network stack time to settle out
 sleep(1000).await;
 }

 self.update_attaching_detaching_state(AttachmentState::Detached);
-debug!("attachment stopped");
+log_net!(debug "attachment stopped");
 }

 #[instrument(level = "debug", skip_all, err)]
 pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
-trace!("init");
 {
 let mut inner = self.inner.lock();
 inner.update_callback = Some(update_callback.clone());
@@ -1,6 +1,6 @@
-use crate::api_tracing_layer::*;
 use crate::attachment_manager::*;
 use crate::crypto::Crypto;
+use crate::logging::*;
 use crate::storage_manager::*;
 use crate::veilid_api::*;
 use crate::veilid_config::*;

@@ -70,7 +70,6 @@ impl ServicesContext {
 ApiTracingLayer::init(self.update_callback.clone()).await;

 // Set up protected store
-trace!("init protected store");
 let protected_store = ProtectedStore::new(self.config.clone());
 if let Err(e) = protected_store.init().await {
 error!("failed to init protected store: {}", e);

@@ -80,7 +79,6 @@ impl ServicesContext {
 self.protected_store = Some(protected_store.clone());

 // Set up tablestore and crypto system
-trace!("create table store and crypto system");
 let table_store = TableStore::new(self.config.clone(), protected_store.clone());
 let crypto = Crypto::new(self.config.clone(), table_store.clone());
 table_store.set_crypto(crypto.clone());

@@ -88,7 +86,6 @@ impl ServicesContext {
 // Initialize table store first, so crypto code can load caches
 // Tablestore can use crypto during init, just not any cached operations or things
 // that require flushing back to the tablestore
-trace!("init table store");
 if let Err(e) = table_store.init().await {
 error!("failed to init table store: {}", e);
 self.shutdown().await;

@@ -97,7 +94,6 @@ impl ServicesContext {
 self.table_store = Some(table_store.clone());

 // Set up crypto
-trace!("init crypto");
 if let Err(e) = crypto.init().await {
 error!("failed to init crypto: {}", e);
 self.shutdown().await;

@@ -108,7 +104,6 @@ impl ServicesContext {
 // Set up block store
 #[cfg(feature = "unstable-blockstore")]
 {
-trace!("init block store");
 let block_store = BlockStore::new(self.config.clone());
 if let Err(e) = block_store.init().await {
 error!("failed to init block store: {}", e);

@@ -119,7 +114,6 @@ impl ServicesContext {
 }

 // Set up storage manager
-trace!("init storage manager");
 let update_callback = self.update_callback.clone();

 let storage_manager = StorageManager::new(

@@ -137,7 +131,6 @@ impl ServicesContext {
 self.storage_manager = Some(storage_manager.clone());

 // Set up attachment manager
-trace!("init attachment manager");
 let update_callback = self.update_callback.clone();
 let attachment_manager = AttachmentManager::new(
 self.config.clone(),

@@ -163,28 +156,22 @@ impl ServicesContext {
 info!("Veilid API shutting down");

 if let Some(attachment_manager) = &mut self.attachment_manager {
-trace!("terminate attachment manager");
 attachment_manager.terminate().await;
 }
 if let Some(storage_manager) = &mut self.storage_manager {
-trace!("terminate storage manager");
 storage_manager.terminate().await;
 }
 #[cfg(feature = "unstable-blockstore")]
 if let Some(block_store) = &mut self.block_store {
-trace!("terminate block store");
 block_store.terminate().await;
 }
 if let Some(crypto) = &mut self.crypto {
-trace!("terminate crypto");
 crypto.terminate().await;
 }
 if let Some(table_store) = &mut self.table_store {
-trace!("terminate table store");
 table_store.terminate().await;
 }
 if let Some(protected_store) = &mut self.protected_store {
-trace!("terminate protected store");
 protected_store.terminate().await;
 }

@@ -220,7 +207,6 @@ impl VeilidCoreContext {
 config_callback: ConfigCallback,
 ) -> VeilidAPIResult<VeilidCoreContext> {
 // Set up config from callback
-trace!("setup config with callback");
 let mut config = VeilidConfig::new();
 config.setup(config_callback, update_callback.clone())?;

@@ -233,20 +219,17 @@ impl VeilidCoreContext {
 config_json: String,
 ) -> VeilidAPIResult<VeilidCoreContext> {
 // Set up config from json
-trace!("setup config with json");
 let mut config = VeilidConfig::new();
 config.setup_from_json(config_json, update_callback.clone())?;
 Self::new_common(update_callback, config).await
 }

-
 #[instrument(err, skip_all)]
 async fn new_with_config(
 update_callback: UpdateCallback,
 config_inner: VeilidConfigInner,
 ) -> VeilidAPIResult<VeilidCoreContext> {
 // Set up config from json
-trace!("setup config with json");
 let mut config = VeilidConfig::new();
 config.setup_from_config(config_inner, update_callback.clone())?;
 Self::new_common(update_callback, config).await

@@ -128,8 +128,8 @@ impl Crypto {
 self.unlocked_inner.config.clone()
 }

+#[instrument(skip_all, err)]
 pub async fn init(&self) -> EyreResult<()> {
-trace!("Crypto::init");
 let table_store = self.unlocked_inner.table_store.clone();
 // Init node id from config
 if let Err(e) = self

@@ -190,7 +190,6 @@ impl Crypto {
 }

 pub async fn flush(&self) -> EyreResult<()> {
-//trace!("Crypto::flush");
 let cache_bytes = {
 let inner = self.inner.lock();
 cache_to_bytes(&inner.dh_cache)

@@ -206,15 +205,14 @@ impl Crypto {
 }

 pub async fn terminate(&self) {
-trace!("Crypto::terminate");
 let flush_future = self.inner.lock().flush_future.take();
 if let Some(f) = flush_future {
 f.await;
 }
-trace!("starting termination flush");
+log_crypto!("starting termination flush");
 match self.flush().await {
 Ok(_) => {
-trace!("finished termination flush");
+log_crypto!("finished termination flush");
 }
 Err(e) => {
 error!("failed termination flush: {}", e);
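The hunks above delete hand-written trace!("init ...") and trace!("Crypto::init") lines; entry and exit reporting is left to the #[instrument] attribute these functions carry (and that Crypto::init gains in this diff), with err recording the returned error. A self-contained sketch of the pattern, using illustrative types that are not part of veilid-core:

use tracing::instrument;

#[derive(Debug)]
struct InitError;

impl std::fmt::Display for InitError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "init failed")
    }
}

// The span created by #[instrument] reports entry, exit, and (via `err`)
// the error value, so no manual trace!("init") is needed in the body.
#[instrument(level = "debug", skip_all, err)]
fn init(ok: bool) -> Result<(), InitError> {
    if ok { Ok(()) } else { Err(InitError) }
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    let _ = init(true);
    let _ = init(false); // error recorded by the instrument span
}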
@@ -33,7 +33,7 @@ impl ProtectedStore {
 if let Err(e) = self.remove_user_secret(kpsk).await {
 error!("failed to delete '{}': {}", kpsk, e);
 } else {
-debug!("deleted table '{}'", kpsk);
+log_pstore!(debug "deleted table '{}'", kpsk);
 }
 }
 Ok(())

@@ -19,7 +19,7 @@ impl ProtectedStore {
 if let Err(e) = self.remove_user_secret(kpsk).await {
 error!("failed to delete '{}': {}", kpsk, e);
 } else {
-debug!("deleted table '{}'", kpsk);
+log_pstore!(debug "deleted table '{}'", kpsk);
 }
 }
 Ok(())

@@ -44,11 +44,11 @@ cfg_if::cfg_if! {
 #[macro_use]
 extern crate alloc;

-mod api_tracing_layer;
 mod attachment_manager;
 mod core_context;
 mod crypto;
 mod intf;
+mod logging;
 mod network_manager;
 mod routing_table;
 mod rpc_processor;

@@ -56,14 +56,12 @@ mod storage_manager;
 mod table_store;
 mod veilid_api;
 mod veilid_config;
-mod veilid_layer_filter;
 mod wasm_helpers;

-pub use self::api_tracing_layer::ApiTracingLayer;
 pub use self::core_context::{api_startup, api_startup_config, api_startup_json, UpdateCallback};
+pub use self::logging::{ApiTracingLayer, VeilidLayerFilter};
 pub use self::veilid_api::*;
 pub use self::veilid_config::*;
-pub use self::veilid_layer_filter::*;
 pub use veilid_tools as tools;

 /// The on-the-wire serialization format for Veilid RPC

@@ -96,7 +94,7 @@ pub fn default_veilid_config() -> String {
 #[cfg(target_os = "android")]
 pub use intf::android::veilid_core_setup_android;

-pub static DEFAULT_LOG_IGNORE_LIST: [&str; 24] = [
+pub static DEFAULT_LOG_IGNORE_LIST: [&str; 26] = [
 "mio",
 "h2",
 "hyper",

@@ -121,6 +119,8 @@ pub static DEFAULT_LOG_IGNORE_LIST: [&str; 24] = [
 "ws_stream_wasm",
 "keyvaluedb_web",
 "veilid_api",
+"network_result",
+"dht",
 ];

 use cfg_if::*;
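DEFAULT_LOG_IGNORE_LIST grows by the two new targets, "network_result" and "dht", so their chatter is suppressed by default. How the list is consumed is not shown in this diff (that lives in VeilidLayerFilter); purely as a hypothetical illustration, an ignore list like this could be folded into a tracing-subscriber filter as follows:

use tracing_subscriber::filter::{LevelFilter, Targets};

// Hypothetical helper, not part of veilid-core: silence every target on
// the ignore list while leaving everything else fully enabled.
fn ignore_list_filter(ignore: &[&str]) -> Targets {
    let mut targets = Targets::new().with_default(LevelFilter::TRACE);
    for t in ignore {
        targets = targets.with_target(*t, LevelFilter::OFF);
    }
    targets
}

fn main() {
    let filter = ignore_list_filter(&["mio", "h2", "hyper", "network_result", "dht"]);
    println!("{:?}", filter);
}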
@@ -1,12 +1,10 @@
-// LogThru
-// Pass errors through and log them simultaneously via map_err()
-// Also contains common log facilities (net, rpc, rtab, stor, pstore, crypto, etc )
+mod api_tracing_layer;
+mod veilid_layer_filter;

 use super::*;

-pub fn map_to_string<X: ToString>(arg: X) -> String {
-arg.to_string()
-}
+pub use api_tracing_layer::*;
+pub use veilid_layer_filter::*;

 #[macro_export]
 macro_rules! fn_string {

@@ -51,6 +49,78 @@ macro_rules! log_net {
 }
 }

+#[macro_export]
+macro_rules! log_client_api {
+(error $text:expr) => {error!(
+target: "client_api",
+"{}",
+$text,
+)};
+(error $fmt:literal, $($arg:expr),+) => {
+error!(target:"client_api", $fmt, $($arg),+);
+};
+(warn $text:expr) => {warn!(
+target: "client_api",
+"{}",
+$text,
+)};
+(warn $fmt:literal, $($arg:expr),+) => {
+warn!(target:"client_api", $fmt, $($arg),+);
+};
+(debug $text:expr) => {debug!(
+target: "client_api",
+"{}",
+$text,
+)};
+(debug $fmt:literal, $($arg:expr),+) => {
+debug!(target:"client_api", $fmt, $($arg),+);
+};
+($text:expr) => {trace!(
+target: "client_api",
+"{}",
+$text,
+)};
+($fmt:literal, $($arg:expr),+) => {
+trace!(target:"client_api", $fmt, $($arg),+);
+}
+}
+
+#[macro_export]
+macro_rules! log_network_result {
+(error $text:expr) => {error!(
+target: "network_result",
+"{}",
+$text,
+)};
+(error $fmt:literal, $($arg:expr),+) => {
+error!(target: "network_result", $fmt, $($arg),+);
+};
+(warn $text:expr) => {warn!(
+target: "network_result",
+"{}",
+$text,
+)};
+(warn $fmt:literal, $($arg:expr),+) => {
+warn!(target:"network_result", $fmt, $($arg),+);
+};
+(debug $text:expr) => {debug!(
+target: "network_result",
+"{}",
+$text,
+)};
+(debug $fmt:literal, $($arg:expr),+) => {
+debug!(target:"network_result", $fmt, $($arg),+);
+};
+($text:expr) => {trace!(
+target: "network_result",
+"{}",
+$text,
+)};
+($fmt:literal, $($arg:expr),+) => {
+trace!(target:"network_result", $fmt, $($arg),+);
+}
+}
+
 #[macro_export]
 macro_rules! log_rpc {
 (error $text:expr) => { error!(
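The log_client_api! and log_network_result! macros added above follow the same shape as the existing log_net!/log_rpc! family: the leading level token picks the tracing macro, and the target is fixed per subsystem. A trimmed, runnable reduction of log_network_result! (only the debug and default arms) to show how a call site expands; this is a sketch, not the full definition above:

use tracing::{debug, trace};

macro_rules! log_network_result {
    (debug $fmt:literal, $($arg:expr),+) => {
        debug!(target: "network_result", $fmt, $($arg),+)
    };
    ($fmt:literal, $($arg:expr),+) => {
        trace!(target: "network_result", $fmt, $($arg),+)
    };
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    // Expands to debug!(target: "network_result", ...).
    log_network_result!(debug "insert_frame failed: {:?} <= size={}", "timeout", 512);
    // No level token: expands to trace!(target: "network_result", ...).
    log_network_result!("checked {} frames", 3);
}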
@@ -87,6 +157,42 @@ macro_rules! log_rpc {
 }
 }

+#[macro_export]
+macro_rules! log_dht {
+(error $text:expr) => { error!(
+target: "dht",
+"{}",
+$text,
+)};
+(error $fmt:literal, $($arg:expr),+) => {
+error!(target:"dht", $fmt, $($arg),+);
+};
+(warn $text:expr) => { warn!(
+target: "dht",
+"{}",
+$text,
+)};
+(warn $fmt:literal, $($arg:expr),+) => {
+warn!(target:"dht", $fmt, $($arg),+);
+};
+(debug $text:expr) => { debug!(
+target: "dht",
+"{}",
+$text,
+)};
+(debug $fmt:literal, $($arg:expr),+) => {
+debug!(target:"dht", $fmt, $($arg),+);
+};
+($text:expr) => {trace!(
+target: "dht",
+"{}",
+$text,
+)};
+($fmt:literal, $($arg:expr),+) => {
+trace!(target:"dht", $fmt, $($arg),+);
+}
+}
+
 #[macro_export]
 macro_rules! log_rtab {
 (error $text:expr) => { error!(

@@ -195,6 +301,42 @@ macro_rules! log_pstore {
 }
 }

+#[macro_export]
+macro_rules! log_tstore {
+(error $text:expr) => { error!(
+target: "tstore",
+"{}",
+$text,
+)};
+(error $fmt:literal, $($arg:expr),+) => {
+error!(target:"tstore", $fmt, $($arg),+);
+};
+(warn $text:expr) => { warn!(
+target: "tstore",
+"{}",
+$text,
+)};
+(warn $fmt:literal, $($arg:expr),+) => {
+warn!(target:"tstore", $fmt, $($arg),+);
+};
+(debug $text:expr) => { debug!(
+target: "tstore",
+"{}",
+$text,
+)};
+(debug $fmt:literal, $($arg:expr),+) => {
+debug!(target:"tstore", $fmt, $($arg),+);
+};
+($text:expr) => {trace!(
+target: "tstore",
+"{}",
+$text,
+)};
+($fmt:literal, $($arg:expr),+) => {
+trace!(target:"tstore", $fmt, $($arg),+);
+}
+}
+
 #[macro_export]
 macro_rules! log_crypto {
 (error $text:expr) => { error!(

@@ -222,188 +364,3 @@ macro_rules! log_crypto {
 trace!(target:"crypto", $fmt, $($arg),+);
 }
 }
-
-#[macro_export]
-macro_rules! logthru_net {
-($($level:ident)?) => {
-logthru!($($level)? "net")
-};
-($($level:ident)? $text:literal) => {
-logthru!($($level)? "net", $text)
-};
-($($level:ident)? $fmt:literal, $($arg:expr),+) => {
-logthru!($($level)? "net", $fmt, $($arg),+)
-}
-}
-#[macro_export]
-macro_rules! logthru_rpc {
-($($level:ident)?) => {
-logthru!($($level)? "rpc")
-};
-($($level:ident)? $text:literal) => {
-logthru!($($level)? "rpc", $text)
-};
-($($level:ident)? $fmt:literal, $($arg:expr),+) => {
-logthru!($($level)? "rpc", $fmt, $($arg),+)
-}
-}
-#[macro_export]
-macro_rules! logthru_rtab {
-($($level:ident)?) => {
-logthru!($($level)? "rtab")
-};
-($($level:ident)? $text:literal) => {
-logthru!($($level)? "rtab", $text)
-};
-($($level:ident)? $fmt:literal, $($arg:expr),+) => {
-logthru!($($level)? "rtab", $fmt, $($arg),+)
-}
-}
-#[macro_export]
-macro_rules! logthru_stor {
-($($level:ident)?) => {
-logthru!($($level)? "stor")
-};
-($($level:ident)? $text:literal) => {
-logthru!($($level)? "stor", $text)
-};
-($($level:ident)? $fmt:literal, $($arg:expr),+) => {
-logthru!($($level)? "stor", $fmt, $($arg),+)
-}
-}
-#[macro_export]
-macro_rules! logthru_pstore {
-($($level:ident)?) => {
-logthru!($($level)? "pstore")
-};
-($($level:ident)? $text:literal) => {
-logthru!($($level)? "pstore", $text)
-};
-($($level:ident)? $fmt:literal, $($arg:expr),+) => {
-logthru!($($level)? "pstore", $fmt, $($arg),+)
-}
-}
-#[macro_export]
-macro_rules! logthru_crypto {
-($($level:ident)?) => {
-logthru!($($level)? "crypto")
-};
-($($level:ident)? $text:literal) => {
-logthru!($($level)? "crypto", $text)
-};
-($($level:ident)? $fmt:literal, $($arg:expr),+) => {
-logthru!($($level)? "crypto", $fmt, $($arg),+)
-}
-}
-
-#[macro_export]
-macro_rules! logthru {
-// error
-(error $target:literal) => (|e__| {
-error!(
-target: $target,
-"[{:?}]",
-e__,
-);
-e__
-});
-(error $target:literal, $text:literal) => (|e__| {
-error!(
-target: $target,
-"[{:?}] {}",
-e__,
-$text
-);
-e__
-});
-(error $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
-error!(
-target: $target,
-concat!("[{:?}] ", $fmt),
-e__,
-$($arg),+
-);
-e__
-});
-// warn
-(warn $target:literal) => (|e__| {
-warn!(
-target: $target,
-"[{:?}]",
-e__,
-);
-e__
-});
-(warn $target:literal, $text:literal) => (|e__| {
-warn!(
-target: $target,
-"[{:?}] {}",
-e__,
-$text
-);
-e__
-});
-(warn $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
-warn!(
-target: $target,
-concat!("[{:?}] ", $fmt),
-e__,
-$($arg),+
-);
-e__
-});
-// debug
-(debug $target:literal) => (|e__| {
-debug!(
-target: $target,
-"[{:?}]",
-e__,
-);
-e__
-});
-(debug $target:literal, $text:literal) => (|e__| {
-debug!(
-target: $target,
-"[{:?}] {}",
-e__,
-$text
-);
-e__
-});
-(debug $target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
-debug!(
-target: $target,
-concat!("[{:?}] ", $fmt),
-e__,
-$($arg),+
-);
-e__
-});
-// trace
-($target:literal) => (|e__| {
-trace!(
-target: $target,
-"[{:?}]",
-e__,
-);
-e__
-});
-($target:literal, $text:literal) => (|e__| {
-trace!(
-target: $target,
-"[{:?}] {}",
-e__,
-$text
-);
-e__
-});
-($target:literal, $fmt:literal, $($arg:expr),+) => (|e__| {
-trace!(
-target: $target,
-concat!("[{:?}] ", $fmt),
-e__,
-$($arg),+
-);
-e__
-})
-}
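The removed logthru_*/logthru! macros expanded to a closure meant for map_err(): log the error under a fixed target, then pass it through unchanged. Call sites in this commit switch to explicit matches that log through the log_* macros instead (the DiscoveryContext hunk below is one example). A small sketch contrasting the two styles, using plain tracing calls in place of the project macros:

use tracing::warn;

fn might_fail(ok: bool) -> Result<u32, String> {
    if ok { Ok(7) } else { Err("boom".to_string()) }
}

fn main() {
    tracing_subscriber::fmt().init();

    // Old style: a log-and-pass-through closure, then collapse to a default.
    let old = might_fail(false)
        .map_err(|e| {
            warn!(target: "net", "[{:?}] failed to reach node", e);
            e
        })
        .unwrap_or(0);

    // New style: one match that both logs and chooses the fallback.
    let new = match might_fail(false) {
        Ok(v) => v,
        Err(e) => {
            warn!(target: "net", "failed to reach node: {}", e);
            0
        }
    };

    assert_eq!(old, new);
}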
@@ -115,7 +115,7 @@ impl ConnectionManager {
 }

 pub async fn startup(&self) {
-trace!("startup connection manager");
+log_net!(debug "startup connection manager");
 let mut inner = self.arc.inner.lock();
 if inner.is_some() {
 panic!("shouldn't start connection manager twice without shutting it down first");

@@ -135,7 +135,7 @@ impl ConnectionManager {
 }

 pub async fn shutdown(&self) {
-debug!("starting connection manager shutdown");
+log_net!(debug "starting connection manager shutdown");
 // Remove the inner from the lock
 let mut inner = {
 let mut inner_lock = self.arc.inner.lock();

@@ -148,16 +148,16 @@ impl ConnectionManager {
 };

 // Stop all the connections and the async processor
-debug!("stopping async processor task");
+log_net!(debug "stopping async processor task");
 drop(inner.stop_source.take());
 let async_processor_jh = inner.async_processor_jh.take().unwrap();
 // wait for the async processor to stop
-debug!("waiting for async processor to stop");
+log_net!(debug "waiting for async processor to stop");
 async_processor_jh.await;
 // Wait for the connections to complete
-debug!("waiting for connection handlers to complete");
+log_net!(debug "waiting for connection handlers to complete");
 self.arc.connection_table.join().await;
-debug!("finished connection manager shutdown");
+log_net!(debug "finished connection manager shutdown");
 }

 // Internal routine to see if we should keep this connection

@@ -99,7 +99,7 @@ impl ConnectionTable {
 let unord = FuturesUnordered::new();
 for table in &mut inner.conn_by_id {
 for (_, mut v) in table.drain() {
-trace!("connection table join: {:?}", v);
+log_net!("connection table join: {:?}", v);
 v.close();
 unord.push(v);
 }

@@ -363,9 +363,8 @@ impl NetworkManager {

 #[instrument(level = "debug", skip_all, err)]
 pub async fn internal_startup(&self) -> EyreResult<()> {
-trace!("NetworkManager::internal_startup begin");
 if self.unlocked_inner.components.read().is_some() {
-debug!("NetworkManager::internal_startup already started");
+log_net!(debug "NetworkManager::internal_startup already started");
 return Ok(());
 }

@@ -402,7 +401,7 @@ impl NetworkManager {
 rpc_processor.startup().await?;
 receipt_manager.startup().await?;

-trace!("NetworkManager::internal_startup end");
+log_net!("NetworkManager::internal_startup end");

 Ok(())
 }

@@ -422,13 +421,13 @@ impl NetworkManager {

 #[instrument(level = "debug", skip_all)]
 pub async fn shutdown(&self) {
-debug!("starting network manager shutdown");
+log_net!(debug "starting network manager shutdown");

 // Cancel all tasks
 self.cancel_tasks().await;

 // Shutdown network components if they started up
-debug!("shutting down network components");
+log_net!(debug "shutting down network components");

 let components = self.unlocked_inner.components.read().clone();
 if let Some(components) = components {

@@ -441,16 +440,16 @@ impl NetworkManager {
 }

 // reset the state
-debug!("resetting network manager state");
+log_net!(debug "resetting network manager state");
 {
 *self.inner.lock() = NetworkManager::new_inner();
 }

 // send update
-debug!("sending network state update to api clients");
+log_net!(debug "sending network state update to api clients");
 self.send_network_update();

-debug!("finished network manager shutdown");
+log_net!(debug "finished network manager shutdown");
 }

 pub fn update_client_allowlist(&self, client: TypedKey) {

@@ -493,7 +492,7 @@ impl NetworkManager {
 .unwrap_or_default()
 {
 let (k, v) = inner.client_allowlist.remove_lru().unwrap();
-trace!(key=?k, value=?v, "purge_client_allowlist: remove_lru")
+trace!(target: "net", key=?k, value=?v, "purge_client_allowlist: remove_lru")
 }
 }

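The purge_client_allowlist change above keeps tracing's structured fields but pins the event to the "net" target so it is filtered with the rest of the networking output. A standalone sketch of that call shape:

use tracing::trace;

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let k = "client-key";
    let v = 42u32;
    // `?k` records the field with its Debug representation; the explicit
    // target groups the event with the network subsystem's logs.
    trace!(target: "net", key = ?k, value = ?v, "purge_client_allowlist: remove_lru");
}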
@@ -310,14 +310,16 @@ impl DiscoveryContext {

 // ask the node to send us a dial info validation receipt

-rpc_processor
+match rpc_processor
 .rpc_call_validate_dial_info(node_ref.clone(), dial_info, redirect)
 .await
-.map_err(logthru_net!(
-"failed to send validate_dial_info to {:?}",
-node_ref
-))
-.unwrap_or(false)
+{
+Err(e) => {
+log_net!("failed to send validate_dial_info to {:?}: {}", node_ref, e);
+false
+}
+Ok(v) => v,
+}
 }

 #[instrument(level = "trace", skip(self), ret)]

@@ -279,22 +279,22 @@ impl Network {
 fn load_server_config(&self) -> io::Result<ServerConfig> {
 let c = self.config.get();
 //
-trace!(
+log_net!(
 "loading certificate from {}",
 c.network.tls.certificate_path
 );
 let certs = Self::load_certs(&PathBuf::from(&c.network.tls.certificate_path))?;
-trace!("loaded {} certificates", certs.len());
+log_net!("loaded {} certificates", certs.len());
 if certs.is_empty() {
 return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Certificates at {} could not be loaded.\nEnsure it is in PEM format, beginning with '-----BEGIN CERTIFICATE-----'",c.network.tls.certificate_path)));
 }
 //
-trace!(
+log_net!(
 "loading private key from {}",
 c.network.tls.private_key_path
 );
 let mut keys = Self::load_keys(&PathBuf::from(&c.network.tls.private_key_path))?;
-trace!("loaded {} keys", keys.len());
+log_net!("loaded {} keys", keys.len());
 if keys.is_empty() {
 return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Private key at {} could not be loaded.\nEnsure it is unencrypted and in RSA or PKCS8 format, beginning with '-----BEGIN RSA PRIVATE KEY-----' or '-----BEGIN PRIVATE KEY-----'",c.network.tls.private_key_path)));
 }

@@ -915,12 +915,12 @@ impl Network {

 #[instrument(level = "debug", skip_all)]
 pub async fn shutdown(&self) {
-debug!("starting low level network shutdown");
+log_net!(debug "starting low level network shutdown");

 let routing_table = self.routing_table();

 // Stop all tasks
-debug!("stopping update network class task");
+log_net!(debug "stopping update network class task");
 if let Err(e) = self.unlocked_inner.update_network_class_task.stop().await {
 error!("update_network_class_task not cancelled: {}", e);
 }

@@ -930,17 +930,17 @@ impl Network {
 let mut inner = self.inner.lock();
 // take the join handles out
 for h in inner.join_handles.drain(..) {
-trace!("joining: {:?}", h);
+log_net!("joining: {:?}", h);
 unord.push(h);
 }
 // Drop the stop
 drop(inner.stop_source.take());
 }
-debug!("stopping {} low level network tasks", unord.len());
+log_net!(debug "stopping {} low level network tasks", unord.len());
 // Wait for everything to stop
 while unord.next().await.is_some() {}

-debug!("clearing dial info");
+log_net!(debug "clearing dial info");

 routing_table
 .edit_routing_domain(RoutingDomain::PublicInternet)

@@ -961,7 +961,7 @@ impl Network {
 // Reset state including network class
 *self.inner.lock() = Self::new_inner();

-debug!("finished low level network shutdown");
+log_net!(debug "finished low level network shutdown");
 }

 //////////////////////////////////////////

@@ -15,7 +15,7 @@ impl Network {
 task_count = 1;
 }
 }
-trace!("task_count: {}", task_count);
+log_net!("task_count: {}", task_count);
 for _ in 0..task_count {
 log_net!("Spawning UDP listener task");

@@ -103,7 +103,7 @@ impl Network {
 }
 }

-trace!("UDP listener task stopped");
+log_net!("UDP listener task stopped");
 });
 ////////////////////////////////////////////////////////////

@@ -39,9 +39,8 @@ impl RawUdpProtocolHandler {
 NetworkResult::Value(None) => {
 continue;
 }
-#[cfg(feature = "network-result-extra")]
 nres => {
-log_network_result!(
+log_network_result!(debug
 "UDP::recv_message insert_frame failed: {:?} <= size={} remote_addr={}",
 nres,
 size,

@@ -49,10 +48,6 @@ impl RawUdpProtocolHandler {
 );
 continue;
 }
-#[cfg(not(feature = "network-result-extra"))]
-_ => {
-continue;
-}
 };

 // Check length of reassembled message (same for all protocols)
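The RawUdpProtocolHandler hunks above collapse two match arms that were duplicated behind #[cfg(feature = "network-result-extra")] into a single arm that always calls log_network_result!(debug ...), leaving verbosity to the log filter. A simplified sketch of that shape, with NetRes standing in for the crate's NetworkResult type:

use tracing::debug;

#[derive(Debug)]
enum NetRes<T> {
    Value(T),
    Timeout,
}

fn handle(res: NetRes<Option<u32>>) {
    match res {
        NetRes::Value(Some(v)) => println!("got {}", v),
        NetRes::Value(None) => {}
        nres => {
            // Always compiled in; whether it is shown is a filter decision.
            debug!(target: "network_result", "insert_frame failed: {:?}", nres);
        }
    }
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();
    handle(NetRes::Value(Some(1)));
    handle(NetRes::Value(None));
    handle(NetRes::Timeout);
}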
@ -280,7 +280,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
trace!("starting udp listeners");
|
log_net!("starting udp listeners");
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let (listen_address, public_address, detect_address_changes) = {
|
let (listen_address, public_address, detect_address_changes) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -312,7 +312,7 @@ impl Network {
|
|||||||
let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?;
|
let local_dial_info_list = self.create_udp_inbound_sockets(ip_addrs, udp_port).await?;
|
||||||
let mut static_public = false;
|
let mut static_public = false;
|
||||||
|
|
||||||
trace!("UDP: listener started on {:#?}", local_dial_info_list);
|
log_net!("UDP: listener started on {:#?}", local_dial_info_list);
|
||||||
|
|
||||||
// Register local dial info
|
// Register local dial info
|
||||||
for di in &local_dial_info_list {
|
for di in &local_dial_info_list {
|
||||||
@ -383,7 +383,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
trace!("starting ws listeners");
|
log_net!("starting ws listeners");
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let (listen_address, url, path, detect_address_changes) = {
|
let (listen_address, url, path, detect_address_changes) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -415,7 +415,7 @@ impl Network {
|
|||||||
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
trace!("WS: listener started on {:#?}", socket_addresses);
|
log_net!("WS: listener started on {:#?}", socket_addresses);
|
||||||
|
|
||||||
let mut static_public = false;
|
let mut static_public = false;
|
||||||
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
||||||
@ -493,7 +493,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
trace!("starting wss listeners");
|
log_net!("starting wss listeners");
|
||||||
|
|
||||||
let (listen_address, url, detect_address_changes) = {
|
let (listen_address, url, detect_address_changes) = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -524,7 +524,7 @@ impl Network {
|
|||||||
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
trace!("WSS: listener started on {:#?}", socket_addresses);
|
log_net!("WSS: listener started on {:#?}", socket_addresses);
|
||||||
|
|
||||||
// NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS
|
// NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS
|
||||||
// If the hostname is specified, it is the public dialinfo via the URL. If no hostname
|
// If the hostname is specified, it is the public dialinfo via the URL. If no hostname
|
||||||
@ -586,7 +586,7 @@ impl Network {
|
|||||||
editor_public_internet: &mut RoutingDomainEditor,
|
editor_public_internet: &mut RoutingDomainEditor,
|
||||||
editor_local_network: &mut RoutingDomainEditor,
|
editor_local_network: &mut RoutingDomainEditor,
|
||||||
) -> EyreResult<()> {
|
) -> EyreResult<()> {
|
||||||
trace!("starting tcp listeners");
|
log_net!("starting tcp listeners");
|
||||||
|
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
let (listen_address, public_address, detect_address_changes) = {
|
let (listen_address, public_address, detect_address_changes) = {
|
||||||
@ -618,7 +618,7 @@ impl Network {
|
|||||||
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
|
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
trace!("TCP: listener started on {:#?}", socket_addresses);
|
log_net!("TCP: listener started on {:#?}", socket_addresses);
|
||||||
|
|
||||||
let mut static_public = false;
|
let mut static_public = false;
|
||||||
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
let mut registered_addresses: HashSet<IpAddr> = HashSet::new();
|
||||||
|
@ -185,9 +185,9 @@ impl ReceiptManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn startup(&self) -> EyreResult<()> {
|
pub async fn startup(&self) -> EyreResult<()> {
|
||||||
trace!("startup receipt manager");
|
log_net!(debug "startup receipt manager");
|
||||||
// Retrieve config
|
|
||||||
|
|
||||||
|
// Retrieve config
|
||||||
{
|
{
|
||||||
// let config = self.core().config();
|
// let config = self.core().config();
|
||||||
// let c = config.get();
|
// let c = config.get();
|
||||||
@ -296,7 +296,7 @@ impl ReceiptManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(&self) {
|
pub async fn shutdown(&self) {
|
||||||
debug!("starting receipt manager shutdown");
|
log_net!(debug "starting receipt manager shutdown");
|
||||||
let network_manager = self.network_manager();
|
let network_manager = self.network_manager();
|
||||||
|
|
||||||
// Stop all tasks
|
// Stop all tasks
|
||||||
@ -308,13 +308,13 @@ impl ReceiptManager {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Wait for everything to stop
|
// Wait for everything to stop
|
||||||
debug!("waiting for timeout task to stop");
|
log_net!(debug "waiting for timeout task to stop");
|
||||||
if timeout_task.join().await.is_err() {
|
if timeout_task.join().await.is_err() {
|
||||||
panic!("joining timeout task failed");
|
panic!("joining timeout task failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
*self.inner.lock() = Self::new_inner(network_manager);
|
*self.inner.lock() = Self::new_inner(network_manager);
|
||||||
debug!("finished receipt manager shutdown");
|
log_net!(debug "finished receipt manager shutdown");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
|
@ -64,7 +64,7 @@ impl NetworkManager {
|
|||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(feature = "verbose-tracing")]
|
#[cfg(feature = "verbose-tracing")]
|
||||||
debug!(
|
log_net!(debug
|
||||||
"ContactMethod: {:?} for {:?}",
|
"ContactMethod: {:?} for {:?}",
|
||||||
contact_method, destination_node_ref
|
contact_method, destination_node_ref
|
||||||
);
|
);
|
||||||
@ -304,7 +304,7 @@ impl NetworkManager {
|
|||||||
// First try to send data to the last socket we've seen this peer on
|
// First try to send data to the last socket we've seen this peer on
|
||||||
let data = if let Some(flow) = node_ref.last_flow() {
|
let data = if let Some(flow) = node_ref.last_flow() {
|
||||||
#[cfg(feature = "verbose-tracing")]
|
#[cfg(feature = "verbose-tracing")]
|
||||||
debug!(
|
log_net!(debug
|
||||||
"ExistingConnection: {:?} for {:?}",
|
"ExistingConnection: {:?} for {:?}",
|
||||||
flow, node_ref
|
flow, node_ref
|
||||||
);
|
);
|
||||||
|
@ -85,12 +85,12 @@ impl NetworkManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn cancel_tasks(&self) {
|
pub(crate) async fn cancel_tasks(&self) {
|
||||||
debug!("stopping rolling transfers task");
|
log_net!(debug "stopping rolling transfers task");
|
||||||
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
|
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
|
||||||
warn!("rolling_transfers_task not stopped: {}", e);
|
warn!("rolling_transfers_task not stopped: {}", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("stopping routing table tasks");
|
log_net!(debug "stopping routing table tasks");
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
routing_table.cancel_tasks().await;
|
routing_table.cancel_tasks().await;
|
||||||
|
|
||||||
|
@ -46,8 +46,7 @@ impl NetworkManager {
|
|||||||
flow: Flow, // the flow used
|
flow: Flow, // the flow used
|
||||||
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
|
reporting_peer: NodeRef, // the peer's noderef reporting the socket address
|
||||||
) {
|
) {
|
||||||
#[cfg(feature = "network-result-extra")]
|
log_network_result!(debug "report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
|
||||||
debug!("report_global_socket_address\nsocket_address: {:#?}\nflow: {:#?}\nreporting_peer: {:#?}", socket_address, flow, reporting_peer);
|
|
||||||
|
|
||||||
// Ignore these reports if we are currently detecting public dial info
|
// Ignore these reports if we are currently detecting public dial info
|
||||||
let net = self.net();
|
let net = self.net();
|
||||||
@ -172,8 +171,7 @@ impl NetworkManager {
|
|||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
{
|
{
|
||||||
// Record the origin of the inconsistency
|
// Record the origin of the inconsistency
|
||||||
#[cfg(feature = "network-result-extra")]
|
log_network_result!(debug "inconsistency added from {:?}: reported {:?} with current_addresses = {:?}", reporting_ip_block, a, current_addresses);
|
||||||
debug!("inconsistency added from {:?}: reported {:?} with current_addresses = {:?}", reporting_ip_block, a, current_addresses);
|
|
||||||
|
|
||||||
inconsistencies.push(*reporting_ip_block);
|
inconsistencies.push(*reporting_ip_block);
|
||||||
}
|
}
|
||||||
@ -214,7 +212,7 @@ impl NetworkManager {
|
|||||||
|
|
||||||
// // debug code
|
// // debug code
|
||||||
// if inconsistent {
|
// if inconsistent {
|
||||||
// trace!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
|
// log_net!("public_address_check_cache: {:#?}\ncurrent_addresses: {:#?}\ninconsistencies: {}", inner
|
||||||
// .public_address_check_cache, current_addresses, inconsistencies);
|
// .public_address_check_cache, current_addresses, inconsistencies);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
@ -334,6 +334,7 @@ impl Network {
|
|||||||
/////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
pub async fn startup(&self) -> EyreResult<()> {
|
pub async fn startup(&self) -> EyreResult<()> {
|
||||||
|
log_net!(debug "starting network");
|
||||||
// get protocol config
|
// get protocol config
|
||||||
let protocol_config = {
|
let protocol_config = {
|
||||||
let c = self.config.get();
|
let c = self.config.get();
|
||||||
@ -396,6 +397,7 @@ impl Network {
|
|||||||
editor_public_internet.commit(true).await;
|
editor_public_internet.commit(true).await;
|
||||||
|
|
||||||
self.inner.lock().network_started = true;
|
self.inner.lock().network_started = true;
|
||||||
|
log_net!(debug "network started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -412,7 +414,7 @@ impl Network {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(&self) {
|
pub async fn shutdown(&self) {
|
||||||
trace!("stopping network");
|
log_net!(debug "stopping network");
|
||||||
|
|
||||||
// Reset state
|
// Reset state
|
||||||
let routing_table = self.routing_table();
|
let routing_table = self.routing_table();
|
||||||
@ -429,7 +431,7 @@ impl Network {
|
|||||||
// Cancels all async background tasks by dropping join handles
|
// Cancels all async background tasks by dropping join handles
|
||||||
*self.inner.lock() = Self::new_inner();
|
*self.inner.lock() = Self::new_inner();
|
||||||
|
|
||||||
trace!("network stopped");
|
log_net!(debug "network stopped");
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option<SocketAddr> {
|
pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option<SocketAddr> {
|
||||||
|
@@ -247,7 +247,7 @@ impl RoutingTable {

/// Called to initialize the routing table after it is created
pub async fn init(&self) -> EyreResult<()> {
-debug!("starting routing table init");
+log_rtab!(debug "starting routing table init");

// Set up routing buckets
{

@@ -256,7 +256,7 @@ impl RoutingTable {
}

// Load bucket entries from table db if possible
-debug!("loading routing table entries");
+log_rtab!(debug "loading routing table entries");
if let Err(e) = self.load_buckets().await {
log_rtab!(debug "Error loading buckets from storage: {:#?}. Resetting.", e);
let mut inner = self.inner.write();

@@ -264,7 +264,7 @@ impl RoutingTable {
}

// Set up routespecstore
-debug!("starting route spec store init");
+log_rtab!(debug "starting route spec store init");
let route_spec_store = match RouteSpecStore::load(self.clone()).await {
Ok(v) => v,
Err(e) => {

@@ -272,7 +272,7 @@ impl RoutingTable {
RouteSpecStore::new(self.clone())
}
};
-debug!("finished route spec store init");
+log_rtab!(debug "finished route spec store init");

{
let mut inner = self.inner.write();

@@ -285,13 +285,13 @@ impl RoutingTable {
.set_routing_table(Some(self.clone()))
.await;

-debug!("finished routing table init");
+log_rtab!(debug "finished routing table init");
Ok(())
}

/// Called to shut down the routing table
pub async fn terminate(&self) {
-debug!("starting routing table terminate");
+log_rtab!(debug "starting routing table terminate");

// Stop storage manager from using us
self.network_manager

@@ -303,12 +303,12 @@ impl RoutingTable {
self.cancel_tasks().await;

// Load bucket entries from table db if possible
-debug!("saving routing table entries");
+log_rtab!(debug "saving routing table entries");
if let Err(e) = self.save_buckets().await {
error!("failed to save routing table entries: {}", e);
}

-debug!("saving route spec store");
+log_rtab!(debug "saving route spec store");
let rss = {
let mut inner = self.inner.write();
inner.route_spec_store.take()

@@ -318,12 +318,12 @@ impl RoutingTable {
error!("couldn't save route spec store: {}", e);
}
}
-debug!("shutting down routing table");
+log_rtab!(debug "shutting down routing table");

let mut inner = self.inner.write();
*inner = RoutingTableInner::new(self.unlocked_inner.clone());

-debug!("finished routing table terminate");
+log_rtab!(debug "finished routing table terminate");
}

/// Serialize the routing table.
@@ -194,31 +194,31 @@ impl RoutingTable {

pub(crate) async fn cancel_tasks(&self) {
// Cancel all tasks being ticked
-debug!("stopping rolling transfers task");
+log_rtab!(debug "stopping rolling transfers task");
if let Err(e) = self.unlocked_inner.rolling_transfers_task.stop().await {
error!("rolling_transfers_task not stopped: {}", e);
}
-debug!("stopping kick buckets task");
+log_rtab!(debug "stopping kick buckets task");
if let Err(e) = self.unlocked_inner.kick_buckets_task.stop().await {
error!("kick_buckets_task not stopped: {}", e);
}
-debug!("stopping bootstrap task");
+log_rtab!(debug "stopping bootstrap task");
if let Err(e) = self.unlocked_inner.bootstrap_task.stop().await {
error!("bootstrap_task not stopped: {}", e);
}
-debug!("stopping peer minimum refresh task");
+log_rtab!(debug "stopping peer minimum refresh task");
if let Err(e) = self.unlocked_inner.peer_minimum_refresh_task.stop().await {
error!("peer_minimum_refresh_task not stopped: {}", e);
}
-debug!("stopping ping_validator task");
+log_rtab!(debug "stopping ping_validator task");
if let Err(e) = self.unlocked_inner.ping_validator_task.stop().await {
error!("ping_validator_task not stopped: {}", e);
}
-debug!("stopping relay management task");
+log_rtab!(debug "stopping relay management task");
if let Err(e) = self.unlocked_inner.relay_management_task.stop().await {
warn!("relay_management_task not stopped: {}", e);
}
-debug!("stopping private route management task");
+log_rtab!(debug "stopping private route management task");
if let Err(e) = self
.unlocked_inner
.private_route_management_task
@@ -105,9 +105,6 @@ impl RoutingTable {
for relay_nr_filtered in relay_noderefs {
let rpc = rpc.clone();

-#[cfg(feature = "network-result-extra")]
-log_rtab!(debug "--> Keepalive ping to {:?}", relay_nr_filtered);
-#[cfg(not(feature = "network-result-extra"))]
log_rtab!("--> Keepalive ping to {:?}", relay_nr_filtered);

futurequeue.push_back(
@@ -23,13 +23,13 @@ impl RoutingTable {
let state = relay_node.state(cur_ts);
// Relay node is dead or no longer needed
if matches!(state, BucketEntryState::Dead) {
-debug!("Relay node died, dropping relay {}", relay_node);
+log_rtab!(debug "Relay node died, dropping relay {}", relay_node);
editor.clear_relay_node();
false
}
// Relay node no longer can relay
else if relay_node.operate(|_rti, e| !relay_node_filter(e)) {
-debug!(
+log_rtab!(debug
"Relay node can no longer relay, dropping relay {}",
relay_node
);

@@ -38,7 +38,7 @@ impl RoutingTable {
}
// Relay node is no longer required
else if !own_node_info.requires_relay() {
-debug!(
+log_rtab!(debug
"Relay node no longer required, dropping relay {}",
relay_node
);

@@ -47,7 +47,7 @@ impl RoutingTable {
}
// Should not have relay for invalid network class
else if !self.has_valid_network_class(RoutingDomain::PublicInternet) {
-debug!(
+log_rtab!(debug
"Invalid network class does not get a relay, dropping relay {}",
relay_node
);

@@ -75,7 +75,7 @@ impl RoutingTable {
false,
) {
Ok(nr) => {
-debug!("Outbound relay node selected: {}", nr);
+log_rtab!(debug "Outbound relay node selected: {}", nr);
editor.set_relay_node(nr);
got_outbound_relay = true;
}

@@ -84,13 +84,13 @@ impl RoutingTable {
}
}
} else {
-debug!("Outbound relay desired but not available");
+log_rtab!(debug "Outbound relay desired but not available");
}
}
if !got_outbound_relay {
// Find a node in our routing table that is an acceptable inbound relay
if let Some(nr) = self.find_inbound_relay(RoutingDomain::PublicInternet, cur_ts) {
-debug!("Inbound relay node selected: {}", nr);
+log_rtab!(debug "Inbound relay node selected: {}", nr);
editor.set_relay_node(nr);
}
}
@@ -12,6 +12,7 @@ impl RPCAnswer {
pub fn validate(&mut self, validate_context: &RPCValidateContext) -> Result<(), RPCError> {
self.detail.validate(validate_context)
}
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
self.detail.desc()
}

@@ -50,6 +51,7 @@ pub(in crate::rpc_processor) enum RPCAnswerDetail {
}

impl RPCAnswerDetail {
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
match self {
RPCAnswerDetail::StatusA(_) => "StatusA",

@@ -8,6 +8,7 @@ pub(in crate::rpc_processor) enum RPCOperationKind {
}

impl RPCOperationKind {
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
match self {
RPCOperationKind::Question(q) => q.desc(),

@@ -20,6 +20,7 @@ impl RPCQuestion {
pub fn detail(&self) -> &RPCQuestionDetail {
&self.detail
}
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
self.detail.desc()
}

@@ -62,6 +63,7 @@ pub(in crate::rpc_processor) enum RPCQuestionDetail {
}

impl RPCQuestionDetail {
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
match self {
RPCQuestionDetail::StatusQ(_) => "StatusQ",

@@ -15,6 +15,7 @@ impl RPCStatement {
pub fn detail(&self) -> &RPCStatementDetail {
&self.detail
}
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
self.detail.desc()
}

@@ -43,6 +44,7 @@ pub(in crate::rpc_processor) enum RPCStatementDetail {
}

impl RPCStatementDetail {
+#[cfg(feature = "verbose-tracing")]
pub fn desc(&self) -> &'static str {
match self {
RPCStatementDetail::ValidateDialInfo(_) => "ValidateDialInfo",
@@ -158,8 +158,7 @@ where
#[allow(unused_variables)]
Ok(x) => {
// Call failed, node will not be considered again
-#[cfg(feature = "network-result-extra")]
-log_rpc!(debug "Fanout result {}: {:?}", &next_node, x);
+log_network_result!(debug "Fanout result {}: {:?}", &next_node, x);
}
Err(e) => {
// Error happened, abort everything and return the error
@@ -168,14 +168,14 @@ pub(crate) struct RPCMessage {
opt_sender_nr: Option<NodeRef>,
}

+#[instrument(skip_all, err)]
pub fn builder_to_vec<'a, T>(builder: capnp::message::Builder<T>) -> Result<Vec<u8>, RPCError>
where
T: capnp::message::Allocator + 'a,
{
let mut buffer = vec![];
capnp::serialize_packed::write_message(&mut buffer, &builder)
-.map_err(RPCError::protocol)
-.map_err(logthru_rpc!())?;
+.map_err(RPCError::protocol)?;
Ok(buffer)
}

@@ -1408,6 +1408,7 @@ impl RPCProcessor {
/// Decoding RPC from the wire
/// This performs a capnp decode on the data, and if it passes the capnp schema
/// it performs the cryptographic validation required to pass the operation up for processing
+#[instrument(skip_all, err)]
fn decode_rpc_operation(
&self,
encoded_msg: &RPCMessageEncoded,

@@ -1415,8 +1416,7 @@ impl RPCProcessor {
let reader = encoded_msg.data.get_reader()?;
let op_reader = reader
.get_root::<veilid_capnp::operation::Reader>()
-.map_err(RPCError::protocol)
-.map_err(logthru_rpc!())?;
+.map_err(RPCError::protocol)?;
let mut operation = RPCOperation::decode(&op_reader)?;

// Validate the RPC message

@@ -1568,30 +1568,36 @@ impl RPCProcessor {
};

// Process stats for questions/statements received
-let kind = match msg.operation.kind() {
+match msg.operation.kind() {
RPCOperationKind::Question(_) => {
self.record_question_received(&msg);

if let Some(sender_nr) = msg.opt_sender_nr.clone() {
sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
}
-"question"
+// Log rpc receive
+#[cfg(feature = "verbose-tracing")]
+debug!(target: "rpc_message", dir = "recv", kind = "question", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header);
}
RPCOperationKind::Statement(_) => {
if let Some(sender_nr) = msg.opt_sender_nr.clone() {
sender_nr.stats_question_rcvd(msg.header.timestamp, msg.header.body_len);
}
-"statement"
+// Log rpc receive
+#[cfg(feature = "verbose-tracing")]
+debug!(target: "rpc_message", dir = "recv", kind = "statement", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header);
}
RPCOperationKind::Answer(_) => {
// Answer stats are processed in wait_for_reply
-"answer"
+// Log rpc receive
+#[cfg(feature = "verbose-tracing")]
+debug!(target: "rpc_message", dir = "recv", kind = "answer", op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header);
}
};

-// Log rpc receive
-trace!(target: "rpc_message", dir = "recv", kind, op_id = msg.operation.op_id().as_u64(), desc = msg.operation.kind().desc(), header = ?msg.header);

// Process specific message kind
match msg.operation.kind() {
RPCOperationKind::Question(q) => match q.detail() {
@@ -74,8 +74,7 @@ impl RPCProcessor {
vcrypto: vcrypto.clone(),
});

-#[cfg(feature="debug-dht")]
-log_rpc!(debug "{}", debug_string);
+log_dht!(debug "{}", debug_string);

let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context))

@@ -102,8 +101,7 @@ impl RPCProcessor {
};

let (value, peers, descriptor) = get_value_a.destructure();
-#[cfg(feature="debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_value = value.as_ref().map(|v| {
format!(" len={} seq={} writer={}",
v.value_data().data().len(),

@@ -126,10 +124,10 @@ impl RPCProcessor {
dest
);

-log_rpc!(debug "{}", debug_string_answer);
+log_dht!(debug "{}", debug_string_answer);

let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
-log_rpc!(debug "Peers: {:#?}", peer_ids);
+log_dht!(debug "Peers: {:#?}", peer_ids);
}

// Validate peers returned are, in fact, closer to the key than the node we sent this to

@@ -215,8 +213,7 @@ impl RPCProcessor {
let routing_table = self.routing_table();
let closer_to_key_peers = network_result_try!(routing_table.find_preferred_peers_closer_to_key(key, vec![CAP_DHT, CAP_DHT_WATCH]));

-#[cfg(feature="debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string = format!(
"IN <=== GetValueQ({} #{}{}) <== {}",
key,

@@ -229,7 +226,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);

-log_rpc!(debug "{}", debug_string);
+log_dht!(debug "{}", debug_string);
}

// See if we would have accepted this as a set

@@ -252,8 +249,7 @@ impl RPCProcessor {
(subkey_result.value, subkey_result.descriptor)
};

-#[cfg(feature="debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_value = subkey_result_value.as_ref().map(|v| {
format!(" len={} seq={} writer={}",
v.value_data().data().len(),

@@ -276,7 +272,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);

-log_rpc!(debug "{}", debug_string_answer);
+log_dht!(debug "{}", debug_string_answer);
}

// Make GetValue answer
@@ -88,8 +88,9 @@ impl RPCProcessor {
vcrypto: vcrypto.clone(),
});

-#[cfg(feature="debug-dht")]
-log_rpc!(debug "{}", debug_string);
+if debug_target_enabled!("dht") {
+log_dht!(debug "{}", debug_string);
+}

let waitable_reply = network_result_try!(
self.question(dest.clone(), question, Some(question_context))

@@ -117,8 +118,7 @@ impl RPCProcessor {

let (set, value, peers) = set_value_a.destructure();

-#[cfg(feature="debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_value = value.as_ref().map(|v| {
format!(" len={} writer={}",
v.value_data().data().len(),

@@ -141,10 +141,10 @@ impl RPCProcessor {
dest,
);

-log_rpc!(debug "{}", debug_string_answer);
+log_dht!(debug "{}", debug_string_answer);

let peer_ids:Vec<String> = peers.iter().filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string())).collect();
-log_rpc!(debug "Peers: {:#?}", peer_ids);
+log_dht!(debug "Peers: {:#?}", peer_ids);
}

// Validate peers returned are, in fact, closer to the key than the node we sent this to

@@ -272,8 +272,7 @@ impl RPCProcessor {
(true, new_value)
};

-#[cfg(feature="debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_value = new_value.as_ref().map(|v| {
format!(" len={} seq={} writer={}",
v.value_data().data().len(),

@@ -296,7 +295,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);

-log_rpc!(debug "{}", debug_string_answer);
+log_dht!(debug "{}", debug_string_answer);
}

// Make SetValue answer
@@ -62,8 +62,7 @@ impl RPCProcessor {
}
};

-#[cfg(feature = "debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_value = format!(
" len={} seq={} writer={}",
value.value_data().data().len(),

@@ -82,7 +81,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id(),
);

-log_rpc!(debug "{}", debug_string_stmt);
+log_dht!(debug "{}", debug_string_stmt);
}

// Save the subkey, creating a new record if necessary
@@ -81,8 +81,7 @@ impl RPCProcessor {
RPCQuestionDetail::WatchValueQ(Box::new(watch_value_q)),
);

-#[cfg(feature = "debug-dht")]
-log_rpc!(debug "{}", debug_string);
+log_dht!(debug "{}", debug_string);

let waitable_reply =
network_result_try!(self.question(dest.clone(), question, None).await?);

@@ -107,8 +106,7 @@ impl RPCProcessor {
};
let question_watch_id = watch_id;
let (accepted, expiration, peers, watch_id) = watch_value_a.destructure();
-#[cfg(feature = "debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_answer = format!(
"OUT <== WatchValueA({}id={} {} #{:?}@{} peers={}) <= {}",
if accepted { "+accept " } else { "" },

@@ -120,13 +118,13 @@ impl RPCProcessor {
dest
);

-log_rpc!(debug "{}", debug_string_answer);
+log_dht!(debug "{}", debug_string_answer);

let peer_ids: Vec<String> = peers
.iter()
.filter_map(|p| p.node_ids().get(key.kind).map(|k| k.to_string()))
.collect();
-log_rpc!(debug "Peers: {:#?}", peer_ids);
+log_dht!(debug "Peers: {:#?}", peer_ids);
}

// Validate accepted requests

@@ -232,8 +230,7 @@ impl RPCProcessor {
let dest = network_result_try!(self.get_respond_to_destination(&msg));
let target = dest.get_target(rss)?;

-#[cfg(feature = "debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string = format!(
"IN <=== WatchValueQ({}{} {}@{}+{}) <== {} (watcher={})",
if let Some(watch_id) = watch_id {

@@ -249,7 +246,7 @@ impl RPCProcessor {
watcher
);

-log_rpc!(debug "{}", debug_string);
+log_dht!(debug "{}", debug_string);
}

// Get the nodes that we know about that are closer to the the key than our own node

@@ -265,9 +262,7 @@ impl RPCProcessor {
let (ret_accepted, ret_expiration, ret_watch_id) =
if closer_to_key_peers.len() >= set_value_count {
// Not close enough, not accepted
-#[cfg(feature = "debug-dht")]
-log_rpc!(debug "Not close enough for watch value");
+log_dht!(debug "Not close enough for watch value");

(false, 0, watch_id.unwrap_or_default())
} else {

@@ -301,8 +296,7 @@ impl RPCProcessor {
(true, ret_expiration, ret_watch_id)
};

-#[cfg(feature = "debug-dht")]
-{
+if debug_target_enabled!("dht") {
let debug_string_answer = format!(
"IN ===> WatchValueA({}id={} {} #{} expiration={} peers={}) ==> {}",
if ret_accepted { "+accept " } else { "" },

@@ -314,7 +308,7 @@ impl RPCProcessor {
msg.header.direct_sender_node_id()
);

-log_rpc!(debug "{}", debug_string_answer);
+log_dht!(debug "{}", debug_string_answer);
}

// Make WatchValue answer
@@ -142,8 +142,7 @@ impl StorageManager {
}

// Return peers if we have some
-#[cfg(feature = "network-result-extra")]
-log_stor!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());
+log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len());

Ok(NetworkResult::value(gva.answer.peers))
}
@@ -122,7 +122,7 @@ impl StorageManager {

#[instrument(level = "debug", skip_all, err)]
pub async fn init(&self, update_callback: UpdateCallback) -> EyreResult<()> {
-debug!("startup storage manager");
+log_stor!(debug "startup storage manager");

let mut inner = self.inner.lock().await;
inner.init(self.clone(), update_callback).await?;

@@ -132,7 +132,7 @@ impl StorageManager {

#[instrument(level = "debug", skip_all)]
pub async fn terminate(&self) {
-debug!("starting storage manager shutdown");
+log_stor!(debug "starting storage manager shutdown");

let mut inner = self.inner.lock().await;
inner.terminate().await;

@@ -143,7 +143,7 @@ impl StorageManager {
// Release the storage manager
*inner = Self::new_inner(self.unlocked_inner.clone());

-debug!("finished storage manager shutdown");
+log_stor!(debug "finished storage manager shutdown");
}

pub async fn set_rpc_processor(&self, opt_rpc_processor: Option<RPCProcessor>) {
@@ -127,8 +127,7 @@ impl StorageManager {
}

// Return peers if we have some
-#[cfg(feature = "network-result-extra")]
-log_stor!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());
+log_network_result!(debug "SetValue fanout call returned peers {}", sva.answer.peers.len());

Ok(NetworkResult::value(sva.answer.peers))
}
@@ -187,7 +187,7 @@ impl StorageManagerInner {
Ok(v) => v.unwrap_or_default(),
Err(_) => {
if let Err(e) = metadata_db.delete(0, OFFLINE_SUBKEY_WRITES).await {
-debug!("offline_subkey_writes format changed, clearing: {}", e);
+log_stor!(debug "offline_subkey_writes format changed, clearing: {}", e);
}
Default::default()
}
@@ -108,8 +108,7 @@ impl StorageManager {
}

// Return peers if we have some
-#[cfg(feature = "network-result-extra")]
-log_stor!(debug "WatchValue fanout call returned peers {}", wva.answer.peers.len());
+log_network_result!(debug "WatchValue fanout call returned peers {}", wva.answer.peers.len());

Ok(NetworkResult::value(wva.answer.peers))
}
@@ -237,7 +237,7 @@ impl TableStore {
) -> EyreResult<Vec<u8>> {
// Check if we are to protect the key
if device_encryption_key_password.is_empty() {
-debug!("no dek password");
+log_tstore!(debug "no dek password");
// Return the unprotected key bytes
let mut out = Vec::with_capacity(4 + SHARED_SECRET_LENGTH);
out.extend_from_slice(&dek.kind.0);

@@ -273,7 +273,7 @@ impl TableStore {
.load_user_secret("device_encryption_key")
.await?;
let Some(dek_bytes) = dek_bytes else {
-debug!("no device encryption key");
+log_tstore!(debug "no device encryption key");
return Ok(None);
};

@@ -298,7 +298,7 @@ impl TableStore {
.protected_store
.remove_user_secret("device_encryption_key")
.await?;
-debug!("removed device encryption key. existed: {}", existed);
+log_tstore!(debug "removed device encryption key. existed: {}", existed);
return Ok(());
};

@@ -310,7 +310,7 @@ impl TableStore {
let device_encryption_key_password =
if let Some(new_device_encryption_key_password) = new_device_encryption_key_password {
// Change password
-debug!("changing dek password");
+log_tstore!(debug "changing dek password");
self.config
.with_mut(|c| {
c.protected_store.device_encryption_key_password =

@@ -320,7 +320,7 @@ impl TableStore {
.unwrap()
} else {
// Get device encryption key protection password if we have it
-debug!("saving with existing dek password");
+log_tstore!(debug "saving with existing dek password");
let c = self.config.get();
c.protected_store.device_encryption_key_password.clone()
};

@@ -335,7 +335,7 @@ impl TableStore {
.protected_store
.save_user_secret("device_encryption_key", &dek_bytes)
.await?;
-debug!("saving device encryption key. existed: {}", existed);
+log_tstore!(debug "saving device encryption key. existed: {}", existed);
Ok(())
}

@@ -393,7 +393,7 @@ impl TableStore {
},
Ok(None) => {
// No table names yet, that's okay
-trace!("__veilid_all_tables is empty");
+log_tstore!("__veilid_all_tables is empty");
}
Err(e) => {
error!("could not get __veilid_all_tables: {}", e);

@@ -586,7 +586,7 @@ impl TableStore {
apibail_not_initialized!();
}
}
-trace!("TableStore::rename {} -> {}", old_name, new_name);
+log_tstore!(debug "TableStore::rename {} -> {}", old_name, new_name);
self.name_rename(old_name, new_name).await?;
self.flush().await;
Ok(())
@@ -41,7 +41,7 @@ impl TableStoreDriver {
// Ensure permissions are correct
ensure_file_private_owner(&dbpath).map_err(VeilidAPIError::internal)?;

-trace!(
+log_tstore!(
"opened table store '{}' at path '{:?}' with {} columns",
table_name,
dbpath,
@@ -15,7 +15,7 @@ impl TableStoreDriver {
let db = Database::open(table_name, column_count, false)
.await
.map_err(VeilidAPIError::generic)?;
-trace!(
+log_tstore!(
"opened table store '{}' with {} columns",
table_name,
column_count

@@ -28,9 +28,9 @@ impl TableStoreDriver {
if is_browser() {
let out = Database::delete(table_name).await.is_ok();
if out {
-trace!("TableStore::delete {} deleted", table_name);
+log_tstore!("TableStore::delete {} deleted", table_name);
} else {
-debug!("TableStore::delete {} not deleted", table_name);
+log_tstore!(debug "TableStore::delete {} not deleted", table_name);
}
Ok(out)
} else {
@@ -1465,7 +1465,6 @@ impl VeilidAPI {
let mut dc = DEBUG_CACHE.lock();
dc.opened_record_contexts.insert(*record.key(), rc);

-debug!("DHT Record Created:\n{:#?}", record);
Ok(format!("Created: {:?} : {:?}", record.key(), record))
}

@@ -1167,29 +1167,29 @@ impl VeilidConfig {
let table_key_node_id_secret = format!("node_id_secret_{}", ck);

if node_id.is_none() {
-debug!("pulling {} from storage", table_key_node_id);
+log_tstore!(debug "pulling {} from storage", table_key_node_id);
if let Ok(Some(stored_node_id)) = config_table
.load_json::<TypedKey>(0, table_key_node_id.as_bytes())
.await
{
-debug!("{} found in storage", table_key_node_id);
+log_tstore!(debug "{} found in storage", table_key_node_id);
node_id = Some(stored_node_id);
} else {
-debug!("{} not found in storage", table_key_node_id);
+log_tstore!(debug "{} not found in storage", table_key_node_id);
}
}

// See if node id secret was previously stored in the protected store
if node_id_secret.is_none() {
-debug!("pulling {} from storage", table_key_node_id_secret);
+log_tstore!(debug "pulling {} from storage", table_key_node_id_secret);
if let Ok(Some(stored_node_id_secret)) = config_table
.load_json::<TypedSecret>(0, table_key_node_id_secret.as_bytes())
.await
{
-debug!("{} found in storage", table_key_node_id_secret);
+log_tstore!(debug "{} found in storage", table_key_node_id_secret);
node_id_secret = Some(stored_node_id_secret);
} else {
-debug!("{} not found in storage", table_key_node_id_secret);
+log_tstore!(debug "{} not found in storage", table_key_node_id_secret);
}
}

@@ -1206,7 +1206,7 @@ impl VeilidConfig {
(node_id, node_id_secret)
} else {
// If we still don't have a valid node id, generate one
-debug!("generating new node_id_{}", ck);
+log_tstore!(debug "generating new node_id_{}", ck);
let kp = vcrypto.generate_keypair();
(TypedKey::new(ck, kp.key), TypedSecret::new(ck, kp.secret))
};

@@ -1259,8 +1259,6 @@ impl VeilidConfig {
Ok(())
})?;

-trace!("init_node_ids complete");

Ok(())
}
}
@@ -37,8 +37,6 @@ rt-tokio = [
"console-subscriber",
]
tracking = ["veilid-core/tracking"]
-network-result-extra = ["veilid-core/network-result-extra"]
-verbose-tracing = ["veilid-core/verbose-tracing"]
debug-json-api = []

[dependencies]
@@ -77,7 +77,7 @@ impl ClientApi {

#[instrument(level = "trace", skip_all)]
fn shutdown(&self) {
-trace!("ClientApi::shutdown");
+trace!(target: "client_api", "ClientApi::shutdown");

crate::server::shutdown();
}

@@ -87,14 +87,14 @@ impl ClientApi {
layer: String,
log_level: VeilidConfigLogLevel,
) -> VeilidAPIResult<()> {
-trace!("ClientApi::change_log_level");
+trace!(target: "client_api", "ClientApi::change_log_level");

let veilid_logs = self.inner.lock().veilid_logs.clone();
veilid_logs.change_log_level(layer, log_level)
}

fn change_log_ignore(&self, layer: String, log_ignore: String) -> VeilidAPIResult<()> {
-trace!("ClientApi::change_log_ignore");
+trace!(target: "client_api", "ClientApi::change_log_ignore");

let veilid_logs = self.inner.lock().veilid_logs.clone();
veilid_logs.change_log_ignore(layer, log_ignore)

@@ -102,19 +102,19 @@ impl ClientApi {

#[instrument(level = "trace", skip(self))]
pub async fn stop(&self) {
-trace!("ClientApi::stop requested");
+trace!(target: "client_api", "ClientApi::stop requested");
let jh = {
let mut inner = self.inner.lock();
if inner.join_handle.is_none() {
-trace!("ClientApi stop ignored");
+trace!(target: "client_api", "ClientApi stop ignored");
return;
}
drop(inner.stop.take());
inner.join_handle.take().unwrap()
};
-trace!("ClientApi::stop: waiting for stop");
+trace!(target: "client_api", "ClientApi::stop: waiting for stop");
jh.await;
-trace!("ClientApi::stop: stopped");
+trace!(target: "client_api", "ClientApi::stop: stopped");
}

async fn handle_ipc_incoming(self, ipc_path: PathBuf) -> std::io::Result<()> {

@@ -125,7 +125,7 @@ impl ClientApi {
}
}
let mut listener = IpcListener::bind(ipc_path.clone()).await?;
-debug!("IPC Client API listening on: {:?}", ipc_path);
+debug!(target: "client_api", "IPC Client API listening on: {:?}", ipc_path);

// Process the incoming accept stream
let mut incoming_stream = listener.incoming()?;

@@ -156,7 +156,7 @@ impl ClientApi {

async fn handle_tcp_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> {
let listener = TcpListener::bind(bind_addr).await?;
-debug!("TCPClient API listening on: {:?}", bind_addr);
+debug!(target: "client_api", "TCPClient API listening on: {:?}", bind_addr);

// Process the incoming accept stream
cfg_if! {
@@ -46,8 +46,6 @@ pub async fn run_veilid_server_internal(
server_mode: ServerMode,
veilid_logs: VeilidLogs,
) -> EyreResult<()> {
-trace!(?settings, ?server_mode);

let (
settings_auto_attach,
settings_client_api_ipc_enabled,
@@ -34,8 +34,6 @@ rt-wasm-bindgen = ["async_executors/bindgen", "async_executors/timer"]
veilid_tools_android_tests = ["dep:paranoid-android"]
veilid_tools_ios_tests = ["dep:tracing", "dep:oslog", "dep:tracing-oslog"]
tracing = ["dep:tracing", "dep:tracing-subscriber"]
-network-result-extra = []
-network-result-info = []

[dependencies]
tracing = { version = "0.1.40", features = [
@@ -269,12 +269,12 @@ impl AssemblyBuffer {
// If we receive a frame smaller than or equal to the length of the header, drop it
// or if this frame is larger than our max message length, then drop it
if frame.len() <= HEADER_LEN || frame.len() > MAX_LEN {
-#[cfg(feature = "network-result-extra")]
+if debug_target_enabled!("network_result") {
return NetworkResult::invalid_message(format!(
"invalid header length: frame.len={}",
frame.len()
));
-#[cfg(not(feature = "network-result-extra"))]
+}
return NetworkResult::invalid_message("invalid header length");
}

@@ -282,12 +282,12 @@ impl AssemblyBuffer {

// Drop versions we don't understand
if frame[0] != VERSION_1 {
-#[cfg(feature = "network-result-extra")]
+if debug_target_enabled!("network_result") {
return NetworkResult::invalid_message(format!(
"invalid frame version: frame[0]={}",
frame[0]
));
-#[cfg(not(feature = "network-result-extra"))]
+}
return NetworkResult::invalid_message("invalid frame version");
}
// Version 1 header

@@ -303,24 +303,24 @@ impl AssemblyBuffer {

// Drop fragments with offsets greater than or equal to the message length
if off >= len {
-#[cfg(feature = "network-result-extra")]
+if debug_target_enabled!("network_result") {
return NetworkResult::invalid_message(format!(
"offset greater than length: off={} >= len={}",
off, len
));
-#[cfg(not(feature = "network-result-extra"))]
+}
return NetworkResult::invalid_message("offset greater than length");
}
// Drop fragments where the chunk would be applied beyond the message length
if off as usize + chunk.len() > len as usize {
-#[cfg(feature = "network-result-extra")]
+if debug_target_enabled!("network_result") {
return NetworkResult::invalid_message(format!(
"chunk applied beyond message length: off={} + chunk.len={} > len={}",
off,
chunk.len(),
len
));
-#[cfg(not(feature = "network-result-extra"))]
+}
return NetworkResult::invalid_message("chunk applied beyond message length");
}

@@ -37,7 +37,6 @@ pub mod interval;
pub mod ip_addr_port;
pub mod ip_extra;
pub mod ipc;
-pub mod log_thru;
pub mod must_join_handle;
pub mod must_join_single_future;
pub mod mutable_future;

@@ -179,8 +178,6 @@ pub use ip_extra::*;
#[doc(inline)]
pub use ipc::*;
#[doc(inline)]
-pub use log_thru::*;
-#[doc(inline)]
pub use must_join_handle::*;
#[doc(inline)]
pub use must_join_single_future::*;

@@ -221,8 +218,16 @@ pub mod tests;
cfg_if! {
if #[cfg(feature = "tracing")] {
use tracing::*;
+#[macro_export]
+macro_rules! debug_target_enabled {
+($target:expr) => { enabled!(target: $target, Level::DEBUG) }
+}
} else {
use log::*;
+#[macro_export]
+macro_rules! debug_target_enabled {
+($target:expr) => { log_enabled!(target: $target, Level::Debug) }
+}
}
}
use cfg_if::*;
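For reference, a minimal sketch of how the new debug_target_enabled! macro is meant to be used as a runtime replacement for the old compile-time feature gates; the "dht" target matches the usage elsewhere in this commit, but the surrounding variables are illustrative:

    // Only build the (potentially expensive) debug string when DEBUG logging
    // is actually enabled for this log target at runtime
    if debug_target_enabled!("dht") {
        let debug_string = format!("IN <=== GetValueQ({}) <== {}", key, sender_node_id);
        log_dht!(debug "{}", debug_string);
    }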
@@ -491,7 +491,10 @@ impl PlatformSupportApple {
) {
Ok(v) => v,
Err(e) => {
-log_net!(debug "failed to get address flags for ifname={}, ifaddr={:?} : {}", ifname, ifaddr.ifa_addr, e);
+debug!(
+"failed to get address flags for ifname={}, ifaddr={:?} : {}",
+ifname, ifaddr.ifa_addr, e
+);
continue;
}
};
@@ -329,41 +329,20 @@ macro_rules! network_result_try {
};
}

-#[macro_export]
-macro_rules! log_network_result {
-($text:expr) => {
-cfg_if::cfg_if! {
-if #[cfg(feature="network-result-extra")] {
-info!(target: "network_result", "{}", format!("{}", $text))
-} else {
-debug!(target: "network_result", "{}", format!("{}", $text))
-}
-}
-};
-($fmt:literal, $($arg:expr),+) => {
-cfg_if::cfg_if! {
-if #[cfg(feature="network-result-extra")] {
-info!(target: "network_result", "{}", format!($fmt, $($arg),+));
-} else {
-debug!(target: "network_result", "{}", format!($fmt, $($arg),+));
-}
-}
-};
-}

#[macro_export]
macro_rules! network_result_value_or_log {
($r:expr => $f:expr) => {
network_result_value_or_log!($r => [ "" ] $f )
};
($r:expr => [ $d:expr ] $f:expr) => { {
-#[cfg(feature="network-result-extra")]
-let __extra_message = $d;
-#[cfg(not(feature="network-result-extra"))]
-let __extra_message = "";
+let __extra_message = if debug_target_enabled!("network_result") {
+$d.to_string()
+} else {
+"".to_string()
+};
match $r {
NetworkResult::Timeout => {
-log_network_result!(
+log_network_result!(debug
"{} at {}@{}:{} in {}{}",
"Timeout",
file!(),

@@ -375,7 +354,7 @@ macro_rules! network_result_value_or_log {
$f
}
NetworkResult::ServiceUnavailable(ref s) => {
-log_network_result!(
+log_network_result!(debug
"{}({}) at {}@{}:{} in {}{}",
"ServiceUnavailable",
s,

@@ -388,7 +367,7 @@ macro_rules! network_result_value_or_log {
$f
}
NetworkResult::NoConnection(ref e) => {
-log_network_result!(
+log_network_result!(debug
"{}({}) at {}@{}:{} in {}{}",
"No connection",
e.to_string(),

@@ -401,7 +380,7 @@ macro_rules! network_result_value_or_log {
$f
}
NetworkResult::AlreadyExists(ref e) => {
-log_network_result!(
+log_network_result!(debug
"{}({}) at {}@{}:{} in {}{}",
"Already exists",
e.to_string(),

@@ -414,7 +393,7 @@ macro_rules! network_result_value_or_log {
$f
}
NetworkResult::InvalidMessage(ref s) => {
-log_network_result!(
+log_network_result!(debug
"{}({}) at {}@{}:{} in {}{}",
"Invalid message",
s,
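For reference, a hedged sketch of how network_result_value_or_log! is typically invoked after this change; the receive call, the extra-context string, and the assumption that the macro yields the inner value on the NetworkResult::Value arm are illustrative, not shown in this hunk:

    // Unwrap a NetworkResult; non-fatal conditions (timeout, no connection, ...)
    // are logged to the "network_result" target at debug level and the handler bails out
    let data = network_result_value_or_log!(recv_result => [ format!(" from {}", peer_addr) ] {
        return Ok(());
    });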
@@ -23,7 +23,7 @@ async fn make_tcp_loopback() -> Result<(TcpStream, TcpStream), io::Error> {

let accept_future = async {
let (accepted_stream, peer_address) = listener.accept().await?;
-trace!("connection from {}", peer_address);
+trace!(target: "net", "connection from {}", peer_address);
accepted_stream.set_nodelay(true)?;
Result::<TcpStream, io::Error>::Ok(accepted_stream)
};
@@ -77,13 +77,13 @@ impl<E: Send + 'static> TickTask<E> {
let opt_stop_source = &mut *self.stop_source.lock().await;
if opt_stop_source.is_none() {
// already stopped, just return
-trace!("tick task already stopped");
+trace!(target: "veilid_tools", "tick task already stopped");
return Ok(());
}
drop(opt_stop_source.take());

// wait for completion of the tick task
-trace!("stopping single future");
+trace!(target: "veilid_tools", "stopping single future");
match self.single_future.join().await {
Ok(Some(Err(err))) => Err(err),
_ => Ok(()),
@@ -465,3 +465,7 @@ pub fn is_debug_backtrace_enabled() -> bool {
pub fn type_name_of_val<T: ?Sized>(_val: &T) -> &'static str {
std::any::type_name::<T>()
}

+pub fn map_to_string<X: ToString>(arg: X) -> String {
+arg.to_string()
+}
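The new map_to_string helper is a small convenience for turning any ToString error into a plain String in combinator chains; a hedged usage sketch (the surrounding function is illustrative):

    fn parse_port(s: &str) -> Result<u16, String> {
        // map_to_string converts the ParseIntError into a String error
        s.parse::<u16>().map_err(map_to_string)
    }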