diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index 1a48797f..4f6c4124 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -22,12 +22,12 @@ rt-async-std = [ rt-tokio = ["tokio", "tokio-util", "veilid-tools/rt-tokio", "cursive/rt-tokio"] [dependencies] -async-std = { version = "^1.12", features = [ +async-std = { version = "1.12.0", features = [ "unstable", "attributes", ], optional = true } -tokio = { version = "^1", features = ["full"], optional = true } -tokio-util = { version = "^0", features = ["compat"], optional = true } +tokio = { version = "1.38.1", features = ["full", "tracing"], optional = true } +tokio-util = { version = "0.7.11", features = ["compat"], optional = true } async-tungstenite = { version = "^0.23" } cursive = { git = "https://gitlab.com/veilid/cursive.git", default-features = false, features = [ "crossterm", diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index ebcc40d9..160cd873 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -51,6 +51,7 @@ crypto-test = ["enable-crypto-vld0", "enable-crypto-none"] crypto-test-none = ["enable-crypto-none"] veilid_core_android_tests = ["dep:paranoid-android"] veilid_core_ios_tests = ["dep:tracing-oslog"] +debug-locks = ["veilid-tools/debug-locks"] ### DEPENDENCIES diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index eaea459a..2a501d15 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -211,7 +211,7 @@ impl AttachmentManager { } } - #[instrument(level = "debug", skip_all)] + #[instrument(parent = None, level = "debug", skip_all)] async fn attachment_maintainer(self) { log_net!(debug "attachment starting"); self.update_attaching_detaching_state(AttachmentState::Attaching); diff --git a/veilid-core/src/network_manager/address_filter.rs b/veilid-core/src/network_manager/address_filter.rs index 1e35f44b..5247026e 100644 --- a/veilid-core/src/network_manager/address_filter.rs +++ b/veilid-core/src/network_manager/address_filter.rs @@ -337,6 +337,7 @@ impl AddressFilter { .or_insert(punishment); } + #[instrument(parent = None, level = "trace", skip_all, err)] pub async fn address_filter_task_routine( self, _stop_token: StopToken, diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index ef69b170..ba956405 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -64,8 +64,6 @@ pub const PUBLIC_ADDRESS_INCONSISTENCY_PUNISHMENT_TIMEOUT_US: TimestampDuration pub const ADDRESS_FILTER_TASK_INTERVAL_SECS: u32 = 60; pub const BOOT_MAGIC: &[u8; 4] = b"BOOT"; -static FUCK: AtomicUsize = AtomicUsize::new(0); - #[derive(Clone, Debug, Default)] pub struct ProtocolConfig { pub outbound: ProtocolTypeSet, @@ -586,7 +584,7 @@ impl NetworkManager { } /// Generates a multi-shot/normal receipt - #[instrument(level = "trace", skip(self, extra_data, callback), err)] + #[instrument(level = "trace", skip(self, extra_data, callback))] pub fn generate_receipt>( &self, expiration_us: u64, @@ -626,7 +624,7 @@ impl NetworkManager { } /// Generates a single-shot/normal receipt - #[instrument(level = "trace", skip(self, extra_data), err)] + #[instrument(level = "trace", skip(self, extra_data))] pub fn generate_single_shot_receipt>( &self, expiration_us: u64, @@ -918,10 +916,8 @@ impl NetworkManager { destination_node_ref: Option, body: B, ) -> EyreResult> { - let _dg = DebugGuard::new(&FUCK); - let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { - bail!("network is not started"); + return Ok(NetworkResult::no_connection_other("network is not started")); }; let destination_node_ref = destination_node_ref.as_ref().unwrap_or(&node_ref).clone(); @@ -962,7 +958,8 @@ impl NetworkManager { rcpt_data: Vec, ) -> EyreResult<()> { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { - bail!("network is not started"); + log_net!(debug "not sending out-of-band receipt to {} because network is stopped", dial_info); + return Ok(()); }; // Do we need to validate the outgoing receipt? Probably not diff --git a/veilid-core/src/network_manager/native/discovery_context.rs b/veilid-core/src/network_manager/native/discovery_context.rs index 9a5be337..69a3129e 100644 --- a/veilid-core/src/network_manager/native/discovery_context.rs +++ b/veilid-core/src/network_manager/native/discovery_context.rs @@ -262,7 +262,7 @@ impl DiscoveryContext { // Always process two at a time so we get both addresses in parallel if possible if unord.len() == 2 { // Process one - if let Some(Some(ei)) = unord.next().await { + if let Some(Some(ei)) = unord.next().in_current_span().await { external_address_infos.push(ei); if external_address_infos.len() == 2 { break; @@ -272,7 +272,7 @@ impl DiscoveryContext { } // Finish whatever is left if we need to if external_address_infos.len() < 2 { - while let Some(res) = unord.next().await { + while let Some(res) = unord.next().in_current_span().await { if let Some(ei) = res { external_address_infos.push(ei); if external_address_infos.len() == 2 { @@ -644,6 +644,7 @@ impl DiscoveryContext { } /// Add discovery futures to an unordered set that may detect dialinfo when they complete + #[instrument(level = "trace", skip(self))] pub async fn discover( &self, unord: &mut FuturesUnordered>>, @@ -681,7 +682,7 @@ impl DiscoveryContext { } }; if let Some(clear_network_callback) = some_clear_network_callback { - clear_network_callback().await; + clear_network_callback().in_current_span().await; } // UPNP Automatic Mapping diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index c867323a..bdda356c 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -310,7 +310,7 @@ impl IGDManager { .await } - #[instrument(level = "trace", target = "net", skip_all, err)] + #[instrument(level = "trace", target = "net", name = "IGDManager::tick", skip_all, err)] pub async fn tick(&self) -> EyreResult { // Refresh mappings if we have them // If an error is received, then return false to restart the local network @@ -434,6 +434,6 @@ impl IGDManager { // Normal exit, no restart Ok(true) - }, Err(eyre!("failed to process blocking task"))).in_current_span().await + }, Err(eyre!("failed to process blocking task"))).instrument(tracing::trace_span!("igd tick fut")).await } } diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 1e9a0400..44d6dcf4 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -518,7 +518,7 @@ impl Network { let mut out = vec![0u8; MAX_MESSAGE_SIZE]; let (recv_len, recv_addr) = network_result_try!(timeout( timeout_ms, - h.recv_message(&mut out).instrument(Span::current()) + h.recv_message(&mut out).in_current_span() ) .await .into_network_result()) @@ -569,7 +569,7 @@ impl Network { let out = network_result_try!(network_result_try!(timeout( timeout_ms, - pnc.recv() + pnc.recv().in_current_span() ) .await .into_network_result()) @@ -1063,7 +1063,7 @@ impl Network { Ok(()) } - #[instrument(level = "trace", target = "net", skip_all, err)] + #[instrument(parent = None, level = "trace", target = "net", skip_all, err)] async fn upnp_task_routine(self, _stop_token: StopToken, _l: u64, _t: u64) -> EyreResult<()> { if !self.unlocked_inner.igd_manager.tick().await? { info!("upnp failed, restarting local network"); @@ -1074,7 +1074,7 @@ impl Network { Ok(()) } - #[instrument(level = "trace", target = "net", skip_all, err)] + #[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)] pub(crate) async fn tick(&self) -> EyreResult<()> { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { log_net!(debug "ignoring due to not started up"); diff --git a/veilid-core/src/network_manager/native/network_class_discovery.rs b/veilid-core/src/network_manager/native/network_class_discovery.rs index 2c33f205..bf6a9c16 100644 --- a/veilid-core/src/network_manager/native/network_class_discovery.rs +++ b/veilid-core/src/network_manager/native/network_class_discovery.rs @@ -200,7 +200,12 @@ impl Network { // Wait for all discovery futures to complete and apply discoverycontexts let mut all_address_types = AddressTypeSet::new(); loop { - match unord.next().timeout_at(stop_token.clone()).await { + match unord + .next() + .timeout_at(stop_token.clone()) + .in_current_span() + .await + { Ok(Some(Some(dr))) => { // Found some new dial info for this protocol/address combination self.update_with_detected_dial_info(dr.ddi.clone()).await?; @@ -277,7 +282,7 @@ impl Network { Ok(()) } - #[instrument(level = "trace", skip(self), err)] + #[instrument(parent = None, level = "trace", skip(self), err)] pub async fn update_network_class_task_routine( self, stop_token: StopToken, diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index 60d26a35..ca9c862e 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -38,6 +38,7 @@ impl Network { Ok(acceptor) } + #[instrument(level = "trace", skip_all)] async fn try_tls_handlers( &self, tls_acceptor: &TlsAcceptor, @@ -60,7 +61,7 @@ impl Network { // read a chunk of the stream timeout( tls_connection_initial_timeout_ms, - ps.peek_exact(&mut first_packet), + ps.peek_exact(&mut first_packet).in_current_span(), ) .await .wrap_err("tls initial timeout")? @@ -70,6 +71,7 @@ impl Network { .await } + #[instrument(level = "trace", skip_all)] async fn try_handlers( &self, stream: AsyncPeekStream, @@ -90,6 +92,7 @@ impl Network { Ok(None) } + #[instrument(level = "trace", skip_all)] async fn tcp_acceptor( self, tcp_stream: io::Result, @@ -180,7 +183,7 @@ impl Network { // read a chunk of the stream if timeout( connection_initial_timeout_ms, - ps.peek_exact(&mut first_packet), + ps.peek_exact(&mut first_packet).in_current_span(), ) .await .is_err() @@ -237,6 +240,7 @@ impl Network { } } + #[instrument(level = "trace", skip_all)] async fn spawn_socket_listener(&self, addr: SocketAddr) -> EyreResult { // Get config let (connection_initial_timeout_ms, tls_connection_initial_timeout_ms) = { @@ -344,6 +348,7 @@ impl Network { ///////////////////////////////////////////////////////////////// // TCP listener that multiplexes ports so multiple protocols can exist on a single port + #[instrument(level = "trace", skip_all)] pub(super) async fn start_tcp_listener( &self, bind_set: NetworkBindSet, diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index d4435245..b7caa29f 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -3,6 +3,7 @@ use sockets::*; use stop_token::future::FutureExt; impl Network { + #[instrument(level = "trace", skip_all)] pub(super) async fn create_udp_listener_tasks(&self) -> EyreResult<()> { // Spawn socket tasks let mut task_count = { @@ -108,6 +109,7 @@ impl Network { Ok(()) } + #[instrument(level = "trace", skip_all)] async fn create_udp_protocol_handler(&self, addr: SocketAddr) -> EyreResult { log_net!("create_udp_protocol_handler on {:?}", &addr); @@ -148,6 +150,7 @@ impl Network { Ok(true) } + #[instrument(level = "trace", skip_all)] pub(super) async fn create_udp_protocol_handlers( &self, bind_set: NetworkBindSet, diff --git a/veilid-core/src/network_manager/native/protocol/sockets.rs b/veilid-core/src/network_manager/native/protocol/sockets.rs index da100d6f..2ef43277 100644 --- a/veilid-core/src/network_manager/native/protocol/sockets.rs +++ b/veilid-core/src/network_manager/native/protocol/sockets.rs @@ -162,10 +162,12 @@ pub async fn nonblocking_connect( let async_stream = Async::new(std::net::TcpStream::from(socket))?; // The stream becomes writable when connected - timeout_or_try!(timeout(timeout_ms, async_stream.writable()) - .await - .into_timeout_or() - .into_result()?); + timeout_or_try!( + timeout(timeout_ms, async_stream.writable().in_current_span()) + .await + .into_timeout_or() + .into_result()? + ); // Check low level error let async_stream = match async_stream.get_ref().take_error()? { diff --git a/veilid-core/src/network_manager/native/protocol/tcp.rs b/veilid-core/src/network_manager/native/protocol/tcp.rs index 90ae9f95..0acbb97d 100644 --- a/veilid-core/src/network_manager/native/protocol/tcp.rs +++ b/veilid-core/src/network_manager/native/protocol/tcp.rs @@ -134,7 +134,7 @@ impl RawTcpProtocolHandler { let mut peekbuf: [u8; PEEK_DETECT_LEN] = [0u8; PEEK_DETECT_LEN]; if (timeout( self.connection_initial_timeout_ms, - ps.peek_exact(&mut peekbuf), + ps.peek_exact(&mut peekbuf).in_current_span(), ) .await) .is_err() diff --git a/veilid-core/src/network_manager/native/protocol/ws.rs b/veilid-core/src/network_manager/native/protocol/ws.rs index 36b17e8b..471fde5f 100644 --- a/veilid-core/src/network_manager/native/protocol/ws.rs +++ b/veilid-core/src/network_manager/native/protocol/ws.rs @@ -222,7 +222,7 @@ impl WebsocketProtocolHandler { let mut peek_buf = [0u8; MAX_WS_BEFORE_BODY]; let peek_len = match timeout( self.arc.connection_initial_timeout_ms, - ps.peek(&mut peek_buf), + ps.peek(&mut peek_buf).in_current_span(), ) .await { diff --git a/veilid-core/src/network_manager/native/start_protocols.rs b/veilid-core/src/network_manager/native/start_protocols.rs index fc037de5..082c7daf 100644 --- a/veilid-core/src/network_manager/native/start_protocols.rs +++ b/veilid-core/src/network_manager/native/start_protocols.rs @@ -84,6 +84,7 @@ impl Network { // Returns a port, a set of ip addresses to bind to, and a // bool specifying if multiple ports should be tried + #[instrument(level = "trace", skip_all)] async fn convert_listen_address_to_bind_set( &self, listen_address: String, @@ -136,6 +137,7 @@ impl Network { ///////////////////////////////////////////////////// + #[instrument(level = "trace", skip_all)] pub(super) async fn bind_udp_protocol_handlers( &self, editor_public_internet: &mut RoutingDomainEditor, @@ -249,6 +251,7 @@ impl Network { Ok(StartupDisposition::Success) } + #[instrument(level = "trace", skip_all)] pub(super) async fn start_ws_listeners( &self, editor_public_internet: &mut RoutingDomainEditor, @@ -364,6 +367,7 @@ impl Network { Ok(StartupDisposition::Success) } + #[instrument(level = "trace", skip_all)] pub(super) async fn start_wss_listeners( &self, editor_public_internet: &mut RoutingDomainEditor, @@ -463,6 +467,7 @@ impl Network { Ok(StartupDisposition::Success) } + #[instrument(level = "trace", skip_all)] pub(super) async fn start_tcp_listeners( &self, editor_public_internet: &mut RoutingDomainEditor, diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 18a73dc9..ac3f6873 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -136,7 +136,8 @@ impl NetworkConnection { let flow = protocol_connection.flow(); // Create handle for sending - let (sender, receiver) = flume::bounded(get_concurrency() as usize); + //let (sender, receiver) = flume::bounded(get_concurrency() as usize); + let (sender, receiver) = flume::unbounded(); // Create stats let stats = Arc::new(Mutex::new(NetworkConnectionStats { @@ -265,7 +266,7 @@ impl NetworkConnection { // Connection receiver loop #[allow(clippy::too_many_arguments)] - #[instrument(level="trace", target="net", skip_all)] + #[instrument(parent = None, level="trace", target="net", skip_all)] fn process_connection( connection_manager: ConnectionManager, local_stop_token: StopToken, @@ -299,7 +300,7 @@ impl NetworkConnection { }; let timer = MutableFuture::new(new_timer()); - unord.push(system_boxed(timer.clone())); + unord.push(system_boxed(timer.clone().in_current_span())); loop { // Add another message sender future if necessary @@ -333,7 +334,7 @@ impl NetworkConnection { RecvLoopAction::Finish } } - }); + }.in_current_span()); unord.push(system_boxed(sender_fut.in_current_span())); } diff --git a/veilid-core/src/network_manager/receipt_manager.rs b/veilid-core/src/network_manager/receipt_manager.rs index 45c9c38b..c0e53a9d 100644 --- a/veilid-core/src/network_manager/receipt_manager.rs +++ b/veilid-core/src/network_manager/receipt_manager.rs @@ -281,7 +281,13 @@ impl ReceiptManager { } } - #[instrument(level = "trace", target = "receipt", skip_all, err)] + #[instrument( + level = "trace", + target = "receipt", + name = "ReceiptManager::tick", + skip_all, + err + )] pub async fn tick(&self) -> EyreResult<()> { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { return Ok(()); @@ -308,8 +314,9 @@ impl ReceiptManager { "receipt timeout", self.clone() .timeout_task_routine(now, stop_token) - .in_current_span(), + .instrument(trace_span!(parent: None, "receipt timeout task")), ) + .in_current_span() .await; } } diff --git a/veilid-core/src/network_manager/send_data.rs b/veilid-core/src/network_manager/send_data.rs index 503be4ab..722610a4 100644 --- a/veilid-core/src/network_manager/send_data.rs +++ b/veilid-core/src/network_manager/send_data.rs @@ -1,4 +1,5 @@ use super::*; +use stop_token::future::FutureExt as _; impl NetworkManager { /// Send raw data to a node @@ -146,7 +147,7 @@ impl NetworkManager { Ok(NetworkResult::value(send_data_method)) } - .instrument(trace_span!("send_data")), + .in_current_span() ) } @@ -559,6 +560,12 @@ impl NetworkManager { target_nr: NodeRef, data: Vec, ) -> EyreResult> { + + // Detect if network is stopping so we can break out of this + let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else { + return Ok(NetworkResult::service_unavailable("network is stopping")); + }; + // Build a return receipt for the signal let receipt_timeout = ms_to_us( self.unlocked_inner @@ -588,30 +595,38 @@ impl NetworkManager { let rpc = self.rpc_processor(); network_result_try!(rpc .rpc_call_signal( - Destination::relay(relay_nr, target_nr.clone()), + Destination::relay(relay_nr.clone(), target_nr.clone()), SignalInfo::ReverseConnect { receipt, peer_info }, ) .await .wrap_err("failed to send signal")?); // Wait for the return receipt - let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedOutOfBand - | ReceiptEvent::ReturnedSafety => { - return Ok(NetworkResult::invalid_message( - "reverse connect receipt should be returned in-band", - )); + let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await { + Err(_) => { + return Ok(NetworkResult::service_unavailable("network is stopping")); } - ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, - ReceiptEvent::Expired => { - return Ok(NetworkResult::timeout()); - } - ReceiptEvent::Cancelled => { - return Ok(NetworkResult::no_connection_other(format!( - "reverse connect receipt cancelled from {}", - target_nr - ))) + Ok(v) => { + let receipt_event = v.take_value().unwrap(); + match receipt_event { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedOutOfBand + | ReceiptEvent::ReturnedSafety => { + return Ok(NetworkResult::invalid_message( + "reverse connect receipt should be returned in-band", + )); + } + ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, + ReceiptEvent::Expired => { + return Ok(NetworkResult::timeout()); + } + ReceiptEvent::Cancelled => { + return Ok(NetworkResult::no_connection_other(format!( + "reverse connect receipt cancelled from {}", + target_nr + ))) + } + } } }; @@ -634,7 +649,9 @@ impl NetworkManager { )), } } else { - bail!("no reverse connection available") + return Ok(NetworkResult::no_connection_other(format!( + "reverse connection dropped from {}", target_nr) + )); } } @@ -648,6 +665,11 @@ impl NetworkManager { target_nr: NodeRef, data: Vec, ) -> EyreResult> { + // Detect if network is stopping so we can break out of this + let Some(stop_token) = self.unlocked_inner.startup_lock.stop_token() else { + return Ok(NetworkResult::service_unavailable("network is stopping")); + }; + // Ensure we are filtered down to UDP (the only hole punch protocol supported today) assert!(target_nr .filter_ref() @@ -706,23 +728,31 @@ impl NetworkManager { .wrap_err("failed to send signal")?); // Wait for the return receipt - let inbound_nr = match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedOutOfBand - | ReceiptEvent::ReturnedSafety => { - return Ok(NetworkResult::invalid_message( - "hole punch receipt should be returned in-band", - )); + let inbound_nr = match eventual_value.timeout_at(stop_token).in_current_span().await { + Err(_) => { + return Ok(NetworkResult::service_unavailable("network is stopping")); } - ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, - ReceiptEvent::Expired => { - return Ok(NetworkResult::timeout()); - } - ReceiptEvent::Cancelled => { - return Ok(NetworkResult::no_connection_other(format!( - "hole punch receipt cancelled from {}", - target_nr - ))) + Ok(v) => { + let receipt_event = v.take_value().unwrap(); + match receipt_event { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedOutOfBand + | ReceiptEvent::ReturnedSafety => { + return Ok(NetworkResult::invalid_message( + "hole punch receipt should be returned in-band", + )); + } + ReceiptEvent::ReturnedInBand { inbound_noderef } => inbound_noderef, + ReceiptEvent::Expired => { + return Ok(NetworkResult::timeout()); + } + ReceiptEvent::Cancelled => { + return Ok(NetworkResult::no_connection_other(format!( + "hole punch receipt cancelled from {}", + target_nr + ))) + } + } } }; @@ -749,7 +779,9 @@ impl NetworkManager { )), } } else { - bail!("no hole punch available") + return Ok(NetworkResult::no_connection_other(format!( + "hole punch dropped from {}", target_nr) + )); } } } diff --git a/veilid-core/src/network_manager/tasks/mod.rs b/veilid-core/src/network_manager/tasks/mod.rs index 0556bdb6..e663204d 100644 --- a/veilid-core/src/network_manager/tasks/mod.rs +++ b/veilid-core/src/network_manager/tasks/mod.rs @@ -48,6 +48,7 @@ impl NetworkManager { } } + #[instrument(level = "trace", name = "NetworkManager::tick", skip_all, err)] pub async fn tick(&self) -> EyreResult<()> { let routing_table = self.routing_table(); let net = self.net(); diff --git a/veilid-core/src/network_manager/tasks/public_address_check.rs b/veilid-core/src/network_manager/tasks/public_address_check.rs index e62567a6..fdb92262 100644 --- a/veilid-core/src/network_manager/tasks/public_address_check.rs +++ b/veilid-core/src/network_manager/tasks/public_address_check.rs @@ -2,10 +2,10 @@ use super::*; impl NetworkManager { // Clean up the public address check tables, removing entries that have timed out - #[instrument(level = "trace", skip(self), err)] + #[instrument(parent = None, level = "trace", skip_all, err)] pub(crate) async fn public_address_check_task_routine( self, - stop_token: StopToken, + _stop_token: StopToken, _last_ts: Timestamp, cur_ts: Timestamp, ) -> EyreResult<()> { diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 401af750..f116baa5 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -510,6 +510,7 @@ impl Network { } ////////////////////////////////////////// + #[instrument(level = "trace", target = "net", name = "Network::tick", skip_all, err)] pub(crate) async fn tick(&self) -> EyreResult<()> { let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { log_net!(debug "ignoring due to not started up"); diff --git a/veilid-core/src/routing_table/tasks/mod.rs b/veilid-core/src/routing_table/tasks/mod.rs index f90fa8e4..31a838b7 100644 --- a/veilid-core/src/routing_table/tasks/mod.rs +++ b/veilid-core/src/routing_table/tasks/mod.rs @@ -112,6 +112,7 @@ impl RoutingTable { /// Ticks about once per second /// to run tick tasks which may run at slower tick rates as configured + #[instrument(level = "trace", name = "RoutingTable::tick", skip_all, err)] pub async fn tick(&self) -> EyreResult<()> { // Don't tick if paused let opt_tick_guard = { diff --git a/veilid-core/src/routing_table/tasks/ping_validator.rs b/veilid-core/src/routing_table/tasks/ping_validator.rs index c95bd888..077a07d0 100644 --- a/veilid-core/src/routing_table/tasks/ping_validator.rs +++ b/veilid-core/src/routing_table/tasks/ping_validator.rs @@ -237,8 +237,6 @@ impl RoutingTable { _last_ts: Timestamp, cur_ts: Timestamp, ) -> EyreResult<()> { - eprintln!("pv tick"); - let mut futurequeue: VecDeque = VecDeque::new(); // PublicInternet @@ -253,9 +251,19 @@ impl RoutingTable { let mut unord = FuturesUnordered::new(); while !unord.is_empty() || !futurequeue.is_empty() { - log_rtab!(debug "Ping validation queue: {} remaining, {} in progress", futurequeue.len(), unord.len()); + log_rtab!( + "Ping validation queue: {} remaining, {} in progress", + futurequeue.len(), + unord.len() + ); + // Process one unordered futures if we have some - match unord.next().timeout_at(stop_token.clone()).await { + match unord + .next() + .timeout_at(stop_token.clone()) + .in_current_span() + .await + { Ok(Some(_)) => { // Some ping completed } @@ -273,7 +281,7 @@ impl RoutingTable { let Some(fq) = futurequeue.pop_front() else { break; }; - unord.push(fq.in_current_span()); + unord.push(fq); } } diff --git a/veilid-core/src/rpc_processor/fanout_call.rs b/veilid-core/src/rpc_processor/fanout_call.rs index a8e7cff9..0b9bc509 100644 --- a/veilid-core/src/rpc_processor/fanout_call.rs +++ b/veilid-core/src/rpc_processor/fanout_call.rs @@ -321,13 +321,17 @@ where } } // Wait for them to complete - timeout(timeout_ms, async { - while let Some(is_done) = unord.next().await { - if is_done { - break; + timeout( + timeout_ms, + async { + while let Some(is_done) = unord.next().in_current_span().await { + if is_done { + break; + } } } - }) + .in_current_span(), + ) .await .into_timeout_or() .map(|_| { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index f1b4b4ed..8b342f7c 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -277,7 +277,7 @@ enum RPCKind { ///////////////////////////////////////////////////////////////////// struct RPCProcessorInner { - send_channel: Option, RPCMessageEncoded)>>, + send_channel: Option>, stop_source: Option, worker_join_handles: Vec>, } @@ -590,7 +590,7 @@ impl RPCProcessor { }; Ok(nr) - }) + }.in_current_span()) } #[instrument(level="trace", target="rpc", skip_all)] @@ -1662,10 +1662,13 @@ impl RPCProcessor { RPCStatementDetail::AppMessage(_) => self.process_app_message(msg).await, }, RPCOperationKind::Answer(_) => { - self.unlocked_inner + let op_id = msg.operation.op_id(); + if let Err(e) = self.unlocked_inner .waiting_rpc_table - .complete_op_waiter(msg.operation.op_id(), msg) - .await?; + .complete_op_waiter(op_id, msg) { + log_rpc!(debug "Operation id {} did not complete: {}", op_id, e); + // Don't throw an error here because it's okay if the original operation timed out + } Ok(NetworkResult::value(())) } } @@ -1675,13 +1678,16 @@ impl RPCProcessor { async fn rpc_worker( self, stop_token: StopToken, - receiver: flume::Receiver<(Option, RPCMessageEncoded)>, + receiver: flume::Receiver<(Span, RPCMessageEncoded)>, ) { - while let Ok(Ok((_span_id, msg))) = + while let Ok(Ok((prev_span, msg))) = receiver.recv_async().timeout_at(stop_token.clone()).await { + let rpc_message_span = tracing::trace_span!("rpc message"); + rpc_message_span.follows_from(prev_span); + network_result_value_or_log!(match self - .process_rpc_message(msg).in_current_span() + .process_rpc_message(msg).instrument(rpc_message_span) .await { Err(e) => { @@ -1730,9 +1736,8 @@ impl RPCProcessor { }; send_channel }; - let span_id = Span::current().id(); send_channel - .try_send((span_id, msg)) + .try_send((Span::current(), msg)) .map_err(|e| eyre!("failed to enqueue direct RPC message: {}", e))?; Ok(()) } @@ -1766,9 +1771,8 @@ impl RPCProcessor { }; send_channel }; - let span_id = Span::current().id(); send_channel - .try_send((span_id, msg)) + .try_send((Span::current(), msg)) .map_err(|e| eyre!("failed to enqueue safety routed RPC message: {}", e))?; Ok(()) } @@ -1805,9 +1809,8 @@ impl RPCProcessor { }; send_channel }; - let span_id = Span::current().id(); send_channel - .try_send((span_id, msg)) + .try_send((Span::current(), msg)) .map_err(|e| eyre!("failed to enqueue private routed RPC message: {}", e))?; Ok(()) } diff --git a/veilid-core/src/rpc_processor/operation_waiter.rs b/veilid-core/src/rpc_processor/operation_waiter.rs index 0bf5ba01..9c24708b 100644 --- a/veilid-core/src/rpc_processor/operation_waiter.rs +++ b/veilid-core/src/rpc_processor/operation_waiter.rs @@ -8,7 +8,7 @@ where { waiter: OperationWaiter, op_id: OperationId, - eventual_instance: Option, T)>>, + result_receiver: Option>, } impl Drop for OperationWaitHandle @@ -17,7 +17,7 @@ where C: Unpin + Clone, { fn drop(&mut self) { - if self.eventual_instance.is_some() { + if self.result_receiver.is_some() { self.waiter.cancel_op_waiter(self.op_id); } } @@ -31,7 +31,7 @@ where { context: C, timestamp: Timestamp, - eventual: EventualValue<(Option, T)>, + result_sender: flume::Sender<(Span, T)>, } #[derive(Debug)] @@ -80,11 +80,11 @@ where /// Set up wait for operation to complete pub fn add_op_waiter(&self, op_id: OperationId, context: C) -> OperationWaitHandle { let mut inner = self.inner.lock(); - let e = EventualValue::new(); + let (result_sender, result_receiver) = flume::bounded(1); let waiting_op = OperationWaitingOp { context, timestamp: get_aligned_timestamp(), - eventual: e.clone(), + result_sender, }; if inner.waiting_op_table.insert(op_id, waiting_op).is_some() { error!( @@ -96,7 +96,7 @@ where OperationWaitHandle { waiter: self.clone(), op_id, - eventual_instance: Some(e.instance()), + result_receiver: Some(result_receiver), } } @@ -122,14 +122,15 @@ where } /// Remove wait for op + #[instrument(level = "trace", target = "rpc", skip_all)] fn cancel_op_waiter(&self, op_id: OperationId) { let mut inner = self.inner.lock(); inner.waiting_op_table.remove(&op_id); } - /// Complete the app call + /// Complete the waiting op #[instrument(level = "trace", target = "rpc", skip_all)] - pub async fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> { + pub fn complete_op_waiter(&self, op_id: OperationId, message: T) -> Result<(), RPCError> { let waiting_op = { let mut inner = self.inner.lock(); inner @@ -141,10 +142,9 @@ where )))? }; waiting_op - .eventual - .resolve((Span::current().id(), message)) - .await; - Ok(()) + .result_sender + .send((Span::current(), message)) + .map_err(RPCError::ignore) } /// Wait for operation to complete @@ -156,29 +156,30 @@ where ) -> Result, RPCError> { let timeout_ms = us_to_ms(timeout_us.as_u64()).map_err(RPCError::internal)?; - // Take the instance + // Take the receiver // After this, we must manually cancel since the cancel on handle drop is disabled - let eventual_instance = handle.eventual_instance.take().unwrap(); + let result_receiver = handle.result_receiver.take().unwrap(); + + let result_fut = result_receiver.recv_async().in_current_span(); // wait for eventualvalue let start_ts = get_aligned_timestamp(); - let res = timeout(timeout_ms, eventual_instance) - .await - .into_timeout_or(); - Ok(res - .on_timeout(|| { - // log_rpc!(debug "op wait timed out: {}", handle.op_id); - // debug_print_backtrace(); + let res = timeout(timeout_ms, result_fut).await.into_timeout_or(); + + match res { + TimeoutOr::Timeout => { self.cancel_op_waiter(handle.op_id); - }) - .map(|res| { - let (_span_id, ret) = res.take_value().unwrap(); + Ok(TimeoutOr::Timeout) + } + TimeoutOr::Value(Ok((_span_id, ret))) => { let end_ts = get_aligned_timestamp(); //xxx: causes crash (Missing otel data span extensions) // Span::current().follows_from(span_id); - (ret, end_ts.saturating_sub(start_ts)) - })) + Ok(TimeoutOr::Value((ret, end_ts.saturating_sub(start_ts)))) + } + TimeoutOr::Value(Err(e)) => Err(RPCError::ignore(e)), + } } } diff --git a/veilid-core/src/rpc_processor/rpc_app_call.rs b/veilid-core/src/rpc_processor/rpc_app_call.rs index cde79fe4..e5e4eaf6 100644 --- a/veilid-core/src/rpc_processor/rpc_app_call.rs +++ b/veilid-core/src/rpc_processor/rpc_app_call.rs @@ -153,11 +153,7 @@ impl RPCProcessor { /// Exposed to API for apps to return app call answers #[instrument(level = "trace", target = "rpc", skip_all)] - pub async fn app_call_reply( - &self, - call_id: OperationId, - message: Vec, - ) -> Result<(), RPCError> { + pub fn app_call_reply(&self, call_id: OperationId, message: Vec) -> Result<(), RPCError> { let _guard = self .unlocked_inner .startup_lock @@ -166,6 +162,6 @@ impl RPCProcessor { self.unlocked_inner .waiting_app_call_table .complete_op_waiter(call_id, message) - .await + .map_err(RPCError::ignore) } } diff --git a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs index 02fbfaed..95f2e0f9 100644 --- a/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs +++ b/veilid-core/src/rpc_processor/rpc_validate_dial_info.rs @@ -15,6 +15,11 @@ impl RPCProcessor { .startup_lock .enter() .map_err(RPCError::map_try_again("not started up"))?; + let stop_token = self + .unlocked_inner + .startup_lock + .stop_token() + .ok_or(RPCError::try_again("not started up"))?; let network_manager = self.network_manager(); let receipt_time = ms_to_us(self.unlocked_inner.validate_dial_info_receipt_time_ms); @@ -38,23 +43,35 @@ impl RPCProcessor { ); // Wait for receipt - match eventual_value.await.take_value().unwrap() { - ReceiptEvent::ReturnedPrivate { private_route: _ } - | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } - | ReceiptEvent::ReturnedSafety => { - log_net!(debug "validate_dial_info receipt should be returned out-of-band"); - Ok(false) + match eventual_value + .timeout_at(stop_token) + .in_current_span() + .await + { + Err(_) => { + return Err(RPCError::try_again("not started up")); } - ReceiptEvent::ReturnedOutOfBand => { - log_net!(debug "validate_dial_info receipt returned"); - Ok(true) - } - ReceiptEvent::Expired => { - log_net!(debug "validate_dial_info receipt expired"); - Ok(false) - } - ReceiptEvent::Cancelled => { - Err(RPCError::internal("receipt was dropped before expiration")) + Ok(v) => { + let receipt_event = v.take_value().unwrap(); + match receipt_event { + ReceiptEvent::ReturnedPrivate { private_route: _ } + | ReceiptEvent::ReturnedInBand { inbound_noderef: _ } + | ReceiptEvent::ReturnedSafety => { + log_net!(debug "validate_dial_info receipt should be returned out-of-band"); + Ok(false) + } + ReceiptEvent::ReturnedOutOfBand => { + log_net!(debug "validate_dial_info receipt returned"); + Ok(true) + } + ReceiptEvent::Expired => { + log_net!(debug "validate_dial_info receipt expired"); + Ok(false) + } + ReceiptEvent::Cancelled => { + Err(RPCError::internal("receipt was dropped before expiration")) + } + } } } } diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index ef2eb2e6..675aef6f 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -182,7 +182,7 @@ impl StorageManager { log_network_result!(debug "GetValue fanout call returned peers {}", gva.answer.peers.len()); Ok(NetworkResult::value(gva.answer.peers)) - }.in_current_span() + }.instrument(tracing::trace_span!("outbound_get_value fanout routine")) } }; @@ -271,7 +271,7 @@ impl StorageManager { })) { log_dht!(debug "Sending GetValue result failed: {}", e); } - }.in_current_span())) + }.instrument(tracing::trace_span!("outbound_get_value result")))) .detach(); Ok(out_rx) @@ -319,7 +319,7 @@ impl StorageManager { // Return done false - }.in_current_span()) + }.instrument(tracing::trace_span!("outbound_get_value deferred results"))) }, ), ); diff --git a/veilid-core/src/storage_manager/inspect_value.rs b/veilid-core/src/storage_manager/inspect_value.rs index 1ec7761e..7483caaf 100644 --- a/veilid-core/src/storage_manager/inspect_value.rs +++ b/veilid-core/src/storage_manager/inspect_value.rs @@ -228,7 +228,7 @@ impl StorageManager { log_network_result!(debug "InspectValue fanout call returned peers {}", answer.peers.len()); Ok(NetworkResult::value(answer.peers)) - }.in_current_span() + }.instrument(tracing::trace_span!("outbound_inspect_value fanout call")) }; // Routine to call to check if we're done at each step diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index dd77a1c5..d673c3c5 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -177,7 +177,7 @@ impl StorageManager { ctx.send_partial_update = true; Ok(NetworkResult::value(sva.answer.peers)) - }.in_current_span() + }.instrument(tracing::trace_span!("fanout call_routine")) } }; @@ -267,7 +267,7 @@ impl StorageManager { })) { log_dht!(debug "Sending SetValue result failed: {}", e); } - }.in_current_span())) + }.instrument(tracing::trace_span!("outbound_set_value fanout routine")))) .detach(); Ok(out_rx) @@ -329,7 +329,7 @@ impl StorageManager { // Return done false - }.in_current_span()) + }.instrument(tracing::trace_span!("outbound_set_value deferred results"))) }, ), ); diff --git a/veilid-core/src/storage_manager/tasks/mod.rs b/veilid-core/src/storage_manager/tasks/mod.rs index dc580860..c128cfec 100644 --- a/veilid-core/src/storage_manager/tasks/mod.rs +++ b/veilid-core/src/storage_manager/tasks/mod.rs @@ -80,6 +80,7 @@ impl StorageManager { } } + #[instrument(parent = None, level = "trace", target = "stor", name = "StorageManager::tick", skip_all, err)] pub async fn tick(&self) -> EyreResult<()> { // Run the flush stores task self.unlocked_inner.flush_record_stores_task.tick().await?; @@ -109,6 +110,7 @@ impl StorageManager { Ok(()) } + #[instrument(level = "trace", target = "stor", skip_all)] pub(crate) async fn cancel_tasks(&self) { log_stor!(debug "stopping check watched records task"); if let Err(e) = self.unlocked_inner.check_watched_records_task.stop().await { diff --git a/veilid-core/src/storage_manager/tasks/send_value_changes.rs b/veilid-core/src/storage_manager/tasks/send_value_changes.rs index 457fc897..98a43c24 100644 --- a/veilid-core/src/storage_manager/tasks/send_value_changes.rs +++ b/veilid-core/src/storage_manager/tasks/send_value_changes.rs @@ -32,15 +32,24 @@ impl StorageManager { // Add a future for each value change for vc in value_changes { let this = self.clone(); - unord.push(async move { - if let Err(e) = this.send_value_change(vc).await { - log_stor!(debug "Failed to send value change: {}", e); + unord.push( + async move { + if let Err(e) = this.send_value_change(vc).await { + log_stor!(debug "Failed to send value change: {}", e); + } } - }); + .in_current_span(), + ); } while !unord.is_empty() { - match unord.next().timeout_at(stop_token.clone()).await { + match unord + .next() + .in_current_span() + .timeout_at(stop_token.clone()) + .in_current_span() + .await + { Ok(Some(_)) => { // Some ValueChanged completed } diff --git a/veilid-core/src/storage_manager/watch_value.rs b/veilid-core/src/storage_manager/watch_value.rs index 628f5b1f..e7a7f5aa 100644 --- a/veilid-core/src/storage_manager/watch_value.rs +++ b/veilid-core/src/storage_manager/watch_value.rs @@ -294,7 +294,7 @@ impl StorageManager { log_network_result!(debug "WatchValue fanout call returned peers {} ({})", wva.answer.peers.len(), next_node); Ok(NetworkResult::value(wva.answer.peers)) - }.in_current_span() + }.instrument(tracing::trace_span!("outbound_watch_value call routine")) }; // Routine to call to check if we're done at each step diff --git a/veilid-core/src/veilid_api/api.rs b/veilid-core/src/veilid_api/api.rs index a4179422..0bcf58e1 100644 --- a/veilid-core/src/veilid_api/api.rs +++ b/veilid-core/src/veilid_api/api.rs @@ -366,7 +366,6 @@ impl VeilidAPI { let rpc_processor = self.rpc_processor()?; rpc_processor .app_call_reply(call_id, message) - .await .map_err(|e| e.into()) } diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index 9feee64f..da53e02a 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -38,6 +38,7 @@ rt-tokio = [ ] tracking = ["veilid-core/tracking"] debug-json-api = [] +debug-locks = ["veilid-core/debug-locks"] [dependencies] veilid-core = { path = "../veilid-core", default-features = false } diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 0a21dbc0..20820737 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -108,25 +108,29 @@ pub async fn run_veilid_server( let capi2 = capi.clone(); let update_receiver_shutdown = SingleShotEventual::new(Some(())); let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse(); - let update_receiver_jh = spawn_local("update_receiver", async move { - loop { - select! { - res = receiver.recv_async() => { - if let Ok(change) = res { - if let Some(capi) = &capi2 { - // Handle state changes on main thread for capnproto rpc - capi.clone().handle_update(change); + let update_receiver_jh = spawn_local( + "update_receiver", + async move { + loop { + select! { + res = receiver.recv_async() => { + if let Ok(change) = res { + if let Some(capi) = &capi2 { + // Handle state changes on main thread for capnproto rpc + capi.clone().handle_update(change); + } + } else { + break; } - } else { + } + _ = update_receiver_shutdown_instance => { break; } - } - _ = update_receiver_shutdown_instance => { - break; - } - }; + }; + } } - }); + .in_current_span(), + ); // Auto-attach if desired let mut out = Ok(()); diff --git a/veilid-tools/src/eventual.rs b/veilid-tools/src/eventual.rs index a0a49a47..33d5c51b 100644 --- a/veilid-tools/src/eventual.rs +++ b/veilid-tools/src/eventual.rs @@ -104,7 +104,7 @@ where match out { None => task::Poll::::Pending, Some(wakers) => { - // Wake all EventualResolvedFutures + // Wake all other instance futures for w in wakers { w.wake(); } diff --git a/veilid-tools/src/eventual_value.rs b/veilid-tools/src/eventual_value.rs index 16650f31..8bf93aa6 100644 --- a/veilid-tools/src/eventual_value.rs +++ b/veilid-tools/src/eventual_value.rs @@ -81,7 +81,7 @@ impl Future for EventualValueFuture { match out { None => task::Poll::::Pending, Some(wakers) => { - // Wake all EventualResolvedFutures + // Wake all other instance futures for w in wakers { w.wake(); } diff --git a/veilid-tools/src/eventual_value_clone.rs b/veilid-tools/src/eventual_value_clone.rs index fdaa9cf8..f0c555fb 100644 --- a/veilid-tools/src/eventual_value_clone.rs +++ b/veilid-tools/src/eventual_value_clone.rs @@ -77,7 +77,7 @@ impl Future for EventualValueCloneFuture { match out { None => task::Poll::::Pending, Some(wakers) => { - // Wake all EventualResolvedFutures + // Wake all other instance futures for w in wakers { w.wake(); } diff --git a/veilid-tools/src/lib.rs b/veilid-tools/src/lib.rs index 78fd2c4c..4cb232ac 100644 --- a/veilid-tools/src/lib.rs +++ b/veilid-tools/src/lib.rs @@ -251,6 +251,7 @@ pub mod tests; cfg_if! { if #[cfg(feature = "tracing")] { use tracing::*; + #[macro_export] macro_rules! debug_target_enabled { ($target:expr) => { enabled!(target: $target, Level::DEBUG) } diff --git a/veilid-tools/src/must_join_single_future.rs b/veilid-tools/src/must_join_single_future.rs index e89f000d..858a6575 100644 --- a/veilid-tools/src/must_join_single_future.rs +++ b/veilid-tools/src/must_join_single_future.rs @@ -64,6 +64,7 @@ where } /// Check the result and take it if there is one + #[cfg_attr(feature = "tracing", instrument(level = "trace", skip_all))] pub async fn check(&self) -> Result, ()> { let mut out: Option = None; @@ -95,6 +96,7 @@ where } /// Wait for the result and take it + #[cfg_attr(feature = "tracing", instrument(level = "trace", skip_all))] pub async fn join(&self) -> Result, ()> { let mut out: Option = None; diff --git a/veilid-tools/src/startup_lock.rs b/veilid-tools/src/startup_lock.rs index 04095375..dccd37ae 100644 --- a/veilid-tools/src/startup_lock.rs +++ b/veilid-tools/src/startup_lock.rs @@ -34,20 +34,20 @@ impl<'a> StartupLockGuard<'a> { #[derive(Debug)] pub struct StartupLockEnterGuard<'a> { _guard: AsyncRwLockReadGuard<'a, bool>, - // #[cfg(feature = "debug-locks")] + #[cfg(feature = "debug-locks")] id: usize, - // #[cfg(feature = "debug-locks")] + #[cfg(feature = "debug-locks")] active_guards: Arc>>, } -//#[cfg(feature = "debug-locks")] +#[cfg(feature = "debug-locks")] impl<'a> Drop for StartupLockEnterGuard<'a> { fn drop(&mut self) { self.active_guards.lock().remove(&self.id); } } -//#[cfg(feature = "debug-locks")] +#[cfg(feature = "debug-locks")] static GUARD_ID: AtomicUsize = AtomicUsize::new(0); /// Synchronization mechanism that tracks the startup and shutdown of a region of code. @@ -59,16 +59,18 @@ static GUARD_ID: AtomicUsize = AtomicUsize::new(0); /// asynchronous shutdown to wait for operations to finish before proceeding. #[derive(Debug)] pub struct StartupLock { - rwlock: AsyncRwLock, - // #[cfg(feature = "debug-locks")] + startup_state: AsyncRwLock, + stop_source: Mutex>, + #[cfg(feature = "debug-locks")] active_guards: Arc>>, } impl StartupLock { pub fn new() -> Self { Self { - rwlock: AsyncRwLock::new(false), - // #[cfg(feature = "debug-locks")] + startup_state: AsyncRwLock::new(false), + stop_source: Mutex::new(None), + #[cfg(feature = "debug-locks")] active_guards: Arc::new(Mutex::new(HashMap::new())), } } @@ -77,20 +79,29 @@ impl StartupLock { /// One must call 'success()' on the returned startup lock guard if startup was successful /// otherwise the startup lock will not shift to the 'started' state. pub fn startup(&self) -> Result { - let guard = asyncrwlock_try_write!(self.rwlock).ok_or(StartupLockAlreadyStartedError)?; + let guard = + asyncrwlock_try_write!(self.startup_state).ok_or(StartupLockAlreadyStartedError)?; if *guard { return Err(StartupLockAlreadyStartedError); } + *self.stop_source.lock() = Some(StopSource::new()); + Ok(StartupLockGuard { guard, success_value: true, }) } + /// Get a stop token for this lock + /// One can wait on this to timeout operations when a shutdown is requested + pub fn stop_token(&self) -> Option { + self.stop_source.lock().as_ref().map(|ss| ss.token()) + } + /// Check if this StartupLock is currently in a started state /// Returns false is the state is in transition pub fn is_started(&self) -> bool { - let Some(guard) = asyncrwlock_try_read!(self.rwlock) else { + let Some(guard) = asyncrwlock_try_read!(self.startup_state) else { return false; }; *guard @@ -99,7 +110,7 @@ impl StartupLock { /// Check if this StartupLock is currently in a shut down state /// Returns false is the state is in transition pub fn is_shut_down(&self) -> bool { - let Some(guard) = asyncrwlock_try_read!(self.rwlock) else { + let Some(guard) = asyncrwlock_try_read!(self.startup_state) else { return false; }; !*guard @@ -109,18 +120,20 @@ impl StartupLock { /// One must call 'success()' on the returned startup lock guard if shutdown was successful /// otherwise the startup lock will not shift to the 'stopped' state. pub async fn shutdown(&self) -> Result { + // Drop the stop source to ensure we can detect shutdown has been requested + *self.stop_source.lock() = None; + cfg_if! { if #[cfg(feature = "debug-locks")] { - //let guard = self.rwlock.write().await; + let guard = match timeout(30000, self.startup_state.write()).await { + Ok(v) => v, + Err(_) => { + eprintln!("active guards: {:#?}", self.active_guards.lock().values().collect::>()); + panic!("shutdown deadlock"); + } + }; } else { - let guard = self.rwlock.write().await; - // let guard = match timeout(30000, self.rwlock.write()).await { - // Ok(v) => v, - // Err(_) => { - // eprintln!("active guards: {:#?}", self.active_guards.lock().values().collect::>()); - // panic!("shutdown deadlock"); - // } - // }; + let guard = self.startup_state.write().await; } } if !*guard { @@ -136,19 +149,23 @@ impl StartupLock { /// If this module has not yet started up or is in the process of startup or shutdown /// this will fail. pub fn enter(&self) -> Result { - let guard = asyncrwlock_try_read!(self.rwlock).ok_or(StartupLockNotStartedError)?; + let guard = asyncrwlock_try_read!(self.startup_state).ok_or(StartupLockNotStartedError)?; if !*guard { return Err(StartupLockNotStartedError); } let out = StartupLockEnterGuard { _guard: guard, - //#[cfg(feature = "debug-locks")] + #[cfg(feature = "debug-locks")] id: GUARD_ID.fetch_add(1, Ordering::AcqRel), + #[cfg(feature = "debug-locks")] active_guards: self.active_guards.clone(), }; + + #[cfg(feature = "debug-locks")] self.active_guards .lock() .insert(out.id, backtrace::Backtrace::new()); + Ok(out) } } diff --git a/veilid-tools/src/tick_task.rs b/veilid-tools/src/tick_task.rs index d0f5d9fe..a121a193 100644 --- a/veilid-tools/src/tick_task.rs +++ b/veilid-tools/src/tick_task.rs @@ -104,22 +104,29 @@ impl TickTask { return Ok(()); } - self.internal_tick(now, last_timestamp_us).await.map(drop) + let itick = self.internal_tick(now, last_timestamp_us); + + itick.await.map(drop) } pub async fn try_tick_now(&self) -> Result { let now = get_timestamp(); let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire); - self.internal_tick(now, last_timestamp_us).await + let itick = self.internal_tick(now, last_timestamp_us); + + itick.await } async fn internal_tick(&self, now: u64, last_timestamp_us: u64) -> Result { // Lock the stop source, tells us if we have ever started this future - let opt_stop_source = &mut *self.stop_source.lock().await; + let opt_stop_source_fut = self.stop_source.lock(); + + let opt_stop_source = &mut *opt_stop_source_fut.await; + if opt_stop_source.is_some() { // See if the previous execution finished with an error - match self.single_future.check().await { + match self.single_future.check().in_current_span().await { Ok(Some(Err(e))) => { // We have an error result, which means the singlefuture ran but we need to propagate the error return Err(e); @@ -145,15 +152,18 @@ impl TickTask { let stop_token = stop_source.token(); let running = self.running.clone(); let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now); + let wrapped_routine = Box::pin(async move { running.store(true, core::sync::atomic::Ordering::Release); let out = routine.await; running.store(false, core::sync::atomic::Ordering::Release); out }); + match self .single_future .single_spawn(&self.name, wrapped_routine) + .in_current_span() .await { // We should have already consumed the result of the last run, or there was none diff --git a/veilid-tools/src/timeout.rs b/veilid-tools/src/timeout.rs index 5203c416..514bb727 100644 --- a/veilid-tools/src/timeout.rs +++ b/veilid-tools/src/timeout.rs @@ -8,7 +8,9 @@ cfg_if! { where F: Future, { - match select(Box::pin(sleep(dur_ms)), Box::pin(f)).await { + let tout = select(Box::pin(sleep(dur_ms)), Box::pin(f)); + + match tout.await { Either::Left((_x, _b)) => Err(TimeoutError()), Either::Right((y, _a)) => Ok(y), } @@ -22,11 +24,13 @@ cfg_if! { { cfg_if! { if #[cfg(feature="rt-async-std")] { - async_std::future::timeout(Duration::from_millis(dur_ms as u64), f).await.map_err(|e| e.into()) + let tout = async_std::future::timeout(Duration::from_millis(dur_ms as u64), f); } else if #[cfg(feature="rt-tokio")] { - tokio::time::timeout(Duration::from_millis(dur_ms as u64), f).await.map_err(|e| e.into()) + let tout = tokio::time::timeout(Duration::from_millis(dur_ms as u64), f); } } + + tout.await.map_err(|e| e.into()) } } diff --git a/veilid-tools/src/tools.rs b/veilid-tools/src/tools.rs index 4536b5dd..b9a1da4c 100644 --- a/veilid-tools/src/tools.rs +++ b/veilid-tools/src/tools.rs @@ -506,20 +506,21 @@ pub fn map_to_string(arg: X) -> String { ////////////////////////////////////////////////////////////////////////////////////////////////////////////// pub struct DebugGuard { + name: &'static str, counter: &'static AtomicUsize, } impl DebugGuard { - pub fn new(counter: &'static AtomicUsize) -> Self { + pub fn new(name: &'static str, counter: &'static AtomicUsize) -> Self { let c = counter.fetch_add(1, Ordering::SeqCst); - eprintln!("DebugGuard Entered: {}", c + 1); - Self { counter } + eprintln!("{} entered: {}", name, c + 1); + Self { name, counter } } } impl Drop for DebugGuard { fn drop(&mut self) { let c = self.counter.fetch_sub(1, Ordering::SeqCst); - eprintln!("DebugGuard Exited: {}", c - 1); + eprintln!("{} exited: {}", self.name, c - 1); } }