dont run routing table ticks that require the network until it has started up

This commit is contained in:
Christien Rioux 2024-05-19 10:49:37 -04:00 committed by k8wu
parent b37e2cc3c9
commit e82cdbbfce
8 changed files with 283 additions and 243 deletions

View File

@ -237,7 +237,7 @@ impl AttachmentManager {
} }
// see if we need to restart the network // see if we need to restart the network
if netman.needs_restart() { if netman.network_needs_restart() {
info!("Restarting network"); info!("Restarting network");
restart = true; restart = true;
break; break;

View File

@ -117,8 +117,9 @@ pub(crate) enum NodeContactMethod {
/// Must use outbound relay to reach the node /// Must use outbound relay to reach the node
OutboundRelay(NodeRef), OutboundRelay(NodeRef),
} }
#[derive(Copy, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)]
struct NodeContactMethodCacheKey { struct NodeContactMethodCacheKey {
node_ids: TypedKeyGroup,
own_node_info_ts: Timestamp, own_node_info_ts: Timestamp,
target_node_info_ts: Timestamp, target_node_info_ts: Timestamp,
target_node_ref_filter: Option<NodeRefFilter>, target_node_ref_filter: Option<NodeRefFilter>,
@ -305,6 +306,13 @@ impl NetworkManager {
.net .net
.clone() .clone()
} }
fn opt_net(&self) -> Option<Network> {
self.unlocked_inner
.components
.read()
.as_ref()
.map(|x| x.net.clone())
}
fn receipt_manager(&self) -> ReceiptManager { fn receipt_manager(&self) -> ReceiptManager {
self.unlocked_inner self.unlocked_inner
.components .components
@ -512,9 +520,16 @@ impl NetworkManager {
} }
} }
pub fn needs_restart(&self) -> bool { pub fn network_needs_restart(&self) -> bool {
let net = self.net(); self.opt_net()
net.needs_restart() .map(|net| net.needs_restart())
.unwrap_or(false)
}
pub fn network_is_started(&self) -> bool {
self.opt_net()
.and_then(|net| net.is_started())
.unwrap_or(false)
} }
pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus { pub fn generate_node_status(&self, _routing_domain: RoutingDomain) -> NodeStatus {

View File

@ -72,8 +72,8 @@ pub const MAX_CAPABILITIES: usize = 64;
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
struct NetworkInner { struct NetworkInner {
/// true if the low-level network is running /// Some(true) if the low-level network is running, Some(false) if it is not, None if it is in transit
network_started: bool, network_started: Option<bool>,
/// set if the network needs to be restarted due to a low level configuration change /// set if the network needs to be restarted due to a low level configuration change
/// such as dhcp release or change of address or interfaces being added or removed /// such as dhcp release or change of address or interfaces being added or removed
network_needs_restart: bool, network_needs_restart: bool,
@ -137,7 +137,7 @@ pub(in crate::network_manager) struct Network {
impl Network { impl Network {
fn new_inner() -> NetworkInner { fn new_inner() -> NetworkInner {
NetworkInner { NetworkInner {
network_started: false, network_started: Some(false),
network_needs_restart: false, network_needs_restart: false,
needs_public_dial_info_check: false, needs_public_dial_info_check: false,
network_already_cleared: false, network_already_cleared: false,
@ -675,196 +675,209 @@ impl Network {
#[instrument(level = "debug", err, skip_all)] #[instrument(level = "debug", err, skip_all)]
pub async fn startup(&self) -> EyreResult<()> { pub async fn startup(&self) -> EyreResult<()> {
// initialize interfaces self.inner.lock().network_started = None;
self.unlocked_inner.interfaces.refresh().await?; let startup_func = async {
// initialize interfaces
self.unlocked_inner.interfaces.refresh().await?;
// build the set of networks we should consider for the 'LocalNetwork' routing domain // build the set of networks we should consider for the 'LocalNetwork' routing domain
let mut local_networks: HashSet<(IpAddr, IpAddr)> = HashSet::new(); let mut local_networks: HashSet<(IpAddr, IpAddr)> = HashSet::new();
self.unlocked_inner self.unlocked_inner
.interfaces .interfaces
.with_interfaces(|interfaces| { .with_interfaces(|interfaces| {
log_net!(debug "interfaces: {:#?}", interfaces); log_net!(debug "interfaces: {:#?}", interfaces);
for intf in interfaces.values() { for intf in interfaces.values() {
// Skip networks that we should never encounter // Skip networks that we should never encounter
if intf.is_loopback() || !intf.is_running() { if intf.is_loopback() || !intf.is_running() {
continue; continue;
}
// Add network to local networks table
for addr in &intf.addrs {
let netmask = addr.if_addr().netmask();
let network_ip = ipaddr_apply_netmask(addr.if_addr().ip(), netmask);
local_networks.insert((network_ip, netmask));
}
} }
// Add network to local networks table });
for addr in &intf.addrs { let local_networks: Vec<(IpAddr, IpAddr)> = local_networks.into_iter().collect();
let netmask = addr.if_addr().netmask(); self.unlocked_inner
let network_ip = ipaddr_apply_netmask(addr.if_addr().ip(), netmask); .routing_table
local_networks.insert((network_ip, netmask)); .configure_local_network_routing_domain(local_networks);
}
}
});
let local_networks: Vec<(IpAddr, IpAddr)> = local_networks.into_iter().collect();
self.unlocked_inner
.routing_table
.configure_local_network_routing_domain(local_networks);
// determine if we have ipv4/ipv6 addresses // determine if we have ipv4/ipv6 addresses
{ {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.enable_ipv4 = false; inner.enable_ipv4 = false;
for addr in self.get_stable_interface_addresses() { for addr in self.get_stable_interface_addresses() {
if addr.is_ipv4() { if addr.is_ipv4() {
log_net!(debug "enable address {:?} as ipv4", addr); log_net!(debug "enable address {:?} as ipv4", addr);
inner.enable_ipv4 = true; inner.enable_ipv4 = true;
} else if addr.is_ipv6() { } else if addr.is_ipv6() {
let address = Address::from_ip_addr(addr); let address = Address::from_ip_addr(addr);
if address.is_global() { if address.is_global() {
log_net!(debug "enable address {:?} as ipv6 global", address); log_net!(debug "enable address {:?} as ipv6 global", address);
inner.enable_ipv6_global = true; inner.enable_ipv6_global = true;
} else if address.is_local() { } else if address.is_local() {
log_net!(debug "enable address {:?} as ipv6 local", address); log_net!(debug "enable address {:?} as ipv6 local", address);
inner.enable_ipv6_local = true; inner.enable_ipv6_local = true;
}
} }
} }
} }
}
// Build our protocol config to share it with other nodes // Build our protocol config to share it with other nodes
let protocol_config = {
let mut inner = self.inner.lock();
// Create stop source
inner.stop_source = Some(StopSource::new());
// get protocol config
let protocol_config = { let protocol_config = {
let c = self.config.get(); let mut inner = self.inner.lock();
let mut inbound = ProtocolTypeSet::new();
if c.network.protocol.udp.enabled { // Create stop source
inbound.insert(ProtocolType::UDP); inner.stop_source = Some(StopSource::new());
}
if c.network.protocol.tcp.listen {
inbound.insert(ProtocolType::TCP);
}
if c.network.protocol.ws.listen {
inbound.insert(ProtocolType::WS);
}
if c.network.protocol.wss.listen {
inbound.insert(ProtocolType::WSS);
}
let mut outbound = ProtocolTypeSet::new(); // get protocol config
if c.network.protocol.udp.enabled { let protocol_config = {
outbound.insert(ProtocolType::UDP); let c = self.config.get();
} let mut inbound = ProtocolTypeSet::new();
if c.network.protocol.tcp.connect {
outbound.insert(ProtocolType::TCP);
}
if c.network.protocol.ws.connect {
outbound.insert(ProtocolType::WS);
}
if c.network.protocol.wss.connect {
outbound.insert(ProtocolType::WSS);
}
let mut family_global = AddressTypeSet::new(); if c.network.protocol.udp.enabled {
let mut family_local = AddressTypeSet::new(); inbound.insert(ProtocolType::UDP);
if inner.enable_ipv4 { }
family_global.insert(AddressType::IPV4); if c.network.protocol.tcp.listen {
family_local.insert(AddressType::IPV4); inbound.insert(ProtocolType::TCP);
} }
if inner.enable_ipv6_global { if c.network.protocol.ws.listen {
family_global.insert(AddressType::IPV6); inbound.insert(ProtocolType::WS);
} }
if inner.enable_ipv6_local { if c.network.protocol.wss.listen {
family_local.insert(AddressType::IPV6); inbound.insert(ProtocolType::WSS);
} }
// set up the routing table's network config let mut outbound = ProtocolTypeSet::new();
// if we have static public dialinfo, upgrade our network class if c.network.protocol.udp.enabled {
let public_internet_capabilities = { outbound.insert(ProtocolType::UDP);
PUBLIC_INTERNET_CAPABILITIES }
.iter() if c.network.protocol.tcp.connect {
.copied() outbound.insert(ProtocolType::TCP);
.filter(|cap| !c.capabilities.disable.contains(cap)) }
.collect::<Vec<Capability>>() if c.network.protocol.ws.connect {
}; outbound.insert(ProtocolType::WS);
let local_network_capabilities = { }
LOCAL_NETWORK_CAPABILITIES if c.network.protocol.wss.connect {
.iter() outbound.insert(ProtocolType::WSS);
.copied() }
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>() let mut family_global = AddressTypeSet::new();
let mut family_local = AddressTypeSet::new();
if inner.enable_ipv4 {
family_global.insert(AddressType::IPV4);
family_local.insert(AddressType::IPV4);
}
if inner.enable_ipv6_global {
family_global.insert(AddressType::IPV6);
}
if inner.enable_ipv6_local {
family_local.insert(AddressType::IPV6);
}
// set up the routing table's network config
// if we have static public dialinfo, upgrade our network class
let public_internet_capabilities = {
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
let local_network_capabilities = {
LOCAL_NETWORK_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
ProtocolConfig {
outbound,
inbound,
family_global,
family_local,
public_internet_capabilities,
local_network_capabilities,
}
}; };
inner.protocol_config = protocol_config.clone();
ProtocolConfig { protocol_config
outbound,
inbound,
family_global,
family_local,
public_internet_capabilities,
local_network_capabilities,
}
}; };
inner.protocol_config = protocol_config.clone();
protocol_config // Start editing routing table
}; let mut editor_public_internet = self
.unlocked_inner
.routing_table
.edit_routing_domain(RoutingDomain::PublicInternet);
let mut editor_local_network = self
.unlocked_inner
.routing_table
.edit_routing_domain(RoutingDomain::LocalNetwork);
// Start editing routing table // start listeners
let mut editor_public_internet = self if protocol_config.inbound.contains(ProtocolType::UDP) {
.unlocked_inner self.bind_udp_protocol_handlers(
.routing_table &mut editor_public_internet,
.edit_routing_domain(RoutingDomain::PublicInternet); &mut editor_local_network,
let mut editor_local_network = self )
.unlocked_inner
.routing_table
.edit_routing_domain(RoutingDomain::LocalNetwork);
// start listeners
if protocol_config.inbound.contains(ProtocolType::UDP) {
self.bind_udp_protocol_handlers(&mut editor_public_internet, &mut editor_local_network)
.await?; .await?;
}
if protocol_config.inbound.contains(ProtocolType::WS) {
self.start_ws_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::WSS) {
self.start_wss_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::TCP) {
self.start_tcp_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
editor_public_internet.setup_network(
protocol_config.outbound,
protocol_config.inbound,
protocol_config.family_global,
protocol_config.public_internet_capabilities,
);
editor_local_network.setup_network(
protocol_config.outbound,
protocol_config.inbound,
protocol_config.family_local,
protocol_config.local_network_capabilities,
);
let detect_address_changes = {
let c = self.config.get();
c.network.detect_address_changes
};
if !detect_address_changes {
let inner = self.inner.lock();
if !inner.static_public_dialinfo.is_empty() {
editor_public_internet.set_network_class(Some(NetworkClass::InboundCapable));
} }
} if protocol_config.inbound.contains(ProtocolType::WS) {
self.start_ws_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::WSS) {
self.start_wss_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
if protocol_config.inbound.contains(ProtocolType::TCP) {
self.start_tcp_listeners(&mut editor_public_internet, &mut editor_local_network)
.await?;
}
// commit routing table edits editor_public_internet.setup_network(
editor_public_internet.commit(true).await; protocol_config.outbound,
editor_local_network.commit(true).await; protocol_config.inbound,
protocol_config.family_global,
protocol_config.public_internet_capabilities,
);
editor_local_network.setup_network(
protocol_config.outbound,
protocol_config.inbound,
protocol_config.family_local,
protocol_config.local_network_capabilities,
);
let detect_address_changes = {
let c = self.config.get();
c.network.detect_address_changes
};
if !detect_address_changes {
let inner = self.inner.lock();
if !inner.static_public_dialinfo.is_empty() {
editor_public_internet.set_network_class(Some(NetworkClass::InboundCapable));
}
}
// commit routing table edits
editor_public_internet.commit(true).await;
editor_local_network.commit(true).await;
Ok(())
};
let res = startup_func.await;
if res.is_err() {
info!("network failed to start");
self.inner.lock().network_started = Some(false);
return res;
}
info!("network started"); info!("network started");
self.inner.lock().network_started = true; self.inner.lock().network_started = Some(true);
Ok(()) Ok(())
} }
@ -872,7 +885,7 @@ impl Network {
self.inner.lock().network_needs_restart self.inner.lock().network_needs_restart
} }
pub fn is_started(&self) -> bool { pub fn is_started(&self) -> Option<bool> {
self.inner.lock().network_started self.inner.lock().network_started
} }

View File

@ -394,6 +394,7 @@ impl NetworkManager {
// Get cache key // Get cache key
let ncm_key = NodeContactMethodCacheKey { let ncm_key = NodeContactMethodCacheKey {
node_ids: target_node_ref.node_ids(),
own_node_info_ts: routing_table.get_own_node_info_ts(routing_domain), own_node_info_ts: routing_table.get_own_node_info_ts(routing_domain),
target_node_info_ts: target_node_ref.node_info_ts(routing_domain), target_node_info_ts: target_node_ref.node_info_ts(routing_domain),
target_node_ref_filter: target_node_ref.filter_ref().cloned(), target_node_ref_filter: target_node_ref.filter_ref().cloned(),

View File

@ -76,15 +76,7 @@ impl NetworkManager {
} }
pub fn get_veilid_state(&self) -> Box<VeilidStateNetwork> { pub fn get_veilid_state(&self) -> Box<VeilidStateNetwork> {
let has_state = self if !self.network_is_started() {
.unlocked_inner
.components
.read()
.as_ref()
.map(|c| c.net.is_started())
.unwrap_or(false);
if !has_state {
return Box::new(VeilidStateNetwork { return Box::new(VeilidStateNetwork {
started: false, started: false,
bps_down: 0.into(), bps_down: 0.into(),

View File

@ -52,7 +52,7 @@ pub const MAX_CAPABILITIES: usize = 64;
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
struct NetworkInner { struct NetworkInner {
network_started: bool, network_started: Option<bool>,
network_needs_restart: bool, network_needs_restart: bool,
protocol_config: ProtocolConfig, protocol_config: ProtocolConfig,
} }
@ -74,7 +74,7 @@ pub(in crate::network_manager) struct Network {
impl Network { impl Network {
fn new_inner() -> NetworkInner { fn new_inner() -> NetworkInner {
NetworkInner { NetworkInner {
network_started: false, network_started: Some(false),
network_needs_restart: false, network_needs_restart: false,
protocol_config: Default::default(), protocol_config: Default::default(),
} }
@ -334,70 +334,81 @@ impl Network {
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
pub async fn startup(&self) -> EyreResult<()> { pub async fn startup(&self) -> EyreResult<()> {
log_net!(debug "starting network"); self.inner.lock().network_started = None;
// get protocol config let startup_func = async {
let protocol_config = { log_net!(debug "starting network");
let c = self.config.get(); // get protocol config
let inbound = ProtocolTypeSet::new(); let protocol_config = {
let mut outbound = ProtocolTypeSet::new(); let c = self.config.get();
let inbound = ProtocolTypeSet::new();
let mut outbound = ProtocolTypeSet::new();
if c.network.protocol.ws.connect { if c.network.protocol.ws.connect {
outbound.insert(ProtocolType::WS); outbound.insert(ProtocolType::WS);
} }
if c.network.protocol.wss.connect { if c.network.protocol.wss.connect {
outbound.insert(ProtocolType::WSS); outbound.insert(ProtocolType::WSS);
} }
let supported_address_types: AddressTypeSet = if is_ipv6_supported() { let supported_address_types: AddressTypeSet = if is_ipv6_supported() {
AddressType::IPV4 | AddressType::IPV6 AddressType::IPV4 | AddressType::IPV6
} else { } else {
AddressType::IPV4.into() AddressType::IPV4.into()
};
let family_global = supported_address_types;
let family_local = supported_address_types;
let public_internet_capabilities = {
PUBLIC_INTERNET_CAPABILITIES
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
ProtocolConfig {
outbound,
inbound,
family_global,
family_local,
local_network_capabilities: vec![],
public_internet_capabilities,
}
}; };
self.inner.lock().protocol_config = protocol_config.clone();
let family_global = supported_address_types; // Start editing routing table
let family_local = supported_address_types; let mut editor_public_internet = self
.unlocked_inner
.routing_table
.edit_routing_domain(RoutingDomain::PublicInternet);
let public_internet_capabilities = { // set up the routing table's network config
PUBLIC_INTERNET_CAPABILITIES // if we have static public dialinfo, upgrade our network class
.iter()
.copied()
.filter(|cap| !c.capabilities.disable.contains(cap))
.collect::<Vec<Capability>>()
};
ProtocolConfig { editor_public_internet.setup_network(
outbound, protocol_config.outbound,
inbound, protocol_config.inbound,
family_global, protocol_config.family_global,
family_local, protocol_config.public_internet_capabilities.clone(),
local_network_capabilities: vec![], );
public_internet_capabilities, editor_public_internet.set_network_class(Some(NetworkClass::WebApp));
}
// commit routing table edits
editor_public_internet.commit(true).await;
Ok(())
}; };
self.inner.lock().protocol_config = protocol_config.clone();
// Start editing routing table let res = startup_func.await;
let mut editor_public_internet = self if res.is_err() {
.unlocked_inner info!("network failed to start");
.routing_table self.inner.lock().network_started = Some(false);
.edit_routing_domain(RoutingDomain::PublicInternet); return res;
}
// set up the routing table's network config info!("network started");
// if we have static public dialinfo, upgrade our network class self.inner.lock().network_started = Some(true);
editor_public_internet.setup_network(
protocol_config.outbound,
protocol_config.inbound,
protocol_config.family_global,
protocol_config.public_internet_capabilities.clone(),
);
editor_public_internet.set_network_class(Some(NetworkClass::WebApp));
// commit routing table edits
editor_public_internet.commit(true).await;
self.inner.lock().network_started = true;
log_net!(debug "network started");
Ok(()) Ok(())
} }
@ -405,7 +416,7 @@ impl Network {
self.inner.lock().network_needs_restart self.inner.lock().network_needs_restart
} }
pub fn is_started(&self) -> bool { pub fn is_started(&self) -> Option<bool> {
self.inner.lock().network_started self.inner.lock().network_started
} }

View File

@ -287,6 +287,9 @@ impl RoutingTable {
Ok(NodeContactMethod::Direct(v)) => v, Ok(NodeContactMethod::Direct(v)) => v,
Ok(v) => { Ok(v) => {
log_rtab!(warn "invalid contact method for bootstrap, ignoring peer: {:?}", v); log_rtab!(warn "invalid contact method for bootstrap, ignoring peer: {:?}", v);
let _ = routing_table
.network_manager()
.get_node_contact_method(nr.clone());
return; return;
} }
Err(e) => { Err(e) => {

View File

@ -149,6 +149,11 @@ impl RoutingTable {
inner.refresh_cached_entry_counts() inner.refresh_cached_entry_counts()
}; };
// Only do the rest if the network has started
if !self.network_manager().network_is_started() {
return Ok(());
}
let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize); let min_peer_count = self.with_config(|c| c.network.dht.min_peer_count as usize);
// Figure out which tables need bootstrap or peer minimum refresh // Figure out which tables need bootstrap or peer minimum refresh