address draft PR comments

This commit is contained in:
ShaunakDas88 2023-10-06 10:42:18 -07:00
parent bce0f9b297
commit cd1ab9c540
14 changed files with 623 additions and 593 deletions

View File

@ -1,115 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import com.codahale.metrics.Gauge;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.Map;
import java.util.Map.Entry;
import java.util.HashMap;
public class ClientSystemMetricChecker {
private static final Logger logger = LogManager.getLogger(ClientSystemMetricChecker.class);
private final int pollIntervalSeconds;
private final ScheduledExecutorService scheduler;
private final Map<String,Gauge<Double>> nameToNumerator;
private final Map<String,Gauge<Double>> nameToDenominator;
private final Map<String,Double> nameToThreshold;
private final Map<String,Double> nameToPrevNumeratorValue;
private final Map<String,Double> nameToPrevDenominatorValue;
private final Map<String,Boolean> nameToRetainPrevValue;
public ClientSystemMetricChecker(int pollIntervalSeconds) {
this.pollIntervalSeconds = pollIntervalSeconds;
this.scheduler = Executors.newScheduledThreadPool(1);
nameToNumerator = new HashMap<>();
nameToDenominator = new HashMap<>();
nameToThreshold = new HashMap<>();
nameToPrevNumeratorValue = new HashMap<>();
nameToPrevDenominatorValue = new HashMap<>();
nameToRetainPrevValue = new HashMap<>();
}
public void addMetricToCheck(String name, Gauge<Double> metric, Double threshold) {
addRatioMetricToCheck(name, metric, null, threshold, false);
}
public void addRatioMetricToCheck(String name, Gauge<Double> numerator, Gauge<Double> denominator, Double threshold, boolean retainPrev) {
/**
* Some "meaningful" system metrics are derived via:
* - taking a ratio of instantaneous values (e.g. MemUsed / MemTotal from /proc/meminfo)
* - taking a ratio of deltas of aggregates values over a time window (e.g. CPU utilization from /proc/stat)
*
* This method serves to be able to allow checking those which can be derived as a ratio of two existing metrics.
*/
nameToNumerator.put(name, numerator);
if (denominator != null)
nameToDenominator.put(name, denominator);
nameToThreshold.put(name, threshold);
nameToRetainPrevValue.put(name, retainPrev);
}
public void start() {
scheduler.scheduleAtFixedRate(() -> {
checkMetrics();
}, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
}
private void checkMetrics() {
for (Entry<String,Gauge<Double>> entry: nameToNumerator.entrySet()) {
String name = entry.getKey();
Gauge<Double> numerator = entry.getValue();
Gauge<Double> denominator = nameToDenominator.get(name);
Double threshold = nameToThreshold.get(name);
Double numeratorVal = numerator.getValue();
if (numeratorVal == null)
continue;
Double deltaNumeratorVal = numeratorVal - nameToPrevNumeratorValue.getOrDefault(name, 0.0);
// the case that we are not checking a ratio of values
if (denominator == null) {
if (deltaNumeratorVal > threshold)
logger.warn(name + " value = " + deltaNumeratorVal + " > threshold " + threshold);
if (nameToRetainPrevValue.get(name))
nameToPrevNumeratorValue.put(name, numeratorVal);
continue;
}
// at this point, we are checking ratio of gauge value changes over a time interval
Double denominatorValue = denominator.getValue();
if (denominatorValue == null)
continue;
Double deltaDenominatorVal = denominatorValue - nameToPrevDenominatorValue.getOrDefault(name, 0.0);
if (deltaDenominatorVal != 0.0) {
Double percent = (deltaNumeratorVal / deltaDenominatorVal) * 100.0;
if (percent > threshold)
logger.warn(name + " value = " + percent + " > threshold " + threshold);
}
// finally, save these currently recorded values if required
if (nameToRetainPrevValue.get(name)) {
nameToPrevNumeratorValue.put(name, numeratorVal);
nameToPrevDenominatorValue.put(name, denominatorValue);
}
}
}
public void shutdown() {
scheduler.shutdown();
}
}

View File

@ -1,93 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DiskStatsReader {
private static final String filePath = "/proc/diskstats";
private static final Map<String,Map<String,Double>> metricsMap = new HashMap<>();
public static List<String> getDevices() {
parseFile();
return new ArrayList<>(metricsMap.keySet());
}
public static Double getTransactionsForDevice(String device) {
return getValue(device, "transactions");
}
public static Double getKbReadForDevice(String device) {
return getValue(device, "kB_read");
}
public static Double getKbWrittenForDevice(String device) {
return getValue(device, "kB_written");
}
private static Double getValue(String device, String metric) {
parseFile();
if (metricsMap.get(device) == null)
return null;
return metricsMap.get(device).get(metric);
}
private static void parseFile() {
/*
Note that all fields are cumulative within /proc/diskstats.
Reference:
- https://serverfault.com/questions/619097/interpreting-proc-diskstats-for-a-webserver-more-writes-than-reads
*/
metricsMap.clear();
try {
FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file);
int sectorSizeBytes = 512;
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.trim().split("\\s+");
if (parts.length < 14)
continue;
Map<String,Double> innerMap = new HashMap<>();
String device = parts[2];
Double readsCompleted = Double.parseDouble(parts[3]);
Double sectorsRead = Double.parseDouble(parts[5]);
Double writesCompleted = Double.parseDouble(parts[7]);
Double sectorsWritten = Double.parseDouble(parts[9]);
Double transactions = readsCompleted + writesCompleted;
Double kbRead = (sectorsRead * sectorSizeBytes) / 1024;
Double kbWritten = (sectorsWritten * sectorSizeBytes) / 1024;
innerMap.put("transactions", transactions);
innerMap.put("kB_read", kbRead);
innerMap.put("kB_written", kbWritten);
metricsMap.put(device, innerMap);
}
} catch (FileNotFoundException e) {
return;
}
catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class LoadAvgReader {
private static final String filePath = "/proc/loadavg";
private static Map<String,Double> metricsMap = new HashMap<>();
public static Double getOneMinLoadAverage() {
return getValue("loadAvg1min");
}
public static Double getFiveMinLoadAverage() {
return getValue("loadAvg5min");
}
public static Double getFifteenMinLoadAverage() {
return getValue("loadAvg15min");
}
private static Double getValue(String key) {
parseFile();
return metricsMap.get(key);
}
private static void parseFile()
{
metricsMap.clear();
try {
FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file);
String line = reader.readLine();
if (line == null)
return;
String[] parts = line.split("\\s+");
Double loadAvgOneMin = Double.parseDouble(parts[0]);
Double loadAvgFiveMin = Double.parseDouble(parts[1]);
Double loadAvgFifteenMinute = Double.parseDouble(parts[2]);
metricsMap.put("loadAvg1min", loadAvgOneMin);
metricsMap.put("loadAvg5min", loadAvgFiveMin);
metricsMap.put("loadAvg15min", loadAvgFifteenMinute);
} catch (FileNotFoundException e) {
return;
}
catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
}
}

View File

@ -1,90 +0,0 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MemInfoReader {
private static final String filePath = "/proc/meminfo";
private static final Set<String> relevantKeys = Set.of(
"MemTotal", "MemFree", "MemAvailable", "Cached", "Buffers", "SwapTotal", "SwapFree"
);
private static final Map<String,Double> metricsMap = new HashMap<>();
public static Double getMemTotalkB() {
return getValue("MemTotal");
}
public static Double getMemFreekB() {
return getValue("MemFree");
}
public static Double getMemAvailablekB() {
return getValue("MemAvailable");
}
public static Double getMemUsedkB() {
return getValue("MemUsed");
}
public static Double getMemCachedkB() {
return getValue("Cached");
}
public static Double getMemBufferskB() {
return getValue("Buffers");
}
public static Double getSwapTotalkB() {
return getValue("SwapTotal");
}
public static Double getSwapFreekB() {
return getValue("SwapFree");
}
public static Double getSwapUsedkB() {
return getValue("SwapUsed");
}
private static Double getValue(String key) {
parseFile();
return metricsMap.get(key);
}
private static void parseFile() {
/*
References:
- https://docs.kernel.org/filesystems/proc.html#meminfo
- https://stackoverflow.com/questions/41224738/how-to-calculate-system-memory-usage-from-proc-meminfo-like-htop
*/
metricsMap.clear();
try {
FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file);
String line;
while ((line = reader.readLine()) != null) {
String[] parts = line.split(":");
if (parts.length != 2)
continue;
String key = parts[0].trim();
Double value = Double.parseDouble(parts[1].trim().split(" ")[0]);
if (relevantKeys.contains(key)) {
metricsMap.put(key, value);
}
}
if (metricsMap.get("MemTotal") != null && metricsMap.get("MemFree") != null)
metricsMap.put("MemUsed", metricsMap.get("MemTotal") - metricsMap.get("MemFree"));
if (metricsMap.get("SwapTotal") != null && metricsMap.get("SwapFree") != null)
metricsMap.put("SwapUsed", metricsMap.get("SwapTotal") - metricsMap.get("SwapFree"));
} catch (FileNotFoundException e) {
return;
}
catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
}
}

View File

@ -1,100 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class NetDevReader {
private static final String filePath = "/proc/net/dev";
private static final Map<String,Map<String,Double>> metricsMap = new HashMap<>();
public static List<String> getInterfaces() {
parseFile();
return new ArrayList<>(metricsMap.keySet());
}
public static Double getBytesReceived(String interfaceName) {
return getValue(interfaceName, "rx_bytes");
}
public static Double getPacketsReceived(String interfaceName) {
return getValue(interfaceName, "rx_packets");
}
public static Double getBytesTransmitted(String interfaceName) {
return getValue(interfaceName, "tx_bytes");
}
public static Double getPacketsTransmitted(String interfaceName) {
return getValue(interfaceName, "tx_packets");
}
private static Double getValue(String interfaceName, String metric) {
parseFile();
if (metricsMap.get(interfaceName) == null)
return null;
return metricsMap.get(interfaceName).get(metric);
}
private static void parseFile() {
/*
Note that all fields are cumulative in /proc/net/dev
Reference:
- https://www.linuxquestions.org/questions/linux-networking-3/need-explanation-of-proc-net-dev-bytes-counters-4175458860/
*/
metricsMap.clear();
try {
FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file);
String line;
while ((line = reader.readLine()) != null) {
line = line.trim();
if (line.startsWith("Inter-|"))
continue;
String[] parts = line.split(":");
if (parts.length != 2)
continue;
String interfaceName = parts[0].trim();
String[] stats = parts[1].trim().split("\\s+");
if (stats.length < 16)
continue;
Map<String,Double> innerMap = new HashMap<>();
Double receivedBytes = Double.parseDouble(stats[0]);
Double receivedPackets = Double.parseDouble(stats[1]);
Double transmittedBytes = Double.parseDouble(stats[8]);
Double transmittedPackets = Double.parseDouble(stats[9]);
innerMap.put("rx_bytes", receivedBytes);
innerMap.put("rx_packets", receivedPackets);
innerMap.put("tx_bytes", transmittedBytes);
innerMap.put("tx_packets", transmittedPackets);
metricsMap.put(interfaceName, innerMap);
}
} catch (FileNotFoundException e) {
return;
}
catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
}
}

View File

@ -1,93 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;
public class StatReader {
private static final String filePath = "/proc/stat";
private static final Map<String,Double> metricsMap = new HashMap<>();
public static Double getUserTime() {
return getValue("user");
}
public static Double getSystemTime() {
return getValue("system");
}
public static Double getIdleTime() {
return getValue("idle");
}
public static Double getIoWaitTime() {
return getValue("iowait");
}
public static Double getTotalTime() {
return getValue("total");
}
private static Double getValue(String key) {
parseFile();
return metricsMap.get(key);
}
private static void parseFile() {
/*
Note that all fields are cumulative within /proc/stat.
Reference:
- https://docs.kernel.org/filesystems/proc.html#miscellaneous-kernel-statistics-in-proc-stat
*/
metricsMap.clear();
try {
FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file);
String line;
while ((line = reader.readLine()) != null) {
if (line.startsWith("cpu ")) {
String[] parts = line.split("\\s+");
Double user = Double.parseDouble(parts[1]);
Double nice = Double.parseDouble(parts[2]);
Double system = Double.parseDouble(parts[3]);
Double idle = Double.parseDouble(parts[4]);
Double iowait = Double.parseDouble(parts[5]);
Double irq = Double.parseDouble(parts[6]);
Double softirq = Double.parseDouble(parts[7]);
Double steal = Double.parseDouble(parts[8]);
Double total = user + nice + system + idle + iowait + irq + softirq + steal;
metricsMap.put("user", user);
metricsMap.put("system", system);
metricsMap.put("idle", idle);
metricsMap.put("iowait", iowait);
metricsMap.put("total", total);
break;
}
}
} catch (FileNotFoundException e) {
return;
}
catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
}
}

View File

@ -37,16 +37,16 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.outputs.cyclelog.CycleLogDu
import io.nosqlbench.engine.api.activityapi.cyclelog.outputs.cyclelog.CycleLogImporterUtility;
import io.nosqlbench.engine.api.activityapi.input.InputType;
import io.nosqlbench.engine.api.activityapi.output.OutputType;
import io.nosqlbench.engine.api.activityimpl.uniform.ClientSystemMetricChecker;
import io.nosqlbench.engine.api.activityimpl.uniform.DiskStatsReader;
import io.nosqlbench.engine.api.activityimpl.uniform.LoadAvgReader;
import io.nosqlbench.engine.api.activityimpl.uniform.MemInfoReader;
import io.nosqlbench.engine.api.activityimpl.uniform.NetDevReader;
import io.nosqlbench.engine.api.activityimpl.uniform.StatReader;
import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsLoader;
import io.nosqlbench.engine.cli.NBCLIOptions.LoggerConfigData;
import io.nosqlbench.engine.cli.NBCLIOptions.Mode;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.clientload.ClientSystemMetricChecker;
import io.nosqlbench.engine.core.clientload.DiskStatsReader;
import io.nosqlbench.engine.core.clientload.LoadAvgReader;
import io.nosqlbench.engine.core.clientload.MemInfoReader;
import io.nosqlbench.engine.core.clientload.NetDevReader;
import io.nosqlbench.engine.core.clientload.StatReader;
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import io.nosqlbench.engine.core.lifecycle.process.ShutdownManager;
@ -519,14 +519,17 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
}
private void registerLoadAvgMetrics() {
LoadAvgReader reader = new LoadAvgReader();
if (!reader.fileExists())
return;
Gauge<Double> loadAvgOneMinGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> LoadAvgReader.getOneMinLoadAverage(), "loadavg_1min")
new NBFunctionGauge(this, () -> reader.getOneMinLoadAverage(), "loadavg_1min")
);
Gauge<Double> loadAvgFiveMinGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> LoadAvgReader.getFiveMinLoadAverage(), "loadavg_5min")
new NBFunctionGauge(this, () -> reader.getFiveMinLoadAverage(), "loadavg_5min")
);
Gauge<Double> loadAvgFifteenMinuteGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> LoadAvgReader.getFifteenMinLoadAverage(), "loadavg_15min")
new NBFunctionGauge(this, () -> reader.getFifteenMinLoadAverage(), "loadavg_15min")
);
// add checking for CPU load averages; TODO: Modify thresholds
clientMetricChecker.addMetricToCheck("loadavg_5min", loadAvgFiveMinGauge, 50.0);
@ -535,23 +538,26 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
}
private void registerMemInfoMetrics() {
MemInfoReader reader = new MemInfoReader();
if (!reader.fileExists())
return;
Gauge<Double> memTotalGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getMemTotalkB(), "mem_total")
new NBFunctionGauge(this, () -> reader.getMemTotalkB(), "mem_total")
);
Gauge<Double> memUsedGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getMemUsedkB(), "mem_used")
new NBFunctionGauge(this, () -> reader.getMemUsedkB(), "mem_used")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getMemFreekB(), "mem_free")
new NBFunctionGauge(this, () -> reader.getMemFreekB(), "mem_free")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getMemAvailablekB(), "mem_available")
new NBFunctionGauge(this, () -> reader.getMemAvailablekB(), "mem_available")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getMemCachedkB(), "mem_cached")
new NBFunctionGauge(this, () -> reader.getMemCachedkB(), "mem_cached")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getMemBufferskB(), "mem_buffered")
new NBFunctionGauge(this, () -> reader.getMemBufferskB(), "mem_buffered")
);
// add checking for percent memory used at some given time; TODO: Modify percent threshold
clientMetricChecker.addRatioMetricToCheck(
@ -559,62 +565,71 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getSwapTotalkB(), "swap_total")
new NBFunctionGauge(this, () -> reader.getSwapTotalkB(), "swap_total")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getSwapFreekB(), "swap_free")
new NBFunctionGauge(this, () -> reader.getSwapFreekB(), "swap_free")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> MemInfoReader.getSwapUsedkB(), "swap_used")
new NBFunctionGauge(this, () -> reader.getSwapUsedkB(), "swap_used")
);
}
private void registerDiskStatsMetrics() {
for (String device: DiskStatsReader.getDevices()) {
DiskStatsReader reader = new DiskStatsReader();
if (!reader.fileExists())
return;
for (String device: reader.getDevices()) {
ActivityMetrics.register(
new NBFunctionGauge(this, () -> DiskStatsReader.getTransactionsForDevice(device), device + "_transactions")
new NBFunctionGauge(this, () -> reader.getTransactionsForDevice(device), device + "_transactions")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> DiskStatsReader.getKbReadForDevice(device), device + "_kB_read")
new NBFunctionGauge(this, () -> reader.getKbReadForDevice(device), device + "_kB_read")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> DiskStatsReader.getKbWrittenForDevice(device), device + "_kB_written")
new NBFunctionGauge(this, () -> reader.getKbWrittenForDevice(device), device + "_kB_written")
);
}
}
private void registerNetworkInterfaceMetrics() {
for (String interfaceName: NetDevReader.getInterfaces()) {
NetDevReader reader = new NetDevReader();
if (!reader.fileExists())
return;
for (String interfaceName: reader.getInterfaces()) {
ActivityMetrics.register(
new NBFunctionGauge(this, () -> NetDevReader.getBytesReceived(interfaceName), interfaceName + "_rx_bytes")
new NBFunctionGauge(this, () -> reader.getBytesReceived(interfaceName), interfaceName + "_rx_bytes")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> NetDevReader.getPacketsReceived(interfaceName), interfaceName + "_rx_packets")
new NBFunctionGauge(this, () -> reader.getPacketsReceived(interfaceName), interfaceName + "_rx_packets")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> NetDevReader.getBytesTransmitted(interfaceName), interfaceName + "_tx_bytes")
new NBFunctionGauge(this, () -> reader.getBytesTransmitted(interfaceName), interfaceName + "_tx_bytes")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> NetDevReader.getPacketsTransmitted(interfaceName), interfaceName + "_tx_packets")
new NBFunctionGauge(this, () -> reader.getPacketsTransmitted(interfaceName), interfaceName + "_tx_packets")
);
}
}
private void registerStatMetrics() {
StatReader reader = new StatReader();
if (!reader.fileExists())
return;
Gauge<Double> cpuUserGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> StatReader.getUserTime(), "cpu_user")
new NBFunctionGauge(this, () -> reader.getUserTime(), "cpu_user")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> StatReader.getSystemTime(), "cpu_system")
new NBFunctionGauge(this, () -> reader.getSystemTime(), "cpu_system")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> StatReader.getIdleTime(), "cpu_idle")
new NBFunctionGauge(this, () -> reader.getIdleTime(), "cpu_idle")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> StatReader.getIoWaitTime(), "cpu_iowait")
new NBFunctionGauge(this, () -> reader.getIoWaitTime(), "cpu_iowait")
);
Gauge<Double> cpuTotalGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> StatReader.getTotalTime(), "cpu_total")
new NBFunctionGauge(this, () -> reader.getTotalTime(), "cpu_total")
);
// add checking for percent of time spent in user space; TODO: Modify percent threshold
clientMetricChecker.addRatioMetricToCheck(

View File

@ -0,0 +1,129 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import com.codahale.metrics.Gauge;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
public class ClientSystemMetricChecker {
private final int pollIntervalSeconds;
private final ScheduledExecutorService scheduler;
private List<ClientMetric> clientMetrics;
public ClientSystemMetricChecker(int pollIntervalSeconds) {
this.pollIntervalSeconds = pollIntervalSeconds;
this.scheduler = Executors.newScheduledThreadPool(1);
this.clientMetrics = new ArrayList<>();
}
public void addMetricToCheck(String name, Gauge<Double> metric, Double threshold) {
addRatioMetricToCheck(name, metric, null, threshold, false);
}
public void addRatioMetricToCheck(String name, Gauge<Double> numerator, Gauge<Double> denominator, Double threshold, boolean retainPrev) {
/**
* Some "meaningful" system metrics are derived via:
* - taking a ratio of instantaneous values (e.g. MemUsed / MemTotal from /proc/meminfo)
* - taking a ratio of deltas of aggregates values over a time window (e.g. CPU utilization from /proc/stat)
*
* This method serves to be able to allow checking those which can be derived as a ratio of two existing metrics.
*/
clientMetrics.add(new ClientMetric(name, numerator, denominator, threshold, retainPrev));
}
public void start() {
scheduler.scheduleAtFixedRate(() -> {
checkMetrics();
}, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
}
private void checkMetrics() {
for (ClientMetric c: clientMetrics)
c.check();
}
public void shutdown() {
scheduler.shutdown();
}
private class ClientMetric {
private static final Logger logger = LogManager.getLogger(ClientMetric.class);
private final String name;
private final Gauge<Double> numerator;
private final Gauge<Double> denominator;
private final Double threshold;
private final Boolean retainPrevValue;
private Double prevNumeratorValue;
private Double prevDenominatorValue;
private ClientMetric(String name, Gauge<Double> gauge, Double threshold) {
this(name, gauge, null, threshold, false);
}
private ClientMetric(String name, Gauge<Double> numerator, Gauge<Double> denominator, Double threshold, Boolean retainPrevValue) {
this.name = name;
this.numerator = numerator;
this.denominator = denominator;
this.threshold = threshold;
this.retainPrevValue = retainPrevValue;
this.prevNumeratorValue = null;
this.prevDenominatorValue = null;
}
private Double extract(){
Double numeratorVal = numerator.getValue();
if (numeratorVal == null)
return null;
Double deltaNumeratorVal = numeratorVal;
if (prevNumeratorValue != null)
deltaNumeratorVal -= prevNumeratorValue;
// the case that we are not extracting a ratio of values
if (denominator == null) {
if (retainPrevValue)
prevNumeratorValue = numeratorVal;
return deltaNumeratorVal;
}
// at this point, we will be extracting a ratio of gauge value changes over a time interval
Double denominatorVal = denominator.getValue();
if (denominatorVal == null)
return null;
Double deltaDenominatorVal = denominatorVal;
if (prevDenominatorValue != null)
deltaDenominatorVal -= prevDenominatorValue;
if (deltaDenominatorVal == 0.0)
return null;
Double percent = (deltaNumeratorVal / deltaDenominatorVal) * 100.0;
if (retainPrevValue) {
prevNumeratorValue = numeratorVal;
prevDenominatorValue = denominatorVal;
}
return percent;
}
private void check() {
Double extractedVal = extract();
if (extractedVal != null && extractedVal > threshold)
logger.warn(name + " value = " + extractedVal + " > threshold " + threshold);
}
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class DiskStatsReader extends LinuxSystemFileReader {
/**
* Note that all fields are cumulative within /proc/diskstats.
*
* Reference:
* - https://serverfault.com/questions/619097/interpreting-proc-diskstats-for-a-webserver-more-writes-than-reads
*
* Example line:
* 259 0 nvme0n1 669494 21 65326120 388760 3204963 2891102 734524354 42209620 0 446420 41361212
*/
private static final Double sectorSizeBytes = 512.0;
public DiskStatsReader() {
super("/proc/diskstats");
}
public Double getTransactionsForDevice(String deviceName) {
MatchResult result = findFirstMatch(Pattern.compile(buildRegex(deviceName)));
if (result == null)
return null;
Double readsCompleted = Double.valueOf(result.group(1));
Double writesCompleted = Double.valueOf(result.group(5));
return readsCompleted + writesCompleted;
}
public Double getKbReadForDevice(String deviceName) {
MatchResult result = findFirstMatch(Pattern.compile(buildRegex(deviceName)));
if (result == null)
return null;
Double sectorsRead = Double.valueOf(result.group(3));
return sectorsRead * sectorSizeBytes / 1024;
}
public Double getKbWrittenForDevice(String deviceName) {
MatchResult result = findFirstMatch(Pattern.compile(buildRegex(deviceName)));
if (result == null)
return null;
Double sectorsWritten = Double.valueOf(result.group(7));
return sectorsWritten * sectorSizeBytes / 1024;
}
private String buildRegex(String deviceName) {
return "\\b" + Pattern.quote(deviceName) + "\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)";
}
public List<String> getDevices() {
String regex = "^\\s*\\d+\\s+\\d+\\s+([a-zA-Z0-9]+)\\s+.*$";
Pattern pattern = Pattern.compile(regex);
List<MatchResult> results = findAllLinesMatching(pattern);
return results.stream().map(m -> m.group(1)).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,99 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.Files;
import java.util.List;
import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
public abstract class LinuxSystemFileReader {
protected Logger logger;
protected String filePath;
public LinuxSystemFileReader(String filePath) {
logger = LogManager.getLogger(this.getClass());
this.filePath = filePath;
}
public boolean fileExists() {
Path path = Paths.get(filePath);
return Files.exists(path);
}
protected Double extract(String regex, int groupIdx){
Pattern pattern = Pattern.compile(regex);
MatchResult result = findFirstMatch(pattern);
if (result == null)
return null;
assert (1 <= groupIdx && groupIdx <= result.groupCount());
return Double.valueOf(result.group(groupIdx));
}
protected MatchResult findFirstMatch(Pattern pattern) {
Matcher matcher = null;
try (FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file)) {
String line;
while ((line = reader.readLine()) != null) {
matcher = pattern.matcher(line);
if (matcher.find())
break;
}
} catch (FileNotFoundException e) {
logger.warn("File not found: " + filePath);
} catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
if (matcher == null)
return null;
return matcher.toMatchResult();
}
protected List<MatchResult> findAllLinesMatching(Pattern pattern) {
List<MatchResult> results = new ArrayList<>();
Matcher matcher;
try (FileReader file = new FileReader(filePath);
BufferedReader reader = new BufferedReader(file)) {
String line;
while ((line = reader.readLine()) != null) {
try {
matcher = pattern.matcher(line);
if (matcher.find())
results.add(matcher.toMatchResult());
} catch (Exception e) {
logger.error("Error processing line: " + e.getMessage());
}
}
} catch (FileNotFoundException e) {
logger.warn("File not found: " + filePath);
}
catch (final Throwable t) {
throw new RuntimeException("Failed to read " + filePath);
}
return results;
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class LoadAvgReader extends LinuxSystemFileReader {
/**
* Example line:
* 0.78 1.39 2.03 1/2153 2818
*/
private static final String regex = "(\\d+\\.\\d+)\\s(\\d+\\.\\d+)\\s(\\d+\\.\\d+)";
public LoadAvgReader(){
super("/proc/loadavg");
}
public Double getOneMinLoadAverage() {
return extract(regex, 1);
}
public Double getFiveMinLoadAverage() {
return extract(regex, 2);
}
public Double getFifteenMinLoadAverage() {
return extract(regex, 3);
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MemInfoReader extends LinuxSystemFileReader {
/**
* References:
* - https://docs.kernel.org/filesystems/proc.html#meminfo
* - https://stackoverflow.com/questions/41224738/how-to-calculate-system-memory-usage-from-proc-meminfo-like-htop
*/
public MemInfoReader() {
super("/proc/meminfo");
}
public Double getMemTotalkB() {
String regex = "MemTotal:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getMemFreekB() {
String regex = "MemFree:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getMemAvailablekB() {
String regex = "MemAvailable:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getMemUsedkB() {
Double memTotal = getMemTotalkB();
Double memFree = getMemFreekB();
if (memTotal != null && memFree != null)
return memTotal - memFree;
return null;
}
public Double getMemCachedkB() {
String regex = "Cached:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getMemBufferskB() {
String regex = "Buffers:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getSwapTotalkB() {
String regex = "SwapTotal:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getSwapFreekB() {
String regex = "SwapFree:\\s+(\\d+) kB";
return extract(regex, 1);
}
public Double getSwapUsedkB() {
Double swapTotal = getSwapTotalkB();
Double swapFree = getSwapFreekB();
if (swapTotal != null && swapFree != null)
return swapTotal - swapFree;
return null;
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class NetDevReader extends LinuxSystemFileReader {
/**
* Note that all fields are cumulative in /proc/net/dev
*
* Reference:
* - https://www.linuxquestions.org/questions/linux-networking-3/need-explanation-of-proc-net-dev-bytes-counters-4175458860/
*
* Example line:
* wlp59s0: 2941956695 4935327 0 0 0 0 0 0 1213470966 3450551 0 0 0 0 0 0
*/
public NetDevReader() {
super("/proc/net/dev");
}
public Double getBytesReceived(String interfaceName) {
return extract(buildRegex(interfaceName), 1);
}
public Double getPacketsReceived(String interfaceName) {
return extract(buildRegex(interfaceName), 2);
}
public Double getBytesTransmitted(String interfaceName) {
return extract(buildRegex(interfaceName), 3);
}
public Double getPacketsTransmitted(String interfaceName) {
return extract(buildRegex(interfaceName), 4);
}
private String buildRegex(String interfaceName) {
return "\\b" + Pattern.quote(interfaceName) + "\\s*:(\\s*\\d+)\\s+(\\d+)\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+(\\d+)\\s+(\\d+)";
}
public List<String> getInterfaces() {
String regex = "^\\s*([^\\s:]+):\\s*\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+\\s+\\d+";
Pattern pattern = Pattern.compile(regex);
List<MatchResult> results = findAllLinesMatching(pattern);
return results.stream().map(m -> m.group(1)).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.clientload;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.regex.MatchResult;
import java.util.regex.Pattern;
public class StatReader extends LinuxSystemFileReader {
/**
* Note that all fields are cumulative within /proc/stat.
*
* Reference:
* - https://docs.kernel.org/filesystems/proc.html#miscellaneous-kernel-statistics-in-proc-stat
*
* Example line:
* cpu 6955150 945 1205506 139439365 115574 0 113356 0 0 0
*/
private static final String regex = "cpu\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)\\s+(\\d+)";
public StatReader() {
super("/proc/stat");
}
public Double getUserTime() {
return extract(regex, 1);
}
public Double getSystemTime() {
return extract(regex, 3);
}
public Double getIdleTime() {
return extract(regex, 4);
}
public Double getIoWaitTime() {
return extract(regex, 5);
}
public Double getTotalTime() {
MatchResult result = findFirstMatch(Pattern.compile(regex));
if (result == null)
return null;
Double user = Double.valueOf(result.group(1));
Double nice = Double.valueOf(result.group(2));
Double system = Double.valueOf(result.group(3));
Double idle = Double.valueOf(result.group(4));
Double iowait = Double.valueOf(result.group(5));
Double irq = Double.valueOf(result.group(6));
Double softirq = Double.valueOf(result.group(7));
return user + nice + system + idle + iowait + irq + softirq;
}
}