windows specific ipc logic

This commit is contained in:
John Smith 2023-12-15 18:24:53 -05:00 committed by Christien Rioux
parent f47adfa03f
commit d1aa488883
8 changed files with 307 additions and 30 deletions

View File

@ -8,7 +8,6 @@ use crate::{settings::NamedSocketAddrs, tools::*};
use clap::{Parser, ValueEnum}; use clap::{Parser, ValueEnum};
use flexi_logger::*; use flexi_logger::*;
use std::path::PathBuf; use std::path::PathBuf;
mod cached_text_view; mod cached_text_view;
mod client_api_connection; mod client_api_connection;
mod command_processor; mod command_processor;
@ -137,15 +136,33 @@ fn main() -> Result<(), String> {
// Determine IPC path to try // Determine IPC path to try
let mut client_api_ipc_path = None; let mut client_api_ipc_path = None;
if enable_ipc { if enable_ipc {
cfg_if::cfg_if! {
if #[cfg(windows)] {
if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) {
if ipc_path.exists() && !ipc_path.is_dir() { if is_ipc_socket_path(&ipc_path) {
// try direct path
enable_network = false;
client_api_ipc_path = Some(ipc_path);
} else {
// try subnode index inside path
let ipc_path = ipc_path.join(args.subnode_index.to_string());
if is_ipc_socket_path(&ipc_path) {
// subnode indexed path exists
enable_network = false;
client_api_ipc_path = Some(ipc_path);
}
}
}
} else {
if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) {
if is_ipc_socket_path(&ipc_path) {
// try direct path // try direct path
enable_network = false; enable_network = false;
client_api_ipc_path = Some(ipc_path); client_api_ipc_path = Some(ipc_path);
} else if ipc_path.exists() && ipc_path.is_dir() { } else if ipc_path.exists() && ipc_path.is_dir() {
// try subnode index inside path // try subnode index inside path
let ipc_path = ipc_path.join(args.subnode_index.to_string()); let ipc_path = ipc_path.join(args.subnode_index.to_string());
if ipc_path.exists() && !ipc_path.is_dir() { if is_ipc_socket_path(&ipc_path) {
// subnode indexed path exists // subnode indexed path exists
enable_network = false; enable_network = false;
client_api_ipc_path = Some(ipc_path); client_api_ipc_path = Some(ipc_path);
@ -153,6 +170,8 @@ fn main() -> Result<(), String> {
} }
} }
} }
}
}
let mut client_api_network_addresses = None; let mut client_api_network_addresses = None;
if enable_network { if enable_network {
let args_address = if let Some(args_address) = args.address { let args_address = if let Some(args_address) = args.address {

View File

@ -229,6 +229,7 @@ pub struct Settings {
} }
impl Settings { impl Settings {
#[allow(dead_code)]
fn get_server_default_directory(subpath: &str) -> PathBuf { fn get_server_default_directory(subpath: &str) -> PathBuf {
#[cfg(unix)] #[cfg(unix)]
{ {
@ -249,8 +250,14 @@ impl Settings {
} }
pub fn get_default_ipc_directory() -> PathBuf { pub fn get_default_ipc_directory() -> PathBuf {
cfg_if::cfg_if! {
if #[cfg(windows)] {
PathBuf::from(r"\\.\PIPE\veilid-server")
} else {
Self::get_server_default_directory("ipc") Self::get_server_default_directory("ipc")
} }
}
}
pub fn get_default_config_path() -> PathBuf { pub fn get_default_config_path() -> PathBuf {
// Get default configuration file location // Get default configuration file location

View File

@ -805,7 +805,13 @@ impl UI {
edit.set_enabled(ipc_path_enabled); edit.set_enabled(ipc_path_enabled);
let mut dlg = Self::connection_dialog(s); let mut dlg = Self::connection_dialog(s);
dlg.add_button("Connect", Self::submit_network_address); dlg.add_button("Connect", |s| {
if Self::ipc_path_radio(s).is_selected() {
Self::submit_ipc_path(s);
} else {
Self::submit_network_address(s);
}
});
} }
ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => {} ConnectionState::ConnectedTCP(_, _) | ConnectionState::ConnectedIPC(_, _) => {}
ConnectionState::RetryingTCP(addr, _) => { ConnectionState::RetryingTCP(addr, _) => {

View File

@ -61,10 +61,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: collection name: collection
sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a sha256: f092b211a4319e98e5ff58223576de6c2803db36221657b46c82574721240687
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.18.0" version: "1.17.2"
convert: convert:
dependency: transitive dependency: transitive
description: description:
@ -220,10 +220,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: meta name: meta
sha256: a6e590c838b18133bb482a2745ad77c5bb7715fb0451209e1a7567d416678b8e sha256: "3c74dbf8763d36539f114c799d8a2d87343b5067e9d796ca22b5eb8437090ee3"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.10.0" version: "1.9.1"
path: path:
dependency: "direct main" dependency: "direct main"
description: description:
@ -329,18 +329,18 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: stack_trace name: stack_trace
sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b" sha256: c3c7d8edb15bee7f0f74debd4b9c5f3c2ea86766fe4178eb2a18eb30a0bdaed5
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "1.11.1" version: "1.11.0"
stream_channel: stream_channel:
dependency: transitive dependency: transitive
description: description:
name: stream_channel name: stream_channel
sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7 sha256: "83615bee9045c1d322bbbd1ba209b7a749c2cbcdcb3fdd1df8eb488b3279c1c8"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "2.1.2" version: "2.1.1"
string_scanner: string_scanner:
dependency: transitive dependency: transitive
description: description:
@ -377,10 +377,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: test_api name: test_api
sha256: "5c2f730018264d276c20e4f1503fd1308dfbbae39ec8ee63c5236311ac06954b" sha256: "75760ffd7786fffdfb9597c35c5b27eaeec82be8edfb6d71d32651128ed7aab8"
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.6.1" version: "0.6.0"
typed_data: typed_data:
dependency: transitive dependency: transitive
description: description:
@ -408,10 +408,10 @@ packages:
dependency: transitive dependency: transitive
description: description:
name: web name: web
sha256: afe077240a270dcfd2aafe77602b4113645af95d0ad31128cc02bce5ac5d5152 sha256: dc8ccd225a2005c1be616fe02951e2e342092edf968cf0844220383757ef8f10
url: "https://pub.dev" url: "https://pub.dev"
source: hosted source: hosted
version: "0.3.0" version: "0.1.4-beta"
win32: win32:
dependency: transitive dependency: transitive
description: description:
@ -437,5 +437,5 @@ packages:
source: hosted source: hosted
version: "3.5.0" version: "3.5.0"
sdks: sdks:
dart: ">=3.2.0-194.0.dev <4.0.0" dart: ">=3.1.0-185.0.dev <4.0.0"
flutter: ">=3.10.6" flutter: ">=3.10.6"

View File

@ -807,8 +807,14 @@ impl Settings {
} }
pub fn get_default_ipc_directory() -> PathBuf { pub fn get_default_ipc_directory() -> PathBuf {
cfg_if! {
if #[cfg(windows)] {
PathBuf::from(r"\\.\PIPE\veilid-server")
} else {
Self::get_or_create_default_directory("ipc") Self::get_or_create_default_directory("ipc")
} }
}
}
pub fn get_default_remote_max_subkey_cache_memory_mb() -> u32 { pub fn get_default_remote_max_subkey_cache_memory_mb() -> u32 {
let sys = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory()); let sys = sysinfo::System::new_with_specifics(sysinfo::RefreshKind::new().with_memory());

View File

@ -0,0 +1,192 @@
use crate::*;
use futures_util::stream::FuturesUnordered;
use futures_util::AsyncRead as FuturesAsyncRead;
use futures_util::AsyncWrite as FuturesAsyncWrite;
use futures_util::Stream;
use std::path::PathBuf;
use std::{io, path::Path};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::windows::named_pipe::{
ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions,
};
/////////////////////////////////////////////////////////////
enum IpcStreamInternal {
Client(NamedPipeClient),
Server(NamedPipeServer),
}
pub struct IpcStream {
internal: IpcStreamInternal,
}
impl IpcStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<IpcStream> {
Ok(IpcStream {
internal: IpcStreamInternal::Client(
ClientOptions::new().open(path.as_ref().to_path_buf().as_os_str())?,
),
})
}
}
impl FuturesAsyncRead for IpcStream {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<io::Result<usize>> {
match &mut self.internal {
IpcStreamInternal::Client(client) => {
let mut rb = ReadBuf::new(buf);
match <NamedPipeClient as AsyncRead>::poll_read(
std::pin::Pin::new(client),
cx,
&mut rb,
) {
std::task::Poll::Ready(r) => {
std::task::Poll::Ready(r.map(|_| rb.filled().len()))
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
IpcStreamInternal::Server(server) => {
let mut rb = ReadBuf::new(buf);
match <NamedPipeServer as AsyncRead>::poll_read(
std::pin::Pin::new(server),
cx,
&mut rb,
) {
std::task::Poll::Ready(r) => {
std::task::Poll::Ready(r.map(|_| rb.filled().len()))
}
std::task::Poll::Pending => std::task::Poll::Pending,
}
}
}
}
}
impl FuturesAsyncWrite for IpcStream {
fn poll_write(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<io::Result<usize>> {
match &mut self.internal {
IpcStreamInternal::Client(client) => {
<NamedPipeClient as AsyncWrite>::poll_write(std::pin::Pin::new(client), cx, buf)
}
IpcStreamInternal::Server(server) => {
<NamedPipeServer as AsyncWrite>::poll_write(std::pin::Pin::new(server), cx, buf)
}
}
}
fn poll_flush(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
match &mut self.internal {
IpcStreamInternal::Client(client) => {
<NamedPipeClient as AsyncWrite>::poll_flush(std::pin::Pin::new(client), cx)
}
IpcStreamInternal::Server(server) => {
<NamedPipeServer as AsyncWrite>::poll_flush(std::pin::Pin::new(server), cx)
}
}
}
fn poll_close(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<io::Result<()>> {
match &mut self.internal {
IpcStreamInternal::Client(client) => {
<NamedPipeClient as AsyncWrite>::poll_shutdown(std::pin::Pin::new(client), cx)
}
IpcStreamInternal::Server(server) => {
<NamedPipeServer as AsyncWrite>::poll_shutdown(std::pin::Pin::new(server), cx)
}
}
}
}
/////////////////////////////////////////////////////////////
pub struct IpcIncoming {
listener: Arc<IpcListener>,
unord: FuturesUnordered<SendPinBoxFuture<io::Result<IpcStream>>>,
}
impl Stream for IpcIncoming {
type Item = io::Result<IpcStream>;
fn poll_next<'a>(
mut self: std::pin::Pin<&'a mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.unord.is_empty() {
self.unord.push(Box::pin(self.listener.accept()));
}
match Pin::new(&mut self.unord).poll_next(cx) {
task::Poll::Ready(ro) => {
self.unord.push(Box::pin(self.listener.accept()));
std::task::Poll::Ready(ro)
}
task::Poll::Pending => std::task::Poll::Pending,
}
}
}
/////////////////////////////////////////////////////////////
pub struct IpcListener {
path: Option<PathBuf>,
internal: Mutex<Option<NamedPipeServer>>,
}
impl IpcListener {
/// Creates a new `IpcListener` bound to the specified path.
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<Self> {
let path = path.as_ref().to_path_buf();
let server = ServerOptions::new()
.first_pipe_instance(true)
.create(&path)?;
Ok(Self {
path: Some(path),
internal: Mutex::new(Some(server)),
})
}
/// Accepts a new incoming connection to this listener.
pub fn accept(&self) -> SendPinBoxFuture<io::Result<IpcStream>> {
let mut opt_server = self.internal.lock();
let Some(server) = opt_server.take() else {
return Box::pin(std::future::ready(Err(io::Error::from(
io::ErrorKind::BrokenPipe,
))));
};
let path = self.path.clone().unwrap();
*opt_server = match ServerOptions::new().create(path) {
Ok(v) => Some(v),
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Box::pin(async move {
server.connect().await?;
Ok(IpcStream {
internal: IpcStreamInternal::Server(server),
})
})
}
/// Returns a stream of incoming connections.
pub fn incoming(self) -> IpcIncoming {
IpcIncoming {
listener: Arc::new(self),
unord: FuturesUnordered::new(),
}
}
}

View File

@ -1,4 +1,5 @@
use cfg_if::*; use cfg_if::*;
use std::path::Path;
cfg_if! { cfg_if! {
if #[cfg(feature="rt-tokio")] { if #[cfg(feature="rt-tokio")] {
@ -9,3 +10,23 @@ cfg_if! {
pub use ipc_async_std::*; pub use ipc_async_std::*;
} }
} }
pub fn is_ipc_socket_path<P: AsRef<Path>>(path: P) -> bool {
cfg_if! {
if #[cfg(windows)] {
let p = path.as_ref().to_path_buf().to_string_lossy().to_string().to_lowercase();
p.starts_with(r"\\.\pipe") && path.as_ref().exists()
} else if #[cfg(unix)] {
use std::os::unix::fs::FileTypeExt;
let meta = match std::fs::metadata(path) {
Ok(v) => v,
Err(_) => {
return false;
}
};
meta.file_type().is_socket()
} else {
false
}
}
}

View File

@ -363,15 +363,41 @@ cfg_if::cfg_if! {
pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String> pub fn ensure_file_private_owner<P:AsRef<Path>>(path: P) -> Result<(), String>
{ {
let path = path.as_ref(); let path = path.as_ref();
if !path.exists() { if !path.is_file() {
return Ok(()); return Ok(());
} }
Ok(()) Ok(())
} }
pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
{
let path = path.as_ref();
if !path.is_dir() {
return Ok(());
}
Ok(())
}
} else { } else {
pub fn ensure_file_private_owner<P:AsRef<Path>>(_path: P) -> Result<(), String> pub fn ensure_file_private_owner<P:AsRef<Path>>(_path: P) -> Result<(), String>
{ {
let path = path.as_ref();
if !path.is_file() {
return Ok(());
}
Ok(())
}
pub fn ensure_directory_private_owner<P:AsRef<Path>>(path: P, _group_read: bool) -> Result<(), String>
{
let path = path.as_ref();
if !path.is_dir() {
return Ok(());
}
Ok(()) Ok(())
} }
} }