mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
support gremlin in cqld4 driver
This commit is contained in:
@@ -3,11 +3,12 @@ package io.nosqlbench.adapter.cqld4;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
|
||||
public class Cqld4ReboundStatement extends Cqld4Op {
|
||||
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
|
||||
private final BoundStatement stmt;
|
||||
|
||||
public Cqld4ReboundStatement(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, BoundStatement rebound, RSProcessors processors) {
|
||||
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, BoundStatement rebound, RSProcessors processors) {
|
||||
super(session,maxpages,retryreplace,metrics,processors);
|
||||
this.stmt = rebound;
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
|
||||
import io.nosqlbench.adapter.cqld4.opmappers.Cqld4OpMapper;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
@@ -11,10 +13,10 @@ import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import java.util.function.Function;
|
||||
|
||||
@Service(value = DriverAdapter.class, selector = "cqld4")
|
||||
public class Cqld4DriverAdapter extends BaseDriverAdapter<Cqld4Op, Cqld4Space> {
|
||||
public class Cqld4DriverAdapter extends BaseDriverAdapter<Cqld4BaseOp, Cqld4Space> {
|
||||
|
||||
@Override
|
||||
public OpMapper<Cqld4Op> getOpMapper() {
|
||||
public OpMapper<Cqld4BaseOp> getOpMapper() {
|
||||
DriverSpaceCache<? extends Cqld4Space> spaceCache = getSpaceCache();
|
||||
NBConfiguration config = getConfiguration();
|
||||
return new Cqld4OpMapper(config, spaceCache);
|
||||
@@ -27,7 +29,7 @@ public class Cqld4DriverAdapter extends BaseDriverAdapter<Cqld4Op, Cqld4Space> {
|
||||
|
||||
@Override
|
||||
public NBConfigModel getConfigModel() {
|
||||
return Cqld4Space.getConfigModel();
|
||||
return super.getConfigModel().add(Cqld4Space.getConfigModel());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
package io.nosqlbench.adapter.cqld4.exceptions;
|
||||
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
|
||||
public class UndefinedResultSetException extends RuntimeException {
|
||||
private final Cqld4Op cqld4op;
|
||||
private final Cqld4CqlOp cqld4op;
|
||||
|
||||
public UndefinedResultSetException(Cqld4Op cqld4Op) {
|
||||
public UndefinedResultSetException(Cqld4CqlOp cqld4Op) {
|
||||
this.cqld4op = cqld4Op;
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
|
||||
public class CqlD4PreparedBatchOpDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
public class CqlD4PreparedBatchOpDispenser extends BaseOpDispenser<Cqld4CqlOp> {
|
||||
|
||||
private final CqlSession session;
|
||||
private final ParsedOp cmd;
|
||||
@@ -18,7 +18,7 @@ public class CqlD4PreparedBatchOpDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4Op apply(long value) {
|
||||
public Cqld4CqlOp apply(long value) {
|
||||
throw new BasicError("this is not implemented yet.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
public class Cqld4BatchStatementDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
public class Cqld4BatchStatementDispenser extends BaseOpDispenser<Cqld4CqlOp> {
|
||||
private final CqlSession session;
|
||||
private final ParsedOp cmd;
|
||||
|
||||
@@ -16,7 +16,7 @@ public class Cqld4BatchStatementDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4Op apply(long value) {
|
||||
public Cqld4CqlOp apply(long value) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,17 +3,17 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4PreparedStatement;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class Cqld4PreparedStmtDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
public class Cqld4PreparedStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {
|
||||
|
||||
private final CqlSession session;
|
||||
|
||||
@@ -41,9 +41,9 @@ public class Cqld4PreparedStmtDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4Op apply(long value) {
|
||||
public Cqld4CqlOp apply(long value) {
|
||||
Object[] parameters = varbinder.apply(value);
|
||||
BoundStatement stmt = preparedStmt.bind(parameters);
|
||||
return new Cqld4PreparedStatement(session, stmt, maxpages, retryreplace, metrics, processors);
|
||||
return new Cqld4CqlPreparedStatement(session, stmt, maxpages, retryreplace, metrics, processors);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,13 +2,13 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4SimpleCqlStatement;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleCqlStatement;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
public class Cqld4SimpleCqlStmtDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
public class Cqld4SimpleCqlStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {
|
||||
|
||||
private final CqlSession session;
|
||||
private final ParsedOp cmd;
|
||||
@@ -26,9 +26,9 @@ public class Cqld4SimpleCqlStmtDispenser extends BaseOpDispenser<Cqld4Op> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4SimpleCqlStatement apply(long value) {
|
||||
public Cqld4CqlSimpleCqlStatement apply(long value) {
|
||||
String stmtBody = cmd.get("stmt",value);
|
||||
SimpleStatement simpleStatement = SimpleStatement.newInstance(stmtBody);
|
||||
return new Cqld4SimpleCqlStatement(session,simpleStatement,maxpages,retryreplace,metrics);
|
||||
return new Cqld4CqlSimpleCqlStatement(session,simpleStatement,maxpages,retryreplace,metrics);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,17 +1,19 @@
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Processors;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.CqlD4PreparedBatchOpDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4BatchStatementDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4SimpleCqlStmtDispenser;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
|
||||
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.params.ParamsParser;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.nb.api.errors.BasicError;
|
||||
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
|
||||
|
||||
@@ -19,19 +21,14 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
|
||||
public class CqlD4CqlOpMapper implements OpMapper<Cqld4CqlOp> {
|
||||
private final CqlSession session;
|
||||
|
||||
|
||||
private final DriverSpaceCache<? extends Cqld4Space> cache;
|
||||
private final NBConfiguration cfg;
|
||||
|
||||
public Cqld4OpMapper(NBConfiguration config, DriverSpaceCache<? extends Cqld4Space> cache) {
|
||||
this.cfg = config;
|
||||
this.cache = cache;
|
||||
public CqlD4CqlOpMapper(CqlSession session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public OpDispenser<Cqld4Op> apply(ParsedOp cmd) {
|
||||
|
||||
public OpDispenser<Cqld4CqlOp> apply(ParsedOp cmd) {
|
||||
ParsedTemplate stmtTpl = cmd.getStmtAsTemplate().orElseThrow(() -> new BasicError(
|
||||
"No statement was found in the op template:" + cmd
|
||||
));
|
||||
@@ -41,11 +38,6 @@ public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
|
||||
processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures()));
|
||||
}
|
||||
|
||||
// cmd.getOptionalStaticConfig("processor",String.class)
|
||||
// .map(s -> ParamsParser.parseToMap(s,"type"))
|
||||
// .map(Cqld4Processors::resolve)
|
||||
// .ifPresent(processors::add);
|
||||
//
|
||||
Optional<List> processorList = cmd.getOptionalStaticConfig("processors", List.class);
|
||||
processorList.ifPresent(l -> {
|
||||
l.forEach(m -> {
|
||||
@@ -54,18 +46,10 @@ public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
|
||||
processors.add(() -> processor);
|
||||
});
|
||||
});
|
||||
//
|
||||
// processorList.stream()
|
||||
// .map(s -> ParamsParser.parseToMap(s,"type"))
|
||||
// .map(Cqld4Processors::resolve)
|
||||
// .forEach(processors::add);
|
||||
|
||||
Cqld4Space cqld4Space = cache.get(cmd.getStaticConfigOr("space", "default"));
|
||||
|
||||
boolean prepared = cmd.getStaticConfigOr("prepared", true);
|
||||
boolean batch = cmd.getStaticConfigOr("boolean", false);
|
||||
CqlSession session = cqld4Space.getSession();
|
||||
|
||||
|
||||
|
||||
if (prepared && batch) {
|
||||
return new CqlD4PreparedBatchOpDispenser(session, cmd);
|
||||
@@ -76,6 +60,6 @@ public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
|
||||
} else {
|
||||
return new Cqld4SimpleCqlStmtDispenser(session, cmd);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||
|
||||
public enum CqlD4OpType {
|
||||
cql,
|
||||
gremlin
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||
|
||||
import com.datastax.dse.driver.api.core.graph.GraphResultSet;
|
||||
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
|
||||
public class Cqld4GremlinOp extends Cqld4BaseOp implements CycleOp<GraphResultSet> {
|
||||
private final CqlSession session;
|
||||
private final ScriptGraphStatement stmt;
|
||||
private int resultSize=0;
|
||||
|
||||
public Cqld4GremlinOp(CqlSession session, ScriptGraphStatement stmt) {
|
||||
this.session = session;
|
||||
this.stmt = stmt;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphResultSet apply(long value) {
|
||||
GraphResultSet result = session.execute(stmt);
|
||||
this.resultSize = result.all().size();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getResultSize() {
|
||||
return resultSize;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
public class Cqld4GremlinOpMapper implements OpMapper<Cqld4GremlinOp> {
|
||||
private final CqlSession session;
|
||||
|
||||
public Cqld4GremlinOpMapper(CqlSession session) {
|
||||
this.session = session;
|
||||
}
|
||||
|
||||
public OpDispenser<Cqld4GremlinOp> apply(ParsedOp cmd) {
|
||||
return new GremlinOpDispenser(session, cmd);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Space;
|
||||
import io.nosqlbench.adapter.cqld4.optypes.Cqld4BaseOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
|
||||
|
||||
public class Cqld4OpMapper implements OpMapper<Cqld4BaseOp> {
|
||||
|
||||
|
||||
private final DriverSpaceCache<? extends Cqld4Space> cache;
|
||||
private final NBConfiguration cfg;
|
||||
|
||||
public Cqld4OpMapper(NBConfiguration config, DriverSpaceCache<? extends Cqld4Space> cache) {
|
||||
this.cfg = config;
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine what type of op dispenser to use for a given parsed op template, and return a new instance
|
||||
* for it. Since the operations under the CQL driver 4.* do not follow a common type structure, we use the
|
||||
* base types in the NoSQLBench APIs and treat them somewhat more generically than with other drivers.
|
||||
*
|
||||
* @param cmd The {@link ParsedOp} which is the parsed version of the user-provided op template.
|
||||
* This contains all the fields provided by the user, as well as explicit knowledge of
|
||||
* which ones are static and dynamic.
|
||||
* @return An op dispenser for each provided op command
|
||||
*/
|
||||
public OpDispenser<? extends Cqld4BaseOp> apply(ParsedOp cmd) {
|
||||
|
||||
Cqld4Space cqld4Space = cache.get(cmd.getStaticConfigOr("space", "default"));
|
||||
CqlSession session = cqld4Space.getSession();
|
||||
|
||||
CqlD4OpType cmdtype = cmd.getEnumFromFieldOr(CqlD4OpType.class, CqlD4OpType.cql, "type");
|
||||
|
||||
// OpDispenser<Cqld4CqlOp> t = new CqlD4CqlOpMapper(session).apply(cmd);
|
||||
|
||||
return switch (cmdtype) {
|
||||
case cql -> new CqlD4CqlOpMapper(session).apply(cmd);
|
||||
case gremlin -> new Cqld4GremlinOpMapper(session).apply(cmd);
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package io.nosqlbench.adapter.cqld4.opmappers;
|
||||
|
||||
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
|
||||
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatementBuilder;
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class GremlinOpDispenser extends BaseOpDispenser<Cqld4GremlinOp> {
|
||||
|
||||
private final LongFunction<? extends ScriptGraphStatement> stmtFunc;
|
||||
private final CqlSession session;
|
||||
private final LongFunction<Long> diagFunc;
|
||||
|
||||
public GremlinOpDispenser(CqlSession session, ParsedOp cmd) {
|
||||
super(cmd);
|
||||
this.session = session;
|
||||
this.diagFunc = cmd.getAsFunctionOr("diag", 0L);
|
||||
|
||||
LongFunction<ScriptGraphStatementBuilder> func = l -> new ScriptGraphStatementBuilder();
|
||||
|
||||
// graphname
|
||||
Optional<LongFunction<String>> graphnameFunc = cmd.getAsOptionalFunction("graphname");
|
||||
if (graphnameFunc.isPresent()) {
|
||||
LongFunction<ScriptGraphStatementBuilder> finalFunc = func;
|
||||
LongFunction<String> stringLongFunction = graphnameFunc.get();
|
||||
func = l -> finalFunc.apply(l).setGraphName(stringLongFunction.apply(l));
|
||||
}
|
||||
|
||||
// script
|
||||
Optional<LongFunction<String>> scriptFunc = cmd.getAsOptionalFunction("script");
|
||||
if (scriptFunc.isPresent()) {
|
||||
LongFunction<ScriptGraphStatementBuilder> finalFunc = func;
|
||||
func = l -> finalFunc.apply(l).setScript(scriptFunc.get().apply(l));
|
||||
}
|
||||
|
||||
LongFunction<ScriptGraphStatementBuilder> finalFunc = func;
|
||||
this.stmtFunc = l -> finalFunc.apply(l).build();
|
||||
// LongFunction<SimpleGraphStatement> gsFunc = l -> finalFunc.apply(l).build();
|
||||
//
|
||||
// this.stmtFunc = gsFunc;
|
||||
// // graph-internal-options
|
||||
// Optional<LongFunction<Map>> internalOptionsFunc = cmd.getAsOptionalFunction("graph-internal-options", Map.class);
|
||||
// if (internalOptionsFunc.isPresent()) {
|
||||
// LongFunction<ScriptGraphStatementBuilder> finalFunc = func;
|
||||
// LongFunction<Map> optionsFunc = internalOptionsFunc.get();
|
||||
// func = l -> {
|
||||
// GraphStatement gs = gsFunc.apply(l);
|
||||
// Map options = optionsFunc.apply(l);
|
||||
// options.forEach((k,v) -> gs.s);
|
||||
//
|
||||
// }
|
||||
// }
|
||||
/**
|
||||
* gs.setGraphInternalOption("cfg.external_vertex_verify",String.valueOf(verifyVertexIds));
|
||||
* gs.setGraphInternalOption("cfg.verify_unique",String.valueOf(java.lang.Boolean.FALSE));
|
||||
*
|
||||
*/
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4GremlinOp apply(long value) {
|
||||
ScriptGraphStatement stmt = stmtFunc.apply(value);
|
||||
if (diagFunc.apply(value)>0L) {
|
||||
System.out.println("## GREMLIN DIAG: ScriptGraphStatement on graphname(" + stmt.getGraphName() + "):\n" + stmt.getScript());
|
||||
}
|
||||
return new Cqld4GremlinOp(session, stmt);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,6 @@
|
||||
package io.nosqlbench.adapter.cqld4.optypes;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
|
||||
public abstract class Cqld4BaseOp implements Op {
|
||||
}
|
||||
@@ -2,14 +2,13 @@ package io.nosqlbench.adapter.cqld4.optypes;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BatchStatement;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
|
||||
public class Cqld4BatchStatement extends Cqld4Op {
|
||||
public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
|
||||
|
||||
private final BatchStatement stmt;
|
||||
|
||||
public Cqld4BatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
|
||||
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
|
||||
super(session,maxpages,retryreplace,metrics);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
@@ -1,11 +1,13 @@
|
||||
package io.nosqlbench.adapter.cqld4;
|
||||
package io.nosqlbench.adapter.cqld4.optypes;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import com.datastax.oss.driver.api.core.cql.ResultSet;
|
||||
import com.datastax.oss.driver.api.core.cql.Row;
|
||||
import com.datastax.oss.driver.api.core.cql.Statement;
|
||||
import io.nosqlbench.adapter.cqld4.*;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
|
||||
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
@@ -30,8 +32,7 @@ import java.util.Map;
|
||||
// TODO: add rows histogram resultSetSizeHisto
|
||||
|
||||
|
||||
|
||||
public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
|
||||
public abstract class Cqld4CqlOp extends Cqld4BaseOp implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
|
||||
|
||||
private final CqlSession session;
|
||||
private final int maxpages;
|
||||
@@ -39,10 +40,10 @@ public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, Op
|
||||
private final Cqld4OpMetrics metrics;
|
||||
|
||||
private ResultSet rs;
|
||||
private Cqld4Op nextOp;
|
||||
private Cqld4CqlOp nextOp;
|
||||
private final RSProcessors processors;
|
||||
|
||||
public Cqld4Op(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
|
||||
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
|
||||
this.session = session;
|
||||
this.maxpages = maxpages;
|
||||
this.retryreplace = retryreplace;
|
||||
@@ -50,7 +51,7 @@ public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, Op
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
||||
public Cqld4Op(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
|
||||
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
|
||||
this.session = session;
|
||||
this.maxpages = maxpages;
|
||||
this.retryreplace = retryreplace;
|
||||
@@ -122,9 +123,9 @@ public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, Op
|
||||
|
||||
public abstract String getQueryString();
|
||||
|
||||
private Cqld4Op rebindLwt(Statement<?> stmt, Row row) {
|
||||
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
|
||||
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
|
||||
return new Cqld4ReboundStatement(session,maxpages,retryreplace,metrics,rebound,processors);
|
||||
return new Cqld4CqlReboundStatement(session,maxpages,retryreplace,metrics,rebound,processors);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -2,15 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||
|
||||
public class Cqld4PreparedStatement extends Cqld4Op {
|
||||
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
||||
|
||||
private final BoundStatement stmt;
|
||||
|
||||
public Cqld4PreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
|
||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
|
||||
super(session,maxpages,retryreplace,metrics,processors);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
@@ -2,16 +2,12 @@ package io.nosqlbench.adapter.cqld4.optypes;
|
||||
|
||||
import com.datastax.oss.driver.api.core.CqlSession;
|
||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4Op;
|
||||
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
|
||||
import io.nosqlbench.virtdata.core.templates.CapturePoint;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class Cqld4SimpleCqlStatement extends Cqld4Op {
|
||||
public class Cqld4CqlSimpleCqlStatement extends Cqld4CqlOp {
|
||||
private final SimpleStatement stmt;
|
||||
|
||||
public Cqld4SimpleCqlStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
|
||||
public Cqld4CqlSimpleCqlStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
|
||||
super(session, maxpages,retryreplace,metrics);
|
||||
this.stmt = stmt;
|
||||
}
|
||||
Reference in New Issue
Block a user