mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
specialize op template collection into a more useful type
This commit is contained in:
parent
638095a82a
commit
a8bbd9f923
@ -20,6 +20,7 @@ package io.nosqlbench.adapter.diag;
|
||||
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
|
||||
@ -107,9 +108,12 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> params) {
|
||||
return OpsLoader.loadString("noop: noop", OpTemplateFormat.inline, params,null).getOps(true);
|
||||
// return OpsLoader.loadString("log:level=INFO", OpTemplateFormat.inline, params,null).getOps();
|
||||
public OpTemplates getSyntheticOpTemplates(
|
||||
OpTemplates opTemplates,
|
||||
Map<String, Object> params) {
|
||||
OpTemplates matching = OpsLoader.loadString(
|
||||
"noop: noop", OpTemplateFormat.inline, params, null).getOps().matching("", true);
|
||||
return matching;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ public class HttpOpMapperTest {
|
||||
|
||||
private static ParsedOp parsedOpFor(final String yaml) {
|
||||
final OpsDocList docs = OpsLoader.loadString(yaml, OpTemplateFormat.yaml, Map.of(), null);
|
||||
final OpTemplate opTemplate = docs.getOps(true).get(0);
|
||||
final OpTemplate opTemplate = docs.getOps().get(0);
|
||||
final ParsedOp parsedOp = new ParsedOp(opTemplate, HttpOpMapperTest.cfg, List.of(HttpOpMapperTest.adapter.getPreprocessor()), new TestComponent("parent","parent"));
|
||||
return parsedOp;
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package io.nosqlbench.adapter.stdout;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpData;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
|
||||
@ -48,7 +49,7 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpMapper<StdoutOp,StdoutSpace> getOpMapper() {
|
||||
public OpMapper<StdoutOp, StdoutSpace> getOpMapper() {
|
||||
return new StdoutOpMapper(this);
|
||||
}
|
||||
|
||||
@ -59,57 +60,57 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return ConfigModel.of(this.getClass())
|
||||
.add(super.getConfigModel())
|
||||
.add(StdoutSpace.getConfigModel());
|
||||
return ConfigModel.of(this.getClass()).add(super.getConfigModel()).add(
|
||||
StdoutSpace.getConfigModel());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
|
||||
Set<String> activeBindingNames = new LinkedHashSet<>(opsDocList.getDocBindings().keySet());
|
||||
public OpTemplates getSyntheticOpTemplates(OpTemplates opTempl, Map<String, Object> cfg) {
|
||||
Set<String> activeBindingNames = new LinkedHashSet<>(opTempl.getDocBindings().keySet());
|
||||
|
||||
if (activeBindingNames.isEmpty()) {
|
||||
logger.warn("Unable to synthesize op for driver=" + this.getAdapterName() + " with zero bindings.");
|
||||
return List.of();
|
||||
logger.warn(
|
||||
"Unable to synthesize op for driver=" + this.getAdapterName() + " with zero bindings.");
|
||||
return new OpTemplates(List.of(),OpsDocList.none());
|
||||
}
|
||||
|
||||
String bindings = Optional.ofNullable(cfg.get("bindings")).map(Object::toString).orElse("doc");
|
||||
Pattern bindingsFilter = Pattern.compile(bindings.equalsIgnoreCase("doc") ? ".*" : bindings);
|
||||
String bindings = Optional.ofNullable(cfg.get("bindings")).map(Object::toString).orElse(
|
||||
"doc");
|
||||
Pattern bindingsFilter = Pattern.compile(
|
||||
bindings.equalsIgnoreCase("doc") ? ".*" : bindings);
|
||||
|
||||
Set<String> filteredBindingNames = activeBindingNames
|
||||
.stream()
|
||||
.filter(n -> {
|
||||
if (bindingsFilter.matcher(n).matches()) {
|
||||
logger.trace(() -> "bindings filter kept binding '" + n + "'");
|
||||
return true;
|
||||
} else {
|
||||
logger.trace(() -> "bindings filter removed binding '" + n + "'");
|
||||
return false;
|
||||
}
|
||||
})
|
||||
.collect(Collectors.toSet());
|
||||
Set<String> filteredBindingNames = activeBindingNames.stream().filter(n -> {
|
||||
if (bindingsFilter.matcher(n).matches()) {
|
||||
logger.trace(() -> "bindings filter kept binding '" + n + "'");
|
||||
return true;
|
||||
} else {
|
||||
logger.trace(() -> "bindings filter removed binding '" + n + "'");
|
||||
return false;
|
||||
}
|
||||
}).collect(Collectors.toSet());
|
||||
|
||||
if (filteredBindingNames.isEmpty()) {
|
||||
logger.warn("Unable to synthesize op for driver="+getAdapterName()+" when " + activeBindingNames.size()+"/"+activeBindingNames.size() + " bindings were filtered out with bindings=" + bindings);
|
||||
return List.of();
|
||||
logger.warn(
|
||||
"Unable to synthesize op for driver=" + getAdapterName() + " when " + activeBindingNames.size() + "/" + activeBindingNames.size() + " bindings were filtered out with bindings=" + bindings);
|
||||
return new OpTemplates(List.of(),OpsDocList.none());
|
||||
|
||||
}
|
||||
|
||||
OpData op = new OpData("synthetic", "synthetic", Map.of(), opsDocList.getDocBindings(), cfg,
|
||||
Map.of("stmt", genStatementTemplate(filteredBindingNames, cfg)),200);
|
||||
OpData op = new OpData(
|
||||
"synthetic", "synthetic", Map.of(), opTempl.getDocBindings(), cfg,
|
||||
Map.of("stmt", genStatementTemplate(filteredBindingNames, cfg)), 200
|
||||
);
|
||||
|
||||
return List.of(op);
|
||||
return new OpTemplates(List.of(op),OpsDocList.none());
|
||||
}
|
||||
|
||||
private String genStatementTemplate(Set<String> keySet, Map<String, Object> cfg) {
|
||||
TemplateFormat format = Optional.ofNullable(cfg.get("format"))
|
||||
.map(Object::toString)
|
||||
.map(TemplateFormat::valueOf)
|
||||
.orElse(TemplateFormat.assignments);
|
||||
TemplateFormat format = Optional.ofNullable(cfg.get("format")).map(Object::toString).map(
|
||||
TemplateFormat::valueOf).orElse(TemplateFormat.assignments);
|
||||
|
||||
boolean ensureNewline = Optional.ofNullable(cfg.get("newline"))
|
||||
.map(Object::toString)
|
||||
.map(Boolean::valueOf)
|
||||
.orElse(true);
|
||||
boolean ensureNewline = Optional.ofNullable(cfg.get("newline")).map(Object::toString).map(
|
||||
Boolean::valueOf).orElse(true);
|
||||
|
||||
String stmtTemplate = format.format(ensureNewline, new ArrayList<>(keySet));
|
||||
return stmtTemplate;
|
||||
|
@ -17,6 +17,7 @@
|
||||
package io.nosqlbench.adapter.tcpclient;
|
||||
|
||||
import io.nosqlbench.adapter.stdout.StdoutDriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.nb.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
@ -67,7 +68,9 @@ public class TcpClientDriverAdapter extends BaseDriverAdapter<TcpClientOp, TcpCl
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String,Object> cfg) {
|
||||
public OpTemplates getSyntheticOpTemplates(
|
||||
OpTemplates opsDocList,
|
||||
Map<String,Object> cfg) {
|
||||
return adap.getSyntheticOpTemplates(opsDocList, cfg);
|
||||
}
|
||||
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.tcpserver;
|
||||
import io.nosqlbench.adapter.stdout.StdoutDriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.nb.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
@ -65,7 +66,9 @@ public class TcpServerDriverAdapter extends BaseDriverAdapter<TcpServerOp, TcpSe
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String,Object> cfg) {
|
||||
public OpTemplates getSyntheticOpTemplates(
|
||||
OpTemplates opsDocList,
|
||||
Map<String,Object> cfg) {
|
||||
return adap.getSyntheticOpTemplates(opsDocList, cfg);
|
||||
}
|
||||
|
||||
|
@ -16,8 +16,10 @@
|
||||
|
||||
package io.nosqlbench.adapters.api.activityconfig.rawyaml;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.function.Consumer;
|
||||
@ -39,6 +41,13 @@ public class RawOpsDocList implements Iterable<RawOpsDoc> {
|
||||
return new RawOpsDocList(List.of());
|
||||
}
|
||||
|
||||
public static RawOpsDocList combine(RawOpsDocList l1, RawOpsDocList l2) {
|
||||
List<RawOpsDoc> rawOpsDocs = new ArrayList<>();
|
||||
rawOpsDocs.addAll(l1.getOpsDocs());
|
||||
rawOpsDocs.addAll(l2.getOpsDocs());
|
||||
return new RawOpsDocList(rawOpsDocs);
|
||||
}
|
||||
|
||||
public List<RawOpsDoc> getOpsDocs() {
|
||||
return rawOpsDocList;
|
||||
}
|
||||
|
@ -0,0 +1,92 @@
|
||||
package io.nosqlbench.adapters.api.activityconfig.yaml;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsDocList;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/// [OpTemplates] is a list of selected op templates and their backing data.
|
||||
///
|
||||
/// It is a value type which makes it easy to /// get matching subsets of op templates according to tag filters, to combine them, etc.
|
||||
///
|
||||
/// When a user selects an op template, they are expected to use the [TagFilter] mechanism.
|
||||
/// Any such lookup methods should be implemented on this class.
|
||||
public class OpTemplates implements Iterable<OpTemplate> {
|
||||
private final ArrayList<OpTemplate> templates = new ArrayList<>();
|
||||
private final static Logger logger = LogManager.getLogger(OpTemplates.class);
|
||||
private final OpsDocList opsDocList;
|
||||
|
||||
public OpTemplates(OpsDocList opsDocList) {
|
||||
opsDocList.getStmtDocs().stream().flatMap(d -> d.getOpTemplates().stream()).forEach(templates::add);
|
||||
this.opsDocList = opsDocList;
|
||||
}
|
||||
|
||||
public OpTemplates(List<OpTemplate> matchingOpTemplates, OpsDocList opsDocList) {
|
||||
this.opsDocList = opsDocList;
|
||||
templates.addAll(matchingOpTemplates);
|
||||
}
|
||||
|
||||
public OpTemplates() {
|
||||
this.opsDocList=new OpsDocList(new RawOpsDocList(List.of()));
|
||||
}
|
||||
|
||||
public OpTemplates and(OpTemplates other) {
|
||||
this.opsDocList.and(opsDocList);
|
||||
return new OpTemplates();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tagFilterSpec a comma-separated tag filter spec
|
||||
* @return The list of all included op templates for all included blocks of in this document,
|
||||
* including the inherited and overridden values from this doc and the parent block.
|
||||
*/
|
||||
public OpTemplates matching(String tagFilterSpec, boolean logit) {
|
||||
TagFilter ts = new TagFilter(tagFilterSpec);
|
||||
List<OpTemplate> matchingOpTemplates = new ArrayList<>();
|
||||
|
||||
List<String> matchlog = new ArrayList<>();
|
||||
templates.stream()
|
||||
.map(ts::matchesTaggedResult)
|
||||
.peek(r -> matchlog.add(r.getLog()))
|
||||
.filter(TagFilter.Result::matched)
|
||||
.map(TagFilter.Result::getElement)
|
||||
.forEach(matchingOpTemplates::add);
|
||||
|
||||
if (logit) {
|
||||
for (String s : matchlog) {
|
||||
logger.info(s);
|
||||
}
|
||||
}
|
||||
|
||||
return new OpTemplates(matchingOpTemplates,opsDocList);
|
||||
}
|
||||
|
||||
public Map<String,String> getDocBindings() {
|
||||
return opsDocList.getDocBindings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public @NotNull Iterator<OpTemplate> iterator() {
|
||||
return templates.iterator();
|
||||
}
|
||||
|
||||
public Stream<OpTemplate> stream() {
|
||||
return templates.stream();
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return templates.size();
|
||||
}
|
||||
|
||||
public OpTemplate get(int idx) {
|
||||
return templates.get(idx);
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return this.templates.isEmpty();
|
||||
}
|
||||
}
|
@ -43,6 +43,11 @@ public class OpsDocList implements Iterable<OpsDoc> {
|
||||
// this.applyModifier(new enumerator());
|
||||
}
|
||||
|
||||
private OpsDocList(RawOpsDocList rawOpsDocList, Map<String, String> templateVariables) {
|
||||
this.rawOpsDocList = rawOpsDocList;
|
||||
this.templateVariables.putAll(templateVariables);
|
||||
}
|
||||
|
||||
public static OpsDocList none() {
|
||||
return new OpsDocList(RawOpsDocList.none());
|
||||
}
|
||||
@ -60,41 +65,10 @@ public class OpsDocList implements Iterable<OpsDoc> {
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public List<OpTemplate> getOps(boolean logit) {
|
||||
return getOps("", logit);
|
||||
public OpTemplates getOps() {
|
||||
return new OpTemplates(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param tagFilterSpec a comma-separated tag filter spec
|
||||
* @return The list of all included op templates for all included blocks of in this document,
|
||||
* including the inherited and overridden values from this doc and the parent block.
|
||||
*/
|
||||
public List<OpTemplate> getOps(String tagFilterSpec, boolean logit) {
|
||||
TagFilter ts = new TagFilter(tagFilterSpec);
|
||||
List<OpTemplate> opTemplates = new ArrayList<>();
|
||||
|
||||
List<OpTemplate> rawtemplates = getStmtDocs().stream()
|
||||
.flatMap(d -> d.getOpTemplates().stream()).toList();
|
||||
|
||||
List<String> matchlog = new ArrayList<>();
|
||||
rawtemplates.stream()
|
||||
.map(ts::matchesTaggedResult)
|
||||
.peek(r -> matchlog.add(r.getLog()))
|
||||
.filter(TagFilter.Result::matched)
|
||||
.map(TagFilter.Result::getElement)
|
||||
.forEach(opTemplates::add);
|
||||
|
||||
if (logit) {
|
||||
for (String s : matchlog) {
|
||||
logger.info(s);
|
||||
}
|
||||
}
|
||||
|
||||
return opTemplates;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public Iterator<OpsDoc> iterator() {
|
||||
return getStmtDocs().iterator();
|
||||
@ -196,4 +170,13 @@ public class OpsDocList implements Iterable<OpsDoc> {
|
||||
return count;
|
||||
}
|
||||
|
||||
public OpsDocList and(OpsDocList other) {
|
||||
return new OpsDocList(
|
||||
RawOpsDocList.combine(this.rawOpsDocList,other.rawOpsDocList),
|
||||
new LinkedHashMap<>() {{
|
||||
putAll(templateVariables);
|
||||
putAll(other.templateVariables);
|
||||
}}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
package io.nosqlbench.adapters.api.activityimpl.uniform.decorators;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.adapters.api.templating.DriverAdapterDecorators;
|
||||
|
||||
@ -41,10 +42,10 @@ public interface SyntheticOpTemplateProvider extends DriverAdapterDecorators {
|
||||
* If a driver adapter supports creating example op templates from bindings,
|
||||
* it must implement this method to do so.
|
||||
*
|
||||
* @param opsDocList
|
||||
* @param opTemplates
|
||||
* The existing doc structure, which should contain no fully defined op templates, but may contain other
|
||||
* elements like bindings
|
||||
* @return A list of op templates, size zero or more
|
||||
*/
|
||||
List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> params);
|
||||
OpTemplates getSyntheticOpTemplates(OpTemplates opTemplates, Map<String, Object> params);
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.rawyaml.RawOpsLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.nb.spectest.api.STAssemblyValidator;
|
||||
import io.nosqlbench.nb.spectest.core.STNodeAssembly;
|
||||
@ -110,7 +111,7 @@ public class YamlSpecValidator implements STAssemblyValidator {
|
||||
List<Map<String, Object>> expectedList = gson.fromJson(json, type);
|
||||
|
||||
OpsDocList stmtsDocs = OpsLoader.loadString(yaml, OpTemplateFormat.yaml, new HashMap<>(), null);
|
||||
List<OpTemplate> stmts = stmtsDocs.getOps(false);
|
||||
OpTemplates stmts = stmtsDocs.getOps();
|
||||
List<Map<String, Object>> stmt_objs = stmts.stream().map(OpTemplate::asData).collect(Collectors.toList());
|
||||
|
||||
try {
|
||||
|
@ -130,9 +130,9 @@ public class OpsDocListTest {
|
||||
|
||||
@Test
|
||||
public void testFilteredStmts() {
|
||||
List<OpTemplate> stmts = doclist.getOps("",true);
|
||||
OpTemplates stmts = doclist.getOps().matching("", true);
|
||||
Assertions.assertThat(stmts).hasSize(6);
|
||||
stmts = doclist.getOps("root1:value23",true);
|
||||
stmts = doclist.getOps().matching("root1:value23",true);
|
||||
Assertions.assertThat(stmts).hasSize(2);
|
||||
}
|
||||
|
||||
|
@ -81,8 +81,8 @@ public class ParsedOpTest {
|
||||
ps1: "param-one"
|
||||
""";
|
||||
final OpsDocList stmtsDocs = OpsLoader.loadString(opt, OpTemplateFormat.yaml, cfg.getMap(), null);
|
||||
assertThat(stmtsDocs.getOps(true).size()).isEqualTo(1);
|
||||
final OpTemplate opTemplate = stmtsDocs.getOps(true).get(0);
|
||||
assertThat(stmtsDocs.getOps().matching("",true).size()).isEqualTo(1);
|
||||
final OpTemplate opTemplate = stmtsDocs.getOps().matching("",true).get(0);
|
||||
final ParsedOp parsedOp = new ParsedOp(opTemplate, cfg, List.of(), getParent());
|
||||
|
||||
assertThat(parsedOp.getAsFunctionOr("d1", "invalid").apply(1L)).isEqualTo("one");
|
||||
|
@ -105,15 +105,6 @@ public class ActivityDef implements NBNamedElement {
|
||||
return parameterMap.getOptionalString("alias").orElse(DEFAULT_ALIAS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return tbe StandardActivity Driver Adapter Name
|
||||
*
|
||||
* @return the driver adapter name
|
||||
*/
|
||||
public String getActivityDriver() {
|
||||
return parameterMap.getOptionalString("type", "driver").orElse(DEFAULT_ATYPE);
|
||||
}
|
||||
|
||||
/**
|
||||
* The first cycle that will be used for execution of this activity, inclusive.
|
||||
* If the value is provided as a range as in 0..10, then the first number is the start cycle
|
||||
|
@ -35,12 +35,12 @@ public class NBCLIScenarioPreprocessorTemplateVarTest {
|
||||
cmds.forEach(System.out::println);
|
||||
|
||||
OpsDocList workload1 = OpsLoader.loadPath(cmds.get(0).getArgValue("workload"), cmds.get(0).getArgMap());
|
||||
OpTemplate optpl1 = workload1.getOps(true).get(0);
|
||||
OpTemplate optpl1 = workload1.getOps().matching("",true).get(0);
|
||||
System.out.println("op from cmd1:" + optpl1);
|
||||
assertThat(optpl1.getStmt()).contains("cycle {cycle} replaced replaced\n");
|
||||
|
||||
OpsDocList workload2 = OpsLoader.loadPath(cmds.get(1).getArgValue("workload"), cmds.get(1).getArgMap());
|
||||
OpTemplate optpl2 = workload2.getOps(true).get(0);
|
||||
OpTemplate optpl2 = workload2.getOps().matching("",true).get(0);
|
||||
System.out.println("op from cmd2:" + optpl2);
|
||||
assertThat(optpl2.getStmt()).contains("cycle {cycle} def1 def1\n");
|
||||
}
|
||||
@ -52,7 +52,7 @@ public class NBCLIScenarioPreprocessorTemplateVarTest {
|
||||
cmds.forEach(System.out::println);
|
||||
|
||||
OpsDocList workload1 = OpsLoader.loadPath(cmds.get(0).getArgValue("workload"), cmds.get(0).getArgMap());
|
||||
OpTemplate optpl1 = workload1.getOps(true).get(0);
|
||||
OpTemplate optpl1 = workload1.getOps().matching("",true).get(0);
|
||||
System.out.println("op from cmd1:" + optpl1);
|
||||
assertThat(optpl1.getStmt()).contains("cycle {cycle} overridden overridden\n");
|
||||
}
|
||||
|
@ -54,14 +54,13 @@ public class ComponentActivityInstrumentation {
|
||||
public ComponentActivityInstrumentation(final Activity activity) {
|
||||
this.activity = activity;
|
||||
def = activity.getActivityDef();
|
||||
this.hdrdigits = activity.getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3);
|
||||
params = this.def.getParams();
|
||||
hdrdigits = activity.getHdrDigits();
|
||||
initMetrics();
|
||||
}
|
||||
|
||||
private void initMetrics() {
|
||||
|
||||
|
||||
this.errorRate1m = activity.create().gauge("error_rate_1m",
|
||||
() -> {
|
||||
double result_1m_rate = this.resultTimer.getOneMinuteRate();
|
||||
|
@ -16,11 +16,11 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpLookup;
|
||||
@ -58,9 +58,6 @@ import io.nosqlbench.nb.api.advisor.NBAdvisorOutput;
|
||||
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
|
||||
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricCounter;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricHistogram;
|
||||
import io.nosqlbench.nb.api.engine.metrics.instruments.NBMetricTimer;
|
||||
import io.nosqlbench.nb.api.lifecycle.Shutdownable;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import io.nosqlbench.nb.api.config.standard.*;
|
||||
@ -77,13 +74,8 @@ import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
/// An [[Activity]] is a flywheel of operations. Each activity consumes ordinals
|
||||
@ -115,10 +107,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
private final RunStateTally tally = new RunStateTally();
|
||||
private ThreadLocal<RateLimiter> strideLimiterSource;
|
||||
private ThreadLocal<RateLimiter> cycleLimiterSource;
|
||||
private int nameEnumerator;
|
||||
private NBErrorHandler errorHandler;
|
||||
private final List<AutoCloseable> closeables = new ArrayList<>();
|
||||
private PrintWriter console;
|
||||
private ErrorMetrics errorMetrics;
|
||||
private Input input;
|
||||
private StandardAction<?, ?> action;
|
||||
@ -126,60 +116,53 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
public Activity(NBComponent parent, ActivityDef activityDef) {
|
||||
|
||||
super(
|
||||
parent,
|
||||
NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())
|
||||
parent,
|
||||
NBLabels.forKV("activity", activityDef.getAlias()).and(activityDef.auxLabels())
|
||||
);
|
||||
this.activityDef = activityDef;
|
||||
this.metrics = new ActivityMetrics(this);
|
||||
|
||||
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
|
||||
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
|
||||
"workload", "yaml");
|
||||
if (workloadOpt.isPresent()) {
|
||||
activityDef.getParams().set("alias", workloadOpt.get());
|
||||
} else {
|
||||
activityDef.getParams().set(
|
||||
"alias", activityDef.getActivityDriver().toUpperCase(
|
||||
Locale.ROOT) + nameEnumerator
|
||||
);
|
||||
nameEnumerator++;
|
||||
}
|
||||
}
|
||||
getParams().set(
|
||||
"alias", Optional.ofNullable(activityDef.getAlias()).or(
|
||||
() -> getParams().getOptionalString("workload")).or(
|
||||
() -> getParams().getOptionalString("driver")).orElseThrow(
|
||||
() -> new RuntimeException(
|
||||
"Unable to determine name of activity from " + activityDef))
|
||||
);
|
||||
|
||||
OpsDocList workload;
|
||||
|
||||
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
NBConfigModel yamlmodel;
|
||||
if (yaml_loc.isPresent()) {
|
||||
Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
|
||||
workload = OpsLoader.loadPath(yaml_loc.get(), disposable, "activities");
|
||||
yamlmodel = workload.getConfigModel();
|
||||
} else {
|
||||
yamlmodel = ConfigModel.of(Activity.class).asReadOnly();
|
||||
}
|
||||
|
||||
NBConfigModel yamlmodel = yaml_loc.map(path -> {
|
||||
return OpsLoader.loadPath(
|
||||
path, new LinkedHashMap<>(activityDef.getParams()), "activities").getConfigModel();
|
||||
}).orElse(ConfigModel.of(Activity.class).asReadOnly());
|
||||
|
||||
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
|
||||
|
||||
Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName.flatMap(
|
||||
name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV()));
|
||||
Optional<DriverAdapter<?, ?>> defaultAdapter = activityDef.getParams().getOptionalString(
|
||||
"driver").flatMap(name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV()));
|
||||
|
||||
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
|
||||
throw new BasicError(
|
||||
"Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change '<activeByDefault>false</activeByDefault>' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first.");
|
||||
"Unable to load '" + defaultDriverName.get() + "' driver adapter.\n" + "Rebuild NB5 to include this driver adapter. " + "Change '<activeByDefault>false</activeByDefault>' for the driver in " + "'./nb-adapters/pom.xml' and './nb-adapters/nb-adapters-included/pom.xml' first.");
|
||||
}
|
||||
|
||||
OpResolver opResolver = new OpResolver(() -> loadOpTemplates());
|
||||
|
||||
|
||||
// HERE, op templates are loaded before drivers are loaded
|
||||
// List<OpTemplate> opTemplates = loadOpTemplates(defaultAdapter.orElse(null), false);
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
|
||||
|
||||
NBConfigModel supersetConfig = ConfigModel.of(Activity.class).add(yamlmodel);
|
||||
Optional<String> defaultDriverOption = defaultDriverName;
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers = new ConcurrentHashMap<>();
|
||||
|
||||
List<ParsedOp> allParsedOps = loadOpTemplates(
|
||||
defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert(
|
||||
ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList();
|
||||
defaultAdapter.orElse(null), false, false).stream().map(ot -> upconvert(
|
||||
ot, defaultDriverOption, yamlmodel, supersetConfig, mappers, adapterlist)).toList();
|
||||
|
||||
OpLookup lookup = new OpLookupService(() -> allParsedOps);
|
||||
|
||||
@ -188,11 +171,11 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
if (defaultDriverOption.isPresent()) {
|
||||
long matchingDefault = mappers.keySet().stream().filter(
|
||||
n -> n.equals(defaultDriverOption.get())).count();
|
||||
n -> n.equals(defaultDriverOption.get())).count();
|
||||
if (0 == matchingDefault) {
|
||||
logger.warn(
|
||||
"All op templates used a different driver than the default '{}'",
|
||||
defaultDriverOption.get()
|
||||
"All op templates used a different driver than the default '{}'",
|
||||
defaultDriverOption.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -204,22 +187,26 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
throw e;
|
||||
}
|
||||
throw new OpConfigError(
|
||||
"Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
"Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
}
|
||||
|
||||
initOpsMetrics();
|
||||
|
||||
}
|
||||
|
||||
private void initOpsMetrics() {
|
||||
create().gauge(
|
||||
"ops_pending", () -> this.getProgressMeter().getSummary().pending(),
|
||||
MetricCategory.Core,
|
||||
"The current number of operations which have not been dispatched for processing yet."
|
||||
"ops_pending", () -> this.getProgressMeter().getSummary().pending(),
|
||||
MetricCategory.Core,
|
||||
"The current number of operations which have not been dispatched for processing yet."
|
||||
);
|
||||
create().gauge(
|
||||
"ops_active", () -> this.getProgressMeter().getSummary().current(),
|
||||
MetricCategory.Core,
|
||||
"The current number of operations which have been dispatched for processing, but which have not yet completed."
|
||||
"ops_active", () -> this.getProgressMeter().getSummary().current(), MetricCategory.Core,
|
||||
"The current number of operations which have been dispatched for processing, but which have not yet completed."
|
||||
);
|
||||
create().gauge(
|
||||
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
|
||||
MetricCategory.Core, "The current number of operations which have been completed"
|
||||
"ops_complete", () -> this.getProgressMeter().getSummary().complete(),
|
||||
MetricCategory.Core, "The current number of operations which have been completed"
|
||||
);
|
||||
|
||||
|
||||
@ -227,18 +214,14 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
|
||||
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps(
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops,
|
||||
OpLookup opLookup
|
||||
) {
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
|
||||
return createOpSourceFromParsedOps2(adapters, pops, opLookup);
|
||||
}
|
||||
|
||||
protected <O extends LongFunction> OpSequence<OpDispenser<? extends CycleOp<?>>> createOpSourceFromParsedOps2(
|
||||
// Map<String, DriverAdapter<?,?>> adapterCache,
|
||||
// Map<String, OpMapper<? extends Op>> mapperCache,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops,
|
||||
OpLookup opLookup
|
||||
) {
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapters, List<ParsedOp> pops, OpLookup opLookup) {
|
||||
try {
|
||||
|
||||
List<Long> ratios = new ArrayList<>(pops.size());
|
||||
@ -249,9 +232,9 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
}
|
||||
|
||||
SequencerType sequencerType = getParams().getOptionalString("seq").map(
|
||||
SequencerType::valueOf).orElse(SequencerType.bucket);
|
||||
SequencerType::valueOf).orElse(SequencerType.bucket);
|
||||
SequencePlanner<OpDispenser<? extends CycleOp<?>>> planner = new SequencePlanner<>(
|
||||
sequencerType);
|
||||
sequencerType);
|
||||
|
||||
for (int i = 0; i < pops.size(); i++) {
|
||||
long ratio = ratios.get(i);
|
||||
@ -267,12 +250,12 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
OpMapper<CycleOp<?>, Space> opMapper = adapter.getOpMapper();
|
||||
LongFunction<Space> spaceFunc = adapter.getSpaceFunc(pop);
|
||||
OpDispenser<? extends CycleOp<?>> dispenser = opMapper.apply(
|
||||
this, pop, spaceFunc);
|
||||
this, pop, spaceFunc);
|
||||
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
|
||||
Dryrun dryrun = pop.takeEnumFromFieldOr(Dryrun.class, Dryrun.none, "dryrun");
|
||||
|
||||
dispenser = OpFunctionComposition.wrapOptionally(
|
||||
adapter, dispenser, pop, dryrun, opLookup);
|
||||
adapter, dispenser, pop, dryrun, opLookup);
|
||||
|
||||
// if (strict) {
|
||||
// optemplate.assertConsumed();
|
||||
@ -280,8 +263,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
planner.addOp((OpDispenser<? extends CycleOp<?>>) dispenser, ratio);
|
||||
} catch (Exception e) {
|
||||
throw new OpConfigError(
|
||||
"Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(),
|
||||
e
|
||||
"Error while mapping op from template named '" + pop.getName() + "': " + e.getMessage(),
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -306,39 +289,39 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
|
||||
private ParsedOp upconvert(
|
||||
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
|
||||
NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist
|
||||
OpTemplate ot, Optional<String> defaultDriverOption, NBConfigModel yamlmodel,
|
||||
NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers,
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist
|
||||
) {
|
||||
// ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
|
||||
String driverName = ot.getOptionalStringParam("driver", String.class).or(
|
||||
() -> ot.getOptionalStringParam("type", String.class)).or(
|
||||
() -> defaultDriverOption).orElseThrow(
|
||||
() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
|
||||
() -> ot.getOptionalStringParam("type", String.class)).or(
|
||||
() -> defaultDriverOption).orElseThrow(
|
||||
() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
|
||||
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = adapters.computeIfAbsent(
|
||||
driverName, dn -> loadAdapter(
|
||||
dn, yamlmodel, supersetConfig, mappers));
|
||||
driverName, dn -> loadAdapter(
|
||||
dn, yamlmodel, supersetConfig, mappers));
|
||||
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
|
||||
adapterlist.add(adapter);
|
||||
|
||||
ParsedOp pop = new ParsedOp(
|
||||
ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
|
||||
ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
|
||||
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
|
||||
|
||||
return pop;
|
||||
}
|
||||
|
||||
private DriverAdapter<CycleOp<?>, Space> loadAdapter(
|
||||
String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
|
||||
String driverName, NBConfigModel yamlmodel, NBConfigModel supersetConfig,
|
||||
ConcurrentHashMap<String, OpMapper<? extends CycleOp<?>, ? extends Space>> mappers
|
||||
) {
|
||||
DriverAdapter<CycleOp<?>, Space> adapter = Optional.of(driverName).flatMap(
|
||||
name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV())).orElseThrow(() -> new OpConfigError(
|
||||
"driver adapter not present for name '" + driverName + "'"));
|
||||
name -> ServiceSelector.of(
|
||||
name, ServiceLoader.load(DriverAdapterLoader.class)).get()).map(
|
||||
l -> l.load(this, NBLabels.forKV())).orElseThrow(
|
||||
() -> new OpConfigError("driver adapter not present for name '" + driverName + "'"));
|
||||
|
||||
NBConfigModel combinedModel = yamlmodel;
|
||||
NBConfiguration combinedConfig = combinedModel.matchConfig(activityDef.getParams());
|
||||
@ -390,36 +373,17 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
}
|
||||
}
|
||||
|
||||
// @Override
|
||||
// public synchronized void onActivityDefUpdate(final ActivityDef activityDef) {
|
||||
// super.onActivityDefUpdate(activityDef);
|
||||
//
|
||||
// for (final DriverAdapter adapter : this.adapters.values())
|
||||
// if (adapter instanceof NBReconfigurable reconfigurable) {
|
||||
// NBConfigModel cfgModel = reconfigurable.getReconfigModel();
|
||||
// final Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
// if (op_yaml_loc.isPresent()) {
|
||||
// final Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
|
||||
// final OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities");
|
||||
// cfgModel = cfgModel.add(workload.getConfigModel());
|
||||
// }
|
||||
// final NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
|
||||
// reconfigurable.applyReconfig(cfg);
|
||||
// }
|
||||
//
|
||||
// }
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(
|
||||
OpsDocList opsDocList, Map<String, Object> cfg) {
|
||||
List<OpTemplate> opTemplates = new ArrayList<>();
|
||||
public OpTemplates getSyntheticOpTemplates(OpTemplates opsDocList, Map<String, Object> cfg) {
|
||||
OpTemplates accumulator = new OpTemplates();
|
||||
List<OpTemplates> combined = new ArrayList<>();
|
||||
for (DriverAdapter<?, ?> adapter : adapters.values()) {
|
||||
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
|
||||
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
|
||||
opTemplates.addAll(newTemplates);
|
||||
OpTemplates newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
|
||||
accumulator = accumulator.and(newTemplates);
|
||||
}
|
||||
}
|
||||
return opTemplates;
|
||||
return accumulator;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -458,40 +422,38 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
}
|
||||
}
|
||||
|
||||
protected List<OpTemplate> loadOpTemplates(
|
||||
DriverAdapter<?, ?> defaultDriverAdapter, boolean logged, boolean filtered) {
|
||||
private OpTemplates loadOpTemplates(
|
||||
DriverAdapter<?, ?> defaultDriverAdapter, boolean logged, boolean filtered) {
|
||||
|
||||
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
|
||||
OpsDocList opsDocList = loadStmtsDocList();
|
||||
|
||||
List<OpTemplate> filteredOps = opsDocList.getOps(filtered ? tagfilter : "", logged);
|
||||
OpTemplates templates = loadOpTemplates();
|
||||
OpTemplates filteredOps = templates.matching(filtered ? tagfilter : "", logged);
|
||||
|
||||
if (filteredOps.isEmpty()) {
|
||||
// There were no ops, and it *wasn't* because they were all filtered out.
|
||||
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
|
||||
// But if there were no ops, and there was no default driver provided, we can't continue
|
||||
// There were no ops, and it was because they were all filtered out
|
||||
List<OpTemplate> unfilteredOps = opsDocList.getOps(false);
|
||||
OpTemplates unfilteredOps = templates.matching("",false);
|
||||
if (!unfilteredOps.isEmpty()) {
|
||||
String message = "There were no active op templates with tag filter '" + tagfilter + "', since all " + unfilteredOps.size() + " were filtered out. Examine the session log for details";
|
||||
NBAdvisorOutput.test(message);
|
||||
//throw new BasicError(message);
|
||||
}
|
||||
if (defaultDriverAdapter instanceof SyntheticOpTemplateProvider sotp) {
|
||||
filteredOps = sotp.getSyntheticOpTemplates(
|
||||
opsDocList, this.activityDef.getParams());
|
||||
filteredOps = sotp.getSyntheticOpTemplates(templates, this.activityDef.getParams());
|
||||
Objects.requireNonNull(filteredOps);
|
||||
if (filteredOps.isEmpty()) {
|
||||
throw new BasicError(
|
||||
"Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
|
||||
"Attempted to create synthetic ops from driver '" + defaultDriverAdapter.getAdapterName() + '\'' + " but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
|
||||
}
|
||||
} else {
|
||||
throw new BasicError("""
|
||||
No op templates were provided. You must provide one of these activity parameters:
|
||||
1) workload=some.yaml
|
||||
2) op='inline template'
|
||||
3) driver=stdout (or any other drive that can synthesize ops)""");
|
||||
No op templates were provided. You must provide one of these activity parameters:
|
||||
1) workload=some.yaml
|
||||
2) op='inline template'
|
||||
3) driver=stdout (or any other drive that can synthesize ops)""");
|
||||
}
|
||||
}
|
||||
return filteredOps;
|
||||
@ -506,7 +468,7 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
@param seq
|
||||
- The {@link OpSequence} to derive the defaults from
|
||||
*/
|
||||
public synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
|
||||
private synchronized void setDefaultsFromOpSequence(OpSequence<?> seq) {
|
||||
Optional<String> strideOpt = getParams().getOptionalString("stride");
|
||||
if (strideOpt.isEmpty()) {
|
||||
String stride = String.valueOf(seq.getSequence().length);
|
||||
@ -524,14 +486,14 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
} else {
|
||||
if (0 == activityDef.getCycleCount()) {
|
||||
throw new RuntimeException(
|
||||
"You specified cycles, but the range specified means zero cycles: " + getParams().get(
|
||||
"cycles"));
|
||||
"You specified cycles, but the range specified means zero cycles: " + getParams().get(
|
||||
"cycles"));
|
||||
}
|
||||
long stride = getParams().getOptionalLong("stride").orElseThrow();
|
||||
long cycles = this.activityDef.getCycleCount();
|
||||
if (cycles < stride) {
|
||||
throw new RuntimeException(
|
||||
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." + " If this was intended, then set stride low enough to allow it.");
|
||||
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." + " If this was intended, then set stride low enough to allow it.");
|
||||
}
|
||||
}
|
||||
|
||||
@ -540,7 +502,7 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
if (0 < stride && 0 != cycleCount % stride) {
|
||||
logger.warn(
|
||||
() -> "The stride does not evenly divide cycles. Only full strides will be executed," + "leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
|
||||
() -> "The stride does not evenly divide cycles. Only full strides will be executed," + "leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
|
||||
}
|
||||
|
||||
Optional<String> threadSpec = activityDef.getParams().getOptionalString("threads");
|
||||
@ -552,9 +514,7 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
if (threads > activityDef.getCycleCount()) {
|
||||
threads = (int) activityDef.getCycleCount();
|
||||
logger.info(
|
||||
"setting threads to {} (auto) [10xCORES, cycle count limited]",
|
||||
threads
|
||||
);
|
||||
"setting threads to {} (auto) [10xCORES, cycle count limited]", threads);
|
||||
} else {
|
||||
logger.info("setting threads to {} (auto) [10xCORES]", threads);
|
||||
}
|
||||
@ -574,124 +534,118 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
|
||||
if (activityDef.getThreads() > activityDef.getCycleCount()) {
|
||||
logger.warn(
|
||||
() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary() + ", you should have more cycles than threads.");
|
||||
() -> "threads=" + activityDef.getThreads() + " and cycles=" + activityDef.getCycleSummary() + ", you should have more cycles than threads.");
|
||||
}
|
||||
|
||||
} else if (1000 < cycleCount) {
|
||||
logger.warn(
|
||||
() -> "For testing at scale, it is highly recommended that you " + "set threads to a value higher than the default of 1." + " hint: you can use threads=auto for reasonable default, or" + " consult the topic on threads with `help threads` for" + " more information.");
|
||||
() -> "For testing at scale, it is highly recommended that you " + "set threads to a value higher than the default of 1." + " hint: you can use threads=auto for reasonable default, or" + " consult the topic on threads with `help threads` for" + " more information.");
|
||||
}
|
||||
|
||||
if (0 < this.activityDef.getCycleCount() && seq.getOps().isEmpty()) {
|
||||
throw new BasicError(
|
||||
"You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity.");
|
||||
"You have configured a zero-length sequence and non-zero cycles. It is not possible to continue with this activity.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
Given a function that can create an op of type <O> from an OpTemplate, generate
|
||||
an indexed sequence of ready to call operations.
|
||||
<p>
|
||||
This method uses the following conventions to derive the sequence:
|
||||
|
||||
<OL>
|
||||
<LI>If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is
|
||||
taken as the only provided statement.</LI>
|
||||
<LI>If a 'yaml, or 'workload' parameter is provided, then the statements in that file
|
||||
are taken with their ratios </LI>
|
||||
<LI>Any provided tags filter is used to select only the op templates which have matching
|
||||
tags. If no tags are provided, then all the found op templates are included.</LI>
|
||||
<LI>The ratios and the 'seq' parameter are used to build a sequence of the ready operations,
|
||||
where the sequence length is the sum of the ratios.</LI>
|
||||
</OL>
|
||||
@param <O>
|
||||
A holder for an executable operation for the native driver used by this activity.
|
||||
@param opinit
|
||||
A function to map an OpTemplate to the executable operation form required by
|
||||
the native driver for this activity.
|
||||
@param defaultAdapter
|
||||
The adapter which will be used for any op templates with no explicit adapter
|
||||
@return The sequence of operations as determined by filtering and ratios
|
||||
*/
|
||||
@Deprecated(forRemoval = true)
|
||||
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(
|
||||
Function<OpTemplate, OpDispenser<? extends O>> opinit, boolean strict,
|
||||
DriverAdapter<?, ?> defaultAdapter
|
||||
) {
|
||||
|
||||
List<OpTemplate> stmts = loadOpTemplates(defaultAdapter, true, false);
|
||||
|
||||
List<Long> ratios = new ArrayList<>(stmts.size());
|
||||
|
||||
for (OpTemplate opTemplate : stmts) {
|
||||
long ratio = opTemplate.removeParamOrDefault("ratio", 1);
|
||||
ratios.add(ratio);
|
||||
}
|
||||
|
||||
SequencerType sequencerType = getParams().getOptionalString("seq").map(
|
||||
SequencerType::valueOf).orElse(SequencerType.bucket);
|
||||
|
||||
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
|
||||
|
||||
try {
|
||||
for (int i = 0; i < stmts.size(); i++) {
|
||||
long ratio = ratios.get(i);
|
||||
OpTemplate optemplate = stmts.get(i);
|
||||
OpDispenser<? extends O> driverSpecificReadyOp = opinit.apply(optemplate);
|
||||
if (strict) {
|
||||
optemplate.assertConsumed();
|
||||
}
|
||||
planner.addOp(driverSpecificReadyOp, ratio);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new OpConfigError(e.getMessage(), workloadSource, e);
|
||||
}
|
||||
|
||||
return planner.resolve();
|
||||
}
|
||||
|
||||
protected OpsDocList loadStmtsDocList() {
|
||||
// /**
|
||||
// Given a function that can create an op of type <O> from an OpTemplate, generate
|
||||
// an indexed sequence of ready to call operations.
|
||||
// <p>
|
||||
// This method uses the following conventions to derive the sequence:
|
||||
//
|
||||
// <OL>
|
||||
// <LI>If an 'op', 'stmt', or 'statement' parameter is provided, then it's value is
|
||||
// taken as the only provided statement.</LI>
|
||||
// <LI>If a 'yaml, or 'workload' parameter is provided, then the statements in that file
|
||||
// are taken with their ratios </LI>
|
||||
// <LI>Any provided tags filter is used to select only the op templates which have matching
|
||||
// tags. If no tags are provided, then all the found op templates are included.</LI>
|
||||
// <LI>The ratios and the 'seq' parameter are used to build a sequence of the ready operations,
|
||||
// where the sequence length is the sum of the ratios.</LI>
|
||||
// </OL>
|
||||
// @param <O>
|
||||
// A holder for an executable operation for the native driver used by this activity.
|
||||
// @param opinit
|
||||
// A function to map an OpTemplate to the executable operation form required by
|
||||
// the native driver for this activity.
|
||||
// @param defaultAdapter
|
||||
// The adapter which will be used for any op templates with no explicit adapter
|
||||
// @return The sequence of operations as determined by filtering and ratios
|
||||
// */
|
||||
// @Deprecated(forRemoval = true)
|
||||
// protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(
|
||||
// Function<OpTemplate, OpDispenser<? extends O>> opinit, boolean strict,
|
||||
// DriverAdapter<?, ?> defaultAdapter
|
||||
// ) {
|
||||
//
|
||||
// List<OpTemplate> stmts = loadOpTemplates(defaultAdapter, true, false);
|
||||
//
|
||||
// List<Long> ratios = new ArrayList<>(stmts.size());
|
||||
//
|
||||
// for (OpTemplate opTemplate : stmts) {
|
||||
// long ratio = opTemplate.removeParamOrDefault("ratio", 1);
|
||||
// ratios.add(ratio);
|
||||
// }
|
||||
//
|
||||
// SequencerType sequencerType = getParams().getOptionalString("seq").map(
|
||||
// SequencerType::valueOf).orElse(SequencerType.bucket);
|
||||
//
|
||||
// SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
|
||||
//
|
||||
// try {
|
||||
// for (int i = 0; i < stmts.size(); i++) {
|
||||
// long ratio = ratios.get(i);
|
||||
// OpTemplate optemplate = stmts.get(i);
|
||||
// OpDispenser<? extends O> driverSpecificReadyOp = opinit.apply(optemplate);
|
||||
// if (strict) {
|
||||
// optemplate.assertConsumed();
|
||||
// }
|
||||
// planner.addOp(driverSpecificReadyOp, ratio);
|
||||
// }
|
||||
// } catch (Exception e) {
|
||||
// throw new OpConfigError(e.getMessage(), workloadSource, e);
|
||||
// }
|
||||
//
|
||||
// return planner.resolve();
|
||||
// }
|
||||
|
||||
/// TODO: Move this out, adjacent to [OpsLoader]
|
||||
protected OpTemplates loadOpTemplates() {
|
||||
OpsDocList opsDocs = null;
|
||||
try {
|
||||
String op = activityDef.getParams().getOptionalString("op").orElse(null);
|
||||
String stmt = activityDef.getParams().getOptionalString("stmt", "statement").orElse(
|
||||
null);
|
||||
null);
|
||||
String workload = activityDef.getParams().getOptionalString("workload").orElse(null);
|
||||
|
||||
if ((op != null ? 1 : 0) + (stmt != null ? 1 : 0) + (workload != null ? 1 : 0) > 1) {
|
||||
throw new OpConfigError(
|
||||
"Only op, statement, or workload may be provided, not more than one.");
|
||||
}
|
||||
|
||||
|
||||
if (workload != null && OpsLoader.isJson(workload)) {
|
||||
"Only op, statement, or workload may be provided, not more than one.");
|
||||
} else if (workload != null && OpsLoader.isJson(workload)) {
|
||||
workloadSource = "commandline: (workload/json):" + workload;
|
||||
return OpsLoader.loadString(
|
||||
workload, OpTemplateFormat.json, activityDef.getParams(), null);
|
||||
opsDocs = OpsLoader.loadString(
|
||||
workload, OpTemplateFormat.json, activityDef.getParams(), null);
|
||||
} else if (workload != null && OpsLoader.isYaml(workload)) {
|
||||
workloadSource = "commandline: (workload/yaml):" + workload;
|
||||
return OpsLoader.loadString(
|
||||
workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
|
||||
opsDocs= OpsLoader.loadString(
|
||||
workload, OpTemplateFormat.yaml, activityDef.getParams(), null);
|
||||
} else if (workload != null) {
|
||||
return OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
|
||||
}
|
||||
|
||||
if (stmt != null) {
|
||||
opsDocs= OpsLoader.loadPath(workload, activityDef.getParams(), "activities");
|
||||
} else if (stmt != null) {
|
||||
workloadSource = "commandline: (stmt/inline): '" + stmt + "'";
|
||||
return OpsLoader.loadString(
|
||||
stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
|
||||
}
|
||||
|
||||
if (op != null && OpsLoader.isJson(op)) {
|
||||
opsDocs= OpsLoader.loadString(
|
||||
stmt, OpTemplateFormat.inline, activityDef.getParams(), null);
|
||||
} else if (op != null && OpsLoader.isJson(op)) {
|
||||
workloadSource = "commandline: (op/json): '" + op + "'";
|
||||
return OpsLoader.loadString(
|
||||
op, OpTemplateFormat.json, activityDef.getParams(), null);
|
||||
opsDocs= OpsLoader.loadString(
|
||||
op, OpTemplateFormat.json, activityDef.getParams(), null);
|
||||
} else if (op != null) {
|
||||
workloadSource = "commandline: (op/inline): '" + op + "'";
|
||||
return OpsLoader.loadString(
|
||||
op, OpTemplateFormat.inline, activityDef.getParams(), null);
|
||||
opsDocs= OpsLoader.loadString(
|
||||
op, OpTemplateFormat.inline, activityDef.getParams(), null);
|
||||
}
|
||||
return OpsDocList.none();
|
||||
return new OpTemplates(opsDocs);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new OpConfigError("Error loading op templates: " + e, workloadSource, e);
|
||||
@ -735,20 +689,16 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
|
||||
|
||||
activityDef.getParams().getOptionalNamedParameter("striderate").map(
|
||||
StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
|
||||
StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
|
||||
|
||||
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate").map(
|
||||
CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
|
||||
CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
|
||||
|
||||
}
|
||||
|
||||
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
|
||||
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(
|
||||
this, strideLimiterSource, spec);
|
||||
}
|
||||
|
||||
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
|
||||
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
|
||||
this, strideLimiterSource, spec);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -801,8 +751,8 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
public synchronized NBErrorHandler getErrorHandler() {
|
||||
if (null == this.errorHandler) {
|
||||
errorHandler = new NBErrorHandler(
|
||||
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
|
||||
this::getExceptionMetrics
|
||||
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
|
||||
this::getExceptionMetrics
|
||||
);
|
||||
}
|
||||
return errorHandler;
|
||||
@ -811,7 +761,7 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
public void closeAutoCloseables() {
|
||||
for (AutoCloseable closeable : closeables) {
|
||||
logger.debug(
|
||||
() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
|
||||
() -> "CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
|
||||
try {
|
||||
closeable.close();
|
||||
} catch (Exception e) {
|
||||
@ -826,25 +776,10 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
return this.getActivityDef().getAlias().compareTo(o.getActivityDef().getAlias());
|
||||
}
|
||||
|
||||
public void registerAutoCloseable(AutoCloseable closeable) {
|
||||
this.closeables.add(closeable);
|
||||
}
|
||||
|
||||
public synchronized PrintWriter getConsoleOut() {
|
||||
if (null == console) {
|
||||
this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8);
|
||||
}
|
||||
return this.console;
|
||||
}
|
||||
|
||||
public synchronized InputStream getConsoleIn() {
|
||||
return System.in;
|
||||
}
|
||||
|
||||
public void setConsoleOut(PrintWriter writer) {
|
||||
this.console = writer;
|
||||
}
|
||||
|
||||
// public void registerAutoCloseable(AutoCloseable closeable) {
|
||||
// this.closeables.add(closeable);
|
||||
// }
|
||||
//
|
||||
public synchronized ErrorMetrics getExceptionMetrics() {
|
||||
if (null == this.errorMetrics) {
|
||||
errorMetrics = new ErrorMetrics(this);
|
||||
@ -857,25 +792,20 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
return getActivityDef().getAlias();
|
||||
}
|
||||
|
||||
public int getHdrDigits() {
|
||||
return getComponentProp("hdr_digits").map(Integer::parseInt).orElse(3);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Motor getMotor(ActivityDef activityDef, int slot) {
|
||||
return new CoreMotor(this,slot,getInput(),getAction(),getOutput());
|
||||
return new CoreMotor(this, slot, getInput(), getAction(), getOutput());
|
||||
}
|
||||
|
||||
public synchronized Input getInput() {
|
||||
if (input==null) {
|
||||
this.input = new AtomicInput(this,this.getActivityDef());
|
||||
if (input == null) {
|
||||
this.input = new AtomicInput(this, this.getActivityDef());
|
||||
}
|
||||
return this.input;
|
||||
}
|
||||
|
||||
public synchronized SyncAction getAction() {
|
||||
if (this.action==null) {
|
||||
if (this.action == null) {
|
||||
this.action = new StandardAction(this);
|
||||
}
|
||||
return this.action;
|
||||
@ -885,4 +815,10 @@ public class Activity<R extends java.util.function.LongFunction, S> extends NBSt
|
||||
// TODO: Implement this as optional, only composing the optional behavior if required
|
||||
return null;
|
||||
}
|
||||
|
||||
private void createOrUpdateCycleLimiter(SimRateSpec spec) {
|
||||
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,63 @@
|
||||
package io.nosqlbench.engine.api.activityimpl.uniform;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplates;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.Space;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.nosqlbench.nb.api.tagging.TagFilter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/// TODO: Auto-inject the default driver name into any op that doesn't have it set
|
||||
///
|
||||
/// ## Requirements for core
|
||||
/// * All lookups must be lazy init
|
||||
/// * All lookups must be cached
|
||||
/// * All cache state must be extractable as plan
|
||||
///
|
||||
/// ## Requirements for callers
|
||||
/// * Callers must be able to look up an op template by tag filter
|
||||
/// * Callers must be able to look up a parsed op by tag filter
|
||||
public class OpResolver {
|
||||
private OpTemplates opTemplates;
|
||||
private final Supplier<OpTemplates> optSupplier;
|
||||
List<DriverAdapter<CycleOp<?>, Space>> adapterlist = new ArrayList<>();
|
||||
|
||||
public OpResolver(Supplier<OpTemplates> optSupplier) {
|
||||
this.optSupplier = optSupplier;
|
||||
}
|
||||
|
||||
/// Find a required op template matching a tag filter
|
||||
public synchronized OpTemplate findOne(String tagFilter) {
|
||||
return findOneOptional(tagFilter).orElseThrow(
|
||||
() -> new OpConfigError("No op found for " + tagFilter));
|
||||
}
|
||||
|
||||
private synchronized void load() {
|
||||
if (opTemplates==null) {
|
||||
opTemplates=optSupplier.get();
|
||||
}
|
||||
}
|
||||
|
||||
/// Find an optional op template matching a tag filter
|
||||
public synchronized Optional<OpTemplate> findOneOptional(String tagFilter) {
|
||||
List<OpTemplate> matching = lookup(tagFilter);
|
||||
if (matching.size() > 1) {
|
||||
throw new OpConfigError(
|
||||
"Found more than one op templates with the tag filter: " + tagFilter);
|
||||
}
|
||||
return matching.size() == 1 ? Optional.of(matching.get(0)) : Optional.empty();
|
||||
}
|
||||
|
||||
/// Find any op templates matching a tag filter
|
||||
public synchronized List<OpTemplate> lookup(String tagFilter) {
|
||||
load();
|
||||
TagFilter tf = new TagFilter(tagFilter);
|
||||
return opTemplates.stream().filter(tf::matchesTagged).toList();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user