diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java index 09932a207..0eb0fa62b 100644 --- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java @@ -159,6 +159,7 @@ public abstract class BaseDriverAdapter implements DriverAdapter .add(Param.optional("seq", String.class, "sequencing algorithm")) .add(Param.optional("instrument", Boolean.class)) .add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file")) + .add(Param.optional("driver",String.class)) .asReadOnly(); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index b3f40b1fb..bedd2979e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.schema.KeyValueEncodingType; import java.util.Map; +import java.util.Optional; public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { @@ -136,7 +137,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve createPulsarSchemaFromConf(); - this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this), false); + this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this), false, Optional.empty()); setDefaultsFromOpSequence(sequencer); onActivityDefUpdate(activityDef); diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardAction.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardAction.java index c9c7adc80..b902b978d 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardAction.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardAction.java @@ -52,7 +52,7 @@ public class StandardAction, R extends Op> impl private final Timer resultTimer; private final Timer bindTimer; private final NBErrorHandler errorHandler; - private final OpSequence> opsequence; + private final OpSequence> opsequence; public StandardAction(A activity, int slot) { this.activity = activity; @@ -69,7 +69,7 @@ public class StandardAction, R extends Op> impl @Override public int runCycle(long cycle) { - OpDispenser dispenser; + OpDispenser dispenser; Op op = null; try (Timer.Context ct = bindTimer.time()) { diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java index 8491e71ff..d0ee007e9 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/StandardActivity.java @@ -26,16 +26,15 @@ import io.nosqlbench.engine.api.activityimpl.OpMapper; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op; +import io.nosqlbench.engine.api.templating.ParsedOp; +import io.nosqlbench.nb.annotations.ServiceSelector; import io.nosqlbench.nb.api.config.standard.*; import io.nosqlbench.nb.api.errors.OpConfigError; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; /** * This is a typed activity which is expected to become the standard @@ -48,38 +47,64 @@ import java.util.function.Function; public class StandardActivity extends SimpleActivity implements SyntheticOpTemplateProvider { private final static Logger logger = LogManager.getLogger("ACTIVITY"); - private final DriverAdapter adapter; - private final OpSequence> sequence; + private final OpSequence> sequence; private final NBConfigModel yamlmodel; + private final ConcurrentHashMap adapters = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> mappers = new ConcurrentHashMap<>(); - public StandardActivity(DriverAdapter adapter, ActivityDef activityDef) { + public StandardActivity(ActivityDef activityDef) { super(activityDef); - this.adapter = adapter; + this.adapters.putAll(adapters); - if (adapter instanceof NBConfigurable configurable) { - NBConfigModel cmodel = configurable.getConfigModel(); - Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); - if (yaml_loc.isPresent()) { - Map disposable = new LinkedHashMap<>(activityDef.getParams()); - StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities"); - yamlmodel = workload.getConfigModel(); - } - else { - yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly(); - } - NBConfigModel combinedModel = cmodel.add(yamlmodel); - NBConfiguration configuration = combinedModel.apply(activityDef.getParams()); - configurable.applyConfig(configuration); + Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); + if (yaml_loc.isPresent()) { + Map disposable = new LinkedHashMap<>(activityDef.getParams()); + StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities"); + yamlmodel = workload.getConfigModel(); } else { yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly(); } + ServiceLoader adapterLoader = ServiceLoader.load(DriverAdapter.class); + Optional defaultAdapter = activityDef.getParams().getOptionalString("driver") + .map(s -> ServiceSelector.of(s, adapterLoader).getOne()); + + List opTemplates = loadOpTemplates(defaultAdapter); + + + List pops = new ArrayList<>(); + for (OpTemplate ot : opTemplates) { + String driverName = ot.getOptionalStringParam("driver") + .or(() -> activityDef.getParams().getOptionalString("driver")) + .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); + + if (!adapters.containsKey(driverName)) { + DriverAdapter adapter = ServiceSelector.of(driverName, adapterLoader).get().orElseThrow( + () -> new OpConfigError("Unable to load driver adapter for name '" + driverName + "'") + ); + + NBConfigModel combinedModel = yamlmodel; + NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams()); + + if (adapter instanceof NBConfigurable configurable) { + NBConfigModel adapterModel = configurable.getConfigModel(); + combinedModel = adapterModel.add(yamlmodel); + combinedConfig = combinedModel.matchConfig(activityDef.getParams()); + configurable.applyConfig(combinedConfig); + } + adapters.put(driverName,adapter); + mappers.put(driverName,adapter.getOpMapper()); + } + + DriverAdapter adapter = adapters.get(driverName); + ParsedOp pop = new ParsedOp(ot,adapter.getConfiguration(),List.of(adapter.getPreprocessor())); + pops.add(pop); + } + try { - OpMapper opmapper = adapter.getOpMapper(); - Function, Map> preprocessor = adapter.getPreprocessor(); boolean strict = activityDef.getParams().getOptionalBoolean("strict").orElse(false); - sequence = createOpSourceFromCommands(opmapper, adapter.getConfiguration(), List.of(preprocessor), strict); + sequence = createOpSourceFromParsedOps(adapters, mappers, pops); } catch (Exception e) { if (e instanceof OpConfigError) { throw e; @@ -95,40 +120,45 @@ public class StandardActivity extends SimpleActivity implements setDefaultsFromOpSequence(sequence); } - public OpSequence> getOpSequence() { + public OpSequence> getOpSequence() { return sequence; } - /** - * When an adapter needs to identify an error uniquely for the purposes of - * routing it to the correct error handler, or naming it in logs, or naming - * metrics, override this method in your activity. - * - * @return A function that can reliably and safely map an instance of Throwable to a stable name. - */ - @Override - public final Function getErrorNameMapper() { - return adapter.getErrorNameMapper(); - } +// /** +// * When an adapter needs to identify an error uniquely for the purposes of +// * routing it to the correct error handler, or naming it in logs, or naming +// * metrics, override this method in your activity. +// * +// * @return A function that can reliably and safely map an instance of Throwable to a stable name. +// */ +// @Override +// public final Function getErrorNameMapper() { +// return adapter.getErrorNameMapper(); +// } @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); - if (adapter instanceof NBReconfigurable configurable) { - NBConfigModel cfgModel = configurable.getReconfigModel(); - NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams()); - NBReconfigurable.applyMatching(cfg,List.of(configurable)); + for (DriverAdapter adapter : adapters.values()) { + if (adapter instanceof NBReconfigurable configurable) { + NBConfigModel cfgModel = configurable.getReconfigModel(); + NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams()); + NBReconfigurable.applyMatching(cfg,List.of(configurable)); + } } } @Override public List getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map cfg) { - if (adapter instanceof SyntheticOpTemplateProvider sotp) { - return sotp.getSyntheticOpTemplates(stmtsDocList, cfg); - } else { - return List.of(); + List opTemplates = new ArrayList<>(); + for (DriverAdapter adapter : adapters.values()) { + if (adapter instanceof SyntheticOpTemplateProvider sotp) { + List newTemplates = sotp.getSyntheticOpTemplates(stmtsDocList, cfg); + opTemplates.addAll(newTemplates); + } } + return opTemplates; } } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java index eb5e11d24..9eef262ab 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityTypeLoader.java @@ -150,7 +150,7 @@ public class ActivityTypeLoader { if (oda.isPresent()) { DriverAdapter driverAdapter = oda.get(); - activityDef.getParams().remove("driver"); +// activityDef.getParams().remove("driver"); // if (driverAdapter instanceof NBConfigurable) { // NBConfigModel cfgModel = ((NBConfigurable) driverAdapter).getConfigModel(); // Optional op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java index bcc9044db..3b60bd0b1 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java @@ -21,6 +21,7 @@ import io.nosqlbench.engine.api.activityapi.core.RunState; import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.ParameterMap; +import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType; import io.nosqlbench.engine.api.metrics.ActivityMetrics; import io.nosqlbench.engine.core.annotation.Annotators; import io.nosqlbench.nb.annotations.Maturity; @@ -319,27 +320,31 @@ public class ScenarioController { ActivityExecutor executor = activityExecutors.get(activityDef.getAlias()); if (executor == null && createIfMissing) { + if (activityDef.getParams().containsKey("driver")) { + ActivityType activityType = new ActivityTypeLoader() + .setMaturity(this.minMaturity) + .load(activityDef) + .orElseThrow( + () -> new RuntimeException("Driver for '" + activityDef + "' was not found." + + "\nYou can use --list-drivers to see what drivers are supported in this runtime." + + ConfigSuggestions.suggestAlternates( + new ActivityTypeLoader().getAllSelectors(),activityDef.getActivityType(),4) + .orElse("") + ) + ); - ActivityType activityType = new ActivityTypeLoader() - .setMaturity(this.minMaturity) - .load(activityDef) - .orElseThrow( - () -> new RuntimeException("Driver for '" + activityDef + "' was not found." + - "\nYou can use --list-drivers to see what drivers are supported in this runtime." + - ConfigSuggestions.suggestAlternates( - new ActivityTypeLoader().getAllSelectors(),activityDef.getActivityType(),4) - .orElse("") - ) + executor = new ActivityExecutor( + activityType.getAssembledActivity( + activityDef, + getActivityMap() + ), + this.sessionId ); + activityExecutors.put(activityDef.getAlias(), executor); + } else { + new StandardActivityType(activityDef); + } - executor = new ActivityExecutor( - activityType.getAssembledActivity( - activityDef, - getActivityMap() - ), - this.sessionId - ); - activityExecutors.put(activityDef.getAlias(), executor); } return executor; } diff --git a/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java b/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java index 0b5af975d..bc230712f 100644 --- a/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java +++ b/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java @@ -51,7 +51,7 @@ public class ScriptExampleTests { paramsMap.put(params[i], params[i + 1]); } String scenarioName = "scenario " + scriptname; - System.out.println("=".repeat(29) + " Running ASYNC integration test for: " + scenarioName); + System.out.println("=".repeat(29) + " Running integration test for example scenario: " + scenarioName); ScenariosExecutor executor = new ScenariosExecutor(ScriptExampleTests.class.getSimpleName() + ":" + scriptname, 1); Scenario s = new Scenario(scenarioName, Scenario.Engine.Graalvm,"stdout:300", Maturity.Any); @@ -236,7 +236,7 @@ public class ScriptExampleTests { public void testExceptionPropagationFromActivityInit() { ScenarioResult scenarioResult = runScenario("activityiniterror"); assertThat(scenarioResult.getException()).isPresent(); - assertThat(scenarioResult.getException().get().getMessage()).contains("Unknown config parameter 'unknown_config'"); + assertThat(scenarioResult.getException().get().getMessage()).contains("Unable to convert end cycle from invalid"); assertThat(scenarioResult.getException()).isNotNull(); } diff --git a/nbr-examples/src/test/resources/scripts/examples/activityiniterror.js b/nbr-examples/src/test/resources/scripts/examples/activityiniterror.js index e43cb2145..3c95579b2 100644 --- a/nbr-examples/src/test/resources/scripts/examples/activityiniterror.js +++ b/nbr-examples/src/test/resources/scripts/examples/activityiniterror.js @@ -17,7 +17,7 @@ activitydef1 = { "alias" : "erroring_activity_init", "driver" : "diag", - "cycles" : "0..1500000", + "cycles" : "invalid", "threads" : "1", "targetrate" : "500", "unknown_config" : "unparsable",