this commit fully replaces SimpleActivity with StandardActivity

This commit is contained in:
Jonathan Shook
2024-12-19 13:10:08 -06:00
parent 8c62576ff1
commit 862ea8fd3b
38 changed files with 1310 additions and 1322 deletions

View File

@@ -114,6 +114,10 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
@Override
public NBComponent detachChild(NBComponent... children) {
for (NBComponent child : children) {
logger.debug(() -> "notifyinb before detaching " + child.description() + " from " + this.description());
child.beforeDetach();
}
for (NBComponent child : children) {
logger.debug(() -> "detaching " + child.description() + " from " + this.description());
this.children.remove(child);
@@ -140,11 +144,28 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
}
/// Override this method when you need to do some action within the active
/// component tree after the parent decides to detach your component, but before
/// your component loses access to the live component tree.
@Override
public void beforeDetach() {
logger.debug("before detach " + description());
}
/// The [java.io.Closeable] and [AutoCloseable] behaviors of components are
/// explicitly managed within the core [NBComponent] implementation. Thus, components can not
/// override this method, to ensure that subtype behaviors are not orphaned. The way you can
/// add a _close_ behavior is to implement [#teardown()].
///
/// During component tree unwinding, each component does the following in order:
/// 1. Changes state to [NBInvokableState#CLOSING]
/// 2. calls [#close()] on every child.
/// 3. calls [#beforeDetach()]] on every child
/// 4. detaches every child.
/// 5. calls [#teardown()] on itself
///
/// This happens recursively, and is mediated by the [#close()] method itself.
@Override
public final void close() throws RuntimeException {
state = (state == NBInvokableState.ERRORED) ? state : NBInvokableState.CLOSING;

View File

@@ -29,7 +29,9 @@ public class NBComponentExecutionScope implements AutoCloseable {
@Override
public void close() throws RuntimeException {
for (NBComponent component : components) {
component.beforeDetach();
// This is now handled inline with [NBComponent#detachChild], which puts it after the
// out of scope notification -- this might need testing adjustments or clarification
// component.beforeDetach();
component.onEvent(new ComponentOutOfScope(component));
NBComponent parent = component.getParent();
if (parent!=null) {

View File

@@ -29,5 +29,5 @@ public interface ActionDispenser {
* @param slot The numbered slot within the activity instance for this action.
* @return A new or cached Action for the specified slot.
*/
Action getAction(int slot);
SyncAction getAction(int slot);
}

View File

@@ -16,26 +16,25 @@
package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Counting;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.function.Supplier;
/**
* Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful Activity is to extend {@link SimpleActivity}.
* The easiest way to build a useful Activity is to extend {@link StandardActivity}.
*/
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent {
@@ -64,26 +63,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/
void closeAutoCloseables();
MotorDispenser<?> getMotorDispenserDelegate();
void setMotorDispenserDelegate(MotorDispenser<?> motorDispenser);
InputDispenser getInputDispenserDelegate();
void setInputDispenserDelegate(InputDispenser inputDispenser);
ActionDispenser getActionDispenserDelegate();
void setActionDispenserDelegate(ActionDispenser actionDispenser);
IntPredicateDispenser getResultFilterDispenserDelegate();
void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser);
OutputDispenser getMarkerDispenserDelegate();
void setOutputDispenserDelegate(OutputDispenser outputDispenser);
@Override
RunState getRunState();
@@ -115,15 +94,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/
RateLimiter getStrideLimiter();
/**
* Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is
* uniform across all activities.
*
* @return A new or existing instrumentation object for this activity.
*/
ActivityInstrumentation getInstrumentation();
PrintWriter getConsoleOut();
InputStream getConsoleIn();
@@ -142,11 +112,12 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
// return t -> t.getClass().getSimpleName();
// }
//
int getMaxTries();
int getMaxTries();
default int getHdrDigits() {
return this.getParams().getOptionalInteger("hdr_digits").orElse(4);
}
RunStateTally getRunStateTally();
ActivityWiring getWiring();
}

View File

@@ -25,13 +25,6 @@ import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
import java.util.concurrent.Future;
/**
* All the accessors of the metrics that will be used for each activity instance.
* Implementors of this interface should ensure that the methods are synchronized
* to avoid race conditions during lazy init from callers.
*
*
*/
public interface ActivityInstrumentation {
NBMetricGauge getOrCreateErrorsTotal();
@@ -44,103 +37,26 @@ public interface ActivityInstrumentation {
NBMetricGauge getOrCreateErrorRateTotal();
/**
* The input timer measures how long it takes to get the cycle value to be used for
* an operation.
* @return a new or existing {@link Timer}
*/
Timer getOrCreateInputTimer();
/**
* The strides service timer measures how long it takes to complete a stride of work.
* @return a new or existing {@link Timer}
*/
Timer getOrCreateStridesServiceTimer();
/**
* The strides response timer measures the total response time from the scheduled
* time a stride should start to when it completed. Stride scheduling is only defined
* when it is implied by a stride rate limiter, so this method should return null if
* there is no strides rate limiter.
* @return a new or existing {@link Timer} if appropriate, else null
*/
Timer getStridesResponseTimerOrNull();
/**
* The cycles service timer measures how long it takes to complete a cycle of work.
* @return a new or existing {@link Timer}
*/
Timer getOrCreateCyclesServiceTimer();
/**
* The cycles response timer measures the total response time from the scheduled
* time an operation should start to when it is completed. Cycle scheduling is only defined
* when it is implied by a cycle rate limiter, so this method should return null if
* there is no cycles rate limiter.
* @return a new or existing {@link Timer} if appropriate, else null
*/
Timer getCyclesResponseTimerOrNull();
/**
* The pending ops counter keeps track of how many ops are submitted or in-flight, but
* which haven't been completed yet.
* @return a new or existing {@link Counter}
*/
Counter getOrCreatePendingOpCounter();
/**
* The bind timer keeps track of how long it takes for NoSQLBench to create an instance
* of an executable operation, given the cycle. This is usually done by using an
* {@link OpSequence} in conjunction with
* an {@link OpDispenser}. This is named for "binding
* a cycle to an operation".
* @return a new or existing {@link Timer}
*/
Timer getOrCreateBindTimer();
/**
* The execute timer keeps track of how long it takes to submit an operation to be executed
* to an underlying native driver. For asynchronous APIs, such as those which return a
* {@link Future}, this is simply the amount of time it takes to acquire the future.
*
* When possible, APIs should be used via their async methods, even if you are implementing
* a {@link SyncAction}. This allows the execute timer to measure the hand-off to the underlying API,
* and the result timer to measure the blocking calls to aquire the result.
* @return a new or existing {@link Timer}
*/
Timer getOrCreateExecuteTimer();
/**
* The result timer keeps track of how long it takes a native driver to service a request once submitted.
* This timer, in contrast to the result-success timer ({@link #getOrCreateResultSuccessTimer()}),
* is used to track all operations. That is, no matter
* whether the operation succeeds or not, it should be tracked with this timer. The scope of this timer should
* cover each attempt at an operation through a native driver. Retries are not to be combined in this measurement.
* @return a new or existing {@link Timer}
*/
Timer getOrCreateResultTimer();
/**
* The result-success timer keeps track of operations which had no exception. The measurements for this timer should
* be exactly the same values as used for the result timer ({@link #getOrCreateResultTimer()}, except that
* attempts to complete an operation which yield an exception should be excluded from the results. These two metrics
* together provide a very high level sanity check against the error-specific metrics which can be reported by
* the error handler logic.
* @return a new or existing {@link Timer}
*/
Timer getOrCreateResultSuccessTimer();
/**
* The tries histogram tracks how many tries it takes to complete an operation successfully, or not. This histogram
* does not encode whether operations were successful or not. Ideally, if every attempt to complete an operation succeeds
* on its first try, the data in this histogram should all be 1. In practice, systems which are running near their
* capacity will see a few retried operations, and systems that are substantially over-driven will see many retried
* operations. As the retries value increases the further down the percentile scale you go, you can detect system loading
* patterns which are in excess of the real-time capability of the target system.
*
* This metric should be measured around every retry loop for a native operation.
* @return a new or existing {@link Histogram}
*/
Histogram getOrCreateTriesHistogram();
Timer getOrCreateVerifierTimer();

View File

@@ -19,11 +19,15 @@ package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.metrics.instruments.*;
public class ComponentActivityInstrumentation implements ActivityInstrumentation {
import java.util.concurrent.Future;
public class ComponentActivityInstrumentation {
private static final String WAIT_TIME = "_waittime";
private static final String SERVICE_TIME = "_servicetime";
@@ -36,7 +40,6 @@ public class ComponentActivityInstrumentation implements ActivityInstrumentation
private NBMetricTimer readInputTimer;
private NBMetricTimer stridesServiceTimer;
private NBMetricTimer stridesResponseTimer;
private NBMetricTimer cyclesServiceTimer;
private NBMetricTimer cyclesResponseTimer;
private NBMetricCounter pendingOpsCounter;
private NBMetricTimer bindTimer;
@@ -61,90 +64,8 @@ public class ComponentActivityInstrumentation implements ActivityInstrumentation
}
private void initMetrics() {
readInputTimer = activity.create().timer(
"read_input",
this.hdrdigits,
MetricCategory.Internals,
"measures overhead of acquiring a cycle range for an activity thread"
);
stridesServiceTimer = activity.create().timer(
"strides",
this.hdrdigits,
MetricCategory.Core,
"service timer for a stride, which is the same as the op sequence length by default"
);
if (null != activity.getStrideLimiter()) {
this.stridesResponseTimer = activity.create().timer(
"strides" + ComponentActivityInstrumentation.RESPONSE_TIME,
hdrdigits,
MetricCategory.Core,
"response timer for a stride, which is the same as the op sequence length by default;" +
" response timers include scheduling delays which occur when an activity falls behind its target rate"
);
}
this.cyclesServiceTimer = activity.create().timer(
"cycles" + ComponentActivityInstrumentation.SERVICE_TIME,
hdrdigits,
MetricCategory.Core,
"service timer for a cycle, including all of bind, execute, result and result_success;" +
" service timers measure the time between submitting a request and receiving the response"
);
if (null != activity.getCycleLimiter()) {
this.cyclesResponseTimer = activity.create().timer(
"cycles" + ComponentActivityInstrumentation.RESPONSE_TIME,
hdrdigits,
MetricCategory.Core,
"response timer for a cycle, including all of bind, execute, result and result_success;" +
" response timers include scheduling delays which occur when an activity falls behind its target rate"
);
}
this.pendingOpsCounter = activity.create().counter(
"pending_ops",
MetricCategory.Core,
"Indicate the number of operations which have been started, but which have not been completed." +
" This starts "
);
this.bindTimer = activity.create().timer(
"bind",
hdrdigits,
MetricCategory.Core,
"Time the step within a cycle which binds generated data to an op template to synthesize an executable operation."
);
this.executeTimer = activity.create().timer(
"execute",
hdrdigits,
MetricCategory.Core,
"Time how long it takes to submit a request and receive a result, including reading the result in the client."
);
this.resultTimer = activity.create().timer(
"result",
hdrdigits,
MetricCategory.Core,
"Time how long it takes to submit a request, receive a result, including binding, reading results, " +
"and optionally verifying them, including all operations whether successful or not, for each attempted request."
);
this.resultSuccessTimer = activity.create().timer(
"result_success",
hdrdigits,
MetricCategory.Core,
"The execution time of successful operations, which includes submitting the operation, waiting for a response, and reading the result"
);
this.triesHistogram = activity.create().histogram(
"tries",
hdrdigits,
MetricCategory.Core,
"A histogram of all tries for an activity. Perfect results mean all quantiles return 1." +
" Slight saturation is indicated by p99 or p95 returning higher values." +
" Lower quantiles returning more than 1, or higher values at high quantiles indicate incremental overload."
);
this.verifierTimer = activity.create().timer(
"verifier",
hdrdigits,
MetricCategory.Verification,
"Time the execution of verifier code, if any"
);
this.errorRate1m = activity.create().gauge("error_rate_1m",
() -> {
double result_1m_rate = this.resultTimer.getOneMinuteRate();
@@ -204,87 +125,16 @@ public class ComponentActivityInstrumentation implements ActivityInstrumentation
);
}
@Override
public NBMetricGauge getOrCreateErrorsTotal() {
return this.errorsTotal;
}
@Override
public NBMetricGauge getOrCreateErrorRate1m() {
return this.errorRate1m;
}
@Override
public NBMetricGauge getOrCreateErrorRate5m() {
return this.errorRate5m;
}
@Override
public NBMetricGauge getOrCreateErrorRate15m() {
return this.errorRate15m;
}
@Override
public NBMetricGauge getOrCreateErrorRateTotal() {
return this.errorRateTotal;
}
@Override
public Timer getOrCreateInputTimer() {
return readInputTimer;
}
@Override
public Timer getOrCreateStridesServiceTimer() {
return stridesServiceTimer;
}
@Override
public Timer getStridesResponseTimerOrNull() {
return stridesResponseTimer;
}
@Override
public Timer getOrCreateCyclesServiceTimer() {
return cyclesServiceTimer;
}
@Override
public Timer getCyclesResponseTimerOrNull() {
return cyclesResponseTimer;
}
@Override
/// The pending ops counter keeps track of how many ops are submitted or in-flight, but
/// which haven't been completed yet.
public Counter getOrCreatePendingOpCounter() {
return pendingOpsCounter;
}
@Override
public Timer getOrCreateBindTimer() {
return bindTimer;
}
@Override
public Timer getOrCreateExecuteTimer() {
return executeTimer;
}
@Override
public Timer getOrCreateResultTimer() {
return resultTimer;
}
@Override
public Timer getOrCreateResultSuccessTimer() {
return resultSuccessTimer;
}
@Override
public Histogram getOrCreateTriesHistogram() {
return triesHistogram;
}
@Override
public Timer getOrCreateVerifierTimer() {
return verifierTimer;
}
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import java.io.InputStream;
import java.io.PrintWriter;
/**
* Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful Activity is to extend {@link StandardActivity}.
*/
public interface IActivityWiring extends Comparable<IActivityWiring>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent {
ActivityDef getActivityDef();
MotorDispenser<?> getMotorDispenserDelegate();
void setMotorDispenserDelegate(MotorDispenser<?> motorDispenser);
InputDispenser getInputDispenserDelegate();
void setInputDispenserDelegate(InputDispenser inputDispenser);
ActionDispenser getActionDispenserDelegate();
void setActionDispenserDelegate(ActionDispenser actionDispenser);
IntPredicateDispenser getResultFilterDispenserDelegate();
void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser);
OutputDispenser getMarkerDispenserDelegate();
void setOutputDispenserDelegate(OutputDispenser outputDispenser);
PrintWriter getConsoleOut();
InputStream getConsoleIn();
void setConsoleOut(PrintWriter writer);
}

View File

@@ -34,14 +34,6 @@ public interface Motor<T> extends Runnable, Stoppable {
Input getInput();
/**
* Set the action on this motor. It will be applied to each input.
*
* @param action an instance of activityAction
* @return this ActivityMotor, for method chaining
*/
Motor<T> setAction(Action action);
Action getAction();
/**

View File

@@ -18,14 +18,6 @@ package io.nosqlbench.engine.api.activityapi.core;
public interface SyncAction extends Action {
/**
* <p>Apply a work function to an input value, producing an int status code.</p>
* The meaning of status codes is activity specific, however the values Integer.MIN_VALUE,
* and Integer.MAX_VALUE are reserved.
*
* @param cycle a long input
* @return an int status
*/
default int runCycle(long cycle) {
return (int) cycle % 100;
}

View File

@@ -17,44 +17,50 @@
package io.nosqlbench.engine.api.activityapi.core.ops.fluent;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.*;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;
/**
* This tracker keeps track of the state of operations associated with it.
*
* @param <D> The payload data type of the associated Op, based on OpImpl
*/
This tracker keeps track of the state of operations associated with it.
@param <D>
The payload data type of the associated Op, based on OpImpl */
public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
private final AtomicInteger pendingOps = new AtomicInteger(0);
private final String label;
private final long slot;
private final Counter pendingOpsCounter;
private final Timer cycleServiceTimer;
private final Timer cycleResponseTimer;
private final Counter pendingOpsCounter;
private int maxPendingOps =1;
private int maxPendingOps = 1;
private LongFunction<D> cycleOpFunction;
public OpTrackerImpl(Activity activity, long slot) {
public OpTrackerImpl(StandardActivity activity, long slot) {
this.slot = slot;
this.label = "tracker-" + slot + "_" + activity.getAlias();
this.pendingOpsCounter = activity.getInstrumentation().getOrCreatePendingOpCounter();
this.cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer();
this.cycleResponseTimer = activity.getInstrumentation().getCyclesResponseTimerOrNull();
this.pendingOpsCounter = activity.pendingOpsCounter;
this.cycleServiceTimer = activity.cycleServiceTimer;
this.cycleResponseTimer = activity.cycleResponseTimer;
}
// for testing
public OpTrackerImpl(String name, int slot, Timer cycleServiceTimer, Timer cycleResponseTimer, Counter pendingOpsCounter) {
public OpTrackerImpl(
String name, int slot, Timer cycleServiceTimer, Timer cycleResponseTimer,
Counter pendingOpsCounter
) {
this.label = name;
this.slot = slot;
this.cycleResponseTimer = cycleResponseTimer;
@@ -74,9 +80,11 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
int pending = this.pendingOps.decrementAndGet();
cycleServiceTimer.update(op.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
if (cycleResponseTimer !=null) { cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS); }
if (cycleResponseTimer != null) {
cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS);
}
if (pending< maxPendingOps) {
if (pending < maxPendingOps) {
synchronized (this) {
notify();
}
@@ -88,7 +96,7 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
pendingOpsCounter.dec();
int pending = this.pendingOps.decrementAndGet();
if (pending< maxPendingOps) {
if (pending < maxPendingOps) {
synchronized (this) {
notify();
}
@@ -97,16 +105,17 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
}
@Override
public void onOpFailure(FailedOp<D> op) {
pendingOpsCounter.dec();
int pending = this.pendingOps.decrementAndGet();
cycleServiceTimer.update(op.getServiceTimeNanos(), TimeUnit.NANOSECONDS);
if (cycleResponseTimer !=null) { cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS); }
if (cycleResponseTimer != null) {
cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS);
}
if (pending< maxPendingOps) {
if (pending < maxPendingOps) {
synchronized (this) {
notify();
}
@@ -115,7 +124,7 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
@Override
public void setMaxPendingOps(int maxPendingOps) {
this.maxPendingOps =maxPendingOps;
this.maxPendingOps = maxPendingOps;
synchronized (this) {
notifyAll();
}
@@ -123,7 +132,7 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
@Override
public boolean isFull() {
return this.pendingOps.intValue()>=maxPendingOps;
return this.pendingOps.intValue() >= maxPendingOps;
}
@Override
@@ -139,7 +148,7 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
@Override
public TrackedOp<D> newOp(long cycle, OpEvents<D> strideTracker) {
D opstate = cycleOpFunction.apply(cycle);
OpImpl<D> op = new EventedOpImpl<>(this,strideTracker);
OpImpl<D> op = new EventedOpImpl<>(this, strideTracker);
op.setCycle(cycle);
op.setData(opstate);
return op;
@@ -169,7 +178,7 @@ public class OpTrackerImpl<D> implements OpTracker<D>, ActivityDefObserver {
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
this.maxPendingOps=getMaxPendingOpsForThisThread(activityDef);
this.maxPendingOps = getMaxPendingOpsForThisThread(activityDef);
}
private int getMaxPendingOpsForThisThread(ActivityDef def) {

View File

@@ -16,7 +16,10 @@
package io.nosqlbench.engine.api.activityapi.core.progress;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.nb.api.engine.util.Unit;
import io.nosqlbench.engine.api.activityapi.core.Activity;
@@ -26,14 +29,14 @@ public class ActivityMetricProgressMeter implements ProgressMeterDisplay, Comple
private final Activity activity;
private final Instant startInstant;
private final Timer bindTimer;
private final Timer cyclesTimer;
private final NBMetricTimer bindTimer;
private final NBMetricTimer cyclesTimer;
public ActivityMetricProgressMeter(Activity activity) {
public ActivityMetricProgressMeter(StandardActivity activity) {
this.activity = activity;
this.startInstant = Instant.ofEpochMilli(activity.getStartedAtMillis());
this.bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
this.cyclesTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer();
this.bindTimer = activity.bindTimer;
this.cyclesTimer = activity.cycleServiceTimer;
}
@Override
@@ -60,7 +63,7 @@ public class ActivityMetricProgressMeter implements ProgressMeterDisplay, Comple
@Override
public double getCurrentValue() {
return activity.getInstrumentation().getOrCreateBindTimer().getCount();
return bindTimer.getCount();
}
@Override

View File

@@ -29,7 +29,7 @@ public interface ExperimentalResultFilterType {
new SimpleServiceLoader<>(ExperimentalResultFilterType.class, Maturity.Any);
default IntPredicateDispenser getFilterDispenser(Activity activity) {
SimpleConfig conf = new SimpleConfig(activity, "resultfilter");
SimpleConfig conf = new SimpleConfig(activity.getWiring(), "resultfilter");
return getFilterDispenser(conf);
}

View File

@@ -16,6 +16,11 @@
package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabeledElement;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment;
@@ -38,24 +43,28 @@ import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.function.Predicate;
public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResultsSegment>, CanFilterResultValue, NBLabeledElement {
public class CycleLogInput extends NBBaseComponent implements Input, AutoCloseable,
Iterable<CycleResultsSegment>
, CanFilterResultValue, NBLabeledElement {
private final static Logger logger = LogManager.getLogger(CycleLogInput.class);
private final Iterator<CycleResultsSegment> cycleResultSegmentIterator;
private final NBLabeledElement parent;
private RandomAccessFile raf;
private MappedByteBuffer mbb;
private Iterator<CycleResult> segmentIter;
private Predicate<ResultReadable> filter;
public CycleLogInput(Activity activity) {
SimpleConfig conf = new SimpleConfig(activity, "input");
mbb = initMappedBuffer(conf.getString("file").orElse(activity.getAlias()) + ".cyclelog");
public CycleLogInput(StandardActivity activity) {
super(activity, NBLabels.forKV("input","cyclelog"));
SimpleConfig conf = new SimpleConfig(activity.getActivityDef(), "input");
mbb =
initMappedBuffer(conf.getString("file").orElse(activity.getActivityDef().getAlias()) +
".cyclelog");
cycleResultSegmentIterator = iterator();
segmentIter = cycleResultSegmentIterator.next().iterator();
this.parent = activity;
}
public CycleLogInput(String filename) {
public CycleLogInput(NBComponent parent, String filename) {
super(parent, NBLabels.forKV("input","cyclelog"));
File cycleFile = null;
try {
cycleFile = new File(filename);
@@ -71,7 +80,6 @@ public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResult
mbb = initMappedBuffer(cycleFile.getPath());
cycleResultSegmentIterator = new CycleResultsRLEBufferReadable(mbb).iterator();
segmentIter = cycleResultSegmentIterator.next().iterator();
this.parent = NBLabeledElement.EMPTY;
}
@Override
@@ -140,9 +148,13 @@ public class CycleLogInput implements Input, AutoCloseable, Iterable<CycleResult
}
@Override
public void close() throws Exception {
public void teardown() {
if (raf != null) {
raf.close();
try {
raf.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
mbb = null;
}
}

View File

@@ -16,26 +16,26 @@
package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputType;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.annotations.Service;
@Service(value = InputType.class, selector = "cyclelog")
public class CycleLogInputType implements InputType {
@Override
public InputDispenser getInputDispenser(Activity activity) {
return new Dispenser(activity);
public InputDispenser getInputDispenser(StandardActivity activity) {
return new CycleLogInputDispenser(activity);
}
public static class Dispenser implements InputDispenser {
public static class CycleLogInputDispenser implements InputDispenser {
private final Activity activity;
private final StandardActivity activity;
private final Input input;
public Dispenser(Activity activity) {
public CycleLogInputDispenser(StandardActivity activity) {
this.activity = activity;
this.input = new CycleLogInput(activity);
}

View File

@@ -20,14 +20,20 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results_rle.CycleResultsRLEBufferTarget;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results_rle.CycleSpanResults;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResult;
import io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog.CanFilterResultValue;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.io.Closeable;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
@@ -47,7 +53,7 @@ import java.util.function.Predicate;
* <p>It <em>is</em> valid for RLE segments to be broken apart into contiguous
* ranges. Any implementation should treat this as normal.
*/
public class CycleLogOutput implements Output, CanFilterResultValue {
public class CycleLogOutput extends NBBaseComponent implements Output, CanFilterResultValue, Closeable {
// For use in allocating file data, etc
private final static Logger logger = LogManager.getLogger(CycleLogOutput.class);
@@ -59,11 +65,13 @@ public class CycleLogOutput implements Output, CanFilterResultValue {
private final File outputFile;
private Predicate<ResultReadable> filter;
public CycleLogOutput(Activity activity) {
public CycleLogOutput(NBComponent parent, NBLabels componentOnlyLabels, ActivityWiring wiring) {
super(parent, componentOnlyLabels);
SimpleConfig conf = new SimpleConfig(activity, "output");
SimpleConfig conf = new SimpleConfig(wiring, "output");
this.extentSizeInSpans = conf.getInteger("extentSize").orElse(1000);
this.outputFile = new File(conf.getString("file").orElse(activity.getAlias()) + ".cyclelog");
this.outputFile = new File(conf.getString("file").orElse(wiring.getActivityDef().getAlias()) +
".cyclelog");
targetBuffer = new CycleResultsRLEBufferTarget(extentSizeInSpans);
@@ -71,6 +79,8 @@ public class CycleLogOutput implements Output, CanFilterResultValue {
}
public CycleLogOutput(File outputFile, int extentSizeInSpans) {
super(new NBBaseComponent(null),NBLabels.forKV("running","standalone","type",
"cycle_log_output"));
this.extentSizeInSpans = extentSizeInSpans;
this.outputFile = outputFile;
targetBuffer = new CycleResultsRLEBufferTarget(extentSizeInSpans);
@@ -131,7 +141,7 @@ public class CycleLogOutput implements Output, CanFilterResultValue {
}
@Override
public synchronized void close() throws Exception {
protected void teardown() {
try {
flush();
if (file != null) {
@@ -141,9 +151,13 @@ public class CycleLogOutput implements Output, CanFilterResultValue {
}
} catch (Throwable t) {
logger.error("Error while closing CycleLogOutput: " + t, t);
throw t;
throw new RuntimeException(t);
}
}
@Override
public void beforeDetach() {
super.beforeDetach();
}
private synchronized void ensureCapacity(long newCapacity) {

View File

@@ -22,7 +22,10 @@ import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputType;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -30,20 +33,21 @@ import org.apache.logging.log4j.Logger;
public class CycleLogOutputType implements OutputType {
@Override
public OutputDispenser getOutputDispenser(Activity activity) {
return new Dispenser(activity);
public OutputDispenser getOutputDispenser(NBComponent parent, ActivityWiring wiring) {
return new Dispenser(parent, wiring);
}
public static class Dispenser implements OutputDispenser {
private final static Logger logger = LogManager.getLogger(OutputDispenser.class);
private final Output output;
private final Activity activity;
private final ActivityWiring activity;
public Dispenser(Activity activity) {
this.activity = activity;
Input input = activity.getInputDispenserDelegate().getInput(0);
CycleLogOutput rleFileWriter = new CycleLogOutput(activity);
public Dispenser(NBComponent parent, ActivityWiring wiring) {
this.activity = wiring;
Input input = wiring.getInputDispenserDelegate().getInput(0);
CycleLogOutput rleFileWriter = new CycleLogOutput(parent, NBLabels.forKV("type",
"output"), wiring);
// TODO: Rework this so that the contiguous marking chunker can onAfterOpStop filtering
// if (input.isContiguous()) {
@@ -58,7 +62,7 @@ public class CycleLogOutputType implements OutputType {
new ReorderingConcurrentResultBuffer(rleFileWriter);
this.output=prebuffer;
// }
activity.registerAutoCloseable(output);
// wiring.registerAutoCloseable(output);
}
@Override

View File

@@ -17,7 +17,10 @@
package io.nosqlbench.engine.api.activityapi.input;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.spi.SimpleServiceLoader;
public interface InputType {
@@ -25,5 +28,5 @@ public interface InputType {
SimpleServiceLoader<InputType> FINDER =
new SimpleServiceLoader<>(InputType.class, Maturity.Any);
InputDispenser getInputDispenser(Activity activity);
InputDispenser getInputDispenser(StandardActivity parent);
}

View File

@@ -17,7 +17,9 @@
package io.nosqlbench.engine.api.activityapi.output;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.nb.annotations.Maturity;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.spi.SimpleServiceLoader;
public interface OutputType {
@@ -25,6 +27,6 @@ public interface OutputType {
SimpleServiceLoader<OutputType> FINDER =
new SimpleServiceLoader<>(OutputType.class, Maturity.Any);
OutputDispenser getOutputDispenser(Activity activity);
OutputDispenser getOutputDispenser(NBComponent parent, ActivityWiring activity);
}

View File

@@ -17,6 +17,9 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.ResultFilterDispenser;
@@ -25,16 +28,21 @@ import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputType;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputType;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import java.util.Optional;
import java.util.function.Predicate;
public class CoreServices {
public static <A extends Activity> Optional<OutputDispenser> getOutputDispenser(A activity) {
private static StandardActivity parent;
public static <A extends Activity> Optional<OutputDispenser> getOutputDispenser(
NBComponent parent, ActivityWiring activity) {
OutputDispenser outputDispenser = new SimpleConfig(activity, "output").getString("type")
.flatMap(OutputType.FINDER::get)
.map(mt -> mt.getOutputDispenser(activity)).orElse(null);
.map(mt -> mt.getOutputDispenser(parent, activity)).orElse(null);
if (outputDispenser==null) {
return Optional.empty();
}
@@ -47,7 +55,7 @@ public class CoreServices {
return Optional.ofNullable(outputDispenser);
}
public static <A extends Activity> Optional<Predicate<ResultReadable>> getOutputFilter(A activity) {
public static <A extends Activity> Optional<Predicate<ResultReadable>> getOutputFilter(ActivityWiring activity) {
String paramdata= activity.getParams().getOptionalString("of")
.orElse(activity.getParams().getOptionalString("outputfilter").orElse(null));
if (paramdata==null) {
@@ -64,7 +72,7 @@ public class CoreServices {
// return intPredicateDispenser;
// }
//
public static <A extends Activity> InputDispenser getInputDispenser(A activity) {
public static <A extends Activity> InputDispenser getInputDispenser(StandardActivity activity) {
String inputTypeName = new SimpleConfig(activity, "input").getString("type").orElse("atomicseq");
InputType inputType = InputType.FINDER.getOrThrow(inputTypeName);
InputDispenser dispenser = inputType.getInputDispenser(activity);
@@ -75,7 +83,7 @@ public class CoreServices {
return dispenser;
}
public static <A extends Activity> Optional<Predicate<ResultReadable>> getInputFilter(A activity) {
public static <A extends Activity> Optional<Predicate<ResultReadable>> getInputFilter(Activity activity) {
String paramdata= activity.getParams().getOptionalString("if")
.orElse(activity.getParams().getOptionalString("inputfilter").orElse(null));
if (paramdata==null) {

View File

@@ -1,662 +0,0 @@
/*
* Copyright (c) 2022-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
import java.util.function.LongFunction;
/**
* A default implementation of an Activity, suitable for building upon.
*/
public class SimpleActivity extends NBStatusComponent implements Activity, InvokableResult {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
protected ActivityDef activityDef;
private final List<AutoCloseable> closeables = new ArrayList<>();
private MotorDispenser<?> motorDispenser;
private InputDispenser inputDispenser;
private ActionDispenser actionDispenser;
private OutputDispenser markerDispenser;
private IntPredicateDispenser resultFilterDispenser;
private RunState runState = RunState.Uninitialized;
private ThreadLocal<RateLimiter> strideLimiterSource;
private ThreadLocal<RateLimiter> cycleLimiterSource;
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
private int nameEnumerator;
private ErrorMetrics errorMetrics;
private NBErrorHandler errorHandler;
private ActivityMetricProgressMeter progressMeter;
private String workloadSource = "unspecified";
private final RunStateTally tally = new RunStateTally();
public SimpleActivity(NBComponent parent, ActivityDef activityDef) {
super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels()));
this.activityDef = activityDef;
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
"yaml"
);
if (workloadOpt.isPresent()) {
activityDef.getParams().set("alias", workloadOpt.get());
} else {
activityDef.getParams().set("alias",
activityDef.getActivityDriver().toUpperCase(Locale.ROOT)
+ nameEnumerator);
nameEnumerator++;
}
}
}
public SimpleActivity(NBComponent parent, String activityDefString) {
this(parent, ActivityDef.parseActivityDef(activityDefString));
}
@Override
public synchronized void initActivity() {
initOrUpdateRateLimiters(this.activityDef);
}
public synchronized NBErrorHandler getErrorHandler() {
if (null == this.errorHandler) {
errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics);
}
return errorHandler;
}
@Override
public synchronized RunState getRunState() {
return runState;
}
@Override
public synchronized void setRunState(RunState runState) {
this.runState = runState;
if (RunState.Running == runState) {
this.startedAtMillis = System.currentTimeMillis();
}
}
@Override
public long getStartedAtMillis() {
return startedAtMillis;
}
@Override
public final MotorDispenser<?> getMotorDispenserDelegate() {
return motorDispenser;
}
@Override
public final void setMotorDispenserDelegate(MotorDispenser<?> motorDispenser) {
this.motorDispenser = motorDispenser;
}
@Override
public final InputDispenser getInputDispenserDelegate() {
return inputDispenser;
}
@Override
public final void setInputDispenserDelegate(InputDispenser inputDispenser) {
this.inputDispenser = inputDispenser;
}
@Override
public final ActionDispenser getActionDispenserDelegate() {
return actionDispenser;
}
@Override
public final void setActionDispenserDelegate(ActionDispenser actionDispenser) {
this.actionDispenser = actionDispenser;
}
@Override
public IntPredicateDispenser getResultFilterDispenserDelegate() {
return resultFilterDispenser;
}
@Override
public void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser) {
this.resultFilterDispenser = resultFilterDispenser;
}
@Override
public OutputDispenser getMarkerDispenserDelegate() {
return this.markerDispenser;
}
@Override
public void setOutputDispenserDelegate(OutputDispenser outputDispenser) {
this.markerDispenser = outputDispenser;
}
@Override
public ActivityDef getActivityDef() {
return activityDef;
}
public String toString() {
return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally;
}
@Override
public int compareTo(Activity o) {
return getAlias().compareTo(o.getAlias());
}
@Override
public void registerAutoCloseable(AutoCloseable closeable) {
this.closeables.add(closeable);
}
@Override
public void closeAutoCloseables() {
for (AutoCloseable closeable : closeables) {
logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
try {
closeable.close();
} catch (Exception e) {
throw new RuntimeException("Error closing " + closeable + ": " + e, e);
}
}
closeables.clear();
}
@Override
public RateLimiter getCycleLimiter() {
if (cycleLimiterSource!=null) {
return cycleLimiterSource.get();
} else {
return null;
}
}
@Override
public synchronized RateLimiter getStrideLimiter() {
if (strideLimiterSource!=null) {
return strideLimiterSource.get();
} else {
return null;
}
}
@Override
public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) {
activityInstrumentation = new ComponentActivityInstrumentation(this);
// activityInstrumentation = new CoreActivityInstrumentation(this);
}
return activityInstrumentation;
}
@Override
public synchronized PrintWriter getConsoleOut() {
if (null == console) {
this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8);
}
return this.console;
}
@Override
public synchronized InputStream getConsoleIn() {
return System.in;
}
@Override
public void setConsoleOut(PrintWriter writer) {
this.console = writer;
}
@Override
public synchronized ErrorMetrics getExceptionMetrics() {
if (null == this.errorMetrics) {
errorMetrics = new ErrorMetrics(this);
}
return errorMetrics;
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
// initOrUpdateRateLimiters(activityDef);
}
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
}
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
}
/**
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
* defaults when requested.
*
* @param seq
* - The {@link OpSequence} to derive the defaults from
*/
public synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
Optional<String> strideOpt = getParams().getOptionalString("stride");
if (strideOpt.isEmpty()) {
String stride = String.valueOf(seq.getSequence().length);
logger.info(() -> "defaulting stride to " + stride + " (the sequence length)");
// getParams().set("stride", stride);
getParams().setSilently("stride", stride);
}
// CYCLES
Optional<String> cyclesOpt = getParams().getOptionalString("cycles");
if (cyclesOpt.isEmpty()) {
String cycles = getParams().getOptionalString("stride").orElseThrow();
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow());
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
);
}
long stride = getParams().getOptionalLong("stride").orElseThrow();
long cycles = this.activityDef.getCycleCount();
if (cycles < stride) {
throw new RuntimeException(
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
);
}
}
long cycleCount = this.activityDef.getCycleCount();
long stride = this.activityDef.getParams().getOptionalLong("stride").orElseThrow();
if (0 < stride && 0 != cycleCount % stride) {
logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," +
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
}
Optional<String> threadSpec = activityDef.getParams().getOptionalString("threads");
if (threadSpec.isPresent()) {
String spec = threadSpec.get();
int processors = Runtime.getRuntime().availableProcessors();
if ("auto".equalsIgnoreCase(spec)) {
int threads = processors * 10;
if (threads > activityDef.getCycleCount()) {
threads = (int) activityDef.getCycleCount();
logger.info("setting threads to {} (auto) [10xCORES, cycle count limited]", threads);
} else {
logger.info("setting threads to {} (auto) [10xCORES]", threads);
}
// activityDef.setThreads(threads);
activityDef.getParams().setSilently("threads", threads);
} else if (spec.toLowerCase().matches("\\d+x")) {
String multiplier = spec.substring(0, spec.length() - 1);
int threads = processors * Integer.parseInt(multiplier);
logger.info(() -> "setting threads to " + threads + " (" + multiplier + "x)");
// activityDef.setThreads(threads);
activityDef.getParams().setSilently("threads", threads);
} else if (spec.toLowerCase().matches("\\d+")) {
logger.info(() -> "setting threads to " + spec + " (direct)");
// activityDef.setThreads(Integer.parseInt(spec));
activityDef.getParams().setSilently("threads", Integer.parseInt(spec));
}
if (activityDef.getThreads() > activityDef.getCycleCount()) {
logger.warn(() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
}
} else if (1000 < cycleCount) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
}
if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) {
throw new BasicError("You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity.");
}
}
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
// Map<String, DriverAdapter<?,?>> adapterCache,
// Map<String, OpMapper<? extends Op>> mapperCache,
List<DriverAdapter<CycleOp<?>, Space>> adapters,
List<ParsedOp> pops,
OpLookup opLookup
) {
try {
List<Long> ratios = new ArrayList<>(pops.size());
for (ParsedOp pop : pops) {
long ratio = pop.takeStaticConfigOr("ratio", 1);
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(sequencerType);
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
ParsedOp pop = pops.get(i);
try {
if (0 == ratio) {
logger.info(() -> "skipped mapping op '" + pop.getName() + '\'');
continue;
}
DriverAdapter<CycleOp<?>, Space> adapter = adapters.get(i);
OpMapper<CycleOp<?>, Space> opMapper = adapter.getOpMapper();
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(this, pop, spaceFunc);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun");
dispenser = OpFunctionComposition.wrapOptionally(
adapter,
dispenser,
pop,
dryrun,
opLookup
);
// if (strict) {
// optemplate.assertConsumed();
// }
planner.addOp((OpDispenser<? extends CycleOp<?>>) dispenser, ratio);
} catch (Exception e) {
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e);
}
}
return planner.resolve();
} catch (Exception e) {
if (e instanceof OpConfigError oce) {
throw oce;
} else {
throw new OpConfigError(e.getMessage(), workloadSource, e);
}
}
}
protected List<OpTemplate> loadOpTemplates(
DriverAdapter<?, ?> defaultDriverAdapter,
boolean logged,
boolean filtered
) {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
OpsDocList opsDocList = loadStmtsDocList();
List<OpTemplate> filteredOps = opsDocList.getOps(filtered?tagfilter:"", logged);
if (filteredOps.isEmpty()) {
// There were no ops, and it *wasn't* because they were all filtered out.
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
// But if there were no ops, and there was no default driver provided, we can't continue
// There were no ops, and it was because they were all filtered out
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
if (!unfilteredOps.isEmpty()) {
String message = "There were no active op templates with tag filter '"+ tagfilter + "', since all " +
unfilteredOps.size() + " were filtered out. Examine the session log for details";
NBAdvisorOutput.test(message);
//throw new BasicError(message);
}
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams());
Objects.requireNonNull(filteredOps);
if (filteredOps.isEmpty()) {
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
}
} else {
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
}
return filteredOps;
}
/**
* Given a function that can create an op of type <O> from an OpTemplate, generate
* an indexed sequence of ready to call operations.
* <p>
* This method uses the following conventions to derive the sequence:
*
* <OL>
* <LI>If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is
* taken as the only provided statement.</LI>
* <LI>If a 'yaml, or 'workload' parameter is provided, then the statements in that file
* are taken with their ratios </LI>
* <LI>Any provided tags filter is used to select only the op templates which have matching
* tags. If no tags are provided, then all the found op templates are included.</LI>
* <LI>The ratios and the 'seq' parameter are used to build a sequence of the ready operations,
* where the sequence length is the sum of the ratios.</LI>
* </OL>
*
* @param <O>
* A holder for an executable operation for the native driver used by this activity.
* @param opinit
* A function to map an OpTemplate to the executable operation form required by
* the native driver for this activity.
* @param defaultAdapter
* The adapter which will be used for any op templates with no explicit adapter
* @return The sequence of operations as determined by filtering and ratios
*/
@Deprecated(forRemoval = true)
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(Function<OpTemplate,
OpDispenser<? extends O>> opinit, boolean strict, DriverAdapter<?, ?> defaultAdapter) {
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter,true,false);
List<Long> ratios = new ArrayList<>(stmts.size());
for (OpTemplate opTemplate : stmts) {
long ratio = opTemplate.removeParamOrDefault("ratio", 1);
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
try {
for (int i = 0; i < stmts.size(); i++) {
long ratio = ratios.get(i);
OpTemplate optemplate = stmts.get(i);
OpDispenser<? extends O> driverSpecificReadyOp = opinit.apply(optemplate);
if (strict) {
optemplate.assertConsumed();
}
planner.addOp(driverSpecificReadyOp, ratio);
}
} catch (Exception e) {
throw new OpConfigError(e.getMessage(), workloadSource, e);
}
return planner.resolve();
}
protected OpsDocList loadStmtsDocList() {
try {
String op = activityDef.getParams().getOptionalString("op").orElse(null);
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(null);
String workload = activityDef.getParams().getOptionalString("workload").orElse(null);
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
throw new OpConfigError("Only op, statement, or workload may be provided, not more than one.");
}
logger.debug("loadStmtsDocList #1");
if (workload != null && OpsLoader.isJson(workload)) {
workloadSource = "commandline: (workload/json):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (workload != null && OpsLoader.isYaml(workload)) {
workloadSource = "commandline: (workload/yaml):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
} else if (workload != null) {
return OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
}
logger.debug("loadStmtsDocList #2");
if (stmt != null) {
workloadSource = "commandline: (stmt/inline): '" + stmt + "'";
return OpsLoader.loadString(stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
}
logger.debug("loadStmtsDocList #3");
if (op != null && OpsLoader.isJson(op)) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
}
else if (op != null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null);
}
return OpsDocList.none();
} catch (Exception e) {
throw new OpConfigError("Error loading op templates: " + e, workloadSource, e);
}
}
@Override
public synchronized ProgressMeterDisplay getProgressMeter() {
if (null == this.progressMeter) {
this.progressMeter = new ActivityMetricProgressMeter(this);
}
return this.progressMeter;
}
/**
* Activities with retryable operations (when specified with the retry error handler for some
* types of error), should allow the user to specify how many retries are allowed before
* giving up on the operation.
*
* @return The number of allowable retries
*/
@Override
public int getMaxTries() {
return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
}
@Override
public RunStateTally getRunStateTally() {
return tally;
}
@Override
public Map<String, String> asResult() {
return Map.of("activity",this.getAlias());
}
// private final ThreadLocal<RateLimiter> cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
// RateLimiters.createOrUpdate(this,null,new SimRateSpec()
// if (cycleratePerThread) {
// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,)
// } else {
// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef)
// }
// if (getCycleLimiter() != null) {
// return RateLimiters.createOrUpdate(
// new NBThreadComponent(this),
// getCycleLimiter(),
// getCycleLimiter().getSpec());
// } else {
// return null;
// }
// });
}

View File

@@ -18,6 +18,8 @@ package io.nosqlbench.engine.api.activityimpl.action;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@@ -28,14 +30,14 @@ public class CoreActionDispenser implements ActionDispenser {
private final static Logger logger = LogManager.getLogger(CoreActionDispenser.class);
private final Activity activity;
private final ActivityWiring activity;
public CoreActionDispenser(Activity activity) {
public CoreActionDispenser(ActivityWiring activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
public SyncAction getAction(int slot) {
return new CoreAction(activity.getActivityDef(), slot);
}
}

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.api.activityimpl.input;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.util.SimpleConfig;
import io.nosqlbench.engine.api.activityapi.core.ActivitiesAware;
import io.nosqlbench.engine.api.activityapi.core.Activity;
@@ -27,11 +28,11 @@ import java.util.Map;
public class CoreInputDispenser implements InputDispenser, ActivitiesAware {
private final Activity activity;
private final StandardActivity activity;
private Map<String, Activity> activities;
private Input input;
public CoreInputDispenser(Activity activity) {
public CoreInputDispenser(StandardActivity activity) {
this.activity = activity;
}
@@ -44,7 +45,7 @@ public class CoreInputDispenser implements InputDispenser, ActivitiesAware {
}
private synchronized Input createInput(long slot) {
SimpleConfig conf = new SimpleConfig(activity, "input");
SimpleConfig conf = new SimpleConfig(activity.getActivityDef(), "input");
String inputType = conf.getString("type").orElse("atomicseq");
InputType inputTypeImpl = InputType.FINDER.getOrThrow(inputType);
InputDispenser inputDispenser = inputTypeImpl.getInputDispenser(activity);

View File

@@ -20,13 +20,14 @@ import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputType;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.annotations.Service;
@Service(value= InputType.class, selector="atomicseq")
public class TargetRateInputType implements InputType {
@Override
public InputDispenser getInputDispenser(Activity activity) {
public InputDispenser getInputDispenser(StandardActivity activity) {
return new Dispenser(activity);
}

View File

@@ -73,7 +73,7 @@ public class ContiguousOutputChunker implements Output {
public ContiguousOutputChunker(Activity activity) {
if (!(activity.getInputDispenserDelegate().getInput(0).isContiguous())) {
if (!(activity.getWiring().getInputDispenserDelegate().getInput(0).isContiguous())) {
throw new RuntimeException("This type of output may not be used with non-contiguous inputs yet.");
// If you are looking at this code, it's because we count updates to extents to provide
// efficient marker extent handling. The ability to use segmented inputs with markers will

View File

@@ -25,7 +25,13 @@ import io.nosqlbench.engine.api.activityapi.core.ops.fluent.OpTracker;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
@@ -35,26 +41,26 @@ import java.util.concurrent.TimeUnit;
import static io.nosqlbench.engine.api.activityapi.core.RunState.*;
/**
* ActivityMotor is a Runnable which runs in one of an activity's many threads.
* It is the iteration harness for individual cycles of an activity. Each ActivityMotor
* instance is responsible for taking input from a LongSupplier and applying
* the provided LongConsumer to it on each cycle. These two parameters are called
* input and action, respectively.
*
* This motor implementation splits the handling of sync and async actions with a hard
* fork in the middle to limit potential breakage of the prior sync implementation
* with new async logic.
*/
public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
ActivityMotor is a Runnable which runs in one of an activity's many threads.
It is the iteration harness for individual cycles of an activity. Each ActivityMotor
instance is responsible for taking input from a LongSupplier and applying
the provided LongConsumer to it on each cycle. These two parameters are called
input and action, respectively.
This motor implementation splits the handling of sync and async actions with a hard
fork in the middle to limit potential breakage of the prior sync implementation
with new async logic. */
public class CoreMotor<D> extends NBBaseComponent implements ActivityDefObserver, Motor<D>, Stoppable {
private static final Logger logger = LogManager.getLogger(CoreMotor.class);
private final long slotId;
private final Activity activity;
private Timer inputTimer;
private RateLimiter strideRateLimiter;
private Timer strideServiceTimer;
private Timer stridesServiceTimer;
private Timer stridesResponseTimer;
private RateLimiter cycleRateLimiter;
@@ -62,8 +68,8 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
private Timer cycleResponseTimer;
private Input input;
private Action action;
private final Activity activity;
private SyncAction action;
// private final Activity activity;
private Output output;
private final MotorState motorState;
@@ -72,69 +78,38 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
private OpTracker<D> opTracker;
/**
* Create an ActivityMotor.
*
* @param activity The activity that this motor will be associated with.
* @param slotId The enumeration of the motor, as assigned by its executor.
* @param input A LongSupplier which provides the cycle number inputs.
Create an ActivityMotor.
// * @param activity The activity that this motor will be associated with.
@param slotId
The enumeration of the motor, as assigned by its executor.
@param input
A LongSupplier which provides the cycle number inputs.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input) {
public CoreMotor(StandardActivity activity, long slotId, Input input, SyncAction action,
Output output) {
super(activity, NBLabels.forKV("motor", "coremotor"));
this.activity = activity;
this.slotId = slotId;
setInput(input);
setResultOutput(output);
motorState = new MotorState(slotId, activity.getRunStateTally());
onActivityDefUpdate(activity.getActivityDef());
}
this.action = action;
int hdrdigits = activity.getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3);
/**
* Create an ActivityMotor.
*
* @param activity The activity that this motor is based on.
* @param slotId The enumeration of the motor, as assigned by its executor.
* @param input A LongSupplier which provides the cycle number inputs.
* @param action An LongConsumer which is applied to the input for each cycle.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input,
Action action
) {
this(activity, slotId, input);
setAction(action);
}
/**
* Create an ActivityMotor.
*
* @param activity The activity that this motor is based on.
* @param slotId The enumeration of the motor, as assigned by its executor.
* @param input A LongSupplier which provides the cycle number inputs.
* @param action An LongConsumer which is applied to the input for each cycle.
* @param output An optional opTracker.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input,
Action action,
Output output
) {
this(activity, slotId, input);
setAction(action);
setResultOutput(output);
}
/**
* Set the input for this ActivityMotor.
*
* @param input The LongSupplier that provides the cycle number.
* @return this ActivityMotor, for chaining
Set the input for this ActivityMotor.
@param input
The LongSupplier that provides the cycle number.
@return this ActivityMotor, for chaining
*/
@Override
public Motor<D> setInput(Input input) {
@@ -147,19 +122,6 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
return input;
}
/**
* Set the action for this ActivityMotor.
*
* @param action The LongConsumer that will be applied to the next cycle number.
* @return this ActivityMotor, for chaining
*/
@Override
public Motor<D> setAction(Action action) {
this.action = action;
return this;
}
@Override
public Action getAction() {
return action;
@@ -185,16 +147,13 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
motorState.enterState(Starting);
try {
inputTimer = activity.getInstrumentation().getOrCreateInputTimer();
strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer();
stridesResponseTimer = activity.getInstrumentation().getStridesResponseTimerOrNull();
strideRateLimiter = activity.getStrideLimiter();
cycleRateLimiter = activity.getCycleLimiter();
if (motorState.get() == Finished) {
logger.warn(() -> "Input was already exhausted for slot " + slotId + ", remaining in finished state.");
logger.warn(
() -> "Input was already exhausted for slot " + slotId + ", remaining in finished state.");
}
action.init();
@@ -211,105 +170,104 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
long strideDelay = 0L;
long cycleDelay = 0L;
if (action instanceof SyncAction sync) {
cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer();
strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer();
motorState.enterState(Running);
while (motorState.get() == Running) {
if (activity.getActivityDef().getParams().containsKey("async")) {
throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async.");
CycleSegment cycleSegment = null;
CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride);
try (Timer.Context inputTime = inputTimer.time()) {
cycleSegment = input.getInputSegment(stride);
}
motorState.enterState(Running);
while (motorState.get() == Running) {
CycleSegment cycleSegment = null;
CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride);
try (Timer.Context inputTime = inputTimer.time()) {
cycleSegment = input.getInputSegment(stride);
}
if (cycleSegment == null) {
logger.trace(() -> "input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId);
motorState.enterState(Finished);
continue;
}
if (cycleSegment == null) {
logger.trace(
() -> "input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId);
motorState.enterState(Finished);
continue;
}
if (strideRateLimiter != null) {
// block for strides rate limiter
strideDelay = strideRateLimiter.block();
}
if (strideRateLimiter != null) {
// block for strides rate limiter
strideDelay = strideRateLimiter.block();
}
long strideStart = System.nanoTime();
try {
long strideStart = System.nanoTime();
try {
while (!cycleSegment.isExhausted()) {
long cyclenum = cycleSegment.nextCycle();
if (cyclenum < 0) {
if (cycleSegment.isExhausted()) {
logger.trace(() -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId);
motorState.enterState(Finished);
continue;
}
}
if (motorState.get() != Running) {
logger.trace(() -> "motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId);
while (!cycleSegment.isExhausted()) {
long cyclenum = cycleSegment.nextCycle();
if (cyclenum < 0) {
if (cycleSegment.isExhausted()) {
logger.trace(
() -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId);
motorState.enterState(Finished);
continue;
}
int result = -1;
if (cycleRateLimiter != null) {
// Block for cycle rate limiter
cycleDelay = cycleRateLimiter.block();
}
long cycleStart = System.nanoTime();
try {
logger.trace(()->"cycle " + cyclenum);
result = sync.runCycle(cyclenum);
} catch (Exception e) {
motorState.enterState(Errored);
throw e;
} finally {
long cycleEnd = System.nanoTime();
cycleServiceTimer.update((cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS);
}
segBuffer.append(cyclenum, result);
}
} finally {
long strideEnd = System.nanoTime();
strideServiceTimer.update((strideEnd - strideStart) + strideDelay, TimeUnit.NANOSECONDS);
}
if (motorState.get() != Running) {
logger.trace(
() -> "motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId);
continue;
}
int result = -1;
if (output != null) {
CycleResultsSegment outputBuffer = segBuffer.toReader();
if (cycleRateLimiter != null) {
// Block for cycle rate limiter
cycleDelay = cycleRateLimiter.block();
}
long cycleStart = System.nanoTime();
try {
output.onCycleResultSegment(outputBuffer);
} catch (Exception t) {
logger.error(()->"Error while feeding result segment " + outputBuffer + " to output '" + output + "', error:" + t);
throw t;
logger.trace(() -> "cycle " + cyclenum);
result = action.runCycle(cyclenum);
} catch (Exception e) {
motorState.enterState(Errored);
throw e;
} finally {
long cycleEnd = System.nanoTime();
cycleServiceTimer.update(
(cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS);
}
segBuffer.append(cyclenum, result);
}
} finally {
long strideEnd = System.nanoTime();
stridesServiceTimer.update(
(strideEnd - strideStart) + strideDelay,
TimeUnit.NANOSECONDS
);
}
} else {
throw new RuntimeException("Valid Action implementations must implement SyncAction");
if (output != null) {
CycleResultsSegment outputBuffer = segBuffer.toReader();
try {
output.onCycleResultSegment(outputBuffer);
} catch (Exception t) {
logger.error(
() -> "Error while feeding result segment " + outputBuffer + " to output '" + output + "', error:" + t);
throw t;
}
}
}
if (motorState.get() == Stopping) {
motorState.enterState(Stopped);
logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
logger.trace(
() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
} else if (motorState.get() == Finished) {
logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
logger.trace(
() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
} else {
logger.warn(()->"Unexpected motor state for CoreMotor shutdown: " + motorState.get());
logger.warn(
() -> "Unexpected motor state for CoreMotor shutdown: " + motorState.get());
}
} catch (Throwable t) {
logger.error(()->"Error in core motor loop:" + t, t);
logger.error(() -> "Error in core motor loop:" + t, t);
motorState.enterState(Errored);
throw t;
}
@@ -342,7 +300,8 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
Stoppable.stop(input, action);
motorState.enterState(Stopping);
} else {
logger.warn(() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState);
logger.warn(
() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState);
}
}

View File

@@ -15,6 +15,8 @@
*/
package io.nosqlbench.engine.api.activityimpl.motor;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.input.Input;
@@ -30,12 +32,12 @@ import java.util.function.IntPredicate;
*/
public class CoreMotorDispenser<D> implements MotorDispenser<D> {
private final Activity activity;
private final StandardActivity activity;
private final InputDispenser inputDispenser;
private final ActionDispenser actionDispenser;
private final OutputDispenser outputDispenser;
public CoreMotorDispenser(Activity activity,
public CoreMotorDispenser(StandardActivity activity,
InputDispenser inputDispenser,
ActionDispenser actionDispenser,
OutputDispenser outputDispenser
@@ -48,7 +50,7 @@ public class CoreMotorDispenser<D> implements MotorDispenser<D> {
@Override
public Motor<D> getMotor(ActivityDef activityDef, int slotId) {
Action action = actionDispenser.getAction(slotId);
SyncAction action = actionDispenser.getAction(slotId);
Input input = inputDispenser.getInput(slotId);
Output output = null;
if (outputDispenser !=null) {

View File

@@ -0,0 +1,78 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.labels.NBLabels;
public class ActivityWiring {
private final ActivityDef activityDef;
private MotorDispenser<?> motorDispenser;
private InputDispenser inputDispenser;
private ActionDispenser actionDispenser;
private OutputDispenser markerDispenser;
private IntPredicateDispenser resultFilterDispenser;
public ActivityWiring(ActivityDef activityDef) {
this.activityDef = activityDef;
}
public static ActivityWiring of(ActivityDef activityDef) {
return new ActivityWiring(activityDef);
}
public ActivityDef getActivityDef() {
return activityDef;
}
public final MotorDispenser<?> getMotorDispenserDelegate() {
return motorDispenser;
}
public final void setMotorDispenserDelegate(MotorDispenser<?> motorDispenser) {
this.motorDispenser = motorDispenser;
}
public final InputDispenser getInputDispenserDelegate() {
return inputDispenser;
}
public final void setInputDispenserDelegate(InputDispenser inputDispenser) {
this.inputDispenser = inputDispenser;
}
public final ActionDispenser getActionDispenserDelegate() {
return actionDispenser;
}
public final void setActionDispenserDelegate(ActionDispenser actionDispenser) {
this.actionDispenser = actionDispenser;
}
public IntPredicateDispenser getResultFilterDispenserDelegate() {
return resultFilterDispenser;
}
public void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser) {
this.resultFilterDispenser = resultFilterDispenser;
}
public OutputDispenser getMarkerDispenserDelegate() {
return this.markerDispenser;
}
public void setOutputDispenserDelegate(OutputDispenser outputDispenser) {
this.markerDispenser = outputDispenser;
}
public ParameterMap getParams() {
return activityDef.getParams();
}
}

View File

@@ -21,14 +21,14 @@ import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
public class StandardActionDispenser implements ActionDispenser {
private final StandardActivity<?,?> activity;
private final StandardActivity activity;
public <A extends Activity> StandardActionDispenser(StandardActivity<?,?> activity) {
public <A extends Activity> StandardActionDispenser(StandardActivity activity) {
this.activity = activity;
}
@Override
public StandardAction<?,?> getAction(int slot) {
return new StandardAction<>(activity,slot);
return new StandardAction<>(activity, slot);
}
}

View File

@@ -16,9 +16,11 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
@@ -28,8 +30,26 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.api.activityimpl.Dryrun;
import io.nosqlbench.engine.api.activityimpl.OpFunctionComposition;
import io.nosqlbench.engine.api.activityimpl.OpLookupService;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.*;
@@ -40,20 +60,20 @@ import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.components.events.NBEvent;
import io.nosqlbench.nb.api.components.events.ParamChange;
import io.nosqlbench.nb.api.components.events.SetThreads;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.nb.api.tagging.TagFilter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;
/**
This is a typed activity which is expected to become the standard
@@ -64,19 +84,177 @@ import java.util.function.Supplier;
@param <S>
The context type for the activity, AKA the 'space' for a named driver instance and its
associated object graph */
public class StandardActivity<R extends java.util.function.LongFunction, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
public class StandardActivity<R extends java.util.function.LongFunction, S> extends NBStatusComponent implements Activity, InvokableResult, SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends CycleOp<?>>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>, Space>> adapters = new ConcurrentHashMap<>();
protected final ActivityDef activityDef;
@Override
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
return super.createOpSourceFromParsedOps(adapters, pops, opLookup);
}
public final NBMetricCounter pendingOpsCounter;
public final NBMetricHistogram triesHistogram;
public final NBMetricTimer bindTimer;
public final NBMetricTimer executeTimer;
public final NBMetricTimer resultTimer;
public final NBMetricTimer resultSuccessTimer;
public final NBMetricTimer cycleServiceTimer;
public final NBMetricTimer inputTimer;
public final NBMetricTimer stridesServiceTimer;
public final NBMetricTimer stridesResponseTimer;
public final NBMetricTimer cycleResponseTimer;
private ActivityMetricProgressMeter progressMeter;
private String workloadSource = "unspecified";
private RunState runState = RunState.Uninitialized;
private long startedAtMillis;
private final RunStateTally tally = new RunStateTally();
private ThreadLocal<RateLimiter> strideLimiterSource;
private ThreadLocal<RateLimiter> cycleLimiterSource;
private int nameEnumerator;
private NBErrorHandler errorHandler;
private final List<AutoCloseable> closeables = new ArrayList<>();
private PrintWriter console;
private ErrorMetrics errorMetrics;
private ActivityWiring wiring;
private static final String WAIT_TIME = "_waittime";
private static final String RESPONSE_TIME = "_responsetime";
private static final String SERVICE_TIME = "_servicetime";
public StandardActivity(NBComponent parent, ActivityDef activityDef, ActivityWiring wiring) {
super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels()));
this.activityDef = activityDef;
this.wiring = wiring;
int hdrdigits = getComponentProp("hdr_digits")
.map(Integer::parseInt).orElse(3);
this.pendingOpsCounter = create().counter(
"pending_ops",
MetricCategory.Core,
"Indicate the number of operations which have been started, but which have not been completed." +
" This starts "
);
/// The bind timer keeps track of how long it takes for NoSQLBench to create an instance
/// of an executable operation, given the cycle. This is usually done by using an
/// {@link OpSequence} in conjunction with
/// an {@link OpDispenser}. This is named for "binding
/// a cycle to an operation".
this.bindTimer = create().timer(
"bind", hdrdigits, MetricCategory.Core,
"Time the step within a cycle which binds generated data to an op template to synthesize an executable operation."
);
/// The execute timer keeps track of how long it takes to submit an operation to be executed
/// to an underlying native driver. For asynchronous APIs, such as those which return a
/// {@link Future}, this is simply the amount of time it takes to acquire the future.
/// /// When possible, APIs should be used via their async methods, even if you are implementing
/// a {@link SyncAction}. This allows the execute timer to measure the hand-off to the underlying API,
/// and the result timer to measure the blocking calls to aquire the result.
this.executeTimer = create().timer(
"execute",
hdrdigits,
MetricCategory.Core,
"Time how long it takes to submit a request and receive a result, including reading the result in the client."
);
/// The cycles service timer measures how long it takes to complete a cycle of work.
this.cycleServiceTimer = create().timer(
"cycles" + SERVICE_TIME, hdrdigits, MetricCategory.Core,
"service timer for a cycle, including all of bind, execute, result and result_success;" + " service timers measure the time between submitting a request and receiving the response"
);
/// The result timer keeps track of how long it takes a native driver to service a request once submitted.
/// This timer, in contrast to the result-success timer ({@link #getOrCreateResultSuccessTimer()}),
/// is used to track all operations. That is, no matter
/// whether the operation succeeds or not, it should be tracked with this timer. The scope of this timer should
/// cover each attempt at an operation through a native driver. Retries are not to be combined in this measurement.
this.resultTimer = create().timer(
"result",
hdrdigits,
MetricCategory.Core,
"Time how long it takes to submit a request, receive a result, including binding, reading results, " +
"and optionally verifying them, including all operations whether successful or not, for each attempted request."
);
/// The result-success timer keeps track of operations which had no exception. The measurements for this timer should
/// be exactly the same values as used for the result timer ({@link #getOrCreateResultTimer()}, except that
/// attempts to complete an operation which yield an exception should be excluded from the results. These two metrics
/// together provide a very high level sanity check against the error-specific metrics which can be reported by
/// the error handler logic.
this.resultSuccessTimer = create().timer(
"result_success",
hdrdigits,
MetricCategory.Core,
"The execution time of successful operations, which includes submitting the operation, waiting for a response, and reading the result"
);
/// The input timer measures how long it takes to get the cycle value to be used for
/// an operation.
this.inputTimer = create().timer(
"read_input", getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3),
MetricCategory.Internals,
"measures overhead of acquiring a cycle range for an activity thread"
);
/// The strides service timer measures how long it takes to complete a stride of work.
this.stridesServiceTimer = create().timer(
"strides", getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3),
MetricCategory.Core,
"service timer for a stride, which is the same as the op sequence length by default"
);
if (null != getStrideLimiter()) {
/// The strides response timer measures the total response time from the scheduled
/// time a stride should start to when it completed. Stride scheduling is only defined
/// when it is implied by a stride rate limiter, so this method should return null if
/// there is no strides rate limiter.
this.stridesResponseTimer = create().timer(
"strides" + RESPONSE_TIME, hdrdigits, MetricCategory.Core,
"response timer for a stride, which is the same as the op sequence length by default;" + " response timers include scheduling delays which occur when an activity falls behind its target rate"
);
} else {
stridesResponseTimer=null;
}
/**
* The cycles response timer measures the total response time from the scheduled
* time an operation should start to when it is completed. Cycle scheduling is only defined
* when it is implied by a cycle rate limiter, so this method should return null if
* there is no cycles rate limiter.
* @return a new or existing {@link Timer} if appropriate, else null
*/
if (null != getCycleLimiter()) {
this.cycleResponseTimer = create().timer(
"cycles" + RESPONSE_TIME, hdrdigits, MetricCategory.Core,
"response timer for a cycle, including all of bind, execute, result and result_success;" + " response timers include scheduling delays which occur when an activity falls behind its target rate"
);
} else {
cycleResponseTimer=null;
}
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
"yaml"
);
if (workloadOpt.isPresent()) {
activityDef.getParams().set("alias", workloadOpt.get());
} else {
activityDef.getParams().set("alias",
activityDef.getActivityDriver().toUpperCase(Locale.ROOT)
+ nameEnumerator);
nameEnumerator++;
}
}
public StandardActivity(NBComponent parent, ActivityDef activityDef) {
super(parent, activityDef);
OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
@@ -151,8 +329,102 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core, "The current number of operations which have been completed"
);
/// The tries histogram tracks how many tries it takes to complete an operation successfully, or not. This histogram
/// does not encode whether operations were successful or not. Ideally, if every attempt to complete an operation succeeds
/// on its first try, the data in this histogram should all be 1. In practice, systems which are running near their
/// capacity will see a few retried operations, and systems that are substantially over-driven will see many retried
/// operations. As the retries value increases the further down the percentile scale you go, you can detect system loading
/// patterns which are in excess of the real-time capability of the target system.
/// This metric should be measured around every retry loop for a native operation.
this.triesHistogram = create().histogram(
"tries",
hdrdigits,
MetricCategory.Core,
"A histogram of all tries for an activity. Perfect results mean all quantiles return 1." +
" Slight saturation is indicated by p99 or p95 returning higher values." +
" Lower quantiles returning more than 1, or higher values at high quantiles indicate incremental overload."
);
}
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
return createOpSourceFromParsedOps2(adapters, pops, opLookup);
}
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps2(
// Map<String, DriverAdapter<?,?>> adapterCache,
// Map<String, OpMapper<? extends Op>> mapperCache,
List<DriverAdapter<CycleOp<?>, Space>> adapters,
List<ParsedOp> pops,
OpLookup opLookup
) {
try {
List<Long> ratios = new ArrayList<>(pops.size());
for (ParsedOp pop : pops) {
long ratio = pop.takeStaticConfigOr("ratio", 1);
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(sequencerType);
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
ParsedOp pop = pops.get(i);
try {
if (0 == ratio) {
logger.info(() -> "skipped mapping op '" + pop.getName() + '\'');
continue;
}
DriverAdapter<CycleOp<?>, Space> adapter = adapters.get(i);
OpMapper<CycleOp<?>, Space> opMapper = adapter.getOpMapper();
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(this, pop, spaceFunc);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun");
dispenser = OpFunctionComposition.wrapOptionally(
adapter,
dispenser,
pop,
dryrun,
opLookup
);
// if (strict) {
// optemplate.assertConsumed();
// }
planner.addOp((OpDispenser<? extends CycleOp<?>>) dispenser, ratio);
} catch (Exception e) {
throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e);
}
}
return planner.resolve();
} catch (Exception e) {
if (e instanceof OpConfigError oce) {
throw oce;
} else {
throw new OpConfigError(e.getMessage(), workloadSource, e);
}
}
}
private ParsedOp upconvert(
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
NBConfigModel supersetConfig,
@@ -206,7 +478,7 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
@Override
public void initActivity() {
super.initActivity();
initOrUpdateRateLimiters(this.activityDef);
setDefaultsFromOpSequence(sequence);
}
@@ -229,7 +501,6 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
@@ -309,5 +580,403 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
}
}
protected List<OpTemplate> loadOpTemplates(
DriverAdapter<?, ?> defaultDriverAdapter,
boolean logged,
boolean filtered
) {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
OpsDocList opsDocList = loadStmtsDocList();
List<OpTemplate> filteredOps = opsDocList.getOps(filtered?tagfilter:"", logged);
if (filteredOps.isEmpty()) {
// There were no ops, and it *wasn't* because they were all filtered out.
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
// But if there were no ops, and there was no default driver provided, we can't continue
// There were no ops, and it was because they were all filtered out
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
if (!unfilteredOps.isEmpty()) {
String message = "There were no active op templates with tag filter '"+ tagfilter + "', since all " +
unfilteredOps.size() + " were filtered out. Examine the session log for details";
NBAdvisorOutput.test(message);
//throw new BasicError(message);
}
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams());
Objects.requireNonNull(filteredOps);
if (filteredOps.isEmpty()) {
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
}
} else {
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
}
return filteredOps;
}
/**
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
* defaults when requested.
*
* @param seq
* - The {@link OpSequence} to derive the defaults from
*/
public synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
Optional<String> strideOpt = getParams().getOptionalString("stride");
if (strideOpt.isEmpty()) {
String stride = String.valueOf(seq.getSequence().length);
logger.info(() -> "defaulting stride to " + stride + " (the sequence length)");
// getParams().set("stride", stride);
getParams().setSilently("stride", stride);
}
// CYCLES
Optional<String> cyclesOpt = getParams().getOptionalString("cycles");
if (cyclesOpt.isEmpty()) {
String cycles = getParams().getOptionalString("stride").orElseThrow();
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow());
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
);
}
long stride = getParams().getOptionalLong("stride").orElseThrow();
long cycles = this.activityDef.getCycleCount();
if (cycles < stride) {
throw new RuntimeException(
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
);
}
}
long cycleCount = this.activityDef.getCycleCount();
long stride = this.activityDef.getParams().getOptionalLong("stride").orElseThrow();
if (0 < stride && 0 != cycleCount % stride) {
logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," +
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
}
Optional<String> threadSpec = activityDef.getParams().getOptionalString("threads");
if (threadSpec.isPresent()) {
String spec = threadSpec.get();
int processors = Runtime.getRuntime().availableProcessors();
if ("auto".equalsIgnoreCase(spec)) {
int threads = processors * 10;
if (threads > activityDef.getCycleCount()) {
threads = (int) activityDef.getCycleCount();
logger.info("setting threads to {} (auto) [10xCORES, cycle count limited]", threads);
} else {
logger.info("setting threads to {} (auto) [10xCORES]", threads);
}
// activityDef.setThreads(threads);
activityDef.getParams().setSilently("threads", threads);
} else if (spec.toLowerCase().matches("\\d+x")) {
String multiplier = spec.substring(0, spec.length() - 1);
int threads = processors * Integer.parseInt(multiplier);
logger.info(() -> "setting threads to " + threads + " (" + multiplier + "x)");
// activityDef.setThreads(threads);
activityDef.getParams().setSilently("threads", threads);
} else if (spec.toLowerCase().matches("\\d+")) {
logger.info(() -> "setting threads to " + spec + " (direct)");
// activityDef.setThreads(Integer.parseInt(spec));
activityDef.getParams().setSilently("threads", Integer.parseInt(spec));
}
if (activityDef.getThreads() > activityDef.getCycleCount()) {
logger.warn(() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
}
} else if (1000 < cycleCount) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
}
if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) {
throw new BasicError("You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity.");
}
}
/**
* Given a function that can create an op of type <O> from an OpTemplate, generate
* an indexed sequence of ready to call operations.
* <p>
* This method uses the following conventions to derive the sequence:
*
* <OL>
* <LI>If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is
* taken as the only provided statement.</LI>
* <LI>If a 'yaml, or 'workload' parameter is provided, then the statements in that file
* are taken with their ratios </LI>
* <LI>Any provided tags filter is used to select only the op templates which have matching
* tags. If no tags are provided, then all the found op templates are included.</LI>
* <LI>The ratios and the 'seq' parameter are used to build a sequence of the ready operations,
* where the sequence length is the sum of the ratios.</LI>
* </OL>
*
* @param <O>
* A holder for an executable operation for the native driver used by this activity.
* @param opinit
* A function to map an OpTemplate to the executable operation form required by
* the native driver for this activity.
* @param defaultAdapter
* The adapter which will be used for any op templates with no explicit adapter
* @return The sequence of operations as determined by filtering and ratios
*/
@Deprecated(forRemoval = true)
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(
Function<OpTemplate,
OpDispenser<? extends O>> opinit, boolean strict, DriverAdapter<?, ?> defaultAdapter) {
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter,true,false);
List<Long> ratios = new ArrayList<>(stmts.size());
for (OpTemplate opTemplate : stmts) {
long ratio = opTemplate.removeParamOrDefault("ratio", 1);
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
try {
for (int i = 0; i < stmts.size(); i++) {
long ratio = ratios.get(i);
OpTemplate optemplate = stmts.get(i);
OpDispenser<? extends O> driverSpecificReadyOp = opinit.apply(optemplate);
if (strict) {
optemplate.assertConsumed();
}
planner.addOp(driverSpecificReadyOp, ratio);
}
} catch (Exception e) {
throw new OpConfigError(e.getMessage(), workloadSource, e);
}
return planner.resolve();
}
protected OpsDocList loadStmtsDocList() {
try {
String op = activityDef.getParams().getOptionalString("op").orElse(null);
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(null);
String workload = activityDef.getParams().getOptionalString("workload").orElse(null);
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
throw new OpConfigError("Only op, statement, or workload may be provided, not more than one.");
}
if (workload != null && OpsLoader.isJson(workload)) {
workloadSource = "commandline: (workload/json):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (workload != null && OpsLoader.isYaml(workload)) {
workloadSource = "commandline: (workload/yaml):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
} else if (workload != null) {
return OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
}
if (stmt != null) {
workloadSource = "commandline: (stmt/inline): '" + stmt + "'";
return OpsLoader.loadString(stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
}
if (op != null && OpsLoader.isJson(op)) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
}
else if (op != null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null);
}
return OpsDocList.none();
} catch (Exception e) {
throw new OpConfigError("Error loading op templates: " + e, workloadSource, e);
}
}
@Override
public synchronized ProgressMeterDisplay getProgressMeter() {
if (null == this.progressMeter) {
this.progressMeter = new ActivityMetricProgressMeter(this);
}
return this.progressMeter;
}
@Override
public synchronized RunState getRunState() {
return runState;
}
@Override
public synchronized void setRunState(RunState runState) {
this.runState = runState;
if (RunState.Running == runState) {
this.startedAtMillis = System.currentTimeMillis();
}
}
@Override
public long getStartedAtMillis() {
return startedAtMillis;
}
@Override
public ActivityDef getActivityDef() {
return activityDef;
}
public String toString() {
return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally;
}
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
}
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
}
@Override
public RateLimiter getCycleLimiter() {
if (cycleLimiterSource!=null) {
return cycleLimiterSource.get();
} else {
return null;
}
}
@Override
public synchronized RateLimiter getStrideLimiter() {
if (strideLimiterSource!=null) {
return strideLimiterSource.get();
} else {
return null;
}
}
@Override
public RunStateTally getRunStateTally() {
return tally;
}
@Override
public ActivityWiring getWiring() {
return this.wiring;
}
@Override
public Map<String, String> asResult() {
return Map.of("activity",this.getAlias());
}
/**
* Activities with retryable operations (when specified with the retry error handler for some
* types of error), should allow the user to specify how many retries are allowed before
* giving up on the operation.
*
* @return The number of allowable retries
*/
@Override
public int getMaxTries() {
return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
}
public synchronized NBErrorHandler getErrorHandler() {
if (null == this.errorHandler) {
errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics);
}
return errorHandler;
}
@Override
public void closeAutoCloseables() {
for (AutoCloseable closeable : closeables) {
logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
try {
closeable.close();
} catch (Exception e) {
throw new RuntimeException("Error closing " + closeable + ": " + e, e);
}
}
closeables.clear();
}
@Override
public int compareTo(Activity o) {
return getAlias().compareTo(o.getAlias());
}
@Override
public void registerAutoCloseable(AutoCloseable closeable) {
this.closeables.add(closeable);
}
@Override
public synchronized PrintWriter getConsoleOut() {
if (null == console) {
this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8);
}
return this.console;
}
@Override
public synchronized InputStream getConsoleIn() {
return System.in;
}
@Override
public void setConsoleOut(PrintWriter writer) {
this.console = writer;
}
@Override
public synchronized ErrorMetrics getExceptionMetrics() {
if (null == this.errorMetrics) {
errorMetrics = new ErrorMetrics(this);
}
return errorMetrics;
}
}

View File

@@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Activity;
@@ -27,8 +28,6 @@ import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
import org.apache.logging.log4j.LogManager;
@@ -72,20 +71,20 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* @return a distinct Activity instance for each call
*/
@SuppressWarnings("unchecked")
public A getActivity(final ActivityDef activityDef, final NBComponent parent) {
public A getActivity(final ActivityDef activityDef,
final NBComponent parent,
final ActivityWiring wiring) {
if (activityDef.getParams().getOptionalString("async").isPresent())
throw new RuntimeException("This driver does not support async mode yet.");
return (A) new StandardActivity(parent, activityDef);
return (A) new StandardActivity(parent, activityDef, wiring);
}
/**
* This method will be called <em>once</em> per action instance.
*
* @param activity The activity instance that will parameterize the returned ActionDispenser instance.
* @return an instance of ActionDispenser
*/
public ActionDispenser getActionDispenser(final A activity) {
public ActionDispenser getActionDispenser(final StandardActivity activity) {
return new StandardActionDispenser(activity);
}
@@ -97,28 +96,35 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* @param activities a map of existing activities
* @return a distinct activity instance for each call
*/
public Activity getAssembledActivity(final ActivityDef activityDef, final Map<String, Activity> activities, final NBComponent parent) {
final A activity = this.getActivity(activityDef, parent);
public Activity getAssembledActivity(
final NBComponent parent, final ActivityDef activityDef,
final Map<String, Activity> activities
) {
// final A activity = this.getActivity(activityDef, parent);
ActivityWiring wiring = new ActivityWiring(activityDef);
StandardActivity activity = new StandardActivity(parent, activityDef, wiring);
final InputDispenser inputDispenser = this.getInputDispenser(activity);
if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities);
activity.setInputDispenserDelegate(inputDispenser);
wiring.setInputDispenserDelegate(inputDispenser);
final ActionDispenser actionDispenser = this.getActionDispenser(activity);
if (actionDispenser instanceof ActivitiesAware)
((ActivitiesAware) actionDispenser).setActivitiesMap(activities);
activity.setActionDispenserDelegate(actionDispenser);
wiring.setActionDispenserDelegate(actionDispenser);
final OutputDispenser outputDispenser = this.getOutputDispenser(activity).orElse(null);
final OutputDispenser outputDispenser = this.getOutputDispenser(wiring).orElse(null);
if ((null != outputDispenser) && (outputDispenser instanceof ActivitiesAware))
((ActivitiesAware) outputDispenser).setActivitiesMap(activities);
activity.setOutputDispenserDelegate(outputDispenser);
wiring.setOutputDispenserDelegate(outputDispenser);
final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser,
actionDispenser, outputDispenser);
if (motorDispenser instanceof ActivitiesAware) ((ActivitiesAware) motorDispenser).setActivitiesMap(activities);
activity.setMotorDispenserDelegate(motorDispenser);
wiring.setMotorDispenserDelegate(motorDispenser);
return activity;
return this.getActivity(activityDef,parent,wiring);
}
/**
@@ -127,8 +133,8 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* @param activity The activity instance that will parameterize the returned MarkerDispenser instance.
* @return an instance of MarkerDispenser
*/
public Optional<OutputDispenser> getOutputDispenser(final A activity) {
return CoreServices.getOutputDispenser(activity);
public Optional<OutputDispenser> getOutputDispenser(ActivityWiring activity) {
return CoreServices.getOutputDispenser(parent, activity);
}
/**
@@ -138,12 +144,12 @@ public class StandardActivityType<A extends StandardActivity<?,?>> {
* @param activity the Activity instance which will parameterize this InputDispenser
* @return the InputDispenser for the associated activity
*/
public InputDispenser getInputDispenser(final A activity) {
public InputDispenser getInputDispenser(final StandardActivity activity) {
return CoreServices.getInputDispenser(activity);
}
public <T> MotorDispenser<T> getMotorDispenser(
final A activity,
final StandardActivity activity,
final InputDispenser inputDispenser,
final ActionDispenser actionDispenser,
final OutputDispenser outputDispenser) {

View File

@@ -21,7 +21,12 @@ import com.codahale.metrics.Timer;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*;
import io.nosqlbench.adapters.api.evalctx.CycleFunction;
import io.nosqlbench.engine.api.metrics.ExceptionHistoMetrics;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
import io.nosqlbench.nb.api.errors.ResultVerificationError;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
@@ -29,59 +34,66 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
* This is the generified version of an Action. All driver adapters us this, as opposed
* to previous NB versions where it was implemented for each driver.
* <p>
* This allows the API to be consolidated so that the internal machinery of NB
* works in a very consistent and uniform way for all users and drivers.
*
* @param <A>
* The type of activity
* @param <R>
* The type of operation
*/
public class StandardAction<A extends StandardActivity<R, ?>, R extends java.util.function.LongFunction> implements SyncAction, ActivityDefObserver {
This is the generified version of an Action. All driver adapters us this, as opposed
to previous NB versions where it was implemented for each driver.
<p>
This allows the API to be consolidated so that the internal machinery of NB
works in a very consistent and uniform way for all users and drivers.
@param <A>
The type of activity
@param <R>
The type of operation */
public class StandardAction<A extends StandardActivity<R, ?>, R extends java.util.function.LongFunction> extends NBBaseComponent implements SyncAction, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger("ACTION");
private final Timer executeTimer;
private final Histogram triesHistogram;
private final Timer resultSuccessTimer;
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends CycleOp<?>>> opsequence;
private final int maxTries;
private final Timer verifierTimer;
private final A activity;
public NBMetricHistogram triesHistogram;
public StandardAction(A activity, int slot) {
super(activity, NBLabels.forKV("action", StandardAction.class.getSimpleName()));
this.activity = activity;
this.opsequence = activity.getOpSequence();
this.maxTries = activity.getMaxTries();
bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram();
resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
int hdrdigits = activity.getComponentProp("hdr_digits")
.map(Integer::parseInt).orElse(3);
errorHandler = activity.getErrorHandler();
verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer();
this.verifierTimer = activity.create().timer(
"verifier",
hdrdigits,
MetricCategory.Verification,
"Time the execution of verifier code, if any"
);
}
@Override
public int runCycle(long cycle) {
OpDispenser<? extends CycleOp<?>> dispenser=null;
OpDispenser<? extends CycleOp<?>> dispenser = null;
CycleOp op = null;
try (Timer.Context ct = bindTimer.time()) {
try (Timer.Context ct = activity.bindTimer.time()) {
dispenser = opsequence.apply(cycle);
op = dispenser.getOp(cycle);
} catch (Exception e) {
throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser!=null?dispenser.getOpName():"NULL")+
"': " + e.getMessage(), e);
throw new RuntimeException(
"while binding request in cycle " + cycle + " for op template named '" + (dispenser != null ? dispenser.getOpName() : "NULL") + "': " + e.getMessage(),
e
);
}
int code = 0;
@@ -95,29 +107,33 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends java.uti
dispenser.onStart(cycle);
try (Timer.Context ct = executeTimer.time()) {
try (Timer.Context ct = activity.executeTimer.time()) {
result = op.apply(cycle);
// TODO: break out validation timer from execute
try (Timer.Context ignored = verifierTimer.time()) {
CycleFunction<Boolean> verifier = dispenser.getVerifier();
try {
verifier.setVariable("result", result);
verifier.setVariable("cycle",cycle);
verifier.setVariable("cycle", cycle);
Boolean isGood = verifier.apply(cycle);
if (!isGood) {
throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails());
throw new ResultVerificationError(
"result verification failed", maxTries - tries,
verifier.getExpressionDetails()
);
}
} catch (Exception e) {
throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails());
throw new ResultVerificationError(
e, maxTries - tries, verifier.getExpressionDetails());
}
}
} catch (Exception e) {
error = e;
} finally {
long nanos = System.nanoTime() - startedAt;
resultTimer.update(nanos, TimeUnit.NANOSECONDS);
activity.resultTimer.update(nanos, TimeUnit.NANOSECONDS);
if (error == null) {
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
activity.resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
dispenser.onSuccess(cycle, nanos);
break;
} else {
@@ -130,7 +146,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends java.uti
}
}
}
triesHistogram.update(tries);
this.triesHistogram.update(tries);
if (op instanceof OpGenerator) {
logger.trace(() -> "GEN OP for cycle(" + cycle + ")");

View File

@@ -17,6 +17,9 @@
package io.nosqlbench.engine.api.util;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import java.util.Arrays;
import java.util.Map;
@@ -32,8 +35,17 @@ public class SimpleConfig {
this.params = parseParams(configdata);
}
public SimpleConfig(Activity activity, String param) {
this(activity.getParams().getOptionalString(param).orElse(""));
public SimpleConfig(Activity activity, String params) {
this(activity.getActivityDef(),params);
}
public SimpleConfig(ActivityWiring wiring, String param) {
this(wiring.getParams().getOptionalString(param).orElse(""));
}
public SimpleConfig(ActivityDef activityDef, String param) {
this(activityDef.getParams().getOptionalString(param).orElse(""));
}
public SimpleConfig(ParameterMap parameters, String param) {
this(parameters.getOptionalString(param).orElse(""));
}
private Map<String, String> parseParams(String configdata) {

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity;
import com.codahale.metrics.Gauge;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge;
@@ -70,6 +71,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
private final Activity activity;
private final ActivityDef activityDef;
private final RunStateTally tally;
private final MotorDispenser motorSource;
private ExecutorService executorService;
private Exception exception;
private String sessionId = "";
@@ -82,6 +84,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
public ActivityExecutor(Activity activity) {
this.activity = activity;
this.activityDef = activity.getActivityDef();
this.motorSource = activity.getWiring().getMotorDispenserDelegate();
activity.getActivityDef().getParams().addListener(this);
this.tally = activity.getRunStateTally();
}
@@ -260,7 +263,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
// Create motor slots
try {
while (motors.size() < activityDef.getThreads()) {
Motor motor = activity.getMotorDispenserDelegate().getMotor(activityDef, motors.size());
Motor motor = motorSource.getMotor(activityDef, motors.size());
logger.trace(() -> "Starting cycle motor thread:" + motor);
motors.add(motor);
}

View File

@@ -40,8 +40,8 @@ public class ActivityLoader {
public synchronized Activity loadActivity(ActivityDef activityDef, final NBComponent parent) {
activityDef= activityDef.deprecate("yaml","workload").deprecate("type","driver");
final Activity activity = new StandardActivityType<>(activityDef, parent).getAssembledActivity(activityDef, this.activityMap, parent);
this.activityMap.put(activity.getAlias(),activity);
final Activity activity =
new StandardActivityType<>(activityDef, parent).getAssembledActivity(parent, activityDef, this.activityMap);this.activityMap.put(activity.getAlias(),activity);
ActivityLoader.logger.debug("Resolved activity for alias '{}'", activityDef.getAlias());
return activity;
}

View File

@@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment;
import io.nosqlbench.engine.api.activityapi.cyclelog.outputs.cyclelog.CycleLogOutput;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -52,7 +53,7 @@ public class CycleLogInputTest {
@Test
public void testReader() {
CycleLogInput cycleLogInput = new CycleLogInput(cyclefile.getPath());
CycleLogInput cycleLogInput = new CycleLogInput(TestComponent.INSTANCE,cyclefile.getPath());
CycleSegment i1;
long c;
i1 = cycleLogInput.getInputSegment(1);

View File

@@ -16,6 +16,9 @@
package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.nb.api.advisor.NBAdvisorException;
@@ -24,7 +27,6 @@ import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityimpl.CoreServices;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
@@ -36,6 +38,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.concurrent.*;
import static org.assertj.core.api.Assertions.assertThat;
@@ -86,11 +89,12 @@ class ActivityExecutorTest {
synchronized void testAdvisorError() {
try {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
ActivityDef activityDef = ActivityDef.parseActivityDef(
"driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
Activity activity = new DelayedInitActivity(activityDef);
fail("Expected an Advisor exception");
} catch (NBAdvisorException e) {
StandardActivity activity = new DelayedInitActivity(activityDef);
fail("Expected an Advisor exception");
} catch (NBAdvisorException e) {
assertThat(e.toString().contains("error"));
assertThat(e.getExitCode() == 2);
}
@@ -99,19 +103,28 @@ class ActivityExecutorTest {
@Test
synchronized void testDelayedStartSanity() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test_delayed_start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
ActivityDef activityDef = ActivityDef.parseActivityDef(
"driver=diag;alias=test_delayed_start;cycles=1000;initdelay=2000;");
Optional<StandardActivityType> standardActivityType = new ActivityTypeLoader().load(
activityDef, TestComponent.INSTANCE);
Activity activity = new DelayedInitActivity(activityDef);
// Activity activity = new DelayedInitActivity(activityDef);
ActivityWiring wiring = new ActivityWiring(activityDef);
StandardActivity activity = standardActivityType.get().getActivity(
activityDef, TestComponent.INSTANCE, wiring);
final InputDispenser inputDispenser = new CoreInputDispenser(activity);
final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
final ActionDispenser actionDispenser = new CoreActionDispenser(wiring);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(
TestComponent.INSTANCE, wiring).orElse(null);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser(
activity, inputDispenser,
actionDispenser, outputDispenser
);
wiring.setActionDispenserDelegate(actionDispenser);
wiring.setOutputDispenserDelegate(outputDispenser);
wiring.setInputDispenserDelegate(inputDispenser);
wiring.setMotorDispenserDelegate(motorDispenser);
ActivityExecutor activityExecutor = new ActivityExecutor(activity);
@@ -133,27 +146,34 @@ class ActivityExecutorTest {
@Test
synchronized void testNewActivityExecutor() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test_dynamic_params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef,TestComponent.INSTANCE);
final ActivityDef activityDef = ActivityDef.parseActivityDef(
"driver=diag;alias=test_dynamic_params;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE);
ActivityWiring wiring = new ActivityWiring(activityDef);
Activity simpleActivity = new SimpleActivity(TestComponent.INSTANCE,activityDef);
StandardActivity activity = new StandardActivity(
TestComponent.INSTANCE, activityDef, wiring);
// this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef));
final InputDispenser inputDispenser = new CoreInputDispenser(activity);
final ActionDispenser actionDispenser = new CoreActionDispenser(wiring);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(
TestComponent.INSTANCE, wiring).orElse(null);
final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(
activity, inputDispenser, actionDispenser, outputDispenser);
MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
inputDispenser, actionDispenser, outputDispenser);
simpleActivity.setActionDispenserDelegate(actionDispenser);
simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser);
wiring.setActionDispenserDelegate(actionDispenser);
wiring.setInputDispenserDelegate(inputDispenser);
wiring.setMotorDispenserDelegate(motorDispenser);
StandardActivity simpleActivity = new StandardActivity<>(
TestComponent.INSTANCE,
activityDef, wiring
);
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity);
activityDef.setThreads(5);
ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor);
ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(
activityExecutor);
// activityExecutor.startActivity();
@@ -162,7 +182,8 @@ class ActivityExecutorTest {
final int threadTarget = speeds[offset];
final int threadTime = speeds[offset + 1];
ActivityExecutorTest.logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
ActivityExecutorTest.logger.debug(
() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
activityDef.setThreads(threadTarget);
try {
@@ -182,13 +203,13 @@ class ActivityExecutorTest {
}
}
private MotorDispenser<?> getActivityMotorFactory(final Action lc, Input ls) {
private MotorDispenser<?> getActivityMotorFactory(final SyncAction lc, Input ls) {
return new MotorDispenser<>() {
@Override
public Motor getMotor(final ActivityDef activityDef, final int slotId) {
final Activity activity = new SimpleActivity(TestComponent.INSTANCE,activityDef);
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc);
final StandardActivity activity = new StandardActivity(
TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef));
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls, lc, null);
return cm;
}
};
@@ -198,7 +219,8 @@ class ActivityExecutorTest {
return new SyncAction() {
@Override
public int runCycle(final long cycle) {
ActivityExecutorTest.logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
ActivityExecutorTest.logger.info(
() -> "consuming " + cycle + ", delaying:" + delay);
try {
Thread.sleep(delay);
} catch (final InterruptedException ignored) {
@@ -209,16 +231,17 @@ class ActivityExecutorTest {
}
private static class DelayedInitActivity extends SimpleActivity {
private static class DelayedInitActivity extends StandardActivity {
private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class);
public DelayedInitActivity(final ActivityDef activityDef) {
super(TestComponent.INSTANCE,activityDef);
super(TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef));
}
@Override
public void initActivity() {
final Integer initDelay = this.activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
final Integer initDelay = this.activityDef.getParams().getOptionalInteger(
"initdelay").orElse(0);
DelayedInitActivity.logger.info(() -> "delaying for " + initDelay);
try {
Thread.sleep(initDelay);

View File

@@ -16,13 +16,15 @@
package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction;
import io.nosqlbench.nb.api.config.standard.TestComponent;
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.Motor;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput;
import org.junit.jupiter.api.Test;
@@ -37,14 +39,13 @@ public class CoreMotorTest {
@Test
public void testBasicActivityMotor() {
final Activity activity = new SimpleActivity(
new TestComponent("testing", "coremotor"),
ActivityDef.parseActivityDef("alias=foo")
);
ActivityDef activityDef = ActivityDef.parseActivityDef("alias=foo");
final StandardActivity activity = new StandardActivity<>(
new TestComponent("testing", "coremotor"), activityDef, ActivityWiring.of(activityDef));
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm = new CoreMotor(activity, 5L, lockstepper);
final AtomicLong observableAction = new AtomicLong(-3L);
cm.setAction(this.getTestConsumer(observableAction));
SyncAction action = this.getTestConsumer(observableAction);
final Motor cm = new CoreMotor(activity, 5L, lockstepper, action, null);
final Thread t = new Thread(cm);
t.setName("TestMotor");
t.start();
@@ -54,18 +55,20 @@ public class CoreMotorTest {
}
lockstepper.publishSegment(5L);
final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100);
final boolean result = this.awaitCondition(
atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100);
assertThat(observableAction.get()).isEqualTo(5L);
}
@Test
public void testIteratorStride() {
SimpleActivity activity = new SimpleActivity(TestComponent.INSTANCE, "stride=3");
ActivityDef activityDef = ActivityDef.parseActivityDef("stride=3");
StandardActivity activity = new StandardActivity(
TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef));
final BlockingSegmentInput lockstepper = new BlockingSegmentInput();
final Motor cm1 = new CoreMotor(activity, 1L, lockstepper);
final AtomicLongArray ary = new AtomicLongArray(10);
final Action a1 = this.getTestArrayConsumer(ary);
cm1.setAction(a1);
final SyncAction a1 = this.getTestArrayConsumer(ary);
final Motor cm1 = new CoreMotor(activity, 1L, lockstepper, a1, null);
final Thread t1 = new Thread(cm1);
t1.setName("cm1");
@@ -109,7 +112,10 @@ public class CoreMotorTest {
}
private boolean awaitAryCondition(final Predicate<AtomicLongArray> atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) {
private boolean awaitAryCondition(
final Predicate<AtomicLongArray> atomicLongAryPredicate, final AtomicLongArray ary,
final long millis, final long retry
) {
final long start = System.currentTimeMillis();
long now = start;
while (now < (start + millis)) {
@@ -124,7 +130,10 @@ public class CoreMotorTest {
return false;
}
private boolean awaitCondition(final Predicate<AtomicLong> atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) {
private boolean awaitCondition(
final Predicate<AtomicLong> atomicPredicate, final AtomicLong atomicInteger,
final long millis, final long retry
) {
final long start = System.currentTimeMillis();
long now = start;
while (now < (start + millis)) {

View File

@@ -94,7 +94,8 @@ public class CMD_reset extends NBBaseCommand {
//TODO: This needs to be reworked, but simply calling controller.start on the flywheel results in 2
// copies of the activity running simultaneously. This is a temporary workaround.
SimFrameUtils.awaitActivity(flywheel);
flywheel.getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), 0).run();
flywheel.getWiring().getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(),
0).run();
}
return null;