Still very much a work in progress, but the tree data is now being collected. It's not as efficient as I'd like yet, but the data is there. Let's see how it scales under testing.

This commit is contained in:
Herbert Wolverson 2023-04-28 14:04:44 +00:00
parent dfb7e0d342
commit 0c0a7d48ce
8 changed files with 218 additions and 35 deletions

View File

@ -1,9 +1,10 @@
use influxdb2::{Client, models::DataPoint};
use lqos_bus::long_term_stats::StatsHost;
use pgdb::OrganizationDetails;
use pgdb::{OrganizationDetails, sqlx::{Pool, Postgres}};
use futures::prelude::*;
pub async fn collect_per_host(
cnn: Pool<Postgres>,
org: &OrganizationDetails,
node_id: &str,
timestamp: i64,
@ -14,12 +15,16 @@ pub async fn collect_per_host(
let client = Client::new(&influx_url, &org.influx_org, &org.influx_token);
let mut points: Vec<DataPoint> = Vec::new();
for host in hosts {
let mut trans = cnn.begin().await?;
log::info!("Received per-host stats, {} hosts", hosts.len());
for host in hosts.iter().filter(|h| !h.device_id.is_empty()) {
points.push(DataPoint::builder("host_bits")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("direction", "down".to_string())
.tag("circuit_id", host.circuit_id.to_string())
.tag("device_id", host.device_id.to_string())
.tag("ip", host.ip_address.to_string())
.timestamp(timestamp)
.field("min", host.bits.min.0 as i64)
@ -31,6 +36,7 @@ pub async fn collect_per_host(
.tag("organization_id", org.key.to_string())
.tag("direction", "up".to_string())
.tag("circuit_id", host.circuit_id.to_string())
.tag("device_id", host.device_id.to_string())
.tag("ip", host.ip_address.to_string())
.timestamp(timestamp)
.field("min", host.bits.min.1 as i64)
@ -41,12 +47,40 @@ pub async fn collect_per_host(
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("circuit_id", host.circuit_id.to_string())
.tag("device_id", host.device_id.to_string())
.tag("ip", host.ip_address.to_string())
.timestamp(timestamp)
.field("min", host.rtt.avg as f64 / 100.0)
.field("max", host.rtt.max as f64 / 100.0)
.field("avg", host.rtt.avg as f64 / 100.0)
.build()?);
const SQL: &str = "INSERT INTO devices (key, host_id, circuit_id, device_id, circuit_name, device_name, parent_node, mac_address, ip_address) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (key, host_id, device_id) DO UPDATE SET circuit_id = $3, circuit_name = $5, device_name = $6, parent_node = $7, mac_address = $8, ip_address = $9;";
if !host.device_id.is_empty() {
log::info!("Submitting device");
let result = pgdb::sqlx::query(SQL)
.bind(org.key.to_string())
.bind(node_id)
.bind(&host.circuit_id)
.bind(&host.device_id)
.bind(&host.circuit_name)
.bind(&host.device_name)
.bind(&host.parent_node)
.bind(&host.mac)
.bind(&host.ip_address)
.execute(&mut trans)
.await;
if let Err(e) = result {
log::error!("Error inserting tree node: {}", e);
}
}
}
let result = trans.commit().await;
log::warn!("Transaction committed");
if let Err(e) = result {
log::error!("Error committing transaction: {}", e);
}
client

View File

@ -37,8 +37,8 @@ async fn ingest_stats(cnn: Pool<Postgres>, node_id: NodeIdAndLicense, stats: Sta
let _ = join!(
update_last_seen(cnn.clone(), &node_id),
collect_host_totals(&org, &node_id.node_id, ts, &stats.totals),
collect_per_host(&org, &node_id.node_id, ts, &stats.hosts),
collect_tree(&org, &node_id.node_id, ts, &stats.tree),
collect_per_host(cnn.clone(), &org, &node_id.node_id, ts, &stats.hosts),
collect_tree(cnn.clone(), &org, &node_id.node_id, ts, &stats.tree),
collect_node_perf(&org, &node_id.node_id, ts, &stats.cpu_usage, stats.ram_percent),
);
} else {

View File

@ -1,9 +1,15 @@
use influxdb2::{Client, models::DataPoint};
use lqos_bus::long_term_stats::StatsTreeNode;
use pgdb::OrganizationDetails;
use futures::prelude::*;
use influxdb2::{models::DataPoint, Client};
use lqos_bus::long_term_stats::StatsTreeNode;
use pgdb::{
sqlx::{Pool, Postgres},
OrganizationDetails,
};
// Upsert statement for a single `site_tree` row.
//
// Bind order: $1 = key, $2 = host_id, $3 = site_name, $4 = index, $5 = parent, $6 = site_type.
// When a row with the same (key, host_id, site_name) already exists, its `index`, `parent`
// and `site_type` columns are overwritten with $4..$6; the trailing WHERE restricts the
// update to the row whose key/host_id/site_name equal $1..$3.
const SQL: &str = "INSERT INTO site_tree (key, host_id, site_name, index, parent, site_type) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (key, host_id, site_name) DO UPDATE SET index = $4, parent = $5, site_type = $6 WHERE site_tree.key=$1 AND site_tree.host_id=$2 AND site_tree.site_name=$3";
pub async fn collect_tree(
cnn: Pool<Postgres>,
org: &OrganizationDetails,
node_id: &str,
timestamp: i64,
@ -15,23 +21,59 @@ pub async fn collect_tree(
let client = Client::new(&influx_url, &org.influx_org, &org.influx_token);
let mut points: Vec<DataPoint> = Vec::new();
for node in tree.iter() {
points.push(DataPoint::builder("tree")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("direction", "down".to_string())
.timestamp(timestamp)
.field("bits", node.current_throughput.0 as i64)
.build()?);
points.push(DataPoint::builder("tree")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("direction", "up".to_string())
.timestamp(timestamp)
.field("bits", node.current_throughput.1 as i64)
.build()?);
let mut trans = cnn.begin().await?;
for (i, node) in tree.iter().enumerate() {
points.push(
DataPoint::builder("tree")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("direction", "down".to_string())
.timestamp(timestamp)
.field("bits", node.current_throughput.0 as i64)
.build()?,
);
points.push(
DataPoint::builder("tree")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.tag("direction", "up".to_string())
.timestamp(timestamp)
.field("bits", node.current_throughput.1 as i64)
.build()?,
);
points.push(
DataPoint::builder("tree")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.tag("node_name", node.name.to_string())
.timestamp(timestamp)
.field("rtt_min", node.rtt.0 as i64 / 100)
.field("rtt_max", node.rtt.1 as i64 / 100)
.field("rtt_avg", node.rtt.2 as i64 / 100)
.build()?,
);
let result = pgdb::sqlx::query(SQL)
.bind(org.key.to_string())
.bind(node_id)
.bind(&node.name)
.bind(i as i32)
.bind(node.immediate_parent.unwrap_or(0) as i32)
.bind(node.node_type.as_ref().unwrap_or(&String::new()).clone())
.execute(&mut trans)
.await;
if let Err(e) = result {
log::error!("Error inserting tree node: {}", e);
}
}
let result = trans.commit().await;
log::info!("Transaction committed");
if let Err(e) = result {
log::error!("Error committing transaction: {}", e);
}
client
@ -43,4 +85,5 @@ pub async fn collect_tree(
.await?;
}
Ok(())
}
}

View File

@ -21,6 +21,31 @@ CREATE TABLE public.shaper_nodes (
public_key bytea
);
-- Site tree entries, one row per (key, site_name, host_id).
-- `index` and `parent` are mandatory integers; `site_type` is the only nullable column.
-- NOTE(review): `parent` looks like the `index` of the parent row within the same
-- (key, host_id) tree, with no foreign key enforcing it -- confirm against the writer.
CREATE TABLE public.site_tree
(
key character varying(254) NOT NULL,
site_name character varying(254) NOT NULL,
host_id character varying(254) NOT NULL,
index integer NOT NULL,
parent integer NOT NULL,
site_type character varying(32),
PRIMARY KEY (key, site_name, host_id)
);
-- Device records, one row per (key, host_id, device_id).
-- Every column is NOT NULL; `mac_address` and `ip_address` default to the empty string.
-- NOTE(review): `mac_address` is fixed-width character(20), which PostgreSQL blank-pads
-- on storage -- confirm padded values are acceptable to readers (character varying
-- would avoid the padding).
CREATE TABLE public.devices
(
key character varying(254) NOT NULL,
host_id character varying(254) NOT NULL,
device_id character varying(254) NOT NULL,
circuit_id character varying(254) NOT NULL,
parent_node character varying(254) NOT NULL,
circuit_name character varying(254) NOT NULL,
device_name character varying(254) NOT NULL,
mac_address character(20) NOT NULL DEFAULT '',
ip_address character varying(128) NOT NULL DEFAULT '',
PRIMARY KEY (key, host_id, device_id)
);
CREATE TABLE public.stats_hosts (
id integer NOT NULL,
ip_address character varying(128) NOT NULL,

View File

@ -47,8 +47,18 @@ pub struct StatsTotals {
pub struct StatsHost {
/// Host circuit_id as it appears in ShapedDevices.csv
pub circuit_id: String,
/// Device id as it appears in ShapedDevices.csv
pub device_id: String,
/// Parent node (hopefully in network.json!)
pub parent_node: String,
/// Device name as it appears in ShapedDevices.csv
pub device_name: String,
/// Circuit name as it appears in ShapedDevices.csv
pub circuit_name: String,
/// Host's IP address
pub ip_address: String,
/// Host's MAC address
pub mac: String,
/// Host's traffic statistics
pub bits: StatsSummary,
/// Host's RTT statistics
@ -66,6 +76,8 @@ pub struct StatsTreeNode {
pub max_throughput: (u32, u32),
/// Current throughput (from network.json)
pub current_throughput: (u32, u32),
/// RTT summaries
pub rtt: (u16, u16, u16),
/// Indices of parents in the tree
pub parents: Vec<usize>,
/// Index of immediate parent in the tree

View File

@ -27,6 +27,11 @@ pub(crate) struct SubmissionHost {
pub(crate) bits_per_second: MinMaxAvgPair<u64>,
pub(crate) median_rtt: MinMaxAvg<u32>,
pub(crate) tree_parent_indices: Vec<usize>,
pub(crate) device_id: String,
pub(crate) parent_node: String,
pub(crate) device_name: String,
pub(crate) circuit_name: String,
pub(crate) mac: String,
}
// Shared `System` handle, created with `System::new_all()` the first time it is
// accessed and wrapped in a `Mutex`.
// NOTE(review): presumably this is the handle `get_cpu_ram` locks before calling
// `refresh_cpu()` / `refresh_memory()` -- confirm in the full file.
static SYS: Lazy<Mutex<System>> = Lazy::new(|| Mutex::new(System::new_all()));
@ -37,12 +42,13 @@ fn get_cpu_ram() -> (Vec<u32>, u32) {
lock.refresh_cpu();
lock.refresh_memory();
let cpus: Vec<u32> = lock.cpus()
let cpus: Vec<u32> = lock
.cpus()
.iter()
.map(|cpu| cpu.cpu_usage() as u32) // Always rounds down
.collect();
let memory = (lock.used_memory() as f32 / lock.total_memory() as f32) * 100.0;
let memory = (lock.used_memory() as f32 / lock.total_memory() as f32) * 100.0;
//println!("cpu: {:?}, ram: {}", cpus, memory);
@ -72,6 +78,7 @@ impl From<NetworkTreeEntry> for lqos_bus::long_term_stats::StatsTreeNode {
parents: value.parents,
immediate_parent: value.immediate_parent,
node_type: value.node_type,
rtt: value.rtts,
}
}
}
@ -114,6 +121,11 @@ impl From<SubmissionHost> for lqos_bus::long_term_stats::StatsHost {
bits: value.bits_per_second.into(),
rtt: value.median_rtt.into(),
tree_indices: value.tree_parent_indices,
device_id: value.device_id,
parent_node: value.parent_node,
circuit_name: value.circuit_name,
device_name: value.device_name,
mac: value.mac,
}
}
}
@ -151,10 +163,12 @@ pub(crate) async fn collate_stats() {
// Collate host stats
let mut host_accumulator =
HashMap::<(&IpAddr, &String), Vec<(u64, u64, f32, Vec<usize>)>>::new();
HashMap::<(&IpAddr, &String, &String, &String, &String, &String, &String), Vec<(u64, u64, f32, Vec<usize>)>>::new();
writer.iter().for_each(|session| {
session.hosts.iter().for_each(|host| {
if let Some(ha) = host_accumulator.get_mut(&(&host.ip_address, &host.circuit_id)) {
if let Some(ha) = host_accumulator.get_mut(&(
&host.ip_address, &host.circuit_id, &host.device_id, &host.parent_node, &host.circuit_name, &host.device_name, &host.mac)
) {
ha.push((
host.bits_per_second.0,
host.bits_per_second.1,
@ -163,7 +177,7 @@ pub(crate) async fn collate_stats() {
));
} else {
host_accumulator.insert(
(&host.ip_address, &host.circuit_id),
(&host.ip_address, &host.circuit_id, &host.device_id, &host.parent_node, &host.circuit_name, &host.device_name, &host.mac),
vec![(
host.bits_per_second.0,
host.bits_per_second.1,
@ -175,7 +189,7 @@ pub(crate) async fn collate_stats() {
});
});
for ((ip, circuit), data) in host_accumulator.iter() {
for ((ip, circuit, device_id, parent_node, circuit_name, device_name, mac), data) in host_accumulator.iter() {
let bps: Vec<(u64, u64)> = data.iter().map(|(d, u, _rtt, _tree)| (*d, *u)).collect();
let bps = MinMaxAvgPair::<u64>::from_slice(&bps);
let fps: Vec<u32> = data
@ -189,12 +203,18 @@ pub(crate) async fn collate_stats() {
.map(|(_d, _u, _rtt, tree)| tree)
.next()
.unwrap_or(Vec::new());
submission.hosts.push(SubmissionHost {
circuit_id: circuit.to_string(),
ip_address: **ip,
bits_per_second: bps,
median_rtt: fps,
tree_parent_indices: tree,
device_id: device_id.to_string(),
parent_node: parent_node.to_string(),
circuit_name: circuit_name.to_string(),
device_name: device_name.to_string(),
mac: mac.to_string(),
});
}

View File

@ -1,4 +1,4 @@
use crate::throughput_tracker::THROUGHPUT_TRACKER;
use crate::{throughput_tracker::THROUGHPUT_TRACKER, shaped_devices_tracker::SHAPED_DEVICES};
use once_cell::sync::Lazy;
use tokio::sync::Mutex;
use std::{
@ -21,6 +21,11 @@ pub(crate) struct SessionHost {
pub(crate) bits_per_second: (u64, u64),
pub(crate) median_rtt: f32,
pub(crate) tree_parent_indices: Vec<usize>,
pub(crate) device_id: String,
pub(crate) parent_node: String,
pub(crate) circuit_name: String,
pub(crate) device_name: String,
pub(crate) mac: String,
}
pub(crate) static SESSION_BUFFER: Lazy<Mutex<Vec<StatsSession>>> =
@ -53,21 +58,47 @@ pub(crate) async fn gather_throughput_stats() {
shaped_bits_per_second,
packets_per_second,
hosts: Vec::with_capacity(THROUGHPUT_TRACKER.raw_data.len()),
};
};
THROUGHPUT_TRACKER
.raw_data
.iter()
.filter(|t| t.circuit_id.is_some())
.for_each(|tp| {
let shaped_devices = SHAPED_DEVICES.read().unwrap();
let mut circuit_id = String::new();
let mut device_id = tp.key().as_ip().to_string();
let mut parent_node = String::new();
let mut circuit_name = String::new();
let mut device_name = String::new();
let mut mac = String::new();
let ip = tp.key().as_ip();
let lookup = match ip {
IpAddr::V4(ip) => ip.to_ipv6_mapped(),
IpAddr::V6(ip) => ip,
};
if let Some((_, index)) = shaped_devices.trie.longest_match(lookup) {
let shaped_device = &shaped_devices.devices[*index];
circuit_id = shaped_device.circuit_id.clone();
device_id = shaped_device.device_id.clone();
parent_node = shaped_device.parent_node.clone();
circuit_name = shaped_device.circuit_name.clone();
device_name = shaped_device.device_name.clone();
mac = shaped_device.mac.clone();
}
let bytes_per_second = tp.bytes_per_second;
let bits_per_second = (bytes_per_second.0 * 8, bytes_per_second.1 * 8);
session.hosts.push(SessionHost {
circuit_id: tp.circuit_id.as_ref().unwrap().clone(),
circuit_id,
ip_address: tp.key().as_ip(),
bits_per_second,
median_rtt: tp.median_latency(),
tree_parent_indices: tp.network_json_parents.clone().unwrap_or(Vec::new()),
device_id,
parent_node,
circuit_name,
device_name,
mac,
});
});

View File

@ -8,6 +8,7 @@ pub(crate) struct NetworkTreeEntry {
pub(crate) name: String,
pub(crate) max_throughput: (u32, u32),
pub(crate) current_throughput: (u32, u32),
pub(crate) rtts: (u16, u16, u16),
pub(crate) parents: Vec<usize>,
pub(crate) immediate_parent: Option<usize>,
pub(crate) node_type: Option<String>,
@ -15,6 +16,21 @@ pub(crate) struct NetworkTreeEntry {
impl From<&NetworkJsonNode> for NetworkTreeEntry {
fn from(value: &NetworkJsonNode) -> Self {
let mut max = 0;
let mut min = if value.rtts.is_empty() {
0
} else {
u16::MAX
};
let mut sum = 0;
for n in value.rtts.iter() {
let n = *n;
sum += n;
if n < min { min = n; }
if n > max { max = n; }
}
let avg = sum.checked_div(value.rtts.len() as u16).unwrap_or(0);
Self {
name: value.name.clone(),
max_throughput: value.max_throughput,
@ -25,6 +41,7 @@ impl From<&NetworkJsonNode> for NetworkTreeEntry {
value.current_throughput.1.load(std::sync::atomic::Ordering::Relaxed) as u32,
),
node_type: value.node_type.clone(),
rtts: (min, max, avg),
}
}
}
@ -38,6 +55,7 @@ impl From<&NetworkTreeEntry> for StatsTreeNode {
parents: value.parents.clone(),
immediate_parent: value.immediate_parent,
node_type: value.node_type.clone(),
rtt: value.rtts,
}
}
}