adapters ("drivers") can be specified per op template within a workload

This commit is contained in:
Jonathan Shook 2022-06-29 20:45:27 -05:00
parent 8d3d92987d
commit c55bbfc127
8 changed files with 108 additions and 71 deletions

View File

@ -159,6 +159,7 @@ public abstract class BaseDriverAdapter<R extends Op,S> implements DriverAdapter
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver",String.class))
.asReadOnly();
}

View File

@ -42,6 +42,7 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import java.util.Map;
import java.util.Optional;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -136,7 +137,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
createPulsarSchemaFromConf();
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this), false);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this), false, Optional.empty());
setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef);

View File

@ -52,7 +52,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
private final OpSequence<OpDispenser<? extends R>> opsequence;
private final OpSequence<OpDispenser<? extends Op>> opsequence;
public StandardAction(A activity, int slot) {
this.activity = activity;
@ -69,7 +69,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
@Override
public int runCycle(long cycle) {
OpDispenser<? extends R> dispenser;
OpDispenser<? extends Op> dispenser;
Op op = null;
try (Timer.Context ct = bindTimer.time()) {

View File

@ -26,16 +26,15 @@ import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* This is a typed activity which is expected to become the standard
@ -48,38 +47,64 @@ import java.util.function.Function;
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger("ACTIVITY");
private final DriverAdapter<R, S> adapter;
private final OpSequence<OpDispenser<? extends R>> sequence;
private final OpSequence<OpDispenser<? extends Op>> sequence;
private final NBConfigModel yamlmodel;
private final ConcurrentHashMap<String, DriverAdapter> adapters = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, OpMapper<Op>> mappers = new ConcurrentHashMap<>();
public StandardActivity(DriverAdapter<R, S> adapter, ActivityDef activityDef) {
public StandardActivity(ActivityDef activityDef) {
super(activityDef);
this.adapter = adapter;
this.adapters.putAll(adapters);
if (adapter instanceof NBConfigurable configurable) {
NBConfigModel cmodel = configurable.getConfigModel();
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
}
else {
yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly();
}
NBConfigModel combinedModel = cmodel.add(yamlmodel);
NBConfiguration configuration = combinedModel.apply(activityDef.getParams());
configurable.applyConfig(configuration);
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
StmtsDocList workload = StatementsLoader.loadPath(logger, yaml_loc.get(), disposable, "activities");
yamlmodel = workload.getConfigModel();
}
else {
yamlmodel= ConfigModel.of(StandardActivity.class).asReadOnly();
}
ServiceLoader<DriverAdapter> adapterLoader = ServiceLoader.load(DriverAdapter.class);
Optional<DriverAdapter> defaultAdapter = activityDef.getParams().getOptionalString("driver")
.map(s -> ServiceSelector.of(s, adapterLoader).getOne());
List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter);
List<ParsedOp> pops = new ArrayList<>();
for (OpTemplate ot : opTemplates) {
String driverName = ot.getOptionalStringParam("driver")
.or(() -> activityDef.getParams().getOptionalString("driver"))
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
if (!adapters.containsKey(driverName)) {
DriverAdapter adapter = ServiceSelector.of(driverName, adapterLoader).get().orElseThrow(
() -> new OpConfigError("Unable to load driver adapter for name '" + driverName + "'")
);
NBConfigModel combinedModel = yamlmodel;
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
if (adapter instanceof NBConfigurable configurable) {
NBConfigModel adapterModel = configurable.getConfigModel();
combinedModel = adapterModel.add(yamlmodel);
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
configurable.applyConfig(combinedConfig);
}
adapters.put(driverName,adapter);
mappers.put(driverName,adapter.getOpMapper());
}
DriverAdapter adapter = adapters.get(driverName);
ParsedOp pop = new ParsedOp(ot,adapter.getConfiguration(),List.of(adapter.getPreprocessor()));
pops.add(pop);
}
try {
OpMapper<R> opmapper = adapter.getOpMapper();
Function<Map<String, Object>, Map<String, Object>> preprocessor = adapter.getPreprocessor();
boolean strict = activityDef.getParams().getOptionalBoolean("strict").orElse(false);
sequence = createOpSourceFromCommands(opmapper, adapter.getConfiguration(), List.of(preprocessor), strict);
sequence = createOpSourceFromParsedOps(adapters, mappers, pops);
} catch (Exception e) {
if (e instanceof OpConfigError) {
throw e;
@ -95,40 +120,45 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
setDefaultsFromOpSequence(sequence);
}
public OpSequence<OpDispenser<? extends R>> getOpSequence() {
public OpSequence<OpDispenser<? extends Op>> getOpSequence() {
return sequence;
}
/**
* When an adapter needs to identify an error uniquely for the purposes of
* routing it to the correct error handler, or naming it in logs, or naming
* metrics, override this method in your activity.
*
* @return A function that can reliably and safely map an instance of Throwable to a stable name.
*/
@Override
public final Function<Throwable, String> getErrorNameMapper() {
return adapter.getErrorNameMapper();
}
// /**
// * When an adapter needs to identify an error uniquely for the purposes of
// * routing it to the correct error handler, or naming it in logs, or naming
// * metrics, override this method in your activity.
// *
// * @return A function that can reliably and safely map an instance of Throwable to a stable name.
// */
// @Override
// public final Function<Throwable, String> getErrorNameMapper() {
// return adapter.getErrorNameMapper();
// }
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg,List.of(configurable));
for (DriverAdapter adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
NBReconfigurable.applyMatching(cfg,List.of(configurable));
}
}
}
@Override
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String,Object> cfg) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
return sotp.getSyntheticOpTemplates(stmtsDocList, cfg);
} else {
return List.of();
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(stmtsDocList, cfg);
opTemplates.addAll(newTemplates);
}
}
return opTemplates;
}
}

View File

@ -150,7 +150,7 @@ public class ActivityTypeLoader {
if (oda.isPresent()) {
DriverAdapter<?, ?> driverAdapter = oda.get();
activityDef.getParams().remove("driver");
// activityDef.getParams().remove("driver");
// if (driverAdapter instanceof NBConfigurable) {
// NBConfigModel cfgModel = ((NBConfigurable) driverAdapter).getConfigModel();
// Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");

View File

@ -21,6 +21,7 @@ import io.nosqlbench.engine.api.activityapi.core.RunState;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.uniform.StandardActivityType;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.nb.annotations.Maturity;
@ -319,27 +320,31 @@ public class ScenarioController {
ActivityExecutor executor = activityExecutors.get(activityDef.getAlias());
if (executor == null && createIfMissing) {
if (activityDef.getParams().containsKey("driver")) {
ActivityType<?> activityType = new ActivityTypeLoader()
.setMaturity(this.minMaturity)
.load(activityDef)
.orElseThrow(
() -> new RuntimeException("Driver for '" + activityDef + "' was not found." +
"\nYou can use --list-drivers to see what drivers are supported in this runtime." +
ConfigSuggestions.suggestAlternates(
new ActivityTypeLoader().getAllSelectors(),activityDef.getActivityType(),4)
.orElse("")
)
);
ActivityType<?> activityType = new ActivityTypeLoader()
.setMaturity(this.minMaturity)
.load(activityDef)
.orElseThrow(
() -> new RuntimeException("Driver for '" + activityDef + "' was not found." +
"\nYou can use --list-drivers to see what drivers are supported in this runtime." +
ConfigSuggestions.suggestAlternates(
new ActivityTypeLoader().getAllSelectors(),activityDef.getActivityType(),4)
.orElse("")
)
executor = new ActivityExecutor(
activityType.getAssembledActivity(
activityDef,
getActivityMap()
),
this.sessionId
);
activityExecutors.put(activityDef.getAlias(), executor);
} else {
new StandardActivityType(activityDef);
}
executor = new ActivityExecutor(
activityType.getAssembledActivity(
activityDef,
getActivityMap()
),
this.sessionId
);
activityExecutors.put(activityDef.getAlias(), executor);
}
return executor;
}

View File

@ -51,7 +51,7 @@ public class ScriptExampleTests {
paramsMap.put(params[i], params[i + 1]);
}
String scenarioName = "scenario " + scriptname;
System.out.println("=".repeat(29) + " Running ASYNC integration test for: " + scenarioName);
System.out.println("=".repeat(29) + " Running integration test for example scenario: " + scenarioName);
ScenariosExecutor executor = new ScenariosExecutor(ScriptExampleTests.class.getSimpleName() + ":" + scriptname, 1);
Scenario s = new Scenario(scenarioName, Scenario.Engine.Graalvm,"stdout:300", Maturity.Any);
@ -236,7 +236,7 @@ public class ScriptExampleTests {
public void testExceptionPropagationFromActivityInit() {
ScenarioResult scenarioResult = runScenario("activityiniterror");
assertThat(scenarioResult.getException()).isPresent();
assertThat(scenarioResult.getException().get().getMessage()).contains("Unknown config parameter 'unknown_config'");
assertThat(scenarioResult.getException().get().getMessage()).contains("Unable to convert end cycle from invalid");
assertThat(scenarioResult.getException()).isNotNull();
}

View File

@ -17,7 +17,7 @@
activitydef1 = {
"alias" : "erroring_activity_init",
"driver" : "diag",
"cycles" : "0..1500000",
"cycles" : "invalid",
"threads" : "1",
"targetrate" : "500",
"unknown_config" : "unparsable",