IPC to server

This commit is contained in:
John Smith 2023-12-14 17:23:43 -05:00 committed by Christien Rioux
parent 6d2119f32e
commit 37979277b5
20 changed files with 817 additions and 239 deletions

1
Cargo.lock generated
View File

@ -5769,6 +5769,7 @@ dependencies = [
"stop-token", "stop-token",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"tracing", "tracing",
"tracing-oslog", "tracing-oslog",

View File

@ -3,6 +3,7 @@ use crate::tools::*;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::SystemTime; use std::time::SystemTime;
use stop_token::{future::FutureExt as _, StopSource}; use stop_token::{future::FutureExt as _, StopSource};
@ -20,7 +21,6 @@ cfg_if! {
struct ClientApiConnectionInner { struct ClientApiConnectionInner {
comproc: CommandProcessor, comproc: CommandProcessor,
connect_addr: Option<SocketAddr>,
request_sender: Option<flume::Sender<String>>, request_sender: Option<flume::Sender<String>>,
disconnector: Option<StopSource>, disconnector: Option<StopSource>,
disconnect_requested: bool, disconnect_requested: bool,
@ -38,7 +38,6 @@ impl ClientApiConnection {
Self { Self {
inner: Arc::new(Mutex::new(ClientApiConnectionInner { inner: Arc::new(Mutex::new(ClientApiConnectionInner {
comproc, comproc,
connect_addr: None,
request_sender: None, request_sender: None,
disconnector: None, disconnector: None,
disconnect_requested: false, disconnect_requested: false,
@ -117,33 +116,15 @@ impl ClientApiConnection {
} }
} }
async fn handle_connection(&self, connect_addr: SocketAddr) -> Result<(), String> { pub async fn run_json_api_processor<R, W>(
trace!("ClientApiConnection::handle_connection"); self,
mut reader: R,
// Connect the TCP socket mut writer: W,
let stream = TcpStream::connect(connect_addr) ) -> Result<(), String>
.await where
.map_err(map_to_string)?; R: AsyncBufReadExt + Unpin + Send,
W: AsyncWriteExt + Unpin + Send,
// If it succeed, disable nagle algorithm {
stream.set_nodelay(true).map_err(map_to_string)?;
// State we connected
let comproc = self.inner.lock().comproc.clone();
comproc.set_connection_state(ConnectionState::Connected(connect_addr, SystemTime::now()));
// Split the stream
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures::AsyncReadExt;
let (reader, mut writer) = stream.split();
let mut reader = BufReader::new(reader);
} else if #[cfg(feature="rt-tokio")] {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
}
}
// Requests to send // Requests to send
let (requests_tx, requests_rx) = flume::unbounded(); let (requests_tx, requests_rx) = flume::unbounded();
@ -152,7 +133,6 @@ impl ClientApiConnection {
let stop_source = StopSource::new(); let stop_source = StopSource::new();
let token = stop_source.token(); let token = stop_source.token();
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
inner.connect_addr = Some(connect_addr);
inner.disconnector = Some(stop_source); inner.disconnector = Some(stop_source);
inner.request_sender = Some(requests_tx); inner.request_sender = Some(requests_tx);
token token
@ -231,7 +211,6 @@ impl ClientApiConnection {
inner.request_sender = None; inner.request_sender = None;
inner.disconnector = None; inner.disconnector = None;
inner.disconnect_requested = false; inner.disconnect_requested = false;
inner.connect_addr = None;
// Connection finished // Connection finished
if disconnect_requested { if disconnect_requested {
@ -241,6 +220,66 @@ impl ClientApiConnection {
} }
} }
async fn handle_tcp_connection(&self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::handle_tcp_connection");
// Connect the TCP socket
let stream = TcpStream::connect(connect_addr)
.await
.map_err(map_to_string)?;
// If it succeed, disable nagle algorithm
stream.set_nodelay(true).map_err(map_to_string)?;
// State we connected
let comproc = self.inner.lock().comproc.clone();
comproc.set_connection_state(ConnectionState::ConnectedTCP(
connect_addr,
SystemTime::now(),
));
// Split into reader and writer halves
// with line buffering on the reader
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures::AsyncReadExt;
let (reader, mut writer) = stream.split();
let reader = BufReader::new(reader);
} else {
let (reader, writer) = stream.into_split();
let reader = BufReader::new(reader);
}
}
self.clone().run_json_api_processor(reader, writer).await
}
async fn handle_ipc_connection(&self, ipc_path: PathBuf) -> Result<(), String> {
trace!("ClientApiConnection::handle_ipc_connection");
// Connect the IPC socket
let stream = IpcStream::connect(&ipc_path).await.map_err(map_to_string)?;
// State we connected
let comproc = self.inner.lock().comproc.clone();
comproc.set_connection_state(ConnectionState::ConnectedIPC(ipc_path, SystemTime::now()));
// Split into reader and writer halves
// with line buffering on the reader
use futures::AsyncReadExt;
let (reader, writer) = stream.split();
cfg_if! {
if #[cfg(feature = "rt-tokio")] {
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
let reader = reader.compat();
let writer = writer.compat_write();
}
}
let reader = BufReader::new(reader);
self.clone().run_json_api_processor(reader, writer).await
}
async fn perform_request(&self, mut req: json::JsonValue) -> Option<json::JsonValue> { async fn perform_request(&self, mut req: json::JsonValue) -> Option<json::JsonValue> {
let (sender, reply_rx) = { let (sender, reply_rx) = {
let mut inner = self.inner.lock(); let mut inner = self.inner.lock();
@ -358,10 +397,15 @@ impl ClientApiConnection {
} }
// Start Client API connection // Start Client API connection
pub async fn connect(&self, connect_addr: SocketAddr) -> Result<(), String> { pub async fn ipc_connect(&self, ipc_path: PathBuf) -> Result<(), String> {
trace!("ClientApiConnection::connect"); trace!("ClientApiConnection::ipc_connect");
// Save the pathto connect to
self.handle_ipc_connection(ipc_path).await
}
pub async fn tcp_connect(&self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::tcp_connect");
// Save the address to connect to // Save the address to connect to
self.handle_connection(connect_addr).await self.handle_tcp_connection(connect_addr).await
} }
// End Client API connection // End Client API connection

View File

@ -4,6 +4,7 @@ use crate::tools::*;
use crate::ui::*; use crate::ui::*;
use indent::indent_all_by; use indent::indent_all_by;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf;
use std::time::SystemTime; use std::time::SystemTime;
use veilid_tools::*; use veilid_tools::*;
@ -22,18 +23,20 @@ pub fn convert_loglevel(s: &str) -> Result<String, String> {
#[derive(PartialEq, Clone)] #[derive(PartialEq, Clone)]
pub enum ConnectionState { pub enum ConnectionState {
Disconnected, Disconnected,
Connected(SocketAddr, SystemTime), ConnectedTCP(SocketAddr, SystemTime),
Retrying(SocketAddr, SystemTime), RetryingTCP(SocketAddr, SystemTime),
ConnectedIPC(PathBuf, SystemTime),
RetryingIPC(PathBuf, SystemTime),
} }
impl ConnectionState { impl ConnectionState {
pub fn is_disconnected(&self) -> bool { pub fn is_disconnected(&self) -> bool {
matches!(*self, Self::Disconnected) matches!(*self, Self::Disconnected)
} }
pub fn is_connected(&self) -> bool { pub fn is_connected(&self) -> bool {
matches!(*self, Self::Connected(_, _)) matches!(*self, Self::ConnectedTCP(_, _) | Self::ConnectedIPC(_, _))
} }
pub fn is_retrying(&self) -> bool { pub fn is_retrying(&self) -> bool {
matches!(*self, Self::Retrying(_, _)) matches!(*self, Self::RetryingTCP(_, _) | Self::RetryingIPC(_, _))
} }
} }
@ -44,7 +47,8 @@ struct CommandProcessorInner {
finished: bool, finished: bool,
autoconnect: bool, autoconnect: bool,
autoreconnect: bool, autoreconnect: bool,
server_addr: Option<SocketAddr>, ipc_path: Option<PathBuf>,
network_addr: Option<SocketAddr>,
connection_waker: Eventual, connection_waker: Eventual,
last_call_id: Option<u64>, last_call_id: Option<u64>,
enable_app_messages: bool, enable_app_messages: bool,
@ -65,7 +69,8 @@ impl CommandProcessor {
finished: false, finished: false,
autoconnect: settings.autoconnect, autoconnect: settings.autoconnect,
autoreconnect: settings.autoreconnect, autoreconnect: settings.autoreconnect,
server_addr: None, ipc_path: None,
network_addr: None,
connection_waker: Eventual::new(), connection_waker: Eventual::new(),
last_call_id: None, last_call_id: None,
enable_app_messages: false, enable_app_messages: false,
@ -306,26 +311,30 @@ Server Debug Commands:
// Loop while we want to keep the connection // Loop while we want to keep the connection
let mut first = true; let mut first = true;
while self.inner().reconnect { while self.inner().reconnect {
let server_addr_opt = self.inner_mut().server_addr; // IPC
let server_addr = match server_addr_opt { let ipc_path_opt = self.inner_mut().ipc_path.clone();
None => break, if let Some(ipc_path) = ipc_path_opt {
Some(addr) => addr,
};
if first { if first {
info!("Connecting to server at {}", server_addr); info!(
self.set_connection_state(ConnectionState::Retrying( "Connecting to server at {}",
server_addr, ipc_path.to_string_lossy().to_string()
);
self.set_connection_state(ConnectionState::RetryingIPC(
ipc_path.clone(),
SystemTime::now(), SystemTime::now(),
)); ));
} else { } else {
debug!("Retrying connection to {}", server_addr); debug!(
"Retrying connection to {}",
ipc_path.to_string_lossy().to_string()
);
} }
let capi = self.capi(); let capi = self.capi();
let res = capi.connect(server_addr).await; let res = capi.ipc_connect(ipc_path.clone()).await;
if res.is_ok() { if res.is_ok() {
info!( info!(
"Connection to server at {} terminated normally", "Connection to server at {} terminated normally",
server_addr ipc_path.to_string_lossy().to_string()
); );
break; break;
} }
@ -334,10 +343,43 @@ Server Debug Commands:
break; break;
} }
self.set_connection_state(ConnectionState::Retrying( self.set_connection_state(ConnectionState::RetryingIPC(
server_addr, ipc_path,
SystemTime::now(), SystemTime::now(),
)); ));
}
// TCP
let network_addr_opt = self.inner_mut().network_addr;
if let Some(network_addr) = network_addr_opt {
if first {
info!("Connecting to server at {}", network_addr);
self.set_connection_state(ConnectionState::RetryingTCP(
network_addr,
SystemTime::now(),
));
} else {
debug!("Retrying connection to {}", network_addr);
}
let capi = self.capi();
let res = capi.tcp_connect(network_addr).await;
if res.is_ok() {
info!(
"Connection to server at {} terminated normally",
network_addr
);
break;
}
if !self.inner().autoreconnect {
info!("Connection to server lost.");
break;
}
self.set_connection_state(ConnectionState::RetryingTCP(
network_addr,
SystemTime::now(),
));
}
debug!("Connection lost, retrying in 2 seconds"); debug!("Connection lost, retrying in 2 seconds");
{ {
@ -355,11 +397,17 @@ Server Debug Commands:
// called by ui // called by ui
//////////////////////////////////////////// ////////////////////////////////////////////
pub fn set_server_address(&self, server_addr: Option<SocketAddr>) { pub fn set_ipc_path(&self, ipc_path: Option<PathBuf>) {
self.inner_mut().server_addr = server_addr; self.inner_mut().ipc_path = ipc_path;
} }
pub fn get_server_address(&self) -> Option<SocketAddr> { pub fn get_ipc_path(&self) -> Option<PathBuf> {
self.inner().server_addr self.inner().ipc_path.clone()
}
pub fn set_network_address(&self, network_addr: Option<SocketAddr>) {
self.inner_mut().network_addr = network_addr;
}
pub fn get_network_address(&self) -> Option<SocketAddr> {
self.inner().network_addr
} }
// called by client_api_connection // called by client_api_connection
// calls into ui // calls into ui

View File

@ -3,11 +3,11 @@
#![deny(unused_must_use)] #![deny(unused_must_use)]
#![recursion_limit = "256"] #![recursion_limit = "256"]
use crate::tools::*; use crate::{settings::NamedSocketAddrs, tools::*};
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use flexi_logger::*; use flexi_logger::*;
use std::{net::ToSocketAddrs, path::PathBuf}; use std::path::PathBuf;
mod cached_text_view; mod cached_text_view;
mod client_api_connection; mod client_api_connection;
@ -28,14 +28,20 @@ enum LogLevel {
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about = "Veilid Console Client")] #[command(author, version, about = "Veilid Console Client")]
struct CmdlineArgs { struct CmdlineArgs {
/// IPC socket to connect to
#[arg(long, short = 'p')]
ipc_path: Option<PathBuf>,
/// IPC socket to connect to
#[arg(long, short = 'i', default_value = "0")]
subnode_index: usize,
/// Address to connect to /// Address to connect to
#[arg(long)] #[arg(long, short = 'a')]
address: Option<String>, address: Option<String>,
/// Wait for debugger to attach /// Wait for debugger to attach
#[arg(long)] #[arg(long)]
wait_for_debug: bool, wait_for_debug: bool,
/// Specify a configuration file to use /// Specify a configuration file to use
#[arg(short, long, value_name = "FILE")] #[arg(short = 'c', long, value_name = "FILE")]
config_file: Option<PathBuf>, config_file: Option<PathBuf>,
/// log level /// log level
#[arg(value_enum)] #[arg(value_enum)]
@ -123,16 +129,48 @@ fn main() -> Result<(), String> {
.expect("failed to initialize logger!"); .expect("failed to initialize logger!");
} }
} }
// Get client address // Get client address
let server_addrs = if let Some(address_arg) = args.address { let enable_ipc = settings.enable_ipc;
address_arg let mut enable_network = settings.enable_network;
.to_socket_addrs()
.map_err(|e| format!("Invalid server address '{}'", e))? // Determine IPC path to try
.collect() let mut client_api_ipc_path = None;
if enable_ipc {
if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) {
if ipc_path.exists() && !ipc_path.is_dir() {
// try direct path
enable_network = false;
client_api_ipc_path = Some(ipc_path);
} else if ipc_path.exists() && ipc_path.is_dir() {
// try subnode index inside path
let ipc_path = ipc_path.join(args.subnode_index.to_string());
if ipc_path.exists() && !ipc_path.is_dir() {
// subnode indexed path exists
enable_network = false;
client_api_ipc_path = Some(ipc_path);
}
}
}
}
let mut client_api_network_addresses = None;
if enable_network {
let args_address = if let Some(args_address) = args.address {
match NamedSocketAddrs::try_from(args_address) {
Ok(v) => Some(v),
Err(e) => {
return Err(format!("Invalid server address: {}", e));
}
}
} else { } else {
settings.address.addrs.clone() None
}; };
let server_addr = server_addrs.first().cloned(); if let Some(address_arg) = args_address.or(settings.address.clone()) {
client_api_network_addresses = Some(address_arg.addrs);
} else if let Some(address) = settings.address.clone() {
client_api_network_addresses = Some(address.addrs.clone());
}
}
// Create command processor // Create command processor
debug!("Creating Command Processor "); debug!("Creating Command Processor ");
@ -147,7 +185,13 @@ fn main() -> Result<(), String> {
comproc.set_client_api_connection(capi.clone()); comproc.set_client_api_connection(capi.clone());
// Keep a connection to the server // Keep a connection to the server
comproc.set_server_address(server_addr); if let Some(client_api_ipc_path) = client_api_ipc_path {
comproc.set_ipc_path(Some(client_api_ipc_path));
} else if let Some(client_api_network_address) = client_api_network_addresses {
let network_addr = client_api_network_address.first().cloned();
comproc.set_network_address(network_addr);
}
let comproc2 = comproc.clone(); let comproc2 = comproc.clone();
let connection_future = comproc.connection_manager(); let connection_future = comproc.connection_manager();

View File

@ -7,6 +7,9 @@ use std::path::{Path, PathBuf};
pub fn load_default_config() -> Result<config::Config, config::ConfigError> { pub fn load_default_config() -> Result<config::Config, config::ConfigError> {
let default_config = r#"--- let default_config = r#"---
enable_ipc: true
local_socket_path: '%LOCAL_SOCKET_DIRECTORY%'
enable_network: true
address: "localhost:5959" address: "localhost:5959"
autoconnect: true autoconnect: true
autoreconnect: true autoreconnect: true
@ -45,6 +48,10 @@ interface:
warn : "light yellow" warn : "light yellow"
error : "light red" error : "light red"
"# "#
.replace(
"%LOCAL_SOCKET_DIRECTORY%",
&Settings::get_default_local_socket_path().to_string_lossy(),
)
.replace( .replace(
"%LOGGING_FILE_DIRECTORY%", "%LOGGING_FILE_DIRECTORY%",
&Settings::get_default_log_directory().to_string_lossy(), &Settings::get_default_log_directory().to_string_lossy(),
@ -111,11 +118,22 @@ pub fn convert_loglevel(log_level: LogLevel) -> log::LevelFilter {
} }
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct NamedSocketAddrs { pub struct NamedSocketAddrs {
pub name: String, pub name: String,
pub addrs: Vec<SocketAddr>, pub addrs: Vec<SocketAddr>,
} }
impl TryFrom<String> for NamedSocketAddrs {
type Error = std::io::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let addrs = value.to_socket_addrs()?.collect();
let name = value;
Ok(NamedSocketAddrs { name, addrs })
}
}
impl<'de> serde::Deserialize<'de> for NamedSocketAddrs { impl<'de> serde::Deserialize<'de> for NamedSocketAddrs {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where where
@ -200,7 +218,10 @@ pub struct Interface {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
pub struct Settings { pub struct Settings {
pub address: NamedSocketAddrs, pub enable_ipc: bool,
pub ipc_path: Option<PathBuf>,
pub enable_network: bool,
pub address: Option<NamedSocketAddrs>,
pub autoconnect: bool, pub autoconnect: bool,
pub autoreconnect: bool, pub autoreconnect: bool,
pub logging: Logging, pub logging: Logging,
@ -208,6 +229,29 @@ pub struct Settings {
} }
impl Settings { impl Settings {
fn get_server_default_directory(subpath: &str) -> PathBuf {
#[cfg(unix)]
{
let globalpath = PathBuf::from("/var/db/veilid-server").join(subpath);
if globalpath.is_dir() {
return globalpath;
}
}
let mut ts_path = if let Some(my_proj_dirs) = ProjectDirs::from("org", "Veilid", "Veilid") {
PathBuf::from(my_proj_dirs.data_local_dir())
} else {
PathBuf::from("./")
};
ts_path.push(subpath);
ts_path
}
pub fn get_default_local_socket_path() -> PathBuf {
Self::get_server_default_directory("local_sockets")
}
pub fn get_default_config_path() -> PathBuf { pub fn get_default_config_path() -> PathBuf {
// Get default configuration file location // Get default configuration file location
let mut default_config_path = let mut default_config_path =

View File

@ -20,6 +20,7 @@ use crate::cached_text_view::*;
use chrono::{Datelike, Timelike}; use chrono::{Datelike, Timelike};
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::io::Write; use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use thiserror::Error; use thiserror::Error;
@ -259,8 +260,17 @@ impl UI {
fn peers(s: &mut Cursive) -> ViewRef<PeersTableView> { fn peers(s: &mut Cursive) -> ViewRef<PeersTableView> {
s.find_name("peers").unwrap() s.find_name("peers").unwrap()
} }
fn connection_address(s: &mut Cursive) -> ViewRef<EditView> { fn ipc_path(s: &mut Cursive) -> ViewRef<EditView> {
s.find_name("connection-address").unwrap() s.find_name("ipc-path").unwrap()
}
fn ipc_path_radio(s: &mut Cursive) -> ViewRef<RadioButton<u32>> {
s.find_name("ipc-path-radio").unwrap()
}
fn network_address(s: &mut Cursive) -> ViewRef<EditView> {
s.find_name("network-address").unwrap()
}
fn network_address_radio(s: &mut Cursive) -> ViewRef<RadioButton<u32>> {
s.find_name("network-address-radio").unwrap()
} }
fn connection_dialog(s: &mut Cursive) -> ViewRef<Dialog> { fn connection_dialog(s: &mut Cursive) -> ViewRef<Dialog> {
s.find_name("connection-dialog").unwrap() s.find_name("connection-dialog").unwrap()
@ -321,7 +331,7 @@ impl UI {
} }
} }
fn render_button_attach<'a>(inner: &mut UIInner) -> (&'a str, bool) { fn render_button_attach<'a>(inner: &mut UIInner) -> (&'a str, bool) {
if let ConnectionState::Connected(_, _) = inner.ui_state.connection_state.get() { if let ConnectionState::ConnectedTCP(_, _) = inner.ui_state.connection_state.get() {
match inner.ui_state.attachment_state.get().as_str() { match inner.ui_state.attachment_state.get().as_str() {
"Detached" => ("Attach", true), "Detached" => ("Attach", true),
"Attaching" => ("Detach", true), "Attaching" => ("Detach", true),
@ -496,19 +506,39 @@ impl UI {
button_attach.set_enabled(button_enable); button_attach.set_enabled(button_enable);
} }
fn submit_connection_address(s: &mut Cursive) { fn submit_ipc_path(s: &mut Cursive) {
let edit = Self::connection_address(s); let edit = Self::ipc_path(s);
let addr = (*edit.get_content()).clone(); let addr = (*edit.get_content()).clone();
let sa = match addr.parse::<std::net::SocketAddr>() { let ipc_path = match addr.parse::<PathBuf>() {
Ok(sa) => Some(sa), Ok(sa) => sa,
Err(_) => { Err(_) => {
s.add_layer(Dialog::text("Invalid address").button("Close", |s| { s.add_layer(Dialog::text("Invalid IPC path").button("Close", |s| {
s.pop_layer(); s.pop_layer();
})); }));
return; return;
} }
}; };
Self::command_processor(s).set_server_address(sa); Self::command_processor(s).set_ipc_path(Some(ipc_path));
Self::command_processor(s).set_network_address(None);
Self::command_processor(s).start_connection();
}
fn submit_network_address(s: &mut Cursive) {
let edit = Self::network_address(s);
let addr = (*edit.get_content()).clone();
let sa = match addr.parse::<std::net::SocketAddr>() {
Ok(sa) => sa,
Err(_) => {
s.add_layer(
Dialog::text("Invalid network address").button("Close", |s| {
s.pop_layer();
}),
);
return;
}
};
Self::command_processor(s).set_ipc_path(None);
Self::command_processor(s).set_network_address(Some(sa));
Self::command_processor(s).start_connection(); Self::command_processor(s).start_connection();
} }
@ -589,8 +619,19 @@ impl UI {
} }
fn show_connection_dialog(s: &mut Cursive, state: ConnectionState) -> bool { fn show_connection_dialog(s: &mut Cursive, state: ConnectionState) -> bool {
let is_ipc = Self::command_processor(s).get_ipc_path().is_some();
let mut inner = Self::inner_mut(s); let mut inner = Self::inner_mut(s);
let mut connection_type_group: RadioGroup<u32> = RadioGroup::new().on_change(|s, v| {
if *v == 0 {
Self::ipc_path(s).enable();
Self::network_address(s).disable();
} else if *v == 1 {
Self::ipc_path(s).disable();
Self::network_address(s).enable();
}
});
let mut show: bool = false; let mut show: bool = false;
let mut hide: bool = false; let mut hide: bool = false;
let mut reset: bool = false; let mut reset: bool = false;
@ -613,7 +654,7 @@ impl UI {
reset = true; reset = true;
} }
} }
ConnectionState::Connected(_, _) => { ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => {
if inner.connection_dialog_state.is_some() if inner.connection_dialog_state.is_some()
&& !inner && !inner
.connection_dialog_state .connection_dialog_state
@ -624,7 +665,7 @@ impl UI {
hide = true; hide = true;
} }
} }
ConnectionState::Retrying(_, _) => { ConnectionState::RetryingTCP(_, _) | ConnectionState::RetryingIPC(_, _) => {
if inner.connection_dialog_state.is_none() if inner.connection_dialog_state.is_none()
|| inner || inner
.connection_dialog_state .connection_dialog_state
@ -655,15 +696,42 @@ impl UI {
ResizedView::with_full_screen(DummyView {}), ResizedView::with_full_screen(DummyView {}),
ColorStyle::new(PaletteColor::Background, PaletteColor::Background), ColorStyle::new(PaletteColor::Background, PaletteColor::Background),
)); ));
s.add_layer( s.add_layer(
Dialog::around( Dialog::around(
LinearLayout::vertical().child( LinearLayout::vertical().child(
LinearLayout::horizontal() LinearLayout::horizontal()
.child(TextView::new("Address:")) .child(
if is_ipc {
connection_type_group.button(0, "IPC Path").selected()
} else {
connection_type_group.button(0, "IPC Path")
}
.with_name("ipc-path-radio"),
)
.child( .child(
EditView::new() EditView::new()
.on_submit(|s, _| Self::submit_connection_address(s)) .with_enabled(is_ipc)
.with_name("connection-address") .on_submit(|s, _| Self::submit_ipc_path(s))
.with_name("ipc-path")
.fixed_height(1)
.min_width(40),
)
.child(
if is_ipc {
connection_type_group.button(1, "Network Address")
} else {
connection_type_group
.button(1, "Network Address")
.selected()
}
.with_name("network-address-radio"),
)
.child(
EditView::new()
.with_enabled(!is_ipc)
.on_submit(|s, _| Self::submit_network_address(s))
.with_name("network-address")
.fixed_height(1) .fixed_height(1)
.min_width(40), .min_width(40),
), ),
@ -693,24 +761,57 @@ impl UI {
match new_state { match new_state {
ConnectionState::Disconnected => { ConnectionState::Disconnected => {
let addr = match Self::command_processor(s).get_server_address() { Self::ipc_path_radio(s).set_enabled(true);
None => "".to_owned(), Self::network_address_radio(s).set_enabled(true);
Some(addr) => addr.to_string(),
let (network_address, network_address_enabled) =
match Self::command_processor(s).get_network_address() {
None => ("".to_owned(), false),
Some(addr) => (addr.to_string(), true),
}; };
debug!("address is {}", addr); let mut edit = Self::network_address(s);
let mut edit = Self::connection_address(s); edit.set_content(network_address);
edit.set_content(addr); edit.set_enabled(network_address_enabled);
edit.set_enabled(true);
let (ipc_path, ipc_path_enabled) = match Self::command_processor(s).get_ipc_path() {
None => ("".to_owned(), false),
Some(ipc_path) => (ipc_path.to_string_lossy().to_string(), true),
};
let mut edit = Self::ipc_path(s);
edit.set_content(ipc_path);
edit.set_enabled(ipc_path_enabled);
let mut dlg = Self::connection_dialog(s); let mut dlg = Self::connection_dialog(s);
dlg.add_button("Connect", Self::submit_connection_address); dlg.add_button("Connect", Self::submit_network_address);
} }
ConnectionState::Connected(_, _) => {} ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => {}
ConnectionState::Retrying(addr, _) => { ConnectionState::RetryingTCP(addr, _) => {
Self::ipc_path_radio(s).set_enabled(false);
Self::network_address_radio(s).set_enabled(false);
// //
let mut edit = Self::connection_address(s); let mut edit = Self::network_address(s);
debug!("address is {}", addr);
edit.set_content(addr.to_string()); edit.set_content(addr.to_string());
edit.set_enabled(false); edit.set_enabled(false);
Self::ipc_path(s).set_enabled(false);
let mut dlg = Self::connection_dialog(s);
dlg.add_button("Cancel", |s| {
Self::command_processor(s).cancel_reconnect();
});
}
ConnectionState::RetryingIPC(ipc_path, _) => {
Self::ipc_path_radio(s).set_enabled(false);
Self::network_address_radio(s).set_enabled(false);
//
let mut edit = Self::ipc_path(s);
edit.set_content(ipc_path.to_string_lossy().to_string());
edit.set_enabled(false);
Self::network_address(s).set_enabled(false);
let mut dlg = Self::connection_dialog(s); let mut dlg = Self::connection_dialog(s);
dlg.add_button("Cancel", |s| { dlg.add_button("Cancel", |s| {
Self::command_processor(s).cancel_reconnect(); Self::command_processor(s).cancel_reconnect();
@ -732,6 +833,8 @@ impl UI {
let mut status = StyledString::new(); let mut status = StyledString::new();
let mut enable_status_fields = false;
match inner.ui_state.connection_state.get() { match inner.ui_state.connection_state.get() {
ConnectionState::Disconnected => { ConnectionState::Disconnected => {
status.append_styled( status.append_styled(
@ -740,18 +843,48 @@ impl UI {
); );
status.append_styled("|", ColorStyle::highlight_inactive()); status.append_styled("|", ColorStyle::highlight_inactive());
} }
ConnectionState::Retrying(addr, _) => { ConnectionState::RetryingTCP(addr, _) => {
status.append_styled( status.append_styled(
format!("Reconnecting to {} ", addr), format!("Reconnecting to {} ", addr),
ColorStyle::highlight_inactive(), ColorStyle::highlight_inactive(),
); );
status.append_styled("|", ColorStyle::highlight_inactive()); status.append_styled("|", ColorStyle::highlight_inactive());
} }
ConnectionState::Connected(addr, _) => { ConnectionState::RetryingIPC(path, _) => {
status.append_styled(
format!(
"Reconnecting to IPC#{} ",
path.file_name()
.unwrap_or_default()
.to_string_lossy()
.into_owned()
),
ColorStyle::highlight_inactive(),
);
status.append_styled("|", ColorStyle::highlight_inactive());
}
ConnectionState::ConnectedTCP(addr, _) => {
status.append_styled( status.append_styled(
format!("Connected to {} ", addr), format!("Connected to {} ", addr),
ColorStyle::highlight_inactive(), ColorStyle::highlight_inactive(),
); );
enable_status_fields = true;
}
ConnectionState::ConnectedIPC(path, _) => {
status.append_styled(
format!(
"Connected to IPC#{} ",
path.file_name()
.unwrap_or_default()
.to_string_lossy()
.into_owned()
),
ColorStyle::highlight_inactive(),
);
enable_status_fields = true;
}
}
if enable_status_fields {
status.append_styled("|", ColorStyle::highlight_inactive()); status.append_styled("|", ColorStyle::highlight_inactive());
// Add attachment state // Add attachment state
status.append_styled( status.append_styled(
@ -768,7 +901,6 @@ impl UI {
// Add tunnel status // Add tunnel status
status.append_styled(" No Tunnels ", ColorStyle::highlight_inactive()); status.append_styled(" No Tunnels ", ColorStyle::highlight_inactive());
status.append_styled("|", ColorStyle::highlight_inactive()); status.append_styled("|", ColorStyle::highlight_inactive());
}
}; };
statusbar.set_content(status); statusbar.set_content(status);

View File

@ -48,10 +48,10 @@ opentelemetry = { version = "0.20" }
opentelemetry-otlp = { version = "0.13" } opentelemetry-otlp = { version = "0.13" }
opentelemetry-semantic-conventions = "0.12" opentelemetry-semantic-conventions = "0.12"
async-std = { version = "^1", features = ["unstable"], optional = true } async-std = { version = "^1", features = ["unstable"], optional = true }
tokio = { version = "^1", features = ["full", "tracing"], optional = true } tokio = { version = "1.32.0", features = ["full", "tracing"], optional = true }
tokio-stream = { version = "0.1.14", features = ["net"], optional = true }
tokio-util = { version = "0.7.8", features = ["compat"], optional = true }
console-subscriber = { version = "^0", optional = true } console-subscriber = { version = "^0", optional = true }
tokio-stream = { version = "^0", features = ["net"], optional = true }
tokio-util = { version = "^0", features = ["compat"], optional = true }
async-tungstenite = { package = "veilid-async-tungstenite", version = "^0", features = [ async-tungstenite = { package = "veilid-async-tungstenite", version = "^0", features = [
"async-tls", "async-tls",
] } ] }

View File

@ -6,6 +6,7 @@ use futures_util::{future::join_all, stream::FuturesUnordered, StreamExt};
use parking_lot::Mutex; use parking_lot::Mutex;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use stop_token::future::FutureExt as _; use stop_token::future::FutureExt as _;
use stop_token::*; use stop_token::*;
@ -46,7 +47,7 @@ struct ClientApiInner {
settings: Settings, settings: Settings,
stop: Option<StopSource>, stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>, join_handle: Option<ClientApiAllFuturesJoinHandle>,
update_channels: HashMap<(SocketAddr, SocketAddr), flume::Sender<String>>, update_channels: HashMap<u64, flume::Sender<String>>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -108,9 +109,40 @@ impl ClientApi {
trace!("ClientApi::stop: stopped"); trace!("ClientApi::stop: stopped");
} }
async fn handle_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> { async fn handle_ipc_incoming(self, ipc_path: PathBuf) -> std::io::Result<()> {
let listener = IpcListener::bind(ipc_path.clone()).await?;
debug!("IPC Client API listening on: {:?}", ipc_path);
// Process the incoming accept stream
let mut incoming_stream = listener.incoming();
// Make wait group for all incoming connections
let awg = AsyncWaitGroup::new();
let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
while let Ok(Some(stream_result)) =
incoming_stream.next().timeout_at(stop_token.clone()).await
{
// Get the stream to process
let stream = stream_result?;
// Increment wait group
awg.add(1);
let t_awg = awg.clone();
// Process the connection
spawn(self.clone().handle_ipc_connection(stream, t_awg)).detach();
}
// Wait for all connections to terminate
awg.wait().await;
Ok(())
}
async fn handle_tcp_incoming(self, bind_addr: SocketAddr) -> std::io::Result<()> {
let listener = TcpListener::bind(bind_addr).await?; let listener = TcpListener::bind(bind_addr).await?;
debug!("Client API listening on: {:?}", bind_addr); debug!("TCPClient API listening on: {:?}", bind_addr);
// Process the incoming accept stream // Process the incoming accept stream
cfg_if! { cfg_if! {
@ -137,7 +169,7 @@ impl ClientApi {
let t_awg = awg.clone(); let t_awg = awg.clone();
// Process the connection // Process the connection
spawn(self.clone().handle_connection(stream, t_awg)).detach(); spawn(self.clone().handle_tcp_connection(stream, t_awg)).detach();
} }
// Wait for all connections to terminate // Wait for all connections to terminate
@ -300,47 +332,11 @@ impl ClientApi {
VeilidAPIResult::Ok(None) VeilidAPIResult::Ok(None)
} }
pub async fn handle_connection(self, stream: TcpStream, awg: AsyncWaitGroup) { pub async fn run_json_request_processor<R, W>(self, reader: R, writer: W, stop_token: StopToken)
// Get address of peer where
let peer_addr = match stream.peer_addr() { R: AsyncBufReadExt + Unpin + Send,
Ok(v) => v, W: AsyncWriteExt + Unpin + Send,
Err(e) => { {
eprintln!("can't get peer address: {}", e);
return;
}
};
// Get local address
let local_addr = match stream.local_addr() {
Ok(v) => v,
Err(e) => {
eprintln!("can't get local address: {}", e);
return;
}
};
// Get connection tuple
let conn_tuple = (local_addr, peer_addr);
debug!(
"Accepted Client API Connection: {:?} -> {:?}",
peer_addr, local_addr
);
// Make stop token to quit when stop() is requested externally
let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
// Split into reader and writer halves
// with line buffering on the reader
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures_util::AsyncReadExt;
let (reader, mut writer) = stream.split();
let reader = BufReader::new(reader);
} else {
let (reader, writer) = stream.into_split();
let reader = BufReader::new(reader);
}
}
// Make request processor for this connection // Make request processor for this connection
let api = self.inner.lock().veilid_api.clone(); let api = self.inner.lock().veilid_api.clone();
let jrp = json_api::JsonRequestProcessor::new(api); let jrp = json_api::JsonRequestProcessor::new(api);
@ -354,10 +350,11 @@ impl ClientApi {
let (responses_tx, responses_rx) = flume::unbounded(); let (responses_tx, responses_rx) = flume::unbounded();
// Start sending updates // Start sending updates
let id = get_timestamp();
self.inner self.inner
.lock() .lock()
.update_channels .update_channels
.insert(conn_tuple, responses_tx.clone()); .insert(id, responses_tx.clone());
// Request receive processor future // Request receive processor future
// Receives from socket and enqueues RequestLines // Receives from socket and enqueues RequestLines
@ -407,7 +404,50 @@ impl ClientApi {
} }
// Stop sending updates // Stop sending updates
self.inner.lock().update_channels.remove(&conn_tuple); self.inner.lock().update_channels.remove(&id);
}
pub async fn handle_tcp_connection(self, stream: TcpStream, awg: AsyncWaitGroup) {
// Get address of peer
let peer_addr = match stream.peer_addr() {
Ok(v) => v,
Err(e) => {
eprintln!("can't get peer address: {}", e);
return;
}
};
// Get local address
let local_addr = match stream.local_addr() {
Ok(v) => v,
Err(e) => {
eprintln!("can't get local address: {}", e);
return;
}
};
// Get connection tuple
debug!(
"Accepted TCP Client API Connection: {:?} -> {:?}",
peer_addr, local_addr
);
// Make stop token to quit when stop() is requested externally
let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
// Split into reader and writer halves
// with line buffering on the reader
cfg_if! {
if #[cfg(feature="rt-async-std")] {
use futures_util::AsyncReadExt;
let (reader, mut writer) = stream.split();
let reader = BufReader::new(reader);
} else {
let (reader, writer) = stream.into_split();
let reader = BufReader::new(reader);
}
}
self.run_json_request_processor(reader, writer, stop_token)
.await;
debug!( debug!(
"Closed Client API Connection: {:?} -> {:?}", "Closed Client API Connection: {:?} -> {:?}",
@ -417,6 +457,34 @@ impl ClientApi {
awg.done(); awg.done();
} }
pub async fn handle_ipc_connection(self, stream: IpcStream, awg: AsyncWaitGroup) {
// Get connection tuple
debug!("Accepted IPC Client API Connection");
// Make stop token to quit when stop() is requested externally
let stop_token = self.inner.lock().stop.as_ref().unwrap().token();
// Split into reader and writer halves
// with line buffering on the reader
use futures_util::AsyncReadExt;
let (reader, writer) = stream.split();
cfg_if! {
if #[cfg(feature = "rt-tokio")] {
use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
let reader = reader.compat();
let writer = writer.compat_write();
}
}
let reader = BufReader::new(reader);
self.run_json_request_processor(reader, writer, stop_token)
.await;
debug!("Closed Client API Connection",);
awg.done();
}
pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) { pub fn handle_update(&self, veilid_update: veilid_core::VeilidUpdate) {
// serialize update to NDJSON // serialize update to NDJSON
let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n"; let veilid_update = serialize_json(json_api::RecvMessage::Update(veilid_update)) + "\n";
@ -431,15 +499,29 @@ impl ClientApi {
} }
#[instrument(level = "trace", skip(self))] #[instrument(level = "trace", skip(self))]
pub fn run(&self, bind_addrs: Vec<SocketAddr>) { pub fn run(&self, ipc_path: Option<PathBuf>, tcp_bind_addrs: Vec<SocketAddr>) {
let bind_futures = bind_addrs.iter().copied().map(|addr| { let mut bind_futures: Vec<SendPinBoxFuture<()>> = Vec::new();
// Local IPC
if let Some(ipc_path) = ipc_path {
let this = self.clone(); let this = self.clone();
async move { bind_futures.push(Box::pin(async move {
if let Err(e) = this.handle_incoming(addr).await { if let Err(e) = this.handle_ipc_incoming(ipc_path.clone()).await {
warn!("Not binding client API to {}: {}", addr, e); warn!("Not binding IPC client API to {:?}: {}", ipc_path, e);
} }
}));
} }
});
// Network sockets
for addr in tcp_bind_addrs.iter().copied() {
let this = self.clone();
bind_futures.push(Box::pin(async move {
if let Err(e) = this.handle_tcp_incoming(addr).await {
warn!("Not binding TCP client API to {}: {}", addr, e);
}
}));
}
let bind_futures_join = join_all(bind_futures); let bind_futures_join = join_all(bind_futures);
self.inner.lock().join_handle = Some(spawn(bind_futures_join)); self.inner.lock().join_handle = Some(spawn(bind_futures_join));
} }

View File

@ -50,15 +50,21 @@ pub async fn run_veilid_server_internal(
let ( let (
settings_auto_attach, settings_auto_attach,
settings_client_api_enabled, settings_client_api_ipc_enabled,
settings_client_api_network_enabled,
settings_client_api_ipc_directory,
settings_client_api_listen_address_addrs, settings_client_api_listen_address_addrs,
subnode_index,
) = { ) = {
let settingsr = settings.read(); let settingsr = settings.read();
( (
settingsr.auto_attach, settingsr.auto_attach,
settingsr.client_api.enabled, settingsr.client_api.ipc_enabled,
settingsr.client_api.network_enabled,
settingsr.client_api.ipc_directory.clone(),
settingsr.client_api.listen_address.addrs.clone(), settingsr.client_api.listen_address.addrs.clone(),
settingsr.testing.subnode_index,
) )
}; };
@ -84,12 +90,22 @@ pub async fn run_veilid_server_internal(
.wrap_err("VeilidCore startup failed")?; .wrap_err("VeilidCore startup failed")?;
// Start client api if one is requested // Start client api if one is requested
let mut capi = if settings_client_api_enabled && matches!(server_mode, ServerMode::Normal) { let capi_enabled = settings_client_api_ipc_enabled || settings_client_api_network_enabled;
let mut capi = if capi_enabled && matches!(server_mode, ServerMode::Normal) {
let some_capi = let some_capi =
client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone(), settings.clone()); client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone(), settings.clone());
some_capi some_capi.clone().run(
.clone() if settings_client_api_ipc_enabled {
.run(settings_client_api_listen_address_addrs); Some(settings_client_api_ipc_directory.join(subnode_index.to_string()))
} else {
None
},
if settings_client_api_network_enabled {
settings_client_api_listen_address_addrs
} else {
vec![]
},
);
Some(some_capi) Some(some_capi)
} else { } else {
None None

View File

@ -18,7 +18,9 @@ pub fn load_default_config() -> EyreResult<config::Config> {
daemon: daemon:
enabled: false enabled: false
client_api: client_api:
enabled: true ipc_enabled: false
ipc_directory: '%IPC_DIRECTORY%'
network_enabled: false
listen_address: 'localhost:5959' listen_address: 'localhost:5959'
auto_attach: true auto_attach: true
logging: logging:
@ -158,6 +160,10 @@ core:
# url: '' # url: ''
"#, "#,
) )
.replace(
"%IPC_DIRECTORY%",
&Settings::get_default_ipc_directory().to_string_lossy(),
)
.replace( .replace(
"%TABLE_STORE_DIRECTORY%", "%TABLE_STORE_DIRECTORY%",
&VeilidConfigTableStore::default().directory, &VeilidConfigTableStore::default().directory,
@ -172,11 +178,11 @@ core:
) )
.replace( .replace(
"%CERTIFICATE_PATH%", "%CERTIFICATE_PATH%",
&VeilidConfigTLS::default().certificate_path &VeilidConfigTLS::default().certificate_path,
) )
.replace( .replace(
"%PRIVATE_KEY_PATH%", "%PRIVATE_KEY_PATH%",
&VeilidConfigTLS::default().private_key_path &VeilidConfigTLS::default().private_key_path,
) )
.replace( .replace(
"%REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%", "%REMOTE_MAX_SUBKEY_CACHE_MEMORY_MB%",
@ -445,7 +451,9 @@ pub struct Otlp {
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
pub struct ClientApi { pub struct ClientApi {
pub enabled: bool, pub ipc_enabled: bool,
pub ipc_directory: PathBuf,
pub network_enabled: bool,
pub listen_address: NamedSocketAddrs, pub listen_address: NamedSocketAddrs,
} }
@ -798,6 +806,10 @@ impl Settings {
.unwrap_or_else(|| PathBuf::from("./veilid-server.conf")) .unwrap_or_else(|| PathBuf::from("./veilid-server.conf"))
} }
pub fn get_default_ipc_directory() -> PathBuf {
Self::get_or_create_default_directory("ipc")
}
pub fn get_default_remote_max_subkey_cache_memory_mb() -> u32 { pub fn get_default_remote_max_subkey_cache_memory_mb() -> u32 {
let sys = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory()); let sys = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory());
((sys.free_memory() / (1024u64 * 1024u64)) / 16) as u32 ((sys.free_memory() / (1024u64 * 1024u64)) / 16) as u32
@ -854,7 +866,9 @@ impl Settings {
} }
set_config_value!(inner.daemon.enabled, value); set_config_value!(inner.daemon.enabled, value);
set_config_value!(inner.client_api.enabled, value); set_config_value!(inner.client_api.ipc_enabled, value);
set_config_value!(inner.client_api.ipc_directory, value);
set_config_value!(inner.client_api.network_enabled, value);
set_config_value!(inner.client_api.listen_address, value); set_config_value!(inner.client_api.listen_address, value);
set_config_value!(inner.auto_attach, value); set_config_value!(inner.auto_attach, value);
set_config_value!(inner.logging.system.enabled, value); set_config_value!(inner.logging.system.enabled, value);
@ -1021,13 +1035,9 @@ impl Settings {
"protected_store.always_use_insecure_storage" => Ok(Box::new( "protected_store.always_use_insecure_storage" => Ok(Box::new(
inner.core.protected_store.always_use_insecure_storage, inner.core.protected_store.always_use_insecure_storage,
)), )),
"protected_store.directory" => Ok(Box::new( "protected_store.directory" => {
inner Ok(Box::new(inner.core.protected_store.directory.clone()))
.core }
.protected_store
.directory
.clone(),
)),
"protected_store.delete" => Ok(Box::new(inner.core.protected_store.delete)), "protected_store.delete" => Ok(Box::new(inner.core.protected_store.delete)),
"protected_store.device_encryption_key_password" => Ok(Box::new( "protected_store.device_encryption_key_password" => Ok(Box::new(
inner inner
@ -1044,22 +1054,10 @@ impl Settings {
.clone(), .clone(),
)), )),
"table_store.directory" => Ok(Box::new( "table_store.directory" => Ok(Box::new(inner.core.table_store.directory.clone())),
inner
.core
.table_store
.directory
.clone(),
)),
"table_store.delete" => Ok(Box::new(inner.core.table_store.delete)), "table_store.delete" => Ok(Box::new(inner.core.table_store.delete)),
"block_store.directory" => Ok(Box::new( "block_store.directory" => Ok(Box::new(inner.core.block_store.directory.clone())),
inner
.core
.block_store
.directory
.clone(),
)),
"block_store.delete" => Ok(Box::new(inner.core.block_store.delete)), "block_store.delete" => Ok(Box::new(inner.core.block_store.delete)),
"network.connection_initial_timeout_ms" => { "network.connection_initial_timeout_ms" => {
@ -1214,22 +1212,12 @@ impl Settings {
"network.restricted_nat_retries" => { "network.restricted_nat_retries" => {
Ok(Box::new(inner.core.network.restricted_nat_retries)) Ok(Box::new(inner.core.network.restricted_nat_retries))
} }
"network.tls.certificate_path" => Ok(Box::new( "network.tls.certificate_path" => {
inner Ok(Box::new(inner.core.network.tls.certificate_path.clone()))
.core }
.network "network.tls.private_key_path" => {
.tls Ok(Box::new(inner.core.network.tls.private_key_path.clone()))
.certificate_path }
.clone(),
)),
"network.tls.private_key_path" => Ok(Box::new(
inner
.core
.network
.tls
.private_key_path
.clone(),
)),
"network.tls.connection_initial_timeout_ms" => Ok(Box::new( "network.tls.connection_initial_timeout_ms" => Ok(Box::new(
inner.core.network.tls.connection_initial_timeout_ms, inner.core.network.tls.connection_initial_timeout_ms,
)), )),
@ -1439,7 +1427,8 @@ mod tests {
assert_eq!(s.daemon.group, None); assert_eq!(s.daemon.group, None);
assert_eq!(s.daemon.stdout_file, None); assert_eq!(s.daemon.stdout_file, None);
assert_eq!(s.daemon.stderr_file, None); assert_eq!(s.daemon.stderr_file, None);
assert!(s.client_api.enabled); assert!(s.client_api.ipc_enabled);
assert!(!s.client_api.network_enabled);
assert_eq!(s.client_api.listen_address.name, "localhost:5959"); assert_eq!(s.client_api.listen_address.name, "localhost:5959");
assert_eq!( assert_eq!(
s.client_api.listen_address.addrs, s.client_api.listen_address.addrs,

View File

@ -23,6 +23,7 @@ rt-async-std = [
rt-tokio = [ rt-tokio = [
"tokio", "tokio",
"tokio-util", "tokio-util",
"tokio-stream",
"rtnetlink/tokio_socket", "rtnetlink/tokio_socket",
"async_executors/tokio_tp", "async_executors/tokio_tp",
"async_executors/tokio_io", "async_executors/tokio_io",
@ -66,6 +67,7 @@ flume = { version = "0.11.0", features = ["async"] }
async-std = { version = "1.12.0", features = ["unstable"], optional = true } async-std = { version = "1.12.0", features = ["unstable"], optional = true }
tokio = { version = "1.32.0", features = ["full"], optional = true } tokio = { version = "1.32.0", features = ["full"], optional = true }
tokio-util = { version = "0.7.8", features = ["compat"], optional = true } tokio-util = { version = "0.7.8", features = ["compat"], optional = true }
tokio-stream = { version = "0.1.14", features = ["net"], optional = true }
futures-util = { version = "0.3.28", default-features = false, features = [ futures-util = { version = "0.3.28", default-features = false, features = [
"async-await", "async-await",
"sink", "sink",

View File

@ -0,0 +1,11 @@
use cfg_if::*;
cfg_if! {
if #[cfg(unix)] {
mod unix;
pub use unix::*;
} else if #[cfg(windows)] {
mod windows;
pub use windows::*;
}
}

View File

@ -0,0 +1,11 @@
use cfg_if::*;
cfg_if! {
if #[cfg(unix)] {
mod unix;
pub use unix::*;
} else if #[cfg(windows)] {
mod windows;
pub use windows::*;
}
}

View File

@ -0,0 +1,114 @@
use futures_util::AsyncRead as FuturesAsyncRead;
use futures_util::AsyncWrite as FuturesAsyncWrite;
use futures_util::Stream;
use std::{io, path::Path};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
/////////////////////////////////////////////////////////////
pub struct IpcStream {
internal: UnixStream,
}
impl IpcStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcStream> {
Ok(IpcStream {
internal: UnixStream::connect(path).await?,
})
}
}
impl FuturesAsyncRead for IpcStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<io::Result<usize>> {
let mut rb = ReadBuf::new(buf);
match <UnixStream as AsyncRead>::poll_read(
std::pin::Pin::new(&mut self.internal),
cx,
&mut rb,
) {
std::task::Poll::Ready(r) => std::task::Poll::Ready(r.map(|_| rb.filled().len())),
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
impl FuturesAsyncWrite for IpcStream {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<io::Result<usize>> {
<UnixStream as AsyncWrite>::poll_write(std::pin::Pin::new(&mut self.internal), cx, buf)
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
<UnixStream as AsyncWrite>::poll_flush(std::pin::Pin::new(&mut self.internal), cx)
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
<UnixStream as AsyncWrite>::poll_shutdown(std::pin::Pin::new(&mut self.internal), cx)
}
}
/////////////////////////////////////////////////////////////
pub struct IpcIncoming {
internal: UnixListenerStream,
}
impl Stream for IpcIncoming {
type Item = io::Result<IpcStream>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match <UnixListenerStream as Stream>::poll_next(std::pin::Pin::new(&mut self.internal), cx)
{
std::task::Poll::Ready(ro) => {
std::task::Poll::Ready(ro.map(|rr| rr.map(|s| IpcStream { internal: s })))
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
/////////////////////////////////////////////////////////////
pub struct IpcListener {
internal: UnixListener,
}
impl IpcListener {
/// Creates a new `IpcListener` bound to the specified path.
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
Ok(Self {
internal: UnixListener::bind(path)?,
})
}
/// Accepts a new incoming connection to this listener.
pub async fn accept(&self) -> io::Result<IpcStream> {
Ok(IpcStream {
internal: self.internal.accept().await?.0,
})
}
/// Returns a stream of incoming connections.
pub fn incoming(self) -> IpcIncoming {
IpcIncoming {
internal: UnixListenerStream::new(self.internal),
}
}
}

View File

@ -0,0 +1,11 @@
use cfg_if::*;
cfg_if! {
if #[cfg(feature="rt-tokio")] {
mod ipc_tokio;
pub use ipc_tokio::*;
} else if #[cfg(feature="rt-async-std")] {
mod ipc_async_std;
pub use ipc_async_std::*;
}
}

View File

@ -36,6 +36,7 @@ pub mod eventual_value_clone;
pub mod interval; pub mod interval;
pub mod ip_addr_port; pub mod ip_addr_port;
pub mod ip_extra; pub mod ip_extra;
pub mod ipc;
pub mod log_thru; pub mod log_thru;
pub mod must_join_handle; pub mod must_join_handle;
pub mod must_join_single_future; pub mod must_join_single_future;
@ -176,6 +177,8 @@ pub use ip_addr_port::*;
#[doc(inline)] #[doc(inline)]
pub use ip_extra::*; pub use ip_extra::*;
#[doc(inline)] #[doc(inline)]
pub use ipc::*;
#[doc(inline)]
pub use log_thru::*; pub use log_thru::*;
#[doc(inline)] #[doc(inline)]
pub use must_join_handle::*; pub use must_join_handle::*;

View File

@ -314,7 +314,7 @@ cfg_if::cfg_if! {
pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String> pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
{ {
let path = path.as_ref(); let path = path.as_ref();
if !path.exists() { if !path.is_file() {
return Ok(()); return Ok(());
} }
@ -330,6 +330,32 @@ cfg_if::cfg_if! {
} }
Ok(()) Ok(())
} }
pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, group_read: bool) -> Result<(), String>
{
let path = path.as_ref();
if !path.is_dir() {
return Ok(());
}
let uid = Uid::effective();
let gid = Gid::effective();
let meta = std::fs::metadata(path).map_err(|e| format!("unable to get metadata for path: {}", e))?;
let perm = if group_read {
0o750
} else {
0o700
};
if meta.mode() != perm {
std::fs::set_permissions(path,std::fs::Permissions::from_mode(perm)).map_err(|e| format!("unable to set correct permissions on path: {}", e))?;
}
if meta.uid() != uid.as_raw() || meta.gid() != gid.as_raw() {
return Err("path has incorrect owner/group".to_owned());
}
Ok(())
}
} else if #[cfg(windows)] { } else if #[cfg(windows)] {
//use std::os::windows::fs::MetadataExt; //use std::os::windows::fs::MetadataExt;
//use windows_permissions::*; //use windows_permissions::*;