mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
move op method on op dispenser to template method to allow base impl to intercede
This commit is contained in:
parent
80a70b183c
commit
897140c871
@ -120,7 +120,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
|
||||
|
||||
|
||||
@Override
|
||||
public AmqpTimeTrackOp apply(long cycle) {
|
||||
public AmqpTimeTrackOp getOp(long cycle) {
|
||||
Channel channel = getAmqpChannelForReceiver(cycle);
|
||||
if (channel == null) {
|
||||
throw new AmqpAdapterUnexpectedException(
|
||||
|
@ -175,7 +175,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AmqpTimeTrackOp apply(long cycle) {
|
||||
public AmqpTimeTrackOp getOp(long cycle) {
|
||||
String msgPayload = msgPayloadFunc.apply(cycle);
|
||||
if (StringUtils.isBlank(msgPayload)) {
|
||||
throw new AmqpAdapterInvalidParamException("Message payload must be specified and can't be empty!");
|
||||
|
@ -44,7 +44,7 @@ public abstract class BaseOpenSearchOpDispenser extends BaseOpDispenser<Op,Objec
|
||||
);
|
||||
|
||||
@Override
|
||||
public Op apply(long value) {
|
||||
public Op getOp(long value) {
|
||||
return opF.apply(value);
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ public class CqlD4RainbowTableDispenser extends Cqld4BaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlOp apply(long cycle) {
|
||||
public Cqld4CqlOp getOp(long cycle) {
|
||||
throw new RuntimeException("implement me");
|
||||
// return new Cqld4RainbowTableOp(
|
||||
// getSessionFunc().apply(value),
|
||||
|
@ -57,7 +57,7 @@ public class Cqld4FluentGraphOpDispenser extends BaseOpDispenser<Op, Cqld4Space>
|
||||
}
|
||||
|
||||
@Override
|
||||
public Op apply(long value) {
|
||||
public Op getOp(long value) {
|
||||
String graphname = graphnameFunc.apply(value);
|
||||
Script script = tlScript.get();
|
||||
Map<String, Object> allMap = virtdataBindings.getAllMap(value);
|
||||
|
@ -55,7 +55,7 @@ public class Cqld4GremlinOpDispenser extends BaseOpDispenser<Cqld4ScriptGraphOp,
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4ScriptGraphOp apply(long value) {
|
||||
public Cqld4ScriptGraphOp getOp(long value) {
|
||||
ScriptGraphStatement stmt = stmtFunc.apply(value);
|
||||
if (diagFunc.apply(value)>0L) {
|
||||
System.out.println("## GREMLIN DIAG: ScriptGraphStatement on graphname(" + stmt.getGraphName() + "):\n" + stmt.getScript());
|
||||
|
@ -85,7 +85,7 @@ public class Cqld4PreparedStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlOp apply(long cycle) {
|
||||
public Cqld4CqlOp getOp(long cycle) {
|
||||
|
||||
BoundStatement boundStatement;
|
||||
try {
|
||||
|
@ -44,7 +44,7 @@ public class Cqld4RawStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlOp apply(long value) {
|
||||
public Cqld4CqlOp getOp(long value) {
|
||||
return new Cqld4CqlSimpleStatement(
|
||||
getSessionFunc().apply(value),
|
||||
(SimpleStatement) stmtFunc.apply(value),
|
||||
|
@ -41,7 +41,7 @@ public class Cqld4SimpleCqlStmtDispenser extends Cqld4BaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlSimpleStatement apply(long value) {
|
||||
public Cqld4CqlSimpleStatement getOp(long value) {
|
||||
return new Cqld4CqlSimpleStatement(
|
||||
getSessionFunc().apply(value),
|
||||
(SimpleStatement) stmtFunc.apply(value),
|
||||
|
@ -40,7 +40,7 @@ public class Cqld4SsTableDispenser extends Cqld4BaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cqld4CqlOp apply(long cycle) {
|
||||
public Cqld4CqlOp getOp(long cycle) {
|
||||
// return new CqlD4SsTable(
|
||||
// getSessionFunc().apply(value),
|
||||
// (SsTable) stmtFunc.apply(value),
|
||||
|
@ -129,7 +129,7 @@ public class DiagOpDispenser extends BaseOpDispenser<DiagOp,DiagSpace> implement
|
||||
}
|
||||
|
||||
@Override
|
||||
public DiagOp apply(long value) {
|
||||
public DiagOp getOp(long value) {
|
||||
return opFunc.apply(value);
|
||||
}
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ public class DDBCreateTableOpDispenser extends BaseOpDispenser<DynamoDBOp, Dynam
|
||||
}
|
||||
|
||||
@Override
|
||||
public DDBCreateTableOp apply(long cycle) {
|
||||
public DDBCreateTableOp getOp(long cycle) {
|
||||
CreateTableRequest rq = new CreateTableRequest();
|
||||
rq.setTableName(tableNameFunc.apply(cycle));
|
||||
rq.setKeySchema(keySchemaFunc.apply(cycle));
|
||||
|
@ -47,7 +47,7 @@ public class DDBDeleteTableOpDispenser extends BaseOpDispenser<DynamoDBOp, Dynam
|
||||
}
|
||||
|
||||
@Override
|
||||
public DDBDeleteTableOp apply(long cycle) {
|
||||
public DDBDeleteTableOp getOp(long cycle) {
|
||||
DeleteTableRequest rq = new DeleteTableRequest();
|
||||
rq.setTableName(tableNameFunc.apply(cycle));
|
||||
return new DDBDeleteTableOp(ddb, rq);
|
||||
|
@ -83,7 +83,7 @@ public class DDBGetItemOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDBS
|
||||
}
|
||||
|
||||
@Override
|
||||
public DDBGetItemOp apply(long value) {
|
||||
public DDBGetItemOp getOp(long value) {
|
||||
Table table = targetTableFunction.apply(value);
|
||||
GetItemSpec getitemSpec = getItemSpecFunc.apply(value);
|
||||
return new DDBGetItemOp(ddb, table, getitemSpec);
|
||||
|
@ -51,7 +51,7 @@ public class DDBPutItemOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDBS
|
||||
}
|
||||
|
||||
@Override
|
||||
public DynamoDBOp apply(long value) {
|
||||
public DynamoDBOp getOp(long value) {
|
||||
String tablename = tableNameFunc.apply(value);
|
||||
Item item = itemfunc.apply(value);
|
||||
return new DDBPutItemOp(ddb,tablename,item);
|
||||
|
@ -150,7 +150,7 @@ public class DDBQueryOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDBSpa
|
||||
}
|
||||
|
||||
@Override
|
||||
public DDBQueryOp apply(long cycle) {
|
||||
public DDBQueryOp getOp(long cycle) {
|
||||
Table table = tableFunc.apply(cycle);
|
||||
QuerySpec queryspec = querySpecFunc.apply(cycle);
|
||||
return new DDBQueryOp(ddb,table,queryspec);
|
||||
|
@ -44,7 +44,7 @@ public class RawDynamoDBOpDispenser extends BaseOpDispenser<DynamoDBOp, DynamoDB
|
||||
}
|
||||
|
||||
@Override
|
||||
public DynamoDBOp apply(long value) {
|
||||
public DynamoDBOp getOp(long value) {
|
||||
String body = jsonFunction.apply(value);
|
||||
return new RawDynamodOp(ddb,body);
|
||||
}
|
||||
|
@ -128,7 +128,7 @@ public class HttpOpDispenser extends BaseOpDispenser<HttpOp, HttpSpace> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpOp apply(long value) {
|
||||
public HttpOp getOp(long value) {
|
||||
HttpOp op = this.opFunc.apply(value);
|
||||
return op;
|
||||
|
||||
|
@ -48,7 +48,7 @@ public class JDBCDDLOpDispenser extends JDBCBaseOpDispenser {
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public JDBCDDLOp apply(long cycle) {
|
||||
public JDBCDDLOp getOp(long cycle) {
|
||||
String ddlSqlStr = ddlSqlStrFunc.apply(cycle);
|
||||
return new JDBCDDLOp(jdbcSpace, ddlSqlStr);
|
||||
}
|
||||
|
@ -101,7 +101,7 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public JDBCDMLOp apply(long cycle) {
|
||||
public JDBCDMLOp getOp(long cycle) {
|
||||
if (isReadStatement) {
|
||||
return new JDBCDMLReadOp(
|
||||
jdbcSpace,
|
||||
|
@ -155,7 +155,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public KafkaOp apply(final long cycle) {
|
||||
public KafkaOp getOp(final long cycle) {
|
||||
final List<String> topicNameList = this.getEffectiveTopicNameList(cycle);
|
||||
final String groupId = this.getEffectiveGroupId(cycle);
|
||||
if ((0 == topicNameList.size()) || StringUtils.isBlank(groupId)) throw new KafkaAdapterInvalidParamException(
|
||||
|
@ -200,7 +200,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public KafkaOp apply(final long cycle) {
|
||||
public KafkaOp getOp(final long cycle) {
|
||||
final String topicName = this.topicNameStrFunc.apply(cycle);
|
||||
final String clientId = this.getEffectiveClientId(cycle);
|
||||
|
||||
|
@ -58,7 +58,7 @@ public abstract class MilvusBaseOpDispenser<T> extends BaseOpDispenser<MilvusBas
|
||||
);
|
||||
|
||||
@Override
|
||||
public MilvusBaseOp<T> apply(long value) {
|
||||
public MilvusBaseOp<T> getOp(long value) {
|
||||
return opF.apply(value);
|
||||
}
|
||||
}
|
||||
|
@ -60,7 +60,7 @@ public class MongoCommandOpDispenser extends BaseOpDispenser<Op, MongoSpace> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Op apply(long cycle) {
|
||||
public Op getOp(long cycle) {
|
||||
return mongoOpF.apply(cycle);
|
||||
}
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ public class MongoDbUpdateOpDispenser extends BaseOpDispenser<Op, MongoSpace> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Op apply(long value) {
|
||||
public Op getOp(long value) {
|
||||
Op op = opF.apply(value);
|
||||
return op;
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ public class AdminNamespaceOpDispenser extends PulsarAdminOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminNamespaceOp apply(long cycle) {
|
||||
public AdminNamespaceOp getOp(long cycle) {
|
||||
return new AdminNamespaceOp(
|
||||
pulsarAdapterMetrics,
|
||||
pulsarAdmin,
|
||||
|
@ -43,7 +43,7 @@ public class AdminTenantOpDispenser extends PulsarAdminOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminTenantOp apply(long cycle) {
|
||||
public AdminTenantOp getOp(long cycle) {
|
||||
return new AdminTenantOp(
|
||||
pulsarAdapterMetrics,
|
||||
pulsarAdmin,
|
||||
|
@ -44,7 +44,7 @@ public class AdminTopicOpDispenser extends PulsarAdminOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AdminTopicOp apply(long cycle) {
|
||||
public AdminTopicOp getOp(long cycle) {
|
||||
|
||||
return new AdminTopicOp(
|
||||
pulsarAdapterMetrics,
|
||||
|
@ -76,7 +76,7 @@ public class MessageConsumerOpDispenser extends PulsarClientOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumerOp apply(final long cycle) {
|
||||
public MessageConsumerOp getOp(final long cycle) {
|
||||
return new MessageConsumerOp(
|
||||
this.pulsarAdapterMetrics,
|
||||
this.pulsarClient,
|
||||
|
@ -56,7 +56,7 @@ public class MessageProducerOpDispenser extends PulsarClientOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageProducerOp apply(long cycle) {
|
||||
public MessageProducerOp getOp(long cycle) {
|
||||
return new MessageProducerOp(
|
||||
pulsarAdapterMetrics,
|
||||
pulsarClient,
|
||||
|
@ -52,7 +52,7 @@ public class MessageReaderOpDispenser extends PulsarClientOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageReaderOp apply(long cycle) {
|
||||
public MessageReaderOp getOp(long cycle) {
|
||||
|
||||
return new MessageReaderOp(
|
||||
pulsarAdapterMetrics,
|
||||
|
@ -122,7 +122,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageConsumerOp apply(long cycle) {
|
||||
public MessageConsumerOp getOp(long cycle) {
|
||||
S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
|
||||
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
|
||||
boolean commitTransact = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
|
||||
|
@ -269,7 +269,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageProducerOp apply(long cycle) {
|
||||
public MessageProducerOp getOp(long cycle) {
|
||||
String destName = destNameStrFunc.apply(cycle);
|
||||
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
|
||||
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
|
||||
|
@ -37,7 +37,7 @@ public class StdoutOpDispenser extends BaseOpDispenser<StdoutOp,StdoutSpace> {
|
||||
}
|
||||
|
||||
@Override
|
||||
public StdoutOp apply(long value) {
|
||||
public StdoutOp getOp(long value) {
|
||||
StdoutSpace ctx = ctxfunc.apply(value);
|
||||
String output = outFunction.apply(value);
|
||||
return new StdoutOp(ctx,output);
|
||||
|
@ -35,7 +35,7 @@ public class TcpClientOpDispenser extends BaseOpDispenser<TcpClientOp, TcpClient
|
||||
}
|
||||
|
||||
@Override
|
||||
public TcpClientOp apply(long value) {
|
||||
public TcpClientOp getOp(long value) {
|
||||
TcpClientAdapterSpace ctx = ctxFunction.apply(value);
|
||||
String output = outFunction.apply(value);
|
||||
return new TcpClientOp(ctx,output);
|
||||
|
@ -35,7 +35,7 @@ public class TcpServerOpDispenser extends BaseOpDispenser<TcpServerOp,TcpServerA
|
||||
}
|
||||
|
||||
@Override
|
||||
public TcpServerOp apply(long value) {
|
||||
public TcpServerOp getOp(long value) {
|
||||
TcpServerAdapterSpace ctx = ctxFunction.apply(value);
|
||||
String output = outFunction.apply(value);
|
||||
return new TcpServerOp(ctx,output);
|
||||
|
@ -226,6 +226,9 @@ public abstract class BaseOpDispenser<T extends Op, S> extends NBBaseComponent i
|
||||
return this.labels;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public final T apply(long value) {
|
||||
T op = getOp(value);
|
||||
return op;
|
||||
}
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ public interface OpDispenser<T> extends LongFunction<T>, OpResultTracker {
|
||||
* @return an executable operation
|
||||
*/
|
||||
|
||||
T apply(long value);
|
||||
T getOp(long value);
|
||||
|
||||
CycleFunction<Boolean> getVerifier();
|
||||
|
||||
|
@ -31,8 +31,8 @@ public class DryRunOpDispenserWrapper extends BaseOpDispenser<Op, Object> {
|
||||
this.realDispenser = realDispenser;
|
||||
}
|
||||
@Override
|
||||
public DryRunOp apply(long cycle) {
|
||||
Op op = realDispenser.apply(cycle);
|
||||
public DryRunOp getOp(long cycle) {
|
||||
Op op = realDispenser.getOp(cycle);
|
||||
return new DryRunOp(op);
|
||||
}
|
||||
}
|
||||
|
@ -32,8 +32,8 @@ public class EmitterOpDispenserWrapper extends BaseOpDispenser<Op, Object> {
|
||||
this.realDispenser = realDispenser;
|
||||
}
|
||||
@Override
|
||||
public EmitterOp apply(long cycle) {
|
||||
CycleOp<?> cycleOp = realDispenser.apply(cycle);
|
||||
public EmitterOp getOp(long cycle) {
|
||||
CycleOp<?> cycleOp = realDispenser.getOp(cycle);
|
||||
return new EmitterOp(cycleOp);
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ import java.util.function.LongFunction;
|
||||
public interface OpSource<T> extends LongFunction<T> {
|
||||
|
||||
static <O extends Op> OpSource<O> of(OpSequence<OpDispenser<? extends O>> seq) {
|
||||
return (long l) -> seq.apply(l).apply(l);
|
||||
return (long l) -> seq.apply(l).getOp(l);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -16,10 +16,8 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.PollingOpDispenserWrapper;
|
||||
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
|
||||
import io.nosqlbench.nb.api.components.core.NBComponent;
|
||||
import io.nosqlbench.nb.api.components.events.ParamChange;
|
||||
|
@ -78,7 +78,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
|
||||
|
||||
try (Timer.Context ct = bindTimer.time()) {
|
||||
dispenser = opsequence.apply(cycle);
|
||||
op = dispenser.apply(cycle);
|
||||
op = dispenser.getOp(cycle);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("while binding request in cycle " + cycle + " for op template named '" + (dispenser!=null?dispenser.getOpName():"NULL")+
|
||||
"': " + e.getMessage(), e);
|
||||
|
Loading…
Reference in New Issue
Block a user