diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java index 398ce1e6e..4e72dad5c 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java @@ -90,7 +90,8 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser { getMaxPages(), isRetryReplace(), getMaxLwtRetries(), - processors + processors, + getExpectedResultExpression() ); } catch (Exception exception) { return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics( diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java index 16b999e60..4fcf44b13 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java @@ -50,7 +50,8 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser { (SimpleStatement) stmtFunc.apply(value), getMaxPages(), isRetryReplace(), - getMaxLwtRetries() + getMaxLwtRetries(), + getExpectedResultExpression() ); } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java index f58ce6dc3..ecb23b312 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java @@ -47,7 +47,8 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser { (SimpleStatement) stmtFunc.apply(value), getMaxPages(), isRetryReplace(), - getMaxLwtRetries() + getMaxLwtRetries(), + getExpectedResultExpression() ); } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java index 67931b83a..4d46e5685 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java @@ -27,8 +27,12 @@ import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException; import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException; import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*; +import org.mvel2.MVEL; +import java.io.Serializable; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; @@ -54,12 +58,20 @@ public abstract class Cqld4CqlOp implements CycleOp, VariableCapture, private Cqld4CqlOp nextOp; private final RSProcessors processors; + private final ThreadLocal> results = new ThreadLocal<>(); + private Serializable expectedResultExpression; + public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) { + this(session, maxPages, retryReplace, maxLwtRetries, processors, null); + } + + public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors, Serializable expectedResultExpressions) { this.session = session; this.maxPages = maxPages; this.retryReplace = retryReplace; this.maxLwtRetries =maxLwtRetries; this.processors = processors; + this.expectedResultExpression = expectedResultExpressions; } protected Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) { @@ -97,19 +109,22 @@ public abstract class Cqld4CqlOp implements CycleOp, VariableCapture, Iterator reader = rs.iterator(); int pages = 0; + var resultRows = new ArrayList(); while (true) { int pageRows = rs.getAvailableWithoutFetching(); for (int i = 0; i < pageRows; i++) { Row row = reader.next(); + resultRows.add(row); processors.buffer(row); } if (pages++ > maxPages) { throw new UnexpectedPagingException(rs, getQueryString(), pages, maxPages, stmt.getPageSize()); } if (rs.isFullyFetched()) { + results.set(resultRows); break; } - totalRows += pageRows; + totalRows += pageRows; // TODO JK what is this for? } processors.flush(); return rs; @@ -139,4 +154,8 @@ public abstract class Cqld4CqlOp implements CycleOp, VariableCapture, return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors); } + @Override + public boolean verified() { // TODO JK can this be made CQL agnostic? And moved to BaseOpDispenser? + return MVEL.executeExpression(expectedResultExpression, results.get(), boolean.class); + } } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java index 9e6dfcad9..cb1d29b67 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java @@ -20,12 +20,14 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; import io.nosqlbench.adapter.cqld4.RSProcessors; +import java.io.Serializable; + public class Cqld4CqlPreparedStatement extends Cqld4CqlOp { private final BoundStatement stmt; - public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) { - super(session,maxPages,retryReplace,maxLwtRetries,processors); + public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors, Serializable expectedResultExpression) { + super(session, maxPages, retryReplace, maxLwtRetries, processors, expectedResultExpression); this.stmt = stmt; } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java index 0f119c91d..39141c8df 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java @@ -20,11 +20,13 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import io.nosqlbench.adapter.cqld4.RSProcessors; +import java.io.Serializable; + public class Cqld4CqlSimpleStatement extends Cqld4CqlOp { private final SimpleStatement stmt; - public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries) { - super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors()); + public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, Serializable expectedResultExpression) { + super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors(), expectedResultExpression); this.stmt = stmt; } diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java index 83c7b796d..79212b554 100644 --- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java @@ -23,7 +23,9 @@ import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op; import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers; import io.nosqlbench.engine.api.templating.ParsedOp; +import org.mvel2.MVEL; +import java.io.Serializable; import java.util.concurrent.TimeUnit; /** @@ -38,6 +40,7 @@ import java.util.concurrent.TimeUnit; public abstract class BaseOpDispenser implements OpDispenser { private final String opName; + private Serializable expectedResultExpression; protected final DriverAdapter adapter; private boolean instrument; private Histogram resultSizeHistogram; @@ -63,6 +66,17 @@ public abstract class BaseOpDispenser implements OpDispenser } } configureInstrumentation(op); + configureExpectations(op); + } + + private void configureExpectations(ParsedOp op) { + op.getOptionalStaticValue("expected-result", String.class) + .map(MVEL::compileExpression) + .ifPresent(result -> this.expectedResultExpression = result); + } + + public Serializable getExpectedResultExpression() { + return expectedResultExpression; } String getOpName() { diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/OpDispenser.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/OpDispenser.java index 162621902..c8d584f18 100644 --- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/OpDispenser.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/OpDispenser.java @@ -16,6 +16,7 @@ package io.nosqlbench.engine.api.activityimpl; +import java.io.Serializable; import java.util.function.LongFunction; /** @@ -81,5 +82,6 @@ public interface OpDispenser extends LongFunction, OpResultTracker { */ T apply(long value); + Serializable getExpectedResultExpression(); } diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/Op.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/Op.java index e5ecdf6a2..d70014c8f 100644 --- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/Op.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/Op.java @@ -33,4 +33,7 @@ package io.nosqlbench.engine.api.activityimpl.uniform.flowtypes; */ // TODO: optimize the runtime around the specific op type public interface Op extends OpResultSize { + default boolean verified() { + return false; + } } diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java index d1f5010e8..0f3d236a0 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/actions/StandardAction.java @@ -87,7 +87,7 @@ public class StandardAction, R extends Op> impl while (op != null) { int tries = 0; - while (tries++ <= maxTries) { + while (tries++ < maxTries) { Throwable error = null; long startedAt = System.nanoTime(); @@ -112,7 +112,27 @@ public class StandardAction, R extends Op> impl if (error == null) { resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS); dispenser.onSuccess(cycle, nanos, op.getResultSize()); - break; + + if (dispenser.getExpectedResultExpression() != null) { // TODO JK refactor the whole if/else break/continue tree + if (op.verified()) { // TODO JK Could this be moved to BaseOpDispenser? + logger.info(() -> "Verification of result passed"); + break; + } else { + // retry + var triesLeft = maxTries - tries; + logger.info("Verification of result did not pass - {} retries left", triesLeft); + if (triesLeft == 0) { + var retriesExhausted = new RuntimeException("Max retries for verification step exhausted."); // TODO JK do we need a dedicated exception here? VerificationRetriesExhaustedException? + var errorDetail = errorHandler.handleError(retriesExhausted, cycle, nanos); + dispenser.onError(cycle, nanos, retriesExhausted); + code = ErrorDetail.ERROR_RETRYABLE.resultCode; // TODO JK use code from errorDetail.resultCode? + break; + } + continue; + } + } else { + break; + } } else { ErrorDetail detail = errorHandler.handleError(error, cycle, nanos); dispenser.onError(cycle, nanos, error);