From 35dc7bdfd6a69ecec2c32b114d06bb04ea4f83f4 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Sat, 20 Jul 2024 19:42:25 -0400 Subject: [PATCH] spawn names --- veilid-cli/src/client_api_connection.rs | 2 +- veilid-cli/src/command_processor.rs | 20 +++++----- veilid-cli/src/log_viewer_ui.rs | 1 + veilid-core/src/attachment_manager.rs | 5 ++- veilid-core/src/crypto/mod.rs | 2 +- .../src/network_manager/connection_manager.rs | 5 ++- veilid-core/src/network_manager/mod.rs | 16 ++++++-- .../src/network_manager/native/igd_manager.rs | 6 +-- veilid-core/src/network_manager/native/mod.rs | 6 +-- .../src/network_manager/native/network_tcp.rs | 2 +- .../src/network_manager/native/network_udp.rs | 4 +- .../src/network_manager/network_connection.rs | 2 +- .../src/network_manager/receipt_manager.rs | 1 + veilid-core/src/network_manager/wasm/mod.rs | 2 - veilid-core/src/routing_table/mod.rs | 28 ++++++++++---- veilid-core/src/rpc_processor/mod.rs | 11 ++---- veilid-core/src/storage_manager/get_value.rs | 2 +- veilid-core/src/storage_manager/mod.rs | 25 +++++++++--- veilid-core/src/storage_manager/set_value.rs | 2 +- .../storage_manager/storage_manager_inner.rs | 2 +- veilid-core/src/veilid_api/api.rs | 2 +- veilid-server/src/client_api.rs | 14 +++++-- veilid-server/src/server.rs | 2 +- veilid-server/src/unix.rs | 2 +- veilid-tools/src/deferred_stream_processor.rs | 5 ++- veilid-tools/src/interval.rs | 8 ++-- veilid-tools/src/must_join_single_future.rs | 6 ++- veilid-tools/src/spawn.rs | 38 +++++++++---------- .../src/tests/common/test_async_tag_lock.rs | 12 +++--- .../src/tests/common/test_host_interface.rs | 37 +++++++++--------- .../src/tests/common/test_startup_lock.rs | 6 +-- veilid-tools/src/tick_task.rs | 16 ++++++-- 32 files changed, 177 insertions(+), 115 deletions(-) diff --git a/veilid-cli/src/client_api_connection.rs b/veilid-cli/src/client_api_connection.rs index ef185a2a..18a3f277 100644 --- a/veilid-cli/src/client_api_connection.rs +++ 
b/veilid-cli/src/client_api_connection.rs @@ -187,7 +187,7 @@ impl ClientApiConnection { // Request initial server state let capi = self.clone(); - spawn_detached_local(async move { + spawn_detached_local("get initial server state", async move { let mut req = json::JsonValue::new_object(); req["op"] = "GetState".into(); let Some(resp) = capi.perform_request(req).await else { diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index 7eb7c847..f648eff4 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -114,7 +114,7 @@ impl CommandProcessor { trace!("CommandProcessor::cmd_help"); let capi = self.capi(); let ui = self.ui_sender(); - spawn_detached_local(async move { + spawn_detached_local("cmd help", async move { let out = match capi.server_debug("help".to_owned()).await { Err(e) => { error!("Server command 'debug help' failed: {}", e); @@ -166,7 +166,7 @@ Server Debug Commands: trace!("CommandProcessor::cmd_shutdown"); let capi = self.capi(); let ui = self.ui_sender(); - spawn_detached_local(async move { + spawn_detached_local("cmd shutdown", async move { if let Err(e) = capi.server_shutdown().await { error!("Server command 'shutdown' failed to execute: {}", e); } @@ -179,7 +179,7 @@ Server Debug Commands: trace!("CommandProcessor::cmd_disconnect"); let capi = self.capi(); let ui = self.ui_sender(); - spawn_detached_local(async move { + spawn_detached_local("cmd disconnect", async move { capi.disconnect().await; ui.send_callback(callback); }); @@ -190,7 +190,7 @@ Server Debug Commands: trace!("CommandProcessor::cmd_debug"); let capi = self.capi(); let ui = self.ui_sender(); - spawn_detached_local(async move { + spawn_detached_local("cmd debug", async move { match capi.server_debug(command_line).await { Ok(output) => { ui.add_node_event(Level::Info, &output); @@ -213,7 +213,7 @@ Server Debug Commands: trace!("CommandProcessor::cmd_change_log_level"); let capi = self.capi(); let ui = 
self.ui_sender(); - spawn_detached_local(async move { + spawn_detached_local("cmd change_log_level", async move { let (layer, rest) = Self::word_split(&rest.unwrap_or_default()); let log_level = match convert_loglevel(&rest.unwrap_or_default()) { Ok(v) => v, @@ -252,7 +252,7 @@ Server Debug Commands: trace!("CommandProcessor::cmd_change_log_ignore"); let capi = self.capi(); let ui = self.ui_sender(); - spawn_detached_local(async move { + spawn_detached_local("cmd change_log_ignore", async move { let (layer, rest) = Self::word_split(&rest.unwrap_or_default()); let log_ignore = rest.unwrap_or_default(); @@ -284,7 +284,7 @@ Server Debug Commands: let ui = self.ui_sender(); let this = self.clone(); - spawn_detached_local(async move { + spawn_detached_local("cmd enable", async move { let flag = rest.clone().unwrap_or_default(); match flag.as_str() { "app_messages" => { @@ -306,7 +306,7 @@ Server Debug Commands: let ui = self.ui_sender(); let this = self.clone(); - spawn_detached_local(async move { + spawn_detached_local("cmd disable", async move { let flag = rest.clone().unwrap_or_default(); match flag.as_str() { "app_messages" => { @@ -664,7 +664,7 @@ Server Debug Commands: pub fn attach(&self) { let capi = self.capi(); - spawn_detached_local(async move { + spawn_detached_local("attach", async move { if let Err(e) = capi.server_attach().await { error!("Server command 'attach' failed to execute: {}", e); } @@ -674,7 +674,7 @@ Server Debug Commands: pub fn detach(&self) { let capi = self.capi(); - spawn_detached_local(async move { + spawn_detached_local("detach", async move { if let Err(e) = capi.server_detach().await { error!("Server command 'detach' failed to execute: {}", e); } diff --git a/veilid-cli/src/log_viewer_ui.rs b/veilid-cli/src/log_viewer_ui.rs index 696cefe4..1e740ad0 100644 --- a/veilid-cli/src/log_viewer_ui.rs +++ b/veilid-cli/src/log_viewer_ui.rs @@ -86,6 +86,7 @@ impl LogViewerUI { done.await; } else { while let Ok(Ok(c)) = blocking_wrapper(
"LogViewerUI read", { let term = term.clone(); move || term.read_char() diff --git a/veilid-core/src/attachment_manager.rs b/veilid-core/src/attachment_manager.rs index 6a1a3a55..eaea459a 100644 --- a/veilid-core/src/attachment_manager.rs +++ b/veilid-core/src/attachment_manager.rs @@ -323,7 +323,10 @@ impl AttachmentManager { return false; } inner.maintain_peers = true; - inner.attachment_maintainer_jh = Some(spawn(self.clone().attachment_maintainer())); + inner.attachment_maintainer_jh = Some(spawn( + "attachment maintainer", + self.clone().attachment_maintainer(), + )); true } diff --git a/veilid-core/src/crypto/mod.rs b/veilid-core/src/crypto/mod.rs index 9d88a3ca..81218942 100644 --- a/veilid-core/src/crypto/mod.rs +++ b/veilid-core/src/crypto/mod.rs @@ -176,7 +176,7 @@ impl Crypto { // Schedule flushing let this = self.clone(); - let flush_future = interval(60000, move || { + let flush_future = interval("crypto flush", 60000, move || { let this = this.clone(); async move { if let Err(e) = this.flush().await { diff --git a/veilid-core/src/network_manager/connection_manager.rs b/veilid-core/src/network_manager/connection_manager.rs index 2539a7a8..6d789f10 100644 --- a/veilid-core/src/network_manager/connection_manager.rs +++ b/veilid-core/src/network_manager/connection_manager.rs @@ -134,7 +134,10 @@ impl ConnectionManager { let stop_source = StopSource::new(); // Spawn the async processor - let async_processor = spawn(self.clone().async_processor(stop_source.token(), receiver)); + let async_processor = spawn( + "connection manager async processor", + self.clone().async_processor(stop_source.token(), receiver), + ); // Store in the inner object *inner = Some(Self::new_inner(stop_source, sender, async_processor)); diff --git a/veilid-core/src/network_manager/mod.rs b/veilid-core/src/network_manager/mod.rs index 524ebccd..ef69b170 100644 --- a/veilid-core/src/network_manager/mod.rs +++ b/veilid-core/src/network_manager/mod.rs @@ -141,6 +141,7 @@ enum 
SendDataToExistingFlowResult { #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum StartupDisposition { Success, + #[cfg_attr(target_arch = "wasm32", allow(dead_code))] BindRetry, } @@ -213,9 +214,18 @@ impl NetworkManager { routing_table: RwLock::new(None), components: RwLock::new(None), update_callback: RwLock::new(None), - rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), - public_address_check_task: TickTask::new(PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS), - address_filter_task: TickTask::new(ADDRESS_FILTER_TASK_INTERVAL_SECS), + rolling_transfers_task: TickTask::new( + "rolling_transfers_task", + ROLLING_TRANSFERS_INTERVAL_SECS, + ), + public_address_check_task: TickTask::new( + "public_address_check_task", + PUBLIC_ADDRESS_CHECK_TASK_INTERVAL_SECS, + ), + address_filter_task: TickTask::new( + "address_filter_task", + ADDRESS_FILTER_TASK_INTERVAL_SECS, + ), network_key, startup_lock: StartupLock::new(), } diff --git a/veilid-core/src/network_manager/native/igd_manager.rs b/veilid-core/src/network_manager/native/igd_manager.rs index bdd0391a..c867323a 100644 --- a/veilid-core/src/network_manager/native/igd_manager.rs +++ b/veilid-core/src/network_manager/native/igd_manager.rs @@ -191,7 +191,7 @@ impl IGDManager { mapped_port: u16, ) -> Option<()> { let this = self.clone(); - blocking_wrapper(move || { + blocking_wrapper("igd unmap_port", move || { let mut inner = this.inner.lock(); // If we already have this port mapped, just return the existing portmap @@ -235,7 +235,7 @@ impl IGDManager { expected_external_address: Option, ) -> Option { let this = self.clone(); - blocking_wrapper(move || { + blocking_wrapper("igd map_any_port", move || { let mut inner = this.inner.lock(); // If we already have this port mapped, just return the existing portmap @@ -340,7 +340,7 @@ impl IGDManager { } let this = self.clone(); - blocking_wrapper(move || { + blocking_wrapper("igd tick", move || { let mut inner = this.inner.lock(); // Process full renewals 
diff --git a/veilid-core/src/network_manager/native/mod.rs b/veilid-core/src/network_manager/native/mod.rs index 909809d0..1e9a0400 100644 --- a/veilid-core/src/network_manager/native/mod.rs +++ b/veilid-core/src/network_manager/native/mod.rs @@ -173,9 +173,9 @@ impl Network { routing_table, connection_manager, interfaces: NetworkInterfaces::new(), - update_network_class_task: TickTask::new(1), - network_interfaces_task: TickTask::new(1), - upnp_task: TickTask::new(1), + update_network_class_task: TickTask::new("update_network_class_task", 1), + network_interfaces_task: TickTask::new("network_interfaces_task", 1), + upnp_task: TickTask::new("upnp_task", 1), igd_manager: igd_manager::IGDManager::new(config.clone()), } } diff --git a/veilid-core/src/network_manager/native/network_tcp.rs b/veilid-core/src/network_manager/native/network_tcp.rs index d47a4da3..60d26a35 100644 --- a/veilid-core/src/network_manager/native/network_tcp.rs +++ b/veilid-core/src/network_manager/native/network_tcp.rs @@ -297,7 +297,7 @@ impl Network { let connection_manager = self.connection_manager(); //////////////////////////////////////////////////////////// - let jh = spawn(async move { + let jh = spawn(&format!("TCP listener {}", addr), async move { // moves listener object in and get incoming iterator // when this task exists, the listener will close the socket diff --git a/veilid-core/src/network_manager/native/network_udp.rs b/veilid-core/src/network_manager/native/network_udp.rs index 6e082992..d4435245 100644 --- a/veilid-core/src/network_manager/native/network_udp.rs +++ b/veilid-core/src/network_manager/native/network_udp.rs @@ -16,14 +16,14 @@ impl Network { } } log_net!("task_count: {}", task_count); - for _ in 0..task_count { + for task_n in 0..task_count { log_net!("Spawning UDP listener task"); //////////////////////////////////////////////////////////// // Run thread task to process stream of messages let this = self.clone(); - let jh = spawn(async move { + let jh = 
spawn(&format!("UDP listener {}", task_n), async move { log_net!("UDP listener task spawned"); // Collect all our protocol handlers into a vector diff --git a/veilid-core/src/network_manager/network_connection.rs b/veilid-core/src/network_manager/network_connection.rs index 6acc0117..18a73dc9 100644 --- a/veilid-core/src/network_manager/network_connection.rs +++ b/veilid-core/src/network_manager/network_connection.rs @@ -148,7 +148,7 @@ impl NetworkConnection { let local_stop_token = stop_source.token(); // Spawn connection processor and pass in protocol connection - let processor = spawn(Self::process_connection( + let processor = spawn("connection processor", Self::process_connection( connection_manager, local_stop_token, manager_stop_token, diff --git a/veilid-core/src/network_manager/receipt_manager.rs b/veilid-core/src/network_manager/receipt_manager.rs index ae8bf083..45c9c38b 100644 --- a/veilid-core/src/network_manager/receipt_manager.rs +++ b/veilid-core/src/network_manager/receipt_manager.rs @@ -305,6 +305,7 @@ impl ReceiptManager { // Single-spawn the timeout task routine let _ = timeout_task .single_spawn( + "receipt timeout", self.clone() .timeout_task_routine(now, stop_token) .in_current_span(), diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 25890b00..401af750 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -52,7 +52,6 @@ pub const MAX_CAPABILITIES: usize = 64; ///////////////////////////////////////////////////////////////// struct NetworkInner { - network_started: Option, network_needs_restart: bool, protocol_config: ProtocolConfig, } @@ -77,7 +76,6 @@ pub(in crate::network_manager) struct Network { impl Network { fn new_inner() -> NetworkInner { NetworkInner { - network_started: Some(false), network_needs_restart: false, protocol_config: Default::default(), } diff --git a/veilid-core/src/routing_table/mod.rs 
b/veilid-core/src/routing_table/mod.rs index 19c0d5e0..fc8acc10 100644 --- a/veilid-core/src/routing_table/mod.rs +++ b/veilid-core/src/routing_table/mod.rs @@ -221,14 +221,26 @@ impl RoutingTable { node_id: c.network.routing_table.node_id.clone(), node_id_secret: c.network.routing_table.node_id_secret.clone(), kick_queue: Mutex::new(BTreeSet::default()), - rolling_transfers_task: TickTask::new(ROLLING_TRANSFERS_INTERVAL_SECS), - kick_buckets_task: TickTask::new(1), - bootstrap_task: TickTask::new(1), - peer_minimum_refresh_task: TickTask::new(1), - closest_peers_refresh_task: TickTask::new_ms(c.network.dht.min_peer_refresh_time_ms), - ping_validator_task: TickTask::new(1), - relay_management_task: TickTask::new(RELAY_MANAGEMENT_INTERVAL_SECS), - private_route_management_task: TickTask::new(PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS), + rolling_transfers_task: TickTask::new( + "rolling_transfers_task", + ROLLING_TRANSFERS_INTERVAL_SECS, + ), + kick_buckets_task: TickTask::new("kick_buckets_task", 1), + bootstrap_task: TickTask::new("bootstrap_task", 1), + peer_minimum_refresh_task: TickTask::new("peer_minimum_refresh_task", 1), + closest_peers_refresh_task: TickTask::new_ms( + "closest_peers_refresh_task", + c.network.dht.min_peer_refresh_time_ms, + ), + ping_validator_task: TickTask::new("ping_validator_task", 1), + relay_management_task: TickTask::new( + "relay_management_task", + RELAY_MANAGEMENT_INTERVAL_SECS, + ), + private_route_management_task: TickTask::new( + "private_route_management_task", + PRIVATE_ROUTE_MANAGEMENT_INTERVAL_SECS, + ), } } pub fn new(network_manager: NetworkManager) -> Self { diff --git a/veilid-core/src/rpc_processor/mod.rs b/veilid-core/src/rpc_processor/mod.rs index cca7766b..f1b4b4ed 100644 --- a/veilid-core/src/rpc_processor/mod.rs +++ b/veilid-core/src/rpc_processor/mod.rs @@ -392,10 +392,10 @@ impl RPCProcessor { "Spinning up {} RPC workers", self.unlocked_inner.concurrency ); - for _ in 0..self.unlocked_inner.concurrency { + for 
task_n in 0..self.unlocked_inner.concurrency { let this = self.clone(); let receiver = channel.1.clone(); - let jh = spawn(Self::rpc_worker( + let jh = spawn(&format!("rpc worker {}",task_n), Self::rpc_worker( this, inner.stop_source.as_ref().unwrap().token(), receiver, @@ -1679,14 +1679,9 @@ impl RPCProcessor { ) { while let Ok(Ok((_span_id, msg))) = receiver.recv_async().timeout_at(stop_token.clone()).await - { - //let rpc_worker_span = span!(parent: None, Level::TRACE, "rpc_worker recv"); - // xxx: causes crash (Missing otel data span extensions) - // rpc_worker_span.follows_from(span_id); - + { network_result_value_or_log!(match self .process_rpc_message(msg).in_current_span() - //.instrument(rpc_worker_span) .await { Err(e) => { diff --git a/veilid-core/src/storage_manager/get_value.rs b/veilid-core/src/storage_manager/get_value.rs index 1f2a785a..ef2eb2e6 100644 --- a/veilid-core/src/storage_manager/get_value.rs +++ b/veilid-core/src/storage_manager/get_value.rs @@ -225,7 +225,7 @@ impl StorageManager { }; // Call the fanout in a spawned task - spawn(Box::pin(async move { + spawn("outbound_get_value fanout", Box::pin(async move { let fanout_call = FanoutCall::new( routing_table.clone(), key, diff --git a/veilid-core/src/storage_manager/mod.rs b/veilid-core/src/storage_manager/mod.rs index c841e616..b34f494c 100644 --- a/veilid-core/src/storage_manager/mod.rs +++ b/veilid-core/src/storage_manager/mod.rs @@ -89,11 +89,26 @@ impl StorageManager { table_store, #[cfg(feature = "unstable-blockstore")] block_store, - flush_record_stores_task: TickTask::new(FLUSH_RECORD_STORES_INTERVAL_SECS), - offline_subkey_writes_task: TickTask::new(OFFLINE_SUBKEY_WRITES_INTERVAL_SECS), - send_value_changes_task: TickTask::new(SEND_VALUE_CHANGES_INTERVAL_SECS), - check_active_watches_task: TickTask::new(CHECK_ACTIVE_WATCHES_INTERVAL_SECS), - check_watched_records_task: TickTask::new(CHECK_WATCHED_RECORDS_INTERVAL_SECS), + flush_record_stores_task: TickTask::new( + 
"flush_record_stores_task", + FLUSH_RECORD_STORES_INTERVAL_SECS, + ), + offline_subkey_writes_task: TickTask::new( + "offline_subkey_writes_task", + OFFLINE_SUBKEY_WRITES_INTERVAL_SECS, + ), + send_value_changes_task: TickTask::new( + "send_value_changes_task", + SEND_VALUE_CHANGES_INTERVAL_SECS, + ), + check_active_watches_task: TickTask::new( + "check_active_watches_task", + CHECK_ACTIVE_WATCHES_INTERVAL_SECS, + ), + check_watched_records_task: TickTask::new( + "check_watched_records_task", + CHECK_WATCHED_RECORDS_INTERVAL_SECS, + ), anonymous_watch_keys, } diff --git a/veilid-core/src/storage_manager/set_value.rs b/veilid-core/src/storage_manager/set_value.rs index 7e343b80..dd77a1c5 100644 --- a/veilid-core/src/storage_manager/set_value.rs +++ b/veilid-core/src/storage_manager/set_value.rs @@ -224,7 +224,7 @@ impl StorageManager { }; // Call the fanout in a spawned task - spawn(Box::pin(async move { + spawn("outbound_set_value fanout", Box::pin(async move { let fanout_call = FanoutCall::new( routing_table.clone(), key, diff --git a/veilid-core/src/storage_manager/storage_manager_inner.rs b/veilid-core/src/storage_manager/storage_manager_inner.rs index 70f59092..f7a1a18e 100644 --- a/veilid-core/src/storage_manager/storage_manager_inner.rs +++ b/veilid-core/src/storage_manager/storage_manager_inner.rs @@ -133,7 +133,7 @@ impl StorageManagerInner { self.deferred_result_processor.init().await; // Schedule tick - let tick_future = interval(1000, move || { + let tick_future = interval("storage manager tick", 1000, move || { let this = outer_self.clone(); async move { if let Err(e) = this.tick().await { diff --git a/veilid-core/src/veilid_api/api.rs b/veilid-core/src/veilid_api/api.rs index 5c5f8123..a4179422 100644 --- a/veilid-core/src/veilid_api/api.rs +++ b/veilid-core/src/veilid_api/api.rs @@ -15,7 +15,7 @@ impl fmt::Debug for VeilidAPIInner { impl Drop for VeilidAPIInner { fn drop(&mut self) { if let Some(context) = self.context.take() { - 
spawn_detached(api_shutdown(context)); + spawn_detached("api shutdown", api_shutdown(context)); } } } diff --git a/veilid-server/src/client_api.rs b/veilid-server/src/client_api.rs index 22b47838..33cd160c 100644 --- a/veilid-server/src/client_api.rs +++ b/veilid-server/src/client_api.rs @@ -145,7 +145,11 @@ impl ClientApi { let t_awg = awg.clone(); // Process the connection - spawn(self.clone().handle_ipc_connection(stream, t_awg)).detach(); + spawn( + "client_api handle_ipc_connection", + self.clone().handle_ipc_connection(stream, t_awg), + ) + .detach(); } // Wait for all connections to terminate @@ -183,7 +187,11 @@ impl ClientApi { let t_awg = awg.clone(); // Process the connection - spawn(self.clone().handle_tcp_connection(stream, t_awg)).detach(); + spawn( + "client_api handle_tcp_connection", + self.clone().handle_tcp_connection(stream, t_awg), + ) + .detach(); } // Wait for all connections to terminate @@ -543,6 +551,6 @@ impl ClientApi { } let bind_futures_join = join_all(bind_futures); - self.inner.lock().join_handle = Some(spawn(bind_futures_join)); + self.inner.lock().join_handle = Some(spawn("client_api bind_futures", bind_futures_join)); } } diff --git a/veilid-server/src/server.rs b/veilid-server/src/server.rs index 33b519fc..0a21dbc0 100644 --- a/veilid-server/src/server.rs +++ b/veilid-server/src/server.rs @@ -108,7 +108,7 @@ pub async fn run_veilid_server( let capi2 = capi.clone(); let update_receiver_shutdown = SingleShotEventual::new(Some(())); let mut update_receiver_shutdown_instance = update_receiver_shutdown.instance().fuse(); - let update_receiver_jh = spawn_local(async move { + let update_receiver_jh = spawn_local("update_receiver", async move { loop { select! 
{ res = receiver.recv_async() => { diff --git a/veilid-server/src/unix.rs b/veilid-server/src/unix.rs index 2d3d7fd4..af0d65f8 100644 --- a/veilid-server/src/unix.rs +++ b/veilid-server/src/unix.rs @@ -34,7 +34,7 @@ pub async fn run_veilid_server_with_signals( Signals::new([SIGHUP, SIGTERM, SIGINT, SIGQUIT]).wrap_err("failed to init signals")?; let handle = signals.handle(); - let signals_task = spawn(handle_signals(signals)); + let signals_task = spawn("signals", handle_signals(signals)); // Run veilid server let res = run_veilid_server(settings, server_mode, veilid_logs).await; diff --git a/veilid-tools/src/deferred_stream_processor.rs b/veilid-tools/src/deferred_stream_processor.rs index de9577f7..13f036fe 100644 --- a/veilid-tools/src/deferred_stream_processor.rs +++ b/veilid-tools/src/deferred_stream_processor.rs @@ -32,7 +32,10 @@ impl DeferredStreamProcessor { self.opt_stopper = Some(stopper); let (dsc_tx, dsc_rx) = flume::unbounded::>(); self.opt_deferred_stream_channel = Some(dsc_tx); - self.opt_join_handle = Some(spawn(Self::processor(stop_token, dsc_rx))); + self.opt_join_handle = Some(spawn( + "deferred stream processor", + Self::processor(stop_token, dsc_rx), + )); } /// Terminate the processor and ensure all streams are closed diff --git a/veilid-tools/src/interval.rs b/veilid-tools/src/interval.rs index 1d9a0bee..2ea5974f 100644 --- a/veilid-tools/src/interval.rs +++ b/veilid-tools/src/interval.rs @@ -3,7 +3,7 @@ use super::*; cfg_if! { if #[cfg(target_arch = "wasm32")] { - pub fn interval(freq_ms: u32, callback: F) -> SendPinBoxFuture<()> + pub fn interval(name: &str, freq_ms: u32, callback: F) -> SendPinBoxFuture<()> where F: Fn() -> FUT + Send + Sync + 'static, FUT: Future + Send, @@ -11,7 +11,7 @@ cfg_if! { let e = Eventual::new(); let ie = e.clone(); - let jh = spawn(Box::pin(async move { + let jh = spawn(name, Box::pin(async move { while timeout(freq_ms, ie.instance_clone(())).await.is_err() { callback().await; } @@ -25,7 +25,7 @@ cfg_if! 
{ } else { - pub fn interval(freq_ms: u32, callback: F) -> SendPinBoxFuture<()> + pub fn interval(name: &str, freq_ms: u32, callback: F) -> SendPinBoxFuture<()> where F: Fn() -> FUT + Send + Sync + 'static, FUT: Future + Send, @@ -33,7 +33,7 @@ cfg_if! { let e = Eventual::new(); let ie = e.clone(); - let jh = spawn(async move { + let jh = spawn(name, async move { while timeout(freq_ms, ie.instance_clone(())).await.is_err() { callback().await; } diff --git a/veilid-tools/src/must_join_single_future.rs b/veilid-tools/src/must_join_single_future.rs index d9857ad6..e89f000d 100644 --- a/veilid-tools/src/must_join_single_future.rs +++ b/veilid-tools/src/must_join_single_future.rs @@ -124,6 +124,7 @@ where // Possibly spawn the future possibly returning the value of the last execution pub async fn single_spawn_local( &self, + name: &str, future: impl Future + 'static, ) -> Result<(Option, bool), ()> { let mut out: Option = None; @@ -152,7 +153,7 @@ where // Run if we should do that if run { - self.unlock(Some(spawn_local(future))); + self.unlock(Some(spawn_local(name, future))); } // Return the prior result if we have one @@ -166,6 +167,7 @@ where { pub async fn single_spawn( &self, + name: &str, future: impl Future + Send + 'static, ) -> Result<(Option, bool), ()> { let mut out: Option = None; @@ -191,7 +193,7 @@ where } // Run if we should do that if run { - self.unlock(Some(spawn(future))); + self.unlock(Some(spawn(name, future))); } // Return the prior result if we have one Ok((out, run)) diff --git a/veilid-tools/src/spawn.rs b/veilid-tools/src/spawn.rs index 8d85b01c..5b494c1c 100644 --- a/veilid-tools/src/spawn.rs +++ b/veilid-tools/src/spawn.rs @@ -4,7 +4,7 @@ cfg_if! 
{ if #[cfg(target_arch = "wasm32")] { use async_executors::{Bindgen, LocalSpawnHandleExt, SpawnHandleExt}; - pub fn spawn(future: impl Future + Send + 'static) -> MustJoinHandle + pub fn spawn(_name: &str, future: impl Future + Send + 'static) -> MustJoinHandle where Out: Send + 'static, { @@ -15,7 +15,7 @@ cfg_if! { ) } - pub fn spawn_local(future: impl Future + 'static) -> MustJoinHandle + pub fn spawn_local(_name: &str, future: impl Future + 'static) -> MustJoinHandle where Out: 'static, { @@ -26,7 +26,7 @@ cfg_if! { ) } - pub fn spawn_detached(future: impl Future + Send + 'static) + pub fn spawn_detached(_name: &str, future: impl Future + Send + 'static) where Out: Send + 'static, { @@ -35,7 +35,7 @@ cfg_if! { .expect("wasm-bindgen-futures spawn_handle_local should never error out") .detach() } - pub fn spawn_detached_local(future: impl Future + 'static) + pub fn spawn_detached_local(_name: &str, future: impl Future + 'static) where Out: 'static, { @@ -47,60 +47,60 @@ cfg_if! { } else { - pub fn spawn(future: impl Future + Send + 'static) -> MustJoinHandle + pub fn spawn(name: &str, future: impl Future + Send + 'static) -> MustJoinHandle where Out: Send + 'static, { cfg_if! { if #[cfg(feature="rt-async-std")] { - MustJoinHandle::new(async_std::task::spawn(future)) + MustJoinHandle::new(async_std::task::Builder::new().name(name).spawn(future).unwrap()) } else if #[cfg(feature="rt-tokio")] { - MustJoinHandle::new(tokio::task::spawn(future)) + MustJoinHandle::new(tokio::task::Builder::new().name(name).spawn(future).unwrap()) } } } - pub fn spawn_local(future: impl Future + 'static) -> MustJoinHandle + pub fn spawn_local(name: &str, future: impl Future + 'static) -> MustJoinHandle where Out: 'static, { cfg_if! 
{ if #[cfg(feature="rt-async-std")] { - MustJoinHandle::new(async_std::task::spawn_local(future)) + MustJoinHandle::new(async_std::task::Builder::new().name(name).local(future).unwrap()) } else if #[cfg(feature="rt-tokio")] { - MustJoinHandle::new(tokio::task::spawn_local(future)) + MustJoinHandle::new(tokio::task::Builder::new().name(name).spawn_local(future).unwrap()) } } } - pub fn spawn_detached(future: impl Future + Send + 'static) + pub fn spawn_detached(name: &str, future: impl Future + Send + 'static) where Out: Send + 'static, { cfg_if! { if #[cfg(feature="rt-async-std")] { - drop(async_std::task::spawn(future)); + drop(async_std::task::Builder::new().name(name).spawn(future).unwrap()); } else if #[cfg(feature="rt-tokio")] { - drop(tokio::task::spawn(future)); + drop(tokio::task::Builder::new().name(name).spawn(future).unwrap()); } } } - pub fn spawn_detached_local(future: impl Future + 'static) + pub fn spawn_detached_local(name: &str,future: impl Future + 'static) where Out: 'static, { cfg_if! { if #[cfg(feature="rt-async-std")] { - drop(async_std::task::spawn_local(future)); + drop(async_std::task::Builder::new().name(name).local(future).unwrap()); } else if #[cfg(feature="rt-tokio")] { - drop(tokio::task::spawn_local(future)); + drop(tokio::task::Builder::new().name(name).spawn_local(future).unwrap()); } } } #[allow(unused_variables)] - pub async fn blocking_wrapper(blocking_task: F, err_result: R) -> R + pub async fn blocking_wrapper(name: &str, blocking_task: F, err_result: R) -> R where F: FnOnce() -> R + Send + 'static, R: Send + 'static, @@ -108,9 +108,9 @@ cfg_if! { // run blocking stuff in blocking thread cfg_if! 
{ if #[cfg(feature="rt-async-std")] { - async_std::task::spawn_blocking(blocking_task).await + async_std::task::Builder::new().name(name).blocking(blocking_task) } else if #[cfg(feature="rt-tokio")] { - tokio::task::spawn_blocking(blocking_task).await.unwrap_or(err_result) + tokio::task::Builder::new().name(name).spawn_blocking(blocking_task).unwrap().await.unwrap_or(err_result) } else { #[compile_error("must use an executor")] } diff --git a/veilid-tools/src/tests/common/test_async_tag_lock.rs b/veilid-tools/src/tests/common/test_async_tag_lock.rs index caef3001..1789cd16 100644 --- a/veilid-tools/src/tests/common/test_async_tag_lock.rs +++ b/veilid-tools/src/tests/common/test_async_tag_lock.rs @@ -35,7 +35,7 @@ pub async fn test_simple_single_contention() { let g1 = table.lock_tag(a1).await; info!("locked"); - let t1 = spawn(async move { + let t1 = spawn("t1", async move { // move the guard into the task let _g1_take = g1; // hold the guard for a bit @@ -90,7 +90,7 @@ pub async fn test_simple_double_contention() { let g2 = table.lock_tag(a2).await; info!("locked"); - let t1 = spawn(async move { + let t1 = spawn("t1", async move { // move the guard into the tas let _g1_take = g1; // hold the guard for a bit @@ -99,7 +99,7 @@ pub async fn test_simple_double_contention() { // release the guard info!("released"); }); - let t2 = spawn(async move { + let t2 = spawn("t2", async move { // move the guard into the task let _g2_take = g2; // hold the guard for a bit @@ -131,7 +131,7 @@ pub async fn test_parallel_single_contention() { let a1 = SocketAddr::new("1.2.3.4".parse().unwrap(), 1234); let table1 = table.clone(); - let t1 = spawn(async move { + let t1 = spawn("t1", async move { // lock the tag let _g = table1.lock_tag(a1).await; info!("locked t1"); @@ -143,7 +143,7 @@ pub async fn test_parallel_single_contention() { }); let table2 = table.clone(); - let t2 = spawn(async move { + let t2 = spawn("t2", async move { // lock the tag let _g = table2.lock_tag(a1).await; 
info!("locked t2"); @@ -155,7 +155,7 @@ pub async fn test_parallel_single_contention() { }); let table3 = table.clone(); - let t3 = spawn(async move { + let t3 = spawn("t3", async move { // lock the tag let _g = table3.lock_tag(a1).await; info!("locked t3"); diff --git a/veilid-tools/src/tests/common/test_host_interface.rs b/veilid-tools/src/tests/common/test_host_interface.rs index f694e866..014af920 100644 --- a/veilid-tools/src/tests/common/test_host_interface.rs +++ b/veilid-tools/src/tests/common/test_host_interface.rs @@ -30,7 +30,7 @@ pub async fn test_eventual() { let i4 = e1.instance_clone(4u32); drop(i2); - let jh = spawn(async move { + let jh = spawn("task", async move { sleep(1000).await; e1.resolve(); }); @@ -47,7 +47,7 @@ pub async fn test_eventual() { let i3 = e1.instance_clone(3u32); let i4 = e1.instance_clone(4u32); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { let i5 = e1.instance_clone(5u32); let i6 = e1.instance_clone(6u32); assert_eq!(i1.await, 1u32); @@ -67,7 +67,7 @@ pub async fn test_eventual() { let i1 = e1.instance_clone(1u32); let i2 = e1.instance_clone(2u32); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { assert_eq!(i1.await, 1u32); assert_eq!(i2.await, 2u32); }); @@ -80,7 +80,7 @@ pub async fn test_eventual() { // let j1 = e1.instance_clone(1u32); let j2 = e1.instance_clone(2u32); - let jh = spawn(async move { + let jh = spawn("task", async move { assert_eq!(j1.await, 1u32); assert_eq!(j2.await, 2u32); }); @@ -105,7 +105,7 @@ pub async fn test_eventual_value() { drop(i2); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { sleep(1000).await; e1_c1.resolve(3u32); }); @@ -122,7 +122,7 @@ pub async fn test_eventual_value() { let i3 = e1.instance(); let i4 = e1.instance(); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { let i5 = e1.instance(); let i6 = e1.instance(); 
i1.await; @@ -144,7 +144,7 @@ pub async fn test_eventual_value() { let i1 = e1.instance(); let i2 = e1.instance(); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { i1.await; i2.await; }); @@ -157,7 +157,7 @@ pub async fn test_eventual_value() { // let j1 = e1.instance(); let j2 = e1.instance(); - let jh = spawn(async move { + let jh = spawn("task", async move { j1.await; j2.await; }); @@ -181,7 +181,7 @@ pub async fn test_eventual_value_clone() { let i4 = e1.instance(); drop(i2); - let jh = spawn(async move { + let jh = spawn("task", async move { sleep(1000).await; e1.resolve(3u32); }); @@ -199,7 +199,7 @@ pub async fn test_eventual_value_clone() { let i3 = e1.instance(); let i4 = e1.instance(); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { let i5 = e1.instance(); let i6 = e1.instance(); assert_eq!(i1.await, 4); @@ -220,7 +220,7 @@ pub async fn test_eventual_value_clone() { let i1 = e1.instance(); let i2 = e1.instance(); let e1_c1 = e1.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { assert_eq!(i1.await, 5); assert_eq!(i2.await, 5); }); @@ -231,7 +231,7 @@ pub async fn test_eventual_value_clone() { // let j1 = e1.instance(); let j2 = e1.instance(); - let jh = spawn(async move { + let jh = spawn("task", async move { assert_eq!(j1.await, 6); assert_eq!(j2.await, 6); }); @@ -245,7 +245,7 @@ pub async fn test_interval() { info!("testing interval"); let tick: Arc> = Arc::new(Mutex::new(0u32)); - let stopper = interval(1000, move || { + let stopper = interval("interval", 1000, move || { let tick = tick.clone(); async move { let mut tick = tick.lock(); @@ -493,7 +493,7 @@ pub async fn test_must_join_single_future() { let sf = MustJoinSingleFuture::::new(); assert_eq!(sf.check().await, Ok(None)); assert_eq!( - sf.single_spawn(async { + sf.single_spawn("t1", async { sleep(2000).await; 69 }) @@ -501,10 +501,13 @@ pub async fn test_must_join_single_future() { 
Ok((None, true)) ); assert_eq!(sf.check().await, Ok(None)); - assert_eq!(sf.single_spawn(async { panic!() }).await, Ok((None, false))); + assert_eq!( + sf.single_spawn("t2", async { panic!() }).await, + Ok((None, false)) + ); assert_eq!(sf.join().await, Ok(Some(69))); assert_eq!( - sf.single_spawn(async { + sf.single_spawn("t3", async { sleep(1000).await; 37 }) @@ -513,7 +516,7 @@ pub async fn test_must_join_single_future() { ); sleep(2000).await; assert_eq!( - sf.single_spawn(async { + sf.single_spawn("t4", async { sleep(1000).await; 27 }) diff --git a/veilid-tools/src/tests/common/test_startup_lock.rs b/veilid-tools/src/tests/common/test_startup_lock.rs index d8823839..4efb1bf7 100644 --- a/veilid-tools/src/tests/common/test_startup_lock.rs +++ b/veilid-tools/src/tests/common/test_startup_lock.rs @@ -62,7 +62,7 @@ pub async fn test_contention() { assert!(lock.is_started()); let lock2 = lock.clone(); let val2 = val.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { let _guard = lock2.enter().expect("should enter"); sleep(2000).await; val2.store(true, Ordering::Release); @@ -95,7 +95,7 @@ pub async fn test_bad_enter() { assert!(!lock.is_shut_down()); let lock2 = lock.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { let guard = lock2.shutdown().await.expect("should shutdown"); sleep(2000).await; guard.success(); @@ -139,7 +139,7 @@ pub async fn test_multiple_enter() { //eprintln!("1"); let lock2 = lock.clone(); - let jh = spawn(async move { + let jh = spawn("task", async move { //eprintln!("2"); let guard = lock2.shutdown().await.expect("should shutdown"); //eprintln!("7"); diff --git a/veilid-tools/src/tick_task.rs b/veilid-tools/src/tick_task.rs index 339bd49d..d0f5d9fe 100644 --- a/veilid-tools/src/tick_task.rs +++ b/veilid-tools/src/tick_task.rs @@ -10,6 +10,7 @@ type TickTaskRoutine = /// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again. 
/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity. pub struct TickTask { + name: String, last_timestamp_us: AtomicU64, tick_period_us: u64, routine: OnceCell>>, @@ -19,8 +20,9 @@ pub struct TickTask { } impl TickTask { - pub fn new_us(tick_period_us: u64) -> Self { + pub fn new_us(name: &str, tick_period_us: u64) -> Self { Self { + name: name.to_string(), last_timestamp_us: AtomicU64::new(0), tick_period_us, routine: OnceCell::new(), @@ -29,8 +31,9 @@ impl TickTask { running: Arc::new(AtomicBool::new(false)), } } - pub fn new_ms(tick_period_ms: u32) -> Self { + pub fn new_ms(name: &str, tick_period_ms: u32) -> Self { Self { + name: name.to_string(), last_timestamp_us: AtomicU64::new(0), tick_period_us: (tick_period_ms as u64) * 1000u64, routine: OnceCell::new(), @@ -39,8 +42,9 @@ impl TickTask { running: Arc::new(AtomicBool::new(false)), } } - pub fn new(tick_period_sec: u32) -> Self { + pub fn new(name: &str, tick_period_sec: u32) -> Self { Self { + name: name.to_string(), last_timestamp_us: AtomicU64::new(0), tick_period_us: (tick_period_sec as u64) * 1000000u64, routine: OnceCell::new(), @@ -147,7 +151,11 @@ impl TickTask { running.store(false, core::sync::atomic::Ordering::Release); out }); - match self.single_future.single_spawn(wrapped_routine).await { + match self + .single_future + .single_spawn(&self.name, wrapped_routine) + .await + { // We should have already consumed the result of the last run, or there was none // and we should definitely have run, because the prior 'check()' operation // should have ensured the singlefuture was ready to run