mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-12-28 01:31:05 -06:00
allow op type specialization/covariance
This commit is contained in:
parent
0cea93e77b
commit
1a0b079ad7
@ -53,7 +53,7 @@ import java.util.function.Function;
|
||||
* rules provided to the user. Conversely, the driver maintainer should take care to provide
|
||||
* rules of construction and examples in the documentation.
|
||||
* Each {@link io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter} has a unique
|
||||
* name, just as with {@link io.nosqlbench.engine.api.activityapi.core.ActivityType}s. The documentation
|
||||
* name. The documentation
|
||||
* for each of these should be kept in the bundled resources in a top-level markdown file that
|
||||
* matches the driver name.
|
||||
* </p>
|
||||
@ -62,7 +62,7 @@ import java.util.function.Function;
|
||||
* to hold all the details for executing an operation,
|
||||
* generally something that implements {@link Runnable}.
|
||||
*/
|
||||
public interface OpMapper<T extends Op> extends Function<ParsedOp, OpDispenser<T>> {
|
||||
public interface OpMapper<T extends Op> extends Function<ParsedOp, OpDispenser<? extends T>> {
|
||||
|
||||
/**
|
||||
* Interrogate the parsed command, and provide a new
|
||||
@ -73,5 +73,5 @@ public interface OpMapper<T extends Op> extends Function<ParsedOp, OpDispenser<T
|
||||
* @return An OpDispenser which can be used to synthesize real operations.
|
||||
*/
|
||||
@Override
|
||||
OpDispenser<T> apply(ParsedOp cmd);
|
||||
OpDispenser<? extends T> apply(ParsedOp cmd);
|
||||
}
|
||||
|
@ -7,13 +7,13 @@ import java.util.function.LongFunction;
|
||||
|
||||
public class HttpAsyncOp {
|
||||
public final HttpAsyncAction action;
|
||||
public final LongFunction<HttpOp> op;
|
||||
public final LongFunction<? extends HttpOp> op;
|
||||
public final long cycle;
|
||||
|
||||
private final HttpOp httpOp;
|
||||
private final HttpClient client;
|
||||
|
||||
public HttpAsyncOp(HttpAsyncAction action, LongFunction<HttpOp> op, long cycle, HttpClient client) {
|
||||
public HttpAsyncOp(HttpAsyncAction action, LongFunction<? extends HttpOp> op, long cycle, HttpClient client) {
|
||||
this.action = action;
|
||||
this.op = op;
|
||||
this.cycle = cycle;
|
||||
|
@ -30,7 +30,7 @@ public class HttpAction implements SyncAction {
|
||||
private final int slot;
|
||||
private int maxTries = 1;
|
||||
|
||||
private OpSequence<OpDispenser<HttpOp>> sequencer;
|
||||
private OpSequence<OpDispenser<? extends HttpOp>> sequencer;
|
||||
private HttpClient client;
|
||||
|
||||
private final HttpResponse.BodyHandler<String> bodyreader = HttpResponse.BodyHandlers.ofString();
|
||||
@ -62,7 +62,7 @@ public class HttpAction implements SyncAction {
|
||||
// operation for execution, including data generation as well as
|
||||
// op construction
|
||||
try (Timer.Context bindTime = httpActivity.bindTimer.time()) {
|
||||
LongFunction<HttpOp> readyOp = sequencer.apply(cycle);
|
||||
LongFunction<? extends HttpOp> readyOp = sequencer.apply(cycle);
|
||||
httpOp = readyOp.apply(cycle);
|
||||
} catch (Exception e) {
|
||||
if (httpActivity.isDiagnosticMode()) {
|
||||
|
@ -37,7 +37,7 @@ public class HttpActivity extends SimpleActivity implements Activity, ActivityDe
|
||||
public Timer resultSuccessTimer;
|
||||
public Histogram statusCodeHisto;
|
||||
|
||||
private OpSequence<OpDispenser<HttpOp>> sequencer;
|
||||
private OpSequence<OpDispenser<? extends HttpOp>> sequencer;
|
||||
private boolean diagnosticsEnabled;
|
||||
private long timeout = Long.MAX_VALUE;
|
||||
private NBErrorHandler errorhandler;
|
||||
@ -121,7 +121,7 @@ public class HttpActivity extends SimpleActivity implements Activity, ActivityDe
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
public OpSequence<OpDispenser<HttpOp>> getSequencer() {
|
||||
public OpSequence<OpDispenser<? extends HttpOp>> getSequencer() {
|
||||
return sequencer;
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ public class HttpAsyncAction extends BaseAsyncAction<HttpAsyncOp, HttpActivity>
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(HttpAsyncAction.class);
|
||||
|
||||
private OpSequence<OpDispenser<HttpOp>> sequencer;
|
||||
private OpSequence<OpDispenser<? extends HttpOp>> sequencer;
|
||||
private HttpClient client;
|
||||
|
||||
private CompletableFuture<HttpResponse<Void>> future;
|
||||
@ -46,7 +46,7 @@ public class HttpAsyncAction extends BaseAsyncAction<HttpAsyncOp, HttpActivity>
|
||||
@Override
|
||||
public LongFunction<HttpAsyncOp> getOpInitFunction() {
|
||||
return l -> {
|
||||
LongFunction<HttpOp> readyHttpOp = sequencer.apply(l);
|
||||
LongFunction<? extends HttpOp> readyHttpOp = sequencer.apply(l);
|
||||
return new HttpAsyncOp(this,readyHttpOp,l,client);
|
||||
};
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
private int minRetryDelayMs;
|
||||
|
||||
protected DataSource dataSource;
|
||||
protected OpSequence<OpDispenser<String>> opSequence;
|
||||
protected OpSequence<OpDispenser<? extends String>> opSequence;
|
||||
|
||||
public JDBCActivity(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
@ -107,7 +107,7 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
return dataSource;
|
||||
}
|
||||
|
||||
public OpSequence<OpDispenser<String>> getOpSequence() {
|
||||
public OpSequence<OpDispenser<? extends String>> getOpSequence() {
|
||||
return opSequence;
|
||||
}
|
||||
|
||||
|
@ -18,7 +18,7 @@ public class JDBCAction implements SyncAction {
|
||||
private static final Logger LOGGER = LogManager.getLogger(JDBCAction.class);
|
||||
|
||||
private final JDBCActivity activity;
|
||||
private OpSequence<OpDispenser<String>> sequencer;
|
||||
private OpSequence<OpDispenser<? extends String>> sequencer;
|
||||
|
||||
public JDBCAction(JDBCActivity a, int slot) {
|
||||
activity = a;
|
||||
@ -33,7 +33,7 @@ public class JDBCAction implements SyncAction {
|
||||
public int runCycle(long cycle) {
|
||||
String boundStmt;
|
||||
|
||||
LongFunction<String> unboundStmt = sequencer.apply(cycle);
|
||||
LongFunction<? extends String> unboundStmt = sequencer.apply(cycle);
|
||||
|
||||
try (Timer.Context bindTime = activity.getBindTimer().time()) {
|
||||
boundStmt = unboundStmt.apply(cycle);
|
||||
|
@ -36,7 +36,7 @@ public class JmsAction implements SyncAction {
|
||||
|
||||
JmsOp jmsOp;
|
||||
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||
LongFunction<JmsOp> readyJmsOp = activity.getSequencer().apply(cycle);
|
||||
LongFunction<? extends JmsOp> readyJmsOp = activity.getSequencer().apply(cycle);
|
||||
jmsOp = readyJmsOp.apply(cycle);
|
||||
} catch (Exception bindException) {
|
||||
// if diagnostic mode ...
|
||||
|
@ -32,7 +32,7 @@ public class JmsActivity extends SimpleActivity {
|
||||
|
||||
private JMSContext jmsContext;
|
||||
|
||||
private OpSequence<OpDispenser<JmsOp>> sequence;
|
||||
private OpSequence<OpDispenser<? extends JmsOp>> sequence;
|
||||
private volatile Throwable asyncOperationFailure;
|
||||
private NBErrorHandler errorhandler;
|
||||
|
||||
@ -144,7 +144,7 @@ public class JmsActivity extends SimpleActivity {
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
|
||||
public OpSequence<OpDispenser<JmsOp>> getSequencer() { return sequence; }
|
||||
public OpSequence<OpDispenser<? extends JmsOp>> getSequencer() { return sequence; }
|
||||
|
||||
public String getJmsProviderType() { return jmsProviderType; }
|
||||
public JmsConnInfo getJmsConnInfo() { return jmsConnInfo; }
|
||||
|
@ -17,7 +17,7 @@ public class JMXAction implements SyncAction {
|
||||
private final ActivityDef activityDef;
|
||||
private final int slot;
|
||||
private final JMXActivity activity;
|
||||
private OpSequence<OpDispenser<JmxOp>> sequencer;
|
||||
private OpSequence<OpDispenser<? extends JmxOp>> sequencer;
|
||||
|
||||
public JMXAction(ActivityDef activityDef, int slot, JMXActivity activity) {
|
||||
this.activityDef = activityDef;
|
||||
@ -32,7 +32,7 @@ public class JMXAction implements SyncAction {
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycle) {
|
||||
LongFunction<JmxOp> readyJmxOp = sequencer.apply(cycle);
|
||||
LongFunction<? extends JmxOp> readyJmxOp = sequencer.apply(cycle);
|
||||
JmxOp jmxOp = readyJmxOp.apply(cycle);
|
||||
jmxOp.execute();
|
||||
return 0;
|
||||
|
@ -13,7 +13,7 @@ import javax.net.ssl.SSLContext;
|
||||
|
||||
public class JMXActivity extends SimpleActivity implements Activity {
|
||||
|
||||
private OpSequence<OpDispenser<JmxOp>> sequence;
|
||||
private OpSequence<OpDispenser<? extends JmxOp>> sequence;
|
||||
private SSLContext sslContext;
|
||||
|
||||
public JMXActivity(ActivityDef activityDef) {
|
||||
@ -39,7 +39,7 @@ public class JMXActivity extends SimpleActivity implements Activity {
|
||||
return sslContext;
|
||||
}
|
||||
|
||||
public OpSequence<OpDispenser<JmxOp>> getSequencer() {
|
||||
public OpSequence<OpDispenser<? extends JmxOp>> getSequencer() {
|
||||
return sequence;
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ public class PulsarAction implements SyncAction {
|
||||
|
||||
PulsarOp pulsarOp;
|
||||
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||
LongFunction<PulsarOp> readyPulsarOp = activity.getSequencer().apply(cycle);
|
||||
LongFunction<? extends PulsarOp> readyPulsarOp = activity.getSequencer().apply(cycle);
|
||||
pulsarOp = readyPulsarOp.apply(cycle);
|
||||
} catch (Exception bindException) {
|
||||
// if diagnostic mode ...
|
||||
|
@ -66,7 +66,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
private Schema<?> pulsarSchema;
|
||||
|
||||
private NBErrorHandler errorHandler;
|
||||
private OpSequence<OpDispenser<PulsarOp>> sequencer;
|
||||
private OpSequence<OpDispenser<? extends PulsarOp>> sequencer;
|
||||
private volatile Throwable asyncOperationFailure;
|
||||
private boolean cycleratePerThread;
|
||||
|
||||
@ -151,7 +151,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
|
||||
public NBErrorHandler getErrorHandler() { return errorHandler; }
|
||||
|
||||
public OpSequence<OpDispenser<PulsarOp>> getSequencer() { return sequencer; }
|
||||
public OpSequence<OpDispenser<? extends PulsarOp>> getSequencer() { return sequencer; }
|
||||
|
||||
public void failOnAsyncOperationFailure() {
|
||||
if (asyncOperationFailure != null) {
|
||||
|
@ -13,7 +13,7 @@ import java.util.function.LongFunction;
|
||||
*/
|
||||
public interface OpSource<T> extends LongFunction<T> {
|
||||
|
||||
static <O extends Op> OpSource<O> of(OpSequence<OpDispenser<O>> seq) {
|
||||
static <O extends Op> OpSource<O> of(OpSequence<OpDispenser<? extends O>> seq) {
|
||||
return (long l) -> seq.apply(l).apply(l);
|
||||
}
|
||||
|
||||
|
@ -419,20 +419,20 @@ public class SimpleActivity implements Activity, ProgressCapable {
|
||||
* @param <O>
|
||||
* @return
|
||||
*/
|
||||
protected <O extends Op> OpSequence<OpDispenser<O>> createOpSequenceFromCommands(Function<CommandTemplate, OpDispenser<O>> opinit) {
|
||||
protected <O extends Op> OpSequence<OpDispenser<? extends O>> createOpSequenceFromCommands(Function<CommandTemplate, OpDispenser<O>> opinit) {
|
||||
Function<OpTemplate, CommandTemplate> f = CommandTemplate::new;
|
||||
Function<OpTemplate, OpDispenser<O>> opTemplateOFunction = f.andThen(opinit);
|
||||
Function<OpTemplate, OpDispenser<? extends O>> opTemplateOFunction = f.andThen(opinit);
|
||||
|
||||
return createOpSequence(opTemplateOFunction);
|
||||
}
|
||||
|
||||
protected <O extends Op> OpSequence<OpDispenser<O>> createOpSourceFromCommands(
|
||||
Function<ParsedOp, OpDispenser<O>> opinit,
|
||||
protected <O extends Op> OpSequence<OpDispenser<? extends O>> createOpSourceFromCommands(
|
||||
Function<ParsedOp, OpDispenser<? extends O>> opinit,
|
||||
NBConfiguration cfg,
|
||||
List<Function<Map<String, Object>, Map<String, Object>>> parsers
|
||||
) {
|
||||
Function<OpTemplate, ParsedOp> f = t -> new ParsedOp(t, cfg, parsers);
|
||||
Function<OpTemplate, OpDispenser<O>> opTemplateOFunction = f.andThen(opinit);
|
||||
Function<OpTemplate, OpDispenser<? extends O>> opTemplateOFunction = f.andThen(opinit);
|
||||
return createOpSequence(opTemplateOFunction);
|
||||
}
|
||||
|
||||
@ -459,14 +459,14 @@ public class SimpleActivity implements Activity, ProgressCapable {
|
||||
* @return The sequence of operations as determined by filtering and ratios
|
||||
*/
|
||||
@Deprecated(forRemoval = true)
|
||||
protected <O> OpSequence<OpDispenser<O>> createOpSequence(Function<OpTemplate, OpDispenser<O>> opinit) {
|
||||
protected <O> OpSequence<OpDispenser<? extends O>> createOpSequence(Function<OpTemplate, OpDispenser<? extends O>> opinit) {
|
||||
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
StrInterpolator interp = new StrInterpolator(activityDef);
|
||||
// StrInterpolator interp = new StrInterpolator(activityDef);
|
||||
SequencerType sequencerType = getParams()
|
||||
.getOptionalString("seq")
|
||||
.map(SequencerType::valueOf)
|
||||
.orElse(SequencerType.bucket);
|
||||
SequencePlanner<OpDispenser<O>> planner = new SequencePlanner<>(sequencerType);
|
||||
SequencePlanner<OpDispenser<? extends O>> planner = new SequencePlanner<>(sequencerType);
|
||||
|
||||
StmtsDocList stmtsDocList = null;
|
||||
|
||||
@ -475,10 +475,10 @@ public class SimpleActivity implements Activity, ProgressCapable {
|
||||
Optional<String> stmt = activityDef.getParams().getOptionalString("op", "stmt", "statement");
|
||||
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
|
||||
if (stmt.isPresent()) {
|
||||
stmtsDocList = StatementsLoader.loadStmt(logger, stmt.get(), interp);
|
||||
stmtsDocList = StatementsLoader.loadStmt(logger, stmt.get(), activityDef.getParams());
|
||||
workloadSource = "commandline:" + stmt.get();
|
||||
} else if (op_yaml_loc.isPresent()) {
|
||||
stmtsDocList = StatementsLoader.loadPath(logger, op_yaml_loc.get(), interp, "activities");
|
||||
stmtsDocList = StatementsLoader.loadPath(logger, op_yaml_loc.get(), activityDef.getParams(), "activities");
|
||||
workloadSource = "yaml:" + op_yaml_loc.get();
|
||||
}
|
||||
|
||||
@ -500,7 +500,7 @@ public class SimpleActivity implements Activity, ProgressCapable {
|
||||
for (int i = 0; i < stmts.size(); i++) {
|
||||
long ratio = ratios.get(i);
|
||||
OpTemplate optemplate = stmts.get(i);
|
||||
OpDispenser<O> driverSpecificReadyOp = opinit.apply(optemplate);
|
||||
OpDispenser<? extends O> driverSpecificReadyOp = opinit.apply(optemplate);
|
||||
planner.addOp(driverSpecificReadyOp, ratio);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -5,9 +5,9 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSource;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpMapper;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
|
||||
import java.util.List;
|
||||
@ -27,14 +27,15 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity {
|
||||
private final DriverAdapter<R, S> adapter;
|
||||
private final OpSource<R> opsource;
|
||||
private NBErrorHandler errorHandler;
|
||||
private final OpSequence<OpDispenser<R>> sequence;
|
||||
private final OpSequence<OpDispenser<? extends R>> sequence;
|
||||
|
||||
public StandardActivity(DriverAdapter<R, S> adapter, ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
this.adapter = adapter;
|
||||
|
||||
try {
|
||||
Function<ParsedOp, OpDispenser<R>> opmapper = adapter.getOpMapper();
|
||||
// Function<ParsedOp, OpDispenser<R>> opmapper;
|
||||
OpMapper<R> opmapper = adapter.getOpMapper();
|
||||
Function<Map<String, Object>, Map<String, Object>> preprocessor = adapter.getPreprocessor();
|
||||
sequence = createOpSourceFromCommands(opmapper, adapter.getConfiguration(), List.of(preprocessor));
|
||||
opsource = OpSource.of(sequence);
|
||||
@ -53,7 +54,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity {
|
||||
setDefaultsFromOpSequence(sequence);
|
||||
}
|
||||
|
||||
public OpSequence<OpDispenser<R>> getOpSequence() {
|
||||
public OpSequence<OpDispenser<? extends R>> getOpSequence() {
|
||||
return sequence;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user