Working geocode system, albeit not as useful as I hoped.

This commit is contained in:
Herbert Wolverson
2024-03-12 14:02:36 -05:00
parent 55f24cf71b
commit 5a3f90412d
6 changed files with 165 additions and 35 deletions

3
src/rust/Cargo.lock generated
View File

@@ -1738,10 +1738,13 @@ name = "lqosd"
version = "0.1.0"
dependencies = [
"anyhow",
"bincode",
"csv",
"dashmap",
"env_logger",
"flate2",
"ip_network",
"ip_network_table",
"itertools 0.12.1",
"jemallocator",
"log",

View File

@@ -33,6 +33,9 @@ itertools = "0.12.1"
csv = "1"
reqwest = { version = "0.11.24", features = ["blocking"] }
flate2 = "1.0"
bincode = "1"
ip_network_table = "0"
ip_network = "0"
# Support JemAlloc on supported platforms
[target.'cfg(any(target_arch = "x86", target_arch = "x86_64"))'.dependencies]

View File

@@ -1,6 +1,146 @@
//! Obtain ASN and geo mappings from IP addresses for flow
//! analysis.
use std::{io::Read, net::IpAddr, path::Path};
use serde::Deserialize;
#[derive(Deserialize, Clone, Debug)]
struct AsnEncoded {
network: IpAddr,
prefix: u8,
pub asn: u32,
organization: String,
}
#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct GeoIpLocation {
network: IpAddr,
prefix: u8,
latitude: f64,
longitude: f64,
city_and_country: String,
}
#[derive(Deserialize)]
struct Geobin {
asn: Vec<AsnEncoded>,
geo: Vec<GeoIpLocation>,
}
pub struct GeoTable {
asn_trie: ip_network_table::IpNetworkTable<AsnEncoded>,
geo_trie: ip_network_table::IpNetworkTable<GeoIpLocation>,
}
impl GeoTable {
const FILENAME: &'static str = "geo.bin";
fn file_path() -> std::path::PathBuf {
Path::new(&lqos_config::load_config().unwrap().lqos_directory)
.join(Self::FILENAME)
}
fn download() -> anyhow::Result<()> {
log::info!("Downloading ASN-IP Table");
let file_path = Self::file_path();
let url = "https://bfnightly.bracketproductions.com/geo.bin";
let response = reqwest::blocking::get(url)?;
let content = response.bytes()?;
let bytes = &content[0..];
std::fs::write(file_path, bytes)?;
Ok(())
}
pub fn load() -> anyhow::Result<Self> {
let path = Self::file_path();
if !path.exists() {
log::info!("geo.bin not found - trying to download it");
Self::download()?;
}
// Decompress and deserialize
let file = std::fs::File::open(path)?;
let mut buffer = Vec::new();
flate2::read::GzDecoder::new(file).read_to_end(&mut buffer)?;
let geobin: Geobin = bincode::deserialize(&buffer)?;
// Build the ASN trie
log::info!("Building ASN trie");
let mut asn_trie = ip_network_table::IpNetworkTable::<AsnEncoded>::new();
for entry in geobin.asn {
let (ip, prefix) = match entry.network {
IpAddr::V4(ip) => (ip.to_ipv6_mapped(), entry.prefix+96 ),
IpAddr::V6(ip) => (ip, entry.prefix),
};
if let Ok(ip) = ip_network::Ipv6Network::new(ip, prefix) {
asn_trie.insert(ip, entry);
}
}
// Build the GeoIP trie
log::info!("Building GeoIP trie");
let mut geo_trie = ip_network_table::IpNetworkTable::<GeoIpLocation>::new();
for entry in geobin.geo {
let (ip, prefix) = match entry.network {
IpAddr::V4(ip) => (ip.to_ipv6_mapped(), entry.prefix+96 ),
IpAddr::V6(ip) => (ip, entry.prefix),
};
if let Ok(ip) = ip_network::Ipv6Network::new(ip, prefix) {
geo_trie.insert(ip, entry);
}
}
log::info!("GeoTables loaded, {}-{} records.", asn_trie.len().1, geo_trie.len().1);
Ok(Self {
asn_trie,
geo_trie,
})
}
pub fn find_asn(&self, ip: IpAddr) -> Option<u32> {
log::debug!("Looking up ASN for IP: {:?}", ip);
let ip = match ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
if let Some(matched) = self.asn_trie.longest_match(ip) {
log::debug!("Matched ASN: {:?}", matched.1.asn);
Some(matched.1.asn)
} else {
log::debug!("No ASN found");
None
}
}
pub fn find_owners_by_ip(&self, ip: IpAddr) -> (String, String) {
log::debug!("Looking up ASN for IP: {:?}", ip);
let ip = match ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
let mut owners = String::new();
let mut country = String::new();
if let Some(matched) = self.asn_trie.longest_match(ip) {
log::debug!("Matched ASN: {:?}", matched.1.asn);
owners = matched.1.organization.clone();
}
if let Some(matched) = self.geo_trie.longest_match(ip) {
log::debug!("Matched Geo: {:?}", matched.1.city_and_country);
country = matched.1.city_and_country.clone();
}
(owners, country)
}
}
///////////////////////////////////////////////////////////////////////
/*
/// Structure to represent the on-disk structure for files
/// from: https://iptoasn.com/
/// Specifically: https://iptoasn.com/data/ip2asn-combined.tsv.gz
@@ -105,3 +245,4 @@ impl AsnTable {
self.asn_table.iter().find(|row| row.asn == asn).map(|row| row.clone())
}
}
*/

View File

@@ -39,8 +39,8 @@ impl TimeBuffer {
let mut my_buffer = buffer
.iter()
.map(|v| {
let (_key, data, analysis) = &v.data;
let (_name, country) = get_asn_name_and_country(analysis.asn_id.0);
let (key, data, _analysis) = &v.data;
let (_name, country) = get_asn_name_and_country(key.remote_ip.as_ip());
let rtt = [
(data.last_rtt[0] / 1000000) as f32,
(data.last_rtt[1] / 1000000) as f32,

View File

@@ -1,7 +1,6 @@
use std::{net::IpAddr, sync::Mutex};
use lqos_sys::flowbee_data::FlowbeeKey;
use once_cell::sync::Lazy;
use self::asn::AsnTable;
mod asn;
mod protocol;
pub use protocol::FlowProtocol;
@@ -12,7 +11,7 @@ pub use finished_flows::FinishedFlowAnalysis;
static ANALYSIS: Lazy<FlowAnalysisSystem> = Lazy::new(|| FlowAnalysisSystem::new());
pub struct FlowAnalysisSystem {
asn_table: Mutex<Option<asn::AsnTable>>,
asn_table: Mutex<Option<asn::GeoTable>>,
}
impl FlowAnalysisSystem {
@@ -20,7 +19,7 @@ impl FlowAnalysisSystem {
// Periodically update the ASN table
std::thread::spawn(|| {
loop {
let result = AsnTable::new();
let result = asn::GeoTable::load();
match result {
Ok(table) => {
ANALYSIS.asn_table.lock().unwrap().replace(table);
@@ -66,35 +65,19 @@ impl FlowAnalysis {
pub fn lookup_asn_id(ip: IpAddr) -> Option<u32> {
let table_lock = ANALYSIS.asn_table.lock();
if table_lock.is_err() {
return None;
}
let table = table_lock.unwrap();
if table.is_none() {
return None;
}
let table = table.as_ref().unwrap();
if let Some(asn) = table.find_asn(ip) {
Some(asn.asn)
} else {
None
if let Ok(table_lock) = ANALYSIS.asn_table.lock() {
if let Some(table) = table_lock.as_ref() {
return table.find_asn(ip);
}
}
None
}
pub fn get_asn_name_and_country(asn: u32) -> (String, String) {
let table_lock = ANALYSIS.asn_table.lock();
if table_lock.is_err() {
return ("".to_string(), "".to_string());
}
let table = table_lock.unwrap();
if table.is_none() {
return ("".to_string(), "".to_string());
}
let table = table.as_ref().unwrap();
if let Some(row) = table.find_asn_by_id(asn) {
(row.owners.clone(), row.country.clone())
} else {
("".to_string(), "".to_string())
pub fn get_asn_name_and_country(ip: IpAddr) -> (String, String) {
if let Ok(table_lock) = ANALYSIS.asn_table.lock() {
if let Some(table) = table_lock.as_ref() {
return table.find_owners_by_ip(ip);
}
}
(String::new(), String::new())
}

View File

@@ -505,7 +505,7 @@ pub fn dump_active_flows() -> BusResponse {
let result: Vec<lqos_bus::FlowbeeData> = lock
.iter()
.map(|(key, row)| {
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(row.1.asn_id.0);
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip());
lqos_bus::FlowbeeData {
remote_ip: key.remote_ip.as_ip().to_string(),
@@ -589,7 +589,7 @@ pub fn top_flows(n: u32, flow_type: TopFlowType) -> BusResponse {
.iter()
.take(n as usize)
.map(|(ip, flow)| {
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(flow.1.asn_id.0);
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(ip.remote_ip.as_ip());
lqos_bus::FlowbeeData {
remote_ip: ip.remote_ip.as_ip().to_string(),
local_ip: ip.local_ip.as_ip().to_string(),
@@ -624,7 +624,7 @@ pub fn flows_by_ip(ip: &str) -> BusResponse {
.iter()
.filter(|(key, _)| key.local_ip == ip)
.map(|(key, row)| {
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(row.1.asn_id.0);
let (remote_asn_name, remote_asn_country) = get_asn_name_and_country(key.remote_ip.as_ip());
lqos_bus::FlowbeeData {
remote_ip: key.remote_ip.as_ip().to_string(),