this commit fully replaces Activity with StandardActivity

This commit is contained in:
Jonathan Shook 2024-12-19 14:26:43 -06:00
parent 862ea8fd3b
commit ced01b132b
58 changed files with 151 additions and 298 deletions

View File

@ -30,7 +30,7 @@ import io.nosqlbench.nb.annotations.Service;
import java.util.function.LongFunction;
/**
* Special thanks to Justin Chu who authored the original NoSQLBench MongoDB ActivityType.
* Special thanks to Justin Chu who authored the original NoSQLBench MongoDB StandardActivityType.
*/
@Service(value = DriverAdapter.class, selector = "mongodb")
public class MongodbDriverAdapter extends BaseDriverAdapter<MongoOp<?>, MongoSpace> {

View File

@ -225,7 +225,7 @@ opfield1: value1
</UL>
</P>
<H3>Enabling Activity Params</H3>
<H3>Enabling StandardActivity Params</H3>
<P>If a user wants to allow an activity param as an default for an fields, they must publish the op
field
name in the configuration model for the activity. Otherwise it is an error to specify the value at
@ -353,7 +353,7 @@ prepared: false
document level,
down to each block and then down to each statement.
<H3>Activity Params</H3>
<H3>StandardActivity Params</H3>
<PRE>{@code
./nb run driver=... workload=... cl=LOCAL_QUORUM
}</PRE>

View File

@ -106,7 +106,7 @@ public class ActivityDef implements NBNamedElement {
}
/**
* Return tbe Activity Driver Adapter Name
* Return tbe StandardActivity Driver Adapter Name
*
* @return the driver adapter name
*/

View File

@ -76,7 +76,7 @@ public class GrafanaRegionAnalyzer implements Runnable {
//details:
// params: ActivityDef:(4)/{keycount=5000000000L, hosts=node1, main-cycles=500, threads=1, workload=./keyvalue.yaml, cycles=2, stride=2, tags=block:'schema.*', password=cassandra, rf=3, pooling=16:16:500, driver=cql, rampup-cycles=5000000000, alias=keyvalue_default_schema, valuecount=5000000000L, errors=count, username=cassandra}
//labels:
// layer: Activity
// layer: StandardActivity
// alias: keyvalue_default_schema
// driver: cql
// workload: ./keyvalue.yaml

View File

@ -16,7 +16,7 @@
package io.nosqlbench.engine.api.activityapi.core;
/**
* An ActionDispenser is created for each Activity instance within a scenario.
* An ActionDispenser is created for each StandardActivity instance within a scenario.
* When a thread is created, the motor and its input and action instances are resolved.
* The ActionDispenser is responsible for choosing how the action is resolved,
* whether that is a shared thread-safe action or an action per slot.

View File

@ -16,6 +16,8 @@
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import java.util.Map;
/**
@ -23,5 +25,5 @@ import java.util.Map;
* activities that are present in a configuration. Those dispensers will have th
*/
public interface ActivitiesAware {
void setActivitiesMap(Map<String,Activity> activities);
void setActivitiesMap(Map<String, StandardActivity> activities);
}

View File

@ -1,123 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Counting;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import java.io.InputStream;
import java.io.PrintWriter;
/**
* Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful Activity is to extend {@link StandardActivity}.
*/
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent {
/**
* Register an object which should be closed after this activity is shutdown.
*
* @param closeable An Autocloseable object
*/
void registerAutoCloseable(AutoCloseable closeable);
ActivityDef getActivityDef();
default String getAlias() {
return this.getActivityDef().getAlias();
}
default ParameterMap getParams() {
return this.getActivityDef().getParams();
}
default void initActivity() {
}
/**
* Close all autocloseables that have been registered with this Activity.
*/
void closeAutoCloseables();
@Override
RunState getRunState();
void setRunState(RunState runState);
long getStartedAtMillis();
default void shutdownActivity() {
}
default String getCycleSummary() {
return this.getActivityDef().getCycleSummary();
}
/**
* Get the current cycle rate limiter for this activity.
* The cycle rate limiter is used to throttle the rate at which
* cycles are dispatched across all threads in the activity
* @return the cycle {@link RateLimiter}
*/
RateLimiter getCycleLimiter();
/**
* Get the current stride rate limiter for this activity.
* The stride rate limiter is used to throttle the rate at which
* new strides are dispatched across all threads in an activity.
* @return The stride {@link RateLimiter}
*/
RateLimiter getStrideLimiter();
PrintWriter getConsoleOut();
InputStream getConsoleIn();
void setConsoleOut(PrintWriter writer);
ErrorMetrics getExceptionMetrics();
// /**
// * When a driver needs to identify an error uniquely for the purposes of
// * routing it to the correct error handler, or naming it in logs, or naming
// * metrics, override this method in your activity.
// * @return A function that can reliably and safely map an instance of Throwable to a stable name.
// */
// default Function<Throwable,String> getErrorNameMapper() {
// return t -> t.getClass().getSimpleName();
// }
//
int getMaxTries();
default int getHdrDigits() {
return this.getParams().getOptionalInteger("hdr_digits").orElse(4);
}
RunStateTally getRunStateTally();
ActivityWiring getWiring();
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import org.apache.logging.log4j.Logger;
@ -25,9 +26,10 @@ import org.apache.logging.log4j.LogManager;
/**
*
* @param <D> An type of state holder for an operation, holding everything unique to that cycle and operation
* @param <A> An type of of an Activity, a state holder for a runtime instance of an Activity
* @param <A> An type of of an Activity, a state holder for a runtime instance of an StandardActivity
*/
public abstract class BaseAsyncAction<D, A extends Activity> implements AsyncAction<D>, Stoppable, ActivityDefObserver {
public abstract class BaseAsyncAction<D, A extends StandardActivity> implements AsyncAction<D>,
Stoppable, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger("BaseAsyncAction");
protected final A activity;

View File

@ -21,6 +21,7 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.metrics.instruments.*;
@ -33,7 +34,7 @@ public class ComponentActivityInstrumentation {
private static final String SERVICE_TIME = "_servicetime";
private static final String RESPONSE_TIME = "_responsetime";
private final Activity activity;
private final StandardActivity activity;
private final ActivityDef def;
private final ParameterMap params;
private final int hdrdigits;
@ -55,7 +56,7 @@ public class ComponentActivityInstrumentation {
private NBMetricGauge errorRateTotal;
private NBMetricGauge errorsTotal;
public ComponentActivityInstrumentation(final Activity activity) {
public ComponentActivityInstrumentation(final StandardActivity activity) {
this.activity = activity;
def = activity.getActivityDef();
params = this.def.getParams();

View File

@ -34,7 +34,7 @@ import java.io.PrintWriter;
/**
* Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful Activity is to extend {@link StandardActivity}.
* The easiest way to build a useful StandardActivity is to extend {@link StandardActivity}.
*/
public interface IActivityWiring extends Comparable<IActivityWiring>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent {

View File

@ -18,7 +18,7 @@ package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
/**
* A MotorDispenser is created for each Activity instance within a scenario.
* A MotorDispenser is created for each StandardActivity instance within a scenario.
* When a thread is created, the motor and its input and action instances are resolved.
* The MotorDispenser is responsible for choosing how the motor is resolved,
* whether that is a shared thread-safe motor or, more conventionally, a separate motor per slot.

View File

@ -17,15 +17,11 @@
package io.nosqlbench.engine.api.activityapi.core.ops.fluent;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.*;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

View File

@ -16,18 +16,15 @@
package io.nosqlbench.engine.api.activityapi.core.progress;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.nb.api.engine.util.Unit;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import java.time.Instant;
public class ActivityMetricProgressMeter implements ProgressMeterDisplay, CompletedMeter, RemainingMeter, ActiveMeter {
private final Activity activity;
private final StandardActivity activity;
private final Instant startInstant;
private final NBMetricTimer bindTimer;
private final NBMetricTimer cyclesTimer;

View File

@ -16,8 +16,8 @@
package io.nosqlbench.engine.api.activityapi.cyclelog.filters;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.nb.api.spi.SimpleServiceLoader;
@ -28,7 +28,7 @@ public interface ExperimentalResultFilterType {
SimpleServiceLoader<ExperimentalResultFilterType> FINDER =
new SimpleServiceLoader<>(ExperimentalResultFilterType.class, Maturity.Any);
default IntPredicateDispenser getFilterDispenser(Activity activity) {
default IntPredicateDispenser getFilterDispenser(StandardActivity activity) {
SimpleConfig conf = new SimpleConfig(activity.getWiring(), "resultfilter");
return getFilterDispenser(conf);
}

View File

@ -16,9 +16,7 @@
package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabeledElement;
@ -29,7 +27,6 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegmen
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results_rle.CycleResultsRLEBufferReadable;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult;
import io.nosqlbench.engine.api.activityapi.input.Input;

View File

@ -22,13 +22,11 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results_rle.CycleRe
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results_rle.CycleSpanResults;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult;
import io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog.CanFilterResultValue;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;

View File

@ -16,7 +16,6 @@
package io.nosqlbench.engine.api.activityapi.cyclelog.outputs.cyclelog;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.outputs.ReorderingConcurrentResultBuffer;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;

View File

@ -18,7 +18,7 @@ package io.nosqlbench.engine.api.activityapi.cyclelog.outputs.logger;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -27,9 +27,9 @@ import org.apache.logging.log4j.Logger;
public class LoggingMarkerDispenser implements OutputDispenser {
private final static Logger logger = LogManager.getLogger(LoggingMarkerDispenser.class);
private final Activity activity;
private final StandardActivity activity;
public LoggingMarkerDispenser(Activity activity) {
public LoggingMarkerDispenser(StandardActivity activity) {
this.activity = activity;
}

View File

@ -17,7 +17,7 @@
package io.nosqlbench.engine.api.activityapi.input;
/**
* An InputDispenser is created for each Activity instance within a scenario.
* An InputDispenser is created for each StandardActivity instance within a scenario.
* When a thread is created, the motor and its input and action instances are resolved.
* The InputDispenser is responsible for choosing how the input is resolved,
* whether that is a shared thread-safe input or an input per slot.

View File

@ -16,11 +16,8 @@
package io.nosqlbench.engine.api.activityapi.input;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.spi.SimpleServiceLoader;
public interface InputType {

View File

@ -16,7 +16,6 @@
package io.nosqlbench.engine.api.activityapi.output;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.nb.api.components.core.NBComponent;

View File

@ -16,9 +16,9 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity;
public interface ActivityDispenser {
Activity getActivity(ActivityDef activityDef);
StandardActivity getActivity(ActivityDef activityDef);
}

View File

@ -19,9 +19,7 @@ package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.ResultFilterDispenser;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.ResultValueFilterType;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
@ -29,7 +27,6 @@ import io.nosqlbench.engine.api.activityapi.input.InputType;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputType;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import java.util.Optional;
import java.util.function.Predicate;
@ -38,7 +35,7 @@ public class CoreServices {
private static StandardActivity parent;
public static <A extends Activity> Optional<OutputDispenser> getOutputDispenser(
public static <A> Optional<OutputDispenser> getOutputDispenser(
NBComponent parent, ActivityWiring activity) {
OutputDispenser outputDispenser = new SimpleConfig(activity, "output").getString("type")
.flatMap(OutputType.FINDER::get)
@ -55,7 +52,7 @@ public class CoreServices {
return Optional.ofNullable(outputDispenser);
}
public static <A extends Activity> Optional<Predicate<ResultReadable>> getOutputFilter(ActivityWiring activity) {
public static <A> Optional<Predicate<ResultReadable>> getOutputFilter(ActivityWiring activity) {
String paramdata= activity.getParams().getOptionalString("of")
.orElse(activity.getParams().getOptionalString("outputfilter").orElse(null));
if (paramdata==null) {
@ -72,7 +69,7 @@ public class CoreServices {
// return intPredicateDispenser;
// }
//
public static <A extends Activity> InputDispenser getInputDispenser(StandardActivity activity) {
public static <A> InputDispenser getInputDispenser(StandardActivity activity) {
String inputTypeName = new SimpleConfig(activity, "input").getString("type").orElse("atomicseq");
InputType inputType = InputType.FINDER.getOrThrow(inputTypeName);
InputDispenser dispenser = inputType.getInputDispenser(activity);
@ -83,7 +80,7 @@ public class CoreServices {
return dispenser;
}
public static <A extends Activity> Optional<Predicate<ResultReadable>> getInputFilter(Activity activity) {
public static <A> Optional<Predicate<ResultReadable>> getInputFilter(StandardActivity activity) {
String paramdata= activity.getParams().getOptionalString("if")
.orElse(activity.getParams().getOptionalString("inputfilter").orElse(null));
if (paramdata==null) {

View File

@ -15,9 +15,7 @@
*/
package io.nosqlbench.engine.api.activityimpl.action;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import org.apache.logging.log4j.Logger;

View File

@ -19,7 +19,6 @@ package io.nosqlbench.engine.api.activityimpl.input;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.ActivitiesAware;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputType;
@ -29,7 +28,7 @@ import java.util.Map;
public class CoreInputDispenser implements InputDispenser, ActivitiesAware {
private final StandardActivity activity;
private Map<String, Activity> activities;
private Map<String, StandardActivity> activities;
private Input input;
public CoreInputDispenser(StandardActivity activity) {
@ -57,7 +56,7 @@ public class CoreInputDispenser implements InputDispenser, ActivitiesAware {
}
@Override
public void setActivitiesMap(Map<String, Activity> activities) {
public void setActivitiesMap(Map<String, StandardActivity> activities) {
this.activities = activities;
}

View File

@ -16,7 +16,6 @@
package io.nosqlbench.engine.api.activityimpl.input;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputType;
@ -33,10 +32,10 @@ public class TargetRateInputType implements InputType {
public static class Dispenser implements InputDispenser {
private final Activity activity;
private final StandardActivity activity;
private final AtomicInput input;
public Dispenser(Activity activity) {
public Dispenser(StandardActivity activity) {
this.activity = activity;
this.input = new AtomicInput(activity, activity.getActivityDef());
}

View File

@ -17,10 +17,10 @@
package io.nosqlbench.engine.api.activityimpl.marker;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsIntervalSegment;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@ -71,7 +71,7 @@ public class ContiguousOutputChunker implements Output {
initExtents();
}
public ContiguousOutputChunker(Activity activity) {
public ContiguousOutputChunker(StandardActivity activity) {
if (!(activity.getWiring().getInputDispenserDelegate().getInput(0).isContiguous())) {
throw new RuntimeException("This type of output may not be used with non-contiguous inputs yet.");

View File

@ -55,7 +55,7 @@ public class CoreMotor<D> extends NBBaseComponent implements ActivityDefObserver
private static final Logger logger = LogManager.getLogger(CoreMotor.class);
private final long slotId;
private final Activity activity;
private final StandardActivity activity;
private Timer inputTimer;
@ -69,7 +69,7 @@ public class CoreMotor<D> extends NBBaseComponent implements ActivityDefObserver
private Input input;
private SyncAction action;
// private final Activity activity;
// private final StandardActivity activity;
private Output output;
private final MotorState motorState;

View File

@ -17,13 +17,12 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
public class StandardActionDispenser implements ActionDispenser {
private final StandardActivity activity;
public <A extends Activity> StandardActionDispenser(StandardActivity activity) {
public <A> StandardActionDispenser(StandardActivity activity) {
this.activity = activity;
}

View File

@ -32,7 +32,9 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
@ -44,8 +46,8 @@ import io.nosqlbench.engine.api.activityimpl.OpLookupService;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
@ -84,7 +86,9 @@ import java.util.function.LongFunction;
@param <S>
The context type for the activity, AKA the 'space' for a named driver instance and its
associated object graph */
public class StandardActivity<R extends java.util.function.LongFunction, S> extends NBStatusComponent implements Activity, InvokableResult, SyntheticOpTemplateProvider, ActivityDefObserver {
public class StandardActivity<R extends java.util.function.LongFunction, S>
extends NBStatusComponent implements InvokableResult, SyntheticOpTemplateProvider, ActivityDefObserver, StateCapable,
ProgressCapable, Comparable<StandardActivity> {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends CycleOp<?>>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>, Space>> adapters = new ConcurrentHashMap<>();
@ -126,10 +130,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
this.activityDef = activityDef;
this.wiring = wiring;
int hdrdigits = getComponentProp("hdr_digits")
.map(Integer::parseInt).orElse(3);
this.pendingOpsCounter = create().counter(
"pending_ops",
MetricCategory.Core,
@ -142,6 +142,7 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
/// {@link OpSequence} in conjunction with
/// an {@link OpDispenser}. This is named for "binding
/// a cycle to an operation".
int hdrdigits = getHdrDigits();
this.bindTimer = create().timer(
"bind", hdrdigits, MetricCategory.Core,
"Time the step within a cycle which binds generated data to an op template to synthesize an executable operation."
@ -424,6 +425,10 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
public ParameterMap getParams() {
return activityDef.getParams();
}
private ParsedOp upconvert(
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
@ -476,7 +481,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
@Override
public void initActivity() {
initOrUpdateRateLimiters(this.activityDef);
setDefaultsFromOpSequence(sequence);
@ -548,7 +552,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
dedicated <em>state space</em> types. Any space which implements {@link Shutdownable}
will be closed when this activity shuts down.
*/
@Override
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter<CycleOp<?>, Space>> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
@ -832,7 +835,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
return runState;
}
@Override
public synchronized void setRunState(RunState runState) {
this.runState = runState;
if (RunState.Running == runState) {
@ -840,12 +842,10 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
}
@Override
public long getStartedAtMillis() {
return startedAtMillis;
}
@Override
public ActivityDef getActivityDef() {
return activityDef;
}
@ -874,7 +874,12 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
}
@Override
/**
* Get the current cycle rate limiter for this activity.
* The cycle rate limiter is used to throttle the rate at which
* cycles are dispatched across all threads in the activity
* @return the cycle {@link RateLimiter}
*/
public RateLimiter getCycleLimiter() {
if (cycleLimiterSource!=null) {
return cycleLimiterSource.get();
@ -882,7 +887,12 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
return null;
}
}
@Override
/**
* Get the current stride rate limiter for this activity.
* The stride rate limiter is used to throttle the rate at which
* new strides are dispatched across all threads in an activity.
* @return The stride {@link RateLimiter}
*/
public synchronized RateLimiter getStrideLimiter() {
if (strideLimiterSource!=null) {
return strideLimiterSource.get();
@ -891,19 +901,17 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
}
@Override
public RunStateTally getRunStateTally() {
return tally;
}
@Override
public ActivityWiring getWiring() {
return this.wiring;
}
@Override
public Map<String, String> asResult() {
return Map.of("activity",this.getAlias());
return Map.of("activity",this.getActivityDef().getAlias());
}
/**
@ -913,7 +921,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
*
* @return The number of allowable retries
*/
@Override
public int getMaxTries() {
return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
}
@ -927,7 +934,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
return errorHandler;
}
@Override
public void closeAutoCloseables() {
for (AutoCloseable closeable : closeables) {
logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
@ -941,16 +947,14 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
@Override
public int compareTo(Activity o) {
return getAlias().compareTo(o.getAlias());
public int compareTo(StandardActivity o) {
return this.getActivityDef().getAlias().compareTo(o.getActivityDef().getAlias());
}
@Override
public void registerAutoCloseable(AutoCloseable closeable) {
this.closeables.add(closeable);
}
@Override
public synchronized PrintWriter getConsoleOut() {
if (null == console) {
this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8);
@ -958,17 +962,14 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
return this.console;
}
@Override
public synchronized InputStream getConsoleIn() {
return System.in;
}
@Override
public void setConsoleOut(PrintWriter writer) {
this.console = writer;
}
@Override
public synchronized ErrorMetrics getExceptionMetrics() {
if (null == this.errorMetrics) {
errorMetrics = new ErrorMetrics(this);
@ -977,6 +978,12 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
public String getAlias() {
return getActivityDef().getAlias();
}
public int getHdrDigits() {
return getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3);
}
}

View File

@ -18,10 +18,8 @@ package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivitiesAware;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
@ -68,7 +66,7 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* Create an instance of an activity from the activity type.
*
* @param activityDef the definition that initializes and controls the activity.
* @return a distinct Activity instance for each call
* @return a distinct StandardActivity instance for each call
*/
@SuppressWarnings("unchecked")
public A getActivity(final ActivityDef activityDef,
@ -96,9 +94,9 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* @param activities a map of existing activities
* @return a distinct activity instance for each call
*/
public Activity getAssembledActivity(
public StandardActivity getAssembledActivity(
final NBComponent parent, final ActivityDef activityDef,
final Map<String, Activity> activities
final Map<String, StandardActivity> activities
) {
// final A activity = this.getActivity(activityDef, parent);
ActivityWiring wiring = new ActivityWiring(activityDef);
@ -141,7 +139,7 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* Return the InputDispenser instance that will be used by the associated activity to create Input factories
* for each thread slot.
*
* @param activity the Activity instance which will parameterize this InputDispenser
* @param activity the StandardActivity instance which will parameterize this InputDispenser
* @return the InputDispenser for the associated activity
*/
public InputDispenser getInputDispenser(final StandardActivity activity) {

View File

@ -16,8 +16,8 @@
package io.nosqlbench.engine.api.util;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import java.util.*;
import java.util.regex.Pattern;
@ -34,13 +34,13 @@ public class ConfigTuples implements Iterable<ConfigTuples.Section> {
this.sections = sections;
}
public ConfigTuples(Activity activity, String param) {
public ConfigTuples(StandardActivity activity, String param) {
this(activity.getParams().getOptionalString(param).orElse(""));
}
private List<Section> parseParams(String configdata) {
try {
List<Section> sections = Arrays.stream(configdata.split("[,]"))
List<Section> sections = Arrays.stream(configdata.split(","))
.filter(Objects::nonNull)
.filter(s -> !s.isEmpty())
.map(s -> new Section(s, "[:=]"))

View File

@ -16,8 +16,8 @@
package io.nosqlbench.engine.api.util;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
@ -35,7 +35,7 @@ public class SimpleConfig {
this.params = parseParams(configdata);
}
public SimpleConfig(Activity activity, String params) {
public SimpleConfig(StandardActivity activity, String params) {
this(activity.getActivityDef(),params);
}
public SimpleConfig(ActivityWiring wiring, String param) {
@ -50,7 +50,7 @@ public class SimpleConfig {
private Map<String, String> parseParams(String configdata) {
try {
return Arrays.stream(configdata.split("[,]"))
return Arrays.stream(configdata.split(","))
.filter(Objects::nonNull)
.filter(s -> !s.isEmpty())
// .peek(System.out::println)

View File

@ -17,6 +17,7 @@ package io.nosqlbench.engine.core.lifecycle.activity;
import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
@ -68,7 +69,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
private static final Logger activitylogger = LogManager.getLogger("ACTIVITY");
private final LinkedList<Motor<?>> motors = new LinkedList<>();
private final Activity activity;
private final StandardActivity activity;
private final ActivityDef activityDef;
private final RunStateTally tally;
private final MotorDispenser motorSource;
@ -81,7 +82,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
private ActivityExecutorShutdownHook shutdownHook = null;
private NBMetricGauge threadsGauge;
public ActivityExecutor(Activity activity) {
public ActivityExecutor(StandardActivity activity) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
this.motorSource = activity.getWiring().getMotorDispenserDelegate();
@ -145,7 +146,8 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
public Exception forceStopActivity(int initialMillisToWait) {
activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")");
activitylogger.debug("FORCE STOP/before alias=(" + activity.getActivityDef().getAlias() +
")");
activity.setRunState(RunState.Stopped);
executorService.shutdownNow();
@ -378,7 +380,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
return motors.stream().anyMatch(m -> m.getState().get() == RunState.Running);
}
public Activity getActivity() {
public StandardActivity getActivity() {
return activity;
}
@ -542,7 +544,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
*/
private void startRunningActivityThreads() {
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getActivityDef().getCycleSummary());
Annotators.recordAnnotation(Annotation.newBuilder()
.element(this)
.now()

View File

@ -16,9 +16,9 @@
package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -33,14 +33,14 @@ import java.util.concurrent.ConcurrentHashMap;
*/
public class ActivityLoader {
private static final Logger logger = LogManager.getLogger("ACTIVITIES");
private final Map<String, Activity> activityMap = new ConcurrentHashMap<>();
private final Map<String, StandardActivity> activityMap = new ConcurrentHashMap<>();
public ActivityLoader() {
}
public synchronized Activity loadActivity(ActivityDef activityDef, final NBComponent parent) {
public synchronized StandardActivity loadActivity(ActivityDef activityDef, final NBComponent parent) {
activityDef= activityDef.deprecate("yaml","workload").deprecate("type","driver");
final Activity activity =
final StandardActivity activity =
new StandardActivityType<>(activityDef, parent).getAssembledActivity(parent, activityDef, this.activityMap);this.activityMap.put(activity.getAlias(),activity);
ActivityLoader.logger.debug("Resolved activity for alias '{}'", activityDef.getAlias());
return activity;

View File

@ -16,10 +16,10 @@
package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -32,11 +32,12 @@ import java.util.concurrent.TimeoutException;
public class ActivityRuntimeInfo implements ProgressCapable {
private final static Logger logger = LogManager.getLogger(ActivityRuntimeInfo.class);
private final Activity activity;
private final StandardActivity activity;
private final Future<ExecutionResult> future;
private final ActivityExecutor executor;
public ActivityRuntimeInfo(Activity activity, Future<ExecutionResult> result, ActivityExecutor executor) {
public ActivityRuntimeInfo(StandardActivity activity, Future<ExecutionResult> result,
ActivityExecutor executor) {
this.activity = activity;
this.future = result;
@ -72,7 +73,7 @@ public class ActivityRuntimeInfo implements ProgressCapable {
return result;
}
public Activity getActivity() {
public StandardActivity getActivity() {
return this.activity;
}

View File

@ -16,19 +16,16 @@
package io.nosqlbench.engine.core.lifecycle.commands;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.PrintWriter;
import java.io.Reader;
import java.util.Optional;
@Service(value = NBBaseCommand.class,selector = "example")
public class CMD_example extends NBBaseCommand {

View File

@ -16,19 +16,16 @@
package io.nosqlbench.engine.core.lifecycle.commands;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.PrintWriter;
import java.io.Reader;
import java.util.Optional;
@Service(value = NBBaseCommand.class,selector = "force_stop")
public class CMD_forceStop extends NBBaseCommand {

View File

@ -16,19 +16,16 @@
package io.nosqlbench.engine.core.lifecycle.commands;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.PrintWriter;
import java.io.Reader;
import java.util.Optional;
@Service(value = NBBaseCommand.class,selector = "start")
public class CMD_start extends NBBaseCommand {

View File

@ -16,7 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.commands;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
@ -44,7 +44,7 @@ public class CMD_stop extends NBBaseCommand {
= params.maybeGet("activity").orElseThrow(
() -> new RuntimeException("The stop command requires an 'activity' parameter")
);
Optional<Activity> activity = controller.getActivity(activityName);
Optional<StandardActivity> activity = controller.getActivity(activityName);
if (activity.isEmpty()) {
BasicError error = new BasicError("Activity '" + activityName + "' was not found for stop command.");
logger.warn(error);

View File

@ -15,12 +15,12 @@
*/
package io.nosqlbench.engine.core.lifecycle.scenario.container;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponentErrorHandler;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
@ -65,7 +65,7 @@ public class ContainerActivitiesController extends NBBaseComponent {
*
* @param activityDef string in alias=value1;driver=value2;... format
*/
public Activity start(ActivityDef activityDef) {
public StandardActivity start(ActivityDef activityDef) {
ActivityRuntimeInfo ari = doStartActivity(activityDef);
return ari.getActivity();
}
@ -73,7 +73,7 @@ public class ContainerActivitiesController extends NBBaseComponent {
private ActivityRuntimeInfo doStartActivity(ActivityDef activityDef) {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, this);
StandardActivity activity = this.activityLoader.loadActivity(activityDef, this);
activity.initActivity();
ActivityExecutor executor = new ActivityExecutor(activity);
Future<ExecutionResult> startedActivity = executorService.submit(executor);
@ -91,9 +91,9 @@ public class ContainerActivitiesController extends NBBaseComponent {
*
* @param activityDefMap A map containing the activity definition
*/
public Activity start(Map<String, String> activityDefMap) {
public StandardActivity start(Map<String, String> activityDefMap) {
ActivityDef ad = new ActivityDef(new ParameterMap(activityDefMap));
Activity started = start(ad);
StandardActivity started = start(ad);
awaitAllThreadsOnline(started,30000L);
return started;
}
@ -104,7 +104,7 @@ public class ContainerActivitiesController extends NBBaseComponent {
*
* @param alias the alias of an activity that is already known to the scenario
*/
public Activity start(String alias) {
public StandardActivity start(String alias) {
return start(ActivityDef.parseActivityDef(alias));
}
@ -186,12 +186,12 @@ public class ContainerActivitiesController extends NBBaseComponent {
return runtimeInfo.awaitAllThreadsOnline(timeoutMs);
}
public synchronized void stop(Activity activity) {
public synchronized void stop(StandardActivity activity) {
stop(activity.getActivityDef());
}
public boolean awaitAllThreadsOnline(Activity activity, long timeoutMs) {
public boolean awaitAllThreadsOnline(StandardActivity activity, long timeoutMs) {
return awaitAllThreadsOnline(activity.getActivityDef(), timeoutMs);
}
@ -442,13 +442,13 @@ public class ContainerActivitiesController extends NBBaseComponent {
// ActivityMetrics.reportTo(System.out);
}
public Optional<Activity> getSoloActivity() {
public Optional<StandardActivity> getSoloActivity() {
if (this.getActivityExecutorMap().size()==1) {
return Optional.of(activityInfoMap.values().iterator().next().getActivity());
}
return Optional.empty();
}
public Optional<Activity> getActivity(String activityName) {
public Optional<StandardActivity> getActivity(String activityName) {
return Optional.ofNullable(this.activityInfoMap.get(activityName)).map(ActivityRuntimeInfo::getActivity);
}

View File

@ -53,7 +53,7 @@ class ActivityExecutorTest {
// ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-restart;cycles=1000;cyclerate=10;op=initdelay:initdelay=5000;");
// new ActivityTypeLoader().load(activityDef);
//
// final Activity activity = new DelayedInitActivity(activityDef);
// final StandardActivity activity = new DelayedInitActivity(activityDef);
// InputDispenser inputDispenser = new CoreInputDispenser(activity);
// ActionDispenser adisp = new CoreActionDispenser(activity);
// OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null);
@ -108,7 +108,7 @@ class ActivityExecutorTest {
Optional<StandardActivityType> standardActivityType = new ActivityTypeLoader().load(
activityDef, TestComponent.INSTANCE);
// Activity activity = new DelayedInitActivity(activityDef);
// StandardActivity activity = new DelayedInitActivity(activityDef);
ActivityWiring wiring = new ActivityWiring(activityDef);
StandardActivity activity = standardActivityType.get().getActivity(
activityDef, TestComponent.INSTANCE, wiring);

View File

@ -18,11 +18,8 @@ package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;

View File

@ -16,6 +16,7 @@
package io.nosqlbench.nbr.examples.injava;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
@ -23,7 +24,6 @@ import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
@ -107,7 +107,7 @@ public class NB_cocycledelay_bursty extends NBBaseCommand {
controller.waitMillis(500);
stdout.println("starting activity co_cycle_delay_bursty");
Activity activity = controller.start(co_cycle_delay_bursty);
StandardActivity activity = controller.start(co_cycle_delay_bursty);
controller.waitMillis(1000);
NBMetricTimer service_time_counter = container.find().topMetric("activity=co_cycle_delay_bursty,name=cycles_servicetime", NBMetricTimer.class);

View File

@ -16,7 +16,7 @@
package io.nosqlbench.nbr.examples.injava;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
@ -57,7 +57,7 @@ public class NB_readmetrics extends NBBaseCommand {
*/
@Override
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
Activity activity = controller.start(Map.of(
StandardActivity activity = controller.start(Map.of(
"alias", "testactivity",
"driver", "diag",
"cycles", "0..1000000000",

View File

@ -16,7 +16,7 @@
package io.nosqlbench.nbr.examples.injava;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
@ -47,7 +47,7 @@ public class NB_threadchange extends NBBaseCommand {
@Override
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
Activity activity = controller.start(
StandardActivity activity = controller.start(
"driver=diag;alias=threadchange;cycles=0..60000;threads=1;interval=2000;op='noop';rate=1000");
activity.getActivityDef().setThreads(1);
stdout.println("threads now " + activity.getActivityDef().getThreads());

View File

@ -16,9 +16,9 @@
package io.nosqlbench.scenarios.simframe;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
@ -29,7 +29,7 @@ import java.util.concurrent.locks.LockSupport;
public class SimFrameUtils {
public static final String SIM_CYCLES = "sim_cycles";
public static void awaitActivity(Activity flywheel) {
public static void awaitActivity(StandardActivity flywheel) {
// await flywheel actually spinning, or timeout with error
NBMetricTimer result_success_timer = flywheel.find().timer("name:result_success");
for (int i = 0; i < 1000; i++) {
@ -44,12 +44,12 @@ public class SimFrameUtils {
}
}
public static Activity findFlywheelActivity(ContainerActivitiesController controller, String providedActivityName) {
Optional<Activity> optionalActivity = Optional.ofNullable(providedActivityName).flatMap(controller::getActivity);
public static StandardActivity findFlywheelActivity(ContainerActivitiesController controller, String providedActivityName) {
Optional<StandardActivity> optionalActivity = Optional.ofNullable(providedActivityName).flatMap(controller::getActivity);
if (providedActivityName!=null && optionalActivity.isEmpty()) {
throw new RuntimeException("you specified activity '" + providedActivityName + "' but it was not found.");
}
Activity flywheel = optionalActivity.or(controller::getSoloActivity)
StandardActivity flywheel = optionalActivity.or(controller::getSoloActivity)
.orElseThrow(() -> new RuntimeException("You didn't provide the name of an activity to attach to, nor was there a solo activity available in this context"));
// Start the flywheel at an "idle" speed, even if the user hasn't set it

View File

@ -16,14 +16,13 @@
package io.nosqlbench.scenarios.simframe.capture;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.scenarios.simframe.capture.SimFrameCapture;
public class SimFrameValueData extends SimFrameCapture {
public SimFrameValueData(Activity activity) {
public SimFrameValueData(StandardActivity activity) {
NBMetricTimer result_timer = activity.find().timer("name:result");
NBMetricTimer result_success_timer = activity.find().timer("name:result_success");
NBMetricGauge cyclerate_gauge = activity.find().gauge("name=config_cyclerate");

View File

@ -16,9 +16,9 @@
package io.nosqlbench.scenarios.simframe.optimizers;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.nb.annotations.Service;
@ -63,7 +63,7 @@ public class CMD_optimize extends NBBaseCommand {
@Override
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
Activity flywheel = SimFrameUtils.findFlywheelActivity(controller, params.get("activity"));
StandardActivity flywheel = SimFrameUtils.findFlywheelActivity(controller, params.get("activity"));
stdout.println("starting analysis on activity '" + flywheel.getAlias() + "'");
SimFrameUtils.awaitActivity(flywheel);
SimFrameCapture capture = new SimFrameValueData(flywheel);

View File

@ -17,9 +17,9 @@
package io.nosqlbench.scenarios.simframe.optimizers;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
@ -60,11 +60,12 @@ public class CMD_reset extends NBBaseCommand {
*/
@Override
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
Optional<Activity> optionalActivity = Optional.ofNullable(params.get("activity")).flatMap(controller::getActivity);
Optional<StandardActivity> optionalActivity =
Optional.ofNullable(params.get("activity")).flatMap(controller::getActivity);
if (params.get("activity")!=null && optionalActivity.isEmpty()) {
throw new RuntimeException("you specified activity '" + params.get("activity") + "' but it was not found.");
}
try (Activity flywheel = optionalActivity.or(controller::getSoloActivity)
try (StandardActivity flywheel = optionalActivity.or(controller::getSoloActivity)
.orElseThrow(() -> new RuntimeException("You didn't provide the name of an activity to attach to, nor was there a solo activity available in this context"))) {
params.forEach((key, value) -> {

View File

@ -16,9 +16,9 @@
package io.nosqlbench.scenarios.simframe.optimizers.findmax;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
@ -48,7 +48,7 @@ public class CMD_findmax extends NBBaseCommand {
@Override
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
Activity flywheel = SimFrameUtils.findFlywheelActivity(controller, params.get("activity"));
StandardActivity flywheel = SimFrameUtils.findFlywheelActivity(controller, params.get("activity"));
stdout.println("starting analysis on activity '" + flywheel.getAlias() + "'");
SimFrameUtils.awaitActivity(flywheel);

View File

@ -16,8 +16,8 @@
package io.nosqlbench.scenarios.simframe.optimizers.findmax;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.scenarios.simframe.capture.SimFrameCapture;
import io.nosqlbench.scenarios.simframe.capture.SimFrameJournal;
@ -25,7 +25,7 @@ import io.nosqlbench.scenarios.simframe.planning.SimFrameFunction;
public class FindmaxFrameFunction implements SimFrameFunction<FindmaxFrameParams> {
private final Activity flywheel;
private final StandardActivity flywheel;
private final SimFrameCapture capture;
private final SimFrameJournal<FindmaxFrameParams> journal;
private final FindmaxConfig settings;
@ -35,7 +35,7 @@ public class FindmaxFrameFunction implements SimFrameFunction<FindmaxFrameParams
public FindmaxFrameFunction(
ContainerActivitiesController controller,
FindmaxConfig settings,
Activity flywheel,
StandardActivity flywheel,
SimFrameCapture capture,
SimFrameJournal<FindmaxFrameParams> journal,
FindmaxParamModel model

View File

@ -16,6 +16,7 @@
package io.nosqlbench.scenarios.simframe.optimizers.optimo;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBBufferedContainer;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBBaseCommand;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
@ -23,7 +24,6 @@ import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.nb.api.components.events.SetThreads;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
@ -74,7 +74,7 @@ public class CMD_optimo extends NBBaseCommand {
public Object invoke(NBCommandParams params, PrintWriter stdout, PrintWriter stderr, Reader stdin, ContainerActivitiesController controller) {
// TODO: having "scenario" here as well as in "named scenario" in workload templates is confusing. Make this clearer.
Activity flywheel = SimFrameUtils.findFlywheelActivity(controller, params.get("activity"));
StandardActivity flywheel = SimFrameUtils.findFlywheelActivity(controller, params.get("activity"));
stdout.println("starting analysis on activity '" + flywheel.getAlias() + "'");
SimFrameUtils.awaitActivity(flywheel);
@ -125,7 +125,7 @@ public class CMD_optimo extends NBBaseCommand {
// could be a better result if the range is arbitrarily limiting the parameter space.
}
private SimFrameCapture perfValueMeasures(Activity activity, OptimoSearchSettings settings) {
private SimFrameCapture perfValueMeasures(StandardActivity activity, OptimoSearchSettings settings) {
SimFrameCapture sampler = new SimFrameCapture();
NBMetricTimer result_timer = activity.find().timer("name:result");

View File

@ -16,8 +16,8 @@
package io.nosqlbench.scenarios.simframe.optimizers.optimo;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.scenarios.simframe.capture.SimFrameCapture;
import io.nosqlbench.scenarios.simframe.capture.SimFrameJournal;
@ -25,7 +25,7 @@ import io.nosqlbench.scenarios.simframe.planning.SimFrameFunction;
public class OptimoFrameFunction implements SimFrameFunction<OptimoFrameParams> {
private final Activity flywheel;
private final StandardActivity flywheel;
private final SimFrameCapture capture;
private final SimFrameJournal<OptimoFrameParams> journal;
private final OptimoSearchSettings settings;
@ -34,7 +34,7 @@ public class OptimoFrameFunction implements SimFrameFunction<OptimoFrameParams>
public OptimoFrameFunction(
ContainerActivitiesController controller,
OptimoSearchSettings settings,
Activity flywheel,
StandardActivity flywheel,
SimFrameCapture capture,
SimFrameJournal<OptimoFrameParams> journal
) {

View File

@ -16,9 +16,9 @@
package io.nosqlbench.scenarios.simframe.optimizers.planners.findmax;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -102,7 +102,7 @@ public class FindmaxPlanner extends SimFramePlanner<FindmaxConfig, FindmaxFrameP
}
@Override
public void applyParams(FindmaxFrameParams params, Activity flywheel) {
public void applyParams(FindmaxFrameParams params, StandardActivity flywheel) {
flywheel.onEvent(ParamChange.of(new CycleRateSpec(params.rate_shelf()+params.rate_delta(), 1.1d, SimRateSpec.Verb.restart)));
}

View File

@ -16,9 +16,9 @@
package io.nosqlbench.scenarios.simframe.optimizers.planners.ratchet;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -79,7 +79,7 @@ public class RatchetPlanner extends SimFramePlanner<RatchetConfig, RatchetFrameP
@Override
public void applyParams(RatchetFrameParams params, Activity flywheel) {
public void applyParams(RatchetFrameParams params, StandardActivity flywheel) {
flywheel.onEvent(ParamChange.of(new CycleRateSpec(params.rate(), 1.1d, SimRateSpec.Verb.restart)));
}

View File

@ -16,9 +16,9 @@
package io.nosqlbench.scenarios.simframe.optimizers.planners.rcurve;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -92,7 +92,7 @@ public class RCurvePlanner extends SimFramePlanner<RCurveConfig, RCurveFramePara
}
@Override
public void applyParams(RCurveFrameParams params, Activity flywheel) {
public void applyParams(RCurveFrameParams params, StandardActivity flywheel) {
flywheel.onEvent(ParamChange.of(new CycleRateSpec(params.rate(), 1.1d, SimRateSpec.Verb.restart)));
}

View File

@ -16,7 +16,7 @@
package io.nosqlbench.scenarios.simframe.planning;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.core.lifecycle.scenario.container.ContainerActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.container.NBCommandParams;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
@ -64,8 +64,8 @@ public abstract class SimFramePlanner<C,P extends Record> extends NBBaseComponen
*/
public abstract P nextStep(JournalView<P> journal);
public abstract void applyParams(P params, Activity activity);
public P analyze(Activity flywheel, SimFrameCapture capture, PrintWriter stdout, PrintWriter stderr, ContainerActivitiesController controller) {
public abstract void applyParams(P params, StandardActivity activity);
public P analyze(StandardActivity flywheel, SimFrameCapture capture, PrintWriter stdout, PrintWriter stderr, ContainerActivitiesController controller) {
var frameParams = initialStep();
while (frameParams != null) {