reorg Activity for clarity

This commit is contained in:
Jonathan Shook 2024-12-22 05:11:47 -06:00
parent e9a19968d8
commit 18a607cc43
12 changed files with 357 additions and 288 deletions

View File

@ -47,9 +47,9 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
this.slot = slot;
this.label = "tracker-" + slot + "_" + activity.getAlias();
this.pendingOpsCounter = activity.pendingOpsCounter;
this.cycleServiceTimer = activity.cycleServiceTimer;
this.cycleResponseTimer = activity.cycleResponseTimer;
this.pendingOpsCounter = activity.metrics.pendingOpsCounter;
this.cycleServiceTimer = activity.metrics.cycleServiceTimer;
this.cycleResponseTimer = activity.metrics.cycleResponseTimer;
}
// for testing

View File

@ -32,8 +32,8 @@ public class ActivityMetricProgressMeter implements ProgressMeterDisplay, Comple
public ActivityMetricProgressMeter(Activity activity) {
this.activity = activity;
this.startInstant = Instant.ofEpochMilli(activity.getStartedAtMillis());
this.bindTimer = activity.bindTimer;
this.cyclesTimer = activity.cycleServiceTimer;
this.bindTimer = activity.metrics.bindTimer;
this.cyclesTimer = activity.metrics.cycleServiceTimer;
}
@Override

View File

@ -29,8 +29,8 @@ public interface ExperimentalResultFilterType {
new SimpleServiceLoader<>(ExperimentalResultFilterType.class, Maturity.Any);
default IntPredicateDispenser getFilterDispenser(Activity activity) {
SimpleConfig conf = new SimpleConfig(activity.getWiring(), "resultfilter");
return getFilterDispenser(conf);
SimpleConfig conf = new SimpleConfig(activity, "resultfilter");
return getFilterDispenser(activity);
}
default IntPredicateDispenser getFilterDispenser(SimpleConfig conf) {

View File

@ -37,14 +37,23 @@ import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.api.activityimpl.Dryrun;
import io.nosqlbench.engine.api.activityimpl.OpFunctionComposition;
import io.nosqlbench.engine.api.activityimpl.OpLookupService;
import io.nosqlbench.engine.api.activityimpl.input.AtomicInput;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.engine.core.lifecycle.commands.CMD_await;
import io.nosqlbench.engine.core.lifecycle.commands.CMD_start;
import io.nosqlbench.engine.core.lifecycle.commands.CMD_stop;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.engine.core.lifecycle.session.NBSession;
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
@ -94,18 +103,7 @@ public class Activity<R extends java.util.function.LongFunction, S>
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>, Space>> adapters = new ConcurrentHashMap<>();
protected final ActivityDef activityDef;
public final NBMetricCounter pendingOpsCounter;
public final NBMetricHistogram triesHistogram;
public final NBMetricTimer bindTimer;
public final NBMetricTimer executeTimer;
public final NBMetricTimer resultTimer;
public final NBMetricTimer resultSuccessTimer;
public final NBMetricTimer cycleServiceTimer;
public final NBMetricTimer inputTimer;
public final NBMetricTimer stridesServiceTimer;
public final NBMetricTimer stridesResponseTimer;
public final NBMetricTimer cycleResponseTimer;
public final ActivityMetrics metrics;
private ActivityMetricProgressMeter progressMeter;
private String workloadSource = "unspecified";
private RunState runState = RunState.Uninitialized;
@ -118,140 +116,28 @@ public class Activity<R extends java.util.function.LongFunction, S>
private final List<AutoCloseable> closeables = new ArrayList<>();
private PrintWriter console;
private ErrorMetrics errorMetrics;
private ActivityWiring wiring;
private Input input;
private StandardAction<?, ?> action;
private static final String WAIT_TIME = "_waittime";
private static final String RESPONSE_TIME = "_responsetime";
private static final String SERVICE_TIME = "_servicetime";
public Activity(NBComponent parent, ActivityDef activityDef) {
public Activity(NBComponent parent, ActivityDef activityDef, ActivityWiring wiring) {
super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels()));
super(
parent,
NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())
);
this.activityDef = activityDef;
this.wiring = wiring;
this.pendingOpsCounter = create().counter(
"pending_ops",
MetricCategory.Core,
"Indicate the number of operations which have been started, but which have not been completed." +
" This starts "
);
/// The bind timer keeps track of how long it takes for NoSQLBench to create an instance
/// of an executable operation, given the cycle. This is usually done by using an
/// {@link OpSequence} in conjunction with
/// an {@link OpDispenser}. This is named for "binding
/// a cycle to an operation".
int hdrdigits = getHdrDigits();
this.bindTimer = create().timer(
"bind", hdrdigits, MetricCategory.Core,
"Time the step within a cycle which binds generated data to an op template to synthesize an executable operation."
);
/// The execute timer keeps track of how long it takes to submit an operation to be executed
/// to an underlying native driver. For asynchronous APIs, such as those which return a
/// {@link Future}, this is simply the amount of time it takes to acquire the future.
/// /// When possible, APIs should be used via their async methods, even if you are implementing
/// a {@link SyncAction}. This allows the execute timer to measure the hand-off to the underlying API,
/// and the result timer to measure the blocking calls to aquire the result.
this.executeTimer = create().timer(
"execute",
hdrdigits,
MetricCategory.Core,
"Time how long it takes to submit a request and receive a result, including reading the result in the client."
);
/// The cycles service timer measures how long it takes to complete a cycle of work.
this.cycleServiceTimer = create().timer(
"cycles" + SERVICE_TIME, hdrdigits, MetricCategory.Core,
"service timer for a cycle, including all of bind, execute, result and result_success;" + " service timers measure the time between submitting a request and receiving the response"
);
/// The result timer keeps track of how long it takes a native driver to service a request once submitted.
/// This timer, in contrast to the result-success timer ({@link #getOrCreateResultSuccessTimer()}),
/// is used to track all operations. That is, no matter
/// whether the operation succeeds or not, it should be tracked with this timer. The scope of this timer should
/// cover each attempt at an operation through a native driver. Retries are not to be combined in this measurement.
this.resultTimer = create().timer(
"result",
hdrdigits,
MetricCategory.Core,
"Time how long it takes to submit a request, receive a result, including binding, reading results, " +
"and optionally verifying them, including all operations whether successful or not, for each attempted request."
);
/// The result-success timer keeps track of operations which had no exception. The measurements for this timer should
/// be exactly the same values as used for the result timer ({@link #getOrCreateResultTimer()}, except that
/// attempts to complete an operation which yield an exception should be excluded from the results. These two metrics
/// together provide a very high level sanity check against the error-specific metrics which can be reported by
/// the error handler logic.
this.resultSuccessTimer = create().timer(
"result_success",
hdrdigits,
MetricCategory.Core,
"The execution time of successful operations, which includes submitting the operation, waiting for a response, and reading the result"
);
/// The input timer measures how long it takes to get the cycle value to be used for
/// an operation.
this.inputTimer = create().timer(
"read_input", getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3),
MetricCategory.Internals,
"measures overhead of acquiring a cycle range for an activity thread"
);
/// The strides service timer measures how long it takes to complete a stride of work.
this.stridesServiceTimer = create().timer(
"strides", getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3),
MetricCategory.Core,
"service timer for a stride, which is the same as the op sequence length by default"
);
if (null != getStrideLimiter()) {
/// The strides response timer measures the total response time from the scheduled
/// time a stride should start to when it completed. Stride scheduling is only defined
/// when it is implied by a stride rate limiter, so this method should return null if
/// there is no strides rate limiter.
this.stridesResponseTimer = create().timer(
"strides" + RESPONSE_TIME, hdrdigits, MetricCategory.Core,
"response timer for a stride, which is the same as the op sequence length by default;" + " response timers include scheduling delays which occur when an activity falls behind its target rate"
);
} else {
stridesResponseTimer=null;
}
/**
* The cycles response timer measures the total response time from the scheduled
* time an operation should start to when it is completed. Cycle scheduling is only defined
* when it is implied by a cycle rate limiter, so this method should return null if
* there is no cycles rate limiter.
* @return a new or existing {@link Timer} if appropriate, else null
*/
if (null != getCycleLimiter()) {
this.cycleResponseTimer = create().timer(
"cycles" + RESPONSE_TIME, hdrdigits, MetricCategory.Core,
"response timer for a cycle, including all of bind, execute, result and result_success;" + " response timers include scheduling delays which occur when an activity falls behind its target rate"
);
} else {
cycleResponseTimer=null;
}
this.metrics = new ActivityMetrics(this);
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
"yaml"
);
"workload", "yaml");
if (workloadOpt.isPresent()) {
activityDef.getParams().set("alias", workloadOpt.get());
} else {
activityDef.getParams().set("alias",
activityDef.getActivityDriver().toUpperCase(Locale.ROOT)
+ nameEnumerator);
activityDef.getParams().set(
"alias", activityDef.getActivityDriver().toUpperCase(
Locale.ROOT) + nameEnumerator
);
nameEnumerator++;
}
}
@ -271,13 +157,13 @@ public class Activity<R extends java.util.function.LongFunction, S>
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName.flatMap(
name -> ServiceSelector.of(
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
l -> l.load(this, NBLabels.forKV()));
name -> ServiceSelector.of(
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
l -> l.load(this, NBLabels.forKV()));
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError(
"Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change '<activeByDefault>false</activeByDefault>' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first.");
"Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change '<activeByDefault>false</activeByDefault>' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first.");
}
// HERE, op templates are loaded before drivers are loaded
@ -288,8 +174,8 @@ public class Activity<R extends java.util.function.LongFunction, S>
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
List<ParsedOp> allParsedOps = loadOpTemplates(
defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert(
ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList();
defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert(
ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList();
OpLookup lookup = new OpLookupService(() -> allParsedOps);
@ -298,11 +184,11 @@ public class Activity<R extends java.util.function.LongFunction, S>
if (defaultDriverOption.isPresent()) {
long matchingDefault = mappers.keySet().stream().filter(
n -> n.equals(defaultDriverOption.get())).count();
n -> n.equals(defaultDriverOption.get())).count();
if (0 == matchingDefault) {
logger.warn(
"All op templates used a different driver than the default '{}'",
defaultDriverOption.get()
"All op templates used a different driver than the default '{}'",
defaultDriverOption.get()
);
}
}
@ -314,53 +200,40 @@ public class Activity<R extends java.util.function.LongFunction, S>
throw e;
}
throw new OpConfigError(
"Error mapping workload template to operations: " + e.getMessage(), null, e);
"Error mapping workload template to operations: " + e.getMessage(), null, e);
}
create().gauge(
"ops_pending", () -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet."
"ops_pending", () -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet."
);
create().gauge(
"ops_active", () -> this.getProgressMeter().getSummary().current(), MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed."
"ops_active", () -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed."
);
create().gauge(
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core, "The current number of operations which have been completed"
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core, "The current number of operations which have been completed"
);
/// The tries histogram tracks how many tries it takes to complete an operation successfully, or not. This histogram
/// does not encode whether operations were successful or not. Ideally, if every attempt to complete an operation succeeds
/// on its first try, the data in this histogram should all be 1. In practice, systems which are running near their
/// capacity will see a few retried operations, and systems that are substantially over-driven will see many retried
/// operations. As the retries value increases the further down the percentile scale you go, you can detect system loading
/// patterns which are in excess of the real-time capability of the target system.
/// This metric should be measured around every retry loop for a native operation.
this.triesHistogram = create().histogram(
"tries",
hdrdigits,
MetricCategory.Core,
"A histogram of all tries for an activity. Perfect results mean all quantiles return 1." +
" Slight saturation is indicated by p99 or p95 returning higher values." +
" Lower quantiles returning more than 1, or higher values at high quantiles indicate incremental overload."
);
}
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops,
OpLookup opLookup
) {
return createOpSourceFromParsedOps2(adapters, pops, opLookup);
}
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps2(
// Map<String, DriverAdapter<?,?>> adapterCache,
// Map<String, OpMapper<? extends Op>> mapperCache,
List<DriverAdapter<CycleOp<?>, Space>> adapters,
List<ParsedOp> pops,
OpLookup opLookup
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops,
OpLookup opLookup
) {
try {
@ -371,11 +244,10 @@ public class Activity<R extends java.util.function.LongFunction, S>
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(sequencerType);
SequencerType sequencerType = getParams().getOptionalString("seq").map(
SequencerType::valueOf).orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(
sequencerType);
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
@ -390,24 +262,23 @@ public class Activity<R extends java.util.function.LongFunction, S>
DriverAdapter<CycleOp<?>, Space> adapter = adapters.get(i);
OpMapper<CycleOp<?>, Space> opMapper = adapter.getOpMapper();
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(this, pop, spaceFunc);
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(
this, pop, spaceFunc);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun");
dispenser = OpFunctionComposition.wrapOptionally(
adapter,
dispenser,
pop,
dryrun,
opLookup
);
adapter, dispenser, pop, dryrun, opLookup);
// if (strict) {
// optemplate.assertConsumed();
// }
planner.addOp((OpDispenser<? extends CycleOp<?>>) dispenser, ratio);
} catch (Exception e) {
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e);
throw new OpConfigError(
"Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(),
e
);
}
}
@ -431,39 +302,39 @@ public class Activity<R extends java.util.function.LongFunction, S>
private ParsedOp upconvert(
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
NBConfigModel supersetConfig,
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
List<DriverAdapter<CycleOp<?>, Space>> adapterlist
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
NBConfigModel supersetConfig,
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
List<DriverAdapter<CycleOp<?>, Space>> adapterlist
) {
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
String driverName = ot.getOptionalStringParam("driver", String.class).or(
() -> ot.getOptionalStringParam("type", String.class)).or(
() -> defaultDriverOption).orElseThrow(
() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
() -> ot.getOptionalStringParam("type", String.class)).or(
() -> defaultDriverOption).orElseThrow(
() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
DriverAdapter<CycleOp<?>, Space> adapter = adapters.computeIfAbsent(
driverName, dn -> loadAdapter(
dn, yamlmodel, supersetConfig, mappers));
driverName, dn -> loadAdapter(
dn, yamlmodel, supersetConfig, mappers));
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(
ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
return pop;
}
private DriverAdapter<CycleOp<?>, Space> loadAdapter(
String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig,
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig,
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
) {
DriverAdapter<CycleOp<?>, Space> adapter = Optional.of(driverName).flatMap(
name -> ServiceSelector.of(
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
l -> l.load(this, NBLabels.forKV())).orElseThrow(
() -> new OpConfigError("driver adapter not present for name '" + driverName + "'"));
name -> ServiceSelector.of(
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
l -> l.load(this, NBLabels.forKV())).orElseThrow(() -> new OpConfigError(
"driver adapter not present for name '" + driverName + "'"));
NBConfigModel combinedModel = yamlmodel;
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
@ -536,7 +407,7 @@ public class Activity<R extends java.util.function.LongFunction, S>
@Override
public List<OpTemplate> getSyntheticOpTemplates(
OpsDocList opsDocList, Map<String, Object> cfg) {
OpsDocList opsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
@ -584,16 +455,13 @@ public class Activity<R extends java.util.function.LongFunction, S>
}
protected List<OpTemplate> loadOpTemplates(
DriverAdapter<?, ?> defaultDriverAdapter,
boolean logged,
boolean filtered
) {
DriverAdapter<?, ?> defaultDriverAdapter, boolean logged, boolean filtered) {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
OpsDocList opsDocList = loadStmtsDocList();
List<OpTemplate> filteredOps = opsDocList.getOps(filtered?tagfilter:"", logged);
List<OpTemplate> filteredOps = opsDocList.getOps(filtered ? tagfilter : "", logged);
if (filteredOps.isEmpty()) {
// There were no ops, and it *wasn't* because they were all filtered out.
@ -602,24 +470,24 @@ public class Activity<R extends java.util.function.LongFunction, S>
// There were no ops, and it was because they were all filtered out
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
if (!unfilteredOps.isEmpty()) {
String message = "There were no active op templates with tag filter '"+ tagfilter + "', since all " +
unfilteredOps.size() + " were filtered out. Examine the session log for details";
String message = "There were no active op templates with tag filter '" + tagfilter + "', since all " + unfilteredOps.size() + " were filtered out. Examine the session log for details";
NBAdvisorOutput.test(message);
//throw new BasicError(message);
}
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams());
filteredOps = sotp.getSyntheticOpTemplates(
opsDocList, this.activityDef.getParams());
Objects.requireNonNull(filteredOps);
if (filteredOps.isEmpty()) {
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
throw new BasicError(
"Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
}
} else {
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
}
return filteredOps;
@ -651,16 +519,14 @@ public class Activity<R extends java.util.function.LongFunction, S>
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
);
"You specified cycles, but the range specified means zero cycles: " + getParams().get(
"cycles"));
}
long stride = getParams().getOptionalLong("stride").orElseThrow();
long cycles = this.activityDef.getCycleCount();
if (cycles < stride) {
throw new RuntimeException(
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
);
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." + " If this was intended, then set stride low enough to allow it.");
}
}
@ -668,8 +534,8 @@ public class Activity<R extends java.util.function.LongFunction, S>
long stride = this.activityDef.getParams().getOptionalLong("stride").orElseThrow();
if (0 < stride && 0 != cycleCount % stride) {
logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," +
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
logger.warn(
() -> "The stride does not evenly divide cycles. Only full strides will be executed," + "leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
}
Optional<String> threadSpec = activityDef.getParams().getOptionalString("threads");
@ -680,7 +546,10 @@ public class Activity<R extends java.util.function.LongFunction, S>
int threads = processors * 10;
if (threads > activityDef.getCycleCount()) {
threads = (int) activityDef.getCycleCount();
logger.info("setting threads to {} (auto) [10xCORES, cycle count limited]", threads);
logger.info(
"setting threads to {} (auto) [10xCORES, cycle count limited]",
threads
);
} else {
logger.info("setting threads to {} (auto) [10xCORES]", threads);
}
@ -699,20 +568,18 @@ public class Activity<R extends java.util.function.LongFunction, S>
}
if (activityDef.getThreads() > activityDef.getCycleCount()) {
logger.warn(() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
logger.warn(
() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary() + ", you should have more cycles than threads.");
}
} else if (1000 < cycleCount) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
logger.warn(
() -> "For testing at scale, it is highly recommended that you " + "set threads to a value higher than the default of 1." + " hint: you can use threads=auto for reasonable default, or" + " consult the topic on threads with `help threads` for" + " more information.");
}
if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) {
throw new BasicError("You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity.");
throw new BasicError(
"You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity.");
}
}
@ -744,10 +611,11 @@ public class Activity<R extends java.util.function.LongFunction, S>
*/
@Deprecated(forRemoval = true)
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(
Function<OpTemplate,
OpDispenser<? extends O>> opinit, boolean strict, DriverAdapter<?, ?> defaultAdapter) {
Function<OpTemplate, OpDispenser<? extends O>> opinit, boolean strict,
DriverAdapter<?, ?> defaultAdapter
) {
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter,true,false);
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter, true, false);
List<Long> ratios = new ArrayList<>(stmts.size());
@ -756,10 +624,8 @@ public class Activity<R extends java.util.function.LongFunction, S>
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencerType sequencerType = getParams().getOptionalString("seq").map(
SequencerType::valueOf).orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
@ -784,36 +650,42 @@ public class Activity<R extends java.util.function.LongFunction, S>
try {
String op = activityDef.getParams().getOptionalString("op").orElse(null);
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(null);
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(
null);
String workload = activityDef.getParams().getOptionalString("workload").orElse(null);
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
throw new OpConfigError("Only op, statement, or workload may be provided, not more than one.");
throw new OpConfigError(
"Only op, statement, or workload may be provided, not more than one.");
}
if (workload != null && OpsLoader.isJson(workload)) {
workloadSource = "commandline: (workload/json):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.json, activityDef.getParams(), null);
return OpsLoader.loadString(
workload, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (workload != null && OpsLoader.isYaml(workload)) {
workloadSource = "commandline: (workload/yaml):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
return OpsLoader.loadString(
workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
} else if (workload != null) {
return OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
}
if (stmt != null) {
workloadSource = "commandline: (stmt/inline): '" + stmt + "'";
return OpsLoader.loadString(stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
return OpsLoader.loadString(
stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
}
if (op != null && OpsLoader.isJson(op)) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
}
else if (op != null) {
return OpsLoader.loadString(
op, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (op != null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null);
return OpsLoader.loadString(
op, OpTemplateFormat.inline, activityDef.getParams(), null);
}
return OpsDocList.none();
@ -858,16 +730,17 @@ public class Activity<R extends java.util.function.LongFunction, S>
// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
activityDef.getParams().getOptionalNamedParameter("striderate").map(
StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate").map(
CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(
this, strideLimiterSource, spec);
}
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
@ -881,7 +754,7 @@ public class Activity<R extends java.util.function.LongFunction, S>
* @return the cycle {@link RateLimiter}
*/
public RateLimiter getCycleLimiter() {
if (cycleLimiterSource!=null) {
if (cycleLimiterSource != null) {
return cycleLimiterSource.get();
} else {
return null;
@ -894,7 +767,7 @@ public class Activity<R extends java.util.function.LongFunction, S>
* @return The stride {@link RateLimiter}
*/
public synchronized RateLimiter getStrideLimiter() {
if (strideLimiterSource!=null) {
if (strideLimiterSource != null) {
return strideLimiterSource.get();
} else {
return null;
@ -911,7 +784,7 @@ public class Activity<R extends java.util.function.LongFunction, S>
@Override
public Map<String, String> asResult() {
return Map.of("activity",this.getActivityDef().getAlias());
return Map.of("activity", this.getActivityDef().getAlias());
}
/**
@ -928,15 +801,17 @@ public class Activity<R extends java.util.function.LongFunction, S>
public synchronized NBErrorHandler getErrorHandler() {
if (null == this.errorHandler) {
errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics);
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics
);
}
return errorHandler;
}
public void closeAutoCloseables() {
for (AutoCloseable closeable : closeables) {
logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
logger.debug(
() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
try {
closeable.close();
} catch (Exception e) {
@ -986,4 +861,28 @@ public class Activity<R extends java.util.function.LongFunction, S>
return getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3);
}
@Override
public Motor getMotor(ActivityDef activityDef, int slot) {
return new CoreMotor(this,slot,getInput(),getAction(),getOutput());
}
public synchronized Input getInput() {
if (input==null) {
this.input = new AtomicInput(this,this.getActivityDef());
}
return this.input;
}
public synchronized SyncAction getAction() {
if (this.action==null) {
this.action = new StandardAction(this);
}
return this.action;
}
public synchronized Output getOutput() {
// TODO: Implement this as optional, only composing the optional behavior if required
return null;
}
}

View File

@ -0,0 +1,175 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
/*
* Copyright (c) nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.codahale.metrics.Timer;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import java.util.concurrent.Future;
import java.util.function.LongFunction;
public class ActivityMetrics {
public static final String WAIT_TIME = "_waittime";
public static final String RESPONSE_TIME = "_responsetime";
public static final String SERVICE_TIME = "_servicetime";
public final Activity<?,?> activity;
public final int hdrdigits;
public NBMetricCounter pendingOpsCounter;
public NBMetricTimer bindTimer;
public NBMetricTimer executeTimer;
public NBMetricTimer cycleServiceTimer;
public NBMetricTimer resultTimer;
public NBMetricTimer resultSuccessTimer;
public NBMetricTimer inputTimer;
public NBMetricTimer stridesServiceTimer;
public NBMetricTimer stridesResponseTimer;
public NBMetricTimer cycleResponseTimer;
public NBMetricHistogram triesHistogram;
public <S, R extends LongFunction> ActivityMetrics(Activity<?,?> activity) {
this.activity = activity;
this.hdrdigits = activity.getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3);
initMetrics();
}
private void initMetrics() {
this.pendingOpsCounter = activity.create().counter(
"pending_ops", MetricCategory.Core,
"Indicate the number of operations which have been started, but which have not been completed." + " This starts "
);
/// The bind timer keeps track of how long it takes for NoSQLBench to create an instance
/// of an executable operation, given the cycle. This is usually done by using an
/// {@link OpSequence} in conjunction with
/// an {@link OpDispenser}. This is named for "binding
/// a cycle to an operation".
this.bindTimer = activity.create().timer(
"bind", hdrdigits, MetricCategory.Core,
"Time the step within a cycle which binds generated data to an op template to synthesize an executable operation."
);
/// The execute timer keeps track of how long it takes to submit an operation to be executed
/// to an underlying native driver. For asynchronous APIs, such as those which return a
/// {@link Future}, this is simply the amount of time it takes to acquire the future.
/// /// When possible, APIs should be used via their async methods, even if you are implementing
/// a {@link SyncAction}. This allows the execute timer to measure the hand-off to the underlying API,
/// and the result timer to measure the blocking calls to aquire the result.
this.executeTimer = activity.create().timer(
"execute", hdrdigits, MetricCategory.Core,
"Time how long it takes to submit a request and receive a result, including reading the result in the client."
);
/// The cycles service timer measures how long it takes to complete a cycle of work.
this.cycleServiceTimer = activity.create().timer(
"cycles" + SERVICE_TIME, hdrdigits, MetricCategory.Core,
"service timer for a cycle, including all of bind, execute, result and result_success;" + " service timers measure the time between submitting a request and receiving the response"
);
/// The result timer keeps track of how long it takes a native driver to service a request once submitted.
/// This timer, in contrast to the result-success timer ({@link #getOrCreateResultSuccessTimer()}),
/// is used to track all operations. That is, no matter
/// whether the operation succeeds or not, it should be tracked with this timer. The scope of this timer should
/// cover each attempt at an operation through a native driver. Retries are not to be combined in this measurement.
this.resultTimer = activity.create().timer(
"result", hdrdigits, MetricCategory.Core,
"Time how long it takes to submit a request, receive a result, including binding, reading results, " + "and optionally verifying them, including all operations whether successful or not, for each attempted request."
);
/// The result-success timer keeps track of operations which had no exception. The measurements for this timer should
/// be exactly the same values as used for the result timer ({@link #getOrCreateResultTimer()}, except that
/// attempts to complete an operation which yield an exception should be excluded from the results. These two metrics
/// together provide a very high level sanity check against the error-specific metrics which can be reported by
/// the error handler logic.
this.resultSuccessTimer = activity.create().timer(
"result_success", hdrdigits, MetricCategory.Core,
"The execution time of successful operations, which includes submitting the operation, waiting for a response, and reading the result"
);
/// The input timer measures how long it takes to get the cycle value to be used for
/// an operation.
this.inputTimer = activity.create().timer(
"read_input", activity.getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3),
MetricCategory.Internals,
"measures overhead of acquiring a cycle range for an activity thread"
);
/// The strides service timer measures how long it takes to complete a stride of work.
this.stridesServiceTimer = activity.create().timer(
"strides", activity.getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3),
MetricCategory.Core,
"service timer for a stride, which is the same as the op sequence length by default"
);
if (null != activity.getStrideLimiter()) {
/// The strides response timer measures the total response time from the scheduled
/// time a stride should start to when it completed. Stride scheduling is only defined
/// when it is implied by a stride rate limiter, so this method should return null if
/// there is no strides rate limiter.
this.stridesResponseTimer = activity.create().timer(
"strides" + RESPONSE_TIME, hdrdigits, MetricCategory.Core,
"response timer for a stride, which is the same as the op sequence length by default;" + " response timers include scheduling delays which occur when an activity falls behind its target rate"
);
} else {
stridesResponseTimer = null;
}
/**
* The cycles response timer measures the total response time from the scheduled
* time an operation should start to when it is completed. Cycle scheduling is only defined
* when it is implied by a cycle rate limiter, so this method should return null if
* there is no cycles rate limiter.
* @return a new or existing {@link Timer} if appropriate, else null
*/
if (null != activity.getCycleLimiter()) {
this.cycleResponseTimer = activity.create().timer(
"cycles" + RESPONSE_TIME, hdrdigits, MetricCategory.Core,
"response timer for a cycle, including all of bind, execute, result and result_success;" + " response timers include scheduling delays which occur when an activity falls behind its target rate"
);
} else {
cycleResponseTimer = null;
}
/// The tries histogram tracks how many tries it takes to complete an operation successfully, or not. This histogram
/// does not encode whether operations were successful or not. Ideally, if every attempt to complete an operation succeeds
/// on its first try, the data in this histogram should all be 1. In practice, systems which are running near their
/// capacity will see a few retried operations, and systems that are substantially over-driven will see many retried
/// operations. As the retries value increases the further down the percentile scale you go, you can detect system loading
/// patterns which are in excess of the real-time capability of the target system.
/// This metric should be measured around every retry loop for a native operation.
this.triesHistogram = activity.create().histogram(
"tries", hdrdigits, MetricCategory.Core,
"A histogram of all tries for an activity. Perfect results mean all quantiles return 1." + " Slight saturation is indicated by p99 or p95 returning higher values." + " Lower quantiles returning more than 1, or higher values at high quantiles indicate incremental overload."
);
}
}

View File

@ -28,6 +28,6 @@ public class StandardActionDispenser implements ActionDispenser {
@Override
public StandardAction<?,?> getAction(int slot) {
return new StandardAction<>(activity, slot);
return new StandardAction<>(activity);
}
}

View File

@ -75,7 +75,7 @@ public class StandardActivityType<A extends Activity<?,?>> {
if (activityDef.getParams().getOptionalString("async").isPresent())
throw new RuntimeException("This driver does not support async mode yet.");
return (A) new Activity(parent, activityDef, wiring);
return (A) new Activity(parent, activityDef);
}
/**
@ -100,7 +100,7 @@ public class StandardActivityType<A extends Activity<?,?>> {
) {
// final A activity = this.getActivity(activityDef, parent);
ActivityWiring wiring = new ActivityWiring(activityDef);
Activity activity = new Activity(parent, activityDef, wiring);
Activity activity = new Activity(parent, activityDef);
final InputDispenser inputDispenser = this.getInputDispenser(activity);
if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities);

View File

@ -56,7 +56,7 @@ public class StandardAction<A extends Activity<R, ?>, R extends java.util.functi
private final A activity;
public NBMetricHistogram triesHistogram;
public StandardAction(A activity, int slot) {
public StandardAction(A activity) {
super(activity, NBLabels.forKV("action", StandardAction.class.getSimpleName()));
this.activity = activity;
this.opsequence = activity.getOpSequence();
@ -82,7 +82,7 @@ public class StandardAction<A extends Activity<R, ?>, R extends java.util.functi
OpDispenser<? extends CycleOp<?>> dispenser = null;
CycleOp op = null;
try (Timer.Context ct = activity.bindTimer.time()) {
try (Timer.Context ct = activity.metrics.bindTimer.time()) {
dispenser = opsequence.apply(cycle);
op = dispenser.getOp(cycle);
} catch (Exception e) {
@ -103,7 +103,7 @@ public class StandardAction<A extends Activity<R, ?>, R extends java.util.functi
dispenser.onStart(cycle);
try (Timer.Context ct = activity.executeTimer.time()) {
try (Timer.Context ct = activity.metrics.executeTimer.time()) {
result = op.apply(cycle);
// TODO: break out validation timer from execute
try (Timer.Context ignored = verifierTimer.time()) {
@ -127,9 +127,9 @@ public class StandardAction<A extends Activity<R, ?>, R extends java.util.functi
error = e;
} finally {
long nanos = System.nanoTime() - startedAt;
activity.resultTimer.update(nanos, TimeUnit.NANOSECONDS);
activity.metrics.resultTimer.update(nanos, TimeUnit.NANOSECONDS);
if (error == null) {
activity.resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
activity.metrics.resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
dispenser.onSuccess(cycle, nanos);
break;
} else {

View File

@ -16,6 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity;
import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.Activity;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
@ -84,7 +85,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
public ActivityExecutor(Activity activity) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
this.motorSource = activity.getWiring().getMotorDispenserDelegate();
this.motorSource = activity;
activity.getActivityDef().getParams().addListener(this);
this.tally = activity.getRunStateTally();
}

View File

@ -151,8 +151,7 @@ class ActivityExecutorTest {
new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
ActivityWiring wiring = new ActivityWiring(activityDef);
Activity activity = new Activity(
TestComponent.INSTANCE, activityDef, wiring);
Activity activity = new Activity(TestComponent.INSTANCE, activityDef);
final InputDispenser inputDispenser = new CoreInputDispenser(activity);
final ActionDispenser actionDispenser = new CoreActionDispenser(wiring);
@ -166,10 +165,7 @@ class ActivityExecutorTest {
wiring.setInputDispenserDelegate(inputDispenser);
wiring.setMotorDispenserDelegate(motorDispenser);
Activity simpleActivity = new Activity<>(
TestComponent.INSTANCE,
activityDef, wiring
);
Activity simpleActivity = new Activity<>(TestComponent.INSTANCE, activityDef);
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity);
activityDef.setThreads(5);
ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(
@ -207,8 +203,7 @@ class ActivityExecutorTest {
return new MotorDispenser<>() {
@Override
public Motor getMotor(final ActivityDef activityDef, final int slotId) {
final Activity activity = new Activity(
TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef));
final Activity activity = new Activity(TestComponent.INSTANCE, activityDef);
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls, lc, null);
return cm;
}
@ -235,7 +230,7 @@ class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class);
public DelayedInitActivity(final ActivityDef activityDef) {
super(TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef));
super(TestComponent.INSTANCE, activityDef);
}
@Override

View File

@ -17,7 +17,6 @@
package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.activityimpl.uniform.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Motor;
@ -38,7 +37,7 @@ public class CoreMotorTest {
public void testBasicActivityMotor() {
ActivityDef activityDef = ActivityDef.parseActivityDef("alias=foo");
final Activity activity = new Activity<>(
new TestComponent("testing", "coremotor"), activityDef, ActivityWiring.of(activityDef));
new TestComponent("testing", "coremotor"), activityDef);
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final AtomicLong observableAction = new AtomicLong(-3L);
SyncAction action = this.getTestConsumer(observableAction);
@ -61,7 +60,7 @@ public class CoreMotorTest {
public void testIteratorStride() {
ActivityDef activityDef = ActivityDef.parseActivityDef("stride=3");
Activity activity = new Activity(
TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef));
TestComponent.INSTANCE, activityDef);
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final AtomicLongArray ary = new AtomicLongArray(10);
final SyncAction a1 = this.getTestArrayConsumer(ary);