From 3628cc0f24e1d293dfbdc30fdd2cd2ee2ce732e7 Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Mon, 8 Jun 2020 13:15:36 -0500 Subject: [PATCH] cqld4 incremental process --- driver-cqld4/pom.xml | 4 +- ...tor.java => D4ResultSetCycleOperator.java} | 7 +- .../cqld4/core/CQLBindHelper.java | 8 +- .../activitytype/cqld4/core/CqlAction.java | 161 +++++------ .../activitytype/cqld4/core/CqlActivity.java | 124 ++++---- .../cqld4/core/CqlAsyncAction.java | 76 ++--- .../activitytype/cqld4/core/CqlOpData.java | 33 ++- .../cqld4/core/StatementModifier.java | 2 +- .../cqld4/errorhandling/CQLExceptionEnum.java | 61 ++-- .../errorhandling/HashedCQLErrorHandler.java | 11 +- .../errorhandling/NBCycleErrorHandler.java | 6 +- .../exceptions/CQLResultSetException.java | 29 +- .../ChangeUnappliedCycleException.java | 9 + .../ResultSetVerificationException.java | 5 +- .../exceptions/UnexpectedPagingException.java | 7 +- .../statements/binders/CqlBinderTypes.java | 12 +- .../binders/DiagnosticPreparedBinder.java | 14 +- .../binders/DirectArrayValuesBinder.java | 10 +- .../statements/core/CQLSessionCache.java | 264 ++++++++++-------- .../statements/core/ReadyCQLStatement.java | 14 +- .../core/ReadyCQLStatementTemplate.java | 46 +-- ...t.java => AssertSingleRowD4ResultSet.java} | 11 +- .../statements/rsoperators/ClearVars.java | 7 +- ...tLogger.java => CqlD4ResultSetLogger.java} | 14 +- .../cqld4/statements/rsoperators/PopVars.java | 7 +- .../cqld4/statements/rsoperators/Print.java | 7 +- .../statements/rsoperators/PushVars.java | 8 +- .../rsoperators/ResultSetCycleOperators.java | 16 +- .../statements/rsoperators/RowCapture.java | 8 +- .../statements/rsoperators/TraceLogger.java | 7 +- nb/pom.xml | 2 +- 31 files changed, 544 insertions(+), 446 deletions(-) rename driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/{ResultSetCycleOperator.java => D4ResultSetCycleOperator.java} (69%) rename driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/{AssertSingleRowResultSet.java => AssertSingleRowD4ResultSet.java} (67%) rename driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/{CqlResultSetLogger.java => CqlD4ResultSetLogger.java} (73%) diff --git a/driver-cqld4/pom.xml b/driver-cqld4/pom.xml index bb82b349f..f6a2b9a5c 100644 --- a/driver-cqld4/pom.xml +++ b/driver-cqld4/pom.xml @@ -4,7 +4,7 @@ io.nosqlbench mvn-defaults - 3.12.104-SNAPSHOT + 3.12.120-SNAPSHOT ../mvn-defaults @@ -23,7 +23,7 @@ io.nosqlbench engine-api - 3.12.104-SNAPSHOT + 3.12.120-SNAPSHOT diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/ResultSetCycleOperator.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/D4ResultSetCycleOperator.java similarity index 69% rename from driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/ResultSetCycleOperator.java rename to driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/D4ResultSetCycleOperator.java index 42e24f742..bc8762d0b 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/ResultSetCycleOperator.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/api/D4ResultSetCycleOperator.java @@ -1,18 +1,19 @@ package io.nosqlbench.activitytype.cqld4.api; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Statement; /** * An operator interface for performing a modular action on CQL ResultSets per-cycle. */ -public interface ResultSetCycleOperator { +public interface D4ResultSetCycleOperator { /** * Perform an action on a result set for a specific cycle. - * @param resultSet The ResultSet for the given cycle + * @param pageInfo The ResultSet for the given cycle * @param statement The statement for the given cycle * @param cycle The cycle for which the statement was submitted * @return A value, only meaningful when used with aggregated operators */ - int apply(ResultSet resultSet, Statement statement, long cycle); + int apply(AsyncResultSet pageInfo, Statement statement, long cycle); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CQLBindHelper.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CQLBindHelper.java index 3b81b8f07..669f561e2 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CQLBindHelper.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CQLBindHelper.java @@ -1,5 +1,6 @@ package io.nosqlbench.activitytype.cqld4.core; +import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.ProtocolVersion; import com.datastax.oss.driver.api.core.cql.*; import com.datastax.oss.driver.api.core.session.Session; @@ -18,14 +19,13 @@ import java.util.regex.Pattern; public class CQLBindHelper { private final ProtocolVersion protocolVersion; - private final Session session; private final CodecRegistry codecRegistry; +// private final ColumnDefinitions definitions; // refrence ProtocolConstants.DataType - public CQLBindHelper(Session session) { - this.session = session; - this.protocolVersion = this.session.getContext().getProtocolVersion(); + public CQLBindHelper(CqlSession session) { + this.protocolVersion = session.getContext().getProtocolVersion(); this.codecRegistry = session.getContext().getCodecRegistry(); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAction.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAction.java index 8fba3e419..a23e7961c 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAction.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAction.java @@ -3,14 +3,12 @@ package io.nosqlbench.activitytype.cqld4.core; import com.codahale.metrics.Timer; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.cql.*; -import com.datastax.oss.driver.api.core.session.Session; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator; import io.nosqlbench.activitytype.cqld4.api.StatementFilter; import io.nosqlbench.activitytype.cqld4.errorhandling.ErrorStatus; import io.nosqlbench.activitytype.cqld4.errorhandling.HashedCQLErrorHandler; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.CQLCycleWithStatementException; -import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.MaxTriesExhaustedException; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.UnexpectedPagingException; import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement; @@ -34,7 +32,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser private final CqlActivity cqlActivity; private final ActivityDef activityDef; private List rowOps; - private List cycleOps; + private List cycleOps; private List modifiers; private StatementFilter statementFilter; private OpSequence sequencer; @@ -44,7 +42,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser private int pagesFetched = 0; private long totalRowsFetchedForQuery = 0L; - private ResultSet pagingResultSet; + private AsyncResultSet pagingResultSet; private Statement pagingStatement; private ReadyCQLStatement pagingReadyStatement; private boolean showcql; @@ -125,84 +123,74 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser } } + CompletionStage completion; try (Timer.Context executeTime = cqlActivity.executeTimer.time()) { - CompletionStage completion = cqlActivity.getSession().executeAsync(statement); + completion = cqlActivity.getSession().executeAsync(statement); } Timer.Context resultTime = cqlActivity.resultTimer.time(); try { - ResultSet resultSet = resultSetFuture.getUninterruptibly(); + AsyncResultSet resultSet = completion.toCompletableFuture().get(); if (cycleOps != null) { - for (ResultSetCycleOperator cycleOp : cycleOps) { + for (D4ResultSetCycleOperator cycleOp : cycleOps) { cycleOp.apply(resultSet, statement, cycleValue); } } - ResultSetCycleOperator[] perStmtRSOperators = readyCQLStatement.getResultSetOperators(); - if (perStmtRSOperators != null) { - for (ResultSetCycleOperator perStmtRSOperator : perStmtRSOperators) { + D4ResultSetCycleOperator[] rsOperators = readyCQLStatement.getResultSetOperators(); + if (rsOperators != null) { + for (D4ResultSetCycleOperator perStmtRSOperator : rsOperators) { perStmtRSOperator.apply(resultSet, statement, cycleValue); } } - if (!resultSet.wasApplied()) { - //resultSet.b - Row row = resultSet.one(); - ColumnDefinitions defs = row.getColumnDefinitions(); - if (retryReplace) { - statement = - new CQLBindHelper(getCqlActivity().getSession()).rebindUnappliedStatement(statement, defs,row); - } - - logger.trace(readyCQLStatement.getQueryString(cycleValue)); - // To make exception handling logic flow more uniformly - throw new ChangeUnappliedCycleException( - cycleValue, resultSet, readyCQLStatement.getQueryString(cycleValue) - ); - } - - int pageRows = resultSet.getAvailableWithoutFetching(); - int remaining = pageRows; - RowCycleOperator[] perStmtRowOperators = readyCQLStatement.getRowCycleOperators(); - if (rowOps == null && perStmtRowOperators==null) { - while (remaining-- > 0) { - Row row = resultSet.one(); - -// NOTE: This has been replaced by: -// params: -// rowops: savevars -// You must add this to the YAML for statements that are meant to capture vars -// HashMap bindings = SharedState.tl_ObjectMap.get(); -// for (ColumnDefinitions.Definition cdef : row.getColumnDefinitions()) { -// bindings.put(cdef.getName(), row.getObject(cdef.getName())); -// } + // TODO: Add parameter rebind support in cqld4 via op +// if (!resultSet.wasApplied()) { +// //resultSet.b +// Row row = resultSet.one(); +// ColumnDefinitions defs = row.getColumnDefinitions(); +// if (retryReplace) { +// statement = +// new CQLBindHelper(getCqlActivity().getSession()).rebindUnappliedStatement(statement, defs,row); +// } // +// logger.trace(readyCQLStatement.getQueryString(cycleValue)); +// // To make exception handling logic flow more uniformly +// throw new ChangeUnappliedCycleException( +// cycleValue, resultSet, readyCQLStatement.getQueryString(cycleValue) +// ); +// } + +// int pageRows = resultSet.getAvailableWithoutFetching(); + + int rowsInPage=0; + RowCycleOperator[] perStmtRowOperators = readyCQLStatement.getRowCycleOperators(); + + if (rowOps==null && perStmtRowOperators==null) { + for (Row row : resultSet.currentPage()) { + rowsInPage++; } } else { - while (remaining-- > 0) { - Row onerow = resultSet.one(); + for (Row row : resultSet.currentPage()) { if (rowOps!=null) { for (RowCycleOperator rowOp : rowOps) { - rowOp.apply(onerow, cycleValue); + rowOp.apply(row, cycleValue); } } if (perStmtRowOperators!=null) { for (RowCycleOperator rowOp : perStmtRowOperators) { - rowOp.apply(onerow, cycleValue); + rowOp.apply(row, cycleValue); } } + rowsInPage++; } } - cqlActivity.rowsCounter.mark(pageRows); - totalRowsFetchedForQuery += pageRows; - if (resultSet.isFullyFetched()) { - long resultNanos = System.nanoTime() - nanoStartTime; - cqlActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS); - cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery); - readyCQLStatement.onSuccess(cycleValue, resultNanos, totalRowsFetchedForQuery); - } else { + cqlActivity.rowsCounter.mark(rowsInPage); + totalRowsFetchedForQuery += rowsInPage; + + if (resultSet.hasMorePages()) { if (cqlActivity.maxpages > 1) { pagingResultSet = resultSet; pagingStatement = statement; @@ -218,6 +206,11 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser cqlActivity.getSession().getContext().getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE) ); } + } else { + long resultNanos = System.nanoTime() - nanoStartTime; + cqlActivity.resultSuccessTimer.update(resultNanos, TimeUnit.NANOSECONDS); + cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery); + readyCQLStatement.onSuccess(cycleValue, resultNanos, totalRowsFetchedForQuery); } break; // This is normal termination of this loop, when retries aren't needed } catch (Exception e) { @@ -248,56 +241,58 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser throw new MaxTriesExhaustedException(cycleValue, maxTries); } - ListenableFuture pagingFuture; - try (Timer.Context pagingTime = cqlActivity.pagesTimer.time()) { + + CompletionStage completion; try (Timer.Context executeTime = cqlActivity.executeTimer.time()) { - pagingFuture = pagingResultSet.fetchMoreResults(); + completion = pagingResultSet.fetchNextPage(); } Timer.Context resultTime = cqlActivity.resultTimer.time(); try { - ResultSet resultSet = pagingFuture.get(); + AsyncResultSet resultSet = completion.toCompletableFuture().get(); if (cycleOps != null) { - for (ResultSetCycleOperator cycleOp : cycleOps) { + for (D4ResultSetCycleOperator cycleOp : cycleOps) { cycleOp.apply(resultSet, pagingStatement, cycleValue); } } - ResultSetCycleOperator[] perStmtRSOperators = pagingReadyStatement.getResultSetOperators(); + D4ResultSetCycleOperator[] perStmtRSOperators = pagingReadyStatement.getResultSetOperators(); if (perStmtRSOperators != null) { - for (ResultSetCycleOperator perStmtRSOperator : perStmtRSOperators) { + for (D4ResultSetCycleOperator perStmtRSOperator : perStmtRSOperators) { perStmtRSOperator.apply(resultSet, pagingStatement, cycleValue); } } pagesFetched++; - int pageRows = resultSet.getAvailableWithoutFetching(); - int remaining = pageRows; - if (rowOps == null) { - while (remaining-- > 0) { - resultSet.one(); + RowCycleOperator[] perStmtRowCycleOp = pagingReadyStatement.getRowCycleOperators(); + int rowsInPage=0; + + if (rowOps==null && perStmtRowCycleOp==null) { + for (Row row : resultSet.currentPage()) { + rowsInPage++; } } else { - while (remaining-- > 0) { - for (RowCycleOperator rowOp : rowOps) { - rowOp.apply(resultSet.one(), cycleValue); - + for (Row row : resultSet.currentPage()) { + rowsInPage++; + if (rowOps!=null) { + for (RowCycleOperator rowOp : rowOps) { + rowOp.apply(row,cycleValue); + } + } + if (perStmtRowCycleOp!=null) { + for (RowCycleOperator rowCycleOperator : perStmtRowCycleOp) { + rowCycleOperator.apply(row,cycleValue); + } } } } - cqlActivity.rowsCounter.mark(pageRows); - totalRowsFetchedForQuery += pageRows; - if (resultSet.isFullyFetched()) { - long nanoTime = System.nanoTime() - nanoStartTime; - cqlActivity.resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS); - cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery); - pagingReadyStatement.onSuccess(cycleValue, nanoTime, totalRowsFetchedForQuery); - pagingResultSet = null; + cqlActivity.rowsCounter.mark(rowsInPage); + totalRowsFetchedForQuery += rowsInPage; - } else { + if (resultSet.hasMorePages()) { if (pagesFetched > cqlActivity.maxpages) { throw new UnexpectedPagingException( cycleValue, @@ -309,6 +304,12 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser ); } pagingResultSet = resultSet; + } else { + long nanoTime = System.nanoTime() - nanoStartTime; + cqlActivity.resultSuccessTimer.update(nanoTime, TimeUnit.NANOSECONDS); + cqlActivity.resultSetSizeHisto.update(totalRowsFetchedForQuery); + pagingReadyStatement.onSuccess(cycleValue, nanoTime, totalRowsFetchedForQuery); + pagingResultSet = null; } break; // This is normal termination of this loop, when retries aren't needed } catch (Exception e) { @@ -350,7 +351,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser this.ebdseErrorHandler = cqlActivity.getCqlErrorHandler(); this.statementFilter = cqlActivity.getStatementFilter(); this.rowOps = cqlActivity.getRowCycleOperators(); - this.cycleOps = cqlActivity.getResultSetCycleOperators(); + this.cycleOps = cqlActivity.getPageInfoCycleOperators(); this.modifiers = cqlActivity.getStatementModifiers(); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivity.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivity.java index 8be641e5d..859f20048 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivity.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlActivity.java @@ -3,16 +3,15 @@ package io.nosqlbench.activitytype.cqld4.core; import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; -import com.datastax.driver.core.*; -import com.datastax.oss.driver.api.core.ConsistencyLevel; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.cql.*; import com.datastax.oss.driver.api.core.session.Session; import io.nosqlbench.activitytype.cqld4.codecsupport.UDTCodecInjector; import com.datastax.driver.core.TokenRangeStmtFilter; import io.nosqlbench.activitytype.cqld4.api.ErrorResponse; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator; import io.nosqlbench.activitytype.cqld4.api.StatementFilter; import io.nosqlbench.activitytype.cqld4.errorhandling.NBCycleErrorHandler; @@ -82,7 +81,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef private StatementFilter statementFilter; private Boolean showcql; private List rowCycleOperators; - private List resultSetCycleOperators; + private List pageInfoCycleOperators; private List statementModifiers; private Long maxTotalOpsInFlight; private long retryDelay; @@ -109,7 +108,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef public synchronized void initActivity() { logger.debug("initializing activity: " + this.activityDef.getAlias()); profileName = getParams().getOptionalString("profile").orElse("default"); - session = getSession(profileName); + session = getSession(); if (getParams().getOptionalBoolean("usercodecs").orElse(false)) { registerCodecs(session); @@ -131,9 +130,9 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef logger.debug("activity fully initialized: " + this.activityDef.getAlias()); } - public synchronized CqlSession getSession(String profileName) { + public synchronized CqlSession getSession() { if (session == null) { - session = CQLSessionCache.get().getSession(this.getActivityDef(), profileName); + session = CQLSessionCache.get().getSession(this.getActivityDef()).session; } return session; } @@ -430,7 +429,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef } logger.trace("setting fetchSize to " + fetchSize); - cluster.getConfiguration().getQueryOptions().setFetchSize(fetchSize); + CQLSessionCache.get().getSession(activityDef).set(DefaultDriverOption.REQUEST_PAGE_SIZE,fetchSize); } this.retryDelay = params.getOptionalLong("retrydelay").orElse(0L); @@ -441,7 +440,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef this.maxpages = params.getOptionalInteger("maxpages").orElse(1); this.statementFilter = params.getOptionalString("tokens") - .map(s -> new TokenRangeStmtFilter(cluster, s)) + .map(s -> new TokenRangeStmtFilter(getSession(), s)) .orElse(null); if (statementFilter != null) { @@ -461,49 +460,50 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef this.maxTotalOpsInFlight = params.getOptionalLong("async").orElse(1L); - Optional dynpooling = params.getOptionalString("pooling"); - if (dynpooling.isPresent()) { - logger.info("dynamically updating pooling"); - if (!dynpooling.get().equals(this.pooling)) { - PoolingOptions opts = CQLOptions.poolingOptionsFor(dynpooling.get()); - logger.info("pooling=>" + dynpooling.get()); - - PoolingOptions cfg = getSession().getCluster().getConfiguration().getPoolingOptions(); - - // This looks funny, because we have to set max conns per host - // in an order that will appease the driver, as there is no "apply settings" - // to do that for us, so we raise max first if it goes higher, and we lower - // it last, if it goes lower - int prior_mcph_l = cfg.getMaxConnectionsPerHost(HostDistance.LOCAL); - int mcph_l = opts.getMaxConnectionsPerHost(HostDistance.LOCAL); - int ccph_l = opts.getCoreConnectionsPerHost(HostDistance.LOCAL); - if (prior_mcph_l < mcph_l) { - logger.info("setting mcph_l to " + mcph_l); - cfg.setMaxConnectionsPerHost(HostDistance.LOCAL, mcph_l); - } - logger.info("setting ccph_l to " + ccph_l); - cfg.setCoreConnectionsPerHost(HostDistance.LOCAL, ccph_l); - if (mcph_l < prior_mcph_l) { - logger.info("setting mcph_l to " + mcph_l); - cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, mcph_l); - } - cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, opts.getMaxRequestsPerConnection(HostDistance.LOCAL)); - - int prior_mcph_r = cfg.getMaxConnectionsPerHost(HostDistance.REMOTE); - int mcph_r = opts.getMaxConnectionsPerHost(HostDistance.REMOTE); - int ccph_r = opts.getCoreConnectionsPerHost(HostDistance.REMOTE); - - if (mcph_r > 0) { - if (mcph_r > prior_mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r); - opts.setCoreConnectionsPerHost(HostDistance.REMOTE, ccph_r); - if (prior_mcph_r > mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r); - if (opts.getMaxConnectionsPerHost(HostDistance.REMOTE) > 0) { - cfg.setMaxRequestsPerConnection(HostDistance.REMOTE, opts.getMaxRequestsPerConnection(HostDistance.REMOTE)); - } - } - this.pooling = dynpooling.get(); - } - } + // TODO: Support dynamic pooling options +// Optional dynpooling = params.getOptionalString("pooling"); +// if (dynpooling.isPresent()) { +// logger.info("dynamically updating pooling"); +// if (!dynpooling.get().equals(this.pooling)) { +// PoolingOptions opts = CQLOptions.poolingOptionsFor(dynpooling.get()); +// logger.info("pooling=>" + dynpooling.get()); +// +// PoolingOptions cfg = getSession().getCluster().getConfiguration().getPoolingOptions(); +// +// // This looks funny, because we have to set max conns per host +// // in an order that will appease the driver, as there is no "apply settings" +// // to do that for us, so we raise max first if it goes higher, and we lower +// // it last, if it goes lower +// int prior_mcph_l = cfg.getMaxConnectionsPerHost(HostDistance.LOCAL); +// int mcph_l = opts.getMaxConnectionsPerHost(HostDistance.LOCAL); +// int ccph_l = opts.getCoreConnectionsPerHost(HostDistance.LOCAL); +// if (prior_mcph_l < mcph_l) { +// logger.info("setting mcph_l to " + mcph_l); +// cfg.setMaxConnectionsPerHost(HostDistance.LOCAL, mcph_l); +// } +// logger.info("setting ccph_l to " + ccph_l); +// cfg.setCoreConnectionsPerHost(HostDistance.LOCAL, ccph_l); +// if (mcph_l < prior_mcph_l) { +// logger.info("setting mcph_l to " + mcph_l); +// cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, mcph_l); +// } +// cfg.setMaxRequestsPerConnection(HostDistance.LOCAL, opts.getMaxRequestsPerConnection(HostDistance.LOCAL)); +// +// int prior_mcph_r = cfg.getMaxConnectionsPerHost(HostDistance.REMOTE); +// int mcph_r = opts.getMaxConnectionsPerHost(HostDistance.REMOTE); +// int ccph_r = opts.getCoreConnectionsPerHost(HostDistance.REMOTE); +// +// if (mcph_r > 0) { +// if (mcph_r > prior_mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r); +// opts.setCoreConnectionsPerHost(HostDistance.REMOTE, ccph_r); +// if (prior_mcph_r > mcph_r) opts.setMaxConnectionsPerHost(HostDistance.REMOTE, mcph_r); +// if (opts.getMaxConnectionsPerHost(HostDistance.REMOTE) > 0) { +// cfg.setMaxRequestsPerConnection(HostDistance.REMOTE, opts.getMaxRequestsPerConnection(HostDistance.REMOTE)); +// } +// } +// this.pooling = dynpooling.get(); +// } +// } } @@ -528,7 +528,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef ErrorResponse.valueOf(verb), exceptionCountMetrics, exceptionHistoMetrics, - !getParams().getOptionalLong("async").isPresent() + getParams().getOptionalLong("async").isEmpty() ) ); } else { @@ -540,7 +540,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef ErrorResponse.valueOf(verb), exceptionCountMetrics, exceptionHistoMetrics, - !getParams().getOptionalLong("async").isPresent() + getParams().getOptionalLong("async").isEmpty() ); logger.info("Handling error group '" + pattern + "' with handler:" + handler); newerrorHandler.setHandlerForGroup(pattern, handler); @@ -549,7 +549,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef ErrorResponse.valueOf(keyval[1]), exceptionCountMetrics, exceptionHistoMetrics, - !getParams().getOptionalLong("async").isPresent() + getParams().getOptionalLong("async").isEmpty() ); logger.info("Handling error pattern '" + pattern + "' with handler:" + handler); newerrorHandler.setHandlerForPattern(keyval[0], handler); @@ -599,19 +599,19 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef this.rowCycleOperators = null; } - public List getResultSetCycleOperators() { - return resultSetCycleOperators; + public List getPageInfoCycleOperators() { + return pageInfoCycleOperators; } - protected synchronized void addResultSetCycleOperator(ResultSetCycleOperator resultSetCycleOperator) { - if (this.resultSetCycleOperators == null) { - this.resultSetCycleOperators = new ArrayList<>(); + protected synchronized void addResultSetCycleOperator(D4ResultSetCycleOperator pageInfoCycleOperator) { + if (this.pageInfoCycleOperators == null) { + this.pageInfoCycleOperators = new ArrayList<>(); } - this.resultSetCycleOperators.add(resultSetCycleOperator); + this.pageInfoCycleOperators.add(pageInfoCycleOperator); } private void clearResultSetCycleOperators() { - this.resultSetCycleOperators = null; + this.pageInfoCycleOperators = null; } public List getStatementModifiers() { diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java index beede72aa..1702eb60e 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlAsyncAction.java @@ -1,9 +1,11 @@ package io.nosqlbench.activitytype.cqld4.core; import com.codahale.metrics.Timer; -import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.config.TypedDriverOption; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.Row; import io.nosqlbench.activitytype.cqld4.api.ErrorResponse; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator; import io.nosqlbench.activitytype.cqld4.api.StatementFilter; import io.nosqlbench.activitytype.cqld4.errorhandling.ErrorStatus; @@ -11,6 +13,7 @@ import io.nosqlbench.activitytype.cqld4.errorhandling.HashedCQLErrorHandler; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.CQLCycleWithStatementException; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.UnexpectedPagingException; +import io.nosqlbench.activitytype.cqld4.statements.core.CQLSessionCache; import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement; import io.nosqlbench.engine.api.activityapi.core.BaseAsyncAction; import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.FailedOp; @@ -23,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import java.util.function.LongFunction; @@ -33,7 +37,7 @@ public class CqlAsyncAction extends BaseAsyncAction { private final ActivityDef activityDef; private List rowOps; - private List cycleOps; + private List cycleOps; private List modifiers; private StatementFilter statementFilter; private OpSequence sequencer; @@ -112,13 +116,13 @@ public class CqlAsyncAction extends BaseAsyncAction { // The execute timer covers only the point at which EB hands the op to the driver to be executed try (Timer.Context executeTime = activity.executeTimer.time()) { - cqlop.completionStage = activity.getSession().executeAsync(cqlop.statement); - Futures.addCallback(cqlop.completionStage, cqlop); + CompletionStage completionStage = activity.getSession().executeAsync(cqlop.statement); + completionStage.whenComplete(cqlop::handleAsyncResult); } } - public void onSuccess(StartedOp sop) { + public void onSuccess(StartedOp sop, AsyncResultSet resultSet) { CqlOpData cqlop = sop.getData(); HashedCQLErrorHandler.resetThreadStatusCode(); @@ -128,39 +132,41 @@ public class CqlAsyncAction extends BaseAsyncAction { try { - ResultSet resultSet = cqlop.resultSet; cqlop.totalPagesFetchedForQuery++; // Apply any defined ResultSetCycleOperators - if (cycleOps != null) { - for (ResultSetCycleOperator cycleOp : cycleOps) { - cycleOp.apply(resultSet, cqlop.statement, cqlop.cycle); + // TODO: Implement result and row operators for cqld4 actions +// if (cycleOps != null) { +// for (ResultSetCycleOperator cycleOp : cycleOps) { +// cycleOp.apply(resultSet, cqlop.statement, cqlop.cycle); +// resultSet. +// } +// } +// + int rowsInPage = 0; +// if (rowOps==null) { + for (Row row : resultSet.currentPage()) { + rowsInPage++; } - } - - int pageRows = resultSet.getAvailableWithoutFetching(); - int remaining = pageRows; - if (rowOps == null) { - while (remaining-- > 0) { - resultSet.one(); - } - } else { - while (remaining-- > 0) { - for (RowCycleOperator rowOp : rowOps) { - rowOp.apply(resultSet.one(), cqlop.cycle); - } - } - } - cqlop.totalRowsFetchedForQuery += pageRows; +// } else { +// for (Row row : resultSet.currentPage()) { +// rowsInPage++; +// for (RowCycleOperator rowOp : rowOps) { +// rowOp.apply(row, cqlop.cycle); +// } +// } +// } + cqlop.totalRowsFetchedForQuery += rowsInPage; if (cqlop.totalPagesFetchedForQuery++ > activity.maxpages) { + Integer pagesize = CQLSessionCache.get().getSession(activityDef).optionsMap.get(TypedDriverOption.REQUEST_PAGE_SIZE); throw new UnexpectedPagingException( cqlop.cycle, resultSet, cqlop.readyCQLStatement.getQueryString(cqlop.cycle), 1, activity.maxpages, - activity.getSession().getCluster().getConfiguration().getQueryOptions().getFetchSize() + pagesize ); } @@ -171,10 +177,11 @@ public class CqlAsyncAction extends BaseAsyncAction { ); } - if (!resultSet.isFullyFetched()) { + + if (!resultSet.hasMorePages()) { logger.trace("async paging request " + cqlop.totalPagesFetchedForQuery + " for cycle " + cqlop.cycle); - ListenableFuture resultSetListenableFuture = resultSet.fetchMoreResults(); - Futures.addCallback(resultSetListenableFuture, cqlop); + + resultSet.fetchNextPage().whenComplete(cqlop::handleAsyncResult); return; } @@ -196,9 +203,7 @@ public class CqlAsyncAction extends BaseAsyncAction { ErrorStatus errorStatus = cqlActivityErrorHandler.handleError(cqlop.cycle, cqlCycleException); if (errorStatus.isRetryable() && ++cqlop.triesAttempted < maxTries) { - ResultSetFuture resultSetFuture = activity.getSession().executeAsync(cqlop.statement); - sop.retry(); - Futures.addCallback(resultSetFuture, cqlop); + activity.getSession().executeAsync(cqlop.statement).whenComplete(cqlop::handleAsyncResult); return; } else { sop.fail(errorStatus.getResultCode()); @@ -231,8 +236,7 @@ public class CqlAsyncAction extends BaseAsyncAction { if (errorStatus.isRetryable() && cqlop.triesAttempted < maxTries) { startedOp.retry(); try (Timer.Context executeTime = activity.executeTimer.time()) { - cqlop.completionStage = activity.getSession().executeAsync(cqlop.statement); - Futures.addCallback(cqlop.completionStage, cqlop); + activity.getSession().executeAsync(cqlop.statement).whenComplete(cqlop::handleAsyncResult); return; } } @@ -252,7 +256,7 @@ public class CqlAsyncAction extends BaseAsyncAction { this.cqlActivityErrorHandler = activity.getCqlErrorHandler(); this.statementFilter = activity.getStatementFilter(); this.rowOps = activity.getRowCycleOperators(); - this.cycleOps = activity.getResultSetCycleOperators(); + this.cycleOps = activity.getPageInfoCycleOperators(); this.modifiers = activity.getStatementModifiers(); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlOpData.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlOpData.java index 7c4372d01..4f46a18aa 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlOpData.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/CqlOpData.java @@ -5,13 +5,17 @@ import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.Statement; import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement; import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.StartedOp; +import org.jetbrains.annotations.NotNull; + import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; +import java.util.function.Function; -public class CqlOpData extends CompletableFuture { +public class CqlOpData { final long cycle; - public CompletionStage completionStage; +// public CompletionStage completionStage; // op state is managed via callbacks, we keep a ref here StartedOp startedOp; @@ -36,21 +40,16 @@ public class CqlOpData extends CompletableFuture { this.action = action; } - @Override - public boolean completeExceptionally(Throwable ex) { - this.throwable=ex; - this.errorAt = System.nanoTime(); - action.onFailure(startedOp); - return true; - } - - @Override - public boolean complete(AsyncResultSet value) { - this.page = value.currentPage(); - this.resultAt = System.nanoTime(); - action.onSuccess(startedOp); - return true; - // ? return !value.hasMorePages(); + public void handleAsyncResult(AsyncResultSet asyncResultSet, Throwable throwable) { + if (throwable!=null) { + this.throwable = throwable; + this.errorAt = System.nanoTime(); + action.onFailure(startedOp); + } else { + this.page = asyncResultSet.currentPage(); + this.resultAt = System.nanoTime(); + action.onSuccess(startedOp, asyncResultSet); + } } } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/StatementModifier.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/StatementModifier.java index 4c2492098..f4dd91453 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/StatementModifier.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/core/StatementModifier.java @@ -7,5 +7,5 @@ import com.datastax.oss.driver.api.core.cql.Statement; * Each active modifier returns a statement in turn. */ public interface StatementModifier { - Statement modify(Statement unmodified, long cycleNum); + Statement modify(Statement unmodified, long cycleNum); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/CQLExceptionEnum.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/CQLExceptionEnum.java index d5df8e79c..5d4a81909 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/CQLExceptionEnum.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/CQLExceptionEnum.java @@ -1,13 +1,15 @@ package io.nosqlbench.activitytype.cqld4.errorhandling; -import com.datastax.oss.driver.api.core.DriverException; -import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException; -import com.datastax.oss.driver.api.core.auth.AuthenticationException; -import com.datastax.oss.driver.api.core.connection.BusyConnectionException; +import com.datastax.dse.driver.api.core.servererrors.UnfitClientException; +import com.datastax.oss.driver.api.core.*; +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; +import com.datastax.oss.driver.api.core.connection.ConnectionInitException; import com.datastax.oss.driver.api.core.connection.FrameTooLongException; +import com.datastax.oss.driver.api.core.connection.HeartbeatException; import com.datastax.oss.driver.api.core.servererrors.*; -import com.datastax.oss.driver.api.core.type.codec.CodecNotFoundException; -import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.*; +import com.datastax.oss.driver.internal.core.channel.ClusterNameMismatchException; +import com.datastax.oss.driver.shaded.guava.common.collect.ComputationException; +import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.CqlGenericCycleException; import io.nosqlbench.engine.api.activityapi.cyclelog.buffers.results.ResultReadable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +22,7 @@ import java.util.Map; /** * This enumerates all known exception classes, including supertypes, * for the purposes of stable naming in error handling. - * This is current as of com.datastax.cassandra:cassandra-driver-core:3.2.0 + * This is current as of driver 4.6.0 * * TODO: for cqld4, add all exceptions again, keeping the previous ones in their existing places, but eliding the * removed ones and leaving a place holder there, adding the new ones after @@ -32,17 +34,17 @@ public enum CQLExceptionEnum implements ResultReadable { DriverException(com.datastax.oss.driver.api.core.DriverException.class, 3), AuthenticationException(com.datastax.oss.driver.api.core.auth.AuthenticationException.class, 4), - TraceRetrievalException(TraceRetrievalException.class, 5), +// TraceRetrievalException(TraceRetrievalException.class, 5), UnsupportedProtocolVersionException(com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException.class, 6), - NoHostAvailableException(NoHostAvailableException.class, 7), +// NoHostAvailableException(NoHostAvailableException.class, 7), QueryValidationException(com.datastax.oss.driver.api.core.servererrors.QueryValidationException.class, 8), InvalidQueryException(com.datastax.oss.driver.api.core.servererrors.InvalidQueryException.class, 9), InvalidConfigurationInQueryException(com.datastax.oss.driver.api.core.servererrors.InvalidConfigurationInQueryException.class, 10), UnauthorizedException(com.datastax.oss.driver.api.core.servererrors.UnauthorizedException.class, 11), SyntaxError(com.datastax.oss.driver.api.core.servererrors.SyntaxError.class, 12), AlreadyExistsException(AlreadyExistsException.class, 13), - UnpreparedException(UnpreparedException.class, 14), - InvalidTypeException(InvalidTypeException.class, 15), +// UnpreparedException(UnpreparedException.class, 14), +// InvalidTypeException(InvalidTypeException.class, 15), QueryExecutionException(QueryExecutionException.class, 16), UnavailableException(UnavailableException.class, 17), BootstrappingException(BootstrappingException.class, 18), @@ -53,17 +55,17 @@ public enum CQLExceptionEnum implements ResultReadable { WriteFailureException(WriteFailureException.class, 23), ReadFailureException(ReadFailureException.class, 24), ReadTimeoutException(ReadTimeoutException.class, 25), - FunctionExecutionException(FunctionExecutionException.class, 26), - DriverInternalError(DriverInternalError.class, 27), +// FunctionExecutionException(FunctionExecutionException.class, 26), +// DriverInternalError(DriverInternalError.class, 27), ProtocolError(ProtocolError.class, 28), ServerError(ServerError.class, 29), - BusyPoolException(BusyPoolException.class, 30), - ConnectionException(ConnectionException.class, 31), - TransportException(TransportException.class, 32), - OperationTimedOutException(OperationTimedOutException.class, 33), - PagingStateException(PagingStateException.class, 34), - UnresolvedUserTypeException(UnresolvedUserTypeException.class, 35), - UnsupportedFeatureException(UnsupportedFeatureException.class, 36), +// BusyPoolException(BusyPoolException.class, 30), +// ConnectionException(ConnectionException.class, 31), +// TransportException(TransportException.class, 32), +// OperationTimedOutException(OperationTimedOutException.class, 33), +// PagingStateException(PagingStateException.class, 34), +// UnresolvedUserTypeException(UnresolvedUserTypeException.class, 35), +// UnsupportedFeatureException(UnsupportedFeatureException.class, 36), BusyConnectionException(com.datastax.oss.driver.api.core.connection.BusyConnectionException.class, 37), ChangeUnappliedCycleException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ChangeUnappliedCycleException.class, 38), @@ -71,7 +73,24 @@ public enum CQLExceptionEnum implements ResultReadable { RowVerificationException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.RowVerificationException.class, 40), UnexpectedPagingException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.UnexpectedPagingException.class, 41), EbdseCycleException(CqlGenericCycleException.class, 42), - MaxTriesExhaustedException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.MaxTriesExhaustedException.class,43); + MaxTriesExhaustedException(io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.MaxTriesExhaustedException.class,43), + + // Added for 4.6 + ClusterNameMismatchException(com.datastax.oss.driver.internal.core.channel.ClusterNameMismatchException.class, 44), + ComputationException(com.datastax.oss.driver.shaded.guava.common.collect.ComputationException.class,45), + AllNodesFailedException(com.datastax.oss.driver.api.core.AllNodesFailedException.class,46), + NoNodeAvailableException(com.datastax.oss.driver.api.core.NoNodeAvailableException.class,47), + ClosedConnectionException(ClosedConnectionException.class,48), + ConnectionInitException(com.datastax.oss.driver.api.core.connection.ConnectionInitException.class,49), + CoordinatorException(CoordinatorException.class,50), + FunctionFailureException(FunctionFailureException.class,51), + UnfitClientException(com.datastax.dse.driver.api.core.servererrors.UnfitClientException.class,52), + DriverExecutionException(com.datastax.oss.driver.api.core.DriverExecutionException.class,53), + DriverTimeoutException(com.datastax.oss.driver.api.core.DriverTimeoutException.class,54), + HeartbeatException(com.datastax.oss.driver.api.core.connection.HeartbeatException.class,55), + InvalidKeyspaceException(com.datastax.oss.driver.api.core.InvalidKeyspaceException.class,56), + RequestThrottlingException(RequestThrottlingException.class,57), + CqlGenericCycleException(CqlGenericCycleException.class,58); private final static Logger logger = LoggerFactory.getLogger(CQLExceptionEnum.class); diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/HashedCQLErrorHandler.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/HashedCQLErrorHandler.java index cd0350786..8d4cae63c 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/HashedCQLErrorHandler.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/HashedCQLErrorHandler.java @@ -1,6 +1,10 @@ package io.nosqlbench.activitytype.cqld4.errorhandling; +import com.datastax.oss.driver.api.core.DriverTimeoutException; import com.datastax.oss.driver.api.core.NoNodeAvailableException; +import com.datastax.oss.driver.api.core.RequestThrottlingException; +import com.datastax.oss.driver.api.core.connection.BusyConnectionException; +import com.datastax.oss.driver.api.core.connection.ClosedConnectionException; import com.datastax.oss.driver.api.core.servererrors.OverloadedException; import com.datastax.oss.driver.api.core.servererrors.ReadTimeoutException; import com.datastax.oss.driver.api.core.servererrors.UnavailableException; @@ -30,10 +34,13 @@ public class HashedCQLErrorHandler extends HashedErrorHandler statement; + private final AsyncResultSet resultSet; - public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement, String message, Throwable cause) { + public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement statement, String message, + Throwable cause) { super(cycle,message,cause); this.resultSet = resultSet; this.statement = statement; } - public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement) { + public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement statement) { super(cycle); this.resultSet = resultSet; this.statement = statement; } - public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement, String message) { + public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement statement, String message) { super(cycle,message); this.resultSet = resultSet; this.statement=statement; } - public CQLResultSetException(long cycle, ResultSet resultSet, Statement statement, Throwable cause) { + public CQLResultSetException(long cycle, AsyncResultSet resultSet, Statement statement, Throwable cause) { super(cycle,cause); this.resultSet = resultSet; this.statement = statement; } - public Statement getStatement() { + public Statement getStatement() { return statement; } - public ResultSet getResultSet() { + public AsyncResultSet getResultSet() { return resultSet; } - protected static String getQueryString(Statement stmt) { + protected static String getQueryString(Statement stmt) { if (stmt instanceof BoundStatement) { - return ((BoundStatement)stmt).preparedStatement().getQueryString(); + + return ((BoundStatement)stmt).getPreparedStatement().getQuery(); } else if (stmt instanceof SimpleStatement) { - return ((SimpleStatement) stmt).getQueryString(); + return ((SimpleStatement) stmt).getQuery(); } else { return "UNKNOWN Statement type:" + stmt.getClass().getSimpleName(); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ChangeUnappliedCycleException.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ChangeUnappliedCycleException.java index 662b6255c..3774f82fe 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ChangeUnappliedCycleException.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ChangeUnappliedCycleException.java @@ -1,5 +1,6 @@ package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; /** @@ -12,11 +13,19 @@ public class ChangeUnappliedCycleException extends CqlGenericCycleException { private final ResultSet resultSet; private final String queryString; + private final AsyncResultSet asyncResultSet; + public ChangeUnappliedCycleException(long cycle, AsyncResultSet asyncResultSet, String queryString) { + super(cycle, "Operation was not applied:" + queryString); + this.asyncResultSet = asyncResultSet; + this.queryString = queryString; + this.resultSet=null; + } public ChangeUnappliedCycleException(long cycle, ResultSet resultSet, String queryString) { super(cycle, "Operation was not applied:" + queryString); this.resultSet = resultSet; this.queryString = queryString; + this.asyncResultSet=null; } public ResultSet getResultSet() { diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ResultSetVerificationException.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ResultSetVerificationException.java index 516c00d65..e46b16d1b 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ResultSetVerificationException.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/ResultSetVerificationException.java @@ -1,17 +1,18 @@ package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Statement; public class ResultSetVerificationException extends CQLResultSetException { public ResultSetVerificationException( - long cycle, ResultSet resultSet, Statement statement, Throwable cause) { + long cycle, AsyncResultSet resultSet, Statement statement, Throwable cause) { super(cycle, resultSet, statement, cause); } public ResultSetVerificationException( - long cycle, ResultSet resultSet, Statement statement, String s) { + long cycle, AsyncResultSet resultSet, Statement statement, String s) { super(cycle, resultSet, statement, s + ", \nquery string:\n" + getQueryString(statement)); } } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/UnexpectedPagingException.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/UnexpectedPagingException.java index 225aeec26..22b9b5f8d 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/UnexpectedPagingException.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/errorhandling/exceptions/UnexpectedPagingException.java @@ -1,5 +1,6 @@ package io.nosqlbench.activitytype.cqld4.errorhandling.exceptions; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; /** @@ -19,7 +20,7 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; */ public class UnexpectedPagingException extends CqlGenericCycleException { - private final ResultSet resultSet; + private final AsyncResultSet resultSet; private final String queryString; private final int fetchSize; private int fetchedPages; @@ -27,7 +28,7 @@ public class UnexpectedPagingException extends CqlGenericCycleException { public UnexpectedPagingException( long cycle, - ResultSet resultSet, + AsyncResultSet resultSet, String queryString, int fetchedPages, int maxpages, @@ -40,7 +41,7 @@ public class UnexpectedPagingException extends CqlGenericCycleException { this.fetchSize = fetchSize; } - public ResultSet getResultSet() { + public AsyncResultSet getAsyncResultSet() { return resultSet; } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/CqlBinderTypes.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/CqlBinderTypes.java index cd3398f24..7954b5089 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/CqlBinderTypes.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/CqlBinderTypes.java @@ -1,5 +1,6 @@ package io.nosqlbench.activitytype.cqld4.statements.binders; +import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.session.Session; @@ -8,19 +9,20 @@ import io.nosqlbench.virtdata.core.bindings.ValuesArrayBinder; import java.util.function.Function; public enum CqlBinderTypes { - direct_array(s -> new DirectArrayValuesBinder()), + + direct_array(DirectArrayValuesBinder::new), unset_aware(UnsettableValuesBinder::new), - diagnostic(s -> new DiagnosticPreparedBinder()); + diag_binder(DiagnosticPreparedBinder::new); - private final Function>> mapper; + private final Function>> mapper; - CqlBinderTypes(Function>> mapper) { + CqlBinderTypes(Function>> mapper) { this.mapper = mapper; } public final static CqlBinderTypes DEFAULT = unset_aware; - public ValuesArrayBinder> get(Session session) { + public ValuesArrayBinder> get(CqlSession session) { return mapper.apply(session); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/DiagnosticPreparedBinder.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/DiagnosticPreparedBinder.java index 15357cfce..354d71746 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/DiagnosticPreparedBinder.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/binders/DiagnosticPreparedBinder.java @@ -1,5 +1,6 @@ package io.nosqlbench.activitytype.cqld4.statements.binders; +import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.*; import com.datastax.oss.driver.api.core.type.DataType; import io.nosqlbench.activitytype.cqld4.core.CQLBindHelper; @@ -17,7 +18,14 @@ import java.util.List; * order to explain in more detail what is happening for users. */ public class DiagnosticPreparedBinder implements ValuesArrayBinder> { + public static final Logger logger = LoggerFactory.getLogger(DiagnosticPreparedBinder.class); + private final CqlSession session; + + public DiagnosticPreparedBinder(CqlSession session) { + this.session = session; + } + @Override public Statement bindValues(PreparedStatement prepared, Object[] values) { ColumnDefinitions columnDefinitions = prepared.getVariableDefinitions(); @@ -26,9 +34,7 @@ public class DiagnosticPreparedBinder implements ValuesArrayBinder columnDefList = new ArrayList<>(); prepared.getVariableDefinitions().forEach(columnDefList::add); - if (columnDefList.size() == values.length) { - columnDefList = columnDefinitions.asList(); - } else { + if (columnDefList.size() != values.length) { throw new RuntimeException("The number of named anchors in your statement does not match the number of bindings provided."); } @@ -41,7 +47,7 @@ public class DiagnosticPreparedBinder implements ValuesArrayBinder> { public final static Logger logger = LoggerFactory.getLogger(DirectArrayValuesBinder.class); + private final CqlSession session; + + public DirectArrayValuesBinder(CqlSession session) { + this.session = session; + } @Override - public Statement bindValues(PreparedStatement preparedStatement, Object[] objects) { + public Statement bindValues(PreparedStatement preparedStatement, Object[] objects) { try { return preparedStatement.bind(objects); } catch (Exception e) { @@ -30,7 +36,7 @@ public class DirectArrayValuesBinder implements ValuesArrayBinder sessionCache = new HashMap<>(); - private final static class SessionConfig extends ConcurrentHashMap { + public final static class SessionConfig extends ConcurrentHashMap { public CqlSession session; - public Map config = new ConcurrentHashMap<>(); + public OptionsMap optionsMap; - public SessionConfig(CqlSession session) { + public SessionConfig(CqlSession session, OptionsMap optionsMap) { this.session = session; + this.optionsMap = optionsMap; + } + + public void set(DefaultDriverOption intOption, Object value) { + CQLD4OptionsMapper.apply(optionsMap,intOption.getPath(), value.toString()); } } @@ -63,11 +70,11 @@ public class CQLSessionCache implements Shutdownable { sessionConfig.session.close(); } - public CqlSession getSession(ActivityDef activityDef) { + public SessionConfig getSession(ActivityDef activityDef) { String key = activityDef.getParams().getOptionalString("sessionid").orElse(DEFAULT_SESSION_ID); String profileName = activityDef.getParams().getOptionalString("profile").orElse("default"); SessionConfig sessionConfig = sessionCache.computeIfAbsent(key, (cid) -> createSession(activityDef, key, profileName)); - return sessionConfig.session; + return sessionConfig; } // cbopts=\".withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new DCAwareRoundRobinPolicy(\"dc1-us-east\", 0, false))).build()).withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))\" @@ -93,11 +100,6 @@ public class CQLSessionCache implements Shutdownable { // DriverConfigLoader cl = DriverConfigLoader.fromMap(defaults); // DriverConfig cfg = cl.getInitialConfig(); - OptionsMap optionsMap = OptionsMap.driverDefaults(); - - builder.withConfigLoader(new MapBasedDriverConfigLoader()) - builder.withConfigLoader(optionsMap); - Optional scb = activityDef.getParams().getOptionalString("secureconnectbundle") .map(Path::of); @@ -107,6 +109,7 @@ public class CQLSessionCache implements Shutdownable { Optional port1 = activityDef.getParams().getOptionalInteger("port"); + if (scb.isPresent()) { scb.map(b -> { logger.debug("adding secureconnectbundle: " + b.toString()); @@ -162,11 +165,12 @@ public class CQLSessionCache implements Shutdownable { } } + Optional clusteropts = activityDef.getParams().getOptionalString("cbopts"); if (clusteropts.isPresent()) { try { logger.info("applying cbopts:" + clusteropts.get()); - NashornEvaluator clusterEval = new NashornEvaluator<>(DseCluster.Builder.class); + NashornEvaluator clusterEval = new NashornEvaluator<>(CqlSessionBuilder.class); clusterEval.put("builder", builder); String importEnv = "load(\"nashorn:mozilla_compat.js\");\n" + @@ -185,135 +189,157 @@ public class CQLSessionCache implements Shutdownable { } } - SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams() - .getOptionalString("speculative") - .map(speculative -> { - logger.info("speculative=>" + speculative); - return speculative; - }) - .map(CQLOptions::speculativeFor) - .orElse(CQLOptions.defaultSpeculativePolicy()); - builder.withSpeculativeExecutionPolicy(speculativePolicy); + // TODO: Support speculative=> +// SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams() +// .getOptionalString("speculative") +// .map(speculative -> { +// logger.info("speculative=>" + speculative); +// return speculative; +// }) +// .map(CQLOptions::speculativeFor) +// .orElse(CQLOptions.defaultSpeculativePolicy()); +// builder.withSpeculativeExecutionPolicy(speculativePolicy); - activityDef.getParams().getOptionalString("socketoptions") - .map(sockopts -> { - logger.info("socketoptions=>" + sockopts); - return sockopts; - }) - .map(CQLOptions::socketOptionsFor) - .ifPresent(builder::withSocketOptions); + // TODO: Support socketoptions=> +// activityDef.getParams().getOptionalString("socketoptions") +// .map(sockopts -> { +// logger.info("socketoptions=>" + sockopts); +// return sockopts; +// }) +// .map(CQLOptions::socketOptionsFor) +// .ifPresent(builder::withSocketOptions); +// - activityDef.getParams().getOptionalString("reconnectpolicy") - .map(reconnectpolicy-> { - logger.info("reconnectpolicy=>" + reconnectpolicy); - return reconnectpolicy; - }) - .map(CQLOptions::reconnectPolicyFor) - .ifPresent(builder::withReconnectionPolicy); + // TODO: Support reconnectpolicy +// activityDef.getParams().getOptionalString("reconnectpolicy") +// .map(reconnectpolicy-> { +// logger.info("reconnectpolicy=>" + reconnectpolicy); +// return reconnectpolicy; +// }) +// .map(CQLOptions::reconnectPolicyFor) +// .ifPresent(builder::withReconnectionPolicy); - activityDef.getParams().getOptionalString("pooling") - .map(pooling -> { - logger.info("pooling=>" + pooling); - return pooling; - }) - .map(CQLOptions::poolingOptionsFor) - .ifPresent(builder::withPoolingOptions); + // TODO: support pooling options +// activityDef.getParams().getOptionalString("pooling") +// .map(pooling -> { +// logger.info("pooling=>" + pooling); +// return pooling; +// }) +// .map(CQLOptions::poolingOptionsFor) +// .ifPresent(builder::withPoolingOptions); - activityDef.getParams().getOptionalString("whitelist") - .map(whitelist -> { - logger.info("whitelist=>" + whitelist); - return whitelist; - }) - .map(p -> CQLOptions.whitelistFor(p, null)) - .ifPresent(builder::withLoadBalancingPolicy); + // TODO: support whitelist options +// activityDef.getParams().getOptionalString("whitelist") +// .map(whitelist -> { +// logger.info("whitelist=>" + whitelist); +// return whitelist; +// }) +// .map(p -> CQLOptions.whitelistFor(p, null)) +// .ifPresent(builder::withLoadBalancingPolicy); +// - activityDef.getParams().getOptionalString("tickduration") - .map(tickduration -> { - logger.info("tickduration=>" + tickduration); - return tickduration; - }) - .map(CQLOptions::withTickDuration) - .ifPresent(builder::withNettyOptions); + // TODO: support tickduration +// activityDef.getParams().getOptionalString("tickduration") +// .map(tickduration -> { +// logger.info("tickduration=>" + tickduration); +// return tickduration; +// }) +// .map(CQLOptions::withTickDuration) +// .ifPresent(builder::withNettyOptions); - activityDef.getParams().getOptionalString("compression") - .map(compression -> { - logger.info("compression=>" + compression); - return compression; - }) - .map(CQLOptions::withCompression) - .ifPresent(builder::withCompression); + // TODO: support compression +// activityDef.getParams().getOptionalString("compression") +// .map(compression -> { +// logger.info("compression=>" + compression); +// return compression; +// }) +// .map(CQLOptions::withCompression) +// .ifPresent(builder::withCompression); - if (activityDef.getParams().getOptionalString("ssl").isPresent()) { - logger.info("Cluster builder proceeding with SSL but no Client Auth"); - Object context = SSLKsFactory.get().getContext(activityDef); - SSLOptions sslOptions; - if (context instanceof javax.net.ssl.SSLContext) { - sslOptions = RemoteEndpointAwareJdkSSLOptions.builder() - .withSSLContext((javax.net.ssl.SSLContext) context).build(); - builder.withSSL(sslOptions); - } else if (context instanceof io.netty.handler.ssl.SslContext) { - sslOptions = - new RemoteEndpointAwareNettySSLOptions((io.netty.handler.ssl.SslContext) context); - } else { - throw new RuntimeException("Unrecognized ssl context object type: " + context.getClass().getCanonicalName()); - } - builder.withSSL(sslOptions); - } + // TODO: Support SSL standard config interface +// if (activityDef.getParams().getOptionalString("ssl").isPresent()) { +// logger.info("Cluster builder proceeding with SSL but no Client Auth"); +// Object context = SSLKsFactory.get().getContext(activityDef); +// SSLOptions sslOptions; +// if (context instanceof javax.net.ssl.SSLContext) { +// sslOptions = RemoteEndpointAwareJdkSSLOptions.builder() +// .withSSLContext((javax.net.ssl.SSLContext) context).build(); +// builder.withSSL(sslOptions); +// } else if (context instanceof io.netty.handler.ssl.SslContext) { +// sslOptions = +// new RemoteEndpointAwareNettySSLOptions((io.netty.handler.ssl.SslContext) context); +// } else { +// throw new RuntimeException("Unrecognized ssl context object type: " + context.getClass().getCanonicalName()); +// } +// builder.withSSL(sslOptions); +// } + + // TODO: Support retry policy +// RetryPolicy retryPolicy = activityDef.getParams() +// .getOptionalString("retrypolicy") +// .map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE); +// +// if (retryPolicy instanceof LoggingRetryPolicy) { +// logger.info("using LoggingRetryPolicy"); +// } +// +// builder.withRetryPolicy(retryPolicy); + + // TODO: Support JMX reporting toggle +// if (!activityDef.getParams().getOptionalBoolean("jmxreporting").orElse(false)) { +// builder.withoutJMXReporting(); +// } + + // TODO: Support single-endpoint options? +// // Proxy Translator and Whitelist for use with DS Cloud on-demand single-endpoint setup +// if (activityDef.getParams().getOptionalBoolean("single-endpoint").orElse(false)) { +// InetSocketAddress inetHost = new InetSocketAddress(host, port); +// final List whiteList = new ArrayList<>(); +// whiteList.add(inetHost); +// +// LoadBalancingPolicy whitelistPolicy = new WhiteListPolicy(new RoundRobinPolicy(), whiteList); +// builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy); +// } + + CqlSession session = builder.build(); + +// Cluster cl = builder.build(); - RetryPolicy retryPolicy = activityDef.getParams() - .getOptionalString("retrypolicy") - .map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE); - - if (retryPolicy instanceof LoggingRetryPolicy) { - logger.info("using LoggingRetryPolicy"); - } - - builder.withRetryPolicy(retryPolicy); - - if (!activityDef.getParams().getOptionalBoolean("jmxreporting").orElse(false)) { - builder.withoutJMXReporting(); - } - - // Proxy Translator and Whitelist for use with DS Cloud on-demand single-endpoint setup - if (activityDef.getParams().getOptionalBoolean("single-endpoint").orElse(false)) { - InetSocketAddress inetHost = new InetSocketAddress(host, port); - final List whiteList = new ArrayList<>(); - whiteList.add(inetHost); - - LoadBalancingPolicy whitelistPolicy = new WhiteListPolicy(new RoundRobinPolicy(), whiteList); - builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy); - } - - Cluster cl = builder.build(); - - // Apply default idempotence, if set - activityDef.getParams().getOptionalBoolean("defaultidempotence").map( - b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b) - ); - - Session session = cl.newSession(); + // TODO: Support default idempotence +// // Apply default idempotence, if set +// activityDef.getParams().getOptionalBoolean("defaultidempotence").map( +// b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b) +// ); // This also forces init of metadata - logger.info("cluster-metadata-allhosts:\n" + session.getCluster().getMetadata().getAllHosts()); + + Map nodes = session.getMetadata().getNodes(); + if (nodes.size()>25) { + logger.info("Found " + nodes.size() + " nodes in cluster."); + } else { + nodes.forEach((k,v)->{ + logger.info("found node " + k); + }); + } + logger.info("cluster-metadata-allhosts:\n" + session.getMetadata().getNodes()); if (activityDef.getParams().getOptionalBoolean("drivermetrics").orElse(false)) { - String driverPrefix = "driver." + sessid; - driverPrefix = activityDef.getParams().getOptionalString("driverprefix").orElse(driverPrefix) + "."; - ActivityMetrics.mountSubRegistry(driverPrefix, cl.getMetrics().getRegistry()); + String driverPrefix = activityDef.getParams().getOptionalString("driverprefix").orElse("driver."+sessid) + "."; + session.getMetrics().ifPresent(m -> ActivityMetrics.mountSubRegistry(driverPrefix,m.getRegistry())); } - return session; + OptionsMap optionsMap = OptionsMap.driverDefaults(); + + return new SessionConfig(session,optionsMap); } @Override public void shutdown() { - for (Session session : sessionCache.values()) { - Cluster cluster = session.getCluster(); - session.close(); - cluster.close(); + for (SessionConfig session : sessionCache.values()) { + session.session.close(); } } } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatement.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatement.java index 5d13e511c..7ef7e26fd 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatement.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatement.java @@ -5,7 +5,7 @@ import com.codahale.metrics.Timer; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator; import io.nosqlbench.virtdata.core.bindings.ContextualArrayBindings; @@ -20,9 +20,9 @@ import java.util.concurrent.TimeUnit; public class ReadyCQLStatement { private String name; - private ContextualArrayBindings contextualBindings; + private ContextualArrayBindings> contextualBindings; private long ratio; - private ResultSetCycleOperator[] resultSetOperators = null; + private D4ResultSetCycleOperator[] resultSetOperators = null; private RowCycleOperator[] rowCycleOperators = null; private Timer successTimer; @@ -30,7 +30,7 @@ public class ReadyCQLStatement { private Histogram rowsFetchedHisto; private Writer resultCsvWriter; - public ReadyCQLStatement(ContextualArrayBindings contextualBindings, long ratio, String name) { + public ReadyCQLStatement(ContextualArrayBindings> contextualBindings, long ratio, String name) { this.contextualBindings = contextualBindings; this.ratio = ratio; this.name = name; @@ -47,7 +47,7 @@ public class ReadyCQLStatement { return contextualBindings.bind(value); } - public ResultSetCycleOperator[] getResultSetOperators() { + public D4ResultSetCycleOperator[] getResultSetOperators() { return resultSetOperators; } @@ -161,8 +161,8 @@ public class ReadyCQLStatement { } - public ReadyCQLStatement withResultSetCycleOperators(ResultSetCycleOperator[] resultSetCycleOperators) { - this.resultSetOperators = resultSetCycleOperators; + public ReadyCQLStatement withResultSetCycleOperators(D4ResultSetCycleOperator[] pageInfoCycleOperators) { + this.resultSetOperators = pageInfoCycleOperators; return this; } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatementTemplate.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatementTemplate.java index 52156dc18..0fca5d02d 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatementTemplate.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/core/ReadyCQLStatementTemplate.java @@ -2,11 +2,12 @@ package io.nosqlbench.activitytype.cqld4.statements.core; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.session.Session; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator; import io.nosqlbench.activitytype.cqld4.core.CqlActivity; import io.nosqlbench.activitytype.cqld4.statements.binders.CqlBinderTypes; @@ -25,11 +26,11 @@ public class ReadyCQLStatementTemplate { private final static Logger logger = LoggerFactory.getLogger(ReadyCQLStatementTemplate.class); private final Session session; - private ContextualBindingsArrayTemplate template; + private ContextualBindingsArrayTemplate> template; private long ratio; private String name; - private ResultSetCycleOperator[] resultSetCycleOperators; + private D4ResultSetCycleOperator[] pageInfoCycleOperators; private RowCycleOperator[] rowCycleOperators; private Timer successTimer; @@ -37,11 +38,17 @@ public class ReadyCQLStatementTemplate { private Histogram rowsFetchedHisto; private Writer resultCsvWriter; - public ReadyCQLStatementTemplate(Map fconfig, CqlBinderTypes binderType, Session session, - PreparedStatement preparedStmt, long ratio, String name) { + public ReadyCQLStatementTemplate( + Map fconfig, + CqlBinderTypes binderType, + CqlSession session, + PreparedStatement preparedStmt, + long ratio, + String name + ) { this.session = session; this.name = name; - ValuesArrayBinder binder = binderType.get(session); + ValuesArrayBinder> binder = binderType.get(session); logger.trace("Using binder_type=>" + binder.toString()); template = new ContextualBindingsArrayTemplate<>( @@ -52,10 +59,17 @@ public class ReadyCQLStatementTemplate { this.ratio = ratio; } - public ReadyCQLStatementTemplate(Map fconfig, Session session, SimpleStatement simpleStatement, long ratio, String name, boolean parametrized) { + public ReadyCQLStatementTemplate( + Map fconfig, + Session session, + SimpleStatement simpleStatement, + long ratio, + String name, + boolean parametrized + ) { this.session = session; this.name = name; - template = new ContextualBindingsArrayTemplate<>( + template = new ContextualBindingsArrayTemplate( simpleStatement, new BindingsTemplate(fconfig), new SimpleStatementValuesBinder(parametrized) @@ -66,12 +80,12 @@ public class ReadyCQLStatementTemplate { public ReadyCQLStatement resolve() { return new ReadyCQLStatement(template.resolveBindings(), ratio, name) .withMetrics(this.successTimer, this.errorTimer, this.rowsFetchedHisto) - .withResultSetCycleOperators(resultSetCycleOperators) + .withResultSetCycleOperators(pageInfoCycleOperators) .withRowCycleOperators(rowCycleOperators) .withResultCsvWriter(resultCsvWriter); } - public ContextualBindingsArrayTemplate getContextualBindings() { + public ContextualBindingsArrayTemplate> getContextualBindings() { return template; } @@ -90,13 +104,13 @@ public class ReadyCQLStatementTemplate { this.resultCsvWriter = activity.getNamedWriter(name); } - public void addResultSetOperators(ResultSetCycleOperator... addingOperators) { - resultSetCycleOperators = (resultSetCycleOperators==null) ? new ResultSetCycleOperator[0]: resultSetCycleOperators; + public void addResultSetOperators(D4ResultSetCycleOperator... addingOperators) { + pageInfoCycleOperators = (pageInfoCycleOperators ==null) ? new D4ResultSetCycleOperator[0]: pageInfoCycleOperators; - ResultSetCycleOperator[] newOperators = new ResultSetCycleOperator[resultSetCycleOperators.length + addingOperators.length]; - System.arraycopy(resultSetCycleOperators,0,newOperators,0,resultSetCycleOperators.length); - System.arraycopy(addingOperators,0,newOperators,resultSetCycleOperators.length,addingOperators.length); - this.resultSetCycleOperators=newOperators; + D4ResultSetCycleOperator[] newOperators = new D4ResultSetCycleOperator[pageInfoCycleOperators.length + addingOperators.length]; + System.arraycopy(pageInfoCycleOperators,0,newOperators,0, pageInfoCycleOperators.length); + System.arraycopy(addingOperators,0,newOperators, pageInfoCycleOperators.length,addingOperators.length); + this.pageInfoCycleOperators =newOperators; } public void addRowCycleOperators(RowCycleOperator... addingOperators) { diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/AssertSingleRowResultSet.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/AssertSingleRowD4ResultSet.java similarity index 67% rename from driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/AssertSingleRowResultSet.java rename to driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/AssertSingleRowD4ResultSet.java index e4f6c551d..89bf3d6e0 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/AssertSingleRowResultSet.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/AssertSingleRowD4ResultSet.java @@ -1,18 +1,19 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.errorhandling.exceptions.ResultSetVerificationException; /** * Throws a {@link ResultSetVerificationException} unless there is exactly one row in the result set. */ -public class AssertSingleRowResultSet implements ResultSetCycleOperator { +public class AssertSingleRowD4ResultSet implements D4ResultSetCycleOperator { - @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { - int rowsIncoming = resultSet.getAvailableWithoutFetching(); + @Override + public int apply(AsyncResultSet resultSet, Statement statement, long cycle) { + int rowsIncoming = resultSet.remaining(); if (rowsIncoming<1) { throw new ResultSetVerificationException(cycle, resultSet, statement, "no row in result set, expected exactly 1"); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ClearVars.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ClearVars.java index a09f6eb01..ab6946df3 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ClearVars.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ClearVars.java @@ -1,14 +1,15 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState; -public class ClearVars implements ResultSetCycleOperator { +public class ClearVars implements D4ResultSetCycleOperator { @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { + public int apply(AsyncResultSet pageInfo, Statement statement, long cycle) { SharedState.tl_ObjectMap.get().clear(); return 0; } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/CqlResultSetLogger.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/CqlD4ResultSetLogger.java similarity index 73% rename from driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/CqlResultSetLogger.java rename to driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/CqlD4ResultSetLogger.java index 6122c80f6..8ddce18c7 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/CqlResultSetLogger.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/CqlD4ResultSetLogger.java @@ -1,7 +1,7 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; import com.datastax.oss.driver.api.core.cql.*; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,8 +9,8 @@ import org.slf4j.LoggerFactory; * Logs a trace-level event for the result set, including * cycles, rows, fetched row count, and the statement. */ -public class CqlResultSetLogger implements ResultSetCycleOperator { - private final static Logger logger = LoggerFactory.getLogger(CqlResultSetLogger.class); +public class CqlD4ResultSetLogger implements D4ResultSetCycleOperator { + private final static Logger logger = LoggerFactory.getLogger(CqlD4ResultSetLogger.class); private static String getQueryString(Statement stmt) { if (stmt instanceof PreparedStatement) { @@ -25,14 +25,14 @@ public class CqlResultSetLogger implements ResultSetCycleOperator { } @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { + public int apply(AsyncResultSet resultSet, Statement statement, long cycle) { logger.debug("result-set-logger: " + " cycle=" + cycle - + " rows=" + resultSet.getAvailableWithoutFetching() - + " fetched=" + resultSet.isFullyFetched() + + " remaining=" + resultSet.remaining() + + " hasmore=" + resultSet.hasMorePages() + " statement=" + getQueryString(statement).stripTrailing() ); - for (Row row : resultSet) { + for (Row row : resultSet.currentPage()) { logger.trace(row.toString()); } return 0; diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PopVars.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PopVars.java index 35886aeb9..008c76295 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PopVars.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PopVars.java @@ -1,16 +1,17 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState; import java.util.HashMap; -public class PopVars implements ResultSetCycleOperator { +public class PopVars implements D4ResultSetCycleOperator { @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { + public int apply(AsyncResultSet resultSet, Statement statement, long cycle) { HashMap stringObjectHashMap = SharedState.tl_ObjectMap.get(); Object o = SharedState.tl_ObjectStack.get().pollLast(); if (o != null && o instanceof HashMap) { diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/Print.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/Print.java index e93298325..b43dafbe5 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/Print.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/Print.java @@ -1,13 +1,14 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; -public class Print implements ResultSetCycleOperator { +public class Print implements D4ResultSetCycleOperator { @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { + public int apply(AsyncResultSet resultSet, Statement statement, long cycle) { System.out.println("RS:"+ resultSet.toString()); return 0; } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PushVars.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PushVars.java index 5588c4a2e..28940d4fc 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PushVars.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/PushVars.java @@ -1,16 +1,16 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; -import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.virtdata.library.basics.core.threadstate.SharedState; import java.util.HashMap; -public class PushVars implements ResultSetCycleOperator { +public class PushVars implements D4ResultSetCycleOperator { @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { + public int apply(AsyncResultSet resultSet, Statement statement, long cycle) { HashMap existingVars = SharedState.tl_ObjectMap.get(); HashMap topush = new HashMap<>(existingVars); diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ResultSetCycleOperators.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ResultSetCycleOperators.java index a50e8bcab..34ce2953b 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ResultSetCycleOperators.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/ResultSetCycleOperators.java @@ -1,6 +1,6 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; public enum ResultSetCycleOperators { @@ -9,23 +9,23 @@ public enum ResultSetCycleOperators { clearvars(ClearVars.class), trace(TraceLogger.class), - log(CqlResultSetLogger.class), - assert_singlerow(AssertSingleRowResultSet.class), + log(CqlD4ResultSetLogger.class), + assert_singlerow(AssertSingleRowD4ResultSet.class), print(Print.class); - private final Class implClass; + private final Class implClass; - ResultSetCycleOperators(Class traceLoggerClass) { + ResultSetCycleOperators(Class traceLoggerClass) { this.implClass = traceLoggerClass; } - public Class getImplementation() { + public Class getImplementation() { return implClass; } - public ResultSetCycleOperator getInstance() { + public D4ResultSetCycleOperator getInstance() { try { return getImplementation().getConstructor().newInstance(); } catch (Exception e) { @@ -33,7 +33,7 @@ public enum ResultSetCycleOperators { } } - public static ResultSetCycleOperator newOperator(String name) { + public static D4ResultSetCycleOperator newOperator(String name) { return ResultSetCycleOperators.valueOf(name).getInstance(); } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/RowCapture.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/RowCapture.java index 4f04b6bb2..2de1b1f83 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/RowCapture.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/RowCapture.java @@ -1,15 +1,15 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; -import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.AsyncResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.Statement; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import java.util.LinkedList; -public class RowCapture implements ResultSetCycleOperator { +public class RowCapture implements D4ResultSetCycleOperator { @Override - public int apply(ResultSet resultSet, Statement statement, long cycle) { + public int apply(AsyncResultSet resultSet, Statement statement, long cycle) { ThreadLocal> rows = PerThreadCQLData.rows; return 0; } diff --git a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/TraceLogger.java b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/TraceLogger.java index 4be0e41c7..263197529 100644 --- a/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/TraceLogger.java +++ b/driver-cqld4/src/main/java/io/nosqlbench/activitytype/cqld4/statements/rsoperators/TraceLogger.java @@ -1,7 +1,7 @@ package io.nosqlbench.activitytype.cqld4.statements.rsoperators; import com.datastax.oss.driver.api.core.cql.*; -import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator; +import io.nosqlbench.activitytype.cqld4.api.D4ResultSetCycleOperator; import io.nosqlbench.activitytype.cqld4.core.StatementModifier; import io.nosqlbench.engine.api.util.SimpleConfig; import org.slf4j.Logger; @@ -13,7 +13,7 @@ import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; -public class TraceLogger implements ResultSetCycleOperator, StatementModifier { +public class TraceLogger implements D4ResultSetCycleOperator, StatementModifier { private final static Logger logger = LoggerFactory.getLogger(TraceLogger.class); @@ -46,7 +46,8 @@ public class TraceLogger implements ResultSetCycleOperator, StatementModifier { } @Override - public int apply(ResultSet rs, Statement statement, long cycle) { + public int apply(AsyncResultSet rs, Statement statement, long cycle) { + rs.getExecutionInfo().getQueryTrace(); if ((cycle%modulo)!=0) { return 0; } diff --git a/nb/pom.xml b/nb/pom.xml index 615dc2308..b6b66afb3 100644 --- a/nb/pom.xml +++ b/nb/pom.xml @@ -246,7 +246,7 @@ io.nosqlbench driver-cqld4 - 3.12.104-SNAPSHOT + 3.12.120-SNAPSHOT