From 11245c07ae26bd9dcbe967116f7e0f99eeb46cea Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Tue, 15 Feb 2022 21:23:24 -0600 Subject: [PATCH] distinguish between base runnables and op measurables --- .../cqld4/Cqld4CqlReboundStatement.java | 4 +- .../opdispensers/BaseCqlStmtDispenser.java | 82 +++++++++++++++++++ .../Cqld4PreparedStmtDispenser.java | 56 ++++++++----- .../opdispensers/Cqld4RawStmtDispenser.java | 39 +++++++++ .../Cqld4SimpleCqlStmtDispenser.java | 42 +++++----- .../cqld4/opmappers/CqlD4RawStmtMapper.java | 26 ++++++ .../cqld4/optypes/Cqld4CqlBatchStatement.java | 6 +- .../adapter/cqld4/optypes/Cqld4CqlOp.java | 23 +----- .../optypes/Cqld4CqlPreparedStatement.java | 5 +- .../optypes/Cqld4CqlSimpleStatement.java | 6 +- .../activityimpl/uniform/ResultProcessor.java | 1 + .../uniform/flowtypes/RunnableSource.java | 6 ++ .../activitytype/stdout/StdoutActivity.java | 3 +- 13 files changed, 225 insertions(+), 74 deletions(-) create mode 100644 adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/BaseCqlStmtDispenser.java create mode 100644 adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java create mode 100644 adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4RawStmtMapper.java create mode 100644 adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/RunnableSource.java diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/Cqld4CqlReboundStatement.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/Cqld4CqlReboundStatement.java index 6be4e5a3a..6026214f4 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/Cqld4CqlReboundStatement.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/Cqld4CqlReboundStatement.java @@ -8,8 +8,8 @@ import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; public class Cqld4CqlReboundStatement extends Cqld4CqlOp { private final BoundStatement stmt; - public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, BoundStatement rebound, RSProcessors processors) { - super(session,maxpages,retryreplace,metrics,processors); + public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, BoundStatement rebound, RSProcessors processors) { + super(session,maxpages,retryreplace,processors); this.stmt = rebound; } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/BaseCqlStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/BaseCqlStmtDispenser.java new file mode 100644 index 000000000..b19ec757d --- /dev/null +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/BaseCqlStmtDispenser.java @@ -0,0 +1,82 @@ +package io.nosqlbench.adapter.cqld4.opdispensers; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.cql.Statement; +import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics; +import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; +import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; +import io.nosqlbench.engine.api.templating.ParsedOp; + +import java.util.function.LongFunction; + +public abstract class BaseCqlStmtDispenser extends BaseOpDispenser { + + private final LongFunction stmtFunc; + private final int maxpages; + private final Cqld4OpMetrics metrics = new Cqld4OpMetrics(); + private final LongFunction sessionFunc; + private final boolean isRetryReplace; + + public BaseCqlStmtDispenser(LongFunction sessionFunc, ParsedOp op) { + super(op); + this.sessionFunc = sessionFunc; + this.stmtFunc = this.getCommonStmtFunc(op); + this.maxpages = op.getStaticConfigOr("maxpages",1); + this.isRetryReplace = op.getStaticConfigOr("retryreplace",false); + } + + public int getMaxPages() { + return maxpages; + } + + public boolean isRetryReplace() { + return isRetryReplace; + } + + public LongFunction getSessionFunc() { + return sessionFunc; + } + /** + * Implement this method to define a statement function, considering only + * the functionality that is specific to that statement type. + * Do not implement decorators which apply to {@link Statement} as these are + * applied uniformly internal to the logic of {@link #getStmtFunc()}. + * @param op The parsed op template + * @return A statement function + */ + protected abstract LongFunction getPartialStmtFunction(ParsedOp op); + + /** + * All implementations of a CQL Statement Dispenser should be using the method + * provided by this function. This ensures that {@link Statement}-level attributes + * are handled uniformly and in one place. + * @return A function which produces a statement, fully ready to execute, with all + * cross-type attributes handled consistently. + */ + public LongFunction getStmtFunc() { + return stmtFunc; + } + + /** + * Any {@link Statement}-level attributes need to be handled here. + * This is the initializer for the {@link #getStmtFunc()}} accessor method. + * This takes the base statement function and decorates it optionally with each + * additional qualified modifier, short-circuiting those which are not specified. + * This allows default behavior to take precedence as well as avoids unnecessary calling + * overhead for implicit attributes. + * @param op A parsed op template. + * @return A function which is used to construct {@link Statement} objects, ready to run. + * However, this method is hidden to ensure that it is used only as a one-time initializer + * at construction time. + */ + private LongFunction getCommonStmtFunc(ParsedOp op) { + LongFunction partial = getPartialStmtFunction(op); + partial = op.enhanceEnum(partial, "cl", DefaultConsistencyLevel.class, Statement::setConsistencyLevel); + partial = op.enhanceEnum(partial, "scl", DefaultConsistencyLevel.class, Statement::setSerialConsistencyLevel); + partial = op.enhance(partial, "idempotent", Boolean.class, Statement::setIdempotent); + return partial; + } + + +} diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java index 87d3ff81d..99b57755a 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4PreparedStmtDispenser.java @@ -3,47 +3,59 @@ package io.nosqlbench.adapter.cqld4.opdispensers; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; import com.datastax.oss.driver.api.core.cql.PreparedStatement; -import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; -import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics; +import com.datastax.oss.driver.api.core.cql.Statement; import io.nosqlbench.adapter.cqld4.RSProcessors; +import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement; -import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; import io.nosqlbench.engine.api.templating.ParsedOp; import io.nosqlbench.virtdata.core.templates.ParsedTemplate; import java.util.function.LongFunction; -public class Cqld4PreparedStmtDispenser extends BaseOpDispenser { +public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser { - private final CqlSession session; - - private final LongFunction varbinder; - private final PreparedStatement preparedStmt; - private final int maxpages; - private final boolean retryreplace; - private final Cqld4OpMetrics metrics; private final RSProcessors processors; + private final LongFunction stmtFunc; + private PreparedStatement preparedStmt; + private CqlSession boundSession; - public Cqld4PreparedStmtDispenser(CqlSession session, ParsedOp cmd, RSProcessors processors) { - super(cmd); - this.session = session; + public Cqld4PreparedStmtDispenser(LongFunction sessionFunc, ParsedOp cmd, RSProcessors processors) { + super(sessionFunc, cmd); + if (cmd.isDynamic("space")) { + throw new RuntimeException("Prepared statements and dynamic space values are not supported." + + " This would churn the prepared statement cache, defeating the purpose of prepared statements."); + } this.processors = processors; + stmtFunc = super.getStmtFunc(); + } + @Override + protected LongFunction getPartialStmtFunction(ParsedOp cmd) { + + LongFunction varbinder; ParsedTemplate parsed = cmd.getStmtAsTemplate().orElseThrow(); varbinder = cmd.newArrayBinderFromBindPoints(parsed.getBindPoints()); - String preparedQueryString = parsed.getPositionalStatement(s -> "?"); - preparedStmt = session.prepare(preparedQueryString); + boundSession = getSessionFunc().apply(0); + preparedStmt = boundSession.prepare(preparedQueryString); - this.maxpages = cmd.getStaticConfigOr("maxpages",1); - this.retryreplace = cmd.getStaticConfigOr("retryreplace", false); - this.metrics = new Cqld4OpMetrics(); + LongFunction boundStmtFunc = c -> { + Object[] apply = varbinder.apply(c); + return preparedStmt.bind(apply); + }; + return boundStmtFunc; } @Override public Cqld4CqlOp apply(long value) { - Object[] parameters = varbinder.apply(value); - BoundStatement stmt = preparedStmt.bind(parameters); - return new Cqld4CqlPreparedStatement(session, stmt, maxpages, retryreplace, metrics, processors); + + return new Cqld4CqlPreparedStatement( + boundSession, + (BoundStatement) getStmtFunc().apply(value), + getMaxPages(), + isRetryReplace(), + processors + ); } + } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java new file mode 100644 index 000000000..7e0633e5d --- /dev/null +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4RawStmtDispenser.java @@ -0,0 +1,39 @@ +package io.nosqlbench.adapter.cqld4.opdispensers; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder; +import com.datastax.oss.driver.api.core.cql.Statement; +import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; +import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement; +import io.nosqlbench.engine.api.templating.ParsedOp; + +import java.util.function.LongFunction; + +public class Cqld4RawStmtDispenser extends BaseCqlStmtDispenser { + + private final LongFunction stmtFunc; + private final LongFunction targetFunction; + + public Cqld4RawStmtDispenser(LongFunction sessionFunc, LongFunction targetFunction, ParsedOp cmd) { + super(sessionFunc, cmd); + this.targetFunction=targetFunction; + this.stmtFunc = super.getStmtFunc(); + } + + @Override + protected LongFunction getPartialStmtFunction(ParsedOp cmd) { + return l -> new SimpleStatementBuilder(targetFunction.apply(l)).build(); + } + + @Override + public Cqld4CqlOp apply(long value) { + return new Cqld4CqlSimpleStatement( + getSessionFunc().apply(value), + (SimpleStatement) stmtFunc.apply(value), + getMaxPages(), + isRetryReplace() + ); + } + +} diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java index f5a9d423c..47d795624 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opdispensers/Cqld4SimpleCqlStmtDispenser.java @@ -2,34 +2,36 @@ package io.nosqlbench.adapter.cqld4.opdispensers; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.SimpleStatement; -import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; -import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics; +import com.datastax.oss.driver.api.core.cql.Statement; import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement; -import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; import io.nosqlbench.engine.api.templating.ParsedOp; -public class Cqld4SimpleCqlStmtDispenser extends BaseOpDispenser { +import java.util.function.LongFunction; - private final CqlSession session; - private final ParsedOp cmd; - private final int maxpages; - private final boolean retryreplace; - private final Cqld4OpMetrics metrics; +public class Cqld4SimpleCqlStmtDispenser extends BaseCqlStmtDispenser { - public Cqld4SimpleCqlStmtDispenser(CqlSession session, ParsedOp cmd) { - super(cmd); - this.session = session; - this.cmd = cmd; - this.maxpages = cmd.getStaticConfigOr("maxpages",1); - this.retryreplace = cmd.getStaticConfigOr("retryreplace",false); - this.metrics = new Cqld4OpMetrics(); + private final LongFunction stmtFunc; + private final LongFunction targetFunction; + + public Cqld4SimpleCqlStmtDispenser(LongFunction sessionFunc, LongFunction targetFunction, ParsedOp cmd) { + super(sessionFunc,cmd); + this.targetFunction=targetFunction; + this.stmtFunc = super.getStmtFunc(); + } + + @Override + protected LongFunction getPartialStmtFunction(ParsedOp op) { + return l -> SimpleStatement.newInstance(targetFunction.apply(l)); } @Override public Cqld4CqlSimpleStatement apply(long value) { - // TODO: allow for the pre-rendering of a statement with certain template fields, for RAW mode - String stmtBody = cmd.get("stmt",value); - SimpleStatement simpleStatement = SimpleStatement.newInstance(stmtBody); - return new Cqld4CqlSimpleStatement(session,simpleStatement,maxpages,retryreplace,metrics); + return new Cqld4CqlSimpleStatement( + getSessionFunc().apply(value), + (SimpleStatement) stmtFunc.apply(value), + getMaxPages(), + isRetryReplace() + ); } + } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4RawStmtMapper.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4RawStmtMapper.java new file mode 100644 index 000000000..53e3d7984 --- /dev/null +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/opmappers/CqlD4RawStmtMapper.java @@ -0,0 +1,26 @@ +package io.nosqlbench.adapter.cqld4.opmappers; + +import com.datastax.oss.driver.api.core.CqlSession; +import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4RawStmtDispenser; +import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp; +import io.nosqlbench.engine.api.activityimpl.OpDispenser; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.templating.ParsedOp; + +import java.util.function.LongFunction; + +public class CqlD4RawStmtMapper implements OpMapper { + + private final LongFunction sessionFunc; + private final LongFunction targetFunction; + + public CqlD4RawStmtMapper(LongFunction sessionFunc, LongFunction targetFunction) { + this.sessionFunc = sessionFunc; + this.targetFunction = targetFunction; + } + + @Override + public OpDispenser apply(ParsedOp cmd) { + return new Cqld4RawStmtDispenser(sessionFunc, targetFunction, cmd); + } +} diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlBatchStatement.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlBatchStatement.java index d68d6a66f..49d522a36 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlBatchStatement.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlBatchStatement.java @@ -2,14 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BatchStatement; -import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics; +import io.nosqlbench.adapter.cqld4.RSProcessors; public class Cqld4CqlBatchStatement extends Cqld4CqlOp { private final BatchStatement stmt; - public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) { - super(session,maxpages,retryreplace,metrics); + public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace) { + super(session,maxpages,retryreplace,new RSProcessors()); this.stmt = stmt; } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java index a16435d8f..f50ecf7eb 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlOp.java @@ -37,38 +37,24 @@ public abstract class Cqld4CqlOp implements CycleOp, VariableCapture, private final CqlSession session; private final int maxpages; private final boolean retryreplace; - private final Cqld4OpMetrics metrics; private ResultSet rs; private Cqld4CqlOp nextOp; private final RSProcessors processors; - public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) { - this.session = session; - this.maxpages = maxpages; - this.retryreplace = retryreplace; - this.processors = new RSProcessors(); - this.metrics = metrics; - } - - public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) { + public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, RSProcessors processors) { this.session = session; this.maxpages = maxpages; this.retryreplace = retryreplace; this.processors = processors; - this.metrics = metrics; } public final ResultSet apply(long cycle) { - metrics.onStart(); Statement stmt = getStmt(); - rs = session.execute(stmt); - processors.start(cycle, rs); - - int totalRows=0; + int totalRows = 0; if (!rs.wasApplied()) { if (!retryreplace) { @@ -97,10 +83,9 @@ public abstract class Cqld4CqlOp implements CycleOp, VariableCapture, if (rs.isFullyFetched()) { break; } - totalRows+=pageRows; + totalRows += pageRows; } processors.flush(); - metrics.onSuccess(); return rs; } @@ -125,7 +110,7 @@ public abstract class Cqld4CqlOp implements CycleOp, VariableCapture, private Cqld4CqlOp rebindLwt(Statement stmt, Row row) { BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row); - return new Cqld4CqlReboundStatement(session,maxpages,retryreplace,metrics,rebound,processors); + return new Cqld4CqlReboundStatement(session, maxpages, retryreplace, rebound, processors); } } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java index 041d0bd4f..bc0e0d589 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlPreparedStatement.java @@ -2,15 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.BoundStatement; -import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics; import io.nosqlbench.adapter.cqld4.RSProcessors; public class Cqld4CqlPreparedStatement extends Cqld4CqlOp { private final BoundStatement stmt; - public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) { - super(session,maxpages,retryreplace,metrics,processors); + public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, RSProcessors processors) { + super(session,maxpages,retryreplace,processors); this.stmt = stmt; } diff --git a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java index c705c1600..e45f505d5 100644 --- a/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java +++ b/adapter-cqld4/src/main/java/io/nosqlbench/adapter/cqld4/optypes/Cqld4CqlSimpleStatement.java @@ -2,13 +2,13 @@ package io.nosqlbench.adapter.cqld4.optypes; import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.SimpleStatement; -import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics; +import io.nosqlbench.adapter.cqld4.RSProcessors; public class Cqld4CqlSimpleStatement extends Cqld4CqlOp { private final SimpleStatement stmt; - public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) { - super(session, maxpages,retryreplace,metrics); + public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace) { + super(session, maxpages,retryreplace, new RSProcessors()); this.stmt = stmt; } diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ResultProcessor.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ResultProcessor.java index 9c787652d..724535e88 100644 --- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ResultProcessor.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/ResultProcessor.java @@ -28,4 +28,5 @@ public interface ResultProcessor { * ResultProcessors which need to see all the data can finish processing here. */ void flush(); + } diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/RunnableSource.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/RunnableSource.java new file mode 100644 index 000000000..3cfd04595 --- /dev/null +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/flowtypes/RunnableSource.java @@ -0,0 +1,6 @@ +package io.nosqlbench.engine.api.activityimpl.uniform.flowtypes; + +import java.util.function.Supplier; + +public interface RunnableSource extends Supplier { +} diff --git a/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivity.java b/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivity.java index d4351e836..7bc705ef4 100644 --- a/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivity.java +++ b/driver-stdout/src/main/java/io/nosqlbench/activitytype/stdout/StdoutActivity.java @@ -173,8 +173,7 @@ public class StdoutActivity extends SimpleActivity implements ActivityDefObserve String generatedStmt = genStatementTemplate(activeBindingNames); BindingsTemplate bt = new BindingsTemplate(); stmtsDocList.getDocBindings().forEach(bt::addFieldBinding); - StringBindingsTemplate sbt = new StringBindingsTemplate(generatedStmt, bt); - StringBindings sb = sbt.resolve(); + StringBindings sb = new StringBindings(generatedStmt,bt.getMap()); sequencer.addOp(sb, 1L); } } else if (stmts.size() > 0) {