diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/cpu_dash.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/cpu_dash.js new file mode 100644 index 00000000..24af6130 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/cpu_dash.js @@ -0,0 +1,33 @@ +import {BaseDashlet} from "./base_dashlet"; +import {CpuHistogram} from "../graphs/cpu_graph"; + +export class CpuDash extends BaseDashlet{ + constructor(slot) { + super(slot); + } + + title() { + return "CPU Utilization"; + } + + subscribeTo() { + return [ "Cpu" ]; + } + + buildContainer() { + let base = super.buildContainer(); + base.appendChild(this.graphDiv()); + return base; + } + + setup() { + super.setup(); + this.graph = new CpuHistogram(this.graphDivId()); + } + + onMessage(msg) { + if (msg.event === "Cpu") { + this.graph.update(msg.data); + } + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js index e6fd1686..451776cc 100644 --- a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/dashlet_index.js @@ -12,6 +12,8 @@ import {Top10EndpointsByCountry} from "./endpoints_by_country"; import {EtherProtocols} from "./ether_protocols"; import {IpProtocols} from "./ip_protocols"; import {Worst10Retransmits} from "./worst10_retransmits"; +import {CpuDash} from "./cpu_dash"; +import {RamDash} from "./ram_dash"; export const DashletMenu = [ { name: "Throughput Bits/Second", tag: "throughputBps", size: 3 }, @@ -28,6 +30,8 @@ export const DashletMenu = [ { name: "Top 10 Endpoints by Country", tag: "top10endpointsCountry", size: 6 }, { name: "Ether Protocols", tag: "etherProtocols", size: 6 }, { name: "IP Protocols", tag: "ipProtocols", size: 6 }, + { name: "CPU Utilization", tag: "cpu", size: 6 }, + { name: "RAM Utilization", tag: "ram", size: 6 }, ]; export function widgetFactory(widgetName, count) { @@ -47,6 +51,8 @@ export function widgetFactory(widgetName, count) { case "top10endpointsCountry" : widget = new Top10EndpointsByCountry(count); break; case "etherProtocols" : widget = new EtherProtocols(count); break; case "ipProtocols" : widget = new IpProtocols(count); break; + case "cpu" : widget = new CpuDash(count); break; + case "ram" : widget = new RamDash(count); break; default: { console.log("I don't know how to construct a widget of type [" + widgetName + "]"); return null; diff --git a/src/rust/lqosd/src/node_manager/js_build/src/dashlets/ram_dash.js b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/ram_dash.js new file mode 100644 index 00000000..e229d2b7 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/dashlets/ram_dash.js @@ -0,0 +1,37 @@ +import {BaseDashlet} from "./base_dashlet"; +import {RttHistogram} from "../graphs/rtt_histo"; +import {RamPie} from "../graphs/ram_pie"; + +export class RamDash extends BaseDashlet{ + constructor(slot) { + super(slot); + } + + title() { + return "RAM Utilization"; + } + + subscribeTo() { + return [ "Ram" ]; + } + + buildContainer() { + let base = super.buildContainer(); + base.appendChild(this.graphDiv()); + return base; + } + + setup() { + super.setup(); + this.graph = new RamPie(this.graphDivId()); + } + + onMessage(msg) { + if (msg.event === "Ram") { + let total = msg.data.total; + let used = msg.data.used; + let free = total - used; + this.graph.update(free, used); + } + } +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/js_build/src/graphs/cpu_graph.js b/src/rust/lqosd/src/node_manager/js_build/src/graphs/cpu_graph.js new file mode 100644 index 00000000..e1b7d306 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/js_build/src/graphs/cpu_graph.js @@ -0,0 +1,46 @@ +import {DashboardGraph} from "./dashboard_graph"; +import {lerpGreenToRedViaOrange} from "../helpers/scaling"; + +export class CpuHistogram extends DashboardGraph { + constructor(id) { + super(id); + let d = []; + let axis = []; + for (let i=0; i<20; i++) { + d.push({ + value: i, + itemStyle: {color: lerpGreenToRedViaOrange(20-i, 20)}, + }); + axis.push(i.toString()); + } + this.option = { + xAxis: { + type: 'category', + data: axis, + }, + yAxis: { + type: 'value', + min: 0, + max: 100, + }, + series: { + data: d, + type: 'bar', + } + }; + this.option && this.chart.setOption(this.option); + } + + update(cpu) { + this.chart.hideLoading(); + this.option.series.data = []; + for (let i=0; i Router { let channels = PubSub::new(); tokio::spawn(channel_ticker(channels.clone())); + tokio::spawn(ticker::system_info::cache::update_cache()); Router::new() .route("/ws", get(ws_handler)) .layer(Extension(channels)) diff --git a/src/rust/lqosd/src/node_manager/ws/published_channels.rs b/src/rust/lqosd/src/node_manager/ws/published_channels.rs index 9dd69a3a..f5b1945d 100644 --- a/src/rust/lqosd/src/node_manager/ws/published_channels.rs +++ b/src/rust/lqosd/src/node_manager/ws/published_channels.rs @@ -15,4 +15,6 @@ pub enum PublishedChannels { EndpointsByCountry, EtherProtocols, IpProtocols, + Cpu, + Ram, } diff --git a/src/rust/lqosd/src/node_manager/ws/ticker.rs b/src/rust/lqosd/src/node_manager/ws/ticker.rs index 6fbd4d1a..6a3ff6fc 100644 --- a/src/rust/lqosd/src/node_manager/ws/ticker.rs +++ b/src/rust/lqosd/src/node_manager/ws/ticker.rs @@ -6,6 +6,7 @@ mod top_10; mod ipstats_conversion; mod top_flows; mod flow_endpoints; +pub mod system_info; use std::sync::Arc; use crate::node_manager::ws::publish_subscribe::PubSub; @@ -29,6 +30,8 @@ pub(super) async fn channel_ticker(channels: Arc) { flow_endpoints::endpoints_by_country(channels.clone()), flow_endpoints::ether_protocols(channels.clone()), flow_endpoints::ip_protocols(channels.clone()), + system_info::cpu_info(channels.clone()), + system_info::ram_info(channels.clone()), ); } } \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/system_info.rs b/src/rust/lqosd/src/node_manager/ws/ticker/system_info.rs new file mode 100644 index 00000000..51dca704 --- /dev/null +++ b/src/rust/lqosd/src/node_manager/ws/ticker/system_info.rs @@ -0,0 +1,47 @@ +use std::sync::Arc; +use serde_json::json; +use crate::node_manager::ws::publish_subscribe::PubSub; +use crate::node_manager::ws::published_channels::PublishedChannels; +use crate::node_manager::ws::ticker::system_info::cache::{CPU_USAGE, NUM_CPUS, RAM_USED, TOTAL_RAM}; + +pub mod cache; + +pub async fn cpu_info(channels: Arc) { + if !channels.is_channel_alive(PublishedChannels::Cpu).await { + return; + } + + let usage: Vec = CPU_USAGE + .iter() + .take(NUM_CPUS.load(std::sync::atomic::Ordering::Relaxed)) + .map(|cpu| cpu.load(std::sync::atomic::Ordering::Relaxed)) + .collect(); + + let message = json!( + { + "event": PublishedChannels::Cpu.to_string(), + "data": usage, + } + ).to_string(); + channels.send(PublishedChannels::Cpu, message).await; +} + +pub async fn ram_info(channels: Arc) { + if !channels.is_channel_alive(PublishedChannels::Ram).await { + return; + } + + let ram_usage = RAM_USED.load(std::sync::atomic::Ordering::Relaxed); + let total_ram = TOTAL_RAM.load(std::sync::atomic::Ordering::Relaxed); + + let message = json!( + { + "event": PublishedChannels::Ram.to_string(), + "data": { + "total" : total_ram, + "used" : ram_usage, + }, + } + ).to_string(); + channels.send(PublishedChannels::Ram, message).await; +} \ No newline at end of file diff --git a/src/rust/lqosd/src/node_manager/ws/ticker/system_info/cache.rs b/src/rust/lqosd/src/node_manager/ws/ticker/system_info/cache.rs new file mode 100644 index 00000000..4f7c531f --- /dev/null +++ b/src/rust/lqosd/src/node_manager/ws/ticker/system_info/cache.rs @@ -0,0 +1,57 @@ +use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize}; +use std::time::Duration; + +use once_cell::sync::Lazy; + +const MAX_CPUS_COUNTED: usize = 128; + +/// Stores overall CPU usage +pub static CPU_USAGE: Lazy<[AtomicU32; MAX_CPUS_COUNTED]> = + Lazy::new(build_empty_cpu_list); + +/// Total number of CPUs detected +pub static NUM_CPUS: AtomicUsize = AtomicUsize::new(0); + +/// Total RAM used (bytes) +pub static RAM_USED: AtomicU64 = AtomicU64::new(0); + +/// Total RAM installed (bytes) +pub static TOTAL_RAM: AtomicU64 = AtomicU64::new(0); + +fn build_empty_cpu_list() -> [AtomicU32; MAX_CPUS_COUNTED] { + let mut temp = Vec::with_capacity(MAX_CPUS_COUNTED); + for _ in 0..MAX_CPUS_COUNTED { + temp.push(AtomicU32::new(0)); + } + temp.try_into().expect("This should never happen, sizes are constant.") +} + +pub async fn update_cache() { + use sysinfo::System; + let mut sys = System::new_all(); + tokio::time::sleep(Duration::from_secs(10)).await; + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + loop { + interval.tick().await; // Once per second + + sys.refresh_cpu(); + sys.refresh_memory(); + + sys + .cpus() + .iter() + .enumerate() + .map(|(i, cpu)| (i, cpu.cpu_usage() as u32)) // Always rounds down + .for_each(|(i, cpu)| { + CPU_USAGE[i].store(cpu, std::sync::atomic::Ordering::Relaxed) + }); + + NUM_CPUS + .store(sys.cpus().len(), std::sync::atomic::Ordering::Relaxed); + RAM_USED + .store(sys.used_memory(), std::sync::atomic::Ordering::Relaxed); + TOTAL_RAM + .store(sys.total_memory(), std::sync::atomic::Ordering::Relaxed); + } +} \ No newline at end of file