Merge pull request #227 from ds-steven-matison/main

Initial Migration of DSE Graph
This commit is contained in:
Jonathan Shook 2020-11-11 10:42:40 -06:00 committed by GitHub
commit a24a487135
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1394 additions and 0 deletions

View File

@ -0,0 +1,266 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>3.12.155-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-dsegraph-shaded</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A DSE Graph ActivityType driver for nosqlbench, based on http://nosqlbench.io/
</description>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>3.12.155-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>3.12.155-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-graph</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-core</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-extras</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-java-driver-mapping</artifactId>
<version>1.9.0</version>
</dependency>
<!-- For CQL compression option -->
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
</dependency>
<!-- For CQL compression option -->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.8</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.netty</groupId>-->
<!-- <artifactId>netty-transport-native-epoll</artifactId>-->
<!-- <version>4.1.47.Final</version>-->
<!-- <classifier>linux-x86_64</classifier>-->
<!-- </dependency>-->
<!-- test only scope -->
<!-- This is added as shaded to satisfy old jmx reporting dependencies-->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.2.2</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.commons</groupId>-->
<!-- <artifactId>commons-lang3</artifactId>-->
<!-- <version>3.7</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- test only scope -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core-java8</artifactId>
<version>1.0.0m1</version>
<scope>test</scope>
</dependency>
<!-- compile only scope -->
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.23</version>
</dependency>
</dependencies>
<build>
<plugins>
<!--
If this plugin is re-enabled, the local CQL grammar will
be overwritten. The grammar has some syntax issues, so
fixes will be made to it before it is submitted back.
(lack of composite key syntax, nested type syntax, etc)
-->
<!-- <plugin>-->
<!-- <groupId>com.googlecode.maven-download-plugin</groupId>-->
<!-- <artifactId>download-maven-plugin</artifactId>-->
<!-- <version>1.4.0</version>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>get-cql-lexer</id>-->
<!-- <phase>generate-sources</phase>-->
<!-- <goals>-->
<!-- <goal>wget</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <url>-->
<!-- https://raw.githubusercontent.com/antlr/grammars-v4/master/cql3/CqlLexer.g4-->
<!-- </url>-->
<!-- <outputFileName>CqlLexer.g4</outputFileName>-->
<!-- <outputDirectory>src/main/grammars/cql3/-->
<!-- </outputDirectory>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- <execution>-->
<!-- <id>get-cql-parser</id>-->
<!-- <phase>generate-sources</phase>-->
<!-- <goals>-->
<!-- <goal>wget</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <url>-->
<!-- https://raw.githubusercontent.com/antlr/grammars-v4/master/cql3/CqlParser.g4-->
<!-- </url>-->
<!-- <outputFileName>CqlParser.g4</outputFileName>-->
<!-- <outputDirectory>src/main/grammars/cql3/-->
<!-- </outputDirectory>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>4.8</version>
<configuration>
<sourceDirectory>src/main/grammars/cql3
</sourceDirectory>
<arguments>
<argument>-package</argument>
<argument>io.nosqlbench.generators.cql.generated
</argument>
</arguments>
<outputDirectory>
src/main/java/io/nosqlbench/generators/cql/generated
</outputDirectory>
</configuration>
<executions>
<execution>
<id>antlr</id>
<goals>
<goal>antlr4</goal>
</goals>
<phase>generate-sources</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
<createSourcesJar>true</createSourcesJar>
<!-- <shadedArtifactAttached>true</shadedArtifactAttached>-->
<!-- <shadedClassifierName>shaded</shadedClassifierName>-->
<relocations>
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>com.datastax.internal.com_google_common</shadedPattern>
</relocation>
<!-- <relocation>-->
<!-- <pattern>com.datastax</pattern>-->
<!-- <shadedPattern>dse19.com.datastax</shadedPattern>-->
<!-- </relocation>-->
<!-- <relocation>-->
<!-- <pattern>io.netty</pattern>-->
<!-- <shadedPattern>dse19.io.netty</shadedPattern>-->
<!-- </relocation>-->
</relocations>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<transformers combine.children="append">
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>io.nosqlbench.engine.cli.NBCLI</mainClass>
</transformer>
</transformers>
<!-- <finalName>${project.artifactId}</finalName>-->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,123 @@
package com.datastax.ebdrivers.dsegraph;
import com.codahale.metrics.Timer;
import com.datastax.driver.dse.graph.GraphResultSet;
import com.datastax.driver.dse.graph.SimpleGraphStatement;
import com.datastax.ebdrivers.dsegraph.errorhandling.ErrorResponse;
import com.datastax.ebdrivers.dsegraph.errorhandling.GraphErrorHandler;
import com.datastax.ebdrivers.dsegraph.statements.BindableGraphStatement;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
@SuppressWarnings("Duplicates")
public class GraphAction implements SyncAction, ActivityDefObserver {
private static final Logger logger = LoggerFactory.getLogger(GraphAction.class);
List<BindableGraphStatement> readyGraphStmts;
private int slot;
private GraphActivity activity;
private int maxTries = 10;
private boolean showstmts;
private GraphErrorHandler graphErrorHandler;
private ErrorResponse retryableResponse;
private ErrorResponse realErrorResponse;
private OpSequence<BindableGraphStatement> opSequencer;
public GraphAction(int slot, GraphActivity activity) {
this.slot = slot;
this.activity = activity;
}
@Override
public void init() {
onActivityDefUpdate(activity.getActivityDef());
}
@Override
public int runCycle(long cycleValue) {
int tries = 0;
BindableGraphStatement readyGraphStatement;
SimpleGraphStatement simpleGraphStatement;
ListenableFuture<GraphResultSet> resultSetFuture;
try (Timer.Context graphOpTime = activity.logicalGraphOps.time()) {
try (Timer.Context bindTime = activity.bindTimer.time()) {
BindableGraphStatement bindableGraphStatement = opSequencer.get(cycleValue);
simpleGraphStatement = bindableGraphStatement.bind(cycleValue);
if (showstmts) {
logger.info("GRAPH QUERY(cycle=" + cycleValue + "):\n" + simpleGraphStatement.getQueryString());
}
}
while (tries < maxTries) {
tries++;
try (Timer.Context executeTime = activity.executeTimer.time()) {
resultSetFuture = activity.getSession().executeGraphAsync(simpleGraphStatement);
}
try (Timer.Context resultTime = activity.resultTimer.time()) {
GraphResultSet resultSet = resultSetFuture.get();
break; // This is normal termination of this loop, when retries aren't needed
} catch (Exception e) {
if (!graphErrorHandler.HandleError(e, simpleGraphStatement, cycleValue)) {
e.printStackTrace();
logger.error(e.toString(),e);
break;
}
}
}
}
activity.triesHisto.update(tries);
return 0;
// ReadyGraphStmt = activity.get
}
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
this.maxTries = activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
this.showstmts = activityDef.getParams().getOptionalBoolean("showcql").orElse(false);
boolean diagnose = activityDef.getParams().getOptionalBoolean("diagnose").orElse(false);
if (diagnose) {
logger.warn("You are wiring all error handlers to stop for any exception." +
" This is useful for setup and troubleshooting, but unlikely to" +
" be useful for long-term or bulk testing, as retryable errors" +
" are normal in a busy system.");
this.realErrorResponse = this.retryableResponse = ErrorResponse.stop;
} else {
String realErrorsSpec = activityDef.getParams()
.getOptionalString("realerrors").orElse(ErrorResponse.stop.toString());
this.realErrorResponse = ErrorResponse.valueOf(realErrorsSpec);
String retryableSpec = activityDef.getParams()
.getOptionalString("retryable").orElse(ErrorResponse.retry.toString());
this.retryableResponse = ErrorResponse.valueOf(retryableSpec);
}
graphErrorHandler = new GraphErrorHandler(
realErrorResponse,
retryableResponse,
activity.getExceptionCountMetrics());
this.opSequencer = activity.getOpSequence();
}
}

View File

@ -0,0 +1,18 @@
package com.datastax.ebdrivers.dsegraph;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
/**
 * Per-activity factory which hands out one {@link GraphAction} per thread slot.
 */
public class GraphActionDispenser implements ActionDispenser {

    private final GraphActivity graphActivity;

    public GraphActionDispenser(GraphActivity activity) {
        this.graphActivity = activity;
    }

    @Override
    public Action getAction(int slot) {
        return new GraphAction(slot, graphActivity);
    }
}

View File

@ -0,0 +1,291 @@
package com.datastax.ebdrivers.dsegraph;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.dse.DseCluster;
import com.datastax.driver.dse.DseSession;
import com.datastax.driver.dse.graph.GraphOptions;
import com.datastax.driver.dse.graph.GraphProtocol;
import com.datastax.ebdrivers.dsegraph.statements.BindableGraphStatement;
import com.datastax.ebdrivers.dsegraph.statements.ReadyGraphStatementTemplate;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.metrics.ExceptionMeterMetrics;
import io.nosqlbench.engine.api.scripting.NashornEvaluator;
import io.nosqlbench.engine.api.templating.StrInterpolator;
import io.nosqlbench.engine.api.util.TagFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
@SuppressWarnings("Duplicates")
public class GraphActivity extends SimpleActivity implements ActivityDefObserver {
private final static Logger logger = LoggerFactory.getLogger(GraphActivity.class);
public Timer bindTimer;
public Timer executeTimer;
public Timer resultTimer;
public Timer logicalGraphOps;
public Histogram triesHisto;
protected List<OpTemplate> stmts;
private int stride;
private DseSession session;
private DseCluster cluster;
private ExceptionMeterMetrics exceptionMeterMetrics;
private OpSequence<ReadyGraphStatementTemplate> opsequence;
public GraphActivity(ActivityDef activityDef) {
super(activityDef);
StrInterpolator interp = new StrInterpolator(activityDef);
String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default");
}
@Override
public void initActivity() {
logger.debug("initializing activity: " + this.activityDef.getAlias());
exceptionMeterMetrics = new ExceptionMeterMetrics(activityDef);
stride = activityDef.getParams().getOptionalInteger("stride").orElse(1);
cluster = createCluster();
session = createSession();
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
resultTimer = ActivityMetrics.timer(activityDef, "result");
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
logicalGraphOps = ActivityMetrics.timer(activityDef, "graphops");
this.opsequence = initSequencer();
setDefaultsFromOpSequence(this.opsequence);
onActivityDefUpdate(activityDef);
}
private OpSequence<ReadyGraphStatementTemplate> initSequencer() {
SequencerType sequencerType = SequencerType.valueOf(
getParams().getOptionalString("seq").orElse("bucket")
);
SequencePlanner<ReadyGraphStatementTemplate> planner = new SequencePlanner<>(sequencerType);
String yaml_loc = activityDef.getParams().getOptionalString("yaml","workload").orElse("default");
StrInterpolator interp = new StrInterpolator(activityDef);
StmtsDocList unfiltered = StatementsLoader.loadPath(logger, yaml_loc, interp, "activities");
// log tag filtering results
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
TagFilter tagFilter = new TagFilter(tagfilter);
unfiltered.getStmts().stream().map(tagFilter::matchesTaggedResult).forEach(r -> logger.info(r.getLog()));
stmts = unfiltered.getStmts(tagfilter);
if (stmts.size() == 0) {
throw new RuntimeException("There were no unfiltered statements found for this activity.");
}
for (OpTemplate stmtDef : stmts) {
ParsedStmt parsed = stmtDef.getParsed().orError();
ReadyGraphStatementTemplate readyGraphStatement;
long ratio = Long.valueOf(stmtDef.getParams().getOrDefault("ratio", "1").toString());
Optional<Integer> repeat = Optional.ofNullable(stmtDef.getParams().get("repeat"))
.map(String::valueOf)
.map(Integer::valueOf);
if (repeat.isPresent()) {
readyGraphStatement = new ReadyGraphStatementTemplate(
stmtDef.getName(),
GraphStmtParser.getCookedRepeatedStatement(stmtDef.getStmt(), repeat.get()),
stmtDef.getParsed().getBindPoints(),
GraphStmtParser.getFields(stmtDef.getStmt(), stmtDef.getBindings()).toArray(new String[0]),
repeat.get());
} else {
readyGraphStatement = new ReadyGraphStatementTemplate(
stmtDef.getName(),
GraphStmtParser.getCookedStatement(stmtDef.getStmt()),
stmtDef.getParsed().getBindPoints(),
GraphStmtParser.getFields(stmtDef.getStmt(), stmtDef.getBindings()).toArray(new String[0]));
}
planner.addOp(readyGraphStatement, ratio);
}
if (getActivityDef().getCycleCount() == 0) {
getActivityDef().setCycles(String.valueOf(stmts.size()));
}
OpSequence<ReadyGraphStatementTemplate> sequencer = planner.resolve();
return sequencer;
}
public DseSession getSession() {
return session;
}
private DseCluster createCluster() {
String host = activityDef.getParams().getOptionalString("host").orElse("localhost");
int port = activityDef.getParams().getOptionalInteger("port").orElse(9042);
DseCluster.Builder builder = DseCluster.builder()
.withPort(port)
.withCompression(ProtocolOptions.Compression.NONE);
DseCluster.Builder finalBuilder = builder;
List<String> hosts = activityDef.getParams().getOptionalString("host", "hosts")
.map(s -> Arrays.asList(s.split(",")))
.orElse(List.of("localhost"));
for (String h : hosts) {
logger.debug("adding host as contact point: " + h);
builder.addContactPoint(h);
}
Optional<String> usernameOpt = activityDef.getParams().getOptionalString("username");
Optional<String> passwordOpt = activityDef.getParams().getOptionalString("password");
Optional<String> passfileOpt = activityDef.getParams().getOptionalString("passfile");
if (usernameOpt.isPresent()) {
String username = usernameOpt.get();
String password;
if (passwordOpt.isPresent()) {
password = passwordOpt.get();
} else if (passfileOpt.isPresent()) {
Path path = Paths.get(passfileOpt.get());
try {
password = Files.readAllLines(path).get(0);
} catch (IOException e) {
String error = "Error while reading password from file:" + passfileOpt;
logger.error(error, e);
throw new RuntimeException(e);
}
} else {
String error = "username is present, but neither password nor passfile are defined.";
logger.error(error);
throw new RuntimeException(error);
}
builder.withCredentials(username, password);
}
Optional<String> clusteropts = activityDef.getParams().getOptionalString("cbopts");
if (clusteropts.isPresent()) {
try {
logger.info("applying cbopts:" + clusteropts.get());
NashornEvaluator<DseCluster.Builder> clusterEval = new NashornEvaluator<>(DseCluster.Builder.class);
clusterEval.put("builder", builder);
String importEnv =
"load(\"nashorn:mozilla_compat.js\");\n" +
" importPackage(com.google.common.collect.Lists);\n" +
" importPackage(com.google.common.collect.Maps);\n" +
" importPackage(com.datastax.driver);\n" +
" importPackage(com.datastax.driver.core);\n" +
" importPackage(com.datastax.driver.core.policies);\n" +
"builder" + clusteropts.get() + "\n";
clusterEval.script(importEnv);
builder = clusterEval.eval();
logger.info("successfully applied:" + clusteropts.get());
} catch (Exception e) {
logger.error("Unable to evaluate: " + clusteropts.get() + " in script context:" + e.getMessage());
throw e;
}
}
try {
cluster = builder.build();
} catch (Exception e) {
logger.error("Error while instantiating cluster from builder: " + e.toString(), e);
throw e;
}
activityDef.getParams().getOptionalBoolean("defaultidempotence").map(
b -> cluster.getConfiguration().getQueryOptions().setDefaultIdempotence(b)
);
String graphson_version = activityDef.getParams().getOptionalString("graphson").orElse("2");
switch (Integer.valueOf(graphson_version)) {
case 1:
cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_1_0);
break;
case 2:
cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
break;
}
cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
return cluster;
}
private DseSession createSession() {
try {
DseSession session = cluster.newSession();
logger.info("cluster-metadata-allhosts:\n" + session.getCluster().getMetadata().getAllHosts());
return session;
} catch (Exception e) {
logger.error("Error while creating a session for dsegraph: " + e.toString(), e);
throw e;
}
}
@Override
public void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
ParameterMap params = activityDef.getParams();
GraphOptions options = cluster.getConfiguration().getGraphOptions();
params.getOptionalString("graphlanguage").ifPresent(options::setGraphLanguage);
params.getOptionalString("graphname").ifPresent(options::setGraphName);
params.getOptionalString("graphsource").ifPresent(options::setGraphSource);
params.getOptionalString("graph_read_cl").ifPresent(
s -> options.setGraphReadConsistencyLevel(ConsistencyLevel.valueOf(s))
);
params.getOptionalString("graph_write_cl").ifPresent(
s -> options.setGraphWriteConsistencyLevel(ConsistencyLevel.valueOf(s))
);
params.getOptionalLong("graph_write_cl").ifPresent(
i -> options.setReadTimeoutMillis(i.intValue())
);
}
public ExceptionMeterMetrics getExceptionCountMetrics() {
return exceptionMeterMetrics;
}
/**
* Return the stride as configured in the activity parameters. This only
* available activity init()
*
* @return long stride
*/
public long getStride() {
return stride;
}
public OpSequence<BindableGraphStatement> getOpSequence() {
return this.opsequence.transform(ReadyGraphStatementTemplate::resolve);
}
}

View File

@ -0,0 +1,25 @@
package com.datastax.ebdrivers.dsegraph;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(ActivityType.class)
public class GraphActivityType implements ActivityType<GraphActivity> {

    /** Driver name used to select this activity type, e.g. driver=dsegraph. */
    @Override
    public String getName() {
        return "dsegraph";
    }

    /** Create a new activity instance for the given activity definition. */
    @Override
    public GraphActivity getActivity(ActivityDef activityDef) {
        return new GraphActivity(activityDef);
    }

    /** Create the per-activity dispenser that supplies actions to motor slots. */
    @Override
    public ActionDispenser getActionDispenser(GraphActivity activity) {
        return new GraphActionDispenser(activity);
    }
}

View File

@ -0,0 +1,59 @@
package com.datastax.ebdrivers.dsegraph;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
 * Utilities for rewriting graph statement templates. Templates reference bind
 * points either as '?name' or '{name}'; "cooking" a statement strips the anchor
 * syntax, leaving the bare name (optionally with a numeric suffix for repeated
 * statements).
 */
public class GraphStmtParser {

    // Group 1 captures '?name' anchors, group 2 captures '{name}' anchors.
    private final static Pattern stmtToken = Pattern.compile("\\?(\\w+[-_\\d\\w]*)|\\{(\\w+[-_\\d\\w.]*)}");

    /**
     * Collect the anchor names referenced by a statement, in order of appearance.
     *
     * @param statement the raw statement template
     * @param bindings  the declared bindings; every anchor must have an entry
     * @return the list of anchor names found
     * @throws RuntimeException if an anchor has no corresponding binding
     */
    public static List<String> getFields(String statement, Map<String, String> bindings) {
        ArrayList<String> fields = new ArrayList<>();
        Matcher m = stmtToken.matcher(statement);
        while (m.find()) {
            String namedAnchor = m.group(1);
            if (namedAnchor == null) {
                namedAnchor = m.group(2);
                if (namedAnchor == null) {
                    throw new RuntimeException("Pattern '" + stmtToken.pattern() + "' failed to match '" + statement +"'");
                }
            }
            if (!bindings.containsKey(namedAnchor)) {
                throw new RuntimeException("Named anchor " + namedAnchor + " not found in bindings!");
            }
            fields.add(namedAnchor);
        }
        return fields;
    }

    /**
     * Cook the statement {@code repeat} times, suffixing each anchor with the
     * iteration index (0-based), one statement per line (trailing newline kept).
     */
    public static String getCookedRepeatedStatement(String statement, int repeat) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < repeat; i++) {
            String varSuffix = String.valueOf(i);
            sb.append(getCookedSuffixedStatement(statement, varSuffix));
            sb.append("\n");
        }
        return sb.toString();
    }

    /** Strip anchor syntax: '?name' and '{name}' both become 'name'. */
    public static String getCookedStatement(String statement) {
        return stmtToken.matcher(statement).replaceAll("$1$2");
    }

    public static List<String> getCookedStatements(List<String> statements) {
        return statements.stream().map(GraphStmtParser::getCookedStatement).collect(Collectors.toList());
    }

    /** Strip anchor syntax and append {@code suffix} to each anchor name. */
    public static String getCookedSuffixedStatement(String statement, String suffix) {
        // BUGFIX: was "$1" + suffix, which dropped the anchor name for '{name}'-form
        // anchors (group 1 is empty there). "$1$2" keeps whichever group matched,
        // consistent with getCookedStatement(); the unmatched group substitutes as "".
        return stmtToken.matcher(statement).replaceAll("$1$2" + suffix);
    }
}

View File

@ -0,0 +1,15 @@
package com.datastax.ebdrivers.dsegraph.errorhandling;
/**
 * When an error filter allows us to see and handle an error in a specific way,
 * the ErrorResponse determines exactly how we handle it. Each level represents
 * a starting point in handling, including everything after the starting point.
 * The first enum is the most severe response.
 */
public enum ErrorResponse {
    stop,   // Rethrow this error to the runtime, forcing it to handle the error or stop
    warn,   // log a warning with some details about this error
    retry,  // resubmit this operation up to the available tries
    count,  // count this metric separately
    ignore  // do nothing
}

View File

@ -0,0 +1,112 @@
package com.datastax.ebdrivers.dsegraph.errorhandling;
import com.datastax.driver.core.exceptions.*;
import com.datastax.driver.dse.graph.GraphStatement;
import io.nosqlbench.engine.api.metrics.ExceptionMeterMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
@SuppressWarnings("Duplicates")
public class GraphErrorHandler {
private final static Logger logger = LoggerFactory.getLogger(GraphErrorHandler.class);
private final ErrorResponse realErrorResponse;
// private final ErrorResponse unappliedResponse;
private final ErrorResponse retryableResponse;
private ExceptionMeterMetrics exceptionMeterMetrics;
public GraphErrorHandler(
ErrorResponse realErrorResponse,
// ErrorResponse unappliedResponse,
ErrorResponse retryableResponse,
ExceptionMeterMetrics exceptionMeterMetrics) {
this.realErrorResponse = realErrorResponse;
// this.unappliedResponse = unappliedResponse;
this.retryableResponse = retryableResponse;
this.exceptionMeterMetrics = exceptionMeterMetrics;
}
/**
* @param e Exception to be handled
* @param statement statement that yielded the exception
* @param cycle the input cycle that made the statement
* @return true, if the error handler determines that a retry is needed
*/
public boolean HandleError(Exception e, GraphStatement statement, long cycle) {
boolean retry = false;
try {
if (e != null) {
throw e;
}
} catch ( ExecutionException |
InvalidQueryException | ReadFailureException | WriteFailureException
| SyntaxError realerror) {
if (e instanceof SyntaxError) {
logger.error("Syntax error:" + GraphQueryStringMapper.getQueryString(statement));
}
switch (realErrorResponse) {
case stop:
logger.error("error with cycle " + cycle + ": " + e.getMessage());
e.printStackTrace();
throw new RuntimeException(realerror);
case warn:
logger.warn("error with cycle " + cycle + ": " + e.getMessage());
case retry:
retry = true;
case count:
exceptionMeterMetrics.count(realerror);
case ignore:
default:
break;
}
} catch (NoHostAvailableException | UnavailableException | OperationTimedOutException | OverloadedException
| WriteTimeoutException | ReadTimeoutException retryable) {
// retryable errors
switch (retryableResponse) {
case stop:
logger.error("error with cycle " + cycle + ": " + e.getMessage());
e.printStackTrace();
throw retryable;
case warn:
logger.warn("error with cycle " + cycle + ": " + e.getMessage());
case retry:
retry = true;
case count:
exceptionMeterMetrics.count(retryable);
case ignore:
default:
break;
}
}
// catch (ChangeUnappliedException cua) {
// boolean retry = false;
// switch (retryableResponse) {
// case stop:
// throw cua;
// case warn:
// logger.warn("error with cycle " + cycle + ": " + e.getMessage());
// case retry:
// retry = true;
// case count:
// exceptionCountMetrics.count(cua);
// case ignore:
// default:
// break;
// }
// return retry;
// }
catch (Exception unknown) {
throw new RuntimeException(
"Unrecognized exception in error handler:"
+ unknown.getClass().getCanonicalName() + ": " + unknown.getMessage(), unknown
);
}
return retry;
}
}

View File

@ -0,0 +1,18 @@
package com.datastax.ebdrivers.dsegraph.errorhandling;
import com.datastax.driver.dse.graph.GraphStatement;
import com.datastax.driver.dse.graph.RegularGraphStatement;
import com.datastax.driver.dse.graph.SimpleGraphStatement;
/**
 * Best-effort extraction of a printable query string from a graph statement,
 * used for error reporting.
 */
public class GraphQueryStringMapper {

    /**
     * @param statement the statement to describe
     * @return the statement's query string, or a placeholder describing the
     *         statement's class if it does not expose one
     */
    public static String getQueryString(GraphStatement statement) {
        String queryString;
        if (statement instanceof RegularGraphStatement) {
            // BUGFIX: cast to the type actually tested above; the previous cast to
            // SimpleGraphStatement could throw ClassCastException for other
            // RegularGraphStatement implementations. RegularGraphStatement itself
            // declares getQueryString().
            queryString = ((RegularGraphStatement) statement).getQueryString();
        } else {
            queryString = "(ERROR) Unknown statement type: " + statement.getClass().getCanonicalName();
        }
        return queryString;
    }
}

View File

@ -0,0 +1,7 @@
package com.datastax.ebdrivers.dsegraph.statements;
import com.datastax.driver.dse.graph.SimpleGraphStatement;
/**
 * A source of executable graph statements: implementations hold a statement
 * template plus bindings, and render a concrete statement per cycle value.
 */
public interface BindableGraphStatement {
    /**
     * Produce an executable statement with values generated from the given cycle.
     *
     * @param value the cycle value to bind
     * @return a fully-bound SimpleGraphStatement
     */
    SimpleGraphStatement bind(long value);
}

View File

@ -0,0 +1,22 @@
package com.datastax.ebdrivers.dsegraph.statements;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * An ordered collection of statement templates which can be resolved into
 * bindable statements in one pass.
 */
public class BindableGraphStatementsTemplate {

    private final List<ReadyGraphStatementTemplate> templateList = new ArrayList<>();

    /** Append a template to this collection. */
    public void addTemplate(ReadyGraphStatementTemplate template) {
        templateList.add(template);
    }

    /** Resolve every template, preserving insertion order. */
    public List<BindableGraphStatement> resolve() {
        List<BindableGraphStatement> resolved = new ArrayList<>(templateList.size());
        for (ReadyGraphStatementTemplate template : templateList) {
            resolved.add(template.resolve());
        }
        return resolved;
    }

    /** Number of templates currently held. */
    public int size() {
        return templateList.size();
    }
}

View File

@ -0,0 +1,18 @@
package com.datastax.ebdrivers.dsegraph.statements;
import com.datastax.driver.dse.graph.SimpleGraphStatement;
import io.nosqlbench.virtdata.core.bindings.ContextualBindings;
/**
 * A fully-resolved graph statement: delegates per-cycle value binding to its
 * contextual bindings.
 */
public class ReadyGraphStatement implements BindableGraphStatement {

    private final ContextualBindings<SimpleGraphStatement, SimpleGraphStatement> bindings;

    public ReadyGraphStatement(ContextualBindings<SimpleGraphStatement, SimpleGraphStatement> contextualBindings) {
        this.bindings = contextualBindings;
    }

    @Override
    public SimpleGraphStatement bind(long value) {
        return bindings.bind(value);
    }
}

View File

@ -0,0 +1,78 @@
package com.datastax.ebdrivers.dsegraph.statements;
import com.datastax.driver.dse.graph.SimpleGraphStatement;
import io.nosqlbench.virtdata.core.bindings.Bindings;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.bindings.ContextualBindingsTemplate;
import io.nosqlbench.virtdata.core.bindings.ValuesBinder;
import io.nosqlbench.virtdata.core.templates.BindPoint;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * A reusable template pairing a graph statement string with a bindings template.
 * {@link #resolve()} produces a thread-usable {@link ReadyGraphStatement}.
 * The repeated-statement form suffixes each field name with the iteration index.
 */
public class ReadyGraphStatementTemplate {

    private final ContextualBindingsTemplate<SimpleGraphStatement, SimpleGraphStatement> contextualBindingsTemplate;
    private final String name;
    // NOTE: the previous unused 'fields' member (declared but never assigned/read)
    // has been removed; field names are passed directly to the values binders.

    public ReadyGraphStatementTemplate(String name, String stmtTemplate, List<BindPoint> bindPoints, String[] fields) {
        this.name = name;
        SimpleGraphStatement simpleGraphStatement = new SimpleGraphStatement(stmtTemplate);
        BindingsTemplate bindingsTemplate = new BindingsTemplate(bindPoints);
        contextualBindingsTemplate = new ContextualBindingsTemplate<>(
                simpleGraphStatement, bindingsTemplate,
                new ParameterizedGraphStatementValuesBinder(fields)
        );
    }

    public ReadyGraphStatementTemplate(String name, String stmtTemplate, List<BindPoint> bindPoints, String[] fields, int repeat) {
        this.name = name;
        SimpleGraphStatement simpleGraphStatement = new SimpleGraphStatement(stmtTemplate);
        BindingsTemplate bindingsTemplate = new BindingsTemplate(bindPoints);
        contextualBindingsTemplate = new ContextualBindingsTemplate<>(
                simpleGraphStatement, bindingsTemplate,
                new ParameterizedIteratedGraphStatementValuesBinder(fields, repeat)
        );
    }

    /**
     * Binder for repeated statements: generates values for each of the 'repeat'
     * iterations, keyed by field name plus iteration suffix.
     */
    public static class ParameterizedIteratedGraphStatementValuesBinder implements ValuesBinder<SimpleGraphStatement, SimpleGraphStatement> {
        private final String[] fields;
        private final int repeat;

        public ParameterizedIteratedGraphStatementValuesBinder(String[] fields, int repeat) {
            this.fields = fields;
            this.repeat = repeat;
        }

        @Override
        public SimpleGraphStatement bindValues(SimpleGraphStatement context, Bindings bindings, long cycle) {
            Map<String, Object> iteratedSuffixMap = bindings.getIteratedSuffixMap(cycle, repeat, fields);
            return new SimpleGraphStatement(context.getQueryString(), iteratedSuffixMap);
        }
    }

    /**
     * Binder for single statements: reuses a thread-local map (pre-seeded with
     * the field keys) to avoid per-cycle allocation.
     */
    public static class ParameterizedGraphStatementValuesBinder
            implements ValuesBinder<SimpleGraphStatement, SimpleGraphStatement> {
        private final String[] fields;
        // IMPROVED: was a raw Map/HashMap; now properly parameterized.
        private final Map<String, Object> valuesMap = new HashMap<>();
        private final ThreadLocal<Map<String, Object>> mapTL;

        public ParameterizedGraphStatementValuesBinder(String[] fields) {
            this.fields = fields;
            for (String field : fields) {
                valuesMap.put(field, null);
            }
            mapTL = ThreadLocal.withInitial(() -> new HashMap<>(valuesMap));
        }

        @Override
        public SimpleGraphStatement bindValues(SimpleGraphStatement context, Bindings bindings, long cycle) {
            bindings.updateMap(mapTL.get(), cycle);
            return new SimpleGraphStatement(context.getQueryString(), mapTL.get());
        }
    }

    /** Resolve this template's bindings into a ready-to-use statement. */
    public ReadyGraphStatement resolve() {
        return new ReadyGraphStatement(contextualBindingsTemplate.resolveBindings());
    }
}

View File

@ -0,0 +1 @@
io.nosqlbench.nb.annotations.ServiceProcessor

View File

@ -0,0 +1,67 @@
scenarios:
default:
    creategraph: run driver=dsegraph graphname=graph_wheels tags=phase:create-graph
schema: run driver=dsegraph graphname=graph_wheels tags=phase:graph-schema
main: run driver==dsegraph graphname=graph_wheels tags=name:main-add cycles=100000
devmode: run driver=dsegraph graphname=graph_wheels tags=name:dev-mode
    prodmode: run driver=dsegraph graphname=graph_wheels tags=name:prod-mode
bindings:
sessionid: ToEpochTimeUUID()->java.util.UUID; ToString();
deviceid: Add(200000); Div(<<sessons_per_device:10>>); ToEpochTimeUUID()->java.util.UUID; ToString();
type: WeightedStrings('phone:10;computer:10;')
os: WeightedStrings('android:6;ios:4;linux:2;osx:7;windows:3')
osversion: WeightedStrings('nougat:3;oreo:1;jellybean:2;4:1;4c:1;5:1;5c:1;trusty:1;xenial:1;yosemite:1;el capitan:2;sierra:3;high sierra:1;7:1;10:2')
ipaddress: Combinations('1;7;0-3;.;0-2;0-2;0-5;.;0-2;0-2;0-5')
createdtime: Add(1505256898)
blocks:
- name: create-graph
tags:
phase: create-graph
statements:
- creategraph: >-
system.graph('<<graphname:graph_wheels>>').ifNotExists().create()
- name: create-schema
tags:
phase: graph-schema
statements:
- graph-schema: >-
schema.propertyKey('sessionid').Uuid().ifNotExists().create();
schema.propertyKey('deviceid').Uuid().ifNotExists().create();
schema.propertyKey('ipaddress').Text().ifNotExists().create();
schema.propertyKey('createdtime').Bigint().ifNotExists().create();
schema.vertexLabel('session').partitionKey('sessionid').properties('ipaddress', 'deviceid', 'createdtime').ifNotExists().create();
schema.propertyKey('type').Text().ifNotExists().create();
schema.propertyKey('os').Text().ifNotExists().create();
schema.propertyKey('osversion').Text().ifNotExists().create();
schema.vertexLabel('device').partitionKey('deviceid').properties('type', 'os', 'osversion').ifNotExists().create();
schema.edgeLabel('using').single().connection('session','device').ifNotExists().create();
tags:
name: graph-schema
- name: dev-mode
tags:
phase: dev-mode
statements:
- dev-mode: >-
schema.config().option('graph.schema_mode').set('Development');
tags:
name: dev-mode
- name: prod-mode
tags:
phase: prod-mode
statements:
- prod-mode: >-
schema.config().option('graph.schema_mode').set('Production');
tags:
name: prod-mode
- name: main
tags:
phase: main
statements:
- main-add: >-
device = graph.addVertex(label, 'device','deviceid', {deviceid}, 'type', {type}, 'os', {os}, 'osversion', {osversion});
session = graph.addVertex(label, 'session', 'sessionid', {sessionid}, 'ipaddress', {ipaddress}, 'deviceid', {deviceid}, 'createdtime', {createdtime});
session.addEdge('using', device);
tags:
name: main-add

View File

@ -0,0 +1,242 @@
# dsegraph activity type
# warning; These docs are a work in progress
This is an activity type which allows for the execution of workloads
using DSE Graph and the DSE Java Driver.
This activity type is wired synchronously within each client
thread, however the async API is used in order to expose fine-grain
metrics about op binding, op submission, and waiting for a result.
## Example activity definitions
Run a dsegraph activity named 'a1', with definitions from activities/graphs.yaml
~~~
... type=dsegraph alias=a1 yaml=graphs
~~~
Run a dsegraph activity defined by graphs.yaml, but with shortcut naming
~~~
... type=dsegraph yaml=graphs
~~~
Only run statement groups which match a tag regex
~~~
... type=dsegraph yaml=graphs tags=group:'ddl.*'
~~~
Run the matching 'dml' statements, with 100 cycles, from [1000..1100)
~~~
... type=dsegraph yaml=graphs tags=group:'dml.*' cycles=1000..1100
~~~
This last example shows that the cycle range is [inclusive..exclusive),
to allow for stacking test intervals. This is standard across all
activity types.
## dsegraph ActivityType Parameters
- **yaml** - The file which holds the schema and statement defs.
(no default, required)
~~~
DOCS TBD FOR THIS SECTION
- **cl** - An override to consistency levels for the activity. If
this option is used, then all consistency levels will be replaced
by this one for the current activity, and a log line explaining
the difference with respect to the yaml will be emitted.
This is not a dynamic parameter. It will only be applied at
activity start.
~~~
- **cbopts** - this is how you customize the cluster settings for
the client, including policies, compression, etc. This is
a string of *Java*-like method calls just as you would use them
in the Cluster.Builder fluent API. They are evaluated inline
with the default Cluster.Builder options not covered below.
Example: cbopts=".withCompression(ProtocolOptions.Compression.NONE)"
- **maxtries** - how many times an operation may be attempted
~~~
DOCS TBD FOR THIS SECTION
- **diagnose** - if this is set to true, then any exception for an
operation are thrown instead of handled internally. This can
be useful for diagnosing exceptions during scenario development.
In this version of ebdse, this is a shortcut for setting all the
exception handlers to **stop**.
~~~
- **cycles** - standard, however the cql activity type will default
this to however many statements are included in the current
activity, after tag filtering, etc.
- **username** - the user to authenticate as. This option requires
that one of **password** or **passfile** also be defined.
- **password** - the password to authenticate with. This will be
ignored if passfile is also present.
- **passfile** - the file to read the password from. The first
line of this file is used as the password.
- **alias** - this is a standard engineblock parameter, however
the cql type will use the yaml value also as the alias value
when not specified.
- **graphson** - the version of the graphson protocol to use:
default: 2
## Statement Parameters
- **repeat** - if specified, causes the statement blocks to be
lexically repeated before being evaluated as statements,
including enumerated bindings.
## Error Handling
#### Error Handlers
When an error occurs, you can control how it is handled.
This is the error handler stack:
- **stop** - causes the exception to be thrown to the runtime, forcing a shutdown.
- **warn** - log a warning in the log, with details about the error and associated statement.
- **count** - keep a count in metrics for the exception, under the name
exceptions.classname, using the simple class name, of course.
- **retry** - Retry the operation if the number of retries hasn't been
used up.
- **ignore** - do nothing, do not even retry or count
They are ordered from the most extreme to the most oblivious starting
at the top. With the exception of the **stop** handler, the rest of
them will be applied to an error all the way to the bottom. One way
to choose the right handler is to say "How serious is this to the test
run or the results of the test if it happens?" In general, it is best
to be more conservative and choose a more aggressive setting unless you
are specifically wanting to measure how often a given error happens,
for example.
#### Error Types
The errors that can be detected are sorted into three categories:
~~~
DOCS TBD FOR THIS SECTION
- **unapplied** - This was a LWT that did not get applied. All operations
are checked, and a ChangeUnapplied exception is thrown.
(This is a local exception to make error handling consistent)
This is a separate category from retryable, because you have to
have reactive logic to properly submit a valid request when it occurs.
~~~
- **retryable** - NoHostAvailable, Overloaded, WriteTimeout, and
ReadTimeout exceptions. These are all exceptions that might
succeed if tried again with the same payload.
- **realerrors** - ReadFailure, WriteFailure, SyntaxError, InvalidQuery.
These represent errors that are likely a persistent issue, and
will not likely succeed if tried again.
To set the error handling behavior, simply pair these categories up with
an entry point in the error handler stack. Here is an example, showing
also the defaults that are used if you do not specify otherwise:
retryable=retry realerrors=stop
## Generic Parameters
*provided by the runtime*
- **targetrate** - The target rate in ops/s
- **linkinput** - if the name of another activity is specified, this activity
will only go as fast as that one.
- **tags** - optional filter for matching tags in yaml sections (detailed help
link needed)
- **threads** - the number of client threads driving this activity
## Metrics
- \<alias\>.cycles - (provided by core input) A timer around the whole cycle
- \<alias\>.bind - A timer which tracks the performance of the statement
binding logic, including the generation of data immediately prior
- \<alias\>.execute - A timer which tracks the performance of op submission
only. This is the async execution call, broken out as a separate step.
- \<alias\>.result - A timer which tracks the performance of an op result only.
This is the async get on the future, broken out as a separate step.
- \<alias\>.tries - A histogram of how many tries were required to get a
completed operation
## YAML Format
The YAML file for a DSE Graph activity has one or more logical yaml documents,
each separated by three dashes: --- the standard yaml document separator. Each
yaml document may contain a tags section for the purpose of including or
excluding statements for a given activity:
~~~ (optional)
tags:
tagname: value
...
~~~
If no tags are provided in a document section, then it will be matched by
all possible tag filters. Conversely, if no tag filter is applied in
the activity definition, all tagged documents will match.
Statements can be specified at the top level or within named blocks. When
you have simple needs to just put a few statements into the yaml, the top-level
style will suffice:
~~~
name: statement-top-level-example
statements:
- statement 1
- statement 2
~~~
If you need to represent multiple blocks of statements in the same activity,
you might want to group them into blocks:
~~~
blocks:
- name: statement-block-1
statements:
- statement 1
- statement 2
~~~
At any level that you can specify statements, you can also specify data bindings:
~~~
statements:
- statement 1
- statement 2
bindings:
bindto1: foo
bindto2: bar
blocks:
- name: statement-block-1
statements:
- statement 1
bindings:
bindto1: foo
~~~
Data bindings specify how values are generated to plug into each operation. More
details on data bindings are available in the activity usage guide.
### Parameter Templating
Double angle brackets may be used to drop parameters into the YAML
arbitrarily. When the YAML file is loaded, and only then, these parameters
are interpolated from activity parameters like those above. This allows you
to create activity templates that can be customized simply by providing
additional parameters to the activity. There are two forms,
\<\<some_var_name:default_value\>\> and \<\<some_var_name\>\>. The first
form contains a default value. In any case, if one of these parameters is
encountered and a qualifying value is not found, an error will be thrown.
### YAML Location
The YAML file referenced in the yaml= parameter will be searched for in the following places, in this order:
1. A URL, if it starts with 'http:' or 'https:'
2. The local filesystem, if it exists there
3. The internal classpath and assets in the ebdse jar.
The '.yaml' suffix is not required in the yaml= parameter, however it is
required on the actual file. As well, the logical search path "activities/"
will be used if necessary to locate the file, both on the filesystem and in
the classpath.
This is a basic example below that can be copied as a starting template.
## YAML Example
---
CONTENT TBD

View File

@ -0,0 +1,25 @@
tags:
type: testtag
kind: somekind
oevure: bananas
name: outerblock
statements:
- some foo
- some bar
bindings:
bar: set-the-bar
blocks:
- name: block1
statements:
- some foo
- some bar
- some baz
bindings:
aleph: null
tags:
some: tag
- name: block2
statements:
- some baz
- some zab

View File

@ -99,6 +99,12 @@
<version>3.12.155-SNAPSHOT</version> <version>3.12.155-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-dsegraph-shaded</artifactId>
<version>3.12.155-SNAPSHOT</version>
</dependency>
<dependency> <dependency>
<groupId>io.nosqlbench</groupId> <groupId>io.nosqlbench</groupId>
<artifactId>driver-cql-shaded</artifactId> <artifactId>driver-cql-shaded</artifactId>

View File

@ -46,6 +46,7 @@
<module>driver-stdout</module> <module>driver-stdout</module>
<module>driver-tcp</module> <module>driver-tcp</module>
<module>driver-http</module> <module>driver-http</module>
<module>driver-dsegraph-shaded</module>
<module>driver-cql-shaded</module> <module>driver-cql-shaded</module>
<module>driver-cqlverify</module> <module>driver-cqlverify</module>
<module>driver-web</module> <module>driver-web</module>