distinguish between base runnables and op measurables

This commit is contained in:
Jonathan Shook
2022-02-15 21:23:24 -06:00
parent ade9cae769
commit 11245c07ae
13 changed files with 225 additions and 74 deletions

View File

@@ -8,8 +8,8 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
private final BoundStatement stmt;
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, BoundStatement rebound, RSProcessors processors) {
super(session,maxpages,retryreplace,metrics,processors);
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, BoundStatement rebound, RSProcessors processors) {
super(session,maxpages,retryreplace,processors);
this.stmt = rebound;
}

View File

@@ -0,0 +1,82 @@
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public abstract class BaseCqlStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {
private final LongFunction<Statement> stmtFunc;
private final int maxpages;
private final Cqld4OpMetrics metrics = new Cqld4OpMetrics();
private final LongFunction<CqlSession> sessionFunc;
private final boolean isRetryReplace;
public BaseCqlStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op) {
super(op);
this.sessionFunc = sessionFunc;
this.stmtFunc = this.getCommonStmtFunc(op);
this.maxpages = op.getStaticConfigOr("maxpages",1);
this.isRetryReplace = op.getStaticConfigOr("retryreplace",false);
}
public int getMaxPages() {
return maxpages;
}
public boolean isRetryReplace() {
return isRetryReplace;
}
public LongFunction<CqlSession> getSessionFunc() {
return sessionFunc;
}
/**
* Implement this method to define a statement function, considering only
* the functionality that is specific to that statement type.
* Do not implement decorators which apply to {@link Statement} as these are
* applied uniformly internal to the logic of {@link #getStmtFunc()}.
* @param op The parsed op template
* @return A statement function
*/
protected abstract LongFunction<Statement> getPartialStmtFunction(ParsedOp op);
/**
* All implementations of a CQL Statement Dispenser should be using the method
* provided by this function. This ensures that {@link Statement}-level attributes
* are handled uniformly and in one place.
* @return A function which produces a statement, fully ready to execute, with all
* cross-type attributes handled consistently.
*/
public LongFunction<Statement> getStmtFunc() {
return stmtFunc;
}
/**
* Any {@link Statement}-level attributes need to be handled here.
* This is the initializer for the {@link #getStmtFunc()}} accessor method.
* This takes the base statement function and decorates it optionally with each
* additional qualified modifier, short-circuiting those which are not specified.
* This allows default behavior to take precedence as well as avoids unnecessary calling
* overhead for implicit attributes.
* @param op A parsed op template.
* @return A function which is used to construct {@link Statement} objects, ready to run.
* However, this method is hidden to ensure that it is used only as a one-time initializer
* at construction time.
*/
private LongFunction<Statement> getCommonStmtFunc(ParsedOp op) {
LongFunction<Statement> partial = getPartialStmtFunction(op);
partial = op.enhanceEnum(partial, "cl", DefaultConsistencyLevel.class, Statement::setConsistencyLevel);
partial = op.enhanceEnum(partial, "scl", DefaultConsistencyLevel.class, Statement::setSerialConsistencyLevel);
partial = op.enhance(partial, "idempotent", Boolean.class, Statement::setIdempotent);
return partial;
}
}

View File

@@ -3,47 +3,59 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import java.util.function.LongFunction;
public class Cqld4PreparedStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {
public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser {
private final CqlSession session;
private final LongFunction<Object[]> varbinder;
private final PreparedStatement preparedStmt;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
private final RSProcessors processors;
private final LongFunction<Statement> stmtFunc;
private PreparedStatement preparedStmt;
private CqlSession boundSession;
public Cqld4PreparedStmtDispenser(CqlSession session, ParsedOp cmd, RSProcessors processors) {
super(cmd);
this.session = session;
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp cmd, RSProcessors processors) {
super(sessionFunc, cmd);
if (cmd.isDynamic("space")) {
throw new RuntimeException("Prepared statements and dynamic space values are not supported." +
" This would churn the prepared statement cache, defeating the purpose of prepared statements.");
}
this.processors = processors;
stmtFunc = super.getStmtFunc();
}
@Override
protected LongFunction<Statement> getPartialStmtFunction(ParsedOp cmd) {
LongFunction<Object[]> varbinder;
ParsedTemplate parsed = cmd.getStmtAsTemplate().orElseThrow();
varbinder = cmd.newArrayBinderFromBindPoints(parsed.getBindPoints());
String preparedQueryString = parsed.getPositionalStatement(s -> "?");
preparedStmt = session.prepare(preparedQueryString);
boundSession = getSessionFunc().apply(0);
preparedStmt = boundSession.prepare(preparedQueryString);
this.maxpages = cmd.getStaticConfigOr("maxpages",1);
this.retryreplace = cmd.getStaticConfigOr("retryreplace", false);
this.metrics = new Cqld4OpMetrics();
LongFunction<Statement> boundStmtFunc = c -> {
Object[] apply = varbinder.apply(c);
return preparedStmt.bind(apply);
};
return boundStmtFunc;
}
@Override
public Cqld4CqlOp apply(long value) {
Object[] parameters = varbinder.apply(value);
BoundStatement stmt = preparedStmt.bind(parameters);
return new Cqld4CqlPreparedStatement(session, stmt, maxpages, retryreplace, metrics, processors);
return new Cqld4CqlPreparedStatement(
boundSession,
(BoundStatement) getStmtFunc().apply(value),
getMaxPages(),
isRetryReplace(),
processors
);
}
}

View File

@@ -0,0 +1,39 @@
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class Cqld4RawStmtDispenser extends BaseCqlStmtDispenser {
private final LongFunction<Statement> stmtFunc;
private final LongFunction<String> targetFunction;
public Cqld4RawStmtDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
super(sessionFunc, cmd);
this.targetFunction=targetFunction;
this.stmtFunc = super.getStmtFunc();
}
@Override
protected LongFunction<Statement> getPartialStmtFunction(ParsedOp cmd) {
return l -> new SimpleStatementBuilder(targetFunction.apply(l)).build();
}
@Override
public Cqld4CqlOp apply(long value) {
return new Cqld4CqlSimpleStatement(
getSessionFunc().apply(value),
(SimpleStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace()
);
}
}

View File

@@ -2,34 +2,36 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
public class Cqld4SimpleCqlStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {
import java.util.function.LongFunction;
private final CqlSession session;
private final ParsedOp cmd;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
public class Cqld4SimpleCqlStmtDispenser extends BaseCqlStmtDispenser {
public Cqld4SimpleCqlStmtDispenser(CqlSession session, ParsedOp cmd) {
super(cmd);
this.session = session;
this.cmd = cmd;
this.maxpages = cmd.getStaticConfigOr("maxpages",1);
this.retryreplace = cmd.getStaticConfigOr("retryreplace",false);
this.metrics = new Cqld4OpMetrics();
private final LongFunction<Statement> stmtFunc;
private final LongFunction<String> targetFunction;
public Cqld4SimpleCqlStmtDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
super(sessionFunc,cmd);
this.targetFunction=targetFunction;
this.stmtFunc = super.getStmtFunc();
}
@Override
protected LongFunction<Statement> getPartialStmtFunction(ParsedOp op) {
return l -> SimpleStatement.newInstance(targetFunction.apply(l));
}
@Override
public Cqld4CqlSimpleStatement apply(long value) {
// TODO: allow for the pre-rendering of a statement with certain template fields, for RAW mode
String stmtBody = cmd.get("stmt",value);
SimpleStatement simpleStatement = SimpleStatement.newInstance(stmtBody);
return new Cqld4CqlSimpleStatement(session,simpleStatement,maxpages,retryreplace,metrics);
return new Cqld4CqlSimpleStatement(
getSessionFunc().apply(value),
(SimpleStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace()
);
}
}

View File

@@ -0,0 +1,26 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4RawStmtDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class CqlD4RawStmtMapper implements OpMapper<Cqld4CqlOp> {
private final LongFunction<CqlSession> sessionFunc;
private final LongFunction<String> targetFunction;
public CqlD4RawStmtMapper(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction) {
this.sessionFunc = sessionFunc;
this.targetFunction = targetFunction;
}
@Override
public OpDispenser<? extends Cqld4CqlOp> apply(ParsedOp cmd) {
return new Cqld4RawStmtDispenser(sessionFunc, targetFunction, cmd);
}
}

View File

@@ -2,14 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
private final BatchStatement stmt;
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
super(session,maxpages,retryreplace,metrics);
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace) {
super(session,maxpages,retryreplace,new RSProcessors());
this.stmt = stmt;
}

View File

@@ -37,38 +37,24 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
private final CqlSession session;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
private ResultSet rs;
private Cqld4CqlOp nextOp;
private final RSProcessors processors;
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
this.session = session;
this.maxpages = maxpages;
this.retryreplace = retryreplace;
this.processors = new RSProcessors();
this.metrics = metrics;
}
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, RSProcessors processors) {
this.session = session;
this.maxpages = maxpages;
this.retryreplace = retryreplace;
this.processors = processors;
this.metrics = metrics;
}
public final ResultSet apply(long cycle) {
metrics.onStart();
Statement<?> stmt = getStmt();
rs = session.execute(stmt);
processors.start(cycle, rs);
int totalRows=0;
int totalRows = 0;
if (!rs.wasApplied()) {
if (!retryreplace) {
@@ -97,10 +83,9 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
if (rs.isFullyFetched()) {
break;
}
totalRows+=pageRows;
totalRows += pageRows;
}
processors.flush();
metrics.onSuccess();
return rs;
}
@@ -125,7 +110,7 @@ public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture,
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
return new Cqld4CqlReboundStatement(session,maxpages,retryreplace,metrics,rebound,processors);
return new Cqld4CqlReboundStatement(session, maxpages, retryreplace, rebound, processors);
}
}

View File

@@ -2,15 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
private final BoundStatement stmt;
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
super(session,maxpages,retryreplace,metrics,processors);
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, RSProcessors processors) {
super(session,maxpages,retryreplace,processors);
this.stmt = stmt;
}

View File

@@ -2,13 +2,13 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
private final SimpleStatement stmt;
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
super(session, maxpages,retryreplace,metrics);
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace) {
super(session, maxpages,retryreplace, new RSProcessors());
this.stmt = stmt;
}