diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java index c25373d9d..e629da5a9 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java @@ -38,7 +38,7 @@ public class KafkaAdapterMetrics implements NBNamedElement { @Override public String getName() { - return "S4JAdapterMetrics"; + return "KafkaAdapterMetrics"; } public void initS4JAdapterInstrumentation() { diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/BasicScriptBuffer.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/BasicScriptBuffer.java index f2eb5f840..73aac148f 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/BasicScriptBuffer.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/BasicScriptBuffer.java @@ -72,6 +72,7 @@ public class BasicScriptBuffer implements ScriptBuffer { case run: // run activity case await: // await activity case stop: // stop activity + case forceStop: // force stopping activity case waitMillis: sb.append("scenario.").append(cmd).append("\n"); diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/Cmd.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/Cmd.java index 5c6bd936a..6e96c3334 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/Cmd.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/Cmd.java @@ -36,6 +36,7 @@ public class Cmd { run(), start(), stop(Arg.of("alias_name")), + forceStop(Arg.of("alias_name")), script(Arg.of("script_path", s -> s)), await(Arg.of("alias_name")), waitMillis(Arg.of("millis_to_wait", Long::parseLong)), diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLICommandParser.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLICommandParser.java index a19fe7b8d..81ca6e981 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLICommandParser.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLICommandParser.java @@ -35,6 +35,7 @@ public class NBCLICommandParser { private static final String RUN = "run"; private static final String AWAIT = "await"; private static final String STOP = "stop"; + private static final String FORCE_STOP = "forceStop"; private static final String ACTIVITY = "activity"; private static final String SCENARIO = "scenario"; private static final String WAIT_MILLIS = "waitmillis"; @@ -42,7 +43,7 @@ public class NBCLICommandParser { public static final Set RESERVED_WORDS = new HashSet<>() {{ addAll( Arrays.asList( - FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, ACTIVITY, SCENARIO, WAIT_MILLIS + FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, FORCE_STOP, ACTIVITY, SCENARIO, WAIT_MILLIS ) ); }}; @@ -63,6 +64,7 @@ public class NBCLICommandParser { case RUN: case AWAIT: case STOP: + case FORCE_STOP: case WAIT_MILLIS: cmd = Cmd.parseArg(arglist, canonicalizer); cmdList.add(cmd); diff --git a/engine-cli/src/main/resources/cli-scripting.md b/engine-cli/src/main/resources/cli-scripting.md index 7ef37f7fe..95eb001a9 100644 --- a/engine-cli/src/main/resources/cli-scripting.md +++ b/engine-cli/src/main/resources/cli-scripting.md @@ -88,11 +88,16 @@ To start an activity and then wait for it to complete before continuing: run = ... ~~~ -To stop an activity by its alias: +To stop an activity by its alias while first waiting for a required thread (motor/slot) entering a specific SlotState: ~~~ stop ~~~ +To stop an activity by its alias, without first waiting for a required thread (motor/slot) entering a specific SlotState: +~~~ +forceStop +~~~ + To wait for a particular activity that has been started to complete before continuing: ~~~ await diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java index 82f6735e7..0286bccf5 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java @@ -85,9 +85,37 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen */ public void stopActivity() { logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias()); + activity.setRunState(RunState.Stopping); motors.forEach(Motor::requestStop); + tally.awaitNoneOther(RunState.Stopped, RunState.Finished); + + shutdownExecutorService(Integer.MAX_VALUE); tally.awaitNoneOther(RunState.Stopped,RunState.Finished); + activity.setRunState(RunState.Stopped); + + logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); + + Annotators.recordAnnotation(Annotation.newBuilder() + .session(sessionId) + .interval(this.startedAt, this.stoppedAt) + .layer(Layer.Activity) + .label("alias", getActivityDef().getAlias()) + .label("driver", getActivityDef().getActivityType()) + .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) + .detail("params", getActivityDef().toString()) + .build() + ); + } + + /** + * Force stop the motors without trying to wait for the activity to reach stopped/finished state + */ + public void forceStopActivity() { + logger.info(() -> "force stopping activity in progress: " + this.getActivityDef().getAlias()); + + activity.setRunState(RunState.Stopping); + motors.forEach(Motor::requestStop); shutdownExecutorService(Integer.MAX_VALUE); tally.awaitNoneOther(RunState.Stopped,RunState.Finished); diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java index c38a7f5f6..f16b449ec 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java @@ -84,9 +84,9 @@ public class ActivityRuntimeInfo implements ProgressCapable { return this.activity.getRunState(); } - public void stopActivity() { - this.executor.stopActivity(); - } + public void stopActivity() { this.executor.stopActivity(); } + + public void forceStopActivity() { this.executor.forceStopActivity(); } public ActivityExecutor getActivityExecutor() { return executor; diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/ScenarioController.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/ScenarioController.java index 5595d2ea2..f4102e59f 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/ScenarioController.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/ScenarioController.java @@ -232,6 +232,67 @@ public class ScenarioController { } } + /** + *

Force stopping an activity, given an activity def. The only part of the activity def that is important is the + * alias parameter. This method retains the activity def signature to provide convenience for scripting.

+ *

For example, sc.forceStop("alias=foo")

+ * + * @param activityDef An activity def, including at least the alias parameter. + */ + public synchronized void forceStop(ActivityDef activityDef) { + Annotators.recordAnnotation(Annotation.newBuilder() + .session(this.scenario.getScenarioName()) + .now() + .layer(Layer.Activity) + .label("alias", activityDef.getAlias()) + .detail("command", "forceStop") + .detail("params", activityDef.toString()) + .build()); + + ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias()); + if (runtimeInfo == null) { + throw new RuntimeException("could not force stop missing activity:" + activityDef); + } + + scenariologger.debug("FORCE STOP " + activityDef.getAlias()); + + runtimeInfo.forceStopActivity(); + } + + /** + *

Stop an activity, given an activity def map. The only part of the map that is important is the + * alias parameter. This method retains the map signature to provide convenience for scripting.

+ * + * @param activityDefMap A map, containing at least the alias parameter + */ + public synchronized void forceStop(Map activityDefMap) { + ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap)); + forceStop(ad); + } + + /** + * Stop an activity, given the name by which it is known already in the scenario. This causes the + * activity to stop all threads, but keeps the thread objects handy for starting again. This can be useful + * for certain testing scenarios in which you want to stop some workloads and start others based on other conditions. + * + * Alternately, you can provide one or more aliases in the same command, and all matching names will be stopped. + * + * @param spec The name of the activity that is already known to the scenario + */ + public synchronized void forceStop(String spec) { + logger.debug("request->STOP '" + spec + "'"); + List aliases = Arrays.asList(spec.split("[,; ]")); + List matched = aliases.stream() + .map(String::trim) + .filter(s -> !s.isEmpty()) + .flatMap(aspec -> getMatchingAliases(aspec).stream()).collect(Collectors.toList()); + for (String alias : matched) { + ActivityDef adef = aliasToDef(alias); + scenariologger.debug("STOP " + adef.getAlias()); + forceStop(adef); + } + } + private List getMatchingAliases(String pattern) { Pattern matcher; diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/script/bindings/PolyglotScenarioController.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/script/bindings/PolyglotScenarioController.java index 8d2504724..0e44cfc25 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/script/bindings/PolyglotScenarioController.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/script/bindings/PolyglotScenarioController.java @@ -148,6 +148,31 @@ public class PolyglotScenarioController { } + public synchronized void forceStop(Object o) { + if (o instanceof Value) { + forceStop((Value) o); + } else if (o instanceof Map) { + controller.forceStop((Map) o); + } else if (o instanceof String) { + controller.forceStop(o.toString()); + } else { + throw new RuntimeException("unknown type " + o.getClass().getCanonicalName()); + } + } + + private synchronized void forceStopValue(Value spec) { + if (spec.isHostObject()) { + controller.forceStop((ActivityDef) spec.asHostObject()); + } else if (spec.isString()) { + controller.forceStop(spec.asString()); + } else if (spec.hasMembers()) { + controller.forceStop(spec.as(Map.class)); + } else { + throw new RuntimeException("unknown base type for graal polyglot: " + spec.toString()); + } + } + + public synchronized void awaitActivity(Object o) { this.await(o); }