diff --git a/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs b/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs index 4b33f1cb..5384bfdd 100644 --- a/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs +++ b/src/rust/lqosd/src/node_manager/ws/single_user_channels/cake_watcher.rs @@ -18,10 +18,11 @@ pub(super) async fn cake_watcher(circuit_id: String, tx: tokio::sync::mpsc::Send match get_raw_circuit_data(&circuit_id) { lqos_bus::BusResponse::RawQueueData(Some(msg)) => { - let json = serde_json::to_string(&QueueStoreTransit::from(*msg)).unwrap(); - let send_result = tx.send(json.to_string()).await; - if send_result.is_err() { - break; + if let Ok(json) = serde_json::to_string(&QueueStoreTransit::from(*msg)) { + let send_result = tx.send(json.to_string()).await; + if send_result.is_err() { + break; + } } } _ => {} diff --git a/src/rust/lqosd/src/node_manager/ws/single_user_channels/circuit.rs b/src/rust/lqosd/src/node_manager/ws/single_user_channels/circuit.rs index 4d573e93..9d002e4c 100644 --- a/src/rust/lqosd/src/node_manager/ws/single_user_channels/circuit.rs +++ b/src/rust/lqosd/src/node_manager/ws/single_user_channels/circuit.rs @@ -32,10 +32,11 @@ pub(super) async fn circuit_watcher(circuit: String, tx: tokio::sync::mpsc::Send devices: devices_for_circuit, }; - let message = serde_json::to_string(&result).unwrap(); - if let Err(_) = tx.send(message.to_string()).await { - log::info!("Channel is gone"); - break; + if let Ok(message) = serde_json::to_string(&result) { + if let Err(_) = tx.send(message.to_string()).await { + log::info!("Channel is gone"); + break; + } } } } \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/single_user_channels/flows_by_circuit.rs b/src/rust/lqosd/src/node_manager/ws/single_user_channels/flows_by_circuit.rs index 2b77fc7f..d55e473e 100644 --- a/src/rust/lqosd/src/node_manager/ws/single_user_channels/flows_by_circuit.rs +++ b/src/rust/lqosd/src/node_manager/ws/single_user_channels/flows_by_circuit.rs @@ -9,74 +9,75 @@ use crate::throughput_tracker::flow_data::{ALL_FLOWS, FlowAnalysis, FlowbeeLocal const FIVE_MINUTES_AS_NANOS: u64 = 300 * 1_000_000_000; fn recent_flows_by_circuit(circuit_id: &str) -> Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> { - let device_reader = SHAPED_DEVICES.read().unwrap(); + if let Ok(device_reader) = SHAPED_DEVICES.read() { + if let Ok(now) = time_since_boot() { + let now_as_nanos = Duration::from(now).as_nanos() as u64; + let five_minutes_ago = now_as_nanos - FIVE_MINUTES_AS_NANOS; - if let Ok(now) = time_since_boot() { - let now_as_nanos = Duration::from(now).as_nanos() as u64; - let five_minutes_ago = now_as_nanos - FIVE_MINUTES_AS_NANOS; + if let Ok(all_flows) = ALL_FLOWS.lock() { + let result: Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> = all_flows + .iter() + .filter_map(|(key, (local, analysis))| { + // Don't show older flows + if local.last_seen < five_minutes_ago { + return None; + } - let all_flows = ALL_FLOWS.lock().unwrap(); - let result: Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> = all_flows - .iter() - .filter_map(|(key, (local, analysis))| { - // Don't show older flows - if local.last_seen < five_minutes_ago { - return None; - } + // Don't show flows that don't belong to the circuit + let local_ip_str; // Using late binding + let remote_ip_str; + let device_name; + let asn_name; + let asn_country; + let local_ip = match key.local_ip.as_ip() { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + let remote_ip = match key.remote_ip.as_ip() { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + if let Some(device) = device_reader.trie.longest_match(local_ip) { + // The normal way around + local_ip_str = key.local_ip.to_string(); + remote_ip_str = key.remote_ip.to_string(); + let device = &device_reader.devices[*device.1]; + if device.circuit_id != circuit_id { + return None; + } + device_name = device.device_name.clone(); + (asn_name, asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); + } else if let Some(device) = device_reader.trie.longest_match(remote_ip) { + // The reverse way around + local_ip_str = key.remote_ip.to_string(); + remote_ip_str = key.local_ip.to_string(); + let device = &device_reader.devices[*device.1]; + if device.circuit_id != circuit_id { + return None; + } + device_name = device.device_name.clone(); + (asn_name, asn_country) = get_asn_name_and_country(key.local_ip.as_ip()); + } else { + return None; + } - // Don't show flows that don't belong to the circuit - let local_ip_str ; // Using late binding - let remote_ip_str ; - let device_name ; - let asn_name ; - let asn_country ; - let local_ip = match key.local_ip.as_ip() { - IpAddr::V4(ip) => ip.to_ipv6_mapped(), - IpAddr::V6(ip) => ip, - }; - let remote_ip = match key.remote_ip.as_ip() { - IpAddr::V4(ip) => ip.to_ipv6_mapped(), - IpAddr::V6(ip) => ip, - }; - if let Some(device) = device_reader.trie.longest_match(local_ip) { - // The normal way around - local_ip_str = key.local_ip.to_string(); - remote_ip_str = key.remote_ip.to_string(); - let device = &device_reader.devices[*device.1]; - if device.circuit_id != circuit_id { - return None; - } - device_name = device.device_name.clone(); - (asn_name, asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); - } else if let Some(device) = device_reader.trie.longest_match(remote_ip) { - // The reverse way around - local_ip_str = key.remote_ip.to_string(); - remote_ip_str = key.local_ip.to_string(); - let device = &device_reader.devices[*device.1]; - if device.circuit_id != circuit_id { - return None; - } - device_name = device.device_name.clone(); - (asn_name, asn_country) = get_asn_name_and_country(key.local_ip.as_ip()); - } else { - return None; - } - - Some((FlowbeeKeyTransit { - remote_ip: remote_ip_str, - local_ip: local_ip_str, - src_port: key.src_port, - dst_port: key.dst_port, - ip_protocol: key.ip_protocol, - device_name, - asn_name, - asn_country, - protocol_name: analysis.protocol_analysis.to_string(), - last_seen_nanos: now_as_nanos.saturating_sub(local.last_seen), - }, local.clone(), analysis.clone())) - }) - .collect(); - return result; + Some((FlowbeeKeyTransit { + remote_ip: remote_ip_str, + local_ip: local_ip_str, + src_port: key.src_port, + dst_port: key.dst_port, + ip_protocol: key.ip_protocol, + device_name, + asn_name, + asn_country, + protocol_name: analysis.protocol_analysis.to_string(), + last_seen_nanos: now_as_nanos.saturating_sub(local.last_seen), + }, local.clone(), analysis.clone())) + }) + .collect(); + return result; + } + } } Vec::new() } @@ -127,10 +128,11 @@ pub(super) async fn flows_by_circuit(circuit: String, tx: tokio::sync::mpsc::Sen circuit_id: circuit.clone(), flows, }; - let message = serde_json::to_string(&result).unwrap(); - if let Err(_) = tx.send(message).await { - log::info!("Channel is gone"); - break; + if let Ok(message) = serde_json::to_string(&result) { + if let Err(_) = tx.send(message).await { + log::debug!("Channel is gone"); + break; + } } } diff --git a/src/rust/lqosd/src/node_manager/ws/single_user_channels/ping_monitor.rs b/src/rust/lqosd/src/node_manager/ws/single_user_channels/ping_monitor.rs index 534af6e1..4adc5725 100644 --- a/src/rust/lqosd/src/node_manager/ws/single_user_channels/ping_monitor.rs +++ b/src/rust/lqosd/src/node_manager/ws/single_user_channels/ping_monitor.rs @@ -24,8 +24,14 @@ pub(super) async fn ping_monitor(ip_addresses: Vec<(String, String)>, tx: tokio: loop { ticker.tick().await; - let client_v4 = Client::new(&Config::default()).unwrap(); - let client_v6 = Client::new(&Config::builder().kind(ICMP::V6).build()).unwrap(); + let client_v4 = Client::new(&Config::default()); + let client_v6 = Client::new(&Config::builder().kind(ICMP::V6).build()); + if client_v4.is_err() || client_v6.is_err() { + log::info!("Failed to create ping clients"); + break; + } + let client_v4 = client_v4.unwrap(); + let client_v6 = client_v6.unwrap(); let mut tasks = Vec::new(); for (ip, label) in ip_addresses.iter() { @@ -42,7 +48,7 @@ pub(super) async fn ping_monitor(ip_addresses: Vec<(String, String)>, tx: tokio: // Use futures to join on all tasks for task in tasks { - task.await.unwrap(); + let _ = task.await; } // Notify the channel that we're still here - this is really @@ -51,10 +57,11 @@ pub(super) async fn ping_monitor(ip_addresses: Vec<(String, String)>, tx: tokio: ip: "test".to_string(), result: PingState::ChannelTest, }; - let message = serde_json::to_string(&channel_test).unwrap(); - if let Err(_) = tx.send(message.to_string()).await { - log::info!("Channel is gone"); - break; + if let Ok(message) = serde_json::to_string(&channel_test) { + if let Err(_) = tx.send(message.to_string()).await { + log::debug!("Channel is gone"); + break; + } } } } @@ -64,9 +71,10 @@ async fn send_timeout(tx: tokio::sync::mpsc::Sender, ip: String) { ip, result: PingState::NoResponse, }; - let message = serde_json::to_string(&result).unwrap(); - if let Err(_) = tx.send(message.to_string()).await { - log::info!("Channel is gone"); + if let Ok(message) = serde_json::to_string(&result) { + if let Err(_) = tx.send(message.to_string()).await { + log::info!("Channel is gone"); + } } } @@ -78,9 +86,10 @@ async fn send_alive(tx: tokio::sync::mpsc::Sender, ip: String, ping_time label, }, }; - let message = serde_json::to_string(&result).unwrap(); - if let Err(_) = tx.send(message.to_string()).await { - log::info!("Channel is gone"); + if let Ok(message) = serde_json::to_string(&result) { + if let Err(_) = tx.send(message.to_string()).await { + log::info!("Channel is gone"); + } } } diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/flow_counter.rs b/src/rust/lqosd/src/node_manager/ws/ticker/flow_counter.rs index 87882067..9692b106 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker/flow_counter.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker/flow_counter.rs @@ -10,8 +10,11 @@ pub async fn flow_count(channels: Arc) { } let active_flows = { - let lock = ALL_FLOWS.lock().unwrap(); - lock.len() as u64 + if let Ok(lock) = ALL_FLOWS.lock() { + lock.len() as u64 + } else { + 0 + } }; let expired_flows = RECENT_FLOWS.len(); let active_flows = json!( diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs b/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs index 5e90e521..a97f1d59 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker/network_tree.rs @@ -20,23 +20,26 @@ pub async fn network_tree(channels: Arc) { return; } - let data: Vec<(usize, NetworkJsonTransport)> = spawn_blocking(|| { - let net_json = NETWORK_JSON.read().unwrap(); - net_json - .get_nodes_when_ready() - .iter() - .enumerate() - .map(|(i, n)| (i, n.clone_to_transit())) - .collect() - }).await.unwrap(); - - let message = json!( + if let Ok(data) = spawn_blocking(|| { + if let Ok(net_json) = NETWORK_JSON.read() { + net_json + .get_nodes_when_ready() + .iter() + .enumerate() + .map(|(i, n)| (i, n.clone_to_transit())) + .collect::>() + } else { + Vec::new() + } + }).await { + let message = json!( { "event": PublishedChannels::NetworkTree.to_string(), "data": data, } ).to_string(); - channels.send(PublishedChannels::NetworkTree, message).await; + channels.send(PublishedChannels::NetworkTree, message).await; + } } #[derive(Serialize)] diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs b/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs index c3f70699..a2866bcf 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker/tree_capacity.rs @@ -21,33 +21,36 @@ pub async fn tree_capacity(channels: Arc) { } let capacities: Vec<_> = { - let reader = NETWORK_JSON.read().unwrap(); - reader.get_nodes_when_ready().iter().map(|node| { - let node = node.clone_to_transit(); - let down = node.current_throughput.0 as f64 * 8.0 / 1_000_000.0; - let up = node.current_throughput.1 as f64 * 8.0 / 1_000_000.0; - let max_down = node.max_throughput.0 as f64; - let max_up = node.max_throughput.1 as f64; - let median_rtt = if node.rtts.is_empty() { - 0.0 - } else { - let n = node.rtts.len() / 2; - if node.rtts.len() % 2 == 0 { - (node.rtts[n - 1] + node.rtts[n]) / 2.0 + if let Ok(reader) = NETWORK_JSON.read() { + reader.get_nodes_when_ready().iter().map(|node| { + let node = node.clone_to_transit(); + let down = node.current_throughput.0 as f64 * 8.0 / 1_000_000.0; + let up = node.current_throughput.1 as f64 * 8.0 / 1_000_000.0; + let max_down = node.max_throughput.0 as f64; + let max_up = node.max_throughput.1 as f64; + let median_rtt = if node.rtts.is_empty() { + 0.0 } else { - node.rtts[n] - } - }; + let n = node.rtts.len() / 2; + if node.rtts.len() % 2 == 0 { + (node.rtts[n - 1] + node.rtts[n]) / 2.0 + } else { + node.rtts[n] + } + }; - NodeCapacity { - name: node.name.clone(), - down, - up, - max_down, - max_up, - median_rtt, - } - }).collect() + NodeCapacity { + name: node.name.clone(), + down, + up, + max_down, + max_up, + median_rtt, + } + }).collect() + } else { + Vec::new() + } }; let message = json!(