Merge remote-tracking branch 'origin/main' into jk-test-eng-95-expected-result-verification

# Conflicts:
#	adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
#	engine-api/src/test/java/io/nosqlbench/engine/api/activityapi/errorhandling/modular/NBErrorHandlerTest.java
#	engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java
This commit is contained in:
kijanowski
2023-05-17 11:21:11 +02:00
153 changed files with 5727 additions and 4560 deletions

View File

@@ -14,7 +14,8 @@
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,8 +17,10 @@
package io.nosqlbench.engine.core.lifecycle;
import com.codahale.metrics.*;
import com.codahale.metrics.ConsoleReporter.Builder;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.logging.Log4JMetricsReporter;
import io.nosqlbench.api.engine.metrics.reporters.Log4JMetricsReporter;
import io.nosqlbench.api.engine.metrics.reporters.Log4JMetricsReporter.LoggingLevel;
import io.nosqlbench.engine.core.metrics.NBMetricsSummary;
import java.io.ByteArrayOutputStream;
@@ -48,74 +50,66 @@ public class ExecutionMetricsResult extends ExecutionResult {
MetricAttribute.M15_RATE
);
public ExecutionMetricsResult(long startedAt, long endedAt, String iolog, Exception error) {
public ExecutionMetricsResult(final long startedAt, final long endedAt, final String iolog, final Exception error) {
super(startedAt, endedAt, iolog, error);
}
public String getMetricsSummary() {
ByteArrayOutputStream os = new ByteArrayOutputStream();
try (PrintStream ps = new PrintStream(os)) {
ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(ActivityMetrics.getMetricRegistry())
final ByteArrayOutputStream os = new ByteArrayOutputStream();
try (final PrintStream ps = new PrintStream(os)) {
final Builder builder = ConsoleReporter.forRegistry(ActivityMetrics.getMetricRegistry())
.convertDurationsTo(TimeUnit.MICROSECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL)
.outputTo(ps);
Set<MetricAttribute> disabled = new HashSet<>(INTERVAL_ONLY_METRICS);
if (this.getElapsedMillis()<60000) {
disabled.addAll(OVER_ONE_MINUTE_METRICS);
}
final Set<MetricAttribute> disabled = new HashSet<>(ExecutionMetricsResult.INTERVAL_ONLY_METRICS);
if (60000 > this.getElapsedMillis()) disabled.addAll(ExecutionMetricsResult.OVER_ONE_MINUTE_METRICS);
builder.disabledMetricAttributes(disabled);
ConsoleReporter consoleReporter = builder.build();
final ConsoleReporter consoleReporter = builder.build();
consoleReporter.report();
consoleReporter.close();
}
String result = os.toString(StandardCharsets.UTF_8);
final String result = os.toString(StandardCharsets.UTF_8);
return result;
}
public void reportToConsole() {
String summaryReport = getMetricsSummary();
final String summaryReport = this.getMetricsSummary();
System.out.println(summaryReport);
}
public void reportMetricsSummaryTo(PrintStream out) {
out.println(getMetricsSummary());
public void reportMetricsSummaryTo(final PrintStream out) {
out.println(this.getMetricsSummary());
}
public void reportMetricsSummaryToLog() {
logger.debug("-- WARNING: Metrics which are taken per-interval (like histograms) will not have --");
logger.debug("-- active data on this last report. (The workload has already stopped.) Record --");
logger.debug("-- metrics to an external format to see values for each reporting interval. --");
logger.debug("-- BEGIN METRICS DETAIL --");
Log4JMetricsReporter reporter = Log4JMetricsReporter.forRegistry(ActivityMetrics.getMetricRegistry())
.withLoggingLevel(Log4JMetricsReporter.LoggingLevel.DEBUG)
ExecutionResult.logger.debug("-- WARNING: Metrics which are taken per-interval (like histograms) will not have --");
ExecutionResult.logger.debug("-- active data on this last report. (The workload has already stopped.) Record --");
ExecutionResult.logger.debug("-- metrics to an external format to see values for each reporting interval. --");
ExecutionResult.logger.debug("-- BEGIN METRICS DETAIL --");
final Log4JMetricsReporter reporter = Log4JMetricsReporter.forRegistry(ActivityMetrics.getMetricRegistry())
.withLoggingLevel(LoggingLevel.DEBUG)
.convertDurationsTo(TimeUnit.MICROSECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL)
.outputTo(logger)
.outputTo(ExecutionResult.logger)
.build();
reporter.report();
reporter.close();
logger.debug("-- END METRICS DETAIL --");
ExecutionResult.logger.debug("-- END METRICS DETAIL --");
}
public void reportMetricsCountsTo(PrintStream printStream) {
StringBuilder sb = new StringBuilder();
public void reportMetricsCountsTo(final PrintStream printStream) {
final StringBuilder sb = new StringBuilder();
ActivityMetrics.getMetricRegistry().getMetrics().forEach((k, v) -> {
if (v instanceof Counting counting) {
long count = counting.getCount();
if (count > 0) {
NBMetricsSummary.summarize(sb, k, v);
}
final long count = counting.getCount();
if (0 < count) NBMetricsSummary.summarize(sb, k, v);
} else if (v instanceof Gauge<?> gauge) {
Object value = gauge.getValue();
if (value instanceof Number n) {
if (n.doubleValue() != 0) {
NBMetricsSummary.summarize(sb, k, v);
}
}
final Object value = gauge.getValue();
if (value instanceof Number n) if (0 != n.doubleValue()) NBMetricsSummary.summarize(sb, k, v);
}
});

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
@@ -32,23 +33,23 @@ import java.util.concurrent.ConcurrentHashMap;
* see each other by name.
*/
public class ActivityLoader {
private final static Logger logger = LogManager.getLogger("ACTIVITIES");
private static final Logger logger = LogManager.getLogger("ACTIVITIES");
private final Map<String, Activity> activityMap = new ConcurrentHashMap<>();
private final Scenario scenario;
public ActivityLoader(Scenario scenario) {
public ActivityLoader(final Scenario scenario) {
this.scenario = scenario;
}
public synchronized Activity loadActivity(ActivityDef activityDef) {
public synchronized Activity loadActivity(ActivityDef activityDef, final NBLabeledElement labels) {
activityDef= activityDef.deprecate("yaml","workload").deprecate("type","driver");
Activity activity = new StandardActivityType(activityDef).getAssembledActivity(activityDef, activityMap);
activityMap.put(activity.getAlias(),activity);
logger.debug("Resolved activity for alias '" + activityDef.getAlias() + "'");
final Activity activity = new StandardActivityType(activityDef, labels).getAssembledActivity(activityDef, this.activityMap, labels);
this.activityMap.put(activity.getAlias(),activity);
ActivityLoader.logger.debug("Resolved activity for alias '{}'", activityDef.getAlias());
return activity;
}
public void purgeActivity(String activityAlias) {
this.activityMap.remove(activityAlias);
public void purgeActivity(final String activityAlias) {
activityMap.remove(activityAlias);
}
}

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
@@ -44,76 +45,71 @@ public class ActivityTypeLoader {
private final SimpleServiceLoader<DriverAdapter> DRIVERADAPTER_SPI_FINDER = new SimpleServiceLoader<>(DriverAdapter.class, Maturity.Any);
private final Set<URL> jarUrls = new HashSet<>();
public ActivityTypeLoader setMaturity(Maturity maturity) {
ACTIVITYTYPE_SPI_FINDER.setMaturity(maturity);
public ActivityTypeLoader setMaturity(final Maturity maturity) {
this.ACTIVITYTYPE_SPI_FINDER.setMaturity(maturity);
return this;
}
public ActivityTypeLoader() {
List<String> libpaths = NBEnvironment.INSTANCE.interpolateEach(":", "$" + NBEnvironment.NBLIBS);
final List<String> libpaths = NBEnvironment.INSTANCE.interpolateEach(":", '$' + NBEnvironment.NBLIBS);
Set<URL> urlsToAdd = new HashSet<>();
for (String libpaths_entry : libpaths) {
Path libpath = Path.of(libpaths_entry);
if (Files.isDirectory(libpath)) {
urlsToAdd = addLibDir(urlsToAdd, libpath);
} else if (Files.isRegularFile(libpath) && libpath.toString().toLowerCase().endsWith(".zip")) {
urlsToAdd = addZipDir(urlsToAdd, libpath);
} else if (Files.isRegularFile(libpath) && libpath.toString().toLowerCase().endsWith(".jar")) {
urlsToAdd = addJarFile(urlsToAdd, libpath);
}
for (final String libpaths_entry : libpaths) {
final Path libpath = Path.of(libpaths_entry);
if (Files.isDirectory(libpath)) urlsToAdd = this.addLibDir(urlsToAdd, libpath);
else if (Files.isRegularFile(libpath) && libpath.toString().toLowerCase().endsWith(".zip"))
urlsToAdd = this.addZipDir(urlsToAdd, libpath);
else if (Files.isRegularFile(libpath) && libpath.toString().toLowerCase().endsWith(".jar"))
urlsToAdd = this.addJarFile(urlsToAdd, libpath);
}
extendClassLoader(urlsToAdd);
this.extendClassLoader(urlsToAdd);
}
private synchronized void extendClassLoader(String... paths) {
Set<URL> urls = new HashSet<>();
for (String path : paths) {
private synchronized void extendClassLoader(final String... paths) {
final Set<URL> urls = new HashSet<>();
for (final String path : paths) {
URL url = null;
try {
url = new URL(path);
} catch (MalformedURLException e) {
} catch (final MalformedURLException e) {
throw new RuntimeException(e);
}
urls.add(url);
}
extendClassLoader(urls);
this.extendClassLoader(urls);
}
private synchronized void extendClassLoader(Set<URL> urls) {
Set<URL> newUrls = new HashSet<>();
if (!jarUrls.containsAll(urls)) {
for (URL url : urls) {
if (!jarUrls.contains(url)) {
private synchronized void extendClassLoader(final Set<URL> urls) {
final Set<URL> newUrls = new HashSet<>();
if (!this.jarUrls.containsAll(urls)) {
for (final URL url : urls)
if (!this.jarUrls.contains(url)) {
newUrls.add(url);
jarUrls.add(url);
this.jarUrls.add(url);
}
}
URL[] newUrlAry = newUrls.toArray(new URL[]{});
URLClassLoader ucl = URLClassLoader.newInstance(newUrlAry, Thread.currentThread().getContextClassLoader());
final URL[] newUrlAry = newUrls.toArray(new URL[]{});
final URLClassLoader ucl = URLClassLoader.newInstance(newUrlAry, Thread.currentThread().getContextClassLoader());
Thread.currentThread().setContextClassLoader(ucl);
logger.debug("Extended class loader layering with " + newUrls);
} else {
logger.debug("All URLs specified were already in a class loader.");
}
ActivityTypeLoader.logger.debug("Extended class loader layering with {}", newUrls);
} else ActivityTypeLoader.logger.debug("All URLs specified were already in a class loader.");
}
private Set<URL> addJarFile(Set<URL> urls, Path libpath) {
private Set<URL> addJarFile(final Set<URL> urls, final Path libpath) {
try {
urls.add(libpath.toUri().toURL());
} catch (MalformedURLException e) {
} catch (final MalformedURLException e) {
throw new RuntimeException(e);
}
return urls;
}
private Set<URL> addZipDir(Set<URL> urlsToAdd, Path libpath) {
private Set<URL> addZipDir(final Set<URL> urlsToAdd, final Path libpath) {
return urlsToAdd;
}
private Set<URL> addLibDir(Set<URL> urlsToAdd, Path libpath) {
Set<URL> urls = NBIO.local()
private Set<URL> addLibDir(final Set<URL> urlsToAdd, final Path libpath) {
final Set<URL> urls = NBIO.local()
.searchPrefixes(libpath.toString())
.extensionSet(".jar")
.list().stream().map(Content::getURL)
@@ -122,16 +118,16 @@ public class ActivityTypeLoader {
return urlsToAdd;
}
public Optional<ActivityType> load(ActivityDef activityDef) {
public Optional<ActivityType> load(final ActivityDef activityDef, final NBLabeledElement labels) {
final String driverName = activityDef.getParams()
String driverName = activityDef.getParams()
.getOptionalString("driver", "type")
.orElseThrow(() -> new BasicError("The parameter 'driver=' is required."));
activityDef.getParams()
.getOptionalString("jar")
.map(jar -> {
Set<URL> urls = NBIO.local().search(jar)
final Set<URL> urls = NBIO.local().search(jar)
.list()
.stream().map(Content::getURL)
.collect(Collectors.toSet());
@@ -139,28 +135,27 @@ public class ActivityTypeLoader {
})
.ifPresent(this::extendClassLoader);
return this.getDriverAdapter(driverName,activityDef)
.or(() -> ACTIVITYTYPE_SPI_FINDER.getOptionally(driverName));
return getDriverAdapter(driverName,activityDef,labels)
.or(() -> this.ACTIVITYTYPE_SPI_FINDER.getOptionally(driverName));
}
private Optional<ActivityType> getDriverAdapter(String activityTypeName, ActivityDef activityDef) {
Optional<DriverAdapter> oda = DRIVERADAPTER_SPI_FINDER.getOptionally(activityTypeName);
private Optional<ActivityType> getDriverAdapter(final String activityTypeName, final ActivityDef activityDef, final NBLabeledElement labels) {
final Optional<DriverAdapter> oda = this.DRIVERADAPTER_SPI_FINDER.getOptionally(activityTypeName);
if (oda.isPresent()) {
DriverAdapter<?, ?> driverAdapter = oda.get();
final DriverAdapter<?, ?> driverAdapter = oda.get();
ActivityType activityType = new StandardActivityType<>(driverAdapter, activityDef);
final ActivityType activityType = new StandardActivityType<>(driverAdapter, activityDef, labels);
return Optional.of(activityType);
} else {
return Optional.empty();
}
return Optional.empty();
}
public Set<String> getAllSelectors() {
Map<String, Maturity> allSelectors = ACTIVITYTYPE_SPI_FINDER.getAllSelectors();
Map<String, Maturity> addAdapters = DRIVERADAPTER_SPI_FINDER.getAllSelectors();
Set<String> all = new LinkedHashSet<>();
final Map<String, Maturity> allSelectors = this.ACTIVITYTYPE_SPI_FINDER.getAllSelectors();
final Map<String, Maturity> addAdapters = this.DRIVERADAPTER_SPI_FINDER.getAllSelectors();
final Set<String> all = new LinkedHashSet<>();
all.addAll(allSelectors.keySet());
all.addAll(addAdapters.keySet());
return all;

View File

@@ -19,6 +19,8 @@ import com.codahale.metrics.MetricRegistry;
import com.oracle.truffle.js.scriptengine.GraalJSScriptEngine;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.ScenarioMetadataAware;
@@ -38,6 +40,7 @@ import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Engine.Builder;
import org.graalvm.polyglot.EnvironmentAccess;
import org.graalvm.polyglot.HostAccess;
import org.graalvm.polyglot.PolyglotAccess;
@@ -53,14 +56,11 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
public class Scenario implements Callable<ExecutionMetricsResult> {
public class Scenario implements Callable<ExecutionMetricsResult>, NBLabeledElement {
private final String commandLine;
private final String reportSummaryTo;
@@ -76,10 +76,15 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
private ExecutionMetricsResult result;
public Optional<ExecutionMetricsResult> getResultIfComplete() {
return Optional.ofNullable(this.result);
return Optional.ofNullable(result);
}
@Override
public NBLabels getLabels() {
return NBLabels.forKV("scenario", this.scenarioName);
}
public enum State {
Scheduled,
Running,
@@ -97,7 +102,7 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
private ScriptParams scenarioScriptParams;
private String scriptfile;
private Engine engine = Engine.Graalvm;
private boolean wantsStackTraces = false;
private boolean wantsStackTraces;
private boolean wantsCompiledScript;
private long startedAtMillis = -1L;
private long endedAtMillis = -1L;
@@ -107,16 +112,16 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
}
public Scenario(
String scenarioName,
String scriptfile,
Engine engine,
String progressInterval,
boolean wantsStackTraces,
boolean wantsCompiledScript,
String reportSummaryTo,
String commandLine,
Path logsPath,
Maturity minMaturity) {
final String scenarioName,
final String scriptfile,
final Engine engine,
final String progressInterval,
final boolean wantsStackTraces,
final boolean wantsCompiledScript,
final String reportSummaryTo,
final String commandLine,
final Path logsPath,
final Maturity minMaturity) {
this.scenarioName = scenarioName;
this.scriptfile = scriptfile;
@@ -130,53 +135,53 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
this.minMaturity = minMaturity;
}
public Scenario(String name, Engine engine, String reportSummaryTo, Maturity minMaturity) {
this.scenarioName = name;
public Scenario(final String name, final Engine engine, final String reportSummaryTo, final Maturity minMaturity) {
scenarioName = name;
this.reportSummaryTo = reportSummaryTo;
this.engine = engine;
this.commandLine = "";
commandLine = "";
this.minMaturity = minMaturity;
this.logsPath = Path.of("logs");
logsPath = Path.of("logs");
}
public Scenario setLogger(Logger logger) {
public Scenario setLogger(final Logger logger) {
this.logger = logger;
return this;
}
public Logger getLogger() {
return logger;
return this.logger;
}
public Scenario addScriptText(String scriptText) {
scripts.add(scriptText);
public Scenario addScriptText(final String scriptText) {
this.scripts.add(scriptText);
return this;
}
public Scenario addScriptFiles(String... args) {
for (String scriptFile : args) {
Path scriptPath = Paths.get(scriptFile);
public Scenario addScriptFiles(final String... args) {
for (final String scriptFile : args) {
final Path scriptPath = Paths.get(scriptFile);
byte[] bytes = new byte[0];
try {
bytes = Files.readAllBytes(scriptPath);
} catch (IOException e) {
} catch (final IOException e) {
e.printStackTrace();
}
ByteBuffer bb = ByteBuffer.wrap(bytes);
Charset utf8 = StandardCharsets.UTF_8;
String scriptData = utf8.decode(bb).toString();
addScriptText(scriptData);
final ByteBuffer bb = ByteBuffer.wrap(bytes);
final Charset utf8 = StandardCharsets.UTF_8;
final String scriptData = utf8.decode(bb).toString();
this.addScriptText(scriptData);
}
return this;
}
private void initializeScriptingEngine(ScenarioController scenarioController) {
private void initializeScriptingEngine(final ScenarioController scenarioController) {
logger.debug("Using engine " + engine.toString());
MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
this.logger.debug("Using engine {}", this.engine.toString());
final MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
Context.Builder contextSettings = Context.newBuilder("js")
final Context.Builder contextSettings = Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL)
.allowNativeAccess(true)
.allowCreateThread(true)
@@ -190,183 +195,171 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
.option("js.ecmascript-version", "2020")
.option("js.nashorn-compat", "true");
org.graalvm.polyglot.Engine.Builder engineBuilder = org.graalvm.polyglot.Engine.newBuilder();
final Builder engineBuilder = org.graalvm.polyglot.Engine.newBuilder();
engineBuilder.option("engine.WarnInterpreterOnly", "false");
org.graalvm.polyglot.Engine polyglotEngine = engineBuilder.build();
final org.graalvm.polyglot.Engine polyglotEngine = engineBuilder.build();
// TODO: add in, out, err for this scenario
this.scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings);
scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings);
if (!progressInterval.equals("disabled")) {
activityProgressIndicator = new ActivityProgressIndicator(scenarioController, progressInterval);
}
if (!"disabled".equals(progressInterval))
this.activityProgressIndicator = new ActivityProgressIndicator(scenarioController, this.progressInterval);
scriptEnv = new ScenarioContext(scenarioController);
scriptEngine.setContext(scriptEnv);
this.scriptEnv = new ScenarioContext(scenarioName,scenarioController);
this.scriptEngine.setContext(this.scriptEnv);
scriptEngine.put("params", scenarioScriptParams);
this.scriptEngine.put("params", this.scenarioScriptParams);
// scriptEngine.put("scenario", scenarioController);
// scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
// scriptEngine.put("activities", new NashornActivityBindings(scenarioController));
scriptEngine.put("scenario", new PolyglotScenarioController(scenarioController));
scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
scriptEngine.put("activities", new ActivityBindings(scenarioController));
this.scriptEngine.put("scenario", new PolyglotScenarioController(scenarioController));
this.scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
this.scriptEngine.put("activities", new ActivityBindings(scenarioController));
for (ScriptingPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
for (final ScriptingPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
if (!extensionDescriptor.isAutoLoading()) {
logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");
this.logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");
continue;
}
Logger extensionLogger =
final Logger extensionLogger =
LogManager.getLogger("extensions." + extensionDescriptor.getBaseVariableName());
Object extensionObject = extensionDescriptor.getExtensionObject(
final Object extensionObject = extensionDescriptor.getExtensionObject(
extensionLogger,
metricRegistry,
scriptEnv
this.scriptEnv
);
ScenarioMetadataAware.apply(extensionObject, getScenarioMetadata());
logger.trace(() -> "Adding extension object: name=" + extensionDescriptor.getBaseVariableName() +
ScenarioMetadataAware.apply(extensionObject, this.getScenarioMetadata());
this.logger.trace(() -> "Adding extension object: name=" + extensionDescriptor.getBaseVariableName() +
" class=" + extensionObject.getClass().getSimpleName());
scriptEngine.put(extensionDescriptor.getBaseVariableName(), extensionObject);
this.scriptEngine.put(extensionDescriptor.getBaseVariableName(), extensionObject);
}
}
private synchronized ScenarioMetadata getScenarioMetadata() {
if (this.scenarioMetadata == null) {
this.scenarioMetadata = new ScenarioMetadata(
this.startedAtMillis,
this.scenarioName,
SystemId.getNodeId(),
SystemId.getNodeFingerprint()
);
}
return scenarioMetadata;
if (null == this.scenarioMetadata) scenarioMetadata = new ScenarioMetadata(
startedAtMillis,
scenarioName,
SystemId.getNodeId(),
SystemId.getNodeFingerprint()
);
return this.scenarioMetadata;
}
private synchronized void runScenario() {
scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(scenarioShutdownHook);
this.scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(this.scenarioShutdownHook);
state = State.Running;
startedAtMillis = System.currentTimeMillis();
this.state = State.Running;
this.startedAtMillis = System.currentTimeMillis();
Annotators.recordAnnotation(
Annotation.newBuilder()
.session(this.scenarioName)
.session(scenarioName)
.now()
.layer(Layer.Scenario)
.detail("engine", this.engine.toString())
.detail("engine", engine.toString())
.build()
);
logger.debug("Running control script for " + getScenarioName() + ".");
scenarioController = new ScenarioController(this);
this.logger.debug("Running control script for {}.", scenarioName);
this.scenarioController = new ScenarioController(this);
try {
initializeScriptingEngine(scenarioController);
executeScenarioScripts();
long awaitCompletionTime = 86400 * 365 * 1000L;
logger.debug("Awaiting completion of scenario and activities for " + awaitCompletionTime + " millis.");
scenarioController.awaitCompletion(awaitCompletionTime);
} catch (Exception e) {
this.error=e;
this.initializeScriptingEngine(this.scenarioController);
this.executeScenarioScripts();
final long awaitCompletionTime = 86400 * 365 * 1000L;
this.logger.debug("Awaiting completion of scenario and activities for {} millis.", awaitCompletionTime);
this.scenarioController.awaitCompletion(awaitCompletionTime);
} catch (final Exception e) {
error =e;
} finally {
scenarioController.shutdown();
this.scenarioController.shutdown();
}
Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook);
var runHook = scenarioShutdownHook;
scenarioShutdownHook = null;
Runtime.getRuntime().removeShutdownHook(this.scenarioShutdownHook);
final var runHook = this.scenarioShutdownHook;
this.scenarioShutdownHook = null;
runHook.run();
logger.debug("removing scenario shutdown hook");
this.logger.debug("removing scenario shutdown hook");
}
public void notifyException(Thread t, Throwable e) {
this.error=new RuntimeException("in thread " + t.getName() + ", " +e, e);
public void notifyException(final Thread t, final Throwable e) {
error =new RuntimeException("in thread " + t.getName() + ", " +e, e);
}
private void executeScenarioScripts() {
for (String script : scripts) {
for (final String script : this.scripts)
try {
Object result = null;
if (scriptEngine instanceof Compilable compilableEngine && wantsCompiledScript) {
logger.debug("Using direct script compilation");
CompiledScript compiled = compilableEngine.compile(script);
logger.debug("-> invoking main scenario script (compiled)");
if ((scriptEngine instanceof Compilable compilableEngine) && this.wantsCompiledScript) {
this.logger.debug("Using direct script compilation");
final CompiledScript compiled = compilableEngine.compile(script);
this.logger.debug("-> invoking main scenario script (compiled)");
result = compiled.eval();
logger.debug("<- scenario script completed (compiled)");
this.logger.debug("<- scenario script completed (compiled)");
} else if ((null != scriptfile) && !this.scriptfile.isEmpty()) {
final String filename = this.scriptfile.replace("_SESSION_", this.scenarioName);
this.logger.debug("-> invoking main scenario script (interpreted from {})", filename);
final Path written = Files.write(
Path.of(filename),
script.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.CREATE
);
final BufferedReader reader = Files.newBufferedReader(written);
this.scriptEngine.eval(reader);
this.logger.debug("<- scenario control script completed (interpreted) from {})", filename);
} else {
if (scriptfile != null && !scriptfile.isEmpty()) {
String filename = scriptfile.replace("_SESSION_", scenarioName);
logger.debug("-> invoking main scenario script (" +
"interpreted from " + filename + ")");
Path written = Files.write(
Path.of(filename),
script.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.CREATE
);
BufferedReader reader = Files.newBufferedReader(written);
scriptEngine.eval(reader);
logger.debug("<- scenario control script completed (interpreted) " +
"from " + filename + ")");
} else {
logger.debug("-> invoking main scenario script (interpreted)");
result = scriptEngine.eval(script);
logger.debug("<- scenario control script completed (interpreted)");
}
this.logger.debug("-> invoking main scenario script (interpreted)");
result = this.scriptEngine.eval(script);
this.logger.debug("<- scenario control script completed (interpreted)");
}
if (result != null) {
logger.debug("scenario result: type(" + result.getClass().getCanonicalName() + "): value:" + result);
}
if (null != result)
this.logger.debug("scenario result: type({}): value:{}", result.getClass().getCanonicalName(), result);
System.err.flush();
System.out.flush();
} catch (Exception e) {
this.error=e;
this.state = State.Errored;
logger.error("Error in scenario, shutting down. (" + e + ")");
} catch (final Exception e) {
error = e;
state = State.Errored;
this.logger.error("Error in scenario, shutting down. ({})", e);
try {
this.scenarioController.forceStopScenario(5000, false);
} catch (Exception eInner) {
logger.debug("Found inner exception while forcing stop with rethrow=false: " + eInner);
scenarioController.forceStopScenario(5000, false);
} catch (final Exception eInner) {
this.logger.debug("Found inner exception while forcing stop with rethrow=false: {}", eInner);
} finally {
throw new RuntimeException(e);
}
} finally {
System.out.flush();
System.err.flush();
endedAtMillis = System.currentTimeMillis();
this.endedAtMillis = System.currentTimeMillis();
}
}
}
public void finish() {
logger.debug("finishing scenario");
endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (this.state == State.Running) {
this.state = State.Finished;
}
this.logger.debug("finishing scenario");
this.endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (State.Running == this.state) state = State.Finished;
if (scenarioShutdownHook != null) {
if (null != scenarioShutdownHook) {
// If this method was called while the shutdown hook is defined, then it means
// that the scenario was ended before the hook was uninstalled normally.
this.state = State.Interrupted;
logger.warn("Scenario was interrupted by process exit, shutting down");
} else {
logger.info("Scenario completed successfully, with " + scenarioController.getActivityExecutorMap().size() + " logical activities.");
}
state = State.Interrupted;
this.logger.warn("Scenario was interrupted by process exit, shutting down");
} else
this.logger.info("Scenario completed successfully, with {} logical activities.", this.scenarioController.getActivityExecutorMap().size());
logger.info(() -> "scenario state: " + this.state);
this.logger.info(() -> "scenario state: " + state);
// We report the scenario state via annotation even for short runs
Annotation annotation = Annotation.newBuilder()
.session(this.scenarioName)
.interval(this.startedAtMillis, endedAtMillis)
final Annotation annotation = Annotation.newBuilder()
.session(scenarioName)
.interval(startedAtMillis, this.endedAtMillis)
.layer(Layer.Scenario)
.label("state", this.state.toString())
.detail("command_line", this.commandLine)
.label("state", state.toString())
.detail("command_line", commandLine)
.build();
Annotators.recordAnnotation(annotation);
@@ -374,11 +367,11 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
}
public long getStartedAtMillis() {
return startedAtMillis;
return this.startedAtMillis;
}
public long getEndedAtMillis() {
return endedAtMillis;
return this.endedAtMillis;
}
/**
@@ -400,37 +393,38 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
*
* @return
*/
@Override
public synchronized ExecutionMetricsResult call() {
if (result == null) {
if (null == result) {
try {
runScenario();
} catch (Exception e) {
this.error=e;
this.runScenario();
} catch (final Exception e) {
error =e;
} finally {
logger.debug((this.error==null ? "NORMAL" : "ERRORED") + " scenario run");
this.logger.debug("{} scenario run", null == this.error ? "NORMAL" : "ERRORED");
}
String iolog = scriptEnv.getTimedLog();
this.result = new ExecutionMetricsResult(this.startedAtMillis, this.endedAtMillis, iolog, error);
result.reportMetricsSummaryToLog();
doReportSummaries(reportSummaryTo, result);
final String iolog = this.scriptEnv.getTimedLog();
result = new ExecutionMetricsResult(startedAtMillis, endedAtMillis, iolog, this.error);
this.result.reportMetricsSummaryToLog();
this.doReportSummaries(this.reportSummaryTo, this.result);
}
return result;
return this.result;
}
private void doReportSummaries(String reportSummaryTo, ExecutionMetricsResult result) {
List<PrintStream> fullChannels = new ArrayList<>();
List<PrintStream> briefChannels = new ArrayList<>();
private void doReportSummaries(final String reportSummaryTo, final ExecutionMetricsResult result) {
final List<PrintStream> fullChannels = new ArrayList<>();
final List<PrintStream> briefChannels = new ArrayList<>();
String[] destinationSpecs = reportSummaryTo.split(", *");
final String[] destinationSpecs = reportSummaryTo.split(", *");
for (String spec : destinationSpecs) {
if (spec != null && !spec.isBlank()) {
String[] split = spec.split(":", 2);
String summaryTo = split[0];
long summaryWhen = split.length == 2 ? Long.parseLong(split[1]) * 1000L : 0;
for (final String spec : destinationSpecs)
if ((null != spec) && !spec.isBlank()) {
final String[] split = spec.split(":", 2);
final String summaryTo = split[0];
final long summaryWhen = (2 == split.length) ? (Long.parseLong(split[1]) * 1000L) : 0;
PrintStream out = null;
switch (summaryTo.toLowerCase()) {
@@ -442,83 +436,85 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
out = System.err;
break;
default:
String outName = summaryTo
.replaceAll("_SESSION_", getScenarioName())
.replaceAll("_LOGS_", logsPath.toString());
final String outName = summaryTo
.replaceAll("_SESSION_", scenarioName)
.replaceAll("_LOGS_", this.logsPath.toString());
try {
out = new PrintStream(new FileOutputStream(outName));
break;
} catch (FileNotFoundException e) {
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
}
}
if (result.getElapsedMillis() > summaryWhen) {
fullChannels.add(out);
} else {
logger.debug("Summarizing counting metrics only to " + spec + " with scenario duration of " + summaryWhen + "ms (<" + summaryWhen + ")");
if (result.getElapsedMillis() > summaryWhen) fullChannels.add(out);
else {
this.logger.debug("Summarizing counting metrics only to {} with scenario duration of {}ms (<{})", spec, summaryWhen, summaryWhen);
briefChannels.add(out);
}
}
}
fullChannels.forEach(result::reportMetricsSummaryTo);
// briefChannels.forEach(result::reportCountsTo);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Scenario scenario = (Scenario) o;
return getScenarioName() != null ? getScenarioName().equals(scenario.getScenarioName()) : scenario.getScenarioName() == null;
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if ((null == o) || (this.getClass() != o.getClass())) {
return false;
}
final Scenario scenario = (Scenario) o;
return Objects.equals(this.scenarioName, scenario.scenarioName);
}
@Override
public int hashCode() {
return getScenarioName() != null ? getScenarioName().hashCode() : 0;
return (null != this.scenarioName) ? scenarioName.hashCode() : 0;
}
public String getScenarioName() {
return scenarioName;
return this.scenarioName;
}
public ScenarioController getScenarioController() {
return scenarioController;
return this.scenarioController;
}
public String getScriptText() {
return scripts.stream().collect(Collectors.joining());
return this.scripts.stream().collect(Collectors.joining());
}
public Optional<List<String>> getIOLog() {
return Optional.ofNullable(scriptEnv).map(ScriptEnvBuffer::getTimeLogLines);
return Optional.ofNullable(this.scriptEnv).map(ScriptEnvBuffer::getTimeLogLines);
}
public String toString() {
return "name:'" + this.getScenarioName() + "'";
return "name:'" + scenarioName + '\'';
}
public void addScenarioScriptParams(ScriptParams scenarioScriptParams) {
public void addScenarioScriptParams(final ScriptParams scenarioScriptParams) {
this.scenarioScriptParams = scenarioScriptParams;
}
public void addScenarioScriptParams(Map<String, String> scriptParams) {
addScenarioScriptParams(new ScriptParams() {{
putAll(scriptParams);
public void addScenarioScriptParams(final Map<String, String> scriptParams) {
this.addScenarioScriptParams(new ScriptParams() {{
this.putAll(scriptParams);
}});
}
public State getScenarioState() {
return state;
return this.state;
}
public void enableCharting() {
MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
final MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
}
public String getReportSummaryTo() {
return reportSummaryTo;
return this.reportSummaryTo;
}
}

View File

@@ -17,6 +17,8 @@ package io.nosqlbench.engine.core.lifecycle.scenario;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
@@ -38,7 +40,7 @@ import java.util.stream.Collectors;
* A ScenarioController provides a way to start Activities,
* modify them while running, and forceStopMotors, pause or restart them.
*/
public class ScenarioController {
public class ScenarioController implements NBLabeledElement {
private static final Logger logger = LogManager.getLogger(ScenarioController.class);
private static final Logger scenariologger = LogManager.getLogger("SCENARIO");
@@ -81,7 +83,7 @@ public class ScenarioController {
private synchronized ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef);
Activity activity = this.activityLoader.loadActivity(activityDef, this);
ActivityExecutor executor = new ActivityExecutor(activity, this.scenario.getScenarioName());
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
@@ -161,7 +163,7 @@ public class ScenarioController {
public boolean isRunningActivity(ActivityDef activityDef) {
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
return (runtimeInfo != null && runtimeInfo.isRunning());
return (null != runtimeInfo) && runtimeInfo.isRunning();
}
public boolean isRunningActivity(Map<String, String> activityDefMap) {
@@ -187,11 +189,11 @@ public class ScenarioController {
.build());
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
if (runtimeInfo == null) {
if (null == runtimeInfo) {
throw new RuntimeException("could not stop missing activity:" + activityDef);
}
scenariologger.debug("STOP " + activityDef.getAlias());
scenariologger.debug("STOP {}", activityDef.getAlias());
runtimeInfo.stopActivity();
}
@@ -217,7 +219,7 @@ public class ScenarioController {
* @param spec The name of the activity that is already known to the scenario
*/
public synchronized void stop(String spec) {
logger.debug("request->STOP '" + spec + "'");
logger.debug("request->STOP '{}'", spec);
List<String> aliases = Arrays.asList(spec.split("[,; ]"));
List<String> matched = aliases.stream()
.map(String::trim)
@@ -225,7 +227,7 @@ public class ScenarioController {
.flatMap(aspec -> getMatchingAliases(aspec).stream()).collect(Collectors.toList());
for (String alias : matched) {
ActivityDef adef = aliasToDef(alias);
scenariologger.debug("STOP " + adef.getAlias());
scenariologger.debug("STOP {}", adef.getAlias());
stop(adef);
}
}
@@ -248,11 +250,11 @@ public class ScenarioController {
.build());
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
if (runtimeInfo == null) {
if (null == runtimeInfo) {
throw new RuntimeException("could not force stop missing activity:" + activityDef);
}
scenariologger.debug("FORCE STOP " + activityDef.getAlias());
scenariologger.debug("FORCE STOP {}", activityDef.getAlias());
runtimeInfo.forceStopActivity();
}
@@ -278,7 +280,7 @@ public class ScenarioController {
* @param spec The name of the activity that is already known to the scenario
*/
public synchronized void forceStop(String spec) {
logger.debug("request->STOP '" + spec + "'");
logger.debug("request->STOP '{}'", spec);
List<String> aliases = Arrays.asList(spec.split("[,; ]"));
List<String> matched = aliases.stream()
.map(String::trim)
@@ -286,7 +288,7 @@ public class ScenarioController {
.flatMap(aspec -> getMatchingAliases(aspec).stream()).collect(Collectors.toList());
for (String alias : matched) {
ActivityDef adef = aliasToDef(alias);
scenariologger.debug("STOP " + adef.getAlias());
scenariologger.debug("STOP {}", adef.getAlias());
forceStop(adef);
}
}
@@ -295,15 +297,16 @@ public class ScenarioController {
private List<String> getMatchingAliases(String pattern) {
Pattern matcher;
// If the pattern is an alphanumeric name, the require it to match as a fully-qualified literal
// It is not, so the user is wanting to do a flexible match
if (pattern.matches("[a-zA-Z_][a-zA-Z0-9_.]*")) {
matcher = Pattern.compile("^" + pattern + "$");
} else { // It is not, so the user is wanting to do a flexible match
matcher = Pattern.compile('^' + pattern + '$');
} else {
matcher = Pattern.compile(pattern);
}
List<String> matching = activityInfoMap.keySet().stream()
.filter(a -> Pattern.matches(pattern, a))
.peek(p -> logger.debug("MATCH " + pattern + " -> " + p))
.peek(p -> logger.debug("MATCH {} -> {}", pattern, p))
.collect(Collectors.toList());
return matching;
}
@@ -314,12 +317,12 @@ public class ScenarioController {
* @param waitMillis time to wait, in milliseconds
*/
public void waitMillis(long waitMillis) {
scenariologger.debug("WAITMILLIS " + waitMillis);
scenariologger.debug("WAITMILLIS {}", waitMillis);
logger.trace("#> waitMillis(" + waitMillis + ")");
logger.trace("#> waitMillis({})", waitMillis);
long endTime = System.currentTimeMillis() + waitMillis;
while (waitMillis > 0L) {
while (0L < waitMillis) {
try {
Thread.sleep(waitMillis);
} catch (InterruptedException spurrious) {
@@ -347,7 +350,7 @@ public class ScenarioController {
* @param waitTimeMillis grace period during which an activity may cooperatively shut down
*/
public synchronized void forceStopScenario(int waitTimeMillis, boolean rethrow) {
logger.debug("force stopping scenario " + this.scenario.getScenarioName());
logger.debug("force stopping scenario {}", this.scenario.getScenarioName());
activityInfoMap.values().forEach(a -> a.getActivityExecutor().forceStopActivity(10000));
logger.debug("Scenario force stopped.");
}
@@ -369,17 +372,14 @@ public class ScenarioController {
boolean completed = true;
for (ActivityRuntimeInfo activityRuntimeInfo : this.activityInfoMap.values()) {
ExecutionResult activityResult = activityRuntimeInfo.awaitResult(waitTimeMillis);
if (activityResult == null) {
logger.error("Unable to retrieve activity result for " + activityRuntimeInfo.getActivity().getAlias());
if (null == activityResult) {
logger.error("Unable to retrieve activity result for {}", activityRuntimeInfo.getActivity().getAlias());
completed = false;
} else {
if (activityResult.getException()!=null) {
if (activityResult.getException() instanceof RuntimeException e) {
throw e;
} else {
throw new RuntimeException(activityResult.getException());
}
} else if (null != activityResult.getException()) {
if (activityResult.getException() instanceof RuntimeException e) {
throw e;
}
throw new RuntimeException(activityResult.getException());
}
}
return completed;
@@ -388,9 +388,8 @@ public class ScenarioController {
private ActivityDef aliasToDef(String alias) {
if (alias.contains("=")) {
return ActivityDef.parseActivityDef(alias);
} else {
return ActivityDef.parseActivityDef("alias=" + alias + ";");
}
return ActivityDef.parseActivityDef("alias=" + alias + ';');
}
public void await(Map<String, String> activityDefMap) {
@@ -417,10 +416,10 @@ public class ScenarioController {
public boolean awaitActivity(ActivityDef activityDef, long timeoutMs) {
ActivityRuntimeInfo ari = this.activityInfoMap.get(activityDef.getAlias());
if (ari == null) {
if (null == ari) {
throw new RuntimeException("Could not await missing activity: " + activityDef.getAlias());
}
scenariologger.debug("AWAIT/before alias=" + activityDef.getAlias());
scenariologger.debug("AWAIT/before alias={}", activityDef.getAlias());
ExecutionResult result = null;
Future<ExecutionResult> future=null;
try {
@@ -437,7 +436,7 @@ public class ScenarioController {
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
return (result != null);
return null != result;
}
/**
@@ -465,7 +464,7 @@ public class ScenarioController {
}
public void notifyException(Thread t, Throwable e) {
logger.error("Uncaught exception in activity lifecycle thread:" + e, e);
logger.error("Uncaught exception in activity lifecycle thread:{}", e, e);
scenario.notifyException(t,e);
throw new RuntimeException(e);
}
@@ -486,8 +485,13 @@ public class ScenarioController {
}
}
} catch (Exception e) {
logger.warn("There was an exception while trying to shutdown the ScenarioController:" + e,e);
logger.warn("There was an exception while trying to shutdown the ScenarioController:{}", e, e);
throw new RuntimeException(e);
}
}
@Override
public NBLabels getLabels() {
return this.scenario.getLabels();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.scenario.script;
import com.codahale.metrics.*;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
@@ -26,6 +27,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.Method;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.*;
import java.util.function.Function;
@@ -35,20 +37,22 @@ import java.util.stream.Collectors;
/**
* Find the metrics associated with an activity type by instantiating the activity in idle mode.
*/
public class MetricsMapper {
private final static Logger logger = LogManager.getLogger(MetricsMapper.class);
public enum MetricsMapper {
;
private static final Logger logger = LogManager.getLogger(MetricsMapper.class);
private static final Set<Class<?>> metricsElements = new HashSet<>() {{
add(Meter.class);
add(Counter.class);
add(Timer.class);
add(Histogram.class);
add(Gauge.class);
add(Snapshot.class);
add(Gauge.class);
add(Histogram.class);
add(Timer.class);
add(Counter.class);
add(Meter.class);
}};
private static final Predicate<Method> isSimpleGetter = method ->
method.getName().startsWith("get")
&& method.getParameterCount() == 0
&& !method.getName().equals("getClass");
method.getName().startsWith("get")
&& (0 == method.getParameterCount())
&& !"getClass".equals(method.getName());
private static final Function<Method, String> getPropertyName = method ->
{
@@ -56,62 +60,61 @@ public class MetricsMapper {
return mName;
};
public static String metricsDetail(String activitySpec) {
public static String metricsDetail(final String activitySpec) {
//StringBuilder metricsDetail = new StringBuilder();
List<String> metricsDetails = new ArrayList<>();
final List<String> metricsDetails = new ArrayList<>();
ActivityDef activityDef = ActivityDef.parseActivityDef(activitySpec);
logger.info(() -> "introspecting metric names for " + activitySpec);
final ActivityDef activityDef = ActivityDef.parseActivityDef(activitySpec);
MetricsMapper.logger.info(() -> "introspecting metric names for " + activitySpec);
Optional<ActivityType> activityType = new ActivityTypeLoader().load(activityDef);
final Optional<ActivityType> activityType = new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY);
if (!activityType.isPresent()) {
if (!activityType.isPresent())
throw new RuntimeException("Activity type '" + activityDef.getActivityType() + "' does not exist in this runtime.");
}
Activity activity = activityType.get().getAssembledActivity(activityDef, new HashMap<>());
PolyglotMetricRegistryBindings nashornMetricRegistryBindings = new PolyglotMetricRegistryBindings(ActivityMetrics.getMetricRegistry());
final Activity activity = activityType.get().getAssembledActivity(activityDef, new HashMap<>(), NBLabeledElement.EMPTY);
final PolyglotMetricRegistryBindings nashornMetricRegistryBindings = new PolyglotMetricRegistryBindings(ActivityMetrics.getMetricRegistry());
activity.initActivity();
activity.getInputDispenserDelegate().getInput(0);
activity.getActionDispenserDelegate().getAction(0);
activity.getMotorDispenserDelegate().getMotor(activityDef, 0);
Map<String, Metric> metricMap = nashornMetricRegistryBindings.getMetrics();
final Map<String, Metric> metricMap = nashornMetricRegistryBindings.getMetrics();
// Map<String, Map<String,String>> details = new LinkedHashMap<>();
for (Map.Entry<String, Metric> metricEntry : metricMap.entrySet()) {
String metricName = metricEntry.getKey();
Metric metricValue = metricEntry.getValue();
for (final Entry<String, Metric> metricEntry : metricMap.entrySet()) {
final String metricName = metricEntry.getKey();
final Metric metricValue = metricEntry.getValue();
Map<String, String> getterSummary = getGetterSummary(metricValue);
final Map<String, String> getterSummary = MetricsMapper.getGetterSummary(metricValue);
// details.put(metricName,getterSummary);
List<String> methodDetails = getterSummary.entrySet().stream().map(
final List<String> methodDetails = getterSummary.entrySet().stream().map(
es -> metricName + es.getKey() + " " + es.getValue()
).collect(Collectors.toList());
methodDetails.sort(String::compareTo);
String getterText = methodDetails.stream().collect(Collectors.joining("\n"));
metricsDetails.add(metricName + "\n" + getterText);
final String getterText = methodDetails.stream().collect(Collectors.joining("\n"));
metricsDetails.add(metricName + '\n' + getterText);
}
// return details;
return metricsDetails.stream().collect(Collectors.joining("\n"));
}
private static Map<String, String> getGetterSummary(Object o) {
return getGetterSummary(new HashMap<>(), "", o.getClass());
private static Map<String, String> getGetterSummary(final Object o) {
return MetricsMapper.getGetterSummary(new HashMap<>(), "", o.getClass());
}
private static Map<String, String> getGetterSummary(Map<String, String> accumulator, String name, Class<?> objectType) {
private static Map<String, String> getGetterSummary(final Map<String, String> accumulator, final String name, final Class<?> objectType) {
Arrays.stream(objectType.getMethods())
.filter(isSimpleGetter)
.filter(MetricsMapper.isSimpleGetter)
.forEach(m -> {
if (m.getReturnType().isPrimitive()) {
accumulator.put(name + "." + getPropertyName.apply(m), m.getReturnType().getSimpleName());
} else {
String fullName = name + "." + getPropertyName.apply(m);
getGetterSummary(accumulator, fullName, m.getReturnType());
if (m.getReturnType().isPrimitive())
accumulator.put(name + '.' + MetricsMapper.getPropertyName.apply(m), m.getReturnType().getSimpleName());
else {
final String fullName = name + '.' + MetricsMapper.getPropertyName.apply(m);
MetricsMapper.getGetterSummary(accumulator, fullName, m.getReturnType());
}
});
return accumulator;

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,17 +15,25 @@
*/
package io.nosqlbench.engine.core.lifecycle.scenario.script;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController;
import io.nosqlbench.api.config.LabeledScenarioContext;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController;
public class ScenarioContext extends ScriptEnvBuffer {
public class ScenarioContext extends ScriptEnvBuffer implements LabeledScenarioContext {
private final ScenarioController sc;
private final String contextName;
public ScenarioContext(ScenarioController sc) {
public ScenarioContext(String contextName, ScenarioController sc) {
this.contextName = contextName;
this.sc = sc;
}
public String getContextName() {
return this.contextName;
}
@Override
public Object getAttribute(String name) {
Object o = super.getAttribute(name);
@@ -43,4 +51,8 @@ public class ScenarioContext extends ScriptEnvBuffer {
super.setAttribute(name, value, scope);
}
@Override
public NBLabels getLabels() {
return NBLabels.forKV("scenario", this.contextName);
}
}

View File

@@ -1,434 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.logging;
import com.codahale.metrics.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* This is a Log4J targeted metrics logging reporter, derived from
* {@link com.codahale.metrics.Slf4jReporter}. This implementation
* was built to allow for consolidating internal logging dependencies
* to log4j only.
*/
public class Log4JMetricsReporter extends ScheduledReporter {
/**
* Returns a new {@link Builder} for {@link Log4JMetricsReporter}.
*
* @param registry the registry to report
* @return a {@link Builder} instance for a {@link Log4JMetricsReporter}
*/
public static Builder forRegistry(MetricRegistry registry) {
return new Builder(registry);
}
public enum LoggingLevel { TRACE, DEBUG, INFO, WARN, ERROR }
/**
* A builder for {@link Log4JMetricsReporter} instances. Defaults to logging to {@code metrics}, not
* using a marker, converting rates to events/second, converting durations to milliseconds, and
* not filtering metrics.
*/
public static class Builder {
private final MetricRegistry registry;
private Logger logger;
private LoggingLevel loggingLevel;
private Marker marker;
private String prefix;
private TimeUnit rateUnit;
private TimeUnit durationUnit;
private MetricFilter filter;
private ScheduledExecutorService executor;
private boolean shutdownExecutorOnStop;
private Builder(MetricRegistry registry) {
this.registry = registry;
this.logger = LogManager.getLogger("metrics");
this.marker = null;
this.prefix = "";
this.rateUnit = TimeUnit.SECONDS;
this.durationUnit = TimeUnit.MILLISECONDS;
this.filter = MetricFilter.ALL;
this.loggingLevel = LoggingLevel.INFO;
this.executor = null;
this.shutdownExecutorOnStop = true;
}
/**
* Specifies whether or not, the executor (used for reporting) will be stopped with same time with reporter.
* Default value is true.
* Setting this parameter to false, has the sense in combining with providing external managed executor via {@link #scheduleOn(ScheduledExecutorService)}.
*
* @param shutdownExecutorOnStop if true, then executor will be stopped in same time with this reporter
* @return {@code this}
*/
public Builder shutdownExecutorOnStop(boolean shutdownExecutorOnStop) {
this.shutdownExecutorOnStop = shutdownExecutorOnStop;
return this;
}
/**
* Specifies the executor to use while scheduling reporting of metrics.
* Default value is null.
* Null value leads to executor will be auto created on start.
*
* @param executor the executor to use while scheduling reporting of metrics.
* @return {@code this}
*/
public Builder scheduleOn(ScheduledExecutorService executor) {
this.executor = executor;
return this;
}
/**
* Log metrics to the given logger.
*
* @param logger an SLF4J {@link Logger}
* @return {@code this}
*/
public Builder outputTo(Logger logger) {
this.logger = logger;
return this;
}
/**
* Mark all logged metrics with the given marker.
*
* @param marker an SLF4J {@link Marker}
* @return {@code this}
*/
public Builder markWith(Marker marker) {
this.marker = marker;
return this;
}
/**
* Prefix all metric names with the given string.
*
* @param prefix the prefix for all metric names
* @return {@code this}
*/
public Builder prefixedWith(String prefix) {
this.prefix = prefix;
return this;
}
/**
* Convert rates to the given time unit.
*
* @param rateUnit a unit of time
* @return {@code this}
*/
public Builder convertRatesTo(TimeUnit rateUnit) {
this.rateUnit = rateUnit;
return this;
}
/**
* Convert durations to the given time unit.
*
* @param durationUnit a unit of time
* @return {@code this}
*/
public Builder convertDurationsTo(TimeUnit durationUnit) {
this.durationUnit = durationUnit;
return this;
}
/**
* Only report metrics which match the given filter.
*
* @param filter a {@link MetricFilter}
* @return {@code this}
*/
public Builder filter(MetricFilter filter) {
this.filter = filter;
return this;
}
/**
* Use Logging Level when reporting.
*
* @param loggingLevel a (@link Slf4jReporter.LoggingLevel}
* @return {@code this}
*/
public Builder withLoggingLevel(LoggingLevel loggingLevel) {
this.loggingLevel = loggingLevel;
return this;
}
/**
* Builds a {@link Log4JMetricsReporter} with the given properties.
*
* @return a {@link Log4JMetricsReporter}
*/
public Log4JMetricsReporter build() {
LoggerProxy loggerProxy;
switch (loggingLevel) {
case TRACE:
loggerProxy = new TraceLoggerProxy(logger);
break;
case INFO:
loggerProxy = new InfoLoggerProxy(logger);
break;
case WARN:
loggerProxy = new WarnLoggerProxy(logger);
break;
case ERROR:
loggerProxy = new ErrorLoggerProxy(logger);
break;
default:
case DEBUG:
loggerProxy = new DebugLoggerProxy(logger);
break;
}
return new Log4JMetricsReporter(registry, loggerProxy, marker, prefix, rateUnit, durationUnit, filter, executor, shutdownExecutorOnStop);
}
}
private final LoggerProxy loggerProxy;
private final Marker marker;
private final String prefix;
private Log4JMetricsReporter(MetricRegistry registry,
LoggerProxy loggerProxy,
Marker marker,
String prefix,
TimeUnit rateUnit,
TimeUnit durationUnit,
MetricFilter filter,
ScheduledExecutorService executor,
boolean shutdownExecutorOnStop) {
super(registry, "logger-reporter", filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop);
this.loggerProxy = loggerProxy;
this.marker = marker;
this.prefix = prefix;
}
@Override
@SuppressWarnings("rawtypes")
public void report(SortedMap<String, Gauge> gauges,
SortedMap<String, Counter> counters,
SortedMap<String, Histogram> histograms,
SortedMap<String, Meter> meters,
SortedMap<String, Timer> timers) {
if (loggerProxy.isEnabled(marker)) {
for (Entry<String, Gauge> entry : gauges.entrySet()) {
logGauge(entry.getKey(), entry.getValue());
}
for (Entry<String, Counter> entry : counters.entrySet()) {
logCounter(entry.getKey(), entry.getValue());
}
for (Entry<String, Histogram> entry : histograms.entrySet()) {
logHistogram(entry.getKey(), entry.getValue());
}
for (Entry<String, Meter> entry : meters.entrySet()) {
logMeter(entry.getKey(), entry.getValue());
}
for (Entry<String, Timer> entry : timers.entrySet()) {
logTimer(entry.getKey(), entry.getValue());
}
}
}
private void logTimer(String name, Timer timer) {
final Snapshot snapshot = timer.getSnapshot();
loggerProxy.log(marker,
"type={}, name={}, count={}, min={}, max={}, mean={}, stddev={}, median={}, " +
"p75={}, p95={}, p98={}, p99={}, p999={}, mean_rate={}, m1={}, m5={}, " +
"m15={}, rate_unit={}, duration_unit={}",
"TIMER",
prefix(name),
timer.getCount(),
convertDuration(snapshot.getMin()),
convertDuration(snapshot.getMax()),
convertDuration(snapshot.getMean()),
convertDuration(snapshot.getStdDev()),
convertDuration(snapshot.getMedian()),
convertDuration(snapshot.get75thPercentile()),
convertDuration(snapshot.get95thPercentile()),
convertDuration(snapshot.get98thPercentile()),
convertDuration(snapshot.get99thPercentile()),
convertDuration(snapshot.get999thPercentile()),
convertRate(timer.getMeanRate()),
convertRate(timer.getOneMinuteRate()),
convertRate(timer.getFiveMinuteRate()),
convertRate(timer.getFifteenMinuteRate()),
getRateUnit(),
getDurationUnit());
}
private void logMeter(String name, Meter meter) {
loggerProxy.log(marker,
"type={}, name={}, count={}, mean_rate={}, m1={}, m5={}, m15={}, rate_unit={}",
"METER",
prefix(name),
meter.getCount(),
convertRate(meter.getMeanRate()),
convertRate(meter.getOneMinuteRate()),
convertRate(meter.getFiveMinuteRate()),
convertRate(meter.getFifteenMinuteRate()),
getRateUnit());
}
private void logHistogram(String name, Histogram histogram) {
final Snapshot snapshot = histogram.getSnapshot();
loggerProxy.log(marker,
"type={}, name={}, count={}, min={}, max={}, mean={}, stddev={}, " +
"median={}, p75={}, p95={}, p98={}, p99={}, p999={}",
"HISTOGRAM",
prefix(name),
histogram.getCount(),
snapshot.getMin(),
snapshot.getMax(),
snapshot.getMean(),
snapshot.getStdDev(),
snapshot.getMedian(),
snapshot.get75thPercentile(),
snapshot.get95thPercentile(),
snapshot.get98thPercentile(),
snapshot.get99thPercentile(),
snapshot.get999thPercentile());
}
private void logCounter(String name, Counter counter) {
loggerProxy.log(marker, "type={}, name={}, count={}", "COUNTER", prefix(name), counter.getCount());
}
private void logGauge(String name, Gauge<?> gauge) {
loggerProxy.log(marker, "type={}, name={}, value={}", "GAUGE", prefix(name), gauge.getValue());
}
@Override
protected String getRateUnit() {
return "events/" + super.getRateUnit();
}
private String prefix(String... components) {
return MetricRegistry.name(prefix, components);
}
/* private class to allow logger configuration */
static abstract class LoggerProxy {
protected final Logger logger;
public LoggerProxy(Logger logger) {
this.logger = logger;
}
abstract void log(Marker marker, String format, Object... arguments);
abstract boolean isEnabled(Marker marker);
}
/* private class to allow logger configuration */
private static class DebugLoggerProxy extends LoggerProxy {
public DebugLoggerProxy(Logger logger) {
super(logger);
}
@Override
public void log(Marker marker, String format, Object... arguments) {
logger.debug(marker, format, arguments);
}
@Override
public boolean isEnabled(Marker marker) {
return logger.isDebugEnabled(marker);
}
}
/* private class to allow logger configuration */
private static class TraceLoggerProxy extends LoggerProxy {
public TraceLoggerProxy(Logger logger) {
super(logger);
}
@Override
public void log(Marker marker, String format, Object... arguments) {
logger.trace(marker, format, arguments);
}
@Override
public boolean isEnabled(Marker marker) {
return logger.isTraceEnabled(marker);
}
}
/* private class to allow logger configuration */
private static class InfoLoggerProxy extends LoggerProxy {
public InfoLoggerProxy(Logger logger) {
super(logger);
}
@Override
public void log(Marker marker, String format, Object... arguments) {
logger.info(marker, format, arguments);
}
@Override
public boolean isEnabled(Marker marker) {
return logger.isInfoEnabled(marker);
}
}
/* private class to allow logger configuration */
private static class WarnLoggerProxy extends LoggerProxy {
public WarnLoggerProxy(Logger logger) {
super(logger);
}
@Override
public void log(Marker marker, String format, Object... arguments) {
logger.warn(marker, format, arguments);
}
@Override
public boolean isEnabled(Marker marker) {
return logger.isWarnEnabled(marker);
}
}
/* private class to allow logger configuration */
private static class ErrorLoggerProxy extends LoggerProxy {
public ErrorLoggerProxy(Logger logger) {
super(logger);
}
@Override
public void log(Marker marker, String format, Object... arguments) {
logger.error(marker, format, arguments);
}
@Override
public boolean isEnabled(Marker marker) {
return logger.isErrorEnabled(marker);
}
}
}

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.core.metadata;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
@@ -29,30 +30,30 @@ import org.apache.logging.log4j.Logger;
import java.util.Optional;
public class MarkdownFinder {
private final static Logger logger = LogManager.getLogger(MarkdownFinder.class);
private static final Logger logger = LogManager.getLogger(MarkdownFinder.class);
public static Optional<String> forHelpTopic(String topic) {
public static Optional<String> forHelpTopic(final String topic) {
String help = null;
try {
help = new MarkdownFinder().forActivityInstance(topic);
return Optional.ofNullable(help);
} catch (Exception e) {
logger.debug("Did not find help topic for activity instance: " + topic);
} catch (final Exception e) {
MarkdownFinder.logger.debug("Did not find help topic for activity instance: {}", topic);
}
try {
help = new MarkdownFinder().forResourceMarkdown(topic, "docs/");
return Optional.ofNullable(help);
} catch (Exception e) {
logger.debug("Did not find help topic for generic markdown file: " + topic + "(.md)");
} catch (final Exception e) {
MarkdownFinder.logger.debug("Did not find help topic for generic markdown file: {}(.md)", topic);
}
return Optional.empty();
}
public String forResourceMarkdown(String s, String... additionalSearchPaths) {
Optional<Content<?>> docs = NBIO.local()
public String forResourceMarkdown(final String s, final String... additionalSearchPaths) {
final Optional<Content<?>> docs = NBIO.local()
.searchPrefixes("docs")
.searchPrefixes(additionalSearchPaths)
.pathname(s)
@@ -62,11 +63,11 @@ public class MarkdownFinder {
return docs.map(Content::asString).orElse(null);
}
public String forActivityInstance(String s) {
ActivityType activityType = new ActivityTypeLoader().load(ActivityDef.parseActivityDef("driver="+s)).orElseThrow(
() -> new BasicError("Unable to find driver for '" + s + "'")
public String forActivityInstance(final String s) {
final ActivityType activityType = new ActivityTypeLoader().load(ActivityDef.parseActivityDef("driver="+s), NBLabeledElement.EMPTY).orElseThrow(
() -> new BasicError("Unable to find driver for '" + s + '\'')
);
return forResourceMarkdown(activityType.getClass().getAnnotation(Service.class)
return this.forResourceMarkdown(activityType.getClass().getAnnotation(Service.class)
.selector() + ".md", "docs/");
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,11 @@ package io.nosqlbench.engine.core.metrics;
import com.codahale.metrics.*;
import com.codahale.metrics.graphite.Graphite;
import com.codahale.metrics.graphite.GraphiteReporter;
import io.nosqlbench.api.engine.metrics.reporters.PromPushReporter;
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.lifecycle.process.ShutdownManager;
import io.nosqlbench.engine.core.logging.Log4JMetricsReporter;
import io.nosqlbench.api.engine.metrics.reporters.Log4JMetricsReporter;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@@ -34,7 +35,7 @@ import java.util.Locale;
import java.util.concurrent.TimeUnit;
public class MetricReporters implements Shutdownable {
private final static Logger logger = LogManager.getLogger(MetricReporters.class);
private static final Logger logger = LogManager.getLogger(MetricReporters.class);
private static final MetricReporters instance = new MetricReporters();
private final List<PrefixedRegistry> metricRegistries = new ArrayList<>();
@@ -55,7 +56,7 @@ public class MetricReporters implements Shutdownable {
public MetricReporters addGraphite(String dest, String prefix) {
logger.debug(() -> "Adding graphite reporter to " + dest + " with prefix " + prefix);
if (dest.indexOf(":")>=0) {
if (0 <= dest.indexOf(':')) {
String[] split = dest.split(":");
addGraphite(split[0],Integer.valueOf(split[1]),prefix);
} else {
@@ -101,7 +102,7 @@ public class MetricReporters implements Shutdownable {
for (PrefixedRegistry prefixedRegistry : metricRegistries) {
Graphite graphite = new Graphite(new InetSocketAddress(host, graphitePort));
String _prefix = prefixedRegistry.prefix != null ? (!prefixedRegistry.prefix.isEmpty() ? globalPrefix + "." + prefixedRegistry.prefix : globalPrefix) : globalPrefix;
String _prefix = null != prefixedRegistry.prefix ? !prefixedRegistry.prefix.isEmpty() ? globalPrefix + '.' + prefixedRegistry.prefix : globalPrefix : globalPrefix;
GraphiteReporter graphiteReporter = GraphiteReporter.forRegistry(prefixedRegistry.metricRegistry)
.prefixedWith(_prefix)
.convertRatesTo(TimeUnit.SECONDS)
@@ -114,6 +115,30 @@ public class MetricReporters implements Shutdownable {
return this;
}
public MetricReporters addPromPush(final String reportPromPushTo, final String prefix) {
logger.debug(() -> "Adding prompush reporter to " + reportPromPushTo + " with prefix label to " + prefix);
if (metricRegistries.isEmpty()) {
throw new RuntimeException("There are no metric registries.");
}
for (PrefixedRegistry prefixedRegistry : metricRegistries) {
final PromPushReporter promPushReporter =
new PromPushReporter(
reportPromPushTo,
prefixedRegistry.metricRegistry,
"prompush",
MetricFilter.ALL,
TimeUnit.SECONDS,
TimeUnit.NANOSECONDS
);
scheduledReporters.add(promPushReporter);
}
return this;
}
public MetricReporters addLogger() {
logger.debug("Adding log4j reporter for metrics");
@@ -164,6 +189,7 @@ public class MetricReporters implements Shutdownable {
return this;
}
@Override
public void shutdown() {
for (ScheduledReporter reporter : scheduledReporters) {
reporter.report();
@@ -171,6 +197,7 @@ public class MetricReporters implements Shutdownable {
}
}
private class PrefixedRegistry {
public String prefix;
public MetricRegistry metricRegistry;

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.core;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.input.Input;
@@ -35,6 +36,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -86,24 +88,24 @@ class ActivityExecutorTest {
@Test
synchronized void testDelayedStartSanity() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef);
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY);
final Activity activity = new DelayedInitActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(activity);
ActionDispenser actionDispenser = new CoreActionDispenser(activity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
Activity activity = new DelayedInitActivity(activityDef);
final InputDispenser inputDispenser = new CoreInputDispenser(activity);
final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
final ExecutorService testExecutor = Executors.newCachedThreadPool();
final Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
try {
@@ -112,7 +114,7 @@ class ActivityExecutorTest {
future.get();
testExecutor.shutdownNow();
} catch (Exception e) {
} catch (final Exception e) {
fail("Unexpected exception", e);
}
@@ -122,38 +124,38 @@ class ActivityExecutorTest {
@Test
synchronized void testNewActivityExecutor() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef,NBLabeledElement.EMPTY);
getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef));
this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(activityDef));
final Activity simpleActivity = new SimpleActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
Activity simpleActivity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of()));
final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
inputDispenser, actionDispenser, outputDispenser);
simpleActivity.setActionDispenserDelegate(actionDispenser);
simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
activityDef.setThreads(5);
activityExecutor.startActivity();
int[] speeds = new int[]{1, 50, 5, 50, 2, 50};
final int[] speeds = {1, 50, 5, 50, 2, 50};
for (int offset = 0; offset < speeds.length; offset += 2) {
int threadTarget = speeds[offset];
int threadTime = speeds[offset + 1];
final int threadTarget = speeds[offset];
final int threadTime = speeds[offset + 1];
logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
ActivityExecutorTest.logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
activityDef.setThreads(threadTarget);
try {
Thread.sleep(threadTime);
} catch (Exception e) {
} catch (final Exception e) {
fail("Not expecting exception", e);
}
}
@@ -162,31 +164,31 @@ class ActivityExecutorTest {
try {
activityExecutor.stopActivity();
// Thread.sleep(2000L);
} catch (Exception e) {
} catch (final Exception e) {
fail("Not expecting exception", e);
}
}
private MotorDispenser<?> getActivityMotorFactory(Action lc, final Input ls) {
private MotorDispenser<?> getActivityMotorFactory(final Action lc, Input ls) {
return new MotorDispenser<>() {
@Override
public Motor getMotor(ActivityDef activityDef, int slotId) {
Activity activity = new SimpleActivity(activityDef);
Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
public Motor getMotor(final ActivityDef activityDef, final int slotId) {
final Activity activity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of()));
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc);
return cm;
}
};
}
private SyncAction motorActionDelay(final long delay) {
private SyncAction motorActionDelay(long delay) {
return new SyncAction() {
@Override
public int runCycle(long cycle) {
logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
public int runCycle(final long cycle) {
ActivityExecutorTest.logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ignored) {
} catch (final InterruptedException ignored) {
}
return 0;
}
@@ -197,19 +199,19 @@ class ActivityExecutorTest {
private static class DelayedInitActivity extends SimpleActivity {
private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class);
public DelayedInitActivity(ActivityDef activityDef) {
super(activityDef);
public DelayedInitActivity(final ActivityDef activityDef) {
super(activityDef, NBLabeledElement.EMPTY);
}
@Override
public void initActivity() {
Integer initDelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
logger.info(() -> "delaying for " + initDelay);
final Integer initDelay = this.activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
DelayedInitActivity.logger.info(() -> "delaying for " + initDelay);
try {
Thread.sleep(initDelay);
} catch (InterruptedException ignored) {
} catch (final InterruptedException ignored) {
}
logger.info(() -> "delayed for " + initDelay);
DelayedInitActivity.logger.info(() -> "delayed for " + initDelay);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,13 +16,18 @@
package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput;
import org.junit.jupiter.api.Test;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Predicate;
@@ -33,41 +38,44 @@ public class CoreMotorTest {
@Test
public void testBasicActivityMotor() {
BlockingSegmentInput lockstepper = new BlockingSegmentInput();
Activity activity = new SimpleActivity(ActivityDef.parseActivityDef("alias=foo"));
Motor cm = new CoreMotor(activity, 5L, lockstepper);
AtomicLong observableAction = new AtomicLong(-3L);
cm.setAction(getTestConsumer(observableAction));
Thread t = new Thread(cm);
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Activity activity = new SimpleActivity(
ActivityDef.parseActivityDef("alias=foo"),
NBLabeledElement.forMap(Map.of("testing","coremotor"))
);
final Motor cm = new CoreMotor(activity, 5L, lockstepper);
final AtomicLong observableAction = new AtomicLong(-3L);
cm.setAction(this.getTestConsumer(observableAction));
final Thread t = new Thread(cm);
t.setName("TestMotor");
t.start();
try {
Thread.sleep(1000); // allow action time to be waiting in monitor for test fixture
} catch (InterruptedException ignored) {}
} catch (final InterruptedException ignored) {}
lockstepper.publishSegment(5L);
boolean result = awaitCondition(atomicInteger -> (atomicInteger.get()==5L),observableAction,5000,100);
final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(),observableAction,5000,100);
assertThat(observableAction.get()).isEqualTo(5L);
}
@Test
public void testIteratorStride() {
BlockingSegmentInput lockstepper = new BlockingSegmentInput();
Motor cm1 = new CoreMotor(new SimpleActivity("stride=3"),1L, lockstepper);
AtomicLongArray ary = new AtomicLongArray(10);
Action a1 = getTestArrayConsumer(ary);
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm1 = new CoreMotor(new SimpleActivity("stride=3",NBLabeledElement.EMPTY),1L, lockstepper);
final AtomicLongArray ary = new AtomicLongArray(10);
final Action a1 = this.getTestArrayConsumer(ary);
cm1.setAction(a1);
Thread t1 = new Thread(cm1);
final Thread t1 = new Thread(cm1);
t1.setName("cm1");
t1.start();
try {
Thread.sleep(500); // allow action time to be waiting in monitor for test fixture
} catch (InterruptedException ignored) {}
} catch (final InterruptedException ignored) {}
lockstepper.publishSegment(11L,12L,13L);
boolean result = awaitAryCondition(ala -> (ala.get(2)==13L),ary,5000,100);
final boolean result = this.awaitAryCondition(ala -> 13L == ala.get(2),ary,5000,100);
assertThat(ary.get(0)).isEqualTo(11L);
assertThat(ary.get(1)).isEqualTo(12L);
assertThat(ary.get(2)).isEqualTo(13L);
@@ -75,20 +83,21 @@ public class CoreMotorTest {
}
private SyncAction getTestArrayConsumer(final AtomicLongArray ary) {
private SyncAction getTestArrayConsumer(AtomicLongArray ary) {
return new SyncAction() {
private int offset=0;
private int offset;
@Override
public int runCycle(long cycle) {
ary.set(offset++, cycle);
public int runCycle(final long cycle) {
ary.set(this.offset, cycle);
this.offset++;
return 0;
}
};
}
private SyncAction getTestConsumer(final AtomicLong atomicLong) {
private SyncAction getTestConsumer(AtomicLong atomicLong) {
return new SyncAction() {
@Override
public int runCycle(long cycle) {
public int runCycle(final long cycle) {
atomicLong.set(cycle);
return 0;
}
@@ -96,34 +105,30 @@ public class CoreMotorTest {
}
private boolean awaitAryCondition(Predicate<AtomicLongArray> atomicLongAryPredicate, AtomicLongArray ary, long millis, long retry) {
long start = System.currentTimeMillis();
private boolean awaitAryCondition(final Predicate<AtomicLongArray> atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) {
final long start = System.currentTimeMillis();
long now=start;
while (now < start + millis) {
boolean result = atomicLongAryPredicate.test(ary);
if (result) {
return true;
} else {
try {
Thread.sleep(retry);
} catch (InterruptedException ignored) {}
while (now < (start + millis)) {
final boolean result = atomicLongAryPredicate.test(ary);
if (result) return true;
try {
Thread.sleep(retry);
} catch (final InterruptedException ignored) {
}
now = System.currentTimeMillis();
}
return false;
}
private boolean awaitCondition(Predicate<AtomicLong> atomicPredicate, AtomicLong atomicInteger, long millis, long retry) {
long start = System.currentTimeMillis();
private boolean awaitCondition(final Predicate<AtomicLong> atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) {
final long start = System.currentTimeMillis();
long now=start;
while (now < start + millis) {
boolean result = atomicPredicate.test(atomicInteger);
if (result) {
return true;
} else {
try {
Thread.sleep(retry);
} catch (InterruptedException ignored) {}
while (now < (start + millis)) {
final boolean result = atomicPredicate.test(atomicInteger);
if (result) return true;
try {
Thread.sleep(retry);
} catch (final InterruptedException ignored) {
}
now = System.currentTimeMillis();
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.engine.core.metrics;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import org.junit.jupiter.api.Test;
@@ -26,12 +27,10 @@ public class NBMetricsSummaryTest {
@Test
public void testFormat() {
StringBuilder sb = new StringBuilder();
Timer timer = new Timer(new DeltaHdrHistogramReservoir("test", 4));
final StringBuilder sb = new StringBuilder();
final Timer timer = new Timer(new DeltaHdrHistogramReservoir(NBLabels.forKV("name","test"), 4));
for (int i = 0; i < 100000; i++) {
timer.update((i % 1000) + 1, TimeUnit.MILLISECONDS);
}
for (int i = 0; 100000 > i; i++) timer.update(i % 1000 + 1, TimeUnit.MILLISECONDS);
NBMetricsSummary.summarize(sb, "test", timer);