diff --git a/driver-dsegraph-shaded/pom.xml b/driver-dsegraph-shaded/pom.xml
new file mode 100644
index 000000000..77bdc0b0e
--- /dev/null
+++ b/driver-dsegraph-shaded/pom.xml
@@ -0,0 +1,266 @@
+
+ 4.0.0
+
+
+ io.nosqlbench
+ mvn-defaults
+ 3.12.155-SNAPSHOT
+ ../mvn-defaults
+
+
+ driver-dsegraph-shaded
+ jar
+ ${project.artifactId}
+
+
+ A DSE Graph ActivityType driver for nosqlbench, based on http://nosqlbench.io/
+
+
+
+
+
+
+
+ io.nosqlbench
+ engine-api
+ 3.12.155-SNAPSHOT
+
+
+
+ io.nosqlbench
+ drivers-api
+ 3.12.155-SNAPSHOT
+
+
+
+ com.datastax.dse
+ dse-java-driver-graph
+ 1.9.0
+
+
+
+ com.datastax.dse
+ dse-java-driver-core
+ 1.9.0
+
+
+
+ com.datastax.dse
+ dse-java-driver-extras
+ 1.9.0
+
+
+ com.datastax.dse
+ dse-java-driver-mapping
+ 1.9.0
+
+
+
+
+ org.lz4
+ lz4-java
+
+
+
+
+ org.xerial.snappy
+ snappy-java
+
+
+
+ org.antlr
+ antlr4-runtime
+ 4.8
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ io.dropwizard.metrics
+ metrics-core
+ 3.2.2
+
+
+
+
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.25
+
+
+
+
+
+ org.testng
+ testng
+ 6.13.1
+ test
+
+
+
+ org.assertj
+ assertj-core-java8
+ 1.0.0m1
+ test
+
+
+
+
+
+ org.yaml
+ snakeyaml
+ 1.23
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ org.antlr
+ antlr4-maven-plugin
+ 4.8
+
+ src/main/grammars/cql3
+
+
+ -package
+ io.nosqlbench.generators.cql.generated
+
+
+
+ src/main/java/io/nosqlbench/generators/cql/generated
+
+
+
+
+ antlr
+
+ antlr4
+
+ generate-sources
+
+
+
+
+
+ maven-shade-plugin
+ 3.2.3
+
+
+ package
+
+ shade
+
+
+
+
+ false
+ true
+ true
+
+
+
+
+ com.google.common
+ com.datastax.internal.com_google_common
+
+
+
+
+
+
+
+
+
+
+
+
+ *:*
+
+
+
+
+ io.nosqlbench.engine.cli.NBCLI
+
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphAction.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphAction.java
new file mode 100644
index 000000000..d76f59275
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphAction.java
@@ -0,0 +1,123 @@
+package com.datastax.ebdrivers.dsegraph;
+
+import com.codahale.metrics.Timer;
+import com.datastax.driver.dse.graph.GraphResultSet;
+import com.datastax.driver.dse.graph.SimpleGraphStatement;
+import com.datastax.ebdrivers.dsegraph.errorhandling.ErrorResponse;
+import com.datastax.ebdrivers.dsegraph.errorhandling.GraphErrorHandler;
+import com.datastax.ebdrivers.dsegraph.statements.BindableGraphStatement;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
+import io.nosqlbench.engine.api.activityapi.core.SyncAction;
+import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+@SuppressWarnings("Duplicates")
+public class GraphAction implements SyncAction, ActivityDefObserver {
+
+ private static final Logger logger = LoggerFactory.getLogger(GraphAction.class);
+ List readyGraphStmts;
+ private int slot;
+ private GraphActivity activity;
+ private int maxTries = 10;
+ private boolean showstmts;
+ private GraphErrorHandler graphErrorHandler;
+ private ErrorResponse retryableResponse;
+ private ErrorResponse realErrorResponse;
+ private OpSequence opSequencer;
+
+ public GraphAction(int slot, GraphActivity activity) {
+ this.slot = slot;
+ this.activity = activity;
+ }
+
+ @Override
+ public void init() {
+ onActivityDefUpdate(activity.getActivityDef());
+ }
+
+ @Override
+ public int runCycle(long cycleValue) {
+
+ int tries = 0;
+ BindableGraphStatement readyGraphStatement;
+ SimpleGraphStatement simpleGraphStatement;
+ ListenableFuture resultSetFuture;
+
+ try (Timer.Context graphOpTime = activity.logicalGraphOps.time()) {
+
+ try (Timer.Context bindTime = activity.bindTimer.time()) {
+
+ BindableGraphStatement bindableGraphStatement = opSequencer.get(cycleValue);
+ simpleGraphStatement = bindableGraphStatement.bind(cycleValue);
+
+ if (showstmts) {
+ logger.info("GRAPH QUERY(cycle=" + cycleValue + "):\n" + simpleGraphStatement.getQueryString());
+ }
+ }
+
+ while (tries < maxTries) {
+ tries++;
+
+ try (Timer.Context executeTime = activity.executeTimer.time()) {
+ resultSetFuture = activity.getSession().executeGraphAsync(simpleGraphStatement);
+ }
+
+ try (Timer.Context resultTime = activity.resultTimer.time()) {
+
+ GraphResultSet resultSet = resultSetFuture.get();
+ break; // This is normal termination of this loop, when retries aren't needed
+ } catch (Exception e) {
+ if (!graphErrorHandler.HandleError(e, simpleGraphStatement, cycleValue)) {
+ e.printStackTrace();
+ logger.error(e.toString(),e);
+ break;
+ }
+ }
+ }
+ }
+
+ activity.triesHisto.update(tries);
+ return 0;
+
+// ReadyGraphStmt = activity.get
+ }
+
+ @Override
+ public void onActivityDefUpdate(ActivityDef activityDef) {
+
+ this.maxTries = activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
+ this.showstmts = activityDef.getParams().getOptionalBoolean("showcql").orElse(false);
+
+ boolean diagnose = activityDef.getParams().getOptionalBoolean("diagnose").orElse(false);
+
+ if (diagnose) {
+ logger.warn("You are wiring all error handlers to stop for any exception." +
+ " This is useful for setup and troubleshooting, but unlikely to" +
+ " be useful for long-term or bulk testing, as retryable errors" +
+ " are normal in a busy system.");
+ this.realErrorResponse = this.retryableResponse = ErrorResponse.stop;
+ } else {
+ String realErrorsSpec = activityDef.getParams()
+ .getOptionalString("realerrors").orElse(ErrorResponse.stop.toString());
+ this.realErrorResponse = ErrorResponse.valueOf(realErrorsSpec);
+
+ String retryableSpec = activityDef.getParams()
+ .getOptionalString("retryable").orElse(ErrorResponse.retry.toString());
+
+ this.retryableResponse = ErrorResponse.valueOf(retryableSpec);
+ }
+ graphErrorHandler = new GraphErrorHandler(
+ realErrorResponse,
+ retryableResponse,
+ activity.getExceptionCountMetrics());
+
+ this.opSequencer = activity.getOpSequence();
+
+ }
+
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActionDispenser.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActionDispenser.java
new file mode 100644
index 000000000..0e461605b
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActionDispenser.java
@@ -0,0 +1,18 @@
+package com.datastax.ebdrivers.dsegraph;
+
+import io.nosqlbench.engine.api.activityapi.core.Action;
+import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
+
+public class GraphActionDispenser implements ActionDispenser {
+
+ private GraphActivity activity;
+
+ public GraphActionDispenser(GraphActivity activity) {
+ this.activity = activity;
+ }
+
+ @Override
+ public Action getAction(int slot) {
+ return new GraphAction(slot, activity);
+ }
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActivity.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActivity.java
new file mode 100644
index 000000000..f5c042272
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActivity.java
@@ -0,0 +1,291 @@
+package com.datastax.ebdrivers.dsegraph;
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.dse.DseCluster;
+import com.datastax.driver.dse.DseSession;
+import com.datastax.driver.dse.graph.GraphOptions;
+import com.datastax.driver.dse.graph.GraphProtocol;
+import com.datastax.ebdrivers.dsegraph.statements.BindableGraphStatement;
+import com.datastax.ebdrivers.dsegraph.statements.ReadyGraphStatementTemplate;
+import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
+import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
+import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
+import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
+import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
+import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
+import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.engine.api.activityimpl.ParameterMap;
+import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
+import io.nosqlbench.engine.api.metrics.ActivityMetrics;
+import io.nosqlbench.engine.api.metrics.ExceptionMeterMetrics;
+import io.nosqlbench.engine.api.scripting.NashornEvaluator;
+import io.nosqlbench.engine.api.templating.StrInterpolator;
+import io.nosqlbench.engine.api.util.TagFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+@SuppressWarnings("Duplicates")
+public class GraphActivity extends SimpleActivity implements ActivityDefObserver {
+ private final static Logger logger = LoggerFactory.getLogger(GraphActivity.class);
+
+ public Timer bindTimer;
+ public Timer executeTimer;
+ public Timer resultTimer;
+ public Timer logicalGraphOps;
+ public Histogram triesHisto;
+ protected List stmts;
+ private int stride;
+ private DseSession session;
+ private DseCluster cluster;
+ private ExceptionMeterMetrics exceptionMeterMetrics;
+ private OpSequence opsequence;
+
+ public GraphActivity(ActivityDef activityDef) {
+ super(activityDef);
+ StrInterpolator interp = new StrInterpolator(activityDef);
+ String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default");
+ }
+
+ @Override
+ public void initActivity() {
+ logger.debug("initializing activity: " + this.activityDef.getAlias());
+ exceptionMeterMetrics = new ExceptionMeterMetrics(activityDef);
+
+ stride = activityDef.getParams().getOptionalInteger("stride").orElse(1);
+ cluster = createCluster();
+ session = createSession();
+
+ bindTimer = ActivityMetrics.timer(activityDef, "bind");
+ executeTimer = ActivityMetrics.timer(activityDef, "execute");
+ resultTimer = ActivityMetrics.timer(activityDef, "result");
+ triesHisto = ActivityMetrics.histogram(activityDef, "tries");
+ logicalGraphOps = ActivityMetrics.timer(activityDef, "graphops");
+
+ this.opsequence = initSequencer();
+ setDefaultsFromOpSequence(this.opsequence);
+
+ onActivityDefUpdate(activityDef);
+ }
+
+ private OpSequence initSequencer() {
+ SequencerType sequencerType = SequencerType.valueOf(
+ getParams().getOptionalString("seq").orElse("bucket")
+ );
+ SequencePlanner planner = new SequencePlanner<>(sequencerType);
+
+ String yaml_loc = activityDef.getParams().getOptionalString("yaml","workload").orElse("default");
+ StrInterpolator interp = new StrInterpolator(activityDef);
+ StmtsDocList unfiltered = StatementsLoader.loadPath(logger, yaml_loc, interp, "activities");
+
+ // log tag filtering results
+ String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
+ TagFilter tagFilter = new TagFilter(tagfilter);
+ unfiltered.getStmts().stream().map(tagFilter::matchesTaggedResult).forEach(r -> logger.info(r.getLog()));
+
+ stmts = unfiltered.getStmts(tagfilter);
+
+ if (stmts.size() == 0) {
+ throw new RuntimeException("There were no unfiltered statements found for this activity.");
+ }
+
+ for (OpTemplate stmtDef : stmts) {
+
+ ParsedStmt parsed = stmtDef.getParsed().orError();
+
+ ReadyGraphStatementTemplate readyGraphStatement;
+ long ratio = Long.valueOf(stmtDef.getParams().getOrDefault("ratio", "1").toString());
+ Optional repeat = Optional.ofNullable(stmtDef.getParams().get("repeat"))
+ .map(String::valueOf)
+ .map(Integer::valueOf);
+ if (repeat.isPresent()) {
+ readyGraphStatement = new ReadyGraphStatementTemplate(
+ stmtDef.getName(),
+ GraphStmtParser.getCookedRepeatedStatement(stmtDef.getStmt(), repeat.get()),
+ stmtDef.getParsed().getBindPoints(),
+ GraphStmtParser.getFields(stmtDef.getStmt(), stmtDef.getBindings()).toArray(new String[0]),
+ repeat.get());
+ } else {
+ readyGraphStatement = new ReadyGraphStatementTemplate(
+ stmtDef.getName(),
+ GraphStmtParser.getCookedStatement(stmtDef.getStmt()),
+ stmtDef.getParsed().getBindPoints(),
+ GraphStmtParser.getFields(stmtDef.getStmt(), stmtDef.getBindings()).toArray(new String[0]));
+ }
+ planner.addOp(readyGraphStatement, ratio);
+ }
+
+ if (getActivityDef().getCycleCount() == 0) {
+ getActivityDef().setCycles(String.valueOf(stmts.size()));
+ }
+
+ OpSequence sequencer = planner.resolve();
+ return sequencer;
+ }
+
+ public DseSession getSession() {
+ return session;
+ }
+
+ private DseCluster createCluster() {
+
+ String host = activityDef.getParams().getOptionalString("host").orElse("localhost");
+ int port = activityDef.getParams().getOptionalInteger("port").orElse(9042);
+
+ DseCluster.Builder builder = DseCluster.builder()
+ .withPort(port)
+ .withCompression(ProtocolOptions.Compression.NONE);
+
+ DseCluster.Builder finalBuilder = builder;
+ List hosts = activityDef.getParams().getOptionalString("host", "hosts")
+ .map(s -> Arrays.asList(s.split(",")))
+ .orElse(List.of("localhost"));
+
+ for (String h : hosts) {
+ logger.debug("adding host as contact point: " + h);
+ builder.addContactPoint(h);
+ }
+
+ Optional usernameOpt = activityDef.getParams().getOptionalString("username");
+ Optional passwordOpt = activityDef.getParams().getOptionalString("password");
+ Optional passfileOpt = activityDef.getParams().getOptionalString("passfile");
+
+ if (usernameOpt.isPresent()) {
+ String username = usernameOpt.get();
+ String password;
+ if (passwordOpt.isPresent()) {
+ password = passwordOpt.get();
+ } else if (passfileOpt.isPresent()) {
+ Path path = Paths.get(passfileOpt.get());
+ try {
+ password = Files.readAllLines(path).get(0);
+ } catch (IOException e) {
+ String error = "Error while reading password from file:" + passfileOpt;
+ logger.error(error, e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ String error = "username is present, but neither password nor passfile are defined.";
+ logger.error(error);
+ throw new RuntimeException(error);
+ }
+ builder.withCredentials(username, password);
+ }
+
+ Optional clusteropts = activityDef.getParams().getOptionalString("cbopts");
+ if (clusteropts.isPresent()) {
+ try {
+ logger.info("applying cbopts:" + clusteropts.get());
+ NashornEvaluator clusterEval = new NashornEvaluator<>(DseCluster.Builder.class);
+ clusterEval.put("builder", builder);
+ String importEnv =
+ "load(\"nashorn:mozilla_compat.js\");\n" +
+ " importPackage(com.google.common.collect.Lists);\n" +
+ " importPackage(com.google.common.collect.Maps);\n" +
+ " importPackage(com.datastax.driver);\n" +
+ " importPackage(com.datastax.driver.core);\n" +
+ " importPackage(com.datastax.driver.core.policies);\n" +
+ "builder" + clusteropts.get() + "\n";
+ clusterEval.script(importEnv);
+ builder = clusterEval.eval();
+ logger.info("successfully applied:" + clusteropts.get());
+ } catch (Exception e) {
+ logger.error("Unable to evaluate: " + clusteropts.get() + " in script context:" + e.getMessage());
+ throw e;
+ }
+ }
+
+ try {
+ cluster = builder.build();
+ } catch (Exception e) {
+ logger.error("Error while instantiating cluster from builder: " + e.toString(), e);
+ throw e;
+ }
+ activityDef.getParams().getOptionalBoolean("defaultidempotence").map(
+ b -> cluster.getConfiguration().getQueryOptions().setDefaultIdempotence(b)
+ );
+
+ String graphson_version = activityDef.getParams().getOptionalString("graphson").orElse("2");
+ switch (Integer.valueOf(graphson_version)) {
+ case 1:
+ cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_1_0);
+ break;
+ case 2:
+ cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
+ break;
+ }
+ cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
+ return cluster;
+ }
+
+ private DseSession createSession() {
+
+ try {
+ DseSession session = cluster.newSession();
+ logger.info("cluster-metadata-allhosts:\n" + session.getCluster().getMetadata().getAllHosts());
+ return session;
+ } catch (Exception e) {
+ logger.error("Error while creating a session for dsegraph: " + e.toString(), e);
+ throw e;
+ }
+
+ }
+
+ @Override
+ public void onActivityDefUpdate(ActivityDef activityDef) {
+ super.onActivityDefUpdate(activityDef);
+
+ ParameterMap params = activityDef.getParams();
+ GraphOptions options = cluster.getConfiguration().getGraphOptions();
+
+ params.getOptionalString("graphlanguage").ifPresent(options::setGraphLanguage);
+ params.getOptionalString("graphname").ifPresent(options::setGraphName);
+ params.getOptionalString("graphsource").ifPresent(options::setGraphSource);
+
+ params.getOptionalString("graph_read_cl").ifPresent(
+ s -> options.setGraphReadConsistencyLevel(ConsistencyLevel.valueOf(s))
+ );
+
+ params.getOptionalString("graph_write_cl").ifPresent(
+ s -> options.setGraphWriteConsistencyLevel(ConsistencyLevel.valueOf(s))
+ );
+
+ params.getOptionalLong("graph_write_cl").ifPresent(
+ i -> options.setReadTimeoutMillis(i.intValue())
+ );
+
+ }
+
+ public ExceptionMeterMetrics getExceptionCountMetrics() {
+ return exceptionMeterMetrics;
+ }
+
+ /**
+ * Return the stride as configured in the activity parameters. This only
+ * available activity init()
+ *
+ * @return long stride
+ */
+ public long getStride() {
+ return stride;
+ }
+
+ public OpSequence getOpSequence() {
+ return this.opsequence.transform(ReadyGraphStatementTemplate::resolve);
+ }
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActivityType.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActivityType.java
new file mode 100644
index 000000000..848e2754d
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphActivityType.java
@@ -0,0 +1,25 @@
+package com.datastax.ebdrivers.dsegraph;
+
+import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
+import io.nosqlbench.engine.api.activityapi.core.ActivityType;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.nb.annotations.Service;
+
+@Service(ActivityType.class)
+public class GraphActivityType implements ActivityType {
+
+ @Override
+ public String getName() {
+ return "dsegraph";
+ }
+
+ @Override
+ public GraphActivity getActivity(ActivityDef activityDef) {
+ return new GraphActivity(activityDef);
+ }
+
+ @Override
+ public ActionDispenser getActionDispenser(GraphActivity activity) {
+ return new GraphActionDispenser(activity);
+ }
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphStmtParser.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphStmtParser.java
new file mode 100644
index 000000000..f3a0653d5
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/GraphStmtParser.java
@@ -0,0 +1,59 @@
+package com.datastax.ebdrivers.dsegraph;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+public class GraphStmtParser {
+
+ private final static Pattern stmtToken = Pattern.compile("\\?(\\w+[-_\\d\\w]*)|\\{(\\w+[-_\\d\\w.]*)}");
+
+ public static List getFields(String statement, Map bindings) {
+
+ ArrayList fields = new ArrayList<>();
+ Matcher m = stmtToken.matcher(statement);
+ while (m.find()) {
+ String namedAnchor = m.group(1);
+ if (namedAnchor==null) {
+ namedAnchor=m.group(2);
+ if (namedAnchor==null) {
+ throw new RuntimeException("Pattern '" + stmtToken.pattern() + "' failed to match '" + statement +"'");
+ }
+ }
+ if (!bindings.containsKey(namedAnchor)) {
+ throw new RuntimeException("Named anchor " + namedAnchor + " not found in bindings!");
+ }
+ fields.add(namedAnchor);
+ }
+ return fields;
+ }
+
+ public static String getCookedRepeatedStatement(String statement, int repeat) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < repeat; i++) {
+ String varSuffix = String.valueOf(i);
+ String indexedStmt = getCookedSuffixedStatement(statement, varSuffix);
+ sb.append(indexedStmt);
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ public static String getCookedStatement(String statement) {
+ String replaced = stmtToken.matcher(statement).replaceAll("$1$2");
+ return replaced;
+ }
+
+ public static List getCookedStatements(List statements) {
+ return statements.stream().map(GraphStmtParser::getCookedStatement).collect(Collectors.toList());
+ }
+
+ public static String getCookedSuffixedStatement(String statement, String suffix) {
+ String replaced = stmtToken.matcher(statement).replaceAll("$1" + suffix);
+ return replaced;
+ }
+
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/ErrorResponse.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/ErrorResponse.java
new file mode 100644
index 000000000..502724c7e
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/ErrorResponse.java
@@ -0,0 +1,15 @@
+package com.datastax.ebdrivers.dsegraph.errorhandling;
+
+/**
+ * When an error filter allows us to see and handle an error in a specific way,
+ * the ErrorResponse determine exactly how we handle it. Each level represents
+ * a starting point in handling, including everything after the starting point.
+ * The first enum is the most severe response.
+ */
+public enum ErrorResponse {
+ stop, // Rethrow this error to the runtime, forcing it to handle the error or stop
+ warn, // log a warning with some details about this error
+ retry, // resubmit this operation up to the available tries
+ count, // count this metric separatelycount, // count this metric separately
+ ignore // do nothing
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/GraphErrorHandler.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/GraphErrorHandler.java
new file mode 100644
index 000000000..5549b302a
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/GraphErrorHandler.java
@@ -0,0 +1,112 @@
+package com.datastax.ebdrivers.dsegraph.errorhandling;
+
+import com.datastax.driver.core.exceptions.*;
+import com.datastax.driver.dse.graph.GraphStatement;
+import io.nosqlbench.engine.api.metrics.ExceptionMeterMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+@SuppressWarnings("Duplicates")
+public class GraphErrorHandler {
+ private final static Logger logger = LoggerFactory.getLogger(GraphErrorHandler.class);
+
+ private final ErrorResponse realErrorResponse;
+ // private final ErrorResponse unappliedResponse;
+ private final ErrorResponse retryableResponse;
+ private ExceptionMeterMetrics exceptionMeterMetrics;
+
+ public GraphErrorHandler(
+ ErrorResponse realErrorResponse,
+ // ErrorResponse unappliedResponse,
+ ErrorResponse retryableResponse,
+ ExceptionMeterMetrics exceptionMeterMetrics) {
+ this.realErrorResponse = realErrorResponse;
+ // this.unappliedResponse = unappliedResponse;
+ this.retryableResponse = retryableResponse;
+ this.exceptionMeterMetrics = exceptionMeterMetrics;
+ }
+
+ /**
+ * @param e Exception to be handled
+ * @param statement statement that yielded the exception
+ * @param cycle the input cycle that made the statement
+ * @return true, if the error handler determines that a retry is needed
+ */
+ public boolean HandleError(Exception e, GraphStatement statement, long cycle) {
+ boolean retry = false;
+
+ try {
+ if (e != null) {
+ throw e;
+ }
+ } catch ( ExecutionException |
+ InvalidQueryException | ReadFailureException | WriteFailureException
+ | SyntaxError realerror) {
+
+ if (e instanceof SyntaxError) {
+ logger.error("Syntax error:" + GraphQueryStringMapper.getQueryString(statement));
+ }
+
+ switch (realErrorResponse) {
+ case stop:
+ logger.error("error with cycle " + cycle + ": " + e.getMessage());
+ e.printStackTrace();
+ throw new RuntimeException(realerror);
+ case warn:
+ logger.warn("error with cycle " + cycle + ": " + e.getMessage());
+ case retry:
+ retry = true;
+ case count:
+ exceptionMeterMetrics.count(realerror);
+ case ignore:
+ default:
+ break;
+ }
+
+ } catch (NoHostAvailableException | UnavailableException | OperationTimedOutException | OverloadedException
+ | WriteTimeoutException | ReadTimeoutException retryable) {
+ // retryable errors
+ switch (retryableResponse) {
+ case stop:
+ logger.error("error with cycle " + cycle + ": " + e.getMessage());
+ e.printStackTrace();
+ throw retryable;
+ case warn:
+ logger.warn("error with cycle " + cycle + ": " + e.getMessage());
+ case retry:
+ retry = true;
+ case count:
+ exceptionMeterMetrics.count(retryable);
+ case ignore:
+ default:
+ break;
+ }
+ }
+// catch (ChangeUnappliedException cua) {
+// boolean retry = false;
+// switch (retryableResponse) {
+// case stop:
+// throw cua;
+// case warn:
+// logger.warn("error with cycle " + cycle + ": " + e.getMessage());
+// case retry:
+// retry = true;
+// case count:
+// exceptionCountMetrics.count(cua);
+// case ignore:
+// default:
+// break;
+// }
+// return retry;
+// }
+ catch (Exception unknown) {
+ throw new RuntimeException(
+ "Unrecognized exception in error handler:"
+ + unknown.getClass().getCanonicalName() + ": " + unknown.getMessage(), unknown
+ );
+ }
+ return retry;
+ }
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/GraphQueryStringMapper.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/GraphQueryStringMapper.java
new file mode 100644
index 000000000..afb9e05a8
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/errorhandling/GraphQueryStringMapper.java
@@ -0,0 +1,18 @@
+package com.datastax.ebdrivers.dsegraph.errorhandling;
+
+import com.datastax.driver.dse.graph.GraphStatement;
+import com.datastax.driver.dse.graph.RegularGraphStatement;
+import com.datastax.driver.dse.graph.SimpleGraphStatement;
+
+public class GraphQueryStringMapper {
+ public static String getQueryString(GraphStatement statement) {
+ String queryString;
+ if (statement instanceof RegularGraphStatement) {
+ queryString = ((SimpleGraphStatement) statement).getQueryString();
+ } else {
+ queryString = "(ERROR) Unknown statement type: " + statement.getClass().getCanonicalName();
+ }
+ return queryString;
+ }
+
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/BindableGraphStatement.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/BindableGraphStatement.java
new file mode 100644
index 000000000..4759f7aea
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/BindableGraphStatement.java
@@ -0,0 +1,7 @@
+package com.datastax.ebdrivers.dsegraph.statements;
+
+import com.datastax.driver.dse.graph.SimpleGraphStatement;
+
+public interface BindableGraphStatement {
+ SimpleGraphStatement bind(long value);
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/BindableGraphStatementsTemplate.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/BindableGraphStatementsTemplate.java
new file mode 100644
index 000000000..2d96a7a1d
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/BindableGraphStatementsTemplate.java
@@ -0,0 +1,22 @@
+package com.datastax.ebdrivers.dsegraph.statements;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class BindableGraphStatementsTemplate {
+ private List templateList = new ArrayList<>();
+
+ public void addTemplate(ReadyGraphStatementTemplate template) {
+ this.templateList.add(template);
+ }
+
+ public List resolve() {
+ return templateList.stream().map(ReadyGraphStatementTemplate::resolve)
+ .collect(Collectors.toList());
+ }
+
+ public int size() {
+ return templateList.size();
+ }
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/ReadyGraphStatement.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/ReadyGraphStatement.java
new file mode 100644
index 000000000..81ca54e08
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/ReadyGraphStatement.java
@@ -0,0 +1,18 @@
+package com.datastax.ebdrivers.dsegraph.statements;
+
+import com.datastax.driver.dse.graph.SimpleGraphStatement;
+import io.nosqlbench.virtdata.core.bindings.ContextualBindings;
+
+public class ReadyGraphStatement implements BindableGraphStatement {
+
+ private ContextualBindings contextualBindings;
+
+ public ReadyGraphStatement(ContextualBindings contextualBindings) {
+ this.contextualBindings = contextualBindings;
+ }
+
+ @Override
+ public SimpleGraphStatement bind(long value) {
+ return contextualBindings.bind(value);
+ }
+}
diff --git a/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/ReadyGraphStatementTemplate.java b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/ReadyGraphStatementTemplate.java
new file mode 100644
index 000000000..440af68f6
--- /dev/null
+++ b/driver-dsegraph-shaded/src/main/java/com/datastax/ebdrivers/dsegraph/statements/ReadyGraphStatementTemplate.java
@@ -0,0 +1,78 @@
+package com.datastax.ebdrivers.dsegraph.statements;
+
+import com.datastax.driver.dse.graph.SimpleGraphStatement;
+import io.nosqlbench.virtdata.core.bindings.Bindings;
+import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
+import io.nosqlbench.virtdata.core.bindings.ContextualBindingsTemplate;
+import io.nosqlbench.virtdata.core.bindings.ValuesBinder;
+import io.nosqlbench.virtdata.core.templates.BindPoint;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ReadyGraphStatementTemplate {
+
+ private ContextualBindingsTemplate contextualBindingsTemplate;
+ private String name;
+ private String[] fields;
+
+ public ReadyGraphStatementTemplate(String name, String stmtTemplate, List bindPoints, String[] fields) {
+ this.name = name;
+ SimpleGraphStatement simpleGraphStatement = new SimpleGraphStatement(stmtTemplate);
+ BindingsTemplate bindingsTemplate = new BindingsTemplate(bindPoints);
+ contextualBindingsTemplate = new ContextualBindingsTemplate<>(
+ simpleGraphStatement, bindingsTemplate ,
+ new ParameterizedGraphStatementValuesBinder(fields)
+ );
+ }
+
+ public ReadyGraphStatementTemplate(String name, String stmtTemplate, List bindPoints, String[] fields, int repeat) {
+ this.name = name;
+ SimpleGraphStatement simpleGraphStatement = new SimpleGraphStatement(stmtTemplate);
+ BindingsTemplate bindingsTemplate = new BindingsTemplate(bindPoints);
+
+ contextualBindingsTemplate = new ContextualBindingsTemplate<>(simpleGraphStatement, bindingsTemplate , new ParameterizedIteratedGraphStatementValuesBinder(fields, repeat));
+ }
+
+ public static class ParameterizedIteratedGraphStatementValuesBinder implements ValuesBinder {
+
+ private final String[] fields;
+ private int repeat;
+
+ public ParameterizedIteratedGraphStatementValuesBinder(String[] fields, int repeat) {
+ this.fields = fields;
+ this.repeat = repeat;
+ }
+
+ @Override
+ public SimpleGraphStatement bindValues(SimpleGraphStatement context, Bindings bindings, long cycle) {
+ Map iteratedSuffixMap = bindings.getIteratedSuffixMap(cycle, repeat, fields);
+ return new SimpleGraphStatement(context.getQueryString(), iteratedSuffixMap);
+ }
+ }
+
+ public static class ParameterizedGraphStatementValuesBinder
+ implements ValuesBinder {
+ private final String[] fields;
+ private final Map valuesMap = new HashMap();
+ private final ThreadLocal