add dryrun=emit support

This commit is contained in:
Jonathan Shook 2023-12-09 23:29:27 -06:00
parent 3547c04c41
commit 5ab00a2c11
5 changed files with 197 additions and 98 deletions

View File

@ -99,7 +99,7 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
this.metrics = metrics;
}
public final List<Row> apply(long cycle) {
public final ArrayList<Row> apply(long cycle) {
Statement<?> statement = getStmt();
logger.trace(() -> "apply() invoked, statement obtained, executing async with page size: " + statement.getPageSize() + " thread local rows: ");
@ -118,7 +118,7 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
});
try {
return rowsStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return new PrintableRowList(rowsStage.toCompletableFuture().get(300, TimeUnit.SECONDS));
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
@ -142,6 +142,21 @@ public abstract class Cqld4CqlOp implements CycleOp<List<Row>>, VariableCapture,
// processors.flush();
}
private static class PrintableRowList extends ArrayList<Row> {
public PrintableRowList(List<Row> values) {
super(values);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Row row : this) {
sb.append(row.getFormattedContents());
}
return sb.toString();
}
}
// private BiFunction<AsyncResultSet,Throwable> handler
@Override
public Op getNextOp() {

View File

@ -176,7 +176,7 @@ public abstract class BaseDriverAdapter<R extends Op, S> extends NBBaseComponent
.add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))
.add(Param.optional("driver", String.class))
.add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|none)"))
.add(Param.defaultTo("dryrun", "none").setRegex("(op|jsonnet|emit|none)"))
.add(Param.optional("maxtries", Integer.class))
.asReadOnly();
}

View File

@ -0,0 +1,34 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
public class EmitterOp implements CycleOp<Object> {
private final CycleOp<?> cycleOp;
public EmitterOp(CycleOp<?> cycleOp) {
this.cycleOp = cycleOp;
}
@Override
public Object apply(long value) {
Object result = cycleOp.apply(value);
System.out.println("result from cycle " + value + ":\n"+result);
return result;
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapters.api.activityimpl.uniform;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.adapters.api.templating.ParsedOp;
public class EmitterOpDispenserWrapper extends BaseOpDispenser<Op, Object> {
private final OpDispenser<? extends CycleOp<?>> realDispenser;
public EmitterOpDispenserWrapper(DriverAdapter<Op,Object> adapter, ParsedOp pop, OpDispenser<? extends CycleOp<?>> realDispenser) {
super(adapter, pop);
this.realDispenser = realDispenser;
}
@Override
public EmitterOp apply(long cycle) {
CycleOp<?> cycleOp = realDispenser.apply(cycle);
return new EmitterOp(cycleOp);
}
}

View File

@ -16,6 +16,8 @@
package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapters.api.activityimpl.uniform.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -56,12 +58,10 @@ import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.io.PrintWriter;
import java.lang.reflect.AnnotatedType;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A default implementation of an Activity, suitable for building upon.
@ -71,7 +71,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
protected ActivityDef activityDef;
private final List<AutoCloseable> closeables = new ArrayList<>();
private MotorDispenser motorDispenser;
private MotorDispenser<?> motorDispenser;
private InputDispenser inputDispenser;
private ActionDispenser actionDispenser;
private OutputDispenser markerDispenser;
@ -90,26 +90,26 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
private final RunStateTally tally = new RunStateTally();
public SimpleActivity(NBComponent parent, ActivityDef activityDef) {
super(parent,NBLabels.forKV("activity",activityDef.getAlias()).and(activityDef.auxLabels()));
super(parent, NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels()));
this.activityDef = activityDef;
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
"yaml"
"workload",
"yaml"
);
if (workloadOpt.isPresent()) {
activityDef.getParams().set("alias", workloadOpt.get());
} else {
activityDef.getParams().set("alias",
activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ nameEnumerator);
activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ nameEnumerator);
nameEnumerator++;
}
}
}
public SimpleActivity(NBComponent parent, String activityDefString) {
this(parent,ActivityDef.parseActivityDef(activityDefString));
this(parent, ActivityDef.parseActivityDef(activityDefString));
}
@Override
@ -120,8 +120,8 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
public synchronized NBErrorHandler getErrorHandler() {
if (null == this.errorHandler) {
errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
() -> getExceptionMetrics());
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics);
}
return errorHandler;
}
@ -200,7 +200,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
}
public String toString() {
return (activityDef!=null ? activityDef.getAlias() : "unset_alias" ) + ':' + this.runState + ':' + this.tally ;
return (activityDef != null ? activityDef.getAlias() : "unset_alias") + ':' + this.runState + ':' + this.tally;
}
@Override
@ -306,16 +306,17 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
.map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec);
}
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec);
}
@ -326,7 +327,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
* defaults when requested.
*
* @param seq
* - The {@link OpSequence} to derive the defaults from
* - The {@link OpSequence} to derive the defaults from
*/
public synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
Optional<String> strideOpt = getParams().getOptionalString("stride");
@ -349,15 +350,15 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
);
}
long stride = getParams().getOptionalLong("stride").orElseThrow();
long cycles = this.activityDef.getCycleCount();
if (cycles < stride) {
throw new RuntimeException(
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
);
}
}
@ -367,7 +368,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
if (0 < stride && 0 != cycleCount % stride) {
logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," +
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
}
Optional<String> threadSpec = activityDef.getParams().getOptionalString("threads");
@ -398,43 +399,42 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
if (activityDef.getThreads() > activityDef.getCycleCount()) {
logger.warn(() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
+ ", you should have more cycles than threads.");
}
} else if (1000 < cycleCount) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
}
if (0 < this.activityDef.getCycleCount() && 0 == seq.getOps().size()) {
if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) {
throw new BasicError("You have configured a zero-length sequence and non-zero cycles. Tt is not possible to continue with this activity.");
}
}
protected <O extends Op> OpSequence<OpDispenser<? extends O>> createOpSourceFromParsedOps(
Map<String, DriverAdapter> adapterCache,
Map<String, OpMapper<Op>> mapperCache,
List<DriverAdapter> adapters,
List<ParsedOp> pops
// Map<String, DriverAdapter<?,?>> adapterCache,
// Map<String, OpMapper<? extends Op>> mapperCache,
List<DriverAdapter<?,?>> adapters,
List<ParsedOp> pops
) {
try {
List<Long> ratios = new ArrayList<>(pops.size());
for (int i = 0; i < pops.size(); i++) {
ParsedOp pop = pops.get(i);
for (ParsedOp pop : pops) {
long ratio = pop.takeStaticConfigOr("ratio", 1);
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
int dryrunCount = 0;
@ -445,16 +445,21 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
logger.info(() -> "skipped mapping op '" + pop.getName() + '\'');
continue;
}
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
boolean dryrun = "op".equalsIgnoreCase(dryrunSpec);
DriverAdapter adapter = adapters.get(i);
OpMapper opMapper = adapter.getOpMapper();
DriverAdapter<?,?> adapter = adapters.get(i);
OpMapper<? extends Op> opMapper = adapter.getOpMapper();
OpDispenser<? extends Op> dispenser = opMapper.apply(pop);
if (dryrun) {
dispenser = new DryRunOpDispenserWrapper(adapter, pop, dispenser);
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
if ("op".equalsIgnoreCase(dryrunSpec)) {
dispenser = new DryRunOpDispenserWrapper((DriverAdapter<Op,Object>)adapter, pop, dispenser);
dryrunCount++;
} else if ("emit".equalsIgnoreCase(dryrunSpec)) {
dispenser = new EmitterOpDispenserWrapper(
(DriverAdapter<Op,Object>)adapter,
pop,
(OpDispenser<? extends CycleOp<?>>) dispenser
);
}
// if (strict) {
@ -475,7 +480,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
}
protected List<OpTemplate> loadOpTemplates(Optional<DriverAdapter<?,?>> defaultDriverAdapter) {
protected List<OpTemplate> loadOpTemplates(DriverAdapter<?, ?> defaultDriverAdapter) {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
@ -491,44 +496,44 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
// There were no ops, and it was because they were all filtered out
if (!unfilteredOps.isEmpty()) {
throw new BasicError("There were no active op templates with tag filter '"
+ tagfilter + "', since all " + unfilteredOps.size() + " were filtered out.");
+ tagfilter + "', since all " + unfilteredOps.size() + " were filtered out.");
}
if (defaultDriverAdapter.isPresent() && defaultDriverAdapter.get() instanceof SyntheticOpTemplateProvider sotp) {
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams());
Objects.requireNonNull(filteredOps);
if (filteredOps.isEmpty()) {
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.get().getAdapterName() + '\'' +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
}
} else {
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
if (filteredOps.isEmpty()) {
throw new BasicError("There were no active op templates with tag filter '" + tagfilter + '\'');
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
}
// if (filteredOps.isEmpty()) {
// throw new BasicError("There were no active op templates with tag filter '" + tagfilter + '\'');
// }
if (filteredOps.isEmpty()) {
throw new OpConfigError("No op templates found. You must provide either workload=... or op=..., or use " +
"a default driver (driver=___). This includes " +
ServiceLoader.load(DriverAdapter.class).stream()
.filter(p -> {
AnnotatedType[] annotatedInterfaces = p.type().getAnnotatedInterfaces();
for (AnnotatedType ai : annotatedInterfaces) {
if (ai.getType().equals(SyntheticOpTemplateProvider.class)) {
return true;
}
}
return false;
})
.map(d -> d.get().getAdapterName())
.collect(Collectors.joining(",")));
}
// if (filteredOps.isEmpty()) {
// throw new OpConfigError("No op templates found. You must provide either workload=... or op=..., or use " +
// "a default driver (driver=___). This includes " +
// ServiceLoader.load(DriverAdapter.class).stream()
// .filter(p -> {
// AnnotatedType[] annotatedInterfaces = p.type().getAnnotatedInterfaces();
// for (AnnotatedType ai : annotatedInterfaces) {
// if (ai.getType().equals(SyntheticOpTemplateProvider.class)) {
// return true;
// }
// }
// return false;
// })
// .map(d -> d.get().getAdapterName())
// .collect(Collectors.joining(",")));
// }
//
return filteredOps;
}
@ -550,30 +555,29 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
* </OL>
*
* @param <O>
* A holder for an executable operation for the native driver used by this activity.
* A holder for an executable operation for the native driver used by this activity.
* @param opinit
* A function to map an OpTemplate to the executable operation form required by
* the native driver for this activity.
* @param defaultAdapter
* A function to map an OpTemplate to the executable operation form required by
* the native driver for this activity.
* @param defaultAdapter The adapter which will be used for any op templates with no explicit adapter
* @return The sequence of operations as determined by filtering and ratios
*/
@Deprecated(forRemoval = true)
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(Function<OpTemplate, OpDispenser<? extends O>> opinit, boolean strict, Optional<DriverAdapter<?,?>> defaultAdapter) {
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(Function<OpTemplate, OpDispenser<? extends O>> opinit, boolean strict, DriverAdapter<?, ?> defaultAdapter) {
var stmts = loadOpTemplates(defaultAdapter);
List<Long> ratios = new ArrayList<>(stmts.size());
for (int i = 0; i < stmts.size(); i++) {
OpTemplate opTemplate = stmts.get(i);
for (OpTemplate opTemplate : stmts) {
long ratio = opTemplate.removeParamOrDefault("ratio", 1);
ratios.add(ratio);
}
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
try {
@ -599,29 +603,37 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
String op = activityDef.getParams().getOptionalString("op").orElse(null);
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(null);
String workload = activityDef.getParams().getOptionalString("workload").orElse(null);
if ((op!=null ? 1 : 0) + (stmt!=null ? 1 : 0) + (workload!=null ? 1 : 0) > 1) {
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
throw new OpConfigError("Only op, statement, or workload may be provided, not more than one.");
}
if (op!=null && op.matches("^\\{[^}]+:[^}]+}$(?s)(?m)")) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (op!=null && op.matches("^\\[[^]]+]$")) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (op!=null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null);
} else if (stmt!=null) {
if (workload != null && OpsLoader.isJson(workload)) {
workloadSource = "commandline: (workload/json):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.json, activityDef.getParams(), null);
} else if (workload != null && OpsLoader.isYaml(workload)) {
workloadSource = "commandline: (workload/yaml):" + workload;
return OpsLoader.loadString(workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
} else if (workload != null) {
return OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
}
if (stmt != null) {
workloadSource = "commandline: (stmt/inline): '" + stmt + "'";
return OpsLoader.loadString(stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
} else if (workload!=null) {
workloadSource = "yaml:" + workload;
return OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
} else {
return OpsDocList.none();
}
if (op != null && OpsLoader.isJson(op)) {
workloadSource = "commandline: (op/json): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.json, activityDef.getParams(), null);
}
else if (op != null) {
workloadSource = "commandline: (op/inline): '" + op + "'";
return OpsLoader.loadString(op, OpTemplateFormat.inline, activityDef.getParams(), null);
}
return OpsDocList.none();
} catch (Exception e) {
throw new OpConfigError("Error loading op templates: " + e, workloadSource, e);
}