Finish removing potentially dangerous unwraps from the websocket system.

This commit is contained in:
Herbert Wolverson 2024-07-17 08:51:10 -05:00
parent a649ea8639
commit 11e8a8fb86
7 changed files with 151 additions and 129 deletions

View File

@ -18,10 +18,11 @@ pub(super) async fn cake_watcher(circuit_id: String, tx: tokio::sync::mpsc::Send
match get_raw_circuit_data(&circuit_id) { match get_raw_circuit_data(&circuit_id) {
lqos_bus::BusResponse::RawQueueData(Some(msg)) => { lqos_bus::BusResponse::RawQueueData(Some(msg)) => {
let json = serde_json::to_string(&QueueStoreTransit::from(*msg)).unwrap(); if let Ok(json) = serde_json::to_string(&QueueStoreTransit::from(*msg)) {
let send_result = tx.send(json.to_string()).await; let send_result = tx.send(json.to_string()).await;
if send_result.is_err() { if send_result.is_err() {
break; break;
}
} }
} }
_ => {} _ => {}

View File

@ -32,10 +32,11 @@ pub(super) async fn circuit_watcher(circuit: String, tx: tokio::sync::mpsc::Send
devices: devices_for_circuit, devices: devices_for_circuit,
}; };
let message = serde_json::to_string(&result).unwrap(); if let Ok(message) = serde_json::to_string(&result) {
if let Err(_) = tx.send(message.to_string()).await { if let Err(_) = tx.send(message.to_string()).await {
log::info!("Channel is gone"); log::info!("Channel is gone");
break; break;
}
} }
} }
} }

View File

@ -9,74 +9,75 @@ use crate::throughput_tracker::flow_data::{ALL_FLOWS, FlowAnalysis, FlowbeeLocal
const FIVE_MINUTES_AS_NANOS: u64 = 300 * 1_000_000_000; const FIVE_MINUTES_AS_NANOS: u64 = 300 * 1_000_000_000;
fn recent_flows_by_circuit(circuit_id: &str) -> Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> { fn recent_flows_by_circuit(circuit_id: &str) -> Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> {
let device_reader = SHAPED_DEVICES.read().unwrap(); if let Ok(device_reader) = SHAPED_DEVICES.read() {
if let Ok(now) = time_since_boot() {
let now_as_nanos = Duration::from(now).as_nanos() as u64;
let five_minutes_ago = now_as_nanos - FIVE_MINUTES_AS_NANOS;
if let Ok(now) = time_since_boot() { if let Ok(all_flows) = ALL_FLOWS.lock() {
let now_as_nanos = Duration::from(now).as_nanos() as u64; let result: Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> = all_flows
let five_minutes_ago = now_as_nanos - FIVE_MINUTES_AS_NANOS; .iter()
.filter_map(|(key, (local, analysis))| {
// Don't show older flows
if local.last_seen < five_minutes_ago {
return None;
}
let all_flows = ALL_FLOWS.lock().unwrap(); // Don't show flows that don't belong to the circuit
let result: Vec<(FlowbeeKeyTransit, FlowbeeLocalData, FlowAnalysis)> = all_flows let local_ip_str; // Using late binding
.iter() let remote_ip_str;
.filter_map(|(key, (local, analysis))| { let device_name;
// Don't show older flows let asn_name;
if local.last_seen < five_minutes_ago { let asn_country;
return None; let local_ip = match key.local_ip.as_ip() {
} IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
let remote_ip = match key.remote_ip.as_ip() {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
if let Some(device) = device_reader.trie.longest_match(local_ip) {
// The normal way around
local_ip_str = key.local_ip.to_string();
remote_ip_str = key.remote_ip.to_string();
let device = &device_reader.devices[*device.1];
if device.circuit_id != circuit_id {
return None;
}
device_name = device.device_name.clone();
(asn_name, asn_country) = get_asn_name_and_country(key.remote_ip.as_ip());
} else if let Some(device) = device_reader.trie.longest_match(remote_ip) {
// The reverse way around
local_ip_str = key.remote_ip.to_string();
remote_ip_str = key.local_ip.to_string();
let device = &device_reader.devices[*device.1];
if device.circuit_id != circuit_id {
return None;
}
device_name = device.device_name.clone();
(asn_name, asn_country) = get_asn_name_and_country(key.local_ip.as_ip());
} else {
return None;
}
// Don't show flows that don't belong to the circuit Some((FlowbeeKeyTransit {
let local_ip_str ; // Using late binding remote_ip: remote_ip_str,
let remote_ip_str ; local_ip: local_ip_str,
let device_name ; src_port: key.src_port,
let asn_name ; dst_port: key.dst_port,
let asn_country ; ip_protocol: key.ip_protocol,
let local_ip = match key.local_ip.as_ip() { device_name,
IpAddr::V4(ip) => ip.to_ipv6_mapped(), asn_name,
IpAddr::V6(ip) => ip, asn_country,
}; protocol_name: analysis.protocol_analysis.to_string(),
let remote_ip = match key.remote_ip.as_ip() { last_seen_nanos: now_as_nanos.saturating_sub(local.last_seen),
IpAddr::V4(ip) => ip.to_ipv6_mapped(), }, local.clone(), analysis.clone()))
IpAddr::V6(ip) => ip, })
}; .collect();
if let Some(device) = device_reader.trie.longest_match(local_ip) { return result;
// The normal way around }
local_ip_str = key.local_ip.to_string(); }
remote_ip_str = key.remote_ip.to_string();
let device = &device_reader.devices[*device.1];
if device.circuit_id != circuit_id {
return None;
}
device_name = device.device_name.clone();
(asn_name, asn_country) = get_asn_name_and_country(key.remote_ip.as_ip());
} else if let Some(device) = device_reader.trie.longest_match(remote_ip) {
// The reverse way around
local_ip_str = key.remote_ip.to_string();
remote_ip_str = key.local_ip.to_string();
let device = &device_reader.devices[*device.1];
if device.circuit_id != circuit_id {
return None;
}
device_name = device.device_name.clone();
(asn_name, asn_country) = get_asn_name_and_country(key.local_ip.as_ip());
} else {
return None;
}
Some((FlowbeeKeyTransit {
remote_ip: remote_ip_str,
local_ip: local_ip_str,
src_port: key.src_port,
dst_port: key.dst_port,
ip_protocol: key.ip_protocol,
device_name,
asn_name,
asn_country,
protocol_name: analysis.protocol_analysis.to_string(),
last_seen_nanos: now_as_nanos.saturating_sub(local.last_seen),
}, local.clone(), analysis.clone()))
})
.collect();
return result;
} }
Vec::new() Vec::new()
} }
@ -127,10 +128,11 @@ pub(super) async fn flows_by_circuit(circuit: String, tx: tokio::sync::mpsc::Sen
circuit_id: circuit.clone(), circuit_id: circuit.clone(),
flows, flows,
}; };
let message = serde_json::to_string(&result).unwrap(); if let Ok(message) = serde_json::to_string(&result) {
if let Err(_) = tx.send(message).await { if let Err(_) = tx.send(message).await {
log::info!("Channel is gone"); log::debug!("Channel is gone");
break; break;
}
} }
} }

View File

@ -24,8 +24,14 @@ pub(super) async fn ping_monitor(ip_addresses: Vec<(String, String)>, tx: tokio:
loop { loop {
ticker.tick().await; ticker.tick().await;
let client_v4 = Client::new(&Config::default()).unwrap(); let client_v4 = Client::new(&Config::default());
let client_v6 = Client::new(&Config::builder().kind(ICMP::V6).build()).unwrap(); let client_v6 = Client::new(&Config::builder().kind(ICMP::V6).build());
if client_v4.is_err() || client_v6.is_err() {
log::info!("Failed to create ping clients");
break;
}
let client_v4 = client_v4.unwrap();
let client_v6 = client_v6.unwrap();
let mut tasks = Vec::new(); let mut tasks = Vec::new();
for (ip, label) in ip_addresses.iter() { for (ip, label) in ip_addresses.iter() {
@ -42,7 +48,7 @@ pub(super) async fn ping_monitor(ip_addresses: Vec<(String, String)>, tx: tokio:
// Use futures to join on all tasks // Use futures to join on all tasks
for task in tasks { for task in tasks {
task.await.unwrap(); let _ = task.await;
} }
// Notify the channel that we're still here - this is really // Notify the channel that we're still here - this is really
@ -51,10 +57,11 @@ pub(super) async fn ping_monitor(ip_addresses: Vec<(String, String)>, tx: tokio:
ip: "test".to_string(), ip: "test".to_string(),
result: PingState::ChannelTest, result: PingState::ChannelTest,
}; };
let message = serde_json::to_string(&channel_test).unwrap(); if let Ok(message) = serde_json::to_string(&channel_test) {
if let Err(_) = tx.send(message.to_string()).await { if let Err(_) = tx.send(message.to_string()).await {
log::info!("Channel is gone"); log::debug!("Channel is gone");
break; break;
}
} }
} }
} }
@ -64,9 +71,10 @@ async fn send_timeout(tx: tokio::sync::mpsc::Sender<String>, ip: String) {
ip, ip,
result: PingState::NoResponse, result: PingState::NoResponse,
}; };
let message = serde_json::to_string(&result).unwrap(); if let Ok(message) = serde_json::to_string(&result) {
if let Err(_) = tx.send(message.to_string()).await { if let Err(_) = tx.send(message.to_string()).await {
log::info!("Channel is gone"); log::info!("Channel is gone");
}
} }
} }
@ -78,9 +86,10 @@ async fn send_alive(tx: tokio::sync::mpsc::Sender<String>, ip: String, ping_time
label, label,
}, },
}; };
let message = serde_json::to_string(&result).unwrap(); if let Ok(message) = serde_json::to_string(&result) {
if let Err(_) = tx.send(message.to_string()).await { if let Err(_) = tx.send(message.to_string()).await {
log::info!("Channel is gone"); log::info!("Channel is gone");
}
} }
} }

View File

@ -10,8 +10,11 @@ pub async fn flow_count(channels: Arc<PubSub>) {
} }
let active_flows = { let active_flows = {
let lock = ALL_FLOWS.lock().unwrap(); if let Ok(lock) = ALL_FLOWS.lock() {
lock.len() as u64 lock.len() as u64
} else {
0
}
}; };
let expired_flows = RECENT_FLOWS.len(); let expired_flows = RECENT_FLOWS.len();
let active_flows = json!( let active_flows = json!(

View File

@ -20,23 +20,26 @@ pub async fn network_tree(channels: Arc<PubSub>) {
return; return;
} }
let data: Vec<(usize, NetworkJsonTransport)> = spawn_blocking(|| { if let Ok(data) = spawn_blocking(|| {
let net_json = NETWORK_JSON.read().unwrap(); if let Ok(net_json) = NETWORK_JSON.read() {
net_json net_json
.get_nodes_when_ready() .get_nodes_when_ready()
.iter() .iter()
.enumerate() .enumerate()
.map(|(i, n)| (i, n.clone_to_transit())) .map(|(i, n)| (i, n.clone_to_transit()))
.collect() .collect::<Vec<(usize, NetworkJsonTransport)>>()
}).await.unwrap(); } else {
Vec::new()
let message = json!( }
}).await {
let message = json!(
{ {
"event": PublishedChannels::NetworkTree.to_string(), "event": PublishedChannels::NetworkTree.to_string(),
"data": data, "data": data,
} }
).to_string(); ).to_string();
channels.send(PublishedChannels::NetworkTree, message).await; channels.send(PublishedChannels::NetworkTree, message).await;
}
} }
#[derive(Serialize)] #[derive(Serialize)]

View File

@ -21,33 +21,36 @@ pub async fn tree_capacity(channels: Arc<PubSub>) {
} }
let capacities: Vec<_> = { let capacities: Vec<_> = {
let reader = NETWORK_JSON.read().unwrap(); if let Ok(reader) = NETWORK_JSON.read() {
reader.get_nodes_when_ready().iter().map(|node| { reader.get_nodes_when_ready().iter().map(|node| {
let node = node.clone_to_transit(); let node = node.clone_to_transit();
let down = node.current_throughput.0 as f64 * 8.0 / 1_000_000.0; let down = node.current_throughput.0 as f64 * 8.0 / 1_000_000.0;
let up = node.current_throughput.1 as f64 * 8.0 / 1_000_000.0; let up = node.current_throughput.1 as f64 * 8.0 / 1_000_000.0;
let max_down = node.max_throughput.0 as f64; let max_down = node.max_throughput.0 as f64;
let max_up = node.max_throughput.1 as f64; let max_up = node.max_throughput.1 as f64;
let median_rtt = if node.rtts.is_empty() { let median_rtt = if node.rtts.is_empty() {
0.0 0.0
} else {
let n = node.rtts.len() / 2;
if node.rtts.len() % 2 == 0 {
(node.rtts[n - 1] + node.rtts[n]) / 2.0
} else { } else {
node.rtts[n] let n = node.rtts.len() / 2;
} if node.rtts.len() % 2 == 0 {
}; (node.rtts[n - 1] + node.rtts[n]) / 2.0
} else {
node.rtts[n]
}
};
NodeCapacity { NodeCapacity {
name: node.name.clone(), name: node.name.clone(),
down, down,
up, up,
max_down, max_down,
max_up, max_up,
median_rtt, median_rtt,
} }
}).collect() }).collect()
} else {
Vec::new()
}
}; };
let message = json!( let message = json!(