cleanups and formatting

This commit is contained in:
Jonathan Shook 2025-01-08 12:46:04 -06:00
parent 0756b6e9d8
commit 5a12392102
16 changed files with 397 additions and 645 deletions

View File

@ -61,14 +61,21 @@ public class OpsLoader {
public static OpsDocList loadPath(String path, Map<String, ?> params, String... searchPaths) {
String[] extensions = path.indexOf('.') > -1 ? new String[]{} : YAML_EXTENSIONS;
ResolverChain chain = new ResolverChain(path);
Content<?> foundPath = NBIO.chain(chain.getChain()).searchPrefixes(searchPaths).pathname(chain.getPath()).extensionSet(extensions).first()
.orElseThrow(() -> new RuntimeException("Unable to load path '" + path + "'"));
Content<?> foundPath =
NBIO.chain(chain.getChain()).searchPrefixes(searchPaths).pathname(chain.getPath())
.extensionSet(extensions).first()
.orElseThrow(() -> new RuntimeException("Unable to load path '" + path + "'"));
OpTemplateFormat fmt = OpTemplateFormat.valueOfURI(foundPath.getURI());
return loadString(foundPath.asString(), fmt, params, foundPath.getURI());
}
public static OpsDocList loadString(
final String sourceData, OpTemplateFormat fmt, Map<String, ?> params, URI srcuri) {
final String sourceData,
OpTemplateFormat fmt,
Map<String, ?> params,
URI srcuri
)
{
if (srcuri != null) {
logger.info("workload URI: '" + srcuri + "'");
@ -113,9 +120,15 @@ public class OpsLoader {
}
int resultStatus = SjsonnetMain.main0(
injected.toArray(new String[0]), new DefaultParseCache(), inputStream, stdoutStream,
stderrStream, new os.Path(Path.of(System.getProperty("user.dir"))), Option.empty(),
Option.empty(), null
injected.toArray(new String[0]),
new DefaultParseCache(),
inputStream,
stdoutStream,
stderrStream,
new os.Path(Path.of(System.getProperty("user.dir"))),
Option.empty(),
Option.empty(),
null
);
String stdoutOutput = stdoutBuffer.toString(StandardCharsets.UTF_8);
@ -133,16 +146,17 @@ public class OpsLoader {
}
}
if (!stderrOutput.isEmpty()) {
BasicError error = new BasicError(
"stderr output from jsonnet preprocessing: " + stderrOutput);
BasicError error =
new BasicError("stderr output from jsonnet preprocessing: " + stderrOutput);
if (resultStatus != 0) {
throw error;
} else {
logger.warn(error.toString(), error);
}
}
logger.info("jsonnet processing read '" + uri + "', rendered " + stdoutOutput.split(
"\n").length + " lines.");
logger.info(
"jsonnet processing read '" + uri + "', rendered " + stdoutOutput.split("\n").length
+ " lines.");
logger.trace("jsonnet result:\n" + stdoutOutput);
return stdoutOutput;
@ -152,8 +166,11 @@ public class OpsLoader {
// into the parsers in a non-exception way
public static boolean isJson(String workload) {
try {
new GsonBuilder().setPrettyPrinting().create().fromJson(workload, Map.class);
return true;
if (workload.matches("^\\s*\\{.+")) {
new GsonBuilder().setPrettyPrinting().create().fromJson(workload, Map.class);
return true;
}
return false;
} catch (Exception e) {
return false;
}
@ -163,8 +180,11 @@ public class OpsLoader {
// into the parsers in a non-exception way
public static boolean isYaml(String workload) {
try {
Object result = new Load(LoadSettings.builder().build()).loadFromString(workload);
return (result instanceof Map);
if (workload.indexOf('\n')>=0) {
Object result = new Load(LoadSettings.builder().build()).loadFromString(workload);
return (result instanceof Map);
}
return false;
} catch (Exception e) {
return false;
}

View File

@ -107,7 +107,7 @@ public class OpsOwner extends RawOpFields {
}
setOpsFieldByType(itemizedMaps);
} else if (object instanceof String) {
setOpsFieldByType(Map.of("stmt1", (String) object));
setOpsFieldByType(Map.of("stmt", (String) object));
} else {
throw new RuntimeException("Unknown object type: " + object.getClass());
}

View File

@ -102,6 +102,10 @@ public class OpDef extends OpTemplate {
return tags;
}
/// Op template definitions are auto-tagged according to their placement within the workload
/// template. The block name and op name are both added as individual labels.
/// No other label should be added as before with auto-concatenation, since this breaks the
/// definitive behavior of tag filters over label combinations.
private LinkedHashMap<String, String> composeTags() {
LinkedHashMap<String, String> tagsWithName = new LinkedHashMap<>(new MultiMapLookup<>(rawOpDef.getTags(), block.getTags()));
tagsWithName.put("block",block.getName());

View File

@ -2,13 +2,13 @@ package io.nosqlbench.adapters.api.activityconfig.yaml;
/*
* Copyright (c) nosqlbench
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -20,7 +20,9 @@ package io.nosqlbench.adapters.api.activityconfig.yaml;
import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsDocList;
import io.nosqlbench.nb.api.tagging.TagFilter;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
@ -30,17 +32,19 @@ import java.util.stream.Stream;
/// [OpTemplates] is a list of selected op templates and their backing data.
///
/// It is a value type which makes it easy to /// get matching subsets of op templates according to tag filters, to combine them, etc.
/// It is a value type which makes it easy to get matching subsets of op templates according to
/// tag filters, to combine them, etc.
///
/// When a user selects an op template, they are expected to use the [TagFilter] mechanism.
/// Any such lookup methods should be implemented on this class.
public class OpTemplates implements Iterable<OpTemplate> {
public class OpTemplates implements Iterable<OpTemplate> {
private final ArrayList<OpTemplate> templates = new ArrayList<>();
private final static Logger logger = LogManager.getLogger(OpTemplates.class);
private final OpsDocList opsDocList;
public OpTemplates(OpsDocList opsDocList) {
opsDocList.getStmtDocs().stream().flatMap(d -> d.getOpTemplates().stream()).forEach(templates::add);
opsDocList.getStmtDocs().stream().flatMap(d -> d.getOpTemplates().stream())
.forEach(templates::add);
this.opsDocList = opsDocList;
}
@ -50,7 +54,7 @@ public class OpTemplates implements Iterable<OpTemplate> {
}
public OpTemplates() {
this.opsDocList=new OpsDocList(new RawOpsDocList(List.of()));
this.opsDocList = new OpsDocList(new RawOpsDocList(List.of()));
}
public OpTemplates and(OpTemplates other) {
@ -59,22 +63,21 @@ public class OpTemplates implements Iterable<OpTemplate> {
}
/**
* @param tagFilterSpec a comma-separated tag filter spec
* @return The list of all included op templates for all included blocks of in this document,
* including the inherited and overridden values from this doc and the parent block.
@param tagFilterSpec
a comma-separated tag filter spec
@return The list of all included op templates for all included blocks of in this document,
including the inherited and overridden values from this doc and the parent block.
*/
public OpTemplates matching(String tagFilterSpec, boolean logit) {
return matching(new TagFilter(tagFilterSpec), logit);
}
public OpTemplates matching(TagFilter tagFilter, boolean logit) {
List<OpTemplate> matchingOpTemplates = new ArrayList<>();
List<String> matchlog = new ArrayList<>();
templates.stream()
.map(tagFilter::matchesTaggedResult)
.peek(r -> matchlog.add(r.getLog()))
.filter(TagFilter.Result::matched)
.map(TagFilter.Result::getElement)
templates.stream().map(tagFilter::matchesTaggedResult).peek(r -> matchlog.add(r.getLog()))
.filter(TagFilter.Result::matched).map(TagFilter.Result::getElement)
.forEach(matchingOpTemplates::add);
if (logit) {
@ -83,11 +86,11 @@ public class OpTemplates implements Iterable<OpTemplate> {
}
}
return new OpTemplates(matchingOpTemplates,opsDocList);
return new OpTemplates(matchingOpTemplates, opsDocList);
}
public Map<String,String> getDocBindings() {
public Map<String, String> getDocBindings() {
return opsDocList.getDocBindings();
}
@ -112,9 +115,10 @@ public class OpTemplates implements Iterable<OpTemplate> {
return this.templates.isEmpty();
}
public OpTemplates transform(Function<OpTemplate,OpTemplate> transformF) {
List<OpTemplate> transformed = this.templates.stream().map(t -> transformF.apply(t)).toList();
return new OpTemplates(transformed,opsDocList);
public OpTemplates transform(Function<OpTemplate, OpTemplate> transformF) {
List<OpTemplate> transformed = this.templates.stream().map(t -> transformF.apply(t))
.toList();
return new OpTemplates(transformed, opsDocList);
}
}

View File

@ -103,25 +103,22 @@ public class NBBaseComponent extends NBBaseComponentMetrics
public synchronized NBComponent attachChild(NBComponent... children) {
for (NBComponent adding : children) {
logger.debug(
() -> "attaching " + adding.description() + " to parent " + this.description());
logger.debug(() -> "attaching " + adding.description() + " to parent "
+ this.description());
for (NBComponent extant : this.children) {
NBLabels eachLabels = extant.getComponentOnlyLabels();
NBLabels newLabels = adding.getComponentOnlyLabels();
if (eachLabels != null &&
newLabels != null &&
!eachLabels.isEmpty() &&
!newLabels.isEmpty() &&
adding.getComponentOnlyLabels().equals(extant.getComponentOnlyLabels()))
if (eachLabels != null && newLabels != null && !eachLabels.isEmpty()
&& !newLabels.isEmpty() && adding.getComponentOnlyLabels()
.equals(extant.getComponentOnlyLabels()))
{
throw new RuntimeException("""
Adding second child under already-defined labels is not allowed:
parent: (PARENTCLASS) PARENTNAME
extant: (EXTANTCLASS) EXTANTNAME
adding: (ADDINGCLASS) ADDINGNAME
"""
.replaceAll("PARENTCLASS", this.getClass().getSimpleName())
""".replaceAll("PARENTCLASS", this.getClass().getSimpleName())
.replaceAll("PARENTNAME", this.description())
.replaceAll("EXTANTCLASS", extant.getClass().getSimpleName())
.replaceAll("EXTANTNAME", extant.description())
@ -137,10 +134,8 @@ public class NBBaseComponent extends NBBaseComponentMetrics
@Override
public NBComponent detachChild(NBComponent... children) {
for (NBComponent child : children) {
logger.debug(() -> "notifyinb before detaching " +
child.description() +
" from " +
this.description());
logger.debug(() -> "notifying before detaching " + child.description() + " from "
+ this.description());
child.beforeDetach();
}
for (NBComponent child : children) {
@ -159,8 +154,7 @@ public class NBBaseComponent extends NBBaseComponentMetrics
@Override
public NBLabels getLabels() {
NBLabels effectiveLabels = (this.parent == null ? NBLabels.forKV() : parent.getLabels());
effectiveLabels = (this.labels == null) ?
effectiveLabels :
effectiveLabels = (this.labels == null) ? effectiveLabels :
effectiveLabels.and(this.labels);
return effectiveLabels;
}

View File

@ -26,6 +26,7 @@ public class NBComponentExecutionScope implements AutoCloseable {
public NBComponentExecutionScope(NBComponent... components) {
this.components = components;
}
@Override
public void close() throws RuntimeException {
for (NBComponent component : components) {

View File

@ -18,6 +18,7 @@ package io.nosqlbench.nb.api.config.standard;
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -63,10 +64,10 @@ public class ConfigModel implements NBConfigModel {
}
/**
* Add a param that, when present in a runtime configuration, will cause the config
* model to be expanded dynamically. This is for scenarios in which you have external
* configurable resources or templates which contain their own models that can
* only be known at runtime.
Add a param that, when present in a runtime configuration, will cause the config
model to be expanded dynamically. This is for scenarios in which you have external
configurable resources or templates which contain their own models that can
only be known at runtime.
*/
public NBConfigModel asReadOnly() {
@ -88,7 +89,13 @@ public class ConfigModel implements NBConfigModel {
return ofType;
}
public static <T> T convertValueTo(String configName, String paramName, Object value, Class<T> type) {
public static <T> T convertValueTo(
String configName,
String paramName,
Object value,
Class<T> type
)
{
try {
if (type.isAssignableFrom(value.getClass())) {
return type.cast(value);

View File

@ -47,7 +47,8 @@ public class NBConfiguration {
public static NBConfiguration empty() {
return new NBConfiguration(
ConfigModel.of(Object.class).asReadOnly(),
new LinkedHashMap<>());
new LinkedHashMap<>()
);
}
/**
@ -76,22 +77,19 @@ public class NBConfiguration {
public <T> T getWithEnv(String name, Class<? extends T> vclass) {
T value = get(name, vclass);
if (value == null) {
}
if (value instanceof String) {
Optional<String> interpolated = NBEnvironment.INSTANCE.interpolate(value.toString());
if (interpolated.isEmpty()) {
throw new NBConfigError("Unable to interpolate env and sys props in '" +
value +
"'");
throw new NBConfigError(
"Unable to interpolate env and sys props in '" + value + "'");
}
String result = interpolated.get();
return ConfigModel.convertValueTo(
this.getClass().getSimpleName(),
name,
result,
vclass);
vclass
);
} else {
return value;
}
@ -111,27 +109,23 @@ public class NBConfiguration {
public <T> T get(String name) {
Param<T> param = (Param<T>) model.getNamedParams().get(name);
if (param == null) {
throw new NBConfigError("Attempted to get parameter for name '" +
name +
"' but this parameter has no " +
"model defined for " +
this.getModel().getOf());
throw new NBConfigError(
"Attempted to get parameter for name '" + name + "' but this parameter has no "
+ "model defined for " + this.getModel().getOf());
}
// if (param.isRequired() && (param.getDefaultValue()==null) && )
Object object = this.data.get(name);
object = object != null ? object : param.getDefaultValue();
if (object == null && param.isRequired()) {
throw new NBConfigError("An object by name '" +
name +
"' was requested as required, and no value was" +
" defined for it. This user provided value must be set or otherwise marked optional or given a" +
" default value in the parameter model.");
throw new NBConfigError(
"An object by name '" + name + "' was requested as required, and no value was"
+ " defined for it. This user provided value must be set or otherwise marked optional or given a"
+ " default value in the parameter model.");
} else if (object == null && !param.isRequired()) {
throw new NBConfigError("An object by name '" +
name +
"' was requested as given by the config layer," +
" but no value was present, and no default was found in the config model. This is an ambiguous " +
"scenario. Either access the object as optional, or give it a default value. (code change)");
throw new NBConfigError(
"An object by name '" + name + "' was requested as given by the config layer,"
+ " but no value was present, and no default was found in the config model. This is an ambiguous "
+ "scenario. Either access the object as optional, or give it a default value. (code change)");
}
if (param.type.isInstance(object)) {
return (T) object;
@ -140,14 +134,11 @@ public class NBConfiguration {
} else if (NBTypeConverter.canConvert(object, param.type)) {
return NBTypeConverter.convert(object, param.type);
} else {
throw new NBConfigError("Unable to assign config value for field '" +
name +
"' of type '" +
object.getClass().getCanonicalName() +
"' to the required return type '" +
param.type.getCanonicalName() +
"' as specified in the config model for '" +
model.getOf().getCanonicalName());
throw new NBConfigError(
"Unable to assign config value for field '" + name + "' of type '"
+ object.getClass().getCanonicalName() + "' to the required return type '"
+ param.type.getCanonicalName() + "' as specified in the config model for '"
+ model.getOf().getCanonicalName());
}
}
@ -155,20 +146,16 @@ public class NBConfiguration {
Param<T> param = model.getParam(name);
if (param == null) {
throw new NBConfigError("Parameter named '" +
name +
"' is not valid for " +
model.getOf().getSimpleName() +
".");
throw new NBConfigError(
"Parameter named '" + name + "' is not valid for " + model.getOf().getSimpleName()
+ ".");
}
if ((!param.isRequired()) && param.getDefaultValue() == null) {
throw new RuntimeException("Non-optional get on optional parameter " +
name +
"' which has no default value while configuring " +
model.getOf() +
"." +
"\nTo avoid user impact, ensure that ConfigModel and NBConfigurable usage are aligned.");
throw new RuntimeException("""
Non-optional get on optional parameter 'PNAME' which has no default value while configuring OF.
To avoid user impact, ensure that ConfigModel and NBConfigurable usage are aligned.
""".replaceAll("PNAME", name).replaceAll("OF", model.getOf().getSimpleName()));
}
Object o = data.get(name);
@ -203,9 +190,8 @@ public class NBConfiguration {
}
}
} else {
throw new NBConfigError("Parameter definition was not found for " +
Arrays.toString(names) +
".");
throw new NBConfigError(
"Parameter definition was not found for " + Arrays.toString(names) + ".");
}
}
if (o == null) {
@ -222,11 +208,9 @@ public class NBConfiguration {
} else if (NBTypeConverter.canConvert(o, type)) {
return Optional.of((T) NBTypeConverter.convert(o, type));
} else {
throw new NBConfigError("config param " +
Arrays.toString(names) +
" was not assignable to class '" +
type.getCanonicalName() +
"'");
throw new NBConfigError(
"config param " + Arrays.toString(names) + " was not assignable to class '"
+ type.getCanonicalName() + "'");
}
}
@ -239,11 +223,9 @@ public class NBConfiguration {
if (defaultValue.getClass().isAssignableFrom(o.getClass())) {
return (T) o;
}
throw new NBConfigError("config parameter '" +
name +
"' is not assignable to required type '" +
defaultValue.getClass() +
"'");
throw new NBConfigError(
"config parameter '" + name + "' is not assignable to required type '"
+ defaultValue.getClass() + "'");
}
public <T> T param(String name, Class<? extends T> vclass) {
@ -275,7 +257,7 @@ public class NBConfiguration {
/// see [#update(Map)]
public <T> NBConfiguration update(String fieldName, T value) {
return update(Map.of(fieldName,value));
return update(Map.of(fieldName, value));
}
/// This will create a new configuration without modifying the existing one,
@ -288,7 +270,7 @@ public class NBConfiguration {
///
/// Any holders of an updated configurations must maintain their own copies if necessary for
/// deltas.
public <T> NBConfiguration update(Map<String,Object> entries) {
public <T> NBConfiguration update(Map<String, Object> entries) {
NBConfiguration updated = model.apply(new LinkedHashMap<>(this.data) {
{
putAll(entries);

View File

@ -28,11 +28,15 @@ import io.nosqlbench.nb.api.engine.activityimpl.ActivityConfig;
import java.io.InputStream;
import java.io.PrintWriter;
/**
* Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful StandardActivity is to extend {@link Activity}.
*/
public interface IActivityWiring extends Comparable<IActivityWiring>, ProgressCapable, StateCapable, NBComponent {
/// This is a vestigial layer which will be removed. It originally provided a way
/// to assemble ad-hoc activity logic from component factory delegates. This meant that the
/// core activity engine could be wired differently from a set of variations in each component.
/// The core engine has been consolidated at this point and modal behaviors pushed to variations
/// of edge components -- particularly in op synthesis and modifiers to op behavior. Thus, this
/// layer is no longer needed and should be removed.
public interface ActivityWiring
extends Comparable<ActivityWiring>, ProgressCapable, StateCapable, NBComponent
{
ActivityConfig getActivityConfig();

View File

@ -1,81 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.planning;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
/// This version of an [[OpSequence]] allows a sequence to be derived from another sequence
/// based on a mapping function. This is done lazily by default, to allow for incremental
/// initialization.
public class DerivedSequence<BASE, DERIVED> implements OpSequence<DERIVED> {
private final OpSequence<BASE> baseTypeSequence;
private final SequencerType type;
private final List<DERIVED> elems;
private final int[] seq;
private final Function<BASE, DERIVED> deriveF;
public DerivedSequence(OpSequence<BASE> baseTypeSequence, Function<BASE,DERIVED> deriveF) {
this.baseTypeSequence = baseTypeSequence;
this.deriveF = deriveF;
this.type = baseTypeSequence.getSequencerType();
this.elems = new ArrayList<>(baseTypeSequence.getOps().size());
this.seq = baseTypeSequence.getSequence();
}
@Override
public DERIVED apply(long selector) {
int index = (int) (selector % seq.length);
index = seq[index];
return elems.get(index);
}
public DERIVED derive(long selector) {
int index = (int) (selector % seq.length);
if (elems.get(index)==null) {
elems.set(index,this.deriveF.apply(baseTypeSequence.getOps().get(index)));
}
return elems.get(index);
}
@Override
public List<DERIVED> getOps() {
return elems;
}
@Override
public int[] getSequence() {
return seq;
}
public SequencerType getSequencerType() {
return type;
}
@Override
public <OTHER> DerivedSequence<DERIVED,OTHER> transform(Function<DERIVED, OTHER> func) {
return new DerivedSequence<DERIVED,OTHER>(this, func);
}
@Override
public String toString() {
return "seq len="+seq.length + ", LUT=" + Arrays.toString(this.seq);
}
}

View File

@ -18,7 +18,6 @@ package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
@ -45,7 +44,6 @@ import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.api.activityimpl.Dryrun;
import io.nosqlbench.engine.api.activityimpl.OpFunctionComposition;
import io.nosqlbench.engine.api.activityimpl.OpLookupService;
import io.nosqlbench.engine.api.activityimpl.input.AtomicInput;
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
@ -56,7 +54,6 @@ import io.nosqlbench.engine.core.lifecycle.commands.CMD_stop;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.engine.core.lifecycle.session.NBSession;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.NBEvent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -71,11 +68,9 @@ import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.labels.NBLabels;
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
import io.nosqlbench.nb.api.tagging.TagFilter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.LongFunction;
@ -110,8 +105,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends CycleOp<?>>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>, Space>> adapters
= new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>, Space>> adapters =
new ConcurrentHashMap<>();
public final ActivityMetrics metrics;
private ActivityMetricProgressMeter progressMeter;
@ -130,8 +125,6 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
public Activity(NBComponent parent, ActivityConfig config) {
super(parent, NBLabels.forKV("activity", config.getAlias()).and(config.auxLabels()));
// NBConfiguration validConfig = getConfigModel().apply(config.getMap());
this.applyConfig(config);
this.sequence = initSequence();
this.metrics = new ActivityMetrics(this);
@ -141,34 +134,22 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
return configFor(ParameterMap.parseParams(s).orElseThrow());
}
private OpSequence<OpDispenser<? extends CycleOp<?>>> initSequence() {
// this.activityDef = activityDef;
// this.metrics = new ActivityMetrics(this);
// OpsDocList workload;
Optional<String> yaml_loc = config.getOptional("yaml", "workload");
// TODO: avoid having to load this duplicitously to parse the template variables in a separate phase
NBConfigModel yamlmodel = yaml_loc.map(path -> {
return OpsLoader.loadPath(path, new LinkedHashMap<>(config.getMap()), "activities")
.getConfigModel();
}).orElse(ConfigModel.of(Activity.class).asReadOnly());
//region This region contains all of the refactored op synthesis logic
OpTemplates opTemplatesRef = loadOpTemplates();
/// How to load a named [DriverAdapter] with component parentage and labels, given
/// the driver name and the activity (cross-driver) configur ation
/// the driver name and the activity (cross-driver) configuration
AdapterResolver adapterResolver = new AdapterResolver();
ConcurrentHashMap<String, DriverAdapter> adapterCache = new ConcurrentHashMap<>();
/// Promote the driver adapter function into a cached version
Function<String, DriverAdapter<? extends CycleOp<?>, Space>> adapterF
= (name) -> adapterCache.computeIfAbsent(
name,
cacheName -> adapterResolver.apply(this, name, this.config));
Function<String, DriverAdapter<? extends CycleOp<?>, Space>> adapterF =
(name) -> adapterCache.computeIfAbsent(
name,
cacheName -> adapterResolver.apply(this, name, this.config)
);
/// How to get a parsed op, given an op template and an activity.
/// A parsed op depends on back-fill from the activity params, assuming those params
@ -182,94 +163,55 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
DispenserResolver dispenserResolver = new DispenserResolver();
OpResolverBank orb = new OpResolverBank(
this, adapterResolver, opTemplatesRef, config.get("tags"), dispenserResolver, parsedOpF,
config);
this,
adapterResolver,
opTemplatesRef,
config.get("tags"),
dispenserResolver,
parsedOpF,
config
);
List<? extends OpDispenser<?>> dispensers = orb.resolveDispensers();
/// TODO: Here, we have resolved the dispensers. The next step is to add any modifiers to them
/// as composed functions for things like dry-run, etc.
if (config.get("dryrun", Dryrun.class) == Dryrun.mapper) {
System.out.println(Diagnostics.summarizeMappedOps(dispensers));
System.exit(1);
}
SequencerType sequencerType =
config.getOptional("seq").map(SequencerType::valueOf).orElse(SequencerType.bucket);
/// TODO: Here, we have resolved the dispensers and their modifiers. The next step is to create the LUT
/// for the conventional [[OpSequence]], although other non-deterministic op selection
/// methods should also be supported.
SequencerType sequencerType = config.getOptional("seq").map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(
sequencerType);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner =
new SequencePlanner<>(sequencerType);
for (OpDispenser<?> dispenser : dispensers) {
planner.addOp(dispenser, d -> d.getRatio());
}
OpSequence<OpDispenser<? extends CycleOp<?>>> sequence = planner.resolve();
return sequence;
// TODO: Perhaps, op templates should be split into core/reserved partition and another, with a proxy
// object retained for the core elements
//endregion
// Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
// Optional<DriverAdapter<?, ?>> defaultAdapter = activityDef.getParams()
// .getOptionalString("driver")
// .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
// .map(l -> l.load(this, NBLabels.forKV()));
//
// if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
// throw new BasicError("Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this " + "driver adapter. Change" + " '<activeByDefault>false</activeByDefault>' for the driver in" + " './nb-adapters/pom.xml' and" + " './nb-adapters/nb-adapters-included/pom.xml' first.");
// }
// NBConfigModel supersetConfig = ConfigModel.of(Activity.class).add(yamlmodel);
// Optional<String> defaultDriverOption = defaultDriverName;
// ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
// List<ParsedOp> allParsedOps = loadOpTemplates(defaultAdapter.orElse(null), false, false).stream()
// .map(ot -> upconvert(ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist))
// .toList();
// OpLookup lookup = new OpLookupService(() -> allParsedOps);
// TagFilter ts = new TagFilter(activityDef.getParams().getOptionalString("tags").orElse(""));
// List<ParsedOp> activeParsedOps = ts.filter(allParsedOps);
// if (defaultDriverOption.isPresent()) {
// long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count();
// if (0 == matchingDefault) {
// logger.warn(
// "All op templates used a different driver than the default '{}'",
// defaultDriverOption.get()
// );
// }
// }
// try {
// sequence = createOpSourceFromParsedOps(adapterlist, activeParsedOps, lookup);
// } catch (Exception e) {
// if (e instanceof OpConfigError) {
// throw e;
// }
// throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
// }
}
private void initOpsMetrics() {
create().gauge(
"ops_pending", () -> this.getProgressMeter().getSummary().pending(),
"ops_pending",
() -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for" +
" processing yet.");
"The current number of operations which have not been dispatched for"
+ " processing yet."
);
create().gauge(
"ops_active", () -> this.getProgressMeter().getSummary().current(), MetricCategory.Core,
"The current number of operations which have been dispatched for" +
" processing, but which have not yet completed.");
"ops_active",
() -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core,
"The current number of operations which have been dispatched for"
+ " processing, but which have not yet completed."
);
create().gauge(
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core, "The current number of operations which have been completed");
"ops_complete",
() -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core,
"The current number of operations which have been completed"
);
}
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps2(List<DriverAdapter<CycleOp<?>, Space>> adapters,
@ -286,10 +228,10 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
ratios.add(ratio);
}
SequencerType sequencerType = config.getOptional("seq").map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(
sequencerType);
SequencerType sequencerType =
config.getOptional("seq").map(SequencerType::valueOf).orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner =
new SequencePlanner<>(sequencerType);
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
@ -304,22 +246,25 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
DriverAdapter<CycleOp<?>, Space> adapter = adapters.get(i);
OpMapper<CycleOp<?>, Space> opMapper = adapter.getOpMapper();
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(
this, pop, spaceFunc);
OpDispenser<? extends CycleOp<?>> dispenser =
opMapper.apply(this, pop, spaceFunc);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun");
dispenser = OpFunctionComposition.wrapOptionally(
adapter, dispenser, pop,
dryrun, opLookup);
adapter,
dispenser,
pop,
dryrun,
opLookup
);
planner.addOp((OpDispenser<? extends CycleOp<?>>) dispenser, ratio);
} catch (Exception e) {
throw new OpConfigError(
"Error while mapping op from template named '" +
pop.getName() +
"': " +
e.getMessage(), e);
"Error while mapping op from template named '" + pop.getName() + "': "
+ e.getMessage(), e
);
}
}
@ -334,72 +279,6 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
}
}
// private ParsedOp upconvert(
// OpTemplate ot,
// Optional<String> defaultDriverOption,
// NBConfigModel yamlmodel,
// NBConfigModel supersetConfig,
// ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
// List<DriverAdapter<CycleOp<?>, Space>> adapterlist
// )
// {
// // ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(),
// // List.of(), this);
// String
// driverName =
// ot.getOptionalStringParam("driver", String.class)
// .or(() -> ot.getOptionalStringParam("type", String.class))
// .or(() -> defaultDriverOption).orElseThrow(() -> new OpConfigError(
// "Unable to identify driver name for op template:\n" + ot));
//
// DriverAdapter<CycleOp<?>, Space>
// adapter =
// adapters.computeIfAbsent(
// driverName,
// dn -> loadAdapter(dn, yamlmodel, supersetConfig, mappers));
// supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
// adapterlist.add(adapter);
//
// ParsedOp
// pop =
// new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
// Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
//
// return pop;
// }
// private DriverAdapter<CycleOp<?>, Space> loadAdapter(
// String driverName,
// NBConfigModel yamlmodel,
// NBConfigModel supersetConfig,
// ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
// )
// {
// DriverAdapter<CycleOp<?>, Space>
// adapter =
// Optional.of(driverName).flatMap(name -> ServiceSelector.of(
// name,
// ServiceLoader.load(DriverAdapterLoader.class)).get())
// .map(l -> l.load(this, NBLabels.forKV()))
// .orElseThrow(() -> new OpConfigError("driver adapter not present for name '" +
// driverName +
// "'"));
//
// NBConfigModel combinedModel = yamlmodel;
// NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
//
// if (adapter instanceof NBConfigurable configurable) {
// NBConfigModel adapterModel = configurable.getConfigModel();
// supersetConfig.add(adapterModel);
//
// combinedModel = adapterModel.add(yamlmodel);
// combinedConfig = combinedModel.matchConfig(activityDef.getParams());
// configurable.applyConfig(combinedConfig);
// }
// mappers.put(driverName, adapter.getOpMapper());
// return adapter;
// }
public void initActivity() {
initOrUpdateRateLimiters();
setDefaultsFromOpSequence(sequence);
@ -409,19 +288,6 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
return sequence;
}
// /**
// * When an adapter needs to identify an error uniquely for the purposes of
// * routing it to the correct error handler, or naming it in logs, or naming
// * metrics, override this method in your activity.
// *
// * @return A function that can reliably and safely map an instance of Throwable to a
// stable name.
// */
// @Override
// public final Function<Throwable, String> getErrorNameMapper() {
// return adapter.getErrorNameMapper();
// }
@Override
public OpTemplates getSyntheticOpTemplates(OpTemplates opsDocList, Map<String, Object> cfg) {
OpTemplates accumulator = new OpTemplates();
@ -470,68 +336,11 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
}
}
// private OpTemplates loadOpTemplates(
// DriverAdapter<?, ?> defaultDriverAdapter,
// boolean logged,
// boolean filtered
// )
// {
//
// String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
//
// OpTemplates templates = loadOpTemplates();
// OpTemplates filteredOps = templates.matching(filtered ? tagfilter : "", logged);
//
// if (filteredOps.isEmpty()) {
// // There were no ops, and it *wasn't* because they were all filtered out.
// // In this case, let's try to synthesize the ops as long as at least a default driver
// // was provided
// // But if there were no ops, and there was no default driver provided, we can't continue
// // There were no ops, and it was because they were all filtered out
// OpTemplates unfilteredOps = templates.matching("", false);
// if (!unfilteredOps.isEmpty()) {
// String
// message =
// "There were no active op templates with tag filter '" +
// tagfilter +
// "', since all " +
// unfilteredOps.size() +
// " were filtered out. Examine the session log for details";
// NBAdvisorOutput.test(message);
// // throw new BasicError(message);
// }
// if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
// filteredOps = sotp.getSyntheticOpTemplates(templates, this.activityDef.getParams());
// Objects.requireNonNull(filteredOps);
// if (filteredOps.isEmpty()) {
// throw new BasicError("Attempted to create synthetic ops from driver '" +
// defaultDriverAdapter.getAdapterName() +
// '\'' +
// " but no ops were created. You must provide either a workload" +
// " or an op parameter. Activities require op templates.");
// }
// } else {
// throw new BasicError("""
// No op templates were provided. You must provide one of these activity parameters:
// 1) workload=some.yaml
// 2) op='inline template'
// 3) driver=stdout (or any other drive that can synthesize ops)\
// """);
// }
// }
// return filteredOps;
// }
/**
Modify the provided activity config with defaults for stride and cycles, if they haven't been
provided, based on the
length of the sequence as determined by the provided ratios. Also, modify the activity config
with
reasonable
defaults when requested.
@param seq
- The {@link OpSequence} to derive the defaults from
*/
/// Modify the provided activity config with defaults for stride and cycles, if they haven't
/// been provided, based on the length of the sequence as determined by the provided ratios.
/// Also, modify the activity config with reasonable defaults when requested.
/// @param seq
/// - The {@link OpSequence} to derive the defaults from
private synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
Map<String, Object> updates = new LinkedHashMap<>(config.getMap());
@ -540,29 +349,30 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
String stride = String.valueOf(seq.getSequence().length);
logger.info(() -> "defaulting stride to " + stride + " (the sequence length)");
return stride;
});
}
);
updates.computeIfAbsent(
"cycles", k -> {
String cycles = (String) updates.get("stride");
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
return cycles;
});
}
);
long cycles = CyclesSpec.parse(updates.get("cycles").toString()).cycle_count();
long stride = Long.parseLong(updates.get("stride").toString());
if (cycles < stride) {
throw new RuntimeException("The specified cycles (" +
cycles +
") are less than the stride (" +
stride +
"). This means there aren't enough cycles to cause a stride to be" +
" executed. If this was intended, then set stride low enough to" +
" allow it.");
throw new RuntimeException("""
The specified cycles (CYCLES) are less than the stride (STRIDE)
This means there aren't enough cycles to cause a stride to be
executed. If this was intended, then set stride low enough to allow it
""".replaceAll("CYCLES", String.valueOf(cycles))
.replaceAll("STRIDE", String.valueOf(stride)));
}
Optional<String> threadSpec = Optional.ofNullable(updates.get("threads"))
.map(String::valueOf);
Optional<String> threadSpec =
Optional.ofNullable(updates.get("threads")).map(String::valueOf);
if (threadSpec.isPresent()) {
String spec = threadSpec.get();
@ -574,7 +384,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
threads = (int) cycles;
logger.info(
"setting threads to {} (auto) [10xCORES, cycle count limited]",
threads);
threads
);
} else {
logger.info("setting threads to {} (auto) [10xCORES]", threads);
}
@ -593,93 +404,26 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
if (threads > cycles) {
int finalThreads1 = threads;
logger.warn(() -> "threads=" +
finalThreads1 +
" and cycles=" +
updates.get("cycles").toString() +
", you should have more cycles than threads.");
logger.warn(() -> "threads=" + finalThreads1 + " and cycles="
+ updates.get("cycles").toString()
+ ", you should have more cycles than threads.");
}
} else if (1000 < cycles) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
logger.warn(() -> "For testing at scale, it is highly recommended that you "
+ "set threads to a value higher than the default of 1."
+ " hint: you can use threads=auto for reasonable default, or"
+ " consult the topic on threads with `help threads` for"
+ " more information.");
}
if (0 < cycles && seq.getOps().isEmpty()) {
throw new BasicError(
"You have configured a zero-length sequence and non-zero cycles. It is not" +
" possible to continue with this activity.");
"You have configured a zero-length sequence and non-zero cycles. It is not"
+ " possible to continue with this activity.");
}
}
// /**
// Given a function that can create an op of type <O> from an OpTemplate, generate
// an indexed sequence of ready to call operations.
// <p>
// This method uses the following conventions to derive the sequence:
//
// <OL>
// <LI>If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is
// taken as the only provided statement.</LI>
// <LI>If a 'yaml, or 'workload' parameter is provided, then the statements in that file
// are taken with their ratios </LI>
// <LI>Any provided tags filter is used to select only the op templates which have matching
// tags. If no tags are provided, then all the found op templates are included.</LI>
// <LI>The ratios and the 'seq' parameter are used to build a sequence of the ready
// operations,
// where the sequence length is the sum of the ratios.</LI>
// </OL>
// @param <O>
// A holder for an executable operation for the native driver used by this activity.
// @param opinit
// A function to map an OpTemplate to the executable operation form required by
// the native driver for this activity.
// @param defaultAdapter
// The adapter which will be used for any op templates with no explicit adapter
// @return The sequence of operations as determined by filtering and ratios
// */
// @Deprecated(forRemoval = true)
// protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(
// Function<OpTemplate, OpDispenser<? extends O>> opinit, boolean strict,
// DriverAdapter<?, ?> defaultAdapter
// ) {
//
// List<OpTemplate> stmts = loadOpTemplates(defaultAdapter, true, false);
//
// List<Long> ratios = new ArrayList<>(stmts.size());
//
// for (OpTemplate opTemplate : stmts) {
// long ratio = opTemplate.removeParamOrDefault("ratio", 1);
// ratios.add(ratio);
// }
//
// SequencerType sequencerType = getParams().getOptionalString("seq").map(
// SequencerType::valueOf).orElse(SequencerType.bucket);
//
// SequencePlanner<OpDispenser<? extends O>> planner = new
// SequencePlanner<>(sequencerType);
//
// try {
// for (int i = 0; i < stmts.size(); i++) {
// long ratio = ratios.get(i);
// OpTemplate optemplate = stmts.get(i);
// OpDispenser<? extends O> driverSpecificReadyOp = opinit.apply(optemplate);
// if (strict) {
// optemplate.assertConsumed();
// }
// planner.addOp(driverSpecificReadyOp, ratio);
// }
// } catch (Exception e) {
// throw new OpConfigError(e.getMessage(), workloadSource, e);
// }
//
// return planner.resolve();
// }
/// TODO: Move this out, adjacent to [OpsLoader]
protected OpTemplates loadOpTemplates() {
OpsDocList opsDocs = null;
try {
@ -687,31 +431,100 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
String stmt = config.getOptional("stmt", "statement").orElse(null);
String workload = config.getOptional("workload").orElse(null);
// If the user has specified more than one way of loading operations via
// activity parameters, then throw an error saying so
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
throw new OpConfigError(
"Only op, statement, or workload may be provided, not more than one.");
} else if (workload != null && OpsLoader.isJson(workload)) {
}
// If the workload is literally in JSON format that starts with '{' and parsed to JSON,
// then it is parsed into the workload template structure
// instead of being loaded from a file.
if (workload != null && OpsLoader.isJson(workload)) {
workloadSource = "commandline: (workload/json):" + workload;
opsDocs = OpsLoader.loadString(
workload, OpTemplateFormat.json, config.getMap(), null);
} else if (workload != null && OpsLoader.isYaml(workload)) {
opsDocs =
OpsLoader.loadString(workload, OpTemplateFormat.json, config.getMap(), null);
return new OpTemplates(opsDocs);
}
// If the workload is literally in a multiline format that parses to YAML,
// then it is parsed into the workload template structure
// instead of being loaded from a file.
if (workload != null && OpsLoader.isYaml(workload)) {
workloadSource = "commandline: (workload/yaml):" + workload;
opsDocs = OpsLoader.loadString(
workload, OpTemplateFormat.yaml, config.getMap(), null);
} else if (workload != null) {
opsDocs =
OpsLoader.loadString(workload, OpTemplateFormat.yaml, config.getMap(), null);
return new OpTemplates(opsDocs);
}
// We try to load the workload from a file assuming YAML format,
// if the workload parameter is defined and not loadable by other means
if (workload != null) {
opsDocs = OpsLoader.loadPath(workload, config.getMap(), "activities");
} else if (stmt != null) {
return new OpTemplates(opsDocs);
}
// We take the stmt parameter
// if defined
// then wrap it in default workload template structure
// as an op with an op field 'stmt'
if (stmt != null) {
workloadSource = "commandline: (stmt/inline): '" + stmt + "'";
opsDocs = OpsLoader.loadString(
stmt, OpTemplateFormat.inline, config.getMap(), null);
} else if (op != null && OpsLoader.isJson(op)) {
opsDocs =
OpsLoader.loadString(stmt, OpTemplateFormat.inline, config.getMap(), null);
return new OpTemplates(opsDocs);
}
// We take the op parameter
// if defined and in JSON format
// the wrap it in default workload structure
// as a set of op fields in JSON format
if (op != null && OpsLoader.isJson(op)) {
workloadSource = "commandline: (op/json): '" + op + "'";
opsDocs = OpsLoader.loadString(op, OpTemplateFormat.json, config.getMap(), null);
} else if (op != null) {
}
// We take the op parameter
// if defined and not loadable via other means
// then assume it is yaml
// then wrap it in default workload structure
// as a set of op fields in YAML format
if (op != null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
opsDocs = OpsLoader.loadString(op, OpTemplateFormat.inline, config.getMap(), null);
return new OpTemplates(opsDocs);
}
return new OpTemplates(opsDocs);
// If no methods of loading op templates were provided
// via op, stmt, or workload parameters, then we also check
// for synthetic ops, as in stdout making default formats from
// binding names
if (config.getOptional("driver").isPresent()) {
String driverName = config.get("driver");
DriverAdapter<? extends CycleOp<?>, Space> defaultDriverAdapter =
AdapterResolver.loadNamedAdapter(this, driverName);
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
var filteredOps =
sotp.getSyntheticOpTemplates(new OpTemplates(), config.getMap());
Objects.requireNonNull(filteredOps);
if (filteredOps.isEmpty()) {
throw new BasicError("""
Attempted to create synthetic ops from driver ADAPTERNAME
but no ops were created. You must provide either a workload
or an op parameter. Activities require op templates.
""".replaceAll("ADAPTERNAME", defaultDriverAdapter.getAdapterName()));
}
}
}
// All possible methods of loading a op templates have failed
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)\
""");
} catch (Exception e) {
throw new OpConfigError("Error loading op templates: " + e, workloadSource, e);
@ -759,16 +572,14 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(
this, strideLimiterSource, spec);
strideLimiterSource =
ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
}
/**
Get the current cycle rate limiter for this activity.
The cycle rate limiter is used to throttle the rate at which
cycles are dispatched across all threads in the activity
@return the cycle {@link RateLimiter}
*/
/// Get the current cycle rate limiter for this activity.
/// The cycle rate limiter is used to throttle the rate at which
/// cycles are dispatched across all threads in the activity
/// @return the cycle {@link RateLimiter}
public RateLimiter getCycleLimiter() {
if (cycleLimiterSource != null) {
return cycleLimiterSource.get();
@ -777,12 +588,10 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
}
}
/**
Get the current stride rate limiter for this activity.
The stride rate limiter is used to throttle the rate at which
new strides are dispatched across all threads in an activity.
@return The stride {@link RateLimiter}
*/
/// Get the current stride rate limiter for this activity.
/// The stride rate limiter is used to throttle the rate at which
/// new strides are dispatched across all threads in an activity.
/// @return The stride {@link RateLimiter}
public synchronized RateLimiter getStrideLimiter() {
if (strideLimiterSource != null) {
return strideLimiterSource.get();
@ -800,12 +609,10 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
return Map.of("activity", config.getAlias());
}
/**
Activities with retryable operations (when specified with the retry error handler for some
types of error), should allow the user to specify how many retries are allowed before
giving up on the operation.
@return The number of allowable retries
*/
/// Activities with retryable operations (when specified with the retry error handler for some
/// types of error), should allow the user to specify how many retries are allowed before
/// giving up on the operation.
/// @return The number of allowable retries
public int getMaxTries() {
return config.getOptional(Integer.class, "maxtries").orElse(10);
}
@ -814,15 +621,16 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
if (null == this.errorHandler) {
errorHandler = new NBErrorHandler(
() -> config.getOptional("errors").orElse("stop"),
this::getExceptionMetrics);
this::getExceptionMetrics
);
}
return errorHandler;
}
public void closeAutoCloseables() {
for (AutoCloseable closeable : closeables) {
logger.debug(
() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
logger.debug(() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": "
+ closeable);
try {
closeable.close();
} catch (Exception e) {
@ -888,41 +696,47 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
return new ActivityConfig(configModel.apply(params));
}
private static NBConfigModel configModel = ConfigModel.of(Activity.class)
.add(Param.optional("alias")).add(Param.optional(
"labels", String.class,
"Labels which will apply to metrics and annotations for this activity only"))
.add(Param.defaultTo(
"strict", true,
"strict op field mode, which requires that provided op fields are recognized and used"))
.add(Param.optional("op", String.class, "op template in statement form")).add(
Param.optional(
List.of("stmt", "statement"), String.class,
"op template in statement " + "form"))
.add(Param.defaultTo("tags", "", "tag filter to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration")).add(
Param.defaultTo("threads","1").setRegex("\\d+|\\d+x|auto")
private static NBConfigModel configModel =
ConfigModel.of(Activity.class).add(Param.optional("alias")).add(Param.optional(
"labels",
String.class,
"Labels which will apply to metrics and annotations for this activity only"
)).add(Param.defaultTo(
"strict",
true,
"strict op field mode, which requires that provided op fields are recognized and used"
)).add(Param.optional("op", String.class, "op template in statement form"))
.add(Param.optional(
List.of("stmt", "statement"),
String.class,
"op template in statement " + "form"
)).add(Param.defaultTo("tags", "", "tag filter to be used to filter operations"))
.add(Param.defaultTo("errors", "stop", "error handler configuration"))
.add(Param.defaultTo("threads", "1").setRegex("\\d+|\\d+x|auto")
.setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("stride").setRegex("\\d+"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second")).add(
Param.defaultTo("cycles", "1")
.add(Param.optional("stride").setRegex("\\d+"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.defaultTo("cycles", "1")
.setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\" + ".\\d+[KMBGTPE]?")
.setDescription("cycle interval to use")).add(Param.defaultTo("recycles", "1")
.setDescription("allow cycles to be re-used this many " + "times")).add(Param.optional(
List.of("cyclerate", "targetrate", "rate"), String.class,
"rate limit for cycles per second"))
.add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class)).add(
Param.optional(
List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|emit|none)"))
.add(Param.optional("maxtries", Integer.class)).add(
Param.defaultTo(
"input", "type=atomicseq", "The type of cycle input to use for this " + "activity"))
.add(Param.optional(List.of("if","inputfilter"),String.class,"an input filter"))
.add(Param.optional("output",String.class))
.asReadOnly();
.setDescription("allow cycles to be re-used this many " + "times")).add(Param.optional(
List.of("cyclerate", "targetrate", "rate"),
String.class,
"rate limit for cycles per second"
)).add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(
List.of("workload", "yaml"),
String.class,
"location of workload yaml file"
)).add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|emit|none)"))
.add(Param.optional("maxtries", Integer.class)).add(Param.defaultTo(
"input",
"type=atomicseq",
"The type of cycle input to use for this " + "activity"
)).add(Param.optional(List.of("if", "inputfilter"), String.class, "an input filter"))
.add(Param.optional("output", String.class)).asReadOnly();
@Override
public NBConfigModel getConfigModel() {
@ -933,14 +747,6 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
public void applyConfig(NBConfiguration config) {
Optional<String> directAlias = config.getOptional("alias");
// if (!directAlias.isPresent()) {
// String indirectAlias = config.getOptional(ActivityConfig.FIELD_ALIAS)
// .or(() -> config.getOptional("workload")).or(() -> config.getOptional("driver"))
// .orElse("ACTIVITYNAME");
//
// config.getMap().put("alias", indirectAlias);
// }
//
NBConfigurable.applyMatchingCollection(config, adapters.values());
this.config = new ActivityConfig(config);
@ -953,13 +759,15 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
@Override
public NBConfigModel getReconfigModel() {
return ConfigModel.of(Activity.class).add(
Param.optional("threads").setRegex("\\d+|\\d+x|auto")
.setDescription("number of concurrent operations, controlled by threadpool"))
return ConfigModel.of(Activity.class)
.add(Param.optional("threads").setRegex("\\d+|\\d+x|auto")
.setDescription("number of concurrent operations, controlled by threadpool"))
.add(Param.optional("striderate", String.class, "rate limit for strides per second"))
.add(Param.optional(
List.of("cyclerate", "targetrate", "rate"), String.class,
"rate limit for cycles per second")).asReadOnly();
List.of("cyclerate", "targetrate", "rate"),
String.class,
"rate limit for cycles per second"
)).asReadOnly();
}

View File

@ -42,12 +42,7 @@ public class AdapterResolver
NBConfiguration configSuperset
)
{
ServiceSelector<DriverAdapterLoader> selector = ServiceSelector.of(
name, ServiceLoader.load(DriverAdapterLoader.class));
DriverAdapterLoader loader = selector.get()
.orElseThrow(() -> new OpConfigError("No DriverAdapterLoader found for " + name));
DriverAdapter<CycleOp<?>, Space> adapter = loader.load(parent, NBLabels.forKV());
DriverAdapter<? extends CycleOp<?>, Space> adapter = loadNamedAdapter(parent, name);
if (adapter instanceof NBConfigurable configurable) {
NBConfigModel adapterModel = configurable.getConfigModel();
NBConfiguration matchingConfig = adapterModel.matchConfig(configSuperset.getMap());
@ -56,4 +51,17 @@ public class AdapterResolver
return adapter;
}
public static DriverAdapter<? extends CycleOp<?>, Space> loadNamedAdapter(
NBComponent parent,
String adapterName
)
{
ServiceSelector<DriverAdapterLoader> selector =
ServiceSelector.of(adapterName, ServiceLoader.load(DriverAdapterLoader.class));
DriverAdapterLoader loader = selector.get()
.orElseThrow(() -> new OpConfigError("No DriverAdapterLoader found for " + adapterName));
DriverAdapter<CycleOp<?>, Space> adapter = loader.load(parent, NBLabels.forKV());
return adapter;
}
}

View File

@ -2,13 +2,13 @@ package io.nosqlbench.engine.api.activityimpl.uniform;
/*
* Copyright (c) nosqlbench
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -121,10 +121,6 @@ public class OpResolution implements Tagged {
resolvingDispenser = true;
if (dispenser == null) {
this.dispenser = dispenserResolver.apply(resolveAdapter(), getParsedOp());
// ParsedOp pop = resolveParsedOp();
// DriverAdapter<OPTYPE, SPACETYPE> adapter = (DriverAdapter<OPTYPE, SPACETYPE>) resolveAdapter();
// // TODO verify whether or not it is necessary to ensure static mapping between adapter and mapper instance
// this.dispenser = adapter.getOpMapper().apply(adapter, pop, adapter.getSpaceFunc(pop));
}
resolvingDispenser = false;
return (OpDispenser<OPTYPE>) dispenser;

View File

@ -51,9 +51,14 @@ public class OpResolverBank {
this.optpl = reference;
this.tagFilter = new TagFilter(tagFilter);
OpTemplates activeOpTemplates = reference.matching(tagFilter, false);
if (reference.size() > 0 && activeOpTemplates.size() == 0) {
String message =
"There were no active op templates with tag filter '" + tagFilter + "', since all "
+ reference.size() + " were filtered out. Examine the session log for details";
}
for (OpTemplate opTemplate : activeOpTemplates) {
OpResolution
opres =
OpResolution opres =
new OpResolution(activity, adapterF, opTemplate, popF, dispF, this);
resolvers.add(opres);
}

View File

@ -68,7 +68,7 @@
<option name="ALIGN_MULTILINE_THROWS_LIST" value="true" />
<option name="ALIGN_MULTILINE_EXTENDS_LIST" value="true" />
<option name="ALIGN_MULTILINE_ARRAY_INITIALIZER_EXPRESSION" value="true" />
<option name="CALL_PARAMETERS_WRAP" value="1" />
<option name="CALL_PARAMETERS_WRAP" value="5" />
<option name="CALL_PARAMETERS_LPAREN_ON_NEXT_LINE" value="true" />
<option name="CALL_PARAMETERS_RPAREN_ON_NEXT_LINE" value="true" />
<option name="METHOD_PARAMETERS_WRAP" value="5" />
@ -80,6 +80,7 @@
<option name="EXTENDS_KEYWORD_WRAP" value="1" />
<option name="THROWS_KEYWORD_WRAP" value="1" />
<option name="METHOD_CALL_CHAIN_WRAP" value="1" />
<option name="PARENTHESES_EXPRESSION_LPAREN_WRAP" value="true" />
<option name="BINARY_OPERATION_WRAP" value="1" />
<option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
<option name="TERNARY_OPERATION_WRAP" value="1" />
@ -87,7 +88,6 @@
<option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true" />
<option name="ARRAY_INITIALIZER_RBRACE_ON_NEXT_LINE" value="true" />
<option name="ASSIGNMENT_WRAP" value="1" />
<option name="PLACE_ASSIGNMENT_SIGN_ON_NEXT_LINE" value="true" />
<option name="WRAP_COMMENTS" value="true" />
<option name="ASSERT_STATEMENT_WRAP" value="5" />
<option name="SWITCH_EXPRESSIONS_WRAP" value="5" />

View File

@ -75,7 +75,7 @@ ij_java_align_group_field_declarations = false
ij_java_align_multiline_annotation_parameters = false
ij_java_align_multiline_array_initializer_expression = true
ij_java_align_multiline_assignment = true
ij_java_align_multiline_binary_operation = false
ij_java_align_multiline_binary_operation = true
ij_java_align_multiline_chained_methods = false
ij_java_align_multiline_deconstruction_list_components = true
ij_java_align_multiline_extends_list = true
@ -95,12 +95,12 @@ ij_java_align_types_in_multi_catch = true
ij_java_annotation_parameter_wrap = on_every_item
ij_java_array_initializer_new_line_after_left_brace = true
ij_java_array_initializer_right_brace_on_new_line = true
ij_java_array_initializer_wrap = on_every_item
ij_java_array_initializer_wrap = normal
ij_java_assert_statement_colon_on_next_line = false
ij_java_assert_statement_wrap = on_every_item
ij_java_assignment_wrap = normal
ij_java_binary_operation_sign_on_next_line = false
ij_java_binary_operation_wrap = on_every_item
ij_java_binary_operation_sign_on_next_line = true
ij_java_binary_operation_wrap = normal
ij_java_blank_lines_after_anonymous_class_header = 0
ij_java_blank_lines_after_class_header = 0
ij_java_blank_lines_after_imports = 1
@ -121,8 +121,8 @@ ij_java_block_comment_add_space = false
ij_java_block_comment_at_first_column = true
ij_java_builder_methods =
ij_java_call_parameters_new_line_after_left_paren = true
ij_java_call_parameters_right_paren_on_new_line = false
ij_java_call_parameters_wrap = normal
ij_java_call_parameters_right_paren_on_new_line = true
ij_java_call_parameters_wrap = on_every_item
ij_java_case_statement_on_separate_line = true
ij_java_catch_on_new_line = false
ij_java_class_annotation_wrap = split_into_lines
@ -239,8 +239,8 @@ ij_java_parameter_annotation_wrap = on_every_item
ij_java_parameter_name_prefix =
ij_java_parameter_name_suffix =
ij_java_parentheses_expression_new_line_after_left_paren = true
ij_java_parentheses_expression_right_paren_on_new_line = true
ij_java_place_assignment_sign_on_next_line = true
ij_java_parentheses_expression_right_paren_on_new_line = false
ij_java_place_assignment_sign_on_next_line = false
ij_java_prefer_longer_names = true
ij_java_prefer_parameters_wrap = false
ij_java_record_components_wrap = on_every_item
@ -359,7 +359,7 @@ ij_java_subclass_name_prefix =
ij_java_subclass_name_suffix = Impl
ij_java_switch_expressions_wrap = on_every_item
ij_java_ternary_operation_signs_on_next_line = false
ij_java_ternary_operation_wrap = on_every_item
ij_java_ternary_operation_wrap = normal
ij_java_test_name_prefix =
ij_java_test_name_suffix = Test
ij_java_throws_keyword_wrap = normal