partial fixes for nosqlbench-797 Race condition between exceptional activity shutdown and normal scenario shutdown.

This commit is contained in:
Jonathan Shook 2022-11-30 11:17:10 -06:00
parent 79a2d09779
commit 86c4dfd966
25 changed files with 1184 additions and 353 deletions

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 42 KiB

View File

@ -0,0 +1,105 @@
@startuml
'https://plantuml.com/sequence-diagram
title: Lifecycle of an activity
control caller as caller
control ActivityExecutor as ae
control "Activity\nException\nHandler" as aeh
control "Activity\nThread\nFactory" as atf
control ExecutorService as aes
control Annotator as ann
control Activity as activity
== startup sequence ==
caller -> ae**: create
ae -> aeh**: create
ae -> atf**: create(\w Exception Handler)
aeh -> atf: <injected\nvia ctor>
ae -> aes**: create(\w Thread Factory)
atf -> aes: <injected\nvia ctor>
caller -> ae: startActivity()
activate ae
ae -> ann: Annotate Activity Start
ae -> activity: initActivity()
activate activity
ae <- activity
deactivate activity
note over ae,aes: align threadcount as explained below
caller <- ae
deactivate ae
== dynamic threadcount update ==
note over ae, aes: threads can be changed dynamically
caller -> ae: apply params
activate ae
ae->ae: align motor count
ae->aes: stop extra motors
ae->aes: <start missing motors>
group for each new thread/motor
ae -> aes: execute(<motor>)
activate aes
aes -> atf: get()
atf -> thread**: create
activate atf
aes <- atf: <thread>
deactivate atf
aes --> thread: run()
note over ann, thread: At this point, the\nmotor thread starts running\nthe defined activity's action\nover cycles
ae->ae: await thread state update
ae<-aes:
deactivate aes
end group
caller <- ae
deactivate ae
== shutdown sequence [after startup] ==
caller -> ae: stopActivity()
activate ae
ae -> ae: request stop motors
ae -> ae: await all stop
ae -> activity: shutdownActivity()
activate activity
ae <- activity
deactivate activity
ae -> ann: Annotate Activity Finish
caller <- ae
deactivate ae
== on exception in motor thread ==
thread -> aeh: catch(<thrown exception>)
aeh -> ae: notifyException\n(<thread>,<throwable>)
activate ae
ae -> ae: save exception
ae -> ae: forceStopActivity()
ae -> aes: shutdown();
activate aes
ae <- aes:
deactivate aes
group if needed [after timeout]]
ae -> aes: shutdownNow();
activate aes
ae <- aes
deactivate aes
end group
ae -> activity: shutdownActivity();
ae -> activity: closeAutoCloseables();
note over thread: action\nthread\nterminates
destroy thread
deactivate ae
@enduml

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 27 KiB

View File

@ -0,0 +1,67 @@
@startuml
'https://plantuml.com/sequence-diagram
title Lifecycle of a single scenario.call()
control "caller" as c
control "Scenario" as s
control "Scenario\nController" as sc
control "Scripting\nEngine" as engine
control "Activity\nExecutor" as ae
control "Java\nRuntime" as jrt
control "Shutdown\nHook" as sh
control "Annotations" as ann
c -> s**: create
c -> s: call()
activate s
s -> sh**: create
s -> jrt: register(ShutdownHook)
s -> ann: Annotate Scenario Start
s -> sc**: create
s -> engine**: create
s -> engine: run(script)
activate engine
group async calls [javacript+Java]
engine <--> sc: scenario.(*)
engine <--> sc: activities.(*)
engine <--> sc: metrics.(*)
engine <--> sc: params.(*)
engine -> sc: start(<activity>)
activate sc
sc -> ae**: create
sc -> ae: startActivity()
deactivate sc
end group
s <- engine: result
deactivate engine
s -> sc: awaitCompletion()
activate sc
group for each activity
sc -> ae: awaitCompletion()
activate ae
sc <- ae
deactivate ae
end group
s <- sc
deactivate sc
s -> jrt: unregister(ShutdownHook)
s -> sh: run()
sh -> ann: Annotate Scenario Finish
c <- s: Scenario\nResult
deactivate s
== on exception during call() ==
jrt -> sh: run()
sh -> ann: Annotate Scenario Finish
@enduml

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 28 KiB

View File

@ -0,0 +1,62 @@
@startuml
'https://plantuml.com/sequence-diagram
title Lifecycle of Scenarios
control "NBCLI" as nbcli
control "Scenario" as s
control "Scenario\nController" as sc
control "Scenarios\nExecutor" as se
control "Exception\nHandler" as seh
control "Thread\nFactory" as stf
control "Executor\nService" as ses
nbcli -> se** : create
se -> seh** : create
se -> stf** : create ThreadFactory\n(w/ ExceptionHandler)
se -> ses** : create ExecutorService\n(w/ ThreadFactory)
nbcli -> s** : create
s -> sc** : create
nbcli --> se : execute(Scenario)
se --> ses: submit(<Callable> Scenario)
activate ses
ses -> future**: create
se <-- ses: <Future<ScenarioResult>>
deactivate ses
== [async] on thread from thread factory ==
ses -> stf: get()
stf -> thread**: create
ses <- stf: <thread>
ses -> thread: run task
activate thread
thread -> s: call()
activate s
thread <- s: ScenarioResult
deactivate s
thread -> future: result
deactivate thread
== [async] on NBCLI thread ==
nbcli -> se: awaitAllResults();
activate se
se -> ses: shutdown
loop timeout
se -> ses: awaitTermination(timeout)
activate ses
se <- ses
deactivate ses
end loop
loop each future
se -> future: get()
activate future
se <- future: ScenarioResult
deactivate future
end loop
nbcli <- se: <ScenariosResults>
deactivate se
@enduml

View File

@ -18,18 +18,36 @@ package io.nosqlbench.engine.api.activityapi.core;
public enum RunState {
// Initial state after creation of this control
/**
* Initial state after creation of this control
*/
Uninitialized("i⌀"),
// This thread has been queued to run, but hasn't signaled yet that it is full started
// This must be set by the executor before executing the slot runnable
/**
* This thread has been queued to run, but hasn't signaled yet that it is full started
* This must be set by the executor before executing the slot runnable
*/
Starting("s⏫"),
// This thread is running. This should only be set by the controlled thread
/**
* This thread is running. This should only be set by the controlled thread
*/
Running("R\u23F5"),
// This thread has completed all of its activity, and will do no further work without new input
/**
* This thread has completed all of its activity, and will do no further work without new input
*/
Finished("F⏯"),
// The thread has been requested to stop. This says nothing of the internal state.
/**
* The thread has been requested to stop. This says nothing of the internal state.
*/
Stopping("s⏬"),
// The thread has stopped. This should only be set by the controlled thread
/**
* The thread has stopped. This should only be set by the controlled thread
*/
Stopped("_\u23F9");
private final String runcode;
@ -42,53 +60,25 @@ public enum RunState {
return this.runcode;
}
public boolean canTransitionTo(RunState to) {
switch (this) {
default:
return false;
case Uninitialized: // A motor was just created. This is its initial state.
case Stopped:
switch (to) {
case Starting: // a motor has been reserved for an execution command
return true;
default:
return false;
}
case Starting:
switch (to) {
case Running: // a motor has indicated that is in the run() method
case Finished: // a motor has exhausted its input, and has declined to go into started mode
return true;
default:
return false;
}
case Running:
switch (to) {
case Stopping: // A request was made to stop the motor before it finished
case Finished: // A motor has exhausted its input, and is finished with its work
return true;
default:
return false;
}
case Stopping:
switch (to) {
case Stopped: // A motor was stopped by request before exhausting input
return true;
default:
return false;
}// A motor was restarted after being stopped
case Finished:
switch (to) {
case Running: // A motor was restarted?
return true;
// not useful as of yet.
// Perhaps this will be allowed via explicit reset of input stream.
// If the input isn't reset, then trying to start a finished motor
// will cause it to short-circuit back to Finished state.
default:
return false;
}
}
/**
* @param target The target state
* @return true if the current state is allowed to transition to the target state
*/
public boolean canTransitionTo(RunState target) {
return switch (this) {
default -> false; // A motor was just created. This is its initial state.
case Uninitialized, Stopped -> (target == Starting);
case Starting -> switch (target) { // a motor has indicated that is in the run() method
case Running, Finished -> true;// a motor has exhausted its input, and has declined to go into started mode
default -> false;
};
case Running -> switch (target) { // A request was made to stop the motor before it finished
case Stopping, Finished -> true;// A motor has exhausted its input, and is finished with its work
default -> false;
};
case Stopping -> (target == Stopped); // A motor was stopped by request before exhausting input
case Finished -> (target == Running); // A motor was restarted?
};
}

View File

@ -418,7 +418,7 @@ public class NBCLI implements Function<String[], Integer> {
// intentionally not shown for warn-only
logger.info("console logging level is " + options.getConsoleLogLevel());
ScenariosExecutor executor = new ScenariosExecutor("executor-" + sessionName, 1);
ScenariosExecutor scenariosExecutor = new ScenariosExecutor("executor-" + sessionName, 1);
if (options.getConsoleLogLevel().isGreaterOrEqualTo(NBLogLevel.WARN)) {
options.setWantsStackTraces(true);
logger.debug("enabling stack traces since log level is " + options.getConsoleLogLevel());
@ -466,7 +466,7 @@ public class NBCLI implements Function<String[], Integer> {
scriptParams.putAll(buffer.getCombinedParams());
scenario.addScenarioScriptParams(scriptParams);
executor.execute(scenario);
scenariosExecutor.execute(scenario);
// while (true) {
// Optional<ScenarioResult> pendingResult = executor.getPendingResult(scenario.getScenarioName());
@ -476,7 +476,7 @@ public class NBCLI implements Function<String[], Integer> {
// LockSupport.parkNanos(100000000L);
// }
ScenariosResults scenariosResults = executor.awaitAllResults();
ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults();
logger.debug("Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");
ActivityMetrics.closeMetrics(options.wantsEnableChart());

View File

@ -23,9 +23,9 @@ public class ActivityExceptionHandler implements Thread.UncaughtExceptionHandler
private static final Logger logger = LogManager.getLogger(ActivityExceptionHandler.class);
private final ActivityExecutor executor;
private final ActivityThreadsManager executor;
public ActivityExceptionHandler(ActivityExecutor executor) {
public ActivityExceptionHandler(ActivityThreadsManager executor) {
this.executor = executor;
logger.debug(() -> "Activity exception handler starting up for executor '" + executor + "'");
}

View File

@ -22,11 +22,11 @@ import org.apache.logging.log4j.Logger;
public class ActivityFinisher extends Thread {
private final static Logger logger = LogManager.getLogger(ActivityFinisher.class);
private final ActivityExecutor executor;
private final ActivityThreadsManager executor;
private final int timeout;
private boolean result;
public ActivityFinisher(ActivityExecutor executor, int timeout) {
public ActivityFinisher(ActivityThreadsManager executor, int timeout) {
super(executor.getActivityDef().getAlias() + "_finisher");
this.executor = executor;
this.timeout = timeout;

View File

@ -0,0 +1,20 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle;
public class ActivityStatus {
}

View File

@ -26,11 +26,10 @@ import io.nosqlbench.engine.core.annotation.Annotators;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
@ -49,9 +48,9 @@ import java.util.stream.Collectors;
* </ul>
*/
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable {
public class ActivityThreadsManager implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecResult> {
private static final Logger logger = LogManager.getLogger(ActivityExecutor.class);
private static final Logger logger = LogManager.getLogger(ActivityThreadsManager.class);
private static final Logger activitylogger = LogManager.getLogger("ACTIVITY");
private final List<Motor<?>> motors = new ArrayList<>();
@ -68,25 +67,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
// private RunState intendedState = RunState.Uninitialized;
public ActivityExecutor(Activity activity, String sessionId) {
public ActivityThreadsManager(Activity activity, String sessionId) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
);
activity.getActivityDef().getParams().addListener(this);
activity.setActivityController(this);
this.sessionId = sessionId;
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
// TODO: Doc how uninitialized activities do not propagate parameter map changes and how
// TODO: this is different from preventing modification to uninitialized activities
@ -101,14 +95,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
public synchronized void startActivity() {
logger.info("starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
@ -130,7 +124,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/**
* Simply stop the motors
*/
public synchronized void stopActivity() {
private synchronized void stopActivity() {
activitylogger.debug("STOP/before alias=(" + activity.getAlias() + ")");
activity.setRunState(RunState.Stopping);
@ -142,19 +136,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.setRunState(RunState.Stopped);
logger.info("stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
activitylogger.debug("STOP/after alias=(" + activity.getAlias() + ")");
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
}
public synchronized RuntimeException forceStopScenario(int initialMillisToWait) {
public RuntimeException forceStopActivity(int initialMillisToWait) {
activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")");
activity.setRunState(RunState.Stopped);
@ -206,14 +201,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
* @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing
* everything to stop
*/
public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
RuntimeException exception = forceStopScenario(initialMillisToWait);
private synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
RuntimeException exception = forceStopActivity(initialMillisToWait);
if (exception != null && rethrow) {
throw exception;
}
}
public boolean finishAndShutdownExecutor(int secondsToWait) {
private boolean finishAndShutdownExecutor(int secondsToWait) {
activitylogger.debug("REQUEST STOP/before alias=(" + activity.getAlias() + ")");
logger.debug("Stopping executor for " + activity.getAlias() + " when work completes.");
@ -245,6 +240,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
if (stoppingException != null) {
logger.trace(() -> "an exception caused the activity to stop:" + stoppingException.getMessage());
logger.trace("Setting ERROR on activity executor: " + stoppingException.getMessage());
throw stoppingException;
}
@ -270,10 +266,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustToActivityDef(activity.getActivityDef());
}
motors.stream()
.filter(m -> (m instanceof ActivityDefObserver))
.filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
}
}
@ -288,25 +284,25 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
* <p>
* TODO: move activity finisher thread to this class and remove separate implementation
*/
public boolean awaitCompletion(int waitTime) {
logger.debug(()-> "awaiting completion of '" + this.getActivity().getAlias() + "'");
private boolean awaitCompletion(int waitTime) {
logger.debug(() -> "awaiting completion of '" + this.getActivity().getAlias() + "'");
boolean finished = finishAndShutdownExecutor(waitTime);
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
.session(sessionId)
.interval(startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
return finished;
}
public boolean awaitFinish(int timeout) {
public boolean awaitFinishedOrStopped(int timeout) {
activitylogger.debug("AWAIT-FINISH/before alias=(" + activity.getAlias() + ")");
boolean awaited = awaitAllRequiredMotorState(timeout, 50, RunState.Finished, RunState.Stopped);
@ -327,8 +323,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() {
return motors.stream()
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
}
/**
@ -371,18 +367,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running:
case Starting:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
break;
case Stopped:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
break;
case Finished:
case Stopping:
@ -421,31 +417,23 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
* Await a thread (aka motor/slot) entering a specific SlotState
*
* @param m motor instance
* @param waitTime milliseco`nds to wait, total
* @param waitTime milliseconds to wait, total
* @param pollTime polling interval between state checks
* @param desiredRunStates any desired SlotState
* @return true, if the desired SlotState was detected
*/
private boolean awaitMotorState(Motor m, int waitTime, int pollTime, RunState... desiredRunStates) {
private boolean awaitMotorState(Motor<?> m, int waitTime, int pollTime, RunState... desiredRunStates) {
long startedAt = System.currentTimeMillis();
while (System.currentTimeMillis() < (startedAt + waitTime)) {
Map<RunState, Integer> actualStates = new HashMap<>();
for (RunState state : desiredRunStates) {
actualStates.compute(state, (k, v) -> (v == null ? 0 : v) + 1);
}
for (RunState desiredRunState : desiredRunStates) {
actualStates.remove(desiredRunState);
}
logger.trace(() -> "state of remaining slots:" + actualStates);
if (actualStates.size() == 0) {
return true;
} else {
System.out.println("motor states:" + actualStates);
try {
Thread.sleep(pollTime);
} catch (InterruptedException ignored) {
if (desiredRunState == m.getSlotStateTracker().getSlotState()) {
return true;
}
}
try {
Thread.sleep(pollTime);
} catch (InterruptedException ignored) {
}
}
logger.trace(() -> activityDef.getAlias() + "/Motor[" + m.getSlotId() + "] is now in state " + m.getSlotStateTracker().getSlotState());
return false;
@ -461,7 +449,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState);
if (!awaited) {
logger.trace(() -> "failed awaiting motor " + motor.getSlotId() + " for state in " +
Arrays.asList(awaitingState));
Arrays.asList(awaitingState));
break;
}
}
@ -469,28 +457,6 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
return awaited;
}
private boolean awaitAnyRequiredMotorState(int waitTime, int pollTime, RunState... awaitingState) {
long startedAt = System.currentTimeMillis();
while (System.currentTimeMillis() < (startedAt + waitTime)) {
for (Motor motor : motors) {
for (RunState state : awaitingState) {
if (motor.getSlotStateTracker().getSlotState() == state) {
logger.trace(() -> "at least one 'any' of " + activityDef.getAlias() + "/Motor[" + motor.getSlotId() + "] is now in state " + motor.getSlotStateTracker().getSlotState());
return true;
}
}
}
try {
Thread.sleep(pollTime);
} catch (InterruptedException ignored) {
}
}
logger.trace(() -> "none of " + activityDef.getAlias() + "/Motor [" + motors.size() + "] is in states in " + Arrays.asList(awaitingState));
return false;
}
/**
* Await a required thread (aka motor/slot) entering a specific SlotState
*
@ -505,8 +471,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
boolean awaitedRequiredState = awaitMotorState(m, waitTime, pollTime, awaitingState);
if (!awaitedRequiredState) {
String error = "Unable to await " + activityDef.getAlias() +
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
RuntimeException e = new RuntimeException(error);
logger.error(error);
throw e;
@ -532,7 +498,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
public synchronized void notifyException(Thread t, Throwable e) {
logger.debug(() -> "Uncaught exception in activity thread forwarded to activity executor: " + e.getMessage());
this.stoppingException = new RuntimeException("Error in activity thread " + t.getName(), e);
forceStopScenario(10000);
forceStopActivity(10000);
}
@Override
@ -564,4 +530,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
}
@Override
public synchronized ExecResult call() throws Exception {
boolean stopped = awaitCompletion(Integer.MAX_VALUE);
ExecResult result = new ExecResult(startedAt, stoppedAt, "", this.stoppingException);
return result;
}
}

View File

@ -20,20 +20,16 @@ import com.codahale.metrics.*;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.logging.Log4JMetricsReporter;
import io.nosqlbench.engine.core.metrics.NBMetricsSummary;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class ScenarioResult {
public class ExecMetricsResult extends ExecResult {
private final static Logger logger = LogManager.getLogger(ScenarioResult.class);
public static final Set<MetricAttribute> INTERVAL_ONLY_METRICS = Set.of(
MetricAttribute.MIN,
MetricAttribute.MAX,
@ -51,32 +47,12 @@ public class ScenarioResult {
MetricAttribute.M5_RATE,
MetricAttribute.M15_RATE
);
private final long startedAt;
private final long endedAt;
private final Exception exception;
private final String iolog;
public ScenarioResult(Exception e, String iolog, long startedAt, long endedAt) {
logger.debug("populating "+(e==null? "NORMAL" : "ERROR")+" scenario result");
if (logger.isDebugEnabled()) {
StackTraceElement[] st = Thread.currentThread().getStackTrace();
for (int i = 0; i < st.length; i++) {
logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName());
if (i>10) break;
}
}
this.iolog = ((iolog!=null) ? iolog + "\n\n" : "") + (e!=null? e.getMessage() : "");
this.startedAt = startedAt;
this.endedAt = endedAt;
this.exception = e;
public ExecMetricsResult(long startedAt, long endedAt, String iolog, Exception e) {
super(startedAt, endedAt, iolog, e);
}
public void reportElapsedMillis() {
logger.info("-- SCENARIO TOOK " + getElapsedMillis() + "ms --");
}
public String getSummaryReport() {
public String getMetricsSummary() {
ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(os);
ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(ActivityMetrics.getMetricRegistry())
@ -91,45 +67,23 @@ public class ScenarioResult {
builder.disabledMetricAttributes(disabled);
ConsoleReporter consoleReporter = builder.build();
consoleReporter.report();
ps.flush();
consoleReporter.close();
String result = os.toString(StandardCharsets.UTF_8);
return result;
}
public void reportToConsole() {
String summaryReport = getSummaryReport();
String summaryReport = getMetricsSummary();
System.out.println(summaryReport);
}
public Optional<Exception> getException() {
return Optional.ofNullable(exception);
public void reportMetricsSummaryTo(PrintStream out) {
out.println(getMetricsSummary());
}
public void rethrowIfError() {
if (exception != null) {
if (exception instanceof RuntimeException) {
throw ((RuntimeException) exception);
} else {
throw new RuntimeException(exception);
}
}
}
public String getIOLog() {
return this.iolog;
}
public long getElapsedMillis() {
return endedAt - startedAt;
}
public void reportTo(PrintStream out) {
out.println(getSummaryReport());
}
public void reportToLog() {
public void reportMetricsSummaryToLog() {
logger.debug("-- WARNING: Metrics which are taken per-interval (like histograms) will not have --");
logger.debug("-- active data on this last report. (The workload has already stopped.) Record --");
logger.debug("-- metrics to an external format to see values for each reporting interval. --");
@ -142,10 +96,11 @@ public class ScenarioResult {
.outputTo(logger)
.build();
reporter.report();
reporter.close();
logger.debug("-- END METRICS DETAIL --");
}
public void reportCountsTo(PrintStream printStream) {
public void reportMetricsCountsTo(PrintStream printStream) {
StringBuilder sb = new StringBuilder();
ActivityMetrics.getMetricRegistry().getMetrics().forEach((k, v) -> {

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Provide a result type back to a caller, including the start and end times,
* any exception that occurred, and any content written to stdout or stderr equivalent
* IO streams. This is an <EM>execution result</EM>.
*
*/
public class ExecResult {
protected final static Logger logger = LogManager.getLogger(ExecMetricsResult.class);
protected final long startedAt;
protected final long endedAt;
protected final Exception exception;
protected final String iolog;
public ExecResult(long startedAt, long endedAt, String iolog, Exception e) {
this.startedAt = startedAt;
this.endedAt = endedAt;
this.exception = e;
this.iolog = ((iolog != null) ? iolog + "\n\n" : "") + (e != null ? e.getMessage() : "");
logger.debug("populating "+(e==null? "NORMAL" : "ERROR")+" scenario result");
if (logger.isDebugEnabled()) {
StackTraceElement[] st = Thread.currentThread().getStackTrace();
for (int i = 0; i < st.length; i++) {
logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName());
if (i>10) break;
}
}
}
public void reportElapsedMillisToLog() {
logger.info("-- SCENARIO TOOK " + getElapsedMillis() + "ms --");
}
public String getIOLog() {
return this.iolog;
}
public long getElapsedMillis() {
return endedAt - startedAt;
}
public Optional<Exception> getException() {
return Optional.ofNullable(exception);
}
}

View File

@ -39,14 +39,15 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* A ScenarioController provides a way to start Activities, modify them while running, and forceStopMotors, pause or restart them.
* A ScenarioController provides a way to start Activities,
* modify them while running, and forceStopMotors, pause or restart them.
*/
public class ScenarioController {
private static final Logger logger = LogManager.getLogger(ScenarioController.class);
private static final Logger scenariologger = LogManager.getLogger("SCENARIO");
private final Map<String, ActivityExecutor> activityExecutors = new ConcurrentHashMap<>();
private final Map<String, ActivityThreadsManager> activityExecutors = new ConcurrentHashMap<>();
private final String sessionId;
private final Maturity minMaturity;
@ -72,10 +73,9 @@ public class ScenarioController {
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, true);
scenariologger.debug("START " + activityDef.getAlias());
activityExecutor.startActivity();
activityThreadsManager.startActivity();
}
/**
@ -120,12 +120,12 @@ public class ScenarioController {
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true);
ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, true);
scenariologger.debug("RUN alias=" + activityDef.getAlias());
scenariologger.debug(" (RUN/START) alias=" + activityDef.getAlias());
activityExecutor.startActivity();
activityThreadsManager.startActivity();
scenariologger.debug(" (RUN/AWAIT before) alias=" + activityDef.getAlias());
boolean completed = activityExecutor.awaitCompletion(timeout);
boolean completed = activityThreadsManager.awaitCompletion(timeout);
scenariologger.debug(" (RUN/AWAIT after) completed=" + activityDef.getAlias());
}
@ -154,8 +154,8 @@ public class ScenarioController {
public boolean isRunningActivity(ActivityDef activityDef) {
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
return activityExecutor != null && activityExecutor.isRunning();
ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, false);
return activityThreadsManager != null && activityThreadsManager.isRunning();
}
public boolean isRunningActivity(Map<String, String> activityDefMap) {
@ -180,18 +180,18 @@ public class ScenarioController {
.detail("params", activityDef.toString())
.build());
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
if (activityExecutor == null) {
ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, false);
if (activityThreadsManager == null) {
throw new RuntimeException("could not stop missing activity:" + activityDef);
}
RunState runstate = activityExecutor.getActivity().getRunState();
RunState runstate = activityThreadsManager.getActivity().getRunState();
if (runstate != RunState.Running) {
logger.warn("NOT stopping activity '" + activityExecutor.getActivity().getAlias() + "' because it is in state '" + runstate + "'");
logger.warn("NOT stopping activity '" + activityThreadsManager.getActivity().getAlias() + "' because it is in state '" + runstate + "'");
return;
}
scenariologger.debug("STOP " + activityDef.getAlias());
activityExecutor.stopActivity();
activityThreadsManager.stopActivity();
}
/**
@ -240,8 +240,8 @@ public class ScenarioController {
if (param.equals("alias")) {
throw new InvalidParameterException("It is not allowed to change the name of an existing activity.");
}
ActivityExecutor activityExecutor = getActivityExecutor(alias);
ParameterMap params = activityExecutor.getActivityDef().getParams();
ActivityThreadsManager activityThreadsManager = getActivityExecutor(alias);
ParameterMap params = activityThreadsManager.getActivityDef().getParams();
scenariologger.debug("SET (" + alias + "/" + param + ")=(" + value + ")");
params.set(param, value);
}
@ -261,7 +261,7 @@ public class ScenarioController {
throw new BasicError("alias must be provided");
}
ActivityExecutor executor = activityExecutors.get(alias);
ActivityThreadsManager executor = activityExecutors.get(alias);
if (executor == null) {
logger.info("started scenario from apply:" + alias);
@ -290,8 +290,8 @@ public class ScenarioController {
* @return the associated ActivityExecutor
* @throws RuntimeException a runtime exception if the named activity is not found
*/
private ActivityExecutor getActivityExecutor(String activityAlias) {
Optional<ActivityExecutor> executor =
private ActivityThreadsManager getActivityExecutor(String activityAlias) {
Optional<ActivityThreadsManager> executor =
Optional.ofNullable(activityExecutors.get(activityAlias));
return executor.orElseThrow(
() -> new RuntimeException("ActivityExecutor for alias " + activityAlias + " not found.")
@ -315,9 +315,9 @@ public class ScenarioController {
return matching;
}
private ActivityExecutor getActivityExecutor(ActivityDef activityDef, boolean createIfMissing) {
private ActivityThreadsManager getActivityExecutor(ActivityDef activityDef, boolean createIfMissing) {
synchronized (activityExecutors) {
ActivityExecutor executor = activityExecutors.get(activityDef.getAlias());
ActivityThreadsManager executor = activityExecutors.get(activityDef.getAlias());
if (executor == null && createIfMissing) {
if (activityDef.getParams().containsKey("driver")) {
@ -333,7 +333,7 @@ public class ScenarioController {
)
);
executor = new ActivityExecutor(
executor = new ActivityThreadsManager(
activityType.getAssembledActivity(
activityDef,
getActivityMap()
@ -342,7 +342,7 @@ public class ScenarioController {
);
activityExecutors.put(activityDef.getAlias(), executor);
} else {
executor = new ActivityExecutor(
executor = new ActivityThreadsManager(
new StandardActivityType(activityDef).getAssembledActivity(
activityDef, getActivityMap()
), this.sessionId
@ -392,7 +392,7 @@ public class ScenarioController {
*/
public List<ActivityDef> getActivityDefs() {
return activityExecutors.values().stream()
.map(ActivityExecutor::getActivityDef)
.map(ActivityThreadsManager::getActivityDef)
.collect(Collectors.toList());
}
@ -425,7 +425,8 @@ public class ScenarioController {
/**
* Await completion of all running activities, but do not force shutdownActivity. This method is meant to provide
* the blocking point for calling logic. It waits.
* the blocking point for calling logic. It waits. If there is an error which should propagate into the scenario,
* then it should be thrown from this method.
*
* @param waitTimeMillis The time to wait, usually set very high
* @return true, if all activities completed before the timer expired, false otherwise
@ -436,7 +437,7 @@ public class ScenarioController {
long remaining = waitTimeMillis;
List<ActivityFinisher> finishers = new ArrayList<>();
for (ActivityExecutor ae : activityExecutors.values()) {
for (ActivityThreadsManager ae : activityExecutors.values()) {
ActivityFinisher finisher = new ActivityFinisher(ae, (int) remaining);
finishers.add(finisher);
finisher.start();
@ -492,12 +493,12 @@ public class ScenarioController {
}
public boolean awaitActivity(ActivityDef activityDef) {
ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false);
if (activityExecutor == null) {
ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, false);
if (activityThreadsManager == null) {
throw new RuntimeException("Could not await missing activity: " + activityDef);
}
scenariologger.debug("AWAIT/before alias=" + activityDef.getAlias());
boolean finished = activityExecutor.awaitFinish(Integer.MAX_VALUE);
boolean finished = activityThreadsManager.awaitFinishedOrStopped(Integer.MAX_VALUE);
scenariologger.debug("AWAIT/after completed=" + finished);
return finished;
@ -506,7 +507,7 @@ public class ScenarioController {
/**
* @return an unmodifyable String to executor map of all activities known to this scenario
*/
public Map<String, ActivityExecutor> getActivityExecutorMap() {
public Map<String, ActivityThreadsManager> getActivityExecutorMap() {
return Collections.unmodifiableMap(activityExecutors);
}
@ -516,7 +517,7 @@ public class ScenarioController {
private Map<String, Activity> getActivityMap() {
Map<String, Activity> activityMap = new HashMap<String, Activity>();
for (Map.Entry<String, ActivityExecutor> entry : activityExecutors.entrySet()) {
for (Map.Entry<String, ActivityThreadsManager> entry : activityExecutors.entrySet()) {
activityMap.put(entry.getKey(), entry.getValue().getActivity());
}
return activityMap;
@ -524,7 +525,7 @@ public class ScenarioController {
public List<ProgressMeterDisplay> getProgressMeters() {
List<ProgressMeterDisplay> indicators = new ArrayList<>();
for (ActivityExecutor ae : activityExecutors.values()) {
for (ActivityThreadsManager ae : activityExecutors.values()) {
indicators.add(ae.getProgressMeter());
}
indicators.sort(Comparator.comparing(ProgressMeterDisplay::getStartTime));

View File

@ -28,27 +28,26 @@ public class ScenariosResults {
private static final Logger logger = LogManager.getLogger(ScenariosResults.class);
private final String scenariosExecutorName;
private final Map<Scenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
private final Map<Scenario, ExecMetricsResult> scenarioResultMap = new LinkedHashMap<>();
public ScenariosResults(ScenariosExecutor scenariosExecutor) {
this.scenariosExecutorName = scenariosExecutor.getName();
}
public ScenariosResults(ScenariosExecutor scenariosExecutor, Map<Scenario, ScenarioResult> map) {
public ScenariosResults(ScenariosExecutor scenariosExecutor, Map<Scenario, ExecMetricsResult> map) {
this.scenariosExecutorName = scenariosExecutor.getName();
scenarioResultMap.putAll(map);
}
public String getExecutionSummary() {
StringBuilder sb = new StringBuilder("executions: ");
sb.append(scenarioResultMap.size()).append(" scenarios, ");
sb.append(scenarioResultMap.values().stream().filter(r -> r.getException().isEmpty()).count()).append(" normal, ");
sb.append(scenarioResultMap.values().stream().filter(r -> r.getException().isPresent()).count()).append(" errored");
return sb.toString();
String sb = "executions: " + scenarioResultMap.size() + " scenarios, " +
scenarioResultMap.values().stream().filter(r -> r.getException().isEmpty()).count() + " normal, " +
scenarioResultMap.values().stream().filter(r -> r.getException().isPresent()).count() + " errored";
return sb;
}
public ScenarioResult getOne() {
public ExecMetricsResult getOne() {
if (this.scenarioResultMap.size() != 1) {
throw new RuntimeException("getOne found " + this.scenarioResultMap.size() + " results instead of 1.");
}
@ -57,14 +56,14 @@ public class ScenariosResults {
}
public void reportToLog() {
for (Map.Entry<Scenario, ScenarioResult> entry : this.scenarioResultMap.entrySet()) {
for (Map.Entry<Scenario, ExecMetricsResult> entry : this.scenarioResultMap.entrySet()) {
Scenario scenario = entry.getKey();
ScenarioResult oresult = entry.getValue();
ExecMetricsResult oresult = entry.getValue();
logger.info("results for scenario: " + scenario);
if (oresult != null) {
oresult.reportElapsedMillis();
oresult.reportElapsedMillisToLog();
} else {
logger.error(scenario.getScenarioName() + ": incomplete (missing result)");
}

View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle;
import io.nosqlbench.engine.api.activityapi.core.Activity;
public class StartedActivityInfo {
private final Activity activity;
StartedActivityInfo(Activity activity) {
this.activity = activity;
}
}

View File

@ -27,9 +27,9 @@ import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ActivityProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult;
import io.nosqlbench.engine.core.lifecycle.PolyglotScenarioController;
import io.nosqlbench.engine.core.lifecycle.ScenarioController;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.metrics.PolyglotMetricRegistryBindings;
import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager;
@ -58,7 +58,7 @@ import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
public class Scenario implements Callable<ScenarioResult> {
public class Scenario implements Callable<ExecMetricsResult> {
private final String commandLine;
private final String reportSummaryTo;
@ -71,9 +71,9 @@ public class Scenario implements Callable<ScenarioResult> {
private Exception error;
private ScenarioMetadata scenarioMetadata;
private ScenarioResult result;
private ExecMetricsResult result;
public Optional<ScenarioResult> getResultIfComplete() {
public Optional<ExecMetricsResult> getResultIfComplete() {
return Optional.ofNullable(this.result);
}
@ -171,7 +171,7 @@ public class Scenario implements Callable<ScenarioResult> {
return this;
}
private void initializeScriptingEngine() {
private void initializeScriptingEngine(ScenarioController scenarioController) {
logger.debug("Using engine " + engine.toString());
MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry();
@ -198,12 +198,10 @@ public class Scenario implements Callable<ScenarioResult> {
this.scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings);
scenarioController = new ScenarioController(this.scenarioName, minMaturity);
if (!progressInterval.equals("disabled")) {
activityProgressIndicator = new ActivityProgressIndicator(scenarioController, progressInterval);
}
scriptEnv = new ScenarioContext(scenarioController);
scriptEngine.setContext(scriptEnv);
@ -264,9 +262,22 @@ public class Scenario implements Callable<ScenarioResult> {
.build()
);
initializeScriptingEngine();
logger.debug("Running control script for " + getScenarioName() + ".");
scenarioController = new ScenarioController(this.scenarioName, minMaturity);
initializeScriptingEngine(scenarioController);
executeScenarioScripts();
long awaitCompletionTime = 86400 * 365 * 1000L;
logger.debug("Awaiting completion of scenario and activities for " + awaitCompletionTime + " millis.");
scenarioController.awaitCompletion(awaitCompletionTime);
//TODO: Ensure control flow covers controller shutdown in event of internal error.
Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook);
scenarioShutdownHook.run();
}
private void executeScenarioScripts() {
for (String script : scripts) {
try {
Object result = null;
@ -304,6 +315,7 @@ public class Scenario implements Callable<ScenarioResult> {
System.err.flush();
System.out.flush();
} catch (Exception e) {
this.error=e;
this.state = State.Errored;
logger.error("Error in scenario, shutting down. (" + e + ")");
try {
@ -311,7 +323,6 @@ public class Scenario implements Callable<ScenarioResult> {
} catch (Exception eInner) {
logger.debug("Found inner exception while forcing stop with rethrow=false: " + eInner);
} finally {
this.error = e;
throw new RuntimeException(e);
}
} finally {
@ -320,14 +331,6 @@ public class Scenario implements Callable<ScenarioResult> {
endedAtMillis = System.currentTimeMillis();
}
}
long awaitCompletionTime = 86400 * 365 * 1000L;
logger.debug("Awaiting completion of scenario and activities for " + awaitCompletionTime + " millis.");
scenarioController.awaitCompletion(awaitCompletionTime);
//TODO: Ensure control flow covers controller shutdown in event of internal error.
Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook);
scenarioShutdownHook = null;
finish();
}
public void finish() {
@ -370,9 +373,23 @@ public class Scenario implements Callable<ScenarioResult> {
/**
* This should be the only way to get a ScenarioResult for a Scenario.
*
* The lifecycle of a scenario includes the lifecycles of all of the following:
* <OL>
* <LI>The scenario control script, executing within a graaljs context.</LI>
* <LI>The lifecycle of every activity which is started within the scenario.</LI>
* </OL>
*
* All of these run asynchronously within the scenario, however the same thread that calls
* the scenario is the one which executes the control script. A scenario ends when all
* of the following conditions are met:
* <UL>
* <LI>The scenario control script has run to completion, or experienced an exception.</LI>
* <LI>Each activity has run to completion, experienced an exception, or all</LI>
* </UL>
*
* @return
*/
public synchronized ScenarioResult call() {
public synchronized ExecMetricsResult call() {
if (result == null) {
try {
runScenario();
@ -386,15 +403,15 @@ public class Scenario implements Callable<ScenarioResult> {
}
String iolog = scriptEnv.getTimedLog();
this.result = new ScenarioResult(this.error, iolog, this.startedAtMillis, this.endedAtMillis);
result.reportToLog();
this.result = new ExecMetricsResult(this.error, iolog, this.startedAtMillis, this.endedAtMillis);
result.reportMetricsSummaryToLog();
doReportSummaries(reportSummaryTo, result);
}
return result;
}
private void doReportSummaries(String reportSummaryTo, ScenarioResult result) {
private void doReportSummaries(String reportSummaryTo, ExecMetricsResult result) {
List<PrintStream> fullChannels = new ArrayList<>();
List<PrintStream> briefChannels = new ArrayList<>();
@ -437,7 +454,7 @@ public class Scenario implements Callable<ScenarioResult> {
}
}
}
fullChannels.forEach(result::reportTo);
fullChannels.forEach(result::reportMetricsSummaryTo);
// briefChannels.forEach(result::reportCountsTo);
}

View File

@ -50,7 +50,7 @@ public class ScenariosExecutor {
if (submitted.get(scenario.getScenarioName()) != null) {
throw new BasicError("Scenario " + scenario.getScenarioName() + " is already defined. Remove it first to reuse the name.");
}
Future<ScenarioResult> future = executor.submit(scenario);
Future<ExecMetricsResult> future = executor.submit(scenario);
SubmittedScenario s = new SubmittedScenario(scenario, future);
submitted.put(s.getName(), s);
}
@ -106,7 +106,7 @@ public class ScenariosExecutor {
throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
}
Map<Scenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
Map<Scenario, ExecMetricsResult> scenarioResultMap = new LinkedHashMap<>();
getAsyncResultStatus()
.entrySet()
.forEach(
@ -133,26 +133,26 @@ public class ScenariosExecutor {
* All submitted scenarios are included. Those which are still pending
* are returned with an empty option.</p>
*
* <p>Results may be exceptional. If {@link ScenarioResult#getException()} is present,
* <p>Results may be exceptional. If {@link ExecMetricsResult#getException()} is present,
* then the result did not complete normally.</p>
*
* @return map of async results, with incomplete results as Optional.empty()
*/
public Map<Scenario, Optional<ScenarioResult>> getAsyncResultStatus() {
public Map<Scenario, Optional<ExecMetricsResult>> getAsyncResultStatus() {
Map<Scenario, Optional<ScenarioResult>> optResults = new LinkedHashMap<>();
Map<Scenario, Optional<ExecMetricsResult>> optResults = new LinkedHashMap<>();
for (SubmittedScenario submittedScenario : submitted.values()) {
Future<ScenarioResult> resultFuture = submittedScenario.getResultFuture();
Future<ExecMetricsResult> resultFuture = submittedScenario.getResultFuture();
Optional<ScenarioResult> oResult = Optional.empty();
Optional<ExecMetricsResult> oResult = Optional.empty();
if (resultFuture.isDone()) {
try {
oResult = Optional.of(resultFuture.get());
} catch (Exception e) {
long now = System.currentTimeMillis();
logger.debug("creating exceptional scenario result from getAsyncResultStatus");
oResult = Optional.of(new ScenarioResult(e, "errored output", now, now));
oResult = Optional.of(new ExecMetricsResult(now, now, "errored output", e));
}
}
@ -179,7 +179,7 @@ public class ScenariosExecutor {
* @param scenarioName the scenario name of interest
* @return an optional result
*/
public Optional<Future<ScenarioResult>> getPendingResult(String scenarioName) {
public Optional<Future<ExecMetricsResult>> getPendingResult(String scenarioName) {
return Optional.ofNullable(submitted.get(scenarioName)).map(s -> s.resultFuture);
}
@ -224,9 +224,9 @@ public class ScenariosExecutor {
private static class SubmittedScenario {
private final Scenario scenario;
private final Future<ScenarioResult> resultFuture;
private final Future<ExecMetricsResult> resultFuture;
SubmittedScenario(Scenario scenario, Future<ScenarioResult> resultFuture) {
SubmittedScenario(Scenario scenario, Future<ExecMetricsResult> resultFuture) {
this.scenario = scenario;
this.resultFuture = resultFuture;
}
@ -235,7 +235,7 @@ public class ScenariosExecutor {
return scenario;
}
Future<ScenarioResult> getResultFuture() {
Future<ExecMetricsResult> getResultFuture() {
return resultFuture;
}

View File

@ -28,7 +28,7 @@ import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser;
import io.nosqlbench.engine.api.activityimpl.input.AtomicInput;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import io.nosqlbench.engine.core.lifecycle.ActivityExecutor;
import io.nosqlbench.engine.core.lifecycle.ActivityThreadsManager;
import io.nosqlbench.engine.core.lifecycle.ActivityTypeLoader;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -38,8 +38,8 @@ import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
public class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
public class ActivityThreadsManagerTest {
private static final Logger logger = LogManager.getLogger(ActivityThreadsManagerTest.class);
@Test
public synchronized void testRestart() {
@ -55,7 +55,7 @@ public class ActivityExecutorTest {
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a, "test-restart");
ActivityThreadsManager ae = new ActivityThreadsManager(a, "test-restart");
ad.setThreads(1);
ae.startActivity();
ae.stopActivity();
@ -79,7 +79,7 @@ public class ActivityExecutorTest {
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start");
ActivityThreadsManager ae = new ActivityThreadsManager(a, "test-delayed-start");
ad.setThreads(1);
ae.startActivity();
ae.awaitCompletion(15000);
@ -104,7 +104,7 @@ public class ActivityExecutorTest {
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor");
ActivityThreadsManager ae = new ActivityThreadsManager(a, "test-new-executor");
ad.setThreads(5);
ae.startActivity();

View File

@ -21,7 +21,7 @@ import io.nosqlbench.engine.cli.BasicScriptBuffer;
import io.nosqlbench.engine.cli.Cmd;
import io.nosqlbench.engine.cli.NBCLICommandParser;
import io.nosqlbench.engine.cli.ScriptBuffer;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult;
import io.nosqlbench.engine.core.script.Scenario;
import io.nosqlbench.engine.core.script.ScenariosExecutor;
import io.nosqlbench.engine.rest.services.WorkSpace;
@ -234,8 +234,8 @@ public class ScenarioExecutorEndpoint implements WebServiceObject {
Optional<Scenario> pendingScenario = executor.getPendingScenario(scenarioName);
if (pendingScenario.isPresent()) {
Optional<Future<ScenarioResult>> pendingResult = executor.getPendingResult(scenarioName);
Future<ScenarioResult> scenarioResultFuture = pendingResult.get();
Optional<Future<ExecMetricsResult>> pendingResult = executor.getPendingResult(scenarioName);
Future<ExecMetricsResult> scenarioResultFuture = pendingResult.get();
return new LiveScenarioView(pendingScenario.get());
} else {
throw new RuntimeException("Scenario name '" + scenarioName + "' not found.");

View File

@ -16,13 +16,13 @@
package io.nosqlbench.engine.rest.transfertypes;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult;
public class ResultView {
private final ScenarioResult result;
private final ExecMetricsResult result;
public ResultView(ScenarioResult result) {
public ResultView(ExecMetricsResult result) {
this.result = result;
}

View File

@ -16,7 +16,7 @@
package io.nosqlbench.nbr.examples;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult;
import io.nosqlbench.engine.core.lifecycle.ScenariosResults;
import io.nosqlbench.engine.core.script.Scenario;
import io.nosqlbench.engine.core.script.ScenariosExecutor;
@ -41,7 +41,7 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
public class ScriptExampleTests {
public static ScenarioResult runScenario(String scriptname, String... params) {
public static ExecMetricsResult runScenario(String scriptname, String... params) {
if ((params.length % 2) != 0) {
throw new RuntimeException("params must be pairwise key, value, ...");
}
@ -74,7 +74,7 @@ public class ScriptExampleTests {
// s.addScriptText("load('classpath:scripts/async/" + scriptname + ".js');");
executor.execute(s);
ScenariosResults scenariosResults = executor.awaitAllResults();
ScenarioResult scenarioResult = scenariosResults.getOne();
ExecMetricsResult scenarioResult = scenariosResults.getOne();
executor.shutdownNow();
return scenarioResult;
}
@ -86,7 +86,7 @@ public class ScriptExampleTests {
@Test
public void testLinkedInput() {
ScenarioResult scenarioResult = runScenario("linkedinput");
ExecMetricsResult scenarioResult = runScenario("linkedinput");
Pattern p = Pattern.compile(".*started leader.*started follower.*stopped leader.*stopped follower.*",
Pattern.DOTALL);
assertThat(p.matcher(scenarioResult.getIOLog()).matches()).isTrue();
@ -94,14 +94,14 @@ public class ScriptExampleTests {
@Test
public void testExceptionPropagationFromMotorThread() {
ScenarioResult scenarioResult = runScenario("activityerror");
ExecMetricsResult scenarioResult = runScenario("activityerror");
assertThat(scenarioResult.getException()).isPresent();
assertThat(scenarioResult.getException().get().getMessage()).contains("For input string: \"unparsable\"");
}
@Test
public void testCycleRate() {
ScenarioResult scenarioResult = runScenario("cycle_rate");
ExecMetricsResult scenarioResult = runScenario("cycle_rate");
String iolog = scenarioResult.getIOLog();
System.out.println("iolog\n" + iolog);
Pattern p = Pattern.compile(".*mean cycle rate = (\\d[.\\d]+).*", Pattern.DOTALL);
@ -116,13 +116,13 @@ public class ScriptExampleTests {
@Test
public void testExtensionPoint() {
ScenarioResult scenarioResult = runScenario("extensions");
ExecMetricsResult scenarioResult = runScenario("extensions");
assertThat(scenarioResult.getIOLog()).contains("sum is 46");
}
@Test
public void testOptimo() {
ScenarioResult scenarioResult = runScenario("optimo");
ExecMetricsResult scenarioResult = runScenario("optimo");
String iolog = scenarioResult.getIOLog();
System.out.println("iolog\n" + iolog);
assertThat(iolog).contains("map of result was");
@ -130,14 +130,14 @@ public class ScriptExampleTests {
@Test
public void testExtensionCsvLogger() {
ScenarioResult scenarioResult = runScenario("extension_csvmetrics");
ExecMetricsResult scenarioResult = runScenario("extension_csvmetrics");
assertThat(scenarioResult.getIOLog()).contains("started new " +
"csvlogger: logs/csvmetricstestdir");
}
@Test
public void testScriptParamsVariable() {
ScenarioResult scenarioResult = runScenario("params_variable", "one", "two", "three", "four");
ExecMetricsResult scenarioResult = runScenario("params_variable", "one", "two", "three", "four");
assertThat(scenarioResult.getIOLog()).contains("params[\"one\"]='two'");
assertThat(scenarioResult.getIOLog()).contains("params[\"three\"]='four'");
assertThat(scenarioResult.getIOLog()).contains("overridden[\"three\"] [overridden-three-five]='five'");
@ -146,7 +146,7 @@ public class ScriptExampleTests {
@Test
public void testScriptParamsUndefVariableWithOverride() {
ScenarioResult scenarioResult = runScenario("undef_param", "one", "two", "three", "four");
ExecMetricsResult scenarioResult = runScenario("undef_param", "one", "two", "three", "four");
assertThat(scenarioResult.getIOLog()).contains("before: params[\"three\"]:four");
assertThat(scenarioResult.getIOLog()).contains("before: params.three:four");
assertThat(scenarioResult.getIOLog()).contains("after: params[\"three\"]:undefined");
@ -155,7 +155,7 @@ public class ScriptExampleTests {
@Test
public void testExtensionHistoStatsLogger() throws IOException {
ScenarioResult scenarioResult = runScenario("extension_histostatslogger");
ExecMetricsResult scenarioResult = runScenario("extension_histostatslogger");
assertThat(scenarioResult.getIOLog()).contains("stdout started " +
"logging to logs/histostats.csv");
List<String> strings = Files.readAllLines(Paths.get(
@ -167,7 +167,7 @@ public class ScriptExampleTests {
@Test
public void testExtensionCsvOutput() throws IOException {
ScenarioResult scenarioResult = runScenario("extension_csvoutput");
ExecMetricsResult scenarioResult = runScenario("extension_csvoutput");
List<String> strings = Files.readAllLines(Paths.get(
"logs/csvoutputtestfile.csv"));
String logdata = strings.stream().collect(Collectors.joining("\n"));
@ -177,7 +177,7 @@ public class ScriptExampleTests {
@Test
public void testExtensionHistogramLogger() throws IOException {
ScenarioResult scenarioResult = runScenario("extension_histologger");
ExecMetricsResult scenarioResult = runScenario("extension_histologger");
assertThat(scenarioResult.getIOLog()).contains("stdout started logging to hdrhistodata.log");
List<String> strings = Files.readAllLines(Paths.get("hdrhistodata.log"));
String logdata = strings.stream().collect(Collectors.joining("\n"));
@ -187,7 +187,7 @@ public class ScriptExampleTests {
@Test
public void testBlockingRun() {
ScenarioResult scenarioResult = runScenario("blockingrun");
ExecMetricsResult scenarioResult = runScenario("blockingrun");
int a1end = scenarioResult.getIOLog().indexOf("blockingactivity1 finished");
int a2start = scenarioResult.getIOLog().indexOf("running blockingactivity2");
assertThat(a1end).isLessThan(a2start);
@ -195,12 +195,12 @@ public class ScriptExampleTests {
@Test
public void testAwaitFinished() {
ScenarioResult scenarioResult = runScenario("awaitfinished");
ExecMetricsResult scenarioResult = runScenario("awaitfinished");
}
@Test
public void testStartStop() {
ScenarioResult scenarioResult = runScenario("startstopdiag");
ExecMetricsResult scenarioResult = runScenario("startstopdiag");
int startedAt = scenarioResult.getIOLog().indexOf("starting activity teststartstopdiag");
int stoppedAt = scenarioResult.getIOLog().indexOf("stopped activity teststartstopdiag");
assertThat(startedAt).isGreaterThan(0);
@ -210,7 +210,7 @@ public class ScriptExampleTests {
// TODO: find out why this causes a long delay after stop is called.
@Test
public void testThreadChange() {
ScenarioResult scenarioResult = runScenario("threadchange");
ExecMetricsResult scenarioResult = runScenario("threadchange");
int changedTo1At = scenarioResult.getIOLog().indexOf("threads now 1");
int changedTo5At = scenarioResult.getIOLog().indexOf("threads now 5");
System.out.println("IOLOG:\n"+scenarioResult.getIOLog());
@ -220,13 +220,13 @@ public class ScriptExampleTests {
@Test
public void testReadMetric() {
ScenarioResult scenarioResult = runScenario("readmetrics");
ExecMetricsResult scenarioResult = runScenario("readmetrics");
assertThat(scenarioResult.getIOLog()).contains("count: ");
}
@Test
public void testShutdownHook() {
ScenarioResult scenarioResult = runScenario("extension_shutdown_hook");
ExecMetricsResult scenarioResult = runScenario("extension_shutdown_hook");
assertThat(scenarioResult.getIOLog()).doesNotContain("shutdown hook running").describedAs(
"shutdown hooks should not run in the same IO context as the main scenario"
);
@ -234,7 +234,7 @@ public class ScriptExampleTests {
@Test
public void testExceptionPropagationFromActivityInit() {
ScenarioResult scenarioResult = runScenario("activityiniterror");
ExecMetricsResult scenarioResult = runScenario("activityiniterror");
assertThat(scenarioResult.getException()).isPresent();
assertThat(scenarioResult.getException().get().getMessage()).contains("Unable to convert end cycle from invalid");
assertThat(scenarioResult.getException()).isNotNull();
@ -242,7 +242,7 @@ public class ScriptExampleTests {
@Test
public void testReportedCoDelayBursty() {
ScenarioResult scenarioResult = runScenario("cocycledelay_bursty");
ExecMetricsResult scenarioResult = runScenario("cocycledelay_bursty");
assertThat(scenarioResult.getIOLog()).contains("step1 metrics.waittime=");
assertThat(scenarioResult.getIOLog()).contains("step2 metrics.waittime=");
String iolog = scenarioResult.getIOLog();
@ -252,7 +252,7 @@ public class ScriptExampleTests {
@Test
public void testReportedCoDelayStrict() {
ScenarioResult scenarioResult = runScenario("cocycledelay_strict");
ExecMetricsResult scenarioResult = runScenario("cocycledelay_strict");
assertThat(scenarioResult.getIOLog()).contains("step1 cycles.waittime=");
assertThat(scenarioResult.getIOLog()).contains("step2 cycles.waittime=");
String iolog = scenarioResult.getIOLog();
@ -263,14 +263,14 @@ public class ScriptExampleTests {
@Test
public void testCycleRateChangeNewMetrics() {
ScenarioResult scenarioResult = runScenario("cycle_rate_change");
ExecMetricsResult scenarioResult = runScenario("cycle_rate_change");
String ioLog = scenarioResult.getIOLog();
assertThat(ioLog).contains("cycles adjusted, exiting on iteration");
}
@Test
public void testExitLogic() {
ScenarioResult scenarioResult = runScenario(
ExecMetricsResult scenarioResult = runScenario(
"basicdiag",
"type", "diag", "cyclerate", "5", "erroroncycle", "10", "cycles", "2000"
);

View File

@ -15,7 +15,7 @@
*/
package io.nosqlbench.nbr.examples;
import io.nosqlbench.engine.core.lifecycle.ScenarioResult;
import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@ -30,14 +30,14 @@ public class SpeedCheckIntegrationTests {
@Disabled
// Verified as working
public void testSpeedSanity() {
ScenarioResult scenarioResult = ScriptExampleTests.runScenario("speedcheck");
ExecMetricsResult scenarioResult = ScriptExampleTests.runScenario("speedcheck");
}
@Test
@Disabled
// This seems incomplete
public void testThreadSpeeds() {
ScenarioResult scenarioResult = ScriptExampleTests.runScenario("threadspeeds");
ExecMetricsResult scenarioResult = ScriptExampleTests.runScenario("threadspeeds");
}

View File

@ -86,18 +86,18 @@ class ExitStatusIntegrationTests {
assertThat(result.exitStatus).isEqualTo(2);
}
// This will not work reliablyl until the activity shutdown bug is fixed.
// @Test
// public void testCloseErrorHandlerOnSpace() {
// ProcessInvoker invoker = new ProcessInvoker();
// invoker.setLogDir("logs/test");
// ProcessResult result = invoker.run("exitstatus_erroronclose", 30,
// java, "-jar", JARNAME, "--logs-dir", "logs/test/error_on_close", "run",
// "driver=diag", "threads=2", "rate=5", "op=noop", "cycles=10", "erroronclose=true", "-vvv"
// );
// String stdout = String.join("\n", result.getStdoutData());
// String stderr = String.join("\n", result.getStderrData());
// assertThat(result.exception).isNotNull();
// assertThat(result.exception.getMessage()).contains("diag space was configured to throw");
// }
@Test
public void testCloseErrorHandlerOnSpace() {
ProcessInvoker invoker = new ProcessInvoker();
invoker.setLogDir("logs/test");
ProcessResult result = invoker.run("exitstatus_erroronclose", 30,
java, "-jar", JARNAME, "--logs-dir", "logs/test/error_on_close", "run",
"driver=diag", "threads=2", "rate=5", "op=noop", "cycles=10", "erroronclose=true", "-vvv"
);
String stdout = String.join("\n", result.getStdoutData());
String stderr = String.join("\n", result.getStderrData());
assertThat(result.exception).isNotNull();
assertThat(result.exception.getMessage()).contains("diag space was configured to throw");
}
}