mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
temp checkpoint
This commit is contained in:
parent
7153ef6ee1
commit
8c62576ff1
@ -17,23 +17,27 @@
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import com.datastax.oss.driver.api.core.cql.*;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4DriverAdapter;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Space;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.diagnostics.CQLD4PreparedStmtDiagnostics;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.FieldBindingsMetadata;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.components.core.NBNamedElement;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.virtdata.core.templates.BindPoint;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedTemplateString;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPreparedStatement> {
|
||||
public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPreparedStatement> implements FieldBindingsMetadata {
|
||||
private final static Logger logger = LogManager.getLogger(Cqld4PreparedStmtDispenser.class);
|
||||
|
||||
private final RSProcessors processors;
|
||||
@ -41,6 +45,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
|
||||
private final ParsedTemplateString stmtTpl;
|
||||
private final LongFunction<Object[]> fieldsF;
|
||||
private final LongFunction<Cqld4Space> spaceInitF;
|
||||
private final LongFunction<PreparedStatement> cachedStatementF;
|
||||
private PreparedStatement preparedStmt;
|
||||
// This is a stable enum for the op template from the workload, bounded by cardinality of all op templates
|
||||
private int refkey;
|
||||
@ -57,7 +62,8 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
|
||||
this.stmtTpl = stmtTpl;
|
||||
this.fieldsF = getFieldsFunction(op);
|
||||
this.spaceInitF = spaceInitF;
|
||||
stmtFunc = createStmtFunc(fieldsF, op);
|
||||
this.cachedStatementF = getCachedStatementF(fieldsF, op);
|
||||
stmtFunc = createStmtFunc(fieldsF,cachedStatementF, op);
|
||||
}
|
||||
|
||||
private LongFunction<Object[]> getFieldsFunction(ParsedOp op) {
|
||||
@ -68,8 +74,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
|
||||
}
|
||||
|
||||
|
||||
protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF, ParsedOp op) {
|
||||
|
||||
protected LongFunction<PreparedStatement> getCachedStatementF(LongFunction<Object[]> fieldsF, ParsedOp op) {
|
||||
try {
|
||||
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
|
||||
|
||||
@ -80,9 +85,21 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
|
||||
(long l) -> spaceInitF.apply(l);
|
||||
|
||||
int refKey = op.getRefKey();
|
||||
LongFunction<PreparedStatement> cachedStatementF =
|
||||
(long l) -> lookupSpaceF.apply(l).getOrCreatePreparedStatement(refKey,prepareStatementF);
|
||||
LongFunction<PreparedStatement> cStmtF = (long l) -> lookupSpaceF.apply(
|
||||
l).getOrCreatePreparedStatement(refKey, prepareStatementF);
|
||||
|
||||
return cStmtF;
|
||||
} catch (Exception e) {
|
||||
throw new OpConfigError(e + "( for statement '" + stmtTpl + "')");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected LongFunction<Statement> createStmtFunc(LongFunction<Object[]> fieldsF,
|
||||
LongFunction<PreparedStatement> cachedStatementF,
|
||||
ParsedOp op) {
|
||||
|
||||
try {
|
||||
LongFunction<Statement> boundStatementF =
|
||||
(long l) -> cachedStatementF.apply(l).bind(fieldsF.apply(l));
|
||||
|
||||
@ -94,6 +111,26 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser<Cqld4CqlPre
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String,BindPoint> getFieldBindingsMap() {
|
||||
PreparedStatement ps = this.cachedStatementF.apply(0);
|
||||
|
||||
ColumnDefinitions cdefs = ps.getVariableDefinitions();
|
||||
List<BindPoint> bdefs = stmtTpl.getBindPoints();
|
||||
|
||||
if (cdefs.size()!=bdefs.size()){
|
||||
throw new OpConfigError("The number of column defs does not match the number of " +
|
||||
"bindings specified for " + this.getOpName());
|
||||
}
|
||||
|
||||
Map<String,BindPoint> fbmap = new LinkedHashMap<>(cdefs.size());
|
||||
for (int i = 0; i < cdefs.size(); i++) {
|
||||
ColumnDefinition cdef = cdefs.get(i);
|
||||
fbmap.put(cdefs.get(i).getName().asCql(true),bdefs.get(i));
|
||||
}
|
||||
return fbmap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlPreparedStatement getOp(long cycle) {
|
||||
BoundStatement stmt = (BoundStatement) stmtFunc.apply(cycle);
|
||||
|
@ -246,6 +246,7 @@ public class CGWorkloadExporter implements BundledApp {
|
||||
case "select_seq" -> genSelectOpTemplates(model, blockname);
|
||||
case "scan_10_seq" -> genScanOpTemplates(model, blockname);
|
||||
case "update_seq" -> genUpdateOpTemplates(model, blockname);
|
||||
case "verify_rampup" -> genVerifyRampupTemplates(model,blockname,"main_insert");
|
||||
default -> throw new RuntimeException("Unable to create block entries for " + component + ".");
|
||||
};
|
||||
block.putAll(additions);
|
||||
@ -379,29 +380,74 @@ public class CGWorkloadExporter implements BundledApp {
|
||||
}
|
||||
|
||||
|
||||
|
||||
private Map<String, Object> genSelectOpTemplates(CqlModel model, String blockname) {
|
||||
Map<String, Object> blockdata = new LinkedHashMap<>();
|
||||
Map<String, Object> ops = new LinkedHashMap<>();
|
||||
blockdata.put("ops", ops);
|
||||
for (CqlTable table : model.getTableDefs()) {
|
||||
ops.put(
|
||||
namer.nameFor(table, "optype", "select", "blockname", blockname),
|
||||
Map.of(
|
||||
"prepared", genSelectSyntax(table),
|
||||
"timeout", timeouts.get("select"),
|
||||
"ratio", readRatioFor(table)
|
||||
)
|
||||
namer.nameFor(table, "optype", "select", "blockname", blockname),
|
||||
Map.of(
|
||||
"prepared", genSelectSyntax(table,false),
|
||||
"timeout", timeouts.get("select"),
|
||||
"ratio", readRatioFor(table)
|
||||
)
|
||||
);
|
||||
}
|
||||
return blockdata;
|
||||
}
|
||||
private Map<String, Object> genVerifyRampupTemplates(CqlModel model, String blockname,
|
||||
String refBlockName) {
|
||||
|
||||
private String genSelectSyntax(CqlTable table) {
|
||||
Map<String, Object> blockdata = new LinkedHashMap<>();
|
||||
Map<String, Object> ops = new LinkedHashMap<>();
|
||||
|
||||
// select
|
||||
blockdata.put("ops", ops);
|
||||
for (CqlTable table : model.getTableDefs()) {
|
||||
String opName = namer.nameFor(table, "optype", "verify", "blockname", blockname);
|
||||
// String refOpName = namer.nameFor(table,"optype",);
|
||||
|
||||
ops.put(
|
||||
opName,
|
||||
Map.of(
|
||||
"prepared", genSelectSyntax(table,true),
|
||||
"timeout", timeouts.get("select"),
|
||||
"ratio", readRatioFor(table),
|
||||
"verify", "op(block:"+refBlockName+",op:"+")"
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// insert
|
||||
blockdata.put("ops", ops);
|
||||
for (CqlTable table : model.getTableDefs()) {
|
||||
if (!isCounterTable(table)) {
|
||||
ops.put(
|
||||
namer.nameFor(table, "optype", "insert", "blockname", blockname),
|
||||
Map.of(
|
||||
"prepared", genInsertSyntax(table),
|
||||
"timeout", timeouts.get("insert"),
|
||||
"ratio", writeRatioFor(table)
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return blockdata;
|
||||
|
||||
}
|
||||
|
||||
private String genSelectSyntax(CqlTable table, boolean withCapture) {
|
||||
return """
|
||||
select * from KEYSPACE.TABLE
|
||||
select FIELDS from KEYSPACE.TABLE
|
||||
where PREDICATE
|
||||
LIMIT;
|
||||
"""
|
||||
.replace("FIELDS",withCapture ? "[*]" : "*")
|
||||
.replace("KEYSPACE", table.getKeyspace().getName())
|
||||
.replace("TABLE", table.getName())
|
||||
.replace("PREDICATE", genPredicateTemplate(table, 0))
|
||||
|
@ -105,7 +105,6 @@ public class OpDef extends OpTemplate {
|
||||
private LinkedHashMap<String, String> composeTags() {
|
||||
LinkedHashMap<String, String> tagsWithName = new LinkedHashMap<>(new MultiMapLookup<>(rawOpDef.getTags(), block.getTags()));
|
||||
tagsWithName.put("block",block.getName());
|
||||
tagsWithName.put("name",this.rawOpDef.getName());
|
||||
tagsWithName.put("op",this.rawOpDef.getName());
|
||||
return tagsWithName;
|
||||
}
|
||||
|
@ -19,6 +19,8 @@ package io.nosqlbench.adapters.api.activityimpl;
|
||||
import com.codahale.metrics.Timer;
|
||||
import groovy.lang.Binding;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Validator;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.ValidatorSource;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.evalctx.*;
|
||||
import io.nosqlbench.adapters.api.metrics.ThreadLocalNamedTimers;
|
||||
@ -35,6 +37,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
@ -48,7 +51,9 @@ import java.util.function.LongFunction;
|
||||
* The type of operation
|
||||
*/
|
||||
public abstract class BaseOpDispenser<OP extends CycleOp<?>, SPACE extends Space>
|
||||
extends NBBaseComponent implements OpDispenser<OP> {
|
||||
extends NBBaseComponent
|
||||
implements OpDispenser<OP>, ValidatorSource
|
||||
{
|
||||
protected final static Logger logger = LogManager.getLogger(BaseOpDispenser.class);
|
||||
public static final String VERIFIER = "verifier";
|
||||
public static final String VERIFIER_INIT = "verifier-init";
|
||||
@ -230,4 +235,9 @@ public abstract class BaseOpDispenser<OP extends CycleOp<?>, SPACE extends Space
|
||||
OP op = getOp(value);
|
||||
return op;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Validator> getValidator(NBComponent parent, ParsedOp pop, OpLookup lookup) {
|
||||
return CoreOpValidators.getValidator(this, pop, lookup);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,51 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Validator;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
public class CoreOpValidators {
|
||||
private static final Logger logger = LogManager.getLogger(CoreOpValidators.class);
|
||||
|
||||
public static List<Validator> getValidator(NBComponent parent, ParsedOp pop, OpLookup lookup) {
|
||||
List<Validator> validators = new ArrayList();
|
||||
Optional<TypeAndTarget<CoreValidators, Object>> optionalValidator = pop.getOptionalTypeAndTargetEnum(
|
||||
CoreValidators.class, Object.class);
|
||||
|
||||
if (optionalValidator.isPresent()) {
|
||||
TypeAndTarget<CoreValidators, Object> validator = optionalValidator.get();
|
||||
logger.debug("found validator '" + validator.enumId.name() + "' for op '" + pop.getName() + "'");
|
||||
switch (validator.enumId) {
|
||||
case verify_fields:
|
||||
validators.add(new FieldVerifier(parent, pop, lookup));
|
||||
}
|
||||
}
|
||||
|
||||
return validators;
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Validator;
|
||||
|
||||
public enum CoreValidators {
|
||||
verify_fields(FieldVerifier.class);
|
||||
private final Class<? extends Validator> validatorImpl;
|
||||
|
||||
CoreValidators(Class<? extends Validator> validatorClass) {
|
||||
this.validatorImpl = validatorClass;
|
||||
}
|
||||
}
|
@ -0,0 +1,237 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Validator;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DiffType;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.virtdata.core.templates.BindPoint;
|
||||
import io.nosqlbench.virtdata.core.templates.CapturePoints;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
// TODO: Make op(verifyref) use tags, and require 1
|
||||
public class FieldVerifier implements Validator {
|
||||
|
||||
private final LongFunction<Map<String, Object>> expectedValuesF;
|
||||
private final DiffType diffType;
|
||||
private final NBMetricCounter resultsVerifiedError;
|
||||
private final NBMetricCounter resultsOkCounter;
|
||||
private final NBMetricCounter verifiedFieldsCounter;
|
||||
private final String[] fieldNames;
|
||||
private final String[] bindingNames;
|
||||
|
||||
public FieldVerifier(NBComponent parent, ParsedOp pop, OpLookup lookup) {
|
||||
this.resultsVerifiedError = parent.create().counter(
|
||||
"results_verified_error", MetricCategory.Verification,
|
||||
"The number of results which have been verified with no error"
|
||||
);
|
||||
this.resultsOkCounter = parent.create().counter(
|
||||
"results_verified_ok",
|
||||
MetricCategory.Verification,
|
||||
"The number of results which had " + "a verification error"
|
||||
);
|
||||
this.verifiedFieldsCounter = parent.create().counter(
|
||||
"field_verified_ok", MetricCategory.Verification,
|
||||
"the number of fields in results which have been verified with no error"
|
||||
);
|
||||
|
||||
this.diffType = pop.takeEnumFromFieldOr(DiffType.class, DiffType.all, "compare");
|
||||
|
||||
List<String> fields = new ArrayList<>();
|
||||
List<String> bindings = new ArrayList<>();
|
||||
CapturePoints captures = pop.getCaptures();
|
||||
|
||||
ParsedOp config = pop.takeAsSubConfig("verify_fields");
|
||||
|
||||
Optional<Object> vspec = config.takeOptionalStaticValue("verify_fields", Object.class);
|
||||
if (vspec.isPresent()) {
|
||||
Object vspeco = vspec.get();
|
||||
if (vspeco instanceof Map verifyers) {
|
||||
verifyers.forEach((k, v) -> {
|
||||
if (k instanceof CharSequence keyName && v instanceof CharSequence keyValue) {
|
||||
fields.add(keyName.toString());
|
||||
bindings.add(keyValue.toString());
|
||||
|
||||
} else {
|
||||
throw new RuntimeException(
|
||||
"Strings must be used in map form of " + "verify_field");
|
||||
}
|
||||
|
||||
});
|
||||
} else if (vspeco instanceof String verifyBindingSpec) {
|
||||
parseFieldSpec(verifyBindingSpec, lookup, fields, bindings, captures, pop);
|
||||
} else {
|
||||
throw new OpConfigError("Unrecognized type for verify_fields value:" + vspeco.getClass().getSimpleName());
|
||||
}
|
||||
} else {
|
||||
config.getDefinedNames().forEach(name -> {
|
||||
fields.add(name);
|
||||
bindings.add(config.getStaticValue(name));
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
List<BindPoint> bindPoints = pop.getBindPoints();
|
||||
|
||||
// Optional<String> vb = config.getOptionalStaticValue("verify_bindings", String.class);
|
||||
// if (vb.isPresent()) {
|
||||
// String verifyBindingSpec = vb.get();
|
||||
// if (verifyBindingSpec.startsWith("op(") && verifyBindingSpec.endsWith(")")) {
|
||||
// String toLookup = verifyBindingSpec.substring(2, verifyBindingSpec.lastIndexOf(-1));
|
||||
// ParsedOp referenced = lookup.lookup(toLookup).orElseThrow();
|
||||
// }
|
||||
// }
|
||||
|
||||
this.fieldNames = fields.toArray(new String[fields.size()]);
|
||||
this.bindingNames = bindings.toArray(new String[bindings.size()]);
|
||||
this.expectedValuesF = pop.newOrderedMapBinder(bindingNames);
|
||||
|
||||
}
|
||||
|
||||
private void parseFieldSpec(
|
||||
String fieldSpec, OpLookup lookup, List<String> fields,
|
||||
List<String> bindings, CapturePoints captures, ParsedOp pop
|
||||
) {
|
||||
if (fieldSpec.startsWith("op(") && fieldSpec.endsWith(")")) {
|
||||
String toLookup = fieldSpec.substring("op(".length(), fieldSpec.length() - 1);
|
||||
Optional<ParsedOp> referenced = lookup.lookup(toLookup);
|
||||
if (referenced.isPresent()) {
|
||||
List<String> vars = referenced.get().getBindPoints().stream().map(
|
||||
bp -> bp.getAnchor()).toList();
|
||||
fields.addAll(vars);
|
||||
bindings.addAll(vars);
|
||||
} else {
|
||||
throw new OpConfigError(
|
||||
"no op found for verify setting '" + fieldSpec + "' " + "for op " + "template" + " '" + pop.getName() + "'");
|
||||
}
|
||||
} else {
|
||||
String[] vfields = fieldSpec.split("\\s*,\\s*");
|
||||
for (String vfield : vfields) {
|
||||
// if (vfield.equals("*")) {
|
||||
// fields.addAll(captures.getAsNames());
|
||||
// fields.addAll(bindPoints.stream().map(bp -> bp.getAnchor()).toList());
|
||||
// } else
|
||||
if (vfield.startsWith("+")) {
|
||||
fields.add(vfield.substring(1));
|
||||
} else if (vfield.startsWith("-")) {
|
||||
fields.remove(vfield.substring(1));
|
||||
} else if (vfield.matches("\\w+(\\w+->[\\w-]+)?")) {
|
||||
String[] parts = vfield.split("->", 2);
|
||||
fields.add(parts[0]);
|
||||
bindings.add(parts[1]);
|
||||
} else {
|
||||
throw new RuntimeException("unknown verify_fields format: '" + vfield + "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// Compare the values of the row with the values generated.
|
||||
///
|
||||
/// Specifically,
|
||||
/// - Ensure the same number of fields.
|
||||
/// - Ensure the same pair-wise field names.
|
||||
/// - Ensure that each pair of same-named fields has the same data type.
|
||||
/// - Ensure that the value of each pair of fields is equal according to the equals
|
||||
/// operator for the respective type.
|
||||
/// @return a count of differences between the row and the reference values
|
||||
@Override
|
||||
public void validate(long cycle, Object data) {
|
||||
if (data instanceof Map<?, ?> r) {
|
||||
Map<String, ?> result = (Map<String, ?>) r;
|
||||
Map<String, Object> referenceMap = this.expectedValuesF.apply(cycle);
|
||||
|
||||
int diff = 0;
|
||||
StringBuilder logbuffer = new StringBuilder(); // make this a TL
|
||||
logbuffer.setLength(0);
|
||||
|
||||
if (diffType.is(DiffType.reffields)) {
|
||||
|
||||
List<String> missingRowFields = Arrays.stream(this.fieldNames).filter(
|
||||
gk -> !result.containsKey(gk)).collect(Collectors.toList());
|
||||
if (missingRowFields.size() > 0) {
|
||||
diff += missingRowFields.size();
|
||||
|
||||
logbuffer.append("\nexpected fields '");
|
||||
logbuffer.append(String.join("','", missingRowFields));
|
||||
logbuffer.append("' not in row.");
|
||||
}
|
||||
}
|
||||
|
||||
// if (diffType.is(DiffType.rowfields)) {
|
||||
// List<String> missingRefFields = result.keySet().stream().filter(
|
||||
// k -> !referenceMap.containsKey(k)).collect(Collectors.toList());
|
||||
// if (missingRefFields.size() > 0) {
|
||||
// diff += missingRefFields.size();
|
||||
//
|
||||
// logbuffer.append("\nexpected fields '");
|
||||
// logbuffer.append(String.join("','", missingRefFields));
|
||||
// logbuffer.append("' not in reference data: " + referenceMap);
|
||||
// }
|
||||
// }
|
||||
|
||||
if (diffType.is(DiffType.values)) {
|
||||
for (int fidx = 0; fidx < fieldNames.length; fidx++) {
|
||||
String fname = fieldNames[fidx];
|
||||
;
|
||||
String rname = bindingNames[fidx];
|
||||
if (referenceMap.containsKey(rname)) {
|
||||
if (referenceMap.get(rname).equals(result.get(fname))) {
|
||||
verifiedFieldsCounter.inc();
|
||||
} else {
|
||||
logbuffer.append("\nvalue differs for '").append(fname).append("' ");
|
||||
logbuffer.append("expected:'").append(
|
||||
referenceMap.get(fname).toString()).append("'");
|
||||
logbuffer.append(" actual:'").append(result.get(rname)).append("'");
|
||||
diff++;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
if (diff == 0) {
|
||||
resultsVerifiedError.inc();
|
||||
} else {
|
||||
resultsOkCounter.inc();
|
||||
throw new RuntimeException("in cycle " + cycle + ", " + logbuffer.toString());
|
||||
}
|
||||
|
||||
} else {
|
||||
throw new OpConfigError("Can only validate fields of type Map");
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "verify_fields";
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,27 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
public interface OpLookup {
|
||||
Optional<ParsedOp> lookup(String opName);
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl.uniform;
|
||||
|
||||
import java.util.Map;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.nb.api.components.core.NBNamedElement;
|
||||
import io.nosqlbench.virtdata.core.templates.BindPoint;
|
||||
|
||||
/// This optional type allows for [OpDispenser] (or other) implementations to
|
||||
/// map native field names to their associated binding names. Often, the
|
||||
/// adapter-native logic is the only place this association can be derived, although
|
||||
/// it is sometimes needed in core adapter-agnostic logic.
|
||||
public interface FieldBindingsMetadata<FIELDTYPE> {
|
||||
|
||||
/// Get the map of native fields to bind points.
|
||||
/// The bind points don't need to be the same actual object which is used, but both the
|
||||
/// field names and the binding points should be equivalent as in [Object#equals].
|
||||
/// @return an ordered map of native driver/client fields to their associated bindpoints.
|
||||
Map<String, BindPoint> getFieldBindingsMap();
|
||||
|
||||
}
|
@ -2,13 +2,13 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@ -18,6 +18,8 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
|
||||
*/
|
||||
|
||||
|
||||
public interface Validator<RESULT> {
|
||||
public void validate(RESULT result);
|
||||
import io.nosqlbench.nb.api.components.core.NBNamedElement;
|
||||
|
||||
public interface Validator<RESULT> extends NBNamedElement {
|
||||
public void validate(long cycle, RESULT result);
|
||||
}
|
||||
|
@ -2,13 +2,13 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@ -18,12 +18,15 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/// A [DriverAdapter] may implement this interface to provide adapter-specific
|
||||
/// validators.
|
||||
public interface ValidatorSource {
|
||||
Optional<Validator> getValidator(String name, ParsedOp pop);
|
||||
List<Validator> getValidator(NBComponent parent, ParsedOp pop, OpLookup lookup);
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ public class AssertingOp<T> implements CycleOp<T> {
|
||||
@Override
|
||||
public T apply(long value) {
|
||||
T result = op.apply(value);
|
||||
validator.validate(result);
|
||||
validator.validate(value, result);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,55 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
public enum DiffType {
|
||||
|
||||
|
||||
/// Verify nothing for this statement
|
||||
none(0),
|
||||
|
||||
/// Verify that fields named in the row are present in the reference map.
|
||||
rowfields(0x1),
|
||||
|
||||
/// Verify that fields in the reference map are present in the row data.
|
||||
reffields(0x1 << 1),
|
||||
|
||||
/// Verify that all fields present in either the row or the reference data
|
||||
/// are also present in the other.
|
||||
fields(0x1 | 0x1 << 1),
|
||||
|
||||
/// Verify that all values of the same named field are equal, according to
|
||||
/// {@link Object#equals(Object)}}.
|
||||
values(0x1<<2),
|
||||
|
||||
/// Cross-verify all fields and field values between the reference data and
|
||||
/// the actual data.
|
||||
all(0x1|0x1<<1|0x1<<2);
|
||||
|
||||
public int bitmask;
|
||||
|
||||
DiffType(int bit) {
|
||||
this.bitmask = bit;
|
||||
}
|
||||
|
||||
public boolean is(DiffType option) {
|
||||
return (bitmask & option.bitmask) > 0;
|
||||
}
|
||||
|
||||
}
|
@ -30,6 +30,7 @@ import io.nosqlbench.nb.api.config.fieldreaders.DynamicFieldReader;
|
||||
import io.nosqlbench.nb.api.config.fieldreaders.StaticFieldReader;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigError;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.engine.util.Tagged;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.labels.NBLabelSpec;
|
||||
import io.nosqlbench.nb.api.labels.NBLabels;
|
||||
@ -380,7 +381,7 @@ prepared: false
|
||||
field within the set of possible fields. More than one will throw an error.</LI>
|
||||
</UL>
|
||||
</P> */
|
||||
public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String, ?>>, NBComponent, StaticFieldReader, DynamicFieldReader {
|
||||
public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String, ?>>, NBComponent, StaticFieldReader, DynamicFieldReader, Tagged {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ParsedOp.class);
|
||||
|
||||
@ -418,9 +419,12 @@ public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String
|
||||
List<Function<Map<String, Object>, Map<String, Object>>> preprocessors,
|
||||
NBComponent parent
|
||||
) {
|
||||
// TODO: the block and op name below should be populated more robustly
|
||||
// They should not be strictly required, but a way of taking "what is provided" in the
|
||||
// name should be used
|
||||
super(
|
||||
parent,
|
||||
NBLabels.forKV(((parent instanceof ParsedOp) ? "subop" : "op"), opTemplate.getName())
|
||||
NBLabels.forMap(opTemplate.getTags())
|
||||
);
|
||||
this._opTemplate = opTemplate;
|
||||
this.activityCfg = activityCfg;
|
||||
@ -894,6 +898,7 @@ public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String
|
||||
return tmap.getOptionalTargetEnum(enumclass, valueClass);
|
||||
}
|
||||
|
||||
|
||||
public <E extends Enum<E>, V> Optional<TypeAndTarget<E, V>> getOptionalTypeAndTargetEnum(
|
||||
Class<E> enumclass, Class<V> valueClass) {
|
||||
return tmap.getOptionalTargetEnum(enumclass, valueClass);
|
||||
@ -1025,6 +1030,11 @@ public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String
|
||||
return this._opTemplate.getRefKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getTags() {
|
||||
return this._opTemplate.getTags();
|
||||
}
|
||||
|
||||
|
||||
public static enum SubOpNaming {
|
||||
SubKey, ParentAndSubKey
|
||||
@ -1046,9 +1056,9 @@ public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String
|
||||
return new ParsedOp(
|
||||
new OpData(
|
||||
"sub-op of '" + this.getName() + "' field '" + fromOpField + "', element '" + elemName + "' name '" + subopName + "'",
|
||||
subopName, new LinkedHashMap<String, String>(_opTemplate.getTags()) {{
|
||||
put("subop", subopName);
|
||||
}}, _opTemplate.getBindings(), _opTemplate.getParams(), opfields, 100
|
||||
subopName,
|
||||
new LinkedHashMap<String, String>(Map.of("subop", subopName)),
|
||||
_opTemplate.getBindings(), _opTemplate.getParams(), opfields, 100
|
||||
), this.activityCfg, List.of(), this
|
||||
);
|
||||
}
|
||||
@ -1096,10 +1106,23 @@ public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String
|
||||
return subOpMap;
|
||||
}
|
||||
|
||||
public ParsedOp takeAsSubConfig(String s) {
|
||||
Object subtree = tmap.takeStaticValue(s, Object.class);
|
||||
if (subtree instanceof Map map) {
|
||||
return makeSubOp(s, s, map, SubOpNaming.SubKey);
|
||||
} else if (subtree instanceof String seq) {
|
||||
return makeSubOp(s, s, Map.of(s, seq), SubOpNaming.SubKey);
|
||||
} else {
|
||||
throw new RuntimeException(
|
||||
"unable to make sub config from key '" + s + "', because " + "it is a " + subtree.getClass().getCanonicalName());
|
||||
}
|
||||
}
|
||||
|
||||
public ParsedOp getAsSubOp(String name, SubOpNaming naming) {
|
||||
Object o = _opTemplate.getOp().map(raw -> raw.get(name)).orElseThrow(
|
||||
() -> new OpConfigError(
|
||||
"Could not find op field '" + name + "' for subop on parent op '" + name + "'"));
|
||||
|
||||
if (o instanceof Map map) {
|
||||
return makeSubOp(this.getName(), name, map, naming);
|
||||
} else {
|
||||
@ -1230,8 +1253,8 @@ public class ParsedOp extends NBBaseComponent implements LongFunction<Map<String
|
||||
return tmap.getCaptures();
|
||||
}
|
||||
|
||||
public Map<String, String> getBindPoints() {
|
||||
return null;
|
||||
public List<BindPoint> getBindPoints() {
|
||||
return tmap.getBindPoints();
|
||||
}
|
||||
|
||||
public boolean isDefinedExactly(String... fields) {
|
||||
|
@ -24,8 +24,8 @@ import java.util.regex.Pattern;
|
||||
|
||||
public class MapLabels implements NBLabels {
|
||||
|
||||
// private final static Logger logger = LogManager.getLogger(MapLabels.class);
|
||||
protected final Map<String,String> labels;
|
||||
// private final static Logger logger = LogManager.getLogger(MapLabels.class);
|
||||
protected final Map<String, String> labels;
|
||||
|
||||
public MapLabels(final Map<String, String> labels) {
|
||||
verifyValidNamesAndValues(labels);
|
||||
@ -34,23 +34,26 @@ public class MapLabels implements NBLabels {
|
||||
}
|
||||
|
||||
|
||||
public MapLabels(final Map<String,String> parentLabels, final Map<String,String> childLabels) {
|
||||
public MapLabels(
|
||||
final Map<String, String> parentLabels, final Map<String, String> childLabels) {
|
||||
final Map<String, String> combined = new LinkedHashMap<>(parentLabels);
|
||||
childLabels.forEach((k,v) -> {
|
||||
if (combined.containsKey(k))
|
||||
throw new RuntimeException("Can't overlap label keys (for instance " + k + ") between parent and child elements. parent:" + parentLabels + ", child:" + childLabels);
|
||||
combined.put(k,v);
|
||||
childLabels.forEach((k, v) -> {
|
||||
if (combined.containsKey(k)) throw new RuntimeException(
|
||||
"Can't overlap label (any) key '" + k + "' between parent (a " + parentLabels.getClass().getSimpleName() + ") and child (a " + childLabels.getClass().getSimpleName() + ") parent:" + parentLabels + ", child:" + childLabels);
|
||||
combined.put(k, v);
|
||||
});
|
||||
verifyValidNamesAndValues(combined);
|
||||
// verifyValidValues(combined);
|
||||
labels=Collections.unmodifiableMap(combined);
|
||||
labels = Collections.unmodifiableMap(combined);
|
||||
}
|
||||
|
||||
private final Pattern validNamesPattern = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*");
|
||||
|
||||
private void verifyValidNamesAndValues(Map<String, String> labels) {
|
||||
labels.forEach((label,value) -> {
|
||||
labels.forEach((label, value) -> {
|
||||
if (!validNamesPattern.matcher(label).matches()) {
|
||||
throw new RuntimeException("Invalid label name '" + label + "', only a-z,A-Z,_ are allowed as the initial character, and a-z,A-Z,0-9,_ are allowed after.");
|
||||
throw new RuntimeException(
|
||||
"Invalid label name '" + label + "', only a-z,A-Z,_ are allowed as the initial character, and a-z,A-Z,0-9,_ are allowed after.");
|
||||
}
|
||||
// if (!validNamesPattern.matcher(value).matches()) {
|
||||
// throw new RuntimeException("Invalid label value '" + value + "', only a-z,A-Z,_ are allowed as the initial character, and a-z,A-Z,0-9,_ are allowed after.");
|
||||
@ -61,13 +64,13 @@ public class MapLabels implements NBLabels {
|
||||
private void verifyValidValues(Map<String, String> labels) {
|
||||
for (String value : labels.values()) {
|
||||
if (!validNamesPattern.matcher(value).matches()) {
|
||||
throw new RuntimeException("Invalid label value '" + value + "', only a-z,A-Z,_ are allowed as the initial character, and a-z,A-Z,0-9,_ are allowed after.");
|
||||
throw new RuntimeException(
|
||||
"Invalid label value '" + value + "', only a-z,A-Z,_ are allowed as the initial character, and a-z,A-Z,0-9,_ are allowed after.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public String linearizeValues(final char delim, final String... included) {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
@ -76,8 +79,9 @@ public class MapLabels implements NBLabels {
|
||||
else includedNames.addAll(this.labels.keySet());
|
||||
|
||||
for (String includedName : includedNames) {
|
||||
final boolean optional= includedName.startsWith("[") && includedName.endsWith("]");
|
||||
includedName=optional?includedName.substring(1,includedName.length()-1):includedName;
|
||||
final boolean optional = includedName.startsWith("[") && includedName.endsWith("]");
|
||||
includedName = optional ? includedName.substring(
|
||||
1, includedName.length() - 1) : includedName;
|
||||
|
||||
final String component = this.labels.get(includedName);
|
||||
if (null == component) {
|
||||
@ -86,7 +90,7 @@ public class MapLabels implements NBLabels {
|
||||
}
|
||||
sb.append(component).append(delim);
|
||||
}
|
||||
sb.setLength(sb.length()-1);
|
||||
sb.setLength(sb.length() - 1);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
@ -101,7 +105,7 @@ public class MapLabels implements NBLabels {
|
||||
}
|
||||
}
|
||||
if (!sb.isEmpty()) {
|
||||
sb.setLength(sb.length()-"__".length());
|
||||
sb.setLength(sb.length() - "__".length());
|
||||
}
|
||||
|
||||
List<String> keys = new ArrayList<>(keyset);
|
||||
@ -110,7 +114,7 @@ public class MapLabels implements NBLabels {
|
||||
for (String key : keys) {
|
||||
sb.append("_").append(key).append("_").append(labels.get(key)).append("__");
|
||||
}
|
||||
sb.setLength(sb.length()-"__".length());
|
||||
sb.setLength(sb.length() - "__".length());
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
@ -126,7 +130,8 @@ public class MapLabels implements NBLabels {
|
||||
if (null != bareName) {
|
||||
rawName = this.labels.get(bareName);
|
||||
includedNames.remove(bareName);
|
||||
if (null == rawName) throw new RuntimeException("Unable to get value for key '" + bareName + '\'');
|
||||
if (null == rawName) throw new RuntimeException(
|
||||
"Unable to get value for key '" + bareName + '\'');
|
||||
sb.append(rawName);
|
||||
}
|
||||
if (!includedNames.isEmpty()) {
|
||||
@ -134,13 +139,9 @@ public class MapLabels implements NBLabels {
|
||||
for (final String includedName : includedNames) {
|
||||
final String includedValue = this.labels.get(includedName);
|
||||
Objects.requireNonNull(includedValue);
|
||||
sb.append(includedName)
|
||||
.append("=\"")
|
||||
.append(includedValue)
|
||||
.append('"')
|
||||
.append(',');
|
||||
sb.append(includedName).append("=\"").append(includedValue).append('"').append(',');
|
||||
}
|
||||
sb.setLength(sb.length()-",".length());
|
||||
sb.setLength(sb.length() - ",".length());
|
||||
sb.append('}');
|
||||
}
|
||||
|
||||
@ -158,7 +159,7 @@ public class MapLabels implements NBLabels {
|
||||
for (String key : keys) {
|
||||
sb.append(key).append("=\"").append(labels.get(key)).append("\",");
|
||||
}
|
||||
sb.setLength(sb.length()-",".length());
|
||||
sb.setLength(sb.length() - ",".length());
|
||||
sb.append("}");
|
||||
return sb.toString();
|
||||
|
||||
@ -176,7 +177,7 @@ public class MapLabels implements NBLabels {
|
||||
for (String key : keys) {
|
||||
sb.append(key).append("=").append(labels.get(key)).append(",");
|
||||
}
|
||||
sb.setLength(sb.length()-",".length());
|
||||
sb.setLength(sb.length() - ",".length());
|
||||
return sb.toString();
|
||||
|
||||
}
|
||||
@ -196,8 +197,8 @@ public class MapLabels implements NBLabels {
|
||||
|
||||
@Override
|
||||
public MapLabels and(final Object... labelsAndValues) {
|
||||
final Map<String,String> childLabels = getStringStringMap(labelsAndValues);
|
||||
return new MapLabels(labels,childLabels);
|
||||
final Map<String, String> childLabels = getStringStringMap(labelsAndValues);
|
||||
return new MapLabels(labels, childLabels);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -213,46 +214,51 @@ public class MapLabels implements NBLabels {
|
||||
NBLabels updated = this;
|
||||
Map<String, String> defaultMap = defaults.asMap();
|
||||
for (String name : defaultMap.keySet()) {
|
||||
updated = updated.andDefault(name,defaultMap.get(name));
|
||||
updated = updated.andDefault(name, defaultMap.get(name));
|
||||
}
|
||||
return updated;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MapLabels and(NBLabels labels) {
|
||||
return new MapLabels(this.labels,labels.asMap());
|
||||
return new MapLabels(this.labels, labels.asMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBLabels modifyName(final String nameToModify, final Function<String, String> transform) {
|
||||
if (!this.labels.containsKey(nameToModify))
|
||||
throw new RuntimeException("Missing name in labels for transform: '" + nameToModify + '\'');
|
||||
public NBLabels modifyName(
|
||||
final String nameToModify,
|
||||
final Function<String, String> transform
|
||||
) {
|
||||
if (!this.labels.containsKey(nameToModify)) throw new RuntimeException(
|
||||
"Missing name in labels for transform: '" + nameToModify + '\'');
|
||||
final LinkedHashMap<String, String> newLabels = new LinkedHashMap<>(this.labels);
|
||||
final String removedValue = newLabels.remove(nameToModify);
|
||||
final String newName = transform.apply(nameToModify);
|
||||
newLabels.put(newName,removedValue);
|
||||
newLabels.put(newName, removedValue);
|
||||
return new MapLabels(newLabels);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NBLabels modifyValue(final String labelName, final Function<String, String> transform) {
|
||||
if(!this.labels.containsKey(labelName))
|
||||
throw new RuntimeException("Unable to find label name '" + labelName + "' for value transform.");
|
||||
if (!this.labels.containsKey(labelName)) throw new RuntimeException(
|
||||
"Unable to find label name '" + labelName + "' for value transform.");
|
||||
final LinkedHashMap<String, String> newMap = new LinkedHashMap<>(this.labels);
|
||||
final String value = newMap.remove(labelName);
|
||||
if (null == value) throw new RuntimeException("The value for named label '" + labelName + "' is null.");
|
||||
newMap.put(labelName,transform.apply(value));
|
||||
if (null == value) throw new RuntimeException(
|
||||
"The value for named label '" + labelName + "' is null.");
|
||||
newMap.put(labelName, transform.apply(value));
|
||||
return NBLabels.forMap(newMap);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
if (labels.size()==0) {
|
||||
if (labels.size() == 0) {
|
||||
return "{}";
|
||||
}
|
||||
StringBuilder sb = new StringBuilder("{");
|
||||
labels.forEach((k,v) -> {
|
||||
labels.forEach((k, v) -> {
|
||||
sb.append(k).append(":\\\"").append(v).append("\\\"").append(",");
|
||||
});
|
||||
sb.setLength(sb.length()-",".length());
|
||||
sb.setLength(sb.length() - ",".length());
|
||||
sb.append("}");
|
||||
|
||||
return sb.toString();
|
||||
@ -260,10 +266,11 @@ public class MapLabels implements NBLabels {
|
||||
|
||||
@Override
|
||||
public String valueOf(final String name) {
|
||||
if (!this.labels.containsKey(name))
|
||||
throw new RuntimeException("The specified key does not exist: '" + name + '\'');
|
||||
if (!this.labels.containsKey(name)) throw new RuntimeException(
|
||||
"The specified key does not exist: '" + name + '\'');
|
||||
final String only = labels.get(name);
|
||||
if (null == only) throw new RuntimeException("The specified value is null for key '" + name + '\'');
|
||||
if (null == only) throw new RuntimeException(
|
||||
"The specified value is null for key '" + name + '\'');
|
||||
return only;
|
||||
}
|
||||
|
||||
@ -285,25 +292,27 @@ public class MapLabels implements NBLabels {
|
||||
}
|
||||
|
||||
private String[] concat(String[] a, String[] b) {
|
||||
String[] c = new String[a.length+b.length];
|
||||
System.arraycopy(a,0,c,0,a.length);
|
||||
System.arraycopy(b,0,c,a.length,b.length);
|
||||
String[] c = new String[a.length + b.length];
|
||||
System.arraycopy(a, 0, c, 0, a.length);
|
||||
System.arraycopy(b, 0, c, a.length, b.length);
|
||||
return c;
|
||||
}
|
||||
|
||||
private static String[] getNamesArray(final Object... labelsAndValues) {
|
||||
String[] keys = new String[labelsAndValues.length>>1];
|
||||
String[] keys = new String[labelsAndValues.length >> 1];
|
||||
for (int i = 0; i < keys.length; i++) {
|
||||
keys[i]=labelsAndValues[i<<1].toString();
|
||||
keys[i] = labelsAndValues[i << 1].toString();
|
||||
}
|
||||
return keys;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private static Map<String, String> getStringStringMap(Object[] labelsAndValues) {
|
||||
if (0 != (labelsAndValues.length % 2))
|
||||
throw new RuntimeException("Must provide even number of keys and values: " + Arrays.toString(labelsAndValues));
|
||||
if (0 != (labelsAndValues.length % 2)) throw new RuntimeException(
|
||||
"Must provide even number of keys and values: " + Arrays.toString(labelsAndValues));
|
||||
final Map<String, String> childLabels = new LinkedHashMap<>();
|
||||
for (int i = 0; i < labelsAndValues.length; i+=2) childLabels.put(labelsAndValues[i].toString(), labelsAndValues[i + 1].toString());
|
||||
for (int i = 0; i < labelsAndValues.length; i += 2)
|
||||
childLabels.put(labelsAndValues[i].toString(), labelsAndValues[i + 1].toString());
|
||||
return childLabels;
|
||||
}
|
||||
|
||||
@ -324,31 +333,33 @@ public class MapLabels implements NBLabels {
|
||||
|
||||
|
||||
/**
|
||||
* Take the intersection of the two label sets, considering both key
|
||||
* and value for each label entry. If both have the same label name
|
||||
* but different values for it, then that label is not considered
|
||||
* common and it is not retained in the intersection.
|
||||
* @param otherLabels The label set to intersect
|
||||
Take the intersection of the two label sets, considering both key
|
||||
and value for each label entry. If both have the same label name
|
||||
but different values for it, then that label is not considered
|
||||
common and it is not retained in the intersection.
|
||||
@param otherLabels
|
||||
The label set to intersect
|
||||
*/
|
||||
@Override
|
||||
public NBLabels intersection(NBLabels otherLabels) {
|
||||
Map<String, String> other = otherLabels.asMap();
|
||||
Map<String,String> common = new LinkedHashMap<>();
|
||||
asMap().forEach((k,v) -> {
|
||||
Map<String, String> common = new LinkedHashMap<>();
|
||||
asMap().forEach((k, v) -> {
|
||||
if (other.containsKey(k) && other.get(k).equals(v)) {
|
||||
common.put(k,v);
|
||||
common.put(k, v);
|
||||
}
|
||||
});
|
||||
return NBLabels.forMap(common);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subtract all matching labels from the other label set from this one,
|
||||
* considering label names and values. If the other label set contains
|
||||
* the same name but a different value, then it is not considered a
|
||||
* match and thus not removed from the labels of this element.
|
||||
* @param otherLabels Labels to remove, where key and value matches
|
||||
* @return The same, or a smaller set of labels for this element
|
||||
Subtract all matching labels from the other label set from this one,
|
||||
considering label names and values. If the other label set contains
|
||||
the same name but a different value, then it is not considered a
|
||||
match and thus not removed from the labels of this element.
|
||||
@param otherLabels
|
||||
Labels to remove, where key and value matches
|
||||
@return The same, or a smaller set of labels for this element
|
||||
*/
|
||||
@Override
|
||||
public NBLabels difference(NBLabels otherLabels) {
|
||||
@ -356,7 +367,7 @@ public class MapLabels implements NBLabels {
|
||||
NBLabels difference = NBLabels.forKV();
|
||||
for (String key : labels.keySet()) {
|
||||
if (!other.containsKey(key) || !other.get(key).equals(labels.get(key))) {
|
||||
difference = difference.and(key,labels.get(key));
|
||||
difference = difference.and(key, labels.get(key));
|
||||
}
|
||||
}
|
||||
return difference;
|
||||
|
@ -19,6 +19,7 @@ package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Validator;
|
||||
@ -30,6 +31,8 @@ import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/// This is a functional wrapper layer which will upgrade a basic [CycleOp] to
|
||||
@ -40,23 +43,31 @@ public class OpAssertions {
|
||||
public final static Logger logger = LogManager.getLogger(OpAssertions.class);
|
||||
|
||||
public static <OP extends CycleOp<?>, SPACE extends Space> OpDispenser<? extends OP> wrapOptionally(
|
||||
DriverAdapter<? extends OP, ? extends SPACE> adapter, OpDispenser<? extends OP> dispenser,
|
||||
ParsedOp pop
|
||||
DriverAdapter<? extends OP, ? extends SPACE> adapter,
|
||||
OpDispenser<? extends OP> dispenser,
|
||||
ParsedOp pop,
|
||||
OpLookup lookup
|
||||
) {
|
||||
|
||||
Optional<String> validatorName = pop.takeOptionalStaticValue("validator", String.class);
|
||||
if (validatorName.isEmpty()) return dispenser;
|
||||
// Optional<String> validatorName = pop.takeOptionalStaticValue("validators", String.class);
|
||||
// if (validatorName.isEmpty()) return dispenser;
|
||||
|
||||
if (adapter instanceof ValidatorSource vs) {
|
||||
Optional<Validator> validator = vs.getValidator(validatorName.get(),pop);
|
||||
if (validator.isEmpty()) {
|
||||
throw new OpConfigError(
|
||||
"a validator '" + validatorName.get() + "' was requested, but adapter '" + adapter.getAdapterName() + "' did not find it.");
|
||||
}
|
||||
return new AssertingOpDispenser(adapter, pop, dispenser, validator.get());
|
||||
} else {
|
||||
throw new OpConfigError(
|
||||
"a validator '" + validatorName.get() + "' was specified, " + "but the adapter '" + adapter.getAdapterName() + "' does " + "not implement " + adapter.getClass().getSimpleName());
|
||||
List<ValidatorSource> sources = new ArrayList<>();
|
||||
if (adapter instanceof ValidatorSource s) {
|
||||
sources.add(s);
|
||||
}
|
||||
if (dispenser instanceof ValidatorSource s) {
|
||||
sources.add(s);
|
||||
}
|
||||
|
||||
for (ValidatorSource source : sources) {
|
||||
List<Validator> validator = source.getValidator(adapter, pop, lookup);
|
||||
for (Validator v : validator) {
|
||||
dispenser = new AssertingOpDispenser(adapter, pop, dispenser, v);
|
||||
logger.trace("added post-run validator for op '" + pop.getName() + "'");
|
||||
}
|
||||
}
|
||||
|
||||
return dispenser;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
@ -65,11 +66,11 @@ public class OpFunctionComposition {
|
||||
|
||||
public static <OP extends CycleOp<?>, SPACE extends Space> OpDispenser<? extends OP> wrapOptionally(
|
||||
DriverAdapter<? extends OP, ? extends SPACE> adapter, OpDispenser<? extends OP> dispenser,
|
||||
ParsedOp pop, Dryrun dryrun
|
||||
ParsedOp pop, Dryrun dryrun, OpLookup lookup
|
||||
) {
|
||||
|
||||
dispenser = OpCapture.wrapOptionally(adapter, dispenser, pop);
|
||||
dispenser = OpAssertions.wrapOptionally(adapter, dispenser, pop);
|
||||
dispenser = OpAssertions.wrapOptionally(adapter, dispenser, pop, lookup);
|
||||
dispenser = OpDryrun.wrapOptionally(adapter, dispenser, pop, dryrun);
|
||||
|
||||
return dispenser;
|
||||
|
@ -0,0 +1,54 @@
|
||||
package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class OpLookupService implements OpLookup {
|
||||
private List<ParsedOp> pops;
|
||||
private final Supplier<List<ParsedOp>> popsF;
|
||||
|
||||
public OpLookupService(Supplier<List<ParsedOp>> popsF) {
|
||||
this.popsF = popsF;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Optional<ParsedOp> lookup(String tagSpec) {
|
||||
if (pops == null) {
|
||||
pops = popsF.get();
|
||||
}
|
||||
TagFilter filter = new TagFilter(tagSpec);
|
||||
List<ParsedOp> list = filter.filter(pops);
|
||||
|
||||
if (list.size() > 1) {
|
||||
throw new RuntimeException(
|
||||
"Too many (" + list.size() + ") ops were found when looking up '" + tagSpec + "'");
|
||||
}
|
||||
if (list.size() == 0) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(list.get(0));
|
||||
}
|
||||
}
|
@ -21,6 +21,7 @@ import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
@ -396,8 +397,9 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
|
||||
// Map<String, DriverAdapter<?,?>> adapterCache,
|
||||
// Map<String, OpMapper<? extends Op>> mapperCache,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters,
|
||||
List<ParsedOp> pops
|
||||
) {
|
||||
List<ParsedOp> pops,
|
||||
OpLookup opLookup
|
||||
) {
|
||||
try {
|
||||
|
||||
List<Long> ratios = new ArrayList<>(pops.size());
|
||||
@ -429,7 +431,14 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
|
||||
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(this, pop, spaceFunc);
|
||||
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
|
||||
Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun");
|
||||
dispenser = OpFunctionComposition.wrapOptionally(adapter, dispenser, pop, dryrun);
|
||||
|
||||
dispenser = OpFunctionComposition.wrapOptionally(
|
||||
adapter,
|
||||
dispenser,
|
||||
pop,
|
||||
dryrun,
|
||||
opLookup
|
||||
);
|
||||
|
||||
// if (strict) {
|
||||
// optemplate.assertConsumed();
|
||||
@ -454,20 +463,24 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
|
||||
|
||||
}
|
||||
|
||||
protected List<OpTemplate> loadOpTemplates(DriverAdapter<?, ?> defaultDriverAdapter) {
|
||||
protected List<OpTemplate> loadOpTemplates(
|
||||
DriverAdapter<?, ?> defaultDriverAdapter,
|
||||
boolean logged,
|
||||
boolean filtered
|
||||
) {
|
||||
|
||||
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
|
||||
OpsDocList opsDocList = loadStmtsDocList();
|
||||
|
||||
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
|
||||
List<OpTemplate> filteredOps = opsDocList.getOps(tagfilter, true);
|
||||
List<OpTemplate> filteredOps = opsDocList.getOps(filtered?tagfilter:"", logged);
|
||||
|
||||
if (filteredOps.isEmpty()) {
|
||||
// There were no ops, and it *wasn't* because they were all filtered out.
|
||||
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
|
||||
// But if there were no ops, and there was no default driver provided, we can't continue
|
||||
// There were no ops, and it was because they were all filtered out
|
||||
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
|
||||
if (!unfilteredOps.isEmpty()) {
|
||||
String message = "There were no active op templates with tag filter '"+ tagfilter + "', since all " +
|
||||
unfilteredOps.size() + " were filtered out. Examine the session log for details";
|
||||
@ -489,27 +502,6 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
|
||||
3) driver=stdout (or any other drive that can synthesize ops)""");
|
||||
}
|
||||
}
|
||||
// if (filteredOps.isEmpty()) {
|
||||
// throw new BasicError("There were no active op templates with tag filter '" + tagfilter + '\'');
|
||||
// }
|
||||
|
||||
// if (filteredOps.isEmpty()) {
|
||||
// throw new OpConfigError("No op templates found. You must provide either workload=... or op=..., or use " +
|
||||
// "a default driver (driver=___). This includes " +
|
||||
// ServiceLoader.load(DriverAdapter.class).stream()
|
||||
// .filter(p -> {
|
||||
// AnnotatedType[] annotatedInterfaces = p.type().getAnnotatedInterfaces();
|
||||
// for (AnnotatedType ai : annotatedInterfaces) {
|
||||
// if (ai.getType().equals(SyntheticOpTemplateProvider.class)) {
|
||||
// return true;
|
||||
// }
|
||||
// }
|
||||
// return false;
|
||||
// })
|
||||
// .map(d -> d.get().getAdapterName())
|
||||
// .collect(Collectors.joining(",")));
|
||||
// }
|
||||
//
|
||||
return filteredOps;
|
||||
}
|
||||
|
||||
@ -543,7 +535,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
|
||||
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(Function<OpTemplate,
|
||||
OpDispenser<? extends O>> opinit, boolean strict, DriverAdapter<?, ?> defaultAdapter) {
|
||||
|
||||
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter);
|
||||
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter,true,false);
|
||||
|
||||
List<Long> ratios = new ArrayList<>(stmts.size());
|
||||
|
||||
@ -556,6 +548,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
|
||||
.getOptionalString("seq")
|
||||
.map(SequencerType::valueOf)
|
||||
.orElse(SequencerType.bucket);
|
||||
|
||||
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
|
||||
|
||||
try {
|
||||
|
@ -21,19 +21,20 @@ import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.advisor.NBAdvisorPoint;
|
||||
import io.nosqlbench.nb.api.advisor.conditions.Conditions;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpLookupService;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
|
||||
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.labels.NBLabels;
|
||||
import io.nosqlbench.nb.api.components.events.NBEvent;
|
||||
@ -45,42 +46,41 @@ import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
|
||||
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* This is a typed activity which is expected to become the standard
|
||||
* core of all new activity types. Extant NB drivers should also migrate
|
||||
* to this when possible.
|
||||
*
|
||||
* @param <R>
|
||||
* A type of runnable which wraps the operations for this type of driver.
|
||||
* @param <S>
|
||||
* The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
|
||||
*/
|
||||
This is a typed activity which is expected to become the standard
|
||||
core of all new activity types. Extant NB drivers should also migrate
|
||||
to this when possible.
|
||||
@param <R>
|
||||
A type of runnable which wraps the operations for this type of driver.
|
||||
@param <S>
|
||||
The context type for the activity, AKA the 'space' for a named driver instance and its
|
||||
associated object graph */
|
||||
public class StandardActivity<R extends java.util.function.LongFunction, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
|
||||
private static final Logger logger = LogManager.getLogger("ACTIVITY");
|
||||
private final OpSequence<OpDispenser<? extends CycleOp<?>>> sequence;
|
||||
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>,Space>> adapters = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<String, DriverAdapter<CycleOp<?>, Space>> adapters = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
|
||||
return super.createOpSourceFromParsedOps(adapters, pops, opLookup);
|
||||
}
|
||||
|
||||
public StandardActivity(NBComponent parent, ActivityDef activityDef) {
|
||||
super(parent, activityDef);
|
||||
NBAdvisorPoint<String> paramsAdvisor = create().advisor(b -> b.name("Workload"));
|
||||
paramsAdvisor.add(Conditions.ValidNameError);
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
|
||||
List<ParsedOp> pops = new ArrayList<>();
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
|
||||
NBConfigModel activityModel = activityDef.getConfigModel();
|
||||
NBConfigModel yamlmodel;
|
||||
NBConfigModel adapterModel;
|
||||
String defaultDriverName = activityDef.getActivityDriver();
|
||||
DriverAdapter<CycleOp<?>,Space> defaultAdapter = getDriverAdapter(defaultDriverName);
|
||||
DriverAdapter<CycleOp<?>,Space> adapter;
|
||||
OpsDocList workload;
|
||||
|
||||
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
NBConfigModel yamlmodel;
|
||||
if (yaml_loc.isPresent()) {
|
||||
Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
|
||||
workload = OpsLoader.loadPath(yaml_loc.get(), disposable, "activities");
|
||||
@ -88,107 +88,122 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
|
||||
} else {
|
||||
yamlmodel = ConfigModel.of(StandardActivity.class).asReadOnly();
|
||||
}
|
||||
yamlmodel.log();
|
||||
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
|
||||
//NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(activityModel);
|
||||
// Load the op templates
|
||||
List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter);
|
||||
NBConfigModel combinedAdapterModel = ConfigModel.of(StandardActivity.class);
|
||||
for (OpTemplate ot : opTemplates) {
|
||||
logger.debug(() -> "StandardActivity.opTemplate = "+ot);
|
||||
String driverName = ot.getOptionalStringParam("driver", String.class)
|
||||
.or(() -> ot.getOptionalStringParam("type", String.class))
|
||||
.orElse(defaultDriverName);
|
||||
if (!adapters.containsKey(driverName)) {
|
||||
adapter = defaultDriverName.equals(driverName) ? defaultAdapter : getDriverAdapter(driverName);
|
||||
NBConfigModel combinedModel = yamlmodel;
|
||||
//NBConfigModel combinedModel = activityModel;
|
||||
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
if (adapter instanceof NBConfigurable configurable) {
|
||||
adapterModel = configurable.getConfigModel();
|
||||
combinedAdapterModel.add(adapterModel);
|
||||
supersetConfig.add(adapterModel);
|
||||
combinedModel = adapterModel.add(yamlmodel);
|
||||
//combinedModel = adapterModel.add(activityModel);
|
||||
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
configurable.applyConfig(combinedConfig);
|
||||
}
|
||||
adapters.put(driverName, adapter);
|
||||
mappers.put(driverName, adapter.getOpMapper());
|
||||
} else {
|
||||
adapter = adapters.get(driverName);
|
||||
}
|
||||
if (adapter instanceof NBConfigurable configurable) {
|
||||
adapterModel = configurable.getConfigModel();
|
||||
adapterModel.assertValidConfig(ot.getParams());
|
||||
}
|
||||
paramsAdvisor.validateAll(ot.getParams().keySet());
|
||||
paramsAdvisor.validateAll(ot.getTags().keySet());
|
||||
paramsAdvisor.validateAll(ot.getBindings().keySet());
|
||||
adapterlist.add(adapter);
|
||||
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
|
||||
logger.debug("StandardActivity.pop="+pop);
|
||||
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
|
||||
pops.add(pop);
|
||||
|
||||
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
|
||||
|
||||
Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName.flatMap(
|
||||
name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV()));
|
||||
|
||||
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
|
||||
throw new BasicError(
|
||||
"Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change '<activeByDefault>false</activeByDefault>' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first.");
|
||||
}
|
||||
logger.debug(() -> "StandardActivity.opTemplate loop complete");
|
||||
|
||||
paramsAdvisor.setName("Workload", "Check parameters, template, and binding names")
|
||||
.logName().evaluate();
|
||||
// HERE, op templates are loaded before drivers are loaded
|
||||
// List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter.orElse(null), false);
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
|
||||
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
|
||||
Optional<String> defaultDriverOption = defaultDriverName;
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
|
||||
|
||||
paramsAdvisor.clear().setName("Superset", "Check overall parameters");
|
||||
paramsAdvisor.validateAll(supersetConfig.getNamedParams().keySet());
|
||||
paramsAdvisor.logName().evaluate();
|
||||
List<ParsedOp> allParsedOps = loadOpTemplates(
|
||||
defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert(
|
||||
ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList();
|
||||
|
||||
combinedAdapterModel.assertNoConflicts(yamlmodel.getNamedParams(), "Template");
|
||||
combinedAdapterModel.log();
|
||||
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
|
||||
supersetConfig.log();
|
||||
OpLookup lookup = new OpLookupService(() -> allParsedOps);
|
||||
|
||||
if (0 == mappers.keySet().stream().filter(n -> n.equals(defaultDriverName)).count()) {
|
||||
logger.warn(() -> "All op templates used a different driver than the default '" + defaultDriverName + "'");
|
||||
TagFilter ts = new TagFilter(activityDef.getParams().getOptionalString("tags").orElse(""));
|
||||
List<ParsedOp> activeParsedOps = ts.filter(allParsedOps);
|
||||
|
||||
if (defaultDriverOption.isPresent()) {
|
||||
long matchingDefault = mappers.keySet().stream().filter(
|
||||
n -> n.equals(defaultDriverOption.get())).count();
|
||||
if (0 == matchingDefault) {
|
||||
logger.warn(
|
||||
"All op templates used a different driver than the default '{}'",
|
||||
defaultDriverOption.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
sequence = createOpSourceFromParsedOps(adapterlist, pops);
|
||||
sequence = createOpSourceFromParsedOps(adapterlist, activeParsedOps, lookup);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof OpConfigError) {
|
||||
throw e;
|
||||
}
|
||||
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
throw new OpConfigError(
|
||||
"Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
}
|
||||
|
||||
create().gauge(
|
||||
"ops_pending",
|
||||
() -> this.getProgressMeter().getSummary().pending(),
|
||||
"ops_pending", () -> this.getProgressMeter().getSummary().pending(),
|
||||
MetricCategory.Core,
|
||||
"The current number of operations which have not been dispatched for processing yet."
|
||||
);
|
||||
create().gauge(
|
||||
"ops_active",
|
||||
() -> this.getProgressMeter().getSummary().current(),
|
||||
MetricCategory.Core,
|
||||
"ops_active", () -> this.getProgressMeter().getSummary().current(), MetricCategory.Core,
|
||||
"The current number of operations which have been dispatched for processing, but which have not yet completed."
|
||||
);
|
||||
create().gauge(
|
||||
"ops_complete",
|
||||
() -> this.getProgressMeter().getSummary().complete(),
|
||||
MetricCategory.Core,
|
||||
"The current number of operations which have been completed"
|
||||
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
|
||||
MetricCategory.Core, "The current number of operations which have been completed"
|
||||
);
|
||||
}
|
||||
|
||||
private DriverAdapter<CycleOp<?>, Space> getDriverAdapter(String driverName) {
|
||||
return Optional.of(driverName)
|
||||
.flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
|
||||
.map(l -> l.load(this, NBLabels.forKV())
|
||||
)
|
||||
.orElseThrow(() -> new OpConfigError("Unable to load '" + driverName + "' driver adapter.\n"+
|
||||
"If this is a valid driver then you may need to rebuild NoSqlBench to include this driver adapter. "+
|
||||
"Change '<activeByDefault>false</activeByDefault>' for the driver in "+
|
||||
"'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml'."));
|
||||
private ParsedOp upconvert(
|
||||
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
|
||||
NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist
|
||||
) {
|
||||
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
|
||||
String driverName = ot.getOptionalStringParam("driver", String.class).or(
|
||||
() -> ot.getOptionalStringParam("type", String.class)).or(
|
||||
() -> defaultDriverOption).orElseThrow(
|
||||
() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
|
||||
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = adapters.computeIfAbsent(
|
||||
driverName, dn -> loadAdapter(
|
||||
dn, yamlmodel, supersetConfig, mappers));
|
||||
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
|
||||
adapterlist.add(adapter);
|
||||
|
||||
ParsedOp pop = new ParsedOp(
|
||||
ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
|
||||
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
|
||||
|
||||
return pop;
|
||||
}
|
||||
|
||||
private DriverAdapter<CycleOp<?>, Space> loadAdapter(
|
||||
String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
|
||||
) {
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = Optional.of(driverName).flatMap(
|
||||
name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV())).orElseThrow(
|
||||
() -> new OpConfigError("driver adapter not present for name '" + driverName + "'"));
|
||||
|
||||
NBConfigModel combinedModel = yamlmodel;
|
||||
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
|
||||
if (adapter instanceof NBConfigurable configurable) {
|
||||
NBConfigModel adapterModel = configurable.getConfigModel();
|
||||
supersetConfig.add(adapterModel);
|
||||
|
||||
combinedModel = adapterModel.add(yamlmodel);
|
||||
combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
configurable.applyConfig(combinedConfig);
|
||||
}
|
||||
mappers.put(driverName, adapter.getOpMapper());
|
||||
return adapter;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void initActivity() {
|
||||
super.initActivity();
|
||||
@ -215,6 +230,7 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
|
||||
for (DriverAdapter<?, ?> adapter : adapters.values()) {
|
||||
if (adapter instanceof NBReconfigurable configurable) {
|
||||
NBConfigModel cfgModel = configurable.getReconfigModel();
|
||||
@ -244,7 +260,8 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
|
||||
// }
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
|
||||
public List<OpTemplate> getSyntheticOpTemplates(
|
||||
OpsDocList opsDocList, Map<String, Object> cfg) {
|
||||
List<OpTemplate> opTemplates = new ArrayList<>();
|
||||
for (DriverAdapter<?, ?> adapter : adapters.values()) {
|
||||
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
|
||||
@ -256,13 +273,13 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
|
||||
}
|
||||
|
||||
/**
|
||||
* This is done here since driver adapters are intended to keep all of their state within
|
||||
* dedicated <em>state space</em> types. Any space which implements {@link Shutdownable}
|
||||
* will be closed when this activity shuts down.
|
||||
This is done here since driver adapters are intended to keep all of their state within
|
||||
dedicated <em>state space</em> types. Any space which implements {@link Shutdownable}
|
||||
will be closed when this activity shuts down.
|
||||
*/
|
||||
@Override
|
||||
public void shutdownActivity() {
|
||||
for (Map.Entry<String, DriverAdapter<CycleOp<?>,Space>> entry : adapters.entrySet()) {
|
||||
for (Map.Entry<String, DriverAdapter<CycleOp<?>, Space>> entry : adapters.entrySet()) {
|
||||
String adapterName = entry.getKey();
|
||||
DriverAdapter<?, ?> adapter = entry.getValue();
|
||||
if (adapter instanceof AutoCloseable autoCloseable) {
|
||||
@ -276,6 +293,7 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
|
||||
return super.getLabels();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onEvent(NBEvent event) {
|
||||
switch (event) {
|
||||
@ -291,4 +309,5 @@ public class StandardActivity<R extends java.util.function.LongFunction, S> exte
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.engine.api.templating;
|
||||
|
||||
import com.fasterxml.jackson.databind.introspect.POJOPropertyBuilder;
|
||||
import io.nosqlbench.engine.api.templating.binders.ArrayBinder;
|
||||
import io.nosqlbench.engine.api.templating.binders.ListBinder;
|
||||
import io.nosqlbench.engine.api.templating.binders.OrderedMapBinder;
|
||||
@ -76,6 +77,7 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
|
||||
representation of a result. If the values are defined, then each one represents the name
|
||||
that the found value should be saved as instead of the original name.
|
||||
*/
|
||||
private final List<BindPoint> bindpoints = new ArrayList<>();
|
||||
private final CapturePoints captures = new CapturePoints();
|
||||
private final int mapsize;
|
||||
|
||||
@ -127,6 +129,7 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
|
||||
if (v instanceof CharSequence charvalue) {
|
||||
ParsedTemplateString pt = ParsedTemplateString.of(charvalue.toString(), bindings);
|
||||
this.captures.addAll(pt.getCaptures());
|
||||
this.bindpoints.addAll(pt.getBindPoints());
|
||||
switch (pt.getType()) {
|
||||
case literal:
|
||||
statics.put(k, charvalue.toString());
|
||||
@ -929,6 +932,17 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
|
||||
*/
|
||||
public LongFunction<?> getMapper(String field) {
|
||||
LongFunction<?> mapper = dynamics.get(field);
|
||||
|
||||
if (mapper == null) {
|
||||
if (bindings.containsKey(field)) {
|
||||
Optional<DataMapper<Object>> globalFunction =
|
||||
VirtData.getOptionalMapper(bindings.get(field));
|
||||
return globalFunction.orElseThrow(() ->
|
||||
new OpConfigError("mapper requsted for field '" + field + "' which is not " +
|
||||
"a defined op field nor a binding in the op template " +
|
||||
"scope."));
|
||||
}
|
||||
}
|
||||
return mapper;
|
||||
}
|
||||
|
||||
@ -1179,4 +1193,11 @@ public class ParsedTemplateMap implements LongFunction<Map<String, ?>>, StaticFi
|
||||
return prototype;
|
||||
}
|
||||
|
||||
public List<BindPoint> getBindPoints() {
|
||||
return this.bindpoints;
|
||||
}
|
||||
|
||||
public Map<String,String> getBindings() {
|
||||
return this.bindings;
|
||||
}
|
||||
}
|
||||
|
@ -36,8 +36,11 @@ public class OrderedMapBinder implements LongFunction<Map<String, Object>> {
|
||||
} else if (cmd.isDynamic(field)) {
|
||||
bindermap.put(field,cmd.getMapper(field));
|
||||
protomap.put(field,null);
|
||||
} else if (cmd.getBindings().containsKey(field)){
|
||||
bindermap.put(field,cmd.getMapper(field));
|
||||
protomap.put(field,null);
|
||||
} else {
|
||||
throw new OpConfigError("There was no field named " + field + " while building a MapBinder");
|
||||
throw new OpConfigError("There was no field named '" + field + "' while building a MapBinder");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,13 +2,13 @@ package io.nosqlbench.virtdata.core.templates;
|
||||
|
||||
/*
|
||||
* Copyright (c) nosqlbench
|
||||
*
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
@ -22,6 +22,8 @@ import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class CapturePoints<TYPE> extends ArrayList<CapturePoint<TYPE>> {
|
||||
|
||||
@ -34,4 +36,11 @@ public class CapturePoints<TYPE> extends ArrayList<CapturePoint<TYPE>> {
|
||||
public boolean isGlob() {
|
||||
return this.size()==1 && this.get(0).getSourceName().equals("*");
|
||||
}
|
||||
|
||||
public List<String> getSourceNames() {
|
||||
return this.stream().map(cp -> cp.getAsName()).toList();
|
||||
}
|
||||
public List<String> getAsNames() {
|
||||
return this.stream().map(cp -> cp.getAsName()).toList();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user