From fdead16fc8f4c8aac8a7f91c45d5385ef7a20964 Mon Sep 17 00:00:00 2001 From: Christien Rioux Date: Fri, 19 Jul 2024 14:33:31 -0400 Subject: [PATCH] wasm startup lock --- veilid-core/src/network_manager/wasm/mod.rs | 67 ++++++++++++++++----- 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/veilid-core/src/network_manager/wasm/mod.rs b/veilid-core/src/network_manager/wasm/mod.rs index 0340c9f7..25890b00 100644 --- a/veilid-core/src/network_manager/wasm/mod.rs +++ b/veilid-core/src/network_manager/wasm/mod.rs @@ -58,6 +58,9 @@ struct NetworkInner { } struct NetworkUnlockedInner { + // Startup lock + startup_lock: StartupLock, + // Accessors routing_table: RoutingTable, network_manager: NetworkManager, @@ -86,6 +89,7 @@ impl Network { connection_manager: ConnectionManager, ) -> NetworkUnlockedInner { NetworkUnlockedInner { + startup_lock: StartupLock::new(), network_manager, routing_table, connection_manager, @@ -121,7 +125,7 @@ impl Network { ///////////////////////////////////////////////////////////////// // Record DialInfo failures - pub async fn record_dial_info_failure>>>( + async fn record_dial_info_failure>>>( &self, dial_info: DialInfo, fut: F, @@ -135,12 +139,18 @@ impl Network { Ok(network_result) } - #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] + // Send data to a dial info, unbound, using a new connection from a random port + // This creates a short-lived connection in the case of connection-oriented protocols + // for the purpose of sending this one message. + // This bypasses the connection table as it is not a 'node to node' connection. + #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_data_unbound_to_dial_info( &self, dial_info: DialInfo, data: Vec, ) -> EyreResult> { + let _guard = self.unlocked_inner.startup_lock.enter()?; + self.record_dial_info_failure(dial_info.clone(), async move { let data_len = data.len(); let timeout_ms = { @@ -187,13 +197,15 @@ impl Network { // This creates a short-lived connection in the case of connection-oriented protocols // for the purpose of sending this one message. // This bypasses the connection table as it is not a 'node to node' connection. - #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] + #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_recv_data_unbound_to_dial_info( &self, dial_info: DialInfo, data: Vec, timeout_ms: u32, ) -> EyreResult>> { + let _guard = self.unlocked_inner.startup_lock.enter()?; + self.record_dial_info_failure(dial_info.clone(), async move { let data_len = data.len(); let connect_timeout_ms = { @@ -247,12 +259,14 @@ impl Network { .await } - #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] + #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_data_to_existing_flow( &self, flow: Flow, data: Vec, ) -> EyreResult { + let _guard = self.unlocked_inner.startup_lock.enter()?; + let data_len = data.len(); match flow.protocol_type() { ProtocolType::UDP => { @@ -292,12 +306,16 @@ impl Network { Ok(SendDataToExistingFlowResult::NotSent(data)) } - #[cfg_attr(feature="verbose-tracing", instrument(level="trace", err, skip(self, data), fields(data.len = data.len())))] + // Send data directly to a dial info, possibly without knowing which node it is going to + // Returns a flow for the connection used to send the data + #[instrument(level="trace", target="net", err, skip(self, data), fields(data.len = data.len()))] pub async fn send_data_to_dial_info( &self, dial_info: DialInfo, data: Vec, ) -> EyreResult> { + let _guard = self.unlocked_inner.startup_lock.enter()?; + self.record_dial_info_failure(dial_info.clone(), async move { let data_len = data.len(); if dial_info.protocol_type() == ProtocolType::UDP { @@ -399,23 +417,22 @@ impl Network { Ok(StartupDisposition::Success) } + #[instrument(level = "debug", err, skip_all)] pub async fn startup(&self) -> EyreResult { - self.inner.lock().network_started = None; + let guard = self.unlocked_inner.startup_lock.startup()?; match self.startup_internal().await { Ok(StartupDisposition::Success) => { info!("network started"); - self.inner.lock().network_started = Some(true); + guard.success(); Ok(StartupDisposition::Success) } Ok(StartupDisposition::BindRetry) => { debug!("network bind retry"); - self.inner.lock().network_started = Some(false); Ok(StartupDisposition::BindRetry) } Err(e) => { debug!("network failed to start"); - self.inner.lock().network_started = Some(false); Err(e) } } @@ -425,16 +442,22 @@ impl Network { self.inner.lock().network_needs_restart } - pub fn is_started(&self) -> Option { - self.inner.lock().network_started + pub fn is_started(&self) -> bool { + self.unlocked_inner.startup_lock.is_started() } + #[instrument(level = "debug", skip_all)] pub fn restart_network(&self) { self.inner.lock().network_needs_restart = true; } + #[instrument(level = "debug", skip_all)] pub async fn shutdown(&self) { - log_net!(debug "stopping network"); + log_net!(debug "starting low level network shutdown"); + let Ok(guard) = self.unlocked_inner.startup_lock.shutdown().await else { + log_net!(debug "low level network is already shut down"); + return; + }; // Reset state let routing_table = self.routing_table(); @@ -451,7 +474,8 @@ impl Network { // Cancels all async background tasks by dropping join handles *self.inner.lock() = Self::new_inner(); - log_net!(debug "network stopped"); + guard.success(); + log_net!(debug "finished low level network shutdown"); } pub fn get_preferred_local_address(&self, _dial_info: &DialInfo) -> Option { @@ -472,15 +496,28 @@ impl Network { &self, _punishment: Option>, ) { - // + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return; + }; } pub fn needs_public_dial_info_check(&self) -> bool { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return false; + }; + false } ////////////////////////////////////////// - pub async fn tick(&self) -> EyreResult<()> { + pub(crate) async fn tick(&self) -> EyreResult<()> { + let Ok(_guard) = self.unlocked_inner.startup_lock.enter() else { + log_net!(debug "ignoring due to not started up"); + return Ok(()); + }; + Ok(()) } }