cqld4 improvements

This commit is contained in:
Jonathan Shook 2021-08-10 10:34:11 -05:00
parent 14f43d7835
commit c389c8571b
22 changed files with 678 additions and 34 deletions

View File

@ -1,4 +1,137 @@
package io.nosqlbench.adapter.cqld4;
public abstract class Cqld4Op implements Runnable {
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.OpGenerator;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.VariableCapture;
import java.util.Iterator;
import java.util.Map;
// TODO: add statement filtering
// TODO: add statement pre and post processing for trace capture and start timer op
// TODO: add trace capture
// TODO: add start timer op
// TODO: add stop timer op
// TODO: add showcql equivalent
// TODO: add/document max tries exhausted exception
// TODO: add/document UnexpectedPagingException
// TODO: add/document chnge unapplied exception
// TODO: add instrumented metrics
// TODO: add total rows metrics
// TODO: add rows histogram resultSetSizeHisto
public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
private final CqlSession session;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
private ResultSet rs;
private Cqld4Op nextOp;
private final RSProcessors processors;
public Cqld4Op(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
this.session = session;
this.maxpages = maxpages;
this.retryreplace = retryreplace;
this.processors = new RSProcessors();
this.metrics = metrics;
}
public Cqld4Op(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
this.session = session;
this.maxpages = maxpages;
this.retryreplace = retryreplace;
this.processors = processors;
this.metrics = metrics;
}
@Override
public void accept(long cycle) {
}
public final ResultSet apply(long cycle) {
metrics.onStart();
Statement<?> stmt = getStmt();
rs = session.execute(stmt);
processors.start(cycle, rs);
int totalRows=0;
if (!rs.wasApplied()) {
if (!retryreplace) {
throw new ChangeUnappliedCycleException(rs, getQueryString());
} else {
Row one = rs.one();
processors.buffer(one);
totalRows++;
nextOp = this.rebindLwt(stmt, one);
}
}
// Paginated Op
Iterator<Row> reader = rs.iterator();
int pages = 0;
while (true) {
int pageRows = rs.getAvailableWithoutFetching();
for (int i = 0; i < pageRows; i++) {
Row row = reader.next();
processors.buffer(row);
}
if (pages++ > maxpages) {
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxpages, stmt.getPageSize());
}
if (rs.isFullyFetched()) {
break;
}
totalRows+=pageRows;
}
processors.flush();
metrics.onSuccess();
return rs;
}
@Override
public Op getNextOp() {
Op next = nextOp;
nextOp = null;
return next;
}
@Override
public Map<String, ?> capture() {
if (rs == null) {
throw new UndefinedResultSetException(this);
}
return null;
}
public abstract Statement<?> getStmt();
public abstract String getQueryString();
private Cqld4Op rebindLwt(Statement<?> stmt, Row row) {
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
return new Cqld4ReboundStatement(session,maxpages,retryreplace,metrics,rebound,processors);
}
}

View File

@ -5,11 +5,19 @@ import io.nosqlbench.adapter.cqld4.opdispensers.CqlD4PreparedBatchOpDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4BatchStatementDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4SimpleCqlStmtDispenser;
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedCommand;
import io.nosqlbench.nb.api.config.params.ParamsParser;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
@ -24,19 +32,49 @@ public class Cqld4OpMapper implements OpMapper<Cqld4Op> {
public OpDispenser<Cqld4Op> apply(ParsedCommand cmd) {
ParsedTemplate stmtTpl = cmd.getStmtAsTemplate().orElseThrow(() -> new BasicError(
"No statement was found in the op template:" + cmd
));
RSProcessors processors = new RSProcessors();
if (stmtTpl.getCaptures().size()>0) {
processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures()));
}
// cmd.getOptionalStaticConfig("processor",String.class)
// .map(s -> ParamsParser.parseToMap(s,"type"))
// .map(Cqld4Processors::resolve)
// .ifPresent(processors::add);
//
Optional<List> processorList = cmd.getOptionalStaticConfig("processors", List.class);
processorList.ifPresent(l -> {
l.forEach(m -> {
Map<String, String> pconfig = ParamsParser.parseToMap(m, "type");
ResultSetProcessor processor = Cqld4Processors.resolve(pconfig);
processors.add(() -> processor);
});
});
//
// processorList.stream()
// .map(s -> ParamsParser.parseToMap(s,"type"))
// .map(Cqld4Processors::resolve)
// .forEach(processors::add);
Cqld4Space cqld4Space = cache.get(cmd.getStaticConfigOr("space", "default"));
boolean prepared = cmd.getStaticConfigOr("prepared", true);
boolean batch = cmd.getStaticConfigOr("boolean", false);
CqlSession session = cqld4Space.getSession();
if (prepared && batch) {
return new CqlD4PreparedBatchOpDispenser(session, cmd, cfg);
return new CqlD4PreparedBatchOpDispenser(session, cmd);
} else if (prepared) {
return new Cqld4PreparedStmtDispenser(session, cmd);
return new Cqld4PreparedStmtDispenser(session, cmd, processors);
} else if (batch) {
return new Cqld4BatchStatementDispenser(session, cmd, cfg);
return new Cqld4BatchStatementDispenser(session, cmd);
} else {
return new Cqld4SimpleCqlStmtDispenser(session, cmd, cfg);
return new Cqld4SimpleCqlStmtDispenser(session, cmd);
}
}

View File

@ -0,0 +1,11 @@
package io.nosqlbench.adapter.cqld4;
public class Cqld4OpMetrics {
public void onStart() {
}
public void onSuccess() {
}
}

View File

@ -0,0 +1,30 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import java.util.Map;
public class Cqld4PrintProcessor implements ResultSetProcessor {
StringBuilder sb = new StringBuilder();
public Cqld4PrintProcessor(Map<String, ?> cfg) {
}
@Override
public void start(long cycle, ResultSet container) {
sb.setLength(0);
sb.append("c[").append(cycle).append("] ");
}
@Override
public void buffer(Row element) {
sb.append(element.getFormattedContents()).append("\n");
}
@Override
public void flush() {
System.out.print(sb.toString());
}
}

View File

@ -0,0 +1,25 @@
package io.nosqlbench.adapter.cqld4;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
public enum Cqld4Processors {
print(Cqld4PrintProcessor::new);
private final Function<Map<String, ?>, ResultSetProcessor> initializer;
Cqld4Processors(Function<Map<String,?>,ResultSetProcessor> initializer) {
this.initializer = initializer;
}
public static ResultSetProcessor resolve(Map<String,?> cfg) {
String type = Optional.ofNullable(cfg.get("type"))
.map(Object::toString)
.orElseThrow(() -> new RuntimeException("Map config provided for a processor, but with no type field."));
Cqld4Processors procType = Cqld4Processors.valueOf(type);
ResultSetProcessor p = procType.initializer.apply(cfg);
return p;
}
}

View File

@ -0,0 +1,24 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
public class Cqld4ReboundStatement extends Cqld4Op {
private final BoundStatement stmt;
public Cqld4ReboundStatement(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, BoundStatement rebound, RSProcessors processors) {
super(session,maxpages,retryreplace,metrics,processors);
this.stmt = rebound;
}
@Override
public Statement<?> getStmt() {
return stmt;
}
@Override
public String getQueryString() {
return stmt.getPreparedStatement().getQuery();
}
}

View File

@ -0,0 +1,104 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import io.nosqlbench.nb.api.errors.BasicError;
/**
* @see <a href="https://github.com/apache/cassandra/blob/bf96367f4d55692017e144980cf17963e31df127/doc/native_protocol_v5.spec">native protocol v5</a>
*
*/
public class LWTRebinder {
public static BoundStatement rebindUnappliedStatement(Statement<?> statement, Row row) {
BoundStatement bs;
if (statement instanceof BoundStatement) {
bs = (BoundStatement)statement;
} else {
throw new BasicError("An LWT operation was being rebound to new values, but this is not possible yet " +
"unless it is a bound statement as in prepared statements.");
}
for (ColumnDefinition def : row.getColumnDefinitions()) {
String name = def.getName().toString();
if (!name.equals("[applied]")) {
DataType typeName = def.getType();
int code = typeName.getProtocolCode();
switch (code) {
case ProtocolConstants.DataType.ASCII:
case ProtocolConstants.DataType.VARCHAR:
bs=bs.setString(name,row.getString(name));
break;
case ProtocolConstants.DataType.CUSTOM:
throw new BasicError("Rebinding custom datatypes is not supported. (Custom DataType)");
case ProtocolConstants.DataType.BIGINT:
case ProtocolConstants.DataType.COUNTER:
bs = bs.setLong(name, row.getLong(name));
break;
case ProtocolConstants.DataType.BLOB:
bs = bs.setByteBuffer(name, row.getByteBuffer(name));
break;
case ProtocolConstants.DataType.BOOLEAN:
bs = bs.setBoolean(name,row.getBoolean(name));
break;
case ProtocolConstants.DataType.DECIMAL:
bs = bs.setBigDecimal(name,row.getBigDecimal(name));
break;
case ProtocolConstants.DataType.DOUBLE:
bs = bs.setDouble(name,row.getDouble(name));
break;
case ProtocolConstants.DataType.FLOAT:
bs=bs.setFloat(name,row.getFloat(name));
break;
case ProtocolConstants.DataType.INT:
case ProtocolConstants.DataType.VARINT:
case ProtocolConstants.DataType.SMALLINT:
case ProtocolConstants.DataType.TINYINT:
bs=bs.setInt(name,row.getInt(name));
break;
case ProtocolConstants.DataType.TIMESTAMP:
break;
case ProtocolConstants.DataType.UUID:
case ProtocolConstants.DataType.TIMEUUID:
bs=bs.setUuid(name,row.getUuid(name));
break;
case ProtocolConstants.DataType.INET:
bs=bs.setInetAddress(name,row.getInetAddress(name));
break;
case ProtocolConstants.DataType.DATE:
bs=bs.setLocalDate(name,row.getLocalDate(name));
break;
case ProtocolConstants.DataType.TIME:
bs = bs.setLocalTime(name,bs.getLocalTime(name));
break;
case ProtocolConstants.DataType.DURATION:
bs = bs.setCqlDuration(name,bs.getCqlDuration(name));
break;
case ProtocolConstants.DataType.LIST:
bs = bs.setList(name,bs.getList(name,Object.class),Object.class);
break;
case ProtocolConstants.DataType.MAP:
bs = bs.setMap(name,bs.getMap(name,Object.class,Object.class),Object.class,Object.class);
break;
case ProtocolConstants.DataType.SET:
bs = bs.setSet(name,bs.getSet(name,Object.class),Object.class);
break;
case ProtocolConstants.DataType.UDT:
bs = bs.setUdtValue(name,bs.getUdtValue(name));
break;
case ProtocolConstants.DataType.TUPLE:
bs = bs.setTupleValue(name,bs.getTupleValue(name));
break;
default:
throw new RuntimeException("Unrecognized type:" + typeName);
}
}
}
return bs;
}
}

View File

@ -0,0 +1,54 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.engine.api.activityimpl.uniform.ResultProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class RSProcessors implements ResultProcessor<ResultSet,Row>, Supplier<List<ResultSetProcessor>> {
private final List<Supplier<ResultSetProcessor>> suppliers = new ArrayList<>();
private final ThreadLocal<List<ResultSetProcessor>> processors = ThreadLocal.withInitial(this);
@Override
public List<ResultSetProcessor> get() {
return suppliers.stream().map(Supplier::get).collect(Collectors.toList());
}
public List<Supplier<ResultSetProcessor>> getProcessors() {
return suppliers;
}
public RSProcessors add(Supplier<ResultSetProcessor> processor) {
suppliers.add(processor);
return this;
}
@Override
public void start(long cycle, ResultSet container) {
for (ResultSetProcessor processor : get()) {
processor.start(cycle, container);
}
}
@Override
public void buffer(Row element) {
for (ResultSetProcessor processor : get()) {
processor.buffer(element);
}
}
@Override
public void flush() {
for (ResultSetProcessor processor : get()) {
processor.flush();
}
}
}

View File

@ -0,0 +1,8 @@
package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.engine.api.activityimpl.uniform.ResultProcessor;
public interface ResultSetProcessor extends ResultProcessor<ResultSet, Row> {
}

View File

@ -0,0 +1,14 @@
package io.nosqlbench.adapter.cqld4;
public class UndefinedResultSetException extends RuntimeException {
private final Cqld4Op cqld4op;
public UndefinedResultSetException(Cqld4Op cqld4Op) {
this.cqld4op = cqld4Op;
}
@Override
public String getMessage() {
return "Attempted to access a result set which was not defined in op " + cqld4op.toString();
}
}

View File

@ -0,0 +1,27 @@
package io.nosqlbench.adapter.cqld4.exceptions;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
* This was added to nosqlbench because the error handling logic was
* starting to look a bit contrived. Because we need to be able
* to respond to different result outcomes, it
* is just simpler to have a single type of error-handling logic for all outcomes.
*/
public class ChangeUnappliedCycleException extends CqlGenericCycleException {
private final ResultSet resultSet;
private final String queryString;
public ChangeUnappliedCycleException(ResultSet resultSet, String queryString) {
super("Operation was not applied:" + queryString);
this.resultSet = resultSet;
this.queryString = queryString;
}
public ResultSet getResultSet() {
return resultSet;
}
public String getQueryString() { return queryString; }
}

View File

@ -0,0 +1,17 @@
package io.nosqlbench.adapter.cqld4.exceptions;
public abstract class CqlGenericCycleException extends RuntimeException {
public CqlGenericCycleException(Throwable cause) {
super(cause);
}
public CqlGenericCycleException(String message) {
super(message);
}
public CqlGenericCycleException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,53 @@
package io.nosqlbench.adapter.cqld4.exceptions;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
* <p>This is not a core exception. It was added to the CQL activity type
* driver for nosqlbench specifically to catch the following unexpected
* condition:
* Paging would be needed to read all the results from a read query, but the user
* is not expecting to intentionally check and iterate the result sets for paging.
* <p>
* This should only be thrown if a result set would need paging, but configuration
* options specific that it should not expect to. Rather than assume paging is completely
* expected or unexpected, we simply assume that only 1 page is allowed, being the
* first page, or what is thought of as "not paging".
* <p>If this error is thrown, and paging is expected, then the user can adjust
* fetchsize or maxpages in order to open up paging to the degree that is allowable or
* expected.
*/
public class UnexpectedPagingException extends RuntimeException {
private final ResultSet resultSet;
private final String queryString;
private final int fetchSize;
private final int fetchedPages;
private final int maxpages;
public UnexpectedPagingException(
ResultSet resultSet,
String queryString,
int fetchedPages,
int maxpages,
int fetchSize) {
this.resultSet = resultSet;
this.queryString = queryString;
this.fetchedPages = fetchedPages;
this.maxpages = maxpages;
this.fetchSize = fetchSize;
}
public ResultSet getResultSet() {
return resultSet;
}
public String getMessage() {
StringBuilder sb = new StringBuilder();
sb.append("Additional paging would be required to read the results from this query fully" +
", but the user has not explicitly indicated that paging was expected.")
.append(" fetched/allowed: ").append(fetchedPages).append("/").append(maxpages)
.append(" fetchSize(").append(fetchSize).append("): ").append(queryString);
return sb.toString();
}
}

View File

@ -4,22 +4,20 @@ import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedCommand;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.errors.BasicError;
public class CqlD4PreparedBatchOpDispenser implements OpDispenser<Cqld4Op> {
private final CqlSession session;
private final ParsedCommand cmd;
private final NBConfiguration cfg;
public CqlD4PreparedBatchOpDispenser(CqlSession session, ParsedCommand cmd, NBConfiguration cfg) {
public CqlD4PreparedBatchOpDispenser(CqlSession session, ParsedCommand cmd) {
this.session = session;
this.cmd = cmd;
this.cfg = cfg;
}
@Override
public Cqld4Op apply(long value) {
return null;
throw new BasicError("this is not implemented yet.");
}
}

View File

@ -4,17 +4,14 @@ import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedCommand;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
public class Cqld4BatchStatementDispenser implements OpDispenser<Cqld4Op> {
private final CqlSession session;
private final ParsedCommand cmd;
private final NBConfiguration cfg;
public Cqld4BatchStatementDispenser(CqlSession session, ParsedCommand cmd, NBConfiguration cfg) {
public Cqld4BatchStatementDispenser(CqlSession session, ParsedCommand cmd) {
this.session = session;
this.cmd = cmd;
this.cfg = cfg;
}
@Override

View File

@ -4,7 +4,9 @@ import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4PreparedStatement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedCommand;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
@ -17,22 +19,30 @@ public class Cqld4PreparedStmtDispenser implements OpDispenser<Cqld4Op> {
private final LongFunction<Object[]> varbinder;
private final PreparedStatement preparedStmt;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
private final RSProcessors processors;
public Cqld4PreparedStmtDispenser(CqlSession session, ParsedCommand cmd) {
public Cqld4PreparedStmtDispenser(CqlSession session, ParsedCommand cmd, RSProcessors processors) {
this.session = session;
this.processors = processors;
ParsedTemplate parsed = cmd.getStmtAsTemplate().orElseThrow();
varbinder = cmd.newArrayBinderFromBindPoints(parsed.getBindPoints());
String preparedQueryString = parsed.getPositionalStatement(s -> "?");
preparedStmt = session.prepare(preparedQueryString);
this.maxpages = cmd.getStaticConfigOr("maxpages",1);
this.retryreplace = cmd.getStaticConfigOr("retryreplace", false);
this.metrics = new Cqld4OpMetrics();
}
@Override
public Cqld4Op apply(long value) {
Object[] parameters = varbinder.apply(value);
BoundStatement stmt = preparedStmt.bind(parameters);
return new Cqld4PreparedStatement(session, stmt);
return new Cqld4PreparedStatement(session, stmt, maxpages, retryreplace, metrics, processors);
}
}

View File

@ -3,27 +3,31 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4SimpleCqlStatement;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedCommand;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
public class Cqld4SimpleCqlStmtDispenser implements OpDispenser<Cqld4Op> {
private final CqlSession session;
private final ParsedCommand cmd;
private final NBConfiguration cfg;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
public Cqld4SimpleCqlStmtDispenser(CqlSession session, ParsedCommand cmd, NBConfiguration cfg) {
public Cqld4SimpleCqlStmtDispenser(CqlSession session, ParsedCommand cmd) {
this.session = session;
this.cmd = cmd;
this.cfg = cfg;
this.maxpages = cmd.getStaticConfigOr("maxpages",1);
this.retryreplace = cmd.getStaticConfigOr("retryreplace",false);
this.metrics = new Cqld4OpMetrics();
}
@Override
public Cqld4SimpleCqlStatement apply(long value) {
String stmtBody = cmd.get("stmt",value);
SimpleStatement simpleStatement = SimpleStatement.newInstance(stmtBody);
return new Cqld4SimpleCqlStatement(session,simpleStatement);
return new Cqld4SimpleCqlStatement(session,simpleStatement,maxpages,retryreplace,metrics);
}
}

View File

@ -1,10 +1,28 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
public class Cqld4BatchStatement extends Cqld4Op {
@Override
public void run() {
private final BatchStatement stmt;
public Cqld4BatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
super(session,maxpages,retryreplace,metrics);
this.stmt = stmt;
}
@Override
public BatchStatement getStmt() {
return stmt;
}
@Override
public String getQueryString() {
StringBuilder sb = new StringBuilder();
stmt.iterator().forEachRemaining(s -> sb.append(s).append("\n"));
return sb.toString();
}
}

View File

@ -3,19 +3,24 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4PreparedStatement extends Cqld4Op {
private final CqlSession session;
private final BoundStatement stmt;
public Cqld4PreparedStatement(CqlSession session, BoundStatement stmt) {
this.session = session;
public Cqld4PreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
super(session,maxpages,retryreplace,metrics,processors);
this.stmt = stmt;
}
public BoundStatement getStmt() {
return stmt;
}
@Override
public void run() {
session.execute(stmt);
public String getQueryString() {
return stmt.getPreparedStatement().getQuery();
}
}

View File

@ -3,18 +3,27 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.virtdata.core.templates.CapturePoint;
import java.util.Map;
public class Cqld4SimpleCqlStatement extends Cqld4Op {
private final CqlSession session;
private final SimpleStatement stmt;
public Cqld4SimpleCqlStatement(CqlSession session, SimpleStatement stmt) {
this.session = session;
public Cqld4SimpleCqlStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
super(session, maxpages,retryreplace,metrics);
this.stmt = stmt;
}
@Override
public void run() {
session.execute(stmt);
public SimpleStatement getStmt() {
return stmt;
}
@Override
public String getQueryString() {
return stmt.getQuery();
}
}

View File

@ -0,0 +1,32 @@
package io.nosqlbench.adapter.cqld4.processors;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
import io.nosqlbench.virtdata.core.templates.CapturePoint;
import java.util.List;
public class CqlFieldCaptureProcessor implements ResultSetProcessor {
private final List<CapturePoint> captures;
public CqlFieldCaptureProcessor(List<CapturePoint> captures) {
this.captures = captures;
}
@Override
public void start(long cycle, ResultSet container) {
}
@Override
public void buffer(Row element) {
}
@Override
public void flush() {
}
}

View File

@ -0,0 +1,33 @@
package io.nosqlbench.adapter.cqld4.processors;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
import java.util.ArrayList;
/**
* An accumulator for rows, sized to a page of results.
*/
public class RSIterableCapture implements ResultSetProcessor {
private long cycle;
private ArrayList<Row> rows = new ArrayList<>();
@Override
public void start(long cycle, ResultSet container) {
this.cycle = cycle;
rows = new ArrayList<Row>(container.getAvailableWithoutFetching());
}
@Override
public void buffer(Row element) {
rows.add(element);
}
@Override
public void flush() {
}
}