diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityconfig/yaml/OpTemplates.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityconfig/yaml/OpTemplates.java index 6ceac9194..492fad36a 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityconfig/yaml/OpTemplates.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityconfig/yaml/OpTemplates.java @@ -1,7 +1,26 @@ package io.nosqlbench.adapters.api.activityconfig.yaml; +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsDocList; import io.nosqlbench.nb.api.tagging.TagFilter; +import java.util.function.Function; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.jetbrains.annotations.NotNull; @@ -45,12 +64,14 @@ public class OpTemplates implements Iterable { * including the inherited and overridden values from this doc and the parent block. */ public OpTemplates matching(String tagFilterSpec, boolean logit) { - TagFilter ts = new TagFilter(tagFilterSpec); + return matching(new TagFilter(tagFilterSpec), logit); + } + public OpTemplates matching(TagFilter tagFilter, boolean logit) { List matchingOpTemplates = new ArrayList<>(); List matchlog = new ArrayList<>(); templates.stream() - .map(ts::matchesTaggedResult) + .map(tagFilter::matchesTaggedResult) .peek(r -> matchlog.add(r.getLog())) .filter(TagFilter.Result::matched) .map(TagFilter.Result::getElement) @@ -63,6 +84,7 @@ public class OpTemplates implements Iterable { } return new OpTemplates(matchingOpTemplates,opsDocList); + } public Map getDocBindings() { @@ -89,4 +111,10 @@ public class OpTemplates implements Iterable { public boolean isEmpty() { return this.templates.isEmpty(); } + + public OpTemplates transform(Function transformF) { + List transformed = this.templates.stream().map(t -> transformF.apply(t)).toList(); + return new OpTemplates(transformed,opsDocList); + } + } diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java index 9b21c2a1f..36fda169c 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/BaseOpDispenser.java @@ -84,11 +84,13 @@ public abstract class BaseOpDispenser, SPACE extends Space */ private final CycleFunction _verifier; private final ThreadLocal> tlVerifier; + private final long ratio; protected BaseOpDispenser(final NBComponent parentC, final ParsedOp op, LongFunction spaceF) { super(parentC); opName = op.getName(); labels = op.getLabels(); + this.ratio = op.takeOptionalStaticValue("ratio", Long.class).orElse(1L); this.timerStarts = op.takeOptionalStaticValue(START_TIMERS, String.class) .map(s -> s.split(", *")) @@ -240,4 +242,9 @@ public abstract class BaseOpDispenser, SPACE extends Space public List getValidator(NBComponent parent, ParsedOp pop, OpLookup lookup) { return CoreOpValidators.getValidator(this, pop, lookup); } + + @Override + public long getRatio() { + return this.ratio; + } } diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/OpDispenser.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/OpDispenser.java index 4df163ae4..372028080 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/OpDispenser.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/OpDispenser.java @@ -109,4 +109,6 @@ public interface OpDispenser> extends LongFunction getVerifier(); String getOpName(); + + long getRatio(); } diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/DerivedSequence.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/DerivedSequence.java new file mode 100644 index 000000000..74d5d477d --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/DerivedSequence.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2022-2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.api.activityapi.planning; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +/// This version of an [[OpSequence]] allows a sequence to be derived from another sequence +/// based on a mapping function. This is done lazily by default, to allow for incremental +/// initialization. +public class DerivedSequence implements OpSequence { + private final OpSequence baseTypeSequence; + private final SequencerType type; + private final List elems; + private final int[] seq; + private final Function deriveF; + + public DerivedSequence(OpSequence baseTypeSequence, Function deriveF) { + this.baseTypeSequence = baseTypeSequence; + this.deriveF = deriveF; + this.type = baseTypeSequence.getSequencerType(); + this.elems = new ArrayList<>(baseTypeSequence.getOps().size()); + this.seq = baseTypeSequence.getSequence(); + } + + @Override + public DERIVED apply(long selector) { + int index = (int) (selector % seq.length); + index = seq[index]; + return elems.get(index); + } + + public DERIVED derive(long selector) { + int index = (int) (selector % seq.length); + if (elems.get(index)==null) { + elems.set(index,this.deriveF.apply(baseTypeSequence.getOps().get(index))); + } + return elems.get(index); + } + + @Override + public List getOps() { + return elems; + } + + @Override + public int[] getSequence() { + return seq; + } + + public SequencerType getSequencerType() { + return type; + } + + @Override + public DerivedSequence transform(Function func) { + return new DerivedSequence(this, func); + } + + @Override + public String toString() { + return "seq len="+seq.length + ", LUT=" + Arrays.toString(this.seq); + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/OpSequence.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/OpSequence.java index 3daa3a0a6..6941d9575 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/OpSequence.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityapi/planning/OpSequence.java @@ -40,6 +40,7 @@ public interface OpSequence extends LongFunction { */ int[] getSequence(); + SequencerType getSequencerType(); /** * Map this OpSequence to another type of OpSequence. * @param func The transformation function from this to another type diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/Activity.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/Activity.java index 38938bb2e..b81a9e87d 100644 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/Activity.java +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/Activity.java @@ -39,6 +39,7 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.output.Output; +import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner; import io.nosqlbench.engine.api.activityapi.planning.SequencerType; import io.nosqlbench.engine.api.activityapi.simrate.*; @@ -54,30 +55,33 @@ import io.nosqlbench.engine.core.lifecycle.commands.CMD_start; import io.nosqlbench.engine.core.lifecycle.commands.CMD_stop; import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult; import io.nosqlbench.engine.core.lifecycle.session.NBSession; +import io.nosqlbench.nb.annotations.ServiceSelector; import io.nosqlbench.nb.api.advisor.NBAdvisorOutput; -import io.nosqlbench.nb.api.components.status.NBStatusComponent; -import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; -import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; -import io.nosqlbench.nb.api.lifecycle.Shutdownable; import io.nosqlbench.nb.api.components.core.NBComponent; -import io.nosqlbench.nb.api.config.standard.*; -import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef; -import io.nosqlbench.nb.api.errors.BasicError; -import io.nosqlbench.nb.api.errors.OpConfigError; -import io.nosqlbench.nb.api.labels.NBLabels; import io.nosqlbench.nb.api.components.events.NBEvent; import io.nosqlbench.nb.api.components.events.ParamChange; import io.nosqlbench.nb.api.components.events.SetThreads; -import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.nb.annotations.ServiceSelector; +import io.nosqlbench.nb.api.components.status.NBStatusComponent; +import io.nosqlbench.nb.api.config.standard.*; +import io.nosqlbench.nb.api.engine.activityimpl.ActivityConfig; +import io.nosqlbench.nb.api.engine.activityimpl.CyclesSpec; +import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; +import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory; +import io.nosqlbench.nb.api.errors.BasicError; +import io.nosqlbench.nb.api.errors.OpConfigError; +import io.nosqlbench.nb.api.labels.NBLabels; +import io.nosqlbench.nb.api.lifecycle.Shutdownable; import io.nosqlbench.nb.api.tagging.TagFilter; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiFunction; +import java.util.function.Function; import java.util.function.LongFunction; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + /// An [[Activity]] is a flywheel of operations. Each activity consumes ordinals /// from a specified interval, maps them to executable operations via _op synthesis_ as determined /// by the op templates supplied by the user, and executes those operations. @@ -115,11 +119,8 @@ public class Activity extends NBSt public Activity(NBComponent parent, ActivityDef activityDef) { - super( - parent, - NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels()) - ); - this.activityDef = activityDef; + this.applyConfig(config); + this.sequence = initSequence(); this.metrics = new ActivityMetrics(this); getParams().set( @@ -130,67 +131,114 @@ public class Activity extends NBSt "Unable to determine name of activity from " + activityDef)) ); - OpsDocList workload; - Optional yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); + private OpSequence>> initSequence() { + // this.activityDef = activityDef; + // this.metrics = new ActivityMetrics(this); NBConfigModel yamlmodel = yaml_loc.map(path -> { return OpsLoader.loadPath( path, new LinkedHashMap<>(activityDef.getParams()), "activities").getConfigModel(); }).orElse(ConfigModel.of(Activity.class).asReadOnly()); - Optional defaultDriverName = activityDef.getParams().getOptionalString("driver"); - Optional> defaultAdapter = activityDef.getParams().getOptionalString( - "driver").flatMap(name -> ServiceSelector.of( - name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map( - l -> l.load(this, NBLabels.forKV())); - if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) { - throw new BasicError( - "Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change 'false' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first."); + //region This region contains all of the refactored op synthesis logic + OpTemplates opTemplatesRef = loadOpTemplates(); + + /// How to load a named [DriverAdapter] with component parentage and labels, given + /// the driver name and the activity (cross-driver) configur ation + AdapterResolver adapterResolver = new AdapterResolver(); + ConcurrentHashMap adapterCache = new ConcurrentHashMap<>(); + + /// Promote the driver adapter function into a cached version + Function, Space>> adapterF + = (name) -> adapterCache.computeIfAbsent( + name, + cacheName -> adapterResolver.apply(this, name, this.config)); + + /// How to get a parsed op, given an op template and an activity. + /// A parsed op depends on back-fill from the activity params, assuming those params + /// are included in the activity's [[NBConfiguration]], but the params are also filtered + /// through the associated [[DriverAdapter]]'s configuration. + ParsedOpResolver parsedOpF = new ParsedOpResolver(); + + /// How to get an op dispenser, given an adapter and a parsed op + /// The cached [Space] mechanism is included within the adapter's API, and is specific to each parsed op, + /// since this is a dynamic op field + DispenserResolver dispenserResolver = new DispenserResolver(); + + OpResolverBank orb = new OpResolverBank( + this, adapterResolver, opTemplatesRef, config.get("tags"), dispenserResolver, parsedOpF, + config); + List> dispensers = orb.resolveDispensers(); + + /// TODO: Here, we have resolved the dispensers. The next step is to add any modifiers to them + /// as composed functions for things like dry-run, etc. + + + /// TODO: Here, we have resolved the dispensers and their modifiers. The next step is to create the LUT + /// for the conventional [[OpSequence]], although other non-deterministic op selection + /// methods should also be supported. + + SequencerType sequencerType = config.getOptional("seq").map(SequencerType::valueOf) + .orElse(SequencerType.bucket); + + SequencePlanner>> planner = new SequencePlanner<>( + sequencerType); + + for (OpDispenser dispenser : dispensers) { + planner.addOp(dispenser, d -> d.getRatio()); } + OpSequence>> sequence = planner.resolve(); + return sequence; - OpResolver opResolver = new OpResolver(() -> loadOpTemplates()); + // TODO: Perhaps, op templates should be split into core/reserved partition and another, with a proxy + // object retained for the core elements + + //endregion - // HERE, op templates are loaded before drivers are loaded -// List opTemplates = loadOpTemplates(defaultAdapter.orElse(null), false); - List, Space>> adapterlist = new ArrayList<>(); + // Optional defaultDriverName = activityDef.getParams().getOptionalString("driver"); + // Optional> defaultAdapter = activityDef.getParams() + // .getOptionalString("driver") + // .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get()) + // .map(l -> l.load(this, NBLabels.forKV())); + // + // if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) { + // throw new BasicError("Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this " + "driver adapter. Change" + " 'false' for the driver in" + " './nb-adapters/pom.xml' and" + " './nb-adapters/nb-adapters-included/pom.xml' first."); + // } - NBConfigModel supersetConfig = ConfigModel.of(Activity.class).add(yamlmodel); - Optional defaultDriverOption = defaultDriverName; - ConcurrentHashMap, ? extends Space>> mappers = new ConcurrentHashMap<>(); + // NBConfigModel supersetConfig = ConfigModel.of(Activity.class).add(yamlmodel); + // Optional defaultDriverOption = defaultDriverName; + // ConcurrentHashMap, ? extends Space>> mappers = new ConcurrentHashMap<>(); - List allParsedOps = loadOpTemplates( - defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert( - ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList(); + // List allParsedOps = loadOpTemplates(defaultAdapter.orElse(null), false, false).stream() + // .map(ot -> upconvert(ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)) + // .toList(); - OpLookup lookup = new OpLookupService(() -> allParsedOps); + // OpLookup lookup = new OpLookupService(() -> allParsedOps); - TagFilter ts = new TagFilter(activityDef.getParams().getOptionalString("tags").orElse("")); - List activeParsedOps = ts.filter(allParsedOps); + // TagFilter ts = new TagFilter(activityDef.getParams().getOptionalString("tags").orElse("")); + // List activeParsedOps = ts.filter(allParsedOps); - if (defaultDriverOption.isPresent()) { - long matchingDefault = mappers.keySet().stream().filter( - n -> n.equals(defaultDriverOption.get())).count(); - if (0 == matchingDefault) { - logger.warn( - "All op templates used a different driver than the default '{}'", - defaultDriverOption.get() - ); - } - } + // if (defaultDriverOption.isPresent()) { + // long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count(); + // if (0 == matchingDefault) { + // logger.warn( + // "All op templates used a different driver than the default '{}'", + // defaultDriverOption.get() + // ); + // } + // } - try { - sequence = createOpSourceFromParsedOps(adapterlist, activeParsedOps, lookup); - } catch (Exception e) { - if (e instanceof OpConfigError) { - throw e; - } - throw new OpConfigError( - "Error mapping workload template to operations: " + e.getMessage(), null, e); - } - initOpsMetrics(); + // try { + // sequence = createOpSourceFromParsedOps(adapterlist, activeParsedOps, lookup); + // } catch (Exception e) { + // if (e instanceof OpConfigError) { + // throw e; + // } + // throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e); + // } } @@ -212,16 +260,11 @@ public class Activity extends NBSt } - - protected OpSequence>> createOpSourceFromParsedOps( - List, Space>> adapters, List pops, OpLookup opLookup) { - return createOpSourceFromParsedOps2(adapters, pops, opLookup); - } - - protected OpSequence>> createOpSourceFromParsedOps2( -// Map> adapterCache, -// Map> mapperCache, - List, Space>> adapters, List pops, OpLookup opLookup) { + protected OpSequence>> createOpSourceFromParsedOps2(List, Space>> adapters, + List pops, + OpLookup opLookup + ) + { try { List ratios = new ArrayList<>(pops.size()); @@ -231,8 +274,8 @@ public class Activity extends NBSt ratios.add(ratio); } - SequencerType sequencerType = getParams().getOptionalString("seq").map( - SequencerType::valueOf).orElse(SequencerType.bucket); + SequencerType sequencerType = config.getOptional("seq").map(SequencerType::valueOf) + .orElse(SequencerType.bucket); SequencePlanner>> planner = new SequencePlanner<>( sequencerType); @@ -287,57 +330,71 @@ public class Activity extends NBSt return activityDef.getParams(); } + // private ParsedOp upconvert( + // OpTemplate ot, + // Optional defaultDriverOption, + // NBConfigModel yamlmodel, + // NBConfigModel supersetConfig, + // ConcurrentHashMap, ? extends Space>> mappers, + // List, Space>> adapterlist + // ) + // { + // // ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), + // // List.of(), this); + // String + // driverName = + // ot.getOptionalStringParam("driver", String.class) + // .or(() -> ot.getOptionalStringParam("type", String.class)) + // .or(() -> defaultDriverOption).orElseThrow(() -> new OpConfigError( + // "Unable to identify driver name for op template:\n" + ot)); + // + // DriverAdapter, Space> + // adapter = + // adapters.computeIfAbsent( + // driverName, + // dn -> loadAdapter(dn, yamlmodel, supersetConfig, mappers)); + // supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap()); + // adapterlist.add(adapter); + // + // ParsedOp + // pop = + // new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this); + // Optional discard = pop.takeOptionalStaticValue("driver", String.class); + // + // return pop; + // } - private ParsedOp upconvert( - OpTemplate ot, Optional defaultDriverOption, NBConfigModel yamlmodel, - NBConfigModel supersetConfig, - ConcurrentHashMap, ? extends Space>> mappers, - List, Space>> adapterlist - ) { - // ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this); - String driverName = ot.getOptionalStringParam("driver", String.class).or( - () -> ot.getOptionalStringParam("type", String.class)).or( - () -> defaultDriverOption).orElseThrow( - () -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); - - DriverAdapter, Space> adapter = adapters.computeIfAbsent( - driverName, dn -> loadAdapter( - dn, yamlmodel, supersetConfig, mappers)); - supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap()); - adapterlist.add(adapter); - - ParsedOp pop = new ParsedOp( - ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this); - Optional discard = pop.takeOptionalStaticValue("driver", String.class); - - return pop; - } - - private DriverAdapter, Space> loadAdapter( - String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig, - ConcurrentHashMap, ? extends Space>> mappers - ) { - DriverAdapter, Space> adapter = Optional.of(driverName).flatMap( - name -> ServiceSelector.of( - name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map( - l -> l.load(this, NBLabels.forKV())).orElseThrow( - () -> new OpConfigError("driver adapter not present for name '" + driverName + "'")); - - NBConfigModel combinedModel = yamlmodel; - NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams()); - - if (adapter instanceof NBConfigurable configurable) { - NBConfigModel adapterModel = configurable.getConfigModel(); - supersetConfig.add(adapterModel); - - combinedModel = adapterModel.add(yamlmodel); - combinedConfig = combinedModel.matchConfig(activityDef.getParams()); - configurable.applyConfig(combinedConfig); - } - mappers.put(driverName, adapter.getOpMapper()); - return adapter; - } - + // private DriverAdapter, Space> loadAdapter( + // String driverName, + // NBConfigModel yamlmodel, + // NBConfigModel supersetConfig, + // ConcurrentHashMap, ? extends Space>> mappers + // ) + // { + // DriverAdapter, Space> + // adapter = + // Optional.of(driverName).flatMap(name -> ServiceSelector.of( + // name, + // ServiceLoader.load(DriverAdapterLoader.class)).get()) + // .map(l -> l.load(this, NBLabels.forKV())) + // .orElseThrow(() -> new OpConfigError("driver adapter not present for name '" + + // driverName + + // "'")); + // + // NBConfigModel combinedModel = yamlmodel; + // NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams()); + // + // if (adapter instanceof NBConfigurable configurable) { + // NBConfigModel adapterModel = configurable.getConfigModel(); + // supersetConfig.add(adapterModel); + // + // combinedModel = adapterModel.add(yamlmodel); + // combinedConfig = combinedModel.matchConfig(activityDef.getParams()); + // configurable.applyConfig(combinedConfig); + // } + // mappers.put(driverName, adapter.getOpMapper()); + // return adapter; + // } public void initActivity() { initOrUpdateRateLimiters(this.activityDef); @@ -422,42 +479,57 @@ public class Activity extends NBSt } } - private OpTemplates loadOpTemplates( - DriverAdapter defaultDriverAdapter, boolean logged, boolean filtered) { - - String tagfilter = activityDef.getParams().getOptionalString("tags").orElse(""); - - OpTemplates templates = loadOpTemplates(); - OpTemplates filteredOps = templates.matching(filtered ? tagfilter : "", logged); - - if (filteredOps.isEmpty()) { - // There were no ops, and it *wasn't* because they were all filtered out. - // In this case, let's try to synthesize the ops as long as at least a default driver was provided - // But if there were no ops, and there was no default driver provided, we can't continue - // There were no ops, and it was because they were all filtered out - OpTemplates unfilteredOps = templates.matching("",false); - if (!unfilteredOps.isEmpty()) { - String message = "There were no active op templates with tag filter '" + tagfilter + "', since all " + unfilteredOps.size() + " were filtered out. Examine the session log for details"; - NBAdvisorOutput.test(message); - //throw new BasicError(message); - } - if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) { - filteredOps = sotp.getSyntheticOpTemplates(templates, this.activityDef.getParams()); - Objects.requireNonNull(filteredOps); - if (filteredOps.isEmpty()) { - throw new BasicError( - "Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates."); - } - } else { - throw new BasicError(""" - No op templates were provided. You must provide one of these activity parameters: - 1) workload=some.yaml - 2) op='inline template' - 3) driver=stdout (or any other drive that can synthesize ops)"""); - } - } - return filteredOps; - } + // private OpTemplates loadOpTemplates( + // DriverAdapter defaultDriverAdapter, + // boolean logged, + // boolean filtered + // ) + // { + // + // String tagfilter = activityDef.getParams().getOptionalString("tags").orElse(""); + // + // OpTemplates templates = loadOpTemplates(); + // OpTemplates filteredOps = templates.matching(filtered ? tagfilter : "", logged); + // + // if (filteredOps.isEmpty()) { + // // There were no ops, and it *wasn't* because they were all filtered out. + // // In this case, let's try to synthesize the ops as long as at least a default driver + // // was provided + // // But if there were no ops, and there was no default driver provided, we can't continue + // // There were no ops, and it was because they were all filtered out + // OpTemplates unfilteredOps = templates.matching("", false); + // if (!unfilteredOps.isEmpty()) { + // String + // message = + // "There were no active op templates with tag filter '" + + // tagfilter + + // "', since all " + + // unfilteredOps.size() + + // " were filtered out. Examine the session log for details"; + // NBAdvisorOutput.test(message); + // // throw new BasicError(message); + // } + // if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) { + // filteredOps = sotp.getSyntheticOpTemplates(templates, this.activityDef.getParams()); + // Objects.requireNonNull(filteredOps); + // if (filteredOps.isEmpty()) { + // throw new BasicError("Attempted to create synthetic ops from driver '" + + // defaultDriverAdapter.getAdapterName() + + // '\'' + + // " but no ops were created. You must provide either a workload" + + // " or an op parameter. Activities require op templates."); + // } + // } else { + // throw new BasicError(""" + // No op templates were provided. You must provide one of these activity parameters: + // 1) workload=some.yaml + // 2) op='inline template' + // 3) driver=stdout (or any other drive that can synthesize ops)\ + // """); + // } + // } + // return filteredOps; + // } /** Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been @@ -614,10 +686,9 @@ public class Activity extends NBSt protected OpTemplates loadOpTemplates() { OpsDocList opsDocs = null; try { - String op = activityDef.getParams().getOptionalString("op").orElse(null); - String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse( - null); - String workload = activityDef.getParams().getOptionalString("workload").orElse(null); + String op = config.getOptional("op").orElse(null); + String stmt = config.getOptional("stmt", "statement").orElse(null); + String workload = config.getOptional("workload").orElse(null); if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) { throw new OpConfigError( diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/AdapterResolver.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/AdapterResolver.java new file mode 100644 index 000000000..0cbc5d6d6 --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/AdapterResolver.java @@ -0,0 +1,62 @@ +package io.nosqlbench.engine.api.activityimpl.uniform; + +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import io.nosqlbench.adapter.diag.DriverAdapterLoader; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.Space; +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; +import io.nosqlbench.nb.annotations.ServiceSelector; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.config.standard.NBConfigModel; +import io.nosqlbench.nb.api.config.standard.NBConfigurable; +import io.nosqlbench.nb.api.config.standard.NBConfiguration; +import io.nosqlbench.nb.api.errors.OpConfigError; +import io.nosqlbench.nb.api.labels.NBLabels; + +import java.util.ServiceLoader; +import java.util.function.BiFunction; + +public class AdapterResolver + // implements BiFunction, Space>> +{ + public DriverAdapter, Space> apply( + NBComponent parent, + String name, + NBConfiguration configSuperset + ) + { + ServiceSelector + loader = + ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)); + DriverAdapterLoader + dal = + loader.get() + .orElseThrow(() -> new OpConfigError("No DriverAdapterLoader found for " + name)); + DriverAdapter, Space> adapter = dal.load(parent, NBLabels.forKV()); + + if (adapter instanceof NBConfigurable configurable) { + NBConfigModel adapterModel = configurable.getConfigModel(); + NBConfiguration matchingConfig = adapterModel.matchConfig(configSuperset.getMap()); + configurable.applyConfig(matchingConfig); + } + + return adapter; + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/DispenserResolver.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/DispenserResolver.java new file mode 100644 index 000000000..0ca534ade --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/DispenserResolver.java @@ -0,0 +1,40 @@ +package io.nosqlbench.engine.api.activityimpl.uniform; + +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.Space; +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; + +import java.util.function.BiFunction; + +public class DispenserResolver + implements BiFunction, Space>, ParsedOp, OpDispenser>> +{ + @Override + public OpDispenser> apply( + DriverAdapter, Space> adapter, + ParsedOp pop + ) + { + return adapter.getOpMapper().apply(adapter, pop, adapter.getSpaceFunc(pop)); + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolution.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolution.java new file mode 100644 index 000000000..fa07a1e9d --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolution.java @@ -0,0 +1,174 @@ +package io.nosqlbench.engine.api.activityimpl.uniform; + +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.OpMapper; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.Space; +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.nb.api.engine.util.Tagged; +import io.nosqlbench.nb.api.errors.OpConfigError; + +import java.util.Map; + +/// This type represents the process of op synthesis, including all upstream sources +/// of data or configuration. This makes the stages explicit, identifies the functional +/// patterns used to create a _higher-order_ op synthesis, and allows for a fully lazy-init +/// form to be used. It also makes it possible for some op mapping logic to include any necessary +/// views of other ops, such as those used by reference to keep complex configurations DRY. +/// +/// This logic is __ENTIRELY__ setup logic, acting as an implicity op compiler before selected ops +/// are ready to be used in an activity. Most of the stack constructions here will fall away to GC +/// after the final assembled structures are captured at the end of op synthesis. As such, __DO +/// NOT__ +/// worry about over-presenting or being overly pedantic in this layer of code. It is more +/// important +/// to illustrate with clarity how this process works rather than to obsess over terseness or +/// optimizations here. +/// +/// To ensure consistency, these methods should be guarded as synchronized. It is not gauranteed +/// that this +/// system will not be used in a multi-threaded way in the future, and this is a simple way to +/// ensure +/// atomicity during the graph construction. (Remember, this is not a critical performance path.) +/// +/// Further, to avoid auto-recursion, each step of +/// resolution is guarded by a boolean which is used to detect when a particular __unresolved__ node +/// in the dependency +/// graph is resolved again. Basic graph theory says this should never happen unless some element is +/// dependent upon +/// itself through a circular reference. (in graph vernacular, a _cycle_, but that would be +/// confusing!) A +/// future improvement to this error-detection feature should be able to +/// look at the stack and extract the signature of traversal without the need to add explicit +/// traversal tracking data. +/// Even the latter may be useful if users start to build too many circular configurations. +public class OpResolution implements Tagged { + + /// starting point + private OpTemplate template; + private final Activity activity; + + /// adapter is resolved from the 'driver' op field, deferring to the config (activityConfig) for + /// the default value if needed. (it usually comes from there) + private boolean resolvingAdapter; + private final AdapterResolver adapterF; + private DriverAdapter, ? extends Space> adapter = null; + + /// parsed ops are object-level _normalized_ APIs around the op template and activity params + private boolean resolvingParsedOp = false; + private ParsedOpResolver parsedOpResolver; + private ParsedOp parsedOp; + + /// op dispensers yield executable ops from cycle numbers + /// NOTE: the op mapping layer is folded into this, as mappers are 1:1 with adapters + private boolean resolvingDispenser = false; + private final DispenserResolver dispenserResolver; + private OpDispenser> dispenser; + + /// This can be provided to any layer which neds access to context info during resolution. + /// It might be better as a part of a service or context view which is handed in to every layer + private final OpResolverBank resolver; + + public OpResolution( + Activity activity, + AdapterResolver adapterResolver, + OpTemplate template, + ParsedOpResolver parsedOpResolver, + DispenserResolver dispenserResolver, + OpResolverBank resolver + ) + { + this.activity = activity; + this.adapterF = adapterResolver; + this.template = template; + this.parsedOpResolver = parsedOpResolver; + this.dispenserResolver = dispenserResolver; + this.resolver = resolver; + } + + /// This is the top-level product of this resolution layer. An [OpDispenser] can produce a + /// stable + /// and type-specific [CycleOp] for a given coordinate. It will call other more primitive layers + /// as needed + /// to get the components or component functions around which to build the final dispenser. + /// These elements may be + /// initialized by side-effect of other operations being resolved. In every case, once an + /// element of any op + /// resolution is resolved, it should be considered _defined_, and not resolved again. + public synchronized , SPACETYPE extends Space> OpDispenser resolveDispenser() { + if (resolvingDispenser == true) { + throw new OpConfigError("Auto-recursion while resolving dispenser for op '" + + template.getName() + + "'"); + } + resolvingDispenser = true; + if (dispenser == null) { + this.dispenser = dispenserResolver.apply(resolveAdapter(), getParsedOp()); + // ParsedOp pop = resolveParsedOp(); + // DriverAdapter adapter = (DriverAdapter) resolveAdapter(); + // // TODO verify whether or not it is necessary to ensure static mapping between adapter and mapper instance + // this.dispenser = adapter.getOpMapper().apply(adapter, pop, adapter.getSpaceFunc(pop)); + } + resolvingDispenser = false; + return (OpDispenser) dispenser; + } + + /// Converting an op template into a ParsedOp means back-filling it with config data from the + /// activity parameter. + public synchronized , SPACETYPE extends Space> ParsedOp getParsedOp() { + if (resolvingParsedOp == true) { + throw new OpConfigError("Auto-recursion while resolving dispenser for op '" + + template.getName() + + "'"); + } + resolvingParsedOp = true; + if (parsedOp == null) { + this.parsedOp = parsedOpResolver.apply(activity, resolveAdapter(), template); + } + resolvingParsedOp = false; + return parsedOp; + } + + /// Each op template is interpreted by a specific [OpMapper] designated by the `driver` op + /// field. Thus + /// the associated driver needs to be loaded. + private synchronized , SPACETYPE extends Space> DriverAdapter resolveAdapter() { + if (resolvingAdapter) { + throw new OpConfigError("Auto-recursion while resolving adapter for op '" + + template.getName() + + "'"); + } + resolvingAdapter = true; + String + driverName = + template.getOptionalStringParam("driver", String.class) + .or(() -> activity.getConfig().getOptional("driver")).orElse("stdout"); + this.adapter = adapterF.apply(activity, driverName, activity.getConfig()); + resolvingAdapter = false; + return (DriverAdapter) adapter; + } + + @Override public Map getTags() { + return template.getTags(); + } +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolver.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolver.java deleted file mode 100644 index e0c935ec9..000000000 --- a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolver.java +++ /dev/null @@ -1,63 +0,0 @@ -package io.nosqlbench.engine.api.activityimpl.uniform; - -import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; -import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates; -import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; -import io.nosqlbench.adapters.api.activityimpl.uniform.Space; -import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; -import io.nosqlbench.nb.api.errors.OpConfigError; -import io.nosqlbench.nb.api.tagging.TagFilter; - -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.function.Supplier; - -/// TODO: Auto-inject the default driver name into any op that doesn't have it set -/// -/// ## Requirements for core -/// * All lookups must be lazy init -/// * All lookups must be cached -/// * All cache state must be extractable as plan -/// -/// ## Requirements for callers -/// * Callers must be able to look up an op template by tag filter -/// * Callers must be able to look up a parsed op by tag filter -public class OpResolver { - private OpTemplates opTemplates; - private final Supplier optSupplier; - List, Space>> adapterlist = new ArrayList<>(); - - public OpResolver(Supplier optSupplier) { - this.optSupplier = optSupplier; - } - - /// Find a required op template matching a tag filter - public synchronized OpTemplate findOne(String tagFilter) { - return findOneOptional(tagFilter).orElseThrow( - () -> new OpConfigError("No op found for " + tagFilter)); - } - - private synchronized void load() { - if (opTemplates==null) { - opTemplates=optSupplier.get(); - } - } - - /// Find an optional op template matching a tag filter - public synchronized Optional findOneOptional(String tagFilter) { - List matching = lookup(tagFilter); - if (matching.size() > 1) { - throw new OpConfigError( - "Found more than one op templates with the tag filter: " + tagFilter); - } - return matching.size() == 1 ? Optional.of(matching.get(0)) : Optional.empty(); - } - - /// Find any op templates matching a tag filter - public synchronized List lookup(String tagFilter) { - load(); - TagFilter tf = new TagFilter(tagFilter); - return opTemplates.stream().filter(tf::matchesTagged).toList(); - } -} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolverBank.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolverBank.java new file mode 100644 index 000000000..ec220933b --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/OpResolverBank.java @@ -0,0 +1,71 @@ +package io.nosqlbench.engine.api.activityimpl.uniform; + +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; +import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.Space; +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.nb.api.config.standard.NBConfiguration; +import io.nosqlbench.nb.api.tagging.TagFilter; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; + +public class OpResolverBank { + private final OpTemplates optpl; + private final List resolvers = new ArrayList<>(); + private final TagFilter tagFilter; + + public OpResolverBank( + Activity activity, + AdapterResolver adapterF, + OpTemplates reference, + String tagFilter, + DispenserResolver dispF, + ParsedOpResolver popF, + NBConfiguration config + ) + { + this.optpl = reference; + this.tagFilter = new TagFilter(tagFilter); + OpTemplates activeOpTemplates = reference.matching(tagFilter, false); + for (OpTemplate opTemplate : activeOpTemplates) { + OpResolution + opres = + new OpResolution(activity, adapterF, opTemplate, popF, dispF, this); + resolvers.add(opres); + } + } + + public List>> resolveDispensers() { + List>> dispensers = new ArrayList<>(resolvers.size()); + for (OpResolution resolver : resolvers) { + OpDispenser> dispenser = resolver.resolveDispenser(); + dispensers.add(dispenser); + } + return dispensers; + } + +} diff --git a/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ParsedOpResolver.java b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ParsedOpResolver.java new file mode 100644 index 000000000..8f60f555e --- /dev/null +++ b/nb-engine/nb-engine-core/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ParsedOpResolver.java @@ -0,0 +1,44 @@ +package io.nosqlbench.engine.api.activityimpl.uniform; + +/* + * Copyright (c) nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.Space; +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.nb.api.components.core.NBComponent; + +import java.util.List; +import java.util.function.BiFunction; + +public class ParsedOpResolver { + public ParsedOp apply( + NBComponent parent, + DriverAdapter, Space> adapter, + OpTemplate tpl + ) + { + return new ParsedOp( + tpl, + adapter.getConfiguration().getMap(), + List.of(adapter.getPreprocessor()), + parent); + } +}