diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs
index 25f8c6b6..17b3597f 100644
--- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs
+++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs
@@ -139,9 +139,9 @@ impl GeoTable {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
- let mut owners = String::new();
- let mut country = String::new();
- let mut flag = String::new();
+ let mut owners = "Unknown".to_string();
+ let mut country = "Unknown".to_string();
+ let mut flag = "Unknown".to_string();
if let Some(matched) = self.asn_trie.longest_match(ip) {
log::debug!("Matched ASN: {:?}", matched.1.asn);
diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs
index 14370d1e..99f85de1 100644
--- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs
+++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs
@@ -33,6 +33,13 @@ pub struct AsnListEntry {
name: String,
}
+#[derive(Debug, Serialize)]
+pub struct AsnCountryListEntry {
+ count: usize,
+ name: String,
+ iso_code: String,
+}
+
impl TimeBuffer {
fn new() -> Self {
Self {
@@ -279,6 +286,18 @@ impl TimeBuffer {
.collect()
}
+ pub fn all_flows_for_country(&self, iso_code: &str) -> Vec<(FlowbeeKey, FlowbeeLocalData, FlowAnalysis)> {
+ let buffer = self.buffer.lock().unwrap();
+ buffer
+ .iter()
+ .filter(|flow| {
+ let country = get_asn_name_and_country(flow.data.0.remote_ip.as_ip());
+ country.flag == iso_code
+ })
+ .map(|flow| flow.data.clone())
+ .collect()
+ }
+
/// Builds a list of all ASNs with recent data, and how many flows they have.
pub fn asn_list(&self) -> Vec
{
// 1: Clone: large operation, don't keep the buffer locked longer than we have to
@@ -312,6 +331,43 @@ impl TimeBuffer {
})
.collect()
}
+
+ /// Builds a list of ASNs by country with recent data, and how many flows they have.
+ pub fn country_list(&self) -> Vec {
+ // 1: Clone: large operation, don't keep the buffer locked longer than we have to
+ let buffer = {
+ let buffer = self.buffer.lock().unwrap();
+ buffer.clone()
+ };
+
+ // Filter out the short flows and get the country & flag
+ let mut buffer: Vec<(String, String)> = buffer
+ .into_iter()
+ .filter(|flow| {
+ // Total flow time > 3 seconds
+ flow.data.1.last_seen - flow.data.1.start_time > 3_000_000_000
+ })
+ .map(|flow| {
+ let country = get_asn_name_and_country(flow.data.0.remote_ip.as_ip());
+ (country.country, country.flag)
+ })
+ .collect();
+
+ // Sort the buffer
+ buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
+
+ // Deduplicate and count, decorate with name
+ buffer
+ .into_iter()
+ .sorted()
+ .dedup_with_count()
+ .map(|(count, asn)| AsnCountryListEntry {
+ count,
+ name: asn.0,
+ iso_code: asn.1,
+ })
+ .collect()
+ }
}
pub static RECENT_FLOWS: Lazy = Lazy::new(|| TimeBuffer::new());
diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs
index 35cf7a77..e6dcfe34 100644
--- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs
+++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs
@@ -14,7 +14,7 @@ mod kernel_ringbuffer;
pub use kernel_ringbuffer::*;
mod rtt_types;
pub use rtt_types::RttData;
-pub use finished_flows::AsnListEntry;
+pub use finished_flows::{AsnListEntry, AsnCountryListEntry};
use crate::throughput_tracker::flow_data::flow_analysis::asn::AsnNameCountryFlag;
static ANALYSIS: Lazy = Lazy::new(|| FlowAnalysisSystem::new());
diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs
index bcdcb012..785eec82 100644
--- a/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs
+++ b/src/rust/lqosd/src/throughput_tracker/flow_data/mod.rs
@@ -15,7 +15,8 @@ use std::sync::{
};
pub(crate) use flow_analysis::{setup_flow_analysis, get_asn_name_and_country,
FlowAnalysis, RECENT_FLOWS, flowbee_handle_events, get_flowbee_event_count_and_reset,
- expire_rtt_flows, flowbee_rtt_map, RttData, get_rtt_events_per_second, AsnListEntry
+ expire_rtt_flows, flowbee_rtt_map, RttData, get_rtt_events_per_second, AsnListEntry,
+ AsnCountryListEntry
};
trait FlowbeeRecipient {