OpSource API improvements

This commit is contained in:
Jonathan Shook 2021-07-06 11:12:23 -05:00
parent 78a270bd2e
commit 45b3a123ea
2 changed files with 52 additions and 29 deletions

View File

@ -1,5 +1,7 @@
package io.nosqlbench.engine.api.activityapi.planning;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import java.util.function.LongFunction;
/**
@ -10,6 +12,10 @@ import java.util.function.LongFunction;
*/
public interface OpSource<T> extends LongFunction<T> {
static <O extends Runnable> OpSource<O> of(OpSequence<OpDispenser<O>> seq) {
return (long l) -> seq.apply(l).apply(l);
}
/**
* Get the next operation for the given long value. This is simply
* the offset indicated by the offset sequence array at a modulo

View File

@ -20,17 +20,16 @@ import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import io.nosqlbench.engine.api.templating.ParsedCommand;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.nb.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
@ -63,15 +62,15 @@ public class SimpleActivity implements Activity, ProgressCapable {
this.activityDef = activityDef;
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
"yaml"
"workload",
"yaml"
);
if (workloadOpt.isPresent()) {
activityDef.getParams().set("alias", workloadOpt.get());
} else {
activityDef.getParams().set("alias",
activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ String.valueOf(nameEnumerator++));
activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ String.valueOf(nameEnumerator++));
}
}
}
@ -86,8 +85,8 @@ public class SimpleActivity implements Activity, ProgressCapable {
}
public synchronized NBErrorHandler getErrorHandler() {
if (errorHandler==null) {
errorHandler=new NBErrorHandler(
if (errorHandler == null) {
errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
() -> getExceptionMetrics(),
getErrorNameMapper());
@ -246,7 +245,6 @@ public class SimpleActivity implements Activity, ProgressCapable {
}
@Override
public Timer getResultTimer() {
return ActivityMetrics.timer(getActivityDef(), "result");
@ -275,7 +273,7 @@ public class SimpleActivity implements Activity, ProgressCapable {
@Override
public synchronized PrintWriter getConsoleOut() {
if (this.console==null) {
if (this.console == null) {
this.console = new PrintWriter(System.out);
}
return this.console;
@ -308,11 +306,11 @@ public class SimpleActivity implements Activity, ProgressCapable {
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(RateSpec::new).ifPresent(
spec -> cycleLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "cycles", cycleLimiter, spec));
spec -> cycleLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "cycles", cycleLimiter, spec));
activityDef.getParams().getOptionalNamedParameter("phaserate")
.map(RateSpec::new)
.ifPresent(spec -> phaseLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "phases", phaseLimiter, spec));
.map(RateSpec::new)
.ifPresent(spec -> phaseLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "phases", phaseLimiter, spec));
}
@ -339,15 +337,15 @@ public class SimpleActivity implements Activity, ProgressCapable {
} else {
if (getActivityDef().getCycleCount() == 0) {
throw new RuntimeException(
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
);
}
long stride = getParams().getOptionalLong("stride").orElseThrow();
long cycles = getActivityDef().getCycleCount();
if (cycles < stride) {
throw new RuntimeException(
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
" If this was intended, then set stride low enough to allow it."
);
}
}
@ -355,7 +353,7 @@ public class SimpleActivity implements Activity, ProgressCapable {
long cycleCount = getActivityDef().getCycleCount();
long stride = getActivityDef().getParams().getOptionalLong("stride").orElseThrow();
if (stride>0 && (cycleCount % stride) != 0) {
if (stride > 0 && (cycleCount % stride) != 0) {
logger.warn("The stride does not evenly divide cycles. Only full strides will be executed," +
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ")");
}
@ -385,15 +383,15 @@ public class SimpleActivity implements Activity, ProgressCapable {
if (activityDef.getThreads() > activityDef.getCycleCount()) {
logger.warn("threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
+ ", you should have more cycles than threads.");
}
} else {
if (cycleCount > 1000) {
logger.warn("For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
}
@ -426,6 +424,15 @@ public class SimpleActivity implements Activity, ProgressCapable {
return createOpSequence(opTemplateOFunction);
}
protected <O> OpSequence<OpDispenser<O>> createOpSourceFromCommands(
Function<ParsedCommand, OpDispenser<O>> opinit,
List<Function<Map<String, Object>, Map<String, Object>>> parsers
) {
Function<OpTemplate, ParsedCommand> f = t -> new ParsedCommand(t, parsers);
Function<OpTemplate, OpDispenser<O>> opTemplateOFunction = f.andThen(opinit);
return createOpSequence(opTemplateOFunction);
}
/**
* Given a function that can create an op of type <O> from an OpTemplate, generate
* an indexed sequence of ready to call operations.
@ -460,16 +467,21 @@ public class SimpleActivity implements Activity, ProgressCapable {
StmtsDocList stmtsDocList = null;
String workloadSource = "unspecified";
Optional<String> stmt = activityDef.getParams().getOptionalString("op", "stmt", "statement");
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (stmt.isPresent()) {
stmtsDocList = StatementsLoader.loadStmt(logger, stmt.get(), interp);
workloadSource = "commandline:" + stmt.get();
} else if (op_yaml_loc.isPresent()) {
stmtsDocList = StatementsLoader.loadPath(logger, op_yaml_loc.get(), interp, "activities");
workloadSource = "yaml:" + op_yaml_loc.get();
}
List<OpTemplate> stmts = stmtsDocList.getStmts(tagfilter);
List<Long> ratios = new ArrayList<>(stmts.size());
for (int i = 0; i < stmts.size(); i++) {
OpTemplate opTemplate = stmts.get(i);
long ratio = opTemplate.removeParamOrDefault("ratio", 1);
@ -480,11 +492,16 @@ public class SimpleActivity implements Activity, ProgressCapable {
throw new BasicError("There were no active statements with tag filter '" + tagfilter + "'");
}
for (int i = 0; i < stmts.size(); i++) {
long ratio = ratios.get(i);
OpTemplate optemplate = stmts.get(i);
OpDispenser<O> driverSpecificReadyOp = opinit.apply(optemplate);
planner.addOp(driverSpecificReadyOp, ratio);
try {
for (int i = 0; i < stmts.size(); i++) {
long ratio = ratios.get(i);
OpTemplate optemplate = stmts.get(i);
OpDispenser<O> driverSpecificReadyOp = opinit.apply(optemplate);
planner.addOp(driverSpecificReadyOp, ratio);
}
} catch (Exception e) {
throw new OpConfigError("error while configuring op",workloadSource,e);
}
return planner.resolve();
@ -505,6 +522,7 @@ public class SimpleActivity implements Activity, ProgressCapable {
* Activities with retryable operations (when specified with the retry error handler for some
* types of error), should allow the user to specify how many retries are allowed before
* giving up on the operation.
*
* @return The number of allowable retries
*/
@Override
@ -513,5 +531,4 @@ public class SimpleActivity implements Activity, ProgressCapable {
}
}