consolidate op mapping to java functional types

This commit is contained in:
Jonathan Shook 2021-02-04 17:46:35 -06:00
parent ff05868fde
commit f8fe99f827
10 changed files with 46 additions and 33 deletions

View File

@ -3,16 +3,17 @@ package io.nosqlbench.activitytype.cmds;
import io.nosqlbench.activitytype.http.async.HttpAsyncAction;
import java.net.http.HttpClient;
import java.util.function.LongFunction;
public class HttpAsyncOp {
public final HttpAsyncAction action;
public final ReadyHttpOp op;
public final LongFunction<HttpOp> op;
public final long cycle;
private final HttpOp httpOp;
private final HttpClient client;
public HttpAsyncOp(HttpAsyncAction action, ReadyHttpOp op, long cycle, HttpClient client) {
public HttpAsyncOp(HttpAsyncAction action, LongFunction<HttpOp> op, long cycle, HttpClient client) {
this.action = action;
this.op = op;
this.cycle = cycle;

View File

@ -2,7 +2,6 @@ package io.nosqlbench.activitytype.http;
import com.codahale.metrics.Timer;
import io.nosqlbench.activitytype.cmds.HttpOp;
import io.nosqlbench.activitytype.cmds.ReadyHttpOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
@ -19,6 +18,7 @@ import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
public class HttpAction implements SyncAction {
@ -29,7 +29,7 @@ public class HttpAction implements SyncAction {
private final int slot;
private int maxTries = 1;
private OpSequence<ReadyHttpOp> sequencer;
private OpSequence<LongFunction<HttpOp>> sequencer;
private HttpClient client;
private final HttpResponse.BodyHandler<String> bodyreader = HttpResponse.BodyHandlers.ofString();
@ -61,8 +61,8 @@ public class HttpAction implements SyncAction {
// operation for execution, including data generation as well as
// op construction
try (Timer.Context bindTime = httpActivity.bindTimer.time()) {
ReadyHttpOp readHTTPOperation = sequencer.get(cycle);
httpOp = readHTTPOperation.apply(cycle);
LongFunction<HttpOp> readyOp = sequencer.get(cycle);
httpOp = readyOp.apply(cycle);
} catch (Exception e) {
if (httpActivity.isDiagnosticMode()) {
if (httpOp != null) {
@ -120,6 +120,7 @@ public class HttpAction implements SyncAction {
System.out.println();
}
// TODO: use this as a documented example for how to add error handling to a new activity
if (error == null) {
break; // break out of the tries loop without retrying, because there was no error
} else {

View File

@ -3,6 +3,7 @@ package io.nosqlbench.activitytype.http;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.nosqlbench.activitytype.cmds.HttpOp;
import io.nosqlbench.activitytype.cmds.ReadyHttpOp;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
@ -16,6 +17,7 @@ import org.apache.logging.log4j.Logger;
import java.net.http.HttpClient;
import java.util.function.Function;
import java.util.function.LongFunction;
public class HttpActivity extends SimpleActivity implements Activity, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger(HttpActivity.class);
@ -35,7 +37,7 @@ public class HttpActivity extends SimpleActivity implements Activity, ActivityDe
public Timer resultSuccessTimer;
public Histogram statusCodeHisto;
private OpSequence<ReadyHttpOp> sequencer;
private OpSequence<LongFunction<HttpOp>> sequencer;
private boolean diagnosticsEnabled;
private long timeout = Long.MAX_VALUE;
private NBErrorHandler errorhandler;
@ -58,7 +60,6 @@ public class HttpActivity extends SimpleActivity implements Activity, ActivityDe
skippedTokens = ActivityMetrics.histogram(activityDef, "skipped-tokens");
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
this.sequencer = createOpSequence(ReadyHttpOp::new);
setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef);
this.errorhandler = new NBErrorHandler(
@ -128,7 +129,7 @@ public class HttpActivity extends SimpleActivity implements Activity, ActivityDe
return builder.build();
}
public OpSequence<ReadyHttpOp> getSequencer() {
public OpSequence<LongFunction<HttpOp>> getSequencer() {
return sequencer;
}

View File

@ -2,13 +2,12 @@ package io.nosqlbench.activitytype.http.async;
import io.nosqlbench.activitytype.cmds.HttpAsyncOp;
import io.nosqlbench.activitytype.cmds.HttpOp;
import io.nosqlbench.activitytype.cmds.ReadyHttpOp;
import io.nosqlbench.activitytype.http.HttpActivity;
import io.nosqlbench.engine.api.activityapi.core.BaseAsyncAction;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
@ -19,7 +18,7 @@ public class HttpAsyncAction extends BaseAsyncAction<HttpAsyncOp, HttpActivity>
private final static Logger logger = LogManager.getLogger(HttpAsyncAction.class);
private OpSequence<ReadyHttpOp> sequencer;
private OpSequence<LongFunction<HttpOp>> sequencer;
private HttpClient client;
private CompletableFuture<HttpResponse<Void>> future;
@ -46,7 +45,7 @@ public class HttpAsyncAction extends BaseAsyncAction<HttpAsyncOp, HttpActivity>
@Override
public LongFunction<HttpAsyncOp> getOpInitFunction() {
return l -> {
ReadyHttpOp readyHttpOp = sequencer.get(l);
LongFunction<HttpOp> readyHttpOp = sequencer.get(l);
return new HttpAsyncOp(this,readyHttpOp,l,client);
};
}

View File

@ -17,6 +17,7 @@ import org.apache.logging.log4j.Logger;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
// This should not be exposed as as service directly unless it can
// be used with a modular JDBC configuration.
@ -30,7 +31,7 @@ public abstract class JDBCActivity extends SimpleActivity {
private SQLExceptionCountMetrics sqlExceptionCount;
protected DataSource dataSource;
protected OpSequence<ReadyJDBCOp> opSequence;
protected OpSequence<LongFunction<String>> opSequence;
public JDBCActivity(ActivityDef activityDef) {
super(activityDef);
@ -90,7 +91,7 @@ public abstract class JDBCActivity extends SimpleActivity {
return dataSource;
}
public OpSequence<ReadyJDBCOp> getOpSequence() {
public OpSequence<LongFunction<String>> getOpSequence() {
return opSequence;
}

View File

@ -7,14 +7,17 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.*;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
public class JDBCAction implements SyncAction {
private static final Logger LOGGER = LogManager.getLogger(JDBCAction.class);
private final JDBCActivity activity;
private OpSequence<ReadyJDBCOp> sequencer;
private OpSequence<LongFunction<String>> sequencer;
public JDBCAction(JDBCActivity a, int slot) {
activity = a;
@ -29,7 +32,7 @@ public class JDBCAction implements SyncAction {
public int runCycle(long cycle) {
String boundStmt;
ReadyJDBCOp unboundStmt = sequencer.get(cycle);
LongFunction<String> unboundStmt = sequencer.apply(cycle);
try (Timer.Context bindTime = activity.getBindTimer().time()) {
boundStmt = unboundStmt.apply(cycle);

View File

@ -4,8 +4,10 @@ import io.nosqlbench.driver.jmx.ops.JmxOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class JMXAction implements SyncAction {
@ -14,7 +16,7 @@ public class JMXAction implements SyncAction {
private final ActivityDef activityDef;
private final int slot;
private final JMXActivity activity;
private OpSequence<ReadyJmxOp> sequencer;
private OpSequence<LongFunction<JmxOp>> sequencer;
public JMXAction(ActivityDef activityDef, int slot, JMXActivity activity) {
this.activityDef = activityDef;
@ -29,8 +31,8 @@ public class JMXAction implements SyncAction {
@Override
public int runCycle(long cycle) {
ReadyJmxOp readyJmxOp = sequencer.get(cycle);
JmxOp jmxOp = readyJmxOp.bind(cycle);
LongFunction<JmxOp> readyJmxOp = sequencer.get(cycle);
JmxOp jmxOp = readyJmxOp.apply(cycle);
jmxOp.execute();
return 0;
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.jmx;
import io.nosqlbench.driver.jmx.ops.JmxOp;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
@ -7,10 +8,11 @@ import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.util.SSLKsFactory;
import javax.net.ssl.SSLContext;
import java.util.function.LongFunction;
public class JMXActivity extends SimpleActivity implements Activity {
private OpSequence<ReadyJmxOp> sequence;
private OpSequence<LongFunction<JmxOp>> sequence;
private SSLContext sslContext;
public JMXActivity(ActivityDef activityDef) {
@ -35,7 +37,7 @@ public class JMXActivity extends SimpleActivity implements Activity {
return sslContext;
}
public OpSequence<ReadyJmxOp> getSequencer() {
public OpSequence<LongFunction<JmxOp>> getSequencer() {
return sequence;
}
}

View File

@ -16,8 +16,9 @@ import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class ReadyJmxOp {
public class ReadyJmxOp implements LongFunction<JmxOp> {
private final CommandTemplate command;
@ -25,7 +26,7 @@ public class ReadyJmxOp {
this.command = command;
}
public JmxOp bind(long value) {
public JmxOp apply(long value) {
Map<String, String> cmdmap = command.getCommand(value);
JMXConnector connector = bindConnector(cmdmap);
@ -97,4 +98,5 @@ public class ReadyJmxOp {
}
return url;
}
}

View File

@ -17,7 +17,7 @@ import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable;
import io.nosqlbench.engine.api.metrics.*;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.nb.api.errors.BasicError;
@ -31,6 +31,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.Supplier;
/**
@ -408,9 +409,9 @@ public class SimpleActivity implements Activity, ProgressCapable {
* @param <O>
* @return
*/
protected <O> OpSequence<O> createOpSequenceFromCommands(Function<CommandTemplate, O> opinit) {
protected <O> OpSequence<LongFunction<O>> createOpSequenceFromCommands(Function<CommandTemplate, LongFunction<O>> opinit) {
Function<OpTemplate, CommandTemplate> f = CommandTemplate::new;
Function<OpTemplate, O> opTemplateOFunction = f.andThen(opinit);
Function<OpTemplate, LongFunction<O>> opTemplateOFunction = f.andThen(opinit);
return createOpSequence(opTemplateOFunction);
}
@ -437,14 +438,14 @@ public class SimpleActivity implements Activity, ProgressCapable {
* @param <O> A holder for an executable operation for the native driver used by this activity.
* @return The sequence of operations as determined by filtering and ratios
*/
protected <O> OpSequence<O> createOpSequence(Function<OpTemplate, O> opinit) {
protected <O> OpSequence<LongFunction<O>> createOpSequence(Function<OpTemplate, LongFunction<O>> opinit) {
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
StrInterpolator interp = new StrInterpolator(activityDef);
SequencerType sequencerType = getParams()
.getOptionalString("seq")
.map(SequencerType::valueOf)
.orElse(SequencerType.bucket);
SequencePlanner<O> planner = new SequencePlanner<>(sequencerType);
SequencePlanner<LongFunction<O>> planner = new SequencePlanner<>(sequencerType);
StmtsDocList stmtsDocList = null;
@ -470,8 +471,8 @@ public class SimpleActivity implements Activity, ProgressCapable {
for (int i = 0; i < stmts.size(); i++) {
long ratio = ratios.get(i);
OpTemplate optemplate = stmts.get(i);
O driverSpecificOp = opinit.apply(optemplate);
planner.addOp(driverSpecificOp, ratio);
LongFunction<O> driverSpecificReadyOp = opinit.apply(optemplate);
planner.addOp(driverSpecificReadyOp, ratio);
}
return planner.resolve();