From 862ea8fd3b3ff82a2403db91b0b88275c1a1ab16 Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 19 Dec 2024 13:10:08 -0600 Subject: [PATCH] this commit fully replaces SimpleActivity with StandardActivity --- .../api/components/core/NBBaseComponent.java | 21 + .../core/NBComponentExecutionScope.java | 4 +- .../api/activityapi/core/ActionDispenser.java | 2 +- .../engine/api/activityapi/core/Activity.java | 45 +- .../core/ActivityInstrumentation.java | 84 --- .../ComponentActivityInstrumentation.java | 164 +--- .../api/activityapi/core/IActivityWiring.java | 68 ++ .../engine/api/activityapi/core/Motor.java | 8 - .../api/activityapi/core/SyncAction.java | 8 - .../core/ops/fluent/OpTrackerImpl.java | 51 +- .../progress/ActivityMetricProgressMeter.java | 15 +- .../filters/ExperimentalResultFilterType.java | 2 +- .../inputs/cyclelog/CycleLogInput.java | 32 +- .../inputs/cyclelog/CycleLogInputType.java | 12 +- .../outputs/cyclelog/CycleLogOutput.java | 26 +- .../outputs/cyclelog/CycleLogOutputType.java | 20 +- .../api/activityapi/input/InputType.java | 5 +- .../api/activityapi/output/OutputType.java | 4 +- .../engine/api/activityimpl/CoreServices.java | 18 +- .../api/activityimpl/SimpleActivity.java | 662 ----------------- .../action/CoreActionDispenser.java | 8 +- .../input/CoreInputDispenser.java | 7 +- .../input/TargetRateInputType.java | 3 +- .../marker/ContiguousOutputChunker.java | 2 +- .../api/activityimpl/motor/CoreMotor.java | 273 +++---- .../motor/CoreMotorDispenser.java | 8 +- .../activityimpl/uniform/ActivityWiring.java | 78 ++ .../uniform/StandardActionDispenser.java | 6 +- .../uniform/StandardActivity.java | 699 +++++++++++++++++- .../uniform/StandardActivityType.java | 46 +- .../uniform/actions/StandardAction.java | 84 ++- .../engine/api/util/SimpleConfig.java | 16 +- .../lifecycle/activity/ActivityExecutor.java | 5 +- .../lifecycle/activity/ActivityLoader.java | 4 +- .../inputs/cyclelog/CycleLogInputTest.java | 3 +- .../engine/core/ActivityExecutorTest.java | 99 ++- .../nosqlbench/engine/core/CoreMotorTest.java | 37 +- .../simframe/optimizers/CMD_reset.java | 3 +- 38 files changed, 1310 insertions(+), 1322 deletions(-) create mode 100644 nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/IActivityWiring.java create mode 100644 nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ActivityWiring.java diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java index 556cc6c80..96e61c224 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBBaseComponent.java @@ -114,6 +114,10 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone @Override public NBComponent detachChild(NBComponent... children) { + for (NBComponent child : children) { + logger.debug(() -> "notifyinb before detaching " + child.description() + " from " + this.description()); + child.beforeDetach(); + } for (NBComponent child : children) { logger.debug(() -> "detaching " + child.description() + " from " + this.description()); this.children.remove(child); @@ -140,11 +144,28 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone } + /// Override this method when you need to do some action within the active + /// component tree after the parent decides to detach your component, but before + /// your component loses access to the live component tree. @Override public void beforeDetach() { logger.debug("before detach " + description()); } + /// The [java.io.Closeable] and [AutoCloseable] behaviors of components are + /// explicitly managed within the core [NBComponent] implementation. Thus, components can not + /// override this method, to ensure that subtype behaviors are not orphaned. The way you can + /// add a _close_ behavior is to implement [#teardown()]. + /// + /// During component tree unwinding, each component does the following in order: + /// 1. Changes state to [NBInvokableState#CLOSING] + /// 2. calls [#close()] on every child. + /// 3. calls [#beforeDetach()]] on every child + /// 4. detaches every child. + /// 5. calls [#teardown()] on itself + /// + /// This happens recursively, and is mediated by the [#close()] method itself. + @Override public final void close() throws RuntimeException { state = (state == NBInvokableState.ERRORED) ? state : NBInvokableState.CLOSING; diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java index 0a253d992..286cd35c7 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/components/core/NBComponentExecutionScope.java @@ -29,7 +29,9 @@ public class NBComponentExecutionScope implements AutoCloseable { @Override public void close() throws RuntimeException { for (NBComponent component : components) { - component.beforeDetach(); +// This is now handled inline with [NBComponent#detachChild], which puts it after the +// out of scope notification -- this might need testing adjustments or clarification +// component.beforeDetach(); component.onEvent(new ComponentOutOfScope(component)); NBComponent parent = component.getParent(); if (parent!=null) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActionDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActionDispenser.java index cad178703..353880d20 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActionDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActionDispenser.java @@ -29,5 +29,5 @@ public interface ActionDispenser { * @param slot The numbered slot within the activity instance for this action. * @return A new or cached Action for the specified slot. */ - Action getAction(int slot); + SyncAction getAction(int slot); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java index e1eea06fd..36165f735 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Activity.java @@ -16,26 +16,25 @@ package io.nosqlbench.engine.api.activityapi.core; +import com.codahale.metrics.Counting; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable; -import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser; import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; -import io.nosqlbench.engine.api.activityapi.input.InputDispenser; -import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer; import java.io.InputStream; import java.io.PrintWriter; -import java.util.function.Supplier; /** * Provides the components needed to build and run an activity a runtime. - * The easiest way to build a useful Activity is to extend {@link SimpleActivity}. + * The easiest way to build a useful Activity is to extend {@link StandardActivity}. */ public interface Activity extends Comparable, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent { @@ -64,26 +63,6 @@ public interface Activity extends Comparable, ActivityDefObserver, Pro */ void closeAutoCloseables(); - MotorDispenser getMotorDispenserDelegate(); - - void setMotorDispenserDelegate(MotorDispenser motorDispenser); - - InputDispenser getInputDispenserDelegate(); - - void setInputDispenserDelegate(InputDispenser inputDispenser); - - ActionDispenser getActionDispenserDelegate(); - - void setActionDispenserDelegate(ActionDispenser actionDispenser); - - IntPredicateDispenser getResultFilterDispenserDelegate(); - - void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser); - - OutputDispenser getMarkerDispenserDelegate(); - - void setOutputDispenserDelegate(OutputDispenser outputDispenser); - @Override RunState getRunState(); @@ -115,15 +94,6 @@ public interface Activity extends Comparable, ActivityDefObserver, Pro */ RateLimiter getStrideLimiter(); - /** - * Get or create the instrumentation needed for this activity. This provides - * a single place to find and manage, and document instrumentation that is - * uniform across all activities. - * - * @return A new or existing instrumentation object for this activity. - */ - ActivityInstrumentation getInstrumentation(); - PrintWriter getConsoleOut(); InputStream getConsoleIn(); @@ -142,11 +112,12 @@ public interface Activity extends Comparable, ActivityDefObserver, Pro // return t -> t.getClass().getSimpleName(); // } // - int getMaxTries(); + int getMaxTries(); default int getHdrDigits() { return this.getParams().getOptionalInteger("hdr_digits").orElse(4); } - RunStateTally getRunStateTally(); + ActivityWiring getWiring(); + } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityInstrumentation.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityInstrumentation.java index 9e9d7a11c..6582f6b39 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityInstrumentation.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ActivityInstrumentation.java @@ -25,13 +25,6 @@ import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge; import java.util.concurrent.Future; -/** - * All the accessors of the metrics that will be used for each activity instance. - * Implementors of this interface should ensure that the methods are synchronized - * to avoid race conditions during lazy init from callers. - * - * - */ public interface ActivityInstrumentation { NBMetricGauge getOrCreateErrorsTotal(); @@ -44,103 +37,26 @@ public interface ActivityInstrumentation { NBMetricGauge getOrCreateErrorRateTotal(); - /** - * The input timer measures how long it takes to get the cycle value to be used for - * an operation. - * @return a new or existing {@link Timer} - */ Timer getOrCreateInputTimer(); - /** - * The strides service timer measures how long it takes to complete a stride of work. - * @return a new or existing {@link Timer} - */ Timer getOrCreateStridesServiceTimer(); - /** - * The strides response timer measures the total response time from the scheduled - * time a stride should start to when it completed. Stride scheduling is only defined - * when it is implied by a stride rate limiter, so this method should return null if - * there is no strides rate limiter. - * @return a new or existing {@link Timer} if appropriate, else null - */ Timer getStridesResponseTimerOrNull(); - /** - * The cycles service timer measures how long it takes to complete a cycle of work. - * @return a new or existing {@link Timer} - */ Timer getOrCreateCyclesServiceTimer(); - /** - * The cycles response timer measures the total response time from the scheduled - * time an operation should start to when it is completed. Cycle scheduling is only defined - * when it is implied by a cycle rate limiter, so this method should return null if - * there is no cycles rate limiter. - * @return a new or existing {@link Timer} if appropriate, else null - */ Timer getCyclesResponseTimerOrNull(); - /** - * The pending ops counter keeps track of how many ops are submitted or in-flight, but - * which haven't been completed yet. - * @return a new or existing {@link Counter} - */ Counter getOrCreatePendingOpCounter(); - /** - * The bind timer keeps track of how long it takes for NoSQLBench to create an instance - * of an executable operation, given the cycle. This is usually done by using an - * {@link OpSequence} in conjunction with - * an {@link OpDispenser}. This is named for "binding - * a cycle to an operation". - * @return a new or existing {@link Timer} - */ Timer getOrCreateBindTimer(); - /** - * The execute timer keeps track of how long it takes to submit an operation to be executed - * to an underlying native driver. For asynchronous APIs, such as those which return a - * {@link Future}, this is simply the amount of time it takes to acquire the future. - * - * When possible, APIs should be used via their async methods, even if you are implementing - * a {@link SyncAction}. This allows the execute timer to measure the hand-off to the underlying API, - * and the result timer to measure the blocking calls to aquire the result. - * @return a new or existing {@link Timer} - */ Timer getOrCreateExecuteTimer(); - /** - * The result timer keeps track of how long it takes a native driver to service a request once submitted. - * This timer, in contrast to the result-success timer ({@link #getOrCreateResultSuccessTimer()}), - * is used to track all operations. That is, no matter - * whether the operation succeeds or not, it should be tracked with this timer. The scope of this timer should - * cover each attempt at an operation through a native driver. Retries are not to be combined in this measurement. - * @return a new or existing {@link Timer} - */ Timer getOrCreateResultTimer(); - /** - * The result-success timer keeps track of operations which had no exception. The measurements for this timer should - * be exactly the same values as used for the result timer ({@link #getOrCreateResultTimer()}, except that - * attempts to complete an operation which yield an exception should be excluded from the results. These two metrics - * together provide a very high level sanity check against the error-specific metrics which can be reported by - * the error handler logic. - * @return a new or existing {@link Timer} - */ Timer getOrCreateResultSuccessTimer(); - /** - * The tries histogram tracks how many tries it takes to complete an operation successfully, or not. This histogram - * does not encode whether operations were successful or not. Ideally, if every attempt to complete an operation succeeds - * on its first try, the data in this histogram should all be 1. In practice, systems which are running near their - * capacity will see a few retried operations, and systems that are substantially over-driven will see many retried - * operations. As the retries value increases the further down the percentile scale you go, you can detect system loading - * patterns which are in excess of the real-time capability of the target system. - * - * This metric should be measured around every retry loop for a native operation. - * @return a new or existing {@link Histogram} - */ Histogram getOrCreateTriesHistogram(); Timer getOrCreateVerifierTimer(); diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ComponentActivityInstrumentation.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ComponentActivityInstrumentation.java index fd7ef7fd8..3e6cf6679 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ComponentActivityInstrumentation.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ComponentActivityInstrumentation.java @@ -19,11 +19,15 @@ package io.nosqlbench.engine.api.activityapi.core; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; import io.nosqlbench.nb.api.engine.metrics.instruments.*; -public class ComponentActivityInstrumentation implements ActivityInstrumentation { +import java.util.concurrent.Future; + +public class ComponentActivityInstrumentation { private static final String WAIT_TIME = "_waittime"; private static final String SERVICE_TIME = "_servicetime"; @@ -36,7 +40,6 @@ public class ComponentActivityInstrumentation implements ActivityInstrumentation private NBMetricTimer readInputTimer; private NBMetricTimer stridesServiceTimer; private NBMetricTimer stridesResponseTimer; - private NBMetricTimer cyclesServiceTimer; private NBMetricTimer cyclesResponseTimer; private NBMetricCounter pendingOpsCounter; private NBMetricTimer bindTimer; @@ -61,90 +64,8 @@ public class ComponentActivityInstrumentation implements ActivityInstrumentation } private void initMetrics() { - readInputTimer = activity.create().timer( - "read_input", - this.hdrdigits, - MetricCategory.Internals, - "measures overhead of acquiring a cycle range for an activity thread" - ); - stridesServiceTimer = activity.create().timer( - "strides", - this.hdrdigits, - MetricCategory.Core, - "service timer for a stride, which is the same as the op sequence length by default" - ); - if (null != activity.getStrideLimiter()) { - this.stridesResponseTimer = activity.create().timer( - "strides" + ComponentActivityInstrumentation.RESPONSE_TIME, - hdrdigits, - MetricCategory.Core, - "response timer for a stride, which is the same as the op sequence length by default;" + - " response timers include scheduling delays which occur when an activity falls behind its target rate" - ); - } - this.cyclesServiceTimer = activity.create().timer( - "cycles" + ComponentActivityInstrumentation.SERVICE_TIME, - hdrdigits, - MetricCategory.Core, - "service timer for a cycle, including all of bind, execute, result and result_success;" + - " service timers measure the time between submitting a request and receiving the response" - ); - if (null != activity.getCycleLimiter()) { - this.cyclesResponseTimer = activity.create().timer( - "cycles" + ComponentActivityInstrumentation.RESPONSE_TIME, - hdrdigits, - MetricCategory.Core, - "response timer for a cycle, including all of bind, execute, result and result_success;" + - " response timers include scheduling delays which occur when an activity falls behind its target rate" - ); - } - this.pendingOpsCounter = activity.create().counter( - "pending_ops", - MetricCategory.Core, - "Indicate the number of operations which have been started, but which have not been completed." + - " This starts " - ); - this.bindTimer = activity.create().timer( - "bind", - hdrdigits, - MetricCategory.Core, - "Time the step within a cycle which binds generated data to an op template to synthesize an executable operation." - ); - this.executeTimer = activity.create().timer( - "execute", - hdrdigits, - MetricCategory.Core, - "Time how long it takes to submit a request and receive a result, including reading the result in the client." - ); - this.resultTimer = activity.create().timer( - "result", - hdrdigits, - MetricCategory.Core, - "Time how long it takes to submit a request, receive a result, including binding, reading results, " + - "and optionally verifying them, including all operations whether successful or not, for each attempted request." - ); - this.resultSuccessTimer = activity.create().timer( - "result_success", - hdrdigits, - MetricCategory.Core, - "The execution time of successful operations, which includes submitting the operation, waiting for a response, and reading the result" - ); - this.triesHistogram = activity.create().histogram( - "tries", - hdrdigits, - MetricCategory.Core, - "A histogram of all tries for an activity. Perfect results mean all quantiles return 1." + - " Slight saturation is indicated by p99 or p95 returning higher values." + - " Lower quantiles returning more than 1, or higher values at high quantiles indicate incremental overload." - ); - this.verifierTimer = activity.create().timer( - "verifier", - hdrdigits, - MetricCategory.Verification, - "Time the execution of verifier code, if any" - ); this.errorRate1m = activity.create().gauge("error_rate_1m", () -> { double result_1m_rate = this.resultTimer.getOneMinuteRate(); @@ -204,87 +125,16 @@ public class ComponentActivityInstrumentation implements ActivityInstrumentation ); } - @Override - public NBMetricGauge getOrCreateErrorsTotal() { - return this.errorsTotal; - } - @Override - public NBMetricGauge getOrCreateErrorRate1m() { - return this.errorRate1m; - } - @Override - public NBMetricGauge getOrCreateErrorRate5m() { - return this.errorRate5m; - } - @Override - public NBMetricGauge getOrCreateErrorRate15m() { - return this.errorRate15m; - } - @Override - public NBMetricGauge getOrCreateErrorRateTotal() { - return this.errorRateTotal; - } - - - @Override - public Timer getOrCreateInputTimer() { - return readInputTimer; - } - - - @Override - public Timer getOrCreateStridesServiceTimer() { - return stridesServiceTimer; - } - - @Override - public Timer getStridesResponseTimerOrNull() { - return stridesResponseTimer; - } - - - @Override - public Timer getOrCreateCyclesServiceTimer() { - return cyclesServiceTimer; - } - - @Override - public Timer getCyclesResponseTimerOrNull() { - return cyclesResponseTimer; - } - - @Override + /// The pending ops counter keeps track of how many ops are submitted or in-flight, but + /// which haven't been completed yet. public Counter getOrCreatePendingOpCounter() { return pendingOpsCounter; } - @Override public Timer getOrCreateBindTimer() { return bindTimer; } - @Override - public Timer getOrCreateExecuteTimer() { - return executeTimer; - } - @Override - public Timer getOrCreateResultTimer() { - return resultTimer; - } - @Override - public Timer getOrCreateResultSuccessTimer() { - return resultSuccessTimer; - } - - @Override - public Histogram getOrCreateTriesHistogram() { - return triesHistogram; - } - - @Override - public Timer getOrCreateVerifierTimer() { - return verifierTimer; - } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/IActivityWiring.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/IActivityWiring.java new file mode 100644 index 000000000..004240767 --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/IActivityWiring.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.api.activityapi.core; + +import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; +import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable; +import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser; +import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; +import io.nosqlbench.engine.api.activityapi.input.InputDispenser; +import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; +import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter; +import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; + +import java.io.InputStream; +import java.io.PrintWriter; + +/** + * Provides the components needed to build and run an activity a runtime. + * The easiest way to build a useful Activity is to extend {@link StandardActivity}. + */ +public interface IActivityWiring extends Comparable, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent { + + ActivityDef getActivityDef(); + + MotorDispenser getMotorDispenserDelegate(); + + void setMotorDispenserDelegate(MotorDispenser motorDispenser); + + InputDispenser getInputDispenserDelegate(); + + void setInputDispenserDelegate(InputDispenser inputDispenser); + + ActionDispenser getActionDispenserDelegate(); + + void setActionDispenserDelegate(ActionDispenser actionDispenser); + + IntPredicateDispenser getResultFilterDispenserDelegate(); + + void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser); + + OutputDispenser getMarkerDispenserDelegate(); + + void setOutputDispenserDelegate(OutputDispenser outputDispenser); + + PrintWriter getConsoleOut(); + + InputStream getConsoleIn(); + + void setConsoleOut(PrintWriter writer); +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java index 107fcb1f8..e6bd8540a 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/Motor.java @@ -34,14 +34,6 @@ public interface Motor extends Runnable, Stoppable { Input getInput(); - /** - * Set the action on this motor. It will be applied to each input. - * - * @param action an instance of activityAction - * @return this ActivityMotor, for method chaining - */ - Motor setAction(Action action); - Action getAction(); /** diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/SyncAction.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/SyncAction.java index 0225e2cbe..71cb4a736 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/SyncAction.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/SyncAction.java @@ -18,14 +18,6 @@ package io.nosqlbench.engine.api.activityapi.core; public interface SyncAction extends Action { - /** - *

Apply a work function to an input value, producing an int status code.

- * The meaning of status codes is activity specific, however the values Integer.MIN_VALUE, - * and Integer.MAX_VALUE are reserved. - * - * @param cycle a long input - * @return an int status - */ default int runCycle(long cycle) { return (int) cycle % 100; } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ops/fluent/OpTrackerImpl.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ops/fluent/OpTrackerImpl.java index f119db183..fdda125b8 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ops/fluent/OpTrackerImpl.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/ops/fluent/OpTrackerImpl.java @@ -17,44 +17,50 @@ package io.nosqlbench.engine.api.activityapi.core.ops.fluent; import com.codahale.metrics.Counter; +import com.codahale.metrics.Counting; import com.codahale.metrics.Timer; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.*; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongFunction; /** - * This tracker keeps track of the state of operations associated with it. - * - * @param The payload data type of the associated Op, based on OpImpl - */ + This tracker keeps track of the state of operations associated with it. + @param + The payload data type of the associated Op, based on OpImpl */ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { private final AtomicInteger pendingOps = new AtomicInteger(0); private final String label; private final long slot; + private final Counter pendingOpsCounter; private final Timer cycleServiceTimer; private final Timer cycleResponseTimer; - private final Counter pendingOpsCounter; - private int maxPendingOps =1; + private int maxPendingOps = 1; private LongFunction cycleOpFunction; - public OpTrackerImpl(Activity activity, long slot) { + public OpTrackerImpl(StandardActivity activity, long slot) { this.slot = slot; this.label = "tracker-" + slot + "_" + activity.getAlias(); - this.pendingOpsCounter = activity.getInstrumentation().getOrCreatePendingOpCounter(); - this.cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer(); - this.cycleResponseTimer = activity.getInstrumentation().getCyclesResponseTimerOrNull(); + this.pendingOpsCounter = activity.pendingOpsCounter; + this.cycleServiceTimer = activity.cycleServiceTimer; + this.cycleResponseTimer = activity.cycleResponseTimer; } // for testing - public OpTrackerImpl(String name, int slot, Timer cycleServiceTimer, Timer cycleResponseTimer, Counter pendingOpsCounter) { + public OpTrackerImpl( + String name, int slot, Timer cycleServiceTimer, Timer cycleResponseTimer, + Counter pendingOpsCounter + ) { this.label = name; this.slot = slot; this.cycleResponseTimer = cycleResponseTimer; @@ -74,9 +80,11 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { int pending = this.pendingOps.decrementAndGet(); cycleServiceTimer.update(op.getServiceTimeNanos(), TimeUnit.NANOSECONDS); - if (cycleResponseTimer !=null) { cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS); } + if (cycleResponseTimer != null) { + cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS); + } - if (pending< maxPendingOps) { + if (pending < maxPendingOps) { synchronized (this) { notify(); } @@ -88,7 +96,7 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { pendingOpsCounter.dec(); int pending = this.pendingOps.decrementAndGet(); - if (pending< maxPendingOps) { + if (pending < maxPendingOps) { synchronized (this) { notify(); } @@ -97,16 +105,17 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { } - @Override public void onOpFailure(FailedOp op) { pendingOpsCounter.dec(); int pending = this.pendingOps.decrementAndGet(); cycleServiceTimer.update(op.getServiceTimeNanos(), TimeUnit.NANOSECONDS); - if (cycleResponseTimer !=null) { cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS); } + if (cycleResponseTimer != null) { + cycleResponseTimer.update(op.getResponseTimeNanos(), TimeUnit.NANOSECONDS); + } - if (pending< maxPendingOps) { + if (pending < maxPendingOps) { synchronized (this) { notify(); } @@ -115,7 +124,7 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { @Override public void setMaxPendingOps(int maxPendingOps) { - this.maxPendingOps =maxPendingOps; + this.maxPendingOps = maxPendingOps; synchronized (this) { notifyAll(); } @@ -123,7 +132,7 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { @Override public boolean isFull() { - return this.pendingOps.intValue()>=maxPendingOps; + return this.pendingOps.intValue() >= maxPendingOps; } @Override @@ -139,7 +148,7 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { @Override public TrackedOp newOp(long cycle, OpEvents strideTracker) { D opstate = cycleOpFunction.apply(cycle); - OpImpl op = new EventedOpImpl<>(this,strideTracker); + OpImpl op = new EventedOpImpl<>(this, strideTracker); op.setCycle(cycle); op.setData(opstate); return op; @@ -169,7 +178,7 @@ public class OpTrackerImpl implements OpTracker, ActivityDefObserver { @Override public void onActivityDefUpdate(ActivityDef activityDef) { - this.maxPendingOps=getMaxPendingOpsForThisThread(activityDef); + this.maxPendingOps = getMaxPendingOpsForThisThread(activityDef); } private int getMaxPendingOpsForThisThread(ActivityDef def) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/progress/ActivityMetricProgressMeter.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/progress/ActivityMetricProgressMeter.java index 6de029f37..ed6550ef0 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/progress/ActivityMetricProgressMeter.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/core/progress/ActivityMetricProgressMeter.java @@ -16,7 +16,10 @@ package io.nosqlbench.engine.api.activityapi.core.progress; +import com.codahale.metrics.Counting; import com.codahale.metrics.Timer; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer; import io.nosqlbench.nb.api.engine.util.Unit; import io.nosqlbench.engine.api.activityapi.core.Activity; @@ -26,14 +29,14 @@ public class ActivityMetricProgressMeter implements ProgressMeterDisplay, Comple private final Activity activity; private final Instant startInstant; - private final Timer bindTimer; - private final Timer cyclesTimer; + private final NBMetricTimer bindTimer; + private final NBMetricTimer cyclesTimer; - public ActivityMetricProgressMeter(Activity activity) { + public ActivityMetricProgressMeter(StandardActivity activity) { this.activity = activity; this.startInstant = Instant.ofEpochMilli(activity.getStartedAtMillis()); - this.bindTimer = activity.getInstrumentation().getOrCreateBindTimer(); - this.cyclesTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer(); + this.bindTimer = activity.bindTimer; + this.cyclesTimer = activity.cycleServiceTimer; } @Override @@ -60,7 +63,7 @@ public class ActivityMetricProgressMeter implements ProgressMeterDisplay, Comple @Override public double getCurrentValue() { - return activity.getInstrumentation().getOrCreateBindTimer().getCount(); + return bindTimer.getCount(); } @Override diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/filters/ExperimentalResultFilterType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/filters/ExperimentalResultFilterType.java index 614a531c4..6db163319 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/filters/ExperimentalResultFilterType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/filters/ExperimentalResultFilterType.java @@ -29,7 +29,7 @@ public interface ExperimentalResultFilterType { new SimpleServiceLoader<>(ExperimentalResultFilterType.class, Maturity.Any); default IntPredicateDispenser getFilterDispenser(Activity activity) { - SimpleConfig conf = new SimpleConfig(activity, "resultfilter"); + SimpleConfig conf = new SimpleConfig(activity.getWiring(), "resultfilter"); return getFilterDispenser(conf); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInput.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInput.java index d16eaf942..1987ae218 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInput.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInput.java @@ -16,6 +16,11 @@ package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction; +import io.nosqlbench.nb.api.components.core.NBBaseComponent; +import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.labels.NBLabeledElement; import io.nosqlbench.nb.api.labels.NBLabels; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleResultsSegment; @@ -38,24 +43,28 @@ import java.nio.channels.FileChannel; import java.util.Iterator; import java.util.function.Predicate; -public class CycleLogInput implements Input, AutoCloseable, Iterable, CanFilterResultValue, NBLabeledElement { +public class CycleLogInput extends NBBaseComponent implements Input, AutoCloseable, + Iterable + , CanFilterResultValue, NBLabeledElement { private final static Logger logger = LogManager.getLogger(CycleLogInput.class); private final Iterator cycleResultSegmentIterator; - private final NBLabeledElement parent; private RandomAccessFile raf; private MappedByteBuffer mbb; private Iterator segmentIter; private Predicate filter; - public CycleLogInput(Activity activity) { - SimpleConfig conf = new SimpleConfig(activity, "input"); - mbb = initMappedBuffer(conf.getString("file").orElse(activity.getAlias()) + ".cyclelog"); + public CycleLogInput(StandardActivity activity) { + super(activity, NBLabels.forKV("input","cyclelog")); + SimpleConfig conf = new SimpleConfig(activity.getActivityDef(), "input"); + mbb = + initMappedBuffer(conf.getString("file").orElse(activity.getActivityDef().getAlias()) + + ".cyclelog"); cycleResultSegmentIterator = iterator(); segmentIter = cycleResultSegmentIterator.next().iterator(); - this.parent = activity; } - public CycleLogInput(String filename) { + public CycleLogInput(NBComponent parent, String filename) { + super(parent, NBLabels.forKV("input","cyclelog")); File cycleFile = null; try { cycleFile = new File(filename); @@ -71,7 +80,6 @@ public class CycleLogInput implements Input, AutoCloseable, IterableIt is valid for RLE segments to be broken apart into contiguous * ranges. Any implementation should treat this as normal. */ -public class CycleLogOutput implements Output, CanFilterResultValue { +public class CycleLogOutput extends NBBaseComponent implements Output, CanFilterResultValue, Closeable { // For use in allocating file data, etc private final static Logger logger = LogManager.getLogger(CycleLogOutput.class); @@ -59,11 +65,13 @@ public class CycleLogOutput implements Output, CanFilterResultValue { private final File outputFile; private Predicate filter; - public CycleLogOutput(Activity activity) { + public CycleLogOutput(NBComponent parent, NBLabels componentOnlyLabels, ActivityWiring wiring) { + super(parent, componentOnlyLabels); - SimpleConfig conf = new SimpleConfig(activity, "output"); + SimpleConfig conf = new SimpleConfig(wiring, "output"); this.extentSizeInSpans = conf.getInteger("extentSize").orElse(1000); - this.outputFile = new File(conf.getString("file").orElse(activity.getAlias()) + ".cyclelog"); + this.outputFile = new File(conf.getString("file").orElse(wiring.getActivityDef().getAlias()) + + ".cyclelog"); targetBuffer = new CycleResultsRLEBufferTarget(extentSizeInSpans); @@ -71,6 +79,8 @@ public class CycleLogOutput implements Output, CanFilterResultValue { } public CycleLogOutput(File outputFile, int extentSizeInSpans) { + super(new NBBaseComponent(null),NBLabels.forKV("running","standalone","type", + "cycle_log_output")); this.extentSizeInSpans = extentSizeInSpans; this.outputFile = outputFile; targetBuffer = new CycleResultsRLEBufferTarget(extentSizeInSpans); @@ -131,7 +141,7 @@ public class CycleLogOutput implements Output, CanFilterResultValue { } @Override - public synchronized void close() throws Exception { + protected void teardown() { try { flush(); if (file != null) { @@ -141,9 +151,13 @@ public class CycleLogOutput implements Output, CanFilterResultValue { } } catch (Throwable t) { logger.error("Error while closing CycleLogOutput: " + t, t); - throw t; + throw new RuntimeException(t); } + } + @Override + public void beforeDetach() { + super.beforeDetach(); } private synchronized void ensureCapacity(long newCapacity) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/outputs/cyclelog/CycleLogOutputType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/outputs/cyclelog/CycleLogOutputType.java index 65061c3ec..3961c40be 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/outputs/cyclelog/CycleLogOutputType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/cyclelog/outputs/cyclelog/CycleLogOutputType.java @@ -22,7 +22,10 @@ import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.output.Output; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputType; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; import io.nosqlbench.nb.annotations.Service; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.labels.NBLabels; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -30,20 +33,21 @@ import org.apache.logging.log4j.Logger; public class CycleLogOutputType implements OutputType { @Override - public OutputDispenser getOutputDispenser(Activity activity) { - return new Dispenser(activity); + public OutputDispenser getOutputDispenser(NBComponent parent, ActivityWiring wiring) { + return new Dispenser(parent, wiring); } public static class Dispenser implements OutputDispenser { private final static Logger logger = LogManager.getLogger(OutputDispenser.class); private final Output output; - private final Activity activity; + private final ActivityWiring activity; - public Dispenser(Activity activity) { - this.activity = activity; - Input input = activity.getInputDispenserDelegate().getInput(0); - CycleLogOutput rleFileWriter = new CycleLogOutput(activity); + public Dispenser(NBComponent parent, ActivityWiring wiring) { + this.activity = wiring; + Input input = wiring.getInputDispenserDelegate().getInput(0); + CycleLogOutput rleFileWriter = new CycleLogOutput(parent, NBLabels.forKV("type", + "output"), wiring); // TODO: Rework this so that the contiguous marking chunker can onAfterOpStop filtering // if (input.isContiguous()) { @@ -58,7 +62,7 @@ public class CycleLogOutputType implements OutputType { new ReorderingConcurrentResultBuffer(rleFileWriter); this.output=prebuffer; // } - activity.registerAutoCloseable(output); +// wiring.registerAutoCloseable(output); } @Override diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputType.java index 248597919..960e25724 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/input/InputType.java @@ -17,7 +17,10 @@ package io.nosqlbench.engine.api.activityapi.input; import io.nosqlbench.engine.api.activityapi.core.Activity; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; import io.nosqlbench.nb.annotations.Maturity; +import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.spi.SimpleServiceLoader; public interface InputType { @@ -25,5 +28,5 @@ public interface InputType { SimpleServiceLoader FINDER = new SimpleServiceLoader<>(InputType.class, Maturity.Any); - InputDispenser getInputDispenser(Activity activity); + InputDispenser getInputDispenser(StandardActivity parent); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/output/OutputType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/output/OutputType.java index 20b4fbb26..4fb9d1c62 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/output/OutputType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/output/OutputType.java @@ -17,7 +17,9 @@ package io.nosqlbench.engine.api.activityapi.output; import io.nosqlbench.engine.api.activityapi.core.Activity; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; import io.nosqlbench.nb.annotations.Maturity; +import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.spi.SimpleServiceLoader; public interface OutputType { @@ -25,6 +27,6 @@ public interface OutputType { SimpleServiceLoader FINDER = new SimpleServiceLoader<>(OutputType.class, Maturity.Any); - OutputDispenser getOutputDispenser(Activity activity); + OutputDispenser getOutputDispenser(NBComponent parent, ActivityWiring activity); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/CoreServices.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/CoreServices.java index a6c8f218d..c862b45b3 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/CoreServices.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/CoreServices.java @@ -17,6 +17,9 @@ package io.nosqlbench.engine.api.activityimpl; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction; import io.nosqlbench.engine.api.util.SimpleConfig; import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.cyclelog.filters.ResultFilterDispenser; @@ -25,16 +28,21 @@ import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.input.InputType; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputType; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import java.util.Optional; import java.util.function.Predicate; public class CoreServices { - public static Optional getOutputDispenser(A activity) { + private static StandardActivity parent; + + public static Optional getOutputDispenser( + NBComponent parent, ActivityWiring activity) { OutputDispenser outputDispenser = new SimpleConfig(activity, "output").getString("type") .flatMap(OutputType.FINDER::get) - .map(mt -> mt.getOutputDispenser(activity)).orElse(null); + .map(mt -> mt.getOutputDispenser(parent, activity)).orElse(null); if (outputDispenser==null) { return Optional.empty(); } @@ -47,7 +55,7 @@ public class CoreServices { return Optional.ofNullable(outputDispenser); } - public static Optional> getOutputFilter(A activity) { + public static Optional> getOutputFilter(ActivityWiring activity) { String paramdata= activity.getParams().getOptionalString("of") .orElse(activity.getParams().getOptionalString("outputfilter").orElse(null)); if (paramdata==null) { @@ -64,7 +72,7 @@ public class CoreServices { // return intPredicateDispenser; // } // - public static InputDispenser getInputDispenser(A activity) { + public static InputDispenser getInputDispenser(StandardActivity activity) { String inputTypeName = new SimpleConfig(activity, "input").getString("type").orElse("atomicseq"); InputType inputType = InputType.FINDER.getOrThrow(inputTypeName); InputDispenser dispenser = inputType.getInputDispenser(activity); @@ -75,7 +83,7 @@ public class CoreServices { return dispenser; } - public static Optional> getInputFilter(A activity) { + public static Optional> getInputFilter(Activity activity) { String paramdata= activity.getParams().getOptionalString("if") .orElse(activity.getParams().getOptionalString("inputfilter").orElse(null)); if (paramdata==null) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java index 315334c5e..e69de29bb 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java @@ -1,662 +0,0 @@ -/* - * Copyright (c) 2022-2024 nosqlbench - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.nosqlbench.engine.api.activityimpl; - -import io.nosqlbench.adapters.api.activityconfig.OpsLoader; -import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; -import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat; -import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList; -import io.nosqlbench.adapters.api.activityimpl.OpDispenser; -import io.nosqlbench.adapters.api.activityimpl.OpLookup; -import io.nosqlbench.adapters.api.activityimpl.OpMapper; -import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; -import io.nosqlbench.adapters.api.activityimpl.uniform.Space; -import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider; -import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; -import io.nosqlbench.adapters.api.templating.ParsedOp; -import io.nosqlbench.engine.api.activityapi.core.*; -import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter; -import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; -import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser; -import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; -import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; -import io.nosqlbench.engine.api.activityapi.input.InputDispenser; -import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; -import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner; -import io.nosqlbench.engine.api.activityapi.planning.SequencerType; -import io.nosqlbench.engine.api.activityapi.simrate.*; -import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; -import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult; -import io.nosqlbench.nb.api.advisor.NBAdvisorOutput; -import io.nosqlbench.nb.api.components.core.NBComponent; -import io.nosqlbench.nb.api.components.events.ParamChange; -import io.nosqlbench.nb.api.components.status.NBStatusComponent; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.nb.api.errors.BasicError; -import io.nosqlbench.nb.api.errors.OpConfigError; -import io.nosqlbench.nb.api.labels.NBLabels; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.InputStream; -import java.io.PrintWriter; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.function.Function; -import java.util.function.LongFunction; - -/** - * A default implementation of an Activity, suitable for building upon. - */ -public class SimpleActivity extends NBStatusComponent implements Activity, InvokableResult { - private static final Logger logger = LogManager.getLogger("ACTIVITY"); - - protected ActivityDef activityDef; - private final List closeables = new ArrayList<>(); - private MotorDispenser motorDispenser; - private InputDispenser inputDispenser; - private ActionDispenser actionDispenser; - private OutputDispenser markerDispenser; - private IntPredicateDispenser resultFilterDispenser; - private RunState runState = RunState.Uninitialized; - private ThreadLocal strideLimiterSource; - private ThreadLocal cycleLimiterSource; - private ActivityInstrumentation activityInstrumentation; - private PrintWriter console; - private long startedAtMillis; - private int nameEnumerator; - private ErrorMetrics errorMetrics; - private NBErrorHandler errorHandler; - private ActivityMetricProgressMeter progressMeter; - private String workloadSource = "unspecified"; - private final RunStateTally tally = new RunStateTally(); - - public SimpleActivity(NBComponent parent, ActivityDef activityDef) { - super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())); - this.activityDef = activityDef; - if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) { - Optional workloadOpt = activityDef.getParams().getOptionalString( - "workload", - "yaml" - ); - if (workloadOpt.isPresent()) { - activityDef.getParams().set("alias", workloadOpt.get()); - } else { - activityDef.getParams().set("alias", - activityDef.getActivityDriver().toUpperCase(Locale.ROOT) - + nameEnumerator); - nameEnumerator++; - } - } - } - - public SimpleActivity(NBComponent parent, String activityDefString) { - this(parent, ActivityDef.parseActivityDef(activityDefString)); - } - - @Override - public synchronized void initActivity() { - initOrUpdateRateLimiters(this.activityDef); - } - - public synchronized NBErrorHandler getErrorHandler() { - if (null == this.errorHandler) { - errorHandler = new NBErrorHandler( - () -> activityDef.getParams().getOptionalString("errors").orElse("stop"), - this::getExceptionMetrics); - } - return errorHandler; - } - - @Override - public synchronized RunState getRunState() { - return runState; - } - - @Override - public synchronized void setRunState(RunState runState) { - this.runState = runState; - if (RunState.Running == runState) { - this.startedAtMillis = System.currentTimeMillis(); - } - } - - @Override - public long getStartedAtMillis() { - return startedAtMillis; - } - - @Override - public final MotorDispenser getMotorDispenserDelegate() { - return motorDispenser; - } - - @Override - public final void setMotorDispenserDelegate(MotorDispenser motorDispenser) { - this.motorDispenser = motorDispenser; - } - - @Override - public final InputDispenser getInputDispenserDelegate() { - return inputDispenser; - } - - @Override - public final void setInputDispenserDelegate(InputDispenser inputDispenser) { - this.inputDispenser = inputDispenser; - } - - @Override - public final ActionDispenser getActionDispenserDelegate() { - return actionDispenser; - } - - @Override - public final void setActionDispenserDelegate(ActionDispenser actionDispenser) { - this.actionDispenser = actionDispenser; - } - - @Override - public IntPredicateDispenser getResultFilterDispenserDelegate() { - return resultFilterDispenser; - } - - @Override - public void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser) { - this.resultFilterDispenser = resultFilterDispenser; - } - - @Override - public OutputDispenser getMarkerDispenserDelegate() { - return this.markerDispenser; - } - - @Override - public void setOutputDispenserDelegate(OutputDispenser outputDispenser) { - this.markerDispenser = outputDispenser; - } - - @Override - public ActivityDef getActivityDef() { - return activityDef; - } - - public String toString() { - return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally; - } - - @Override - public int compareTo(Activity o) { - return getAlias().compareTo(o.getAlias()); - } - - @Override - public void registerAutoCloseable(AutoCloseable closeable) { - this.closeables.add(closeable); - } - - @Override - public void closeAutoCloseables() { - for (AutoCloseable closeable : closeables) { - logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable); - try { - closeable.close(); - } catch (Exception e) { - throw new RuntimeException("Error closing " + closeable + ": " + e, e); - } - } - closeables.clear(); - } - - @Override - public RateLimiter getCycleLimiter() { - if (cycleLimiterSource!=null) { - return cycleLimiterSource.get(); - } else { - return null; - } - } - @Override - public synchronized RateLimiter getStrideLimiter() { - if (strideLimiterSource!=null) { - return strideLimiterSource.get(); - } else { - return null; - } - } - - @Override - public synchronized ActivityInstrumentation getInstrumentation() { - if (null == this.activityInstrumentation) { - activityInstrumentation = new ComponentActivityInstrumentation(this); -// activityInstrumentation = new CoreActivityInstrumentation(this); - } - return activityInstrumentation; - } - - @Override - public synchronized PrintWriter getConsoleOut() { - if (null == console) { - this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8); - } - return this.console; - } - - @Override - public synchronized InputStream getConsoleIn() { - return System.in; - } - - @Override - public void setConsoleOut(PrintWriter writer) { - this.console = writer; - } - - @Override - public synchronized ErrorMetrics getExceptionMetrics() { - if (null == this.errorMetrics) { - errorMetrics = new ErrorMetrics(this); - } - return errorMetrics; - } - - @Override - public synchronized void onActivityDefUpdate(ActivityDef activityDef) { -// initOrUpdateRateLimiters(activityDef); - } - - public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) { - -// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false); - - activityDef.getParams().getOptionalNamedParameter("striderate") - .map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr))); - - activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate") - .map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr))); - - } - - public void createOrUpdateStrideLimiter(SimRateSpec spec) { - strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec); - } - - public void createOrUpdateCycleLimiter(SimRateSpec spec) { - cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec); - } - - - /** - * Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the - * length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable - * defaults when requested. - * - * @param seq - * - The {@link OpSequence} to derive the defaults from - */ - public synchronized void setDefaultsFromOpSequence(OpSequence seq) { - Optional strideOpt = getParams().getOptionalString("stride"); - if (strideOpt.isEmpty()) { - String stride = String.valueOf(seq.getSequence().length); - logger.info(() -> "defaulting stride to " + stride + " (the sequence length)"); -// getParams().set("stride", stride); - getParams().setSilently("stride", stride); - } - - // CYCLES - Optional cyclesOpt = getParams().getOptionalString("cycles"); - if (cyclesOpt.isEmpty()) { - String cycles = getParams().getOptionalString("stride").orElseThrow(); - logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)"); - this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow()); - } else { - if (0 == activityDef.getCycleCount()) { - throw new RuntimeException( - "You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles") - ); - } - long stride = getParams().getOptionalLong("stride").orElseThrow(); - long cycles = this.activityDef.getCycleCount(); - if (cycles < stride) { - throw new RuntimeException( - "The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." + - " If this was intended, then set stride low enough to allow it." - ); - } - } - - long cycleCount = this.activityDef.getCycleCount(); - long stride = this.activityDef.getParams().getOptionalLong("stride").orElseThrow(); - - if (0 < stride && 0 != cycleCount % stride) { - logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," + - "leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')'); - } - - Optional threadSpec = activityDef.getParams().getOptionalString("threads"); - if (threadSpec.isPresent()) { - String spec = threadSpec.get(); - int processors = Runtime.getRuntime().availableProcessors(); - if ("auto".equalsIgnoreCase(spec)) { - int threads = processors * 10; - if (threads > activityDef.getCycleCount()) { - threads = (int) activityDef.getCycleCount(); - logger.info("setting threads to {} (auto) [10xCORES, cycle count limited]", threads); - } else { - logger.info("setting threads to {} (auto) [10xCORES]", threads); - } -// activityDef.setThreads(threads); - activityDef.getParams().setSilently("threads", threads); - } else if (spec.toLowerCase().matches("\\d+x")) { - String multiplier = spec.substring(0, spec.length() - 1); - int threads = processors * Integer.parseInt(multiplier); - logger.info(() -> "setting threads to " + threads + " (" + multiplier + "x)"); -// activityDef.setThreads(threads); - activityDef.getParams().setSilently("threads", threads); - } else if (spec.toLowerCase().matches("\\d+")) { - logger.info(() -> "setting threads to " + spec + " (direct)"); -// activityDef.setThreads(Integer.parseInt(spec)); - activityDef.getParams().setSilently("threads", Integer.parseInt(spec)); - } - - if (activityDef.getThreads() > activityDef.getCycleCount()) { - logger.warn(() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary() - + ", you should have more cycles than threads."); - } - - } else if (1000 < cycleCount) { - logger.warn(() -> "For testing at scale, it is highly recommended that you " + - "set threads to a value higher than the default of 1." + - " hint: you can use threads=auto for reasonable default, or" + - " consult the topic on threads with `help threads` for" + - " more information."); - } - - if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) { - throw new BasicError("You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity."); - } - } - - - protected OpSequence>> createOpSourceFromParsedOps( -// Map> adapterCache, -// Map> mapperCache, - List, Space>> adapters, - List pops, - OpLookup opLookup - ) { - try { - - List ratios = new ArrayList<>(pops.size()); - - for (ParsedOp pop : pops) { - long ratio = pop.takeStaticConfigOr("ratio", 1); - ratios.add(ratio); - } - - SequencerType sequencerType = getParams() - .getOptionalString("seq") - .map(SequencerType::valueOf) - .orElse(SequencerType.bucket); - SequencePlanner>> planner = new SequencePlanner<>(sequencerType); - - for (int i = 0; i < pops.size(); i++) { - long ratio = ratios.get(i); - ParsedOp pop = pops.get(i); - - try { - if (0 == ratio) { - logger.info(() -> "skipped mapping op '" + pop.getName() + '\''); - continue; - } - - DriverAdapter, Space> adapter = adapters.get(i); - OpMapper, Space> opMapper = adapter.getOpMapper(); - LongFunction spaceFunc = adapter.getSpaceFunc(pop); - OpDispenser> dispenser = opMapper.apply(this, pop, spaceFunc); - String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none"); - Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun"); - - dispenser = OpFunctionComposition.wrapOptionally( - adapter, - dispenser, - pop, - dryrun, - opLookup - ); - -// if (strict) { -// optemplate.assertConsumed(); -// } - planner.addOp((OpDispenser>) dispenser, ratio); - } catch (Exception e) { - throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e); - } - } - - - return planner.resolve(); - - } catch (Exception e) { - if (e instanceof OpConfigError oce) { - throw oce; - } else { - throw new OpConfigError(e.getMessage(), workloadSource, e); - } - } - - - } - - protected List loadOpTemplates( - DriverAdapter defaultDriverAdapter, - boolean logged, - boolean filtered - ) { - - String tagfilter = activityDef.getParams().getOptionalString("tags").orElse(""); - - OpsDocList opsDocList = loadStmtsDocList(); - - List filteredOps = opsDocList.getOps(filtered?tagfilter:"", logged); - - if (filteredOps.isEmpty()) { - // There were no ops, and it *wasn't* because they were all filtered out. - // In this case, let's try to synthesize the ops as long as at least a default driver was provided - // But if there were no ops, and there was no default driver provided, we can't continue - // There were no ops, and it was because they were all filtered out - List unfilteredOps = opsDocList.getOps(false); - if (!unfilteredOps.isEmpty()) { - String message = "There were no active op templates with tag filter '"+ tagfilter + "', since all " + - unfilteredOps.size() + " were filtered out. Examine the session log for details"; - NBAdvisorOutput.test(message); - //throw new BasicError(message); - } - if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) { - filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams()); - Objects.requireNonNull(filteredOps); - if (filteredOps.isEmpty()) { - throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + - " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates."); - } - } else { - throw new BasicError(""" - No op templates were provided. You must provide one of these activity parameters: - 1) workload=some.yaml - 2) op='inline template' - 3) driver=stdout (or any other drive that can synthesize ops)"""); - } - } - return filteredOps; - } - - /** - * Given a function that can create an op of type from an OpTemplate, generate - * an indexed sequence of ready to call operations. - *

- * This method uses the following conventions to derive the sequence: - * - *

    - *
  1. If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is - * taken as the only provided statement.
  2. - *
  3. If a 'yaml, or 'workload' parameter is provided, then the statements in that file - * are taken with their ratios
  4. - *
  5. Any provided tags filter is used to select only the op templates which have matching - * tags. If no tags are provided, then all the found op templates are included.
  6. - *
  7. The ratios and the 'seq' parameter are used to build a sequence of the ready operations, - * where the sequence length is the sum of the ratios.
  8. - *
- * - * @param - * A holder for an executable operation for the native driver used by this activity. - * @param opinit - * A function to map an OpTemplate to the executable operation form required by - * the native driver for this activity. - * @param defaultAdapter - * The adapter which will be used for any op templates with no explicit adapter - * @return The sequence of operations as determined by filtering and ratios - */ - @Deprecated(forRemoval = true) - protected OpSequence> createOpSequence(Function> opinit, boolean strict, DriverAdapter defaultAdapter) { - - List stmts = loadOpTemplates(defaultAdapter,true,false); - - List ratios = new ArrayList<>(stmts.size()); - - for (OpTemplate opTemplate : stmts) { - long ratio = opTemplate.removeParamOrDefault("ratio", 1); - ratios.add(ratio); - } - - SequencerType sequencerType = getParams() - .getOptionalString("seq") - .map(SequencerType::valueOf) - .orElse(SequencerType.bucket); - - SequencePlanner> planner = new SequencePlanner<>(sequencerType); - - try { - for (int i = 0; i < stmts.size(); i++) { - long ratio = ratios.get(i); - OpTemplate optemplate = stmts.get(i); - OpDispenser driverSpecificReadyOp = opinit.apply(optemplate); - if (strict) { - optemplate.assertConsumed(); - } - planner.addOp(driverSpecificReadyOp, ratio); - } - } catch (Exception e) { - throw new OpConfigError(e.getMessage(), workloadSource, e); - } - - return planner.resolve(); - } - - protected OpsDocList loadStmtsDocList() { - - try { - String op = activityDef.getParams().getOptionalString("op").orElse(null); - String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(null); - String workload = activityDef.getParams().getOptionalString("workload").orElse(null); - - if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) { - throw new OpConfigError("Only op, statement, or workload may be provided, not more than one."); - } - logger.debug("loadStmtsDocList #1"); - if (workload != null && OpsLoader.isJson(workload)) { - workloadSource = "commandline: (workload/json):" + workload; - return OpsLoader.loadString(workload, OpTemplateFormat.json, activityDef.getParams(), null); - } else if (workload != null && OpsLoader.isYaml(workload)) { - workloadSource = "commandline: (workload/yaml):" + workload; - return OpsLoader.loadString(workload, OpTemplateFormat.yaml, activityDef.getParams(), null); - } else if (workload != null) { - return OpsLoader.loadPath(workload, activityDef.getParams(), "activities"); - } - - logger.debug("loadStmtsDocList #2"); - if (stmt != null) { - workloadSource = "commandline: (stmt/inline): '" + stmt + "'"; - return OpsLoader.loadString(stmt, OpTemplateFormat.inline, activityDef.getParams(), null); - } - - logger.debug("loadStmtsDocList #3"); - if (op != null && OpsLoader.isJson(op)) { - workloadSource = "commandline: (op/json): '" + op + "'"; - return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null); - } - else if (op != null) { - workloadSource = "commandline: (op/inline): '" + op + "'"; - return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null); - } - return OpsDocList.none(); - - } catch (Exception e) { - throw new OpConfigError("Error loading op templates: " + e, workloadSource, e); - } - } - - @Override - public synchronized ProgressMeterDisplay getProgressMeter() { - if (null == this.progressMeter) { - this.progressMeter = new ActivityMetricProgressMeter(this); - } - return this.progressMeter; - } - - /** - * Activities with retryable operations (when specified with the retry error handler for some - * types of error), should allow the user to specify how many retries are allowed before - * giving up on the operation. - * - * @return The number of allowable retries - */ - @Override - public int getMaxTries() { - return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10); - } - - @Override - public RunStateTally getRunStateTally() { - return tally; - } - - - @Override - public Map asResult() { - return Map.of("activity",this.getAlias()); - } - -// private final ThreadLocal cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> { -// RateLimiters.createOrUpdate(this,null,new SimRateSpec() -// if (cycleratePerThread) { -// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,) -// } else { -// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef) -// } -// if (getCycleLimiter() != null) { -// return RateLimiters.createOrUpdate( -// new NBThreadComponent(this), -// getCycleLimiter(), -// getCycleLimiter().getSpec()); -// } else { -// return null; -// } -// }); - -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/action/CoreActionDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/action/CoreActionDispenser.java index d7e72711c..5d5e9e172 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/action/CoreActionDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/action/CoreActionDispenser.java @@ -18,6 +18,8 @@ package io.nosqlbench.engine.api.activityimpl.action; import io.nosqlbench.engine.api.activityapi.core.Action; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; import io.nosqlbench.engine.api.activityapi.core.Activity; +import io.nosqlbench.engine.api.activityapi.core.SyncAction; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -28,14 +30,14 @@ public class CoreActionDispenser implements ActionDispenser { private final static Logger logger = LogManager.getLogger(CoreActionDispenser.class); - private final Activity activity; + private final ActivityWiring activity; - public CoreActionDispenser(Activity activity) { + public CoreActionDispenser(ActivityWiring activity) { this.activity = activity; } @Override - public Action getAction(int slot) { + public SyncAction getAction(int slot) { return new CoreAction(activity.getActivityDef(), slot); } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/CoreInputDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/CoreInputDispenser.java index 50884907b..27a2fe8d8 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/CoreInputDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/CoreInputDispenser.java @@ -16,6 +16,7 @@ package io.nosqlbench.engine.api.activityimpl.input; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; import io.nosqlbench.engine.api.util.SimpleConfig; import io.nosqlbench.engine.api.activityapi.core.ActivitiesAware; import io.nosqlbench.engine.api.activityapi.core.Activity; @@ -27,11 +28,11 @@ import java.util.Map; public class CoreInputDispenser implements InputDispenser, ActivitiesAware { - private final Activity activity; + private final StandardActivity activity; private Map activities; private Input input; - public CoreInputDispenser(Activity activity) { + public CoreInputDispenser(StandardActivity activity) { this.activity = activity; } @@ -44,7 +45,7 @@ public class CoreInputDispenser implements InputDispenser, ActivitiesAware { } private synchronized Input createInput(long slot) { - SimpleConfig conf = new SimpleConfig(activity, "input"); + SimpleConfig conf = new SimpleConfig(activity.getActivityDef(), "input"); String inputType = conf.getString("type").orElse("atomicseq"); InputType inputTypeImpl = InputType.FINDER.getOrThrow(inputType); InputDispenser inputDispenser = inputTypeImpl.getInputDispenser(activity); diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/TargetRateInputType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/TargetRateInputType.java index 5ffb9ac40..f3967dc1c 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/TargetRateInputType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/input/TargetRateInputType.java @@ -20,13 +20,14 @@ import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.input.InputType; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; import io.nosqlbench.nb.annotations.Service; @Service(value= InputType.class, selector="atomicseq") public class TargetRateInputType implements InputType { @Override - public InputDispenser getInputDispenser(Activity activity) { + public InputDispenser getInputDispenser(StandardActivity activity) { return new Dispenser(activity); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/marker/ContiguousOutputChunker.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/marker/ContiguousOutputChunker.java index 3d91e89fa..c0ff373fe 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/marker/ContiguousOutputChunker.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/marker/ContiguousOutputChunker.java @@ -73,7 +73,7 @@ public class ContiguousOutputChunker implements Output { public ContiguousOutputChunker(Activity activity) { - if (!(activity.getInputDispenserDelegate().getInput(0).isContiguous())) { + if (!(activity.getWiring().getInputDispenserDelegate().getInput(0).isContiguous())) { throw new RuntimeException("This type of output may not be used with non-contiguous inputs yet."); // If you are looking at this code, it's because we count updates to extents to provide // efficient marker extent handling. The ability to use segmented inputs with markers will diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java index 5b150be1b..5a8184590 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotor.java @@ -25,7 +25,13 @@ import io.nosqlbench.engine.api.activityapi.core.ops.fluent.OpTracker; import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.output.Output; import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.nb.api.components.core.NBBaseComponent; +import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer; +import io.nosqlbench.nb.api.labels.NBLabels; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -35,26 +41,26 @@ import java.util.concurrent.TimeUnit; import static io.nosqlbench.engine.api.activityapi.core.RunState.*; /** - * ActivityMotor is a Runnable which runs in one of an activity's many threads. - * It is the iteration harness for individual cycles of an activity. Each ActivityMotor - * instance is responsible for taking input from a LongSupplier and applying - * the provided LongConsumer to it on each cycle. These two parameters are called - * input and action, respectively. - * - * This motor implementation splits the handling of sync and async actions with a hard - * fork in the middle to limit potential breakage of the prior sync implementation - * with new async logic. - */ -public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { + ActivityMotor is a Runnable which runs in one of an activity's many threads. + It is the iteration harness for individual cycles of an activity. Each ActivityMotor + instance is responsible for taking input from a LongSupplier and applying + the provided LongConsumer to it on each cycle. These two parameters are called + input and action, respectively. + + This motor implementation splits the handling of sync and async actions with a hard + fork in the middle to limit potential breakage of the prior sync implementation + with new async logic. */ +public class CoreMotor extends NBBaseComponent implements ActivityDefObserver, Motor, Stoppable { private static final Logger logger = LogManager.getLogger(CoreMotor.class); private final long slotId; + private final Activity activity; private Timer inputTimer; private RateLimiter strideRateLimiter; - private Timer strideServiceTimer; + private Timer stridesServiceTimer; private Timer stridesResponseTimer; private RateLimiter cycleRateLimiter; @@ -62,8 +68,8 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { private Timer cycleResponseTimer; private Input input; - private Action action; - private final Activity activity; + private SyncAction action; + // private final Activity activity; private Output output; private final MotorState motorState; @@ -72,69 +78,38 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { private OpTracker opTracker; + + /** - * Create an ActivityMotor. - * - * @param activity The activity that this motor will be associated with. - * @param slotId The enumeration of the motor, as assigned by its executor. - * @param input A LongSupplier which provides the cycle number inputs. + Create an ActivityMotor. + + // * @param activity The activity that this motor will be associated with. + @param slotId + The enumeration of the motor, as assigned by its executor. + @param input + A LongSupplier which provides the cycle number inputs. */ - public CoreMotor( - Activity activity, - long slotId, - Input input) { + public CoreMotor(StandardActivity activity, long slotId, Input input, SyncAction action, + Output output) { + super(activity, NBLabels.forKV("motor", "coremotor")); this.activity = activity; this.slotId = slotId; setInput(input); + setResultOutput(output); motorState = new MotorState(slotId, activity.getRunStateTally()); onActivityDefUpdate(activity.getActivityDef()); - } + this.action = action; + + int hdrdigits = activity.getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3); - /** - * Create an ActivityMotor. - * - * @param activity The activity that this motor is based on. - * @param slotId The enumeration of the motor, as assigned by its executor. - * @param input A LongSupplier which provides the cycle number inputs. - * @param action An LongConsumer which is applied to the input for each cycle. - */ - public CoreMotor( - Activity activity, - long slotId, - Input input, - Action action - ) { - this(activity, slotId, input); - setAction(action); } /** - * Create an ActivityMotor. - * - * @param activity The activity that this motor is based on. - * @param slotId The enumeration of the motor, as assigned by its executor. - * @param input A LongSupplier which provides the cycle number inputs. - * @param action An LongConsumer which is applied to the input for each cycle. - * @param output An optional opTracker. - */ - public CoreMotor( - Activity activity, - long slotId, - Input input, - Action action, - Output output - ) { - this(activity, slotId, input); - setAction(action); - setResultOutput(output); - } - - /** - * Set the input for this ActivityMotor. - * - * @param input The LongSupplier that provides the cycle number. - * @return this ActivityMotor, for chaining + Set the input for this ActivityMotor. + @param input + The LongSupplier that provides the cycle number. + @return this ActivityMotor, for chaining */ @Override public Motor setInput(Input input) { @@ -147,19 +122,6 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { return input; } - - /** - * Set the action for this ActivityMotor. - * - * @param action The LongConsumer that will be applied to the next cycle number. - * @return this ActivityMotor, for chaining - */ - @Override - public Motor setAction(Action action) { - this.action = action; - return this; - } - @Override public Action getAction() { return action; @@ -185,16 +147,13 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { motorState.enterState(Starting); try { - inputTimer = activity.getInstrumentation().getOrCreateInputTimer(); - strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer(); - stridesResponseTimer = activity.getInstrumentation().getStridesResponseTimerOrNull(); strideRateLimiter = activity.getStrideLimiter(); cycleRateLimiter = activity.getCycleLimiter(); - if (motorState.get() == Finished) { - logger.warn(() -> "Input was already exhausted for slot " + slotId + ", remaining in finished state."); + logger.warn( + () -> "Input was already exhausted for slot " + slotId + ", remaining in finished state."); } action.init(); @@ -211,105 +170,104 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { long strideDelay = 0L; long cycleDelay = 0L; - if (action instanceof SyncAction sync) { - cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer(); - strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer(); + motorState.enterState(Running); + while (motorState.get() == Running) { - if (activity.getActivityDef().getParams().containsKey("async")) { - throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async."); + CycleSegment cycleSegment = null; + CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride); + + try (Timer.Context inputTime = inputTimer.time()) { + cycleSegment = input.getInputSegment(stride); } - motorState.enterState(Running); - while (motorState.get() == Running) { - - CycleSegment cycleSegment = null; - CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride); - - try (Timer.Context inputTime = inputTimer.time()) { - cycleSegment = input.getInputSegment(stride); - } - - if (cycleSegment == null) { - logger.trace(() -> "input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId); - motorState.enterState(Finished); - continue; - } + if (cycleSegment == null) { + logger.trace( + () -> "input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId); + motorState.enterState(Finished); + continue; + } - if (strideRateLimiter != null) { - // block for strides rate limiter - strideDelay = strideRateLimiter.block(); - } + if (strideRateLimiter != null) { + // block for strides rate limiter + strideDelay = strideRateLimiter.block(); + } - long strideStart = System.nanoTime(); - try { + long strideStart = System.nanoTime(); + try { - while (!cycleSegment.isExhausted()) { - long cyclenum = cycleSegment.nextCycle(); - if (cyclenum < 0) { - if (cycleSegment.isExhausted()) { - logger.trace(() -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId); - motorState.enterState(Finished); - continue; - } - } - - if (motorState.get() != Running) { - logger.trace(() -> "motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId); + while (!cycleSegment.isExhausted()) { + long cyclenum = cycleSegment.nextCycle(); + if (cyclenum < 0) { + if (cycleSegment.isExhausted()) { + logger.trace( + () -> "input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId); + motorState.enterState(Finished); continue; } - int result = -1; - - if (cycleRateLimiter != null) { - // Block for cycle rate limiter - cycleDelay = cycleRateLimiter.block(); - } - - long cycleStart = System.nanoTime(); - try { - logger.trace(()->"cycle " + cyclenum); - result = sync.runCycle(cyclenum); - } catch (Exception e) { - motorState.enterState(Errored); - throw e; - } finally { - long cycleEnd = System.nanoTime(); - cycleServiceTimer.update((cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS); - } - segBuffer.append(cyclenum, result); } - } finally { - long strideEnd = System.nanoTime(); - strideServiceTimer.update((strideEnd - strideStart) + strideDelay, TimeUnit.NANOSECONDS); - } + if (motorState.get() != Running) { + logger.trace( + () -> "motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId); + continue; + } + int result = -1; - if (output != null) { - CycleResultsSegment outputBuffer = segBuffer.toReader(); + if (cycleRateLimiter != null) { + // Block for cycle rate limiter + cycleDelay = cycleRateLimiter.block(); + } + + long cycleStart = System.nanoTime(); try { - output.onCycleResultSegment(outputBuffer); - } catch (Exception t) { - logger.error(()->"Error while feeding result segment " + outputBuffer + " to output '" + output + "', error:" + t); - throw t; + logger.trace(() -> "cycle " + cyclenum); + result = action.runCycle(cyclenum); + } catch (Exception e) { + motorState.enterState(Errored); + throw e; + } finally { + long cycleEnd = System.nanoTime(); + cycleServiceTimer.update( + (cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS); } + segBuffer.append(cyclenum, result); } + + } finally { + long strideEnd = System.nanoTime(); + stridesServiceTimer.update( + (strideEnd - strideStart) + strideDelay, + TimeUnit.NANOSECONDS + ); } - } else { - throw new RuntimeException("Valid Action implementations must implement SyncAction"); + if (output != null) { + CycleResultsSegment outputBuffer = segBuffer.toReader(); + try { + output.onCycleResultSegment(outputBuffer); + } catch (Exception t) { + logger.error( + () -> "Error while feeding result segment " + outputBuffer + " to output '" + output + "', error:" + t); + throw t; + } + } } if (motorState.get() == Stopping) { motorState.enterState(Stopped); - logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get()); + logger.trace( + () -> Thread.currentThread().getName() + " shutting down as " + motorState.get()); } else if (motorState.get() == Finished) { - logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get()); + logger.trace( + () -> Thread.currentThread().getName() + " shutting down as " + motorState.get()); } else { - logger.warn(()->"Unexpected motor state for CoreMotor shutdown: " + motorState.get()); + logger.warn( + () -> "Unexpected motor state for CoreMotor shutdown: " + motorState.get()); } } catch (Throwable t) { - logger.error(()->"Error in core motor loop:" + t, t); + logger.error(() -> "Error in core motor loop:" + t, t); motorState.enterState(Errored); throw t; } @@ -342,7 +300,8 @@ public class CoreMotor implements ActivityDefObserver, Motor, Stoppable { Stoppable.stop(input, action); motorState.enterState(Stopping); } else { - logger.warn(() -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState); + logger.warn( + () -> "attempted to stop motor " + this.getSlotId() + ": from non Running state:" + currentState); } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java index 6a376e52f..bee86b61c 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/motor/CoreMotorDispenser.java @@ -15,6 +15,8 @@ */ package io.nosqlbench.engine.api.activityimpl.motor; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.input.Input; @@ -30,12 +32,12 @@ import java.util.function.IntPredicate; */ public class CoreMotorDispenser implements MotorDispenser { - private final Activity activity; + private final StandardActivity activity; private final InputDispenser inputDispenser; private final ActionDispenser actionDispenser; private final OutputDispenser outputDispenser; - public CoreMotorDispenser(Activity activity, + public CoreMotorDispenser(StandardActivity activity, InputDispenser inputDispenser, ActionDispenser actionDispenser, OutputDispenser outputDispenser @@ -48,7 +50,7 @@ public class CoreMotorDispenser implements MotorDispenser { @Override public Motor getMotor(ActivityDef activityDef, int slotId) { - Action action = actionDispenser.getAction(slotId); + SyncAction action = actionDispenser.getAction(slotId); Input input = inputDispenser.getInput(slotId); Output output = null; if (outputDispenser !=null) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ActivityWiring.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ActivityWiring.java new file mode 100644 index 000000000..9010626d5 --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ActivityWiring.java @@ -0,0 +1,78 @@ +package io.nosqlbench.engine.api.activityimpl.uniform; + +import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; +import io.nosqlbench.engine.api.activityapi.core.MotorDispenser; +import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser; +import io.nosqlbench.engine.api.activityapi.input.InputDispenser; +import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; +import io.nosqlbench.nb.api.labels.NBLabels; + +public class ActivityWiring { + + private final ActivityDef activityDef; + private MotorDispenser motorDispenser; + private InputDispenser inputDispenser; + private ActionDispenser actionDispenser; + private OutputDispenser markerDispenser; + private IntPredicateDispenser resultFilterDispenser; + + public ActivityWiring(ActivityDef activityDef) { + this.activityDef = activityDef; + } + + public static ActivityWiring of(ActivityDef activityDef) { + return new ActivityWiring(activityDef); + } + + public ActivityDef getActivityDef() { + return activityDef; + } + + public final MotorDispenser getMotorDispenserDelegate() { + return motorDispenser; + } + + public final void setMotorDispenserDelegate(MotorDispenser motorDispenser) { + this.motorDispenser = motorDispenser; + } + + public final InputDispenser getInputDispenserDelegate() { + return inputDispenser; + } + + public final void setInputDispenserDelegate(InputDispenser inputDispenser) { + this.inputDispenser = inputDispenser; + } + + public final ActionDispenser getActionDispenserDelegate() { + return actionDispenser; + } + + public final void setActionDispenserDelegate(ActionDispenser actionDispenser) { + this.actionDispenser = actionDispenser; + } + + public IntPredicateDispenser getResultFilterDispenserDelegate() { + return resultFilterDispenser; + } + + public void setResultFilterDispenserDelegate(IntPredicateDispenser resultFilterDispenser) { + this.resultFilterDispenser = resultFilterDispenser; + } + + public OutputDispenser getMarkerDispenserDelegate() { + return this.markerDispenser; + } + + public void setOutputDispenserDelegate(OutputDispenser outputDispenser) { + this.markerDispenser = outputDispenser; + } + + + public ParameterMap getParams() { + return activityDef.getParams(); + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java index 72ff9f1f5..f3c3974af 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActionDispenser.java @@ -21,14 +21,14 @@ import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction; public class StandardActionDispenser implements ActionDispenser { - private final StandardActivity activity; + private final StandardActivity activity; - public
StandardActionDispenser(StandardActivity activity) { + public StandardActionDispenser(StandardActivity activity) { this.activity = activity; } @Override public StandardAction getAction(int slot) { - return new StandardAction<>(activity,slot); + return new StandardAction<>(activity, slot); } } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java index 78de6de24..b6ff59ed8 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java @@ -16,9 +16,11 @@ package io.nosqlbench.engine.api.activityimpl.uniform; +import com.codahale.metrics.Timer; import io.nosqlbench.adapter.diag.DriverAdapterLoader; import io.nosqlbench.adapters.api.activityconfig.OpsLoader; import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; +import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat; import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList; import io.nosqlbench.adapters.api.activityimpl.OpDispenser; import io.nosqlbench.adapters.api.activityimpl.OpLookup; @@ -28,8 +30,26 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.Space; import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.engine.api.activityapi.core.*; +import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter; +import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; +import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; +import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner; +import io.nosqlbench.engine.api.activityapi.planning.SequencerType; +import io.nosqlbench.engine.api.activityapi.simrate.*; +import io.nosqlbench.engine.api.activityimpl.Dryrun; +import io.nosqlbench.engine.api.activityimpl.OpFunctionComposition; import io.nosqlbench.engine.api.activityimpl.OpLookupService; +import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; +import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult; +import io.nosqlbench.nb.api.advisor.NBAdvisorOutput; +import io.nosqlbench.nb.api.components.core.NBBaseComponent; +import io.nosqlbench.nb.api.components.status.NBStatusComponent; import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer; import io.nosqlbench.nb.api.lifecycle.Shutdownable; import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.config.standard.*; @@ -40,20 +60,20 @@ import io.nosqlbench.nb.api.labels.NBLabels; import io.nosqlbench.nb.api.components.events.NBEvent; import io.nosqlbench.nb.api.components.events.ParamChange; import io.nosqlbench.nb.api.components.events.SetThreads; -import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec; -import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.nb.annotations.ServiceSelector; import io.nosqlbench.nb.api.tagging.TagFilter; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.InputStream; +import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.function.Function; import java.util.function.LongFunction; -import java.util.function.Supplier; /** This is a typed activity which is expected to become the standard @@ -64,19 +84,177 @@ import java.util.function.Supplier; @param The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph */ -public class StandardActivity extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver { +public class StandardActivity extends NBStatusComponent implements Activity, InvokableResult, SyntheticOpTemplateProvider, ActivityDefObserver { private static final Logger logger = LogManager.getLogger("ACTIVITY"); private final OpSequence>> sequence; private final ConcurrentHashMap, Space>> adapters = new ConcurrentHashMap<>(); + protected final ActivityDef activityDef; - @Override - protected OpSequence>> createOpSourceFromParsedOps( - List, Space>> adapters, List pops, OpLookup opLookup) { - return super.createOpSourceFromParsedOps(adapters, pops, opLookup); - } + public final NBMetricCounter pendingOpsCounter; + public final NBMetricHistogram triesHistogram; + public final NBMetricTimer bindTimer; + public final NBMetricTimer executeTimer; + public final NBMetricTimer resultTimer; + public final NBMetricTimer resultSuccessTimer; + public final NBMetricTimer cycleServiceTimer; + public final NBMetricTimer inputTimer; + public final NBMetricTimer stridesServiceTimer; + public final NBMetricTimer stridesResponseTimer; + public final NBMetricTimer cycleResponseTimer; + + private ActivityMetricProgressMeter progressMeter; + private String workloadSource = "unspecified"; + private RunState runState = RunState.Uninitialized; + private long startedAtMillis; + private final RunStateTally tally = new RunStateTally(); + private ThreadLocal strideLimiterSource; + private ThreadLocal cycleLimiterSource; + private int nameEnumerator; + private NBErrorHandler errorHandler; + private final List closeables = new ArrayList<>(); + private PrintWriter console; + private ErrorMetrics errorMetrics; + private ActivityWiring wiring; + + private static final String WAIT_TIME = "_waittime"; + private static final String RESPONSE_TIME = "_responsetime"; + private static final String SERVICE_TIME = "_servicetime"; + + public StandardActivity(NBComponent parent, ActivityDef activityDef, ActivityWiring wiring) { + + super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())); + this.activityDef = activityDef; + this.wiring = wiring; + + int hdrdigits = getComponentProp("hdr_digits") + .map(Integer::parseInt).orElse(3); + + + this.pendingOpsCounter = create().counter( + "pending_ops", + MetricCategory.Core, + "Indicate the number of operations which have been started, but which have not been completed." + + " This starts " + ); + + /// The bind timer keeps track of how long it takes for NoSQLBench to create an instance + /// of an executable operation, given the cycle. This is usually done by using an + /// {@link OpSequence} in conjunction with + /// an {@link OpDispenser}. This is named for "binding + /// a cycle to an operation". + this.bindTimer = create().timer( + "bind", hdrdigits, MetricCategory.Core, + "Time the step within a cycle which binds generated data to an op template to synthesize an executable operation." + ); + + /// The execute timer keeps track of how long it takes to submit an operation to be executed + /// to an underlying native driver. For asynchronous APIs, such as those which return a + /// {@link Future}, this is simply the amount of time it takes to acquire the future. + /// /// When possible, APIs should be used via their async methods, even if you are implementing + /// a {@link SyncAction}. This allows the execute timer to measure the hand-off to the underlying API, + /// and the result timer to measure the blocking calls to aquire the result. + this.executeTimer = create().timer( + "execute", + hdrdigits, + MetricCategory.Core, + "Time how long it takes to submit a request and receive a result, including reading the result in the client." + ); + + /// The cycles service timer measures how long it takes to complete a cycle of work. + this.cycleServiceTimer = create().timer( + "cycles" + SERVICE_TIME, hdrdigits, MetricCategory.Core, + "service timer for a cycle, including all of bind, execute, result and result_success;" + " service timers measure the time between submitting a request and receiving the response" + ); + + + /// The result timer keeps track of how long it takes a native driver to service a request once submitted. + /// This timer, in contrast to the result-success timer ({@link #getOrCreateResultSuccessTimer()}), + /// is used to track all operations. That is, no matter + /// whether the operation succeeds or not, it should be tracked with this timer. The scope of this timer should + /// cover each attempt at an operation through a native driver. Retries are not to be combined in this measurement. + this.resultTimer = create().timer( + "result", + hdrdigits, + MetricCategory.Core, + "Time how long it takes to submit a request, receive a result, including binding, reading results, " + + "and optionally verifying them, including all operations whether successful or not, for each attempted request." + ); + + /// The result-success timer keeps track of operations which had no exception. The measurements for this timer should + /// be exactly the same values as used for the result timer ({@link #getOrCreateResultTimer()}, except that + /// attempts to complete an operation which yield an exception should be excluded from the results. These two metrics + /// together provide a very high level sanity check against the error-specific metrics which can be reported by + /// the error handler logic. + this.resultSuccessTimer = create().timer( + "result_success", + hdrdigits, + MetricCategory.Core, + "The execution time of successful operations, which includes submitting the operation, waiting for a response, and reading the result" + ); + + /// The input timer measures how long it takes to get the cycle value to be used for + /// an operation. + this.inputTimer = create().timer( + "read_input", getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3), + MetricCategory.Internals, + "measures overhead of acquiring a cycle range for an activity thread" + ); + + /// The strides service timer measures how long it takes to complete a stride of work. + this.stridesServiceTimer = create().timer( + "strides", getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3), + MetricCategory.Core, + "service timer for a stride, which is the same as the op sequence length by default" + ); + + if (null != getStrideLimiter()) { + + /// The strides response timer measures the total response time from the scheduled + /// time a stride should start to when it completed. Stride scheduling is only defined + /// when it is implied by a stride rate limiter, so this method should return null if + /// there is no strides rate limiter. + this.stridesResponseTimer = create().timer( + "strides" + RESPONSE_TIME, hdrdigits, MetricCategory.Core, + "response timer for a stride, which is the same as the op sequence length by default;" + " response timers include scheduling delays which occur when an activity falls behind its target rate" + ); + } else { + stridesResponseTimer=null; + } + + + /** + * The cycles response timer measures the total response time from the scheduled + * time an operation should start to when it is completed. Cycle scheduling is only defined + * when it is implied by a cycle rate limiter, so this method should return null if + * there is no cycles rate limiter. + * @return a new or existing {@link Timer} if appropriate, else null + */ + if (null != getCycleLimiter()) { + this.cycleResponseTimer = create().timer( + "cycles" + RESPONSE_TIME, hdrdigits, MetricCategory.Core, + "response timer for a cycle, including all of bind, execute, result and result_success;" + " response timers include scheduling delays which occur when an activity falls behind its target rate" + ); + } else { + cycleResponseTimer=null; + } + + + + if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) { + Optional workloadOpt = activityDef.getParams().getOptionalString( + "workload", + "yaml" + ); + if (workloadOpt.isPresent()) { + activityDef.getParams().set("alias", workloadOpt.get()); + } else { + activityDef.getParams().set("alias", + activityDef.getActivityDriver().toUpperCase(Locale.ROOT) + + nameEnumerator); + nameEnumerator++; + } + } - public StandardActivity(NBComponent parent, ActivityDef activityDef) { - super(parent, activityDef); OpsDocList workload; Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); @@ -151,8 +329,102 @@ public class StandardActivity exte "ops_complete", () -> this.getProgressMeter().getSummary().complete(), MetricCategory.Core, "The current number of operations which have been completed" ); + + /// The tries histogram tracks how many tries it takes to complete an operation successfully, or not. This histogram + /// does not encode whether operations were successful or not. Ideally, if every attempt to complete an operation succeeds + /// on its first try, the data in this histogram should all be 1. In practice, systems which are running near their + /// capacity will see a few retried operations, and systems that are substantially over-driven will see many retried + /// operations. As the retries value increases the further down the percentile scale you go, you can detect system loading + /// patterns which are in excess of the real-time capability of the target system. + /// This metric should be measured around every retry loop for a native operation. + this.triesHistogram = create().histogram( + "tries", + hdrdigits, + MetricCategory.Core, + "A histogram of all tries for an activity. Perfect results mean all quantiles return 1." + + " Slight saturation is indicated by p99 or p95 returning higher values." + + " Lower quantiles returning more than 1, or higher values at high quantiles indicate incremental overload." + ); + } + + protected OpSequence>> createOpSourceFromParsedOps( + List, Space>> adapters, List pops, OpLookup opLookup) { + return createOpSourceFromParsedOps2(adapters, pops, opLookup); + } + + protected OpSequence>> createOpSourceFromParsedOps2( +// Map> adapterCache, +// Map> mapperCache, + List, Space>> adapters, + List pops, + OpLookup opLookup + ) { + try { + + List ratios = new ArrayList<>(pops.size()); + + for (ParsedOp pop : pops) { + long ratio = pop.takeStaticConfigOr("ratio", 1); + ratios.add(ratio); + } + + SequencerType sequencerType = getParams() + .getOptionalString("seq") + .map(SequencerType::valueOf) + .orElse(SequencerType.bucket); + SequencePlanner>> planner = new SequencePlanner<>(sequencerType); + + for (int i = 0; i < pops.size(); i++) { + long ratio = ratios.get(i); + ParsedOp pop = pops.get(i); + + try { + if (0 == ratio) { + logger.info(() -> "skipped mapping op '" + pop.getName() + '\''); + continue; + } + + DriverAdapter, Space> adapter = adapters.get(i); + OpMapper, Space> opMapper = adapter.getOpMapper(); + LongFunction spaceFunc = adapter.getSpaceFunc(pop); + OpDispenser> dispenser = opMapper.apply(this, pop, spaceFunc); + String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none"); + Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun"); + + dispenser = OpFunctionComposition.wrapOptionally( + adapter, + dispenser, + pop, + dryrun, + opLookup + ); + +// if (strict) { +// optemplate.assertConsumed(); +// } + planner.addOp((OpDispenser>) dispenser, ratio); + } catch (Exception e) { + throw new OpConfigError("Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(), e); + } + } + + + return planner.resolve(); + + } catch (Exception e) { + if (e instanceof OpConfigError oce) { + throw oce; + } else { + throw new OpConfigError(e.getMessage(), workloadSource, e); + } + } + + + } + + private ParsedOp upconvert( OpTemplate ot, Optional defaultDriverOption, NBConfigModel yamlmodel, NBConfigModel supersetConfig, @@ -206,7 +478,7 @@ public class StandardActivity exte @Override public void initActivity() { - super.initActivity(); + initOrUpdateRateLimiters(this.activityDef); setDefaultsFromOpSequence(sequence); } @@ -229,7 +501,6 @@ public class StandardActivity exte @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { - super.onActivityDefUpdate(activityDef); for (DriverAdapter adapter : adapters.values()) { if (adapter instanceof NBReconfigurable configurable) { @@ -309,5 +580,403 @@ public class StandardActivity exte } } + protected List loadOpTemplates( + DriverAdapter defaultDriverAdapter, + boolean logged, + boolean filtered + ) { + + String tagfilter = activityDef.getParams().getOptionalString("tags").orElse(""); + + OpsDocList opsDocList = loadStmtsDocList(); + + List filteredOps = opsDocList.getOps(filtered?tagfilter:"", logged); + + if (filteredOps.isEmpty()) { + // There were no ops, and it *wasn't* because they were all filtered out. + // In this case, let's try to synthesize the ops as long as at least a default driver was provided + // But if there were no ops, and there was no default driver provided, we can't continue + // There were no ops, and it was because they were all filtered out + List unfilteredOps = opsDocList.getOps(false); + if (!unfilteredOps.isEmpty()) { + String message = "There were no active op templates with tag filter '"+ tagfilter + "', since all " + + unfilteredOps.size() + " were filtered out. Examine the session log for details"; + NBAdvisorOutput.test(message); + //throw new BasicError(message); + } + if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) { + filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams()); + Objects.requireNonNull(filteredOps); + if (filteredOps.isEmpty()) { + throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + + " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates."); + } + } else { + throw new BasicError(""" + No op templates were provided. You must provide one of these activity parameters: + 1) workload=some.yaml + 2) op='inline template' + 3) driver=stdout (or any other drive that can synthesize ops)"""); + } + } + return filteredOps; + } + + /** + * Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the + * length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable + * defaults when requested. + * + * @param seq + * - The {@link OpSequence} to derive the defaults from + */ + public synchronized void setDefaultsFromOpSequence(OpSequence seq) { + Optional strideOpt = getParams().getOptionalString("stride"); + if (strideOpt.isEmpty()) { + String stride = String.valueOf(seq.getSequence().length); + logger.info(() -> "defaulting stride to " + stride + " (the sequence length)"); +// getParams().set("stride", stride); + getParams().setSilently("stride", stride); + } + + // CYCLES + Optional cyclesOpt = getParams().getOptionalString("cycles"); + if (cyclesOpt.isEmpty()) { + String cycles = getParams().getOptionalString("stride").orElseThrow(); + logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)"); + this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow()); + } else { + if (0 == activityDef.getCycleCount()) { + throw new RuntimeException( + "You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles") + ); + } + long stride = getParams().getOptionalLong("stride").orElseThrow(); + long cycles = this.activityDef.getCycleCount(); + if (cycles < stride) { + throw new RuntimeException( + "The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." + + " If this was intended, then set stride low enough to allow it." + ); + } + } + + long cycleCount = this.activityDef.getCycleCount(); + long stride = this.activityDef.getParams().getOptionalLong("stride").orElseThrow(); + + if (0 < stride && 0 != cycleCount % stride) { + logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," + + "leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')'); + } + + Optional threadSpec = activityDef.getParams().getOptionalString("threads"); + if (threadSpec.isPresent()) { + String spec = threadSpec.get(); + int processors = Runtime.getRuntime().availableProcessors(); + if ("auto".equalsIgnoreCase(spec)) { + int threads = processors * 10; + if (threads > activityDef.getCycleCount()) { + threads = (int) activityDef.getCycleCount(); + logger.info("setting threads to {} (auto) [10xCORES, cycle count limited]", threads); + } else { + logger.info("setting threads to {} (auto) [10xCORES]", threads); + } +// activityDef.setThreads(threads); + activityDef.getParams().setSilently("threads", threads); + } else if (spec.toLowerCase().matches("\\d+x")) { + String multiplier = spec.substring(0, spec.length() - 1); + int threads = processors * Integer.parseInt(multiplier); + logger.info(() -> "setting threads to " + threads + " (" + multiplier + "x)"); +// activityDef.setThreads(threads); + activityDef.getParams().setSilently("threads", threads); + } else if (spec.toLowerCase().matches("\\d+")) { + logger.info(() -> "setting threads to " + spec + " (direct)"); +// activityDef.setThreads(Integer.parseInt(spec)); + activityDef.getParams().setSilently("threads", Integer.parseInt(spec)); + } + + if (activityDef.getThreads() > activityDef.getCycleCount()) { + logger.warn(() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary() + + ", you should have more cycles than threads."); + } + + } else if (1000 < cycleCount) { + logger.warn(() -> "For testing at scale, it is highly recommended that you " + + "set threads to a value higher than the default of 1." + + " hint: you can use threads=auto for reasonable default, or" + + " consult the topic on threads with `help threads` for" + + " more information."); + } + + if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) { + throw new BasicError("You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity."); + } + } + + /** + * Given a function that can create an op of type from an OpTemplate, generate + * an indexed sequence of ready to call operations. + *

+ * This method uses the following conventions to derive the sequence: + * + *

    + *
  1. If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is + * taken as the only provided statement.
  2. + *
  3. If a 'yaml, or 'workload' parameter is provided, then the statements in that file + * are taken with their ratios
  4. + *
  5. Any provided tags filter is used to select only the op templates which have matching + * tags. If no tags are provided, then all the found op templates are included.
  6. + *
  7. The ratios and the 'seq' parameter are used to build a sequence of the ready operations, + * where the sequence length is the sum of the ratios.
  8. + *
+ * + * @param + * A holder for an executable operation for the native driver used by this activity. + * @param opinit + * A function to map an OpTemplate to the executable operation form required by + * the native driver for this activity. + * @param defaultAdapter + * The adapter which will be used for any op templates with no explicit adapter + * @return The sequence of operations as determined by filtering and ratios + */ + @Deprecated(forRemoval = true) + protected OpSequence> createOpSequence( + Function> opinit, boolean strict, DriverAdapter defaultAdapter) { + + List stmts = loadOpTemplates(defaultAdapter,true,false); + + List ratios = new ArrayList<>(stmts.size()); + + for (OpTemplate opTemplate : stmts) { + long ratio = opTemplate.removeParamOrDefault("ratio", 1); + ratios.add(ratio); + } + + SequencerType sequencerType = getParams() + .getOptionalString("seq") + .map(SequencerType::valueOf) + .orElse(SequencerType.bucket); + + SequencePlanner> planner = new SequencePlanner<>(sequencerType); + + try { + for (int i = 0; i < stmts.size(); i++) { + long ratio = ratios.get(i); + OpTemplate optemplate = stmts.get(i); + OpDispenser driverSpecificReadyOp = opinit.apply(optemplate); + if (strict) { + optemplate.assertConsumed(); + } + planner.addOp(driverSpecificReadyOp, ratio); + } + } catch (Exception e) { + throw new OpConfigError(e.getMessage(), workloadSource, e); + } + + return planner.resolve(); + } + + protected OpsDocList loadStmtsDocList() { + + try { + String op = activityDef.getParams().getOptionalString("op").orElse(null); + String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(null); + String workload = activityDef.getParams().getOptionalString("workload").orElse(null); + + if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) { + throw new OpConfigError("Only op, statement, or workload may be provided, not more than one."); + } + + + if (workload != null && OpsLoader.isJson(workload)) { + workloadSource = "commandline: (workload/json):" + workload; + return OpsLoader.loadString(workload, OpTemplateFormat.json, activityDef.getParams(), null); + } else if (workload != null && OpsLoader.isYaml(workload)) { + workloadSource = "commandline: (workload/yaml):" + workload; + return OpsLoader.loadString(workload, OpTemplateFormat.yaml, activityDef.getParams(), null); + } else if (workload != null) { + return OpsLoader.loadPath(workload, activityDef.getParams(), "activities"); + } + + if (stmt != null) { + workloadSource = "commandline: (stmt/inline): '" + stmt + "'"; + return OpsLoader.loadString(stmt, OpTemplateFormat.inline, activityDef.getParams(), null); + } + + if (op != null && OpsLoader.isJson(op)) { + workloadSource = "commandline: (op/json): '" + op + "'"; + return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null); + } + else if (op != null) { + workloadSource = "commandline: (op/inline): '" + op + "'"; + return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null); + } + return OpsDocList.none(); + + } catch (Exception e) { + throw new OpConfigError("Error loading op templates: " + e, workloadSource, e); + } + } + + @Override + public synchronized ProgressMeterDisplay getProgressMeter() { + if (null == this.progressMeter) { + this.progressMeter = new ActivityMetricProgressMeter(this); + } + return this.progressMeter; + } + + @Override + public synchronized RunState getRunState() { + return runState; + } + + @Override + public synchronized void setRunState(RunState runState) { + this.runState = runState; + if (RunState.Running == runState) { + this.startedAtMillis = System.currentTimeMillis(); + } + } + + @Override + public long getStartedAtMillis() { + return startedAtMillis; + } + + @Override + public ActivityDef getActivityDef() { + return activityDef; + } + + public String toString() { + return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally; + } + + public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) { + +// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false); + + activityDef.getParams().getOptionalNamedParameter("striderate") + .map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr))); + + activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate") + .map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr))); + + } + + public void createOrUpdateStrideLimiter(SimRateSpec spec) { + strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec); + } + + public void createOrUpdateCycleLimiter(SimRateSpec spec) { + cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec); + } + + @Override + public RateLimiter getCycleLimiter() { + if (cycleLimiterSource!=null) { + return cycleLimiterSource.get(); + } else { + return null; + } + } + @Override + public synchronized RateLimiter getStrideLimiter() { + if (strideLimiterSource!=null) { + return strideLimiterSource.get(); + } else { + return null; + } + } + + @Override + public RunStateTally getRunStateTally() { + return tally; + } + + @Override + public ActivityWiring getWiring() { + return this.wiring; + } + + @Override + public Map asResult() { + return Map.of("activity",this.getAlias()); + } + + /** + * Activities with retryable operations (when specified with the retry error handler for some + * types of error), should allow the user to specify how many retries are allowed before + * giving up on the operation. + * + * @return The number of allowable retries + */ + @Override + public int getMaxTries() { + return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10); + } + + public synchronized NBErrorHandler getErrorHandler() { + if (null == this.errorHandler) { + errorHandler = new NBErrorHandler( + () -> activityDef.getParams().getOptionalString("errors").orElse("stop"), + this::getExceptionMetrics); + } + return errorHandler; + } + + @Override + public void closeAutoCloseables() { + for (AutoCloseable closeable : closeables) { + logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable); + try { + closeable.close(); + } catch (Exception e) { + throw new RuntimeException("Error closing " + closeable + ": " + e, e); + } + } + closeables.clear(); + } + + @Override + public int compareTo(Activity o) { + return getAlias().compareTo(o.getAlias()); + } + + @Override + public void registerAutoCloseable(AutoCloseable closeable) { + this.closeables.add(closeable); + } + + @Override + public synchronized PrintWriter getConsoleOut() { + if (null == console) { + this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8); + } + return this.console; + } + + @Override + public synchronized InputStream getConsoleIn() { + return System.in; + } + + @Override + public void setConsoleOut(PrintWriter writer) { + this.console = writer; + } + + @Override + public synchronized ErrorMetrics getExceptionMetrics() { + if (null == this.errorMetrics) { + errorMetrics = new ErrorMetrics(this); + } + return errorMetrics; + } + + + } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java index 861208831..2682de943 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java @@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityimpl.uniform; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction; import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityapi.core.Activity; @@ -27,8 +28,6 @@ import io.nosqlbench.engine.api.activityapi.core.MotorDispenser; import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityimpl.CoreServices; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser; import org.apache.logging.log4j.LogManager; @@ -72,20 +71,20 @@ public class StandardActivityType
> { * @return a distinct Activity instance for each call */ @SuppressWarnings("unchecked") - public A getActivity(final ActivityDef activityDef, final NBComponent parent) { + public A getActivity(final ActivityDef activityDef, + final NBComponent parent, + final ActivityWiring wiring) { if (activityDef.getParams().getOptionalString("async").isPresent()) throw new RuntimeException("This driver does not support async mode yet."); - return (A) new StandardActivity(parent, activityDef); + return (A) new StandardActivity(parent, activityDef, wiring); } /** * This method will be called once per action instance. - * - * @param activity The activity instance that will parameterize the returned ActionDispenser instance. * @return an instance of ActionDispenser */ - public ActionDispenser getActionDispenser(final A activity) { + public ActionDispenser getActionDispenser(final StandardActivity activity) { return new StandardActionDispenser(activity); } @@ -97,28 +96,35 @@ public class StandardActivityType> { * @param activities a map of existing activities * @return a distinct activity instance for each call */ - public Activity getAssembledActivity(final ActivityDef activityDef, final Map activities, final NBComponent parent) { - final A activity = this.getActivity(activityDef, parent); + public Activity getAssembledActivity( + final NBComponent parent, final ActivityDef activityDef, + final Map activities + ) { + // final A activity = this.getActivity(activityDef, parent); + ActivityWiring wiring = new ActivityWiring(activityDef); + StandardActivity activity = new StandardActivity(parent, activityDef, wiring); final InputDispenser inputDispenser = this.getInputDispenser(activity); if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities); - activity.setInputDispenserDelegate(inputDispenser); + wiring.setInputDispenserDelegate(inputDispenser); + final ActionDispenser actionDispenser = this.getActionDispenser(activity); if (actionDispenser instanceof ActivitiesAware) ((ActivitiesAware) actionDispenser).setActivitiesMap(activities); - activity.setActionDispenserDelegate(actionDispenser); + wiring.setActionDispenserDelegate(actionDispenser); - final OutputDispenser outputDispenser = this.getOutputDispenser(activity).orElse(null); + final OutputDispenser outputDispenser = this.getOutputDispenser(wiring).orElse(null); if ((null != outputDispenser) && (outputDispenser instanceof ActivitiesAware)) ((ActivitiesAware) outputDispenser).setActivitiesMap(activities); - activity.setOutputDispenserDelegate(outputDispenser); + wiring.setOutputDispenserDelegate(outputDispenser); - final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); + final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, + actionDispenser, outputDispenser); if (motorDispenser instanceof ActivitiesAware) ((ActivitiesAware) motorDispenser).setActivitiesMap(activities); - activity.setMotorDispenserDelegate(motorDispenser); + wiring.setMotorDispenserDelegate(motorDispenser); - return activity; + return this.getActivity(activityDef,parent,wiring); } /** @@ -127,8 +133,8 @@ public class StandardActivityType> { * @param activity The activity instance that will parameterize the returned MarkerDispenser instance. * @return an instance of MarkerDispenser */ - public Optional getOutputDispenser(final A activity) { - return CoreServices.getOutputDispenser(activity); + public Optional getOutputDispenser(ActivityWiring activity) { + return CoreServices.getOutputDispenser(parent, activity); } /** @@ -138,12 +144,12 @@ public class StandardActivityType> { * @param activity the Activity instance which will parameterize this InputDispenser * @return the InputDispenser for the associated activity */ - public InputDispenser getInputDispenser(final A activity) { + public InputDispenser getInputDispenser(final StandardActivity activity) { return CoreServices.getInputDispenser(activity); } public MotorDispenser getMotorDispenser( - final A activity, + final StandardActivity activity, final InputDispenser inputDispenser, final ActionDispenser actionDispenser, final OutputDispenser outputDispenser) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java index d45e01e86..fc8f0a896 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java @@ -21,7 +21,12 @@ import com.codahale.metrics.Timer; import io.nosqlbench.adapters.api.activityimpl.OpDispenser; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.*; import io.nosqlbench.adapters.api.evalctx.CycleFunction; +import io.nosqlbench.engine.api.metrics.ExceptionHistoMetrics; +import io.nosqlbench.nb.api.components.core.NBBaseComponent; +import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; +import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram; import io.nosqlbench.nb.api.errors.ResultVerificationError; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.core.SyncAction; @@ -29,59 +34,66 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.nb.api.labels.NBLabels; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** - * This is the generified version of an Action. All driver adapters us this, as opposed - * to previous NB versions where it was implemented for each driver. - *

- * This allows the API to be consolidated so that the internal machinery of NB - * works in a very consistent and uniform way for all users and drivers. - * - * @param - * The type of activity - * @param - * The type of operation - */ -public class StandardAction, R extends java.util.function.LongFunction> implements SyncAction, ActivityDefObserver { + This is the generified version of an Action. All driver adapters us this, as opposed + to previous NB versions where it was implemented for each driver. +

+ This allows the API to be consolidated so that the internal machinery of NB + works in a very consistent and uniform way for all users and drivers. + @param + The type of activity + @param + The type of operation */ +public class StandardAction, R extends java.util.function.LongFunction> extends NBBaseComponent implements SyncAction, ActivityDefObserver { private final static Logger logger = LogManager.getLogger("ACTION"); - private final Timer executeTimer; - private final Histogram triesHistogram; - private final Timer resultSuccessTimer; - private final Timer resultTimer; - private final Timer bindTimer; private final NBErrorHandler errorHandler; private final OpSequence>> opsequence; private final int maxTries; private final Timer verifierTimer; + private final A activity; + public NBMetricHistogram triesHistogram; public StandardAction(A activity, int slot) { + super(activity, NBLabels.forKV("action", StandardAction.class.getSimpleName())); + this.activity = activity; this.opsequence = activity.getOpSequence(); this.maxTries = activity.getMaxTries(); - bindTimer = activity.getInstrumentation().getOrCreateBindTimer(); - executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer(); - triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram(); - resultTimer = activity.getInstrumentation().getOrCreateResultTimer(); - resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer(); + + int hdrdigits = activity.getComponentProp("hdr_digits") + .map(Integer::parseInt).orElse(3); + + errorHandler = activity.getErrorHandler(); - verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer(); + + this.verifierTimer = activity.create().timer( + "verifier", + hdrdigits, + MetricCategory.Verification, + "Time the execution of verifier code, if any" + ); } @Override public int runCycle(long cycle) { - OpDispenser> dispenser=null; + OpDispenser> dispenser = null; CycleOp op = null; - try (Timer.Context ct = bindTimer.time()) { + try (Timer.Context ct = activity.bindTimer.time()) { dispenser = opsequence.apply(cycle); op = dispenser.getOp(cycle); } catch (Exception e) { - throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser!=null?dispenser.getOpName():"NULL")+ - "': " + e.getMessage(), e); + throw new RuntimeException( + "while binding request in cycle " + cycle + " for op template named '" + (dispenser != null ? dispenser.getOpName() : "NULL") + "': " + e.getMessage(), + e + ); } int code = 0; @@ -95,29 +107,33 @@ public class StandardAction, R extends java.uti dispenser.onStart(cycle); - try (Timer.Context ct = executeTimer.time()) { + try (Timer.Context ct = activity.executeTimer.time()) { result = op.apply(cycle); // TODO: break out validation timer from execute try (Timer.Context ignored = verifierTimer.time()) { CycleFunction verifier = dispenser.getVerifier(); try { verifier.setVariable("result", result); - verifier.setVariable("cycle",cycle); + verifier.setVariable("cycle", cycle); Boolean isGood = verifier.apply(cycle); if (!isGood) { - throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails()); + throw new ResultVerificationError( + "result verification failed", maxTries - tries, + verifier.getExpressionDetails() + ); } } catch (Exception e) { - throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails()); + throw new ResultVerificationError( + e, maxTries - tries, verifier.getExpressionDetails()); } } } catch (Exception e) { error = e; } finally { long nanos = System.nanoTime() - startedAt; - resultTimer.update(nanos, TimeUnit.NANOSECONDS); + activity.resultTimer.update(nanos, TimeUnit.NANOSECONDS); if (error == null) { - resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS); + activity.resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS); dispenser.onSuccess(cycle, nanos); break; } else { @@ -130,7 +146,7 @@ public class StandardAction, R extends java.uti } } } - triesHistogram.update(tries); + this.triesHistogram.update(tries); if (op instanceof OpGenerator) { logger.trace(() -> "GEN OP for cycle(" + cycle + ")"); diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/util/SimpleConfig.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/util/SimpleConfig.java index 0a3b7f3cb..9aa600711 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/util/SimpleConfig.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/util/SimpleConfig.java @@ -17,6 +17,9 @@ package io.nosqlbench.engine.api.util; import io.nosqlbench.engine.api.activityapi.core.Activity; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; import java.util.Arrays; import java.util.Map; @@ -32,8 +35,17 @@ public class SimpleConfig { this.params = parseParams(configdata); } - public SimpleConfig(Activity activity, String param) { - this(activity.getParams().getOptionalString(param).orElse("")); + public SimpleConfig(Activity activity, String params) { + this(activity.getActivityDef(),params); + } + public SimpleConfig(ActivityWiring wiring, String param) { + this(wiring.getParams().getOptionalString(param).orElse("")); + } + public SimpleConfig(ActivityDef activityDef, String param) { + this(activityDef.getParams().getOptionalString(param).orElse("")); + } + public SimpleConfig(ParameterMap parameters, String param) { + this(parameters.getOptionalString(param).orElse("")); } private Map parseParams(String configdata) { diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java index 16c93b88d..8332937f9 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java @@ -16,6 +16,7 @@ package io.nosqlbench.engine.core.lifecycle.activity; import com.codahale.metrics.Gauge; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory; import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricGauge; @@ -70,6 +71,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener private final Activity activity; private final ActivityDef activityDef; private final RunStateTally tally; + private final MotorDispenser motorSource; private ExecutorService executorService; private Exception exception; private String sessionId = ""; @@ -82,6 +84,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener public ActivityExecutor(Activity activity) { this.activity = activity; this.activityDef = activity.getActivityDef(); + this.motorSource = activity.getWiring().getMotorDispenserDelegate(); activity.getActivityDef().getParams().addListener(this); this.tally = activity.getRunStateTally(); } @@ -260,7 +263,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener // Create motor slots try { while (motors.size() < activityDef.getThreads()) { - Motor motor = activity.getMotorDispenserDelegate().getMotor(activityDef, motors.size()); + Motor motor = motorSource.getMotor(activityDef, motors.size()); logger.trace(() -> "Starting cycle motor thread:" + motor); motors.add(motor); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityLoader.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityLoader.java index 4246ef46c..20722943b 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityLoader.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityLoader.java @@ -40,8 +40,8 @@ public class ActivityLoader { public synchronized Activity loadActivity(ActivityDef activityDef, final NBComponent parent) { activityDef= activityDef.deprecate("yaml","workload").deprecate("type","driver"); - final Activity activity = new StandardActivityType<>(activityDef, parent).getAssembledActivity(activityDef, this.activityMap, parent); - this.activityMap.put(activity.getAlias(),activity); + final Activity activity = + new StandardActivityType<>(activityDef, parent).getAssembledActivity(parent, activityDef, this.activityMap);this.activityMap.put(activity.getAlias(),activity); ActivityLoader.logger.debug("Resolved activity for alias '{}'", activityDef.getAlias()); return activity; } diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInputTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInputTest.java index acaf97a18..5011686ae 100644 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInputTest.java +++ b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/api/activityapi/cyclelog/inputs/cyclelog/CycleLogInputTest.java @@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityapi.cyclelog.inputs.cyclelog; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.CycleSegment; import io.nosqlbench.engine.api.activityapi.cyclelog.outputs.cyclelog.CycleLogOutput; +import io.nosqlbench.nb.api.config.standard.TestComponent; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -52,7 +53,7 @@ public class CycleLogInputTest { @Test public void testReader() { - CycleLogInput cycleLogInput = new CycleLogInput(cyclefile.getPath()); + CycleLogInput cycleLogInput = new CycleLogInput(TestComponent.INSTANCE,cyclefile.getPath()); CycleSegment i1; long c; i1 = cycleLogInput.getInputSegment(1); diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java index 8b76aa436..3c0ae00bb 100644 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java +++ b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java @@ -16,6 +16,9 @@ package io.nosqlbench.engine.core; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType; import io.nosqlbench.nb.api.config.standard.TestComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.nb.api.advisor.NBAdvisorException; @@ -24,7 +27,6 @@ import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityimpl.CoreServices; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser; import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor; @@ -36,6 +38,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Test; +import java.util.Optional; import java.util.concurrent.*; import static org.assertj.core.api.Assertions.assertThat; @@ -86,11 +89,12 @@ class ActivityExecutorTest { synchronized void testAdvisorError() { try { - ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); + ActivityDef activityDef = ActivityDef.parseActivityDef( + "driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE); - Activity activity = new DelayedInitActivity(activityDef); - fail("Expected an Advisor exception"); - } catch (NBAdvisorException e) { + StandardActivity activity = new DelayedInitActivity(activityDef); + fail("Expected an Advisor exception"); + } catch (NBAdvisorException e) { assertThat(e.toString().contains("error")); assertThat(e.getExitCode() == 2); } @@ -99,19 +103,28 @@ class ActivityExecutorTest { @Test synchronized void testDelayedStartSanity() { - ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test_delayed_start;cycles=1000;initdelay=2000;"); - new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE); + ActivityDef activityDef = ActivityDef.parseActivityDef( + "driver=diag;alias=test_delayed_start;cycles=1000;initdelay=2000;"); + Optional standardActivityType = new ActivityTypeLoader().load( + activityDef, TestComponent.INSTANCE); - Activity activity = new DelayedInitActivity(activityDef); +// Activity activity = new DelayedInitActivity(activityDef); + ActivityWiring wiring = new ActivityWiring(activityDef); + StandardActivity activity = standardActivityType.get().getActivity( + activityDef, TestComponent.INSTANCE, wiring); final InputDispenser inputDispenser = new CoreInputDispenser(activity); - final ActionDispenser actionDispenser = new CoreActionDispenser(activity); - final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null); + final ActionDispenser actionDispenser = new CoreActionDispenser(wiring); + final OutputDispenser outputDispenser = CoreServices.getOutputDispenser( + TestComponent.INSTANCE, wiring).orElse(null); - MotorDispenser motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); - activity.setActionDispenserDelegate(actionDispenser); - activity.setOutputDispenserDelegate(outputDispenser); - activity.setInputDispenserDelegate(inputDispenser); - activity.setMotorDispenserDelegate(motorDispenser); + MotorDispenser motorDispenser = new CoreMotorDispenser( + activity, inputDispenser, + actionDispenser, outputDispenser + ); + wiring.setActionDispenserDelegate(actionDispenser); + wiring.setOutputDispenserDelegate(outputDispenser); + wiring.setInputDispenserDelegate(inputDispenser); + wiring.setMotorDispenserDelegate(motorDispenser); ActivityExecutor activityExecutor = new ActivityExecutor(activity); @@ -133,27 +146,34 @@ class ActivityExecutorTest { @Test synchronized void testNewActivityExecutor() { - final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test_dynamic_params;cycles=1000;initdelay=5000;"); - new ActivityTypeLoader().load(activityDef,TestComponent.INSTANCE); + final ActivityDef activityDef = ActivityDef.parseActivityDef( + "driver=diag;alias=test_dynamic_params;cycles=1000;initdelay=5000;"); + new ActivityTypeLoader().load(activityDef, TestComponent.INSTANCE); + ActivityWiring wiring = new ActivityWiring(activityDef); - Activity simpleActivity = new SimpleActivity(TestComponent.INSTANCE,activityDef); + StandardActivity activity = new StandardActivity( + TestComponent.INSTANCE, activityDef, wiring); -// this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef)); + final InputDispenser inputDispenser = new CoreInputDispenser(activity); + final ActionDispenser actionDispenser = new CoreActionDispenser(wiring); + final OutputDispenser outputDispenser = CoreServices.getOutputDispenser( + TestComponent.INSTANCE, wiring).orElse(null); - final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity); - final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity); - final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null); + MotorDispenser motorDispenser = new CoreMotorDispenser<>( + activity, inputDispenser, actionDispenser, outputDispenser); - MotorDispenser motorDispenser = new CoreMotorDispenser<>(simpleActivity, - inputDispenser, actionDispenser, outputDispenser); - - simpleActivity.setActionDispenserDelegate(actionDispenser); - simpleActivity.setInputDispenserDelegate(inputDispenser); - simpleActivity.setMotorDispenserDelegate(motorDispenser); + wiring.setActionDispenserDelegate(actionDispenser); + wiring.setInputDispenserDelegate(inputDispenser); + wiring.setMotorDispenserDelegate(motorDispenser); + StandardActivity simpleActivity = new StandardActivity<>( + TestComponent.INSTANCE, + activityDef, wiring + ); ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity); activityDef.setThreads(5); - ForkJoinTask executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor); + ForkJoinTask executionResultForkJoinTask = ForkJoinPool.commonPool().submit( + activityExecutor); // activityExecutor.startActivity(); @@ -162,7 +182,8 @@ class ActivityExecutorTest { final int threadTarget = speeds[offset]; final int threadTime = speeds[offset + 1]; - ActivityExecutorTest.logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds."); + ActivityExecutorTest.logger.debug( + () -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds."); activityDef.setThreads(threadTarget); try { @@ -182,13 +203,13 @@ class ActivityExecutorTest { } } - private MotorDispenser getActivityMotorFactory(final Action lc, Input ls) { + private MotorDispenser getActivityMotorFactory(final SyncAction lc, Input ls) { return new MotorDispenser<>() { @Override public Motor getMotor(final ActivityDef activityDef, final int slotId) { - final Activity activity = new SimpleActivity(TestComponent.INSTANCE,activityDef); - final Motor cm = new CoreMotor<>(activity, slotId, ls); - cm.setAction(lc); + final StandardActivity activity = new StandardActivity( + TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef)); + final Motor cm = new CoreMotor<>(activity, slotId, ls, lc, null); return cm; } }; @@ -198,7 +219,8 @@ class ActivityExecutorTest { return new SyncAction() { @Override public int runCycle(final long cycle) { - ActivityExecutorTest.logger.info(() -> "consuming " + cycle + ", delaying:" + delay); + ActivityExecutorTest.logger.info( + () -> "consuming " + cycle + ", delaying:" + delay); try { Thread.sleep(delay); } catch (final InterruptedException ignored) { @@ -209,16 +231,17 @@ class ActivityExecutorTest { } - private static class DelayedInitActivity extends SimpleActivity { + private static class DelayedInitActivity extends StandardActivity { private static final Logger logger = LogManager.getLogger(DelayedInitActivity.class); public DelayedInitActivity(final ActivityDef activityDef) { - super(TestComponent.INSTANCE,activityDef); + super(TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef)); } @Override public void initActivity() { - final Integer initDelay = this.activityDef.getParams().getOptionalInteger("initdelay").orElse(0); + final Integer initDelay = this.activityDef.getParams().getOptionalInteger( + "initdelay").orElse(0); DelayedInitActivity.logger.info(() -> "delaying for " + initDelay); try { Thread.sleep(initDelay); diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java index e210ddd23..94f82fe5d 100644 --- a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java +++ b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/CoreMotorTest.java @@ -16,13 +16,15 @@ package io.nosqlbench.engine.core; +import io.nosqlbench.engine.api.activityimpl.uniform.ActivityWiring; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivity; +import io.nosqlbench.engine.api.activityimpl.uniform.actions.StandardAction; import io.nosqlbench.nb.api.config.standard.TestComponent; import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityapi.core.Action; import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.core.Motor; import io.nosqlbench.engine.api.activityapi.core.SyncAction; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor; import io.nosqlbench.engine.core.fortesting.BlockingSegmentInput; import org.junit.jupiter.api.Test; @@ -37,14 +39,13 @@ public class CoreMotorTest { @Test public void testBasicActivityMotor() { - final Activity activity = new SimpleActivity( - new TestComponent("testing", "coremotor"), - ActivityDef.parseActivityDef("alias=foo") - ); + ActivityDef activityDef = ActivityDef.parseActivityDef("alias=foo"); + final StandardActivity activity = new StandardActivity<>( + new TestComponent("testing", "coremotor"), activityDef, ActivityWiring.of(activityDef)); final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); - final Motor cm = new CoreMotor(activity, 5L, lockstepper); final AtomicLong observableAction = new AtomicLong(-3L); - cm.setAction(this.getTestConsumer(observableAction)); + SyncAction action = this.getTestConsumer(observableAction); + final Motor cm = new CoreMotor(activity, 5L, lockstepper, action, null); final Thread t = new Thread(cm); t.setName("TestMotor"); t.start(); @@ -54,18 +55,20 @@ public class CoreMotorTest { } lockstepper.publishSegment(5L); - final boolean result = this.awaitCondition(atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100); + final boolean result = this.awaitCondition( + atomicInteger -> 5L == atomicInteger.get(), observableAction, 5000, 100); assertThat(observableAction.get()).isEqualTo(5L); } @Test public void testIteratorStride() { - SimpleActivity activity = new SimpleActivity(TestComponent.INSTANCE, "stride=3"); + ActivityDef activityDef = ActivityDef.parseActivityDef("stride=3"); + StandardActivity activity = new StandardActivity( + TestComponent.INSTANCE, activityDef, ActivityWiring.of(activityDef)); final BlockingSegmentInput lockstepper = new BlockingSegmentInput(); - final Motor cm1 = new CoreMotor(activity, 1L, lockstepper); final AtomicLongArray ary = new AtomicLongArray(10); - final Action a1 = this.getTestArrayConsumer(ary); - cm1.setAction(a1); + final SyncAction a1 = this.getTestArrayConsumer(ary); + final Motor cm1 = new CoreMotor(activity, 1L, lockstepper, a1, null); final Thread t1 = new Thread(cm1); t1.setName("cm1"); @@ -109,7 +112,10 @@ public class CoreMotorTest { } - private boolean awaitAryCondition(final Predicate atomicLongAryPredicate, final AtomicLongArray ary, final long millis, final long retry) { + private boolean awaitAryCondition( + final Predicate atomicLongAryPredicate, final AtomicLongArray ary, + final long millis, final long retry + ) { final long start = System.currentTimeMillis(); long now = start; while (now < (start + millis)) { @@ -124,7 +130,10 @@ public class CoreMotorTest { return false; } - private boolean awaitCondition(final Predicate atomicPredicate, final AtomicLong atomicInteger, final long millis, final long retry) { + private boolean awaitCondition( + final Predicate atomicPredicate, final AtomicLong atomicInteger, + final long millis, final long retry + ) { final long start = System.currentTimeMillis(); long now = start; while (now < (start + millis)) { diff --git a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java index 4fa690440..c2196d90d 100644 --- a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java +++ b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimizers/CMD_reset.java @@ -94,7 +94,8 @@ public class CMD_reset extends NBBaseCommand { //TODO: This needs to be reworked, but simply calling controller.start on the flywheel results in 2 // copies of the activity running simultaneously. This is a temporary workaround. SimFrameUtils.awaitActivity(flywheel); - flywheel.getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), 0).run(); + flywheel.getWiring().getMotorDispenserDelegate().getMotor(flywheel.getActivityDef(), + 0).run(); } return null;