rework scenario invocation for better tests, unit test running only

This commit is contained in:
Jonathan Shook
2023-10-03 21:57:15 -05:00
parent 280c270d8f
commit 17052eaadf
38 changed files with 1391 additions and 956 deletions

View File

@@ -16,11 +16,8 @@
package io.nosqlbench.engine.api.scripting; package io.nosqlbench.engine.api.scripting;
import javax.script.SimpleScriptContext; import javax.script.SimpleScriptContext;
import java.io.CharArrayWriter;
import java.io.IOException;
import java.io.Reader; import java.io.Reader;
import java.io.Writer; import java.io.Writer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@@ -92,9 +89,9 @@ public class ScriptEnvBuffer extends SimpleScriptContext {
public List<String> getTimeLogLines() { public List<String> getTimeLogLines() {
List<String> log = new ArrayList<String>(); List<String> log = new ArrayList<String>();
Optional.ofNullable(this.stdinBuffer).map(t->t.timedLog).ifPresent(log::addAll); Optional.ofNullable(this.stdinBuffer).map(DiagReader::getTimedLog).ifPresent(log::addAll);
Optional.ofNullable(this.stderrBuffer).map(t->t.timedLog).ifPresent(log::addAll); Optional.ofNullable(this.stderrBuffer).map(DiagWriter::getTimedLog).ifPresent(log::addAll);
Optional.ofNullable(this.stdoutBuffer).map(t->t.timedLog).ifPresent(log::addAll); Optional.ofNullable(this.stdoutBuffer).map(DiagWriter::getTimedLog).ifPresent(log::addAll);
log = log.stream().map(l -> l.endsWith("\n") ? l : l+"\n").collect(Collectors.toList()); log = log.stream().map(l -> l.endsWith("\n") ? l : l+"\n").collect(Collectors.toList());
return log; return log;
} }
@@ -102,87 +99,4 @@ public class ScriptEnvBuffer extends SimpleScriptContext {
return getTimeLogLines().stream().collect(Collectors.joining()); return getTimeLogLines().stream().collect(Collectors.joining());
} }
private class DiagReader extends Reader {
Reader wrapped;
private final String prefix;
CharArrayWriter buffer = new CharArrayWriter(0);
private final List<String> timedLog = new ArrayList<String>();
public DiagReader(Reader wrapped, String prefix) {
this.wrapped = wrapped; this.prefix = prefix;
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
String tsprefix = LocalDateTime.now().format(tsformat);
int read = wrapped.read(cbuf, off, len);
buffer.write(cbuf, off, len);
timedLog.add(tsprefix + prefix + new String(cbuf, off, len));
return read;
}
@Override
public void close() throws IOException {
wrapped.close();
buffer.close();
}
}
private class DiagWriter extends Writer {
Writer wrapped;
private final String prefix;
CharArrayWriter buffer = new CharArrayWriter();
private final List<String> timedLog = new ArrayList<String>();
private final StringBuilder sb = new StringBuilder();
public DiagWriter(Writer wrapped, String prefix) {
this.wrapped = wrapped;
this.prefix = prefix;
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
String tsprefix = LocalDateTime.now().format(tsformat);
buffer.write(cbuf, off, len);
String text = new String(cbuf, off, len);
sb.append(text);
if (text.contains("\n")) {
String msgs = sb.toString();
String extra = msgs.substring(msgs.lastIndexOf("\n")+1);
sb.setLength(0);
sb.append(extra);
String[] parts = msgs.substring(0,msgs.length()-extra.length()).split("\n");
for (String part : parts) {
if (!part.isBlank()) {
String tslogEntry = tsprefix + prefix + part + "\n";
timedLog.add(tslogEntry);
}
}
}
wrapped.write(cbuf, off, len);
}
@Override
public void flush() throws IOException {
buffer.flush();
wrapped.flush();
}
@Override
public void close() throws IOException {
buffer.close();
wrapped.close();
}
}
} }

View File

@@ -51,7 +51,6 @@ import io.nosqlbench.engine.core.clientload.StatReader;
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler; import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader; import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler; import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
import io.nosqlbench.engine.core.lifecycle.scenario.script.MetricsMapper;
import io.nosqlbench.engine.core.lifecycle.session.NBSession; import io.nosqlbench.engine.core.lifecycle.session.NBSession;
import io.nosqlbench.engine.core.logging.LoggerConfig; import io.nosqlbench.engine.core.logging.LoggerConfig;
import io.nosqlbench.engine.core.metadata.MarkdownFinder; import io.nosqlbench.engine.core.metadata.MarkdownFinder;
@@ -434,9 +433,6 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
new NBBaseComponent(null), new NBBaseComponent(null),
sessionName, sessionName,
options.getProgressSpec(), options.getProgressSpec(),
options.getReportSummaryTo(),
options.getLogsDirectory(),
options.getScriptFile(),
options.wantsShowScript() options.wantsShowScript()
); );
ExecutionResult sessionResult = session.apply(options.getCommands()); ExecutionResult sessionResult = session.apply(options.getCommands());
@@ -461,10 +457,6 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
} }
private String getMetricsHelpFor(final String activityType) {
final String metrics = MetricsMapper.metricsDetail(activityType);
return metrics;
}
private void registerLoadAvgMetrics() { private void registerLoadAvgMetrics() {
LoadAvgReader reader = new LoadAvgReader(); LoadAvgReader reader = new LoadAvgReader();

View File

@@ -38,6 +38,7 @@ public class Cmd {
stop(Arg.of("alias_name")), stop(Arg.of("alias_name")),
forceStop(Arg.of("alias_name")), forceStop(Arg.of("alias_name")),
script(Arg.of("script_path", s -> s)), script(Arg.of("script_path", s -> s)),
java(Arg.of("main_class",s->s)),
await(Arg.of("alias_name")), await(Arg.of("alias_name")),
waitMillis(Arg.of("millis_to_wait", Long::parseLong)), waitMillis(Arg.of("millis_to_wait", Long::parseLong)),
fragment(Arg.ofFreeform("script_fragment")),; fragment(Arg.ofFreeform("script_fragment")),;
@@ -220,4 +221,14 @@ public class Cmd {
return "// params.size==" + map.size() + "\n" + varname + "=" + toJSONBlock(map, oneline); return "// params.size==" + map.size() + "\n" + varname + "=" + toJSONBlock(map, oneline);
} }
public static List<Cmd> parseCmds (String...arglist){
LinkedList<String> ll = new LinkedList<>(Arrays.asList(arglist));
List<Cmd> cmds = new ArrayList<>();
while (!ll.isEmpty()) {
Cmd cmd = parseArg(ll, null);
cmds.add(cmd);
}
return cmds;
}
} }

View File

@@ -19,6 +19,8 @@ package io.nosqlbench.engine.cli;
import io.nosqlbench.api.content.Content; import io.nosqlbench.api.content.Content;
import io.nosqlbench.api.content.NBIO; import io.nosqlbench.api.content.NBIO;
import io.nosqlbench.engine.api.scenarios.NBCLIScenarioParser; import io.nosqlbench.engine.api.scenarios.NBCLIScenarioParser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*; import java.util.*;
@@ -28,6 +30,7 @@ import java.util.*;
* was not recognized. * was not recognized.
*/ */
public class SessionCommandParser { public class SessionCommandParser {
private final static Logger logger = LogManager.getLogger(SessionCommandParser.class);
private static final String FRAGMENT = "fragment"; private static final String FRAGMENT = "fragment";
private static final String SCRIPT = "script"; private static final String SCRIPT = "script";
@@ -40,24 +43,33 @@ public class SessionCommandParser {
private static final String SCENARIO = "scenario"; private static final String SCENARIO = "scenario";
private static final String WAIT_MILLIS = "waitmillis"; private static final String WAIT_MILLIS = "waitmillis";
private static final String JAVA_MAIN = "java";
public static final Set<String> RESERVED_WORDS = new HashSet<>() {{ public static final Set<String> RESERVED_WORDS = new HashSet<>() {{
addAll( addAll(
Arrays.asList( Arrays.asList(
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, FORCE_STOP, ACTIVITY, SCENARIO, WAIT_MILLIS FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, FORCE_STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
) )
); );
}}; }};
public static Optional<List<Cmd>> parse( public static Optional<List<Cmd>> parse(
LinkedList<String> arglist, LinkedList<String> arglist,
String... includes String... includes
) { ) {
boolean scriptCommands = false;
boolean javaCommands = false;
List<Cmd> cmdList = new LinkedList<>(); List<Cmd> cmdList = new LinkedList<>();
PathCanonicalizer canonicalizer = new PathCanonicalizer(includes); PathCanonicalizer canonicalizer = new PathCanonicalizer(includes);
while (arglist.peekFirst() != null) { while (arglist.peekFirst() != null) {
String word = arglist.peekFirst(); String word = arglist.peekFirst();
Cmd cmd; Cmd cmd;
switch (word) { switch (word) {
case JAVA_MAIN:
cmd = Cmd.parseArg(arglist, canonicalizer);
cmdList.add(cmd);
javaCommands = true;
break;
case FRAGMENT: case FRAGMENT:
case SCRIPT: case SCRIPT:
case START: case START:
@@ -68,13 +80,14 @@ public class SessionCommandParser {
case WAIT_MILLIS: case WAIT_MILLIS:
cmd = Cmd.parseArg(arglist, canonicalizer); cmd = Cmd.parseArg(arglist, canonicalizer);
cmdList.add(cmd); cmdList.add(cmd);
scriptCommands = true;
break; break;
default: default:
Optional<Content<?>> scriptfile = NBIO.local() Optional<Content<?>> scriptfile = NBIO.local()
.searchPrefixes("scripts/auto") .searchPrefixes("scripts/auto")
.pathname(word) .pathname(word)
.extensionSet("js") .extensionSet("js")
.first(); .first();
//Script //Script
if (scriptfile.isPresent()) { if (scriptfile.isPresent()) {
@@ -86,10 +99,14 @@ public class SessionCommandParser {
} else if (NBCLIScenarioParser.isFoundWorkload(word, includes)) { } else if (NBCLIScenarioParser.isFoundWorkload(word, includes)) {
NBCLIScenarioParser.parseScenarioCommand(arglist, RESERVED_WORDS, includes); NBCLIScenarioParser.parseScenarioCommand(arglist, RESERVED_WORDS, includes);
} else { } else {
logger.warn("unrecognized Cmd: " + word);
return Optional.empty(); return Optional.empty();
} }
break; break;
} }
if (javaCommands && scriptCommands) {
throw new RuntimeException("combining java and javascript commands into one session is not yet supported.");
}
} }
return Optional.of(cmdList); return Optional.of(cmdList);

View File

@@ -16,7 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity; package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -24,9 +24,9 @@ public class ActivitiesExceptionHandler implements Thread.UncaughtExceptionHandl
private static final Logger logger = LogManager.getLogger(ActivitiesExceptionHandler.class); private static final Logger logger = LogManager.getLogger(ActivitiesExceptionHandler.class);
private final ScenarioController controller; private final ActivitiesController controller;
public ActivitiesExceptionHandler(ScenarioController controller) { public ActivitiesExceptionHandler(ActivitiesController controller) {
this.controller = controller; this.controller = controller;
logger.debug(() -> "Activities exception handler starting up for executor '" + this.controller + "'"); logger.debug(() -> "Activities exception handler starting up for executor '" + this.controller + "'");
} }

View File

@@ -22,7 +22,7 @@ import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.metrics.IndicatorMode; import io.nosqlbench.engine.api.metrics.IndicatorMode;
import io.nosqlbench.api.engine.metrics.PeriodicRunnable; import io.nosqlbench.api.engine.metrics.PeriodicRunnable;
import io.nosqlbench.api.engine.util.Unit; import io.nosqlbench.api.engine.util.Unit;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -31,18 +31,18 @@ import java.util.HashSet;
import java.util.Locale; import java.util.Locale;
import java.util.Set; import java.util.Set;
public class ActivityProgressIndicator implements Runnable { public class ActivitiesProgressIndicator implements Runnable {
private final static Logger logger = LogManager.getLogger("PROGRESS"); private final static Logger logger = LogManager.getLogger("PROGRESS");
private final String indicatorSpec; private final String indicatorSpec;
private final ScenarioController sc; private final ActivitiesController sc;
private PeriodicRunnable<ActivityProgressIndicator> runnable; private PeriodicRunnable<ActivitiesProgressIndicator> runnable;
private IndicatorMode indicatorMode = IndicatorMode.console; private IndicatorMode indicatorMode = IndicatorMode.console;
private final Set<String> seen = new HashSet<>(); private final Set<String> seen = new HashSet<>();
private long intervalMillis = 1L; private long intervalMillis = 1L;
public ActivityProgressIndicator(ScenarioController sc, String indicatorSpec) { public ActivitiesProgressIndicator(ActivitiesController sc, String indicatorSpec) {
this.sc = sc; this.sc = sc;
this.indicatorSpec = indicatorSpec; this.indicatorSpec = indicatorSpec;
start(); start();

View File

@@ -79,12 +79,11 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
private ActivityExecutorShutdownHook shutdownHook = null; private ActivityExecutorShutdownHook shutdownHook = null;
private NBMetricGauge threadsGauge; private NBMetricGauge threadsGauge;
public ActivityExecutor(Activity activity, String sessionId) { public ActivityExecutor(Activity activity) {
this.activity = activity; this.activity = activity;
this.activityDef = activity.getActivityDef(); this.activityDef = activity.getActivityDef();
activity.getActivityDef().getParams().addListener(this); activity.getActivityDef().getParams().addListener(this);
activity.setActivityController(this); activity.setActivityController(this);
this.sessionId = sessionId;
this.tally = activity.getRunStateTally(); this.tally = activity.getRunStateTally();
} }

View File

@@ -16,11 +16,10 @@
package io.nosqlbench.engine.core.lifecycle.activity; package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType; import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
import io.nosqlbench.engine.core.lifecycle.scenario.NBScenario;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -35,10 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class ActivityLoader { public class ActivityLoader {
private static final Logger logger = LogManager.getLogger("ACTIVITIES"); private static final Logger logger = LogManager.getLogger("ACTIVITIES");
private final Map<String, Activity> activityMap = new ConcurrentHashMap<>(); private final Map<String, Activity> activityMap = new ConcurrentHashMap<>();
private final NBScenario scenario;
public ActivityLoader(final NBScenario scenario) { public ActivityLoader() {
this.scenario = scenario;
} }
public synchronized Activity loadActivity(ActivityDef activityDef, final NBComponent parent) { public synchronized Activity loadActivity(ActivityDef activityDef, final NBComponent parent) {

View File

@@ -1,507 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario;
import com.codahale.metrics.MetricRegistry;
import com.oracle.truffle.js.scriptengine.GraalJSScriptEngine;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.extensions.SandboxExtensionFinder;
import io.nosqlbench.api.extensions.ScriptingExtensionPluginInfo;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.ScenarioMetadataAware;
import io.nosqlbench.api.metadata.SystemId;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScenarioContext;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScriptParams;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.ActivityBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotScenarioController;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Engine.Builder;
import org.graalvm.polyglot.EnvironmentAccess;
import org.graalvm.polyglot.HostAccess;
import org.graalvm.polyglot.PolyglotAccess;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.*;
import java.util.concurrent.Callable;
public class NBScenario extends NBBaseComponent implements Callable<ExecutionMetricsResult> {
private final String reportSummaryTo;
private final Path logsPath;
private final Invocation invocation;
private final String scriptfile;
private Logger logger = LogManager.getLogger("SCENARIO");
private State state = State.Scheduled;
private volatile ScenarioShutdownHook scenarioShutdownHook;
private Exception error;
private ScenarioMetadata scenarioMetadata;
private ExecutionMetricsResult result;
private final NBLabeledElement parentComponent;
public Optional<ExecutionMetricsResult> getResultIfComplete() {
return Optional.ofNullable(result);
}
public enum Invocation {
RENDER_SCRIPT,
EXECUTE_SCRIPT
}
public enum State {
Scheduled,
Running,
Errored,
Interrupted,
Finished
}
private final List<String> scripts = new ArrayList<>();
private ScriptEngine scriptEngine;
private ScenarioController scenarioController;
private ActivityProgressIndicator activityProgressIndicator;
private String progressInterval = "console:1m";
private ScenarioContext scriptEnv;
private final String scenarioName;
private ScriptParams scenarioScriptParams;
private final Engine engine = Engine.Graalvm;
private long startedAtMillis = -1L;
private long endedAtMillis = -1L;
public enum Engine {
Graalvm
}
public NBScenario(
final String scenarioName,
final String progressInterval,
final String reportSummaryTo,
final Path logsPath,
String scriptfile,
NBComponent parentComponent,
Invocation invocation
) {
super(parentComponent, NBLabels.forKV("scenario", scenarioName));
this.scenarioName = scenarioName;
this.progressInterval = progressInterval;
this.reportSummaryTo = reportSummaryTo;
this.logsPath = logsPath;
this.scriptfile = scriptfile;
this.parentComponent = parentComponent;
this.invocation = invocation;
}
public static NBScenario forTesting(final String name, final String reportSummaryTo, NBComponent parent) {
return new NBScenario(
name,
"console:10s",
reportSummaryTo,
Path.of("logs"),
"",
new NBBaseComponent(parent,NBLabels.forKV("test","testrun")),
Invocation.EXECUTE_SCRIPT
);
}
public NBScenario setLogger(final Logger logger) {
this.logger = logger;
return this;
}
public Logger getLogger() {
return this.logger;
}
public NBScenario addScriptText(final String scriptText) {
this.scripts.add(scriptText);
return this;
}
public NBScenario addScriptFiles(final String... args) {
for (final String scriptFile : args) {
final Path scriptPath = Paths.get(scriptFile);
byte[] bytes = new byte[0];
try {
bytes = Files.readAllBytes(scriptPath);
} catch (final IOException e) {
e.printStackTrace();
}
final ByteBuffer bb = ByteBuffer.wrap(bytes);
final Charset utf8 = StandardCharsets.UTF_8;
final String scriptData = utf8.decode(bb).toString();
this.addScriptText(scriptData);
}
return this;
}
private void initializeScriptingEngine(final ScenarioController scenarioController) {
this.logger.debug("Using engine {}", this.engine.toString());
final MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
final Context.Builder contextSettings = Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL)
.allowNativeAccess(true)
.allowCreateThread(true)
.allowIO(true)
.allowHostClassLookup(s -> true)
.allowHostClassLoading(true)
.allowCreateProcess(true)
.allowAllAccess(true)
.allowEnvironmentAccess(EnvironmentAccess.INHERIT)
.allowPolyglotAccess(PolyglotAccess.ALL)
.option("js.ecmascript-version", "2022")
.option("js.nashorn-compat", "true");
final Builder engineBuilder = org.graalvm.polyglot.Engine.newBuilder();
engineBuilder.option("engine.WarnInterpreterOnly", "false");
final org.graalvm.polyglot.Engine polyglotEngine = engineBuilder.build();
// TODO: add in, out, err for this scenario
scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings);
if (!"disabled".equals(progressInterval))
this.activityProgressIndicator = new ActivityProgressIndicator(scenarioController, this.progressInterval);
this.scriptEnv = new ScenarioContext(scenarioName, scenarioController);
this.scriptEngine.setContext(this.scriptEnv);
this.scriptEngine.put("params", this.scenarioScriptParams);
this.scriptEngine.put("scenario", new PolyglotScenarioController(scenarioController));
this.scriptEngine.put("metrics", new PolyglotMetricRegistryBindings(metricRegistry));
this.scriptEngine.put("activities", new ActivityBindings(scenarioController));
for (final ScriptingExtensionPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
if (!extensionDescriptor.isAutoLoading()) {
this.logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");
continue;
}
final Logger extensionLogger =
LogManager.getLogger("extensions." + extensionDescriptor.getBaseVariableName());
final Object extensionObject = extensionDescriptor.getExtensionObject(
extensionLogger,
metricRegistry,
this.scriptEnv
);
ScenarioMetadataAware.apply(extensionObject, this.getScenarioMetadata());
this.logger.trace(() -> "Adding extension object: name=" + extensionDescriptor.getBaseVariableName() +
" class=" + extensionObject.getClass().getSimpleName());
this.scriptEngine.put(extensionDescriptor.getBaseVariableName(), extensionObject);
}
}
private synchronized ScenarioMetadata getScenarioMetadata() {
if (null == this.scenarioMetadata) scenarioMetadata = new ScenarioMetadata(
startedAtMillis,
scenarioName,
SystemId.getNodeId(),
SystemId.getNodeFingerprint()
);
return this.scenarioMetadata;
}
private synchronized void runScenario() {
this.scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(this.scenarioShutdownHook);
this.state = State.Running;
this.startedAtMillis = System.currentTimeMillis();
Annotators.recordAnnotation(
Annotation.newBuilder()
.element(this)
.now()
.layer(Layer.Scenario)
.addDetail("engine", engine.toString())
.build()
);
this.logger.debug("Running control script for {}.", scenarioName);
this.scenarioController = new ScenarioController(this);
try {
this.initializeScriptingEngine(this.scenarioController);
this.executeScenarioScripts();
final long awaitCompletionTime = 86400 * 365 * 1000L;
this.logger.debug("Awaiting completion of scenario and activities for {} millis.", awaitCompletionTime);
this.scenarioController.awaitCompletion(awaitCompletionTime);
} catch (final Exception e) {
error = e;
} finally {
this.scenarioController.shutdown();
}
Runtime.getRuntime().removeShutdownHook(this.scenarioShutdownHook);
final var runHook = this.scenarioShutdownHook;
this.scenarioShutdownHook = null;
runHook.run();
this.logger.debug("removing scenario shutdown hook");
}
public void notifyException(final Thread t, final Throwable e) {
error = new RuntimeException("in thread " + t.getName() + ", " + e, e);
}
private void executeScenarioScripts() {
for (final String script : this.scripts)
try {
Object result = null;
if ((scriptEngine instanceof Compilable compilableEngine)) {
this.logger.debug("Using direct script compilation");
final CompiledScript compiled = compilableEngine.compile(script);
this.logger.debug("-> invoking main scenario script (compiled)");
result = compiled.eval();
this.logger.debug("<- scenario script completed (compiled)");
} else if ((null != scriptfile) && !this.scriptfile.isEmpty()) {
final String filename = this.scriptfile.replace("_SESSION_", this.scenarioName);
this.logger.debug("-> invoking main scenario script (interpreted from {})", filename);
final Path written = Files.write(
Path.of(filename),
script.getBytes(StandardCharsets.UTF_8),
StandardOpenOption.TRUNCATE_EXISTING,
StandardOpenOption.CREATE
);
final BufferedReader reader = Files.newBufferedReader(written);
this.scriptEngine.eval(reader);
this.logger.debug("<- scenario control script completed (interpreted) from {})", filename);
} else {
this.logger.debug("-> invoking main scenario script (interpreted)");
result = this.scriptEngine.eval(script);
this.logger.debug("<- scenario control script completed (interpreted)");
}
if (null != result)
this.logger.debug("scenario result: type({}): value:{}", result.getClass().getCanonicalName(), result);
System.err.flush();
System.out.flush();
} catch (final Exception e) {
error = e;
state = State.Errored;
this.logger.error("Error in scenario, shutting down. ({})", e);
try {
scenarioController.forceStopScenario(5000, false);
} catch (final Exception eInner) {
this.logger.debug("Found inner exception while forcing stop with rethrow=false: {}", eInner);
} finally {
throw new RuntimeException(e);
}
} finally {
System.out.flush();
System.err.flush();
this.endedAtMillis = System.currentTimeMillis();
}
}
public void finish() {
this.logger.debug("finishing scenario");
this.endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (State.Running == this.state) state = State.Finished;
if (null != scenarioShutdownHook) {
// If this method was called while the shutdown hook is defined, then it means
// that the scenario was ended before the hook was uninstalled normally.
state = State.Interrupted;
this.logger.warn("Scenario was interrupted by process exit, shutting down");
} else
this.logger.info("Scenario completed successfully, with {} logical activities.", this.scenarioController.getActivityExecutorMap().size());
this.logger.info(() -> "scenario state: " + state);
// We report the scenario state via annotation even for short runs
final Annotation annotation = Annotation.newBuilder()
.element(this)
.interval(startedAtMillis, this.endedAtMillis)
.layer(Layer.Scenario)
.addDetail("event", "stop-scenario")
.build();
Annotators.recordAnnotation(annotation);
}
public long getStartedAtMillis() {
return this.startedAtMillis;
}
public long getEndedAtMillis() {
return this.endedAtMillis;
}
/**
* This should be the only way to get a ScenarioResult for a Scenario.
* <p>
* The lifecycle of a scenario includes the lifecycles of all of the following:
* <OL>
* <LI>The scenario control script, executing within a graaljs context.</LI>
* <LI>The lifecycle of every activity which is started within the scenario.</LI>
* </OL>
* <p>
* All of these run asynchronously within the scenario, however the same thread that calls
* the scenario is the one which executes the control script. A scenario ends when all
* of the following conditions are met:
* <UL>
* <LI>The scenario control script has run to completion, or experienced an exception.</LI>
* <LI>Each activity has run to completion, experienced an exception, or all</LI>
* </UL>
*
* @return
*/
@Override
public synchronized ExecutionMetricsResult call() {
if (null == result) {
try {
this.runScenario();
} catch (final Exception e) {
error = e;
} finally {
this.logger.debug("{} scenario run", null == this.error ? "NORMAL" : "ERRORED");
}
String iolog = error != null ? error.toString() : this.scriptEnv.getTimedLog();
result = new ExecutionMetricsResult(startedAtMillis, endedAtMillis, iolog, this.error);
this.result.reportMetricsSummaryToLog();
this.doReportSummaries(this.reportSummaryTo, this.result);
}
return this.result;
}
private void doReportSummaries(final String reportSummaryTo, final ExecutionMetricsResult result) {
final List<PrintStream> fullChannels = new ArrayList<>();
final List<PrintStream> briefChannels = new ArrayList<>();
final String[] destinationSpecs = reportSummaryTo.split(", *");
for (final String spec : destinationSpecs)
if ((null != spec) && !spec.isBlank()) {
final String[] split = spec.split(":", 2);
final String summaryTo = split[0];
final long summaryWhen = (2 == split.length) ? (Long.parseLong(split[1]) * 1000L) : 0;
PrintStream out = null;
switch (summaryTo.toLowerCase()) {
case "console":
case "stdout":
out = System.out;
break;
case "stderr":
out = System.err;
break;
default:
final String outName = summaryTo
.replaceAll("_SESSION_", scenarioName)
.replaceAll("_LOGS_", this.logsPath.toString());
try {
out = new PrintStream(new FileOutputStream(outName));
break;
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
}
}
if (result.getElapsedMillis() > summaryWhen) fullChannels.add(out);
else {
this.logger.debug("Summarizing counting metrics only to {} with scenario duration of {}ms (<{})", spec, summaryWhen, summaryWhen);
briefChannels.add(out);
}
}
fullChannels.forEach(result::reportMetricsSummaryTo);
// briefChannels.forEach(result::reportCountsTo);
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if ((null == o) || (this.getClass() != o.getClass())) {
return false;
}
final NBScenario scenario = (NBScenario) o;
return Objects.equals(this.scenarioName, scenario.scenarioName);
}
@Override
public int hashCode() {
return (null != this.scenarioName) ? scenarioName.hashCode() : 0;
}
public String getScenarioName() {
return this.scenarioName;
}
public ScenarioController getScenarioController() {
return this.scenarioController;
}
public String getScriptText() {
return String.join("", this.scripts);
}
public Optional<List<String>> getIOLog() {
return Optional.ofNullable(this.scriptEnv).map(ScriptEnvBuffer::getTimeLogLines);
}
public String toString() {
return "name:'" + scenarioName + '\'';
}
public void addScenarioScriptParams(final ScriptParams scenarioScriptParams) {
this.scenarioScriptParams = scenarioScriptParams;
}
public void addScenarioScriptParams(final Map<String, String> scriptParams) {
this.addScenarioScriptParams(new ScriptParams() {{
this.putAll(scriptParams);
}});
}
public State getScenarioState() {
return this.state;
}
public String getReportSummaryTo() {
return this.reportSummaryTo;
}
}

View File

@@ -13,13 +13,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.lifecycle.scenario; package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap; import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.api.engine.metrics.ActivityMetrics; import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBComponentErrorHandler;
import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult; import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
@@ -40,22 +42,28 @@ import java.util.stream.Collectors;
* A ScenarioController provides a way to start Activities, * A ScenarioController provides a way to start Activities,
* modify them while running, and forceStopMotors, pause or restart them. * modify them while running, and forceStopMotors, pause or restart them.
*/ */
public class ScenarioController implements NBLabeledElement { public class ActivitiesController extends NBBaseComponent {
private static final Logger logger = LogManager.getLogger(ScenarioController.class); private static final Logger logger = LogManager.getLogger(ActivitiesController.class);
private static final Logger scenariologger = LogManager.getLogger("SCENARIO"); private static final Logger scenariologger = LogManager.getLogger("SCENARIO");
private final ActivityLoader activityLoader; private final ActivityLoader activityLoader;
private final Map<String, ActivityRuntimeInfo> activityInfoMap = new ConcurrentHashMap<>(); private final Map<String, ActivityRuntimeInfo> activityInfoMap = new ConcurrentHashMap<>();
private final NBScenario scenario;
private final ExecutorService activitiesExecutor; private final ExecutorService activitiesExecutor;
public ScenarioController(NBScenario scenario) { public ActivitiesController() {
this.scenario = scenario; super(new TestComponent("test","test"));
this.activityLoader = new ActivityLoader(scenario); this.activityLoader = new ActivityLoader();
ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this);
IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler);
this.activitiesExecutor = Executors.newVirtualThreadPerTaskExecutor();
}
public ActivitiesController(NBComponent parent) {
super(parent);
this.activityLoader = new ActivityLoader();
ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this); ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this);
IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler); IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler);
this.activitiesExecutor = Executors.newCachedThreadPool(indexedThreadFactory); this.activitiesExecutor = Executors.newCachedThreadPool(indexedThreadFactory);
@@ -74,8 +82,8 @@ public class ScenarioController implements NBLabeledElement {
private synchronized ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) { private synchronized ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) { if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, scenario); Activity activity = this.activityLoader.loadActivity(activityDef, this);
ActivityExecutor executor = new ActivityExecutor(activity, this.scenario.getScenarioName()); ActivityExecutor executor = new ActivityExecutor(activity);
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor); Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor); ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo); this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo);
@@ -316,7 +324,7 @@ public class ScenarioController implements NBLabeledElement {
* @param waitTimeMillis grace period during which an activity may cooperatively shut down * @param waitTimeMillis grace period during which an activity may cooperatively shut down
*/ */
public synchronized void forceStopScenario(int waitTimeMillis, boolean rethrow) { public synchronized void forceStopScenario(int waitTimeMillis, boolean rethrow) {
logger.debug("force stopping scenario {}", this.scenario.getScenarioName()); logger.debug("force stopping scenario {}", description());
activityInfoMap.values().forEach(a -> a.getActivityExecutor().forceStopActivity(10000)); activityInfoMap.values().forEach(a -> a.getActivityExecutor().forceStopActivity(10000));
logger.debug("Scenario force stopped."); logger.debug("Scenario force stopped.");
} }
@@ -431,7 +439,9 @@ public class ScenarioController implements NBLabeledElement {
public void notifyException(Thread t, Throwable e) { public void notifyException(Thread t, Throwable e) {
logger.error("Uncaught exception in activity lifecycle thread:{}", e, e); logger.error("Uncaught exception in activity lifecycle thread:{}", e, e);
scenario.notifyException(t,e); if (getParent() instanceof NBComponentErrorHandler handler) {
handler.notifyException(t,e);
}
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@@ -456,8 +466,4 @@ public class ScenarioController implements NBLabeledElement {
} }
} }
@Override
public NBLabels getLabels() {
return this.scenario.getLabels();
}
} }

View File

@@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.lifecycle.scenario.script.bindings; package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController;
import org.graalvm.polyglot.Value; import org.graalvm.polyglot.Value;
import org.graalvm.polyglot.proxy.ProxyObject; import org.graalvm.polyglot.proxy.ProxyObject;
@@ -29,11 +28,11 @@ import java.util.stream.Collectors;
*/ */
public class ActivityBindings implements Bindings, ProxyObject { public class ActivityBindings implements Bindings, ProxyObject {
private final ScenarioController scenario; private final ActivitiesController scenario;
private final Map<String, Bindings> elementMap = new HashMap<String, Bindings>(); private final Map<String, Bindings> elementMap = new HashMap<String, Bindings>();
public ActivityBindings(ScenarioController scenarioController) { public ActivityBindings(ActivitiesController activitiesController) {
this.scenario = scenarioController; this.scenario = activitiesController;
} }
@Override @Override

View File

@@ -0,0 +1,132 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import io.nosqlbench.engine.core.lifecycle.session.NBSession;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
/**
* <P>An NBSceneFixtures instance represents the runtime fixtures needed to run a specific
* scenario. It is instanced per execution and is not expected to be thread-safe nor
* run more than once. This provides all of the required runtime support and IO boundaries
* needed by a scenario.</P>
*
* <P>The properties on this context define the API used by any scenario logic,
* whether implemented in script or directly. This should allow different
* execution forms to read similarly, easing development and debugging of more advanced
* scenarios.</P>
*
* <P>When using the fixtures within a context, they should be named <em>scene</em>
* which suggests an episodic sequence of events.</P>
*
* <P>Within an execution context, scenario logic is expected to adhere to usage of
* <i>scene.in</i>, <i>scene.out</i>, and <i>scene.error</i> instead of System.out, ...</P>
*/
public class NBDefaultSceneFixtures implements NBSceneFixtures {
/*
These are parameters which are passed into the script, named scenario, etc.
*/
private ScriptParams params;
/*
* NBSession is the root component type in a NB process, and all CLI options which
* affect global settings are expected to be properties on the session.
*/
private NBComponent session;
/*
* ScenarioActivitiesController is the concurrency handling layer for activities within
* a given scenario. A scenario doesn't complete unless until all activities
* are complete or errored.
*/
private ActivitiesController controller;
/*
* Extensions provide additional scripting capabilities which are not provided by the
* scripting or other runtimes, or new ways of tapping into extant features.
*/
private Extensions extensions;
private Writer out;
private Writer err;
private Reader in;
public NBDefaultSceneFixtures(ScriptParams params, NBComponent parent, ActivitiesController controller, Extensions extensions, Writer out, Writer err, Reader in) {
this.params = params;
this.session = parent;
this.controller = controller;
this.extensions = extensions;
this.out = out;
this.err = err;
this.in = in;
}
public static NBSceneFixtures ofDefault() {
return new NBDefaultSceneFixtures(
new ScriptParams(),
new NBSession(
new TestComponent("test", "test"), "test", "console:10s", false
),
new ActivitiesController(),
Extensions.ofNone(),
new OutputStreamWriter(System.out),
new OutputStreamWriter(System.err),
new InputStreamReader(System.in)
);
}
@Override
public ScriptParams params() {
return params;
}
@Override
public NBComponent session() {
return session;
}
@Override
public ActivitiesController controller() {
return controller;
}
@Override
public Extensions extensions() {
return extensions;
}
@Override
public Writer out() {
return out;
}
@Override
public Writer err() {
return err;
}
@Override
public Reader in() {
return in;
}
}

View File

@@ -0,0 +1,94 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.scripting.DiagReader;
import io.nosqlbench.engine.api.scripting.DiagWriter;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.Reader;
import java.io.Writer;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
public class NBSceneBuffer implements NBSceneFixtures {
private final NBSceneFixtures fixtures;
private DiagWriter stdoutBuffer;
private DiagWriter stderrBuffer;
private DiagReader stdinBuffer;
private DiagWriter stdoutWriter;
public NBSceneBuffer(NBSceneFixtures fixtures) {
this.fixtures = fixtures;
stdoutBuffer = new DiagWriter(fixtures.out(), " stdout ");
stderrBuffer = new DiagWriter(fixtures.err(), " stderr ");
stdinBuffer = new DiagReader(fixtures.in(), " stdin ");
}
@Override
public ScriptParams params() {
return fixtures.params();
}
@Override
public NBComponent session() {
return fixtures.session();
}
@Override
public ActivitiesController controller() {
return fixtures.controller();
}
@Override
public Extensions extensions() {
return fixtures.extensions();
}
@Override
public Writer out() {
return stdoutWriter;
}
@Override
public Writer err() {
return null;
}
@Override
public Reader in() {
return null;
}
public List<String> getTimedLogLines() {
List<String> log = new ArrayList<String>();
Optional.ofNullable(this.stdinBuffer).map(DiagReader::getTimedLog).ifPresent(log::addAll);
Optional.ofNullable(this.stderrBuffer).map(DiagWriter::getTimedLog).ifPresent(log::addAll);
Optional.ofNullable(this.stdoutBuffer).map(DiagWriter::getTimedLog).ifPresent(log::addAll);
log = log.stream().map(l -> l.endsWith("\n") ? l : l+"\n").collect(Collectors.toList());
return log;
}
public String getIoLog() {
return String.join("",getTimedLogLines());
}
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.Reader;
import java.io.Writer;
public interface NBSceneFixtures {
ScriptParams params();
NBComponent session();
ActivitiesController controller();
Extensions extensions();
Writer out();
Writer err();
Reader in();
}

View File

@@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.lifecycle.scenario.script; package io.nosqlbench.engine.core.lifecycle.scenario.context;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.gson.GsonBuilder; import com.google.gson.GsonBuilder;
@@ -39,6 +39,11 @@ public class ScriptParams extends HashMap<String, String> implements ProxyObject
private static final Logger logger = LogManager.getLogger(ScriptParams.class); private static final Logger logger = LogManager.getLogger(ScriptParams.class);
private static final Gson gson = new GsonBuilder().setPrettyPrinting().create(); private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();
public static ScriptParams of(Map<String,String> params) {
return new ScriptParams() {{
putAll(params);
}};
}
public ScriptParams withOverrides(Object overrides) { public ScriptParams withOverrides(Object overrides) {
Map<String, String> map; Map<String, String> map;
if (overrides instanceof Map) { if (overrides instanceof Map) {

View File

@@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@@ -14,19 +14,12 @@
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.script; package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.engine.core.lifecycle.scenario.script.MetricsMapper; import java.util.concurrent.ConcurrentHashMap;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat; public class Extensions extends ConcurrentHashMap<String,String> {
public static Extensions ofNone() {
public class MetricsMapperIntegrationTest { return new Extensions();
@Test
public void testDiagMetrics() {
String diagMetrics = MetricsMapper.metricsDetail("driver=diag;alias=foo;cycles=1;op=noop");
assertThat(diagMetrics).contains("metrics.foo.result.fiveMinuteRate");
} }
} }

View File

@@ -0,0 +1,222 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.api.annotations.Annotation;
import io.nosqlbench.api.annotations.Layer;
import io.nosqlbench.api.metadata.ScenarioMetadata;
import io.nosqlbench.api.metadata.SystemId;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBComponentErrorHandler;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.function.Function;
/**
* This is the core logic of every NB scenario.
* <OL>
* <LI>NBScenario creates a generic execution context.</LI>
* <LI>This context is functionally applied to (executed by) a specific implementation.</LI>
* <LI>Activities associated with the scenario are completed or errored.</LI>
* <LI>A result is composited from the data in the component tree.</LI>
* </OL>
*/
public abstract class NBScenario extends NBBaseComponent
implements Function<NBSceneFixtures, ScenarioResult>, NBComponentErrorHandler {
private final String scenarioName;
private final Map<String, String> params;
protected Logger logger = LogManager.getLogger("SCENARIO");
private long startedAtMillis, endedAtMillis;
private ScenarioMetadata scenarioMetadata;
private ActivitiesController activitiesController;
private Exception error;
private String progressInterval;
private ActivitiesProgressIndicator activitiesProgressIndicator;
public NBScenario(
NBComponent parentComponent,
String scenarioName,
Map<String, String> params,
String progressInterval
) {
super(parentComponent);
this.scenarioName = scenarioName;
this.params = params;
this.progressInterval = progressInterval;
this.activitiesController = new ActivitiesController();
}
public String getScenarioName() {
return scenarioName;
}
public void forceStopScenario(int i, boolean b) {
activitiesController.forceStopScenario(i,b);
}
public Map<String, String> getParams() {
return this.params;
}
public ActivitiesController getActivitiesController() {
return this.activitiesController;
}
public enum State {
Scheduled,
Running,
Errored,
Interrupted,
Finished
}
private ScenarioShutdownHook scenarioShutdownHook;
private State state;
/**
* This should be the only way to get a ScenarioResult for a Scenario.
* <p>
* The lifecycle of a scenario includes the lifecycles of all of the following:
* <OL>
* <LI>The scenario control script, executing within a graaljs context.</LI>
* <LI>The lifecycle of every activity which is started within the scenario.</LI>
* </OL>
* <p>
* All of these run asynchronously within the scenario, however the same thread that calls
* the scenario is the one which executes the control script. A scenario ends when all
* of the following conditions are met:
* <UL>
* <LI>The scenario control script has run to completion, or experienced an exception.</LI>
* <LI>Each activity has run to completion, experienced an exception, or all</LI>
* </UL>
*
* @return
*/
@Override
public final ScenarioResult apply(NBSceneFixtures sctx) {
this.scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(this.scenarioShutdownHook);
this.state = NBScriptedScenario.State.Running;
this.startedAtMillis = System.currentTimeMillis();
Annotators.recordAnnotation(
Annotation.newBuilder()
.element(this)
.now()
.layer(Layer.Scenario)
.build()
);
if (!"disabled".equals(progressInterval))
this.activitiesProgressIndicator = new ActivitiesProgressIndicator(activitiesController, this.progressInterval);
ScenarioResult result = null;
try {
runScenario(sctx);
final long awaitCompletionTime = 86400 * 365 * 1000L;
this.logger.debug("Awaiting completion of scenario and activities for {} millis.", awaitCompletionTime);
this.activitiesController.awaitCompletion(awaitCompletionTime);
} catch (Exception e) {
try {
activitiesController.forceStopScenario(5000, false);
} catch (final Exception eInner) {
this.logger.debug("Found inner exception while forcing stop with rethrow=false: {}", eInner);
throw new RuntimeException(e);
}
this.error = e;
} finally {
this.activitiesController.shutdown();
this.endedAtMillis = System.currentTimeMillis();
result = new ScenarioResult(
startedAtMillis,
endedAtMillis,
(error != null) ? error.toString() : "",
error
);
}
Runtime.getRuntime().removeShutdownHook(this.scenarioShutdownHook);
final var retiringScenarioShutdownHook = this.scenarioShutdownHook;
this.scenarioShutdownHook = null;
retiringScenarioShutdownHook.run();
this.logger.debug("removing scenario shutdown hook");
return result;
}
public void notifyException(final Thread t, final Throwable e) {
error = new RuntimeException("in thread " + t.getName() + ", " + e, e);
}
protected abstract void runScenario(NBSceneFixtures sctx);
public void finish() {
this.logger.debug("finishing scenario");
this.endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (State.Running == this.state) state = State.Finished;
if (null != scenarioShutdownHook) {
// If this method was called while the shutdown hook is defined, then it means
// that the scenario was ended before the hook was uninstalled normally.
state = State.Interrupted;
this.logger.warn("Scenario was interrupted by process exit, shutting down");
} else
this.logger.info(
"Scenario completed successfully, with {} logical activities.",
activitiesController.getActivityExecutorMap().size()
);
this.logger.info(() -> "scenario state: " + state);
// We report the scenario state via annotation even for short runs
final Annotation annotation = Annotation.newBuilder()
.element(this)
.interval(startedAtMillis, this.endedAtMillis)
.layer(Layer.Scenario)
.addDetail("event", "stop-scenario")
.build();
Annotators.recordAnnotation(annotation);
}
private synchronized ScenarioMetadata getScenarioMetadata() {
if (null == this.scenarioMetadata) scenarioMetadata = new ScenarioMetadata(
startedAtMillis,
scenarioName,
SystemId.getNodeId(),
SystemId.getNodeFingerprint()
);
return this.scenarioMetadata;
}
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneBuffer;
public class ScenarioResult {
private final long startedAt;
private final long endedAt;
private final String iolog;
private final Exception error;
public ScenarioResult(long startedAt, long endedAt, String iolog, Exception error) {
this.startedAt = startedAt;
this.endedAt = endedAt;
this.iolog = iolog;
this.error = error;
}
public ScenarioResult(ScenarioResult baseResult, NBSceneBuffer bufferedContext) {
this.startedAt = baseResult.startedAt;
this.endedAt = baseResult.endedAt;
String log = bufferedContext.getIoLog();
this.error = baseResult.error;
if (this.error!=null) {
log+=error.getMessage();
}
this.iolog = log;
}
public Exception getException() {
return error;
}
public String getIOLog() {
return iolog;
}
}

View File

@@ -14,18 +14,17 @@
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.lifecycle.scenario; package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
public class ScenarioShutdownHook extends Thread { public class ScenarioShutdownHook extends Thread {
private final NBScenario scenario; private final NBScenario scenario;
private final Logger logger;
public ScenarioShutdownHook(NBScenario scenario) { public ScenarioShutdownHook(NBScenario scenario) {
this.scenario = scenario; this.scenario = scenario;
logger = scenario.getLogger();
} }
@Override @Override

View File

@@ -0,0 +1,76 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class ScenarioSummary {
private final static Logger logger = LogManager.getLogger(ScenarioSummary.class);
private static void doReportSummaries(final String reportSummaryTo, final ExecutionMetricsResult result, Map<String,String> subs) {
final List<PrintStream> fullChannels = new ArrayList<>();
final List<PrintStream> briefChannels = new ArrayList<>();
final String[] destinationSpecs = reportSummaryTo.split(", *");
for (final String spec : destinationSpecs)
if ((null != spec) && !spec.isBlank()) {
final String[] split = spec.split(":", 2);
final String summaryTo = split[0];
final long summaryWhen = (2 == split.length) ? (Long.parseLong(split[1]) * 1000L) : 0;
PrintStream out = null;
switch (summaryTo.toLowerCase()) {
case "console":
case "stdout":
out = System.out;
break;
case "stderr":
out = System.err;
break;
default:
String outName = summaryTo;
for (String s : subs.keySet()) {
outName = outName.replaceAll("_"+s.toUpperCase()+"_",subs.get(s));
}
try {
out = new PrintStream(new FileOutputStream(outName));
break;
} catch (final FileNotFoundException e) {
throw new RuntimeException(e);
}
}
if (result.getElapsedMillis() > summaryWhen) fullChannels.add(out);
else {
logger.debug("Summarizing counting metrics only to {} with scenario duration of {}ms (<{})", spec, summaryWhen, summaryWhen);
briefChannels.add(out);
}
}
fullChannels.forEach(result::reportMetricsSummaryTo);
// briefChannels.forEach(result::reportCountsTo);
}
}

View File

@@ -14,20 +14,29 @@
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.lifecycle.scenario; package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult; import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory; import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBDefaultSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneBuffer;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScriptParams;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScenarioExceptionHandler; import io.nosqlbench.engine.core.lifecycle.scenario.script.ScenarioExceptionHandler;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class ScenariosExecutor { public class ScenariosExecutor extends NBBaseComponent {
private final Logger logger = LogManager.getLogger("SCENARIOS"); private final Logger logger = LogManager.getLogger("SCENARIOS");
private final LinkedHashMap<String, SubmittedScenario> submitted = new LinkedHashMap<>(); private final LinkedHashMap<String, SubmittedScenario> submitted = new LinkedHashMap<>();
@@ -36,11 +45,8 @@ public class ScenariosExecutor {
private final String name; private final String name;
private RuntimeException stoppingException; private RuntimeException stoppingException;
public ScenariosExecutor(String name) { public ScenariosExecutor(NBComponent parent, String name, int threads) {
this(name, 1); super(parent, NBLabels.forKV("executor","name"));
}
public ScenariosExecutor(String name, int threads) {
executor = new ThreadPoolExecutor(1, threads, executor = new ThreadPoolExecutor(1, threads,
0L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>(),
@@ -52,9 +58,24 @@ public class ScenariosExecutor {
if (submitted.get(scenario.getScenarioName()) != null) { if (submitted.get(scenario.getScenarioName()) != null) {
throw new BasicError("Scenario " + scenario.getScenarioName() + " is already defined. Remove it first to reuse the name."); throw new BasicError("Scenario " + scenario.getScenarioName() + " is already defined. Remove it first to reuse the name.");
} }
Future<ExecutionMetricsResult> future = executor.submit(scenario);
NBSceneFixtures basecontext = new NBDefaultSceneFixtures(
ScriptParams.of(scenario.getParams()),
this.getParent(),
scenario.getActivitiesController(),
loadExtensions(),
new OutputStreamWriter(System.out),
new OutputStreamWriter(System.err),
new InputStreamReader(System.in)
);
NBSceneBuffer bufferedContext = new NBSceneBuffer(basecontext);
Future<ScenarioResult> future = executor.submit(
() -> new ScenarioResult(scenario.apply(bufferedContext),bufferedContext) // combine basic execution data with trace
);
SubmittedScenario s = new SubmittedScenario(scenario, future); SubmittedScenario s = new SubmittedScenario(scenario, future);
submitted.put(s.getName(), s); submitted.put(s.getName(), s);
// TODO at this point, bufferedContext holds all the trace, make it visible in results
} }
@Override @Override
@@ -110,7 +131,7 @@ public class ScenariosExecutor {
throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown()); + "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
} }
Map<NBScenario, ExecutionMetricsResult> scenarioResultMap = new LinkedHashMap<>(); Map<NBScenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
getAsyncResultStatus() getAsyncResultStatus()
.entrySet() .entrySet()
.forEach( .forEach(
@@ -142,21 +163,21 @@ public class ScenariosExecutor {
* *
* @return map of async results, with incomplete results as Optional.empty() * @return map of async results, with incomplete results as Optional.empty()
*/ */
public Map<NBScenario, Optional<ExecutionMetricsResult>> getAsyncResultStatus() { public Map<NBScenario, Optional<ScenarioResult>> getAsyncResultStatus() {
Map<NBScenario, Optional<ExecutionMetricsResult>> optResults = new LinkedHashMap<>(); Map<NBScenario, Optional<ScenarioResult>> optResults = new LinkedHashMap<>();
for (SubmittedScenario submittedScenario : submitted.values()) { for (SubmittedScenario submittedScenario : submitted.values()) {
Future<ExecutionMetricsResult> resultFuture = submittedScenario.getResultFuture(); Future<ScenarioResult> resultFuture = submittedScenario.getResultFuture();
Optional<ExecutionMetricsResult> oResult = Optional.empty(); Optional<ScenarioResult> oResult = Optional.empty();
if (resultFuture.isDone()) { if (resultFuture.isDone()) {
try { try {
oResult = Optional.of(resultFuture.get()); oResult = Optional.of(resultFuture.get());
} catch (Exception e) { } catch (Exception e) {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
logger.debug("creating exceptional scenario result from getAsyncResultStatus"); logger.debug("creating exceptional scenario result from getAsyncResultStatus");
oResult = Optional.of(new ExecutionMetricsResult(now, now, "errored output", e)); oResult = Optional.of(new ScenarioResult(now, now, "errored output", e));
} }
} }
@@ -183,7 +204,7 @@ public class ScenariosExecutor {
* @param scenarioName the scenario name of interest * @param scenarioName the scenario name of interest
* @return an optional result * @return an optional result
*/ */
public Optional<Future<ExecutionMetricsResult>> getPendingResult(String scenarioName) { public Optional<Future<ScenarioResult>> getPendingResult(String scenarioName) {
return Optional.ofNullable(submitted.get(scenarioName)).map(s -> s.resultFuture); return Optional.ofNullable(submitted.get(scenarioName)).map(s -> s.resultFuture);
} }
@@ -195,7 +216,7 @@ public class ScenariosExecutor {
logger.debug("#stopScenario(name=" + scenarioName + ", rethrow="+ rethrow+")"); logger.debug("#stopScenario(name=" + scenarioName + ", rethrow="+ rethrow+")");
Optional<NBScenario> pendingScenario = getPendingScenario(scenarioName); Optional<NBScenario> pendingScenario = getPendingScenario(scenarioName);
if (pendingScenario.isPresent()) { if (pendingScenario.isPresent()) {
pendingScenario.get().getScenarioController().forceStopScenario(10000, true); pendingScenario.get().forceStopScenario(10000, true);
} else { } else {
throw new RuntimeException("Unable to cancel scenario: " + scenarioName + ": not found"); throw new RuntimeException("Unable to cancel scenario: " + scenarioName + ": not found");
} }
@@ -225,9 +246,9 @@ public class ScenariosExecutor {
private static class SubmittedScenario { private static class SubmittedScenario {
private final NBScenario scenario; private final NBScenario scenario;
private final Future<ExecutionMetricsResult> resultFuture; private final Future<ScenarioResult> resultFuture;
SubmittedScenario(NBScenario scenario, Future<ExecutionMetricsResult> resultFuture) { SubmittedScenario(NBScenario scenario, Future<ScenarioResult> resultFuture) {
this.scenario = scenario; this.scenario = scenario;
this.resultFuture = resultFuture; this.resultFuture = resultFuture;
} }
@@ -236,7 +257,7 @@ public class ScenariosExecutor {
return scenario; return scenario;
} }
Future<ExecutionMetricsResult> getResultFuture() { Future<ScenarioResult> getResultFuture() {
return resultFuture; return resultFuture;
} }
@@ -250,5 +271,29 @@ public class ScenariosExecutor {
this.stoppingException = new RuntimeException("Error in scenario thread " + t.getName(), e); this.stoppingException = new RuntimeException("Error in scenario thread " + t.getName(), e);
} }
private Extensions loadExtensions() {
Extensions extensions = new Extensions();// TODO: Load component oriented extensions into here
// for (final ScriptingExtensionPluginInfo<?> extensionDescriptor : SandboxExtensionFinder.findAll()) {
// if (!extensionDescriptor.isAutoLoading()) {
// this.logger.info(() -> "Not loading " + extensionDescriptor + ", autoloading is false");
// continue;
// }
//
// final Logger extensionLogger =
// LogManager.getLogger("extensions." + extensionDescriptor.getBaseVariableName());
// final Object extensionObject = extensionDescriptor.getExtensionObject(
// extensionLogger,
// metricRegistry,
// this.scriptEnv
// );
// ScenarioMetadataAware.apply(extensionObject, this.getScenarioMetadata());
// this.logger.trace(() -> "Adding extension object: name=" + extensionDescriptor.getBaseVariableName() +
// " class=" + extensionObject.getClass().getSimpleName());
// this.scriptEngine.put(extensionDescriptor.getBaseVariableName(), extensionObject);
// }
return extensions;
}
} }

View File

@@ -14,10 +14,8 @@
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.core.lifecycle.scenario; package io.nosqlbench.engine.core.lifecycle.scenario.execution;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -30,14 +28,14 @@ public class ScenariosResults {
private static final Logger logger = LogManager.getLogger(ScenariosResults.class); private static final Logger logger = LogManager.getLogger(ScenariosResults.class);
private final String scenariosExecutorName; private final String scenariosExecutorName;
private final Map<NBScenario, ExecutionMetricsResult> scenarioResultMap = new LinkedHashMap<>(); private final Map<NBScenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
public ScenariosResults(ScenariosExecutor scenariosExecutor) { public ScenariosResults(ScenariosExecutor scenariosExecutor) {
this.scenariosExecutorName = scenariosExecutor.getName(); this.scenariosExecutorName = scenariosExecutor.getName();
} }
public ScenariosResults(ScenariosExecutor scenariosExecutor, Map<NBScenario, ExecutionMetricsResult> map) { public ScenariosResults(ScenariosExecutor scenariosExecutor, Map<NBScenario, ScenarioResult> map) {
this.scenariosExecutorName = scenariosExecutor.getName(); this.scenariosExecutorName = scenariosExecutor.getName();
scenarioResultMap.putAll(map); scenarioResultMap.putAll(map);
} }
@@ -49,7 +47,7 @@ public class ScenariosResults {
return sb; return sb;
} }
public ExecutionMetricsResult getOne() { public ScenarioResult getOne() {
if (this.scenarioResultMap.size() != 1) { if (this.scenarioResultMap.size() != 1) {
throw new RuntimeException("getOne found " + this.scenarioResultMap.size() + " results instead of 1."); throw new RuntimeException("getOne found " + this.scenarioResultMap.size() + " results instead of 1.");
} }
@@ -58,17 +56,17 @@ public class ScenariosResults {
} }
public void reportToLog() { public void reportToLog() {
for (Map.Entry<NBScenario, ExecutionMetricsResult> entry : this.scenarioResultMap.entrySet()) { for (Map.Entry<NBScenario, ScenarioResult> entry : this.scenarioResultMap.entrySet()) {
NBScenario scenario = entry.getKey(); NBScenario scenario = entry.getKey();
ExecutionMetricsResult oresult = entry.getValue(); ScenarioResult oresult = entry.getValue();
logger.info(() -> "results for scenario: " + scenario); logger.info(() -> "results for scenario: " + scenario);
if (oresult != null) { // if (oresult != null) {
oresult.reportElapsedMillisToLog(); // oresult.reportElapsedMillisToLog();
} else { // } else {
logger.error(scenario.getScenarioName() + ": incomplete (missing result)"); // logger.error(scenario.getScenarioName() + ": incomplete (missing result)");
} // }
} }
} }
@@ -80,7 +78,7 @@ public class ScenariosResults {
public Optional<Exception> getAnyError() { public Optional<Exception> getAnyError() {
return this.scenarioResultMap.values().stream() return this.scenarioResultMap.values().stream()
.map(ExecutionResult::getException).filter(Objects::nonNull).findFirst(); .map(ScenarioResult::getException).filter(Objects::nonNull).findFirst();
} }
public int getSize() { public int getSize() {

View File

@@ -1,123 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.script;
import com.codahale.metrics.*;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.Method;
import java.util.*;
import java.util.Timer;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* Find the metrics associated with an activity type by instantiating the activity in idle mode.
*/
public enum MetricsMapper {
;
private static final Logger logger = LogManager.getLogger(MetricsMapper.class);
private static final Set<Class<?>> metricsElements = new HashSet<>() {{
add(Snapshot.class);
add(Gauge.class);
add(Histogram.class);
add(Timer.class);
add(Counter.class);
add(Meter.class);
}};
private static final Predicate<Method> isSimpleGetter = method ->
method.getName().startsWith("get")
&& (0 == method.getParameterCount())
&& !"getClass".equals(method.getName());
private static final Function<Method, String> getPropertyName = method ->
{
String mName = method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4);
return mName;
};
public static String metricsDetail(final String activitySpec) {
//StringBuilder metricsDetail = new StringBuilder();
final List<String> metricsDetails = new ArrayList<>();
final ActivityDef activityDef = ActivityDef.parseActivityDef(activitySpec);
MetricsMapper.logger.info(() -> "introspecting metric names for " + activitySpec);
final Optional<ActivityType> activityType = new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
if (!activityType.isPresent())
throw new RuntimeException("Activity type '" + activityDef.getActivityType() + "' does not exist in this runtime.");
final Activity activity = activityType.get().getAssembledActivity(activityDef, new HashMap<>(), TestComponent.INSTANCE);
final PolyglotMetricRegistryBindings nashornMetricRegistryBindings = new PolyglotMetricRegistryBindings(ActivityMetrics.getMetricRegistry());
activity.initActivity();
activity.getInputDispenserDelegate().getInput(0);
activity.getActionDispenserDelegate().getAction(0);
activity.getMotorDispenserDelegate().getMotor(activityDef, 0);
final Map<String, Metric> metricMap = nashornMetricRegistryBindings.getMetrics();
// Map<String, Map<String,String>> details = new LinkedHashMap<>();
for (final Entry<String, Metric> metricEntry : metricMap.entrySet()) {
final String metricName = metricEntry.getKey();
final Metric metricValue = metricEntry.getValue();
final Map<String, String> getterSummary = MetricsMapper.getGetterSummary(metricValue);
// details.put(metricName,getterSummary);
final List<String> methodDetails = getterSummary.entrySet().stream().map(
es -> metricName + es.getKey() + " " + es.getValue()
).collect(Collectors.toList());
methodDetails.sort(String::compareTo);
final String getterText = methodDetails.stream().collect(Collectors.joining("\n"));
metricsDetails.add(metricName + '\n' + getterText);
}
// return details;
return metricsDetails.stream().collect(Collectors.joining("\n"));
}
private static Map<String, String> getGetterSummary(final Object o) {
return MetricsMapper.getGetterSummary(new HashMap<>(), "", o.getClass());
}
private static Map<String, String> getGetterSummary(final Map<String, String> accumulator, final String name, final Class<?> objectType) {
Arrays.stream(objectType.getMethods())
.filter(MetricsMapper.isSimpleGetter)
.forEach(m -> {
if (m.getReturnType().isPrimitive())
accumulator.put(name + '.' + MetricsMapper.getPropertyName.apply(m), m.getReturnType().getSimpleName());
else {
final String fullName = name + '.' + MetricsMapper.getPropertyName.apply(m);
MetricsMapper.getGetterSummary(accumulator, fullName, m.getReturnType());
}
});
return accumulator;
}
}

View File

@@ -0,0 +1,261 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.script;
import com.codahale.metrics.MetricRegistry;
import com.oracle.truffle.js.scriptengine.GraalJSScriptEngine;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScriptParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBScenario;
import org.graalvm.polyglot.Context;
import org.graalvm.polyglot.Engine.Builder;
import org.graalvm.polyglot.EnvironmentAccess;
import org.graalvm.polyglot.HostAccess;
import org.graalvm.polyglot.PolyglotAccess;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptEngine;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
public class NBScriptedScenario extends NBScenario {
private final Invocation invocation;
private Exception error;
private ExecutionMetricsResult result;
private final NBLabeledElement parentComponent;
public Optional<ExecutionMetricsResult> getResultIfComplete() {
return Optional.ofNullable(result);
}
public enum Invocation {
RENDER_SCRIPT,
EXECUTE_SCRIPT
}
private final List<String> scripts = new ArrayList<>();
private ScriptEngine scriptEngine;
private ActivitiesProgressIndicator activitiesProgressIndicator;
private String progressInterval = "console:1m";
private ScenarioScriptShell scriptEnv;
private final String scenarioName;
private ScriptParams scenarioScriptParams;
private final Engine engine = Engine.Graalvm;
private long startedAtMillis = -1L;
private long endedAtMillis = -1L;
public enum Engine {
Graalvm
}
public NBScriptedScenario(
final String scenarioName,
final String progressInterval,
Map<String, String> params,
NBComponent parentComponent,
Invocation invocation
) {
super(parentComponent, scenarioName, params, progressInterval);
this.scenarioName = scenarioName;
this.progressInterval = progressInterval;
this.parentComponent = parentComponent;
this.invocation = invocation;
}
public static NBScriptedScenario ofScripted(String name, Map<String, String> params, NBComponent parent, Invocation invocation) {
return new NBScriptedScenario(name, "console:10s",params,parent,invocation);
};
public NBScriptedScenario addScriptText(final String scriptText) {
this.scripts.add(scriptText);
return this;
}
public NBScriptedScenario addScriptFiles(final String... args) {
for (final String scriptFile : args) {
final Path scriptPath = Paths.get(scriptFile);
byte[] bytes = new byte[0];
try {
bytes = Files.readAllBytes(scriptPath);
} catch (final IOException e) {
e.printStackTrace();
}
final ByteBuffer bb = ByteBuffer.wrap(bytes);
final Charset utf8 = StandardCharsets.UTF_8;
final String scriptData = utf8.decode(bb).toString();
this.addScriptText(scriptData);
}
return this;
}
private void initializeScriptingEngine() {
this.logger.debug("Using engine {}", this.engine.toString());
final MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
final Context.Builder contextSettings = Context.newBuilder("js")
.allowHostAccess(HostAccess.ALL)
.allowNativeAccess(true)
.allowCreateThread(true)
.allowIO(true)
.allowHostClassLookup(s -> true)
.allowHostClassLoading(true)
.allowCreateProcess(true)
.allowAllAccess(true)
.allowEnvironmentAccess(EnvironmentAccess.INHERIT)
.allowPolyglotAccess(PolyglotAccess.ALL)
.option("js.ecmascript-version", "2022")
.option("js.nashorn-compat", "true");
final Builder engineBuilder = org.graalvm.polyglot.Engine.newBuilder();
engineBuilder.option("engine.WarnInterpreterOnly", "false");
final org.graalvm.polyglot.Engine polyglotEngine = engineBuilder.build();
// TODO: add in, out, err for this scenario
scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings);
this.scriptEnv = new ScenarioScriptShell(scenarioName);
this.scriptEngine.setContext(this.scriptEnv);
// NBScenarioPojoContext sctx = new NBScenarioPojoContext(
// this.scenarioScriptParams,
// (NBSession) this.getParent(),
// scenarioController,
// new ActivityBindings(scenarioController)
// );
//
// this.scriptEngine.put("params", sctx.params());
// this.scriptEngine.put("session", sctx.session());
// this.scriptEngine.put("activities", sctx.activities());
// this.scriptEngine.put("scenario", new PolyglotScenarioController(sctx.controller()));
//
}
protected synchronized void runScenario(NBSceneFixtures context) {
if (null == result) {
try {
this.logger.debug("Initializing scripting engine for {}.", scenarioName);
this.initializeScriptingEngine();
this.logger.debug("Running control script for {}.", scenarioName);
this.executeScenarioScripts();
} catch (final Exception e) {
error = e;
} finally {
this.logger.debug("{} scenario run", null == this.error ? "NORMAL" : "ERRORED");
}
String iolog = error != null ? error.toString() : this.scriptEnv.getTimedLog();
result = new ExecutionMetricsResult(startedAtMillis, endedAtMillis, iolog, this.error);
this.result.reportMetricsSummaryToLog();
}
}
private void executeScenarioScripts() {
for (final String script : this.scripts)
try {
Object result = null;
if ((scriptEngine instanceof Compilable compilableEngine)) {
this.logger.debug("Using direct script compilation");
final CompiledScript compiled = compilableEngine.compile(script);
this.logger.debug("-> invoking main scenario script (compiled)");
result = compiled.eval();
this.logger.debug("<- scenario script completed (compiled)");
}
// else if ((null != scriptfile) && !this.scriptfile.isEmpty()) {
// final String filename = this.scriptfile.replace("_SESSION_", this.scenarioName);
// this.logger.debug("-> invoking main scenario script (interpreted from {})", filename);
// final Path written = Files.writeString(
// Path.of(filename),
// script,
// StandardOpenOption.TRUNCATE_EXISTING,
// StandardOpenOption.CREATE
// );
// final BufferedReader reader = Files.newBufferedReader(written);
// this.scriptEngine.eval(reader);
// this.logger.debug("<- scenario control script completed (interpreted) from {})", filename);
// }
else {
this.logger.debug("-> invoking main scenario script (interpreted)");
result = this.scriptEngine.eval(script);
this.logger.debug("<- scenario control script completed (interpreted)");
}
if (null != result)
this.logger.debug("scenario result: type({}): value:{}", result.getClass().getCanonicalName(), result);
} catch (final Exception e) {
error = e;
this.logger.error("Error in scenario, shutting down. ({})", e);
} finally {
this.endedAtMillis = System.currentTimeMillis();
System.out.flush();
System.err.flush();
}
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if ((null == o) || (this.getClass() != o.getClass())) {
return false;
}
final NBScriptedScenario scenario = (NBScriptedScenario) o;
return Objects.equals(this.scenarioName, scenario.scenarioName);
}
@Override
public int hashCode() {
return (null != this.scenarioName) ? scenarioName.hashCode() : 0;
}
public String toString() {
return "name:'" + scenarioName + '\'';
}
public void addScenarioScriptParams(final ScriptParams scenarioScriptParams) {
this.scenarioScriptParams = scenarioScriptParams;
}
public void addScenarioScriptParams(final Map<String, String> scriptParams) {
this.addScenarioScriptParams(new ScriptParams() {{
this.putAll(scriptParams);
}});
}
}

View File

@@ -16,7 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.scenario.script; package io.nosqlbench.engine.core.lifecycle.scenario.script;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosExecutor; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosExecutor;
public class ScenarioExceptionHandler implements Thread.UncaughtExceptionHandler { public class ScenarioExceptionHandler implements Thread.UncaughtExceptionHandler {
private final ScenariosExecutor scenariosExecutor; private final ScenariosExecutor scenariosExecutor;

View File

@@ -18,16 +18,13 @@ package io.nosqlbench.engine.core.lifecycle.scenario.script;
import io.nosqlbench.api.config.LabeledScenarioContext; import io.nosqlbench.api.config.LabeledScenarioContext;
import io.nosqlbench.api.labels.NBLabels; import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer; import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController;
public class ScenarioContext extends ScriptEnvBuffer implements LabeledScenarioContext { public class ScenarioScriptShell extends ScriptEnvBuffer implements LabeledScenarioContext {
private final ScenarioController sc;
private final String contextName; private final String contextName;
public ScenarioContext(String contextName, ScenarioController sc) { public ScenarioScriptShell(String contextName) {
this.contextName = contextName; this.contextName = contextName;
this.sc = sc;
} }
public String getContextName() { public String getContextName() {
@@ -36,14 +33,12 @@ public class ScenarioContext extends ScriptEnvBuffer implements LabeledScenarioC
@Override @Override
public Object getAttribute(String name) { public Object getAttribute(String name) {
Object o = super.getAttribute(name); return super.getAttribute(name);
return o;
} }
@Override @Override
public Object getAttribute(String name, int scope) { public Object getAttribute(String name, int scope) {
Object o = super.getAttribute(name, scope); return super.getAttribute(name, scope);
return o;
} }
@Override @Override

View File

@@ -17,7 +17,7 @@
package io.nosqlbench.engine.core.lifecycle.scenario.script.bindings; package io.nosqlbench.engine.core.lifecycle.scenario.script.bindings;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenarioController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.Value; import org.graalvm.polyglot.Value;
@@ -29,9 +29,9 @@ public class PolyglotScenarioController {
private static final Logger logger = LogManager.getLogger("SCENARIO/POLYGLOT"); private static final Logger logger = LogManager.getLogger("SCENARIO/POLYGLOT");
private final ScenarioController controller; private final ActivitiesController controller;
public PolyglotScenarioController(ScenarioController inner) { public PolyglotScenarioController(ActivitiesController inner) {
this.controller = inner; this.controller = inner;
} }

View File

@@ -16,25 +16,22 @@
package io.nosqlbench.engine.core.lifecycle.session; package io.nosqlbench.engine.core.lifecycle.session;
import io.nosqlbench.components.NBComponent; import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBBaseComponent; import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBComponentSubScope; import io.nosqlbench.components.NBComponentSubScope;
import io.nosqlbench.engine.cli.BasicScriptBuffer; import io.nosqlbench.engine.cli.BasicScriptBuffer;
import io.nosqlbench.engine.cli.Cmd; import io.nosqlbench.engine.cli.Cmd;
import io.nosqlbench.engine.cli.ScriptBuffer; import io.nosqlbench.engine.cli.ScriptBuffer;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult; import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler; import io.nosqlbench.engine.core.lifecycle.process.NBCLIErrorHandler;
import io.nosqlbench.engine.core.lifecycle.process.ShutdownManager; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScriptParams;
import io.nosqlbench.engine.core.lifecycle.scenario.NBScenario; import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBScenario;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosExecutor; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosExecutor;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosResults; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosResults;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScriptParams; import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.nio.file.Path;
import java.util.List; import java.util.List;
import java.util.function.Function; import java.util.function.Function;
@@ -48,11 +45,7 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
private final static Logger logger = LogManager.getLogger(NBSession.class); private final static Logger logger = LogManager.getLogger(NBSession.class);
private final String sessionName; private final String sessionName;
private final String progressSpec; private final String progressSpec;
private final String reportSummaryTo; private final boolean wantsDryRun;
private final Path logspath;
private final boolean wantsShowScript;
private final String scriptfile;
public enum STATUS { public enum STATUS {
OK, OK,
@@ -61,78 +54,91 @@ public class NBSession extends NBBaseComponent implements Function<List<Cmd>, Ex
} }
public NBSession( public NBSession(
NBComponent parent, NBLabeledElement labelContext,
String sessionName, String sessionName,
String progressSpec, String progressSpec,
String reportSummaryTo, boolean wantsDryRun
Path logspath,
String scriptfile,
boolean wantsShowScript
) { ) {
super(parent, NBLabels.forKV("session", sessionName)); super(null, labelContext.getLabels().and("session", sessionName));
this.sessionName = sessionName; this.sessionName = sessionName;
this.progressSpec = progressSpec; this.progressSpec = progressSpec;
this.reportSummaryTo = reportSummaryTo; this.wantsDryRun = wantsDryRun;
this.logspath = logspath;
this.scriptfile = scriptfile;
this.wantsShowScript = wantsShowScript;
} }
public ExecutionResult apply(List<Cmd> cmds) { public ExecutionResult apply(List<Cmd> cmds) {
if (cmds.isEmpty()) {
logger.info("No commands provided.");
}
ResultCollector collector = new ResultCollector(); ResultCollector collector = new ResultCollector();
try (ResultContext results = new ResultContext(collector)) { try (ResultContext results = new ResultContext(collector)) {
final ScenariosExecutor scenariosExecutor = new ScenariosExecutor("executor-" + sessionName, 1); final ScenariosExecutor scenariosExecutor = new ScenariosExecutor(this, "executor-" + sessionName, 1);
NBScenario.Invocation invocation = wantsShowScript ? NBScenario.Invocation.RENDER_SCRIPT : NBScenario.Invocation.EXECUTE_SCRIPT;
final NBScenario scenario = new NBScenario(
sessionName,
progressSpec,
reportSummaryTo,
logspath,
scriptfile,
this,
invocation
);
try (NBComponentSubScope s = new NBComponentSubScope(scenario)) {
final ScriptBuffer buffer = new BasicScriptBuffer().add(cmds.toArray(new Cmd[0]));
final String scriptData = buffer.getParsedScript();
// Execute Scenario!
if (cmds.isEmpty()) {
logger.info("No commands provided.");
}
scenario.addScriptText(scriptData);
final ScriptParams scriptParams = new ScriptParams();
scriptParams.putAll(buffer.getCombinedParams());
scenario.addScenarioScriptParams(scriptParams);
scenariosExecutor.execute(scenario);
final ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults();
logger.debug(() -> "Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");
ActivityMetrics.closeMetrics();
scenariosResults.reportToLog();
ShutdownManager.shutdown();
logger.info(scenariosResults.getExecutionSummary());
if (scenariosResults.hasError()) {
results.error(scenariosResults.getAnyError().orElseThrow());
final Exception exception = scenariosResults.getOne().getException();
logger.warn(scenariosResults.getExecutionSummary());
NBCLIErrorHandler.handle(exception, true);
System.err.println(exception.getMessage()); // TODO: make this consistent with ConsoleLogging sequencing
}
results.output(scenariosResults.getExecutionSummary());
NBScenario scenario;
if (cmds.get(0).getCmdType().equals(Cmd.CmdType.java)) {
scenario = buildJavaScenario(cmds, wantsDryRun);
} else {
scenario = buildJavacriptScenario(cmds, wantsDryRun);
} }
try (NBComponentSubScope scope = new NBComponentSubScope(scenario)) {
assert scenario != null;
scenariosExecutor.execute(scenario);
// this.doReportSummaries(this.reportSummaryTo, this.result);
}
final ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults();
logger.debug(() -> "Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");
// ActivityMetrics.closeMetrics();
// scenariosResults.reportToLog();
// ShutdownManager.shutdown();
//
// logger.info(scenariosResults.getExecutionSummary());
if (scenariosResults.hasError()) {
results.error(scenariosResults.getAnyError().orElseThrow());
final Exception exception = scenariosResults.getOne().getException();
logger.warn(scenariosResults.getExecutionSummary());
NBCLIErrorHandler.handle(exception, true);
System.err.println(exception.getMessage()); // TODO: make this consistent with ConsoleLogging sequencing
}
results.output(scenariosResults.getExecutionSummary());
} }
return collector.toExecutionResult(); return collector.toExecutionResult();
} }
private NBScenario buildJavacriptScenario(List<Cmd> cmds, boolean dryrun) {
NBScriptedScenario.Invocation invocation = dryrun ?
NBScriptedScenario.Invocation.RENDER_SCRIPT :
NBScriptedScenario.Invocation.EXECUTE_SCRIPT;
final ScriptBuffer buffer = new BasicScriptBuffer().add(cmds.toArray(new Cmd[0]));
final String scriptData = buffer.getParsedScript();
final ScriptParams scriptParams = new ScriptParams();
scriptParams.putAll(buffer.getCombinedParams());
final NBScriptedScenario scenario = new NBScriptedScenario(
sessionName,
progressSpec,
scriptParams,
this,
invocation
);
scenario.addScriptText(scriptData);
scenario.addScenarioScriptParams(scriptParams);
return scenario;
}
private NBScenario buildJavaScenario(List<Cmd> cmds, boolean dryrun) {
return null;
}
} }

View File

@@ -98,7 +98,7 @@ class ActivityExecutorTest {
activity.setInputDispenserDelegate(inputDispenser); activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser); activity.setMotorDispenserDelegate(motorDispenser);
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start"); ActivityExecutor activityExecutor = new ActivityExecutor(activity);
ExecutorService testExecutor = Executors.newCachedThreadPool(); ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(activityExecutor); Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
@@ -136,7 +136,7 @@ class ActivityExecutorTest {
simpleActivity.setInputDispenserDelegate(inputDispenser); simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser); simpleActivity.setMotorDispenserDelegate(motorDispenser);
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor"); ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity);
activityDef.setThreads(5); activityDef.setThreads(5);
ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor); ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor);

View File

@@ -18,11 +18,15 @@ package io.nosqlbench.engine.core;
import io.nosqlbench.api.config.standard.TestComponent; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer; import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.lifecycle.scenario.NBScenario; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosExecutor;
import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public class NBScenarioTest { public class NBScenarioTest {
@@ -31,15 +35,16 @@ public class NBScenarioTest {
@Test @Test
public void shouldLoadScriptText() { public void shouldLoadScriptText() {
ScriptEnvBuffer buffer = new ScriptEnvBuffer(); ScriptEnvBuffer buffer = new ScriptEnvBuffer();
NBScenario scenario = NBScenario.forTesting("testing", "stdout:300", new TestComponent()); NBScriptedScenario scenario = NBScriptedScenario.ofScripted("testing", Map.of(),new TestComponent(), NBScriptedScenario.Invocation.EXECUTE_SCRIPT);
scenario.addScriptText("print('loaded script environment...');\n"); scenario.addScriptText("print('loaded script environment...');\n");
try { try {
var result=scenario.call(); ScenariosExecutor executor = new ScenariosExecutor(TestComponent.INSTANCE, "test", 1);
executor.execute(scenario);
ScenarioResult result = executor.awaitAllResults().getOne();
assertThat(result.getIOLog()).contains("loaded script environment...");
} catch (Exception e) { } catch (Exception e) {
logger.debug(() -> "Scenario run encountered an exception: " + e.getMessage()); logger.debug(() -> "Scenario run encountered an exception: " + e.getMessage());
} }
assertThat(scenario.getIOLog().get().get(0)).contains("loaded script environment...");
} }
} }

View File

@@ -17,21 +17,23 @@
package io.nosqlbench.engine.core.script; package io.nosqlbench.engine.core.script;
import io.nosqlbench.api.config.standard.TestComponent; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.NBScenario; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosExecutor;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosExecutor; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosResults;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosResults; import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Map;
public class ScenariosExecutorTest { public class ScenariosExecutorTest {
@Test @Test
@Disabled @Disabled
public void testAwaitOnTime() { public void testAwaitOnTime() {
ScenariosExecutor e = new ScenariosExecutor(ScenariosExecutorTest.class.getSimpleName(), 1); ScenariosExecutor e = new ScenariosExecutor(new TestComponent("id","test-await-on-time"),ScenariosExecutorTest.class.getSimpleName(), 1);
NBScenario s = NBScenario.forTesting("testing", "stdout:3000", new TestComponent()); NBScriptedScenario scenario = NBScriptedScenario.ofScripted("testing", Map.of(),new TestComponent("scripted-scenario","scripted-scenario"), NBScriptedScenario.Invocation.EXECUTE_SCRIPT);
s.addScriptText("load('classpath:scripts/asyncs.js');\nsetTimeout(\"print('waited')\",5000);\n"); scenario.addScriptText("load('classpath:scripts/asyncs.js');\nsetTimeout(\"print('waited')\",5000);\n");
e.execute(s); e.execute(scenario);
ScenariosResults scenariosResults = e.awaitAllResults(); ScenariosResults scenariosResults = e.awaitAllResults();
} }

View File

@@ -17,7 +17,7 @@
package io.nosqlbench.engine.core.script; package io.nosqlbench.engine.core.script;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScriptParams; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScriptParams;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.HashMap; import java.util.HashMap;

View File

@@ -0,0 +1,21 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.components;
public interface NBComponentErrorHandler {
public void notifyException(Thread t, Throwable e);
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.scripting;
import java.io.CharArrayWriter;
import java.io.IOException;
import java.io.Reader;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
public class DiagReader extends Reader {
Reader wrapped;
private final String prefix;
CharArrayWriter buffer = new CharArrayWriter(0);
private final List<String> timedLog = new ArrayList<String>();
private final DateTimeFormatter tsformat = DateTimeFormatter.ISO_DATE_TIME;
public DiagReader(Reader wrapped, String prefix) {
this.wrapped = wrapped;
this.prefix = prefix;
}
@Override
public int read(char[] cbuf, int off, int len) throws IOException {
String tsprefix = LocalDateTime.now().format(tsformat);
int read = wrapped.read(cbuf, off, len);
buffer.write(cbuf, off, len);
timedLog.add(tsprefix + prefix + new String(cbuf, off, len));
return read;
}
@Override
public void close() throws IOException {
wrapped.close();
buffer.close();
}
public List<String> getTimedLog() {
return this.timedLog;
}
}

View File

@@ -0,0 +1,84 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.scripting;
import java.io.CharArrayWriter;
import java.io.IOException;
import java.io.Writer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
public class DiagWriter extends Writer {
Writer wrapped;
private final String prefix;
CharArrayWriter buffer = new CharArrayWriter();
private final List<String> timedLog = new ArrayList<String>();
private final StringBuilder sb = new StringBuilder();
private final DateTimeFormatter tsformat = DateTimeFormatter.ISO_DATE_TIME;
public DiagWriter(Writer wrapped, String prefix) {
this.wrapped = wrapped;
this.prefix = prefix;
}
@Override
public void write(char[] cbuf, int off, int len) throws IOException {
String tsprefix = LocalDateTime.now().format(tsformat);
buffer.write(cbuf, off, len);
String text = new String(cbuf, off, len);
sb.append(text);
if (text.contains("\n")) {
String msgs = sb.toString();
String extra = msgs.substring(msgs.lastIndexOf("\n") + 1);
sb.setLength(0);
sb.append(extra);
String[] parts = msgs.substring(0, msgs.length() - extra.length()).split("\n");
for (String part : parts) {
if (!part.isBlank()) {
String tslogEntry = tsprefix + prefix + part + "\n";
timedLog.add(tslogEntry);
}
}
}
wrapped.write(cbuf, off, len);
}
@Override
public void flush() throws IOException {
buffer.flush();
wrapped.flush();
}
@Override
public void close() throws IOException {
buffer.close();
wrapped.close();
}
public List<String> getTimedLog() {
return timedLog;
}
}

View File

@@ -0,0 +1,34 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.nbr;
public class DirectRuntimeScenarioTests {
// @Test
// public void testDirect() {
// TestComponent testC = new TestComponent("testroot", "testroot");
// new NBScriptedScenario(
// "testname",
// "console:1m",
// null,
// Path.of("native-logs"),
// null,
// testC,
// NBScriptedScenario.Invocation.EXECUTE_SCRIPT
// );
// }
}

View File

@@ -17,13 +17,14 @@
package io.nosqlbench.nbr.examples; package io.nosqlbench.nbr.examples;
import io.nosqlbench.api.config.standard.TestComponent; import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.scenario.NBScenario; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosExecutor;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosExecutor; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenariosResults;
import io.nosqlbench.engine.core.lifecycle.scenario.ScenariosResults; import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.compress.utils.IOUtils;
import org.assertj.core.data.Offset; import org.assertj.core.data.Offset;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution; import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.api.parallel.ExecutionMode;
@@ -41,10 +42,11 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@Disabled
@Execution(ExecutionMode.SAME_THREAD) @Execution(ExecutionMode.SAME_THREAD)
public class ScriptExampleTests { public class ScriptExampleTests {
public static ExecutionMetricsResult runScenario(String scriptname, String... params) { public static ScenarioResult runScenario(String scriptname, String... params) {
if ((params.length % 2) != 0) { if ((params.length % 2) != 0) {
throw new RuntimeException("params must be pairwise key, value, ..."); throw new RuntimeException("params must be pairwise key, value, ...");
} }
@@ -55,8 +57,9 @@ public class ScriptExampleTests {
} }
String scenarioName = "scenario " + scriptname; String scenarioName = "scenario " + scriptname;
System.out.println("=".repeat(29) + " Running integration test for example scenario: " + scenarioName); System.out.println("=".repeat(29) + " Running integration test for example scenario: " + scenarioName);
ScenariosExecutor executor = new ScenariosExecutor(ScriptExampleTests.class.getSimpleName() + ":" + scriptname, 1);
NBScenario s = NBScenario.forTesting(scenarioName,"stdout:300", new TestComponent()); ScenariosExecutor executor = new ScenariosExecutor(new TestComponent("test","test"),ScriptExampleTests.class.getSimpleName() + ":" + scriptname, 1);
NBScriptedScenario s = NBScriptedScenario.ofScripted(scenarioName,Map.of(),new TestComponent("test","test"), NBScriptedScenario.Invocation.EXECUTE_SCRIPT);
s.addScenarioScriptParams(paramsMap); s.addScenarioScriptParams(paramsMap);
@@ -77,7 +80,7 @@ public class ScriptExampleTests {
// s.addScriptText("load('classpath:scripts/async/" + scriptname + ".js');"); // s.addScriptText("load('classpath:scripts/async/" + scriptname + ".js');");
executor.execute(s); executor.execute(s);
ScenariosResults scenariosResults = executor.awaitAllResults(); ScenariosResults scenariosResults = executor.awaitAllResults();
ExecutionMetricsResult scenarioResult = scenariosResults.getOne(); ScenarioResult scenarioResult = scenariosResults.getOne();
executor.shutdownNow(); executor.shutdownNow();
return scenarioResult; return scenarioResult;
} }
@@ -89,7 +92,7 @@ public class ScriptExampleTests {
@Test @Test
public void testLinkedInput() { public void testLinkedInput() {
ExecutionMetricsResult scenarioResult = runScenario("linkedinput"); ScenarioResult scenarioResult = runScenario("linkedinput");
Pattern p = Pattern.compile(".*started leader.*started follower.*stopped leader.*stopped follower.*", Pattern p = Pattern.compile(".*started leader.*started follower.*stopped leader.*stopped follower.*",
Pattern.DOTALL); Pattern.DOTALL);
assertThat(p.matcher(scenarioResult.getIOLog()).matches()).isTrue(); assertThat(p.matcher(scenarioResult.getIOLog()).matches()).isTrue();
@@ -97,7 +100,7 @@ public class ScriptExampleTests {
@Test @Test
public void testCycleRate() { public void testCycleRate() {
ExecutionMetricsResult scenarioResult = runScenario("cycle_rate"); ScenarioResult scenarioResult = runScenario("cycle_rate");
String iolog = scenarioResult.getIOLog(); String iolog = scenarioResult.getIOLog();
System.out.println("iolog\n" + iolog); System.out.println("iolog\n" + iolog);
Pattern p = Pattern.compile(".*mean cycle rate = (\\d[.\\d]+).*", Pattern.DOTALL); Pattern p = Pattern.compile(".*mean cycle rate = (\\d[.\\d]+).*", Pattern.DOTALL);
@@ -112,13 +115,13 @@ public class ScriptExampleTests {
@Test @Test
public void testExtensionPoint() { public void testExtensionPoint() {
ExecutionMetricsResult scenarioResult = runScenario("extensions"); ScenarioResult scenarioResult = runScenario("extensions");
assertThat(scenarioResult.getIOLog()).contains("sum is 46"); assertThat(scenarioResult.getIOLog()).contains("sum is 46");
} }
@Test @Test
public void testOptimo() { public void testOptimo() {
ExecutionMetricsResult scenarioResult = runScenario("optimo"); ScenarioResult scenarioResult = runScenario("optimo");
String iolog = scenarioResult.getIOLog(); String iolog = scenarioResult.getIOLog();
System.out.println("iolog\n" + iolog); System.out.println("iolog\n" + iolog);
assertThat(iolog).contains("map of result was"); assertThat(iolog).contains("map of result was");
@@ -126,13 +129,13 @@ public class ScriptExampleTests {
@Test @Test
public void testExtensionCsvMetrics() { public void testExtensionCsvMetrics() {
ExecutionMetricsResult scenarioResult = runScenario("extension_csvmetrics"); ScenarioResult scenarioResult = runScenario("extension_csvmetrics");
assertThat(scenarioResult.getIOLog()).contains("started new csvmetrics: logs/csvmetricstestdir"); assertThat(scenarioResult.getIOLog()).contains("started new csvmetrics: logs/csvmetricstestdir");
} }
@Test @Test
public void testScriptParamsVariable() { public void testScriptParamsVariable() {
ExecutionMetricsResult scenarioResult = runScenario("params_variable", "one", "two", "three", "four"); ScenarioResult scenarioResult = runScenario("params_variable", "one", "two", "three", "four");
assertThat(scenarioResult.getIOLog()).contains("params[\"one\"]='two'"); assertThat(scenarioResult.getIOLog()).contains("params[\"one\"]='two'");
assertThat(scenarioResult.getIOLog()).contains("params[\"three\"]='four'"); assertThat(scenarioResult.getIOLog()).contains("params[\"three\"]='four'");
assertThat(scenarioResult.getIOLog()).contains("overridden[\"three\"] [overridden-three-five]='five'"); assertThat(scenarioResult.getIOLog()).contains("overridden[\"three\"] [overridden-three-five]='five'");
@@ -141,7 +144,7 @@ public class ScriptExampleTests {
@Test @Test
public void testScriptParamsUndefVariableWithOverride() { public void testScriptParamsUndefVariableWithOverride() {
ExecutionMetricsResult scenarioResult = runScenario("undef_param", "one", "two", "three", "four"); ScenarioResult scenarioResult = runScenario("undef_param", "one", "two", "three", "four");
assertThat(scenarioResult.getIOLog()).contains("before: params[\"three\"]:four"); assertThat(scenarioResult.getIOLog()).contains("before: params[\"three\"]:four");
assertThat(scenarioResult.getIOLog()).contains("before: params.three:four"); assertThat(scenarioResult.getIOLog()).contains("before: params.three:four");
assertThat(scenarioResult.getIOLog()).contains("after: params[\"three\"]:undefined"); assertThat(scenarioResult.getIOLog()).contains("after: params[\"three\"]:undefined");
@@ -151,7 +154,7 @@ public class ScriptExampleTests {
// TODO - length >= 2 expected, not passing with changes for metrics // TODO - length >= 2 expected, not passing with changes for metrics
@Test @Test
public void testExtensionHistoStatsLogger() throws IOException { public void testExtensionHistoStatsLogger() throws IOException {
ExecutionMetricsResult scenarioResult = runScenario("extension_histostatslogger"); ScenarioResult scenarioResult = runScenario("extension_histostatslogger");
assertThat(scenarioResult.getIOLog()).contains("stdout started " + assertThat(scenarioResult.getIOLog()).contains("stdout started " +
"logging to logs/histostats.csv"); "logging to logs/histostats.csv");
List<String> strings = Files.readAllLines(Paths.get( List<String> strings = Files.readAllLines(Paths.get(
@@ -163,7 +166,7 @@ public class ScriptExampleTests {
@Test @Test
public void testExtensionCsvOutput() throws IOException { public void testExtensionCsvOutput() throws IOException {
ExecutionMetricsResult scenarioResult = runScenario("extension_csvoutput"); ScenarioResult scenarioResult = runScenario("extension_csvoutput");
List<String> strings = Files.readAllLines(Paths.get( List<String> strings = Files.readAllLines(Paths.get(
"logs/csvoutputtestfile.csv")); "logs/csvoutputtestfile.csv"));
String logdata = strings.stream().collect(Collectors.joining("\n")); String logdata = strings.stream().collect(Collectors.joining("\n"));
@@ -174,7 +177,7 @@ public class ScriptExampleTests {
// TODO - length >= 2 expected, not passing with changes for metrics // TODO - length >= 2 expected, not passing with changes for metrics
@Test @Test
public void testExtensionHistogramLogger() throws IOException { public void testExtensionHistogramLogger() throws IOException {
ExecutionMetricsResult scenarioResult = runScenario("extension_histologger"); ScenarioResult scenarioResult = runScenario("extension_histologger");
assertThat(scenarioResult.getIOLog()).contains("stdout started logging to hdrhistodata.log"); assertThat(scenarioResult.getIOLog()).contains("stdout started logging to hdrhistodata.log");
List<String> strings = Files.readAllLines(Paths.get("hdrhistodata.log")); List<String> strings = Files.readAllLines(Paths.get("hdrhistodata.log"));
String logdata = strings.stream().collect(Collectors.joining("\n")); String logdata = strings.stream().collect(Collectors.joining("\n"));
@@ -184,7 +187,7 @@ public class ScriptExampleTests {
@Test @Test
public void testBlockingRun() { public void testBlockingRun() {
ExecutionMetricsResult scenarioResult = runScenario("blockingrun"); ScenarioResult scenarioResult = runScenario("blockingrun");
int a1end = scenarioResult.getIOLog().indexOf("blockingactivity1 finished"); int a1end = scenarioResult.getIOLog().indexOf("blockingactivity1 finished");
int a2start = scenarioResult.getIOLog().indexOf("running blockingactivity2"); int a2start = scenarioResult.getIOLog().indexOf("running blockingactivity2");
assertThat(a1end).isLessThan(a2start); assertThat(a1end).isLessThan(a2start);
@@ -192,12 +195,12 @@ public class ScriptExampleTests {
@Test @Test
public void testAwaitFinished() { public void testAwaitFinished() {
ExecutionMetricsResult scenarioResult = runScenario("awaitfinished"); ScenarioResult scenarioResult = runScenario("awaitfinished");
} }
@Test @Test
public void testStartStop() { public void testStartStop() {
ExecutionMetricsResult scenarioResult = runScenario("startstopdiag"); ScenarioResult scenarioResult = runScenario("startstopdiag");
int startedAt = scenarioResult.getIOLog().indexOf("starting activity teststartstopdiag"); int startedAt = scenarioResult.getIOLog().indexOf("starting activity teststartstopdiag");
int stoppedAt = scenarioResult.getIOLog().indexOf("stopped activity teststartstopdiag"); int stoppedAt = scenarioResult.getIOLog().indexOf("stopped activity teststartstopdiag");
assertThat(startedAt).isGreaterThan(0); assertThat(startedAt).isGreaterThan(0);
@@ -207,7 +210,7 @@ public class ScriptExampleTests {
// TODO: find out why this causes a long delay after stop is called. // TODO: find out why this causes a long delay after stop is called.
@Test @Test
public void testThreadChange() { public void testThreadChange() {
ExecutionMetricsResult scenarioResult = runScenario("threadchange"); ScenarioResult scenarioResult = runScenario("threadchange");
int changedTo1At = scenarioResult.getIOLog().indexOf("threads now 1"); int changedTo1At = scenarioResult.getIOLog().indexOf("threads now 1");
int changedTo5At = scenarioResult.getIOLog().indexOf("threads now 5"); int changedTo5At = scenarioResult.getIOLog().indexOf("threads now 5");
System.out.println("IOLOG:\n"+scenarioResult.getIOLog()); System.out.println("IOLOG:\n"+scenarioResult.getIOLog());
@@ -217,20 +220,20 @@ public class ScriptExampleTests {
@Test @Test
public void testReadMetric() { public void testReadMetric() {
ExecutionMetricsResult scenarioResult = runScenario("readmetrics"); ScenarioResult scenarioResult = runScenario("readmetrics");
assertThat(scenarioResult.getIOLog()).contains("count: "); assertThat(scenarioResult.getIOLog()).contains("count: ");
} }
@Test @Test
public void testShutdownHook() { public void testShutdownHook() {
ExecutionMetricsResult scenarioResult = runScenario("extension_shutdown_hook"); ScenarioResult scenarioResult = runScenario("extension_shutdown_hook");
assertThat(scenarioResult.getIOLog()).doesNotContain("shutdown hook running").describedAs( assertThat(scenarioResult.getIOLog()).doesNotContain("shutdown hook running").describedAs(
"shutdown hooks should not run in the same IO context as the main scenario" "shutdown hooks should not run in the same IO context as the main scenario"
); );
} }
@Test @Test
public void testReportedCoDelayBursty() { public void testReportedCoDelayBursty() {
ExecutionMetricsResult scenarioResult = runScenario("cocycledelay_bursty"); ScenarioResult scenarioResult = runScenario("cocycledelay_bursty");
assertThat(scenarioResult.getIOLog()).contains("step1 metrics.waittime="); assertThat(scenarioResult.getIOLog()).contains("step1 metrics.waittime=");
assertThat(scenarioResult.getIOLog()).contains("step2 metrics.waittime="); assertThat(scenarioResult.getIOLog()).contains("step2 metrics.waittime=");
String iolog = scenarioResult.getIOLog(); String iolog = scenarioResult.getIOLog();
@@ -240,7 +243,7 @@ public class ScriptExampleTests {
@Test @Test
public void testReportedCoDelayStrict() { public void testReportedCoDelayStrict() {
ExecutionMetricsResult scenarioResult = runScenario("cocycledelay_strict"); ScenarioResult scenarioResult = runScenario("cocycledelay_strict");
assertThat(scenarioResult.getIOLog()).contains("step1 cycles_waittime="); assertThat(scenarioResult.getIOLog()).contains("step1 cycles_waittime=");
assertThat(scenarioResult.getIOLog()).contains("step2 cycles_waittime="); assertThat(scenarioResult.getIOLog()).contains("step2 cycles_waittime=");
String iolog = scenarioResult.getIOLog(); String iolog = scenarioResult.getIOLog();
@@ -251,14 +254,14 @@ public class ScriptExampleTests {
@Test @Test
public void testCycleRateChangeNewMetrics() { public void testCycleRateChangeNewMetrics() {
ExecutionMetricsResult scenarioResult = runScenario("cycle_rate_change"); ScenarioResult scenarioResult = runScenario("cycle_rate_change");
String ioLog = scenarioResult.getIOLog(); String ioLog = scenarioResult.getIOLog();
assertThat(ioLog).contains("cycles adjusted, exiting on iteration"); assertThat(ioLog).contains("cycles adjusted, exiting on iteration");
} }
@Test @Test
public void testErrorPropagationFromAdapterOperation() { public void testErrorPropagationFromAdapterOperation() {
ExecutionMetricsResult scenarioResult = runScenario( ScenarioResult scenarioResult = runScenario(
"basicdiag", "basicdiag",
"driver", "diag", "cyclerate", "5", "erroroncycle", "10", "cycles", "2000" "driver", "diag", "cyclerate", "5", "erroroncycle", "10", "cycles", "2000"
); );
@@ -267,14 +270,14 @@ public class ScriptExampleTests {
@Test @Test
public void testErrorPropagationFromMotorThread() { public void testErrorPropagationFromMotorThread() {
ExecutionMetricsResult scenarioResult = runScenario("activity_error"); ScenarioResult scenarioResult = runScenario("activity_error");
assertThat(scenarioResult.getException()).isNotNull(); assertThat(scenarioResult.getException()).isNotNull();
assertThat(scenarioResult.getException().getMessage()).contains("For input string: \"unparsable\""); assertThat(scenarioResult.getException().getMessage()).contains("For input string: \"unparsable\"");
} }
@Test @Test
public void testErrorPropagationFromActivityInitialization() { public void testErrorPropagationFromActivityInitialization() {
ExecutionMetricsResult scenarioResult = runScenario("activity_init_error"); ScenarioResult scenarioResult = runScenario("activity_init_error");
assertThat(scenarioResult.getException()).isNotNull(); assertThat(scenarioResult.getException()).isNotNull();
assertThat(scenarioResult.getException().getMessage()).contains("Unknown config parameter 'unknown_config'"); assertThat(scenarioResult.getException().getMessage()).contains("Unknown config parameter 'unknown_config'");
assertThat(scenarioResult.getException()).isNotNull(); assertThat(scenarioResult.getException()).isNotNull();

View File

@@ -15,7 +15,7 @@
*/ */
package io.nosqlbench.nbr.examples; package io.nosqlbench.nbr.examples;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult; import io.nosqlbench.engine.core.lifecycle.scenario.execution.ScenarioResult;
import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@@ -30,14 +30,14 @@ public class SpeedCheckIntegrationTests {
@Disabled @Disabled
// Verified as working // Verified as working
public void testSpeedSanity() { public void testSpeedSanity() {
ExecutionMetricsResult scenarioResult = ScriptExampleTests.runScenario("speedcheck"); ScenarioResult scenarioResult = ScriptExampleTests.runScenario("speedcheck");
} }
@Test @Test
@Disabled @Disabled
// This seems incomplete // This seems incomplete
public void testThreadSpeeds() { public void testThreadSpeeds() {
ExecutionMetricsResult scenarioResult = ScriptExampleTests.runScenario("threadspeeds"); ScenarioResult scenarioResult = ScriptExampleTests.runScenario("threadspeeds");
} }