merge fixups and adapting client load sensors

This commit is contained in:
Jonathan Shook
2023-10-16 17:35:16 -05:00
parent 22d460896f
commit fe1d3193a4
8 changed files with 112 additions and 675 deletions

View File

@@ -2,13 +2,13 @@ package io.nosqlbench.adapter.mongodb.core;
/*
* Copyright (c) 2022 nosqlbench
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,7 +24,7 @@ import io.nosqlbench.components.NBComponent;
import io.nosqlbench.nb.annotations.Service;
@Service(value = DriverAdapterLoader.class, selector = "mongodb")
public class KafkaDriverAdapterLoader implements DriverAdapterLoader {
public class MongoDriverAdapterLoader implements DriverAdapterLoader {
@Override
public MongodbDriverAdapter load(NBComponent parent, NBLabels childLabels) {
return new MongodbDriverAdapter(parent, childLabels);

View File

@@ -24,7 +24,6 @@ import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import io.nosqlbench.components.NBNamedElement;
import com.mongodb.client.MongoDatabase;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;

View File

@@ -24,7 +24,6 @@ import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.apps.BundledApp;
import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.api.labels.NBLabeledElement;
@@ -393,13 +392,6 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
// }
// client machine metrics; TODO: modify pollInterval
this.clientMetricChecker = new ClientSystemMetricChecker(10);
registerLoadAvgMetrics();
registerMemInfoMetrics();
registerDiskStatsMetrics();
registerNetworkInterfaceMetrics();
registerStatMetrics();
clientMetricChecker.start();
// intentionally not shown for warn-only
NBCLI.logger.info(() -> "console logging level is " + options.getConsoleLogLevel());
@@ -435,124 +427,6 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
}
private void registerLoadAvgMetrics() {
LoadAvgReader reader = new LoadAvgReader();
if (!reader.fileExists())
return;
Gauge<Double> loadAvgOneMinGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getOneMinLoadAverage(), "loadavg_1min")
);
Gauge<Double> loadAvgFiveMinGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getFiveMinLoadAverage(), "loadavg_5min")
);
Gauge<Double> loadAvgFifteenMinuteGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getFifteenMinLoadAverage(), "loadavg_15min")
);
// add checking for CPU load averages; TODO: Modify thresholds
clientMetricChecker.addMetricToCheck("loadavg_5min", loadAvgFiveMinGauge, 50.0);
clientMetricChecker.addMetricToCheck("loadavg_1min", loadAvgOneMinGauge, 50.0);
clientMetricChecker.addMetricToCheck("loadavg_15min", loadAvgFifteenMinuteGauge, 50.0);
}
private void registerMemInfoMetrics() {
MemInfoReader reader = new MemInfoReader();
if (!reader.fileExists())
return;
Gauge<Double> memTotalGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getMemTotalkB(), "mem_total")
);
Gauge<Double> memUsedGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getMemUsedkB(), "mem_used")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getMemFreekB(), "mem_free")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getMemAvailablekB(), "mem_available")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getMemCachedkB(), "mem_cached")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getMemBufferskB(), "mem_buffered")
);
// add checking for percent memory used at some given time; TODO: Modify percent threshold
clientMetricChecker.addRatioMetricToCheck(
"mem_used_percent", memUsedGauge, memTotalGauge, 90.0, false
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getSwapTotalkB(), "swap_total")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getSwapFreekB(), "swap_free")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getSwapUsedkB(), "swap_used")
);
}
private void registerDiskStatsMetrics() {
DiskStatsReader reader = new DiskStatsReader();
if (!reader.fileExists())
return;
for (String device: reader.getDevices()) {
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getTransactionsForDevice(device), device + "_transactions")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getKbReadForDevice(device), device + "_kB_read")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getKbWrittenForDevice(device), device + "_kB_written")
);
}
}
private void registerNetworkInterfaceMetrics() {
NetDevReader reader = new NetDevReader();
if (!reader.fileExists())
return;
for (String interfaceName: reader.getInterfaces()) {
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getBytesReceived(interfaceName), interfaceName + "_rx_bytes")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getPacketsReceived(interfaceName), interfaceName + "_rx_packets")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getBytesTransmitted(interfaceName), interfaceName + "_tx_bytes")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getPacketsTransmitted(interfaceName), interfaceName + "_tx_packets")
);
}
}
private void registerStatMetrics() {
StatReader reader = new StatReader();
if (!reader.fileExists())
return;
Gauge<Double> cpuUserGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getUserTime(), "cpu_user")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getSystemTime(), "cpu_system")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getIdleTime(), "cpu_idle")
);
ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getIoWaitTime(), "cpu_iowait")
);
Gauge<Double> cpuTotalGauge = ActivityMetrics.register(
new NBFunctionGauge(this, () -> reader.getTotalTime(), "cpu_total")
);
// add checking for percent of time spent in user space; TODO: Modify percent threshold
clientMetricChecker.addRatioMetricToCheck(
"cpu_user_percent", cpuUserGauge, cpuTotalGauge, 50.0, true
);
}
@Override
public NBLabels getLabels() {

View File

@@ -17,6 +17,10 @@
package io.nosqlbench.engine.core.clientload;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Executors;
@@ -25,22 +29,23 @@ import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
public class ClientSystemMetricChecker {
public class ClientSystemMetricChecker extends NBBaseComponent {
private final int pollIntervalSeconds;
private final ScheduledExecutorService scheduler;
private List<ClientMetric> clientMetrics;
public ClientSystemMetricChecker(int pollIntervalSeconds) {
public ClientSystemMetricChecker(NBComponent parent, NBLabels additionalLabels, int pollIntervalSeconds) {
super(parent,additionalLabels);
this.pollIntervalSeconds = pollIntervalSeconds;
this.scheduler = Executors.newScheduledThreadPool(1);
this.clientMetrics = new ArrayList<>();
}
public void addMetricToCheck(String name, Gauge<Double> metric, Double threshold) {
addRatioMetricToCheck(name, metric, null, threshold, false);
public void addMetricToCheck(NBMetricGauge gauge, Double threshold) {
addRatioMetricToCheck(gauge, null, threshold, false);
}
public void addRatioMetricToCheck(String name, Gauge<Double> numerator, Gauge<Double> denominator, Double threshold, boolean retainPrev) {
public void addRatioMetricToCheck(NBMetricGauge numerator, NBMetricGauge denominator, Double threshold, boolean retainPrev) {
/**
* Some "meaningful" system metrics are derived via:
* - taking a ratio of instantaneous values (e.g. MemUsed / MemTotal from /proc/meminfo)
@@ -48,13 +53,11 @@ public class ClientSystemMetricChecker {
*
* This method serves to be able to allow checking those which can be derived as a ratio of two existing metrics.
*/
clientMetrics.add(new ClientMetric(name, numerator, denominator, threshold, retainPrev));
clientMetrics.add(new ClientMetric(numerator, denominator, threshold, retainPrev));
}
public void start() {
scheduler.scheduleAtFixedRate(() -> {
checkMetrics();
}, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(this::checkMetrics, pollIntervalSeconds, pollIntervalSeconds, TimeUnit.SECONDS);
}
private void checkMetrics() {
@@ -68,20 +71,18 @@ public class ClientSystemMetricChecker {
private class ClientMetric {
private static final Logger logger = LogManager.getLogger(ClientMetric.class);
private final String name;
private final Gauge<Double> numerator;
private final Gauge<Double> denominator;
private final NBMetricGauge numerator;
private final NBMetricGauge denominator;
private final Double threshold;
private final Boolean retainPrevValue;
private Double prevNumeratorValue;
private Double prevDenominatorValue;
private ClientMetric(String name, Gauge<Double> gauge, Double threshold) {
this(name, gauge, null, threshold, false);
private ClientMetric(NBMetricGauge gauge, Double threshold) {
this(gauge, null, threshold, false);
}
private ClientMetric(String name, Gauge<Double> numerator, Gauge<Double> denominator, Double threshold, Boolean retainPrevValue) {
this.name = name;
private ClientMetric(NBMetricGauge numerator, NBMetricGauge denominator, Double threshold, Boolean retainPrevValue) {
this.numerator = numerator;
this.denominator = denominator;
this.threshold = threshold;
@@ -123,7 +124,7 @@ public class ClientSystemMetricChecker {
private void check() {
Double extractedVal = extract();
if (extractedVal != null && extractedVal > threshold)
logger.warn(name + " value = " + extractedVal + " > threshold " + threshold);
logger.warn(getLabels().asMap().get("name") + " value = " + extractedVal + " > threshold " + threshold);
}
}
}

View File

@@ -449,15 +449,6 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
this.activity.create().gauge("threads",() -> (double) this.motors.size());
}
private void unregisterMetrics() {
// TODO: metrics
// ActivityMetrics.unregister(this.threadsGauge);
if (this.threadsGauge != null) {
ActivityMetrics.unregister(this.threadsGauge);
this.threadsGauge = null;
}
private boolean shutdownExecutorService(int secondsToWait) {

View File

@@ -1,506 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario;
import com.codahale.metrics.MetricRegistry;
import com.oracle.truffle.js.scriptengine.GraalJSScriptEngine;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.extensions.SandboxExtensionFinder;
import io.nosqlbench.api.extensions.ScriptingExtensionPluginInfo;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.ScenarioMetadataAware;
import io.nosqlbench.api.metadata.SystemId;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScenarioContext;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScriptParams;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.ActivityBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotScenarioController;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Engine.Builder;
import org.graalvm.polyglot.EnvironmentAccess;
import org.graalvm.polyglot.HostAccess;
import org.graalvm.polyglot.PolyglotAccess;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.Callable;
public class NBScenario extends NBBaseComponent implements Callable<ExecutionMetricsResult> {
private final String reportSummaryTo;
private final Path logsPath;
private final Invocation invocation;
private final String scriptfile;
private Logger logger = LogManager.getLogger("SCENARIO");
private State state = State.Scheduled;
private volatile ScenarioShutdownHook scenarioShutdownHook;
private Exception error;
private ScenarioMetadata scenarioMetadata;
private ExecutionMetricsResult result;
private final NBLabeledElement parentComponent;
public Optional<ExecutionMetricsResult> getResultIfComplete() {
return Optional.ofNullable(result);
}
public enum Invocation {
RENDER_SCRIPT,
EXECUTE_SCRIPT
}
public enum State {
Scheduled,
Running,
Errored,
Interrupted,
Finished
}
private final List<String> scripts = new ArrayList<>();
private ScriptEngine scriptEngine;
private ScenarioController scenarioController;
private ActivityProgressIndicator activityProgressIndicator;
private String progressInterval = "console:1m";
private ScenarioContext scriptEnv;
private final String scenarioName;
private ScriptParams scenarioScriptParams;
private final Engine engine = Engine.Graalvm;
private long startedAtMillis = -1L;
private long endedAtMillis = -1L;
public enum Engine {
Graalvm
}
public NBScenario(
final String scenarioName,
final String progressInterval,
final String reportSummaryTo,
final Path logsPath,
String scriptfile,
NBComponent parentComponent,
Invocation invocation
) {
super(parentComponent, NBLabels.forKV("scenario", scenarioName));
this.scenarioName = scenarioName;
this.progressInterval = progressInterval;
this.reportSummaryTo = reportSummaryTo;
this.logsPath = logsPath;
this.scriptfile = scriptfile;
this.parentComponent = parentComponent;
this.invocation = invocation;
}
public static NBScenario forTesting(final String name, final String reportSummaryTo, NBComponent parent) {
return new NBScenario(
name,
"console:10s",
reportSummaryTo,
Path.of("logs"),
"",
new NBBaseComponent(parent,NBLabels.forKV("test","testrun")),
Invocation.EXECUTE_SCRIPT
);
}
public NBScenario setLogger(final Logger logger) {
this.logger = logger;
return this;
}
public Logger getLogger() {
return this.logger;
}
public NBScenario addScriptText(final String scriptText) {
this.scripts.add(scriptText);
return this;
}
public NBScenario addScriptFiles(final String... args) {
for (final String scriptFile : args) {
final Path scriptPath = Paths.get(scriptFile);
byte[] bytes = new byte[0];
try {
bytes = Files.readAllBytes(scriptPath);
} catch (final IOException e) {
e.printStackTrace();
}
final ByteBuffer bb = ByteBuffer.wrap(bytes);
final Charset utf8 = StandardCharsets.UTF_8;
final String scriptData = utf8.decode(bb).toString();
this.addScriptText(scriptData);
}
return this;
}
private void initializeScriptingEngine(final ScenarioController scenarioController) {
this.logger.debug("Using engine {}", this.engine.toString());
final MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
final Context.Builder contextSettings = Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL)
.allowNativeAccess(true)
.allowCreateThread(true)
.allowIO(true)
.allowHostClassLookup(s -> true)
.allowHostClassLoading(true)
.allowCreateProcess(true)
.allowAllAccess(true)
.allowEnvironmentAccess(EnvironmentAccess.INHERIT)
.allowPolyglotAccess(PolyglotAccess.ALL)
.option("js.ecmascript-version", "2022")
.option("js.nashorn-compat", "true");
final Builder engineBuilder = org.graalvm.polyglot.Engine.newBuilder();
engineBuilder.option("engine.WarnInterpreterOnly", "false");
final org.graalvm.polyglot.Engine polyglotEngine = engineBuilder.build();
// TODO: add in, out, err for this scenario
scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings);
if (!"disabled".equals(progressInterval))
this.activityProgressIndicator = new ActivityProgressIndicator(scenarioController, this.progressInterval);
this.scriptEnv = new ScenarioContext(scenarioName, scenarioController);
this.scriptEngine.setContext(this.scriptEnv);
this.scriptEngine.put("params", this.scenarioScriptParams);
this.scriptEngine.put("scenario", new PolyglotScenarioController(scenarioController));
this.scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
this.scriptEngine.put("activities", new ActivityBindings(scenarioController));
for (final ScriptingExtensionPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
if (!extensionDescriptor.isAutoLoading()) {
this.logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");
continue;
}
final Logger extensionLogger =
LogManager.getLogger("extensions." + extensionDescriptor.getBaseVariableName());
final Object extensionObject = extensionDescriptor.getExtensionObject(
extensionLogger,
this
);
ScenarioMetadataAware.apply(extensionObject, this.getScenarioMetadata());
this.logger.trace(() -> "Adding extension object: name=" + extensionDescriptor.getBaseVariableName() +
" class=" + extensionObject.getClass().getSimpleName());
this.scriptEngine.put(extensionDescriptor.getBaseVariableName(), extensionObject);
}
}
private synchronized ScenarioMetadata getScenarioMetadata() {
if (null == this.scenarioMetadata) scenarioMetadata = new ScenarioMetadata(
startedAtMillis,
scenarioName,
SystemId.getNodeId(),
SystemId.getNodeFingerprint()
);
return this.scenarioMetadata;
}
private synchronized void runScenario() {
this.scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(this.scenarioShutdownHook);
this.state = State.Running;
this.startedAtMillis = System.currentTimeMillis();
Annotators.recordAnnotation(
Annotation.newBuilder()
.element(this)
.now()
.layer(Layer.Scenario)
.addDetail("engine", engine.toString())
.build()
);
this.logger.debug("Running control script for {}.", scenarioName);
this.scenarioController = new ScenarioController(this);
try {
this.initializeScriptingEngine(this.scenarioController);
this.executeScenarioScripts();
final long awaitCompletionTime = 86400 * 365 * 1000L;
this.logger.debug("Awaiting completion of scenario and activities for {} millis.", awaitCompletionTime);
this.scenarioController.awaitCompletion(awaitCompletionTime);
} catch (final Exception e) {
error = e;
} finally {
this.scenarioController.shutdown();
}
Runtime.getRuntime().removeShutdownHook(this.scenarioShutdownHook);
final var runHook = this.scenarioShutdownHook;
this.scenarioShutdownHook = null;
runHook.run();
this.logger.debug("removing scenario shutdown hook");
}
public void notifyException(final Thread t, final Throwable e) {
error = new RuntimeException("in thread " + t.getName() + ", " + e, e);
}
private void executeScenarioScripts() {
for (final String script : this.scripts)
try {
Object result = null;
if ((scriptEngine instanceof Compilable compilableEngine)) {
this.logger.debug("Using direct script compilation");
final CompiledScript compiled = compilableEngine.compile(script);
this.logger.debug("-> invoking main scenario script (compiled)");
result = compiled.eval();
this.logger.debug("<- scenario script completed (compiled)");
} else if ((null != scriptfile) && !this.scriptfile.isEmpty()) {
final String filename = this.scriptfile.replace("_SESSION_", this.scenarioName);
this.logger.debug("-> invoking main scenario script (interpreted from {})", filename);
final Path written = Files.write(
Path.of(filename),
script.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.CREATE
);
final BufferedReader reader = Files.newBufferedReader(written);
this.scriptEngine.eval(reader);
this.logger.debug("<- scenario control script completed (interpreted) from {})", filename);
} else {
this.logger.debug("-> invoking main scenario script (interpreted)");
result = this.scriptEngine.eval(script);
this.logger.debug("<- scenario control script completed (interpreted)");
}
if (null != result)
this.logger.debug("scenario result: type({}): value:{}", result.getClass().getCanonicalName(), result);
System.err.flush();
System.out.flush();
} catch (final Exception e) {
error = e;
state = State.Errored;
this.logger.error("Error in scenario, shutting down. ({})", e);
try {
scenarioController.forceStopScenario(5000, false);
} catch (final Exception eInner) {
this.logger.debug("Found inner exception while forcing stop with rethrow=false: {}", eInner);
} finally {
throw new RuntimeException(e);
}
} finally {
System.out.flush();
System.err.flush();
this.endedAtMillis = System.currentTimeMillis();
}
}
public void finish() {
this.logger.debug("finishing scenario");
this.endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (State.Running == this.state) state = State.Finished;
if (null != scenarioShutdownHook) {
// If this method was called while the shutdown hook is defined, then it means
// that the scenario was ended before the hook was uninstalled normally.
state = State.Interrupted;
this.logger.warn("Scenario was interrupted by process exit, shutting down");
} else
this.logger.info("Scenario completed successfully, with {} logical activities.", this.scenarioController.getActivityExecutorMap().size());
this.logger.info(() -> "scenario state: " + state);
// We report the scenario state via annotation even for short runs
final Annotation annotation = Annotation.newBuilder()
.element(this)
.interval(startedAtMillis, this.endedAtMillis)
.layer(Layer.Scenario)
.addDetail("event", "stop-scenario")
.build();
Annotators.recordAnnotation(annotation);
}
public long getStartedAtMillis() {
return this.startedAtMillis;
}
public long getEndedAtMillis() {
return this.endedAtMillis;
}
/**
* This should be the only way to get a ScenarioResult for a Scenario.
* <p>
* The lifecycle of a scenario includes the lifecycles of all of the following:
* <OL>
* <LI>The scenario control script, executing within a graaljs context.</LI>
* <LI>The lifecycle of every activity which is started within the scenario.</LI>
* </OL>
* <p>
* All of these run asynchronously within the scenario, however the same thread that calls
* the scenario is the one which executes the control script. A scenario ends when all
* of the following conditions are met:
* <UL>
* <LI>The scenario control script has run to completion, or experienced an exception.</LI>
* <LI>Each activity has run to completion, experienced an exception, or all</LI>
* </UL>
*
* @return
*/
@Override
public synchronized ExecutionMetricsResult call() {
if (null == result) {
try {
this.runScenario();
} catch (final Exception e) {
error = e;
} finally {
this.logger.debug("{} scenario run", null == this.error ? "NORMAL" : "ERRORED");
}
String iolog = error != null ? error.toString() : this.scriptEnv.getTimedLog();
result = new ExecutionMetricsResult(startedAtMillis, endedAtMillis, iolog, this.error);
this.result.reportMetricsSummaryToLog();
this.doReportSummaries(this.reportSummaryTo, this.result);
}
return this.result;
}
private void doReportSummaries(final String reportSummaryTo, final ExecutionMetricsResult result) {
final List<PrintStream> fullChannels = new ArrayList<>();
final List<PrintStream> briefChannels = new ArrayList<>();
final String[] destinationSpecs = reportSummaryTo.split(", *");
for (final String spec : destinationSpecs)
if ((null != spec) && !spec.isBlank()) {
final String[] split = spec.split(":", 2);
final String summaryTo = split[0];
final long summaryWhen = (2 == split.length) ? (Long.parseLong(split[1]) * 1000L) : 0;
PrintStream out = null;
switch (summaryTo.toLowerCase()) {
case "console":
case "stdout":
out = System.out;
break;
case "stderr":
out = System.err;
break;
default:
final String outName = summaryTo
.replaceAll("_SESSION_", scenarioName)
.replaceAll("_LOGS_", this.logsPath.toString());
try {
out = new PrintStream(new FileOutputStream(outName));
break;
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
}
}
if (result.getElapsedMillis() > summaryWhen) fullChannels.add(out);
else {
this.logger.debug("Summarizing counting metrics only to {} with scenario duration of {}ms (<{})", spec, summaryWhen, summaryWhen);
briefChannels.add(out);
}
}
fullChannels.forEach(result::reportMetricsSummaryTo);
// briefChannels.forEach(result::reportCountsTo);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if ((null == o) || (this.getClass() != o.getClass())) {
return false;
}
final NBScenario scenario = (NBScenario) o;
return Objects.equals(this.scenarioName, scenario.scenarioName);
}
@Override
public int hashCode() {
return (null != this.scenarioName) ? scenarioName.hashCode() : 0;
}
public String getScenarioName() {
return this.scenarioName;
}
public ScenarioController getScenarioController() {
return this.scenarioController;
}
public String getScriptText() {
return String.join("", this.scripts);
}
public Optional<List<String>> getIOLog() {
return Optional.ofNullable(this.scriptEnv).map(ScriptEnvBuffer::getTimeLogLines);
}
public String toString() {
return "name:'" + scenarioName + '\'';
}
public void addScenarioScriptParams(final ScriptParams scenarioScriptParams) {
this.scenarioScriptParams = scenarioScriptParams;
}
public void addScenarioScriptParams(final Map<String, String> scriptParams) {
this.addScenarioScriptParams(new ScriptParams() {{
this.putAll(scriptParams);
}});
}
public State getScenarioState() {
return this.state;
}
public String getReportSummaryTo() {
return this.reportSummaryTo;
}
}

View File

@@ -17,8 +17,9 @@
package io.nosqlbench.engine.core.lifecycle.session;
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.spi.SimpleServiceLoader;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponentSubScope;
@@ -26,6 +27,7 @@ import io.nosqlbench.components.decorators.NBTokenWords;
import io.nosqlbench.engine.cli.BasicScriptBuffer;
import io.nosqlbench.engine.cli.Cmd;
import io.nosqlbench.engine.cli.ScriptBuffer;
import io.nosqlbench.engine.core.clientload.*;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioParams;
@@ -33,7 +35,6 @@ import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBScenario;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosExecutor;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosResults;
import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -52,6 +53,7 @@ import java.util.function.Function;
public class NBSession extends NBBaseComponent implements Function<List<Cmd>, ExecutionResult>, NBTokenWords {
private final static Logger logger = LogManager.getLogger(NBSession.class);
private final String sessionName;
private final ClientSystemMetricChecker clientMetricChecker;
public enum STATUS {
OK,
@@ -65,6 +67,15 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
) {
super(null, labelContext.getLabels().and("session", sessionName));
this.sessionName = sessionName;
this.clientMetricChecker = new ClientSystemMetricChecker(this, NBLabels.forKV(),10);
registerLoadAvgMetrics();
registerMemInfoMetrics();
registerDiskStatsMetrics();
registerNetworkInterfaceMetrics();
registerStatMetrics();
clientMetricChecker.start();
}
public ExecutionResult apply(List<Cmd> cmds) {
@@ -87,19 +98,13 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
scenario = buildJavacriptScenario(cmds);
}
try (NBComponentSubScope scope = new NBComponentSubScope(scenario)) {
assert scenario != null;
scenariosExecutor.execute(scenario,params);
scenariosExecutor.execute(scenario, params);
// this.doReportSummaries(this.reportSummaryTo, this.result);
}
final ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults();
logger.debug(() -> "Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");
// ActivityMetrics.closeMetrics();
// scenariosResults.reportToLog();
// ShutdownManager.shutdown();
//
// logger.info(scenariosResults.getExecutionSummary());
// logger.info(scenariosResults.getExecutionSummary());
if (scenariosResults.hasError()) {
results.error(scenariosResults.getAnyError().orElseThrow());
@@ -136,7 +141,7 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
}
private NBScenario buildJavaScenario(List<Cmd> cmds) {
if (cmds.size()!=1) {
if (cmds.size() != 1) {
throw new RuntimeException("java scenarios require exactly 1 java command");
}
Cmd javacmd = cmds.get(0);
@@ -173,5 +178,79 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
}
private void registerLoadAvgMetrics() {
LoadAvgReader reader = new LoadAvgReader();
if (!reader.fileExists())
return;
NBFunctionGauge load1m = create().gauge("loadavg_1min", reader::getOneMinLoadAverage);
clientMetricChecker.addMetricToCheck(load1m, 50.0);
NBFunctionGauge load5m = create().gauge("loadavg_5min", reader::getFiveMinLoadAverage);
clientMetricChecker.addMetricToCheck(load5m, 50.0);
NBFunctionGauge load15m = create().gauge("loadavg_15min", reader::getFifteenMinLoadAverage);
clientMetricChecker.addMetricToCheck(load15m, 50.0);
// add checking for CPU load averages; TODO: Modify thresholds
}
private void registerMemInfoMetrics() {
MemInfoReader reader = new MemInfoReader();
if (!reader.fileExists())
return;
NBMetricGauge memTotalGauge = create().gauge("mem_total",reader::getMemTotalkB);
NBMetricGauge memUsedGauge = create().gauge("mem_used",reader::getMemUsedkB);
NBMetricGauge memFreeGauge = create().gauge("mem_free",reader::getMemFreekB);
NBMetricGauge memAvailableGauge = create().gauge("mem_avaialble",reader::getMemAvailablekB);
NBMetricGauge memCachedGauge = create().gauge("mem_cache",reader::getMemCachedkB);
NBMetricGauge memBufferedGauge = create().gauge("mem_buffered", reader::getMemBufferskB);
// add checking for percent memory used at some given time; TODO: Modify percent threshold
clientMetricChecker.addRatioMetricToCheck(memUsedGauge, memTotalGauge, 90.0, false);
NBMetricGauge swapTotalGauge = create().gauge("swap_total", reader::getSwapTotalkB);
NBMetricGauge swapFreeGauge = create().gauge("swap_free",reader::getSwapFreekB);
NBMetricGauge swapUsedGauge = create().gauge("swap_used",reader::getSwapUsedkB);
}
private void registerDiskStatsMetrics() {
DiskStatsReader reader = new DiskStatsReader();
if (!reader.fileExists())
return;
for (String device : reader.getDevices()) {
create().gauge(device +"_transactions", () ->reader.getTransactionsForDevice(device));
create().gauge(device +"_kB_read", () -> reader.getKbReadForDevice(device));
create().gauge(device+"_kB_written", () -> reader.getKbWrittenForDevice(device));
}
}
private void registerNetworkInterfaceMetrics() {
NetDevReader reader = new NetDevReader();
if (!reader.fileExists())
return;
for (String iface : reader.getInterfaces()) {
create().gauge(iface+"_rx_bytes",() -> reader.getBytesReceived(iface));
create().gauge(iface+"_rx_packets",() -> reader.getPacketsReceived(iface));
create().gauge(iface+"_tx_bytes",() -> reader.getBytesTransmitted(iface));
create().gauge(iface+"_tx_packets",() -> reader.getPacketsTransmitted(iface));
}
}
private void registerStatMetrics() {
StatReader reader = new StatReader();
if (!reader.fileExists())
return;
NBMetricGauge cpuUserGauge = create().gauge("cpu_user", reader::getUserTime);
NBMetricGauge cpuSystemGauge = create().gauge("cpu_system",reader::getSystemTime);
NBMetricGauge cpuIdleGauge = create().gauge("cpu_idle", reader::getIdleTime);
NBMetricGauge cpuIoWaitGauge = create().gauge("cpu_iowait", reader::getIoWaitTime);
NBMetricGauge cpuTotalGauge = create().gauge("cpu_total", reader::getTotalTime);
// add checking for percent of time spent in user space; TODO: Modify percent threshold
clientMetricChecker.addRatioMetricToCheck(cpuUserGauge, cpuTotalGauge, 50.0, true);
}
}

View File

@@ -17,7 +17,6 @@
package io.nosqlbench.api.histo;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.engine.metrics.HistoStatsLogger;
import io.nosqlbench.api.engine.util.Unit;
import io.nosqlbench.components.NBBaseComponent;