add instant and interval annotations

This commit is contained in:
Jonathan Shook 2020-12-07 01:50:43 -06:00
parent 5315b8a98a
commit 0d422b5a5f
7 changed files with 225 additions and 57 deletions

View File

@ -18,6 +18,9 @@ import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.nb.api.Layer;
import io.nosqlbench.nb.api.annotations.Annotation;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -56,6 +59,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private RuntimeException stoppingException;
private final static int waitTime = 10000;
private String sessionId = "";
private long startedAt = 0L;
private long stoppedAt = 0L;
private String[] annotatedCommand;
// private RunState intendedState = RunState.Uninitialized;
@ -63,15 +70,19 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
this.activity = activity;
this.activityDef = activity.getActivityDef();
executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
);
activity.getActivityDef().getParams().addListener(this);
activity.setActivityController(this);
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
// TODO: Doc how uninitialized activities do not propagate parameter map changes and how
// TODO: this is different from preventing modification to uninitialized activities
@ -86,9 +97,22 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
*/
public synchronized void startActivity() {
logger.info("starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
this.annotatedCommand = annotatedCommand;
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
try {
activity.setRunState(RunState.Starting);
this.startedAt = System.currentTimeMillis();
activity.initActivity();
//activity.onActivityDefUpdate(activityDef);
} catch (Exception e) {
@ -117,6 +141,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.setRunState(RunState.Stopped);
logger.info("stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
activitylogger.debug("STOP/after alias=(" + activity.getAlias() + ")");
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
}
@ -179,11 +214,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
}
}
public boolean requestStopExecutor(int secondsToWait) {
public boolean finishAndShutdownExecutor(int secondsToWait) {
activitylogger.debug("REQUEST STOP/before alias=(" + activity.getAlias() + ")");
logger.info("Stopping executor for " + activity.getAlias() + " when work completes.");
logger.debug("Stopping executor for " + activity.getAlias() + " when work completes.");
executorService.shutdown();
boolean wasStopped = false;
@ -202,6 +236,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.closeAutoCloseables();
activity.setRunState(RunState.Stopped);
}
if (stoppingException != null) {
logger.trace("an exception caused the activity to stop:" + stoppingException.getMessage());
throw stoppingException;
@ -231,10 +266,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustToActivityDef(activity.getActivityDef());
}
motors.stream()
.filter(m -> (m instanceof ActivityDefObserver))
.filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
}
}
@ -242,8 +277,27 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
return activityDef;
}
/**
* This is the canonical way to wait for an activity to finish. It ties together
* any way that an activity can finish under one blocking call.
* This should be awaited asynchronously from the control layer in separate threads.
*
* TODO: move activity finisher threaad to this class and remove separate implementation
*/
public boolean awaitCompletion(int waitTime) {
return requestStopExecutor(waitTime);
boolean finished = finishAndShutdownExecutor(waitTime);
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
return finished;
}
public boolean awaitFinish(int timeout) {
@ -267,8 +321,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() {
return motors.stream()
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
}
/**
@ -311,18 +365,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running:
case Starting:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
break;
case Stopped:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
break;
case Finished:
case Stopping:
@ -403,7 +457,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState);
if (!awaited) {
logger.trace("failed awaiting motor " + motor.getSlotId() + " for state in " +
Arrays.asList(awaitingState));
Arrays.asList(awaitingState));
break;
}
}
@ -447,8 +501,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
boolean awaitedRequiredState = awaitMotorState(m, waitTime, pollTime, awaitingState);
if (!awaitedRequiredState) {
String error = "Unable to await " + activityDef.getAlias() +
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
RuntimeException e = new RuntimeException(error);
logger.error(error);
throw e;

View File

@ -20,6 +20,9 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.ProgressAndStateMeter;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.nb.api.Layer;
import io.nosqlbench.nb.api.annotations.Annotation;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -38,6 +41,11 @@ public class ScenarioController {
private static final Logger scenariologger = LogManager.getLogger("SCENARIO");
private final Map<String, ActivityExecutor> activityExecutors = new ConcurrentHashMap<>();
private final String sessionId;
public ScenarioController(String sessionId) {
this.sessionId = sessionId;
}
/**
* Start an activity, given the activity definition for it. The activity will be known in the scenario
@ -46,9 +54,20 @@ public class ScenarioController {
* @param activityDef string in alias=value1;driver=value2;... format
*/
public synchronized void start(ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "start")
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
scenariologger.debug("START " + activityDef.getAlias());
activityExecutor.startActivity();
}
/**
@ -84,6 +103,15 @@ public class ScenarioController {
* @param activityDef A definition for an activity to run
*/
public synchronized void run(int timeout, ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "run")
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
scenariologger.debug("RUN alias=" + activityDef.getAlias());
scenariologger.debug(" (RUN/START) alias=" + activityDef.getAlias());
@ -135,6 +163,15 @@ public class ScenarioController {
* @param activityDef An activity def, including at least the alias parameter.
*/
public synchronized void stop(ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "stop")
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
if (activityExecutor == null) {
throw new RuntimeException("could not stop missing activity:" + activityDef);
@ -179,7 +216,7 @@ public class ScenarioController {
}
ActivityExecutor activityExecutor = getActivityExecutor(alias);
ParameterMap params = activityExecutor.getActivityDef().getParams();
scenariologger.debug("SET ("+alias+"/"+param + ")=(" + value + ")");
scenariologger.debug("SET (" + alias + "/" + param + ")=(" + value + ")");
params.set(param, value);
}
@ -242,25 +279,25 @@ public class ScenarioController {
if (executor == null && createIfMissing) {
String activityTypeName = activityDef.getParams().getOptionalString("driver","type").orElse(null);
String activityTypeName = activityDef.getParams().getOptionalString("driver", "type").orElse(null);
List<String> knownTypes = ActivityType.FINDER.getAll().stream().map(ActivityType::getName).collect(Collectors.toList());
// Infer the type from either alias or yaml if possible (exactly one matches)
if (activityTypeName==null) {
if (activityTypeName == null) {
List<String> matching = knownTypes.stream().filter(
n ->
activityDef.getParams().getOptionalString("alias").orElse("").contains(n)
|| activityDef.getParams().getOptionalString("yaml", "workload").orElse("").contains(n)
).collect(Collectors.toList());
if (matching.size()==1) {
activityTypeName=matching.get(0);
if (matching.size() == 1) {
activityTypeName = matching.get(0);
logger.info("param 'type' was inferred as '" + activityTypeName + "' since it was seen in yaml or alias parameter.");
}
}
if (activityTypeName==null) {
if (activityTypeName == null) {
String errmsg = "You must provide a driver=<driver> parameter. Valid examples are:\n" +
knownTypes.stream().map(t -> " driver="+t+"\n").collect(Collectors.joining());
knownTypes.stream().map(t -> " driver=" + t + "\n").collect(Collectors.joining());
throw new BasicError(errmsg);
}
@ -345,15 +382,33 @@ public class ScenarioController {
* @return true, if all activities completed before the timer expired, false otherwise
*/
public boolean awaitCompletion(int waitTimeMillis) {
boolean completed = false;
for (ActivityExecutor executor : activityExecutors.values()) {
if (!executor.awaitCompletion(waitTimeMillis)) {
logger.debug("awaiting completion signaled FALSE");
return false;
boolean completed = true;
long waitstart = System.currentTimeMillis();
long remaining = waitTimeMillis;
List<ActivityFinisher> finishers = new ArrayList<>();
for (ActivityExecutor ae : activityExecutors.values()) {
ActivityFinisher finisher = new ActivityFinisher(ae, (int) remaining);
finishers.add(finisher);
finisher.start();
}
for (ActivityFinisher finisher : finishers) {
try {
finisher.join(waitTimeMillis);
} catch (InterruptedException ignored) {
}
}
logger.debug("All activities awaiting completion signaled TRUE");
return true;
for (ActivityFinisher finisher : finishers) {
if (!finisher.getResult()) {
logger.debug("finisher for " + finisher.getName() + " did not signal TRUE");
completed = false;
}
}
return completed;
}
private ActivityDef aliasToDef(String alias) {
@ -364,9 +419,10 @@ public class ScenarioController {
}
}
public boolean await(Map<String,String> activityDefMap) {
public boolean await(Map<String, String> activityDefMap) {
return this.awaitActivity(activityDefMap);
}
public boolean awaitActivity(Map<String, String> activityDefMap) {
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
return awaitActivity(ad);
@ -375,6 +431,7 @@ public class ScenarioController {
public boolean await(String alias) {
return this.awaitActivity(alias);
}
public boolean awaitActivity(String alias) {
ActivityDef toAwait = aliasToDef(alias);
return awaitActivity(toAwait);
@ -383,6 +440,7 @@ public class ScenarioController {
public boolean await(ActivityDef activityDef) {
return this.awaitActivity(activityDef);
}
public boolean awaitActivity(ActivityDef activityDef) {
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
if (activityExecutor == null) {

View File

@ -56,14 +56,19 @@ import java.util.stream.Collectors;
public class Scenario implements Callable<ScenarioResult> {
private final String commandLine;
private Logger logger = LogManager.getLogger("SCENARIO");
private State state = State.Scheduled;
private volatile ScenarioShutdownHook scenarioShutdownHook;
private Exception error;
public enum State {
Scheduled,
Running,
Errored,
Interrupted,
Finished
}
@ -96,7 +101,8 @@ public class Scenario implements Callable<ScenarioResult> {
String progressInterval,
boolean wantsGraaljsCompatMode,
boolean wantsStackTraces,
boolean wantsCompiledScript) {
boolean wantsCompiledScript,
String commandLine) {
this.scenarioName = scenarioName;
this.scriptfile = scriptfile;
this.engine = engine;
@ -104,6 +110,7 @@ public class Scenario implements Callable<ScenarioResult> {
this.wantsGraaljsCompatMode = wantsGraaljsCompatMode;
this.wantsStackTraces = wantsStackTraces;
this.wantsCompiledScript = wantsCompiledScript;
this.commandLine = commandLine;
}
public Scenario setLogger(Logger logger) {
@ -118,6 +125,7 @@ public class Scenario implements Callable<ScenarioResult> {
public Scenario(String name, Engine engine) {
this.scenarioName = name;
this.engine = engine;
this.commandLine = "";
}
public Scenario addScriptText(String scriptText) {
@ -180,7 +188,7 @@ public class Scenario implements Callable<ScenarioResult> {
break;
}
scenarioController = new ScenarioController();
scenarioController = new ScenarioController(this.scenarioName);
if (!progressInterval.equals("disabled")) {
activityProgressIndicator = new ActivityProgressIndicator(scenarioController, progressInterval);
}
@ -229,6 +237,9 @@ public class Scenario implements Callable<ScenarioResult> {
}
public void run() {
scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(scenarioShutdownHook);
state = State.Running;
startedAtMillis = System.currentTimeMillis();
@ -237,7 +248,6 @@ public class Scenario implements Callable<ScenarioResult> {
.session(this.scenarioName)
.now()
.layer(Layer.Scenario)
.label("scenario", getScenarioName())
.detail("engine", this.engine.toString())
.build()
);
@ -285,14 +295,12 @@ public class Scenario implements Callable<ScenarioResult> {
this.state = State.Errored;
logger.warn("Error in scenario, shutting down.");
this.scenarioController.forceStopScenario(5000, false);
this.error = e;
throw new RuntimeException(e);
} finally {
if (this.state==State.Running) {
this.state = State.Finished;
}
System.out.flush();
System.err.flush();
endedAtMillis=System.currentTimeMillis();
endedAtMillis = System.currentTimeMillis();
}
}
int awaitCompletionTime = 86400 * 365 * 1000;
@ -300,8 +308,38 @@ public class Scenario implements Callable<ScenarioResult> {
scenarioController.awaitCompletion(awaitCompletionTime);
//TODO: Ensure control flow covers controller shutdown in event of internal error.
logger.debug("scenario completed without errors");
endedAtMillis=System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook);
scenarioShutdownHook = null;
finish();
}
public void finish() {
logger.debug("finishing scenario");
endedAtMillis = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
if (this.state == State.Running) {
this.state = State.Finished;
}
if (scenarioShutdownHook != null) {
// If this method was called while the shutdown hook is defined, then it means
// that the scenario was ended before the hook was uninstalled normally.
this.state = State.Interrupted;
logger.warn("Scenario was interrupted by process exit, shutting down");
}
logger.info("scenario state: " + this.state);
// We report the scenario state via annotation even for short runs
Annotation annotation = Annotation.newBuilder()
.session(this.scenarioName)
.interval(this.startedAtMillis, endedAtMillis)
.layer(Layer.Scenario)
.label("state", this.state.toString())
.detail("command_line", this.commandLine)
.build();
Annotators.recordAnnotation(annotation);
}
public long getStartedAtMillis() {
@ -315,7 +353,7 @@ public class Scenario implements Callable<ScenarioResult> {
public ScenarioResult call() {
run();
String iolog = scriptEnv.getTimedLog();
return new ScenarioResult(iolog);
return new ScenarioResult(iolog, this.startedAtMillis, this.endedAtMillis);
}
@Override

View File

@ -149,7 +149,8 @@ public class ScenariosExecutor {
try {
oResult = Optional.of(resultFuture.get());
} catch (Exception e) {
oResult = Optional.of(new ScenarioResult(e));
long now = System.currentTimeMillis();
oResult = Optional.of(new ScenarioResult(e, now, now));
}
}
@ -182,27 +183,28 @@ public class ScenariosExecutor {
if (resultFuture1 == null) {
throw new BasicError("Unknown scenario name:" + scenarioName);
}
long now = System.currentTimeMillis();
if (resultFuture1.isDone()) {
try {
return Optional.ofNullable(resultFuture1.get());
} catch (Exception e) {
return Optional.of(new ScenarioResult(e));
return Optional.of(new ScenarioResult(e, now, now));
}
} else if (resultFuture1.isCancelled()) {
return Optional.of(new ScenarioResult(new Exception("result was cancelled.")));
return Optional.of(new ScenarioResult(new Exception("result was cancelled."), now, now));
}
return Optional.empty();
}
public synchronized void stopScenario(String scenarioName) {
this.stopScenario(scenarioName,false);
this.stopScenario(scenarioName, false);
}
public synchronized void stopScenario(String scenarioName, boolean rethrow) {
Optional<Scenario> pendingScenario = getPendingScenario(scenarioName);
if (pendingScenario.isPresent()) {
ScenarioController controller = pendingScenario.get().getScenarioController();
if (controller!=null) {
if (controller != null) {
controller.forceStopScenario(0, rethrow);
}
} else {

View File

@ -108,8 +108,8 @@ public class ScenarioExecutorEndpoint implements WebServiceObject {
"disabled",
false,
true,
false
);
false,
cmdList.toString());
scenario.addScriptText(buffer.getParsedScript());

View File

@ -0,0 +1,12 @@
package io.nosqlbench.nb.api.annotations;
public enum Span {
/**
* A span of time of size zero.
*/
instant,
/**
* A span in time for which the start and end are different.
*/
interval
}

View File

@ -26,6 +26,7 @@ public class AnnotationBuilderTest {
String represented = an1.toString();
assertThat(represented).isEqualTo("session: test-session\n" +
"[2020-09-13T12:26:40Z[GMT]]\n" +
"span:instant\n" +
"details:\n" +
" detailk1: detailv1\n" +
" detailk2: \n" +
@ -38,7 +39,10 @@ public class AnnotationBuilderTest {
"labels:\n" +
" layer: Scenario\n" +
" labelka: labelvb\n" +
" labelkc: labelvd\n");
" labelkc: labelvd\n" +
" session: test-session\n" +
" span: instant\n" +
" appname: nosqlbench\n");
}