allow single ops to be specified on CLI

This commit is contained in:
Jonathan Shook 2020-07-15 11:44:22 -05:00
parent 8e8591ea02
commit 74ea7ee14b
4 changed files with 55 additions and 20 deletions

View File

@ -17,6 +17,8 @@
package io.nosqlbench.engine.api.activityconfig;
import io.nosqlbench.engine.api.activityconfig.rawyaml.RawStmtDef;
import io.nosqlbench.engine.api.activityconfig.rawyaml.RawStmtsDoc;
import io.nosqlbench.engine.api.activityconfig.rawyaml.RawStmtsDocList;
import io.nosqlbench.engine.api.activityconfig.rawyaml.RawStmtsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
@ -25,6 +27,7 @@ import io.nosqlbench.nb.api.content.Content;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.function.Function;
public class StatementsLoader {
@ -67,6 +70,15 @@ public class StatementsLoader {
return new StmtsDocList(list);
}
public static StmtsDocList loadStmt(
Logger logger,
String statement, Function<String,String> transformer) {
String transformed = transformer.apply(statement);
RawStmtsDocList rawStmtsDocList = RawStmtsDocList.forSingleStatement(transformed);
return new StmtsDocList(rawStmtsDocList);
}
public static StmtsDocList loadPath(
Logger logger,
String path,

View File

@ -39,6 +39,12 @@ public class RawStmtsDoc extends StatementsOwner {
public RawStmtsDoc() {
}
public static RawStmtsDoc forSingleStatement(String statement) {
RawStmtsDoc rawStmtsDoc = new RawStmtsDoc();
rawStmtsDoc.setStatementsFieldByObjectType(statement);
return rawStmtsDoc;
}
public void setFieldsByReflection(Map<String, Object> properties) {
Object blocksObjects = properties.remove("blocks");
if (blocksObjects instanceof List) {

View File

@ -27,6 +27,11 @@ public class RawStmtsDocList {
this.rawStmtsDocList = rawStmtsDocList;
}
public static RawStmtsDocList forSingleStatement(String statement) {
RawStmtsDoc rawStmtsDoc = RawStmtsDoc.forSingleStatement(statement);
return new RawStmtsDocList(List.of(rawStmtsDoc));
}
public List<RawStmtsDoc> getStmtsDocs() {
return rawStmtsDocList;
}

View File

@ -14,6 +14,7 @@ import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDoc;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.templating.CommandTemplate;
@ -207,7 +208,7 @@ public class SimpleActivity implements Activity {
@Override
public Timer getResultTimer() {
return ActivityMetrics.timer(getActivityDef(),"result");
return ActivityMetrics.timer(getActivityDef(), "result");
}
@Override
@ -238,7 +239,7 @@ public class SimpleActivity implements Activity {
.map(RateSpec::new)
.ifPresent(spec -> strideLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "strides", strideLimiter, spec));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate","rate")
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(RateSpec::new).ifPresent(
spec -> cycleLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "cycles", cycleLimiter, spec));
@ -249,9 +250,10 @@ public class SimpleActivity implements Activity {
}
/**
* Modify the provided ActivityDef with defaults for stride and cycles, if
* they haven't been provided, based on the length of the sequence as determined
* by the provided ratios. Also, modify the ActivityDef with reasonable defaults when requested.
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
* defaults when requested.
*
* @param seq - The {@link OpSequence} to derive the defaults from
*/
public void setDefaultsFromOpSequence(OpSequence<?> seq) {
@ -295,9 +297,9 @@ public class SimpleActivity implements Activity {
String spec = threadSpec.get();
int processors = Runtime.getRuntime().availableProcessors();
if (spec.toLowerCase().equals("auto")) {
int threads = processors*10;
if (threads>activityDef.getCycleCount()) {
threads=(int) activityDef.getCycleCount();
int threads = processors * 10;
if (threads > activityDef.getCycleCount()) {
threads = (int) activityDef.getCycleCount();
logger.info("setting threads to " + threads + " (auto) [10xCORES, cycle count limited]");
} else {
logger.info("setting threads to " + threads + " (auto) [10xCORES]");
@ -306,20 +308,20 @@ public class SimpleActivity implements Activity {
} else if (spec.toLowerCase().matches("\\d+x")) {
String multiplier = spec.substring(0, spec.length() - 1);
int threads = processors * Integer.parseInt(multiplier);
logger.info("setting threads to " + threads + " (" + multiplier +"x)");
logger.info("setting threads to " + threads + " (" + multiplier + "x)");
activityDef.setThreads(threads);
} else if (spec.toLowerCase().matches("\\d+")) {
logger.info("setting threads to " + spec + "(direct)");
activityDef.setThreads(Integer.parseInt(spec));
}
if (activityDef.getThreads()>activityDef.getCycleCount()) {
logger.warn("threads="+activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
if (activityDef.getThreads() > activityDef.getCycleCount()) {
logger.warn("threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary()
+ ", you should have more cycles than threads.");
}
} else {
if (cycleCount>1000) {
if (cycleCount > 1000) {
logger.warn("For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
@ -330,24 +332,33 @@ public class SimpleActivity implements Activity {
}
}
protected <O> OpSequence<O> createOpSequenceFromCommands(Function<CommandTemplate,O> opinit) {
Function<OpTemplate,CommandTemplate> f = CommandTemplate::new;
protected <O> OpSequence<O> createOpSequenceFromCommands(Function<CommandTemplate, O> opinit) {
Function<OpTemplate, CommandTemplate> f = CommandTemplate::new;
Function<OpTemplate, O> opTemplateOFunction = f.andThen(opinit);
return createOpSequence(opTemplateOFunction);
}
protected <O> OpSequence<O> createOpSequence(Function<OpTemplate,O> opinit) {
protected <O> OpSequence<O> createOpSequence(Function<OpTemplate, O> opinit) {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
StrInterpolator interp = new StrInterpolator(activityDef);
String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default");
StmtsDocList stmtsDocList = StatementsLoader.loadPath(logger, yaml_loc, interp, "activities");
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<O> planner = new SequencePlanner<>(sequencerType);
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
StmtsDocList stmtsDocList = null;
Optional<String> stmt = activityDef.getParams().getOptionalString("op", "stmt", "statement");
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (stmt.isPresent()) {
stmtsDocList = StatementsLoader.loadStmt(logger, stmt.get(), interp);
} else if (op_yaml_loc.isPresent()) {
stmtsDocList = StatementsLoader.loadPath(logger, op_yaml_loc.get(), interp, "activities");
}
List<OpTemplate> stmts = stmtsDocList.getStmts(tagfilter);
if (stmts.size() == 0) {
@ -361,6 +372,7 @@ public class SimpleActivity implements Activity {
O driverSpecificOp = opinit.apply(optemplate);
planner.addOp(driverSpecificOp, ratio);
}
return planner.resolve();
}