diff --git a/Cargo.lock b/Cargo.lock index 1ef146e4..90bfa7c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5770,9 +5770,12 @@ dependencies = [ [[package]] name = "veilid-bugsalot" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9ee584edf237fac328b891dd06c21e7914a1db3762907edc366a13803451fe3" +checksum = "2836acd414bd560c55c906a636c3bca7f080a8fc21802f18616d6be380819ddc" +dependencies = [ + "libc", +] [[package]] name = "veilid-cli" diff --git a/veilid-cli/Cargo.toml b/veilid-cli/Cargo.toml index 09be53b4..59f304f0 100644 --- a/veilid-cli/Cargo.toml +++ b/veilid-cli/Cargo.toml @@ -50,7 +50,7 @@ serde_derive = "^1" parking_lot = "^0" cfg-if = "^1" config = { version = "^0", features = ["yaml"] } -bugsalot = { package = "veilid-bugsalot", version = "0.1.0" } +bugsalot = { package = "veilid-bugsalot", version = "0.2.0" } flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] } thiserror = "^1" crossbeam-channel = "^0" diff --git a/veilid-cli/src/command_processor.rs b/veilid-cli/src/command_processor.rs index d81432d7..fec2e21c 100644 --- a/veilid-cli/src/command_processor.rs +++ b/veilid-cli/src/command_processor.rs @@ -414,7 +414,7 @@ Server Debug Commands: //////////////////////////////////////////// pub fn log_message(&self, log_level: Level, message: &str) { - self.inner().ui_sender.add_node_event(log_level, message); + self.inner().ui_sender.add_log_event(log_level, message); } pub fn update_attachment(&self, attachment: &json::JsonValue) { @@ -481,7 +481,7 @@ Server Debug Commands: pub fn update_log(&self, log: &json::JsonValue) { let log_level = Level::from_str(log["log_level"].as_str().unwrap_or("error")).unwrap_or(Level::Error); - self.inner().ui_sender.add_node_event( + self.inner().ui_sender.add_log_event( log_level, &format!( "{}: {}{}", diff --git a/veilid-cli/src/cursive_ui.rs b/veilid-cli/src/cursive_ui.rs index 127c39bc..7ce43094 100644 --- a/veilid-cli/src/cursive_ui.rs +++ b/veilid-cli/src/cursive_ui.rs @@ -1370,6 +1370,22 @@ impl UISender for CursiveUISender { ), ); } + fn add_log_event(&self, log_color: Level, event: &str) { + let color = { + let inner = self.inner.lock(); + *inner.log_colors.get(&log_color).unwrap() + }; + + let _ = self.push_styled_lines( + color.into(), + format!( + "{}: {}\n", + CursiveUI::cli_ts(CursiveUI::get_start_time()), + event + ), + ); + } + } impl CursiveUISender { pub fn push_styled(&self, styled_string: StyledString) -> std::io::Result<()> { diff --git a/veilid-cli/src/interactive_ui.rs b/veilid-cli/src/interactive_ui.rs index 679b9f1c..2b0679f3 100644 --- a/veilid-cli/src/interactive_ui.rs +++ b/veilid-cli/src/interactive_ui.rs @@ -114,7 +114,6 @@ impl UI for InteractiveUI { let mut inner = self.inner.lock(); inner.cmdproc = Some(cmdproc); } - // Note: Cursive is not re-entrant, can't borrow_mut self.siv again after this fn run_async(&mut self) -> Pin>> { let this = self.clone(); Box::pin(async move { @@ -139,7 +138,12 @@ impl UISender for InteractiveUISender { } fn display_string_dialog(&self, title: &str, text: &str, close_cb: UICallback) { - println!("{}: {}", title, text); + let Some(mut stdout) = self.inner.lock().stdout.clone() else { + return; + }; + if let Err(e) = writeln!(stdout, "{}: {}", title, text) { + self.inner.lock().error = Some(e.to_string()); + } if let UICallback::Interactive(mut close_cb) = close_cb { close_cb() } @@ -186,4 +190,5 @@ impl UISender for InteractiveUISender { self.inner.lock().error = Some(e.to_string()); } } + fn add_log_event(&self, _log_color: Level, _event: &str) {} } diff --git a/veilid-cli/src/io_read_write_ui.rs b/veilid-cli/src/io_read_write_ui.rs new file mode 100644 index 00000000..92b14700 --- /dev/null +++ b/veilid-cli/src/io_read_write_ui.rs @@ -0,0 +1,268 @@ +use crate::command_processor::*; +use crate::settings::*; +use crate::tools::*; +use crate::ui::*; + +use futures::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; +use stop_token::future::FutureExt as StopTokenFutureExt; +use stop_token::*; +use veilid_tools::AsyncMutex; + +use flexi_logger::writers::LogWriter; + +static FINISHED_LINE: &str = "\x7F ===FINISHED=== \x7F"; + +pub type IOReadWriteUICallback = Box; + +pub struct IOReadWriteUIInner { + cmdproc: Option, + in_io: Arc>>, + out_io: Arc>>, + out_receiver: flume::Receiver, + out_sender: flume::Sender, + done: Option, + connection_state_receiver: flume::Receiver, +} + +pub struct IOReadWriteUI { + inner: Arc>>, +} +impl Clone for IOReadWriteUI { + fn clone(&self) -> Self { + IOReadWriteUI { + inner: self.inner.clone(), + } + } +} + +impl IOReadWriteUI { + pub fn new(_settings: &Settings, in_io: R, out_io: W) -> (Self, IOReadWriteUISender) { + // Create the UI object + let (sender, receiver) = flume::unbounded::(); + let (cssender, csreceiver) = flume::unbounded::(); + let this = Self { + inner: Arc::new(Mutex::new(IOReadWriteUIInner { + cmdproc: None, + in_io: Arc::new(AsyncMutex::new(BufReader::new(in_io))), + out_io: Arc::new(AsyncMutex::new(BufWriter::new(out_io))), + out_receiver: receiver, + out_sender: sender.clone(), + connection_state_receiver: csreceiver, + done: Some(StopSource::new()), + })), + }; + + let ui_sender = IOReadWriteUISender { + inner: this.inner.clone(), + out_sender: sender, + connection_state_sender: cssender, + }; + + (this, ui_sender) + } + + pub async fn output_loop(&self) { + let out_receiver = self.inner.lock().out_receiver.clone(); + let out_io = self.inner.lock().out_io.clone(); + + let mut out = out_io.lock().await; + let done = self.inner.lock().done.as_ref().unwrap().token(); + + while let Ok(Ok(line)) = out_receiver.recv_async().timeout_at(done.clone()).await { + if line == FINISHED_LINE { + break; + } + let line = format!("{}\n", line); + if let Err(e) = out.write_all(line.as_bytes()).await { + eprintln!("Error: {:?}", e); + break; + } + if let Err(e) = out.flush().await { + eprintln!("Error: {:?}", e); + break; + } + } + } + + pub async fn command_loop(&self) { + let (in_io, out_sender, connection_state_receiver) = { + let inner = self.inner.lock(); + ( + inner.in_io.clone(), + inner.out_sender.clone(), + inner.connection_state_receiver.clone(), + ) + }; + let mut in_io = in_io.lock().await; + + let done = self.inner.lock().done.as_ref().unwrap().token(); + let (exec_sender, exec_receiver) = flume::bounded(1); + + // Wait for connection to be established + loop { + match connection_state_receiver.recv_async().await { + Ok(ConnectionState::ConnectedTCP(_, _)) + | Ok(ConnectionState::ConnectedIPC(_, _)) => { + break; + } + Ok(ConnectionState::RetryingTCP(_, _)) | Ok(ConnectionState::RetryingIPC(_, _)) => { + } + Ok(ConnectionState::Disconnected) => {} + Err(e) => { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + break; + } + } + } + + // Process the input + loop { + let mut line = String::new(); + match in_io.read_line(&mut line).timeout_at(done.clone()).await { + Ok(Ok(bytes)) => { + if bytes == 0 { + // Clean exit after everything else is sent + if let Err(e) = out_sender.send(FINISHED_LINE.to_string()) { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + } + break; + } + let line = line.trim(); + if !line.is_empty() { + let cmdproc = self.inner.lock().cmdproc.clone(); + if let Some(cmdproc) = &cmdproc { + // Run command + if let Err(e) = cmdproc.run_command( + line, + UICallback::IOReadWrite(Box::new({ + let exec_sender = exec_sender.clone(); + move || { + // Let the next command execute + if let Err(e) = exec_sender.send(()) { + eprintln!("Error: {:?}", e); + } + } + })), + ) { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + break; + } + // Wait until command is done executing before running the next line + if let Err(e) = exec_receiver.recv_async().await { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + break; + } + } + } + } + Ok(Err(e)) => { + eprintln!("IO Error: {:?}", e); + self.inner.lock().done.take(); + break; + } + Err(_) => { + break; + } + } + } + } +} + +impl UI + for IOReadWriteUI +{ + fn set_command_processor(&mut self, cmdproc: CommandProcessor) { + let mut inner = self.inner.lock(); + inner.cmdproc = Some(cmdproc); + } + fn run_async(&mut self) -> Pin>> { + let this = self.clone(); + Box::pin(async move { + let out_fut = this.output_loop(); + let cmd_fut = this.command_loop(); + futures::join!(out_fut, cmd_fut); + }) + } +} + +////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone)] +pub struct IOReadWriteUISender { + inner: Arc>>, + out_sender: flume::Sender, + connection_state_sender: flume::Sender, +} + +impl UISender + for IOReadWriteUISender +{ + fn clone_uisender(&self) -> Box { + Box::new(IOReadWriteUISender { + inner: self.inner.clone(), + out_sender: self.out_sender.clone(), + connection_state_sender: self.connection_state_sender.clone(), + }) + } + fn as_logwriter(&self) -> Option> { + None + } + + fn display_string_dialog(&self, title: &str, text: &str, close_cb: UICallback) { + if let Err(e) = self.out_sender.send(format!("{}: {}", title, text)) { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + } + if let UICallback::IOReadWrite(mut close_cb) = close_cb { + close_cb() + } + } + + fn quit(&self) { + self.inner.lock().done.take(); + } + + fn send_callback(&self, callback: UICallback) { + if let UICallback::IOReadWrite(mut callback) = callback { + callback(); + } + } + fn set_attachment_state( + &mut self, + _state: &str, + _public_internet_ready: bool, + _local_network_ready: bool, + ) { + // + } + fn set_network_status( + &mut self, + _started: bool, + _bps_down: u64, + _bps_up: u64, + mut _peers: Vec, + ) { + // + } + fn set_config(&mut self, _config: &json::JsonValue) { + // + } + fn set_connection_state(&mut self, state: ConnectionState) { + if let Err(e) = self.connection_state_sender.send(state) { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + } + } + + fn add_node_event(&self, _log_color: Level, event: &str) { + if let Err(e) = self.out_sender.send(format!("{}\n", event)) { + eprintln!("Error: {:?}", e); + self.inner.lock().done.take(); + } + } + fn add_log_event(&self, _log_color: Level, _event: &str) {} +} diff --git a/veilid-cli/src/main.rs b/veilid-cli/src/main.rs index e1e3b3ad..f8fff5f0 100644 --- a/veilid-cli/src/main.rs +++ b/veilid-cli/src/main.rs @@ -8,11 +8,13 @@ use crate::{settings::NamedSocketAddrs, tools::*, ui::*}; use clap::{Parser, ValueEnum}; use flexi_logger::*; use std::path::PathBuf; + mod cached_text_view; mod client_api_connection; mod command_processor; mod cursive_ui; mod interactive_ui; +mod io_read_write_ui; mod peers_table_view; mod settings; mod tools; @@ -67,199 +69,249 @@ struct CmdlineArgs { } fn main() -> Result<(), String> { - // Get command line options - let default_config_path = settings::Settings::get_default_config_path(); - let args = CmdlineArgs::parse(); + // Start async + block_on(async move { + // Get command line options + let default_config_path = settings::Settings::get_default_config_path(); + let args = CmdlineArgs::parse(); - if args.wait_for_debug { - use bugsalot::debugger; - debugger::wait_until_attached(None).expect("state() not implemented on this platform"); - } - - // Attempt to load configuration - let settings_path = args.config_file.unwrap_or(default_config_path); - let settings_path = if settings_path.exists() { - Some(settings_path.into_os_string()) - } else { - None - }; - - let mut settings = settings::Settings::new(settings_path.as_deref()) - .map_err(|e| format!("configuration is invalid: {}", e))?; - - // Set config from command line - if let Some(LogLevel::Debug) = args.log_level { - settings.logging.level = settings::LogLevel::Debug; - settings.logging.terminal.enabled = true; - } - if let Some(LogLevel::Trace) = args.log_level { - settings.logging.level = settings::LogLevel::Trace; - settings.logging.terminal.enabled = true; - } - - // If we are running in interactive mode disable some things - let mut enable_cursive = true; - if args.interactive || args.show_log || args.command_file.is_some() || args.evaluate.is_some() { - settings.logging.terminal.enabled = false; - enable_cursive = false; - } - - // Create UI object - let (mut ui, uisender) = if enable_cursive { - let (ui, uisender) = cursive_ui::CursiveUI::new(&settings); - ( - Box::new(ui) as Box, - Box::new(uisender) as Box, - ) - } else if args.interactive { - let (ui, uisender) = interactive_ui::InteractiveUI::new(&settings); - ( - Box::new(ui) as Box, - Box::new(uisender) as Box, - ) - } else { - panic!("unknown ui mode"); - }; - - // Set up loggers - { - let mut specbuilder = LogSpecBuilder::new(); - specbuilder.default(settings::convert_loglevel(settings.logging.level)); - specbuilder.module("cursive", LevelFilter::Off); - specbuilder.module("cursive_core", LevelFilter::Off); - specbuilder.module("cursive_buffered_backend", LevelFilter::Off); - specbuilder.module("tokio_util", LevelFilter::Off); - specbuilder.module("mio", LevelFilter::Off); - specbuilder.module("async_std", LevelFilter::Off); - specbuilder.module("async_io", LevelFilter::Off); - specbuilder.module("polling", LevelFilter::Off); - - let logger = Logger::with(specbuilder.build()); - - if settings.logging.terminal.enabled { - if settings.logging.file.enabled { - std::fs::create_dir_all(settings.logging.file.directory.clone()) - .map_err(map_to_string)?; - logger - .log_to_file_and_writer( - FileSpec::default() - .directory(settings.logging.file.directory.clone()) - .suppress_timestamp(), - uisender.as_logwriter().unwrap(), - ) - .start() - .expect("failed to initialize logger!"); - } else { - logger - .log_to_writer(uisender.as_logwriter().unwrap()) - .start() - .expect("failed to initialize logger!"); - } - } else if settings.logging.file.enabled { - std::fs::create_dir_all(settings.logging.file.directory.clone()) - .map_err(map_to_string)?; - logger - .log_to_file( - FileSpec::default() - .directory(settings.logging.file.directory.clone()) - .suppress_timestamp(), - ) - .start() - .expect("failed to initialize logger!"); + if args.wait_for_debug { + use bugsalot::debugger; + debugger::wait_until_attached(None).expect("state() not implemented on this platform"); } - } - // Get client address - let enable_ipc = (settings.enable_ipc && args.address.is_none()) || args.ipc_path.is_some(); - let mut enable_network = - (settings.enable_network && args.ipc_path.is_none()) || args.address.is_some(); - - // Determine IPC path to try - let mut client_api_ipc_path = None; - if enable_ipc { - cfg_if::cfg_if! { - if #[cfg(windows)] { - if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { - if is_ipc_socket_path(&ipc_path) { - // try direct path - enable_network = false; - client_api_ipc_path = Some(ipc_path); - } else { - // try subnode index inside path - let ipc_path = ipc_path.join(args.subnode_index.to_string()); - if is_ipc_socket_path(&ipc_path) { - // subnode indexed path exists - enable_network = false; - client_api_ipc_path = Some(ipc_path); - } - } - } - } else { - if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { - if is_ipc_socket_path(&ipc_path) { - // try direct path - enable_network = false; - client_api_ipc_path = Some(ipc_path); - } else if ipc_path.exists() && ipc_path.is_dir() { - // try subnode index inside path - let ipc_path = ipc_path.join(args.subnode_index.to_string()); - if is_ipc_socket_path(&ipc_path) { - // subnode indexed path exists - enable_network = false; - client_api_ipc_path = Some(ipc_path); - } - } - } - } - } - } - let mut client_api_network_addresses = None; - if enable_network { - let args_address = if let Some(args_address) = args.address { - match NamedSocketAddrs::try_from(args_address) { - Ok(v) => Some(v), - Err(e) => { - return Err(format!("Invalid server address: {}", e)); - } - } + // Attempt to load configuration + let settings_path = args.config_file.unwrap_or(default_config_path); + let settings_path = if settings_path.exists() { + Some(settings_path.into_os_string()) } else { None }; - if let Some(address_arg) = args_address.or(settings.address.clone()) { - client_api_network_addresses = Some(address_arg.addrs); - } else if let Some(address) = settings.address.clone() { - client_api_network_addresses = Some(address.addrs.clone()); + + let mut settings = settings::Settings::new(settings_path.as_deref()) + .map_err(|e| format!("configuration is invalid: {}", e))?; + + // Set config from command line + if let Some(LogLevel::Debug) = args.log_level { + settings.logging.level = settings::LogLevel::Debug; + settings.logging.terminal.enabled = true; + } + if let Some(LogLevel::Trace) = args.log_level { + settings.logging.level = settings::LogLevel::Trace; + settings.logging.terminal.enabled = true; } - } - // Create command processor - debug!("Creating Command Processor "); - let comproc = command_processor::CommandProcessor::new(uisender, &settings); + // If we are running in interactive mode disable some things + let mut enable_cursive = true; + if args.interactive + || args.show_log + || args.command_file.is_some() + || args.evaluate.is_some() + { + settings.logging.terminal.enabled = false; + enable_cursive = false; + } - ui.set_command_processor(comproc.clone()); + // Create UI object + let (mut ui, uisender) = if enable_cursive { + let (ui, uisender) = cursive_ui::CursiveUI::new(&settings); + ( + Box::new(ui) as Box, + Box::new(uisender) as Box, + ) + } else if args.interactive { + let (ui, uisender) = interactive_ui::InteractiveUI::new(&settings); + ( + Box::new(ui) as Box, + Box::new(uisender) as Box, + ) + } else if let Some(command_file) = args.command_file { + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use async_std::prelude::*; + } else if #[cfg(feature="rt-tokio")] { + use tokio_util::compat::{TokioAsyncWriteCompatExt, TokioAsyncReadCompatExt}; + let (in_obj, out_obj) = + if command_file.to_string_lossy() == "-" { + (Box::pin(tokio::io::stdin().compat()) as Pin>, tokio::io::stdout().compat_write()) + } else { + let f = match tokio::fs::File::open(command_file).await { + Ok(v) => v, + Err(e) => { + return Err(e.to_string()); + } + }; + (Box::pin(f.compat()) as Pin>, tokio::io::stdout().compat_write()) + }; + } else { + compile_error!("needs executor implementation") + } + } - // Create client api client side - info!("Starting API connection"); - let capi = client_api_connection::ClientApiConnection::new(comproc.clone()); + let (ui, uisender) = io_read_write_ui::IOReadWriteUI::new(&settings, in_obj, out_obj); + ( + Box::new(ui) as Box, + Box::new(uisender) as Box, + ) + } else if let Some(evaluate) = args.evaluate { + cfg_if! { + if #[cfg(feature="rt-async-std")] { + use async_std::prelude::*; + } else if #[cfg(feature="rt-tokio")] { + use tokio_util::compat::{TokioAsyncWriteCompatExt}; + let in_str = format!("{}\n", evaluate); + let (in_obj, out_obj) = (futures::io::Cursor::new(in_str), tokio::io::stdout().compat_write()); + } else { + compile_error!("needs executor implementation") + } + } - // Save client api in command processor - comproc.set_client_api_connection(capi.clone()); + let (ui, uisender) = io_read_write_ui::IOReadWriteUI::new(&settings, in_obj, out_obj); + ( + Box::new(ui) as Box, + Box::new(uisender) as Box, + ) + } else { + panic!("unknown ui mode"); + }; - // Keep a connection to the server - if let Some(client_api_ipc_path) = client_api_ipc_path { - comproc.set_ipc_path(Some(client_api_ipc_path)); - } else if let Some(client_api_network_address) = client_api_network_addresses { - let network_addr = client_api_network_address.first().cloned(); - comproc.set_network_address(network_addr); - } else { - return Err("veilid-server could not be reached".to_owned()); - } + // Set up loggers + { + let mut specbuilder = LogSpecBuilder::new(); + specbuilder.default(settings::convert_loglevel(settings.logging.level)); + specbuilder.module("cursive", LevelFilter::Off); + specbuilder.module("cursive_core", LevelFilter::Off); + specbuilder.module("cursive_buffered_backend", LevelFilter::Off); + specbuilder.module("tokio_util", LevelFilter::Off); + specbuilder.module("mio", LevelFilter::Off); + specbuilder.module("async_std", LevelFilter::Off); + specbuilder.module("async_io", LevelFilter::Off); + specbuilder.module("polling", LevelFilter::Off); - let comproc2 = comproc.clone(); - let connection_future = comproc.connection_manager(); + let logger = Logger::with(specbuilder.build()); + + if settings.logging.terminal.enabled { + if settings.logging.file.enabled { + std::fs::create_dir_all(settings.logging.file.directory.clone()) + .map_err(map_to_string)?; + logger + .log_to_file_and_writer( + FileSpec::default() + .directory(settings.logging.file.directory.clone()) + .suppress_timestamp(), + uisender.as_logwriter().unwrap(), + ) + .start() + .expect("failed to initialize logger!"); + } else { + logger + .log_to_writer(uisender.as_logwriter().unwrap()) + .start() + .expect("failed to initialize logger!"); + } + } else if settings.logging.file.enabled { + std::fs::create_dir_all(settings.logging.file.directory.clone()) + .map_err(map_to_string)?; + logger + .log_to_file( + FileSpec::default() + .directory(settings.logging.file.directory.clone()) + .suppress_timestamp(), + ) + .start() + .expect("failed to initialize logger!"); + } + } + + // Get client address + let enable_ipc = (settings.enable_ipc && args.address.is_none()) || args.ipc_path.is_some(); + let mut enable_network = + (settings.enable_network && args.ipc_path.is_none()) || args.address.is_some(); + + // Determine IPC path to try + let mut client_api_ipc_path = None; + if enable_ipc { + cfg_if::cfg_if! { + if #[cfg(windows)] { + if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { + if is_ipc_socket_path(&ipc_path) { + // try direct path + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } else { + // try subnode index inside path + let ipc_path = ipc_path.join(args.subnode_index.to_string()); + if is_ipc_socket_path(&ipc_path) { + // subnode indexed path exists + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } + } + } + } else { + if let Some(ipc_path) = args.ipc_path.or(settings.ipc_path.clone()) { + if is_ipc_socket_path(&ipc_path) { + // try direct path + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } else if ipc_path.exists() && ipc_path.is_dir() { + // try subnode index inside path + let ipc_path = ipc_path.join(args.subnode_index.to_string()); + if is_ipc_socket_path(&ipc_path) { + // subnode indexed path exists + enable_network = false; + client_api_ipc_path = Some(ipc_path); + } + } + } + } + } + } + let mut client_api_network_addresses = None; + if enable_network { + let args_address = if let Some(args_address) = args.address { + match NamedSocketAddrs::try_from(args_address) { + Ok(v) => Some(v), + Err(e) => { + return Err(format!("Invalid server address: {}", e)); + } + } + } else { + None + }; + if let Some(address_arg) = args_address.or(settings.address.clone()) { + client_api_network_addresses = Some(address_arg.addrs); + } else if let Some(address) = settings.address.clone() { + client_api_network_addresses = Some(address.addrs.clone()); + } + } + + // Create command processor + debug!("Creating Command Processor "); + let comproc = command_processor::CommandProcessor::new(uisender, &settings); + + ui.set_command_processor(comproc.clone()); + + // Create client api client side + info!("Starting API connection"); + let capi = client_api_connection::ClientApiConnection::new(comproc.clone()); + + // Save client api in command processor + comproc.set_client_api_connection(capi.clone()); + + // Keep a connection to the server + if let Some(client_api_ipc_path) = client_api_ipc_path { + comproc.set_ipc_path(Some(client_api_ipc_path)); + } else if let Some(client_api_network_address) = client_api_network_addresses { + let network_addr = client_api_network_address.first().cloned(); + comproc.set_network_address(network_addr); + } else { + return Err("veilid-server could not be reached".to_owned()); + } + + let comproc2 = comproc.clone(); + let connection_future = comproc.connection_manager(); - // Start async - block_on(async move { // Start UI let ui_future = async move { ui.run_async().await; @@ -281,7 +333,6 @@ fn main() -> Result<(), String> { compile_error!("needs executor implementation") } } - }); - - Ok(()) + Ok(()) + }) } diff --git a/veilid-cli/src/ui.rs b/veilid-cli/src/ui.rs index 02682f37..75b80acc 100644 --- a/veilid-cli/src/ui.rs +++ b/veilid-cli/src/ui.rs @@ -1,6 +1,7 @@ use crate::command_processor::*; use crate::cursive_ui::CursiveUICallback; use crate::interactive_ui::InteractiveUICallback; +use crate::io_read_write_ui::IOReadWriteUICallback; use crate::tools::*; use flexi_logger::writers::LogWriter; use log::Level; @@ -8,6 +9,7 @@ use log::Level; pub enum UICallback { Cursive(CursiveUICallback), Interactive(InteractiveUICallback), + IOReadWrite(IOReadWriteUICallback), } pub trait UISender: Send { @@ -33,6 +35,7 @@ pub trait UISender: Send { fn set_config(&mut self, config: &json::JsonValue); fn set_connection_state(&mut self, state: ConnectionState); fn add_node_event(&self, log_color: Level, event: &str); + fn add_log_event(&self, log_color: Level, event: &str); } pub trait UI { diff --git a/veilid-core/Cargo.toml b/veilid-core/Cargo.toml index 62ddf4c0..5338758c 100644 --- a/veilid-core/Cargo.toml +++ b/veilid-core/Cargo.toml @@ -144,7 +144,7 @@ lz4_flex = { version = "0.11.1", default-features = false, features = [ # Tools config = { version = "0.13.4", features = ["yaml"] } -bugsalot = { package = "veilid-bugsalot", version = "0.1.0" } +bugsalot = { package = "veilid-bugsalot", version = "0.2.0" } chrono = "0.4.31" libc = "0.2.151" nix = "0.27.1" diff --git a/veilid-server/Cargo.toml b/veilid-server/Cargo.toml index bf13c130..03976bc1 100644 --- a/veilid-server/Cargo.toml +++ b/veilid-server/Cargo.toml @@ -76,7 +76,7 @@ futures-util = { version = "^0", default-features = false, features = [ url = "^2" ctrlc = "^3" lazy_static = "^1" -bugsalot = { package = "veilid-bugsalot", version = "0.1.0" } +bugsalot = { package = "veilid-bugsalot", version = "0.2.0" } flume = { version = "^0", features = ["async"] } rpassword = "^7" hostname = "^0"