cqld4 incremental process

This commit is contained in:
Jonathan Shook 2020-06-08 13:15:36 -05:00
parent a6582d519c
commit 3628cc0f24
31 changed files with 544 additions and 446 deletions

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.104-SNAPSHOT</version>
<version>3.12.120-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,7 +23,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.104-SNAPSHOT</version>
<version>3.12.120-SNAPSHOT</version>
</dependency>

View File

@ -1,18 +1,19 @@
package io.nosqlbench.activitytype.cqld4.api;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
/**
* An operator interface for performing a modular action on CQL ResultSets per-cycle.
*/
public interface ResultSetCycleOperator {
public interface D4ResultSetCycleOperator {
/**
* Perform an action on a result set for a specific cycle.
* @param resultSet The ResultSet for the given cycle
* @param pageInfo The ResultSet for the given cycle
* @param statement The statement for the given cycle
* @param cycle The cycle for which the statement was submitted
* @return A value, only meaningful when used with aggregated operators
*/
int apply(ResultSet resultSet, Statement statement, long cycle);
int apply(AsyncResultSet pageInfo, Statement<?> statement, long cycle);
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cqld4.core;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.session.Session;
@ -18,14 +19,13 @@ import java.util.regex.Pattern;
public class CQLBindHelper {
private final ProtocolVersion protocolVersion;
private final Session session;
private final CodecRegistry codecRegistry;
// private final ColumnDefinitions definitions;
// refrence ProtocolConstants.DataType
public CQLBindHelper(Session session) {
this.session = session;
this.protocolVersion = this.session.getContext().getProtocolVersion();
public CQLBindHelper(CqlSession session) {
this.protocolVersion = session.getContext().getProtocolVersion();
this.codecRegistry = session.getContext().getCodecRegistry();
}

View File

@ -3,14 +3,12 @@ package io.nosqlbench.activitytype.cqld4.core;
import com.codahale.metrics.Timer;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.session.Session;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.StatementFilter;
import io.nosqlbench.activitytype.cqld4.errorhandling.ErrorStatus;
import io.nosqlbench.activitytype.cqld4.errorhandling.HashedCQLErrorHandler;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.CQLCycleWithStatementException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.MaxTriesExhaustedException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.UnexpectedPagingException;
import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement;
@ -34,7 +32,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
private final CqlActivity cqlActivity;
private final ActivityDef activityDef;
private List<RowCycleOperator> rowOps;
private List<ResultSetCycleOperator> cycleOps;
private List<D4ResultSetCycleOperator> cycleOps;
private List<StatementModifier> modifiers;
private StatementFilter statementFilter;
private OpSequence<ReadyCQLStatement> sequencer;
@ -44,7 +42,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
private int pagesFetched = 0;
private long totalRowsFetchedForQuery = 0L;
private ResultSet pagingResultSet;
private AsyncResultSet pagingResultSet;
private Statement pagingStatement;
private ReadyCQLStatement pagingReadyStatement;
private boolean showcql;
@ -125,84 +123,74 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
}
CompletionStage<AsyncResultSet> completion;
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
CompletionStage<AsyncResultSet> completion = cqlActivity.getSession().executeAsync(statement);
completion = cqlActivity.getSession().executeAsync(statement);
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
try {
ResultSet resultSet = resultSetFuture.getUninterruptibly();
AsyncResultSet resultSet = completion.toCompletableFuture().get();
if (cycleOps != null) {
for (ResultSetCycleOperator cycleOp : cycleOps) {
for (D4ResultSetCycleOperator cycleOp : cycleOps) {
cycleOp.apply(resultSet, statement, cycleValue);
}
}
ResultSetCycleOperator[] perStmtRSOperators = readyCQLStatement.getResultSetOperators();
if (perStmtRSOperators != null) {
for (ResultSetCycleOperator perStmtRSOperator : perStmtRSOperators) {
D4ResultSetCycleOperator[] rsOperators = readyCQLStatement.getResultSetOperators();
if (rsOperators != null) {
for (D4ResultSetCycleOperator perStmtRSOperator : rsOperators) {
perStmtRSOperator.apply(resultSet, statement, cycleValue);
}
}
if (!resultSet.wasApplied()) {
//resultSet.b
Row row = resultSet.one();
ColumnDefinitions defs = row.getColumnDefinitions();
if (retryReplace) {
statement =
new CQLBindHelper(getCqlActivity().getSession()).rebindUnappliedStatement(statement, defs,row);
}
logger.trace(readyCQLStatement.getQueryString(cycleValue));
// To make exception handling logic flow more uniformly
throw new ChangeUnappliedCycleException(
cycleValue, resultSet, readyCQLStatement.getQueryString(cycleValue)
);
}
int pageRows = resultSet.getAvailableWithoutFetching();
int remaining = pageRows;
RowCycleOperator[] perStmtRowOperators = readyCQLStatement.getRowCycleOperators();
if (rowOps == null && perStmtRowOperators==null) {
while (remaining-- > 0) {
Row row = resultSet.one();
// NOTE: This has been replaced by:
// params:
// rowops: savevars
// You must add this to the YAML for statements that are meant to capture vars
// HashMap<String, Object> bindings = SharedState.tl_ObjectMap.get();
// for (ColumnDefinitions.Definition cdef : row.getColumnDefinitions()) {
// bindings.put(cdef.getName(), row.getObject(cdef.getName()));
// }
// TODO: Add parameter rebind support in cqld4 via op
// if (!resultSet.wasApplied()) {
// //resultSet.b
// Row row = resultSet.one();
// ColumnDefinitions defs = row.getColumnDefinitions();
// if (retryReplace) {
// statement =
// new CQLBindHelper(getCqlActivity().getSession()).rebindUnappliedStatement(statement, defs,row);
// }
//
// logger.trace(readyCQLStatement.getQueryString(cycleValue));
// // To make exception handling logic flow more uniformly
// throw new ChangeUnappliedCycleException(
// cycleValue, resultSet, readyCQLStatement.getQueryString(cycleValue)
// );
// }
// int pageRows = resultSet.getAvailableWithoutFetching();
int rowsInPage=0;
RowCycleOperator[] perStmtRowOperators = readyCQLStatement.getRowCycleOperators();
if (rowOps==null && perStmtRowOperators==null) {
for (Row row : resultSet.currentPage()) {
rowsInPage++;
}
} else {
while (remaining-- > 0) {
Row onerow = resultSet.one();
for (Row row : resultSet.currentPage()) {
if (rowOps!=null) {
for (RowCycleOperator rowOp : rowOps) {
rowOp.apply(onerow, cycleValue);
rowOp.apply(row, cycleValue);
}
}
if (perStmtRowOperators!=null) {
for (RowCycleOperator rowOp : perStmtRowOperators) {
rowOp.apply(onerow, cycleValue);
rowOp.apply(row, cycleValue);
}
}
rowsInPage++;
}
}
cqlActivity.rowsCounter.mark(pageRows);
totalRowsFetchedForQuery += pageRows;
if (resultSet.isFullyFetched()) {
long resultNanos = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
readyCQLStatement.onSuccess(cycleValue, resultNanos, totalRowsFetchedForQuery);
} else {
cqlActivity.rowsCounter.mark(rowsInPage);
totalRowsFetchedForQuery += rowsInPage;
if (resultSet.hasMorePages()) {
if (cqlActivity.maxpages > 1) {
pagingResultSet = resultSet;
pagingStatement = statement;
@ -218,6 +206,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
cqlActivity.getSession().getContext().getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE)
);
}
} else {
long resultNanos = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
readyCQLStatement.onSuccess(cycleValue, resultNanos, totalRowsFetchedForQuery);
}
break; // This is normal termination of this loop, when retries aren't needed
} catch (Exception e) {
@ -248,56 +241,58 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
throw new MaxTriesExhaustedException(cycleValue, maxTries);
}
ListenableFuture<ResultSet> pagingFuture;
try (Timer.Context pagingTime = cqlActivity.pagesTimer.time()) {
CompletionStage<AsyncResultSet> completion;
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
pagingFuture = pagingResultSet.fetchMoreResults();
completion = pagingResultSet.fetchNextPage();
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
try {
ResultSet resultSet = pagingFuture.get();
AsyncResultSet resultSet = completion.toCompletableFuture().get();
if (cycleOps != null) {
for (ResultSetCycleOperator cycleOp : cycleOps) {
for (D4ResultSetCycleOperator cycleOp : cycleOps) {
cycleOp.apply(resultSet, pagingStatement, cycleValue);
}
}
ResultSetCycleOperator[] perStmtRSOperators = pagingReadyStatement.getResultSetOperators();
D4ResultSetCycleOperator[] perStmtRSOperators = pagingReadyStatement.getResultSetOperators();
if (perStmtRSOperators != null) {
for (ResultSetCycleOperator perStmtRSOperator : perStmtRSOperators) {
for (D4ResultSetCycleOperator perStmtRSOperator : perStmtRSOperators) {
perStmtRSOperator.apply(resultSet, pagingStatement, cycleValue);
}
}
pagesFetched++;
int pageRows = resultSet.getAvailableWithoutFetching();
int remaining = pageRows;
if (rowOps == null) {
while (remaining-- > 0) {
resultSet.one();
RowCycleOperator[] perStmtRowCycleOp = pagingReadyStatement.getRowCycleOperators();
int rowsInPage=0;
if (rowOps==null && perStmtRowCycleOp==null) {
for (Row row : resultSet.currentPage()) {
rowsInPage++;
}
} else {
while (remaining-- > 0) {
for (RowCycleOperator rowOp : rowOps) {
rowOp.apply(resultSet.one(), cycleValue);
for (Row row : resultSet.currentPage()) {
rowsInPage++;
if (rowOps!=null) {
for (RowCycleOperator rowOp : rowOps) {
rowOp.apply(row,cycleValue);
}
}
if (perStmtRowCycleOp!=null) {
for (RowCycleOperator rowCycleOperator : perStmtRowCycleOp) {
rowCycleOperator.apply(row,cycleValue);
}
}
}
}
cqlActivity.rowsCounter.mark(pageRows);
totalRowsFetchedForQuery += pageRows;
if (resultSet.isFullyFetched()) {
long nanoTime = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
pagingReadyStatement.onSuccess(cycleValue, nanoTime, totalRowsFetchedForQuery);
pagingResultSet = null;
cqlActivity.rowsCounter.mark(rowsInPage);
totalRowsFetchedForQuery += rowsInPage;
} else {
if (resultSet.hasMorePages()) {
if (pagesFetched > cqlActivity.maxpages) {
throw new UnexpectedPagingException(
cycleValue,
@ -309,6 +304,12 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
);
}
pagingResultSet = resultSet;
} else {
long nanoTime = System.nanoTime() - nanoStartTime;
cqlActivity.resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS);
cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery);
pagingReadyStatement.onSuccess(cycleValue, nanoTime, totalRowsFetchedForQuery);
pagingResultSet = null;
}
break; // This is normal termination of this loop, when retries aren't needed
} catch (Exception e) {
@ -350,7 +351,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
this.ebdseErrorHandler = cqlActivity.getCqlErrorHandler();
this.statementFilter = cqlActivity.getStatementFilter();
this.rowOps = cqlActivity.getRowCycleOperators();
this.cycleOps = cqlActivity.getResultSetCycleOperators();
this.cycleOps = cqlActivity.getPageInfoCycleOperators();
this.modifiers = cqlActivity.getStatementModifiers();
}

View File

@ -3,16 +3,15 @@ package io.nosqlbench.activitytype.cqld4.core;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.*;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.session.Session;
import io.nosqlbench.activitytype.cqld4.codecsupport.UDTCodecInjector;
import com.datastax.driver.core.TokenRangeStmtFilter;
import io.nosqlbench.activitytype.cqld4.api.ErrorResponse;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.StatementFilter;
import io.nosqlbench.activitytype.cqld4.errorhandling.NBCycleErrorHandler;
@ -82,7 +81,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
private StatementFilter statementFilter;
private Boolean showcql;
private List<RowCycleOperator> rowCycleOperators;
private List<ResultSetCycleOperator> resultSetCycleOperators;
private List<D4ResultSetCycleOperator> pageInfoCycleOperators;
private List<StatementModifier> statementModifiers;
private Long maxTotalOpsInFlight;
private long retryDelay;
@ -109,7 +108,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
public synchronized void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
profileName = getParams().getOptionalString("profile").orElse("default");
session = getSession(profileName);
session = getSession();
if (getParams().getOptionalBoolean("usercodecs").orElse(false)) {
registerCodecs(session);
@ -131,9 +130,9 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
logger.debug("activity fully initialized: " + this.activityDef.getAlias());
}
public synchronized CqlSession getSession(String profileName) {
public synchronized CqlSession getSession() {
if (session == null) {
session = CQLSessionCache.get().getSession(this.getActivityDef(), profileName);
session = CQLSessionCache.get().getSession(this.getActivityDef()).session;
}
return session;
}
@ -430,7 +429,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
}
logger.trace("setting fetchSize to " + fetchSize);
cluster.getConfiguration().getQueryOptions().setFetchSize(fetchSize);
CQLSessionCache.get().getSession(activityDef).set(DefaultDriverOption.REQUEST_PAGE_SIZE,fetchSize);
}
this.retryDelay = params.getOptionalLong("retrydelay").orElse(0L);
@ -441,7 +440,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
this.maxpages = params.getOptionalInteger("maxpages").orElse(1);
this.statementFilter = params.getOptionalString("tokens")
.map(s -> new TokenRangeStmtFilter(cluster, s))
.map(s -> new TokenRangeStmtFilter(getSession(), s))
.orElse(null);
if (statementFilter != null) {
@ -461,49 +460,50 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
this.maxTotalOpsInFlight = params.getOptionalLong("async").orElse(1L);
Optional<String> dynpooling = params.getOptionalString("pooling");
if (dynpooling.isPresent()) {
logger.info("dynamically updating pooling");
if (!dynpooling.get().equals(this.pooling)) {
PoolingOptions opts = CQLOptions.poolingOptionsFor(dynpooling.get());
logger.info("pooling=>" + dynpooling.get());
PoolingOptions cfg = getSession().getCluster().getConfiguration().getPoolingOptions();
// This looks funny, because we have to set max conns per host
// in an order that will appease the driver, as there is no "apply settings"
// to do that for us, so we raise max first if it goes higher, and we lower
// it last, if it goes lower
int prior_mcph_l = cfg.getMaxConnectionsPerHost(HostDistance.LOCAL);
int mcph_l = opts.getMaxConnectionsPerHost(HostDistance.LOCAL);
int ccph_l = opts.getCoreConnectionsPerHost(HostDistance.LOCAL);
if (prior_mcph_l < mcph_l) {
logger.info("setting mcph_l to " + mcph_l);
cfg.setMaxConnectionsPerHost(HostDistance.LOCAL, mcph_l);
}
logger.info("setting ccph_l to " + ccph_l);
cfg.setCoreConnectionsPerHost(HostDistance.LOCAL, ccph_l);
if (mcph_l < prior_mcph_l) {
logger.info("setting mcph_l to " + mcph_l);
cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, mcph_l);
}
cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, opts.getMaxRequestsPerConnection(HostDistance.LOCAL));
int prior_mcph_r = cfg.getMaxConnectionsPerHost(HostDistance.REMOTE);
int mcph_r = opts.getMaxConnectionsPerHost(HostDistance.REMOTE);
int ccph_r = opts.getCoreConnectionsPerHost(HostDistance.REMOTE);
if (mcph_r > 0) {
if (mcph_r > prior_mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r);
opts.setCoreConnectionsPerHost(HostDistance.REMOTE, ccph_r);
if (prior_mcph_r > mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r);
if (opts.getMaxConnectionsPerHost(HostDistance.REMOTE) > 0) {
cfg.setMaxRequestsPerConnection(HostDistance.REMOTE, opts.getMaxRequestsPerConnection(HostDistance.REMOTE));
}
}
this.pooling = dynpooling.get();
}
}
// TODO: Support dynamic pooling options
// Optional<String> dynpooling = params.getOptionalString("pooling");
// if (dynpooling.isPresent()) {
// logger.info("dynamically updating pooling");
// if (!dynpooling.get().equals(this.pooling)) {
// PoolingOptions opts = CQLOptions.poolingOptionsFor(dynpooling.get());
// logger.info("pooling=>" + dynpooling.get());
//
// PoolingOptions cfg = getSession().getCluster().getConfiguration().getPoolingOptions();
//
// // This looks funny, because we have to set max conns per host
// // in an order that will appease the driver, as there is no "apply settings"
// // to do that for us, so we raise max first if it goes higher, and we lower
// // it last, if it goes lower
// int prior_mcph_l = cfg.getMaxConnectionsPerHost(HostDistance.LOCAL);
// int mcph_l = opts.getMaxConnectionsPerHost(HostDistance.LOCAL);
// int ccph_l = opts.getCoreConnectionsPerHost(HostDistance.LOCAL);
// if (prior_mcph_l < mcph_l) {
// logger.info("setting mcph_l to " + mcph_l);
// cfg.setMaxConnectionsPerHost(HostDistance.LOCAL, mcph_l);
// }
// logger.info("setting ccph_l to " + ccph_l);
// cfg.setCoreConnectionsPerHost(HostDistance.LOCAL, ccph_l);
// if (mcph_l < prior_mcph_l) {
// logger.info("setting mcph_l to " + mcph_l);
// cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, mcph_l);
// }
// cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, opts.getMaxRequestsPerConnection(HostDistance.LOCAL));
//
// int prior_mcph_r = cfg.getMaxConnectionsPerHost(HostDistance.REMOTE);
// int mcph_r = opts.getMaxConnectionsPerHost(HostDistance.REMOTE);
// int ccph_r = opts.getCoreConnectionsPerHost(HostDistance.REMOTE);
//
// if (mcph_r > 0) {
// if (mcph_r > prior_mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r);
// opts.setCoreConnectionsPerHost(HostDistance.REMOTE, ccph_r);
// if (prior_mcph_r > mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r);
// if (opts.getMaxConnectionsPerHost(HostDistance.REMOTE) > 0) {
// cfg.setMaxRequestsPerConnection(HostDistance.REMOTE, opts.getMaxRequestsPerConnection(HostDistance.REMOTE));
// }
// }
// this.pooling = dynpooling.get();
// }
// }
}
@ -528,7 +528,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
ErrorResponse.valueOf(verb),
exceptionCountMetrics,
exceptionHistoMetrics,
!getParams().getOptionalLong("async").isPresent()
getParams().getOptionalLong("async").isEmpty()
)
);
} else {
@ -540,7 +540,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
ErrorResponse.valueOf(verb),
exceptionCountMetrics,
exceptionHistoMetrics,
!getParams().getOptionalLong("async").isPresent()
getParams().getOptionalLong("async").isEmpty()
);
logger.info("Handling error group '" + pattern + "' with handler:" + handler);
newerrorHandler.setHandlerForGroup(pattern, handler);
@ -549,7 +549,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
ErrorResponse.valueOf(keyval[1]),
exceptionCountMetrics,
exceptionHistoMetrics,
!getParams().getOptionalLong("async").isPresent()
getParams().getOptionalLong("async").isEmpty()
);
logger.info("Handling error pattern '" + pattern + "' with handler:" + handler);
newerrorHandler.setHandlerForPattern(keyval[0], handler);
@ -599,19 +599,19 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
this.rowCycleOperators = null;
}
public List<ResultSetCycleOperator> getResultSetCycleOperators() {
return resultSetCycleOperators;
public List<D4ResultSetCycleOperator> getPageInfoCycleOperators() {
return pageInfoCycleOperators;
}
protected synchronized void addResultSetCycleOperator(ResultSetCycleOperator resultSetCycleOperator) {
if (this.resultSetCycleOperators == null) {
this.resultSetCycleOperators = new ArrayList<>();
protected synchronized void addResultSetCycleOperator(D4ResultSetCycleOperator pageInfoCycleOperator) {
if (this.pageInfoCycleOperators == null) {
this.pageInfoCycleOperators = new ArrayList<>();
}
this.resultSetCycleOperators.add(resultSetCycleOperator);
this.pageInfoCycleOperators.add(pageInfoCycleOperator);
}
private void clearResultSetCycleOperators() {
this.resultSetCycleOperators = null;
this.pageInfoCycleOperators = null;
}
public List<StatementModifier> getStatementModifiers() {

View File

@ -1,9 +1,11 @@
package io.nosqlbench.activitytype.cqld4.core;
import com.codahale.metrics.Timer;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import io.nosqlbench.activitytype.cqld4.api.ErrorResponse;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.StatementFilter;
import io.nosqlbench.activitytype.cqld4.errorhandling.ErrorStatus;
@ -11,6 +13,7 @@ import io.nosqlbench.activitytype.cqld4.errorhandling.HashedCQLErrorHandler;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.CQLCycleWithStatementException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.UnexpectedPagingException;
import io.nosqlbench.activitytype.cqld4.statements.core.CQLSessionCache;
import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement;
import io.nosqlbench.engine.api.activityapi.core.BaseAsyncAction;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.FailedOp;
@ -23,6 +26,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
@ -33,7 +37,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
private final ActivityDef activityDef;
private List<RowCycleOperator> rowOps;
private List<ResultSetCycleOperator> cycleOps;
private List<D4ResultSetCycleOperator> cycleOps;
private List<StatementModifier> modifiers;
private StatementFilter statementFilter;
private OpSequence<ReadyCQLStatement> sequencer;
@ -112,13 +116,13 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
// The execute timer covers only the point at which EB hands the op to the driver to be executed
try (Timer.Context executeTime = activity.executeTimer.time()) {
cqlop.completionStage = activity.getSession().executeAsync(cqlop.statement);
Futures.addCallback(cqlop.completionStage, cqlop);
CompletionStage<AsyncResultSet> completionStage = activity.getSession().executeAsync(cqlop.statement);
completionStage.whenComplete(cqlop::handleAsyncResult);
}
}
public void onSuccess(StartedOp<CqlOpData> sop) {
public void onSuccess(StartedOp<CqlOpData> sop, AsyncResultSet resultSet) {
CqlOpData cqlop = sop.getData();
HashedCQLErrorHandler.resetThreadStatusCode();
@ -128,39 +132,41 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
try {
ResultSet resultSet = cqlop.resultSet;
cqlop.totalPagesFetchedForQuery++;
// Apply any defined ResultSetCycleOperators
if (cycleOps != null) {
for (ResultSetCycleOperator cycleOp : cycleOps) {
cycleOp.apply(resultSet, cqlop.statement, cqlop.cycle);
// TODO: Implement result and row operators for cqld4 actions
// if (cycleOps != null) {
// for (ResultSetCycleOperator cycleOp : cycleOps) {
// cycleOp.apply(resultSet, cqlop.statement, cqlop.cycle);
// resultSet.
// }
// }
//
int rowsInPage = 0;
// if (rowOps==null) {
for (Row row : resultSet.currentPage()) {
rowsInPage++;
}
}
int pageRows = resultSet.getAvailableWithoutFetching();
int remaining = pageRows;
if (rowOps == null) {
while (remaining-- > 0) {
resultSet.one();
}
} else {
while (remaining-- > 0) {
for (RowCycleOperator rowOp : rowOps) {
rowOp.apply(resultSet.one(), cqlop.cycle);
}
}
}
cqlop.totalRowsFetchedForQuery += pageRows;
// } else {
// for (Row row : resultSet.currentPage()) {
// rowsInPage++;
// for (RowCycleOperator rowOp : rowOps) {
// rowOp.apply(row, cqlop.cycle);
// }
// }
// }
cqlop.totalRowsFetchedForQuery += rowsInPage;
if (cqlop.totalPagesFetchedForQuery++ > activity.maxpages) {
Integer pagesize = CQLSessionCache.get().getSession(activityDef).optionsMap.get(TypedDriverOption.REQUEST_PAGE_SIZE);
throw new UnexpectedPagingException(
cqlop.cycle,
resultSet,
cqlop.readyCQLStatement.getQueryString(cqlop.cycle),
1,
activity.maxpages,
activity.getSession().getCluster().getConfiguration().getQueryOptions().getFetchSize()
pagesize
);
}
@ -171,10 +177,11 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
);
}
if (!resultSet.isFullyFetched()) {
if (!resultSet.hasMorePages()) {
logger.trace("async paging request " + cqlop.totalPagesFetchedForQuery + " for cycle " + cqlop.cycle);
ListenableFuture<ResultSet> resultSetListenableFuture = resultSet.fetchMoreResults();
Futures.addCallback(resultSetListenableFuture, cqlop);
resultSet.fetchNextPage().whenComplete(cqlop::handleAsyncResult);
return;
}
@ -196,9 +203,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
ErrorStatus errorStatus = cqlActivityErrorHandler.handleError(cqlop.cycle, cqlCycleException);
if (errorStatus.isRetryable() && ++cqlop.triesAttempted < maxTries) {
ResultSetFuture resultSetFuture = activity.getSession().executeAsync(cqlop.statement);
sop.retry();
Futures.addCallback(resultSetFuture, cqlop);
activity.getSession().executeAsync(cqlop.statement).whenComplete(cqlop::handleAsyncResult);
return;
} else {
sop.fail(errorStatus.getResultCode());
@ -231,8 +236,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
if (errorStatus.isRetryable() && cqlop.triesAttempted < maxTries) {
startedOp.retry();
try (Timer.Context executeTime = activity.executeTimer.time()) {
cqlop.completionStage = activity.getSession().executeAsync(cqlop.statement);
Futures.addCallback(cqlop.completionStage, cqlop);
activity.getSession().executeAsync(cqlop.statement).whenComplete(cqlop::handleAsyncResult);
return;
}
}
@ -252,7 +256,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
this.cqlActivityErrorHandler = activity.getCqlErrorHandler();
this.statementFilter = activity.getStatementFilter();
this.rowOps = activity.getRowCycleOperators();
this.cycleOps = activity.getResultSetCycleOperators();
this.cycleOps = activity.getPageInfoCycleOperators();
this.modifiers = activity.getStatementModifiers();
}

View File

@ -5,13 +5,17 @@ import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.StartedOp;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Function;
public class CqlOpData extends CompletableFuture<AsyncResultSet> {
public class CqlOpData {
final long cycle;
public CompletionStage<AsyncResultSet> completionStage;
// public CompletionStage<AsyncResultSet> completionStage;
// op state is managed via callbacks, we keep a ref here
StartedOp<CqlOpData> startedOp;
@ -36,21 +40,16 @@ public class CqlOpData extends CompletableFuture<AsyncResultSet> {
this.action = action;
}
@Override
public boolean completeExceptionally(Throwable ex) {
this.throwable=ex;
this.errorAt = System.nanoTime();
action.onFailure(startedOp);
return true;
}
@Override
public boolean complete(AsyncResultSet value) {
this.page = value.currentPage();
this.resultAt = System.nanoTime();
action.onSuccess(startedOp);
return true;
// ? return !value.hasMorePages();
public void handleAsyncResult(AsyncResultSet asyncResultSet, Throwable throwable) {
if (throwable!=null) {
this.throwable = throwable;
this.errorAt = System.nanoTime();
action.onFailure(startedOp);
} else {
this.page = asyncResultSet.currentPage();
this.resultAt = System.nanoTime();
action.onSuccess(startedOp, asyncResultSet);
}
}
}

View File

@ -7,5 +7,5 @@ import com.datastax.oss.driver.api.core.cql.Statement;
* Each active modifier returns a statement in turn.
*/
public interface StatementModifier {
Statement modify(Statement unmodified, long cycleNum);
Statement<?> modify(Statement<?> unmodified, long cycleNum);
}

View File

@ -1,13 +1,15 @@
package io.nosqlbench.activitytype.cqld4.errorhandling;
import com.datastax.oss.driver.api.core.DriverException;
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
import com.datastax.oss.driver.api.core.auth.AuthenticationException;
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.dse.driver.api.core.servererrors.UnfitClientException;
import com.datastax.oss.driver.api.core.*;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.connection.ConnectionInitException;
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
import com.datastax.oss.driver.api.core.connection.HeartbeatException;
import com.datastax.oss.driver.api.core.servererrors.*;
import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.*;
import com.datastax.oss.driver.internal.core.channel.ClusterNameMismatchException;
import com.datastax.oss.driver.shaded.guava.common.collect.ComputationException;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.CqlGenericCycleException;
import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -20,7 +22,7 @@ import java.util.Map;
/**
* This enumerates all known exception classes, including supertypes,
* for the purposes of stable naming in error handling.
* This is current as of com.datastax.cassandra:cassandra-driver-core:3.2.0
* This is current as of driver 4.6.0
*
* TODO: for cqld4, add all exceptions again, keeping the previous ones in their existing places, but eliding the
* removed ones and leaving a place holder there, adding the new ones after
@ -32,17 +34,17 @@ public enum CQLExceptionEnum implements ResultReadable {
DriverException(com.datastax.oss.driver.api.core.DriverException.class, 3),
AuthenticationException(com.datastax.oss.driver.api.core.auth.AuthenticationException.class, 4),
TraceRetrievalException(TraceRetrievalException.class, 5),
// TraceRetrievalException(TraceRetrievalException.class, 5),
UnsupportedProtocolVersionException(com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException.class, 6),
NoHostAvailableException(NoHostAvailableException.class, 7),
// NoHostAvailableException(NoHostAvailableException.class, 7),
QueryValidationException(com.datastax.oss.driver.api.core.servererrors.QueryValidationException.class, 8),
InvalidQueryException(com.datastax.oss.driver.api.core.servererrors.InvalidQueryException.class, 9),
InvalidConfigurationInQueryException(com.datastax.oss.driver.api.core.servererrors.InvalidConfigurationInQueryException.class, 10),
UnauthorizedException(com.datastax.oss.driver.api.core.servererrors.UnauthorizedException.class, 11),
SyntaxError(com.datastax.oss.driver.api.core.servererrors.SyntaxError.class, 12),
AlreadyExistsException(AlreadyExistsException.class, 13),
UnpreparedException(UnpreparedException.class, 14),
InvalidTypeException(InvalidTypeException.class, 15),
// UnpreparedException(UnpreparedException.class, 14),
// InvalidTypeException(InvalidTypeException.class, 15),
QueryExecutionException(QueryExecutionException.class, 16),
UnavailableException(UnavailableException.class, 17),
BootstrappingException(BootstrappingException.class, 18),
@ -53,17 +55,17 @@ public enum CQLExceptionEnum implements ResultReadable {
WriteFailureException(WriteFailureException.class, 23),
ReadFailureException(ReadFailureException.class, 24),
ReadTimeoutException(ReadTimeoutException.class, 25),
FunctionExecutionException(FunctionExecutionException.class, 26),
DriverInternalError(DriverInternalError.class, 27),
// FunctionExecutionException(FunctionExecutionException.class, 26),
// DriverInternalError(DriverInternalError.class, 27),
ProtocolError(ProtocolError.class, 28),
ServerError(ServerError.class, 29),
BusyPoolException(BusyPoolException.class, 30),
ConnectionException(ConnectionException.class, 31),
TransportException(TransportException.class, 32),
OperationTimedOutException(OperationTimedOutException.class, 33),
PagingStateException(PagingStateException.class, 34),
UnresolvedUserTypeException(UnresolvedUserTypeException.class, 35),
UnsupportedFeatureException(UnsupportedFeatureException.class, 36),
// BusyPoolException(BusyPoolException.class, 30),
// ConnectionException(ConnectionException.class, 31),
// TransportException(TransportException.class, 32),
// OperationTimedOutException(OperationTimedOutException.class, 33),
// PagingStateException(PagingStateException.class, 34),
// UnresolvedUserTypeException(UnresolvedUserTypeException.class, 35),
// UnsupportedFeatureException(UnsupportedFeatureException.class, 36),
BusyConnectionException(com.datastax.oss.driver.api.core.connection.BusyConnectionException.class, 37),
ChangeUnappliedCycleException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException.class, 38),
@ -71,7 +73,24 @@ public enum CQLExceptionEnum implements ResultReadable {
RowVerificationException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.RowVerificationException.class, 40),
UnexpectedPagingException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.UnexpectedPagingException.class, 41),
EbdseCycleException(CqlGenericCycleException.class, 42),
MaxTriesExhaustedException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.MaxTriesExhaustedException.class,43);
MaxTriesExhaustedException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.MaxTriesExhaustedException.class,43),
// Added for 4.6
ClusterNameMismatchException(com.datastax.oss.driver.internal.core.channel.ClusterNameMismatchException.class, 44),
ComputationException(com.datastax.oss.driver.shaded.guava.common.collect.ComputationException.class,45),
AllNodesFailedException(com.datastax.oss.driver.api.core.AllNodesFailedException.class,46),
NoNodeAvailableException(com.datastax.oss.driver.api.core.NoNodeAvailableException.class,47),
ClosedConnectionException(ClosedConnectionException.class,48),
ConnectionInitException(com.datastax.oss.driver.api.core.connection.ConnectionInitException.class,49),
CoordinatorException(CoordinatorException.class,50),
FunctionFailureException(FunctionFailureException.class,51),
UnfitClientException(com.datastax.dse.driver.api.core.servererrors.UnfitClientException.class,52),
DriverExecutionException(com.datastax.oss.driver.api.core.DriverExecutionException.class,53),
DriverTimeoutException(com.datastax.oss.driver.api.core.DriverTimeoutException.class,54),
HeartbeatException(com.datastax.oss.driver.api.core.connection.HeartbeatException.class,55),
InvalidKeyspaceException(com.datastax.oss.driver.api.core.InvalidKeyspaceException.class,56),
RequestThrottlingException(RequestThrottlingException.class,57),
CqlGenericCycleException(CqlGenericCycleException.class,58);
private final static Logger logger = LoggerFactory.getLogger(CQLExceptionEnum.class);

View File

@ -1,6 +1,10 @@
package io.nosqlbench.activitytype.cqld4.errorhandling;
import com.datastax.oss.driver.api.core.DriverTimeoutException;
import com.datastax.oss.driver.api.core.NoNodeAvailableException;
import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.api.core.servererrors.OverloadedException;
import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException;
import com.datastax.oss.driver.api.core.servererrors.UnavailableException;
@ -30,10 +34,13 @@ public class HashedCQLErrorHandler extends HashedErrorHandler<Throwable, ErrorSt
this.setGroup("retryable",
NoNodeAvailableException.class,
UnavailableException.class,
OperationTimedOutException.class,
BusyConnectionException.class,
ClosedConnectionException.class,
OverloadedException.class,
WriteTimeoutException.class,
ReadTimeoutException.class
ReadTimeoutException.class,
DriverTimeoutException.class,
RequestThrottlingException.class
);
this.setGroup(
"unapplied",

View File

@ -67,15 +67,13 @@ public class NBCycleErrorHandler implements CycleErrorHandler<Throwable, ErrorSt
boolean retry = false;
switch (errorResponse) {
case stop:
logger.error("error with cycle " + cycle + ": statement: " + cce.getStatement() + " errmsg: " +
CQLExceptionDetailer.messageFor(cycle, error));
logger.error("error with cycle " + cycle + ": statement: " + cce.getStatement() + " errmsg: " + error.getMessage());
if (throwExceptionOnStop) {
throw new RuntimeException(error);
}
case warn:
logger.warn("error with cycle " + cycle + ": statement: " + cce.getStatement() + " errmsg: " +
CQLExceptionDetailer.messageFor(cycle, error));
logger.warn("error with cycle " + cycle + ": statement: " + cce.getStatement() + " errmsg: " + error.getMessage());
case retry:
retry = true;
case histogram:

View File

@ -1,52 +1,51 @@
package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.cql.*;
public abstract class CQLResultSetException extends CqlGenericCycleException {
private final Statement statement;
private final ResultSet resultSet;
private final Statement<?> statement;
private final AsyncResultSet resultSet;
public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement, String message, Throwable cause) {
public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement<?> statement, String message,
Throwable cause) {
super(cycle,message,cause);
this.resultSet = resultSet;
this.statement = statement;
}
public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement) {
public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement<?> statement) {
super(cycle);
this.resultSet = resultSet;
this.statement = statement;
}
public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement, String message) {
public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement<?> statement, String message) {
super(cycle,message);
this.resultSet = resultSet;
this.statement=statement;
}
public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement, Throwable cause) {
public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement<?> statement, Throwable cause) {
super(cycle,cause);
this.resultSet = resultSet;
this.statement = statement;
}
public Statement getStatement() {
public Statement<?> getStatement() {
return statement;
}
public ResultSet getResultSet() {
public AsyncResultSet getResultSet() {
return resultSet;
}
protected static String getQueryString(Statement stmt) {
protected static String getQueryString(Statement<?> stmt) {
if (stmt instanceof BoundStatement) {
return ((BoundStatement)stmt).preparedStatement().getQueryString();
return ((BoundStatement)stmt).getPreparedStatement().getQuery();
} else if (stmt instanceof SimpleStatement) {
return ((SimpleStatement) stmt).getQueryString();
return ((SimpleStatement) stmt).getQuery();
} else {
return "UNKNOWN Statement type:" + stmt.getClass().getSimpleName();
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
@ -12,11 +13,19 @@ public class ChangeUnappliedCycleException extends CqlGenericCycleException {
private final ResultSet resultSet;
private final String queryString;
private final AsyncResultSet asyncResultSet;
public ChangeUnappliedCycleException(long cycle, AsyncResultSet asyncResultSet, String queryString) {
super(cycle, "Operation was not applied:" + queryString);
this.asyncResultSet = asyncResultSet;
this.queryString = queryString;
this.resultSet=null;
}
public ChangeUnappliedCycleException(long cycle, ResultSet resultSet, String queryString) {
super(cycle, "Operation was not applied:" + queryString);
this.resultSet = resultSet;
this.queryString = queryString;
this.asyncResultSet=null;
}
public ResultSet getResultSet() {

View File

@ -1,17 +1,18 @@
package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
public class ResultSetVerificationException extends CQLResultSetException {
public ResultSetVerificationException(
long cycle, ResultSet resultSet, Statement statement, Throwable cause) {
long cycle, AsyncResultSet resultSet, Statement<?> statement, Throwable cause) {
super(cycle, resultSet, statement, cause);
}
public ResultSetVerificationException(
long cycle, ResultSet resultSet, Statement statement, String s) {
long cycle, AsyncResultSet resultSet, Statement<?> statement, String s) {
super(cycle, resultSet, statement, s + ", \nquery string:\n" + getQueryString(statement));
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
/**
@ -19,7 +20,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet;
*/
public class UnexpectedPagingException extends CqlGenericCycleException {
private final ResultSet resultSet;
private final AsyncResultSet resultSet;
private final String queryString;
private final int fetchSize;
private int fetchedPages;
@ -27,7 +28,7 @@ public class UnexpectedPagingException extends CqlGenericCycleException {
public UnexpectedPagingException(
long cycle,
ResultSet resultSet,
AsyncResultSet resultSet,
String queryString,
int fetchedPages,
int maxpages,
@ -40,7 +41,7 @@ public class UnexpectedPagingException extends CqlGenericCycleException {
this.fetchSize = fetchSize;
}
public ResultSet getResultSet() {
public AsyncResultSet getAsyncResultSet() {
return resultSet;
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cqld4.statements.binders;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Session;
@ -8,19 +9,20 @@ import io.nosqlbench.virtdata.core.bindings.ValuesArrayBinder;
import java.util.function.Function;
public enum CqlBinderTypes {
direct_array(s -> new DirectArrayValuesBinder()),
direct_array(DirectArrayValuesBinder::new),
unset_aware(UnsettableValuesBinder::new),
diagnostic(s -> new DiagnosticPreparedBinder());
diag_binder(DiagnosticPreparedBinder::new);
private final Function<Session, ValuesArrayBinder<PreparedStatement, Statement<?>>> mapper;
private final Function<CqlSession, ValuesArrayBinder<PreparedStatement, Statement<?>>> mapper;
CqlBinderTypes(Function<Session,ValuesArrayBinder<PreparedStatement,Statement<?>>> mapper) {
CqlBinderTypes(Function<CqlSession,ValuesArrayBinder<PreparedStatement,Statement<?>>> mapper) {
this.mapper = mapper;
}
public final static CqlBinderTypes DEFAULT = unset_aware;
public ValuesArrayBinder<PreparedStatement,Statement<?>> get(Session session) {
public ValuesArrayBinder<PreparedStatement,Statement<?>> get(CqlSession session) {
return mapper.apply(session);
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cqld4.statements.binders;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.type.DataType;
import io.nosqlbench.activitytype.cqld4.core.CQLBindHelper;
@ -17,7 +18,14 @@ import java.util.List;
* order to explain in more detail what is happening for users.
*/
public class DiagnosticPreparedBinder implements ValuesArrayBinder<PreparedStatement, Statement<?>> {
public static final Logger logger = LoggerFactory.getLogger(DiagnosticPreparedBinder.class);
private final CqlSession session;
public DiagnosticPreparedBinder(CqlSession session) {
this.session = session;
}
@Override
public Statement<?> bindValues(PreparedStatement prepared, Object[] values) {
ColumnDefinitions columnDefinitions = prepared.getVariableDefinitions();
@ -26,9 +34,7 @@ public class DiagnosticPreparedBinder implements ValuesArrayBinder<PreparedState
List<ColumnDefinition> columnDefList = new ArrayList<>();
prepared.getVariableDefinitions().forEach(columnDefList::add);
if (columnDefList.size() == values.length) {
columnDefList = columnDefinitions.asList();
} else {
if (columnDefList.size() != values.length) {
throw new RuntimeException("The number of named anchors in your statement does not match the number of bindings provided.");
}
@ -41,7 +47,7 @@ public class DiagnosticPreparedBinder implements ValuesArrayBinder<PreparedState
String colName = columnDef.getName().toString();
DataType type =columnDef.getType();
try {
bound = CQLBindHelper.bindStatement(bound, colName, value, type);
new CQLBindHelper(session).bindStatement(bound, colName, value, type);
} catch (ClassCastException e) {
logger.error(String.format("Unable to bind column %s to cql type %s with value %s", colName, type, value));
throw e;

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cqld4.statements.binders;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.virtdata.core.bindings.ValuesArrayBinder;
@ -20,9 +21,14 @@ import java.util.Arrays;
*/
public class DirectArrayValuesBinder implements ValuesArrayBinder<PreparedStatement, Statement<?>> {
public final static Logger logger = LoggerFactory.getLogger(DirectArrayValuesBinder.class);
private final CqlSession session;
public DirectArrayValuesBinder(CqlSession session) {
this.session = session;
}
@Override
public Statement bindValues(PreparedStatement preparedStatement, Object[] objects) {
public Statement<?> bindValues(PreparedStatement preparedStatement, Object[] objects) {
try {
return preparedStatement.bind(objects);
} catch (Exception e) {
@ -30,7 +36,7 @@ public class DirectArrayValuesBinder implements ValuesArrayBinder<PreparedStatem
sb.append("Error binding objects to prepared statement directly, falling back to diagnostic binding layer:");
sb.append(Arrays.toString(objects));
logger.warn(sb.toString(),e);
DiagnosticPreparedBinder diag = new DiagnosticPreparedBinder();
DiagnosticPreparedBinder diag = new DiagnosticPreparedBinder(session);
return diag.bindValues(preparedStatement, objects);
}
}

View File

@ -5,6 +5,7 @@ import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.*;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
@ -12,6 +13,7 @@ import com.datastax.oss.driver.internal.core.config.map.MapBasedDriverConfigLoad
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy;
import com.typesafe.config.ConfigFactory;
import io.nosqlbench.activitytype.cqld4.config.CQLD4OptionsMapper;
import io.nosqlbench.activitytype.cqld4.core.CQLOptions;
import io.nosqlbench.activitytype.cqld4.core.ProxyTranslator;
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
@ -41,12 +43,17 @@ public class CQLSessionCache implements Shutdownable {
private Map<String, SessionConfig> sessionCache = new HashMap<>();
private final static class SessionConfig extends ConcurrentHashMap<String,String> {
public final static class SessionConfig extends ConcurrentHashMap<String,String> {
public CqlSession session;
public Map<String,String> config = new ConcurrentHashMap<>();
public OptionsMap optionsMap;
public SessionConfig(CqlSession session) {
public SessionConfig(CqlSession session, OptionsMap optionsMap) {
this.session = session;
this.optionsMap = optionsMap;
}
public void set(DefaultDriverOption intOption, Object value) {
CQLD4OptionsMapper.apply(optionsMap,intOption.getPath(), value.toString());
}
}
@ -63,11 +70,11 @@ public class CQLSessionCache implements Shutdownable {
sessionConfig.session.close();
}
public CqlSession getSession(ActivityDef activityDef) {
public SessionConfig getSession(ActivityDef activityDef) {
String key = activityDef.getParams().getOptionalString("sessionid").orElse(DEFAULT_SESSION_ID);
String profileName = activityDef.getParams().getOptionalString("profile").orElse("default");
SessionConfig sessionConfig = sessionCache.computeIfAbsent(key, (cid) -> createSession(activityDef, key, profileName));
return sessionConfig.session;
return sessionConfig;
}
// cbopts=\".withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new DCAwareRoundRobinPolicy(\"dc1-us-east\", 0, false))).build()).withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))\"
@ -93,11 +100,6 @@ public class CQLSessionCache implements Shutdownable {
// DriverConfigLoader cl = DriverConfigLoader.fromMap(defaults);
// DriverConfig cfg = cl.getInitialConfig();
OptionsMap optionsMap = OptionsMap.driverDefaults();
builder.withConfigLoader(new MapBasedDriverConfigLoader())
builder.withConfigLoader(optionsMap);
Optional<Path> scb = activityDef.getParams().getOptionalString("secureconnectbundle")
.map(Path::of);
@ -107,6 +109,7 @@ public class CQLSessionCache implements Shutdownable {
Optional<Integer> port1 = activityDef.getParams().getOptionalInteger("port");
if (scb.isPresent()) {
scb.map(b -> {
logger.debug("adding secureconnectbundle: " + b.toString());
@ -162,11 +165,12 @@ public class CQLSessionCache implements Shutdownable {
}
}
Optional<String> clusteropts = activityDef.getParams().getOptionalString("cbopts");
if (clusteropts.isPresent()) {
try {
logger.info("applying cbopts:" + clusteropts.get());
NashornEvaluator<DseCluster.Builder> clusterEval = new NashornEvaluator<>(DseCluster.Builder.class);
NashornEvaluator<CqlSessionBuilder> clusterEval = new NashornEvaluator<>(CqlSessionBuilder.class);
clusterEval.put("builder", builder);
String importEnv =
"load(\"nashorn:mozilla_compat.js\");\n" +
@ -185,135 +189,157 @@ public class CQLSessionCache implements Shutdownable {
}
}
SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams()
.getOptionalString("speculative")
.map(speculative -> {
logger.info("speculative=>" + speculative);
return speculative;
})
.map(CQLOptions::speculativeFor)
.orElse(CQLOptions.defaultSpeculativePolicy());
builder.withSpeculativeExecutionPolicy(speculativePolicy);
// TODO: Support speculative=>
// SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams()
// .getOptionalString("speculative")
// .map(speculative -> {
// logger.info("speculative=>" + speculative);
// return speculative;
// })
// .map(CQLOptions::speculativeFor)
// .orElse(CQLOptions.defaultSpeculativePolicy());
// builder.withSpeculativeExecutionPolicy(speculativePolicy);
activityDef.getParams().getOptionalString("socketoptions")
.map(sockopts -> {
logger.info("socketoptions=>" + sockopts);
return sockopts;
})
.map(CQLOptions::socketOptionsFor)
.ifPresent(builder::withSocketOptions);
// TODO: Support socketoptions=>
// activityDef.getParams().getOptionalString("socketoptions")
// .map(sockopts -> {
// logger.info("socketoptions=>" + sockopts);
// return sockopts;
// })
// .map(CQLOptions::socketOptionsFor)
// .ifPresent(builder::withSocketOptions);
//
activityDef.getParams().getOptionalString("reconnectpolicy")
.map(reconnectpolicy-> {
logger.info("reconnectpolicy=>" + reconnectpolicy);
return reconnectpolicy;
})
.map(CQLOptions::reconnectPolicyFor)
.ifPresent(builder::withReconnectionPolicy);
// TODO: Support reconnectpolicy
// activityDef.getParams().getOptionalString("reconnectpolicy")
// .map(reconnectpolicy-> {
// logger.info("reconnectpolicy=>" + reconnectpolicy);
// return reconnectpolicy;
// })
// .map(CQLOptions::reconnectPolicyFor)
// .ifPresent(builder::withReconnectionPolicy);
activityDef.getParams().getOptionalString("pooling")
.map(pooling -> {
logger.info("pooling=>" + pooling);
return pooling;
})
.map(CQLOptions::poolingOptionsFor)
.ifPresent(builder::withPoolingOptions);
// TODO: support pooling options
// activityDef.getParams().getOptionalString("pooling")
// .map(pooling -> {
// logger.info("pooling=>" + pooling);
// return pooling;
// })
// .map(CQLOptions::poolingOptionsFor)
// .ifPresent(builder::withPoolingOptions);
activityDef.getParams().getOptionalString("whitelist")
.map(whitelist -> {
logger.info("whitelist=>" + whitelist);
return whitelist;
})
.map(p -> CQLOptions.whitelistFor(p, null))
.ifPresent(builder::withLoadBalancingPolicy);
// TODO: support whitelist options
// activityDef.getParams().getOptionalString("whitelist")
// .map(whitelist -> {
// logger.info("whitelist=>" + whitelist);
// return whitelist;
// })
// .map(p -> CQLOptions.whitelistFor(p, null))
// .ifPresent(builder::withLoadBalancingPolicy);
//
activityDef.getParams().getOptionalString("tickduration")
.map(tickduration -> {
logger.info("tickduration=>" + tickduration);
return tickduration;
})
.map(CQLOptions::withTickDuration)
.ifPresent(builder::withNettyOptions);
// TODO: support tickduration
// activityDef.getParams().getOptionalString("tickduration")
// .map(tickduration -> {
// logger.info("tickduration=>" + tickduration);
// return tickduration;
// })
// .map(CQLOptions::withTickDuration)
// .ifPresent(builder::withNettyOptions);
activityDef.getParams().getOptionalString("compression")
.map(compression -> {
logger.info("compression=>" + compression);
return compression;
})
.map(CQLOptions::withCompression)
.ifPresent(builder::withCompression);
// TODO: support compression
// activityDef.getParams().getOptionalString("compression")
// .map(compression -> {
// logger.info("compression=>" + compression);
// return compression;
// })
// .map(CQLOptions::withCompression)
// .ifPresent(builder::withCompression);
if (activityDef.getParams().getOptionalString("ssl").isPresent()) {
logger.info("Cluster builder proceeding with SSL but no Client Auth");
Object context = SSLKsFactory.get().getContext(activityDef);
SSLOptions sslOptions;
if (context instanceof javax.net.ssl.SSLContext) {
sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
.withSSLContext((javax.net.ssl.SSLContext) context).build();
builder.withSSL(sslOptions);
} else if (context instanceof io.netty.handler.ssl.SslContext) {
sslOptions =
new RemoteEndpointAwareNettySSLOptions((io.netty.handler.ssl.SslContext) context);
} else {
throw new RuntimeException("Unrecognized ssl context object type: " + context.getClass().getCanonicalName());
}
builder.withSSL(sslOptions);
}
// TODO: Support SSL standard config interface
// if (activityDef.getParams().getOptionalString("ssl").isPresent()) {
// logger.info("Cluster builder proceeding with SSL but no Client Auth");
// Object context = SSLKsFactory.get().getContext(activityDef);
// SSLOptions sslOptions;
// if (context instanceof javax.net.ssl.SSLContext) {
// sslOptions = RemoteEndpointAwareJdkSSLOptions.builder()
// .withSSLContext((javax.net.ssl.SSLContext) context).build();
// builder.withSSL(sslOptions);
// } else if (context instanceof io.netty.handler.ssl.SslContext) {
// sslOptions =
// new RemoteEndpointAwareNettySSLOptions((io.netty.handler.ssl.SslContext) context);
// } else {
// throw new RuntimeException("Unrecognized ssl context object type: " + context.getClass().getCanonicalName());
// }
// builder.withSSL(sslOptions);
// }
// TODO: Support retry policy
// RetryPolicy retryPolicy = activityDef.getParams()
// .getOptionalString("retrypolicy")
// .map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE);
//
// if (retryPolicy instanceof LoggingRetryPolicy) {
// logger.info("using LoggingRetryPolicy");
// }
//
// builder.withRetryPolicy(retryPolicy);
// TODO: Support JMX reporting toggle
// if (!activityDef.getParams().getOptionalBoolean("jmxreporting").orElse(false)) {
// builder.withoutJMXReporting();
// }
// TODO: Support single-endpoint options?
// // Proxy Translator and Whitelist for use with DS Cloud on-demand single-endpoint setup
// if (activityDef.getParams().getOptionalBoolean("single-endpoint").orElse(false)) {
// InetSocketAddress inetHost = new InetSocketAddress(host, port);
// final List<InetSocketAddress> whiteList = new ArrayList<>();
// whiteList.add(inetHost);
//
// LoadBalancingPolicy whitelistPolicy = new WhiteListPolicy(new RoundRobinPolicy(), whiteList);
// builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
// }
CqlSession session = builder.build();
// Cluster cl = builder.build();
RetryPolicy retryPolicy = activityDef.getParams()
.getOptionalString("retrypolicy")
.map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE);
if (retryPolicy instanceof LoggingRetryPolicy) {
logger.info("using LoggingRetryPolicy");
}
builder.withRetryPolicy(retryPolicy);
if (!activityDef.getParams().getOptionalBoolean("jmxreporting").orElse(false)) {
builder.withoutJMXReporting();
}
// Proxy Translator and Whitelist for use with DS Cloud on-demand single-endpoint setup
if (activityDef.getParams().getOptionalBoolean("single-endpoint").orElse(false)) {
InetSocketAddress inetHost = new InetSocketAddress(host, port);
final List<InetSocketAddress> whiteList = new ArrayList<>();
whiteList.add(inetHost);
LoadBalancingPolicy whitelistPolicy = new WhiteListPolicy(new RoundRobinPolicy(), whiteList);
builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
}
Cluster cl = builder.build();
// Apply default idempotence, if set
activityDef.getParams().getOptionalBoolean("defaultidempotence").map(
b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b)
);
Session session = cl.newSession();
// TODO: Support default idempotence
// // Apply default idempotence, if set
// activityDef.getParams().getOptionalBoolean("defaultidempotence").map(
// b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b)
// );
// This also forces init of metadata
logger.info("cluster-metadata-allhosts:\n" + session.getCluster().getMetadata().getAllHosts());
Map<UUID, Node> nodes = session.getMetadata().getNodes();
if (nodes.size()>25) {
logger.info("Found " + nodes.size() + " nodes in cluster.");
} else {
nodes.forEach((k,v)->{
logger.info("found node " + k);
});
}
logger.info("cluster-metadata-allhosts:\n" + session.getMetadata().getNodes());
if (activityDef.getParams().getOptionalBoolean("drivermetrics").orElse(false)) {
String driverPrefix = "driver." + sessid;
driverPrefix = activityDef.getParams().getOptionalString("driverprefix").orElse(driverPrefix) + ".";
ActivityMetrics.mountSubRegistry(driverPrefix, cl.getMetrics().getRegistry());
String driverPrefix = activityDef.getParams().getOptionalString("driverprefix").orElse("driver."+sessid) + ".";
session.getMetrics().ifPresent(m -> ActivityMetrics.mountSubRegistry(driverPrefix,m.getRegistry()));
}
return session;
OptionsMap optionsMap = OptionsMap.driverDefaults();
return new SessionConfig(session,optionsMap);
}
@Override
public void shutdown() {
for (Session session : sessionCache.values()) {
Cluster cluster = session.getCluster();
session.close();
cluster.close();
for (SessionConfig session : sessionCache.values()) {
session.session.close();
}
}
}

View File

@ -5,7 +5,7 @@ import com.codahale.metrics.Timer;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator;
import io.nosqlbench.virtdata.core.bindings.ContextualArrayBindings;
@ -20,9 +20,9 @@ import java.util.concurrent.TimeUnit;
public class ReadyCQLStatement {
private String name;
private ContextualArrayBindings<?, Statement> contextualBindings;
private ContextualArrayBindings<?, Statement<?>> contextualBindings;
private long ratio;
private ResultSetCycleOperator[] resultSetOperators = null;
private D4ResultSetCycleOperator[] resultSetOperators = null;
private RowCycleOperator[] rowCycleOperators = null;
private Timer successTimer;
@ -30,7 +30,7 @@ public class ReadyCQLStatement {
private Histogram rowsFetchedHisto;
private Writer resultCsvWriter;
public ReadyCQLStatement(ContextualArrayBindings<?, Statement> contextualBindings, long ratio, String name) {
public ReadyCQLStatement(ContextualArrayBindings<?, Statement<?>> contextualBindings, long ratio, String name) {
this.contextualBindings = contextualBindings;
this.ratio = ratio;
this.name = name;
@ -47,7 +47,7 @@ public class ReadyCQLStatement {
return contextualBindings.bind(value);
}
public ResultSetCycleOperator[] getResultSetOperators() {
public D4ResultSetCycleOperator[] getResultSetOperators() {
return resultSetOperators;
}
@ -161,8 +161,8 @@ public class ReadyCQLStatement {
}
public ReadyCQLStatement withResultSetCycleOperators(ResultSetCycleOperator[] resultSetCycleOperators) {
this.resultSetOperators = resultSetCycleOperators;
public ReadyCQLStatement withResultSetCycleOperators(D4ResultSetCycleOperator[] pageInfoCycleOperators) {
this.resultSetOperators = pageInfoCycleOperators;
return this;
}

View File

@ -2,11 +2,12 @@ package io.nosqlbench.activitytype.cqld4.statements.core;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.session.Session;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator;
import io.nosqlbench.activitytype.cqld4.core.CqlActivity;
import io.nosqlbench.activitytype.cqld4.statements.binders.CqlBinderTypes;
@ -25,11 +26,11 @@ public class ReadyCQLStatementTemplate {
private final static Logger logger = LoggerFactory.getLogger(ReadyCQLStatementTemplate.class);
private final Session session;
private ContextualBindingsArrayTemplate<?, Statement> template;
private ContextualBindingsArrayTemplate<?, Statement<?>> template;
private long ratio;
private String name;
private ResultSetCycleOperator[] resultSetCycleOperators;
private D4ResultSetCycleOperator[] pageInfoCycleOperators;
private RowCycleOperator[] rowCycleOperators;
private Timer successTimer;
@ -37,11 +38,17 @@ public class ReadyCQLStatementTemplate {
private Histogram rowsFetchedHisto;
private Writer resultCsvWriter;
public ReadyCQLStatementTemplate(Map<String,Object> fconfig, CqlBinderTypes binderType, Session session,
PreparedStatement preparedStmt, long ratio, String name) {
public ReadyCQLStatementTemplate(
Map<String,Object> fconfig,
CqlBinderTypes binderType,
CqlSession session,
PreparedStatement preparedStmt,
long ratio,
String name
) {
this.session = session;
this.name = name;
ValuesArrayBinder<PreparedStatement, Statement> binder = binderType.get(session);
ValuesArrayBinder<PreparedStatement, Statement<?>> binder = binderType.get(session);
logger.trace("Using binder_type=>" + binder.toString());
template = new ContextualBindingsArrayTemplate<>(
@ -52,10 +59,17 @@ public class ReadyCQLStatementTemplate {
this.ratio = ratio;
}
public ReadyCQLStatementTemplate(Map<String,Object> fconfig, Session session, SimpleStatement simpleStatement, long ratio, String name, boolean parametrized) {
public ReadyCQLStatementTemplate(
Map<String,Object> fconfig,
Session session,
SimpleStatement simpleStatement,
long ratio,
String name,
boolean parametrized
) {
this.session = session;
this.name = name;
template = new ContextualBindingsArrayTemplate<>(
template = new ContextualBindingsArrayTemplate(
simpleStatement,
new BindingsTemplate(fconfig),
new SimpleStatementValuesBinder(parametrized)
@ -66,12 +80,12 @@ public class ReadyCQLStatementTemplate {
public ReadyCQLStatement resolve() {
return new ReadyCQLStatement(template.resolveBindings(), ratio, name)
.withMetrics(this.successTimer, this.errorTimer, this.rowsFetchedHisto)
.withResultSetCycleOperators(resultSetCycleOperators)
.withResultSetCycleOperators(pageInfoCycleOperators)
.withRowCycleOperators(rowCycleOperators)
.withResultCsvWriter(resultCsvWriter);
}
public ContextualBindingsArrayTemplate<?, Statement> getContextualBindings() {
public ContextualBindingsArrayTemplate<?, Statement<?>> getContextualBindings() {
return template;
}
@ -90,13 +104,13 @@ public class ReadyCQLStatementTemplate {
this.resultCsvWriter = activity.getNamedWriter(name);
}
public void addResultSetOperators(ResultSetCycleOperator... addingOperators) {
resultSetCycleOperators = (resultSetCycleOperators==null) ? new ResultSetCycleOperator[0]: resultSetCycleOperators;
public void addResultSetOperators(D4ResultSetCycleOperator... addingOperators) {
pageInfoCycleOperators = (pageInfoCycleOperators ==null) ? new D4ResultSetCycleOperator[0]: pageInfoCycleOperators;
ResultSetCycleOperator[] newOperators = new ResultSetCycleOperator[resultSetCycleOperators.length + addingOperators.length];
System.arraycopy(resultSetCycleOperators,0,newOperators,0,resultSetCycleOperators.length);
System.arraycopy(addingOperators,0,newOperators,resultSetCycleOperators.length,addingOperators.length);
this.resultSetCycleOperators=newOperators;
D4ResultSetCycleOperator[] newOperators = new D4ResultSetCycleOperator[pageInfoCycleOperators.length + addingOperators.length];
System.arraycopy(pageInfoCycleOperators,0,newOperators,0, pageInfoCycleOperators.length);
System.arraycopy(addingOperators,0,newOperators, pageInfoCycleOperators.length,addingOperators.length);
this.pageInfoCycleOperators =newOperators;
}
public void addRowCycleOperators(RowCycleOperator... addingOperators) {

View File

@ -1,18 +1,19 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ResultSetVerificationException;
/**
* Throws a {@link ResultSetVerificationException} unless there is exactly one row in the result set.
*/
public class AssertSingleRowResultSet implements ResultSetCycleOperator {
public class AssertSingleRowD4ResultSet implements D4ResultSetCycleOperator {
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
int rowsIncoming = resultSet.getAvailableWithoutFetching();
@Override
public int apply(AsyncResultSet resultSet, Statement<?> statement, long cycle) {
int rowsIncoming = resultSet.remaining();
if (rowsIncoming<1) {
throw new ResultSetVerificationException(cycle, resultSet, statement, "no row in result set, expected exactly 1");
}

View File

@ -1,14 +1,15 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
public class ClearVars implements ResultSetCycleOperator {
public class ClearVars implements D4ResultSetCycleOperator {
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
public int apply(AsyncResultSet pageInfo, Statement<?> statement, long cycle) {
SharedState.tl_ObjectMap.get().clear();
return 0;
}

View File

@ -1,7 +1,7 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.*;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -9,8 +9,8 @@ import org.slf4j.LoggerFactory;
* Logs a trace-level event for the result set, including
* cycles, rows, fetched row count, and the statement.
*/
public class CqlResultSetLogger implements ResultSetCycleOperator {
private final static Logger logger = LoggerFactory.getLogger(CqlResultSetLogger.class);
public class CqlD4ResultSetLogger implements D4ResultSetCycleOperator {
private final static Logger logger = LoggerFactory.getLogger(CqlD4ResultSetLogger.class);
private static String getQueryString(Statement stmt) {
if (stmt instanceof PreparedStatement) {
@ -25,14 +25,14 @@ public class CqlResultSetLogger implements ResultSetCycleOperator {
}
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
public int apply(AsyncResultSet resultSet, Statement statement, long cycle) {
logger.debug("result-set-logger: "
+ " cycle=" + cycle
+ " rows=" + resultSet.getAvailableWithoutFetching()
+ " fetched=" + resultSet.isFullyFetched()
+ " remaining=" + resultSet.remaining()
+ " hasmore=" + resultSet.hasMorePages()
+ " statement=" + getQueryString(statement).stripTrailing()
);
for (Row row : resultSet) {
for (Row row : resultSet.currentPage()) {
logger.trace(row.toString());
}
return 0;

View File

@ -1,16 +1,17 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
import java.util.HashMap;
public class PopVars implements ResultSetCycleOperator {
public class PopVars implements D4ResultSetCycleOperator {
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
public int apply(AsyncResultSet resultSet, Statement<?> statement, long cycle) {
HashMap<String, Object> stringObjectHashMap = SharedState.tl_ObjectMap.get();
Object o = SharedState.tl_ObjectStack.get().pollLast();
if (o != null && o instanceof HashMap) {

View File

@ -1,13 +1,14 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
public class Print implements ResultSetCycleOperator {
public class Print implements D4ResultSetCycleOperator {
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
public int apply(AsyncResultSet resultSet, Statement<?> statement, long cycle) {
System.out.println("RS:"+ resultSet.toString());
return 0;
}

View File

@ -1,16 +1,16 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState;
import java.util.HashMap;
public class PushVars implements ResultSetCycleOperator {
public class PushVars implements D4ResultSetCycleOperator {
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
public int apply(AsyncResultSet resultSet, Statement<?> statement, long cycle) {
HashMap<String, Object> existingVars = SharedState.tl_ObjectMap.get();
HashMap<String, Object> topush = new HashMap<>(existingVars);

View File

@ -1,6 +1,6 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
public enum ResultSetCycleOperators {
@ -9,23 +9,23 @@ public enum ResultSetCycleOperators {
clearvars(ClearVars.class),
trace(TraceLogger.class),
log(CqlResultSetLogger.class),
assert_singlerow(AssertSingleRowResultSet.class),
log(CqlD4ResultSetLogger.class),
assert_singlerow(AssertSingleRowD4ResultSet.class),
print(Print.class);
private final Class<? extends ResultSetCycleOperator> implClass;
private final Class<? extends D4ResultSetCycleOperator> implClass;
ResultSetCycleOperators(Class<? extends ResultSetCycleOperator> traceLoggerClass) {
ResultSetCycleOperators(Class<? extends D4ResultSetCycleOperator> traceLoggerClass) {
this.implClass = traceLoggerClass;
}
public Class<? extends ResultSetCycleOperator> getImplementation() {
public Class<? extends D4ResultSetCycleOperator> getImplementation() {
return implClass;
}
public ResultSetCycleOperator getInstance() {
public D4ResultSetCycleOperator getInstance() {
try {
return getImplementation().getConstructor().newInstance();
} catch (Exception e) {
@ -33,7 +33,7 @@ public enum ResultSetCycleOperators {
}
}
public static ResultSetCycleOperator newOperator(String name) {
public static D4ResultSetCycleOperator newOperator(String name) {
return ResultSetCycleOperators.valueOf(name).getInstance();
}

View File

@ -1,15 +1,15 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import java.util.LinkedList;
public class RowCapture implements ResultSetCycleOperator {
public class RowCapture implements D4ResultSetCycleOperator {
@Override
public int apply(ResultSet resultSet, Statement statement, long cycle) {
public int apply(AsyncResultSet resultSet, Statement<?> statement, long cycle) {
ThreadLocal<LinkedList<Row>> rows = PerThreadCQLData.rows;
return 0;
}

View File

@ -1,7 +1,7 @@
package io.nosqlbench.activitytype.cqld4.statements.rsoperators;
import com.datastax.oss.driver.api.core.cql.*;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.core.StatementModifier;
import io.nosqlbench.engine.api.util.SimpleConfig;
import org.slf4j.Logger;
@ -13,7 +13,7 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class TraceLogger implements ResultSetCycleOperator, StatementModifier {
public class TraceLogger implements D4ResultSetCycleOperator, StatementModifier {
private final static Logger logger = LoggerFactory.getLogger(TraceLogger.class);
@ -46,7 +46,8 @@ public class TraceLogger implements ResultSetCycleOperator, StatementModifier {
}
@Override
public int apply(ResultSet rs, Statement statement, long cycle) {
public int apply(AsyncResultSet rs, Statement<?> statement, long cycle) {
rs.getExecutionInfo().getQueryTrace();
if ((cycle%modulo)!=0) {
return 0;
}

View File

@ -246,7 +246,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cqld4</artifactId>
<version>3.12.104-SNAPSHOT</version>
<version>3.12.120-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>