From 5a3f90412d6bccd18c8836a4900d0a5401a11119 Mon Sep 17 00:00:00 2001 From: Herbert Wolverson Date: Tue, 12 Mar 2024 14:02:36 -0500 Subject: [PATCH] Working geocode system, albeit not as useful as I hoped. --- src/rust/Cargo.lock | 3 + src/rust/lqosd/Cargo.toml | 3 + .../flow_data/flow_analysis/asn.rs | 141 ++++++++++++++++++ .../flow_data/flow_analysis/finished_flows.rs | 4 +- .../flow_data/flow_analysis/mod.rs | 43 ++---- src/rust/lqosd/src/throughput_tracker/mod.rs | 6 +- 6 files changed, 165 insertions(+), 35 deletions(-) diff --git a/src/rust/Cargo.lock b/src/rust/Cargo.lock index 3cd9aebe..036631c6 100644 --- a/src/rust/Cargo.lock +++ b/src/rust/Cargo.lock @@ -1738,10 +1738,13 @@ name = "lqosd" version = "0.1.0" dependencies = [ "anyhow", + "bincode", "csv", "dashmap", "env_logger", "flate2", + "ip_network", + "ip_network_table", "itertools 0.12.1", "jemallocator", "log", diff --git a/src/rust/lqosd/Cargo.toml b/src/rust/lqosd/Cargo.toml index ae1cd6df..c4d948cd 100644 --- a/src/rust/lqosd/Cargo.toml +++ b/src/rust/lqosd/Cargo.toml @@ -33,6 +33,9 @@ itertools = "0.12.1" csv = "1" reqwest = { version = "0.11.24", features = ["blocking"] } flate2 = "1.0" +bincode = "1" +ip_network_table = "0" +ip_network = "0" # Support JemAlloc on supported platforms [target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies] diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs index d06e8141..68641c13 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/asn.rs @@ -1,6 +1,146 @@ +//! Obtain ASN and geo mappings from IP addresses for flow +//! analysis. + + use std::{io::Read, net::IpAddr, path::Path}; use serde::Deserialize; +#[derive(Deserialize, Clone, Debug)] +struct AsnEncoded { + network: IpAddr, + prefix: u8, + pub asn: u32, + organization: String, +} + +#[allow(dead_code)] +#[derive(Deserialize, Debug)] +struct GeoIpLocation { + network: IpAddr, + prefix: u8, + latitude: f64, + longitude: f64, + city_and_country: String, + +} + +#[derive(Deserialize)] +struct Geobin { + asn: Vec, + geo: Vec, +} + +pub struct GeoTable { + asn_trie: ip_network_table::IpNetworkTable, + geo_trie: ip_network_table::IpNetworkTable, +} + +impl GeoTable { + const FILENAME: &'static str = "geo.bin"; + + fn file_path() -> std::path::PathBuf { + Path::new(&lqos_config::load_config().unwrap().lqos_directory) + .join(Self::FILENAME) + } + + fn download() -> anyhow::Result<()> { + log::info!("Downloading ASN-IP Table"); + let file_path = Self::file_path(); + let url = "https://bfnightly.bracketproductions.com/geo.bin"; + let response = reqwest::blocking::get(url)?; + let content = response.bytes()?; + let bytes = &content[0..]; + std::fs::write(file_path, bytes)?; + Ok(()) + } + + pub fn load() -> anyhow::Result { + let path = Self::file_path(); + if !path.exists() { + log::info!("geo.bin not found - trying to download it"); + Self::download()?; + } + + // Decompress and deserialize + let file = std::fs::File::open(path)?; + let mut buffer = Vec::new(); + flate2::read::GzDecoder::new(file).read_to_end(&mut buffer)?; + let geobin: Geobin = bincode::deserialize(&buffer)?; + + // Build the ASN trie + log::info!("Building ASN trie"); + let mut asn_trie = ip_network_table::IpNetworkTable::::new(); + for entry in geobin.asn { + let (ip, prefix) = match entry.network { + IpAddr::V4(ip) => (ip.to_ipv6_mapped(), entry.prefix+96 ), + IpAddr::V6(ip) => (ip, entry.prefix), + }; + if let Ok(ip) = ip_network::Ipv6Network::new(ip, prefix) { + asn_trie.insert(ip, entry); + } + } + + // Build the GeoIP trie + log::info!("Building GeoIP trie"); + let mut geo_trie = ip_network_table::IpNetworkTable::::new(); + for entry in geobin.geo { + let (ip, prefix) = match entry.network { + IpAddr::V4(ip) => (ip.to_ipv6_mapped(), entry.prefix+96 ), + IpAddr::V6(ip) => (ip, entry.prefix), + }; + if let Ok(ip) = ip_network::Ipv6Network::new(ip, prefix) { + geo_trie.insert(ip, entry); + } + } + + log::info!("GeoTables loaded, {}-{} records.", asn_trie.len().1, geo_trie.len().1); + + Ok(Self { + asn_trie, + geo_trie, + }) + } + + pub fn find_asn(&self, ip: IpAddr) -> Option { + log::debug!("Looking up ASN for IP: {:?}", ip); + let ip = match ip { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + if let Some(matched) = self.asn_trie.longest_match(ip) { + log::debug!("Matched ASN: {:?}", matched.1.asn); + Some(matched.1.asn) + } else { + log::debug!("No ASN found"); + None + } + } + + pub fn find_owners_by_ip(&self, ip: IpAddr) -> (String, String) { + log::debug!("Looking up ASN for IP: {:?}", ip); + let ip = match ip { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + let mut owners = String::new(); + let mut country = String::new(); + + if let Some(matched) = self.asn_trie.longest_match(ip) { + log::debug!("Matched ASN: {:?}", matched.1.asn); + owners = matched.1.organization.clone(); + } + if let Some(matched) = self.geo_trie.longest_match(ip) { + log::debug!("Matched Geo: {:?}", matched.1.city_and_country); + country = matched.1.city_and_country.clone(); + } + + (owners, country) + } +} + +/////////////////////////////////////////////////////////////////////// + +/* /// Structure to represent the on-disk structure for files /// from: https://iptoasn.com/ /// Specifically: https://iptoasn.com/data/ip2asn-combined.tsv.gz @@ -105,3 +245,4 @@ impl AsnTable { self.asn_table.iter().find(|row| row.asn == asn).map(|row| row.clone()) } } +*/ \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs index 3bfa123a..974ab649 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/finished_flows.rs @@ -39,8 +39,8 @@ impl TimeBuffer { let mut my_buffer = buffer .iter() .map(|v| { - let (_key, data, analysis) = &v.data; - let (_name, country) = get_asn_name_and_country(analysis.asn_id.0); + let (key, data, _analysis) = &v.data; + let (_name, country) = get_asn_name_and_country(key.remote_ip.as_ip()); let rtt = [ (data.last_rtt[0] / 1000000) as f32, (data.last_rtt[1] / 1000000) as f32, diff --git a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs index 7800fc88..b0cc985b 100644 --- a/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/flow_data/flow_analysis/mod.rs @@ -1,7 +1,6 @@ use std::{net::IpAddr, sync::Mutex}; use lqos_sys::flowbee_data::FlowbeeKey; use once_cell::sync::Lazy; -use self::asn::AsnTable; mod asn; mod protocol; pub use protocol::FlowProtocol; @@ -12,7 +11,7 @@ pub use finished_flows::FinishedFlowAnalysis; static ANALYSIS: Lazy = Lazy::new(|| FlowAnalysisSystem::new()); pub struct FlowAnalysisSystem { - asn_table: Mutex>, + asn_table: Mutex>, } impl FlowAnalysisSystem { @@ -20,7 +19,7 @@ impl FlowAnalysisSystem { // Periodically update the ASN table std::thread::spawn(|| { loop { - let result = AsnTable::new(); + let result = asn::GeoTable::load(); match result { Ok(table) => { ANALYSIS.asn_table.lock().unwrap().replace(table); @@ -66,35 +65,19 @@ impl FlowAnalysis { pub fn lookup_asn_id(ip: IpAddr) -> Option { - let table_lock = ANALYSIS.asn_table.lock(); - if table_lock.is_err() { - return None; - } - let table = table_lock.unwrap(); - if table.is_none() { - return None; - } - let table = table.as_ref().unwrap(); - if let Some(asn) = table.find_asn(ip) { - Some(asn.asn) - } else { - None + if let Ok(table_lock) = ANALYSIS.asn_table.lock() { + if let Some(table) = table_lock.as_ref() { + return table.find_asn(ip); + } } + None } -pub fn get_asn_name_and_country(asn: u32) -> (String, String) { - let table_lock = ANALYSIS.asn_table.lock(); - if table_lock.is_err() { - return ("".to_string(), "".to_string()); - } - let table = table_lock.unwrap(); - if table.is_none() { - return ("".to_string(), "".to_string()); - } - let table = table.as_ref().unwrap(); - if let Some(row) = table.find_asn_by_id(asn) { - (row.owners.clone(), row.country.clone()) - } else { - ("".to_string(), "".to_string()) +pub fn get_asn_name_and_country(ip: IpAddr) -> (String, String) { + if let Ok(table_lock) = ANALYSIS.asn_table.lock() { + if let Some(table) = table_lock.as_ref() { + return table.find_owners_by_ip(ip); + } } + (String::new(), String::new()) } \ No newline at end of file diff --git a/src/rust/lqosd/src/throughput_tracker/mod.rs b/src/rust/lqosd/src/throughput_tracker/mod.rs index d34bacf3..d8f64509 100644 --- a/src/rust/lqosd/src/throughput_tracker/mod.rs +++ b/src/rust/lqosd/src/throughput_tracker/mod.rs @@ -505,7 +505,7 @@ pub fn dump_active_flows() -> BusResponse { let result: Vec = lock .iter() .map(|(key, row)| { - let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(row.1.asn_id.0); + let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); lqos_bus::FlowbeeData { remote_ip: key.remote_ip.as_ip().to_string(), @@ -589,7 +589,7 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse { .iter() .take(n as usize) .map(|(ip, flow)| { - let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(flow.1.asn_id.0); + let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(ip.remote_ip.as_ip()); lqos_bus::FlowbeeData { remote_ip: ip.remote_ip.as_ip().to_string(), local_ip: ip.local_ip.as_ip().to_string(), @@ -624,7 +624,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse { .iter() .filter(|(key, _)| key.local_ip == ip) .map(|(key, row)| { - let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(row.1.asn_id.0); + let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip()); lqos_bus::FlowbeeData { remote_ip: key.remote_ip.as_ip().to_string(),