diff --git a/mvn-defaults/pom.xml b/mvn-defaults/pom.xml index 1704c21af..69651a200 100644 --- a/mvn-defaults/pom.xml +++ b/mvn-defaults/pom.xml @@ -527,7 +527,7 @@ org.apache.maven.plugins maven-surefire-plugin - -ea @{argLine} + -ea --enable-preview @{argLine} org.apache.logging.log4j.core.async.AsyncLoggerContextSelector @@ -612,6 +612,7 @@ + --enable-preview 1 false @@ -733,12 +734,12 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M8 + 3.2.5 org.apache.maven.plugins maven-failsafe-plugin - 3.0.0-M8 + 3.2.5 org.apache.maven.plugins @@ -884,7 +885,7 @@ false - -Xdoclint:none + -Xdoclint:none --enable-preview none diff --git a/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/rainbow/TokenMapFileAPIService.java b/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/rainbow/TokenMapFileAPIService.java index 03bac47ea..25316fb92 100644 --- a/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/rainbow/TokenMapFileAPIService.java +++ b/nb-adapters/adapter-cqld4/src/main/java/io/nosqlbench/datamappers/functions/rainbow/TokenMapFileAPIService.java @@ -34,9 +34,6 @@ import java.nio.ByteBuffer; */ public class TokenMapFileAPIService { -// public static ThreadLocal> tl_cll = -// ThreadLocal.withInitial(HashMap::new); -// private final int recordCount; private final ByteBuffer buffer; private final int RECORD_LEN = Long.BYTES * 2; diff --git a/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_tabular2.yaml b/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_tabular2.yaml index 686d02a0a..39b82299a 100644 --- a/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_tabular2.yaml +++ b/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_tabular2.yaml @@ -31,7 +31,7 @@ scenarios: rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto astra: - schema: run driver=cql tags==block:schema-astra threads==1 cycles==UNDEF + schema: run driver=cql tags==block:schema_astra threads==1 cycles==UNDEF rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto main: run driver=cql tags==block:"main.*" cycles===TEMPLATE(main-cycles,100) threads=auto basic_check: diff --git a/nb-adapters/adapter-diag/src/main/java/io/nosqlbench/adapter/diag/optasks/DiagTask.java b/nb-adapters/adapter-diag/src/main/java/io/nosqlbench/adapter/diag/optasks/DiagTask.java index e4719596b..114902c64 100644 --- a/nb-adapters/adapter-diag/src/main/java/io/nosqlbench/adapter/diag/optasks/DiagTask.java +++ b/nb-adapters/adapter-diag/src/main/java/io/nosqlbench/adapter/diag/optasks/DiagTask.java @@ -31,9 +31,7 @@ import java.util.function.BiFunction; * be the core driver for the purposes of validating the NoSQLBench engine * and associated libraries and extensions. * - * The tasks are retained as shared across threads by default. This means that if - * you want to have per-thread logic or to do things which are not thread-safe, - * you need to use ThreadLocal or some other pattern to do so. + * The tasks are retained as shared across threads by default. * * Tasks must provide a no-args constructor (or no constructor, which does the same). * Tasks must specify their configuration requirements via the {@link NBConfigurable} diff --git a/nb-adapters/adapter-http/src/main/java/io/nosqlbench/adapter/http/core/HttpSpace.java b/nb-adapters/adapter-http/src/main/java/io/nosqlbench/adapter/http/core/HttpSpace.java index 63cffd03f..65d7a11fc 100644 --- a/nb-adapters/adapter-http/src/main/java/io/nosqlbench/adapter/http/core/HttpSpace.java +++ b/nb-adapters/adapter-http/src/main/java/io/nosqlbench/adapter/http/core/HttpSpace.java @@ -32,9 +32,8 @@ import java.util.List; import java.util.Locale; /** - * ThreadLocal http clients have been removed from this version, as the built-in - * HTTP client implementation is meant to be immutable. If shared-state issues - * occur, thread-local support will be re-added. + * per-thread http clients have been removed from this version, as the built-in + * HTTP client implementation is meant to be immutable. */ public class HttpSpace implements NBLabeledElement { private final static Logger logger = LogManager.getLogger(HttpSpace.class); diff --git a/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java b/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java index a7fb0ce80..5143b982e 100644 --- a/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java +++ b/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java @@ -19,6 +19,7 @@ import io.nosqlbench.adapter.jdbc.JDBCSpace; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector; +import io.nosqlbench.nb.api.lifecycle.ObjectPool; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -36,14 +37,12 @@ public abstract class JDBCDMLOp extends JDBCOp { protected final String pStmtSqlStr; protected final List pStmtValList; - protected static ThreadLocal jdbcStmtTL = ThreadLocal.withInitial(() -> null); - public JDBCDMLOp(JDBCSpace jdbcSpace, boolean isReadStmt, String pStmtSqlStr, List pStmtValList) { super(jdbcSpace); - assert(StringUtils.isNotBlank(pStmtSqlStr)); + assert (StringUtils.isNotBlank(pStmtSqlStr)); this.isReadStmt = isReadStmt; this.pStmtSqlStr = pStmtSqlStr; @@ -75,7 +74,7 @@ public abstract class JDBCDMLOp extends JDBCOp { protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) throws SQLException { assert (stmt != null); - for (int i=0; i, , ... ]" format, @@ -96,8 +95,7 @@ public abstract class JDBCDMLOp extends JDBCOp { } } stmt.setObject(fieldIdx, fieldValObj); - } - catch ( SQLException e) { + } catch (SQLException e) { throw new SQLException( "Failed to parse the prepared statement value for field[" + fieldIdx + "] " + fieldValObj); } @@ -115,22 +113,19 @@ public abstract class JDBCDMLOp extends JDBCOp { } protected Statement createDMLStatement() throws SQLException { - Statement stmt = jdbcStmtTL.get(); - if (stmt == null) { - if (isPreparedStmt) - stmt = jdbcConnection.prepareStatement(pStmtSqlStr); - else - stmt = jdbcConnection.createStatement(); + final Statement stmt = isPreparedStmt + ? jdbcConnection.prepareStatement(pStmtSqlStr) + : jdbcConnection.createStatement(); - jdbcStmtTL.set(stmt); + LOGGER.debug( + () -> "A statement is created -- prepared: " + +isPreparedStmt + +", read/write: " + + (isReadStmt ? "read" : "write) " + + ", stmt: " + + stmt) + ); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}", - isPreparedStmt, - isReadStmt ? "read" : "write", - stmt); - } - } return stmt; } } diff --git a/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java b/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java index 11d06fe9c..dae8e2009 100644 --- a/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java +++ b/nb-adapters/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java @@ -20,6 +20,7 @@ import io.nosqlbench.adapter.jdbc.JDBCSpace; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; +import io.nosqlbench.nb.api.lifecycle.ObjectPool; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,6 +29,7 @@ import java.util.Properties; import java.util.Random; public abstract class JDBCOp implements CycleOp { + private static final Logger LOGGER = LogManager.getLogger(JDBCOp.class); protected static final String LOG_COMMIT_SUCCESS = "Executed the JDBC statement & committed the connection successfully"; diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java index 8a7f45d2a..4f7f8d464 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java @@ -71,8 +71,7 @@ public abstract class BaseOpDispenser extends NBBaseComponent i private final List> verifierStaticImports = new ArrayList<>(); /** * optional invokable functions which throw exceptions when results are not verifiable. - * This variable is kept here for diagnostics and debugging. The actual instance used within - * each thread is provided by a {@link ThreadLocal} via {@link #getVerifier()} + * This variable is kept here for diagnostics and debugging. */ private final CycleFunction _verifier; private final ThreadLocal> tlVerifier; diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java index a2ae8e5fb..4ef94c530 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java @@ -26,6 +26,7 @@ public class NBComponentExecutionScope implements AutoCloseable { public NBComponentExecutionScope(NBComponent... components) { this.components = components; } + @Override public void close() throws RuntimeException { for (NBComponent component : components) { diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/lifecycle/ObjectPool.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/lifecycle/ObjectPool.java index b5792206f..73158e50a 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/lifecycle/ObjectPool.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/lifecycle/ObjectPool.java @@ -26,30 +26,30 @@ import java.util.function.Supplier; /** *

This object pooling class provides a simple way to pool objects which: *

    - *
  • Have a non-trivial creation cost
  • + *
  • Are expensive to create relative to how they are used
  • *
  • Need to be used my multiple threads
  • *
  • Are not thread safe
  • *
*

- * The overhead of using this pool should be weighed against the simpler case of just - * creating the object where it is needed. Generally speaking, avoid using this pool - * unless you know that object caching or pooling is needed. + * The overhead of using this pool should be weighed against the simpler case of just creating the object where it is + * needed. Generally speaking, avoid using this pool unless you know that object caching or pooling is needed from + * profiling. *

* - *

This pool keeps count of how many active elements are in use by caller threads. - * In the event that the available pool of objects is more than 2X the size of the active - * ones, it is reduced to 1/2 its current size, after the last element is released. - * This means that the pool will size down automatically when there are transient or episodic - * spikes. + *

This pool keeps count of how many active elements are in use by caller threads. In the event that the available + * pool of objects is more than 2X the size of the active ones, it is reduced to 1/2 its current size, after + * the last element is released. This means that the pool will size down automatically when there are transient or + * episodic spikes. *

* - *

The pool also handles object resetting via the consumer provided. Although it is expressed as a consumer, - * it doesn't strictly consume references. It simply does whatever the caller needs in order to effectively - * reset one of the pooled objects as it is returned to the resource pool.

+ *

The pool also handles object resetting via the {@link Consumer} provided. Although it is expressed as a consumer, + * it doesn't strictly consume references. It simply does whatever the caller needs in order to effectively reset one of + * the pooled objects as it is returned to the resource pool.

+ * + *

Safe usage of this class is achieved by wrapping it with a try-with-resources clause so that automatic + * resource management takes care of returning and reusing objects. The type returned by {@link #get()} is a + * reference wrapper. For example, you can create and use a pool of {@link StringBuilder}s like this: * - *

Typical usage of this class will wrap it in a try-with-resources clause so that automatic resource - * management takes care of returning and reusing objects. The type returned by {@link #get()} is a reference - * wrapper. For example, you can create and use a pool of {@link StringBuilder}s like this: *


  *         ObjectPool pool = new ObjectPool<>(StringBuilder::new, sb -> sb.setLength());
  *         try (var handle = pool.get()) {
@@ -61,10 +61,11 @@ import java.util.function.Supplier;
  *             throw new RuntimeException(e);
  *         }
  * 
- * - * At the end of the try-with-resources, the StringBuilder will be returned to the pool after - * its length has been reset to 0. + *

+ * At the end of the try-with-resources, the StringBuilder will be returned to the pool after its length has been + * reset to 0. *

+ * * @param */ public class ObjectPool implements Supplier> { diff --git a/nb-engine/nb-engine-core/pom.xml b/nb-engine/nb-engine-core/pom.xml index 1db09de7b..2764eab7a 100644 --- a/nb-engine/nb-engine-core/pom.xml +++ b/nb-engine/nb-engine-core/pom.xml @@ -130,25 +130,26 @@ - - - perftests - - false - - - - - org.apache.maven.plugins - maven-surefire-plugin - - perf - - - - - - + + + + + + + + + + + + + + + + + + + + diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java index e1eea06fd..4f0004608 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java @@ -27,7 +27,6 @@ import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; import java.io.InputStream; import java.io.PrintWriter; @@ -84,11 +83,6 @@ public interface Activity extends Comparable, ActivityDefObserver, Pro void setOutputDispenserDelegate(OutputDispenser outputDispenser); - @Override - RunState getRunState(); - - void setRunState(RunState runState); - long getStartedAtMillis(); default void shutdownActivity() { @@ -132,21 +126,9 @@ public interface Activity extends Comparable, ActivityDefObserver, Pro ErrorMetrics getExceptionMetrics(); -// /** -// * When a driver needs to identify an error uniquely for the purposes of -// * routing it to the correct error handler, or naming it in logs, or naming -// * metrics, override this method in your activity. -// * @return A function that can reliably and safely map an instance of Throwable to a stable name. -// */ -// default Function getErrorNameMapper() { -// return t -> t.getClass().getSimpleName(); -// } -// int getMaxTries(); default int getHdrDigits() { return this.getParams().getOptionalInteger("hdr_digits").orElse(4); } - - RunStateTally getRunStateTally(); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityType.java index a6fe93b6b..919bd06ba 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityType.java @@ -22,7 +22,6 @@ import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityimpl.CoreServices; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser; import java.util.Map; @@ -37,7 +36,7 @@ import java.util.Optional; * and by extension, default inputs and motors.

*/ //@Deprecated(forRemoval = true,since = "5.0") -public interface ActivityType { +public interface ActivityType> { /** @@ -67,17 +66,12 @@ public interface ActivityType { if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities); activity.setInputDispenserDelegate(inputDispenser); - final ActionDispenser actionDispenser = this.getActionDispenser(activity); - if (actionDispenser instanceof ActivitiesAware) - ((ActivitiesAware) actionDispenser).setActivitiesMap(activities); - activity.setActionDispenserDelegate(actionDispenser); - final OutputDispenser outputDispenser = this.getOutputDispenser(activity).orElse(null); if ((null != outputDispenser) && (outputDispenser instanceof ActivitiesAware)) ((ActivitiesAware) outputDispenser).setActivitiesMap(activities); activity.setOutputDispenserDelegate(outputDispenser); - final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); + final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, outputDispenser); if (motorDispenser instanceof ActivitiesAware) ((ActivitiesAware) motorDispenser).setActivitiesMap(activities); activity.setMotorDispenserDelegate(motorDispenser); @@ -94,15 +88,6 @@ public interface ActivityType { return CoreServices.getOutputDispenser(activity); } - /** - * This method will be called once per action instance. - * - * @param activity The activity instance that will parameterize the returned ActionDispenser instance. - * @return an instance of ActionDispenser - */ - default ActionDispenser getActionDispenser(final A activity) { - return new CoreActionDispenser(activity); - } /** * Return the InputDispenser instance that will be used by the associated activity to create Input factories @@ -118,9 +103,8 @@ public interface ActivityType { default MotorDispenser getMotorDispenser( final A activity, final InputDispenser inputDispenser, - final ActionDispenser actionDispenser, final OutputDispenser outputDispenser) { - return new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); + return new CoreMotorDispenser(activity, inputDispenser, outputDispenser); } /** diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java index 107fcb1f8..bbe3ba9f5 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java @@ -16,13 +16,14 @@ package io.nosqlbench.engine.api.activityapi.core; -import io.nosqlbench.engine.api.activityimpl.MotorState; import io.nosqlbench.engine.api.activityapi.input.Input; +import java.util.concurrent.Callable; + /** * The core threading harness within an activity. */ -public interface Motor extends Runnable, Stoppable { +public interface Motor extends Callable { /** * Set the input on this motor. It will be read from each cycle before applying the action. @@ -34,27 +35,9 @@ public interface Motor extends Runnable, Stoppable { Input getInput(); - /** - * Set the action on this motor. It will be applied to each input. - * - * @param action an instance of activityAction - * @return this ActivityMotor, for method chaining - */ - Motor setAction(Action action); - - Action getAction(); - - /** + /** * get the slotId which this motor is assigned to within the activity instance. * @return long slot id */ long getSlotId(); - - /** - * Get a description of the current slot run status. - * @return - a value from the {@link RunState} enum - */ - MotorState getState(); - - void removeState(); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java index 53cfc2a74..8c82dff0d 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java @@ -16,8 +16,6 @@ package io.nosqlbench.engine.api.activityapi.core; -import io.nosqlbench.engine.api.activityimpl.MotorState; - /** *

This enum indicates the state of a thread within an activity. The state is kept in an atomic * register. Ownership of state changes is shared between a supervising thread and a managed thread. diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputDispenser.java index 159561d4e..997613c7b 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputDispenser.java @@ -27,8 +27,7 @@ public interface InputDispenser { /** * Resolve (find or create) an Input instance for the slot specified. * The input is not required to be per-slot (per-thread), but any shared inputs must be thread safe. - * @param slot The numbered slot within the activity instance for this action. * @return A new or cached Input for the specified slot. */ - Input getInput(long slot); + Input getInput(); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/MotorState.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/MotorState.java deleted file mode 100644 index ef59b7a1e..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/MotorState.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl; - -import io.nosqlbench.engine.api.activityapi.core.RunState; -import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Supplier; - -/** - * Holds the state of a slot, allows only valid transitions, and shares the - * slot state as - */ -public class MotorState implements Supplier { - private final static Logger logger = LogManager.getLogger("MOTORS"); - private final AtomicReference atomicState = new AtomicReference<>(RunState.Uninitialized); - private final long slotId; - private final RunStateTally tally; - - public MotorState(long slotId, RunStateTally tally) { - this.slotId = slotId; - this.tally = tally; - tally.add(atomicState.get()); - } - - public RunState get() { - return atomicState.get(); - } - - /** - * This is how you share the current slot state most directly, but it has a caveat. By sharing the - * slot state in this way, you allow external changes. You should only use this method to share slot - * state for observers. - * @return an atomic reference for SlotState - */ - public AtomicReference getAtomicSlotState() { - return atomicState; - } - - /** - *

Transition the thread slot to a new state. only accepting valid transitions.

- *

The valid slot states will be moved to a data type eventually, simplifying this method.

- * - * @param to The next SlotState for this thread/slot/motor - */ - public synchronized void enterState(RunState to) { - RunState from = atomicState.get(); - if (!from.canTransitionTo(to)) { - throw new RuntimeException("Invalid transition from " + from + " to " + to); - } - while (!atomicState.compareAndSet(from, to)) { - logger.trace(() -> "retrying transition from:" + from + " to:" + to); - } - tally.change(from,to); - logger.trace(() -> "TRANSITION[" + slotId + "]: " + from + " ==> " + to); - } - - public void removeState() { - logger.trace(() -> "Removing motor state " + atomicState.get()); - tally.remove(atomicState.get()); - } -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java index 4765f161f..a13f09f97 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java @@ -16,6 +16,7 @@ package io.nosqlbench.engine.api.activityimpl; +import io.nosqlbench.adapter.diag.DriverAdapterLoader; import io.nosqlbench.adapters.api.activityconfig.OpsLoader; import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat; @@ -41,12 +42,16 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner; import io.nosqlbench.engine.api.activityapi.planning.SequencerType; import io.nosqlbench.engine.api.activityapi.simrate.*; -import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult; +import io.nosqlbench.nb.annotations.ServiceSelector; import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.components.events.NBEvent; import io.nosqlbench.nb.api.components.events.ParamChange; +import io.nosqlbench.nb.api.components.events.SetThreads; import io.nosqlbench.nb.api.components.status.NBStatusComponent; +import io.nosqlbench.nb.api.config.standard.*; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; import io.nosqlbench.nb.api.errors.BasicError; import io.nosqlbench.nb.api.errors.OpConfigError; import io.nosqlbench.nb.api.labels.NBLabels; @@ -57,12 +62,10 @@ import java.io.InputStream; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; -/** - * A default implementation of an Activity, suitable for building upon. - */ -public class SimpleActivity extends NBStatusComponent implements Activity, InvokableResult { +public class SimpleActivity extends NBStatusComponent implements Activity, InvokableResult, SyntheticOpTemplateProvider, ActivityDefObserver { private static final Logger logger = LogManager.getLogger("ACTIVITY"); protected ActivityDef activityDef; @@ -83,7 +86,9 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok private NBErrorHandler errorHandler; private ActivityMetricProgressMeter progressMeter; private String workloadSource = "unspecified"; - private final RunStateTally tally = new RunStateTally(); + private final ConcurrentHashMap> adapters = new ConcurrentHashMap<>(); + + private final OpSequence> sequence; public SimpleActivity(NBComponent parent, ActivityDef activityDef) { super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())); @@ -102,6 +107,126 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok nameEnumerator++; } } + OpsDocList workload; + + Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); + NBConfigModel yamlmodel; + if (yaml_loc.isPresent()) { + Map disposable = new LinkedHashMap<>(activityDef.getParams()); + workload = OpsLoader.loadPath(yaml_loc.get(), disposable, "activities"); + yamlmodel = workload.getConfigModel(); + } else { + yamlmodel = ConfigModel.of(SimpleActivity.class).asReadOnly(); + } + + Optional defaultDriverName = activityDef.getParams().getOptionalString("driver"); + Optional> defaultAdapter = defaultDriverName + .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get()) + .map(l -> l.load(this, NBLabels.forKV())); + + if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) { + throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\''); + } + + // HERE, op templates are loaded before drivers are loaded + List opTemplates = loadOpTemplates(defaultAdapter.orElse(null)); + + + List pops = new ArrayList<>(); + List> adapterlist = new ArrayList<>(); + NBConfigModel supersetConfig = ConfigModel.of(SimpleActivity.class).add(yamlmodel); + + Optional defaultDriverOption = defaultDriverName; + ConcurrentHashMap> mappers = new ConcurrentHashMap<>(); + for (OpTemplate ot : opTemplates) { +// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this); + String driverName = ot.getOptionalStringParam("driver", String.class) + .or(() -> ot.getOptionalStringParam("type", String.class)) + .or(() -> defaultDriverOption) + .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); + +// String driverName = ot.getOptionalStringParam("driver") +// .or(() -> activityDef.getParams().getOptionalString("driver")) +// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); + + + // HERE + if (!adapters.containsKey(driverName)) { + + DriverAdapter adapter = Optional.of(driverName) + .flatMap( + name -> ServiceSelector.of( + name, + ServiceLoader.load(DriverAdapterLoader.class) + ) + .get()) + .map( + l -> l.load( + this, + NBLabels.forKV() + ) + ) + .orElseThrow(() -> new OpConfigError("driver adapter not present for name '" + driverName + "'")); + + NBConfigModel combinedModel = yamlmodel; + NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams()); + + if (adapter instanceof NBConfigurable configurable) { + NBConfigModel adapterModel = configurable.getConfigModel(); + supersetConfig.add(adapterModel); + + combinedModel = adapterModel.add(yamlmodel); + combinedConfig = combinedModel.matchConfig(activityDef.getParams()); + configurable.applyConfig(combinedConfig); + } + adapters.put(driverName, adapter); + mappers.put(driverName, adapter.getOpMapper()); + } + + supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap()); + + DriverAdapter adapter = adapters.get(driverName); + adapterlist.add(adapter); + ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this); + Optional discard = pop.takeOptionalStaticValue("driver", String.class); + pops.add(pop); + } + + if (defaultDriverOption.isPresent()) { + long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count(); + if (0 == matchingDefault) { + logger.warn("All op templates used a different driver than the default '{}'", defaultDriverOption.get()); + } + } + + try { + sequence = createOpSourceFromParsedOps(adapterlist, pops); + } catch (Exception e) { + if (e instanceof OpConfigError) { + throw e; + } + throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e); + } + + create().gauge( + "ops_pending", + () -> this.getProgressMeter().getSummary().pending(), + MetricCategory.Core, + "The current number of operations which have not been dispatched for processing yet." + ); + create().gauge( + "ops_active", + () -> this.getProgressMeter().getSummary().current(), + MetricCategory.Core, + "The current number of operations which have been dispatched for processing, but which have not yet completed." + ); + create().gauge( + "ops_complete", + () -> this.getProgressMeter().getSummary().complete(), + MetricCategory.Core, + "The current number of operations which have been completed" + ); + } public SimpleActivity(NBComponent parent, String activityDefString) { @@ -111,6 +236,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok @Override public synchronized void initActivity() { initOrUpdateRateLimiters(this.activityDef); + setDefaultsFromOpSequence(sequence); + } public synchronized NBErrorHandler getErrorHandler() { @@ -127,14 +254,6 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok return runState; } - @Override - public synchronized void setRunState(RunState runState) { - this.runState = runState; - if (RunState.Running == runState) { - this.startedAtMillis = System.currentTimeMillis(); - } - } - @Override public long getStartedAtMillis() { return startedAtMillis; @@ -196,7 +315,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok } public String toString() { - return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally; + return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.runState; } @Override @@ -224,15 +343,16 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok @Override public RateLimiter getCycleLimiter() { - if (cycleLimiterSource!=null) { + if (cycleLimiterSource != null) { return cycleLimiterSource.get(); } else { return null; } } + @Override public synchronized RateLimiter getStrideLimiter() { - if (strideLimiterSource!=null) { + if (strideLimiterSource != null) { return strideLimiterSource.get(); } else { return null; @@ -276,7 +396,28 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { -// initOrUpdateRateLimiters(activityDef); + for (DriverAdapter adapter : adapters.values()) { + if (adapter instanceof NBReconfigurable configurable) { + NBConfigModel cfgModel = configurable.getReconfigModel(); + NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams()); + NBReconfigurable.applyMatching(cfg, List.of(configurable)); + } + } + + } + + public void onEvent(NBEvent event) { + switch (event) { + case ParamChange pc -> { + switch (pc.value()) { + case SetThreads st -> activityDef.setThreads(st.threads); + case CycleRateSpec crs -> createOrUpdateCycleLimiter(crs); + case StrideRateSpec srs -> createOrUpdateStrideLimiter(srs); + default -> super.onEvent(event); + } + } + default -> super.onEvent(event); + } } public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) { @@ -613,8 +754,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok if (op != null && OpsLoader.isJson(op)) { workloadSource = "commandline: (op/json): '" + op + "'"; return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null); - } - else if (op != null) { + } else if (op != null) { workloadSource = "commandline: (op/inline): '" + op + "'"; return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null); } @@ -645,32 +785,48 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10); } - @Override - public RunStateTally getRunStateTally() { - return tally; - } - @Override public Map asResult() { - return Map.of("activity",this.getAlias()); + return Map.of("activity", this.getAlias()); } -// private final ThreadLocal cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> { -// RateLimiters.createOrUpdate(this,null,new SimRateSpec() -// if (cycleratePerThread) { -// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,) -// } else { -// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef) -// } -// if (getCycleLimiter() != null) { -// return RateLimiters.createOrUpdate( -// new NBThreadComponent(this), -// getCycleLimiter(), -// getCycleLimiter().getSpec()); -// } else { -// return null; -// } -// }); + + @Override + public NBLabels getLabels() { + return super.getLabels(); + } + + public void shutdownActivity() { + for (Map.Entry> entry : adapters.entrySet()) { + String adapterName = entry.getKey(); + DriverAdapter adapter = entry.getValue(); + adapter.getSpaceCache().getElements().forEach((spaceName, space) -> { + if (space instanceof AutoCloseable autocloseable) { + try { + autocloseable.close(); + } catch (Exception e) { + throw new RuntimeException("Error while shutting down state space for " + + "adapter=" + adapterName + ", space=" + spaceName + ": " + e, e); + } + } + }); + } + } + + public List getSyntheticOpTemplates(OpsDocList opsDocList, Map cfg) { + List opTemplates = new ArrayList<>(); + for (DriverAdapter adapter : adapters.values()) { + if (adapter instanceof SyntheticOpTemplateProvider sotp) { + List newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg); + opTemplates.addAll(newTemplates); + } + } + return opTemplates; + } + + public OpSequence> getOpSequence() { + return sequence; + } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java index 5b150be1b..3f59bc2f2 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java @@ -15,17 +15,25 @@ */ package io.nosqlbench.engine.api.activityimpl.motor; +import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*; +import io.nosqlbench.adapters.api.evalctx.CycleFunction; import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment; -import io.nosqlbench.engine.api.activityimpl.MotorState; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; +import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.core.ops.fluent.OpTracker; import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.output.Output; import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter; +import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.errors.ResultVerificationError; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -40,12 +48,12 @@ import static io.nosqlbench.engine.api.activityapi.core.RunState.*; * instance is responsible for taking input from a LongSupplier and applying * the provided LongConsumer to it on each cycle. These two parameters are called * input and action, respectively. - * + *

* This motor implementation splits the handling of sync and async actions with a hard * fork in the middle to limit potential breakage of the prior sync implementation * with new async logic. */ -public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { +public class CoreMotor implements ActivityDefObserver, Motor, SyncAction { private static final Logger logger = LogManager.getLogger(CoreMotor.class); @@ -62,78 +70,81 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { private Timer cycleResponseTimer; private Input input; - private Action action; private final Activity activity; private Output output; - private final MotorState motorState; - // private final AtomicReference slotState; private int stride = 1; private OpTracker opTracker; + private final Timer executeTimer; + private final Histogram triesHistogram; + private final Timer resultSuccessTimer; + private final Timer resultTimer; + private final Timer bindTimer; + private final NBErrorHandler errorHandler; + private final OpSequence> opsequence; + private final int maxTries; + private final Timer verifierTimer; + /** * Create an ActivityMotor. * - * @param activity The activity that this motor will be associated with. - * @param slotId The enumeration of the motor, as assigned by its executor. - * @param input A LongSupplier which provides the cycle number inputs. + * @param activity + * The activity that this motor will be associated with. + * @param slotId + * The enumeration of the motor, as assigned by its executor. + * @param input + * A LongSupplier which provides the cycle number inputs. */ public CoreMotor( - Activity activity, + SimpleActivity activity, long slotId, Input input) { this.activity = activity; this.slotId = slotId; setInput(input); - motorState = new MotorState(slotId, activity.getRunStateTally()); onActivityDefUpdate(activity.getActivityDef()); + this.opsequence = activity.getOpSequence(); + this.maxTries = activity.getMaxTries(); + bindTimer = activity.getInstrumentation().getOrCreateBindTimer(); + executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer(); + triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram(); + resultTimer = activity.getInstrumentation().getOrCreateResultTimer(); + resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer(); + errorHandler = activity.getErrorHandler(); + verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer(); + } /** * Create an ActivityMotor. * - * @param activity The activity that this motor is based on. - * @param slotId The enumeration of the motor, as assigned by its executor. - * @param input A LongSupplier which provides the cycle number inputs. - * @param action An LongConsumer which is applied to the input for each cycle. + * @param activity + * The activity that this motor is based on. + * @param slotId + * The enumeration of the motor, as assigned by its executor. + * @param input + * A LongSupplier which provides the cycle number inputs. + * @param output + * An optional opTracker. */ public CoreMotor( - Activity activity, + SimpleActivity activity, long slotId, Input input, - Action action - ) { - this(activity, slotId, input); - setAction(action); - } - - /** - * Create an ActivityMotor. - * - * @param activity The activity that this motor is based on. - * @param slotId The enumeration of the motor, as assigned by its executor. - * @param input A LongSupplier which provides the cycle number inputs. - * @param action An LongConsumer which is applied to the input for each cycle. - * @param output An optional opTracker. - */ - public CoreMotor( - Activity activity, - long slotId, - Input input, - Action action, Output output ) { this(activity, slotId, input); - setAction(action); setResultOutput(output); } /** * Set the input for this ActivityMotor. * - * @param input The LongSupplier that provides the cycle number. + * @param input + * The LongSupplier that provides the cycle number. * @return this ActivityMotor, for chaining */ @Override @@ -148,41 +159,14 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { } - /** - * Set the action for this ActivityMotor. - * - * @param action The LongConsumer that will be applied to the next cycle number. - * @return this ActivityMotor, for chaining - */ - @Override - public Motor setAction(Action action) { - this.action = action; - return this; - } - - @Override - public Action getAction() { - return action; - } - @Override public long getSlotId() { return this.slotId; } - @Override - public MotorState getState() { - return motorState; - } @Override - public void removeState() { - motorState.removeState(); - } - - @Override - public void run() { - motorState.enterState(Starting); + public Void call() { try { inputTimer = activity.getInstrumentation().getOrCreateInputTimer(); @@ -193,12 +177,6 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { cycleRateLimiter = activity.getCycleLimiter(); - if (motorState.get() == Finished) { - logger.warn(() -> "Input was already exhausted for slot " + slotId + ", remaining in finished state."); - } - - action.init(); - if (input instanceof Startable) { ((Startable) input).start(); } @@ -211,119 +189,97 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { long strideDelay = 0L; long cycleDelay = 0L; - if (action instanceof SyncAction sync) { - cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer(); - strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer(); + SyncAction sync = this; + cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer(); + strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer(); - if (activity.getActivityDef().getParams().containsKey("async")) { - throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async."); - } + if (activity.getActivityDef().getParams().containsKey("async")) { + throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async."); + } - motorState.enterState(Running); - while (motorState.get() == Running) { - CycleSegment cycleSegment = null; - CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride); + CycleSegment cycleSegment = null; + CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride); - try (Timer.Context inputTime = inputTimer.time()) { - cycleSegment = input.getInputSegment(stride); + + try (Timer.Context inputTime = inputTimer.time()) { + cycleSegment = input.getInputSegment(stride); + } + + if (cycleSegment == null) { + throw new RuntimeException("invalid state with cycle segment = null"); + } + + + if (strideRateLimiter != null) { + // block for strides rate limiter + strideDelay = strideRateLimiter.block(); + } + + long strideStart = System.nanoTime(); + try { + + while (!cycleSegment.isExhausted()) { + long cyclenum = cycleSegment.nextCycle(); + if (cyclenum < 0) { + if (cycleSegment.isExhausted()) { + logger.trace(() -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId); + continue; + } } - if (cycleSegment == null) { - logger.trace(() -> "input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId); - motorState.enterState(Finished); - continue; + int result = -1; + + if (cycleRateLimiter != null) { + // Block for cycle rate limiter + cycleDelay = cycleRateLimiter.block(); } - - if (strideRateLimiter != null) { - // block for strides rate limiter - strideDelay = strideRateLimiter.block(); - } - - long strideStart = System.nanoTime(); + long cycleStart = System.nanoTime(); try { - - while (!cycleSegment.isExhausted()) { - long cyclenum = cycleSegment.nextCycle(); - if (cyclenum < 0) { - if (cycleSegment.isExhausted()) { - logger.trace(() -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId); - motorState.enterState(Finished); - continue; - } - } - - if (motorState.get() != Running) { - logger.trace(() -> "motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId); - continue; - } - int result = -1; - - if (cycleRateLimiter != null) { - // Block for cycle rate limiter - cycleDelay = cycleRateLimiter.block(); - } - - long cycleStart = System.nanoTime(); - try { - logger.trace(()->"cycle " + cyclenum); - result = sync.runCycle(cyclenum); - } catch (Exception e) { - motorState.enterState(Errored); - throw e; - } finally { - long cycleEnd = System.nanoTime(); - cycleServiceTimer.update((cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS); - } - segBuffer.append(cyclenum, result); - } - + logger.trace(() -> "cycle " + cyclenum); + result = sync.runCycle(cyclenum); + } catch (Exception e) { + throw e; } finally { - long strideEnd = System.nanoTime(); - strideServiceTimer.update((strideEnd - strideStart) + strideDelay, TimeUnit.NANOSECONDS); - } - - if (output != null) { - CycleResultsSegment outputBuffer = segBuffer.toReader(); - try { - output.onCycleResultSegment(outputBuffer); - } catch (Exception t) { - logger.error(()->"Error while feeding result segment " + outputBuffer + " to output '" + output + "', error:" + t); - throw t; - } + long cycleEnd = System.nanoTime(); + cycleServiceTimer.update((cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS); } + segBuffer.append(cyclenum, result); } - } else { - throw new RuntimeException("Valid Action implementations must implement SyncAction"); + } finally { + long strideEnd = System.nanoTime(); + strideServiceTimer.update((strideEnd - strideStart) + strideDelay, TimeUnit.NANOSECONDS); } - if (motorState.get() == Stopping) { - motorState.enterState(Stopped); - logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get()); - } else if (motorState.get() == Finished) { - logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get()); - } else { - logger.warn(()->"Unexpected motor state for CoreMotor shutdown: " + motorState.get()); + if (output != null) { + CycleResultsSegment outputBuffer = segBuffer.toReader(); + try { + output.onCycleResultSegment(outputBuffer); + } catch (Exception t) { + logger.error(() -> "Error while feeding result segment " + outputBuffer + " to output '" + output + "', error:" + t); + throw t; + } } + } catch (Throwable t) { - logger.error(()->"Error in core motor loop:" + t, t); - motorState.enterState(Errored); + logger.error(() -> "Error in core motor loop:" + t, t); throw t; } + return null; } @Override public String toString() { - return this.activity.getAlias() + ": slot:" + this.slotId + "; state:" + motorState.get(); + return this.activity.getAlias() + ": slot:" + this.slotId; } @Override public void onActivityDefUpdate(ActivityDef activityDef) { - for (Object component : (new Object[]{input, opTracker, action, output})) { + for (Object component : (new Object[]{input, opTracker, this, output})) { if (component instanceof ActivityDefObserver) { ((ActivityDefObserver) component).onActivityDefUpdate(activityDef); } @@ -335,19 +291,89 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { } - @Override - public synchronized void requestStop() { - RunState currentState = motorState.get(); - if (Objects.requireNonNull(currentState) == Running) { - Stoppable.stop(input, action); - motorState.enterState(Stopping); - } else { - logger.warn(() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState); - } - } - public void setResultOutput(Output resultOutput) { this.output = resultOutput; } + public int runCycle(long cycle) { + + OpDispenser dispenser = null; + Op op = null; + + try (Timer.Context ct = bindTimer.time()) { + dispenser = opsequence.apply(cycle); + op = dispenser.getOp(cycle); + } catch (Exception e) { + throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser != null ? dispenser.getOpName() : "NULL") + + "': " + e.getMessage(), e); + } + + int code = 0; + Object result = null; + while (op != null) { + + int tries = 0; + while (tries++ < maxTries) { + Throwable error = null; + long startedAt = System.nanoTime(); + + dispenser.onStart(cycle); + + try (Timer.Context ct = executeTimer.time()) { + if (op instanceof RunnableOp runnableOp) { + runnableOp.run(); + } else if (op instanceof CycleOp cycleOp) { + result = cycleOp.apply(cycle); + } else if (op instanceof ChainingOp chainingOp) { + result = chainingOp.apply(result); + } else { + throw new RuntimeException("The op implementation did not implement any active logic. Implement " + + "one of [RunnableOp, CycleOp, or ChainingOp]"); + } + // TODO: break out validation timer from execute + try (Timer.Context ignored = verifierTimer.time()) { + CycleFunction verifier = dispenser.getVerifier(); + try { + verifier.setVariable("result", result); + verifier.setVariable("cycle", cycle); + Boolean isGood = verifier.apply(cycle); + if (!isGood) { + throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails()); + } + } catch (Exception e) { + throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails()); + } + } + } catch (Exception e) { + error = e; + } finally { + long nanos = System.nanoTime() - startedAt; + resultTimer.update(nanos, TimeUnit.NANOSECONDS); + if (error == null) { + resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS); + dispenser.onSuccess(cycle, nanos); + break; + } else { + ErrorDetail detail = errorHandler.handleError(error, cycle, nanos); + dispenser.onError(cycle, nanos, error); + code = detail.resultCode; + if (!detail.isRetryable()) { + break; + } + } + } + } + triesHistogram.update(tries); + + if (op instanceof OpGenerator) { + logger.trace(() -> "GEN OP for cycle(" + cycle + ")"); + op = ((OpGenerator) op).getNextOp(); + } else { + op = null; + } + } + + return code; + } + } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java index 6a376e52f..91146e004 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java @@ -15,6 +15,7 @@ */ package io.nosqlbench.engine.api.activityimpl.motor; +import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.input.Input; @@ -30,32 +31,28 @@ import java.util.function.IntPredicate; */ public class CoreMotorDispenser implements MotorDispenser { - private final Activity activity; + private final SimpleActivity activity; private final InputDispenser inputDispenser; - private final ActionDispenser actionDispenser; private final OutputDispenser outputDispenser; - public CoreMotorDispenser(Activity activity, + public CoreMotorDispenser(SimpleActivity activity, InputDispenser inputDispenser, - ActionDispenser actionDispenser, OutputDispenser outputDispenser ) { this.activity = activity; this.inputDispenser = inputDispenser; - this.actionDispenser = actionDispenser; this.outputDispenser = outputDispenser; } @Override public Motor getMotor(ActivityDef activityDef, int slotId) { - Action action = actionDispenser.getAction(slotId); Input input = inputDispenser.getInput(slotId); Output output = null; if (outputDispenser !=null) { output = outputDispenser.getOutput(slotId); } IntPredicate resultFilter = null; - Motor am = new CoreMotor<>(activity, slotId, input, action, output); + Motor am = new CoreMotor<>(activity, slotId, input, output); return am; } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java deleted file mode 100644 index 72ff9f1f5..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl.uniform; - -import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; -import io.nosqlbench.engine.api.activityapi.core.Activity; -import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction; - -public class StandardActionDispenser implements ActionDispenser { - private final StandardActivity activity; - - public StandardActionDispenser(StandardActivity activity) { - this.activity = activity; - } - - @Override - public StandardAction getAction(int slot) { - return new StandardAction<>(activity,slot); - } -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java deleted file mode 100644 index d3154d15d..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl.uniform; - -import io.nosqlbench.adapter.diag.DriverAdapterLoader; -import io.nosqlbench.adapters.api.activityconfig.OpsLoader; -import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; -import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList; -import io.nosqlbench.adapters.api.activityimpl.OpDispenser; -import io.nosqlbench.adapters.api.activityimpl.OpMapper; -import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; -import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider; -import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op; -import io.nosqlbench.adapters.api.templating.ParsedOp; -import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; -import io.nosqlbench.nb.api.lifecycle.Shutdownable; -import io.nosqlbench.nb.api.components.core.NBComponent; -import io.nosqlbench.nb.api.config.standard.*; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.nb.api.errors.BasicError; -import io.nosqlbench.nb.api.errors.OpConfigError; -import io.nosqlbench.nb.api.labels.NBLabels; -import io.nosqlbench.nb.api.components.events.NBEvent; -import io.nosqlbench.nb.api.components.events.ParamChange; -import io.nosqlbench.nb.api.components.events.SetThreads; -import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; -import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec; -import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.nb.annotations.ServiceSelector; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * This is a typed activity which is expected to become the standard - * core of all new activity types. Extant NB drivers should also migrate - * to this when possible. - * - * @param - * A type of runnable which wraps the operations for this type of driver. - * @param - * The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph - */ -public class StandardActivity extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver { - private static final Logger logger = LogManager.getLogger("ACTIVITY"); - private final OpSequence> sequence; - private final ConcurrentHashMap> adapters = new ConcurrentHashMap<>(); - - public StandardActivity(NBComponent parent, ActivityDef activityDef) { - super(parent, activityDef); - OpsDocList workload; - - Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); - NBConfigModel yamlmodel; - if (yaml_loc.isPresent()) { - Map disposable = new LinkedHashMap<>(activityDef.getParams()); - workload = OpsLoader.loadPath(yaml_loc.get(), disposable, "activities"); - yamlmodel = workload.getConfigModel(); - } else { - yamlmodel = ConfigModel.of(StandardActivity.class).asReadOnly(); - } - - Optional defaultDriverName = activityDef.getParams().getOptionalString("driver"); - Optional> defaultAdapter = defaultDriverName - .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get()) - .map(l -> l.load(this, NBLabels.forKV())); - - if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) { - throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\''); - } - - // HERE, op templates are loaded before drivers are loaded - List opTemplates = loadOpTemplates(defaultAdapter.orElse(null)); - - - List pops = new ArrayList<>(); - List> adapterlist = new ArrayList<>(); - NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel); - - Optional defaultDriverOption = defaultDriverName; - ConcurrentHashMap> mappers = new ConcurrentHashMap<>(); - for (OpTemplate ot : opTemplates) { -// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this); - String driverName = ot.getOptionalStringParam("driver", String.class) - .or(() -> ot.getOptionalStringParam("type", String.class)) - .or(() -> defaultDriverOption) - .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); - -// String driverName = ot.getOptionalStringParam("driver") -// .or(() -> activityDef.getParams().getOptionalString("driver")) -// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); - - - // HERE - if (!adapters.containsKey(driverName)) { - - DriverAdapter adapter = Optional.of(driverName) - .flatMap( - name -> ServiceSelector.of( - name, - ServiceLoader.load(DriverAdapterLoader.class) - ) - .get()) - .map( - l -> l.load( - this, - NBLabels.forKV() - ) - ) - .orElseThrow(() -> new OpConfigError("driver adapter not present for name '" + driverName + "'")); - - NBConfigModel combinedModel = yamlmodel; - NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams()); - - if (adapter instanceof NBConfigurable configurable) { - NBConfigModel adapterModel = configurable.getConfigModel(); - supersetConfig.add(adapterModel); - - combinedModel = adapterModel.add(yamlmodel); - combinedConfig = combinedModel.matchConfig(activityDef.getParams()); - configurable.applyConfig(combinedConfig); - } - adapters.put(driverName, adapter); - mappers.put(driverName, adapter.getOpMapper()); - } - - supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap()); - - DriverAdapter adapter = adapters.get(driverName); - adapterlist.add(adapter); - ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this); - Optional discard = pop.takeOptionalStaticValue("driver", String.class); - pops.add(pop); - } - - if (defaultDriverOption.isPresent()) { - long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count(); - if (0 == matchingDefault) { - logger.warn("All op templates used a different driver than the default '{}'", defaultDriverOption.get()); - } - } - - try { - sequence = createOpSourceFromParsedOps(adapterlist, pops); - } catch (Exception e) { - if (e instanceof OpConfigError) { - throw e; - } - throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e); - } - - create().gauge( - "ops_pending", - () -> this.getProgressMeter().getSummary().pending(), - MetricCategory.Core, - "The current number of operations which have not been dispatched for processing yet." - ); - create().gauge( - "ops_active", - () -> this.getProgressMeter().getSummary().current(), - MetricCategory.Core, - "The current number of operations which have been dispatched for processing, but which have not yet completed." - ); - create().gauge( - "ops_complete", - () -> this.getProgressMeter().getSummary().complete(), - MetricCategory.Core, - "The current number of operations which have been completed" - ); - } - - @Override - public void initActivity() { - super.initActivity(); - setDefaultsFromOpSequence(sequence); - } - - - public OpSequence> getOpSequence() { - return sequence; - } - -// /** -// * When an adapter needs to identify an error uniquely for the purposes of -// * routing it to the correct error handler, or naming it in logs, or naming -// * metrics, override this method in your activity. -// * -// * @return A function that can reliably and safely map an instance of Throwable to a stable name. -// */ -// @Override -// public final Function getErrorNameMapper() { -// return adapter.getErrorNameMapper(); -// } - - @Override - public synchronized void onActivityDefUpdate(ActivityDef activityDef) { - super.onActivityDefUpdate(activityDef); - - for (DriverAdapter adapter : adapters.values()) { - if (adapter instanceof NBReconfigurable configurable) { - NBConfigModel cfgModel = configurable.getReconfigModel(); - NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams()); - NBReconfigurable.applyMatching(cfg, List.of(configurable)); - } - } - } - -// @Override -// public synchronized void onActivityDefUpdate(final ActivityDef activityDef) { -// super.onActivityDefUpdate(activityDef); -// -// for (final DriverAdapter adapter : this.adapters.values()) -// if (adapter instanceof NBReconfigurable reconfigurable) { -// NBConfigModel cfgModel = reconfigurable.getReconfigModel(); -// final Optional op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); -// if (op_yaml_loc.isPresent()) { -// final Map disposable = new LinkedHashMap<>(activityDef.getParams()); -// final OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities"); -// cfgModel = cfgModel.add(workload.getConfigModel()); -// } -// final NBConfiguration cfg = cfgModel.apply(activityDef.getParams()); -// reconfigurable.applyReconfig(cfg); -// } -// -// } - - @Override - public List getSyntheticOpTemplates(OpsDocList opsDocList, Map cfg) { - List opTemplates = new ArrayList<>(); - for (DriverAdapter adapter : adapters.values()) { - if (adapter instanceof SyntheticOpTemplateProvider sotp) { - List newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg); - opTemplates.addAll(newTemplates); - } - } - return opTemplates; - } - - /** - * This is done here since driver adapters are intended to keep all of their state within - * dedicated state space types. Any space which implements {@link Shutdownable} - * will be closed when this activity shuts down. - */ - @Override - public void shutdownActivity() { - for (Map.Entry> entry : adapters.entrySet()) { - String adapterName = entry.getKey(); - DriverAdapter adapter = entry.getValue(); - adapter.getSpaceCache().getElements().forEach((spaceName, space) -> { - if (space instanceof AutoCloseable autocloseable) { - try { - autocloseable.close(); - } catch (Exception e) { - throw new RuntimeException("Error while shutting down state space for " + - "adapter=" + adapterName + ", space=" + spaceName + ": " + e, e); - } - } - }); - } - } - - @Override - public NBLabels getLabels() { - return super.getLabels(); - } - - - @Override - public void onEvent(NBEvent event) { - switch (event) { - case ParamChange pc -> { - switch (pc.value()) { - case SetThreads st -> activityDef.setThreads(st.threads); - case CycleRateSpec crs -> createOrUpdateCycleLimiter(crs); - case StrideRateSpec srs -> createOrUpdateStrideLimiter(srs); - default -> super.onEvent(event); - } - } - default -> super.onEvent(event); - } - } - - -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java index a3e03db0d..ae03ca7b6 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java @@ -17,6 +17,7 @@ package io.nosqlbench.engine.api.activityimpl.uniform; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; @@ -27,7 +28,7 @@ import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.Map; -public class StandardActivityType> implements ActivityType { +public class StandardActivityType> implements ActivityType { private static final Logger logger = LogManager.getLogger("ACTIVITY"); private final Map adapters = new HashMap<>(); @@ -59,13 +60,8 @@ public class StandardActivityType> implements Ac if (activityDef.getParams().getOptionalString("async").isPresent()) throw new RuntimeException("This driver does not support async mode yet."); - return (A) new StandardActivity(parent, activityDef); + return (A) new SimpleActivity(parent, activityDef); } - @Override - public ActionDispenser getActionDispenser(final A activity) { - return new StandardActionDispenser(activity); - } - } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java deleted file mode 100644 index 05d4bd507..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl.uniform.actions; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Timer; -import io.nosqlbench.adapters.api.activityimpl.OpDispenser; -import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*; -import io.nosqlbench.adapters.api.evalctx.CycleFunction; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.nb.api.errors.ResultVerificationError; -import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; -import io.nosqlbench.engine.api.activityapi.core.SyncAction; -import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; -import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; -import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.concurrent.TimeUnit; - -/** - * This is the generified version of an Action. All driver adapters us this, as opposed - * to previous NB versions where it was implemented for each driver. - *

- * This allows the API to be consolidated so that the internal machinery of NB - * works in a very consistent and uniform way for all users and drivers. - * - * @param - * The type of activity - * @param - * The type of operation - */ -public class StandardAction, R extends Op> implements SyncAction, ActivityDefObserver { - private final static Logger logger = LogManager.getLogger("ACTION"); - private final Timer executeTimer; - private final Histogram triesHistogram; - private final Timer resultSuccessTimer; - private final Timer resultTimer; - private final Timer bindTimer; - private final NBErrorHandler errorHandler; - private final OpSequence> opsequence; - private final int maxTries; - private final Timer verifierTimer; - - public StandardAction(A activity, int slot) { - this.opsequence = activity.getOpSequence(); - this.maxTries = activity.getMaxTries(); - bindTimer = activity.getInstrumentation().getOrCreateBindTimer(); - executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer(); - triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram(); - resultTimer = activity.getInstrumentation().getOrCreateResultTimer(); - resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer(); - errorHandler = activity.getErrorHandler(); - verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer(); - } - - @Override - public int runCycle(long cycle) { - - OpDispenser dispenser=null; - Op op = null; - - try (Timer.Context ct = bindTimer.time()) { - dispenser = opsequence.apply(cycle); - op = dispenser.getOp(cycle); - } catch (Exception e) { - throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser!=null?dispenser.getOpName():"NULL")+ - "': " + e.getMessage(), e); - } - - int code = 0; - Object result = null; - while (op != null) { - - int tries = 0; - while (tries++ < maxTries) { - Throwable error = null; - long startedAt = System.nanoTime(); - - dispenser.onStart(cycle); - - try (Timer.Context ct = executeTimer.time()) { - if (op instanceof RunnableOp runnableOp) { - runnableOp.run(); - } else if (op instanceof CycleOp cycleOp) { - result = cycleOp.apply(cycle); - } else if (op instanceof ChainingOp chainingOp) { - result = chainingOp.apply(result); - } else { - throw new RuntimeException("The op implementation did not implement any active logic. Implement " + - "one of [RunnableOp, CycleOp, or ChainingOp]"); - } - // TODO: break out validation timer from execute - try (Timer.Context ignored = verifierTimer.time()) { - CycleFunction verifier = dispenser.getVerifier(); - try { - verifier.setVariable("result", result); - verifier.setVariable("cycle",cycle); - Boolean isGood = verifier.apply(cycle); - if (!isGood) { - throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails()); - } - } catch (Exception e) { - throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails()); - } - } - } catch (Exception e) { - error = e; - } finally { - long nanos = System.nanoTime() - startedAt; - resultTimer.update(nanos, TimeUnit.NANOSECONDS); - if (error == null) { - resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS); - dispenser.onSuccess(cycle, nanos); - break; - } else { - ErrorDetail detail = errorHandler.handleError(error, cycle, nanos); - dispenser.onError(cycle, nanos, error); - code = detail.resultCode; - if (!detail.isRetryable()) { - break; - } - } - } - } - triesHistogram.update(tries); - - if (op instanceof OpGenerator) { - logger.trace(() -> "GEN OP for cycle(" + cycle + ")"); - op = ((OpGenerator) op).getNextOp(); - } else { - op = null; - } - } - - return code; - } - - @Override - public void onActivityDefUpdate(ActivityDef activityDef) { - } - -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExceptionHandler.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExceptionHandler.java deleted file mode 100644 index 822b07dc7..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExceptionHandler.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.core.lifecycle.activity; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class ActivityExceptionHandler implements Thread.UncaughtExceptionHandler { - - private static final Logger logger = LogManager.getLogger(ActivityExceptionHandler.class); - - private final ActivityExecutor executor; - - public ActivityExceptionHandler(ActivityExecutor executor) { - this.executor = executor; - logger.debug(() -> "Activity executor exception handler starting up for executor '" + executor.getActivityDef().getAlias() + "'"); - } - - - @Override - public void uncaughtException(Thread t, Throwable e) { - logger.error("Uncaught exception in thread '" + t.getName() + ", state[" + t.getState() + "], notifying executor '" + executor + "': " + e); - executor.notifyException(t, e); - } -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java deleted file mode 100644 index 16c93b88d..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java +++ /dev/null @@ -1,615 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.nosqlbench.engine.core.lifecycle.activity; - -import com.codahale.metrics.Gauge; -import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory; -import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; -import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge; -import io.nosqlbench.nb.api.labels.NBLabeledElement; -import io.nosqlbench.nb.api.labels.NBLabels; -import io.nosqlbench.nb.api.components.core.NBComponentExecutionScope; -import io.nosqlbench.engine.api.activityapi.core.*; -import io.nosqlbench.engine.api.activityimpl.MotorState; -import io.nosqlbench.nb.api.annotations.Annotation; -import io.nosqlbench.nb.api.annotations.Layer; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; -import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; -import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; -import io.nosqlbench.engine.api.activityimpl.motor.RunStateImage; -import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; -import io.nosqlbench.engine.core.annotation.Annotators; -import io.nosqlbench.engine.core.lifecycle.ExecutionResult; -//import io.nosqlbench.virtdata.userlibs.apps.valuechecker.IndexedThreadFactory; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.*; -import java.util.stream.Collectors; - -/** - *

An ActivityExecutor is an execution harness for a single activity instance. - * It is responsible for managing threads and activity settings which may be changed while the activity is running.

- * - *

In order to allow for dynamic thread management, which is not easily supported as an explicit feature - * of most executor services, threads are started as long-running processes and managed via state signaling. - * The {@link RunState} enum, {@link MotorState} type, and {@link RunStateTally} - * state tracking class are used together to represent valid states and transitions, contain and transition state - * atomically, - * and provide blocking conditions for observers, respectively.

- * - *

Some basic rules and invariants must be observed for consistent concurrent behavior. - * Any state changes for a Motor must be made through {@link Motor#getState()}. - * This allows the state tracking to work consistently for all observers.

- */ - -public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener, ProgressCapable, Callable { - - // TODO Encapsulate valid state transitions to be only modifiable within the appropriate type view. - - private static final Logger logger = LogManager.getLogger(ActivityExecutor.class); - private static final Logger activitylogger = LogManager.getLogger("ACTIVITY"); - - private final LinkedList> motors = new LinkedList<>(); - private final Activity activity; - private final ActivityDef activityDef; - private final RunStateTally tally; - private ExecutorService executorService; - private Exception exception; - private String sessionId = ""; - private long startedAt = 0L; - private long stoppedAt = 0L; - - private ActivityExecutorShutdownHook shutdownHook = null; - private NBMetricGauge threadsGauge; - - public ActivityExecutor(Activity activity) { - this.activity = activity; - this.activityDef = activity.getActivityDef(); - activity.getActivityDef().getParams().addListener(this); - this.tally = activity.getRunStateTally(); - } - - // TODO: Doc how uninitialized activities do not propagate parameter map changes and how - // TODO: this is different from preventing modification to uninitialized activities - - // TODO: Determine whether this should really be synchronized - - /** - * Simply stop the motors - */ - public void stopActivity() { - logger.info(() -> "stopping activity in progress: " + this.getActivityDef().getAlias()); - - activity.setRunState(RunState.Stopping); - motors.forEach(Motor::requestStop); - tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored); - - shutdownExecutorService(Integer.MAX_VALUE); - tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored); - activity.setRunState(RunState.Stopped); - - logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); - - Annotators.recordAnnotation(Annotation.newBuilder() - .element(this) - .interval(this.startedAt, this.stoppedAt) - .layer(Layer.Activity) - .addDetail("params", getActivityDef().toString()) - .build() - ); - } - - /** - * Force stop the motors without trying to wait for the activity to reach stopped/finished state - */ - public void forceStopActivity() { - logger.info(() -> "force stopping activity in progress: " + this.getActivityDef().getAlias()); - - activity.setRunState(RunState.Stopping); - motors.forEach(Motor::requestStop); - - shutdownExecutorService(Integer.MAX_VALUE); - tally.awaitNoneOther(RunState.Stopped, RunState.Finished); - activity.setRunState(RunState.Stopped); - - logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); - - Annotators.recordAnnotation(Annotation.newBuilder() - .element(this) - .interval(this.startedAt, this.stoppedAt) - .layer(Layer.Activity) - .addDetail("params", getActivityDef().toString()) - .build() - ); - } - - public Exception forceStopActivity(int initialMillisToWait) { - - activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")"); - activity.setRunState(RunState.Stopped); - - executorService.shutdownNow(); - requestStopMotors(); - - int divisor = 100; - int polltime = initialMillisToWait / divisor; - long gracefulWaitStartedAt = System.currentTimeMillis(); - long waitUntil = initialMillisToWait + gracefulWaitStartedAt; - long time = gracefulWaitStartedAt; - while (time < waitUntil && !executorService.isTerminated()) { - try { - Thread.sleep(polltime); - time = System.currentTimeMillis(); - } catch (InterruptedException ignored) { - } - } - long gracefulWaitEndedAt = System.currentTimeMillis(); - logger.debug("took " + (gracefulWaitEndedAt - gracefulWaitStartedAt) + " ms to shutdown gracefully"); - - if (!executorService.isTerminated()) { - logger.info(() -> "stopping activity forcibly " + activity.getAlias()); - List runnables = executorService.shutdownNow(); - long forcibleShutdownCompletedAt = System.currentTimeMillis(); - logger.debug(() -> "took " + (forcibleShutdownCompletedAt - gracefulWaitEndedAt) + " ms to shutdown forcibly"); - logger.debug(() -> runnables.size() + " tasks never started."); - } - - long activityShutdownStartedAt = System.currentTimeMillis(); - logger.debug("invoking activity-specific shutdown hooks"); - activity.shutdownActivity(); - activity.closeAutoCloseables(); - long activityShutdownEndedAt = System.currentTimeMillis(); - logger.debug("took " + (activityShutdownEndedAt - activityShutdownStartedAt) + " ms to shutdown activity threads"); - activitylogger.debug("FORCE STOP/after alias=(" + activity.getAlias() + ")"); - - if (exception != null) { - activitylogger.debug("FORCE STOP/exception alias=(" + activity.getAlias() + ")"); - } - return exception; - - } - - /** - * Shutdown the activity executor, with a grace period for the motor threads. - * - * @param initialMillisToWait - * milliseconds to wait after graceful shutdownActivity request, before forcing - * everything to stop - */ - public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) { - Exception exception = forceStopActivity(initialMillisToWait); - if (exception != null && rethrow) { - throw new RuntimeException(exception); - } - } - - /** - * Listens for changes to parameter maps, maps them to the activity instance, and notifies all eligible listeners of - * changes. - */ - @Override - public void handleParameterMapUpdate(ParameterMap parameterMap) { - - activity.onActivityDefUpdate(activityDef); - - // An activity must be initialized before the motors and other components are - // considered ready to handle parameter map changes. This is signaled in an activity - // by the RunState. - if (activity.getRunState() != RunState.Uninitialized) { - if (activity.getRunState() == RunState.Running) { - adjustMotorCountToThreadParam(activity.getActivityDef()); - } - motors.stream() - .filter(m -> (m instanceof ActivityDefObserver)) -// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized) -// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting) - .forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef)); - } - } - - public ActivityDef getActivityDef() { - return activityDef; - } - - public String toString() { - return getClass().getSimpleName() + "~" + activityDef.getAlias(); - } - - private String getSlotStatus() { - return motors.stream() - .map(m -> m.getState().get().getCode()) - .collect(Collectors.joining(",", "[", "]")); - } - - /** - * Stop extra motors, start missing motors - * - * @param activityDef - * the activityDef for this activity instance - */ - private void adjustMotorCountToThreadParam(ActivityDef activityDef) { // TODO: Ensure that threads area allowed to complete their current op gracefully - logger.trace(() -> ">-pre-adjust->" + getSlotStatus()); - - reduceActiveMotorCountDownToThreadParam(activityDef); - increaseActiveMotorCountUpToThreadParam(activityDef); - alignMotorStateToIntendedActivityState(); - awaitAlignmentOfMotorStateToActivityState(); - - logger.trace(() -> ">post-adjust->" + getSlotStatus()); - - } - - private void increaseActiveMotorCountUpToThreadParam(ActivityDef activityDef) { - // Create motor slots - try { - while (motors.size() < activityDef.getThreads()) { - Motor motor = activity.getMotorDispenserDelegate().getMotor(activityDef, motors.size()); - logger.trace(() -> "Starting cycle motor thread:" + motor); - motors.add(motor); - } - } catch (Exception e) { - System.out.print("critical error while starting motors: " + e); - logger.error("critical error while starting motors:" + e,e); - throw new RuntimeException(e); - } - } - - private void reduceActiveMotorCountDownToThreadParam(ActivityDef activityDef) { - // Stop and remove extra motor slots - if (activityDef.getThreads()==0) { - logger.warn("setting threads to zero is not advised. At least one thread has to be active to keep the activity alive."); - } -// LinkedList> toremove = new LinkedList<>(); -// while (activityDef.getThreads()>motors.size()) { -// Motor motor = motors.removeLast(); -// toremove.addFirst(motor); -// } -// for (Motor motor : toremove) { -// motor.requestStop(); -// } -// for (Motor motor : toremove) { -// motor.removeState(); -// } -// - while (motors.size() > activityDef.getThreads()) { - Motor motor = motors.get(motors.size() - 1); - logger.trace(() -> "Stopping cycle motor thread:" + motor); - motor.requestStop(); - motor.removeState(); - - /** - * NOTE: this leaves trailing, longer-running threads which might hold the executor open - * to potentially be cleaned up by {@link ExecutorService#shutdown()} or - * {@link ExecutorService#shutdownNow()}. At this point, the motor thread has - * been instructed to shutdown, and it is effectively thread-non-grata to the activity. - */ - motors.remove(motors.size() - 1); - } - } - - private synchronized void alignMotorStateToIntendedActivityState() { - RunState intended = activity.getRunState(); - logger.trace(() -> "ADJUSTING to INTENDED " + intended); - switch (intended) { - case Uninitialized: - break; - case Running: - case Starting: - motors.stream() - .filter(m -> m.getState().get() != RunState.Running) - .filter(m -> m.getState().get() != RunState.Finished) - .filter(m -> m.getState().get() != RunState.Starting) - .forEach(m -> { - executorService.execute(m); - }); - break; - case Stopped: - motors.stream() - .filter(m -> m.getState().get() != RunState.Stopped) - .forEach(Motor::requestStop); - break; - case Finished: - case Stopping: - case Errored: - break; -// throw new RuntimeException("Invalid requested state in activity executor:" + activity.getRunState()); - - default: - throw new RuntimeException("Unmatched run state:" + activity.getRunState()); - } - } - - private void awaitAlignmentOfMotorStateToActivityState() { - - logger.debug(() -> "awaiting state alignment from " + activity.getRunState()); - RunStateImage states = null; - switch (activity.getRunState()) { - case Starting: - case Running: - states = tally.awaitNoneOther(RunState.Running, RunState.Finished); - break; - case Errored: - case Stopping: - case Stopped: - states = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored); - break; - case Uninitialized: - break; - case Finished: - states = tally.awaitNoneOther(RunState.Finished); - break; - default: - throw new RuntimeException("Unmatched run state:" + activity.getRunState()); - } - RunState previousState = activity.getRunState(); - activity.setRunState(states.getMaxState()); - logger.debug("activity and threads are aligned to state " + previousState + " for " + this.getActivity().getAlias() + ", and advanced to " + activity.getRunState()); - } - - - private void requestStopMotors() { - logger.info(() -> "stopping activity " + activity); - activity.setRunState(RunState.Stopping); - motors.forEach(Motor::requestStop); - } - - - public boolean isRunning() { - return motors.stream().anyMatch(m -> m.getState().get() == RunState.Running); - } - - public Activity getActivity() { - return activity; - } - - public synchronized void notifyException(Thread t, Throwable e) { - logger.debug(() -> "Uncaught exception in activity thread forwarded to activity executor: " + e.getMessage()); - this.exception = new RuntimeException("Error in activity thread " + t.getName(), e); - this.requestStopMotors(); - } - - @Override - public ProgressMeterDisplay getProgressMeter() { - return this.activity.getProgressMeter(); - } - - - @Override - public ExecutionResult call() throws Exception { - try (NBComponentExecutionScope scope = new NBComponentExecutionScope(activity)) { - - shutdownHook = new ActivityExecutorShutdownHook(this); - Runtime.getRuntime().addShutdownHook(shutdownHook); - - Annotators.recordAnnotation(Annotation.newBuilder() - .element(this) - .now() - .layer(Layer.Activity) - .addDetail("event", "start-activity") - .addDetail("params", activityDef.toString()) - .build()); - - try { - // instantiate and configure fixtures that need to be present - // before threads start running such as metrics instruments - activity.initActivity(); - startMotorExecutorService(); - registerMetrics(); - startRunningActivityThreads(); - awaitMotorsAtLeastRunning(); - logger.debug("STARTED " + activityDef.getAlias()); - awaitActivityCompletion(); - } catch (Exception e) { - this.exception = e; - } finally { - stoppedAt = System.currentTimeMillis(); - // TODO: close out metrics outputs on component tree if needed - activity.shutdownActivity(); - activity.closeAutoCloseables(); - ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception); - finish(true); - return result; - } - } catch (Exception e2) { - throw new RuntimeException(e2); - } - - } - - /** - * This waits for at least one motor to be in running, finished or stopped state. - * A motor with enough cycles to read will go into a running state. A motor which has - * a short read immediately after being started will go into a finished state. A motor - * which has been stopped for some reason, like an error or a stop command will go into - * stopped state. All of these states are sufficient to signal that successful startup - * has been completed at least. - */ - private void awaitMotorsAtLeastRunning() { - if (this.exception!=null) { - return; - } - RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored); - RunState maxState = states.getMaxState(); - if (maxState == RunState.Errored) { - activity.setRunState(maxState); - throw new RuntimeException("Error in activity"); - } - } - - -// public synchronized void startActivity() { -// RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped); -// if (startable.isTimeout()) { -// throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable); -// } -// startMotorExecutorService(); -// startRunningActivityThreads(); -// awaitMotorsAtLeastRunning(); -// } - - private void registerMetrics() { - this.activity.create().gauge( - "threads", - () -> (double) this.motors.size(), - MetricCategory.Core, - "The current number of threads in activity " + this.description() - ); - } - - - private boolean shutdownExecutorService(int secondsToWait) { - - activitylogger.debug(() -> "Shutting down motor executor for (" + activity.getAlias() + ")"); - - boolean wasStopped = false; - try { - executorService.shutdown(); - logger.trace(() -> "awaiting termination with timeout of " + secondsToWait + " seconds"); - wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - logger.trace("interrupted while awaiting termination"); - logger.warn("while waiting termination of shutdown " + activity.getAlias() + ", " + ie.getMessage()); - activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped); - } catch (RuntimeException e) { - logger.trace("Received exception while awaiting termination: " + e.getMessage()); - wasStopped = true; - exception = e; - } finally { - logger.trace(() -> "finally shutting down activity " + this.getActivity().getAlias()); - this.stoppedAt = System.currentTimeMillis(); - activity.setRunState(RunState.Stopped); - } - - if (exception != null) { - logger.trace(() -> "an exception caused the activity to stop:" + exception.getMessage()); - logger.warn("Setting ERROR on motor executor for activity '" + activity.getAlias() + "': " + exception.getMessage()); - throw new RuntimeException(exception); - } - - activitylogger.debug("motor executor for " + activity.getAlias() + ") wasstopped=" + wasStopped); - - return wasStopped; - } - - private void awaitActivityCompletion() { - RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored); - RunState maxState = state.getMaxState(); - activity.setRunState(maxState); - if (maxState == RunState.Errored) { - throw new RuntimeException("Error while waiting for activity completion with states [" + tally.toString() + "], error=" + (this.exception!=null ? - this.exception.toString() : "[no error on activity executor]")); - - } - } - - private void startMotorExecutorService() { - this.executorService = new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, - 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)) - ); - } - - - /** - *

True-up the number of motor instances known to the executor. Start all non-running motors. - * The protocol between the motors and the executor should be safe as long as each state change is owned by either - * the motor logic or the activity executor but not both, and strictly serialized as well. This is enforced by - * forcing start(...) to be serialized as well as using CAS on the motor states.

- *

The startActivity method may be called to true-up the number of active motors in an activity executor after - * changes to threads.

- */ - private void startRunningActivityThreads() { - - logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary()); - Annotators.recordAnnotation(Annotation.newBuilder() - .element(this) - .now() - .layer(Layer.Activity) - .addDetail("params", getActivityDef().toString()) - .build() - ); - - activitylogger.debug("START/before alias=(" + activity.getAlias() + ")"); - - try { - activity.setRunState(RunState.Starting); - this.startedAt = System.currentTimeMillis(); - activity.onActivityDefUpdate(activityDef); - } catch (Exception e) { - this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e); - activitylogger.error(() -> "error initializing activity '" + activity.getAlias() + "': " + exception); - throw new RuntimeException(exception); - } - adjustMotorCountToThreadParam(activity.getActivityDef()); - tally.awaitAny(RunState.Running, RunState.Finished, RunState.Stopped); - activity.setRunState(RunState.Running); - activitylogger.debug("START/after alias=(" + activity.getAlias() + ")"); - } - - - @Override - public NBLabels getLabels() { - return activity.getLabels(); - } - - public synchronized void finish(boolean graceful) { - if (graceful) { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } else { - logger.warn("Activity was interrupted by process exit, shutting down ungracefully. Annotations are still submitted."); - } - if (shutdownHook == null) return; // In case of a race condition, only prevented by object monitor - else shutdownHook = null; - - stoppedAt = System.currentTimeMillis(); //TODO: Make only one endedAtMillis assignment - - Annotators.recordAnnotation(Annotation.newBuilder() - .element(this) - .interval(startedAt, stoppedAt) - .layer(Layer.Activity) - .addDetail("event", "stop-activity") - .addDetail("params", activityDef.toString()) - .build()); - } - - public void awaitMotorsRunningOrTerminalState() { - awaitMotorsAtLeastRunning(); - } - - public boolean awaitAllThreadsOnline(long timeoutMs) { - RunStateImage image = tally.awaitNoneOther(timeoutMs, RunState.Running); - return image.isNoneOther(RunState.Running); - } - - private class ThreadsGauge implements Gauge { - public ThreadsGauge(ActivityExecutor activityExecutor) { - ActivityExecutor ae = activityExecutor; - } - - @Override - public Double getValue() { - return (double) ActivityExecutor.this.motors.size(); - } - } -} - - diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java index 67cfae398..4ef76127c 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityRuntimeInfo.java @@ -21,6 +21,7 @@ import io.nosqlbench.engine.api.activityapi.core.RunState; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; import io.nosqlbench.engine.core.lifecycle.ExecutionResult; +import io.nosqlbench.engine.core.lifecycle.scenario.container.ActivityExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -37,10 +38,9 @@ public class ActivityRuntimeInfo implements ProgressCapable { private final ActivityExecutor executor; public ActivityRuntimeInfo(Activity activity, Future result, ActivityExecutor executor) { - + this.executor = executor; this.activity = activity; this.future = result; - this.executor = executor; } @Override @@ -76,23 +76,24 @@ public class ActivityRuntimeInfo implements ProgressCapable { return this.activity; } - public boolean isRunning() { - return executor.isRunning(); - } public RunState getRunState() { return this.activity.getRunState(); } - public void stopActivity() { this.executor.stopActivity(); } + public boolean isRunning() { + throw new RuntimeException("implement me"); + } - public void forceStopActivity() { this.executor.forceStopActivity(); } + public void forceStopActivity() { + throw new RuntimeException("implement me"); + } + + public void stopActivity() { + throw new RuntimeException("implement me"); + } public ActivityExecutor getActivityExecutor() { - return executor; - } - - public boolean awaitAllThreadsOnline(long timeoutMs) { - return this.executor.awaitAllThreadsOnline(timeoutMs); + return this.executor; } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/LifetimeThread.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/LifetimeThread.java new file mode 100644 index 000000000..38125b26a --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/LifetimeThread.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle.activity; + +import io.nosqlbench.engine.core.lifecycle.ExecutionResult; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class LifetimeThread implements Callable { + private final static Logger logger = LogManager.getLogger(); + private ExecutionResult result = null; + + private final String name; + private CompletableFuture future; + + public LifetimeThread(String name) { + this.name = name; + } + + @Override + public ExecutionResult call() { + logger.debug("lifetime scope '" + name + "' starting"); + this.future = new CompletableFuture(); + + while (result == null) { + try { + this.result = future.get(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + logger.debug("lifetime scope '" + name + "' ending"); + return result; + } + + public void shutdown(ExecutionResult result) { + this.future.complete(result); + } + +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/commands/CMD_run2.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/commands/CMD_run2.java new file mode 100644 index 000000000..1bbc12902 --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/commands/CMD_run2.java @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle.commands; + +import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController; +import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer; +import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams; +import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand; +import io.nosqlbench.nb.annotations.Service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.PrintWriter; +import java.io.Reader; + +@Service(value = NBBaseCommand.class, selector = "run2") +public class CMD_run2 extends NBBaseCommand { + public final static Logger logger = LogManager.getLogger("run2"); + + public CMD_run2(NBBufferedContainer parentComponent, String stepName, String targetScenario) { + super(parentComponent, stepName, targetScenario); + } + + @Override + public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) { + controller.run(params); + return null; + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/commands/INFO_run2.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/commands/INFO_run2.java new file mode 100644 index 000000000..68ad0ff7e --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/commands/INFO_run2.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle.commands; + +import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBCommandInfo; +import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBInvokableCommand; +import io.nosqlbench.nb.annotations.Service; + +@Service(value = NBCommandInfo.class,selector = "run2") +public class INFO_run2 extends NBCommandInfo { + @Override + public Class getType() { + return CMD_run2.class; + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ActivityExecutor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ActivityExecutor.java new file mode 100644 index 000000000..1fc2e963c --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ActivityExecutor.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle.scenario.container; + +import io.nosqlbench.engine.api.activityapi.core.Activity; +import io.nosqlbench.engine.api.activityapi.core.Startable; +import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer; +import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment; +import io.nosqlbench.engine.api.activityapi.input.Input; +import io.nosqlbench.engine.core.lifecycle.ExecutionResult; +import io.nosqlbench.engine.core.lifecycle.activity.LifetimeThread; + +import java.util.concurrent.Callable; +import java.util.concurrent.StructuredTaskScope; + +public class ActivityExecutor implements Callable { + private final Activity activity; + private LifetimeThread lifetimeThread; + + StructuredTaskScope scope; + public ActivityExecutor(Activity activity) { + this.activity = activity; + } + + @Override + public ExecutionResult call() throws Exception { + long startedAt=System.currentTimeMillis(); + long endedAt=0L; + Exception error = null; + try (StructuredTaskScope.ShutdownOnFailure t= new StructuredTaskScope.ShutdownOnFailure()) { + lifetimeThread = new LifetimeThread(activity.getAlias()); + t.fork(lifetimeThread); + runCycles(t,activity); + lifetimeThread = new LifetimeThread(activity.getAlias()); + t.join(); + } catch (Exception e) { + error=e; + + } finally { + endedAt=System.currentTimeMillis(); + } + return new ExecutionResult(startedAt,endedAt,"",error); + } + + private void runCycles(StructuredTaskScope.ShutdownOnFailure t, Activity activity) { + Input input = activity.getInputDispenserDelegate().getInput(); + if (input instanceof Startable) { + ((Startable) input).start(); + } + + CycleSegment cycleSegment = null; + int stride = activity.getActivityDef().getParams().getOptionalInteger("stride").orElse(1); + CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride); + + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ContainerActivitiesController.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ContainerActivitiesController.java index a721fe474..e8abea678 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ContainerActivitiesController.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/scenario/container/ContainerActivitiesController.java @@ -1,4 +1,4 @@ -/* + /* * Copyright (c) 2022-2023 nosqlbench * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -25,7 +25,6 @@ import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; import io.nosqlbench.engine.core.lifecycle.ExecutionResult; import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory; import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesExceptionHandler; -import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor; import io.nosqlbench.engine.core.lifecycle.activity.ActivityLoader; import io.nosqlbench.engine.core.lifecycle.activity.ActivityRuntimeInfo; import org.apache.logging.log4j.LogManager; @@ -70,7 +69,6 @@ public class ContainerActivitiesController extends NBBaseComponent { return ari.getActivity(); } - private ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) { if (!this.activityInfoMap.containsKey(activityDef.getAlias())) { Activity activity = this.activityLoader.loadActivity(activityDef, this); @@ -78,13 +76,13 @@ public class ContainerActivitiesController extends NBBaseComponent { ActivityExecutor executor = new ActivityExecutor(activity); Future startedActivity = executorService.submit(executor); ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor); - activityRuntimeInfo.getActivityExecutor().awaitMotorsRunningOrTerminalState(); this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo); } return this.activityInfoMap.get(activityDef.getAlias()); } + /** * Start an activity, given a map which holds the activity definition for it. The activity will be known in * the scenario by the alias parameter. @@ -94,7 +92,6 @@ public class ContainerActivitiesController extends NBBaseComponent { public Activity start(Map activityDefMap) { ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap)); Activity started = start(ad); - awaitAllThreadsOnline(started,30000L); return started; } @@ -120,9 +117,7 @@ public class ContainerActivitiesController extends NBBaseComponent { * @param activityDef A definition for an activity to run */ public synchronized void run(ActivityDef activityDef, long timeoutMs) { - - doStartActivity(activityDef); - awaitActivity(activityDef, timeoutMs); + throw new RuntimeException("implement me"); } public synchronized void run(int timeout, String activityDefString) { @@ -176,26 +171,12 @@ public class ContainerActivitiesController extends NBBaseComponent { runtimeInfo.stopActivity(); } - public boolean awaitAllThreadsOnline(ActivityDef activityDef, long timeoutMs) { - ActivityRuntimeInfo runtimeInfo = this.activityInfoMap.get(activityDef.getAlias()); - if (null == runtimeInfo) { - throw new RuntimeException("could not stop missing activity:" + activityDef); - } - - scenariologger.debug("STOP {}", activityDef.getAlias()); - return runtimeInfo.awaitAllThreadsOnline(timeoutMs); - } public synchronized void stop(Activity activity) { stop(activity.getActivityDef()); } - public boolean awaitAllThreadsOnline(Activity activity, long timeoutMs) { - return awaitAllThreadsOnline(activity.getActivityDef(), timeoutMs); - } - - /** *

Stop an activity, given an activity def map. The only part of the map that is important is the @@ -342,8 +323,8 @@ public class ContainerActivitiesController extends NBBaseComponent { */ public synchronized void forceStopActivities(int waitTimeMillis) { logger.debug("force stopping scenario {}", description()); - activityInfoMap.values().forEach(a -> a.getActivityExecutor().forceStopActivity(2000)); - logger.debug("Scenario force stopped."); + throw new RuntimeException("implement me"); + //logger.debug("Scenario force stopped."); } // public synchronized void stopAll() { diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityimpl/motor/RunStateImageTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityimpl/motor/RunStateImageTest.java deleted file mode 100644 index 2084f90d4..000000000 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityimpl/motor/RunStateImageTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl.motor; - -import io.nosqlbench.engine.api.activityapi.core.RunState; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -public class RunStateImageTest { - - @Test - public void testMaxStateImage() { - int[] counts = new int[RunState.values().length]; - counts[RunState.Running.ordinal()]=3; - counts[RunState.Starting.ordinal()]=2; - RunStateImage image = new RunStateImage(counts, false); - assertThat(image.is(RunState.Running)).isTrue(); - assertThat(image.is(RunState.Starting)).isTrue(); - assertThat(image.isTimeout()).isFalse(); - assertThat(image.is(RunState.Errored)).isFalse(); - assertThat(image.isNoneOther(RunState.Starting, RunState.Running)).isTrue(); - RunState maxState = image.getMaxState(); - assertThat(maxState).isEqualTo(RunState.values()[RunState.Running.ordinal()]); - RunState minState = image.getMinState(); - assertThat(minState).isEqualTo(RunState.values()[RunState.Starting.ordinal()]); - } - -} diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityimpl/motor/RunStateTallyTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityimpl/motor/RunStateTallyTest.java deleted file mode 100644 index 5eb13c631..000000000 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityimpl/motor/RunStateTallyTest.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl.motor; - -import io.nosqlbench.engine.api.activityapi.core.RunState; -import org.junit.jupiter.api.*; - -import static org.assertj.core.api.Assertions.assertThat; - -@TestMethodOrder(MethodOrderer.OrderAnnotation.class) -public class RunStateTallyTest { - - volatile boolean awaited = false; - volatile RunStateImage event = null; - - @BeforeEach - public void setup() { - awaited = false; - event = null; - } - - @Test - @Order(1) - public void testAwaitAny() { - Thread.currentThread().setName("SETTER"); - - RunStateTally tally = new RunStateTally(); - awaited = false; - Thread waiter = new Thread(new Runnable() { - @Override - public void run() { - event = tally.awaitAny(RunState.Running); - awaited = true; - } - }); - waiter.setName("WAITER"); - waiter.setDaemon(true); - waiter.start(); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isFalse(); - tally.add(RunState.Running); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - - assertThat(event.is(RunState.Running)).isTrue(); - assertThat(event.isOnly(RunState.Running)).isTrue(); - - assertThat(awaited).isTrue(); - assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE); - } - - @Test - @Order(2) - public void testAwaitNoneOf() { - Thread.currentThread().setName("SETTER"); - - RunStateTally tally = new RunStateTally(); - tally.add(RunState.Uninitialized); - tally.add(RunState.Stopped); - awaited = false; - Thread waiter = new Thread(new Runnable() { - @Override - public void run() { - tally.awaitNoneOf(RunState.Stopped, RunState.Uninitialized); - awaited = true; - } - }); - waiter.setName("WAITER"); - waiter.setDaemon(true); - waiter.start(); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isFalse(); - tally.change(RunState.Stopped, RunState.Finished); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isFalse(); - tally.change(RunState.Uninitialized, RunState.Finished); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isTrue(); - assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE); - - } - - @Test - @Order(3) - public void testAwaitNoneOther() { - Thread.currentThread().setName("SETTER"); - - RunStateTally tally = new RunStateTally(); - tally.add(RunState.Uninitialized); - tally.add(RunState.Running); - awaited = false; - Thread waiter = new Thread(new Runnable() { - @Override - public void run() { - event = tally.awaitNoneOther(RunState.Stopped, RunState.Finished); - awaited = true; - } - }); - waiter.setName("WAITER"); - waiter.setDaemon(true); - waiter.start(); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isFalse(); - tally.change(RunState.Uninitialized, RunState.Finished); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isFalse(); - - // Note that neither Stopped or Finished are required to be positive, - // as long as all others are zero total. - tally.remove(RunState.Running); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isTrue(); - assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE); - - } - - @Test - @Order(4) - public void testAwaitNoneOtherTimedOut() { - Thread.currentThread().setName("SETTER"); - - RunStateTally tally = new RunStateTally(); - tally.add(RunState.Uninitialized); - tally.add(RunState.Running); - Thread waiter = new Thread(new Runnable() { - @Override - public void run() { - event = tally.awaitNoneOther(1500, RunState.Stopped, RunState.Finished); - awaited = true; - } - }); - waiter.setName("WAITER"); - waiter.setDaemon(true); - waiter.start(); - - try { - Thread.sleep(100); - } catch (Exception e) { - } - - assertThat(awaited).isFalse(); - tally.change(RunState.Uninitialized, RunState.Finished); - - try { - Thread.sleep(1500); - } catch (Exception e) { - } - -// try { -// waiter.join(); -// } catch (InterruptedException e) { -// throw new RuntimeException(e); -// } - - assertThat(event.isOnly(RunState.Errored)).isFalse(); - assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE); - - } - - -} diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java deleted file mode 100644 index cc2d65bb8..000000000 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.core; - -import io.nosqlbench.nb.api.config.standard.TestComponent; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.engine.api.activityapi.core.*; -import io.nosqlbench.engine.api.activityapi.input.Input; -import io.nosqlbench.engine.api.activityapi.input.InputDispenser; -import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; -import io.nosqlbench.engine.api.activityimpl.CoreServices; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser; -import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser; -import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor; -import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser; -import io.nosqlbench.engine.core.lifecycle.ExecutionResult; -import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor; -import io.nosqlbench.engine.core.lifecycle.activity.ActivityTypeLoader; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.*; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; - -class ActivityExecutorTest { - private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class); - -// TODO: Design review of this mechanism -// @Test -// synchronized void testRestart() { -// ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-restart;cycles=1000;cyclerate=10;op=initdelay:initdelay=5000;"); -// new ActivityTypeLoader().load(activityDef); -// -// final Activity activity = new DelayedInitActivity(activityDef); -// InputDispenser inputDispenser = new CoreInputDispenser(activity); -// ActionDispenser adisp = new CoreActionDispenser(activity); -// OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null); -// -// final MotorDispenser mdisp = new CoreMotorDispenser(activity, inputDispenser, adisp, tdisp); -// activity.setActionDispenserDelegate(adisp); -// activity.setOutputDispenserDelegate(tdisp); -// activity.setInputDispenserDelegate(inputDispenser); -// activity.setMotorDispenserDelegate(mdisp); -// -// final ExecutorService executor = Executors.newCachedThreadPool(); -// ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-restart"); -// final Future future = executor.submit(activityExecutor); -// try { -// activityDef.setThreads(1); -// activityExecutor.startActivity(); -// Thread.sleep(100L); -// activityExecutor.stopActivity(); -// Thread.sleep(100L); -// activityExecutor.startActivity(); -// Thread.sleep(100L); -// activityExecutor.stopActivity(); -// future.get(); -// } catch (Exception e) { -// throw new RuntimeException(e); -// } -// executor.shutdown(); -// assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNotNull(); -// -// } - - @Test - synchronized void testDelayedStartSanity() { - - ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); - new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE); - - Activity activity = new DelayedInitActivity(activityDef); - final InputDispenser inputDispenser = new CoreInputDispenser(activity); - final ActionDispenser actionDispenser = new CoreActionDispenser(activity); - final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null); - - MotorDispenser motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); - activity.setActionDispenserDelegate(actionDispenser); - activity.setOutputDispenserDelegate(outputDispenser); - activity.setInputDispenserDelegate(inputDispenser); - activity.setMotorDispenserDelegate(motorDispenser); - - ActivityExecutor activityExecutor = new ActivityExecutor(activity); - - ExecutorService testExecutor = Executors.newCachedThreadPool(); - Future future = testExecutor.submit(activityExecutor); - - try { - activityDef.setThreads(1); - future.get(); - testExecutor.shutdownNow(); - - } catch (final Exception e) { - fail("Unexpected exception", e); - } - - assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull(); - } - - @Test - synchronized void testNewActivityExecutor() { - - final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-dynamic-params;cycles=1000;initdelay=5000;"); - new ActivityTypeLoader().load(activityDef,TestComponent.INSTANCE); - - Activity simpleActivity = new SimpleActivity(TestComponent.INSTANCE,activityDef); - -// this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef)); - - final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity); - final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity); - final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null); - - MotorDispenser motorDispenser = new CoreMotorDispenser<>(simpleActivity, - inputDispenser, actionDispenser, outputDispenser); - - simpleActivity.setActionDispenserDelegate(actionDispenser); - simpleActivity.setInputDispenserDelegate(inputDispenser); - simpleActivity.setMotorDispenserDelegate(motorDispenser); - - ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity); - activityDef.setThreads(5); - ForkJoinTask executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor); - -// activityExecutor.startActivity(); - - final int[] speeds = {1, 50, 5, 50, 2, 50}; - for (int offset = 0; offset < speeds.length; offset += 2) { - final int threadTarget = speeds[offset]; - final int threadTime = speeds[offset + 1]; - - ActivityExecutorTest.logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds."); - activityDef.setThreads(threadTarget); - - try { - Thread.sleep(threadTime); - } catch (final Exception e) { - fail("Not expecting exception", e); - } - } - executionResultForkJoinTask.cancel(true); - - // Used for slowing the roll due to state transitions in test. - try { - activityExecutor.stopActivity(); -// Thread.sleep(2000L); - } catch (final Exception e) { - fail("Not expecting exception", e); - } - } - - private MotorDispenser getActivityMotorFactory(final Action lc, Input ls) { - return new MotorDispenser<>() { - @Override - public Motor getMotor(final ActivityDef activityDef, final int slotId) { - final Activity activity = new SimpleActivity(TestComponent.INSTANCE,activityDef); - final Motor cm = new CoreMotor<>(activity, slotId, ls); - cm.setAction(lc); - return cm; - } - }; - } - - private SyncAction motorActionDelay(long delay) { - return new SyncAction() { - @Override - public int runCycle(final long cycle) { - ActivityExecutorTest.logger.info(() -> "consuming " + cycle + ", delaying:" + delay); - try { - Thread.sleep(delay); - } catch (final InterruptedException ignored) { - } - return 0; - } - }; - - } - - private static class DelayedInitActivity extends SimpleActivity { - private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class); - - public DelayedInitActivity(final ActivityDef activityDef) { - super(TestComponent.INSTANCE,activityDef); - } - - @Override - public void initActivity() { - final Integer initDelay = this.activityDef.getParams().getOptionalInteger("initdelay").orElse(0); - DelayedInitActivity.logger.info(() -> "delaying for " + initDelay); - try { - Thread.sleep(initDelay); - } catch (final InterruptedException ignored) { - } - DelayedInitActivity.logger.info(() -> "delayed for " + initDelay); - } - } -} diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java deleted file mode 100644 index e210ddd23..000000000 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (c) 2022-2023 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.core; - -import io.nosqlbench.nb.api.config.standard.TestComponent; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.engine.api.activityapi.core.Action; -import io.nosqlbench.engine.api.activityapi.core.Activity; -import io.nosqlbench.engine.api.activityapi.core.Motor; -import io.nosqlbench.engine.api.activityapi.core.SyncAction; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor; -import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongArray; -import java.util.function.Predicate; - -import static org.assertj.core.api.Assertions.assertThat; - -public class CoreMotorTest { - - @Test - public void testBasicActivityMotor() { - final Activity activity = new SimpleActivity( - new TestComponent("testing", "coremotor"), - ActivityDef.parseActivityDef("alias=foo") - ); - final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); - final Motor cm = new CoreMotor(activity, 5L, lockstepper); - final AtomicLong observableAction = new AtomicLong(-3L); - cm.setAction(this.getTestConsumer(observableAction)); - final Thread t = new Thread(cm); - t.setName("TestMotor"); - t.start(); - try { - Thread.sleep(1000); // allow action time to be waiting in monitor for test fixture - } catch (final InterruptedException ignored) { - } - - lockstepper.publishSegment(5L); - final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100); - assertThat(observableAction.get()).isEqualTo(5L); - } - - @Test - public void testIteratorStride() { - SimpleActivity activity = new SimpleActivity(TestComponent.INSTANCE, "stride=3"); - final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); - final Motor cm1 = new CoreMotor(activity, 1L, lockstepper); - final AtomicLongArray ary = new AtomicLongArray(10); - final Action a1 = this.getTestArrayConsumer(ary); - cm1.setAction(a1); - - final Thread t1 = new Thread(cm1); - t1.setName("cm1"); - t1.start(); - try { - Thread.sleep(500); // allow action time to be waiting in monitor for test fixture - } catch (final InterruptedException ignored) { - } - - lockstepper.publishSegment(11L, 12L, 13L); - - final boolean result = this.awaitAryCondition(ala -> 13L == ala.get(2), ary, 5000, 100); - assertThat(ary.get(0)).isEqualTo(11L); - assertThat(ary.get(1)).isEqualTo(12L); - assertThat(ary.get(2)).isEqualTo(13L); - assertThat(ary.get(3)).isEqualTo(0L); - - } - - private SyncAction getTestArrayConsumer(AtomicLongArray ary) { - return new SyncAction() { - private int offset; - - @Override - public int runCycle(final long cycle) { - ary.set(this.offset, cycle); - this.offset++; - return 0; - } - }; - } - - private SyncAction getTestConsumer(AtomicLong atomicLong) { - return new SyncAction() { - @Override - public int runCycle(final long cycle) { - atomicLong.set(cycle); - return 0; - } - }; - } - - - private boolean awaitAryCondition(final Predicate atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) { - final long start = System.currentTimeMillis(); - long now = start; - while (now < (start + millis)) { - final boolean result = atomicLongAryPredicate.test(ary); - if (result) return true; - try { - Thread.sleep(retry); - } catch (final InterruptedException ignored) { - } - now = System.currentTimeMillis(); - } - return false; - } - - private boolean awaitCondition(final Predicate atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) { - final long start = System.currentTimeMillis(); - long now = start; - while (now < (start + millis)) { - final boolean result = atomicPredicate.test(atomicInteger); - if (result) return true; - try { - Thread.sleep(retry); - } catch (final InterruptedException ignored) { - } - now = System.currentTimeMillis(); - } - return false; - } -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutorShutdownHook.java b/nb-virtdata/virtdata-api/src/main/java/io/nosqlbench/virtdata/api/coords/VirtualCoord.java similarity index 57% rename from nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutorShutdownHook.java rename to nb-virtdata/virtdata-api/src/main/java/io/nosqlbench/virtdata/api/coords/VirtualCoord.java index 07b2be4ec..1c9550c04 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutorShutdownHook.java +++ b/nb-virtdata/virtdata-api/src/main/java/io/nosqlbench/virtdata/api/coords/VirtualCoord.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023 nosqlbench + * Copyright (c) 2024 nosqlbench * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,18 +14,14 @@ * limitations under the License. */ -package io.nosqlbench.engine.core.lifecycle.activity; +package io.nosqlbench.virtdata.api.coords; -public class ActivityExecutorShutdownHook extends Thread { - - private final ActivityExecutor activityExecutor; - public ActivityExecutorShutdownHook(ActivityExecutor activityExecutor) { - this.activityExecutor = activityExecutor; - } - - @Override - public void run() { - activityExecutor.finish(false); - } +/** + * A virtual coordinate, meant to index into virtual data space with named dimensions + */ +public record VirtualCoord( + VirtualSpace space, + long[] coords +) { } diff --git a/nb-virtdata/virtdata-api/src/main/java/io/nosqlbench/virtdata/api/coords/VirtualSpace.java b/nb-virtdata/virtdata-api/src/main/java/io/nosqlbench/virtdata/api/coords/VirtualSpace.java new file mode 100644 index 000000000..239188e3c --- /dev/null +++ b/nb-virtdata/virtdata-api/src/main/java/io/nosqlbench/virtdata/api/coords/VirtualSpace.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.virtdata.api.coords; + +public record VirtualSpace( + String[] names +) { + public static VirtualSpace BASE = + new VirtualSpace(new String[] { + "recycle", + "cycle", + "stride" + }); +} diff --git a/nb-virtdata/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/AlphaNumericString.java b/nb-virtdata/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/AlphaNumericString.java index 06f51e3c7..3420289ab 100644 --- a/nb-virtdata/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/AlphaNumericString.java +++ b/nb-virtdata/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/AlphaNumericString.java @@ -33,7 +33,7 @@ import java.util.function.LongToIntFunction; @Categories({Category.general}) public class AlphaNumericString implements LongFunction { private static final String AVAILABLE_CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"; - private final ThreadLocal threadStringBuilder = ThreadLocal.withInitial(StringBuilder::new); +// private final ThreadLocal threadStringBuilder = ThreadLocal.withInitial(StringBuilder::new); private final Hash hash = new Hash(); private final LongToIntFunction lengthFunc; @@ -69,8 +69,7 @@ public class AlphaNumericString implements LongFunction { } long hashValue = operand; - StringBuilder sb = threadStringBuilder.get(); - sb.setLength(0); + StringBuilder sb = new StringBuilder(); for (int i = 0; i < length; i++) { hashValue = hash.applyAsLong(hashValue); diff --git a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java index 4fa690440..8e3fb790a 100644 --- a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java +++ b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java @@ -94,7 +94,11 @@ public class CMD_reset extends NBBaseCommand { //TODO: This needs to be reworked, but simply calling controller.start on the flywheel results in 2 // copies of the activity running simultaneously. This is a temporary workaround. SimFrameUtils.awaitActivity(flywheel); - flywheel.getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), 0).run(); + try { + flywheel.getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), 0).call(); + } catch (Exception e) { + throw new RuntimeException(e); + } } return null; diff --git a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/findmax/FindmaxFrameFunction.java b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/findmax/FindmaxFrameFunction.java index 3a64fd2cd..a363ff3ff 100644 --- a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/findmax/FindmaxFrameFunction.java +++ b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/findmax/FindmaxFrameFunction.java @@ -62,7 +62,7 @@ public class FindmaxFrameFunction implements SimFrameFunction capture.stopWindow(); journal.record(params,capture.last()); System.out.println(journal.last()); - if (flywheel.getRunStateTally().tallyFor(RunState.Running)==0) { + if (flywheel.getRunState()!=RunState.Running) { System.out.println("state:" + flywheel.getRunState()); throw new RuntimeException("Early exit of flywheel activity '" + flywheel.getAlias() + "'. Can't continue."); + } return journal.last().value(); } diff --git a/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java b/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java index 334b6ef06..a9ebabf72 100644 --- a/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java +++ b/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java @@ -34,7 +34,7 @@ class ExitStatusIntegrationTests { ProcessInvoker invoker = new ProcessInvoker(); invoker.setLogDir("logs/test"); ProcessResult result = invoker.run("exitstatus_badparam", 15, - java, "-jar", JARNAME, "--logs-dir", "logs/test/badparam/", + java, "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/badparam/", "badparam" ); assertThat(result.exception).isNull(); @@ -48,7 +48,7 @@ class ExitStatusIntegrationTests { ProcessInvoker invoker = new ProcessInvoker(); invoker.setLogDir("logs/test"); ProcessResult result = invoker.run("exitstatus_initexception", 15, - java, "-jar", JARNAME, "--logs-dir", "logs/test/initerror", "run", + java, "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/initerror", "run", "driver=diag", "op=initdelay:initdelay=notanumber" ); assertThat(result.exception).isNull(); @@ -64,7 +64,8 @@ class ExitStatusIntegrationTests { // Forcing a thread exception via basic command issue. ProcessResult result = invoker.run("exitstatus_threadexception", 30, - "java", "-jar", JARNAME, "--logs-dir", "logs/test/threadexcep", "--logs-level", "debug", "run", + "java", "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/threadexcep", "--logs-level", + "debug", "run", "driver=diag", "cyclerate=10", "not_a_thing", "cycles=100", "-vvv" ); String stdout = String.join("\n", result.getStdoutData()); @@ -77,7 +78,8 @@ class ExitStatusIntegrationTests { ProcessInvoker invoker = new ProcessInvoker(); invoker.setLogDir("logs/test"); ProcessResult result = invoker.run("exitstatus_asyncstoprequest", 60, - "java", "-jar", JARNAME, "--logs-dir", "logs/test/asyncstop", "--logs-level", "debug", "run", + "java", "--enable-preview", "-jar", JARNAME, "--logs-dir", "logs/test/asyncstop", "--logs-level", + "debug", "run", "driver=diag", "threads=2", "cyclerate=10", "op=erroroncycle:erroroncycle=10", "cycles=50", "-vvv" ); assertThat(result.exception).isNull();