Merge branch 'nosqlbench-1001-nfe-init' into add-tags-script

This commit is contained in:
Jonathan Shook 2023-02-05 21:21:13 -06:00
commit d8d7bb61ab
15 changed files with 186 additions and 141 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -166,7 +166,7 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
.add(Param.optional("instrument", Boolean.class)) .add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file")) .add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver", String.class)) .add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun",false)) .add(Param.defaultTo("dryrun","none").setRegex("(op|jsonnet|none)"))
.asReadOnly(); .asReadOnly();
} }

View File

@ -19,6 +19,6 @@ However, there are other ways to feed an activity. All inputs are
modular within the nosqlbench runtime. To see what inputs are modular within the nosqlbench runtime. To see what inputs are
available, you can simpy run: available, you can simpy run:
PROG --list-input-types ${PROG} --list-input-types
Any input listed this way should have its own documentation. Any input listed this way should have its own documentation.

View File

@ -14,7 +14,7 @@ All cycle logfiles have the *.cyclelog* suffix.
You can dump an rlefile to the screen to see the content in text form by You can dump an rlefile to the screen to see the content in text form by
running a command like this: running a command like this:
PROG --export-cycle-log <filename> [spans|cycles] ${PROG} --export-cycle-log <filename> [spans|cycles]
You do not need to specify the extension. If you do not specify either You do not need to specify the extension. If you do not specify either
optional format at the end, then *spans* is assumed. It will print output optional format at the end, then *spans* is assumed. It will print output
@ -40,7 +40,7 @@ If you need to modify and then re-use a cycle log, you can do this with
simple text tools. Once you have modified the file, you can import it back simple text tools. Once you have modified the file, you can import it back
to the native format with: to the native format with:
PROG --import-cycle-log <infile.txt> <outfile.cyclelog> ${PROG} --import-cycle-log <infile.txt> <outfile.cyclelog>
The importer recognizes both formats listed above. The importer recognizes both formats listed above.

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -35,27 +35,27 @@ public enum RunState {
* Initial state after creation of a motor. This is the initial state upon instantiation of a motor, before * Initial state after creation of a motor. This is the initial state upon instantiation of a motor, before
* it is called on to do any active logic besides what occurs in the constructor. * it is called on to do any active logic besides what occurs in the constructor.
*/ */
Uninitialized("i"), Uninitialized(""),
/** /**
* A thread has been invoked, but is initializing and preparing for its main control loop. * A thread has been invoked, but is initializing and preparing for its main control loop.
* This is signaled <EM>by the motor</EM> after {@link Runnable#run}, but before entering the main processing * This is signaled <EM>by the motor</EM> after {@link Runnable#run}, but before entering the main processing
* loop. * loop.
*/ */
Starting("s"), Starting(""),
/** /**
* A thread is iterating within the main control loop. * A thread is iterating within the main control loop.
* This is signaled <EM>by the motor</EM> once initialization in the main loop is complete and immediately * This is signaled <EM>by the motor</EM> once initialization in the main loop is complete and immediately
* before it enters it's main processing loop. * before it enters it's main processing loop.
*/ */
Running("R\u23F5"), Running("\u23F5"),
/** /**
* <P>The thread has been requested to stop. This can be set by a managing thread which is not the * <P>The thread has been requested to stop. This can be set by a managing thread which is not the
* motor thread, or by the motor thread. In either case, the motor thread is required to observe changes to this and initiate shutdown.</P> * motor thread, or by the motor thread. In either case, the motor thread is required to observe changes to this and initiate shutdown.</P>
*/ */
Stopping("s"), Stopping(""),
/** /**
* The thread has stopped. This should only be set by the motor. This state will only be visible * The thread has stopped. This should only be set by the motor. This state will only be visible
@ -64,7 +64,7 @@ public enum RunState {
* <P>NOTE: When a motor is stopped or finished, its state will remain visible in state tracking until * <P>NOTE: When a motor is stopped or finished, its state will remain visible in state tracking until
* {@link Motor#getState()}.{@link MotorState#removeState()} is called.</P> * {@link Motor#getState()}.{@link MotorState#removeState()} is called.</P>
*/ */
Stopped("e\u23F9"), Stopped("\u23F9"),
/** /**
* <P>A thread has exhausted its supply of values on the input (AKA cycles), thus has completed its work. * <P>A thread has exhausted its supply of values on the input (AKA cycles), thus has completed its work.
@ -73,12 +73,12 @@ public enum RunState {
* <P>NOTE: When a motor is stopped or finished, its state will remain visible in state tracking until * <P>NOTE: When a motor is stopped or finished, its state will remain visible in state tracking until
* {@link Motor#getState()}.{@link MotorState#removeState()} is called.</P> * {@link Motor#getState()}.{@link MotorState#removeState()} is called.</P>
*/ */
Finished("F"), Finished(""),
/** /**
* If a motor has seen an exception, it goes into errored state before propagating the error. * If a motor has seen an exception, it goes into errored state before propagating the error.
*/ */
Errored("E"); Errored("");
private final String runcode; private final String runcode;

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -22,4 +22,12 @@ public interface Stoppable {
* completes, the request will cause the component to stop cooperatively. * completes, the request will cause the component to stop cooperatively.
*/ */
void requestStop(); void requestStop();
static void stop(Object... candidates) {
for (Object candidate : candidates) {
if (candidate instanceof Stoppable stoppable) {
stoppable.requestStop();
}
}
}
} }

View File

@ -495,7 +495,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
logger.info(() -> "skipped mapping op '" + pop.getName() + "'"); logger.info(() -> "skipped mapping op '" + pop.getName() + "'");
continue; continue;
} }
boolean dryrun = pop.takeStaticConfigOr("dryrun", false); String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
boolean dryrun = dryrunSpec.equalsIgnoreCase("op");
DriverAdapter adapter = adapters.get(i); DriverAdapter adapter = adapters.get(i);
OpMapper opMapper = adapter.getOpMapper(); OpMapper opMapper = adapter.getOpMapper();

View File

@ -463,12 +463,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
public synchronized void requestStop() { public synchronized void requestStop() {
RunState currentState = motorState.get(); RunState currentState = motorState.get();
if (Objects.requireNonNull(currentState) == Running) { if (Objects.requireNonNull(currentState) == Running) {
if (input instanceof Stoppable) { Stoppable.stop(input, action);
((Stoppable) input).requestStop();
}
if (action instanceof Stoppable) {
((Stoppable) action).requestStop();
}
motorState.enterState(Stopping); motorState.enterState(Stopping);
} else { } else {
logger.warn(() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState); logger.warn(() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState);

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -20,6 +20,8 @@ import io.nosqlbench.engine.api.activityapi.core.RunState;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.util.Arrays;
/** /**
* A value type which encodes the atomic state of a RunState tally. * A value type which encodes the atomic state of a RunState tally.
*/ */
@ -39,21 +41,43 @@ public class RunStateImage {
} }
public boolean is(RunState runState) { public boolean is(RunState runState) {
return counts[runState.ordinal()]>0; return counts[runState.ordinal()] > 0;
} }
public boolean isOnly(RunState runState) { public boolean isOnly(RunState runState) {
for (int i = 0; i < counts.length; i++) { for (int i = 0; i < counts.length; i++) {
if (counts[i]>0 && i!=runState.ordinal()) { if (counts[i] > 0 && i != runState.ordinal()) {
return false; return false;
} }
} }
return true; return true;
} }
public boolean isNoneOther(RunState... runStates) {
int[] scan = Arrays.copyOf(counts, counts.length);
for (RunState runState : runStates) {
scan[runState.ordinal()]=0;
}
for (int i : scan) {
if (i>0) {
return false;
}
}
return true;
}
public RunState getMinState() {
for (int ord = 0; ord < counts.length - 1; ord++) {
if (counts[ord] > 0) {
return RunState.values()[ord];
}
}
throw new RuntimeException("There were zero states, so min state is undefined");
}
public RunState getMaxState() { public RunState getMaxState() {
for (int ord = counts.length-1; ord >= 0; ord--) { for (int ord = counts.length - 1; ord >= 0; ord--) {
if (counts[ord]>0) { if (counts[ord] > 0) {
return RunState.values()[ord]; return RunState.values()[ord];
} }
} }
@ -63,7 +87,7 @@ public class RunStateImage {
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (RunState runState : RunState.values()) { for (RunState runState : RunState.values()) {
sb.append(runState.getCode()).append(" ").append(counts[runState.ordinal()]).append(" "); sb.append(runState.getCode()).append(":").append(counts[runState.ordinal()]).append(" ");
} }
return sb.toString(); return sb.toString();
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -27,13 +27,17 @@ public class RunStateImageTest {
public void testMaxStateImage() { public void testMaxStateImage() {
int[] counts = new int[RunState.values().length]; int[] counts = new int[RunState.values().length];
counts[RunState.Running.ordinal()]=3; counts[RunState.Running.ordinal()]=3;
counts[RunState.Starting.ordinal()]=2;
RunStateImage image = new RunStateImage(counts, false); RunStateImage image = new RunStateImage(counts, false);
assertThat(image.is(RunState.Running)).isTrue(); assertThat(image.is(RunState.Running)).isTrue();
assertThat(image.is(RunState.Starting)).isTrue();
assertThat(image.isTimeout()).isFalse(); assertThat(image.isTimeout()).isFalse();
assertThat(image.is(RunState.Errored)).isFalse(); assertThat(image.is(RunState.Errored)).isFalse();
assertThat(image.isOnly(RunState.Running)).isTrue(); assertThat(image.isNoneOther(RunState.Starting, RunState.Running)).isTrue();
RunState maxState = image.getMaxState(); RunState maxState = image.getMaxState();
assertThat(maxState).isEqualTo(RunState.values()[2]); assertThat(maxState).isEqualTo(RunState.values()[RunState.Running.ordinal()]);
RunState minState = image.getMinState();
assertThat(minState).isEqualTo(RunState.values()[RunState.Starting.ordinal()]);
} }
} }

View File

@ -1,8 +1,8 @@
Running Activities and Scenarios via CLI Running Activities and Scenarios via CLI
======================================== ========================================
PROG always runs a scenario script. However, there are multiple ways to tell ${PROG} always runs a scenario script. However, there are multiple ways to tell
PROG what that script should be. ${PROG} what that script should be.
Any argument in name=value format serves as a parameter to the Any argument in name=value format serves as a parameter to the
script or activity that precedes it. script or activity that precedes it.
@ -10,18 +10,18 @@ script or activity that precedes it.
To create a scenario script that simply runs a single activity to completion, To create a scenario script that simply runs a single activity to completion,
use this format: use this format:
~~~ ~~~
PROG activity <param>=<value> [...] ${PROG} activity <param>=<value> [...]
~~~ ~~~
To create a scenario script that runs multiple activities concurrently, To create a scenario script that runs multiple activities concurrently,
simply add more activities to the list: simply add more activities to the list:
~~~ ~~~
PROG activity <param>=<value> [...] activity <param>=<value> [...] ${PROG} activity <param>=<value> [...] activity <param>=<value> [...]
~~~ ~~~
To execute a scenario script directly, simply use the format: To execute a scenario script directly, simply use the format:
~~~ ~~~
PROG script <scriptname> [param=value [...]] ${PROG} script <scriptname> [param=value [...]]
~~~ ~~~
Time & Size Units Time & Size Units
@ -55,19 +55,19 @@ so parameters may be dropped into scripts ad-hoc.
By using the option --session-name <name>, you can name the session logfile By using the option --session-name <name>, you can name the session logfile
that will be (over)written with execution details. that will be (over)written with execution details.
~~~ ~~~
PROG --session-name testsession42 ${PROG} --session-name testsession42
~~~ ~~~
## Metric Name ## Metric Name
If you need to see what metrics are available for a particular activity type, If you need to see what metrics are available for a particular activity type,
you can ask PROG to instantiate an activity of that type and discover the you can ask ${PROG} to instantiate an activity of that type and discover the
metrics, dumping out a list. The following form of the command shows you how metrics, dumping out a list. The following form of the command shows you how
to make a list that you can copy metric names from for scripting. If you provide to make a list that you can copy metric names from for scripting. If you provide
an example activity alias that matches one of your scripts, you can use it exactly an example activity alias that matches one of your scripts, you can use it exactly
as it appears. as it appears.
~~~ ~~~
PROG --list-metrics driver=diag alias=anexample ${PROG} --list-metrics driver=diag alias=anexample
~~~ ~~~
This will dump a list of metric names in the shortened format that is most suitable This will dump a list of metric names in the shortened format that is most suitable
for scenario script development. This format is required for the --list-metrics for scenario script development. This format is required for the --list-metrics

View File

@ -1,4 +1,4 @@
### Command-Line Options ### # Command-Line Options
Help ( You're looking at it. ) Help ( You're looking at it. )
@ -8,27 +8,31 @@ Short options, like '-v' represent simple options, like verbosity. Using multipl
level of the option, like '-vvv'. level of the option, like '-vvv'.
Long options, like '--help' are top-level options that may only be used once. These modify general Long options, like '--help' are top-level options that may only be used once. These modify general
behavior, or allow you to get more details on how to use PROG. behavior, or allow you to get more details on how to use ${PROG}.
All other options are either commands, or named arguments to commands. Any single word without All other options are either commands, or named arguments to commands. Any single word without
dashes is a command that will be converted into script form. Any option that includes an equals sign dashes is a command that will be converted into script form. Any option that includes an equals sign
is a named argument to the previous command. The following example is a commandline with a command * is a named argument to the previous command. The following example is a commandline with a command *
start*, and two named arguments to that command. start*, and two named arguments to that command.
PROG start driver=diag alias=example ${PROG} start driver=diag alias=example
### Discovery options ### ## Discovery options
These options help you learn more about running PROG, and about the plugins that are present in your These options help you learn more about running ${PROG}, and about the plugins that are present in your
particular version. particular version.
Show version, long form, with artifact coordinates.
--version
Get a list of additional help topics that have more detailed documentation: Get a list of additional help topics that have more detailed documentation:
PROG help topics ${PROG} help topics
Provide specific help for the named activity type: Provide specific help for the named activity type:
PROG help <activity type> ${PROG} help <activity type>
List the available drivers: List the available drivers:
@ -50,9 +54,9 @@ Provide the metrics that are available for scripting
--list-metrics <activity type> [ <activity name> ] --list-metrics <activity type> [ <activity name> ]
### Execution Options ### ## Execution Options
This is how you actually tell PROG what scenario to run. Each of these commands appends script logic This is how you actually tell ${PROG} what scenario to run. Each of these commands appends script logic
to the scenario that will be executed. These are considered as commands, can occur in any order and to the scenario that will be executed. These are considered as commands, can occur in any order and
quantity. The only rule is that arguments in the arg=value form will apply to the preceding script quantity. The only rule is that arguments in the arg=value form will apply to the preceding script
or activity. or activity.
@ -65,9 +69,7 @@ Add the named activity to the scenario, interpolating named parameters
activity [arg=value]... activity [arg=value]...
### General options ### ## Logging options
These options modify how the scenario is run.
Specify a directory for scenario log files: Specify a directory for scenario log files:
@ -111,12 +113,38 @@ Specify the logging pattern for logfile only:
# ANSI variants are auto promoted for console if --ansi=enable # ANSI variants are auto promoted for console if --ansi=enable
# ANSI variants are auto demoted for logfile in any case # ANSI variants are auto demoted for logfile in any case
## Console Options
Increase console logging levels: (Default console logging level is *warning*)
-v (info)
-vv (debug)
-vvv (trace)
--progress console:1m (disables itself if -v options are used)
These levels affect *only* the console output level. Other logging level parameters affect logging
to the scenario log, stored by default in logs/...
Explicitly enable or disable ANSI logging support: Explicitly enable or disable ANSI logging support:
(ANSI support is enabled if the TERM environment variable is defined) (ANSI support is enabled if the TERM environment variable is defined)
--ansi=enabled --ansi=enabled
--ansi=disabled --ansi=disabled
Adjust the progress reporting interval:
--progress console:1m
or
--progress logonly:5m
NOTE: The progress indicator on console is provided by default unless logging levels are turned up
or there is a script invocation on the command line.
## Metrics options
Specify a directory and enable CSV reporting of metrics: Specify a directory and enable CSV reporting of metrics:
--report-csv-to <dirname> --report-csv-to <dirname>
@ -158,17 +186,6 @@ Each activity can also override this value with the hdr_digits parameter. Be awa
increase in this number multiples the amount of detail tracked on the client by 10x, so use increase in this number multiples the amount of detail tracked on the client by 10x, so use
caution. caution.
Adjust the progress reporting interval:
--progress console:1m
or
--progress logonly:5m
NOTE: The progress indicator on console is provided by default unless logging levels are turned up
or there is a script invocation on the command line.
If you want to add in classic time decaying histogram metrics for your histograms and timers, you If you want to add in classic time decaying histogram metrics for your histograms and timers, you
may do so with this option: may do so with this option:
@ -191,22 +208,6 @@ automatically. It also imports a base dashboard for nosqlbench and configures gr
export to share with a central DataStax grafana instance (grafana can be found on localhost:3000 export to share with a central DataStax grafana instance (grafana can be found on localhost:3000
with the default credentials admin/admin). with the default credentials admin/admin).
### Console Options ###
Increase console logging levels: (Default console logging level is *warning*)
-v (info)
-vv (debug)
-vvv (trace)
--progress console:1m (disables itself if -v options are used)
These levels affect *only* the console output level. Other logging level parameters affect logging
to the scenario log, stored by default in logs/...
Show version, long form, with artifact coordinates.
--version
### Summary Reporting ### Summary Reporting

View File

@ -79,10 +79,12 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
// TODO: Doc how uninitialized activities do not propagate parameter map changes and how // TODO: Doc how uninitialized activities do not propagate parameter map changes and how
// TODO: this is different from preventing modification to uninitialized activities // TODO: this is different from preventing modification to uninitialized activities
// TODO: Determine whether this should really be synchronized
/** /**
* Simply stop the motors * Simply stop the motors
*/ */
public void stopActivity() { public void stopActivity() {
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias()); logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping); activity.setRunState(RunState.Stopping);
@ -96,14 +98,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder() Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId) .session(sessionId)
.interval(this.startedAt, this.stoppedAt) .interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity) .layer(Layer.Activity)
.label("alias", getActivityDef().getAlias()) .label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType()) .label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString()) .detail("params", getActivityDef().toString())
.build() .build()
); );
} }
@ -123,14 +125,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder() Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId) .session(sessionId)
.interval(this.startedAt, this.stoppedAt) .interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity) .layer(Layer.Activity)
.label("alias", getActivityDef().getAlias()) .label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType()) .label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString()) .detail("params", getActivityDef().toString())
.build() .build()
); );
} }
@ -183,8 +185,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/** /**
* Shutdown the activity executor, with a grace period for the motor threads. * Shutdown the activity executor, with a grace period for the motor threads.
* *
* @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing * @param initialMillisToWait
* everything to stop * milliseconds to wait after graceful shutdownActivity request, before forcing
* everything to stop
*/ */
public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) { public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
Exception exception = forceStopActivity(initialMillisToWait); Exception exception = forceStopActivity(initialMillisToWait);
@ -210,10 +213,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustMotorCountToThreadParam(activity.getActivityDef()); adjustMotorCountToThreadParam(activity.getActivityDef());
} }
motors.stream() motors.stream()
.filter(m -> (m instanceof ActivityDefObserver)) .filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized) // .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting) // .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef)); .forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
} }
} }
@ -227,14 +230,15 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() { private String getSlotStatus() {
return motors.stream() return motors.stream()
.map(m -> m.getState().get().getCode()) .map(m -> m.getState().get().getCode())
.collect(Collectors.joining(",", "[", "]")); .collect(Collectors.joining(",", "[", "]"));
} }
/** /**
* Stop extra motors, start missing motors * Stop extra motors, start missing motors
* *
* @param activityDef the activityDef for this activity instance * @param activityDef
* the activityDef for this activity instance
*/ */
private void adjustMotorCountToThreadParam(ActivityDef activityDef) { private void adjustMotorCountToThreadParam(ActivityDef activityDef) {
logger.trace(() -> ">-pre-adjust->" + getSlotStatus()); logger.trace(() -> ">-pre-adjust->" + getSlotStatus());
@ -276,7 +280,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
} }
} }
private void alignMotorStateToIntendedActivityState() { private synchronized void alignMotorStateToIntendedActivityState() {
RunState intended = activity.getRunState(); RunState intended = activity.getRunState();
logger.trace(() -> "ADJUSTING to INTENDED " + intended); logger.trace(() -> "ADJUSTING to INTENDED " + intended);
switch (intended) { switch (intended) {
@ -285,17 +289,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running: case Running:
case Starting: case Starting:
motors.stream() motors.stream()
.filter(m -> m.getState().get() != RunState.Running) .filter(m -> m.getState().get() != RunState.Running)
.filter(m -> m.getState().get() != RunState.Finished) .filter(m -> m.getState().get() != RunState.Finished)
.filter(m -> m.getState().get() != RunState.Starting) .filter(m -> m.getState().get() != RunState.Starting)
.forEach(m -> { .forEach(m -> {
executorService.execute(m); executorService.execute(m);
}); });
break; break;
case Stopped: case Stopped:
motors.stream() motors.stream()
.filter(m -> m.getState().get() != RunState.Stopped) .filter(m -> m.getState().get() != RunState.Stopped)
.forEach(Motor::requestStop); .forEach(Motor::requestStop);
break; break;
case Finished: case Finished:
case Stopping: case Stopping:
@ -311,26 +315,28 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private void awaitAlignmentOfMotorStateToActivityState() { private void awaitAlignmentOfMotorStateToActivityState() {
logger.debug(() -> "awaiting state alignment from " + activity.getRunState()); logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
RunStateImage states = null;
switch (activity.getRunState()) { switch (activity.getRunState()) {
case Starting: case Starting:
case Running: case Running:
tally.awaitNoneOther(RunState.Running, RunState.Finished); states = tally.awaitNoneOther(RunState.Running, RunState.Finished);
break; break;
case Errored: case Errored:
case Stopping: case Stopping:
case Stopped: case Stopped:
tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored); states = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
break; break;
case Uninitialized: case Uninitialized:
break; break;
case Finished: case Finished:
tally.awaitNoneOther(RunState.Finished); states = tally.awaitNoneOther(RunState.Finished);
break; break;
default: default:
throw new RuntimeException("Unmatched run state:" + activity.getRunState()); throw new RuntimeException("Unmatched run state:" + activity.getRunState());
} }
logger.debug("activity and threads are aligned to state " + activity.getRunState() + " for " + this.getActivity().getAlias()); RunState previousState = activity.getRunState();
activity.setRunState(states.getMaxState());
logger.debug("activity and threads are aligned to state " + previousState + " for " + this.getActivity().getAlias() + ", and advanced to " + activity.getRunState());
} }
@ -391,12 +397,16 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
// instantiate and configure fixtures that need to be present // instantiate and configure fixtures that need to be present
// before threads start running such as metrics instruments // before threads start running such as metrics instruments
activity.initActivity(); activity.initActivity();
startMotorExecutorService();
startRunningActivityThreads();
awaitMotorsAtLeastRunning(); awaitMotorsAtLeastRunning();
logger.debug("STARTED " + activityDef.getAlias());
awaitActivityCompletion(); awaitActivityCompletion();
activity.shutdownActivity();
activity.closeAutoCloseables();
} catch (Exception e) { } catch (Exception e) {
this.exception = e; this.exception = e;
} finally {
activity.shutdownActivity();
activity.closeAutoCloseables();
} }
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception); ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
return result; return result;
@ -420,7 +430,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
} }
public synchronized void startActivity() { public synchronized void startActivity() {
// we need an executor service to run motor threads on RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped);
if (startable.isTimeout()) {
throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable);
}
startMotorExecutorService(); startMotorExecutorService();
startRunningActivityThreads(); startRunningActivityThreads();
awaitMotorsAtLeastRunning(); awaitMotorsAtLeastRunning();
@ -471,10 +484,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private void startMotorExecutorService() { private void startMotorExecutorService() {
this.executorService = new ThreadPoolExecutor( this.executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS, 0L, TimeUnit.SECONDS,
new SynchronousQueue<>(), new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)) new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
); );
} }
@ -491,14 +504,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary()); logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
Annotators.recordAnnotation(Annotation.newBuilder() Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId) .session(sessionId)
.now() .now()
.layer(Layer.Activity) .layer(Layer.Activity)
.label("alias", getActivityDef().getAlias()) .label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType()) .label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString()) .detail("params", getActivityDef().toString())
.build() .build()
); );
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")"); activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");

View File

@ -86,8 +86,6 @@ public class ScenarioController {
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor); Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor); ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo); this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo);
executor.startActivity();
scenariologger.debug("STARTED " + activityDef.getAlias());
} }
return this.activityInfoMap.get(activityDef.getAlias()); return this.activityInfoMap.get(activityDef.getAlias());
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022 nosqlbench * Copyright (c) 2022-2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@ -47,7 +47,7 @@ class ActivityExecutorTest {
@Test @Test
synchronized void testRestart() { synchronized void testRestart() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;"); ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-restart;cycles=1000;cyclerate=1;op=initdelay:initdelay=5000;");
new ActivityTypeLoader().load(activityDef); new ActivityTypeLoader().load(activityDef);
final Activity activity = new DelayedInitActivity(activityDef); final Activity activity = new DelayedInitActivity(activityDef);
@ -67,23 +67,23 @@ class ActivityExecutorTest {
try { try {
activityDef.setThreads(1); activityDef.setThreads(1);
activityExecutor.startActivity(); activityExecutor.startActivity();
Thread.sleep(500L);
activityExecutor.stopActivity(); activityExecutor.stopActivity();
activityExecutor.startActivity(); activityExecutor.startActivity();
activityExecutor.startActivity(); activityExecutor.stopActivity();
future.get(); future.get();
Thread.sleep(500L);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
executor.shutdown(); executor.shutdown();
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull(); assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNotNull();
} }
@Test @Test
synchronized void testDelayedStartSanity() { synchronized void testDelayedStartSanity() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;"); final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef); new ActivityTypeLoader().load(activityDef);
final Activity activity = new DelayedInitActivity(activityDef); final Activity activity = new DelayedInitActivity(activityDef);
@ -119,7 +119,7 @@ class ActivityExecutorTest {
@Test @Test
synchronized void testNewActivityExecutor() { synchronized void testNewActivityExecutor() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;"); ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef); new ActivityTypeLoader().load(activityDef);
getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef)); getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef));
@ -140,7 +140,7 @@ class ActivityExecutorTest {
activityDef.setThreads(5); activityDef.setThreads(5);
activityExecutor.startActivity(); activityExecutor.startActivity();
int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000}; int[] speeds = new int[]{1, 50, 5, 50, 2, 50};
for (int offset = 0; offset < speeds.length; offset += 2) { for (int offset = 0; offset < speeds.length; offset += 2) {
int threadTarget = speeds[offset]; int threadTarget = speeds[offset];
int threadTime = speeds[offset + 1]; int threadTime = speeds[offset + 1];
@ -158,7 +158,7 @@ class ActivityExecutorTest {
// Used for slowing the roll due to state transitions in test. // Used for slowing the roll due to state transitions in test.
try { try {
activityExecutor.stopActivity(); activityExecutor.stopActivity();
Thread.sleep(2000L); // Thread.sleep(2000L);
} catch (Exception e) { } catch (Exception e) {
fail("Not expecting exception", e); fail("Not expecting exception", e);
} }

View File

@ -40,6 +40,7 @@
<maven.compiler.source>17</maven.compiler.source> <maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target> <maven.compiler.target>17</maven.compiler.target>
<PROG>nb5</PROG>
</properties> </properties>
<name>${project.artifactId}</name> <name>${project.artifactId}</name>