diff --git a/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/per_host.rs b/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/per_host.rs index 58f29454..2f341299 100644 --- a/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/per_host.rs +++ b/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/per_host.rs @@ -1,9 +1,10 @@ use influxdb2::{Client, models::DataPoint}; use lqos_bus::long_term_stats::StatsHost; -use pgdb::OrganizationDetails; +use pgdb::{OrganizationDetails, sqlx::{Pool, Postgres}}; use futures::prelude::*; pub async fn collect_per_host( + cnn: Pool, org: &OrganizationDetails, node_id: &str, timestamp: i64, @@ -14,12 +15,16 @@ pub async fn collect_per_host( let client = Client::new(&influx_url, &org.influx_org, &org.influx_token); let mut points: Vec = Vec::new(); - for host in hosts { + let mut trans = cnn.begin().await?; + log::info!("Received per-host stats, {} hosts", hosts.len()); + + for host in hosts.iter().filter(|h| !h.device_id.is_empty()) { points.push(DataPoint::builder("host_bits") .tag("host_id", node_id.to_string()) .tag("organization_id", org.key.to_string()) .tag("direction", "down".to_string()) .tag("circuit_id", host.circuit_id.to_string()) + .tag("device_id", host.device_id.to_string()) .tag("ip", host.ip_address.to_string()) .timestamp(timestamp) .field("min", host.bits.min.0 as i64) @@ -31,6 +36,7 @@ pub async fn collect_per_host( .tag("organization_id", org.key.to_string()) .tag("direction", "up".to_string()) .tag("circuit_id", host.circuit_id.to_string()) + .tag("device_id", host.device_id.to_string()) .tag("ip", host.ip_address.to_string()) .timestamp(timestamp) .field("min", host.bits.min.1 as i64) @@ -41,12 +47,40 @@ pub async fn collect_per_host( .tag("host_id", node_id.to_string()) .tag("organization_id", org.key.to_string()) .tag("circuit_id", host.circuit_id.to_string()) + .tag("device_id", host.device_id.to_string()) .tag("ip", host.ip_address.to_string()) .timestamp(timestamp) .field("min", host.rtt.avg as f64 / 100.0) .field("max", host.rtt.max as f64 / 100.0) .field("avg", host.rtt.avg as f64 / 100.0) .build()?); + + const SQL: &str = "INSERT INTO devices (key, host_id, circuit_id, device_id, circuit_name, device_name, parent_node, mac_address, ip_address) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (key, host_id, device_id) DO UPDATE SET circuit_id = $3, circuit_name = $5, device_name = $6, parent_node = $7, mac_address = $8, ip_address = $9;"; + + if !host.device_id.is_empty() { + log::info!("Submitting device"); + let result = pgdb::sqlx::query(SQL) + .bind(org.key.to_string()) + .bind(node_id) + .bind(&host.circuit_id) + .bind(&host.device_id) + .bind(&host.circuit_name) + .bind(&host.device_name) + .bind(&host.parent_node) + .bind(&host.mac) + .bind(&host.ip_address) + .execute(&mut trans) + .await; + if let Err(e) = result { + log::error!("Error inserting tree node: {}", e); + } + } + } + + let result = trans.commit().await; + log::warn!("Transaction committed"); + if let Err(e) = result { + log::error!("Error committing transaction: {}", e); } client diff --git a/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/queue.rs b/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/queue.rs index e0963820..cdf2df3d 100644 --- a/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/queue.rs +++ b/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/queue.rs @@ -37,8 +37,8 @@ async fn ingest_stats(cnn: Pool, node_id: NodeIdAndLicense, stats: Sta let _ = join!( update_last_seen(cnn.clone(), &node_id), collect_host_totals(&org, &node_id.node_id, ts, &stats.totals), - collect_per_host(&org, &node_id.node_id, ts, &stats.hosts), - collect_tree(&org, &node_id.node_id, ts, &stats.tree), + collect_per_host(cnn.clone(), &org, &node_id.node_id, ts, &stats.hosts), + collect_tree(cnn.clone(), &org, &node_id.node_id, ts, &stats.tree), collect_node_perf(&org, &node_id.node_id, ts, &stats.cpu_usage, stats.ram_percent), ); } else { diff --git a/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/tree.rs b/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/tree.rs index 289fab2a..25f04e3f 100644 --- a/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/tree.rs +++ b/src/rust/long_term_stats/lts_node/src/submissions/submission_queue/tree.rs @@ -1,9 +1,15 @@ -use influxdb2::{Client, models::DataPoint}; -use lqos_bus::long_term_stats::StatsTreeNode; -use pgdb::OrganizationDetails; use futures::prelude::*; +use influxdb2::{models::DataPoint, Client}; +use lqos_bus::long_term_stats::StatsTreeNode; +use pgdb::{ + sqlx::{Pool, Postgres}, + OrganizationDetails, +}; + +const SQL: &str = "INSERT INTO site_tree (key, host_id, site_name, index, parent, site_type) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (key, host_id, site_name) DO UPDATE SET index = $4, parent = $5, site_type = $6 WHERE site_tree.key=$1 AND site_tree.host_id=$2 AND site_tree.site_name=$3"; pub async fn collect_tree( + cnn: Pool, org: &OrganizationDetails, node_id: &str, timestamp: i64, @@ -15,23 +21,59 @@ pub async fn collect_tree( let client = Client::new(&influx_url, &org.influx_org, &org.influx_token); let mut points: Vec = Vec::new(); - for node in tree.iter() { - points.push(DataPoint::builder("tree") - .tag("host_id", node_id.to_string()) - .tag("organization_id", org.key.to_string()) - .tag("node_name", node.name.to_string()) - .tag("direction", "down".to_string()) - .timestamp(timestamp) - .field("bits", node.current_throughput.0 as i64) - .build()?); - points.push(DataPoint::builder("tree") - .tag("host_id", node_id.to_string()) - .tag("organization_id", org.key.to_string()) - .tag("node_name", node.name.to_string()) - .tag("direction", "up".to_string()) - .timestamp(timestamp) - .field("bits", node.current_throughput.1 as i64) - .build()?); + let mut trans = cnn.begin().await?; + + for (i, node) in tree.iter().enumerate() { + points.push( + DataPoint::builder("tree") + .tag("host_id", node_id.to_string()) + .tag("organization_id", org.key.to_string()) + .tag("node_name", node.name.to_string()) + .tag("direction", "down".to_string()) + .timestamp(timestamp) + .field("bits", node.current_throughput.0 as i64) + .build()?, + ); + points.push( + DataPoint::builder("tree") + .tag("host_id", node_id.to_string()) + .tag("organization_id", org.key.to_string()) + .tag("node_name", node.name.to_string()) + .tag("direction", "up".to_string()) + .timestamp(timestamp) + .field("bits", node.current_throughput.1 as i64) + .build()?, + ); + points.push( + DataPoint::builder("tree") + .tag("host_id", node_id.to_string()) + .tag("organization_id", org.key.to_string()) + .tag("node_name", node.name.to_string()) + .timestamp(timestamp) + .field("rtt_min", node.rtt.0 as i64 / 100) + .field("rtt_max", node.rtt.1 as i64 / 100) + .field("rtt_avg", node.rtt.2 as i64 / 100) + .build()?, + ); + + let result = pgdb::sqlx::query(SQL) + .bind(org.key.to_string()) + .bind(node_id) + .bind(&node.name) + .bind(i as i32) + .bind(node.immediate_parent.unwrap_or(0) as i32) + .bind(node.node_type.as_ref().unwrap_or(&String::new()).clone()) + .execute(&mut trans) + .await; + if let Err(e) = result { + log::error!("Error inserting tree node: {}", e); + } + } + + let result = trans.commit().await; + log::info!("Transaction committed"); + if let Err(e) = result { + log::error!("Error committing transaction: {}", e); } client @@ -43,4 +85,5 @@ pub async fn collect_tree( .await?; } Ok(()) -} \ No newline at end of file +} + diff --git a/src/rust/long_term_stats/pgdb/migrations/20230420154634_initial-tables.sql b/src/rust/long_term_stats/pgdb/migrations/20230420154634_initial-tables.sql index 57f236f3..4cd2e737 100644 --- a/src/rust/long_term_stats/pgdb/migrations/20230420154634_initial-tables.sql +++ b/src/rust/long_term_stats/pgdb/migrations/20230420154634_initial-tables.sql @@ -21,6 +21,31 @@ CREATE TABLE public.shaper_nodes ( public_key bytea ); +CREATE TABLE public.site_tree +( + key character varying(254) NOT NULL, + site_name character varying(254) NOT NULL, + host_id character varying(254) NOT NULL, + index integer NOT NULL, + parent integer NOT NULL, + site_type character varying(32), + PRIMARY KEY (key, site_name, host_id) +); + +CREATE TABLE public.devices +( + key character varying(254) NOT NULL, + host_id character varying(254) NOT NULL, + device_id character varying(254) NOT NULL, + circuit_id character varying(254) NOT NULL, + parent_node character varying(254) NOT NULL, + circuit_name character varying(254) NOT NULL, + device_name character varying(254) NOT NULL, + mac_address character(20) NOT NULL DEFAULT '', + ip_address character varying(128) NOT NULL DEFAULT '', + PRIMARY KEY (key, host_id, device_id) +); + CREATE TABLE public.stats_hosts ( id integer NOT NULL, ip_address character varying(128) NOT NULL, diff --git a/src/rust/lqos_bus/src/long_term_stats.rs b/src/rust/lqos_bus/src/long_term_stats.rs index 181bcf08..7168cf1a 100644 --- a/src/rust/lqos_bus/src/long_term_stats.rs +++ b/src/rust/lqos_bus/src/long_term_stats.rs @@ -47,8 +47,18 @@ pub struct StatsTotals { pub struct StatsHost { /// Host circuit_id as it appears in ShapedDevices.csv pub circuit_id: String, + /// Device id as it appears in ShapedDevices.csv + pub device_id: String, + /// Parent node (hopefully in network.json!) + pub parent_node: String, + /// Device name as it appears in ShapedDevices.csv + pub device_name: String, + /// Circuit name as it appears in ShapedDevices.csv + pub circuit_name: String, /// Host's IP address pub ip_address: String, + /// Host's MAC address + pub mac: String, /// Host's traffic statistics pub bits: StatsSummary, /// Host's RTT statistics @@ -66,6 +76,8 @@ pub struct StatsTreeNode { pub max_throughput: (u32, u32), /// Current throughput (from network.json) pub current_throughput: (u32, u32), + /// RTT summaries + pub rtt: (u16, u16, u16), /// Indices of parents in the tree pub parents: Vec, /// Index of immediate parent in the tree diff --git a/src/rust/lqosd/src/long_term_stats/collator.rs b/src/rust/lqosd/src/long_term_stats/collator.rs index a62700f7..5e832b2e 100644 --- a/src/rust/lqosd/src/long_term_stats/collator.rs +++ b/src/rust/lqosd/src/long_term_stats/collator.rs @@ -27,6 +27,11 @@ pub(crate) struct SubmissionHost { pub(crate) bits_per_second: MinMaxAvgPair, pub(crate) median_rtt: MinMaxAvg, pub(crate) tree_parent_indices: Vec, + pub(crate) device_id: String, + pub(crate) parent_node: String, + pub(crate) device_name: String, + pub(crate) circuit_name: String, + pub(crate) mac: String, } static SYS: Lazy> = Lazy::new(|| Mutex::new(System::new_all())); @@ -37,12 +42,13 @@ fn get_cpu_ram() -> (Vec, u32) { lock.refresh_cpu(); lock.refresh_memory(); - let cpus: Vec = lock.cpus() + let cpus: Vec = lock + .cpus() .iter() .map(|cpu| cpu.cpu_usage() as u32) // Always rounds down .collect(); - let memory = (lock.used_memory() as f32 / lock.total_memory() as f32) * 100.0; + let memory = (lock.used_memory() as f32 / lock.total_memory() as f32) * 100.0; //println!("cpu: {:?}, ram: {}", cpus, memory); @@ -72,6 +78,7 @@ impl From for lqos_bus::long_term_stats::StatsTreeNode { parents: value.parents, immediate_parent: value.immediate_parent, node_type: value.node_type, + rtt: value.rtts, } } } @@ -114,6 +121,11 @@ impl From for lqos_bus::long_term_stats::StatsHost { bits: value.bits_per_second.into(), rtt: value.median_rtt.into(), tree_indices: value.tree_parent_indices, + device_id: value.device_id, + parent_node: value.parent_node, + circuit_name: value.circuit_name, + device_name: value.device_name, + mac: value.mac, } } } @@ -151,10 +163,12 @@ pub(crate) async fn collate_stats() { // Collate host stats let mut host_accumulator = - HashMap::<(&IpAddr, &String), Vec<(u64, u64, f32, Vec)>>::new(); + HashMap::<(&IpAddr, &String, &String, &String, &String, &String, &String), Vec<(u64, u64, f32, Vec)>>::new(); writer.iter().for_each(|session| { session.hosts.iter().for_each(|host| { - if let Some(ha) = host_accumulator.get_mut(&(&host.ip_address, &host.circuit_id)) { + if let Some(ha) = host_accumulator.get_mut(&( + &host.ip_address, &host.circuit_id, &host.device_id, &host.parent_node, &host.circuit_name, &host.device_name, &host.mac) + ) { ha.push(( host.bits_per_second.0, host.bits_per_second.1, @@ -163,7 +177,7 @@ pub(crate) async fn collate_stats() { )); } else { host_accumulator.insert( - (&host.ip_address, &host.circuit_id), + (&host.ip_address, &host.circuit_id, &host.device_id, &host.parent_node, &host.circuit_name, &host.device_name, &host.mac), vec![( host.bits_per_second.0, host.bits_per_second.1, @@ -175,7 +189,7 @@ pub(crate) async fn collate_stats() { }); }); - for ((ip, circuit), data) in host_accumulator.iter() { + for ((ip, circuit, device_id, parent_node, circuit_name, device_name, mac), data) in host_accumulator.iter() { let bps: Vec<(u64, u64)> = data.iter().map(|(d, u, _rtt, _tree)| (*d, *u)).collect(); let bps = MinMaxAvgPair::::from_slice(&bps); let fps: Vec = data @@ -189,12 +203,18 @@ pub(crate) async fn collate_stats() { .map(|(_d, _u, _rtt, tree)| tree) .next() .unwrap_or(Vec::new()); + submission.hosts.push(SubmissionHost { circuit_id: circuit.to_string(), ip_address: **ip, bits_per_second: bps, median_rtt: fps, tree_parent_indices: tree, + device_id: device_id.to_string(), + parent_node: parent_node.to_string(), + circuit_name: circuit_name.to_string(), + device_name: device_name.to_string(), + mac: mac.to_string(), }); } diff --git a/src/rust/lqosd/src/long_term_stats/data_collector.rs b/src/rust/lqosd/src/long_term_stats/data_collector.rs index 15941c24..13e7b3cb 100644 --- a/src/rust/lqosd/src/long_term_stats/data_collector.rs +++ b/src/rust/lqosd/src/long_term_stats/data_collector.rs @@ -1,4 +1,4 @@ -use crate::throughput_tracker::THROUGHPUT_TRACKER; +use crate::{throughput_tracker::THROUGHPUT_TRACKER, shaped_devices_tracker::SHAPED_DEVICES}; use once_cell::sync::Lazy; use tokio::sync::Mutex; use std::{ @@ -21,6 +21,11 @@ pub(crate) struct SessionHost { pub(crate) bits_per_second: (u64, u64), pub(crate) median_rtt: f32, pub(crate) tree_parent_indices: Vec, + pub(crate) device_id: String, + pub(crate) parent_node: String, + pub(crate) circuit_name: String, + pub(crate) device_name: String, + pub(crate) mac: String, } pub(crate) static SESSION_BUFFER: Lazy>> = @@ -53,21 +58,47 @@ pub(crate) async fn gather_throughput_stats() { shaped_bits_per_second, packets_per_second, hosts: Vec::with_capacity(THROUGHPUT_TRACKER.raw_data.len()), - }; + }; THROUGHPUT_TRACKER .raw_data .iter() - .filter(|t| t.circuit_id.is_some()) .for_each(|tp| { + let shaped_devices = SHAPED_DEVICES.read().unwrap(); + let mut circuit_id = String::new(); + let mut device_id = tp.key().as_ip().to_string(); + let mut parent_node = String::new(); + let mut circuit_name = String::new(); + let mut device_name = String::new(); + let mut mac = String::new(); + let ip = tp.key().as_ip(); + let lookup = match ip { + IpAddr::V4(ip) => ip.to_ipv6_mapped(), + IpAddr::V6(ip) => ip, + }; + if let Some((_, index)) = shaped_devices.trie.longest_match(lookup) { + let shaped_device = &shaped_devices.devices[*index]; + circuit_id = shaped_device.circuit_id.clone(); + device_id = shaped_device.device_id.clone(); + parent_node = shaped_device.parent_node.clone(); + circuit_name = shaped_device.circuit_name.clone(); + device_name = shaped_device.device_name.clone(); + mac = shaped_device.mac.clone(); + } + let bytes_per_second = tp.bytes_per_second; let bits_per_second = (bytes_per_second.0 * 8, bytes_per_second.1 * 8); session.hosts.push(SessionHost { - circuit_id: tp.circuit_id.as_ref().unwrap().clone(), + circuit_id, ip_address: tp.key().as_ip(), bits_per_second, median_rtt: tp.median_latency(), tree_parent_indices: tp.network_json_parents.clone().unwrap_or(Vec::new()), + device_id, + parent_node, + circuit_name, + device_name, + mac, }); }); diff --git a/src/rust/lqosd/src/long_term_stats/tree.rs b/src/rust/lqosd/src/long_term_stats/tree.rs index 14cb4e4b..98545885 100644 --- a/src/rust/lqosd/src/long_term_stats/tree.rs +++ b/src/rust/lqosd/src/long_term_stats/tree.rs @@ -8,6 +8,7 @@ pub(crate) struct NetworkTreeEntry { pub(crate) name: String, pub(crate) max_throughput: (u32, u32), pub(crate) current_throughput: (u32, u32), + pub(crate) rtts: (u16, u16, u16), pub(crate) parents: Vec, pub(crate) immediate_parent: Option, pub(crate) node_type: Option, @@ -15,6 +16,21 @@ pub(crate) struct NetworkTreeEntry { impl From<&NetworkJsonNode> for NetworkTreeEntry { fn from(value: &NetworkJsonNode) -> Self { + let mut max = 0; + let mut min = if value.rtts.is_empty() { + 0 + } else { + u16::MAX + }; + let mut sum = 0; + for n in value.rtts.iter() { + let n = *n; + sum += n; + if n < min { min = n; } + if n > max { max = n; } + } + let avg = sum.checked_div(value.rtts.len() as u16).unwrap_or(0); + Self { name: value.name.clone(), max_throughput: value.max_throughput, @@ -25,6 +41,7 @@ impl From<&NetworkJsonNode> for NetworkTreeEntry { value.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed) as u32, ), node_type: value.node_type.clone(), + rtts: (min, max, avg), } } } @@ -38,6 +55,7 @@ impl From<&NetworkTreeEntry> for StatsTreeNode { parents: value.parents.clone(), immediate_parent: value.immediate_parent, node_type: value.node_type.clone(), + rtt: value.rtts, } } }