mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Add 'forceStop' NB CLI option in NB5 (port over from similar functionally in NB4 - PR# 675)
This commit is contained in:
@@ -38,7 +38,7 @@ public class KafkaAdapterMetrics implements NBNamedElement {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "S4JAdapterMetrics";
|
||||
return "KafkaAdapterMetrics";
|
||||
}
|
||||
|
||||
public void initS4JAdapterInstrumentation() {
|
||||
|
||||
@@ -72,6 +72,7 @@ public class BasicScriptBuffer implements ScriptBuffer {
|
||||
case run: // run activity
|
||||
case await: // await activity
|
||||
case stop: // stop activity
|
||||
case forceStop: // force stopping activity
|
||||
case waitMillis:
|
||||
|
||||
sb.append("scenario.").append(cmd).append("\n");
|
||||
|
||||
@@ -36,6 +36,7 @@ public class Cmd {
|
||||
run(),
|
||||
start(),
|
||||
stop(Arg.of("alias_name")),
|
||||
forceStop(Arg.of("alias_name")),
|
||||
script(Arg.of("script_path", s -> s)),
|
||||
await(Arg.of("alias_name")),
|
||||
waitMillis(Arg.of("millis_to_wait", Long::parseLong)),
|
||||
|
||||
@@ -35,6 +35,7 @@ public class NBCLICommandParser {
|
||||
private static final String RUN = "run";
|
||||
private static final String AWAIT = "await";
|
||||
private static final String STOP = "stop";
|
||||
private static final String FORCE_STOP = "forceStop";
|
||||
private static final String ACTIVITY = "activity";
|
||||
private static final String SCENARIO = "scenario";
|
||||
private static final String WAIT_MILLIS = "waitmillis";
|
||||
@@ -42,7 +43,7 @@ public class NBCLICommandParser {
|
||||
public static final Set<String> RESERVED_WORDS = new HashSet<>() {{
|
||||
addAll(
|
||||
Arrays.asList(
|
||||
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
|
||||
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, FORCE_STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
|
||||
)
|
||||
);
|
||||
}};
|
||||
@@ -63,6 +64,7 @@ public class NBCLICommandParser {
|
||||
case RUN:
|
||||
case AWAIT:
|
||||
case STOP:
|
||||
case FORCE_STOP:
|
||||
case WAIT_MILLIS:
|
||||
cmd = Cmd.parseArg(arglist, canonicalizer);
|
||||
cmdList.add(cmd);
|
||||
|
||||
@@ -88,11 +88,16 @@ To start an activity and then wait for it to complete before continuing:
|
||||
run <pram>=<value> ...
|
||||
~~~
|
||||
|
||||
To stop an activity by its alias:
|
||||
To stop an activity by its alias while first waiting for a required thread (motor/slot) entering a specific SlotState:
|
||||
~~~
|
||||
stop <activity alias>
|
||||
~~~
|
||||
|
||||
To stop an activity by its alias, without first waiting for a required thread (motor/slot) entering a specific SlotState:
|
||||
~~~
|
||||
forceStop <activity alias>
|
||||
~~~
|
||||
|
||||
To wait for a particular activity that has been started to complete before continuing:
|
||||
~~~
|
||||
await <activity alias>
|
||||
|
||||
@@ -82,12 +82,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
|
||||
/**
|
||||
* Simply stop the motors
|
||||
*
|
||||
* @param forcing whether to force (without trying to wait) the activity to reach stopped/finished state
|
||||
*/
|
||||
public void stopActivity() {
|
||||
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
|
||||
public void stopActivity(boolean forcing) {
|
||||
logger.info(() ->
|
||||
(forcing ? "forcing " : "") + "stopping activity in progress: " + this.getActivityDef().getAlias());
|
||||
|
||||
activity.setRunState(RunState.Stopping);
|
||||
motors.forEach(Motor::requestStop);
|
||||
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
|
||||
if (!forcing) {
|
||||
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
|
||||
}
|
||||
|
||||
shutdownExecutorService(Integer.MAX_VALUE);
|
||||
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
|
||||
|
||||
@@ -84,8 +84,9 @@ public class ActivityRuntimeInfo implements ProgressCapable {
|
||||
return this.activity.getRunState();
|
||||
}
|
||||
|
||||
public void stopActivity() {
|
||||
this.executor.stopActivity();
|
||||
public void stopActivity() { this.executor.stopActivity(false); }
|
||||
public void forceStopActivity() {
|
||||
this.executor.stopActivity(true);
|
||||
}
|
||||
|
||||
public ActivityExecutor getActivityExecutor() {
|
||||
|
||||
@@ -232,6 +232,67 @@ public class ScenarioController {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Force stopping an activity, given an activity def. The only part of the activity def that is important is the
|
||||
* alias parameter. This method retains the activity def signature to provide convenience for scripting.</p>
|
||||
* <p>For example, sc.forceStop("alias=foo")</p>
|
||||
*
|
||||
* @param activityDef An activity def, including at least the alias parameter.
|
||||
*/
|
||||
public synchronized void forceStop(ActivityDef activityDef) {
|
||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||
.session(this.scenario.getScenarioName())
|
||||
.now()
|
||||
.layer(Layer.Activity)
|
||||
.label("alias", activityDef.getAlias())
|
||||
.detail("command", "forceStop")
|
||||
.detail("params", activityDef.toString())
|
||||
.build());
|
||||
|
||||
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
|
||||
if (runtimeInfo == null) {
|
||||
throw new RuntimeException("could not force stop missing activity:" + activityDef);
|
||||
}
|
||||
|
||||
scenariologger.debug("FORCE STOP " + activityDef.getAlias());
|
||||
|
||||
runtimeInfo.forceStopActivity();
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Stop an activity, given an activity def map. The only part of the map that is important is the
|
||||
* alias parameter. This method retains the map signature to provide convenience for scripting.</p>
|
||||
*
|
||||
* @param activityDefMap A map, containing at least the alias parameter
|
||||
*/
|
||||
public synchronized void forceStop(Map<String, String> activityDefMap) {
|
||||
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
|
||||
forceStop(ad);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop an activity, given the name by which it is known already in the scenario. This causes the
|
||||
* activity to stop all threads, but keeps the thread objects handy for starting again. This can be useful
|
||||
* for certain testing scenarios in which you want to stop some workloads and start others based on other conditions.
|
||||
*
|
||||
* Alternately, you can provide one or more aliases in the same command, and all matching names will be stopped.
|
||||
*
|
||||
* @param spec The name of the activity that is already known to the scenario
|
||||
*/
|
||||
public synchronized void forceStop(String spec) {
|
||||
logger.debug("request->STOP '" + spec + "'");
|
||||
List<String> aliases = Arrays.asList(spec.split("[,; ]"));
|
||||
List<String> matched = aliases.stream()
|
||||
.map(String::trim)
|
||||
.filter(s -> !s.isEmpty())
|
||||
.flatMap(aspec -> getMatchingAliases(aspec).stream()).collect(Collectors.toList());
|
||||
for (String alias : matched) {
|
||||
ActivityDef adef = aliasToDef(alias);
|
||||
scenariologger.debug("STOP " + adef.getAlias());
|
||||
forceStop(adef);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private List<String> getMatchingAliases(String pattern) {
|
||||
Pattern matcher;
|
||||
|
||||
@@ -148,6 +148,31 @@ public class PolyglotScenarioController {
|
||||
}
|
||||
|
||||
|
||||
public synchronized void forceStop(Object o) {
|
||||
if (o instanceof Value) {
|
||||
forceStop((Value) o);
|
||||
} else if (o instanceof Map) {
|
||||
controller.forceStop((Map<String, String>) o);
|
||||
} else if (o instanceof String) {
|
||||
controller.forceStop(o.toString());
|
||||
} else {
|
||||
throw new RuntimeException("unknown type " + o.getClass().getCanonicalName());
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void forceStopValue(Value spec) {
|
||||
if (spec.isHostObject()) {
|
||||
controller.forceStop((ActivityDef) spec.asHostObject());
|
||||
} else if (spec.isString()) {
|
||||
controller.forceStop(spec.asString());
|
||||
} else if (spec.hasMembers()) {
|
||||
controller.forceStop(spec.as(Map.class));
|
||||
} else {
|
||||
throw new RuntimeException("unknown base type for graal polyglot: " + spec.toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public synchronized void awaitActivity(Object o) {
|
||||
this.await(o);
|
||||
}
|
||||
|
||||
@@ -65,7 +65,7 @@ public class ActivityExecutorTest {
|
||||
try {
|
||||
ad.setThreads(1);
|
||||
ae.startActivity();
|
||||
ae.stopActivity();
|
||||
ae.stopActivity(false);
|
||||
ae.startActivity();
|
||||
ae.startActivity();
|
||||
ExecutionResult executionResult = future.get();
|
||||
|
||||
Reference in New Issue
Block a user