diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java index 4c7fd1c90..09932a207 100644 --- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java @@ -27,14 +27,11 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -public abstract class BaseDriverAdapter implements DriverAdapter, NBConfigurable { +public abstract class BaseDriverAdapter implements DriverAdapter, NBConfigurable, NBReconfigurable { private DriverSpaceCache spaceCache; private NBConfiguration cfg; - protected BaseDriverAdapter() { - } - /** * BaseDriverAdapter will take any provided functions from {@link #getOpStmtRemappers()} * and {@link #getOpFieldRemappers()} and construct a preprocessor list. These are applied @@ -72,12 +69,34 @@ public abstract class BaseDriverAdapter implements DriverAdapter * in the op template. These are useful, for example, for taking the 'stmt' field * and parsing it into component fields which might otherwise be specified by the user. * This allows users to specify String-form op templates which are automatically - * destructured into the canonical field-wise form for a given type of operation.

+ * parsed and destructured into the canonical field-wise form for a given type of + * operation.

+ * *
+ * *

Each function in this list is applied in order. If the function returns a value, * then the 'stmt' field is removed and the resulting map is added to the other * fields in the op template.

* + *
+ * + *

If a driver adapter is meant to support the {@code stmt} field, then this + * must be provided. The use of the stmt field should be documented in + * the driver adapter user docs with examples for any supported formats. A default + * implementation does nothing, as it must be decided per-driver whether or not + * the stmt field will be used directly or whether it is short-hand for a more + * canonical form. + * + *
+ * + *

If you want to automatically destructure stmt values into a map and inject + * its entries into the op template fields, simply provide an implementation + * like this:

+     * {@code
+     *     return List.of(stmt -> Optional.of(NBParams.one(stmt).getMap()));
+     * }
+     * 

+ * * @return A list of optionally applied remapping functions. */ public List>>> getOpStmtRemappers() { @@ -112,6 +131,12 @@ public abstract class BaseDriverAdapter implements DriverAdapter this.cfg = cfg; } + @Override + public void applyReconfig(NBConfiguration reconf) { + this.cfg = getReconfigModel().apply(reconf.getMap()); + } + + /** * In order to be provided with config information, it is required * that the driver adapter specify the valid configuration options, @@ -123,7 +148,6 @@ public abstract class BaseDriverAdapter implements DriverAdapter .add(Param.optional("alias")) .add(Param.defaultTo("strict",true,"strict op field mode, which requires that provided op fields are recognized and used")) .add(Param.optional(List.of("op", "stmt", "statement"), String.class, "op template in statement form")) - .add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file")) .add(Param.optional("tags", String.class, "tags to be used to filter operations")) .add(Param.defaultTo("errors", "stop", "error handler configuration")) .add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool")) @@ -134,6 +158,16 @@ public abstract class BaseDriverAdapter implements DriverAdapter .add(Param.optional("phaserate", String.class, "rate limit for phases per second")) .add(Param.optional("seq", String.class, "sequencing algorithm")) .add(Param.optional("instrument", Boolean.class)) + .add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file")) + .asReadOnly(); + } + + @Override + public NBConfigModel getReconfigModel() { + return ConfigModel.of(BaseDriverAdapter.class) + .add(Param.optional("threads").setRegex("\\d+|\\d+x|auto").setDescription("number of concurrent operations, controlled by threadpool")) + .add(Param.optional("striderate", String.class, "rate limit for strides per second")) + .add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second")) .asReadOnly(); } diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java index 5fd99129c..fc37c9403 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/SimpleActivity.java @@ -19,13 +19,11 @@ package io.nosqlbench.engine.api.activityimpl; import com.codahale.metrics.Timer; import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter; -import io.nosqlbench.engine.api.activityapi.core.progress.InputProgressMeter; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeter; import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser; import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; -import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; @@ -102,7 +100,7 @@ public class SimpleActivity implements Activity, ProgressCapable { @Override public void initActivity() { - onActivityDefUpdate(this.activityDef); + initOrUpdateRateLimiters(this.activityDef); } public synchronized NBErrorHandler getErrorHandler() { @@ -268,7 +266,7 @@ public class SimpleActivity implements Activity, ProgressCapable { @Override public Timer getResultTimer() { - return ActivityMetrics.timer(getActivityDef(), "result"); + return ActivityMetrics.timer(getActivityDef(), "result", getParams().getOptionalInteger("hdr_digits").orElse(4)); } @Override @@ -320,6 +318,10 @@ public class SimpleActivity implements Activity, ProgressCapable { @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { + initOrUpdateRateLimiters(activityDef); + } + + public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) { activityDef.getParams().getOptionalNamedParameter("striderate") .map(RateSpec::new) @@ -347,14 +349,16 @@ public class SimpleActivity implements Activity, ProgressCapable { if (strideOpt.isEmpty()) { String stride = String.valueOf(seq.getSequence().length); logger.info("defaulting stride to " + stride + " (the sequence length)"); - getParams().set("stride", stride); +// getParams().set("stride", stride); + getParams().setSilently("stride",stride); } Optional cyclesOpt = getParams().getOptionalString("cycles"); if (cyclesOpt.isEmpty()) { String cycles = getParams().getOptionalString("stride").orElseThrow(); logger.info("defaulting cycles to " + cycles + " (the stride length)"); - getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow()); +// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow()); + getParams().setSilently("cycles", getParams().getOptionalString("stride").orElseThrow()); } else { if (getActivityDef().getCycleCount() == 0) { throw new RuntimeException( @@ -391,15 +395,18 @@ public class SimpleActivity implements Activity, ProgressCapable { } else { logger.info("setting threads to " + threads + " (auto) [10xCORES]"); } - activityDef.setThreads(threads); +// activityDef.setThreads(threads); + activityDef.getParams().setSilently("threads",threads); } else if (spec.toLowerCase().matches("\\d+x")) { String multiplier = spec.substring(0, spec.length() - 1); int threads = processors * Integer.parseInt(multiplier); logger.info("setting threads to " + threads + " (" + multiplier + "x)"); - activityDef.setThreads(threads); +// activityDef.setThreads(threads); + activityDef.getParams().setSilently("threads",threads); } else if (spec.toLowerCase().matches("\\d+")) { logger.info("setting threads to " + spec + " (direct)"); - activityDef.setThreads(Integer.parseInt(spec)); +// activityDef.setThreads(Integer.parseInt(spec)); + activityDef.getParams().setSilently("threads",Integer.parseInt(spec)); } if (activityDef.getThreads() > activityDef.getCycleCount()) { @@ -563,4 +570,8 @@ public class SimpleActivity implements Activity, ProgressCapable { } + @Override + public String getName() { + return this.activityDef.getAlias(); + } } diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java index 312f49633..c28438624 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java @@ -17,15 +17,22 @@ package io.nosqlbench.engine.api.activityimpl.uniform; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; +import io.nosqlbench.engine.api.activityconfig.StatementsLoader; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.OpMapper; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op; +import io.nosqlbench.nb.api.config.standard.*; import io.nosqlbench.nb.api.errors.OpConfigError; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.Function; /** @@ -37,14 +44,35 @@ import java.util.function.Function; * @param The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph */ public class StandardActivity extends SimpleActivity { + private final static Logger logger = LogManager.getLogger("ACTIVITY"); private final DriverAdapter adapter; private final OpSequence> sequence; + private final NBConfigModel yamlmodel; public StandardActivity(DriverAdapter adapter, ActivityDef activityDef) { super(activityDef); this.adapter = adapter; + if (adapter instanceof NBConfigurable configurable) { + NBConfigModel cmodel = configurable.getConfigModel(); + Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); + if (yaml_loc.isPresent()) { + Map disposable = new LinkedHashMap<>(activityDef.getParams()); + StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities"); + yamlmodel = workload.getConfigModel(); + } + else { + yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly(); + } + NBConfigModel combinedModel = cmodel.add(yamlmodel); + NBConfiguration configuration = combinedModel.apply(activityDef.getParams()); + configurable.applyConfig(configuration); + } + else { + yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly(); + } + try { OpMapper opmapper = adapter.getOpMapper(); Function, Map> preprocessor = adapter.getPreprocessor(); @@ -81,4 +109,16 @@ public class StandardActivity extends SimpleActivity { return adapter.getErrorNameMapper(); } + @Override + public synchronized void onActivityDefUpdate(ActivityDef activityDef) { + super.onActivityDefUpdate(activityDef); + + if (adapter instanceof NBReconfigurable configurable) { + NBConfigModel cfgModel = configurable.getReconfigModel(); + NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams()); + NBReconfigurable.applyMatching(cfg,List.of(configurable)); + } +// +// ActivityDefObserver.apply(activityDef, adapter, sequence); + } } diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java index 33e634669..f3643b8bc 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivityType.java @@ -18,11 +18,24 @@ package io.nosqlbench.engine.api.activityimpl.uniform; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; import io.nosqlbench.engine.api.activityapi.core.ActivityType; +import io.nosqlbench.engine.api.activityconfig.StatementsLoader; +import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; +import io.nosqlbench.nb.api.config.standard.NBConfigModel; +import io.nosqlbench.nb.api.config.standard.NBConfiguration; +import io.nosqlbench.nb.api.config.standard.NBReconfigurable; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; public class StandardActivityType> extends SimpleActivity implements ActivityType { + private final static Logger logger = LogManager.getLogger("ACTIVITY"); + private final DriverAdapter adapter; public StandardActivityType(DriverAdapter adapter, ActivityDef activityDef) { @@ -42,6 +55,24 @@ public class StandardActivityType> extends Simpl return (A) new StandardActivity(adapter,activityDef); } + @Override + public synchronized void onActivityDefUpdate(ActivityDef activityDef) { + super.onActivityDefUpdate(activityDef); + + if (adapter instanceof NBReconfigurable reconfigurable) { + NBConfigModel cfgModel = reconfigurable.getReconfigModel(); + Optional op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); + if (op_yaml_loc.isPresent()) { + Map disposable = new LinkedHashMap<>(activityDef.getParams()); + StmtsDocList workload = StatementsLoader.loadPath(logger, op_yaml_loc.get(), disposable, "activities"); + cfgModel=cfgModel.add(workload.getConfigModel()); + } + NBConfiguration cfg = cfgModel.apply(activityDef.getParams()); + reconfigurable.applyReconfig(cfg); + } + + } + @Override public ActionDispenser getActionDispenser(A activity) { return new StandardActionDispenser(activity); diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java index 302514694..eb5e11d24 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java @@ -17,16 +17,11 @@ package io.nosqlbench.engine.core.lifecycle; import io.nosqlbench.engine.api.activityapi.core.ActivityType; -import io.nosqlbench.engine.api.activityconfig.StatementsLoader; -import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType; import io.nosqlbench.nb.annotations.Maturity; import io.nosqlbench.nb.api.NBEnvironment; -import io.nosqlbench.nb.api.config.standard.NBConfigModel; -import io.nosqlbench.nb.api.config.standard.NBConfigurable; -import io.nosqlbench.nb.api.config.standard.NBConfiguration; import io.nosqlbench.nb.api.content.Content; import io.nosqlbench.nb.api.content.NBIO; import io.nosqlbench.nb.api.errors.BasicError; @@ -156,17 +151,17 @@ public class ActivityTypeLoader { DriverAdapter driverAdapter = oda.get(); activityDef.getParams().remove("driver"); - if (driverAdapter instanceof NBConfigurable) { - NBConfigModel cfgModel = ((NBConfigurable) driverAdapter).getConfigModel(); - Optional op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); - if (op_yaml_loc.isPresent()) { - Map disposable = new LinkedHashMap<>(activityDef.getParams()); - StmtsDocList workload = StatementsLoader.loadPath(logger, op_yaml_loc.get(), disposable, "activities"); - cfgModel=cfgModel.add(workload.getConfigModel()); - } - NBConfiguration cfg = cfgModel.apply(activityDef.getParams()); - ((NBConfigurable) driverAdapter).applyConfig(cfg); - } +// if (driverAdapter instanceof NBConfigurable) { +// NBConfigModel cfgModel = ((NBConfigurable) driverAdapter).getConfigModel(); +// Optional op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); +// if (op_yaml_loc.isPresent()) { +// Map disposable = new LinkedHashMap<>(activityDef.getParams()); +// StmtsDocList workload = StatementsLoader.loadPath(logger, op_yaml_loc.get(), disposable, "activities"); +// cfgModel=cfgModel.add(workload.getConfigModel()); +// } +// NBConfiguration cfg = cfgModel.apply(activityDef.getParams()); +// ((NBConfigurable) driverAdapter).applyConfig(cfg); +// } ActivityType activityType = new StandardActivityType<>(driverAdapter, activityDef); return Optional.of(activityType); } else { diff --git a/nbr-examples/src/test/resources/scripts/sync/awaitfinished.js b/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/NBReconfigModelProvider.java similarity index 66% rename from nbr-examples/src/test/resources/scripts/sync/awaitfinished.js rename to nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/NBReconfigModelProvider.java index a43334d10..e20671114 100644 --- a/nbr-examples/src/test/resources/scripts/sync/awaitfinished.js +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/NBReconfigModelProvider.java @@ -14,15 +14,8 @@ * limitations under the License. */ -activitydef1 = { - "alias" : "activity_to_await", - "type" : "diag", - "cycles" : "0..1500", - "threads" : "1", - "targetrate" : "500" -}; +package io.nosqlbench.nb.api.config.standard; -print('starting activity teststartstopdiag'); -scenario.start(activitydef1); -scenario.awaitActivity("activity_to_await"); -print("awaited activity"); +public interface NBReconfigModelProvider { + NBConfigModel getReconfigModel(); +} diff --git a/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/NBReconfigurable.java b/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/NBReconfigurable.java new file mode 100644 index 000000000..0c57d95ee --- /dev/null +++ b/nb-api/src/main/java/io/nosqlbench/nb/api/config/standard/NBReconfigurable.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.nb.api.config.standard; + +import java.util.Collection; + +/** + * All implementation types which wish to have a type-marshalled configuration + * should implement this interface IFF they wish to support follow-on configuration + * after initialization. This is distinct and separate from initial configuration + * via {@link NBConfigurable}. A type may be NBReconfigurable without implementing + * the NBConfigurable interface, given that initialization for a type may happen + * via constructor or other means. + * + * When a type which implements this interface is instantiated, and the + * {@link NBConfiguration} was not injected into its constructor, + * the builder should call + * {@link NBConfigurable#applyConfig(NBConfiguration)} immediately + * after calling the constructor. + * + * Subsequently, when an owning instance has a configuration update to provide to + * the original NBConfigurable which ALSO implements NBReconfigurable, then + * {@link NBReconfigurable#applyReconfig(NBConfiguration)} should be called. + * The helper methods {@link #collectModels(Class, Collection)} and + * {@link #applyMatching(NBConfiguration, Object...)} can be used to apply + * reconfigurations to groups of elements with a shared configuration model. + */ +public interface NBReconfigurable extends NBCanReconfigure, NBReconfigModelProvider { + + /** + * This applies a configuration to an element AFTER the initial + * configuration from {@link NBConfigurable}. + * @param recfg The configuration data to be applied to a new instance + */ + @Override + void applyReconfig(NBConfiguration recfg); + + /** + * Implement this method by returning an instance of {@link ConfigModel}. + * Any configuration which is provided to the {@link #applyReconfig(NBConfiguration)} + * method will be validated through this model. A configuration model + * is required in order to build a validated configuration + * from source data provided by a user. + * @return A valid configuration model for the implementing class + */ + @Override + NBConfigModel getReconfigModel(); + + /** + * Convenience method to apply a configuration to any object which + * is expected to be be configurable. + * @param cfg The cfg to apply + * @param configurables zero or more Objects which may implement NBConfigurable + */ + static void applyMatching(NBConfiguration cfg, Collection configurables) { + for (Object configurable : configurables) { + if (configurable instanceof NBReconfigurable c) { + NBConfiguration partial = c.getReconfigModel().matchConfig(cfg); + c.applyReconfig(partial); + } + } + } + + static NBConfigModel collectModels(Class of, Collection configurables) { + ConfigModel model = ConfigModel.of(of); + for (Object configurable : configurables) { + if (configurable instanceof NBReconfigurable c) { + model = model.add(c.getReconfigModel()); + } + } + return model.asReadOnly(); + } + +}