mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Verify expected result with MVEL
This commit is contained in:
@@ -90,7 +90,8 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
|
|||||||
getMaxPages(),
|
getMaxPages(),
|
||||||
isRetryReplace(),
|
isRetryReplace(),
|
||||||
getMaxLwtRetries(),
|
getMaxLwtRetries(),
|
||||||
processors
|
processors,
|
||||||
|
getExpectedResultExpression()
|
||||||
);
|
);
|
||||||
} catch (Exception exception) {
|
} catch (Exception exception) {
|
||||||
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(
|
return CQLD4PreparedStmtDiagnostics.rebindWithDiagnostics(
|
||||||
|
|||||||
@@ -50,7 +50,8 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
|
|||||||
(SimpleStatement) stmtFunc.apply(value),
|
(SimpleStatement) stmtFunc.apply(value),
|
||||||
getMaxPages(),
|
getMaxPages(),
|
||||||
isRetryReplace(),
|
isRetryReplace(),
|
||||||
getMaxLwtRetries()
|
getMaxLwtRetries(),
|
||||||
|
getExpectedResultExpression()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,8 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
|
|||||||
(SimpleStatement) stmtFunc.apply(value),
|
(SimpleStatement) stmtFunc.apply(value),
|
||||||
getMaxPages(),
|
getMaxPages(),
|
||||||
isRetryReplace(),
|
isRetryReplace(),
|
||||||
getMaxLwtRetries()
|
getMaxLwtRetries(),
|
||||||
|
getExpectedResultExpression()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,8 +27,12 @@ import io.nosqlbench.adapter.cqld4.exceptions.ExceededRetryReplaceException;
|
|||||||
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
|
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
|
||||||
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
|
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*;
|
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*;
|
||||||
|
import org.mvel2.MVEL;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
|
||||||
@@ -54,12 +58,20 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
|||||||
private Cqld4CqlOp nextOp;
|
private Cqld4CqlOp nextOp;
|
||||||
private final RSProcessors processors;
|
private final RSProcessors processors;
|
||||||
|
|
||||||
|
private final ThreadLocal<List<Row>> results = new ThreadLocal<>();
|
||||||
|
private Serializable expectedResultExpression;
|
||||||
|
|
||||||
public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
||||||
|
this(session, maxPages, retryReplace, maxLwtRetries, processors, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors, Serializable expectedResultExpressions) {
|
||||||
this.session = session;
|
this.session = session;
|
||||||
this.maxPages = maxPages;
|
this.maxPages = maxPages;
|
||||||
this.retryReplace = retryReplace;
|
this.retryReplace = retryReplace;
|
||||||
this.maxLwtRetries =maxLwtRetries;
|
this.maxLwtRetries =maxLwtRetries;
|
||||||
this.processors = processors;
|
this.processors = processors;
|
||||||
|
this.expectedResultExpression = expectedResultExpressions;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) {
|
protected Cqld4CqlOp(CqlSession session, int maxPages, boolean retryReplace, int maxLwtRetries, int retryRplaceCount, RSProcessors processors) {
|
||||||
@@ -97,19 +109,22 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
|||||||
|
|
||||||
Iterator<Row> reader = rs.iterator();
|
Iterator<Row> reader = rs.iterator();
|
||||||
int pages = 0;
|
int pages = 0;
|
||||||
|
var resultRows = new ArrayList<Row>();
|
||||||
while (true) {
|
while (true) {
|
||||||
int pageRows = rs.getAvailableWithoutFetching();
|
int pageRows = rs.getAvailableWithoutFetching();
|
||||||
for (int i = 0; i < pageRows; i++) {
|
for (int i = 0; i < pageRows; i++) {
|
||||||
Row row = reader.next();
|
Row row = reader.next();
|
||||||
|
resultRows.add(row);
|
||||||
processors.buffer(row);
|
processors.buffer(row);
|
||||||
}
|
}
|
||||||
if (pages++ > maxPages) {
|
if (pages++ > maxPages) {
|
||||||
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxPages, stmt.getPageSize());
|
throw new UnexpectedPagingException(rs, getQueryString(), pages, maxPages, stmt.getPageSize());
|
||||||
}
|
}
|
||||||
if (rs.isFullyFetched()) {
|
if (rs.isFullyFetched()) {
|
||||||
|
results.set(resultRows);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
totalRows += pageRows;
|
totalRows += pageRows; // TODO JK what is this for?
|
||||||
}
|
}
|
||||||
processors.flush();
|
processors.flush();
|
||||||
return rs;
|
return rs;
|
||||||
@@ -139,4 +154,8 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
|
|||||||
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors);
|
return new Cqld4CqlReboundStatement(session, maxPages, retryReplace, maxLwtRetries, retryReplaceCount, rebound, processors);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean verified() { // TODO JK can this be made CQL agnostic? And moved to BaseOpDispenser?
|
||||||
|
return MVEL.executeExpression(expectedResultExpression, results.get(), boolean.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,12 +20,14 @@ import com.datastax.oss.driver.api.core.CqlSession;
|
|||||||
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
import com.datastax.oss.driver.api.core.cql.BoundStatement;
|
||||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
|
||||||
|
|
||||||
private final BoundStatement stmt;
|
private final BoundStatement stmt;
|
||||||
|
|
||||||
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors) {
|
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, RSProcessors processors, Serializable expectedResultExpression) {
|
||||||
super(session,maxPages,retryReplace,maxLwtRetries,processors);
|
super(session, maxPages, retryReplace, maxLwtRetries, processors, expectedResultExpression);
|
||||||
this.stmt = stmt;
|
this.stmt = stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -20,11 +20,13 @@ import com.datastax.oss.driver.api.core.CqlSession;
|
|||||||
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
|
||||||
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
import io.nosqlbench.adapter.cqld4.RSProcessors;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
|
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
|
||||||
private final SimpleStatement stmt;
|
private final SimpleStatement stmt;
|
||||||
|
|
||||||
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries) {
|
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxPages, boolean retryReplace, int maxLwtRetries, Serializable expectedResultExpression) {
|
||||||
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors());
|
super(session, maxPages,retryReplace, maxLwtRetries, new RSProcessors(), expectedResultExpression);
|
||||||
this.stmt = stmt;
|
this.stmt = stmt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -23,7 +23,9 @@ import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
|||||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||||
import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers;
|
import io.nosqlbench.engine.api.metrics.ThreadLocalNamedTimers;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
|
import org.mvel2.MVEL;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -38,6 +40,7 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T> {
|
public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T> {
|
||||||
|
|
||||||
private final String opName;
|
private final String opName;
|
||||||
|
private Serializable expectedResultExpression;
|
||||||
protected final DriverAdapter<T, S> adapter;
|
protected final DriverAdapter<T, S> adapter;
|
||||||
private boolean instrument;
|
private boolean instrument;
|
||||||
private Histogram resultSizeHistogram;
|
private Histogram resultSizeHistogram;
|
||||||
@@ -63,6 +66,17 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
configureInstrumentation(op);
|
configureInstrumentation(op);
|
||||||
|
configureExpectations(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void configureExpectations(ParsedOp op) {
|
||||||
|
op.getOptionalStaticValue("expected-result", String.class)
|
||||||
|
.map(MVEL::compileExpression)
|
||||||
|
.ifPresent(result -> this.expectedResultExpression = result);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Serializable getExpectedResultExpression() {
|
||||||
|
return expectedResultExpression;
|
||||||
}
|
}
|
||||||
|
|
||||||
String getOpName() {
|
String getOpName() {
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package io.nosqlbench.engine.api.activityimpl;
|
package io.nosqlbench.engine.api.activityimpl;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -81,5 +82,6 @@ public interface OpDispenser<T> extends LongFunction<T>, OpResultTracker {
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
T apply(long value);
|
T apply(long value);
|
||||||
|
Serializable getExpectedResultExpression();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,4 +33,7 @@ package io.nosqlbench.engine.api.activityimpl.uniform.flowtypes;
|
|||||||
*/
|
*/
|
||||||
// TODO: optimize the runtime around the specific op type
|
// TODO: optimize the runtime around the specific op type
|
||||||
public interface Op extends OpResultSize {
|
public interface Op extends OpResultSize {
|
||||||
|
default boolean verified() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
|
|||||||
while (op != null) {
|
while (op != null) {
|
||||||
|
|
||||||
int tries = 0;
|
int tries = 0;
|
||||||
while (tries++ <= maxTries) {
|
while (tries++ < maxTries) {
|
||||||
Throwable error = null;
|
Throwable error = null;
|
||||||
long startedAt = System.nanoTime();
|
long startedAt = System.nanoTime();
|
||||||
|
|
||||||
@@ -112,7 +112,27 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
|
|||||||
if (error == null) {
|
if (error == null) {
|
||||||
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
|
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
|
||||||
dispenser.onSuccess(cycle, nanos, op.getResultSize());
|
dispenser.onSuccess(cycle, nanos, op.getResultSize());
|
||||||
break;
|
|
||||||
|
if (dispenser.getExpectedResultExpression() != null) { // TODO JK refactor the whole if/else break/continue tree
|
||||||
|
if (op.verified()) { // TODO JK Could this be moved to BaseOpDispenser?
|
||||||
|
logger.info(() -> "Verification of result passed");
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// retry
|
||||||
|
var triesLeft = maxTries - tries;
|
||||||
|
logger.info("Verification of result did not pass - {} retries left", triesLeft);
|
||||||
|
if (triesLeft == 0) {
|
||||||
|
var retriesExhausted = new RuntimeException("Max retries for verification step exhausted."); // TODO JK do we need a dedicated exception here? VerificationRetriesExhaustedException?
|
||||||
|
var errorDetail = errorHandler.handleError(retriesExhausted, cycle, nanos);
|
||||||
|
dispenser.onError(cycle, nanos, retriesExhausted);
|
||||||
|
code = ErrorDetail.ERROR_RETRYABLE.resultCode; // TODO JK use code from errorDetail.resultCode?
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
ErrorDetail detail = errorHandler.handleError(error, cycle, nanos);
|
ErrorDetail detail = errorHandler.handleError(error, cycle, nanos);
|
||||||
dispenser.onError(cycle, nanos, error);
|
dispenser.onError(cycle, nanos, error);
|
||||||
|
|||||||
Reference in New Issue
Block a user