mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
fully generify OpSequence
This commit is contained in:
parent
68c0ba277b
commit
ed91ec7d41
@ -100,7 +100,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
|
|||||||
int tries = 0;
|
int tries = 0;
|
||||||
|
|
||||||
try (Timer.Context bindTime = bindTimer.time()) {
|
try (Timer.Context bindTime = bindTimer.time()) {
|
||||||
readyCQLStatement = sequencer.get(cycleValue);
|
readyCQLStatement = sequencer.apply(cycleValue);
|
||||||
readyCQLStatement.onStart();
|
readyCQLStatement.onStart();
|
||||||
|
|
||||||
statement = readyCQLStatement.bind(cycleValue);
|
statement = readyCQLStatement.bind(cycleValue);
|
||||||
|
@ -95,7 +95,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
|
|||||||
|
|
||||||
// bind timer covers all statement selection and binding, skipping, transforming logic
|
// bind timer covers all statement selection and binding, skipping, transforming logic
|
||||||
try (Timer.Context bindTime = bindTimer.time()) {
|
try (Timer.Context bindTime = bindTimer.time()) {
|
||||||
cqlop.readyCQLStatement = sequencer.get(cycle);
|
cqlop.readyCQLStatement = sequencer.apply(cycle);
|
||||||
cqlop.statement = cqlop.readyCQLStatement.bind(cycle);
|
cqlop.statement = cqlop.readyCQLStatement.bind(cycle);
|
||||||
|
|
||||||
// If a filter is defined, skip and count any statements that do not match it
|
// If a filter is defined, skip and count any statements that do not match it
|
||||||
|
@ -99,7 +99,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
|
|||||||
int tries = 0;
|
int tries = 0;
|
||||||
|
|
||||||
try (Timer.Context bindTime = bindTimer.time()) {
|
try (Timer.Context bindTime = bindTimer.time()) {
|
||||||
readyCQLStatement = sequencer.get(cycleValue);
|
readyCQLStatement = sequencer.apply(cycleValue);
|
||||||
readyCQLStatement.onStart();
|
readyCQLStatement.onStart();
|
||||||
|
|
||||||
statement = readyCQLStatement.bind(cycleValue);
|
statement = readyCQLStatement.bind(cycleValue);
|
||||||
|
@ -95,7 +95,7 @@ public class CqlAsyncAction extends BaseAsyncAction<CqlOpData, CqlActivity> {
|
|||||||
|
|
||||||
// bind timer covers all statement selection and binding, skipping, transforming logic
|
// bind timer covers all statement selection and binding, skipping, transforming logic
|
||||||
try (Timer.Context bindTime = bindTimer.time()) {
|
try (Timer.Context bindTime = bindTimer.time()) {
|
||||||
cqlop.readyCQLStatement = sequencer.get(cycle);
|
cqlop.readyCQLStatement = sequencer.apply(cycle);
|
||||||
cqlop.statement = cqlop.readyCQLStatement.bind(cycle);
|
cqlop.statement = cqlop.readyCQLStatement.bind(cycle);
|
||||||
|
|
||||||
// If a filter is defined, skip and count any statements that do not match it
|
// If a filter is defined, skip and count any statements that do not match it
|
||||||
|
@ -43,7 +43,7 @@ public class Cqld4Action implements SyncAction, ActivityDefObserver {
|
|||||||
|
|
||||||
Cqld4Op cql4op;
|
Cqld4Op cql4op;
|
||||||
try (Timer.Context ctx = bindTimer.time()) {
|
try (Timer.Context ctx = bindTimer.time()) {
|
||||||
OpDispenser<Cqld4Op> opDispenser = activity.getSequence().get(cycle);
|
OpDispenser<Cqld4Op> opDispenser = activity.getSequence().apply(cycle);
|
||||||
cql4op = opDispenser.apply(cycle);
|
cql4op = opDispenser.apply(cycle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ public class GraphAction implements SyncAction, ActivityDefObserver {
|
|||||||
|
|
||||||
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
||||||
|
|
||||||
BindableGraphStatement bindableGraphStatement = opSequencer.get(cycle);
|
BindableGraphStatement bindableGraphStatement = opSequencer.apply(cycle);
|
||||||
simpleGraphStatement = bindableGraphStatement.bind(cycle);
|
simpleGraphStatement = bindableGraphStatement.bind(cycle);
|
||||||
|
|
||||||
if (showstmts) {
|
if (showstmts) {
|
||||||
|
@ -62,7 +62,7 @@ public class HttpAction implements SyncAction {
|
|||||||
// operation for execution, including data generation as well as
|
// operation for execution, including data generation as well as
|
||||||
// op construction
|
// op construction
|
||||||
try (Timer.Context bindTime = httpActivity.bindTimer.time()) {
|
try (Timer.Context bindTime = httpActivity.bindTimer.time()) {
|
||||||
LongFunction<HttpOp> readyOp = sequencer.get(cycle);
|
LongFunction<HttpOp> readyOp = sequencer.apply(cycle);
|
||||||
httpOp = readyOp.apply(cycle);
|
httpOp = readyOp.apply(cycle);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (httpActivity.isDiagnosticMode()) {
|
if (httpActivity.isDiagnosticMode()) {
|
||||||
|
@ -46,7 +46,7 @@ public class HttpAsyncAction extends BaseAsyncAction<HttpAsyncOp, HttpActivity>
|
|||||||
@Override
|
@Override
|
||||||
public LongFunction<HttpAsyncOp> getOpInitFunction() {
|
public LongFunction<HttpAsyncOp> getOpInitFunction() {
|
||||||
return l -> {
|
return l -> {
|
||||||
LongFunction<HttpOp> readyHttpOp = sequencer.get(l);
|
LongFunction<HttpOp> readyHttpOp = sequencer.apply(l);
|
||||||
return new HttpAsyncOp(this,readyHttpOp,l,client);
|
return new HttpAsyncOp(this,readyHttpOp,l,client);
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ public class JmsAction implements SyncAction {
|
|||||||
|
|
||||||
JmsOp jmsOp;
|
JmsOp jmsOp;
|
||||||
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||||
LongFunction<JmsOp> readyJmsOp = activity.getSequencer().get(cycle);
|
LongFunction<JmsOp> readyJmsOp = activity.getSequencer().apply(cycle);
|
||||||
jmsOp = readyJmsOp.apply(cycle);
|
jmsOp = readyJmsOp.apply(cycle);
|
||||||
} catch (Exception bindException) {
|
} catch (Exception bindException) {
|
||||||
// if diagnostic mode ...
|
// if diagnostic mode ...
|
||||||
|
@ -32,7 +32,7 @@ public class JMXAction implements SyncAction {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int runCycle(long cycle) {
|
public int runCycle(long cycle) {
|
||||||
LongFunction<JmxOp> readyJmxOp = sequencer.get(cycle);
|
LongFunction<JmxOp> readyJmxOp = sequencer.apply(cycle);
|
||||||
JmxOp jmxOp = readyJmxOp.apply(cycle);
|
JmxOp jmxOp = readyJmxOp.apply(cycle);
|
||||||
jmxOp.execute();
|
jmxOp.execute();
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -27,7 +27,7 @@ public class KafkaAction implements SyncAction {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int runCycle(long cycle) {
|
public int runCycle(long cycle) {
|
||||||
sequencer.get(cycle).write(cycle);
|
sequencer.apply(cycle).write(cycle);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,7 +34,7 @@ public class MongoAction implements SyncAction {
|
|||||||
ReadyMongoStatement rms;
|
ReadyMongoStatement rms;
|
||||||
Bson queryBson;
|
Bson queryBson;
|
||||||
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
||||||
rms = sequencer.get(cycle);
|
rms = sequencer.apply(cycle);
|
||||||
queryBson = rms.bind(cycle);
|
queryBson = rms.bind(cycle);
|
||||||
|
|
||||||
// Maybe show the query in log/console - only for diagnostic use
|
// Maybe show the query in log/console - only for diagnostic use
|
||||||
|
@ -37,7 +37,7 @@ public class PulsarAction implements SyncAction {
|
|||||||
|
|
||||||
PulsarOp pulsarOp;
|
PulsarOp pulsarOp;
|
||||||
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||||
LongFunction<PulsarOp> readyPulsarOp = activity.getSequencer().get(cycle);
|
LongFunction<PulsarOp> readyPulsarOp = activity.getSequencer().apply(cycle);
|
||||||
pulsarOp = readyPulsarOp.apply(cycle);
|
pulsarOp = readyPulsarOp.apply(cycle);
|
||||||
} catch (Exception bindException) {
|
} catch (Exception bindException) {
|
||||||
// if diagnostic mode ...
|
// if diagnostic mode ...
|
||||||
|
@ -32,7 +32,7 @@ public class AsyncStdoutAction extends BaseAsyncAction<StdoutOpContext, StdoutAc
|
|||||||
|
|
||||||
StdoutOpContext opc = new StdoutOpContext();
|
StdoutOpContext opc = new StdoutOpContext();
|
||||||
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
||||||
opc.stringBindings = sequencer.get(cycle);
|
opc.stringBindings = sequencer.apply(cycle);
|
||||||
opc.statement = opc.stringBindings.bind(cycle);
|
opc.statement = opc.stringBindings.bind(cycle);
|
||||||
if (activity.getShowstmts()) {
|
if (activity.getShowstmts()) {
|
||||||
logger.info("STMT(cycle=" + cycle + "):\n" + opc.statement);
|
logger.info("STMT(cycle=" + cycle + "):\n" + opc.statement);
|
||||||
|
@ -19,10 +19,10 @@ package io.nosqlbench.activitytype.stdout;
|
|||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||||
import io.nosqlbench.engine.api.activityapi.planning.OpSource;
|
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||||
import io.nosqlbench.virtdata.core.templates.StringBindings;
|
import io.nosqlbench.virtdata.core.templates.StringBindings;
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
@SuppressWarnings("Duplicates")
|
@SuppressWarnings("Duplicates")
|
||||||
public class StdoutAction implements SyncAction {
|
public class StdoutAction implements SyncAction {
|
||||||
@ -32,7 +32,7 @@ public class StdoutAction implements SyncAction {
|
|||||||
private final StdoutActivity activity;
|
private final StdoutActivity activity;
|
||||||
private final int maxTries = 10;
|
private final int maxTries = 10;
|
||||||
private boolean showstmts;
|
private boolean showstmts;
|
||||||
private OpSource<StringBindings> opsource;
|
private OpSequence<StringBindings> opsource;
|
||||||
|
|
||||||
public StdoutAction(int slot, StdoutActivity activity) {
|
public StdoutAction(int slot, StdoutActivity activity) {
|
||||||
this.slot = slot;
|
this.slot = slot;
|
||||||
@ -49,7 +49,7 @@ public class StdoutAction implements SyncAction {
|
|||||||
StringBindings stringBindings;
|
StringBindings stringBindings;
|
||||||
String statement = null;
|
String statement = null;
|
||||||
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
||||||
stringBindings = opsource.get(cycle);
|
stringBindings = opsource.apply(cycle);
|
||||||
statement = stringBindings.bind(cycle);
|
statement = stringBindings.bind(cycle);
|
||||||
showstmts = activity.getShowstmts();
|
showstmts = activity.getShowstmts();
|
||||||
if (showstmts) {
|
if (showstmts) {
|
||||||
|
@ -45,7 +45,7 @@ public class WebDriverAction implements SyncAction, ActivityDefObserver {
|
|||||||
@Override
|
@Override
|
||||||
public int runCycle(long cycle) {
|
public int runCycle(long cycle) {
|
||||||
|
|
||||||
CommandTemplate commandTemplate = activity.getOpSequence().get(cycle);
|
CommandTemplate commandTemplate = activity.getOpSequence().apply(cycle);
|
||||||
try {
|
try {
|
||||||
WebDriverVerbs.execute(cycle, commandTemplate, context, dryrun);
|
WebDriverVerbs.execute(cycle, commandTemplate, context, dryrun);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -19,6 +19,7 @@ package io.nosqlbench.engine.api.activityapi.planning;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An OpSequence provides fast access to a set of operations in a specific
|
* An OpSequence provides fast access to a set of operations in a specific
|
||||||
@ -26,10 +27,10 @@ import java.util.function.Function;
|
|||||||
*
|
*
|
||||||
* @param <T> The type of element which is to be sequenced
|
* @param <T> The type of element which is to be sequenced
|
||||||
*/
|
*/
|
||||||
public interface OpSequence<T> extends OpSource<T> {
|
public interface OpSequence<T> extends LongFunction<T> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the list of individual operations which could be returned by {@link #get(long)}.
|
* Get the list of individual operations which could be returned by {@link #apply(long)}.
|
||||||
* @return A {@link List} of T
|
* @return A {@link List} of T
|
||||||
*/
|
*/
|
||||||
List<T> getOps();
|
List<T> getOps();
|
||||||
@ -47,4 +48,5 @@ public interface OpSequence<T> extends OpSource<T> {
|
|||||||
* @return A new OpSequence of type U
|
* @return A new OpSequence of type U
|
||||||
*/
|
*/
|
||||||
<U> OpSequence<U> transform(Function<T, U> func);
|
<U> OpSequence<U> transform(Function<T, U> func);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,7 @@ public class Sequence<T> implements OpSequence<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T get(long selector) {
|
public T apply(long selector) {
|
||||||
int index = (int) (selector % seq.length);
|
int index = (int) (selector % seq.length);
|
||||||
index = seq[index];
|
index = seq[index];
|
||||||
return elems.get(index);
|
return elems.get(index);
|
||||||
|
Loading…
Reference in New Issue
Block a user