Add node CPU/RAM usage

Herbert Wolverson 2023-04-27 17:23:50 +00:00
parent 2d11ce87b7
commit dfb7e0d342
17 changed files with 435 additions and 14 deletions
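
In short: the shaper samples per-core CPU usage and RAM utilization with sysinfo, attaches them to the existing long-term stats submission, the ingestion side writes them to InfluxDB as a "perf" measurement, and the web UI requests them over WebSocket via a new "nodePerf" message rendered by new CPU and RAM charts on the shaper-node page.

As a rough illustration (not part of the commit, field values invented), the request the browser sends looks like this; the shape mirrors requestNodePerfChart in bus.ts below:

    // Sketch only: a serde_json value mirroring the "nodePerf" request shape.
    fn example_node_perf_request() -> serde_json::Value {
        serde_json::json!({
            "msg": "nodePerf",
            "period": "5m",               // whatever window.graphPeriod currently holds
            "node_id": "example-node",    // hypothetical id
            "node_name": "Example Node"   // hypothetical name
        })
    }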

src/rust/Cargo.lock generated
@@ -2169,7 +2169,6 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"tokio-tungstenite",
"tower",
"tower-http",
"tracing",

@@ -20,7 +20,6 @@ influxdb2 = "0"
influxdb2-structmap = "0"
num-traits = "0"
futures = "0"
tokio-tungstenite = "0.18"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tower = { version = "0.4", features = ["util"] }

@@ -3,5 +3,6 @@ mod host_totals;
mod organization_cache;
mod per_host;
mod tree;
mod node_perf;
pub use queue::{submissions_queue, SubmissionType};
pub use organization_cache::get_org_details;

@@ -0,0 +1,35 @@
use influxdb2::{Client, models::DataPoint};
use pgdb::OrganizationDetails;
use futures::prelude::*;
pub async fn collect_node_perf(
org: &OrganizationDetails,
node_id: &str,
timestamp: i64,
cpu: &[u32],
ram: u32,
) -> anyhow::Result<()> {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(&influx_url, &org.influx_org, &org.influx_token);
let cpu_sum = cpu.iter().sum::<u32>();
let cpu_avg = cpu_sum / cpu.len() as u32;
let cpu_max = *cpu.iter().max().unwrap();
let points = vec![
DataPoint::builder("perf")
.tag("host_id", node_id.to_string())
.tag("organization_id", org.key.to_string())
.timestamp(timestamp)
.field("ram", ram as i64)
.field("cpu", cpu_avg as i64)
.field("cpu_max", cpu_max as i64)
.build()?,
];
client
.write_with_precision(
&org.influx_bucket,
stream::iter(points),
influxdb2::api::write::TimestampPrecision::Seconds,
)
.await?;
Ok(())
}
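
Note that cpu_sum / cpu.len() and max().unwrap() above assume the CPU slice is non-empty; an empty submission would panic here. A small defensive variant, offered only as a sketch (not what the commit does):

    // Hypothetical helper: average and single-core peak CPU that tolerate
    // an empty sample slice instead of panicking.
    fn cpu_summary(cpu: &[u32]) -> (i64, i64) {
        if cpu.is_empty() {
            return (0, 0);
        }
        let avg = cpu.iter().sum::<u32>() / cpu.len() as u32;
        let max = *cpu.iter().max().unwrap(); // safe: emptiness checked above
        (avg as i64, max as i64)
    }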

@@ -7,7 +7,7 @@
use lqos_bus::long_term_stats::{NodeIdAndLicense, StatsSubmission};
use pgdb::sqlx::{Pool, Postgres};
use tokio::{sync::mpsc::{Sender, Receiver}, join};
use crate::submissions::submission_queue::{host_totals::collect_host_totals, organization_cache::get_org_details, per_host::collect_per_host, tree::collect_tree};
use crate::submissions::submission_queue::{host_totals::collect_host_totals, organization_cache::get_org_details, per_host::collect_per_host, tree::collect_tree, node_perf::collect_node_perf};
const SUBMISSION_QUEUE_SIZE: usize = 100;
pub type SubmissionType = (NodeIdAndLicense, StatsSubmission);
@@ -39,6 +39,7 @@ async fn ingest_stats(cnn: Pool<Postgres>, node_id: NodeIdAndLicense, stats: Sta
collect_host_totals(&org, &node_id.node_id, ts, &stats.totals),
collect_per_host(&org, &node_id.node_id, ts, &stats.hosts),
collect_tree(&org, &node_id.node_id, ts, &stats.tree),
collect_node_perf(&org, &node_id.node_id, ts, &stats.cpu_usage, stats.ram_percent),
);
} else {
log::warn!("Unable to find organization for license {}", node_id.license_key);

@@ -1,5 +1,5 @@
use crate::web::wss::queries::{
send_packets_for_all_nodes, send_rtt_for_all_nodes, send_throughput_for_all_nodes, send_packets_for_node, send_throughput_for_node, send_rtt_for_node,
send_packets_for_all_nodes, send_rtt_for_all_nodes, send_throughput_for_all_nodes, send_packets_for_node, send_throughput_for_node, send_rtt_for_node, send_perf_for_node,
};
use axum::{
extract::{
@@ -154,6 +154,21 @@ async fn handle_socket(mut socket: WebSocket, cnn: Pool<Postgres>) {
log::info!("Throughput requested but no credentials provided");
}
}
"nodePerf" => {
if let Some(credentials) = &credentials {
let _ = send_perf_for_node(
cnn.clone(),
&mut socket,
&credentials.license_key,
period,
json.get("node_id").unwrap().as_str().unwrap().to_string(),
json.get("node_name").unwrap().as_str().unwrap().to_string(),
)
.await;
} else {
log::info!("Throughput requested but no credentials provided");
}
}
_ => {
log::warn!("Unknown message type: {msg_type}");
}
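
The json.get("node_id").unwrap().as_str().unwrap() chain in the "nodePerf" arm above will panic the socket task if a client sends a malformed request; a more tolerant lookup (sketch only, assuming the same message shape) could be:

    // Hypothetical defensive extraction of the "nodePerf" request fields.
    let node_id = json
        .get("node_id")
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string();
    let node_name = json
        .get("node_name")
        .and_then(|v| v.as_str())
        .unwrap_or("")
        .to_string();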

@@ -4,7 +4,9 @@
mod packet_counts;
mod throughput;
mod rtt;
mod node_perf;
pub mod time_period;
pub use packet_counts::{ send_packets_for_all_nodes, send_packets_for_node };
pub use throughput::{ send_throughput_for_all_nodes, send_throughput_for_node };
pub use rtt::{ send_rtt_for_all_nodes, send_rtt_for_node };
pub use node_perf::send_perf_for_node;

@@ -0,0 +1,124 @@
use axum::extract::ws::{WebSocket, Message};
use chrono::{DateTime, FixedOffset, Utc};
use influxdb2::{Client, FromDataPoint, models::Query};
use pgdb::sqlx::{Pool, Postgres};
use serde::Serialize;
use crate::submissions::get_org_details;
use super::time_period::InfluxTimePeriod;
#[derive(Debug, FromDataPoint)]
pub struct PerfRow {
pub host_id: String,
pub time: DateTime<FixedOffset>,
pub cpu: f64,
pub cpu_max: f64,
pub ram: f64,
}
impl Default for PerfRow {
fn default() -> Self {
Self {
host_id: "".to_string(),
time: DateTime::<Utc>::MIN_UTC.into(),
cpu: 0.0,
cpu_max: 0.0,
ram: 0.0,
}
}
}
#[derive(Serialize, Debug)]
pub struct PerfHost {
pub node_id: String,
pub node_name: String,
pub stats: Vec<Perf>,
}
#[derive(Serialize, Debug)]
pub struct Perf {
pub date: String,
pub cpu: f64,
pub cpu_max: f64,
pub ram: f64,
}
#[derive(Serialize, Debug)]
pub struct PacketChart {
pub msg: String,
pub nodes: Vec<PerfHost>,
}
pub async fn send_perf_for_node(
cnn: Pool<Postgres>,
socket: &mut WebSocket,
key: &str,
period: InfluxTimePeriod,
node_id: String,
node_name: String,
) -> anyhow::Result<()> {
let node = get_perf_for_node(cnn, key, node_id, node_name, period).await?;
let chart = PacketChart {
msg: "nodePerfChart".to_string(),
nodes: vec![node],
};
let json = serde_json::to_string(&chart).unwrap();
socket.send(Message::Text(json)).await.unwrap();
Ok(())
}
pub async fn get_perf_for_node(
cnn: Pool<Postgres>,
key: &str,
node_id: String,
node_name: String,
period: InfluxTimePeriod,
) -> anyhow::Result<PerfHost> {
if let Some(org) = get_org_details(cnn, key).await {
let influx_url = format!("http://{}:8086", org.influx_host);
let client = Client::new(influx_url, &org.influx_org, &org.influx_token);
let qs = format!(
"from(bucket: \"{}\")
|> {}
|> filter(fn: (r) => r[\"_measurement\"] == \"perf\")
|> filter(fn: (r) => r[\"organization_id\"] == \"{}\")
|> filter(fn: (r) => r[\"host_id\"] == \"{}\")
|> {}
|> yield(name: \"last\")",
org.influx_bucket, period.range(), org.key, node_id, period.aggregate_window()
);
let query = Query::new(qs);
let rows = client.query::<PerfRow>(Some(query)).await;
match rows {
Err(e) => {
tracing::error!("Error querying InfluxDB: {}", e);
return Err(anyhow::Error::msg("Unable to query influx"));
}
Ok(rows) => {
// Parse and send the data
//println!("{rows:?}");
let mut stats = Vec::new();
// Collect the per-sample CPU and RAM values
for row in rows.iter() {
stats.push(Perf {
date: row.time.format("%Y-%m-%d %H:%M:%S").to_string(),
cpu: row.cpu,
cpu_max: row.cpu_max,
ram: row.ram,
});
}
return Ok(PerfHost{
node_id,
node_name,
stats,
});
}
}
}
Err(anyhow::Error::msg("Unable to query influx"))
}
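
For orientation, the "nodePerfChart" message that send_perf_for_node serialises for the browser has roughly this shape (a sketch; node and sample values invented):

    // Illustrative only: the shape serde_json::to_string(&PacketChart { .. }) produces.
    fn example_node_perf_chart() -> serde_json::Value {
        serde_json::json!({
            "msg": "nodePerfChart",
            "nodes": [{
                "node_id": "example-node",     // hypothetical
                "node_name": "Example Node",   // hypothetical
                "stats": [
                    { "date": "2023-04-27 17:00:00", "cpu": 12.0, "cpu_max": 55.0, "ram": 41.0 }
                ]
            }]
        })
    }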

@@ -7,7 +7,6 @@ use axum::extract::ws::{WebSocket, Message};
use futures::future::join_all;
use influxdb2::{models::Query, Client};
use pgdb::sqlx::{Pool, Postgres};
use super::time_period::InfluxTimePeriod;
pub async fn send_packets_for_all_nodes(cnn: Pool<Postgres>, socket: &mut WebSocket, key: &str, period: InfluxTimePeriod) -> anyhow::Result<()> {

@@ -103,6 +103,17 @@ export class Bus {
let json = JSON.stringify(request);
this.ws.send(json);
}
requestNodePerfChart(node_id: string, node_name: string) {
let request = {
msg: "nodePerf",
period: window.graphPeriod,
node_id: node_id,
node_name: node_name,
};
let json = JSON.stringify(request);
this.ws.send(json);
}
}
function formatToken(token: string) {

@@ -0,0 +1,96 @@
import { scaleNumber } from "../helpers";
import { Component } from "./component";
import * as echarts from 'echarts';
export class NodeCpuChart implements Component {
div: HTMLElement;
myChart: echarts.ECharts;
chartMade: boolean = false;
node_id: string;
node_name: string;
constructor(node_id: string, node_name: string) {
this.node_id = node_id;
this.node_name = node_name;
this.div = document.getElementById("cpuChart") as HTMLElement;
this.myChart = echarts.init(this.div);
this.myChart.showLoading();
}
wireup(): void {
}
ontick(): void {
// Data is requested by the RAM chart's ontick; nothing to do here.
}
onmessage(event: any): void {
if (event.msg == "nodePerfChart") {
let series: echarts.SeriesOption[] = [];
// Iterate all provided nodes and create CPU average and
// single-core peak series for each.
let x: any[] = [];
let first = true;
let legend: string[] = [];
for (let i=0; i<event.nodes.length; i++) {
let node = event.nodes[i];
legend.push(node.node_name + " CPU %");
legend.push(node.node_name + " Single Core Peak");
//console.log(node);
let cpu: number[] = [];
let cpu_max: number[] = [];
for (let j=0; j<node.stats.length; j++) {
if (first) x.push(node.stats[j].date);
cpu.push(node.stats[j].cpu);
cpu_max.push(node.stats[j].cpu_max);
}
if (first) first = false;
let val: echarts.SeriesOption = {
name: node.node_name + " CPU %",
type: "line",
data: cpu,
symbol: 'none',
};
let val2: echarts.SeriesOption = {
name: node.node_name + " Single Core Peak",
type: "line",
data: cpu_max,
symbol: 'none',
};
series.push(val);
series.push(val2);
}
if (!this.chartMade) {
this.myChart.hideLoading();
var option: echarts.EChartsOption;
this.myChart.setOption<echarts.EChartsOption>(
(option = {
title: { text: "CPU Usage" },
legend: {
orient: "horizontal",
right: 10,
top: "bottom",
data: legend,
},
xAxis: {
type: 'category',
data: x,
},
yAxis: {
type: 'value',
name: '%',
},
series: series
})
);
option && this.myChart.setOption(option);
// this.chartMade = true;
}
}
}
}

@@ -0,0 +1,86 @@
import { scaleNumber } from "../helpers";
import { Component } from "./component";
import * as echarts from 'echarts';
export class NodeRamChart implements Component {
div: HTMLElement;
myChart: echarts.ECharts;
chartMade: boolean = false;
node_id: string;
node_name: string;
constructor(node_id: string, node_name: string) {
this.node_id = node_id;
this.node_name = node_name;
this.div = document.getElementById("ramChart") as HTMLElement;
this.myChart = echarts.init(this.div);
this.myChart.showLoading();
}
wireup(): void {
}
ontick(): void {
window.bus.requestNodePerfChart(this.node_id, this.node_name);
}
onmessage(event: any): void {
if (event.msg == "nodePerfChart") {
let series: echarts.SeriesOption[] = [];
// Iterate all provided nodes and create a RAM usage series
// for each.
let x: any[] = [];
let first = true;
let legend: string[] = [];
for (let i=0; i<event.nodes.length; i++) {
let node = event.nodes[i];
legend.push(node.node_name);
//console.log(node);
let ram: number[] = [];
for (let j=0; j<node.stats.length; j++) {
if (first) x.push(node.stats[j].date);
ram.push(node.stats[j].ram);
}
if (first) first = false;
let val: echarts.SeriesOption = {
name: node.node_name,
type: "line",
data: ram,
symbol: 'none',
};
series.push(val);
}
if (!this.chartMade) {
this.myChart.hideLoading();
var option: echarts.EChartsOption;
this.myChart.setOption<echarts.EChartsOption>(
(option = {
title: { text: "RAM Usage" },
legend: {
orient: "horizontal",
right: 10,
top: "bottom",
data: legend,
},
xAxis: {
type: 'category',
data: x,
},
yAxis: {
type: 'value',
name: '%',
},
series: series
})
);
option && this.myChart.setOption(option);
// this.chartMade = true;
}
}
}
}

@@ -6,6 +6,8 @@ import { PacketsChartSingle } from '../components/packets_single';
import { RttHisto } from '../components/rtt_histo';
import { ThroughputChartSingle } from '../components/throughput_single';
import { RttChartSingle } from '../components/rtt_graph_single';
import { NodeCpuChart } from '../components/node_cpu';
import { NodeRamChart } from '../components/node_ram';
export class ShaperNodePage implements Page {
menu: MenuPage;
@@ -26,6 +28,8 @@ export class ShaperNodePage implements Page {
new ThroughputChartSingle(this.node_id, this.node_name),
new RttChartSingle(this.node_id, this.node_name),
new RttHisto(),
new NodeCpuChart(this.node_id, this.node_name),
new NodeRamChart(this.node_id, this.node_name),
];
let name = document.getElementById('nodeName');
if (name) {

@@ -4,6 +4,25 @@
<h1 id="nodeName">Node</h1>
</div>
</div>
<div class="row">
<div class="col-6">
<div class="card">
<div class="card-body">
<div id="cpuChart" style="height: 250px"></div>
</div>
</div>
</div>
<div class="col-6">
<div class="card">
<div class="card-body">
<div id="ramChart" style="height: 250px"></div>
</div>
</div>
</div>
</div>
<div class="row">
<div class="col-6">
<div class="card">

@@ -85,6 +85,10 @@ pub struct StatsSubmission {
pub hosts: Option<Vec<StatsHost>>,
/// Tree of traffic summaries
pub tree: Option<Vec<StatsTreeNode>>,
/// CPU utilization on the shaper
pub cpu_usage: Vec<u32>,
/// RAM utilization on the shaper
pub ram_percent: u32,
}
/// Network-transmitted query to ask the status of a license

@@ -1,4 +1,6 @@
use lqos_utils::unix_time::unix_now;
use once_cell::sync::Lazy;
use sysinfo::{System, SystemExt};
use super::{
collation_utils::{MinMaxAvg, MinMaxAvgPair},
@@ -6,7 +8,7 @@ use super::{
tree::{get_network_tree, NetworkTreeEntry},
};
use crate::long_term_stats::data_collector::SESSION_BUFFER;
use std::{collections::HashMap, net::IpAddr};
use std::{collections::HashMap, net::IpAddr, sync::Mutex};
#[derive(Debug, Clone)]
pub(crate) struct StatsSubmission {
@@ -27,9 +29,32 @@ pub(crate) struct SubmissionHost {
pub(crate) tree_parent_indices: Vec<usize>,
}
static SYS: Lazy<Mutex<System>> = Lazy::new(|| Mutex::new(System::new_all()));
fn get_cpu_ram() -> (Vec<u32>, u32) {
use sysinfo::CpuExt;
let mut lock = SYS.lock().unwrap();
lock.refresh_cpu();
lock.refresh_memory();
let cpus: Vec<u32> = lock.cpus()
.iter()
.map(|cpu| cpu.cpu_usage() as u32) // Always rounds down
.collect();
let memory = (lock.used_memory() as f32 / lock.total_memory() as f32) * 100.0;
//println!("cpu: {:?}, ram: {}", cpus, memory);
(cpus, memory as u32)
}
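
One caveat: sysinfo derives cpu_usage() from the delta between two refreshes, so the very first reading after start-up can come back as zero; because the System lives in a static and is refreshed on every collection pass, subsequent submissions reflect a real interval. A throwaway warm-up call is one way to make even the first sample meaningful (hypothetical usage sketch, not part of the commit):

    // Hypothetical warm-up: discard the first reading, wait briefly, sample again.
    let _ = get_cpu_ram();
    std::thread::sleep(std::time::Duration::from_millis(200));
    let (cpu_per_core, ram_percent) = get_cpu_ram();
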
impl From<StatsSubmission> for lqos_bus::long_term_stats::StatsSubmission {
fn from(value: StatsSubmission) -> Self {
let (cpu, ram) = get_cpu_ram();
Self {
cpu_usage: cpu,
ram_percent: ram,
timestamp: value.timestamp,
totals: Some(value.clone().into()),
hosts: Some(value.hosts.into_iter().map(Into::into).collect()),

@@ -89,14 +89,15 @@ async fn send_queue(host: String) {
if e.kind() == std::io::ErrorKind::NotFound {
log::error!("Unable to access {host}. Check that lqosd is running and you have appropriate permissions.");
}
}
let mut stream = stream.unwrap(); // This unwrap is safe, we checked that it exists previously
let ret = stream.write(&submission_buffer).await;
if ret.is_err() {
log::error!("Unable to write to {host} stream.");
log::error!("{:?}", ret);
} else {
s.sent = true;
let mut stream = stream.unwrap(); // This unwrap is safe, we checked that it exists previously
let ret = stream.write(&submission_buffer).await;
if ret.is_err() {
log::error!("Unable to write to {host} stream.");
log::error!("{:?}", ret);
} else {
s.sent = true;
}
}
}
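
The final hunk appears to move the unwrap-and-write block inside the success path, so the submission buffer is only written when the stream actually connected; roughly (a hedged sketch of the resulting flow, not a verbatim excerpt):

    // Sketch: only unwrap and write when the connection attempt succeeded.
    if let Ok(mut stream) = stream {
        let ret = stream.write(&submission_buffer).await;
        if ret.is_err() {
            log::error!("Unable to write to {host} stream.");
            log::error!("{:?}", ret);
        } else {
            s.sent = true;
        }
    }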