mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
refactor op synthesis
This commit is contained in:
parent
0574203536
commit
4b14ea6f4a
@ -1,7 +1,26 @@
|
||||
package io.nosqlbench.adapters.api.activityconfig.yaml;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsDocList;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
import java.util.function.Function;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@ -45,12 +64,14 @@ public class OpTemplates implements Iterable<OpTemplate> {
|
||||
* including the inherited and overridden values from this doc and the parent block.
|
||||
*/
|
||||
public OpTemplates matching(String tagFilterSpec, boolean logit) {
|
||||
TagFilter ts = new TagFilter(tagFilterSpec);
|
||||
return matching(new TagFilter(tagFilterSpec), logit);
|
||||
}
|
||||
public OpTemplates matching(TagFilter tagFilter, boolean logit) {
|
||||
List<OpTemplate> matchingOpTemplates = new ArrayList<>();
|
||||
|
||||
List<String> matchlog = new ArrayList<>();
|
||||
templates.stream()
|
||||
.map(ts::matchesTaggedResult)
|
||||
.map(tagFilter::matchesTaggedResult)
|
||||
.peek(r -> matchlog.add(r.getLog()))
|
||||
.filter(TagFilter.Result::matched)
|
||||
.map(TagFilter.Result::getElement)
|
||||
@ -63,6 +84,7 @@ public class OpTemplates implements Iterable<OpTemplate> {
|
||||
}
|
||||
|
||||
return new OpTemplates(matchingOpTemplates,opsDocList);
|
||||
|
||||
}
|
||||
|
||||
public Map<String,String> getDocBindings() {
|
||||
@ -89,4 +111,10 @@ public class OpTemplates implements Iterable<OpTemplate> {
|
||||
public boolean isEmpty() {
|
||||
return this.templates.isEmpty();
|
||||
}
|
||||
|
||||
public OpTemplates transform(Function<OpTemplate,OpTemplate> transformF) {
|
||||
List<OpTemplate> transformed = this.templates.stream().map(t -> transformF.apply(t)).toList();
|
||||
return new OpTemplates(transformed,opsDocList);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -84,11 +84,13 @@ public abstract class BaseOpDispenser<OP extends CycleOp<?>, SPACE extends Space
|
||||
*/
|
||||
private final CycleFunction<Boolean> _verifier;
|
||||
private final ThreadLocal<CycleFunction<Boolean>> tlVerifier;
|
||||
private final long ratio;
|
||||
|
||||
protected BaseOpDispenser(final NBComponent parentC, final ParsedOp op, LongFunction<? extends SPACE> spaceF) {
|
||||
super(parentC);
|
||||
opName = op.getName();
|
||||
labels = op.getLabels();
|
||||
this.ratio = op.takeOptionalStaticValue("ratio", Long.class).orElse(1L);
|
||||
|
||||
this.timerStarts = op.takeOptionalStaticValue(START_TIMERS, String.class)
|
||||
.map(s -> s.split(", *"))
|
||||
@ -240,4 +242,9 @@ public abstract class BaseOpDispenser<OP extends CycleOp<?>, SPACE extends Space
|
||||
public List<Validator> getValidator(NBComponent parent, ParsedOp pop, OpLookup lookup) {
|
||||
return CoreOpValidators.getValidator(this, pop, lookup);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getRatio() {
|
||||
return this.ratio;
|
||||
}
|
||||
}
|
||||
|
@ -109,4 +109,6 @@ public interface OpDispenser<OPTYPE extends CycleOp<?>> extends LongFunction<OPT
|
||||
CycleFunction<Boolean> getVerifier();
|
||||
|
||||
String getOpName();
|
||||
|
||||
long getRatio();
|
||||
}
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.planning;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/// This version of an [[OpSequence]] allows a sequence to be derived from another sequence
|
||||
/// based on a mapping function. This is done lazily by default, to allow for incremental
|
||||
/// initialization.
|
||||
public class DerivedSequence<BASE, DERIVED> implements OpSequence<DERIVED> {
|
||||
private final OpSequence<BASE> baseTypeSequence;
|
||||
private final SequencerType type;
|
||||
private final List<DERIVED> elems;
|
||||
private final int[] seq;
|
||||
private final Function<BASE, DERIVED> deriveF;
|
||||
|
||||
public DerivedSequence(OpSequence<BASE> baseTypeSequence, Function<BASE,DERIVED> deriveF) {
|
||||
this.baseTypeSequence = baseTypeSequence;
|
||||
this.deriveF = deriveF;
|
||||
this.type = baseTypeSequence.getSequencerType();
|
||||
this.elems = new ArrayList<>(baseTypeSequence.getOps().size());
|
||||
this.seq = baseTypeSequence.getSequence();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DERIVED apply(long selector) {
|
||||
int index = (int) (selector % seq.length);
|
||||
index = seq[index];
|
||||
return elems.get(index);
|
||||
}
|
||||
|
||||
public DERIVED derive(long selector) {
|
||||
int index = (int) (selector % seq.length);
|
||||
if (elems.get(index)==null) {
|
||||
elems.set(index,this.deriveF.apply(baseTypeSequence.getOps().get(index)));
|
||||
}
|
||||
return elems.get(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DERIVED> getOps() {
|
||||
return elems;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int[] getSequence() {
|
||||
return seq;
|
||||
}
|
||||
|
||||
public SequencerType getSequencerType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <OTHER> DerivedSequence<DERIVED,OTHER> transform(Function<DERIVED, OTHER> func) {
|
||||
return new DerivedSequence<DERIVED,OTHER>(this, func);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "seq len="+seq.length + ", LUT=" + Arrays.toString(this.seq);
|
||||
}
|
||||
}
|
@ -40,6 +40,7 @@ public interface OpSequence<T> extends LongFunction<T> {
|
||||
*/
|
||||
int[] getSequence();
|
||||
|
||||
SequencerType getSequencerType();
|
||||
/**
|
||||
* Map this OpSequence to another type of OpSequence.
|
||||
* @param func The transformation function from this to another type
|
||||
|
@ -39,6 +39,7 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
|
||||
import io.nosqlbench.engine.api.activityapi.input.Input;
|
||||
import io.nosqlbench.engine.api.activityapi.output.Output;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
|
||||
import io.nosqlbench.engine.api.activityapi.simrate.*;
|
||||
@ -54,30 +55,33 @@ import io.nosqlbench.engine.core.lifecycle.commands.CMD_start;
|
||||
import io.nosqlbench.engine.core.lifecycle.commands.CMD_stop;
|
||||
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
|
||||
import io.nosqlbench.engine.core.lifecycle.session.NBSession;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
|
||||
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
|
||||
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.labels.NBLabels;
|
||||
import io.nosqlbench.nb.api.components.events.NBEvent;
|
||||
import io.nosqlbench.nb.api.components.events.ParamChange;
|
||||
import io.nosqlbench.nb.api.components.events.SetThreads;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.ActivityConfig;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.CyclesSpec;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.labels.NBLabels;
|
||||
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
/// An [[Activity]] is a flywheel of operations. Each activity consumes ordinals
|
||||
/// from a specified interval, maps them to executable operations via _op synthesis_ as determined
|
||||
/// by the op templates supplied by the user, and executes those operations.
|
||||
@ -115,11 +119,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
public Activity(NBComponent parent, ActivityDef activityDef) {
|
||||
|
||||
super(
|
||||
parent,
|
||||
NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())
|
||||
);
|
||||
this.activityDef = activityDef;
|
||||
this.applyConfig(config);
|
||||
this.sequence = initSequence();
|
||||
this.metrics = new ActivityMetrics(this);
|
||||
|
||||
getParams().set(
|
||||
@ -130,67 +131,114 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
"Unable to determine name of activity from " + activityDef))
|
||||
);
|
||||
|
||||
OpsDocList workload;
|
||||
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
private OpSequence<OpDispenser<? extends CycleOp<?>>> initSequence() {
|
||||
// this.activityDef = activityDef;
|
||||
// this.metrics = new ActivityMetrics(this);
|
||||
|
||||
NBConfigModel yamlmodel = yaml_loc.map(path -> {
|
||||
return OpsLoader.loadPath(
|
||||
path, new LinkedHashMap<>(activityDef.getParams()), "activities").getConfigModel();
|
||||
}).orElse(ConfigModel.of(Activity.class).asReadOnly());
|
||||
|
||||
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
|
||||
Optional<DriverAdapter<?, ?>> defaultAdapter = activityDef.getParams().getOptionalString(
|
||||
"driver").flatMap(name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV()));
|
||||
|
||||
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
|
||||
throw new BasicError(
|
||||
"Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change '<activeByDefault>false</activeByDefault>' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first.");
|
||||
//region This region contains all of the refactored op synthesis logic
|
||||
OpTemplates opTemplatesRef = loadOpTemplates();
|
||||
|
||||
/// How to load a named [DriverAdapter] with component parentage and labels, given
|
||||
/// the driver name and the activity (cross-driver) configur ation
|
||||
AdapterResolver adapterResolver = new AdapterResolver();
|
||||
ConcurrentHashMap<String, DriverAdapter> adapterCache = new ConcurrentHashMap<>();
|
||||
|
||||
/// Promote the driver adapter function into a cached version
|
||||
Function<String, DriverAdapter<? extends CycleOp<?>, Space>> adapterF
|
||||
= (name) -> adapterCache.computeIfAbsent(
|
||||
name,
|
||||
cacheName -> adapterResolver.apply(this, name, this.config));
|
||||
|
||||
/// How to get a parsed op, given an op template and an activity.
|
||||
/// A parsed op depends on back-fill from the activity params, assuming those params
|
||||
/// are included in the activity's [[NBConfiguration]], but the params are also filtered
|
||||
/// through the associated [[DriverAdapter]]'s configuration.
|
||||
ParsedOpResolver parsedOpF = new ParsedOpResolver();
|
||||
|
||||
/// How to get an op dispenser, given an adapter and a parsed op
|
||||
/// The cached [Space] mechanism is included within the adapter's API, and is specific to each parsed op,
|
||||
/// since this is a dynamic op field
|
||||
DispenserResolver dispenserResolver = new DispenserResolver();
|
||||
|
||||
OpResolverBank orb = new OpResolverBank(
|
||||
this, adapterResolver, opTemplatesRef, config.get("tags"), dispenserResolver, parsedOpF,
|
||||
config);
|
||||
List<? extends OpDispenser<?>> dispensers = orb.resolveDispensers();
|
||||
|
||||
/// TODO: Here, we have resolved the dispensers. The next step is to add any modifiers to them
|
||||
/// as composed functions for things like dry-run, etc.
|
||||
|
||||
|
||||
/// TODO: Here, we have resolved the dispensers and their modifiers. The next step is to create the LUT
|
||||
/// for the conventional [[OpSequence]], although other non-deterministic op selection
|
||||
/// methods should also be supported.
|
||||
|
||||
SequencerType sequencerType = config.getOptional("seq").map(SequencerType::valueOf)
|
||||
.orElse(SequencerType.bucket);
|
||||
|
||||
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(
|
||||
sequencerType);
|
||||
|
||||
for (OpDispenser<?> dispenser : dispensers) {
|
||||
planner.addOp(dispenser, d -> d.getRatio());
|
||||
}
|
||||
OpSequence<OpDispenser<? extends CycleOp<?>>> sequence = planner.resolve();
|
||||
return sequence;
|
||||
|
||||
OpResolver opResolver = new OpResolver(() -> loadOpTemplates());
|
||||
// TODO: Perhaps, op templates should be split into core/reserved partition and another, with a proxy
|
||||
// object retained for the core elements
|
||||
|
||||
//endregion
|
||||
|
||||
|
||||
// HERE, op templates are loaded before drivers are loaded
|
||||
// List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter.orElse(null), false);
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
|
||||
// Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
|
||||
// Optional<DriverAdapter<?, ?>> defaultAdapter = activityDef.getParams()
|
||||
// .getOptionalString("driver")
|
||||
// .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
|
||||
// .map(l -> l.load(this, NBLabels.forKV()));
|
||||
//
|
||||
// if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
|
||||
// throw new BasicError("Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this " + "driver adapter. Change" + " '<activeByDefault>false</activeByDefault>' for the driver in" + " './nb-adapters/pom.xml' and" + " './nb-adapters/nb-adapters-included/pom.xml' first.");
|
||||
// }
|
||||
|
||||
NBConfigModel supersetConfig = ConfigModel.of(Activity.class).add(yamlmodel);
|
||||
Optional<String> defaultDriverOption = defaultDriverName;
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
|
||||
// NBConfigModel supersetConfig = ConfigModel.of(Activity.class).add(yamlmodel);
|
||||
// Optional<String> defaultDriverOption = defaultDriverName;
|
||||
// ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
|
||||
|
||||
List<ParsedOp> allParsedOps = loadOpTemplates(
|
||||
defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert(
|
||||
ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList();
|
||||
// List<ParsedOp> allParsedOps = loadOpTemplates(defaultAdapter.orElse(null), false, false).stream()
|
||||
// .map(ot -> upconvert(ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist))
|
||||
// .toList();
|
||||
|
||||
OpLookup lookup = new OpLookupService(() -> allParsedOps);
|
||||
// OpLookup lookup = new OpLookupService(() -> allParsedOps);
|
||||
|
||||
TagFilter ts = new TagFilter(activityDef.getParams().getOptionalString("tags").orElse(""));
|
||||
List<ParsedOp> activeParsedOps = ts.filter(allParsedOps);
|
||||
// TagFilter ts = new TagFilter(activityDef.getParams().getOptionalString("tags").orElse(""));
|
||||
// List<ParsedOp> activeParsedOps = ts.filter(allParsedOps);
|
||||
|
||||
if (defaultDriverOption.isPresent()) {
|
||||
long matchingDefault = mappers.keySet().stream().filter(
|
||||
n -> n.equals(defaultDriverOption.get())).count();
|
||||
if (0 == matchingDefault) {
|
||||
logger.warn(
|
||||
"All op templates used a different driver than the default '{}'",
|
||||
defaultDriverOption.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
// if (defaultDriverOption.isPresent()) {
|
||||
// long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count();
|
||||
// if (0 == matchingDefault) {
|
||||
// logger.warn(
|
||||
// "All op templates used a different driver than the default '{}'",
|
||||
// defaultDriverOption.get()
|
||||
// );
|
||||
// }
|
||||
// }
|
||||
|
||||
try {
|
||||
sequence = createOpSourceFromParsedOps(adapterlist, activeParsedOps, lookup);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof OpConfigError) {
|
||||
throw e;
|
||||
}
|
||||
throw new OpConfigError(
|
||||
"Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
}
|
||||
|
||||
initOpsMetrics();
|
||||
// try {
|
||||
// sequence = createOpSourceFromParsedOps(adapterlist, activeParsedOps, lookup);
|
||||
// } catch (Exception e) {
|
||||
// if (e instanceof OpConfigError) {
|
||||
// throw e;
|
||||
// }
|
||||
// throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
@ -212,16 +260,11 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
}
|
||||
|
||||
|
||||
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
|
||||
return createOpSourceFromParsedOps2(adapters, pops, opLookup);
|
||||
}
|
||||
|
||||
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps2(
|
||||
// Map<String, DriverAdapter<?,?>> adapterCache,
|
||||
// Map<String, OpMapper<? extends Op>> mapperCache,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
|
||||
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps2(List<DriverAdapter<CycleOp<?>, Space>> adapters,
|
||||
List<ParsedOp> pops,
|
||||
OpLookup opLookup
|
||||
)
|
||||
{
|
||||
try {
|
||||
|
||||
List<Long> ratios = new ArrayList<>(pops.size());
|
||||
@ -231,8 +274,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
ratios.add(ratio);
|
||||
}
|
||||
|
||||
SequencerType sequencerType = getParams().getOptionalString("seq").map(
|
||||
SequencerType::valueOf).orElse(SequencerType.bucket);
|
||||
SequencerType sequencerType = config.getOptional("seq").map(SequencerType::valueOf)
|
||||
.orElse(SequencerType.bucket);
|
||||
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(
|
||||
sequencerType);
|
||||
|
||||
@ -287,57 +330,71 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
return activityDef.getParams();
|
||||
}
|
||||
|
||||
// private ParsedOp upconvert(
|
||||
// OpTemplate ot,
|
||||
// Optional<String> defaultDriverOption,
|
||||
// NBConfigModel yamlmodel,
|
||||
// NBConfigModel supersetConfig,
|
||||
// ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
|
||||
// List<DriverAdapter<CycleOp<?>, Space>> adapterlist
|
||||
// )
|
||||
// {
|
||||
// // ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(),
|
||||
// // List.of(), this);
|
||||
// String
|
||||
// driverName =
|
||||
// ot.getOptionalStringParam("driver", String.class)
|
||||
// .or(() -> ot.getOptionalStringParam("type", String.class))
|
||||
// .or(() -> defaultDriverOption).orElseThrow(() -> new OpConfigError(
|
||||
// "Unable to identify driver name for op template:\n" + ot));
|
||||
//
|
||||
// DriverAdapter<CycleOp<?>, Space>
|
||||
// adapter =
|
||||
// adapters.computeIfAbsent(
|
||||
// driverName,
|
||||
// dn -> loadAdapter(dn, yamlmodel, supersetConfig, mappers));
|
||||
// supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
|
||||
// adapterlist.add(adapter);
|
||||
//
|
||||
// ParsedOp
|
||||
// pop =
|
||||
// new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
|
||||
// Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
|
||||
//
|
||||
// return pop;
|
||||
// }
|
||||
|
||||
private ParsedOp upconvert(
|
||||
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
|
||||
NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist
|
||||
) {
|
||||
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
|
||||
String driverName = ot.getOptionalStringParam("driver", String.class).or(
|
||||
() -> ot.getOptionalStringParam("type", String.class)).or(
|
||||
() -> defaultDriverOption).orElseThrow(
|
||||
() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
|
||||
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = adapters.computeIfAbsent(
|
||||
driverName, dn -> loadAdapter(
|
||||
dn, yamlmodel, supersetConfig, mappers));
|
||||
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
|
||||
adapterlist.add(adapter);
|
||||
|
||||
ParsedOp pop = new ParsedOp(
|
||||
ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
|
||||
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
|
||||
|
||||
return pop;
|
||||
}
|
||||
|
||||
private DriverAdapter<CycleOp<?>, Space> loadAdapter(
|
||||
String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
|
||||
) {
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = Optional.of(driverName).flatMap(
|
||||
name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV())).orElseThrow(
|
||||
() -> new OpConfigError("driver adapter not present for name '" + driverName + "'"));
|
||||
|
||||
NBConfigModel combinedModel = yamlmodel;
|
||||
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
|
||||
if (adapter instanceof NBConfigurable configurable) {
|
||||
NBConfigModel adapterModel = configurable.getConfigModel();
|
||||
supersetConfig.add(adapterModel);
|
||||
|
||||
combinedModel = adapterModel.add(yamlmodel);
|
||||
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
configurable.applyConfig(combinedConfig);
|
||||
}
|
||||
mappers.put(driverName, adapter.getOpMapper());
|
||||
return adapter;
|
||||
}
|
||||
|
||||
// private DriverAdapter<CycleOp<?>, Space> loadAdapter(
|
||||
// String driverName,
|
||||
// NBConfigModel yamlmodel,
|
||||
// NBConfigModel supersetConfig,
|
||||
// ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
|
||||
// )
|
||||
// {
|
||||
// DriverAdapter<CycleOp<?>, Space>
|
||||
// adapter =
|
||||
// Optional.of(driverName).flatMap(name -> ServiceSelector.of(
|
||||
// name,
|
||||
// ServiceLoader.load(DriverAdapterLoader.class)).get())
|
||||
// .map(l -> l.load(this, NBLabels.forKV()))
|
||||
// .orElseThrow(() -> new OpConfigError("driver adapter not present for name '" +
|
||||
// driverName +
|
||||
// "'"));
|
||||
//
|
||||
// NBConfigModel combinedModel = yamlmodel;
|
||||
// NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
//
|
||||
// if (adapter instanceof NBConfigurable configurable) {
|
||||
// NBConfigModel adapterModel = configurable.getConfigModel();
|
||||
// supersetConfig.add(adapterModel);
|
||||
//
|
||||
// combinedModel = adapterModel.add(yamlmodel);
|
||||
// combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
// configurable.applyConfig(combinedConfig);
|
||||
// }
|
||||
// mappers.put(driverName, adapter.getOpMapper());
|
||||
// return adapter;
|
||||
// }
|
||||
|
||||
public void initActivity() {
|
||||
initOrUpdateRateLimiters(this.activityDef);
|
||||
@ -422,42 +479,57 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
}
|
||||
}
|
||||
|
||||
private OpTemplates loadOpTemplates(
|
||||
DriverAdapter<?, ?> defaultDriverAdapter, boolean logged, boolean filtered) {
|
||||
|
||||
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
|
||||
OpTemplates templates = loadOpTemplates();
|
||||
OpTemplates filteredOps = templates.matching(filtered ? tagfilter : "", logged);
|
||||
|
||||
if (filteredOps.isEmpty()) {
|
||||
// There were no ops, and it *wasn't* because they were all filtered out.
|
||||
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
|
||||
// But if there were no ops, and there was no default driver provided, we can't continue
|
||||
// There were no ops, and it was because they were all filtered out
|
||||
OpTemplates unfilteredOps = templates.matching("",false);
|
||||
if (!unfilteredOps.isEmpty()) {
|
||||
String message = "There were no active op templates with tag filter '" + tagfilter + "', since all " + unfilteredOps.size() + " were filtered out. Examine the session log for details";
|
||||
NBAdvisorOutput.test(message);
|
||||
//throw new BasicError(message);
|
||||
}
|
||||
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
|
||||
filteredOps = sotp.getSyntheticOpTemplates(templates, this.activityDef.getParams());
|
||||
Objects.requireNonNull(filteredOps);
|
||||
if (filteredOps.isEmpty()) {
|
||||
throw new BasicError(
|
||||
"Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
|
||||
}
|
||||
} else {
|
||||
throw new BasicError("""
|
||||
No op templates were provided. You must provide one of these activity parameters:
|
||||
1) workload=some.yaml
|
||||
2) op='inline template'
|
||||
3) driver=stdout (or any other drive that can synthesize ops)""");
|
||||
}
|
||||
}
|
||||
return filteredOps;
|
||||
}
|
||||
// private OpTemplates loadOpTemplates(
|
||||
// DriverAdapter<?, ?> defaultDriverAdapter,
|
||||
// boolean logged,
|
||||
// boolean filtered
|
||||
// )
|
||||
// {
|
||||
//
|
||||
// String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
//
|
||||
// OpTemplates templates = loadOpTemplates();
|
||||
// OpTemplates filteredOps = templates.matching(filtered ? tagfilter : "", logged);
|
||||
//
|
||||
// if (filteredOps.isEmpty()) {
|
||||
// // There were no ops, and it *wasn't* because they were all filtered out.
|
||||
// // In this case, let's try to synthesize the ops as long as at least a default driver
|
||||
// // was provided
|
||||
// // But if there were no ops, and there was no default driver provided, we can't continue
|
||||
// // There were no ops, and it was because they were all filtered out
|
||||
// OpTemplates unfilteredOps = templates.matching("", false);
|
||||
// if (!unfilteredOps.isEmpty()) {
|
||||
// String
|
||||
// message =
|
||||
// "There were no active op templates with tag filter '" +
|
||||
// tagfilter +
|
||||
// "', since all " +
|
||||
// unfilteredOps.size() +
|
||||
// " were filtered out. Examine the session log for details";
|
||||
// NBAdvisorOutput.test(message);
|
||||
// // throw new BasicError(message);
|
||||
// }
|
||||
// if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
|
||||
// filteredOps = sotp.getSyntheticOpTemplates(templates, this.activityDef.getParams());
|
||||
// Objects.requireNonNull(filteredOps);
|
||||
// if (filteredOps.isEmpty()) {
|
||||
// throw new BasicError("Attempted to create synthetic ops from driver '" +
|
||||
// defaultDriverAdapter.getAdapterName() +
|
||||
// '\'' +
|
||||
// " but no ops were created. You must provide either a workload" +
|
||||
// " or an op parameter. Activities require op templates.");
|
||||
// }
|
||||
// } else {
|
||||
// throw new BasicError("""
|
||||
// No op templates were provided. You must provide one of these activity parameters:
|
||||
// 1) workload=some.yaml
|
||||
// 2) op='inline template'
|
||||
// 3) driver=stdout (or any other drive that can synthesize ops)\
|
||||
// """);
|
||||
// }
|
||||
// }
|
||||
// return filteredOps;
|
||||
// }
|
||||
|
||||
/**
|
||||
Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been
|
||||
@ -614,10 +686,9 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
protected OpTemplates loadOpTemplates() {
|
||||
OpsDocList opsDocs = null;
|
||||
try {
|
||||
String op = activityDef.getParams().getOptionalString("op").orElse(null);
|
||||
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(
|
||||
null);
|
||||
String workload = activityDef.getParams().getOptionalString("workload").orElse(null);
|
||||
String op = config.getOptional("op").orElse(null);
|
||||
String stmt = config.getOptional("stmt", "statement").orElse(null);
|
||||
String workload = config.getOptional("workload").orElse(null);
|
||||
|
||||
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
|
||||
throw new OpConfigError(
|
||||
|
@ -0,0 +1,62 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigurable;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.labels.NBLabels;
|
||||
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class AdapterResolver
|
||||
// implements BiFunction<String, NBConfiguration, DriverAdapter<? extends CycleOp<?>, Space>>
|
||||
{
|
||||
public DriverAdapter<? extends CycleOp<?>, Space> apply(
|
||||
NBComponent parent,
|
||||
String name,
|
||||
NBConfiguration configSuperset
|
||||
)
|
||||
{
|
||||
ServiceSelector<DriverAdapterLoader>
|
||||
loader =
|
||||
ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class));
|
||||
DriverAdapterLoader
|
||||
dal =
|
||||
loader.get()
|
||||
.orElseThrow(() -> new OpConfigError("No DriverAdapterLoader found for " + name));
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = dal.load(parent, NBLabels.forKV());
|
||||
|
||||
if (adapter instanceof NBConfigurable configurable) {
|
||||
NBConfigModel adapterModel = configurable.getConfigModel();
|
||||
NBConfiguration matchingConfig = adapterModel.matchConfig(configSuperset.getMap());
|
||||
configurable.applyConfig(matchingConfig);
|
||||
}
|
||||
|
||||
return adapter;
|
||||
}
|
||||
}
|
@ -0,0 +1,40 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class DispenserResolver
|
||||
implements BiFunction<DriverAdapter<? extends CycleOp<?>, Space>, ParsedOp, OpDispenser<? extends CycleOp<?>>>
|
||||
{
|
||||
@Override
|
||||
public OpDispenser<? extends CycleOp<?>> apply(
|
||||
DriverAdapter<? extends CycleOp<?>, Space> adapter,
|
||||
ParsedOp pop
|
||||
)
|
||||
{
|
||||
return adapter.getOpMapper().apply(adapter, pop, adapter.getSpaceFunc(pop));
|
||||
}
|
||||
}
|
@ -0,0 +1,174 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.engine.util.Tagged;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/// This type represents the process of op synthesis, including all upstream sources
|
||||
/// of data or configuration. This makes the stages explicit, identifies the functional
|
||||
/// patterns used to create a _higher-order_ op synthesis, and allows for a fully lazy-init
|
||||
/// form to be used. It also makes it possible for some op mapping logic to include any necessary
|
||||
/// views of other ops, such as those used by reference to keep complex configurations DRY.
|
||||
///
|
||||
/// This logic is __ENTIRELY__ setup logic, acting as an implicity op compiler before selected ops
|
||||
/// are ready to be used in an activity. Most of the stack constructions here will fall away to GC
|
||||
/// after the final assembled structures are captured at the end of op synthesis. As such, __DO
|
||||
/// NOT__
|
||||
/// worry about over-presenting or being overly pedantic in this layer of code. It is more
|
||||
/// important
|
||||
/// to illustrate with clarity how this process works rather than to obsess over terseness or
|
||||
/// optimizations here.
|
||||
///
|
||||
/// To ensure consistency, these methods should be guarded as synchronized. It is not gauranteed
|
||||
/// that this
|
||||
/// system will not be used in a multi-threaded way in the future, and this is a simple way to
|
||||
/// ensure
|
||||
/// atomicity during the graph construction. (Remember, this is not a critical performance path.)
|
||||
///
|
||||
/// Further, to avoid auto-recursion, each step of
|
||||
/// resolution is guarded by a boolean which is used to detect when a particular __unresolved__ node
|
||||
/// in the dependency
|
||||
/// graph is resolved again. Basic graph theory says this should never happen unless some element is
|
||||
/// dependent upon
|
||||
/// itself through a circular reference. (in graph vernacular, a _cycle_, but that would be
|
||||
/// confusing!) A
|
||||
/// future improvement to this error-detection feature should be able to
|
||||
/// look at the stack and extract the signature of traversal without the need to add explicit
|
||||
/// traversal tracking data.
|
||||
/// Even the latter may be useful if users start to build too many circular configurations.
|
||||
public class OpResolution implements Tagged {
|
||||
|
||||
/// starting point
|
||||
private OpTemplate template;
|
||||
private final Activity activity;
|
||||
|
||||
/// adapter is resolved from the 'driver' op field, deferring to the config (activityConfig) for
|
||||
/// the default value if needed. (it usually comes from there)
|
||||
private boolean resolvingAdapter;
|
||||
private final AdapterResolver adapterF;
|
||||
private DriverAdapter<? extends CycleOp<?>, ? extends Space> adapter = null;
|
||||
|
||||
/// parsed ops are object-level _normalized_ APIs around the op template and activity params
|
||||
private boolean resolvingParsedOp = false;
|
||||
private ParsedOpResolver parsedOpResolver;
|
||||
private ParsedOp parsedOp;
|
||||
|
||||
/// op dispensers yield executable ops from cycle numbers
|
||||
/// NOTE: the op mapping layer is folded into this, as mappers are 1:1 with adapters
|
||||
private boolean resolvingDispenser = false;
|
||||
private final DispenserResolver dispenserResolver;
|
||||
private OpDispenser<? extends CycleOp<?>> dispenser;
|
||||
|
||||
/// This can be provided to any layer which neds access to context info during resolution.
|
||||
/// It might be better as a part of a service or context view which is handed in to every layer
|
||||
private final OpResolverBank resolver;
|
||||
|
||||
public OpResolution(
|
||||
Activity activity,
|
||||
AdapterResolver adapterResolver,
|
||||
OpTemplate template,
|
||||
ParsedOpResolver parsedOpResolver,
|
||||
DispenserResolver dispenserResolver,
|
||||
OpResolverBank resolver
|
||||
)
|
||||
{
|
||||
this.activity = activity;
|
||||
this.adapterF = adapterResolver;
|
||||
this.template = template;
|
||||
this.parsedOpResolver = parsedOpResolver;
|
||||
this.dispenserResolver = dispenserResolver;
|
||||
this.resolver = resolver;
|
||||
}
|
||||
|
||||
/// This is the top-level product of this resolution layer. An [OpDispenser] can produce a
|
||||
/// stable
|
||||
/// and type-specific [CycleOp] for a given coordinate. It will call other more primitive layers
|
||||
/// as needed
|
||||
/// to get the components or component functions around which to build the final dispenser.
|
||||
/// These elements may be
|
||||
/// initialized by side-effect of other operations being resolved. In every case, once an
|
||||
/// element of any op
|
||||
/// resolution is resolved, it should be considered _defined_, and not resolved again.
|
||||
public synchronized <OPTYPE extends CycleOp<?>, SPACETYPE extends Space> OpDispenser<OPTYPE> resolveDispenser() {
|
||||
if (resolvingDispenser == true) {
|
||||
throw new OpConfigError("Auto-recursion while resolving dispenser for op '" +
|
||||
template.getName() +
|
||||
"'");
|
||||
}
|
||||
resolvingDispenser = true;
|
||||
if (dispenser == null) {
|
||||
this.dispenser = dispenserResolver.apply(resolveAdapter(), getParsedOp());
|
||||
// ParsedOp pop = resolveParsedOp();
|
||||
// DriverAdapter<OPTYPE, SPACETYPE> adapter = (DriverAdapter<OPTYPE, SPACETYPE>) resolveAdapter();
|
||||
// // TODO verify whether or not it is necessary to ensure static mapping between adapter and mapper instance
|
||||
// this.dispenser = adapter.getOpMapper().apply(adapter, pop, adapter.getSpaceFunc(pop));
|
||||
}
|
||||
resolvingDispenser = false;
|
||||
return (OpDispenser<OPTYPE>) dispenser;
|
||||
}
|
||||
|
||||
/// Converting an op template into a ParsedOp means back-filling it with config data from the
|
||||
/// activity parameter.
|
||||
public synchronized <OPTYPE extends CycleOp<?>, SPACETYPE extends Space> ParsedOp getParsedOp() {
|
||||
if (resolvingParsedOp == true) {
|
||||
throw new OpConfigError("Auto-recursion while resolving dispenser for op '" +
|
||||
template.getName() +
|
||||
"'");
|
||||
}
|
||||
resolvingParsedOp = true;
|
||||
if (parsedOp == null) {
|
||||
this.parsedOp = parsedOpResolver.apply(activity, resolveAdapter(), template);
|
||||
}
|
||||
resolvingParsedOp = false;
|
||||
return parsedOp;
|
||||
}
|
||||
|
||||
/// Each op template is interpreted by a specific [OpMapper] designated by the `driver` op
|
||||
/// field. Thus
|
||||
/// the associated driver needs to be loaded.
|
||||
private synchronized <OPTYPE extends CycleOp<?>, SPACETYPE extends Space> DriverAdapter<OPTYPE, SPACETYPE> resolveAdapter() {
|
||||
if (resolvingAdapter) {
|
||||
throw new OpConfigError("Auto-recursion while resolving adapter for op '" +
|
||||
template.getName() +
|
||||
"'");
|
||||
}
|
||||
resolvingAdapter = true;
|
||||
String
|
||||
driverName =
|
||||
template.getOptionalStringParam("driver", String.class)
|
||||
.or(() -> activity.getConfig().getOptional("driver")).orElse("stdout");
|
||||
this.adapter = adapterF.apply(activity, driverName, activity.getConfig());
|
||||
resolvingAdapter = false;
|
||||
return (DriverAdapter<OPTYPE, SPACETYPE>) adapter;
|
||||
}
|
||||
|
||||
@Override public Map<String, String> getTags() {
|
||||
return template.getTags();
|
||||
}
|
||||
}
|
@ -1,63 +0,0 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/// TODO: Auto-inject the default driver name into any op that doesn't have it set
|
||||
///
|
||||
/// ## Requirements for core
|
||||
/// * All lookups must be lazy init
|
||||
/// * All lookups must be cached
|
||||
/// * All cache state must be extractable as plan
|
||||
///
|
||||
/// ## Requirements for callers
|
||||
/// * Callers must be able to look up an op template by tag filter
|
||||
/// * Callers must be able to look up a parsed op by tag filter
|
||||
public class OpResolver {
|
||||
private OpTemplates opTemplates;
|
||||
private final Supplier<OpTemplates> optSupplier;
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
|
||||
|
||||
public OpResolver(Supplier<OpTemplates> optSupplier) {
|
||||
this.optSupplier = optSupplier;
|
||||
}
|
||||
|
||||
/// Find a required op template matching a tag filter
|
||||
public synchronized OpTemplate findOne(String tagFilter) {
|
||||
return findOneOptional(tagFilter).orElseThrow(
|
||||
() -> new OpConfigError("No op found for " + tagFilter));
|
||||
}
|
||||
|
||||
private synchronized void load() {
|
||||
if (opTemplates==null) {
|
||||
opTemplates=optSupplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
/// Find an optional op template matching a tag filter
|
||||
public synchronized Optional<OpTemplate> findOneOptional(String tagFilter) {
|
||||
List<OpTemplate> matching = lookup(tagFilter);
|
||||
if (matching.size() > 1) {
|
||||
throw new OpConfigError(
|
||||
"Found more than one op templates with the tag filter: " + tagFilter);
|
||||
}
|
||||
return matching.size() == 1 ? Optional.of(matching.get(0)) : Optional.empty();
|
||||
}
|
||||
|
||||
/// Find any op templates matching a tag filter
|
||||
public synchronized List<OpTemplate> lookup(String tagFilter) {
|
||||
load();
|
||||
TagFilter tf = new TagFilter(tagFilter);
|
||||
return opTemplates.stream().filter(tf::matchesTagged).toList();
|
||||
}
|
||||
}
|
@ -0,0 +1,71 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class OpResolverBank {
|
||||
private final OpTemplates optpl;
|
||||
private final List<OpResolution> resolvers = new ArrayList<>();
|
||||
private final TagFilter tagFilter;
|
||||
|
||||
public OpResolverBank(
|
||||
Activity activity,
|
||||
AdapterResolver adapterF,
|
||||
OpTemplates reference,
|
||||
String tagFilter,
|
||||
DispenserResolver dispF,
|
||||
ParsedOpResolver popF,
|
||||
NBConfiguration config
|
||||
)
|
||||
{
|
||||
this.optpl = reference;
|
||||
this.tagFilter = new TagFilter(tagFilter);
|
||||
OpTemplates activeOpTemplates = reference.matching(tagFilter, false);
|
||||
for (OpTemplate opTemplate : activeOpTemplates) {
|
||||
OpResolution
|
||||
opres =
|
||||
new OpResolution(activity, adapterF, opTemplate, popF, dispF, this);
|
||||
resolvers.add(opres);
|
||||
}
|
||||
}
|
||||
|
||||
public List<OpDispenser<? extends CycleOp<?>>> resolveDispensers() {
|
||||
List<OpDispenser<? extends CycleOp<?>>> dispensers = new ArrayList<>(resolvers.size());
|
||||
for (OpResolution resolver : resolvers) {
|
||||
OpDispenser<? extends CycleOp<?>> dispenser = resolver.resolveDispenser();
|
||||
dispensers.add(dispenser);
|
||||
}
|
||||
return dispensers;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,44 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class ParsedOpResolver {
|
||||
public ParsedOp apply(
|
||||
NBComponent parent,
|
||||
DriverAdapter<? extends CycleOp<?>, Space> adapter,
|
||||
OpTemplate tpl
|
||||
)
|
||||
{
|
||||
return new ParsedOp(
|
||||
tpl,
|
||||
adapter.getConfiguration().getMap(),
|
||||
List.of(adapter.getPreprocessor()),
|
||||
parent);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user