mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
op dispensers remember their parent adapter
This commit is contained in:
parent
d80820b963
commit
a7c29b2825
@ -42,7 +42,7 @@ public class Cqld4DriverAdapter extends BaseDriverAdapter<Op, Cqld4Space> {
|
||||
public OpMapper<Op> getOpMapper() {
|
||||
DriverSpaceCache<? extends Cqld4Space> spaceCache = getSpaceCache();
|
||||
NBConfiguration config = getConfiguration();
|
||||
return new Cqld4CoreOpMapper(config, spaceCache);
|
||||
return new Cqld4CoreOpMapper(this, config, spaceCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -22,6 +22,7 @@ import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -33,8 +34,8 @@ public abstract class BaseCqlStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final boolean isRetryReplace;
|
||||
|
||||
public BaseCqlStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op) {
|
||||
super(op);
|
||||
public BaseCqlStmtDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op) {
|
||||
super(adapter, op);
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.maxpages = op.getStaticConfigOr("maxpages",1);
|
||||
this.isRetryReplace = op.getStaticConfigOr("retryreplace",false);
|
||||
|
@ -131,11 +131,13 @@ public class CQLD4PreparedStmtDiagnostics {
|
||||
try {
|
||||
bound = CQLD4PreparedStmtDiagnostics.bindStatement(bound, defname, value, type);
|
||||
} catch (ClassCastException cce) {
|
||||
String fullValue = value.toString();
|
||||
String valueToPrint = fullValue.length() > 100 ? fullValue.substring(0,100) + " ... (abbreviated for console, since the size is " + fullValue.length() + ")" : fullValue;
|
||||
String errormsg = String.format(
|
||||
"Unable to bind column '%s' to cql type '%s' with value '%s' (class '%s')",
|
||||
defname,
|
||||
type.asCql(false, false),
|
||||
value,
|
||||
valueToPrint,
|
||||
value.getClass().getCanonicalName()
|
||||
);
|
||||
logger.error(errormsg);
|
||||
|
@ -22,6 +22,7 @@ import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import groovy.lang.Script;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4FluentGraphOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.virtdata.core.bindings.Bindings;
|
||||
@ -40,13 +41,14 @@ public class Cqld4FluentGraphOpDispenser extends BaseOpDispenser<Op> {
|
||||
private final ThreadLocal<Script> tlScript;
|
||||
|
||||
public Cqld4FluentGraphOpDispenser(
|
||||
DriverAdapter adapter,
|
||||
ParsedOp optpl,
|
||||
LongFunction<? extends String> graphnameFunc,
|
||||
LongFunction<CqlSession> sessionFunc,
|
||||
Bindings virtdataBindings,
|
||||
Supplier<Script> scriptSource
|
||||
) {
|
||||
super(optpl);
|
||||
super(adapter, optpl);
|
||||
this.graphnameFunc = graphnameFunc;
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.virtdataBindings = virtdataBindings;
|
||||
|
@ -21,6 +21,7 @@ import com.datastax.dse.driver.api.core.graph.ScriptGraphStatementBuilder;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4ScriptGraphOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.Optional;
|
||||
@ -32,8 +33,8 @@ public class Cqld4GremlinOpDispenser extends BaseOpDispenser<Cqld4ScriptGraphOp>
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final LongFunction<Long> diagFunc;
|
||||
|
||||
public Cqld4GremlinOpDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
|
||||
super(cmd);
|
||||
public Cqld4GremlinOpDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
|
||||
super(adapter,cmd);
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.diagFunc = cmd.getAsFunctionOr("diag", 0L);
|
||||
|
||||
|
@ -23,6 +23,7 @@ import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedStringTemplate;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -40,8 +41,9 @@ public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser {
|
||||
private PreparedStatement preparedStmt;
|
||||
private CqlSession boundSession;
|
||||
|
||||
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op, ParsedStringTemplate stmtTpl, RSProcessors processors) {
|
||||
super(sessionFunc, op);
|
||||
public Cqld4PreparedStmtDispenser(
|
||||
DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, ParsedOp op, ParsedStringTemplate stmtTpl, RSProcessors processors) {
|
||||
super(adapter, sessionFunc, op);
|
||||
if (op.isDynamic("space")) {
|
||||
throw new RuntimeException("Prepared statements and dynamic space values are not supported." +
|
||||
" This would churn the prepared statement cache, defeating the purpose of prepared statements.");
|
||||
|
@ -22,6 +22,7 @@ import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -31,8 +32,8 @@ public class Cqld4RawStmtDispenser extends BaseCqlStmtDispenser {
|
||||
private final LongFunction<Statement> stmtFunc;
|
||||
private final LongFunction<String> targetFunction;
|
||||
|
||||
public Cqld4RawStmtDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
|
||||
super(sessionFunc, cmd);
|
||||
public Cqld4RawStmtDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
|
||||
super(adapter, sessionFunc, cmd);
|
||||
this.targetFunction=targetFunction;
|
||||
this.stmtFunc = createStmtFunc(cmd);
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -29,8 +30,8 @@ public class Cqld4SimpleCqlStmtDispenser extends BaseCqlStmtDispenser {
|
||||
private final LongFunction<Statement> stmtFunc;
|
||||
private final LongFunction<String> targetFunction;
|
||||
|
||||
public Cqld4SimpleCqlStmtDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
|
||||
super(sessionFunc,cmd);
|
||||
public Cqld4SimpleCqlStmtDispenser(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
|
||||
super(adapter, sessionFunc,cmd);
|
||||
this.targetFunction=targetFunction;
|
||||
this.stmtFunc =createStmtFunc(cmd);
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4SimpleCqlStmtDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -28,14 +29,16 @@ import java.util.function.LongFunction;
|
||||
public class CqlD4CqlSimpleStmtMapper implements OpMapper<Cqld4CqlOp> {
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final LongFunction<String> targetFunction;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public CqlD4CqlSimpleStmtMapper(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction) {
|
||||
public CqlD4CqlSimpleStmtMapper(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction) {
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.targetFunction = targetFunction;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<? extends Cqld4CqlOp> apply(ParsedOp op) {
|
||||
return new Cqld4SimpleCqlStmtDispenser(sessionFunc,targetFunction, op);
|
||||
return new Cqld4SimpleCqlStmtDispenser(adapter, sessionFunc,targetFunction, op);
|
||||
}
|
||||
}
|
||||
|
@ -25,8 +25,9 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||
import io.nosqlbench.nb.api.config.params.ParamsParser;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedStringTemplate;
|
||||
@ -40,10 +41,12 @@ public class CqlD4PreparedStmtMapper implements OpMapper<Cqld4CqlOp> {
|
||||
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final TypeAndTarget<CqlD4OpType, String> target;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public CqlD4PreparedStmtMapper(LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType,String> target) {
|
||||
public CqlD4PreparedStmtMapper(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType,String> target) {
|
||||
this.sessionFunc=sessionFunc;
|
||||
this.target = target;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
public OpDispenser<Cqld4CqlOp> apply(ParsedOp op) {
|
||||
@ -67,7 +70,7 @@ public class CqlD4PreparedStmtMapper implements OpMapper<Cqld4CqlOp> {
|
||||
});
|
||||
});
|
||||
|
||||
return new Cqld4PreparedStmtDispenser(sessionFunc, op, stmtTpl, processors);
|
||||
return new Cqld4PreparedStmtDispenser(adapter, sessionFunc, op, stmtTpl, processors);
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4RawStmtDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -29,14 +30,16 @@ public class CqlD4RawStmtMapper implements OpMapper<Cqld4CqlOp> {
|
||||
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final LongFunction<String> targetFunction;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public CqlD4RawStmtMapper(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction) {
|
||||
public CqlD4RawStmtMapper(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction) {
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.targetFunction = targetFunction;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<? extends Cqld4CqlOp> apply(ParsedOp op) {
|
||||
return new Cqld4RawStmtDispenser(sessionFunc, targetFunction, op);
|
||||
return new Cqld4RawStmtDispenser(adapter, sessionFunc, targetFunction, op);
|
||||
}
|
||||
}
|
||||
|
@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Space;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
@ -33,10 +34,12 @@ public class Cqld4CoreOpMapper implements OpMapper<Op> {
|
||||
|
||||
private final DriverSpaceCache<? extends Cqld4Space> cache;
|
||||
private final NBConfiguration cfg;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public Cqld4CoreOpMapper(NBConfiguration config, DriverSpaceCache<? extends Cqld4Space> cache) {
|
||||
public Cqld4CoreOpMapper(DriverAdapter adapter, NBConfiguration config, DriverSpaceCache<? extends Cqld4Space> cache) {
|
||||
this.cfg = config;
|
||||
this.cache = cache;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -61,11 +64,11 @@ public class Cqld4CoreOpMapper implements OpMapper<Op> {
|
||||
TypeAndTarget<CqlD4OpType, String> target = op.getTypeAndTarget(CqlD4OpType.class, String.class, "type", "stmt");
|
||||
|
||||
return switch (target.enumId) {
|
||||
case raw -> new CqlD4RawStmtMapper(sessionFunc, target.targetFunction).apply(op);
|
||||
case simple -> new CqlD4CqlSimpleStmtMapper(sessionFunc, target.targetFunction).apply(op);
|
||||
case prepared -> new CqlD4PreparedStmtMapper(sessionFunc, target).apply(op);
|
||||
case gremlin -> new Cqld4GremlinOpMapper(sessionFunc, target.targetFunction).apply(op);
|
||||
case fluent -> new Cqld4FluentGraphOpMapper(sessionFunc, target).apply(op);
|
||||
case raw -> new CqlD4RawStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||
case simple -> new CqlD4CqlSimpleStmtMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||
case prepared -> new CqlD4PreparedStmtMapper(adapter, sessionFunc, target).apply(op);
|
||||
case gremlin -> new Cqld4GremlinOpMapper(adapter, sessionFunc, target.targetFunction).apply(op);
|
||||
case fluent -> new Cqld4FluentGraphOpMapper(adapter, sessionFunc, target).apply(op);
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@ import groovy.lang.Script;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4FluentGraphOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||
@ -37,7 +38,10 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSo
|
||||
import org.codehaus.groovy.control.CompilerConfiguration;
|
||||
import org.codehaus.groovy.control.customizers.ImportCustomizer;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@ -46,11 +50,13 @@ public class Cqld4FluentGraphOpMapper implements OpMapper<Op> {
|
||||
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final TypeAndTarget<CqlD4OpType, String> target;
|
||||
private final DriverAdapter adapter;
|
||||
private GraphTraversalSource gtsPlaceHolder;
|
||||
|
||||
public Cqld4FluentGraphOpMapper(LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType, String> target) {
|
||||
public Cqld4FluentGraphOpMapper(DriverAdapter adapter, LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType, String> target) {
|
||||
this.sessionFunc = sessionFunc;
|
||||
this.target = target;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -82,7 +88,7 @@ public class Cqld4FluentGraphOpMapper implements OpMapper<Op> {
|
||||
LongFunction<? extends String> graphnameFunc = op.getAsRequiredFunction("graphname");
|
||||
Bindings virtdataBindings = new BindingsTemplate(fluent.getBindPoints()).resolveBindings();
|
||||
|
||||
return new Cqld4FluentGraphOpDispenser(op, graphnameFunc, sessionFunc, virtdataBindings, supplier);
|
||||
return new Cqld4FluentGraphOpDispenser(adapter, op, graphnameFunc, sessionFunc, virtdataBindings, supplier);
|
||||
}
|
||||
|
||||
private String[] expandClassNames(List l) {
|
||||
|
@ -21,6 +21,7 @@ import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4GremlinOpDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4ScriptGraphOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -28,13 +29,15 @@ import java.util.function.LongFunction;
|
||||
public class Cqld4GremlinOpMapper implements OpMapper<Cqld4ScriptGraphOp> {
|
||||
private final LongFunction<CqlSession> sessionFunc;
|
||||
private final LongFunction<String> targetFunction;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public Cqld4GremlinOpMapper(LongFunction<CqlSession> session, LongFunction<String> targetFunction) {
|
||||
public Cqld4GremlinOpMapper(DriverAdapter adapter, LongFunction<CqlSession> session, LongFunction<String> targetFunction) {
|
||||
this.sessionFunc = session;
|
||||
this.targetFunction = targetFunction;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
public OpDispenser<Cqld4ScriptGraphOp> apply(ParsedOp op) {
|
||||
return new Cqld4GremlinOpDispenser(sessionFunc, targetFunction, op);
|
||||
return new Cqld4GremlinOpDispenser(adapter, sessionFunc, targetFunction, op);
|
||||
}
|
||||
}
|
||||
|
@ -17,9 +17,13 @@
|
||||
package io.nosqlbench.adapter.diag;
|
||||
|
||||
|
||||
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import io.nosqlbench.nb.api.config.params.NBParams;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
@ -35,7 +39,7 @@ import java.util.Optional;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Service(value = DriverAdapter.class, selector = "diag")
|
||||
public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> {
|
||||
public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> implements SyntheticOpTemplateProvider {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(DiagDriverAdapter.class);
|
||||
private DiagOpMapper mapper;
|
||||
@ -47,7 +51,7 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> {
|
||||
@Override
|
||||
public synchronized OpMapper<DiagOp> getOpMapper() {
|
||||
if (this.mapper == null) {
|
||||
this.mapper = new DiagOpMapper(getSpaceCache());
|
||||
this.mapper = new DiagOpMapper(this, getSpaceCache());
|
||||
}
|
||||
return this.mapper;
|
||||
}
|
||||
@ -76,7 +80,7 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> {
|
||||
return List.of(
|
||||
stmt -> {
|
||||
if (stmt.matches("^\\w+$")) {
|
||||
return Optional.of(new LinkedHashMap<String, Object>(Map.of("type",stmt)));
|
||||
return Optional.of(new LinkedHashMap<String, Object>(Map.of("type", stmt)));
|
||||
} else {
|
||||
return Optional.empty();
|
||||
}
|
||||
@ -90,10 +94,14 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> {
|
||||
super.applyConfig(cfg);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void applyReconfig(NBConfiguration cfg) {
|
||||
super.applyReconfig(cfg);
|
||||
NBReconfigurable.applyMatching(cfg,List.of(mapper));
|
||||
NBReconfigurable.applyMatching(cfg, List.of(mapper));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(StmtsDocList stmtsDocList, Map<String, Object> params) {
|
||||
return StatementsLoader.loadString("ops: 'log:level=INFO'", params).getStmts();
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package io.nosqlbench.adapter.diag;
|
||||
import io.nosqlbench.adapter.diag.optasks.DiagTask;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
@ -38,8 +39,8 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp> implements NBReconf
|
||||
private LongFunction<DiagSpace> spaceF;
|
||||
private OpFunc opFuncs;
|
||||
|
||||
public DiagOpDispenser(ParsedOp op) {
|
||||
super(op);
|
||||
public DiagOpDispenser(DriverAdapter adapter, ParsedOp op) {
|
||||
super(adapter,op);
|
||||
this.opFunc = resolveOpFunc(op);
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ package io.nosqlbench.adapter.diag;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
|
||||
@ -32,14 +33,16 @@ import java.util.function.LongFunction;
|
||||
public class DiagOpMapper implements OpMapper<DiagOp>, NBReconfigurable {
|
||||
private final DriverSpaceCache<? extends DiagSpace> spaceCache;
|
||||
private final Map<String,DiagOpDispenser> dispensers = new LinkedHashMap<>();
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public DiagOpMapper(DriverSpaceCache<? extends DiagSpace> spaceCache) {
|
||||
public DiagOpMapper(DriverAdapter adapter, DriverSpaceCache<? extends DiagSpace> spaceCache) {
|
||||
this.spaceCache = spaceCache;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<? extends DiagOp> apply(ParsedOp op) {
|
||||
DiagOpDispenser dispenser = new DiagOpDispenser(op);
|
||||
DiagOpDispenser dispenser = new DiagOpDispenser(adapter,op);
|
||||
LongFunction<String> spaceName = op.getAsFunctionOr("space", "default");
|
||||
LongFunction<DiagSpace> spacef = l -> spaceCache.get(spaceName.apply(l));
|
||||
dispensers.put(op.getName(),dispenser);
|
||||
|
@ -35,7 +35,7 @@ public class DynamoDBDriverAdapter extends BaseDriverAdapter<DynamoDBOp, DynamoD
|
||||
public OpMapper<DynamoDBOp> getOpMapper() {
|
||||
DriverSpaceCache<? extends DynamoDBSpace> spaceCache = getSpaceCache();
|
||||
NBConfiguration adapterConfig = getConfiguration();
|
||||
return new DynamoDBOpMapper(adapterConfig, spaceCache);
|
||||
return new DynamoDBOpMapper(this, adapterConfig, spaceCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,6 +21,7 @@ import io.nosqlbench.adapter.dynamodb.opdispensers.*;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||
@ -30,10 +31,12 @@ public class DynamoDBOpMapper implements OpMapper<DynamoDBOp> {
|
||||
|
||||
private final NBConfiguration cfg;
|
||||
private final DriverSpaceCache<? extends DynamoDBSpace> cache;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public DynamoDBOpMapper(NBConfiguration cfg, DriverSpaceCache<? extends DynamoDBSpace> cache) {
|
||||
public DynamoDBOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends DynamoDBSpace> cache) {
|
||||
this.cfg = cfg;
|
||||
this.cache = cache;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -52,11 +55,11 @@ public class DynamoDBOpMapper implements OpMapper<DynamoDBOp> {
|
||||
} else {
|
||||
TypeAndTarget<DynamoDBCmdType,String> cmdType = op.getTypeAndTarget(DynamoDBCmdType.class,String.class);
|
||||
return switch (cmdType.enumId) {
|
||||
case CreateTable -> new DDBCreateTableOpDispenser(ddb, op, cmdType.targetFunction);
|
||||
case DeleteTable -> new DDBDeleteTableOpDispenser(ddb, op, cmdType.targetFunction);
|
||||
case PutItem -> new DDBPutItemOpDispenser(ddb, op, cmdType.targetFunction);
|
||||
case GetItem -> new DDBGetItemOpDispenser(ddb, op, cmdType.targetFunction);
|
||||
case Query -> new DDBQueryOpDispenser(ddb, op, cmdType.targetFunction);
|
||||
case CreateTable -> new DDBCreateTableOpDispenser(adapter, ddb, op, cmdType.targetFunction);
|
||||
case DeleteTable -> new DDBDeleteTableOpDispenser(adapter, ddb, op, cmdType.targetFunction);
|
||||
case PutItem -> new DDBPutItemOpDispenser(adapter, ddb, op, cmdType.targetFunction);
|
||||
case GetItem -> new DDBGetItemOpDispenser(adapter, ddb, op, cmdType.targetFunction);
|
||||
case Query -> new DDBQueryOpDispenser(adapter,ddb, op, cmdType.targetFunction);
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import com.amazonaws.services.dynamodbv2.model.*;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DDBCreateTableOp;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -114,8 +115,8 @@ public class DDBCreateTableOpDispenser extends BaseOpDispenser<DynamoDBOp> {
|
||||
private final LongFunction<String> writeCapacityFunc;
|
||||
private final LongFunction<String> billingModeFunc;
|
||||
|
||||
public DDBCreateTableOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(cmd);
|
||||
public DDBCreateTableOpDispenser(DriverAdapter adapter, DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(adapter,cmd);
|
||||
this.ddb = ddb;
|
||||
this.tableNameFunc = l -> targetFunc.apply(l).toString();
|
||||
this.keySchemaFunc = resolveKeySchemaFunction(cmd);
|
||||
|
@ -21,6 +21,7 @@ import com.amazonaws.services.dynamodbv2.model.DeleteTableRequest;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DDBDeleteTableOp;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -38,8 +39,8 @@ public class DDBDeleteTableOpDispenser extends BaseOpDispenser<DynamoDBOp> {
|
||||
private final DynamoDB ddb;
|
||||
private final LongFunction<String> tableNameFunc;
|
||||
|
||||
public DDBDeleteTableOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(cmd);
|
||||
public DDBDeleteTableOpDispenser(DriverAdapter adapter, DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(adapter, cmd);
|
||||
this.ddb = ddb;
|
||||
this.tableNameFunc = l -> targetFunc.apply(l).toString();
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DDBGetItemOp;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.Map;
|
||||
@ -34,8 +35,8 @@ public class DDBGetItemOpDispenser extends BaseOpDispenser<DynamoDBOp> {
|
||||
private final LongFunction<Table> targetTableFunction;
|
||||
private final LongFunction<GetItemSpec> getItemSpecFunc;
|
||||
|
||||
public DDBGetItemOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunction) {
|
||||
super(cmd);
|
||||
public DDBGetItemOpDispenser(DriverAdapter adapter, DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunction) {
|
||||
super(adapter,cmd);
|
||||
this.ddb = ddb;
|
||||
this.targetTableFunction = l -> ddb.getTable(targetFunction.apply(l).toString());
|
||||
this.getItemSpecFunc = resolveGetItemSpecFunction(cmd);
|
||||
|
@ -21,6 +21,7 @@ import com.amazonaws.services.dynamodbv2.document.Item;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DDBPutItemOp;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
|
||||
@ -33,8 +34,8 @@ public class DDBPutItemOpDispenser extends BaseOpDispenser<DynamoDBOp> {
|
||||
private final LongFunction<String> tableNameFunc;
|
||||
private final LongFunction<? extends Item> itemfunc;
|
||||
|
||||
public DDBPutItemOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(cmd);
|
||||
public DDBPutItemOpDispenser(DriverAdapter adapter, DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(adapter, cmd);
|
||||
this.ddb = ddb;
|
||||
this.tableNameFunc = l -> targetFunc.apply(l).toString();
|
||||
if (cmd.isDefined("item")) {
|
||||
|
@ -23,6 +23,7 @@ import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DDBQueryOp;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.Map;
|
||||
@ -138,8 +139,8 @@ public class DDBQueryOpDispenser extends BaseOpDispenser<DynamoDBOp> {
|
||||
private final LongFunction<Table> tableFunc;
|
||||
private final LongFunction<QuerySpec> querySpecFunc;
|
||||
|
||||
public DDBQueryOpDispenser(DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(cmd);
|
||||
public DDBQueryOpDispenser(DriverAdapter adapter, DynamoDB ddb, ParsedOp cmd, LongFunction<?> targetFunc) {
|
||||
super(adapter,cmd);
|
||||
this.ddb = ddb;
|
||||
LongFunction<String> tableNameFunc = l -> targetFunc.apply(l).toString();
|
||||
this.tableFunc = l -> ddb.getTable(tableNameFunc.apply(l));
|
||||
|
@ -20,6 +20,7 @@ import com.amazonaws.services.dynamodbv2.document.DynamoDB;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.DynamoDBOp;
|
||||
import io.nosqlbench.adapter.dynamodb.optypes.RawDynamodOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -29,8 +30,8 @@ public class RawDynamoDBOpDispenser extends BaseOpDispenser<DynamoDBOp> {
|
||||
private final LongFunction<? extends String> jsonFunction;
|
||||
private final DynamoDB ddb;
|
||||
|
||||
public RawDynamoDBOpDispenser(DynamoDB ddb, ParsedOp pop) {
|
||||
super(pop);
|
||||
public RawDynamoDBOpDispenser(DriverAdapter adapter, DynamoDB ddb, ParsedOp pop) {
|
||||
super(adapter,pop);
|
||||
this.ddb = ddb;
|
||||
|
||||
String bodytype = pop.getValueType("body").getSimpleName();
|
||||
|
@ -41,7 +41,7 @@ public class HttpDriverAdapter extends BaseDriverAdapter<HttpOp, HttpSpace> {
|
||||
public OpMapper<HttpOp> getOpMapper() {
|
||||
DriverSpaceCache<? extends HttpSpace> spaceCache = getSpaceCache();
|
||||
NBConfiguration config = getConfiguration();
|
||||
return new HttpOpMapper(config, spaceCache);
|
||||
return new HttpOpMapper(this, config, spaceCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -17,6 +17,7 @@
|
||||
package io.nosqlbench.adapter.http.core;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.net.URI;
|
||||
@ -35,8 +36,8 @@ public class HttpOpDispenser extends BaseOpDispenser<HttpOp> {
|
||||
public static final String DEFAULT_OK_STATUS = "2..";
|
||||
|
||||
|
||||
public HttpOpDispenser(LongFunction<HttpSpace> ctxF, ParsedOp op) {
|
||||
super(op);
|
||||
public HttpOpDispenser(DriverAdapter adapter, LongFunction<HttpSpace> ctxF, ParsedOp op) {
|
||||
super(adapter, op);
|
||||
opFunc = getOpFunc(ctxF, op);
|
||||
}
|
||||
|
||||
|
@ -18,6 +18,7 @@ package io.nosqlbench.adapter.http.core;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
@ -28,16 +29,18 @@ public class HttpOpMapper implements OpMapper<HttpOp> {
|
||||
|
||||
private final NBConfiguration cfg;
|
||||
private final DriverSpaceCache<? extends HttpSpace> spaceCache;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public HttpOpMapper(NBConfiguration cfg, DriverSpaceCache<? extends HttpSpace> spaceCache) {
|
||||
public HttpOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends HttpSpace> spaceCache) {
|
||||
this.cfg = cfg;
|
||||
this.spaceCache = spaceCache;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<? extends HttpOp> apply(ParsedOp op) {
|
||||
LongFunction<String> spaceNameF = op.getAsFunctionOr("space", "default");
|
||||
LongFunction<HttpSpace> spaceFunc = l -> spaceCache.get(spaceNameF.apply(l));
|
||||
return new HttpOpDispenser(spaceFunc, op);
|
||||
return new HttpOpDispenser(adapter, spaceFunc, op);
|
||||
}
|
||||
}
|
||||
|
@ -44,7 +44,7 @@ public class HttpOpMapperTest {
|
||||
adapter = new HttpDriverAdapter();
|
||||
adapter.applyConfig(cfg);
|
||||
DriverSpaceCache<? extends HttpSpace> cache = adapter.getSpaceCache();
|
||||
mapper = new HttpOpMapper(cfg, cache);
|
||||
mapper = new HttpOpMapper(adapter,cfg, cache);
|
||||
}
|
||||
|
||||
private static ParsedOp parsedOpFor(String yaml) {
|
||||
|
@ -19,6 +19,7 @@ package io.nosqlbench.adapter.mongodb.core;
|
||||
import com.mongodb.ReadPreference;
|
||||
import io.nosqlbench.adapter.mongodb.ops.MongoOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import org.bson.Document;
|
||||
@ -31,8 +32,8 @@ public class MongoOpDispenser extends BaseOpDispenser<Op> {
|
||||
private final LongFunction<MongoOp> opFunc;
|
||||
private final LongFunction<MongoOp> mongoOpF;
|
||||
|
||||
public MongoOpDispenser(LongFunction<MongoSpace> ctxFunc, ParsedOp op) {
|
||||
super(op);
|
||||
public MongoOpDispenser(DriverAdapter adapter, LongFunction<MongoSpace> ctxFunc, ParsedOp op) {
|
||||
super(adapter,op);
|
||||
opFunc = createOpFunc(ctxFunc, op);
|
||||
this.mongoOpF = createOpFunc(ctxFunc,op);
|
||||
}
|
||||
|
@ -34,7 +34,7 @@ public class MongodbDriverAdapter extends BaseDriverAdapter<Op, MongoSpace> {
|
||||
|
||||
@Override
|
||||
public OpMapper<Op> getOpMapper() {
|
||||
return new MongodbOpMapper(getSpaceCache());
|
||||
return new MongodbOpMapper(this, getSpaceCache());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -18,6 +18,7 @@ package io.nosqlbench.adapter.mongodb.core;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
@ -27,15 +28,17 @@ import java.util.function.LongFunction;
|
||||
public class MongodbOpMapper implements OpMapper<Op> {
|
||||
|
||||
private final DriverSpaceCache<? extends MongoSpace> ctxcache;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public MongodbOpMapper(DriverSpaceCache<? extends MongoSpace> ctxcache) {
|
||||
public MongodbOpMapper(DriverAdapter adapter, DriverSpaceCache<? extends MongoSpace> ctxcache) {
|
||||
this.ctxcache = ctxcache;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<? extends Op> apply(ParsedOp op) {
|
||||
LongFunction<String> ctxNamer = op.getAsFunctionOr("space","default");
|
||||
LongFunction<MongoSpace> ctxFunc = l -> ctxcache.get(ctxNamer.apply(l));
|
||||
return new MongoOpDispenser(ctxFunc, op);
|
||||
return new MongoOpDispenser(adapter,ctxFunc, op);
|
||||
}
|
||||
}
|
||||
|
@ -42,7 +42,7 @@ public class StdoutDriverAdapter extends BaseDriverAdapter<StdoutOp, StdoutSpace
|
||||
@Override
|
||||
public OpMapper<StdoutOp> getOpMapper() {
|
||||
DriverSpaceCache<? extends StdoutSpace> ctxCache = getSpaceCache();
|
||||
return new StdoutOpMapper(ctxCache);
|
||||
return new StdoutOpMapper(this,ctxCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -17,6 +17,7 @@
|
||||
package io.nosqlbench.adapter.stdout;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
@ -26,8 +27,8 @@ public class StdoutOpDispenser extends BaseOpDispenser<StdoutOp> {
|
||||
private final LongFunction<StdoutSpace> ctxfunc;
|
||||
private final LongFunction<String> outFunction;
|
||||
|
||||
public StdoutOpDispenser(ParsedOp cmd, LongFunction<StdoutSpace> ctxfunc) {
|
||||
super(cmd);
|
||||
public StdoutOpDispenser(DriverAdapter adapter, ParsedOp cmd, LongFunction<StdoutSpace> ctxfunc) {
|
||||
super(adapter,cmd);
|
||||
this.ctxfunc = ctxfunc;
|
||||
LongFunction<Object> objectFunction = cmd.getAsRequiredFunction("stmt", Object.class);
|
||||
LongFunction<String> stringfunc = l -> objectFunction.apply(l).toString();
|
||||
|
@ -18,6 +18,7 @@ package io.nosqlbench.adapter.stdout;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
@ -26,16 +27,18 @@ import java.util.function.LongFunction;
|
||||
public class StdoutOpMapper implements OpMapper<StdoutOp> {
|
||||
|
||||
private final DriverSpaceCache<? extends StdoutSpace> ctxcache;
|
||||
private final DriverAdapter adapter;
|
||||
|
||||
public StdoutOpMapper(DriverSpaceCache<? extends StdoutSpace> ctxcache) {
|
||||
public StdoutOpMapper(DriverAdapter adapter, DriverSpaceCache<? extends StdoutSpace> ctxcache) {
|
||||
this.ctxcache = ctxcache;
|
||||
this.adapter = adapter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OpDispenser<StdoutOp> apply(ParsedOp op) {
|
||||
LongFunction<String> spacefunc = op.getAsFunctionOr("space", "default");
|
||||
LongFunction<StdoutSpace> ctxfunc = (cycle) -> ctxcache.get(spacefunc.apply(cycle));
|
||||
return new StdoutOpDispenser(op,ctxfunc);
|
||||
return new StdoutOpDispenser(adapter,op,ctxfunc);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public abstract class BaseOpDispenser<T> implements OpDispenser<T> {
|
||||
|
||||
private final String name;
|
||||
private final DriverAdapter adapter;
|
||||
private boolean instrument;
|
||||
private Histogram resultSizeHistogram;
|
||||
private Timer successTimer;
|
||||
@ -44,15 +45,9 @@ public abstract class BaseOpDispenser<T> implements OpDispenser<T> {
|
||||
private String[] timerStarts = new String[0];
|
||||
private String[] timerStops = new String[0];
|
||||
|
||||
|
||||
// TODO: Consider changing this to "ready op template" or similar
|
||||
@Deprecated
|
||||
public BaseOpDispenser(OpTemplate optpl) {
|
||||
this.name = optpl.getName();
|
||||
}
|
||||
|
||||
public BaseOpDispenser(ParsedOp op) {
|
||||
public BaseOpDispenser(DriverAdapter adapter,ParsedOp op) {
|
||||
this.name = op.getName();
|
||||
this.adapter = adapter;
|
||||
timerStarts = op.takeOptionalStaticValue("start-timers", String.class)
|
||||
.map(s -> s.split(", *"))
|
||||
.orElse(null);
|
||||
@ -69,6 +64,10 @@ public abstract class BaseOpDispenser<T> implements OpDispenser<T> {
|
||||
configureInstrumentation(op);
|
||||
}
|
||||
|
||||
public DriverAdapter getAdapter() {
|
||||
return adapter;
|
||||
}
|
||||
|
||||
// public BaseOpDispenser(CommandTemplate cmdtpl) {
|
||||
// this.name = cmdtpl.getName();
|
||||
// }
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
@ -25,8 +26,8 @@ public class StandardOpDispenser<T extends Op> extends BaseOpDispenser<T>{
|
||||
|
||||
private final LongFunction<T> opfunc;
|
||||
|
||||
public StandardOpDispenser(ParsedOp op, LongFunction<T> opfunc) {
|
||||
super(op);
|
||||
public StandardOpDispenser(DriverAdapter adapter, ParsedOp op, LongFunction<T> opfunc) {
|
||||
super(adapter, op);
|
||||
this.opfunc = opfunc;
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSContext;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
@ -114,7 +115,7 @@ public class JmsActivity extends SimpleActivity {
|
||||
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize", this.getHdrDigits());
|
||||
|
||||
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
|
||||
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this), false);
|
||||
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this), false, Optional.empty());
|
||||
}
|
||||
|
||||
setDefaultsFromOpSequence(sequence);
|
||||
|
@ -28,6 +28,7 @@ import io.nosqlbench.nb.api.config.standard.NBReconfigurable;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
@ -35,40 +36,45 @@ import java.util.Optional;
|
||||
public class StandardActivityType<A extends StandardActivity<?,?>> extends SimpleActivity implements ActivityType<A> {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger("ACTIVITY");
|
||||
|
||||
private final DriverAdapter<?,?> adapter;
|
||||
private final Map<String,DriverAdapter> adapters = new HashMap<>();
|
||||
|
||||
public StandardActivityType(DriverAdapter<?,?> adapter, ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
this.adapter = adapter;
|
||||
this.adapters.put(adapter.getAdapterName(),adapter);
|
||||
if (adapter instanceof ActivityDefAware) {
|
||||
((ActivityDefAware) adapter).setActivityDef(activityDef);
|
||||
}
|
||||
}
|
||||
|
||||
public StandardActivityType(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
}
|
||||
|
||||
@Override
|
||||
public A getActivity(ActivityDef activityDef) {
|
||||
if (activityDef.getParams().getOptionalString("async").isPresent()) {
|
||||
throw new RuntimeException("This driver does not support async mode yet.");
|
||||
}
|
||||
|
||||
return (A) new StandardActivity(adapter,activityDef);
|
||||
return (A) new StandardActivity(activityDef);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
|
||||
if (adapter instanceof NBReconfigurable reconfigurable) {
|
||||
NBConfigModel cfgModel = reconfigurable.getReconfigModel();
|
||||
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
if (op_yaml_loc.isPresent()) {
|
||||
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
|
||||
StmtsDocList workload = StatementsLoader.loadPath(logger, op_yaml_loc.get(), disposable, "activities");
|
||||
cfgModel=cfgModel.add(workload.getConfigModel());
|
||||
for (DriverAdapter adapter : adapters.values()) {
|
||||
if (adapter instanceof NBReconfigurable reconfigurable) {
|
||||
NBConfigModel cfgModel = reconfigurable.getReconfigModel();
|
||||
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
if (op_yaml_loc.isPresent()) {
|
||||
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
|
||||
StmtsDocList workload = StatementsLoader.loadPath(logger, op_yaml_loc.get(), disposable, "activities");
|
||||
cfgModel=cfgModel.add(workload.getConfigModel());
|
||||
}
|
||||
NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
|
||||
reconfigurable.applyReconfig(cfg);
|
||||
}
|
||||
NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
|
||||
reconfigurable.applyReconfig(cfg);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user