Add 'forceStop' NB CLI option in NB5 (port over from similar functionally in NB4 - PR# 675)

This commit is contained in:
yabinmeng
2023-01-11 16:49:23 -06:00
parent 949a458f3a
commit c172a140eb
10 changed files with 111 additions and 9 deletions

View File

@@ -38,7 +38,7 @@ public class KafkaAdapterMetrics implements NBNamedElement {
@Override
public String getName() {
return "S4JAdapterMetrics";
return "KafkaAdapterMetrics";
}
public void initS4JAdapterInstrumentation() {

View File

@@ -72,6 +72,7 @@ public class BasicScriptBuffer implements ScriptBuffer {
case run: // run activity
case await: // await activity
case stop: // stop activity
case forceStop: // force stopping activity
case waitMillis:
sb.append("scenario.").append(cmd).append("\n");

View File

@@ -36,6 +36,7 @@ public class Cmd {
run(),
start(),
stop(Arg.of("alias_name")),
forceStop(Arg.of("alias_name")),
script(Arg.of("script_path", s -> s)),
await(Arg.of("alias_name")),
waitMillis(Arg.of("millis_to_wait", Long::parseLong)),

View File

@@ -35,6 +35,7 @@ public class NBCLICommandParser {
private static final String RUN = "run";
private static final String AWAIT = "await";
private static final String STOP = "stop";
private static final String FORCE_STOP = "forceStop";
private static final String ACTIVITY = "activity";
private static final String SCENARIO = "scenario";
private static final String WAIT_MILLIS = "waitmillis";
@@ -42,7 +43,7 @@ public class NBCLICommandParser {
public static final Set<String> RESERVED_WORDS = new HashSet<>() {{
addAll(
Arrays.asList(
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
FRAGMENT, SCRIPT, START, RUN, AWAIT, STOP, FORCE_STOP, ACTIVITY, SCENARIO, WAIT_MILLIS
)
);
}};
@@ -63,6 +64,7 @@ public class NBCLICommandParser {
case RUN:
case AWAIT:
case STOP:
case FORCE_STOP:
case WAIT_MILLIS:
cmd = Cmd.parseArg(arglist, canonicalizer);
cmdList.add(cmd);

View File

@@ -88,11 +88,16 @@ To start an activity and then wait for it to complete before continuing:
run <pram>=<value> ...
~~~
To stop an activity by its alias:
To stop an activity by its alias while first waiting for a required thread (motor/slot) entering a specific SlotState:
~~~
stop <activity alias>
~~~
To stop an activity by its alias, without first waiting for a required thread (motor/slot) entering a specific SlotState:
~~~
forceStop <activity alias>
~~~
To wait for a particular activity that has been started to complete before continuing:
~~~
await <activity alias>

View File

@@ -82,12 +82,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/**
* Simply stop the motors
*
* @param forcing whether to force (without trying to wait) the activity to reach stopped/finished state
*/
public void stopActivity() {
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
public void stopActivity(boolean forcing) {
logger.info(() ->
(forcing ? "forcing " : "") + "stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping);
motors.forEach(Motor::requestStop);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
if (!forcing) {
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
}
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);

View File

@@ -84,8 +84,9 @@ public class ActivityRuntimeInfo implements ProgressCapable {
return this.activity.getRunState();
}
public void stopActivity() {
this.executor.stopActivity();
public void stopActivity() { this.executor.stopActivity(false); }
public void forceStopActivity() {
this.executor.stopActivity(true);
}
public ActivityExecutor getActivityExecutor() {

View File

@@ -232,6 +232,67 @@ public class ScenarioController {
}
}
/**
* <p>Force stopping an activity, given an activity def. The only part of the activity def that is important is the
* alias parameter. This method retains the activity def signature to provide convenience for scripting.</p>
* <p>For example, sc.forceStop("alias=foo")</p>
*
* @param activityDef An activity def, including at least the alias parameter.
*/
public synchronized void forceStop(ActivityDef activityDef) {
Annotators.recordAnnotation(Annotation.newBuilder()
.session(this.scenario.getScenarioName())
.now()
.layer(Layer.Activity)
.label("alias", activityDef.getAlias())
.detail("command", "forceStop")
.detail("params", activityDef.toString())
.build());
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
if (runtimeInfo == null) {
throw new RuntimeException("could not force stop missing activity:" + activityDef);
}
scenariologger.debug("FORCE STOP " + activityDef.getAlias());
runtimeInfo.forceStopActivity();
}
/**
* <p>Stop an activity, given an activity def map. The only part of the map that is important is the
* alias parameter. This method retains the map signature to provide convenience for scripting.</p>
*
* @param activityDefMap A map, containing at least the alias parameter
*/
public synchronized void forceStop(Map<String, String> activityDefMap) {
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
forceStop(ad);
}
/**
* Stop an activity, given the name by which it is known already in the scenario. This causes the
* activity to stop all threads, but keeps the thread objects handy for starting again. This can be useful
* for certain testing scenarios in which you want to stop some workloads and start others based on other conditions.
*
* Alternately, you can provide one or more aliases in the same command, and all matching names will be stopped.
*
* @param spec The name of the activity that is already known to the scenario
*/
public synchronized void forceStop(String spec) {
logger.debug("request->STOP '" + spec + "'");
List<String> aliases = Arrays.asList(spec.split("[,; ]"));
List<String> matched = aliases.stream()
.map(String::trim)
.filter(s -> !s.isEmpty())
.flatMap(aspec -> getMatchingAliases(aspec).stream()).collect(Collectors.toList());
for (String alias : matched) {
ActivityDef adef = aliasToDef(alias);
scenariologger.debug("STOP " + adef.getAlias());
forceStop(adef);
}
}
private List<String> getMatchingAliases(String pattern) {
Pattern matcher;

View File

@@ -148,6 +148,31 @@ public class PolyglotScenarioController {
}
public synchronized void forceStop(Object o) {
if (o instanceof Value) {
forceStop((Value) o);
} else if (o instanceof Map) {
controller.forceStop((Map<String, String>) o);
} else if (o instanceof String) {
controller.forceStop(o.toString());
} else {
throw new RuntimeException("unknown type " + o.getClass().getCanonicalName());
}
}
private synchronized void forceStopValue(Value spec) {
if (spec.isHostObject()) {
controller.forceStop((ActivityDef) spec.asHostObject());
} else if (spec.isString()) {
controller.forceStop(spec.asString());
} else if (spec.hasMembers()) {
controller.forceStop(spec.as(Map.class));
} else {
throw new RuntimeException("unknown base type for graal polyglot: " + spec.toString());
}
}
public synchronized void awaitActivity(Object o) {
this.await(o);
}

View File

@@ -65,7 +65,7 @@ public class ActivityExecutorTest {
try {
ad.setThreads(1);
ae.startActivity();
ae.stopActivity();
ae.stopActivity(false);
ae.startActivity();
ae.startActivity();
ExecutionResult executionResult = future.get();