no-std-net fix

more network refactor
This commit is contained in:
John Smith 2022-03-10 09:51:53 -05:00
parent 18a227717c
commit 909aa14fe2
17 changed files with 667 additions and 357 deletions

3
.gitmodules vendored
View File

@ -16,3 +16,6 @@
[submodule "external/netlink"]
path = external/netlink
url = ../netlink.git
[submodule "external/no-std-net"]
path = external/no-std-net
url = git@gitlab.hackers.town:veilid/no-std-net.git

7
.vscode/launch.json vendored
View File

@ -11,6 +11,13 @@
"program": "${workspaceFolder}/target/debug/veilid-server",
"pid": "${command:pickMyProcess}"
},
{
"type": "lldb",
"request": "attach",
"name": "Attach to veilid-flutter example",
"program": "${workspaceFolder}/veilid-flutter/example/build/linux/x64/debug/bundle/veilid_example",
"pid": "${command:pickMyProcess}"
},
{
"type": "lldb",
"request": "launch",

742
Cargo.lock generated

File diff suppressed because it is too large Load Diff

1
external/no-std-net vendored Submodule

@ -0,0 +1 @@
Subproject commit c57e56ff4ee4653bf17f8bd35dc2fd0f56dc74e7

View File

@ -80,7 +80,7 @@ wasm-bindgen-futures = "^0"
wasm-logger = "^0"
hashbrown = "^0"
lru = {version = "^0", features = ["hashbrown"] }
no-std-net = "^0"
no-std-net = { path = "../external/no-std-net", features = ["serde"] }
keyvaluedb-web = { path = "../external/keyvaluedb/keyvaluedb-web" }
data-encoding = { version = "^2", default_features = false, features = ["alloc"] }
serde = { version = "^1", default-features = false, features = ["derive", "alloc"] }

View File

@ -68,6 +68,7 @@ impl ConnectionManager {
}
pub async fn startup(&self) {
trace!("startup connection manager");
let mut inner = self.arc.inner.lock().await;
let cac = async_channel::bounded(CONNECTION_PROCESSOR_CHANNEL_SIZE); // xxx move to config
inner.connection_add_channel_tx = Some(cac.0);

View File

@ -389,7 +389,7 @@ impl Network {
}
pub async fn startup(&self) -> Result<(), String> {
info!("starting network");
trace!("startup network");
// initialize interfaces
let mut interfaces = NetworkInterfaces::new();
@ -424,6 +424,7 @@ impl Network {
if protocol_config.tcp_listen {
self.start_tcp_listeners().await?;
}
// release caches of available listener ports
// this releases the 'first bound' ports we use to guarantee
// that we have ports available to us

View File

@ -111,22 +111,17 @@ impl Network {
];
Ok((port, ip_addrs))
} else {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise
let sockaddrs: Vec<SocketAddr> = listen_address
.to_socket_addrs()
.await
.map_err(|e| format!("Unable to resolve address: {}\n{}", listen_address, e))?
.collect();
let sockaddrs = listen_address_to_socket_addrs(&listen_address)?;
if sockaddrs.is_empty() {
Err(format!("No valid listen address: {}", listen_address))
return Err(format!("No valid listen address: {}", listen_address));
}
let port = sockaddrs[0].port();
if self.bind_first_udp_port(port) {
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
} else {
let port = sockaddrs[0].port();
if self.bind_first_udp_port(port) {
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
} else {
Err("Could not find free udp port to listen on".to_owned())
}
Err("Could not find free udp port to listen on".to_owned())
}
}
}
@ -144,22 +139,17 @@ impl Network {
];
Ok((port, ip_addrs))
} else {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise
let sockaddrs: Vec<SocketAddr> = listen_address
.to_socket_addrs()
.await
.map_err(|e| format!("Unable to resolve address: {}\n{}", listen_address, e))?
.collect();
let sockaddrs = listen_address_to_socket_addrs(&listen_address)?;
if sockaddrs.is_empty() {
Err(format!("No valid listen address: {}", listen_address))
return Err(format!("No valid listen address: {}", listen_address));
}
let port = sockaddrs[0].port();
if self.bind_first_tcp_port(port) {
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
} else {
let port = sockaddrs[0].port();
if self.bind_first_tcp_port(port) {
Ok((port, sockaddrs.iter().map(|s| s.ip()).collect()))
} else {
Err("Could not find free tcp port to listen on".to_owned())
}
Err("Could not find free tcp port to listen on".to_owned())
}
}
}
@ -167,6 +157,7 @@ impl Network {
/////////////////////////////////////////////////////
pub(super) async fn start_udp_listeners(&self) -> Result<(), String> {
trace!("starting udp listeners");
let routing_table = self.routing_table();
let (listen_address, public_address) = {
let c = self.config.get();
@ -238,6 +229,7 @@ impl Network {
}
pub(super) async fn start_ws_listeners(&self) -> Result<(), String> {
trace!("starting ws listeners");
let routing_table = self.routing_table();
let (listen_address, url, path) = {
let c = self.config.get();
@ -332,6 +324,8 @@ impl Network {
}
pub(super) async fn start_wss_listeners(&self) -> Result<(), String> {
trace!("starting wss listeners");
let routing_table = self.routing_table();
let (listen_address, url) = {
let c = self.config.get();
@ -403,6 +397,8 @@ impl Network {
}
pub(super) async fn start_tcp_listeners(&self) -> Result<(), String> {
trace!("starting tcp listeners");
let routing_table = self.routing_table();
let (listen_address, public_address) = {
let c = self.config.get();

View File

@ -329,13 +329,14 @@ impl NetworkInterfaces {
// returns Ok(false) if refresh had no changes, Ok(true) if changes were present
pub async fn refresh(&mut self) -> Result<bool, String> {
self.valid = false;
eprintln!("a");
let last_interfaces = core::mem::take(&mut self.interfaces);
let mut platform_support = PlatformSupport::new().map_err(logthru_net!())?;
platform_support
.get_interfaces(&mut self.interfaces)
.await?;
eprintln!("b");
self.valid = true;
@ -343,6 +344,8 @@ impl NetworkInterfaces {
if changed {
trace!("NetworkInterfaces refreshed: {:#?}?", self);
}
eprintln!("c");
xxx investigate why things get stuck here. threading and dart issue with logging ?
Ok(changed)
}
pub fn len(&self) -> usize {

View File

@ -54,6 +54,7 @@ impl LeaseManager {
self.inner.lock().network_manager.clone()
}
pub async fn startup(&self) -> Result<(), String> {
trace!("startup lease manager");
// Retrieve config
{
let mut inner = self.inner.lock();

View File

@ -197,6 +197,7 @@ impl ReceiptManager {
}
pub async fn startup(&self) -> Result<(), String> {
trace!("startup receipt manager");
// Retrieve config
/*
{
@ -319,9 +320,7 @@ impl ReceiptManager {
callback,
)));
let mut inner = self.inner.lock();
inner
.receipts_by_nonce
.insert(receipt.get_nonce(), record);
inner.receipts_by_nonce.insert(receipt.get_nonce(), record);
}
pub fn record_single_shot_receipt(
@ -334,9 +333,7 @@ impl ReceiptManager {
&receipt, expiration, eventual,
)));
let mut inner = self.inner.lock();
inner
.receipts_by_nonce
.insert(receipt.get_nonce(), record);
inner.receipts_by_nonce.insert(receipt.get_nonce(), record);
}
fn update_next_oldest_timestamp(inner: &mut ReceiptManagerInner) {

View File

@ -1256,7 +1256,7 @@ impl RPCProcessor {
}
pub async fn startup(&self) -> Result<(), String> {
trace!("VeilidCore::startup init RPC processor");
trace!("startup rpc processor");
let mut inner = self.inner.lock();
// make local copy of node id for easy access
let c = self.config.get();

View File

@ -75,15 +75,13 @@ impl VeilidAPI {
)?;
}
// Dump routing table bucket info
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_buckets(min_state))
}
async fn debug_dialinfo(&self, _args: String) -> Result<String, VeilidAPIError> {
// Dump routing table dialinfo
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_dialinfo())
}
@ -107,8 +105,7 @@ impl VeilidAPI {
}
// Dump routing table entries
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_entries(limit, min_state))
}
@ -118,15 +115,13 @@ impl VeilidAPI {
let node_id = get_debug_argument_at(&args, 0, "debug_entry", "node_id", get_dht_key)?;
// Dump routing table entry
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_entry(node_id))
}
async fn debug_nodeinfo(&self, _args: String) -> Result<String, VeilidAPIError> {
// Dump routing table entry
let rpc = self.rpc_processor()?;
let routing_table = rpc.routing_table();
let routing_table = self.network_manager()?.routing_table();
Ok(routing_table.debug_info_nodeinfo())
}
@ -141,22 +136,26 @@ impl VeilidAPI {
let (arg, rest) = args.split_once(' ').unwrap_or((args, ""));
let rest = rest.trim_start().to_owned();
// Must be detached
if matches!(
self.get_state().await?.attachment,
AttachmentState::Detached | AttachmentState::Detaching
) {
return Err(VeilidAPIError::Internal {
message: "Must be detached to change config".to_owned(),
});
}
// One argument is 'config get'
if rest.is_empty() {
return config
.get_key_json(arg)
.map_err(|e| VeilidAPIError::Internal { message: e });
}
// More than one argument is 'config set'
// Must be detached
if !matches!(
self.get_state().await?.attachment,
AttachmentState::Detached
) {
return Err(VeilidAPIError::Internal {
message: "Must be detached to change config".to_owned(),
});
}
// Change the config key
config
.set_key_json(arg, &rest)
.map_err(|e| VeilidAPIError::Internal { message: e })?;
@ -225,6 +224,7 @@ impl VeilidAPI {
pub async fn debug_help(&self, _args: String) -> Result<String, VeilidAPIError> {
Ok(r#">>> Debug commands:
help
buckets [dead|reliable]
dialinfo
entries [dead|reliable] [limit]
@ -247,28 +247,28 @@ impl VeilidAPI {
let (arg, rest) = args.split_once(' ').unwrap_or((args, ""));
let rest = rest.trim_start().to_owned();
let mut out = String::new();
if arg == "buckets" {
out += self.debug_buckets(rest).await?.as_str();
if arg == "help" {
self.debug_help(rest).await
} else if arg == "buckets" {
self.debug_buckets(rest).await
} else if arg == "dialinfo" {
out += self.debug_dialinfo(rest).await?.as_str();
self.debug_dialinfo(rest).await
} else if arg == "entries" {
out += self.debug_entries(rest).await?.as_str();
self.debug_entries(rest).await
} else if arg == "entry" {
out += self.debug_entry(rest).await?.as_str();
self.debug_entry(rest).await
} else if arg == "nodeinfo" {
out += self.debug_nodeinfo(rest).await?.as_str();
self.debug_nodeinfo(rest).await
} else if arg == "purge" {
out += self.debug_purge(rest).await?.as_str();
self.debug_purge(rest).await
} else if arg == "attach" {
out += self.debug_attach(rest).await?.as_str();
self.debug_attach(rest).await
} else if arg == "detach" {
out += self.debug_detach(rest).await?.as_str();
self.debug_detach(rest).await
} else if arg == "config" {
out += self.debug_config(rest).await?.as_str();
self.debug_config(rest).await
} else {
out += ">>> Unknown command\n";
Ok(">>> Unknown command\n".to_owned())
}
Ok(out)
}
}

View File

@ -359,22 +359,26 @@ impl VeilidConfig {
pub fn get_key_json(&self, key: &str) -> Result<String, String> {
let c = self.get();
// Split key into path parts
let keypath: Vec<&str> = key.split('.').collect();
// Generate json from whole config
let jc = serde_json::to_string(&*c).map_err(map_to_string)?;
let jvc = json::parse(&jc).map_err(map_to_string)?;
// Find requested subkey
let mut out = &jvc;
for k in keypath {
if !jvc.has_key(k) {
return Err(format!("invalid subkey '{}' in key '{}'", k, key));
if key.is_empty() {
Ok(jvc.to_string())
} else {
// Split key into path parts
let keypath: Vec<&str> = key.split('.').collect();
let mut out = &jvc;
for k in keypath {
if !out.has_key(k) {
return Err(format!("invalid subkey '{}' in key '{}'", k, key));
}
out = &out[k];
}
out = &jvc[k];
Ok(out.to_string())
}
Ok(out.to_string())
}
pub fn set_key_json(&self, key: &str, value: &str) -> Result<(), String> {
let mut c = self.get_mut();
@ -394,10 +398,10 @@ impl VeilidConfig {
// Replace subkey
let mut out = &mut jvc;
for k in objkeypath {
if !jvc.has_key(*k) {
if !out.has_key(*k) {
return Err(format!("invalid subkey '{}' in key '{}'", *k, key));
}
out = &mut jvc[*k];
out = &mut out[*k];
}
if !out.has_key(objkeyname) {
return Err(format!("invalid subkey '{}' in key '{}'", objkeyname, key));

View File

@ -135,3 +135,36 @@ where
}
}
}
pub fn listen_address_to_socket_addrs(listen_address: &str) -> Result<Vec<SocketAddr>, String> {
// If no address is specified, but the port is, use ipv4 and ipv6 unspecified
// If the address is specified, only use the specified port and fail otherwise
let ip_addrs = vec![
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
IpAddr::V6(Ipv6Addr::UNSPECIFIED),
];
Ok(if let Some(portstr) = listen_address.strip_prefix(':') {
let port = portstr.parse::<u16>().map_err(|_| {
format!(
"Invalid port format in udp listen address: {}",
listen_address
)
})?;
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
} else if let Ok(port) = listen_address.parse::<u16>() {
ip_addrs.iter().map(|a| SocketAddr::new(*a, port)).collect()
} else {
cfg_if! {
if #[cfg(target_arch = "wasm32")] {
use core::str::FromStr;
vec![SocketAddr::from_str(listen_address).map_err(|_| format!("Unable to parse address: {}", listen_address))?]
} else {
listen_address
.to_socket_addrs()
.map_err(|_| format!("Unable to resolve address: {}", listen_address))?
.collect()
}
}
})
}

View File

@ -12,6 +12,18 @@ import 'config.dart';
// Loggy tools
const LogLevel traceLevel = LogLevel('trace', 1);
class ConsolePrinter extends LoggyPrinter {
ConsolePrinter(this.childPrinter) : super();
final LoggyPrinter childPrinter;
@override
void onLog(LogRecord record) {
debugPrint(record.toString());
childPrinter.onLog(record);
}
}
extension TraceLoggy on Loggy {
void trace(dynamic message, [Object? error, StackTrace? stackTrace]) =>
log(traceLevel, message, error, stackTrace);
@ -25,13 +37,12 @@ LogOptions getLogOptions(LogLevel? level) {
}
void setRootLogLevel(LogLevel? level) {
print("setRootLogLevel: $level");
Loggy('').level = getLogOptions(level);
}
void initLoggy() {
Loggy.initLoggy(
logPrinter: StreamPrinter(
logPrinter: ConsolePrinter(
const PrettyDeveloperPrinter(),
),
logOptions: getLogOptions(null),

View File

@ -6,11 +6,12 @@ use parking_lot::*;
use serde_derive::*;
use std::ffi::OsStr;
use std::net::{SocketAddr, ToSocketAddrs};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use url::Url;
use veilid_core::xx::*;
pub fn load_default_config(cfg: &mut config::Config) -> Result<(), config::ConfigError> {
let default_config = String::from(
@ -84,38 +85,38 @@ core:
application:
https:
enabled: false
listen_address: '[::]:5150'
listen_address: ':5150'
path: 'app'
# url: 'https://localhost:5150'
http:
enabled: false
listen_address: '[::]:5150'
listen_address: ':5150'
path: 'app'
# url: 'http://localhost:5150'
protocol:
udp:
enabled: true
socket_pool_size: 0
listen_address: '[::]:5150'
listen_address: ':5150'
# public_address: ''
tcp:
connect: true
listen: true
max_connections: 32
listen_address: '[::]:5150'
listen_address: ':5150'
#'public_address: ''
ws:
connect: true
listen: true
max_connections: 16
listen_address: '[::]:5150'
listen_address: ':5150'
path: 'ws'
# url: 'ws://localhost:5150/ws'
wss:
connect: true
listen: false
max_connections: 16
listen_address: '[::]:5150'
listen_address: ':5150'
path: 'ws'
# url: ''
leases:
@ -323,10 +324,11 @@ pub struct NamedSocketAddrs {
impl FromStr for NamedSocketAddrs {
type Err = std::io::Error;
fn from_str(s: &str) -> Result<NamedSocketAddrs, std::io::Error> {
let addr_iter = s.to_socket_addrs()?;
let addr_iter = listen_address_to_socket_addrs(s)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
Ok(NamedSocketAddrs {
name: s.to_owned(),
addrs: addr_iter.collect(),
addrs: addr_iter,
})
}
}
@ -1102,10 +1104,7 @@ mod tests {
assert_eq!(s.client_api.listen_address.name, "localhost:5959");
assert_eq!(
s.client_api.listen_address.addrs,
"localhost:5959"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs("localhost:5959").unwrap()
);
assert_eq!(s.auto_attach, true);
assert_eq!(s.logging.terminal.enabled, true);
@ -1190,14 +1189,11 @@ mod tests {
assert_eq!(s.core.network.application.https.enabled, false);
assert_eq!(
s.core.network.application.https.listen_address.name,
"[::]:5150"
":5150"
);
assert_eq!(
s.core.network.application.https.listen_address.addrs,
"[::]:5150"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs(":5150").unwrap()
);
assert_eq!(
s.core.network.application.https.path,
@ -1205,16 +1201,10 @@ mod tests {
);
assert_eq!(s.core.network.application.https.url, None);
assert_eq!(s.core.network.application.http.enabled, false);
assert_eq!(
s.core.network.application.http.listen_address.name,
"[::]:5150"
);
assert_eq!(s.core.network.application.http.listen_address.name, ":5150");
assert_eq!(
s.core.network.application.http.listen_address.addrs,
"[::]:5150"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs(":5150").unwrap()
);
assert_eq!(
s.core.network.application.http.path,
@ -1227,10 +1217,7 @@ mod tests {
assert_eq!(s.core.network.protocol.udp.listen_address.name, "[::]:5150");
assert_eq!(
s.core.network.protocol.udp.listen_address.addrs,
"[::]:5150"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs(":5150").unwrap()
);
assert_eq!(s.core.network.protocol.udp.public_address, None);
@ -1241,10 +1228,7 @@ mod tests {
assert_eq!(s.core.network.protocol.tcp.listen_address.name, "[::]:5150");
assert_eq!(
s.core.network.protocol.tcp.listen_address.addrs,
"[::]:5150"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs(":5150").unwrap()
);
assert_eq!(s.core.network.protocol.tcp.public_address, None);
@ -1255,10 +1239,7 @@ mod tests {
assert_eq!(s.core.network.protocol.ws.listen_address.name, "[::]:5150");
assert_eq!(
s.core.network.protocol.ws.listen_address.addrs,
"[::]:5150"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs(":5150").unwrap()
);
assert_eq!(
s.core.network.protocol.ws.path,
@ -1272,10 +1253,7 @@ mod tests {
assert_eq!(s.core.network.protocol.wss.listen_address.name, "[::]:5150");
assert_eq!(
s.core.network.protocol.wss.listen_address.addrs,
"[::]:5150"
.to_socket_addrs()
.unwrap()
.collect::<Vec<SocketAddr>>()
listen_address_to_socket_addrs(":5150").unwrap()
);
assert_eq!(
s.core.network.protocol.wss.path,