mirror of
https://gitlab.com/veilid/veilid.git
synced 2025-02-25 18:55:38 -06:00
add StartupDisposition to handle waiting for binding without reporting errors
This commit is contained in:
parent
8c297acdee
commit
1048fc6bb9
@ -219,16 +219,27 @@ impl AttachmentManager {
|
||||
let netman = self.network_manager();
|
||||
|
||||
let mut restart;
|
||||
loop {
|
||||
let mut restart_delay;
|
||||
while self.inner.lock().maintain_peers {
|
||||
restart = false;
|
||||
if let Err(err) = netman.startup().await {
|
||||
restart_delay = 1;
|
||||
|
||||
match netman.startup().await {
|
||||
Err(err) => {
|
||||
error!("network startup failed: {}", err);
|
||||
netman.shutdown().await;
|
||||
restart = true;
|
||||
} else {
|
||||
}
|
||||
Ok(StartupDisposition::BindRetry) => {
|
||||
info!("waiting for network to bind...");
|
||||
restart = true;
|
||||
restart_delay = 10;
|
||||
}
|
||||
Ok(StartupDisposition::Success) => {
|
||||
log_net!(debug "started maintaining peers");
|
||||
|
||||
while self.inner.lock().maintain_peers {
|
||||
// tick network manager
|
||||
let next_tick_ts = get_timestamp() + 1_000_000u64;
|
||||
if let Err(err) = netman.tick().await {
|
||||
error!("Error in network manager: {}", err);
|
||||
self.inner.lock().maintain_peers = false;
|
||||
@ -248,7 +259,10 @@ impl AttachmentManager {
|
||||
self.update_attachment();
|
||||
|
||||
// sleep should be at the end in case maintain_peers changes state
|
||||
sleep(1000).await;
|
||||
let wait_duration = next_tick_ts
|
||||
.saturating_sub(get_timestamp())
|
||||
.clamp(0, 1_000_000u64);
|
||||
sleep((wait_duration / 1_000) as u32).await;
|
||||
}
|
||||
log_net!(debug "stopped maintaining peers");
|
||||
|
||||
@ -260,15 +274,22 @@ impl AttachmentManager {
|
||||
log_net!(debug "stopping network");
|
||||
netman.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
if !restart {
|
||||
break;
|
||||
}
|
||||
|
||||
log_net!(debug "completely restarting attachment");
|
||||
|
||||
// chill out for a second first, give network stack time to settle out
|
||||
for _ in 0..restart_delay {
|
||||
if !self.inner.lock().maintain_peers {
|
||||
break;
|
||||
}
|
||||
sleep(1000).await;
|
||||
}
|
||||
}
|
||||
|
||||
self.update_attaching_detaching_state(AttachmentState::Detached);
|
||||
log_net!(debug "attachment stopped");
|
||||
|
@ -136,6 +136,12 @@ enum SendDataToExistingFlowResult {
|
||||
NotSent(Vec<u8>),
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
|
||||
pub enum StartupDisposition {
|
||||
Success,
|
||||
BindRetry,
|
||||
}
|
||||
|
||||
// The mutable state of the network manager
|
||||
struct NetworkManagerInner {
|
||||
stats: NetworkManagerStats,
|
||||
@ -388,10 +394,10 @@ impl NetworkManager {
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, err)]
|
||||
pub async fn internal_startup(&self) -> EyreResult<()> {
|
||||
pub async fn internal_startup(&self) -> EyreResult<StartupDisposition> {
|
||||
if self.unlocked_inner.components.read().is_some() {
|
||||
log_net!(debug "NetworkManager::internal_startup already started");
|
||||
return Ok(());
|
||||
return Ok(StartupDisposition::Success);
|
||||
}
|
||||
|
||||
// Clean address filter for things that should not be persistent
|
||||
@ -423,26 +429,37 @@ impl NetworkManager {
|
||||
|
||||
// Start network components
|
||||
connection_manager.startup().await;
|
||||
net.startup().await?;
|
||||
match net.startup().await? {
|
||||
StartupDisposition::Success => {}
|
||||
StartupDisposition::BindRetry => {
|
||||
return Ok(StartupDisposition::BindRetry);
|
||||
}
|
||||
}
|
||||
rpc_processor.startup().await?;
|
||||
receipt_manager.startup().await?;
|
||||
|
||||
log_net!("NetworkManager::internal_startup end");
|
||||
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all, err)]
|
||||
pub async fn startup(&self) -> EyreResult<()> {
|
||||
if let Err(e) = self.internal_startup().await {
|
||||
self.shutdown().await;
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
|
||||
match self.internal_startup().await {
|
||||
Ok(StartupDisposition::Success) => {
|
||||
// Inform api clients that things have changed
|
||||
self.send_network_update();
|
||||
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
Ok(StartupDisposition::BindRetry) => {
|
||||
self.shutdown().await;
|
||||
Ok(StartupDisposition::BindRetry)
|
||||
}
|
||||
Err(e) => {
|
||||
self.shutdown().await;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", skip_all)]
|
||||
|
@ -709,10 +709,7 @@ impl Network {
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
||||
#[instrument(level = "debug", err, skip_all)]
|
||||
pub async fn startup(&self) -> EyreResult<()> {
|
||||
self.inner.lock().network_started = None;
|
||||
let startup_func = async {
|
||||
pub async fn startup_internal(&self) -> EyreResult<StartupDisposition> {
|
||||
// initialize interfaces
|
||||
self.unlocked_inner.interfaces.refresh().await?;
|
||||
|
||||
@ -861,23 +858,36 @@ impl Network {
|
||||
|
||||
// start listeners
|
||||
if protocol_config.inbound.contains(ProtocolType::UDP) {
|
||||
self.bind_udp_protocol_handlers(
|
||||
&mut editor_public_internet,
|
||||
&mut editor_local_network,
|
||||
)
|
||||
.await?;
|
||||
let res = self
|
||||
.bind_udp_protocol_handlers(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await;
|
||||
if !matches!(res, Ok(StartupDisposition::Success)) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
if protocol_config.inbound.contains(ProtocolType::WS) {
|
||||
self.start_ws_listeners(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await?;
|
||||
let res = self
|
||||
.start_ws_listeners(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await;
|
||||
if !matches!(res, Ok(StartupDisposition::Success)) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
if protocol_config.inbound.contains(ProtocolType::WSS) {
|
||||
self.start_wss_listeners(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await?;
|
||||
let res = self
|
||||
.start_wss_listeners(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await;
|
||||
if !matches!(res, Ok(StartupDisposition::Success)) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
if protocol_config.inbound.contains(ProtocolType::TCP) {
|
||||
self.start_tcp_listeners(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await?;
|
||||
let res = self
|
||||
.start_tcp_listeners(&mut editor_public_internet, &mut editor_local_network)
|
||||
.await;
|
||||
if !matches!(res, Ok(StartupDisposition::Success)) {
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
editor_public_internet.setup_network(
|
||||
@ -907,18 +917,30 @@ impl Network {
|
||||
editor_public_internet.commit(true).await;
|
||||
editor_local_network.commit(true).await;
|
||||
|
||||
Ok(())
|
||||
};
|
||||
let res = startup_func.await;
|
||||
if res.is_err() {
|
||||
info!("network failed to start");
|
||||
self.inner.lock().network_started = Some(false);
|
||||
return res;
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
|
||||
#[instrument(level = "debug", err, skip_all)]
|
||||
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
|
||||
self.inner.lock().network_started = None;
|
||||
|
||||
match self.startup_internal().await {
|
||||
Ok(StartupDisposition::Success) => {
|
||||
info!("network started");
|
||||
self.inner.lock().network_started = Some(true);
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
Ok(StartupDisposition::BindRetry) => {
|
||||
debug!("network bind retry");
|
||||
self.inner.lock().network_started = Some(false);
|
||||
Ok(StartupDisposition::BindRetry)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("network failed to start");
|
||||
self.inner.lock().network_started = Some(false);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn needs_restart(&self) -> bool {
|
||||
|
@ -349,7 +349,7 @@ impl Network {
|
||||
bind_set: NetworkBindSet,
|
||||
is_tls: bool,
|
||||
new_protocol_accept_handler: Box<NewProtocolAcceptHandler>,
|
||||
) -> EyreResult<Vec<SocketAddress>> {
|
||||
) -> EyreResult<Option<Vec<SocketAddress>>> {
|
||||
let mut out = Vec::<SocketAddress>::new();
|
||||
|
||||
for ip_addr in bind_set.addrs {
|
||||
@ -404,7 +404,8 @@ impl Network {
|
||||
}
|
||||
|
||||
if !bind_set.search {
|
||||
bail!("unable to bind to tcp {}", addr);
|
||||
log_net!(debug "unable to bind to tcp {}", addr);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if port == 65535u16 {
|
||||
@ -419,6 +420,6 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
Ok(out)
|
||||
Ok(Some(out))
|
||||
}
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ impl Network {
|
||||
pub(super) async fn create_udp_protocol_handlers(
|
||||
&self,
|
||||
bind_set: NetworkBindSet,
|
||||
) -> EyreResult<Vec<DialInfo>> {
|
||||
) -> EyreResult<Option<Vec<DialInfo>>> {
|
||||
let mut out = Vec::<DialInfo>::new();
|
||||
|
||||
for ip_addr in bind_set.addrs {
|
||||
@ -175,7 +175,8 @@ impl Network {
|
||||
}
|
||||
|
||||
if !bind_set.search {
|
||||
bail!("unable to bind to udp {}", addr);
|
||||
log_net!(debug "unable to bind to udp {}", addr);
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if port == 65535u16 {
|
||||
@ -189,7 +190,7 @@ impl Network {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
Ok(Some(out))
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
@ -140,7 +140,7 @@ impl Network {
|
||||
&self,
|
||||
editor_public_internet: &mut RoutingDomainEditor,
|
||||
editor_local_network: &mut RoutingDomainEditor,
|
||||
) -> EyreResult<()> {
|
||||
) -> EyreResult<StartupDisposition> {
|
||||
log_net!("UDP: binding protocol handlers");
|
||||
let routing_table = self.routing_table();
|
||||
let (listen_address, public_address, detect_address_changes) = {
|
||||
@ -170,7 +170,10 @@ impl Network {
|
||||
);
|
||||
}
|
||||
|
||||
let mut local_dial_info_list = self.create_udp_protocol_handlers(bind_set).await?;
|
||||
let Some(mut local_dial_info_list) = self.create_udp_protocol_handlers(bind_set).await?
|
||||
else {
|
||||
return Ok(StartupDisposition::BindRetry);
|
||||
};
|
||||
local_dial_info_list.sort();
|
||||
|
||||
let mut static_public = false;
|
||||
@ -241,14 +244,16 @@ impl Network {
|
||||
}
|
||||
|
||||
// Now create tasks for udp listeners
|
||||
self.create_udp_listener_tasks().await
|
||||
self.create_udp_listener_tasks().await?;
|
||||
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
|
||||
pub(super) async fn start_ws_listeners(
|
||||
&self,
|
||||
editor_public_internet: &mut RoutingDomainEditor,
|
||||
editor_local_network: &mut RoutingDomainEditor,
|
||||
) -> EyreResult<()> {
|
||||
) -> EyreResult<StartupDisposition> {
|
||||
log_net!("WS: binding protocol handlers");
|
||||
let routing_table = self.routing_table();
|
||||
let (listen_address, url, path, detect_address_changes) = {
|
||||
@ -277,13 +282,16 @@ impl Network {
|
||||
bind_set.port, bind_set.addrs
|
||||
);
|
||||
}
|
||||
let socket_addresses = self
|
||||
let Some(socket_addresses) = self
|
||||
.start_tcp_listener(
|
||||
bind_set,
|
||||
false,
|
||||
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
else {
|
||||
return Ok(StartupDisposition::BindRetry);
|
||||
};
|
||||
log_net!("WS: protocol handlers started on {:#?}", socket_addresses);
|
||||
|
||||
let mut static_public = false;
|
||||
@ -353,14 +361,14 @@ impl Network {
|
||||
Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WS));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
|
||||
pub(super) async fn start_wss_listeners(
|
||||
&self,
|
||||
editor_public_internet: &mut RoutingDomainEditor,
|
||||
editor_local_network: &mut RoutingDomainEditor,
|
||||
) -> EyreResult<()> {
|
||||
) -> EyreResult<StartupDisposition> {
|
||||
log_net!("WSS: binding protocol handlers");
|
||||
|
||||
let (listen_address, url, _detect_address_changes) = {
|
||||
@ -389,13 +397,17 @@ impl Network {
|
||||
);
|
||||
}
|
||||
|
||||
let socket_addresses = self
|
||||
let Some(socket_addresses) = self
|
||||
.start_tcp_listener(
|
||||
bind_set,
|
||||
true,
|
||||
Box::new(|c, t| Box::new(WebsocketProtocolHandler::new(c, t))),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
else {
|
||||
return Ok(StartupDisposition::BindRetry);
|
||||
};
|
||||
|
||||
log_net!("WSS: protocol handlers started on {:#?}", socket_addresses);
|
||||
|
||||
// NOTE: No interface dial info for WSS, as there is no way to connect to a local dialinfo via TLS
|
||||
@ -448,14 +460,14 @@ impl Network {
|
||||
Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::WSS));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
|
||||
pub(super) async fn start_tcp_listeners(
|
||||
&self,
|
||||
editor_public_internet: &mut RoutingDomainEditor,
|
||||
editor_local_network: &mut RoutingDomainEditor,
|
||||
) -> EyreResult<()> {
|
||||
) -> EyreResult<StartupDisposition> {
|
||||
log_net!("TCP: binding protocol handlers");
|
||||
|
||||
let routing_table = self.routing_table();
|
||||
@ -484,13 +496,17 @@ impl Network {
|
||||
bind_set.port, bind_set.addrs
|
||||
);
|
||||
}
|
||||
let socket_addresses = self
|
||||
let Some(socket_addresses) = self
|
||||
.start_tcp_listener(
|
||||
bind_set,
|
||||
false,
|
||||
Box::new(|c, _| Box::new(RawTcpProtocolHandler::new(c))),
|
||||
)
|
||||
.await?;
|
||||
.await?
|
||||
else {
|
||||
return Ok(StartupDisposition::BindRetry);
|
||||
};
|
||||
|
||||
log_net!("TCP: protocol handlers started on {:#?}", socket_addresses);
|
||||
|
||||
let mut static_public = false;
|
||||
@ -546,6 +562,6 @@ impl Network {
|
||||
Self::add_preferred_local_address(&mut inner, PeerAddress::new(sa, ProtocolType::TCP));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
}
|
||||
|
@ -333,9 +333,7 @@ impl Network {
|
||||
|
||||
/////////////////////////////////////////////////////////////////
|
||||
|
||||
pub async fn startup(&self) -> EyreResult<()> {
|
||||
self.inner.lock().network_started = None;
|
||||
let startup_func = async {
|
||||
pub async fn startup_internal(&self) -> EyreResult<StartupDisposition> {
|
||||
log_net!(debug "starting network");
|
||||
// get protocol config
|
||||
let protocol_config = {
|
||||
@ -397,19 +395,30 @@ impl Network {
|
||||
|
||||
// commit routing table edits
|
||||
editor_public_internet.commit(true).await;
|
||||
Ok(())
|
||||
};
|
||||
|
||||
let res = startup_func.await;
|
||||
if res.is_err() {
|
||||
info!("network failed to start");
|
||||
self.inner.lock().network_started = Some(false);
|
||||
return res;
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
|
||||
pub async fn startup(&self) -> EyreResult<StartupDisposition> {
|
||||
self.inner.lock().network_started = None;
|
||||
|
||||
match self.startup_internal().await {
|
||||
Ok(StartupDisposition::Success) => {
|
||||
info!("network started");
|
||||
self.inner.lock().network_started = Some(true);
|
||||
Ok(())
|
||||
Ok(StartupDisposition::Success)
|
||||
}
|
||||
Ok(StartupDisposition::BindRetry) => {
|
||||
debug!("network bind retry");
|
||||
self.inner.lock().network_started = Some(false);
|
||||
Ok(StartupDisposition::BindRetry)
|
||||
}
|
||||
Err(e) => {
|
||||
debug!("network failed to start");
|
||||
self.inner.lock().network_started = Some(false);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn needs_restart(&self) -> bool {
|
||||
|
@ -149,6 +149,7 @@ impl RoutingDomainEditor {
|
||||
address_type,
|
||||
protocol_type,
|
||||
} => {
|
||||
if !detail.common_mut().dial_info_details().is_empty() {
|
||||
if address_type.is_some() || protocol_type.is_some() {
|
||||
info!(
|
||||
"[{:?}] cleared dial info: {}:{}",
|
||||
@ -163,13 +164,16 @@ impl RoutingDomainEditor {
|
||||
} else {
|
||||
info!("[{:?}] cleared all dial info", self.routing_domain);
|
||||
}
|
||||
}
|
||||
detail
|
||||
.common_mut()
|
||||
.clear_dial_info_details(address_type, protocol_type);
|
||||
peer_info_changed = true;
|
||||
}
|
||||
RoutingDomainChange::ClearRelayNode => {
|
||||
if detail.common_mut().relay_node().is_some() {
|
||||
info!("[{:?}] cleared relay node", self.routing_domain);
|
||||
}
|
||||
detail.common_mut().set_relay_node(None);
|
||||
peer_info_changed = true;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user