partial work, reset and refactor before use

This commit is contained in:
Jonathan Shook 2024-06-11 15:30:28 -05:00
parent 3f23d85df3
commit f520d15037
43 changed files with 747 additions and 2259 deletions

View File

@ -527,7 +527,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-ea @{argLine}</argLine>
<argLine>-ea --enable-preview @{argLine}</argLine>
<systemPropertyVariables>
<Log4jContextSelector>
org.apache.logging.log4j.core.async.AsyncLoggerContextSelector
@ -612,6 +612,7 @@
</execution>
</executions>
<configuration>
<compilerArgs>--enable-preview</compilerArgs>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<includes>
@ -733,12 +734,12 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M8</version>
<version>3.2.5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M8</version>
<version>3.2.5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@ -884,7 +885,7 @@
<detectOfflineLinks>false</detectOfflineLinks>
<!-- <additionalparam>-Xdoclint:none</additionalparam>-->
<additionalOptions>
<additionalOption>-Xdoclint:none</additionalOption>
<additionalOption>-Xdoclint:none --enable-preview</additionalOption>
</additionalOptions>
<!-- <additionalJOption>-Xdoclint:none</additionalJOption>-->
<doclint>none</doclint>

View File

@ -34,9 +34,6 @@ import java.nio.ByteBuffer;
*/
public class TokenMapFileAPIService {
// public static ThreadLocal<Map<String, BinaryCursorForTokenCycle>> tl_cll =
// ThreadLocal.withInitial(HashMap::new);
//
private final int recordCount;
private final ByteBuffer buffer;
private final int RECORD_LEN = Long.BYTES * 2;

View File

@ -31,7 +31,7 @@ scenarios:
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
astra:
schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF
schema: run driver=cql tags==block:schema_astra threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto
basic_check:

View File

@ -31,9 +31,7 @@ import java.util.function.BiFunction;
* be the core driver for the purposes of validating the NoSQLBench engine
* and associated libraries and extensions.
*
* The tasks are retained as shared across threads by default. This means that if
* you want to have per-thread logic or to do things which are not thread-safe,
* you need to use ThreadLocal or some other pattern to do so.
* The tasks are retained as shared across threads by default.
*
* Tasks must provide a no-args constructor (or no constructor, which does the same).
* Tasks must specify their configuration requirements via the {@link NBConfigurable}

View File

@ -32,9 +32,8 @@ import java.util.List;
import java.util.Locale;
/**
* ThreadLocal http clients have been removed from this version, as the built-in
* HTTP client implementation is meant to be immutable. If shared-state issues
* occur, thread-local support will be re-added.
* per-thread http clients have been removed from this version, as the built-in
* HTTP client implementation is meant to be immutable.
*/
public class HttpSpace implements NBLabeledElement {
private final static Logger logger = LogManager.getLogger(HttpSpace.class);

View File

@ -19,6 +19,7 @@ import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
import io.nosqlbench.nb.api.lifecycle.ObjectPool;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -36,8 +37,6 @@ public abstract class JDBCDMLOp extends JDBCOp {
protected final String pStmtSqlStr;
protected final List<Object> pStmtValList;
protected static ThreadLocal<Statement> jdbcStmtTL = ThreadLocal.withInitial(() -> null);
public JDBCDMLOp(JDBCSpace jdbcSpace,
boolean isReadStmt,
String pStmtSqlStr,
@ -96,8 +95,7 @@ public abstract class JDBCDMLOp extends JDBCOp {
}
}
stmt.setObject(fieldIdx, fieldValObj);
}
catch ( SQLException e) {
} catch (SQLException e) {
throw new SQLException(
"Failed to parse the prepared statement value for field[" + fieldIdx + "] " + fieldValObj);
}
@ -115,22 +113,19 @@ public abstract class JDBCDMLOp extends JDBCOp {
}
protected Statement createDMLStatement() throws SQLException {
Statement stmt = jdbcStmtTL.get();
if (stmt == null) {
if (isPreparedStmt)
stmt = jdbcConnection.prepareStatement(pStmtSqlStr);
else
stmt = jdbcConnection.createStatement();
final Statement stmt = isPreparedStmt
? jdbcConnection.prepareStatement(pStmtSqlStr)
: jdbcConnection.createStatement();
jdbcStmtTL.set(stmt);
LOGGER.debug(
() -> "A statement is created -- prepared: "
+isPreparedStmt
+", read/write: "
+ (isReadStmt ? "read" : "write) "
+ ", stmt: "
+ stmt)
);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}",
isPreparedStmt,
isReadStmt ? "read" : "write",
stmt);
}
}
return stmt;
}
}

View File

@ -20,6 +20,7 @@ import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.nb.api.lifecycle.ObjectPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -28,6 +29,7 @@ import java.util.Properties;
import java.util.Random;
public abstract class JDBCOp implements CycleOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCOp.class);
protected static final String LOG_COMMIT_SUCCESS =
"Executed the JDBC statement & committed the connection successfully";

View File

@ -71,8 +71,7 @@ public abstract class BaseOpDispenser<T extends Op, S> extends NBBaseComponent i
private final List<Class<?>> verifierStaticImports = new ArrayList<>();
/**
* optional invokable functions which throw exceptions when results are not verifiable.
* This variable is kept here for diagnostics and debugging. The actual instance used within
* each thread is provided by a {@link ThreadLocal} via {@link #getVerifier()}
* This variable is kept here for diagnostics and debugging.
*/
private final CycleFunction<Boolean> _verifier;
private final ThreadLocal<CycleFunction<Boolean>> tlVerifier;

View File

@ -26,6 +26,7 @@ public class NBComponentExecutionScope implements AutoCloseable {
public NBComponentExecutionScope(NBComponent... components) {
this.components = components;
}
@Override
public void close() throws RuntimeException {
for (NBComponent component : components) {

View File

@ -26,30 +26,30 @@ import java.util.function.Supplier;
/**
* <P>This object pooling class provides a simple way to pool objects which:
* <UL>
* <LI>Have a non-trivial creation cost</LI>
* <LI>Are expensive to create relative to how they are used</LI>
* <LI>Need to be used my multiple threads</LI>
* <LI>Are not thread safe</LI>
* </UL>
* <p>
* The overhead of using this pool should be weighed against the simpler case of just
* creating the object where it is needed. Generally speaking, avoid using this pool
* unless you know that object caching or pooling is needed.
* The overhead of using this pool should be weighed against the simpler case of just creating the object where it is
* needed. Generally speaking, avoid using this pool unless you know that object caching or pooling is needed from
* profiling.
* </P>
*
* <P>This pool keeps count of how many active elements are in use by caller threads.
* In the event that the available pool of objects is more than 2X the size of the active
* ones, it is reduced to 1/2 its current size, <em>after</em> the last element is released.
* This means that the pool will size down automatically when there are transient or episodic
* spikes.
* <P>This pool keeps count of how many active elements are in use by caller threads. In the event that the available
* pool of objects is more than 2X the size of the active ones, it is reduced to 1/2 its current size, <em>after</em>
* the last element is released. This means that the pool will size down automatically when there are transient or
* episodic spikes.
* </P>
*
* <P>The pool also handles object resetting via the consumer provided. Although it is expressed as a consumer,
* it doesn't strictly consume references. It simply does whatever the caller needs in order to effectively
* reset one of the pooled objects as it is returned to the resource pool.</P>
* <P>The pool also handles object resetting via the {@link Consumer} provided. Although it is expressed as a consumer,
* it doesn't strictly consume references. It simply does whatever the caller needs in order to effectively reset one of
* the pooled objects as it is returned to the resource pool.</P>
*
* <P>Safe usage of this class is achieved by wrapping it with a try-with-resources clause so that automatic
* resource management takes care of returning and reusing objects. The type returned by {@link #get()} is a
* reference wrapper. For example, you can create and use a pool of {@link StringBuilder}s like this:
*
* <P>Typical usage of this class will wrap it in a try-with-resources clause so that automatic resource
* management takes care of returning and reusing objects. The type returned by {@link #get()} is a reference
* wrapper. For example, you can create and use a pool of {@link StringBuilder}s like this:
* <pre><code>
* ObjectPool<StringBuilder> pool = new ObjectPool<>(StringBuilder::new, sb -> sb.setLength());
* try (var handle = pool.get()) {
@ -61,10 +61,11 @@ import java.util.function.Supplier;
* throw new RuntimeException(e);
* }
* </code></pre>
*
* At the end of the try-with-resources, the StringBuilder will be returned to the pool after
* its length has been reset to 0.
* <p>
* At the end of the try-with-resources, the StringBuilder will be returned to the pool after its length has been
* reset to 0.
* </P>
*
* @param <T>
*/
public class ObjectPool<T> implements Supplier<ObjectPool.Borrowed<T>> {

View File

@ -130,25 +130,26 @@
<!-- test scope only -->
</dependencies>
<profiles>
<profile>
<id>perftests</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<groups>perf</groups>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<!-- <profiles>-->
<!-- <profile>-->
<!-- <id>perftests</id>-->
<!-- <activation>-->
<!-- <activeByDefault>false</activeByDefault>-->
<!-- </activation>-->
<!-- <build>-->
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-surefire-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <groups>perf</groups>-->
<!-- <compilerArgs>&#45;&#45;enable-preview</compilerArgs>-->
<!-- </configuration>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
<!-- </profile>-->
<!-- </profiles>-->
<build>

View File

@ -27,7 +27,6 @@ import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import java.io.InputStream;
import java.io.PrintWriter;
@ -84,11 +83,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
void setOutputDispenserDelegate(OutputDispenser outputDispenser);
@Override
RunState getRunState();
void setRunState(RunState runState);
long getStartedAtMillis();
default void shutdownActivity() {
@ -132,21 +126,9 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
ErrorMetrics getExceptionMetrics();
// /**
// * When a driver needs to identify an error uniquely for the purposes of
// * routing it to the correct error handler, or naming it in logs, or naming
// * metrics, override this method in your activity.
// * @return A function that can reliably and safely map an instance of Throwable to a stable name.
// */
// default Function<Throwable,String> getErrorNameMapper() {
// return t -> t.getClass().getSimpleName();
// }
//
int getMaxTries();
default int getHdrDigits() {
return this.getParams().getOptionalInteger("hdr_digits").orElse(4);
}
RunStateTally getRunStateTally();
}

View File

@ -22,7 +22,6 @@ import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import java.util.Map;
@ -37,7 +36,7 @@ import java.util.Optional;
* and by extension, default inputs and motors.</p>
*/
//@Deprecated(forRemoval = true,since = "5.0")
public interface ActivityType<A extends Activity> {
public interface ActivityType<A extends SimpleActivity<?,?>> {
/**
@ -67,17 +66,12 @@ public interface ActivityType<A extends Activity> {
if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities);
activity.setInputDispenserDelegate(inputDispenser);
final ActionDispenser actionDispenser = this.getActionDispenser(activity);
if (actionDispenser instanceof ActivitiesAware)
((ActivitiesAware) actionDispenser).setActivitiesMap(activities);
activity.setActionDispenserDelegate(actionDispenser);
final OutputDispenser outputDispenser = this.getOutputDispenser(activity).orElse(null);
if ((null != outputDispenser) && (outputDispenser instanceof ActivitiesAware))
((ActivitiesAware) outputDispenser).setActivitiesMap(activities);
activity.setOutputDispenserDelegate(outputDispenser);
final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, outputDispenser);
if (motorDispenser instanceof ActivitiesAware) ((ActivitiesAware) motorDispenser).setActivitiesMap(activities);
activity.setMotorDispenserDelegate(motorDispenser);
@ -94,15 +88,6 @@ public interface ActivityType<A extends Activity> {
return CoreServices.getOutputDispenser(activity);
}
/**
* This method will be called <em>once</em> per action instance.
*
* @param activity The activity instance that will parameterize the returned ActionDispenser instance.
* @return an instance of ActionDispenser
*/
default ActionDispenser getActionDispenser(final A activity) {
return new CoreActionDispenser(activity);
}
/**
* Return the InputDispenser instance that will be used by the associated activity to create Input factories
@ -118,9 +103,8 @@ public interface ActivityType<A extends Activity> {
default <T> MotorDispenser<T> getMotorDispenser(
final A activity,
final InputDispenser inputDispenser,
final ActionDispenser actionDispenser,
final OutputDispenser outputDispenser) {
return new CoreMotorDispenser<T>(activity, inputDispenser, actionDispenser, outputDispenser);
return new CoreMotorDispenser<T>(activity, inputDispenser, outputDispenser);
}
/**

View File

@ -16,13 +16,14 @@
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityimpl.MotorState;
import io.nosqlbench.engine.api.activityapi.input.Input;
import java.util.concurrent.Callable;
/**
* The core threading harness within an activity.
*/
public interface Motor<T> extends Runnable, Stoppable {
public interface Motor<T> extends Callable<Void> {
/**
* Set the input on this motor. It will be read from each cycle before applying the action.
@ -34,27 +35,9 @@ public interface Motor<T> extends Runnable, Stoppable {
Input getInput();
/**
* Set the action on this motor. It will be applied to each input.
*
* @param action an instance of activityAction
* @return this ActivityMotor, for method chaining
*/
Motor<T> setAction(Action action);
Action getAction();
/**
* get the slotId which this motor is assigned to within the activity instance.
* @return long slot id
*/
long getSlotId();
/**
* Get a description of the current slot run status.
* @return - a value from the {@link RunState} enum
*/
MotorState getState();
void removeState();
}

View File

@ -16,8 +16,6 @@
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityimpl.MotorState;
/**
* <P>This enum indicates the state of a thread within an activity. The state is kept in an atomic
* register. Ownership of state changes is shared between a supervising thread and a managed thread.

View File

@ -27,8 +27,7 @@ public interface InputDispenser {
/**
* Resolve (find or create) an Input instance for the slot specified.
* The input is not required to be per-slot (per-thread), but any shared inputs must be thread safe.
* @param slot The numbered slot within the activity instance for this action.
* @return A new or cached Input for the specified slot.
*/
Input getInput(long slot);
Input getInput();
}

View File

@ -1,79 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
/**
* Holds the state of a slot, allows only valid transitions, and shares the
* slot state as
*/
public class MotorState implements Supplier<RunState> {
private final static Logger logger = LogManager.getLogger("MOTORS");
private final AtomicReference<RunState> atomicState = new AtomicReference<>(RunState.Uninitialized);
private final long slotId;
private final RunStateTally tally;
public MotorState(long slotId, RunStateTally tally) {
this.slotId = slotId;
this.tally = tally;
tally.add(atomicState.get());
}
public RunState get() {
return atomicState.get();
}
/**
* This is how you share the current slot state most directly, but it has a caveat. By sharing the
* slot state in this way, you allow external changes. You should only use this method to share slot
* state for observers.
* @return an atomic reference for SlotState
*/
public AtomicReference<RunState> getAtomicSlotState() {
return atomicState;
}
/**
* <p>Transition the thread slot to a new state. only accepting valid transitions.</p>
* <p>The valid slot states will be moved to a data type eventually, simplifying this method.</p>
*
* @param to The next SlotState for this thread/slot/motor
*/
public synchronized void enterState(RunState to) {
RunState from = atomicState.get();
if (!from.canTransitionTo(to)) {
throw new RuntimeException("Invalid transition from " + from + " to " + to);
}
while (!atomicState.compareAndSet(from, to)) {
logger.trace(() -> "retrying transition from:" + from + " to:" + to);
}
tally.change(from,to);
logger.trace(() -> "TRANSITION[" + slotId + "]: " + from + " ==> " + to);
}
public void removeState() {
logger.trace(() -> "Removing motor state " + atomicState.get());
tally.remove(atomicState.get());
}
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
@ -41,12 +42,16 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.NBEvent;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.nb.api.components.events.SetThreads;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.labels.NBLabels;
@ -57,12 +62,10 @@ import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
/**
* A default implementation of an Activity, suitable for building upon.
*/
public class SimpleActivity extends NBStatusComponent implements Activity, InvokableResult {
public class SimpleActivity<R extends Op, S> extends NBStatusComponent implements Activity, InvokableResult, SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
protected ActivityDef activityDef;
@ -83,7 +86,9 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
private NBErrorHandler errorHandler;
private ActivityMetricProgressMeter progressMeter;
private String workloadSource = "unspecified";
private final RunStateTally tally = new RunStateTally();
private final ConcurrentHashMap<String, DriverAdapter<?, ?>> adapters = new ConcurrentHashMap<>();
private final OpSequence<OpDispenser<? extends Op>> sequence;
public SimpleActivity(NBComponent parent, ActivityDef activityDef) {
super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels()));
@ -102,6 +107,126 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
nameEnumerator++;
}
}
OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
NBConfigModel yamlmodel;
if (yaml_loc.isPresent()) {
Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
workload = OpsLoader.loadPath(yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
} else {
yamlmodel = ConfigModel.of(SimpleActivity.class).asReadOnly();
}
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName
.flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
.map(l -> l.load(this, NBLabels.forKV()));
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\'');
}
// HERE, op templates are loaded before drivers are loaded
List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter.orElse(null));
List<ParsedOp> pops = new ArrayList<>();
List<DriverAdapter<?, ?>> adapterlist = new ArrayList<>();
NBConfigModel supersetConfig = ConfigModel.of(SimpleActivity.class).add(yamlmodel);
Optional<String> defaultDriverOption = defaultDriverName;
ConcurrentHashMap<String, OpMapper<? extends Op>> mappers = new ConcurrentHashMap<>();
for (OpTemplate ot : opTemplates) {
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
String driverName = ot.getOptionalStringParam("driver", String.class)
.or(() -> ot.getOptionalStringParam("type", String.class))
.or(() -> defaultDriverOption)
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
// String driverName = ot.getOptionalStringParam("driver")
// .or(() -> activityDef.getParams().getOptionalString("driver"))
// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
// HERE
if (!adapters.containsKey(driverName)) {
DriverAdapter<?, ?> adapter = Optional.of(driverName)
.flatMap(
name -> ServiceSelector.of(
name,
ServiceLoader.load(DriverAdapterLoader.class)
)
.get())
.map(
l -> l.load(
this,
NBLabels.forKV()
)
)
.orElseThrow(() -> new OpConfigError("driver adapter not present for name '" + driverName + "'"));
NBConfigModel combinedModel = yamlmodel;
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
if (adapter instanceof NBConfigurable configurable) {
NBConfigModel adapterModel = configurable.getConfigModel();
supersetConfig.add(adapterModel);
combinedModel = adapterModel.add(yamlmodel);
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
configurable.applyConfig(combinedConfig);
}
adapters.put(driverName, adapter);
mappers.put(driverName, adapter.getOpMapper());
}
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
DriverAdapter<?, ?> adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
pops.add(pop);
}
if (defaultDriverOption.isPresent()) {
long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count();
if (0 == matchingDefault) {
logger.warn("All op templates used a different driver than the default '{}'", defaultDriverOption.get());
}
}
try {
sequence = createOpSourceFromParsedOps(adapterlist, pops);
} catch (Exception e) {
if (e instanceof OpConfigError) {
throw e;
}
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
create().gauge(
"ops_pending",
() -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet."
);
create().gauge(
"ops_active",
() -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed."
);
create().gauge(
"ops_complete",
() -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core,
"The current number of operations which have been completed"
);
}
public SimpleActivity(NBComponent parent, String activityDefString) {
@ -111,6 +236,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
@Override
public synchronized void initActivity() {
initOrUpdateRateLimiters(this.activityDef);
setDefaultsFromOpSequence(sequence);
}
public synchronized NBErrorHandler getErrorHandler() {
@ -127,14 +254,6 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
return runState;
}
@Override
public synchronized void setRunState(RunState runState) {
this.runState = runState;
if (RunState.Running == runState) {
this.startedAtMillis = System.currentTimeMillis();
}
}
@Override
public long getStartedAtMillis() {
return startedAtMillis;
@ -196,7 +315,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
}
public String toString() {
return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally;
return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.runState;
}
@Override
@ -230,6 +349,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
return null;
}
}
@Override
public synchronized RateLimiter getStrideLimiter() {
if (strideLimiterSource != null) {
@ -276,7 +396,28 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
// initOrUpdateRateLimiters(activityDef);
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg, List.of(configurable));
}
}
}
public void onEvent(NBEvent event) {
switch (event) {
case ParamChange<?> pc -> {
switch (pc.value()) {
case SetThreads st -> activityDef.setThreads(st.threads);
case CycleRateSpec crs -> createOrUpdateCycleLimiter(crs);
case StrideRateSpec srs -> createOrUpdateStrideLimiter(srs);
default -> super.onEvent(event);
}
}
default -> super.onEvent(event);
}
}
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
@ -613,8 +754,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
if (op != null && OpsLoader.isJson(op)) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
}
else if (op != null) {
} else if (op != null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null);
}
@ -645,32 +785,48 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
}
@Override
public RunStateTally getRunStateTally() {
return tally;
}
@Override
public Map<String, String> asResult() {
return Map.of("activity", this.getAlias());
}
// private final ThreadLocal<RateLimiter> cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
// RateLimiters.createOrUpdate(this,null,new SimRateSpec()
// if (cycleratePerThread) {
// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,)
// } else {
// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef)
// }
// if (getCycleLimiter() != null) {
// return RateLimiters.createOrUpdate(
// new NBThreadComponent(this),
// getCycleLimiter(),
// getCycleLimiter().getSpec());
// } else {
// return null;
// }
// });
@Override
public NBLabels getLabels() {
return super.getLabels();
}
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter<?, ?>> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter<?, ?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
if (space instanceof AutoCloseable autocloseable) {
try {
autocloseable.close();
} catch (Exception e) {
throw new RuntimeException("Error while shutting down state space for " +
"adapter=" + adapterName + ", space=" + spaceName + ": " + e, e);
}
}
});
}
}
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
opTemplates.addAll(newTemplates);
}
}
return opTemplates;
}
public OpSequence<OpDispenser<? extends Op>> getOpSequence() {
return sequence;
}
}

View File

@ -15,17 +15,25 @@
*/
package io.nosqlbench.engine.api.activityimpl.motor;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
import io.nosqlbench.adapters.api.evalctx.CycleFunction;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityimpl.MotorState;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.OpTracker;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.errors.ResultVerificationError;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -40,12 +48,12 @@ import static io.nosqlbench.engine.api.activityapi.core.RunState.*;
* instance is responsible for taking input from a LongSupplier and applying
* the provided LongConsumer to it on each cycle. These two parameters are called
* input and action, respectively.
*
* <p>
* This motor implementation splits the handling of sync and async actions with a hard
* fork in the middle to limit potential breakage of the prior sync implementation
* with new async logic.
*/
public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, SyncAction {
private static final Logger logger = LogManager.getLogger(CoreMotor.class);
@ -62,78 +70,81 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
private Timer cycleResponseTimer;
private Input input;
private Action action;
private final Activity activity;
private Output output;
private final MotorState motorState;
// private final AtomicReference<RunState> slotState;
private int stride = 1;
private OpTracker<D> opTracker;
private final Timer executeTimer;
private final Histogram triesHistogram;
private final Timer resultSuccessTimer;
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends Op>> opsequence;
private final int maxTries;
private final Timer verifierTimer;
/**
* Create an ActivityMotor.
*
* @param activity The activity that this motor will be associated with.
* @param slotId The enumeration of the motor, as assigned by its executor.
* @param input A LongSupplier which provides the cycle number inputs.
* @param activity
* The activity that this motor will be associated with.
* @param slotId
* The enumeration of the motor, as assigned by its executor.
* @param input
* A LongSupplier which provides the cycle number inputs.
*/
public CoreMotor(
Activity activity,
SimpleActivity activity,
long slotId,
Input input) {
this.activity = activity;
this.slotId = slotId;
setInput(input);
motorState = new MotorState(slotId, activity.getRunStateTally());
onActivityDefUpdate(activity.getActivityDef());
this.opsequence = activity.getOpSequence();
this.maxTries = activity.getMaxTries();
bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram();
resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
errorHandler = activity.getErrorHandler();
verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer();
}
/**
* Create an ActivityMotor.
*
* @param activity The activity that this motor is based on.
* @param slotId The enumeration of the motor, as assigned by its executor.
* @param input A LongSupplier which provides the cycle number inputs.
* @param action An LongConsumer which is applied to the input for each cycle.
* @param activity
* The activity that this motor is based on.
* @param slotId
* The enumeration of the motor, as assigned by its executor.
* @param input
* A LongSupplier which provides the cycle number inputs.
* @param output
* An optional opTracker.
*/
public CoreMotor(
Activity activity,
SimpleActivity activity,
long slotId,
Input input,
Action action
) {
this(activity, slotId, input);
setAction(action);
}
/**
* Create an ActivityMotor.
*
* @param activity The activity that this motor is based on.
* @param slotId The enumeration of the motor, as assigned by its executor.
* @param input A LongSupplier which provides the cycle number inputs.
* @param action An LongConsumer which is applied to the input for each cycle.
* @param output An optional opTracker.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input,
Action action,
Output output
) {
this(activity, slotId, input);
setAction(action);
setResultOutput(output);
}
/**
* Set the input for this ActivityMotor.
*
* @param input The LongSupplier that provides the cycle number.
* @param input
* The LongSupplier that provides the cycle number.
* @return this ActivityMotor, for chaining
*/
@Override
@ -148,41 +159,14 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
/**
* Set the action for this ActivityMotor.
*
* @param action The LongConsumer that will be applied to the next cycle number.
* @return this ActivityMotor, for chaining
*/
@Override
public Motor<D> setAction(Action action) {
this.action = action;
return this;
}
@Override
public Action getAction() {
return action;
}
@Override
public long getSlotId() {
return this.slotId;
}
@Override
public MotorState getState() {
return motorState;
}
@Override
public void removeState() {
motorState.removeState();
}
@Override
public void run() {
motorState.enterState(Starting);
public Void call() {
try {
inputTimer = activity.getInstrumentation().getOrCreateInputTimer();
@ -193,12 +177,6 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
cycleRateLimiter = activity.getCycleLimiter();
if (motorState.get() == Finished) {
logger.warn(() -> "Input was already exhausted for slot " + slotId + ", remaining in finished state.");
}
action.init();
if (input instanceof Startable) {
((Startable) input).start();
}
@ -211,7 +189,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
long strideDelay = 0L;
long cycleDelay = 0L;
if (action instanceof SyncAction sync) {
SyncAction sync = this;
cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer();
strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer();
@ -219,20 +197,17 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async.");
}
motorState.enterState(Running);
while (motorState.get() == Running) {
CycleSegment cycleSegment = null;
CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride);
try (Timer.Context inputTime = inputTimer.time()) {
cycleSegment = input.getInputSegment(stride);
}
if (cycleSegment == null) {
logger.trace(() -> "input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId);
motorState.enterState(Finished);
continue;
throw new RuntimeException("invalid state with cycle segment = null");
}
@ -249,15 +224,10 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (cyclenum < 0) {
if (cycleSegment.isExhausted()) {
logger.trace(() -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId);
motorState.enterState(Finished);
continue;
}
}
if (motorState.get() != Running) {
logger.trace(() -> "motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId);
continue;
}
int result = -1;
if (cycleRateLimiter != null) {
@ -270,7 +240,6 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
logger.trace(() -> "cycle " + cyclenum);
result = sync.runCycle(cyclenum);
} catch (Exception e) {
motorState.enterState(Errored);
throw e;
} finally {
long cycleEnd = System.nanoTime();
@ -293,37 +262,24 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
throw t;
}
}
}
} else {
throw new RuntimeException("Valid Action implementations must implement SyncAction");
}
if (motorState.get() == Stopping) {
motorState.enterState(Stopped);
logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
} else if (motorState.get() == Finished) {
logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
} else {
logger.warn(()->"Unexpected motor state for CoreMotor shutdown: " + motorState.get());
}
} catch (Throwable t) {
logger.error(() -> "Error in core motor loop:" + t, t);
motorState.enterState(Errored);
throw t;
}
return null;
}
@Override
public String toString() {
return this.activity.getAlias() + ": slot:" + this.slotId + "; state:" + motorState.get();
return this.activity.getAlias() + ": slot:" + this.slotId;
}
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
for (Object component : (new Object[]{input, opTracker, action, output})) {
for (Object component : (new Object[]{input, opTracker, this, output})) {
if (component instanceof ActivityDefObserver) {
((ActivityDefObserver) component).onActivityDefUpdate(activityDef);
}
@ -335,19 +291,89 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
@Override
public synchronized void requestStop() {
RunState currentState = motorState.get();
if (Objects.requireNonNull(currentState) == Running) {
Stoppable.stop(input, action);
motorState.enterState(Stopping);
} else {
logger.warn(() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState);
}
}
public void setResultOutput(Output resultOutput) {
this.output = resultOutput;
}
public int runCycle(long cycle) {
OpDispenser<? extends Op> dispenser = null;
Op op = null;
try (Timer.Context ct = bindTimer.time()) {
dispenser = opsequence.apply(cycle);
op = dispenser.getOp(cycle);
} catch (Exception e) {
throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser != null ? dispenser.getOpName() : "NULL") +
"': " + e.getMessage(), e);
}
int code = 0;
Object result = null;
while (op != null) {
int tries = 0;
while (tries++ < maxTries) {
Throwable error = null;
long startedAt = System.nanoTime();
dispenser.onStart(cycle);
try (Timer.Context ct = executeTimer.time()) {
if (op instanceof RunnableOp runnableOp) {
runnableOp.run();
} else if (op instanceof CycleOp<?> cycleOp) {
result = cycleOp.apply(cycle);
} else if (op instanceof ChainingOp chainingOp) {
result = chainingOp.apply(result);
} else {
throw new RuntimeException("The op implementation did not implement any active logic. Implement " +
"one of [RunnableOp, CycleOp, or ChainingOp]");
}
// TODO: break out validation timer from execute
try (Timer.Context ignored = verifierTimer.time()) {
CycleFunction<Boolean> verifier = dispenser.getVerifier();
try {
verifier.setVariable("result", result);
verifier.setVariable("cycle", cycle);
Boolean isGood = verifier.apply(cycle);
if (!isGood) {
throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails());
}
} catch (Exception e) {
throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails());
}
}
} catch (Exception e) {
error = e;
} finally {
long nanos = System.nanoTime() - startedAt;
resultTimer.update(nanos, TimeUnit.NANOSECONDS);
if (error == null) {
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
dispenser.onSuccess(cycle, nanos);
break;
} else {
ErrorDetail detail = errorHandler.handleError(error, cycle, nanos);
dispenser.onError(cycle, nanos, error);
code = detail.resultCode;
if (!detail.isRetryable()) {
break;
}
}
}
}
triesHistogram.update(tries);
if (op instanceof OpGenerator) {
logger.trace(() -> "GEN OP for cycle(" + cycle + ")");
op = ((OpGenerator) op).getNextOp();
} else {
op = null;
}
}
return code;
}
}

View File

@ -15,6 +15,7 @@
*/
package io.nosqlbench.engine.api.activityimpl.motor;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.input.Input;
@ -30,32 +31,28 @@ import java.util.function.IntPredicate;
*/
public class CoreMotorDispenser<D> implements MotorDispenser<D> {
private final Activity activity;
private final SimpleActivity activity;
private final InputDispenser inputDispenser;
private final ActionDispenser actionDispenser;
private final OutputDispenser outputDispenser;
public CoreMotorDispenser(Activity activity,
public CoreMotorDispenser(SimpleActivity activity,
InputDispenser inputDispenser,
ActionDispenser actionDispenser,
OutputDispenser outputDispenser
) {
this.activity = activity;
this.inputDispenser = inputDispenser;
this.actionDispenser = actionDispenser;
this.outputDispenser = outputDispenser;
}
@Override
public Motor<D> getMotor(ActivityDef activityDef, int slotId) {
Action action = actionDispenser.getAction(slotId);
Input input = inputDispenser.getInput(slotId);
Output output = null;
if (outputDispenser !=null) {
output = outputDispenser.getOutput(slotId);
}
IntPredicate resultFilter = null;
Motor<D> am = new CoreMotor<>(activity, slotId, input, action, output);
Motor<D> am = new CoreMotor<>(activity, slotId, input, output);
return am;
}
}

View File

@ -1,34 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
public class StandardActionDispenser implements ActionDispenser {
private final StandardActivity<?,?> activity;
public <A extends Activity> StandardActionDispenser(StandardActivity<?,?> activity) {
this.activity = activity;
}
@Override
public StandardAction<?,?> getAction(int slot) {
return new StandardAction<>(activity,slot);
}
}

View File

@ -1,302 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.events.NBEvent;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.nb.api.components.events.SetThreads;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.annotations.ServiceSelector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* This is a typed activity which is expected to become the standard
* core of all new activity types. Extant NB drivers should also migrate
* to this when possible.
*
* @param <R>
* A type of runnable which wraps the operations for this type of driver.
* @param <S>
* The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends Op>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<?, ?>> adapters = new ConcurrentHashMap<>();
public StandardActivity(NBComponent parent, ActivityDef activityDef) {
super(parent, activityDef);
OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
NBConfigModel yamlmodel;
if (yaml_loc.isPresent()) {
Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
workload = OpsLoader.loadPath(yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
} else {
yamlmodel = ConfigModel.of(StandardActivity.class).asReadOnly();
}
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName
.flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
.map(l -> l.load(this, NBLabels.forKV()));
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\'');
}
// HERE, op templates are loaded before drivers are loaded
List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter.orElse(null));
List<ParsedOp> pops = new ArrayList<>();
List<DriverAdapter<?, ?>> adapterlist = new ArrayList<>();
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
Optional<String> defaultDriverOption = defaultDriverName;
ConcurrentHashMap<String, OpMapper<? extends Op>> mappers = new ConcurrentHashMap<>();
for (OpTemplate ot : opTemplates) {
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
String driverName = ot.getOptionalStringParam("driver", String.class)
.or(() -> ot.getOptionalStringParam("type", String.class))
.or(() -> defaultDriverOption)
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
// String driverName = ot.getOptionalStringParam("driver")
// .or(() -> activityDef.getParams().getOptionalString("driver"))
// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
// HERE
if (!adapters.containsKey(driverName)) {
DriverAdapter<?, ?> adapter = Optional.of(driverName)
.flatMap(
name -> ServiceSelector.of(
name,
ServiceLoader.load(DriverAdapterLoader.class)
)
.get())
.map(
l -> l.load(
this,
NBLabels.forKV()
)
)
.orElseThrow(() -> new OpConfigError("driver adapter not present for name '" + driverName + "'"));
NBConfigModel combinedModel = yamlmodel;
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
if (adapter instanceof NBConfigurable configurable) {
NBConfigModel adapterModel = configurable.getConfigModel();
supersetConfig.add(adapterModel);
combinedModel = adapterModel.add(yamlmodel);
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
configurable.applyConfig(combinedConfig);
}
adapters.put(driverName, adapter);
mappers.put(driverName, adapter.getOpMapper());
}
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
DriverAdapter<?, ?> adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
pops.add(pop);
}
if (defaultDriverOption.isPresent()) {
long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count();
if (0 == matchingDefault) {
logger.warn("All op templates used a different driver than the default '{}'", defaultDriverOption.get());
}
}
try {
sequence = createOpSourceFromParsedOps(adapterlist, pops);
} catch (Exception e) {
if (e instanceof OpConfigError) {
throw e;
}
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
create().gauge(
"ops_pending",
() -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet."
);
create().gauge(
"ops_active",
() -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed."
);
create().gauge(
"ops_complete",
() -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core,
"The current number of operations which have been completed"
);
}
@Override
public void initActivity() {
super.initActivity();
setDefaultsFromOpSequence(sequence);
}
public OpSequence<OpDispenser<? extends Op>> getOpSequence() {
return sequence;
}
// /**
// * When an adapter needs to identify an error uniquely for the purposes of
// * routing it to the correct error handler, or naming it in logs, or naming
// * metrics, override this method in your activity.
// *
// * @return A function that can reliably and safely map an instance of Throwable to a stable name.
// */
// @Override
// public final Function<Throwable, String> getErrorNameMapper() {
// return adapter.getErrorNameMapper();
// }
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg, List.of(configurable));
}
}
}
// @Override
// public synchronized void onActivityDefUpdate(final ActivityDef activityDef) {
// super.onActivityDefUpdate(activityDef);
//
// for (final DriverAdapter adapter : this.adapters.values())
// if (adapter instanceof NBReconfigurable reconfigurable) {
// NBConfigModel cfgModel = reconfigurable.getReconfigModel();
// final Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
// if (op_yaml_loc.isPresent()) {
// final Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
// final OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities");
// cfgModel = cfgModel.add(workload.getConfigModel());
// }
// final NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
// reconfigurable.applyReconfig(cfg);
// }
//
// }
@Override
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
opTemplates.addAll(newTemplates);
}
}
return opTemplates;
}
/**
* This is done here since driver adapters are intended to keep all of their state within
* dedicated <em>state space</em> types. Any space which implements {@link Shutdownable}
* will be closed when this activity shuts down.
*/
@Override
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter<?, ?>> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter<?, ?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
if (space instanceof AutoCloseable autocloseable) {
try {
autocloseable.close();
} catch (Exception e) {
throw new RuntimeException("Error while shutting down state space for " +
"adapter=" + adapterName + ", space=" + spaceName + ": " + e, e);
}
}
});
}
}
@Override
public NBLabels getLabels() {
return super.getLabels();
}
@Override
public void onEvent(NBEvent event) {
switch (event) {
case ParamChange<?> pc -> {
switch (pc.value()) {
case SetThreads st -> activityDef.setThreads(st.threads);
case CycleRateSpec crs -> createOrUpdateCycleLimiter(crs);
case StrideRateSpec srs -> createOrUpdateStrideLimiter(srs);
default -> super.onEvent(event);
}
}
default -> super.onEvent(event);
}
}
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
@ -27,7 +28,7 @@ import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
public class StandardActivityType<A extends StandardActivity<?,?>> implements ActivityType<A> {
public class StandardActivityType<A extends SimpleActivity<?,?>> implements ActivityType<A> {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final Map<String, DriverAdapter> adapters = new HashMap<>();
@ -59,13 +60,8 @@ public class StandardActivityType<A extends StandardActivity<?,?>> implements Ac
if (activityDef.getParams().getOptionalString("async").isPresent())
throw new RuntimeException("This driver does not support async mode yet.");
return (A) new StandardActivity(parent, activityDef);
return (A) new SimpleActivity(parent, activityDef);
}
@Override
public ActionDispenser getActionDispenser(final A activity) {
return new StandardActionDispenser(activity);
}
}

View File

@ -1,159 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.uniform.actions;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
import io.nosqlbench.adapters.api.evalctx.CycleFunction;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.errors.ResultVerificationError;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
/**
* This is the generified version of an Action. All driver adapters us this, as opposed
* to previous NB versions where it was implemented for each driver.
* <p>
* This allows the API to be consolidated so that the internal machinery of NB
* works in a very consistent and uniform way for all users and drivers.
*
* @param <A>
* The type of activity
* @param <R>
* The type of operation
*/
public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> implements SyncAction, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger("ACTION");
private final Timer executeTimer;
private final Histogram triesHistogram;
private final Timer resultSuccessTimer;
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends Op>> opsequence;
private final int maxTries;
private final Timer verifierTimer;
public StandardAction(A activity, int slot) {
this.opsequence = activity.getOpSequence();
this.maxTries = activity.getMaxTries();
bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram();
resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
errorHandler = activity.getErrorHandler();
verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer();
}
@Override
public int runCycle(long cycle) {
OpDispenser<? extends Op> dispenser=null;
Op op = null;
try (Timer.Context ct = bindTimer.time()) {
dispenser = opsequence.apply(cycle);
op = dispenser.getOp(cycle);
} catch (Exception e) {
throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser!=null?dispenser.getOpName():"NULL")+
"': " + e.getMessage(), e);
}
int code = 0;
Object result = null;
while (op != null) {
int tries = 0;
while (tries++ < maxTries) {
Throwable error = null;
long startedAt = System.nanoTime();
dispenser.onStart(cycle);
try (Timer.Context ct = executeTimer.time()) {
if (op instanceof RunnableOp runnableOp) {
runnableOp.run();
} else if (op instanceof CycleOp<?> cycleOp) {
result = cycleOp.apply(cycle);
} else if (op instanceof ChainingOp chainingOp) {
result = chainingOp.apply(result);
} else {
throw new RuntimeException("The op implementation did not implement any active logic. Implement " +
"one of [RunnableOp, CycleOp, or ChainingOp]");
}
// TODO: break out validation timer from execute
try (Timer.Context ignored = verifierTimer.time()) {
CycleFunction<Boolean> verifier = dispenser.getVerifier();
try {
verifier.setVariable("result", result);
verifier.setVariable("cycle",cycle);
Boolean isGood = verifier.apply(cycle);
if (!isGood) {
throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails());
}
} catch (Exception e) {
throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails());
}
}
} catch (Exception e) {
error = e;
} finally {
long nanos = System.nanoTime() - startedAt;
resultTimer.update(nanos, TimeUnit.NANOSECONDS);
if (error == null) {
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
dispenser.onSuccess(cycle, nanos);
break;
} else {
ErrorDetail detail = errorHandler.handleError(error, cycle, nanos);
dispenser.onError(cycle, nanos, error);
code = detail.resultCode;
if (!detail.isRetryable()) {
break;
}
}
}
}
triesHistogram.update(tries);
if (op instanceof OpGenerator) {
logger.trace(() -> "GEN OP for cycle(" + cycle + ")");
op = ((OpGenerator) op).getNextOp();
} else {
op = null;
}
}
return code;
}
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.activity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class ActivityExceptionHandler implements Thread.UncaughtExceptionHandler {
private static final Logger logger = LogManager.getLogger(ActivityExceptionHandler.class);
private final ActivityExecutor executor;
public ActivityExceptionHandler(ActivityExecutor executor) {
this.executor = executor;
logger.debug(() -> "Activity executor exception handler starting up for executor '" + executor.getActivityDef().getAlias() + "'");
}
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Uncaught exception in thread '" + t.getName() + ", state[" + t.getState() + "], notifying executor '" + executor + "': " + e);
executor.notifyException(t, e);
}
}

View File

@ -1,615 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.activity;
import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.nb.api.labels.NBLabeledElement;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.core.NBComponentExecutionScope;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityimpl.MotorState;
import io.nosqlbench.nb.api.annotations.Annotation;
import io.nosqlbench.nb.api.annotations.Layer;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateImage;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
//import io.nosqlbench.virtdata.userlibs.apps.valuechecker.IndexedThreadFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* <p>An ActivityExecutor is an execution harness for a single activity instance.
* It is responsible for managing threads and activity settings which may be changed while the activity is running.</p>
*
* <p>In order to allow for dynamic thread management, which is not easily supported as an explicit feature
* of most executor services, threads are started as long-running processes and managed via state signaling.
* The {@link RunState} enum, {@link MotorState} type, and {@link RunStateTally}
* state tracking class are used together to represent valid states and transitions, contain and transition state
* atomically,
* and provide blocking conditions for observers, respectively.</p>
*
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
* Any state changes for a Motor must be made through {@link Motor#getState()}.
* This allows the state tracking to work consistently for all observers.</p>
*/
public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
// TODO Encapsulate valid state transitions to be only modifiable within the appropriate type view.
private static final Logger logger = LogManager.getLogger(ActivityExecutor.class);
private static final Logger activitylogger = LogManager.getLogger("ACTIVITY");
private final LinkedList<Motor<?>> motors = new LinkedList<>();
private final Activity activity;
private final ActivityDef activityDef;
private final RunStateTally tally;
private ExecutorService executorService;
private Exception exception;
private String sessionId = "";
private long startedAt = 0L;
private long stoppedAt = 0L;
private ActivityExecutorShutdownHook shutdownHook = null;
private NBMetricGauge threadsGauge;
public ActivityExecutor(Activity activity) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
activity.getActivityDef().getParams().addListener(this);
this.tally = activity.getRunStateTally();
}
// TODO: Doc how uninitialized activities do not propagate parameter map changes and how
// TODO: this is different from preventing modification to uninitialized activities
// TODO: Determine whether this should really be synchronized
/**
* Simply stop the motors
*/
public void stopActivity() {
logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping);
motors.forEach(Motor::requestStop);
tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.addDetail("params", getActivityDef().toString())
.build()
);
}
/**
* Force stop the motors without trying to wait for the activity to reach stopped/finished state
*/
public void forceStopActivity() {
logger.info(() -> "force stopping activity in progress: " + this.getActivityDef().getAlias());
activity.setRunState(RunState.Stopping);
motors.forEach(Motor::requestStop);
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.addDetail("params", getActivityDef().toString())
.build()
);
}
public Exception forceStopActivity(int initialMillisToWait) {
activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")");
activity.setRunState(RunState.Stopped);
executorService.shutdownNow();
requestStopMotors();
int divisor = 100;
int polltime = initialMillisToWait / divisor;
long gracefulWaitStartedAt = System.currentTimeMillis();
long waitUntil = initialMillisToWait + gracefulWaitStartedAt;
long time = gracefulWaitStartedAt;
while (time < waitUntil && !executorService.isTerminated()) {
try {
Thread.sleep(polltime);
time = System.currentTimeMillis();
} catch (InterruptedException ignored) {
}
}
long gracefulWaitEndedAt = System.currentTimeMillis();
logger.debug("took " + (gracefulWaitEndedAt - gracefulWaitStartedAt) + " ms to shutdown gracefully");
if (!executorService.isTerminated()) {
logger.info(() -> "stopping activity forcibly " + activity.getAlias());
List<Runnable> runnables = executorService.shutdownNow();
long forcibleShutdownCompletedAt = System.currentTimeMillis();
logger.debug(() -> "took " + (forcibleShutdownCompletedAt - gracefulWaitEndedAt) + " ms to shutdown forcibly");
logger.debug(() -> runnables.size() + " tasks never started.");
}
long activityShutdownStartedAt = System.currentTimeMillis();
logger.debug("invoking activity-specific shutdown hooks");
activity.shutdownActivity();
activity.closeAutoCloseables();
long activityShutdownEndedAt = System.currentTimeMillis();
logger.debug("took " + (activityShutdownEndedAt - activityShutdownStartedAt) + " ms to shutdown activity threads");
activitylogger.debug("FORCE STOP/after alias=(" + activity.getAlias() + ")");
if (exception != null) {
activitylogger.debug("FORCE STOP/exception alias=(" + activity.getAlias() + ")");
}
return exception;
}
/**
* Shutdown the activity executor, with a grace period for the motor threads.
*
* @param initialMillisToWait
* milliseconds to wait after graceful shutdownActivity request, before forcing
* everything to stop
*/
public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
Exception exception = forceStopActivity(initialMillisToWait);
if (exception != null && rethrow) {
throw new RuntimeException(exception);
}
}
/**
* Listens for changes to parameter maps, maps them to the activity instance, and notifies all eligible listeners of
* changes.
*/
@Override
public void handleParameterMapUpdate(ParameterMap parameterMap) {
activity.onActivityDefUpdate(activityDef);
// An activity must be initialized before the motors and other components are
// considered ready to handle parameter map changes. This is signaled in an activity
// by the RunState.
if (activity.getRunState() != RunState.Uninitialized) {
if (activity.getRunState() == RunState.Running) {
adjustMotorCountToThreadParam(activity.getActivityDef());
}
motors.stream()
.filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
}
}
public ActivityDef getActivityDef() {
return activityDef;
}
public String toString() {
return getClass().getSimpleName() + "~" + activityDef.getAlias();
}
private String getSlotStatus() {
return motors.stream()
.map(m -> m.getState().get().getCode())
.collect(Collectors.joining(",", "[", "]"));
}
/**
* Stop extra motors, start missing motors
*
* @param activityDef
* the activityDef for this activity instance
*/
private void adjustMotorCountToThreadParam(ActivityDef activityDef) { // TODO: Ensure that threads area allowed to complete their current op gracefully
logger.trace(() -> ">-pre-adjust->" + getSlotStatus());
reduceActiveMotorCountDownToThreadParam(activityDef);
increaseActiveMotorCountUpToThreadParam(activityDef);
alignMotorStateToIntendedActivityState();
awaitAlignmentOfMotorStateToActivityState();
logger.trace(() -> ">post-adjust->" + getSlotStatus());
}
private void increaseActiveMotorCountUpToThreadParam(ActivityDef activityDef) {
// Create motor slots
try {
while (motors.size() < activityDef.getThreads()) {
Motor motor = activity.getMotorDispenserDelegate().getMotor(activityDef, motors.size());
logger.trace(() -> "Starting cycle motor thread:" + motor);
motors.add(motor);
}
} catch (Exception e) {
System.out.print("critical error while starting motors: " + e);
logger.error("critical error while starting motors:" + e,e);
throw new RuntimeException(e);
}
}
private void reduceActiveMotorCountDownToThreadParam(ActivityDef activityDef) {
// Stop and remove extra motor slots
if (activityDef.getThreads()==0) {
logger.warn("setting threads to zero is not advised. At least one thread has to be active to keep the activity alive.");
}
// LinkedList<Motor<?>> toremove = new LinkedList<>();
// while (activityDef.getThreads()>motors.size()) {
// Motor<?> motor = motors.removeLast();
// toremove.addFirst(motor);
// }
// for (Motor<?> motor : toremove) {
// motor.requestStop();
// }
// for (Motor<?> motor : toremove) {
// motor.removeState();
// }
//
while (motors.size() > activityDef.getThreads()) {
Motor motor = motors.get(motors.size() - 1);
logger.trace(() -> "Stopping cycle motor thread:" + motor);
motor.requestStop();
motor.removeState();
/**
* NOTE: this leaves trailing, longer-running threads which might hold the executor open
* to potentially be cleaned up by {@link ExecutorService#shutdown()} or
* {@link ExecutorService#shutdownNow()}. At this point, the motor thread has
* been instructed to shutdown, and it is effectively thread-non-grata to the activity.
*/
motors.remove(motors.size() - 1);
}
}
private synchronized void alignMotorStateToIntendedActivityState() {
RunState intended = activity.getRunState();
logger.trace(() -> "ADJUSTING to INTENDED " + intended);
switch (intended) {
case Uninitialized:
break;
case Running:
case Starting:
motors.stream()
.filter(m -> m.getState().get() != RunState.Running)
.filter(m -> m.getState().get() != RunState.Finished)
.filter(m -> m.getState().get() != RunState.Starting)
.forEach(m -> {
executorService.execute(m);
});
break;
case Stopped:
motors.stream()
.filter(m -> m.getState().get() != RunState.Stopped)
.forEach(Motor::requestStop);
break;
case Finished:
case Stopping:
case Errored:
break;
// throw new RuntimeException("Invalid requested state in activity executor:" + activity.getRunState());
default:
throw new RuntimeException("Unmatched run state:" + activity.getRunState());
}
}
private void awaitAlignmentOfMotorStateToActivityState() {
logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
RunStateImage states = null;
switch (activity.getRunState()) {
case Starting:
case Running:
states = tally.awaitNoneOther(RunState.Running, RunState.Finished);
break;
case Errored:
case Stopping:
case Stopped:
states = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
break;
case Uninitialized:
break;
case Finished:
states = tally.awaitNoneOther(RunState.Finished);
break;
default:
throw new RuntimeException("Unmatched run state:" + activity.getRunState());
}
RunState previousState = activity.getRunState();
activity.setRunState(states.getMaxState());
logger.debug("activity and threads are aligned to state " + previousState + " for " + this.getActivity().getAlias() + ", and advanced to " + activity.getRunState());
}
private void requestStopMotors() {
logger.info(() -> "stopping activity " + activity);
activity.setRunState(RunState.Stopping);
motors.forEach(Motor::requestStop);
}
public boolean isRunning() {
return motors.stream().anyMatch(m -> m.getState().get() == RunState.Running);
}
public Activity getActivity() {
return activity;
}
public synchronized void notifyException(Thread t, Throwable e) {
logger.debug(() -> "Uncaught exception in activity thread forwarded to activity executor: " + e.getMessage());
this.exception = new RuntimeException("Error in activity thread " + t.getName(), e);
this.requestStopMotors();
}
@Override
public ProgressMeterDisplay getProgressMeter() {
return this.activity.getProgressMeter();
}
@Override
public ExecutionResult call() throws Exception {
try (NBComponentExecutionScope scope = new NBComponentExecutionScope(activity)) {
shutdownHook = new ActivityExecutorShutdownHook(this);
Runtime.getRuntime().addShutdownHook(shutdownHook);
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.now()
.layer(Layer.Activity)
.addDetail("event", "start-activity")
.addDetail("params", activityDef.toString())
.build());
try {
// instantiate and configure fixtures that need to be present
// before threads start running such as metrics instruments
activity.initActivity();
startMotorExecutorService();
registerMetrics();
startRunningActivityThreads();
awaitMotorsAtLeastRunning();
logger.debug("STARTED " + activityDef.getAlias());
awaitActivityCompletion();
} catch (Exception e) {
this.exception = e;
} finally {
stoppedAt = System.currentTimeMillis();
// TODO: close out metrics outputs on component tree if needed
activity.shutdownActivity();
activity.closeAutoCloseables();
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
finish(true);
return result;
}
} catch (Exception e2) {
throw new RuntimeException(e2);
}
}
/**
* This waits for at least one motor to be in running, finished or stopped state.
* A motor with enough cycles to read will go into a running state. A motor which has
* a short read immediately after being started will go into a finished state. A motor
* which has been stopped for some reason, like an error or a stop command will go into
* stopped state. All of these states are sufficient to signal that successful startup
* has been completed at least.
*/
private void awaitMotorsAtLeastRunning() {
if (this.exception!=null) {
return;
}
RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = states.getMaxState();
if (maxState == RunState.Errored) {
activity.setRunState(maxState);
throw new RuntimeException("Error in activity");
}
}
// public synchronized void startActivity() {
// RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped);
// if (startable.isTimeout()) {
// throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable);
// }
// startMotorExecutorService();
// startRunningActivityThreads();
// awaitMotorsAtLeastRunning();
// }
private void registerMetrics() {
this.activity.create().gauge(
"threads",
() -> (double) this.motors.size(),
MetricCategory.Core,
"The current number of threads in activity " + this.description()
);
}
private boolean shutdownExecutorService(int secondsToWait) {
activitylogger.debug(() -> "Shutting down motor executor for (" + activity.getAlias() + ")");
boolean wasStopped = false;
try {
executorService.shutdown();
logger.trace(() -> "awaiting termination with timeout of " + secondsToWait + " seconds");
wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
logger.trace("interrupted while awaiting termination");
logger.warn("while waiting termination of shutdown " + activity.getAlias() + ", " + ie.getMessage());
activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped);
} catch (RuntimeException e) {
logger.trace("Received exception while awaiting termination: " + e.getMessage());
wasStopped = true;
exception = e;
} finally {
logger.trace(() -> "finally shutting down activity " + this.getActivity().getAlias());
this.stoppedAt = System.currentTimeMillis();
activity.setRunState(RunState.Stopped);
}
if (exception != null) {
logger.trace(() -> "an exception caused the activity to stop:" + exception.getMessage());
logger.warn("Setting ERROR on motor executor for activity '" + activity.getAlias() + "': " + exception.getMessage());
throw new RuntimeException(exception);
}
activitylogger.debug("motor executor for " + activity.getAlias() + ") wasstopped=" + wasStopped);
return wasStopped;
}
private void awaitActivityCompletion() {
RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = state.getMaxState();
activity.setRunState(maxState);
if (maxState == RunState.Errored) {
throw new RuntimeException("Error while waiting for activity completion with states [" + tally.toString() + "], error=" + (this.exception!=null ?
this.exception.toString() : "[no error on activity executor]"));
}
}
private void startMotorExecutorService() {
this.executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
);
}
/**
* <p>True-up the number of motor instances known to the executor. Start all non-running motors.
* The protocol between the motors and the executor should be safe as long as each state change is owned by either
* the motor logic or the activity executor but not both, and strictly serialized as well. This is enforced by
* forcing start(...) to be serialized as well as using CAS on the motor states.</p>
* <p>The startActivity method may be called to true-up the number of active motors in an activity executor after
* changes to threads.</p>
*/
private void startRunningActivityThreads() {
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.now()
.layer(Layer.Activity)
.addDetail("params", getActivityDef().toString())
.build()
);
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
try {
activity.setRunState(RunState.Starting);
this.startedAt = System.currentTimeMillis();
activity.onActivityDefUpdate(activityDef);
} catch (Exception e) {
this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e);
activitylogger.error(() -> "error initializing activity '" + activity.getAlias() + "': " + exception);
throw new RuntimeException(exception);
}
adjustMotorCountToThreadParam(activity.getActivityDef());
tally.awaitAny(RunState.Running, RunState.Finished, RunState.Stopped);
activity.setRunState(RunState.Running);
activitylogger.debug("START/after alias=(" + activity.getAlias() + ")");
}
@Override
public NBLabels getLabels() {
return activity.getLabels();
}
public synchronized void finish(boolean graceful) {
if (graceful) {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
} else {
logger.warn("Activity was interrupted by process exit, shutting down ungracefully. Annotations are still submitted.");
}
if (shutdownHook == null) return; // In case of a race condition, only prevented by object monitor
else shutdownHook = null;
stoppedAt = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.interval(startedAt, stoppedAt)
.layer(Layer.Activity)
.addDetail("event", "stop-activity")
.addDetail("params", activityDef.toString())
.build());
}
public void awaitMotorsRunningOrTerminalState() {
awaitMotorsAtLeastRunning();
}
public boolean awaitAllThreadsOnline(long timeoutMs) {
RunStateImage image = tally.awaitNoneOther(timeoutMs, RunState.Running);
return image.isNoneOther(RunState.Running);
}
private class ThreadsGauge implements Gauge<Double> {
public ThreadsGauge(ActivityExecutor activityExecutor) {
ActivityExecutor ae = activityExecutor;
}
@Override
public Double getValue() {
return (double) ActivityExecutor.this.motors.size();
}
}
}

View File

@ -21,6 +21,7 @@ import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ActivityExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -37,10 +38,9 @@ public class ActivityRuntimeInfo implements ProgressCapable {
private final ActivityExecutor executor;
public ActivityRuntimeInfo(Activity activity, Future<ExecutionResult> result, ActivityExecutor executor) {
this.executor = executor;
this.activity = activity;
this.future = result;
this.executor = executor;
}
@Override
@ -76,23 +76,24 @@ public class ActivityRuntimeInfo implements ProgressCapable {
return this.activity;
}
public boolean isRunning() {
return executor.isRunning();
}
public RunState getRunState() {
return this.activity.getRunState();
}
public void stopActivity() { this.executor.stopActivity(); }
public boolean isRunning() {
throw new RuntimeException("implement me");
}
public void forceStopActivity() { this.executor.forceStopActivity(); }
public void forceStopActivity() {
throw new RuntimeException("implement me");
}
public void stopActivity() {
throw new RuntimeException("implement me");
}
public ActivityExecutor getActivityExecutor() {
return executor;
}
public boolean awaitAllThreadsOnline(long timeoutMs) {
return this.executor.awaitAllThreadsOnline(timeoutMs);
return this.executor;
}
}

View File

@ -0,0 +1,60 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
public class LifetimeThread implements Callable<ExecutionResult> {
private final static Logger logger = LogManager.getLogger();
private ExecutionResult result = null;
private final String name;
private CompletableFuture<ExecutionResult> future;
public LifetimeThread(String name) {
this.name = name;
}
@Override
public ExecutionResult call() {
logger.debug("lifetime scope '" + name + "' starting");
this.future = new CompletableFuture<ExecutionResult>();
while (result == null) {
try {
this.result = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
logger.debug("lifetime scope '" + name + "' ending");
return result;
}
public void shutdown(ExecutionResult result) {
this.future.complete(result);
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.commands;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.PrintWriter;
import java.io.Reader;
@Service(value = NBBaseCommand.class, selector = "run2")
public class CMD_run2 extends NBBaseCommand {
public final static Logger logger = LogManager.getLogger("run2");
public CMD_run2(NBBufferedContainer parentComponent, String stepName, String targetScenario) {
super(parentComponent, stepName, targetScenario);
}
@Override
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
controller.run(params);
return null;
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.commands;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBCommandInfo;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBInvokableCommand;
import io.nosqlbench.nb.annotations.Service;
@Service(value = NBCommandInfo.class,selector = "run2")
public class INFO_run2 extends NBCommandInfo {
@Override
public Class<? extends NBInvokableCommand> getType() {
return CMD_run2.class;
}
}

View File

@ -0,0 +1,70 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.scenario.container;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.Startable;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.activity.LifetimeThread;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;
public class ActivityExecutor implements Callable<ExecutionResult> {
private final Activity activity;
private LifetimeThread lifetimeThread;
StructuredTaskScope<ExecutionResult> scope;
public ActivityExecutor(Activity activity) {
this.activity = activity;
}
@Override
public ExecutionResult call() throws Exception {
long startedAt=System.currentTimeMillis();
long endedAt=0L;
Exception error = null;
try (StructuredTaskScope.ShutdownOnFailure t= new StructuredTaskScope.ShutdownOnFailure()) {
lifetimeThread = new LifetimeThread(activity.getAlias());
t.fork(lifetimeThread);
runCycles(t,activity);
lifetimeThread = new LifetimeThread(activity.getAlias());
t.join();
} catch (Exception e) {
error=e;
} finally {
endedAt=System.currentTimeMillis();
}
return new ExecutionResult(startedAt,endedAt,"",error);
}
private void runCycles(StructuredTaskScope.ShutdownOnFailure t, Activity activity) {
Input input = activity.getInputDispenserDelegate().getInput();
if (input instanceof Startable) {
((Startable) input).start();
}
CycleSegment cycleSegment = null;
int stride = activity.getActivityDef().getParams().getOptionalInteger("stride").orElse(1);
CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride);
}
}

View File

@ -25,7 +25,6 @@ import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesExceptionHandler;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityLoader;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityRuntimeInfo;
import org.apache.logging.log4j.LogManager;
@ -70,7 +69,6 @@ public class ContainerActivitiesController extends NBBaseComponent {
return ari.getActivity();
}
private ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, this);
@ -78,13 +76,13 @@ public class ContainerActivitiesController extends NBBaseComponent {
ActivityExecutor executor = new ActivityExecutor(activity);
Future<ExecutionResult> startedActivity = executorService.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
activityRuntimeInfo.getActivityExecutor().awaitMotorsRunningOrTerminalState();
this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo);
}
return this.activityInfoMap.get(activityDef.getAlias());
}
/**
* Start an activity, given a map which holds the activity definition for it. The activity will be known in
* the scenario by the alias parameter.
@ -94,7 +92,6 @@ public class ContainerActivitiesController extends NBBaseComponent {
public Activity start(Map<String, String> activityDefMap) {
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
Activity started = start(ad);
awaitAllThreadsOnline(started,30000L);
return started;
}
@ -120,9 +117,7 @@ public class ContainerActivitiesController extends NBBaseComponent {
* @param activityDef A definition for an activity to run
*/
public synchronized void run(ActivityDef activityDef, long timeoutMs) {
doStartActivity(activityDef);
awaitActivity(activityDef, timeoutMs);
throw new RuntimeException("implement me");
}
public synchronized void run(int timeout, String activityDefString) {
@ -176,26 +171,12 @@ public class ContainerActivitiesController extends NBBaseComponent {
runtimeInfo.stopActivity();
}
public boolean awaitAllThreadsOnline(ActivityDef activityDef, long timeoutMs) {
ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias());
if (null == runtimeInfo) {
throw new RuntimeException("could not stop missing activity:" + activityDef);
}
scenariologger.debug("STOP {}", activityDef.getAlias());
return runtimeInfo.awaitAllThreadsOnline(timeoutMs);
}
public synchronized void stop(Activity activity) {
stop(activity.getActivityDef());
}
public boolean awaitAllThreadsOnline(Activity activity, long timeoutMs) {
return awaitAllThreadsOnline(activity.getActivityDef(), timeoutMs);
}
/**
* <p>Stop an activity, given an activity def map. The only part of the map that is important is the
@ -342,8 +323,8 @@ public class ContainerActivitiesController extends NBBaseComponent {
*/
public synchronized void forceStopActivities(int waitTimeMillis) {
logger.debug("force stopping scenario {}", description());
activityInfoMap.values().forEach(a -> a.getActivityExecutor().forceStopActivity(2000));
logger.debug("Scenario force stopped.");
throw new RuntimeException("implement me");
//logger.debug("Scenario force stopped.");
}
// public synchronized void stopAll() {

View File

@ -1,43 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.motor;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class RunStateImageTest {
@Test
public void testMaxStateImage() {
int[] counts = new int[RunState.values().length];
counts[RunState.Running.ordinal()]=3;
counts[RunState.Starting.ordinal()]=2;
RunStateImage image = new RunStateImage(counts, false);
assertThat(image.is(RunState.Running)).isTrue();
assertThat(image.is(RunState.Starting)).isTrue();
assertThat(image.isTimeout()).isFalse();
assertThat(image.is(RunState.Errored)).isFalse();
assertThat(image.isNoneOther(RunState.Starting, RunState.Running)).isTrue();
RunState maxState = image.getMaxState();
assertThat(maxState).isEqualTo(RunState.values()[RunState.Running.ordinal()]);
RunState minState = image.getMinState();
assertThat(minState).isEqualTo(RunState.values()[RunState.Starting.ordinal()]);
}
}

View File

@ -1,214 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl.motor;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import org.junit.jupiter.api.*;
import static org.assertj.core.api.Assertions.assertThat;
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class RunStateTallyTest {
volatile boolean awaited = false;
volatile RunStateImage event = null;
@BeforeEach
public void setup() {
awaited = false;
event = null;
}
@Test
@Order(1)
public void testAwaitAny() {
Thread.currentThread().setName("SETTER");
RunStateTally tally = new RunStateTally();
awaited = false;
Thread waiter = new Thread(new Runnable() {
@Override
public void run() {
event = tally.awaitAny(RunState.Running);
awaited = true;
}
});
waiter.setName("WAITER");
waiter.setDaemon(true);
waiter.start();
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isFalse();
tally.add(RunState.Running);
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(event.is(RunState.Running)).isTrue();
assertThat(event.isOnly(RunState.Running)).isTrue();
assertThat(awaited).isTrue();
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
}
@Test
@Order(2)
public void testAwaitNoneOf() {
Thread.currentThread().setName("SETTER");
RunStateTally tally = new RunStateTally();
tally.add(RunState.Uninitialized);
tally.add(RunState.Stopped);
awaited = false;
Thread waiter = new Thread(new Runnable() {
@Override
public void run() {
tally.awaitNoneOf(RunState.Stopped, RunState.Uninitialized);
awaited = true;
}
});
waiter.setName("WAITER");
waiter.setDaemon(true);
waiter.start();
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isFalse();
tally.change(RunState.Stopped, RunState.Finished);
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isFalse();
tally.change(RunState.Uninitialized, RunState.Finished);
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isTrue();
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
}
@Test
@Order(3)
public void testAwaitNoneOther() {
Thread.currentThread().setName("SETTER");
RunStateTally tally = new RunStateTally();
tally.add(RunState.Uninitialized);
tally.add(RunState.Running);
awaited = false;
Thread waiter = new Thread(new Runnable() {
@Override
public void run() {
event = tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
awaited = true;
}
});
waiter.setName("WAITER");
waiter.setDaemon(true);
waiter.start();
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isFalse();
tally.change(RunState.Uninitialized, RunState.Finished);
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isFalse();
// Note that neither Stopped or Finished are required to be positive,
// as long as all others are zero total.
tally.remove(RunState.Running);
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isTrue();
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
}
@Test
@Order(4)
public void testAwaitNoneOtherTimedOut() {
Thread.currentThread().setName("SETTER");
RunStateTally tally = new RunStateTally();
tally.add(RunState.Uninitialized);
tally.add(RunState.Running);
Thread waiter = new Thread(new Runnable() {
@Override
public void run() {
event = tally.awaitNoneOther(1500, RunState.Stopped, RunState.Finished);
awaited = true;
}
});
waiter.setName("WAITER");
waiter.setDaemon(true);
waiter.start();
try {
Thread.sleep(100);
} catch (Exception e) {
}
assertThat(awaited).isFalse();
tally.change(RunState.Uninitialized, RunState.Finished);
try {
Thread.sleep(1500);
} catch (Exception e) {
}
// try {
// waiter.join();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
assertThat(event.isOnly(RunState.Errored)).isFalse();
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
}
}

View File

@ -1,215 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.concurrent.*;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
// TODO: Design review of this mechanism
// @Test
// synchronized void testRestart() {
// ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-restart;cycles=1000;cyclerate=10;op=initdelay:initdelay=5000;");
// new ActivityTypeLoader().load(activityDef);
//
// final Activity activity = new DelayedInitActivity(activityDef);
// InputDispenser inputDispenser = new CoreInputDispenser(activity);
// ActionDispenser adisp = new CoreActionDispenser(activity);
// OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null);
//
// final MotorDispenser<?> mdisp = new CoreMotorDispenser(activity, inputDispenser, adisp, tdisp);
// activity.setActionDispenserDelegate(adisp);
// activity.setOutputDispenserDelegate(tdisp);
// activity.setInputDispenserDelegate(inputDispenser);
// activity.setMotorDispenserDelegate(mdisp);
//
// final ExecutorService executor = Executors.newCachedThreadPool();
// ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-restart");
// final Future<ExecutionResult> future = executor.submit(activityExecutor);
// try {
// activityDef.setThreads(1);
// activityExecutor.startActivity();
// Thread.sleep(100L);
// activityExecutor.stopActivity();
// Thread.sleep(100L);
// activityExecutor.startActivity();
// Thread.sleep(100L);
// activityExecutor.stopActivity();
// future.get();
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// executor.shutdown();
// assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNotNull();
//
// }
@Test
synchronized void testDelayedStartSanity() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
Activity activity = new DelayedInitActivity(activityDef);
final InputDispenser inputDispenser = new CoreInputDispenser(activity);
final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser);
ActivityExecutor activityExecutor = new ActivityExecutor(activity);
ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
try {
activityDef.setThreads(1);
future.get();
testExecutor.shutdownNow();
} catch (final Exception e) {
fail("Unexpected exception", e);
}
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
}
@Test
synchronized void testNewActivityExecutor() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef,TestComponent.INSTANCE);
Activity simpleActivity = new SimpleActivity(TestComponent.INSTANCE,activityDef);
// this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef));
final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
inputDispenser, actionDispenser, outputDispenser);
simpleActivity.setActionDispenserDelegate(actionDispenser);
simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser);
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity);
activityDef.setThreads(5);
ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor);
// activityExecutor.startActivity();
final int[] speeds = {1, 50, 5, 50, 2, 50};
for (int offset = 0; offset < speeds.length; offset += 2) {
final int threadTarget = speeds[offset];
final int threadTime = speeds[offset + 1];
ActivityExecutorTest.logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
activityDef.setThreads(threadTarget);
try {
Thread.sleep(threadTime);
} catch (final Exception e) {
fail("Not expecting exception", e);
}
}
executionResultForkJoinTask.cancel(true);
// Used for slowing the roll due to state transitions in test.
try {
activityExecutor.stopActivity();
// Thread.sleep(2000L);
} catch (final Exception e) {
fail("Not expecting exception", e);
}
}
private MotorDispenser<?> getActivityMotorFactory(final Action lc, Input ls) {
return new MotorDispenser<>() {
@Override
public Motor getMotor(final ActivityDef activityDef, final int slotId) {
final Activity activity = new SimpleActivity(TestComponent.INSTANCE,activityDef);
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc);
return cm;
}
};
}
private SyncAction motorActionDelay(long delay) {
return new SyncAction() {
@Override
public int runCycle(final long cycle) {
ActivityExecutorTest.logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
try {
Thread.sleep(delay);
} catch (final InterruptedException ignored) {
}
return 0;
}
};
}
private static class DelayedInitActivity extends SimpleActivity {
private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class);
public DelayedInitActivity(final ActivityDef activityDef) {
super(TestComponent.INSTANCE,activityDef);
}
@Override
public void initActivity() {
final Integer initDelay = this.activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
DelayedInitActivity.logger.info(() -> "delaying for " + initDelay);
try {
Thread.sleep(initDelay);
} catch (final InterruptedException ignored) {
}
DelayedInitActivity.logger.info(() -> "delayed for " + initDelay);
}
}
}

View File

@ -1,141 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.function.Predicate;
import static org.assertj.core.api.Assertions.assertThat;
public class CoreMotorTest {
@Test
public void testBasicActivityMotor() {
final Activity activity = new SimpleActivity(
new TestComponent("testing", "coremotor"),
ActivityDef.parseActivityDef("alias=foo")
);
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm = new CoreMotor(activity, 5L, lockstepper);
final AtomicLong observableAction = new AtomicLong(-3L);
cm.setAction(this.getTestConsumer(observableAction));
final Thread t = new Thread(cm);
t.setName("TestMotor");
t.start();
try {
Thread.sleep(1000); // allow action time to be waiting in monitor for test fixture
} catch (final InterruptedException ignored) {
}
lockstepper.publishSegment(5L);
final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100);
assertThat(observableAction.get()).isEqualTo(5L);
}
@Test
public void testIteratorStride() {
SimpleActivity activity = new SimpleActivity(TestComponent.INSTANCE, "stride=3");
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm1 = new CoreMotor(activity, 1L, lockstepper);
final AtomicLongArray ary = new AtomicLongArray(10);
final Action a1 = this.getTestArrayConsumer(ary);
cm1.setAction(a1);
final Thread t1 = new Thread(cm1);
t1.setName("cm1");
t1.start();
try {
Thread.sleep(500); // allow action time to be waiting in monitor for test fixture
} catch (final InterruptedException ignored) {
}
lockstepper.publishSegment(11L, 12L, 13L);
final boolean result = this.awaitAryCondition(ala -> 13L == ala.get(2), ary, 5000, 100);
assertThat(ary.get(0)).isEqualTo(11L);
assertThat(ary.get(1)).isEqualTo(12L);
assertThat(ary.get(2)).isEqualTo(13L);
assertThat(ary.get(3)).isEqualTo(0L);
}
private SyncAction getTestArrayConsumer(AtomicLongArray ary) {
return new SyncAction() {
private int offset;
@Override
public int runCycle(final long cycle) {
ary.set(this.offset, cycle);
this.offset++;
return 0;
}
};
}
private SyncAction getTestConsumer(AtomicLong atomicLong) {
return new SyncAction() {
@Override
public int runCycle(final long cycle) {
atomicLong.set(cycle);
return 0;
}
};
}
private boolean awaitAryCondition(final Predicate<AtomicLongArray> atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) {
final long start = System.currentTimeMillis();
long now = start;
while (now < (start + millis)) {
final boolean result = atomicLongAryPredicate.test(ary);
if (result) return true;
try {
Thread.sleep(retry);
} catch (final InterruptedException ignored) {
}
now = System.currentTimeMillis();
}
return false;
}
private boolean awaitCondition(final Predicate<AtomicLong> atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) {
final long start = System.currentTimeMillis();
long now = start;
while (now < (start + millis)) {
final boolean result = atomicPredicate.test(atomicInteger);
if (result) return true;
try {
Thread.sleep(retry);
} catch (final InterruptedException ignored) {
}
now = System.currentTimeMillis();
}
return false;
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023 nosqlbench
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -14,18 +14,14 @@
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.activity;
package io.nosqlbench.virtdata.api.coords;
public class ActivityExecutorShutdownHook extends Thread {
private final ActivityExecutor activityExecutor;
public ActivityExecutorShutdownHook(ActivityExecutor activityExecutor) {
this.activityExecutor = activityExecutor;
}
@Override
public void run() {
activityExecutor.finish(false);
}
/**
* A virtual coordinate, meant to index into virtual data space with named dimensions
*/
public record VirtualCoord(
VirtualSpace space,
long[] coords
) {
}

View File

@ -0,0 +1,28 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.api.coords;
public record VirtualSpace(
String[] names
) {
public static VirtualSpace BASE =
new VirtualSpace(new String[] {
"recycle",
"cycle",
"stride"
});
}

View File

@ -33,7 +33,7 @@ import java.util.function.LongToIntFunction;
@Categories({Category.general})
public class AlphaNumericString implements LongFunction<String> {
private static final String AVAILABLE_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
private final ThreadLocal<StringBuilder> threadStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
// private final ThreadLocal<StringBuilder> threadStringBuilder = ThreadLocal.withInitial(StringBuilder::new);
private final Hash hash = new Hash();
private final LongToIntFunction lengthFunc;
@ -69,8 +69,7 @@ public class AlphaNumericString implements LongFunction<String> {
}
long hashValue = operand;
StringBuilder sb = threadStringBuilder.get();
sb.setLength(0);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++)
{
hashValue = hash.applyAsLong(hashValue);

View File

@ -94,7 +94,11 @@ public class CMD_reset extends NBBaseCommand {
//TODO: This needs to be reworked, but simply calling controller.start on the flywheel results in 2
// copies of the activity running simultaneously. This is a temporary workaround.
SimFrameUtils.awaitActivity(flywheel);
flywheel.getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), 0).run();
try {
flywheel.getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), 0).call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
return null;

View File

@ -62,7 +62,7 @@ public class FindmaxFrameFunction implements SimFrameFunction<FindmaxFrameParams
capture.stopWindow();
journal.record(params,capture.last());
System.out.println(journal.last());
if (flywheel.getRunStateTally().tallyFor(RunState.Running)==0) {
if (flywheel.getRunState()!=RunState.Running) {
System.out.println("state:" + flywheel.getRunState());
throw new RuntimeException("Early exit of flywheel activity '" + flywheel.getAlias() + "'. Can't continue.");
}

View File

@ -59,9 +59,10 @@ public class OptimoFrameFunction implements SimFrameFunction<OptimoFrameParams>
capture.stopWindow();
journal.record(params,capture.last());
System.out.println(journal.last());
if (flywheel.getRunStateTally().tallyFor(RunState.Running)==0) {
if (flywheel.getRunState()!=RunState.Running) {
System.out.println("state:" + flywheel.getRunState());
throw new RuntimeException("Early exit of flywheel activity '" + flywheel.getAlias() + "'. Can't continue.");
}
return journal.last().value();
}

View File

@ -34,7 +34,7 @@ class ExitStatusIntegrationTests {
ProcessInvoker invoker = new ProcessInvoker();
invoker.setLogDir("logs/test");
ProcessResult result = invoker.run("exitstatus_badparam", 15,
java, "-jar", JARNAME, "--logs-dir", "logs/test/badparam/",
java, "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/badparam/",
"badparam"
);
assertThat(result.exception).isNull();
@ -48,7 +48,7 @@ class ExitStatusIntegrationTests {
ProcessInvoker invoker = new ProcessInvoker();
invoker.setLogDir("logs/test");
ProcessResult result = invoker.run("exitstatus_initexception", 15,
java, "-jar", JARNAME, "--logs-dir", "logs/test/initerror", "run",
java, "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/initerror", "run",
"driver=diag", "op=initdelay:initdelay=notanumber"
);
assertThat(result.exception).isNull();
@ -64,7 +64,8 @@ class ExitStatusIntegrationTests {
// Forcing a thread exception via basic command issue.
ProcessResult result = invoker.run("exitstatus_threadexception", 30,
"java", "-jar", JARNAME, "--logs-dir", "logs/test/threadexcep", "--logs-level", "debug", "run",
"java", "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/threadexcep", "--logs-level",
"debug", "run",
"driver=diag", "cyclerate=10", "not_a_thing", "cycles=100", "-vvv"
);
String stdout = String.join("\n", result.getStdoutData());
@ -77,7 +78,8 @@ class ExitStatusIntegrationTests {
ProcessInvoker invoker = new ProcessInvoker();
invoker.setLogDir("logs/test");
ProcessResult result = invoker.run("exitstatus_asyncstoprequest", 60,
"java", "-jar", JARNAME, "--logs-dir", "logs/test/asyncstop", "--logs-level", "debug", "run",
"java", "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/asyncstop", "--logs-level",
"debug", "run",
"driver=diag", "threads=2", "cyclerate=10", "op=erroroncycle:erroroncycle=10", "cycles=50", "-vvv"
);
assertThat(result.exception).isNull();