This commit is contained in:
John Smith 2022-11-16 12:49:53 -05:00
parent 28c31fe424
commit 688995ed0d
17 changed files with 534 additions and 122 deletions

2
Cargo.lock generated
View File

@ -5551,6 +5551,7 @@ dependencies = [
"flexi_logger", "flexi_logger",
"futures", "futures",
"hex", "hex",
"json",
"log", "log",
"parking_lot 0.12.1", "parking_lot 0.12.1",
"serde", "serde",
@ -5714,6 +5715,7 @@ dependencies = [
"flume", "flume",
"futures-util", "futures-util",
"hostname", "hostname",
"json",
"lazy_static", "lazy_static",
"nix 0.25.0", "nix 0.25.0",
"opentelemetry", "opentelemetry",

View File

@ -43,7 +43,8 @@ flexi_logger = { version = "^0", features = ["use_chrono_for_offset"] }
thiserror = "^1" thiserror = "^1"
crossbeam-channel = "^0" crossbeam-channel = "^0"
hex = "^0" hex = "^0"
veilid-core = { path = "../veilid-core", default_features = false} veilid-core = { path = "../veilid-core", default_features = false }
json = "^0"
[dev-dependencies] [dev-dependencies]
serial_test = "^0" serial_test = "^0"

View File

@ -89,6 +89,9 @@ impl veilid_client::Server for VeilidClientImpl {
VeilidUpdate::Network(network) => { VeilidUpdate::Network(network) => {
self.comproc.update_network_status(network); self.comproc.update_network_status(network);
} }
VeilidUpdate::Config(config) => {
self.comproc.update_config(config);
}
VeilidUpdate::Shutdown => self.comproc.update_shutdown(), VeilidUpdate::Shutdown => self.comproc.update_shutdown(),
} }
@ -101,6 +104,7 @@ struct ClientApiConnectionInner {
connect_addr: Option<SocketAddr>, connect_addr: Option<SocketAddr>,
disconnector: Option<Disconnector<rpc_twoparty_capnp::Side>>, disconnector: Option<Disconnector<rpc_twoparty_capnp::Side>>,
server: Option<Rc<RefCell<veilid_server::Client>>>, server: Option<Rc<RefCell<veilid_server::Client>>>,
server_settings: Option<String>,
disconnect_requested: bool, disconnect_requested: bool,
cancel_eventual: Eventual, cancel_eventual: Eventual,
} }
@ -120,6 +124,7 @@ impl ClientApiConnection {
connect_addr: None, connect_addr: None,
disconnector: None, disconnector: None,
server: None, server: None,
server_settings: None,
disconnect_requested: false, disconnect_requested: false,
cancel_eventual: Eventual::new(), cancel_eventual: Eventual::new(),
})), })),
@ -141,7 +146,7 @@ impl ClientApiConnection {
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
inner.comproc.update_attachment(veilid_state.attachment); inner.comproc.update_attachment(veilid_state.attachment);
inner.comproc.update_network_status(veilid_state.network); inner.comproc.update_network_status(veilid_state.network);
inner.comproc.update_config(veilid_state.config);
Ok(()) Ok(())
} }
@ -209,6 +214,13 @@ impl ClientApiConnection {
.map_err(|e| format!("failed to get deserialize veilid state: {}", e))?; .map_err(|e| format!("failed to get deserialize veilid state: {}", e))?;
self.process_veilid_state(veilid_state).await?; self.process_veilid_state(veilid_state).await?;
// Save server settings
let server_settings = response
.get_settings()
.map_err(|e| format!("failed to get initial veilid server settings: {}", e))?
.to_owned();
self.inner.borrow_mut().server_settings = Some(server_settings.clone());
// Don't drop the registration, doing so will remove the client // Don't drop the registration, doing so will remove the client
// object mapping from the server which we need for the update backchannel // object mapping from the server which we need for the update backchannel
@ -219,9 +231,10 @@ impl ClientApiConnection {
res.map_err(|e| format!("client RPC system error: {}", e)) res.map_err(|e| format!("client RPC system error: {}", e))
} }
async fn handle_connection(&mut self) -> Result<(), String> { async fn handle_connection(&mut self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::handle_connection"); trace!("ClientApiConnection::handle_connection");
let connect_addr = self.inner.borrow().connect_addr.unwrap();
self.inner.borrow_mut().connect_addr = Some(connect_addr);
// Connect the TCP socket // Connect the TCP socket
let stream = TcpStream::connect(connect_addr) let stream = TcpStream::connect(connect_addr)
.await .await
@ -263,9 +276,11 @@ impl ClientApiConnection {
// Drop the server and disconnector too (if we still have it) // Drop the server and disconnector too (if we still have it)
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
let disconnect_requested = inner.disconnect_requested; let disconnect_requested = inner.disconnect_requested;
inner.server_settings = None;
inner.server = None; inner.server = None;
inner.disconnector = None; inner.disconnector = None;
inner.disconnect_requested = false; inner.disconnect_requested = false;
inner.connect_addr = None;
if !disconnect_requested { if !disconnect_requested {
// Connection lost // Connection lost
@ -456,9 +471,7 @@ impl ClientApiConnection {
pub async fn connect(&mut self, connect_addr: SocketAddr) -> Result<(), String> { pub async fn connect(&mut self, connect_addr: SocketAddr) -> Result<(), String> {
trace!("ClientApiConnection::connect"); trace!("ClientApiConnection::connect");
// Save the address to connect to // Save the address to connect to
self.inner.borrow_mut().connect_addr = Some(connect_addr); self.handle_connection(connect_addr).await
self.handle_connection().await
} }
// End Client API connection // End Client API connection
@ -469,7 +482,6 @@ impl ClientApiConnection {
Some(d) => { Some(d) => {
self.inner.borrow_mut().disconnect_requested = true; self.inner.borrow_mut().disconnect_requested = true;
d.await.unwrap(); d.await.unwrap();
self.inner.borrow_mut().connect_addr = None;
} }
None => { None => {
debug!("disconnector doesn't exist"); debug!("disconnector doesn't exist");

View File

@ -388,6 +388,7 @@ reply - reply to an AppCall not handled directly by the server
// called by client_api_connection // called by client_api_connection
// calls into ui // calls into ui
//////////////////////////////////////////// ////////////////////////////////////////////
pub fn update_attachment(&mut self, attachment: veilid_core::VeilidStateAttachment) { pub fn update_attachment(&mut self, attachment: veilid_core::VeilidStateAttachment) {
self.inner_mut().ui.set_attachment_state(attachment.state); self.inner_mut().ui.set_attachment_state(attachment.state);
} }
@ -400,6 +401,9 @@ reply - reply to an AppCall not handled directly by the server
network.peers, network.peers,
); );
} }
pub fn update_config(&mut self, config: veilid_core::VeilidStateConfig) {
self.inner_mut().ui.set_config(config.config)
}
pub fn update_log(&mut self, log: veilid_core::VeilidLog) { pub fn update_log(&mut self, log: veilid_core::VeilidLog) {
self.inner().ui.add_node_event(format!( self.inner().ui.add_node_event(format!(

View File

@ -55,6 +55,7 @@ struct UIState {
network_down_up: Dirty<(f32, f32)>, network_down_up: Dirty<(f32, f32)>,
connection_state: Dirty<ConnectionState>, connection_state: Dirty<ConnectionState>,
peers_state: Dirty<Vec<PeerTableData>>, peers_state: Dirty<Vec<PeerTableData>>,
node_id: Dirty<String>,
} }
impl UIState { impl UIState {
@ -65,6 +66,7 @@ impl UIState {
network_down_up: Dirty::new((0.0, 0.0)), network_down_up: Dirty::new((0.0, 0.0)),
connection_state: Dirty::new(ConnectionState::Disconnected), connection_state: Dirty::new(ConnectionState::Disconnected),
peers_state: Dirty::new(Vec::new()), peers_state: Dirty::new(Vec::new()),
node_id: Dirty::new("".to_owned()),
} }
} }
} }
@ -214,6 +216,11 @@ impl UI {
}); });
} }
fn node_events_panel(
s: &mut Cursive,
) -> ViewRef<Panel<ResizedView<NamedView<ScrollView<FlexiLoggerView>>>>> {
s.find_name("node-events-panel").unwrap()
}
fn command_line(s: &mut Cursive) -> ViewRef<EditView> { fn command_line(s: &mut Cursive) -> ViewRef<EditView> {
s.find_name("command-line").unwrap() s.find_name("command-line").unwrap()
} }
@ -572,6 +579,12 @@ impl UI {
} }
} }
fn refresh_main_titlebar(s: &mut Cursive) {
let mut main_window = UI::node_events_panel(s);
let inner = Self::inner_mut(s);
main_window.set_title(format!("Node: {}", inner.ui_state.node_id.get()));
}
fn refresh_statusbar(s: &mut Cursive) { fn refresh_statusbar(s: &mut Cursive) {
let mut statusbar = UI::status_bar(s); let mut statusbar = UI::status_bar(s);
@ -634,6 +647,7 @@ impl UI {
let mut refresh_button_attach = false; let mut refresh_button_attach = false;
let mut refresh_connection_dialog = false; let mut refresh_connection_dialog = false;
let mut refresh_peers = false; let mut refresh_peers = false;
let mut refresh_main_titlebar = false;
if inner.ui_state.attachment_state.take_dirty() { if inner.ui_state.attachment_state.take_dirty() {
refresh_statusbar = true; refresh_statusbar = true;
refresh_button_attach = true; refresh_button_attach = true;
@ -654,6 +668,9 @@ impl UI {
if inner.ui_state.peers_state.take_dirty() { if inner.ui_state.peers_state.take_dirty() {
refresh_peers = true; refresh_peers = true;
} }
if inner.ui_state.node_id.take_dirty() {
refresh_main_titlebar = true;
}
drop(inner); drop(inner);
@ -669,6 +686,9 @@ impl UI {
if refresh_peers { if refresh_peers {
Self::refresh_peers(s); Self::refresh_peers(s);
} }
if refresh_main_titlebar {
Self::refresh_main_titlebar(s);
}
} }
//////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////
@ -722,7 +742,8 @@ impl UI {
.full_screen(), .full_screen(),
) )
.title_position(HAlign::Left) .title_position(HAlign::Left)
.title("Node Events"); .title("Node Events")
.with_name("node-events-panel");
let peers_table_view = PeersTableView::new() let peers_table_view = PeersTableView::new()
.column(PeerTableColumn::NodeId, "Node Id", |c| c.width(43)) .column(PeerTableColumn::NodeId, "Node Id", |c| c.width(43))
@ -839,6 +860,16 @@ impl UI {
inner.ui_state.peers_state.set(peers); inner.ui_state.peers_state.set(peers);
let _ = inner.cb_sink.send(Box::new(UI::update_cb)); let _ = inner.cb_sink.send(Box::new(UI::update_cb));
} }
pub fn set_config(&mut self, config: VeilidConfigInner) {
let mut inner = self.inner.borrow_mut();
inner.ui_state.node_id.set(
config
.network
.node_id
.map(|x| x.encode())
.unwrap_or("<unknown>".to_owned()),
);
}
pub fn set_connection_state(&mut self, state: ConnectionState) { pub fn set_connection_state(&mut self, state: ConnectionState) {
let mut inner = self.inner.borrow_mut(); let mut inner = self.inner.borrow_mut();
inner.ui_state.connection_state.set(state); inner.ui_state.connection_state.set(state);

View File

@ -177,7 +177,7 @@ impl VeilidCoreContext {
// Set up config from callback // Set up config from callback
trace!("setup config with callback"); trace!("setup config with callback");
let mut config = VeilidConfig::new(); let mut config = VeilidConfig::new();
config.setup(config_callback)?; config.setup(config_callback, update_callback.clone())?;
Self::new_common(update_callback, config).await Self::new_common(update_callback, config).await
} }
@ -190,7 +190,7 @@ impl VeilidCoreContext {
// Set up config from callback // Set up config from callback
trace!("setup config with json"); trace!("setup config with json");
let mut config = VeilidConfig::new(); let mut config = VeilidConfig::new();
config.setup_from_json(config_json)?; config.setup_from_json(config_json, update_callback.clone())?;
Self::new_common(update_callback, config).await Self::new_common(update_callback, config).await
} }

View File

@ -271,6 +271,34 @@ impl RPCProcessor {
) )
} }
} }
#[instrument(level = "trace", skip_all, err)]
pub(crate) async fn process_private_route_first_hop(
&self,
operation: RoutedOperation,
sr_pubkey: DHTKey,
private_route: &PrivateRoute,
) -> Result<(), RPCError> {
let PrivateRouteHops::FirstHop(pr_first_hop) = &private_route.hops else {
return Err(RPCError::protocol("switching from safety route to private route requires first hop"));
};
// Switching to private route from safety route
self.process_route_private_route_hop(
operation,
pr_first_hop.node.clone(),
sr_pubkey,
PrivateRoute {
public_key: private_route.public_key,
hop_count: private_route.hop_count - 1,
hops: pr_first_hop
.next_hop
.clone()
.map(|rhd| PrivateRouteHops::Data(rhd))
.unwrap_or(PrivateRouteHops::Empty),
},
)
.await
}
#[instrument(level = "trace", skip(self, msg), err)] #[instrument(level = "trace", skip(self, msg), err)]
pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> { pub(crate) async fn process_route(&self, msg: RPCMessage) -> Result<(), RPCError> {
@ -332,24 +360,11 @@ impl RPCProcessor {
decode_private_route(&pr_reader)? decode_private_route(&pr_reader)?
}; };
// Get the next hop node ref // Switching from full safety route to private route first hop
let PrivateRouteHops::FirstHop(pr_first_hop) = private_route.hops else { self.process_private_route_first_hop(
return Err(RPCError::protocol("switching from safety route to private route requires first hop"));
};
// Switching to private route from safety route
self.process_route_private_route_hop(
route.operation, route.operation,
pr_first_hop.node,
route.safety_route.public_key, route.safety_route.public_key,
PrivateRoute { &private_route,
public_key: private_route.public_key,
hop_count: private_route.hop_count - 1,
hops: pr_first_hop
.next_hop
.map(|rhd| PrivateRouteHops::Data(rhd))
.unwrap_or(PrivateRouteHops::Empty),
},
) )
.await?; .await?;
} else if blob_tag == 0 { } else if blob_tag == 0 {
@ -361,6 +376,7 @@ impl RPCProcessor {
decode_route_hop(&rh_reader)? decode_route_hop(&rh_reader)?
}; };
// Continue the full safety route with another hop
self.process_route_safety_route_hop(route, route_hop) self.process_route_safety_route_hop(route, route_hop)
.await?; .await?;
} else { } else {
@ -372,7 +388,13 @@ impl RPCProcessor {
// See if we have a hop, if not, we are at the end of the private route // See if we have a hop, if not, we are at the end of the private route
match &private_route.hops { match &private_route.hops {
PrivateRouteHops::FirstHop(_) => { PrivateRouteHops::FirstHop(_) => {
return Err(RPCError::protocol("should not have first hop here")); // Safety route was a stub, start with the beginning of the private route
self.process_private_route_first_hop(
route.operation,
route.safety_route.public_key,
private_route,
)
.await?;
} }
PrivateRouteHops::Data(route_hop_data) => { PrivateRouteHops::Data(route_hop_data) => {
// Decrypt the blob with DEC(nonce, DH(the PR's public key, this hop's secret) // Decrypt the blob with DEC(nonce, DH(the PR's public key, this hop's secret)

View File

@ -156,13 +156,12 @@ cfg_if! {
} }
} }
fn update_callback(update: VeilidUpdate) {
println!("update_callback: {:?}", update);
}
pub fn setup_veilid_core() -> (UpdateCallback, ConfigCallback) { pub fn setup_veilid_core() -> (UpdateCallback, ConfigCallback) {
( (Arc::new(update_callback), Arc::new(config_callback))
Arc::new(move |veilid_update: VeilidUpdate| {
println!("update_callback: {:?}", veilid_update);
}),
Arc::new(config_callback),
)
} }
fn config_callback(key: String) -> ConfigCallbackReturn { fn config_callback(key: String) -> ConfigCallbackReturn {
@ -268,7 +267,7 @@ fn config_callback(key: String) -> ConfigCallbackReturn {
pub fn get_config() -> VeilidConfig { pub fn get_config() -> VeilidConfig {
let mut vc = VeilidConfig::new(); let mut vc = VeilidConfig::new();
match vc.setup(Arc::new(config_callback)) { match vc.setup(Arc::new(config_callback), Arc::new(update_callback)) {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
error!("Error: {}", e); error!("Error: {}", e);
@ -280,7 +279,7 @@ pub fn get_config() -> VeilidConfig {
pub async fn test_config() { pub async fn test_config() {
let mut vc = VeilidConfig::new(); let mut vc = VeilidConfig::new();
match vc.setup(Arc::new(config_callback)) { match vc.setup(Arc::new(config_callback), Arc::new(update_callback)) {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
error!("Error: {}", e); error!("Error: {}", e);

View File

@ -319,6 +319,14 @@ pub struct VeilidStateNetwork {
pub peers: Vec<PeerTableData>, pub peers: Vec<PeerTableData>,
} }
#[derive(
Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize,
)]
#[archive_attr(repr(C), derive(CheckBytes))]
pub struct VeilidStateConfig {
pub config: VeilidConfigInner,
}
#[derive(Debug, Clone, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize)] #[derive(Debug, Clone, Serialize, Deserialize, RkyvArchive, RkyvSerialize, RkyvDeserialize)]
#[archive_attr(repr(u8), derive(CheckBytes))] #[archive_attr(repr(u8), derive(CheckBytes))]
#[serde(tag = "kind")] #[serde(tag = "kind")]
@ -328,6 +336,7 @@ pub enum VeilidUpdate {
AppCall(VeilidAppCall), AppCall(VeilidAppCall),
Attachment(VeilidStateAttachment), Attachment(VeilidStateAttachment),
Network(VeilidStateNetwork), Network(VeilidStateNetwork),
Config(VeilidStateConfig),
Shutdown, Shutdown,
} }
@ -336,6 +345,7 @@ pub enum VeilidUpdate {
pub struct VeilidState { pub struct VeilidState {
pub attachment: VeilidStateAttachment, pub attachment: VeilidStateAttachment,
pub network: VeilidStateNetwork, pub network: VeilidStateNetwork,
pub config: VeilidStateConfig,
} }
///////////////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////////////
@ -2679,13 +2689,16 @@ impl VeilidAPI {
pub async fn get_state(&self) -> Result<VeilidState, VeilidAPIError> { pub async fn get_state(&self) -> Result<VeilidState, VeilidAPIError> {
let attachment_manager = self.attachment_manager()?; let attachment_manager = self.attachment_manager()?;
let network_manager = attachment_manager.network_manager(); let network_manager = attachment_manager.network_manager();
let config = self.config()?;
let attachment = attachment_manager.get_veilid_state(); let attachment = attachment_manager.get_veilid_state();
let network = network_manager.get_veilid_state(); let network = network_manager.get_veilid_state();
let config = config.get_veilid_state();
Ok(VeilidState { Ok(VeilidState {
attachment, attachment,
network, network,
config,
}) })
} }

View File

@ -1,5 +1,6 @@
use crate::xx::*; use crate::xx::*;
use crate::*; use crate::*;
use rkyv::{Archive as RkyvArchive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
use serde::*; use serde::*;
//////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////
@ -16,7 +17,18 @@ pub type ConfigCallback = Arc<dyn Fn(String) -> ConfigCallbackReturn + Send + Sy
/// url: 'https://localhost:5150' /// url: 'https://localhost:5150'
/// ``` /// ```
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigHTTPS { pub struct VeilidConfigHTTPS {
pub enabled: bool, pub enabled: bool,
pub listen_address: String, pub listen_address: String,
@ -34,7 +46,18 @@ pub struct VeilidConfigHTTPS {
/// url: 'https://localhost:5150' /// url: 'https://localhost:5150'
/// ``` /// ```
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigHTTP { pub struct VeilidConfigHTTP {
pub enabled: bool, pub enabled: bool,
pub listen_address: String, pub listen_address: String,
@ -48,7 +71,18 @@ pub struct VeilidConfigHTTP {
/// ///
/// To be implemented... /// To be implemented...
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigApplication { pub struct VeilidConfigApplication {
pub https: VeilidConfigHTTPS, pub https: VeilidConfigHTTPS,
pub http: VeilidConfigHTTP, pub http: VeilidConfigHTTP,
@ -64,7 +98,18 @@ pub struct VeilidConfigApplication {
/// public_address: '' /// public_address: ''
/// ``` /// ```
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigUDP { pub struct VeilidConfigUDP {
pub enabled: bool, pub enabled: bool,
pub socket_pool_size: u32, pub socket_pool_size: u32,
@ -82,7 +127,18 @@ pub struct VeilidConfigUDP {
/// listen_address: ':5150' /// listen_address: ':5150'
/// public_address: '' /// public_address: ''
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigTCP { pub struct VeilidConfigTCP {
pub connect: bool, pub connect: bool,
pub listen: bool, pub listen: bool,
@ -102,7 +158,18 @@ pub struct VeilidConfigTCP {
/// path: 'ws' /// path: 'ws'
/// url: 'ws://localhost:5150/ws' /// url: 'ws://localhost:5150/ws'
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigWS { pub struct VeilidConfigWS {
pub connect: bool, pub connect: bool,
pub listen: bool, pub listen: bool,
@ -123,7 +190,18 @@ pub struct VeilidConfigWS {
/// path: 'ws' /// path: 'ws'
/// url: '' /// url: ''
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigWSS { pub struct VeilidConfigWSS {
pub connect: bool, pub connect: bool,
pub listen: bool, pub listen: bool,
@ -140,7 +218,18 @@ pub struct VeilidConfigWSS {
/// All protocols are available by default, and the Veilid node will /// All protocols are available by default, and the Veilid node will
/// sort out which protocol is used for each peer connection. /// sort out which protocol is used for each peer connection.
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigProtocol { pub struct VeilidConfigProtocol {
pub udp: VeilidConfigUDP, pub udp: VeilidConfigUDP,
pub tcp: VeilidConfigTCP, pub tcp: VeilidConfigTCP,
@ -156,7 +245,18 @@ pub struct VeilidConfigProtocol {
/// private_key_path: /path/to/private/key /// private_key_path: /path/to/private/key
/// connection_initial_timeout_ms: 2000 /// connection_initial_timeout_ms: 2000
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigTLS { pub struct VeilidConfigTLS {
pub certificate_path: String, pub certificate_path: String,
pub private_key_path: String, pub private_key_path: String,
@ -165,7 +265,18 @@ pub struct VeilidConfigTLS {
/// Configure the Distributed Hash Table (DHT) /// Configure the Distributed Hash Table (DHT)
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigDHT { pub struct VeilidConfigDHT {
pub resolve_node_timeout_ms: Option<u32>, pub resolve_node_timeout_ms: Option<u32>,
pub resolve_node_count: u32, pub resolve_node_count: u32,
@ -184,7 +295,18 @@ pub struct VeilidConfigDHT {
/// Configure RPC /// Configure RPC
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigRPC { pub struct VeilidConfigRPC {
pub concurrency: u32, pub concurrency: u32,
pub queue_size: u32, pub queue_size: u32,
@ -197,7 +319,18 @@ pub struct VeilidConfigRPC {
/// Configure the network routing table /// Configure the network routing table
/// ///
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigRoutingTable { pub struct VeilidConfigRoutingTable {
pub limit_over_attached: u32, pub limit_over_attached: u32,
pub limit_fully_attached: u32, pub limit_fully_attached: u32,
@ -206,7 +339,18 @@ pub struct VeilidConfigRoutingTable {
pub limit_attached_weak: u32, pub limit_attached_weak: u32,
} }
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigNetwork { pub struct VeilidConfigNetwork {
pub connection_initial_timeout_ms: u32, pub connection_initial_timeout_ms: u32,
pub connection_inactivity_timeout_ms: u32, pub connection_inactivity_timeout_ms: u32,
@ -233,19 +377,52 @@ pub struct VeilidConfigNetwork {
pub protocol: VeilidConfigProtocol, pub protocol: VeilidConfigProtocol,
} }
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigTableStore { pub struct VeilidConfigTableStore {
pub directory: String, pub directory: String,
pub delete: bool, pub delete: bool,
} }
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigBlockStore { pub struct VeilidConfigBlockStore {
pub directory: String, pub directory: String,
pub delete: bool, pub delete: bool,
} }
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigProtectedStore { pub struct VeilidConfigProtectedStore {
pub allow_insecure_fallback: bool, pub allow_insecure_fallback: bool,
pub always_use_insecure_storage: bool, pub always_use_insecure_storage: bool,
@ -253,7 +430,18 @@ pub struct VeilidConfigProtectedStore {
pub delete: bool, pub delete: bool,
} }
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigCapabilities { pub struct VeilidConfigCapabilities {
pub protocol_udp: bool, pub protocol_udp: bool,
pub protocol_connect_tcp: bool, pub protocol_connect_tcp: bool,
@ -264,7 +452,18 @@ pub struct VeilidConfigCapabilities {
pub protocol_accept_wss: bool, pub protocol_accept_wss: bool,
} }
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, Deserialize)] #[derive(
Clone,
Copy,
PartialEq,
Eq,
Debug,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub enum VeilidConfigLogLevel { pub enum VeilidConfigLogLevel {
Off, Off,
Error, Error,
@ -322,7 +521,18 @@ impl Default for VeilidConfigLogLevel {
} }
} }
#[derive(Default, Debug, Clone, Serialize, Deserialize)] #[derive(
Default,
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
RkyvArchive,
RkyvSerialize,
RkyvDeserialize,
)]
pub struct VeilidConfigInner { pub struct VeilidConfigInner {
pub program_name: String, pub program_name: String,
pub namespace: String, pub namespace: String,
@ -338,6 +548,7 @@ pub struct VeilidConfigInner {
/// Veilid is configured /// Veilid is configured
#[derive(Clone)] #[derive(Clone)]
pub struct VeilidConfig { pub struct VeilidConfig {
update_cb: Option<UpdateCallback>,
inner: Arc<RwLock<VeilidConfigInner>>, inner: Arc<RwLock<VeilidConfigInner>>,
} }
@ -362,23 +573,29 @@ impl VeilidConfig {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
update_cb: None,
inner: Arc::new(RwLock::new(Self::new_inner())), inner: Arc::new(RwLock::new(Self::new_inner())),
} }
} }
pub fn setup_from_json(&mut self, config: String) -> Result<(), VeilidAPIError> { pub fn setup_from_json(
{ &mut self,
let mut inner = self.inner.write(); config: String,
update_cb: UpdateCallback,
) -> Result<(), VeilidAPIError> {
self.update_cb = Some(update_cb);
self.with_mut(|inner| {
*inner = serde_json::from_str(&config).map_err(VeilidAPIError::generic)?; *inner = serde_json::from_str(&config).map_err(VeilidAPIError::generic)?;
} Ok(())
})
// Validate settings
self.validate()?;
Ok(())
} }
pub fn setup(&mut self, cb: ConfigCallback) -> Result<(), VeilidAPIError> { pub fn setup(
&mut self,
cb: ConfigCallback,
update_cb: UpdateCallback,
) -> Result<(), VeilidAPIError> {
macro_rules! get_config { macro_rules! get_config {
($key:expr) => { ($key:expr) => {
let keyname = &stringify!($key)[6..]; let keyname = &stringify!($key)[6..];
@ -389,8 +606,9 @@ impl VeilidConfig {
})?; })?;
}; };
} }
{
let mut inner = self.inner.write(); self.update_cb = Some(update_cb);
self.with_mut(|inner| {
get_config!(inner.program_name); get_config!(inner.program_name);
get_config!(inner.namespace); get_config!(inner.namespace);
get_config!(inner.capabilities.protocol_udp); get_config!(inner.capabilities.protocol_udp);
@ -482,19 +700,44 @@ impl VeilidConfig {
get_config!(inner.network.protocol.wss.listen_address); get_config!(inner.network.protocol.wss.listen_address);
get_config!(inner.network.protocol.wss.path); get_config!(inner.network.protocol.wss.path);
get_config!(inner.network.protocol.wss.url); get_config!(inner.network.protocol.wss.url);
} Ok(())
// Validate settings })
self.validate()?; }
Ok(()) pub fn get_veilid_state(&self) -> VeilidStateConfig {
let inner = self.inner.read();
VeilidStateConfig {
config: inner.clone(),
}
} }
pub fn get(&self) -> RwLockReadGuard<VeilidConfigInner> { pub fn get(&self) -> RwLockReadGuard<VeilidConfigInner> {
self.inner.read() self.inner.read()
} }
pub fn get_mut(&self) -> RwLockWriteGuard<VeilidConfigInner> { pub fn with_mut<F, R>(&self, f: F) -> Result<R, VeilidAPIError>
self.inner.write() where
F: FnOnce(&mut VeilidConfigInner) -> Result<R, VeilidAPIError>,
{
let (out, config) = {
let inner = &mut *self.inner.write();
// Edit a copy
let mut editedinner = inner.clone();
// Make changes
let out = f(&mut editedinner)?;
// Validate
Self::validate(&mut editedinner)?;
// Commit changes
*inner = editedinner.clone();
(out, editedinner)
};
// Send configuration update to clients
if let Some(update_cb) = &self.update_cb {
update_cb(VeilidUpdate::Config(VeilidStateConfig { config }));
}
Ok(out)
} }
pub fn get_key_json(&self, key: &str) -> Result<String, VeilidAPIError> { pub fn get_key_json(&self, key: &str) -> Result<String, VeilidAPIError> {
@ -521,47 +764,43 @@ impl VeilidConfig {
} }
} }
pub fn set_key_json(&self, key: &str, value: &str) -> Result<(), VeilidAPIError> { pub fn set_key_json(&self, key: &str, value: &str) -> Result<(), VeilidAPIError> {
let mut c = self.get_mut(); self.with_mut(|c| {
// Split key into path parts
let keypath: Vec<&str> = key.split('.').collect();
// Split key into path parts // Convert value into jsonvalue
let keypath: Vec<&str> = key.split('.').collect(); let newval = json::parse(value).map_err(VeilidAPIError::generic)?;
// Convert value into jsonvalue // Generate json from whole config
let newval = json::parse(value).map_err(VeilidAPIError::generic)?; let jc = serde_json::to_string(&*c).map_err(VeilidAPIError::generic)?;
let mut jvc = json::parse(&jc).map_err(VeilidAPIError::generic)?;
// Generate json from whole config // Find requested subkey
let jc = serde_json::to_string(&*c).map_err(VeilidAPIError::generic)?; let newconfigstring = if let Some((objkeyname, objkeypath)) = keypath.split_last() {
let mut jvc = json::parse(&jc).map_err(VeilidAPIError::generic)?; // Replace subkey
let mut out = &mut jvc;
// Find requested subkey for k in objkeypath {
let newconfigstring = if let Some((objkeyname, objkeypath)) = keypath.split_last() { if !out.has_key(*k) {
// Replace subkey apibail_parse!(format!("invalid subkey in key '{}'", key), k);
let mut out = &mut jvc; }
for k in objkeypath { out = &mut out[*k];
if !out.has_key(*k) {
apibail_parse!(format!("invalid subkey in key '{}'", key), k);
} }
out = &mut out[*k]; if !out.has_key(objkeyname) {
} apibail_parse!(format!("invalid subkey in key '{}'", key), objkeyname);
if !out.has_key(objkeyname) { }
apibail_parse!(format!("invalid subkey in key '{}'", key), objkeyname); out[*objkeyname] = newval;
} jvc.to_string()
out[*objkeyname] = newval; } else {
jvc.to_string() newval.to_string()
} else { };
newval.to_string()
}; // Generate new config
// Generate and validate new config *c = serde_json::from_str(&newconfigstring).map_err(VeilidAPIError::generic)?;
let mut newconfig = VeilidConfig::new(); Ok(())
newconfig.setup_from_json(newconfigstring)?; })
// Replace whole config
*c = newconfig.get().clone();
Ok(())
} }
fn validate(&self) -> Result<(), VeilidAPIError> { fn validate(inner: &VeilidConfigInner) -> Result<(), VeilidAPIError> {
let inner = self.inner.read();
if inner.program_name.is_empty() { if inner.program_name.is_empty() {
apibail_generic!("Program name must not be empty in 'program_name'"); apibail_generic!("Program name must not be empty in 'program_name'");
} }
@ -731,8 +970,11 @@ impl VeilidConfig {
.await .await
.map_err(VeilidAPIError::internal)?; .map_err(VeilidAPIError::internal)?;
self.inner.write().network.node_id = Some(node_id); self.with_mut(|c| {
self.inner.write().network.node_id_secret = Some(node_id_secret); c.network.node_id = Some(node_id);
c.network.node_id_secret = Some(node_id_secret);
Ok(())
})?;
trace!("init_node_id complete"); trace!("init_node_id complete");

View File

@ -1262,6 +1262,10 @@ abstract class VeilidUpdate {
{ {
return VeilidUpdateNetwork(state: VeilidStateNetwork.fromJson(json)); return VeilidUpdateNetwork(state: VeilidStateNetwork.fromJson(json));
} }
case "Config":
{
return VeilidUpdateConfig(state: VeilidStateConfig.fromJson(json));
}
default: default:
{ {
throw VeilidAPIExceptionInternal( throw VeilidAPIExceptionInternal(
@ -1363,6 +1367,19 @@ class VeilidUpdateNetwork implements VeilidUpdate {
} }
} }
class VeilidUpdateConfig implements VeilidUpdate {
final VeilidStateConfig state;
//
VeilidUpdateConfig({required this.state});
@override
Map<String, dynamic> get json {
var jsonRep = state.json;
jsonRep['kind'] = "Config";
return jsonRep;
}
}
////////////////////////////////////// //////////////////////////////////////
/// VeilidStateAttachment /// VeilidStateAttachment
@ -1413,19 +1430,43 @@ class VeilidStateNetwork {
} }
} }
//////////////////////////////////////
/// VeilidStateConfig
class VeilidStateConfig {
final Map<String, dynamic> config;
VeilidStateConfig({
required this.config,
});
VeilidStateConfig.fromJson(Map<String, dynamic> json)
: config = jsonDecode(json['config']);
Map<String, dynamic> get json {
return {'config': jsonEncode(config)};
}
}
////////////////////////////////////// //////////////////////////////////////
/// VeilidState /// VeilidState
class VeilidState { class VeilidState {
final VeilidStateAttachment attachment; final VeilidStateAttachment attachment;
final VeilidStateNetwork network; final VeilidStateNetwork network;
final VeilidStateConfig config;
VeilidState.fromJson(Map<String, dynamic> json) VeilidState.fromJson(Map<String, dynamic> json)
: attachment = VeilidStateAttachment.fromJson(json['attachment']), : attachment = VeilidStateAttachment.fromJson(json['attachment']),
network = VeilidStateNetwork.fromJson(json['network']); network = VeilidStateNetwork.fromJson(json['network']),
config = VeilidStateConfig.fromJson(json['config']);
Map<String, dynamic> get json { Map<String, dynamic> get json {
return {'attachment': attachment.json, 'network': network.json}; return {
'attachment': attachment.json,
'network': network.json,
'config': config.json
};
} }
} }

View File

@ -18,7 +18,7 @@ final _path = Platform.isWindows
: Platform.isMacOS : Platform.isMacOS
? 'lib$_base.dylib' ? 'lib$_base.dylib'
: 'lib$_base.so'; : 'lib$_base.so';
late final _dylib = final _dylib =
Platform.isIOS ? DynamicLibrary.process() : DynamicLibrary.open(_path); Platform.isIOS ? DynamicLibrary.process() : DynamicLibrary.open(_path);
// Linkage for initialization // Linkage for initialization

View File

@ -44,6 +44,7 @@ cfg-if = "^1"
serde = "^1" serde = "^1"
serde_derive = "^1" serde_derive = "^1"
serde_yaml = "^0" serde_yaml = "^0"
json = "^0"
futures-util = { version = "^0", default_features = false, features = ["alloc"] } futures-util = { version = "^0", default_features = false, features = ["alloc"] }
url = "^2" url = "^2"
ctrlc = "^3" ctrlc = "^3"

View File

@ -10,7 +10,7 @@ struct ApiResult @0x8111724bdb812929 {
interface Registration @0xdd45f30a7c22e391 {} interface Registration @0xdd45f30a7c22e391 {}
interface VeilidServer @0xcb2c699f14537f94 { interface VeilidServer @0xcb2c699f14537f94 {
register @0 (veilidClient :VeilidClient) -> (registration :Registration, state :Text); register @0 (veilidClient :VeilidClient) -> (registration :Registration, state :Text, settings :Text);
debug @1 (command :Text) -> (result :ApiResult); debug @1 (command :Text) -> (result :ApiResult);
attach @2 () -> (result :ApiResult); attach @2 () -> (result :ApiResult);
detach @3 () -> (result :ApiResult); detach @3 () -> (result :ApiResult);

View File

@ -1,3 +1,4 @@
use crate::settings::*;
use crate::tools::*; use crate::tools::*;
use crate::veilid_client_capnp::*; use crate::veilid_client_capnp::*;
use crate::veilid_logs::VeilidLogs; use crate::veilid_logs::VeilidLogs;
@ -81,18 +82,24 @@ impl registration::Server for RegistrationImpl {}
struct VeilidServerImpl { struct VeilidServerImpl {
veilid_api: veilid_core::VeilidAPI, veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs, veilid_logs: VeilidLogs,
settings: Settings,
next_id: u64, next_id: u64,
pub registration_map: Rc<RefCell<RegistrationMap>>, pub registration_map: Rc<RefCell<RegistrationMap>>,
} }
impl VeilidServerImpl { impl VeilidServerImpl {
#[instrument(level = "trace", skip_all)] #[instrument(level = "trace", skip_all)]
pub fn new(veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs) -> Self { pub fn new(
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
settings: Settings,
) -> Self {
Self { Self {
next_id: 0, next_id: 0,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())), registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
veilid_api, veilid_api,
veilid_logs, veilid_logs,
settings,
} }
} }
} }
@ -115,6 +122,7 @@ impl veilid_server::Server for VeilidServerImpl {
); );
let veilid_api = self.veilid_api.clone(); let veilid_api = self.veilid_api.clone();
let settings = self.settings.clone();
let registration = capnp_rpc::new_client(RegistrationImpl::new( let registration = capnp_rpc::new_client(RegistrationImpl::new(
self.next_id, self.next_id,
self.registration_map.clone(), self.registration_map.clone(),
@ -132,6 +140,14 @@ impl veilid_server::Server for VeilidServerImpl {
res.set_registration(registration); res.set_registration(registration);
res.set_state(&state); res.set_state(&state);
let settings = &*settings.read();
let settings_json_string = serialize_json(settings);
let mut settings_json = json::parse(&settings_json_string)
.map_err(|e| ::capnp::Error::failed(format!("{:?}", e)))?;
settings_json["core"]["network"].remove("node_id_secret");
let safe_settings_json = settings_json.to_string();
res.set_settings(&safe_settings_json);
Ok(()) Ok(())
}) })
} }
@ -265,6 +281,7 @@ type ClientApiAllFuturesJoinHandle =
struct ClientApiInner { struct ClientApiInner {
veilid_api: veilid_core::VeilidAPI, veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs, veilid_logs: VeilidLogs,
settings: Settings,
registration_map: Rc<RefCell<RegistrationMap>>, registration_map: Rc<RefCell<RegistrationMap>>,
stop: Option<StopSource>, stop: Option<StopSource>,
join_handle: Option<ClientApiAllFuturesJoinHandle>, join_handle: Option<ClientApiAllFuturesJoinHandle>,
@ -276,11 +293,16 @@ pub struct ClientApi {
impl ClientApi { impl ClientApi {
#[instrument(level = "trace", skip_all)] #[instrument(level = "trace", skip_all)]
pub fn new(veilid_api: veilid_core::VeilidAPI, veilid_logs: VeilidLogs) -> Rc<Self> { pub fn new(
veilid_api: veilid_core::VeilidAPI,
veilid_logs: VeilidLogs,
settings: Settings,
) -> Rc<Self> {
Rc::new(Self { Rc::new(Self {
inner: RefCell::new(ClientApiInner { inner: RefCell::new(ClientApiInner {
veilid_api, veilid_api,
veilid_logs, veilid_logs,
settings,
registration_map: Rc::new(RefCell::new(RegistrationMap::new())), registration_map: Rc::new(RefCell::new(RegistrationMap::new())),
stop: Some(StopSource::new()), stop: Some(StopSource::new()),
join_handle: None, join_handle: None,
@ -427,6 +449,7 @@ impl ClientApi {
let veilid_server_impl = VeilidServerImpl::new( let veilid_server_impl = VeilidServerImpl::new(
self.inner.borrow().veilid_api.clone(), self.inner.borrow().veilid_api.clone(),
self.inner.borrow().veilid_logs.clone(), self.inner.borrow().veilid_logs.clone(),
self.inner.borrow().settings.clone(),
); );
self.inner.borrow_mut().registration_map = veilid_server_impl.registration_map.clone(); self.inner.borrow_mut().registration_map = veilid_server_impl.registration_map.clone();

View File

@ -301,6 +301,7 @@ pub fn process_command_line() -> EyreResult<(Settings, ArgMatches)> {
settingsrw.core.network.bootstrap_nodes = bootstrap_list; settingsrw.core.network.bootstrap_nodes = bootstrap_list;
} }
#[cfg(feature = "rt-tokio")]
if matches.occurrences_of("console") != 0 { if matches.occurrences_of("console") != 0 {
settingsrw.logging.console.enabled = true; settingsrw.logging.console.enabled = true;
} }

View File

@ -4,6 +4,8 @@ use crate::tools::*;
use crate::veilid_logs::*; use crate::veilid_logs::*;
use crate::*; use crate::*;
use flume::{unbounded, Receiver, Sender}; use flume::{unbounded, Receiver, Sender};
use futures_util::select;
use futures_util::FutureExt;
use lazy_static::*; use lazy_static::*;
use parking_lot::Mutex; use parking_lot::Mutex;
use std::sync::Arc; use std::sync::Arc;
@ -70,7 +72,8 @@ pub async fn run_veilid_server_internal(
// Start client api if one is requested // Start client api if one is requested
let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) { let mut capi = if settingsr.client_api.enabled && matches!(server_mode, ServerMode::Normal) {
let some_capi = client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone()); let some_capi =
client_api::ClientApi::new(veilid_api.clone(), veilid_logs.clone(), settings.clone());
some_capi some_capi
.clone() .clone()
.run(settingsr.client_api.listen_address.addrs.clone()); .run(settingsr.client_api.listen_address.addrs.clone());
@ -85,12 +88,29 @@ pub async fn run_veilid_server_internal(
// Process all updates // Process all updates
let capi2 = capi.clone(); let capi2 = capi.clone();
let mut shutdown_switch = {
let shutdown_switch_locked = SHUTDOWN_SWITCH.lock();
(*shutdown_switch_locked).as_ref().map(|ss| ss.instance())
}
.unwrap()
.fuse();
let update_receiver_jh = spawn_local(async move { let update_receiver_jh = spawn_local(async move {
while let Ok(change) = receiver.recv_async().await { loop {
if let Some(capi) = &capi2 { select! {
// Handle state changes on main thread for capnproto rpc res = receiver.recv_async() => {
capi.clone().handle_update(change); if let Ok(change) = res {
} if let Some(capi) = &capi2 {
// Handle state changes on main thread for capnproto rpc
capi.clone().handle_update(change);
}
} else {
break;
}
}
_ = shutdown_switch => {
break;
}
};
} }
}); });