mirror of
https://gitlab.com/veilid/veilid.git
synced 2024-11-25 10:10:41 -06:00
fix wasm
add connection limits
This commit is contained in:
parent
6ad1f60a61
commit
3b2f4d184f
214
veilid-core/src/connection_limits.rs
Normal file
214
veilid-core/src/connection_limits.rs
Normal file
@ -0,0 +1,214 @@
|
||||
use crate::xx::*;
|
||||
use crate::*;
|
||||
use alloc::collections::btree_map::Entry;
|
||||
use core::fmt;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum AddressFilterError {
|
||||
CountExceeded,
|
||||
RateExceeded,
|
||||
}
|
||||
impl fmt::Display for AddressFilterError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{}",
|
||||
match *self {
|
||||
Self::CountExceeded => "Count exceeded",
|
||||
Self::RateExceeded => "Rate exceeded",
|
||||
}
|
||||
)
|
||||
}
|
||||
}
|
||||
impl std::error::Error for AddressFilterError {}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct AddressNotInTableError {}
|
||||
impl fmt::Display for AddressNotInTableError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "Address not in table")
|
||||
}
|
||||
}
|
||||
impl std::error::Error for AddressNotInTableError {}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionLimits {
|
||||
max_connections_per_ip4: usize,
|
||||
max_connections_per_ip6_prefix: usize,
|
||||
max_connections_per_ip6_prefix_size: usize,
|
||||
max_connection_frequency_per_min: usize,
|
||||
conn_count_by_ip4: BTreeMap<Ipv4Addr, usize>,
|
||||
conn_count_by_ip6_prefix: BTreeMap<Ipv6Addr, usize>,
|
||||
conn_timestamps_by_ip4: BTreeMap<Ipv4Addr, Vec<u64>>,
|
||||
conn_timestamps_by_ip6_prefix: BTreeMap<Ipv6Addr, Vec<u64>>,
|
||||
}
|
||||
|
||||
impl ConnectionLimits {
|
||||
pub fn new(config: VeilidConfig) -> Self {
|
||||
let c = config.get();
|
||||
Self {
|
||||
max_connections_per_ip4: c.network.max_connections_per_ip4 as usize,
|
||||
max_connections_per_ip6_prefix: c.network.max_connections_per_ip6_prefix as usize,
|
||||
max_connections_per_ip6_prefix_size: c.network.max_connections_per_ip6_prefix_size
|
||||
as usize,
|
||||
max_connection_frequency_per_min: c.network.max_connection_frequency_per_min as usize,
|
||||
conn_count_by_ip4: BTreeMap::new(),
|
||||
conn_count_by_ip6_prefix: BTreeMap::new(),
|
||||
conn_timestamps_by_ip4: BTreeMap::new(),
|
||||
conn_timestamps_by_ip6_prefix: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
// Converts an ip to a ip block by applying a netmask
|
||||
// to the host part of the ip address
|
||||
// ipv4 addresses are treated as single hosts
|
||||
// ipv6 addresses are treated as prefix allocated blocks
|
||||
fn ip_to_ipblock(&self, addr: IpAddr) -> IpAddr {
|
||||
match addr {
|
||||
IpAddr::V4(_) => addr,
|
||||
IpAddr::V6(v6) => {
|
||||
let mut hostlen = 128usize.saturating_sub(self.max_connections_per_ip6_prefix_size);
|
||||
let mut out = v6.octets();
|
||||
for i in (0..16).rev() {
|
||||
if hostlen >= 8 {
|
||||
out[i] = 0xFF;
|
||||
hostlen -= 8;
|
||||
} else {
|
||||
out[i] |= !(0xFFu8 << hostlen);
|
||||
break;
|
||||
}
|
||||
}
|
||||
IpAddr::V6(Ipv6Addr::from(out))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn purge_old_timestamps(&mut self, cur_ts: u64) {
|
||||
// v4
|
||||
{
|
||||
let mut dead_keys = Vec::<Ipv4Addr>::new();
|
||||
for (key, value) in &mut self.conn_timestamps_by_ip4 {
|
||||
value.retain(|v| {
|
||||
// keep timestamps that are less than a minute away
|
||||
cur_ts.saturating_sub(*v) < 60_000_000u64
|
||||
});
|
||||
if value.is_empty() {
|
||||
dead_keys.push(*key);
|
||||
}
|
||||
}
|
||||
for key in dead_keys {
|
||||
self.conn_timestamps_by_ip4.remove(&key);
|
||||
}
|
||||
}
|
||||
// v6
|
||||
{
|
||||
let mut dead_keys = Vec::<Ipv6Addr>::new();
|
||||
for (key, value) in &mut self.conn_timestamps_by_ip6_prefix {
|
||||
value.retain(|v| {
|
||||
// keep timestamps that are less than a minute away
|
||||
cur_ts.saturating_sub(*v) < 60_000_000u64
|
||||
});
|
||||
if value.is_empty() {
|
||||
dead_keys.push(*key);
|
||||
}
|
||||
}
|
||||
for key in dead_keys {
|
||||
self.conn_timestamps_by_ip6_prefix.remove(&key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add(&mut self, addr: IpAddr) -> Result<(), AddressFilterError> {
|
||||
let ipblock = self.ip_to_ipblock(addr);
|
||||
let ts = intf::get_timestamp();
|
||||
|
||||
self.purge_old_timestamps(ts);
|
||||
|
||||
match ipblock {
|
||||
IpAddr::V4(v4) => {
|
||||
// See if we have too many connections from this ip block
|
||||
let cnt = &mut *self.conn_count_by_ip4.entry(v4).or_default();
|
||||
assert!(*cnt <= self.max_connections_per_ip4);
|
||||
if *cnt == self.max_connections_per_ip4 {
|
||||
return Err(AddressFilterError::CountExceeded);
|
||||
}
|
||||
// See if this ip block has connected too frequently
|
||||
let tstamps = &mut self.conn_timestamps_by_ip4.entry(v4).or_default();
|
||||
tstamps.retain(|v| {
|
||||
// keep timestamps that are less than a minute away
|
||||
ts.saturating_sub(*v) < 60_000_000u64
|
||||
});
|
||||
assert!(tstamps.len() <= self.max_connection_frequency_per_min);
|
||||
if tstamps.len() == self.max_connection_frequency_per_min {
|
||||
return Err(AddressFilterError::RateExceeded);
|
||||
}
|
||||
|
||||
// If it's okay, add the counts and timestamps
|
||||
*cnt += 1;
|
||||
tstamps.push(ts);
|
||||
}
|
||||
IpAddr::V6(v6) => {
|
||||
// See if we have too many connections from this ip block
|
||||
let cnt = &mut *self.conn_count_by_ip6_prefix.entry(v6).or_default();
|
||||
assert!(*cnt <= self.max_connections_per_ip6_prefix);
|
||||
if *cnt == self.max_connections_per_ip6_prefix {
|
||||
return Err(AddressFilterError::CountExceeded);
|
||||
}
|
||||
// See if this ip block has connected too frequently
|
||||
let tstamps = &mut self.conn_timestamps_by_ip6_prefix.entry(v6).or_default();
|
||||
assert!(tstamps.len() <= self.max_connection_frequency_per_min);
|
||||
if tstamps.len() == self.max_connection_frequency_per_min {
|
||||
return Err(AddressFilterError::RateExceeded);
|
||||
}
|
||||
|
||||
// If it's okay, add the counts and timestamps
|
||||
*cnt += 1;
|
||||
tstamps.push(ts);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, addr: IpAddr) -> Result<(), AddressNotInTableError> {
|
||||
let ipblock = self.ip_to_ipblock(addr);
|
||||
|
||||
let ts = intf::get_timestamp();
|
||||
self.purge_old_timestamps(ts);
|
||||
|
||||
match ipblock {
|
||||
IpAddr::V4(v4) => {
|
||||
match self.conn_count_by_ip4.entry(v4) {
|
||||
Entry::Vacant(_) => {
|
||||
return Err(AddressNotInTableError {});
|
||||
}
|
||||
Entry::Occupied(mut o) => {
|
||||
let cnt = o.get_mut();
|
||||
assert!(*cnt > 0);
|
||||
if *cnt == 0 {
|
||||
self.conn_count_by_ip4.remove(&v4);
|
||||
} else {
|
||||
*cnt -= 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
IpAddr::V6(v6) => {
|
||||
match self.conn_count_by_ip6_prefix.entry(v6) {
|
||||
Entry::Vacant(_) => {
|
||||
return Err(AddressNotInTableError {});
|
||||
}
|
||||
Entry::Occupied(mut o) => {
|
||||
let cnt = o.get_mut();
|
||||
assert!(*cnt > 0);
|
||||
if *cnt == 0 {
|
||||
self.conn_count_by_ip6_prefix.remove(&v6);
|
||||
} else {
|
||||
*cnt -= 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -44,17 +44,18 @@ pub struct ConnectionManager {
|
||||
}
|
||||
|
||||
impl ConnectionManager {
|
||||
fn new_inner() -> ConnectionManagerInner {
|
||||
fn new_inner(config: VeilidConfig) -> ConnectionManagerInner {
|
||||
ConnectionManagerInner {
|
||||
connection_table: ConnectionTable::new(),
|
||||
connection_table: ConnectionTable::new(config),
|
||||
connection_processor_jh: None,
|
||||
connection_add_channel_tx: None,
|
||||
}
|
||||
}
|
||||
fn new_arc(network_manager: NetworkManager) -> ConnectionManagerArc {
|
||||
let config = network_manager.config();
|
||||
ConnectionManagerArc {
|
||||
network_manager,
|
||||
inner: AsyncMutex::new(Self::new_inner()),
|
||||
inner: AsyncMutex::new(Self::new_inner(config)),
|
||||
}
|
||||
}
|
||||
pub fn new(network_manager: NetworkManager) -> Self {
|
||||
@ -78,16 +79,15 @@ impl ConnectionManager {
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
*self.arc.inner.lock().await = Self::new_inner();
|
||||
*self.arc.inner.lock().await = Self::new_inner(self.arc.network_manager.config());
|
||||
}
|
||||
|
||||
// Returns a network connection if one already is established
|
||||
|
||||
pub async fn get_connection(
|
||||
&self,
|
||||
descriptor: ConnectionDescriptor,
|
||||
) -> Option<NetworkConnection> {
|
||||
let inner = self.arc.inner.lock().await;
|
||||
let mut inner = self.arc.inner.lock().await;
|
||||
inner.connection_table.get_connection(descriptor)
|
||||
}
|
||||
|
||||
|
@ -1,73 +1,120 @@
|
||||
use crate::connection_limits::*;
|
||||
use crate::network_connection::*;
|
||||
use crate::xx::*;
|
||||
use crate::*;
|
||||
use alloc::collections::btree_map::Entry;
|
||||
use hashlink::LruCache;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionTable {
|
||||
conn_by_descriptor: BTreeMap<ConnectionDescriptor, NetworkConnection>,
|
||||
max_connections: Vec<usize>,
|
||||
conn_by_descriptor: Vec<LruCache<ConnectionDescriptor, NetworkConnection>>,
|
||||
conns_by_remote: BTreeMap<PeerAddress, Vec<NetworkConnection>>,
|
||||
address_filter: ConnectionLimits,
|
||||
}
|
||||
|
||||
fn protocol_to_index(protocol: ProtocolType) -> usize {
|
||||
match protocol {
|
||||
ProtocolType::TCP => 0,
|
||||
ProtocolType::WS => 1,
|
||||
ProtocolType::WSS => 2,
|
||||
ProtocolType::UDP => panic!("not a connection-oriented protocol"),
|
||||
}
|
||||
}
|
||||
|
||||
impl ConnectionTable {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(config: VeilidConfig) -> Self {
|
||||
let max_connections = {
|
||||
let c = config.get();
|
||||
vec![
|
||||
c.network.protocol.tcp.max_connections as usize,
|
||||
c.network.protocol.ws.max_connections as usize,
|
||||
c.network.protocol.wss.max_connections as usize,
|
||||
]
|
||||
};
|
||||
Self {
|
||||
conn_by_descriptor: BTreeMap::new(),
|
||||
max_connections,
|
||||
conn_by_descriptor: vec![
|
||||
LruCache::new_unbounded(),
|
||||
LruCache::new_unbounded(),
|
||||
LruCache::new_unbounded(),
|
||||
],
|
||||
conns_by_remote: BTreeMap::new(),
|
||||
address_filter: ConnectionLimits::new(config),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_connection(&mut self, conn: NetworkConnection) -> Result<(), String> {
|
||||
let descriptor = conn.connection_descriptor();
|
||||
assert_ne!(
|
||||
descriptor.protocol_type(),
|
||||
ProtocolType::UDP,
|
||||
"Only connection oriented protocols go in the table!"
|
||||
);
|
||||
if self.conn_by_descriptor.contains_key(&descriptor) {
|
||||
let ip_addr = descriptor.remote.socket_address.to_ip_addr();
|
||||
|
||||
let index = protocol_to_index(descriptor.protocol_type());
|
||||
if self.conn_by_descriptor[index].contains_key(&descriptor) {
|
||||
return Err(format!(
|
||||
"Connection already added to table: {:?}",
|
||||
descriptor
|
||||
));
|
||||
}
|
||||
let res = self.conn_by_descriptor.insert(descriptor, conn.clone());
|
||||
|
||||
// Filter by ip for connection limits
|
||||
self.address_filter.add(ip_addr).map_err(map_to_string)?;
|
||||
|
||||
// Add the connection to the table
|
||||
let res = self.conn_by_descriptor[index].insert(descriptor, conn.clone());
|
||||
assert!(res.is_none());
|
||||
|
||||
// if we have reached the maximum number of connections per protocol type
|
||||
// then drop the least recently used connection
|
||||
if self.conn_by_descriptor[index].len() > self.max_connections[index] {
|
||||
if let Some((lruk, _)) = self.conn_by_descriptor[index].remove_lru() {
|
||||
self.remove_connection_records(lruk);
|
||||
}
|
||||
}
|
||||
|
||||
// add connection records
|
||||
let conns = self.conns_by_remote.entry(descriptor.remote).or_default();
|
||||
|
||||
//warn!("add_connection: {:?}", conn);
|
||||
conns.push(conn);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_connection(&self, descriptor: ConnectionDescriptor) -> Option<NetworkConnection> {
|
||||
let out = self.conn_by_descriptor.get(&descriptor).cloned();
|
||||
pub fn get_connection(
|
||||
&mut self,
|
||||
descriptor: ConnectionDescriptor,
|
||||
) -> Option<NetworkConnection> {
|
||||
let index = protocol_to_index(descriptor.protocol_type());
|
||||
let out = self.conn_by_descriptor[index].get(&descriptor).cloned();
|
||||
//warn!("get_connection: {:?} -> {:?}", descriptor, out);
|
||||
out
|
||||
}
|
||||
pub fn get_last_connection_by_remote(&self, remote: PeerAddress) -> Option<NetworkConnection> {
|
||||
|
||||
pub fn get_last_connection_by_remote(
|
||||
&mut self,
|
||||
remote: PeerAddress,
|
||||
) -> Option<NetworkConnection> {
|
||||
let out = self
|
||||
.conns_by_remote
|
||||
.get(&remote)
|
||||
.map(|v| v[(v.len() - 1)].clone());
|
||||
//warn!("get_last_connection_by_remote: {:?} -> {:?}", remote, out);
|
||||
if let Some(connection) = &out {
|
||||
// lru bump
|
||||
let index = protocol_to_index(connection.connection_descriptor().protocol_type());
|
||||
let _ = self.conn_by_descriptor[index].get(&connection.connection_descriptor());
|
||||
}
|
||||
out
|
||||
}
|
||||
|
||||
pub fn connection_count(&self) -> usize {
|
||||
self.conn_by_descriptor.len()
|
||||
self.conn_by_descriptor.iter().fold(0, |b, c| b + c.len())
|
||||
}
|
||||
|
||||
pub fn remove_connection(
|
||||
&mut self,
|
||||
descriptor: ConnectionDescriptor,
|
||||
) -> Result<NetworkConnection, String> {
|
||||
//warn!("remove_connection: {:?}", descriptor);
|
||||
let out = self
|
||||
.conn_by_descriptor
|
||||
.remove(&descriptor)
|
||||
.ok_or_else(|| format!("Connection not in table: {:?}", descriptor))?;
|
||||
fn remove_connection_records(&mut self, descriptor: ConnectionDescriptor) {
|
||||
let ip_addr = descriptor.remote.socket_address.to_ip_addr();
|
||||
|
||||
// conns_by_remote
|
||||
match self.conns_by_remote.entry(descriptor.remote) {
|
||||
Entry::Vacant(_) => {
|
||||
panic!("inconsistency in connection table")
|
||||
@ -88,6 +135,22 @@ impl ConnectionTable {
|
||||
}
|
||||
}
|
||||
}
|
||||
self.address_filter
|
||||
.remove(ip_addr)
|
||||
.expect("Inconsistency in connection table");
|
||||
}
|
||||
|
||||
pub fn remove_connection(
|
||||
&mut self,
|
||||
descriptor: ConnectionDescriptor,
|
||||
) -> Result<NetworkConnection, String> {
|
||||
//warn!("remove_connection: {:?}", descriptor);
|
||||
let index = protocol_to_index(descriptor.protocol_type());
|
||||
let out = self.conn_by_descriptor[index]
|
||||
.remove(&descriptor)
|
||||
.ok_or_else(|| format!("Connection not in table: {:?}", descriptor))?;
|
||||
|
||||
self.remove_connection_records(descriptor);
|
||||
|
||||
Ok(out)
|
||||
}
|
||||
|
@ -469,7 +469,10 @@ impl Network {
|
||||
.await?;
|
||||
}
|
||||
|
||||
self.inner.lock().network_class = context.inner.lock().network_class;
|
||||
let network_class = context.inner.lock().network_class;
|
||||
self.inner.lock().network_class = network_class;
|
||||
|
||||
log_net!(debug "network class set to {:?}", network_class);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -43,6 +43,10 @@ impl Network {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fn network_manager(&self) -> NetworkManager {
|
||||
self.inner.lock().network_manager.clone()
|
||||
}
|
||||
fn connection_manager(&self) -> ConnectionManager {
|
||||
self.inner.lock().network_manager.connection_manager()
|
||||
}
|
||||
@ -54,6 +58,8 @@ impl Network {
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
let data_len = data.len();
|
||||
|
||||
let res = match dial_info.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
@ -62,7 +68,7 @@ impl Network {
|
||||
return Err("no support for TCP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
ProtocolType::WS | ProtocolType::WSS => {
|
||||
WebsocketProtocolHandler::send_unbound_message(dial_info, data)
|
||||
WebsocketProtocolHandler::send_unbound_message(dial_info.clone(), data)
|
||||
.await
|
||||
.map_err(logthru_net!())
|
||||
}
|
||||
@ -80,6 +86,7 @@ impl Network {
|
||||
descriptor: ConnectionDescriptor,
|
||||
data: Vec<u8>,
|
||||
) -> Result<Option<Vec<u8>>, String> {
|
||||
let data_len = data.len();
|
||||
match descriptor.protocol_type() {
|
||||
ProtocolType::UDP => {
|
||||
return Err("no support for udp protocol".to_owned()).map_err(logthru_net!(error))
|
||||
@ -115,6 +122,7 @@ impl Network {
|
||||
dial_info: DialInfo,
|
||||
data: Vec<u8>,
|
||||
) -> Result<(), String> {
|
||||
let data_len = data.len();
|
||||
if dial_info.protocol_type() == ProtocolType::UDP {
|
||||
return Err("no support for UDP protocol".to_owned()).map_err(logthru_net!(error))
|
||||
}
|
||||
@ -125,7 +133,7 @@ impl Network {
|
||||
// Handle connection-oriented protocols
|
||||
let conn = self
|
||||
.connection_manager()
|
||||
.get_or_create_connection(None, dial_info)
|
||||
.get_or_create_connection(None, dial_info.clone())
|
||||
.await?;
|
||||
|
||||
let res = conn.send(data).await.map_err(logthru_net!(error));
|
||||
@ -143,15 +151,17 @@ impl Network {
|
||||
// get protocol config
|
||||
self.inner.lock().protocol_config = Some({
|
||||
let c = self.config.get();
|
||||
ProtocolConfig {
|
||||
udp_enabled: false, //c.network.protocol.udp.enabled && c.capabilities.protocol_udp,
|
||||
tcp_connect: false, //c.network.protocol.tcp.connect && c.capabilities.protocol_connect_tcp,
|
||||
tcp_listen: false, //c.network.protocol.tcp.listen && c.capabilities.protocol_accept_tcp,
|
||||
ws_connect: c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws,
|
||||
ws_listen: c.network.protocol.ws.listen && c.capabilities.protocol_accept_ws,
|
||||
wss_connect: c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss,
|
||||
wss_listen: c.network.protocol.wss.listen && c.capabilities.protocol_accept_wss,
|
||||
let inbound = ProtocolSet::new();
|
||||
let mut outbound = ProtocolSet::new();
|
||||
|
||||
if c.network.protocol.ws.connect && c.capabilities.protocol_connect_ws {
|
||||
outbound.insert(ProtocolType::WS);
|
||||
}
|
||||
if c.network.protocol.wss.connect && c.capabilities.protocol_connect_wss {
|
||||
outbound.insert(ProtocolType::WSS);
|
||||
}
|
||||
|
||||
ProtocolConfig { inbound, outbound }
|
||||
});
|
||||
|
||||
self.inner.lock().network_started = true;
|
||||
@ -174,7 +184,8 @@ impl Network {
|
||||
let routing_table = network_manager.routing_table();
|
||||
|
||||
// Drop all dial info
|
||||
routing_table.clear_dial_info_details();
|
||||
routing_table.clear_dial_info_details(RoutingDomain::PublicInternet);
|
||||
routing_table.clear_dial_info_details(RoutingDomain::LocalNetwork);
|
||||
|
||||
// Cancels all async background tasks by dropping join handles
|
||||
*self.inner.lock() = Self::new_inner(network_manager);
|
||||
@ -203,6 +214,12 @@ impl Network {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
pub fn reset_network_class(&self) {
|
||||
//let mut inner = self.inner.lock();
|
||||
//inner.network_class = None;
|
||||
}
|
||||
|
||||
pub fn get_protocol_config(&self) -> Option<ProtocolConfig> {
|
||||
self.inner.lock().protocol_config.clone()
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ extern crate alloc;
|
||||
mod api_logger;
|
||||
mod attachment_manager;
|
||||
mod callback_state_machine;
|
||||
mod connection_limits;
|
||||
mod connection_manager;
|
||||
mod connection_table;
|
||||
mod core_context;
|
||||
|
@ -59,7 +59,6 @@ impl DummyNetworkConnection {
|
||||
pub struct NetworkConnectionStats {
|
||||
last_message_sent_time: Option<u64>,
|
||||
last_message_recv_time: Option<u64>,
|
||||
_established_time: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -71,6 +70,7 @@ struct NetworkConnectionInner {
|
||||
struct NetworkConnectionArc {
|
||||
descriptor: ConnectionDescriptor,
|
||||
protocol_connection: ProtocolNetworkConnection,
|
||||
established_time: u64,
|
||||
inner: Mutex<NetworkConnectionInner>,
|
||||
}
|
||||
|
||||
@ -92,7 +92,6 @@ impl NetworkConnection {
|
||||
stats: NetworkConnectionStats {
|
||||
last_message_sent_time: None,
|
||||
last_message_recv_time: None,
|
||||
_established_time: intf::get_timestamp(),
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -103,6 +102,7 @@ impl NetworkConnection {
|
||||
NetworkConnectionArc {
|
||||
descriptor,
|
||||
protocol_connection,
|
||||
established_time: intf::get_timestamp(),
|
||||
inner: Mutex::new(Self::new_inner()),
|
||||
}
|
||||
}
|
||||
@ -161,4 +161,8 @@ impl NetworkConnection {
|
||||
let inner = self.arc.inner.lock();
|
||||
inner.stats.clone()
|
||||
}
|
||||
|
||||
pub fn established_time(&self) -> u64 {
|
||||
self.arc.established_time
|
||||
}
|
||||
}
|
||||
|
@ -1,10 +1,13 @@
|
||||
use super::test_veilid_config::*;
|
||||
use crate::connection_table::*;
|
||||
use crate::network_connection::*;
|
||||
use crate::xx::*;
|
||||
use crate::*;
|
||||
|
||||
pub async fn test_add_get_remove() {
|
||||
let mut table = ConnectionTable::new();
|
||||
let config = get_config();
|
||||
|
||||
let mut table = ConnectionTable::new(config);
|
||||
|
||||
let a1 = ConnectionDescriptor::new_no_local(PeerAddress::new(
|
||||
SocketAddress::new(Address::IPV4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
|
||||
|
@ -185,9 +185,12 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
|
||||
"protected_store.always_use_insecure_storage" => Ok(Box::new(false)),
|
||||
"protected_store.insecure_fallback_directory" => Ok(Box::new(get_protected_store_path())),
|
||||
"protected_store.delete" => Ok(Box::new(false)),
|
||||
"network.max_connections" => Ok(Box::new(16u32)),
|
||||
"network.connection_initial_timeout_ms" => Ok(Box::new(2_000u32)),
|
||||
"network.connection_inactivity_timeout_ms" => Ok(Box::new(60_000u32)),
|
||||
"network.max_connections_per_ip4" => Ok(Box::new(8u32)),
|
||||
"network.max_connections_per_ip6_prefix" => Ok(Box::new(8u32)),
|
||||
"network.max_connections_per_ip6_prefix_size" => Ok(Box::new(56u32)),
|
||||
"network.max_connection_frequency_per_min" => Ok(Box::new(8u32)),
|
||||
"network.client_whitelist_timeout_ms" => Ok(Box::new(300_000u32)),
|
||||
"network.node_id" => Ok(Box::new(dht::key::DHTKey::default())),
|
||||
"network.node_id_secret" => Ok(Box::new(dht::key::DHTKeySecret::default())),
|
||||
@ -264,6 +267,18 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_config() -> VeilidConfig {
|
||||
let mut vc = VeilidConfig::new();
|
||||
match vc.setup(Arc::new(config_callback)) {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
error!("Error: {}", e);
|
||||
unreachable!();
|
||||
}
|
||||
};
|
||||
vc
|
||||
}
|
||||
|
||||
pub async fn test_config() {
|
||||
let mut vc = VeilidConfig::new();
|
||||
match vc.setup(Arc::new(config_callback)) {
|
||||
@ -296,9 +311,12 @@ pub async fn test_config() {
|
||||
get_protected_store_path()
|
||||
);
|
||||
assert_eq!(inner.protected_store.delete, false);
|
||||
assert_eq!(inner.network.max_connections, 16);
|
||||
assert_eq!(inner.network.connection_initial_timeout_ms, 2_000u32);
|
||||
assert_eq!(inner.network.connection_inactivity_timeout_ms, 60_000u32);
|
||||
assert_eq!(inner.network.max_connections_per_ip4, 8u32);
|
||||
assert_eq!(inner.network.max_connections_per_ip6_prefix, 8u32);
|
||||
assert_eq!(inner.network.max_connections_per_ip6_prefix_size, 56u32);
|
||||
assert_eq!(inner.network.max_connection_frequency_per_min, 8u32);
|
||||
assert_eq!(inner.network.client_whitelist_timeout_ms, 300_000u32);
|
||||
assert!(!inner.network.node_id.valid);
|
||||
assert!(!inner.network.node_id_secret.valid);
|
||||
|
@ -135,9 +135,12 @@ pub struct VeilidConfigRoutingTable {
|
||||
|
||||
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct VeilidConfigNetwork {
|
||||
pub max_connections: u32,
|
||||
pub connection_initial_timeout_ms: u32,
|
||||
pub connection_inactivity_timeout_ms: u32,
|
||||
pub max_connections_per_ip4: u32,
|
||||
pub max_connections_per_ip6_prefix: u32,
|
||||
pub max_connections_per_ip6_prefix_size: u32,
|
||||
pub max_connection_frequency_per_min: u32,
|
||||
pub client_whitelist_timeout_ms: u32,
|
||||
pub reverse_connection_receipt_time_ms: u32,
|
||||
pub hole_punch_receipt_time_ms: u32,
|
||||
@ -294,9 +297,12 @@ impl VeilidConfig {
|
||||
get_config!(inner.protected_store.delete);
|
||||
get_config!(inner.network.node_id);
|
||||
get_config!(inner.network.node_id_secret);
|
||||
get_config!(inner.network.max_connections);
|
||||
get_config!(inner.network.connection_initial_timeout_ms);
|
||||
get_config!(inner.network.connection_inactivity_timeout_ms);
|
||||
get_config!(inner.network.max_connections_per_ip4);
|
||||
get_config!(inner.network.max_connections_per_ip6_prefix);
|
||||
get_config!(inner.network.max_connections_per_ip6_prefix_size);
|
||||
get_config!(inner.network.max_connection_frequency_per_min);
|
||||
get_config!(inner.network.client_whitelist_timeout_ms);
|
||||
get_config!(inner.network.bootstrap);
|
||||
get_config!(inner.network.routing_table.limit_over_attached);
|
||||
|
@ -33,6 +33,14 @@ macro_rules! log_net {
|
||||
(warn $fmt:literal, $($arg:expr),+) => {
|
||||
warn!(target:"net", $fmt, $($arg),+);
|
||||
};
|
||||
(debug $text:expr) => {debug!(
|
||||
target: "net",
|
||||
"{}",
|
||||
$text,
|
||||
)};
|
||||
(debug $fmt:literal, $($arg:expr),+) => {
|
||||
debug!(target:"net", $fmt, $($arg),+);
|
||||
};
|
||||
($text:expr) => {trace!(
|
||||
target: "net",
|
||||
"{}",
|
||||
|
@ -38,9 +38,12 @@ Future<VeilidConfig> getDefaultVeilidConfig() async {
|
||||
delete: false,
|
||||
),
|
||||
network: VeilidConfigNetwork(
|
||||
maxConnections: 16,
|
||||
connectionInitialTimeoutMs: 2000,
|
||||
connectionInactivityTimeoutMs: 60000,
|
||||
maxConnectionsPerIp4: 8,
|
||||
maxConnectionsPerIp6Prefix: 8,
|
||||
maxConnectionsPerIp6PrefixSize: 56,
|
||||
maxConnectionFrequencyPerMin: 8,
|
||||
clientWhitelistTimeoutMs: 300000,
|
||||
nodeId: "",
|
||||
nodeIdSecret: "",
|
||||
|
@ -549,9 +549,12 @@ class VeilidConfigLeases {
|
||||
////////////
|
||||
|
||||
class VeilidConfigNetwork {
|
||||
int maxConnections;
|
||||
int connectionInitialTimeoutMs;
|
||||
int connectionInactivityTimeoutMs;
|
||||
int maxConnectionsPerIp4;
|
||||
int maxConnectionsPerIp6Prefix;
|
||||
int maxConnectionsPerIp6PrefixSize;
|
||||
int maxConnectionFrequencyPerMin;
|
||||
int clientWhitelistTimeoutMs;
|
||||
String nodeId;
|
||||
String nodeIdSecret;
|
||||
@ -569,9 +572,12 @@ class VeilidConfigNetwork {
|
||||
VeilidConfigLeases leases;
|
||||
|
||||
VeilidConfigNetwork({
|
||||
required this.maxConnections,
|
||||
required this.connectionInitialTimeoutMs,
|
||||
required this.connectionInactivityTimeoutMs,
|
||||
required this.maxConnectionsPerIp4,
|
||||
required this.maxConnectionsPerIp6Prefix,
|
||||
required this.maxConnectionsPerIp6PrefixSize,
|
||||
required this.maxConnectionFrequencyPerMin,
|
||||
required this.clientWhitelistTimeoutMs,
|
||||
required this.nodeId,
|
||||
required this.nodeIdSecret,
|
||||
@ -591,9 +597,12 @@ class VeilidConfigNetwork {
|
||||
|
||||
Map<String, dynamic> get json {
|
||||
return {
|
||||
'max_connections': maxConnections,
|
||||
'connection_initial_timeout_ms': connectionInitialTimeoutMs,
|
||||
'connection_inactivity_timeout_ms': connectionInactivityTimeoutMs,
|
||||
'max_connections_per_ip4': maxConnectionsPerIp4,
|
||||
'max_connections_per_ip6_prefix': maxConnectionsPerIp6Prefix,
|
||||
'max_connections_per_ip6_prefix_size': maxConnectionsPerIp6PrefixSize,
|
||||
'max_connection_frequency_per_min': maxConnectionFrequencyPerMin,
|
||||
'client_whitelist_timeout_ms': clientWhitelistTimeoutMs,
|
||||
'node_id': nodeId,
|
||||
'node_id_secret': nodeIdSecret,
|
||||
@ -613,10 +622,14 @@ class VeilidConfigNetwork {
|
||||
}
|
||||
|
||||
VeilidConfigNetwork.fromJson(Map<String, dynamic> json)
|
||||
: maxConnections = json['max_connections'],
|
||||
connectionInitialTimeoutMs = json['connection_initial_timeout_ms'],
|
||||
: connectionInitialTimeoutMs = json['connection_initial_timeout_ms'],
|
||||
connectionInactivityTimeoutMs =
|
||||
json['connection_inactivity_timeout_ms'],
|
||||
maxConnectionsPerIp4 = json['max_connections_per_ip4'],
|
||||
maxConnectionsPerIp6Prefix = json['max_connections_per_ip6_prefix'],
|
||||
maxConnectionsPerIp6PrefixSize =
|
||||
json['max_connections_per_ip6_prefix_size'],
|
||||
maxConnectionFrequencyPerMin = json['max_connection_frequency_per_min'],
|
||||
clientWhitelistTimeoutMs = json['client_whitelist_timeout_ms'],
|
||||
nodeId = json['node_id'],
|
||||
nodeIdSecret = json['node_id_secret'],
|
||||
|
@ -48,10 +48,13 @@ core:
|
||||
directory: '%BLOCK_STORE_DIRECTORY%'
|
||||
delete: false
|
||||
network:
|
||||
max_connections: 16
|
||||
connection_initial_timeout_ms: 2000
|
||||
connection_inactivity_timeout_ms: 60000
|
||||
client_whitelist_timeout_ms: 300000
|
||||
max_connections_per_ip4: 8
|
||||
max_connections_per_ip6_prefix: 8
|
||||
max_connections_per_ip6_prefix_size: 56
|
||||
max_connection_frequency_per_min: 8
|
||||
client_whitelist_timeout_ms: 300000
|
||||
node_id: ''
|
||||
node_id_secret: ''
|
||||
bootstrap: []
|
||||
@ -538,9 +541,12 @@ pub struct RoutingTable {
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub struct Network {
|
||||
pub max_connections: u32,
|
||||
pub connection_initial_timeout_ms: u32,
|
||||
pub connection_inactivity_timeout_ms: u32,
|
||||
pub max_connections_per_ip4: u32,
|
||||
pub max_connections_per_ip6_prefix: u32,
|
||||
pub max_connections_per_ip6_prefix_size: u32,
|
||||
pub max_connection_frequency_per_min: u32,
|
||||
pub client_whitelist_timeout_ms: u32,
|
||||
pub node_id: veilid_core::DHTKey,
|
||||
pub node_id_secret: veilid_core::DHTKeySecret,
|
||||
@ -810,13 +816,25 @@ impl Settings {
|
||||
)),
|
||||
"block_store.delete" => Ok(Box::new(inner.core.block_store.delete)),
|
||||
|
||||
"network.max_connections" => Ok(Box::new(inner.core.network.max_connections)),
|
||||
"network.connection_initial_timeout_ms" => {
|
||||
Ok(Box::new(inner.core.network.connection_initial_timeout_ms))
|
||||
}
|
||||
"network.connection_inactivity_timeout_ms" => Ok(Box::new(
|
||||
inner.core.network.connection_inactivity_timeout_ms,
|
||||
)),
|
||||
"network.max_connections_per_ip4" => {
|
||||
Ok(Box::new(inner.core.network.max_connections_per_ip4))
|
||||
}
|
||||
"network.max_connections_per_ip6_prefix" => {
|
||||
Ok(Box::new(inner.core.network.max_connections_per_ip6_prefix))
|
||||
}
|
||||
"network.max_connections_per_ip6_prefix_size" => Ok(Box::new(
|
||||
inner.core.network.max_connections_per_ip6_prefix_size,
|
||||
)),
|
||||
"network.max_connection_frequency_per_min" => Ok(Box::new(
|
||||
inner.core.network.max_connection_frequency_per_min,
|
||||
)),
|
||||
|
||||
"network.client_whitelist_timeout_ms" => {
|
||||
Ok(Box::new(inner.core.network.client_whitelist_timeout_ms))
|
||||
}
|
||||
@ -1170,9 +1188,12 @@ mod tests {
|
||||
);
|
||||
assert_eq!(s.core.protected_store.delete, false);
|
||||
|
||||
assert_eq!(s.core.network.max_connections, 16);
|
||||
assert_eq!(s.core.network.connection_initial_timeout_ms, 2_000u32);
|
||||
assert_eq!(s.core.network.connection_inactivity_timeout_ms, 60_000u32);
|
||||
assert_eq!(s.core.network.max_connections_per_ip4, 8u32);
|
||||
assert_eq!(s.core.network.max_connections_per_ip6_prefix, 8u32);
|
||||
assert_eq!(s.core.network.max_connections_per_ip6_prefix_size, 56u32);
|
||||
assert_eq!(s.core.network.max_connection_frequency_per_min, 8u32);
|
||||
assert_eq!(s.core.network.client_whitelist_timeout_ms, 300_000u32);
|
||||
assert_eq!(s.core.network.node_id, veilid_core::DHTKey::default());
|
||||
assert_eq!(
|
||||
|
@ -37,7 +37,6 @@ fn init_callbacks() {
|
||||
case "capabilities.protocol_connect_wss": return true;
|
||||
case "capabilities.protocol_accept_wss": return false;
|
||||
case "tablestore.directory": return "";
|
||||
case "network.max_connections": return 16;
|
||||
case "network.node_id": return "ZLd4uMYdP4qYLtxF6GqrzBb32Z6T3rE2FWMkWup1pdY";
|
||||
case "network.node_id_secret": return "s2Gvq6HJOxgQh-3xIgfWSL3I-DWZ2c1RjZLJl2Xmg2E";
|
||||
case "network.bootstrap": return [];
|
||||
|
Loading…
Reference in New Issue
Block a user