Merge pull request #907 from yabinmeng/main

Add 'forceStop' NB CLI option in NB5
This commit is contained in:
Jonathan Shook
2023-01-11 18:02:22 -06:00
committed by GitHub
9 changed files with 129 additions and 6 deletions

View File

@@ -38,7 +38,7 @@ public class KafkaAdapterMetrics implements NBNamedElement {
@Override
public String getName() {
return "S4JAdapterMetrics";
return "KafkaAdapterMetrics";
}
public void initS4JAdapterInstrumentation() {

View File

@@ -72,6 +72,7 @@ public class BasicScriptBuffer implements ScriptBuffer {
case run: // run activity
case await: // await activity
case stop: // stop activity
case forceStop: // force stopping activity
case waitMillis:
sb.append("scenario.").append(cmd).append("\n");

View File

@@ -36,6 +36,7 @@ public class Cmd {
run(),
start(),
stop(Arg.of("alias_name")),
forceStop(Arg.of("alias_name")),
script(Arg.of("script_path", s -> s)),
await(Arg.of("alias_name")),
waitMillis(Arg.of("millis_to_wait", Long::parseLong)),

View File

@@ -35,6 +35,7 @@ public class NBCLICommandParser {
private static final String RUN = "run";
private static final String AWAIT = "await";
private static final String STOP = "stop";
private static final String FORCE_STOP = "forceStop";
private static final String ACTIVITY = "activity";
private static final String SCENARIO = "scenario";
private static final String WAIT_MILLIS = "waitmillis";
@@ -42,7 +43,7 @@ public class NBCLICommandParser {
public static final Set<String> RESERVED_WORDS = new HashSet<>() {{
addAll(
Arrays.asList(
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, FORCE_STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
)
);
}};
@@ -63,6 +64,7 @@ public class NBCLICommandParser {
case RUN:
case AWAIT:
case STOP:
case FORCE_STOP:
case WAIT_MILLIS:
cmd = Cmd.parseArg(arglist, canonicalizer);
cmdList.add(cmd);

View File

@@ -88,11 +88,16 @@ To start an activity and then wait for it to complete before continuing:
run <pram>=<value> ...
~~~
To stop an activity by its alias:
To stop an activity by its alias while first waiting for a required thread (motor/slot) entering a specific SlotState:
~~~
stop <activity alias>
~~~
To stop an activity by its alias, without first waiting for a required thread (motor/slot) entering a specific SlotState:
~~~
forceStop <activity alias>
~~~
To wait for a particular activity that has been started to complete before continuing:
~~~
await <activity alias>

View File

@@ -85,9 +85,37 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
*/
public void stopActivity() {
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping);
motors.forEach(Motor::requestStop);
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
}
/**
* Force stop the motors without trying to wait for the activity to reach stopped/finished state
*/
public void forceStopActivity() {
logger.info(() -> "force stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping);
motors.forEach(Motor::requestStop);
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);

View File

@@ -84,9 +84,9 @@ public class ActivityRuntimeInfo implements ProgressCapable {
return this.activity.getRunState();
}
public void stopActivity() {
this.executor.stopActivity();
}
public void stopActivity() { this.executor.stopActivity(); }
public void forceStopActivity() { this.executor.forceStopActivity(); }
public ActivityExecutor getActivityExecutor() {
return executor;

View File

@@ -232,6 +232,67 @@ public class ScenarioController {
}
}
/**
* <p>Force stopping an activity, given an activity def. The only part of the activity def that is important is the
* alias parameter. This method retains the activity def signature to provide convenience for scripting.</p>
* <p>For example, sc.forceStop("alias=foo")</p>
*
* @param activityDef An activity def, including at least the alias parameter.
*/
public synchronized void forceStop(ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(this.scenario.getScenarioName())
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "forceStop")
.detail("params", activityDef.toString())
.build());
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
if (runtimeInfo == null) {
throw new RuntimeException("could not force stop missing activity:" + activityDef);
}
scenariologger.debug("FORCE STOP " + activityDef.getAlias());
runtimeInfo.forceStopActivity();
}
/**
* <p>Stop an activity, given an activity def map. The only part of the map that is important is the
* alias parameter. This method retains the map signature to provide convenience for scripting.</p>
*
* @param activityDefMap A map, containing at least the alias parameter
*/
public synchronized void forceStop(Map<String, String> activityDefMap) {
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
forceStop(ad);
}
/**
* Stop an activity, given the name by which it is known already in the scenario. This causes the
* activity to stop all threads, but keeps the thread objects handy for starting again. This can be useful
* for certain testing scenarios in which you want to stop some workloads and start others based on other conditions.
*
* Alternately, you can provide one or more aliases in the same command, and all matching names will be stopped.
*
* @param spec The name of the activity that is already known to the scenario
*/
public synchronized void forceStop(String spec) {
logger.debug("request->STOP '" + spec + "'");
List<String> aliases = Arrays.asList(spec.split("[,; ]"));
List<String> matched = aliases.stream()
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(aspec -> getMatchingAliases(aspec).stream()).collect(Collectors.toList());
for (String alias : matched) {
ActivityDef adef = aliasToDef(alias);
scenariologger.debug("STOP " + adef.getAlias());
forceStop(adef);
}
}
private List<String> getMatchingAliases(String pattern) {
Pattern matcher;

View File

@@ -148,6 +148,31 @@ public class PolyglotScenarioController {
}
public synchronized void forceStop(Object o) {
if (o instanceof Value) {
forceStop((Value) o);
} else if (o instanceof Map) {
controller.forceStop((Map<String, String>) o);
} else if (o instanceof String) {
controller.forceStop(o.toString());
} else {
throw new RuntimeException("unknown type " + o.getClass().getCanonicalName());
}
}
private synchronized void forceStopValue(Value spec) {
if (spec.isHostObject()) {
controller.forceStop((ActivityDef) spec.asHostObject());
} else if (spec.isString()) {
controller.forceStop(spec.asString());
} else if (spec.hasMembers()) {
controller.forceStop(spec.as(Map.class));
} else {
throw new RuntimeException("unknown base type for graal polyglot: " + spec.toString());
}
}
public synchronized void awaitActivity(Object o) {
this.await(o);
}