mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Initial Migration of DSE Graph
This commit is contained in:
parent
fbecd599ed
commit
2390f25351
266
driver-dsegraph-shaded/pom.xml
Normal file
266
driver-dsegraph-shaded/pom.xml
Normal file
@ -0,0 +1,266 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>mvn-defaults</artifactId>
|
||||
<version>3.12.155-SNAPSHOT</version>
|
||||
<relativePath>../mvn-defaults</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>driver-dsegraph-shaded</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
|
||||
<description>
|
||||
A DSE Graph ActivityType driver for nosqlbench, based on http://nosqlbench.io/
|
||||
</description>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- core dependencies -->
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>engine-api</artifactId>
|
||||
<version>3.12.155-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>drivers-api</artifactId>
|
||||
<version>3.12.155-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.datastax.dse</groupId>
|
||||
<artifactId>dse-java-driver-graph</artifactId>
|
||||
<version>1.9.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.datastax.dse</groupId>
|
||||
<artifactId>dse-java-driver-core</artifactId>
|
||||
<version>1.9.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.datastax.dse</groupId>
|
||||
<artifactId>dse-java-driver-extras</artifactId>
|
||||
<version>1.9.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.datastax.dse</groupId>
|
||||
<artifactId>dse-java-driver-mapping</artifactId>
|
||||
<version>1.9.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- For CQL compression option -->
|
||||
<dependency>
|
||||
<groupId>org.lz4</groupId>
|
||||
<artifactId>lz4-java</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- For CQL compression option -->
|
||||
<dependency>
|
||||
<groupId>org.xerial.snappy</groupId>
|
||||
<artifactId>snappy-java</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.antlr</groupId>
|
||||
<artifactId>antlr4-runtime</artifactId>
|
||||
<version>4.8</version>
|
||||
</dependency>
|
||||
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>io.netty</groupId>-->
|
||||
<!-- <artifactId>netty-transport-native-epoll</artifactId>-->
|
||||
<!-- <version>4.1.47.Final</version>-->
|
||||
<!-- <classifier>linux-x86_64</classifier>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<!-- test only scope -->
|
||||
|
||||
<!-- This is added as shaded to satisfy old jmx reporting dependencies-->
|
||||
|
||||
<dependency>
|
||||
<groupId>io.dropwizard.metrics</groupId>
|
||||
<artifactId>metrics-core</artifactId>
|
||||
<version>3.2.2</version>
|
||||
</dependency>
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>org.apache.commons</groupId>-->
|
||||
<!-- <artifactId>commons-lang3</artifactId>-->
|
||||
<!-- <version>3.7</version>-->
|
||||
<!-- </dependency>-->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.25</version>
|
||||
</dependency>
|
||||
|
||||
<!-- test only scope -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.testng</groupId>
|
||||
<artifactId>testng</artifactId>
|
||||
<version>6.13.1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core-java8</artifactId>
|
||||
<version>1.0.0m1</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- compile only scope -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.yaml</groupId>
|
||||
<artifactId>snakeyaml</artifactId>
|
||||
<version>1.23</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
|
||||
<!--
|
||||
If this plugin is re-enabled, the local CQL grammar will
|
||||
be overwritten. The grammar has some syntax issues, so
|
||||
fixes will be made to it before it is submitted back.
|
||||
|
||||
(lack of composite key syntax, nested type syntax, etc)
|
||||
-->
|
||||
<!-- <plugin>-->
|
||||
<!-- <groupId>com.googlecode.maven-download-plugin</groupId>-->
|
||||
<!-- <artifactId>download-maven-plugin</artifactId>-->
|
||||
<!-- <version>1.4.0</version>-->
|
||||
<!-- <executions>-->
|
||||
<!-- <execution>-->
|
||||
<!-- <id>get-cql-lexer</id>-->
|
||||
<!-- <phase>generate-sources</phase>-->
|
||||
<!-- <goals>-->
|
||||
<!-- <goal>wget</goal>-->
|
||||
<!-- </goals>-->
|
||||
<!-- <configuration>-->
|
||||
<!-- <url>-->
|
||||
<!-- https://raw.githubusercontent.com/antlr/grammars-v4/master/cql3/CqlLexer.g4-->
|
||||
<!-- </url>-->
|
||||
<!-- <outputFileName>CqlLexer.g4</outputFileName>-->
|
||||
<!-- <outputDirectory>src/main/grammars/cql3/-->
|
||||
<!-- </outputDirectory>-->
|
||||
<!-- </configuration>-->
|
||||
<!-- </execution>-->
|
||||
<!-- <execution>-->
|
||||
<!-- <id>get-cql-parser</id>-->
|
||||
<!-- <phase>generate-sources</phase>-->
|
||||
<!-- <goals>-->
|
||||
<!-- <goal>wget</goal>-->
|
||||
<!-- </goals>-->
|
||||
<!-- <configuration>-->
|
||||
<!-- <url>-->
|
||||
<!-- https://raw.githubusercontent.com/antlr/grammars-v4/master/cql3/CqlParser.g4-->
|
||||
<!-- </url>-->
|
||||
<!-- <outputFileName>CqlParser.g4</outputFileName>-->
|
||||
<!-- <outputDirectory>src/main/grammars/cql3/-->
|
||||
<!-- </outputDirectory>-->
|
||||
<!-- </configuration>-->
|
||||
<!-- </execution>-->
|
||||
<!-- </executions>-->
|
||||
<!-- </plugin>-->
|
||||
|
||||
<plugin>
|
||||
<groupId>org.antlr</groupId>
|
||||
<artifactId>antlr4-maven-plugin</artifactId>
|
||||
<version>4.8</version>
|
||||
<configuration>
|
||||
<sourceDirectory>src/main/grammars/cql3
|
||||
</sourceDirectory>
|
||||
<arguments>
|
||||
<argument>-package</argument>
|
||||
<argument>io.nosqlbench.generators.cql.generated
|
||||
</argument>
|
||||
</arguments>
|
||||
<outputDirectory>
|
||||
src/main/java/io/nosqlbench/generators/cql/generated
|
||||
</outputDirectory>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>antlr</id>
|
||||
<goals>
|
||||
<goal>antlr4</goal>
|
||||
</goals>
|
||||
<phase>generate-sources</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-shade-plugin</artifactId>
|
||||
<version>3.2.3</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>shade</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<createDependencyReducedPom>false</createDependencyReducedPom>
|
||||
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>
|
||||
<createSourcesJar>true</createSourcesJar>
|
||||
<!-- <shadedArtifactAttached>true</shadedArtifactAttached>-->
|
||||
<!-- <shadedClassifierName>shaded</shadedClassifierName>-->
|
||||
<relocations>
|
||||
<relocation>
|
||||
<pattern>com.google.common</pattern>
|
||||
<shadedPattern>com.datastax.internal.com_google_common</shadedPattern>
|
||||
</relocation>
|
||||
<!-- <relocation>-->
|
||||
<!-- <pattern>com.datastax</pattern>-->
|
||||
<!-- <shadedPattern>dse19.com.datastax</shadedPattern>-->
|
||||
<!-- </relocation>-->
|
||||
<!-- <relocation>-->
|
||||
<!-- <pattern>io.netty</pattern>-->
|
||||
<!-- <shadedPattern>dse19.io.netty</shadedPattern>-->
|
||||
<!-- </relocation>-->
|
||||
</relocations>
|
||||
<artifactSet>
|
||||
<includes>
|
||||
<include>*:*</include>
|
||||
</includes>
|
||||
</artifactSet>
|
||||
<transformers combine.children="append">
|
||||
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
|
||||
<mainClass>io.nosqlbench.engine.cli.NBCLI</mainClass>
|
||||
</transformer>
|
||||
</transformers>
|
||||
<!-- <finalName>${project.artifactId}</finalName>-->
|
||||
<filters>
|
||||
<filter>
|
||||
<artifact>*:*</artifact>
|
||||
<excludes>
|
||||
<exclude>META-INF/*.SF</exclude>
|
||||
<exclude>META-INF/*.DSA</exclude>
|
||||
<exclude>META-INF/*.RSA</exclude>
|
||||
</excludes>
|
||||
</filter>
|
||||
</filters>
|
||||
</configuration>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
</project>
|
@ -0,0 +1,123 @@
|
||||
package com.datastax.ebdrivers.dsegraph;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datastax.driver.dse.graph.GraphResultSet;
|
||||
import com.datastax.driver.dse.graph.SimpleGraphStatement;
|
||||
import com.datastax.ebdrivers.dsegraph.errorhandling.ErrorResponse;
|
||||
import com.datastax.ebdrivers.dsegraph.errorhandling.GraphErrorHandler;
|
||||
import com.datastax.ebdrivers.dsegraph.statements.BindableGraphStatement;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
public class GraphAction implements SyncAction, ActivityDefObserver {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(GraphAction.class);
|
||||
List<BindableGraphStatement> readyGraphStmts;
|
||||
private int slot;
|
||||
private GraphActivity activity;
|
||||
private int maxTries = 10;
|
||||
private boolean showstmts;
|
||||
private GraphErrorHandler graphErrorHandler;
|
||||
private ErrorResponse retryableResponse;
|
||||
private ErrorResponse realErrorResponse;
|
||||
private OpSequence<BindableGraphStatement> opSequencer;
|
||||
|
||||
public GraphAction(int slot, GraphActivity activity) {
|
||||
this.slot = slot;
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
onActivityDefUpdate(activity.getActivityDef());
|
||||
}
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycleValue) {
|
||||
|
||||
int tries = 0;
|
||||
BindableGraphStatement readyGraphStatement;
|
||||
SimpleGraphStatement simpleGraphStatement;
|
||||
ListenableFuture<GraphResultSet> resultSetFuture;
|
||||
|
||||
try (Timer.Context graphOpTime = activity.logicalGraphOps.time()) {
|
||||
|
||||
try (Timer.Context bindTime = activity.bindTimer.time()) {
|
||||
|
||||
BindableGraphStatement bindableGraphStatement = opSequencer.get(cycleValue);
|
||||
simpleGraphStatement = bindableGraphStatement.bind(cycleValue);
|
||||
|
||||
if (showstmts) {
|
||||
logger.info("GRAPH QUERY(cycle=" + cycleValue + "):\n" + simpleGraphStatement.getQueryString());
|
||||
}
|
||||
}
|
||||
|
||||
while (tries < maxTries) {
|
||||
tries++;
|
||||
|
||||
try (Timer.Context executeTime = activity.executeTimer.time()) {
|
||||
resultSetFuture = activity.getSession().executeGraphAsync(simpleGraphStatement);
|
||||
}
|
||||
|
||||
try (Timer.Context resultTime = activity.resultTimer.time()) {
|
||||
|
||||
GraphResultSet resultSet = resultSetFuture.get();
|
||||
break; // This is normal termination of this loop, when retries aren't needed
|
||||
} catch (Exception e) {
|
||||
if (!graphErrorHandler.HandleError(e, simpleGraphStatement, cycleValue)) {
|
||||
e.printStackTrace();
|
||||
logger.error(e.toString(),e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
activity.triesHisto.update(tries);
|
||||
return 0;
|
||||
|
||||
// ReadyGraphStmt = activity.get
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
|
||||
this.maxTries = activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
|
||||
this.showstmts = activityDef.getParams().getOptionalBoolean("showcql").orElse(false);
|
||||
|
||||
boolean diagnose = activityDef.getParams().getOptionalBoolean("diagnose").orElse(false);
|
||||
|
||||
if (diagnose) {
|
||||
logger.warn("You are wiring all error handlers to stop for any exception." +
|
||||
" This is useful for setup and troubleshooting, but unlikely to" +
|
||||
" be useful for long-term or bulk testing, as retryable errors" +
|
||||
" are normal in a busy system.");
|
||||
this.realErrorResponse = this.retryableResponse = ErrorResponse.stop;
|
||||
} else {
|
||||
String realErrorsSpec = activityDef.getParams()
|
||||
.getOptionalString("realerrors").orElse(ErrorResponse.stop.toString());
|
||||
this.realErrorResponse = ErrorResponse.valueOf(realErrorsSpec);
|
||||
|
||||
String retryableSpec = activityDef.getParams()
|
||||
.getOptionalString("retryable").orElse(ErrorResponse.retry.toString());
|
||||
|
||||
this.retryableResponse = ErrorResponse.valueOf(retryableSpec);
|
||||
}
|
||||
graphErrorHandler = new GraphErrorHandler(
|
||||
realErrorResponse,
|
||||
retryableResponse,
|
||||
activity.getExceptionCountMetrics());
|
||||
|
||||
this.opSequencer = activity.getOpSequence();
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.datastax.ebdrivers.dsegraph;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.Action;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
|
||||
public class GraphActionDispenser implements ActionDispenser {
|
||||
|
||||
private GraphActivity activity;
|
||||
|
||||
public GraphActionDispenser(GraphActivity activity) {
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Action getAction(int slot) {
|
||||
return new GraphAction(slot, activity);
|
||||
}
|
||||
}
|
@ -0,0 +1,291 @@
|
||||
package com.datastax.ebdrivers.dsegraph;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datastax.driver.core.ConsistencyLevel;
|
||||
import com.datastax.driver.core.ProtocolOptions;
|
||||
import com.datastax.driver.dse.DseCluster;
|
||||
import com.datastax.driver.dse.DseSession;
|
||||
import com.datastax.driver.dse.graph.GraphOptions;
|
||||
import com.datastax.driver.dse.graph.GraphProtocol;
|
||||
import com.datastax.ebdrivers.dsegraph.statements.BindableGraphStatement;
|
||||
import com.datastax.ebdrivers.dsegraph.statements.ReadyGraphStatementTemplate;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
|
||||
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
|
||||
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.engine.api.metrics.ExceptionMeterMetrics;
|
||||
import io.nosqlbench.engine.api.scripting.NashornEvaluator;
|
||||
import io.nosqlbench.engine.api.templating.StrInterpolator;
|
||||
import io.nosqlbench.engine.api.util.TagFilter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Stream;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
public class GraphActivity extends SimpleActivity implements ActivityDefObserver {
|
||||
private final static Logger logger = LoggerFactory.getLogger(GraphActivity.class);
|
||||
|
||||
public Timer bindTimer;
|
||||
public Timer executeTimer;
|
||||
public Timer resultTimer;
|
||||
public Timer logicalGraphOps;
|
||||
public Histogram triesHisto;
|
||||
protected List<OpTemplate> stmts;
|
||||
private int stride;
|
||||
private DseSession session;
|
||||
private DseCluster cluster;
|
||||
private ExceptionMeterMetrics exceptionMeterMetrics;
|
||||
private OpSequence<ReadyGraphStatementTemplate> opsequence;
|
||||
|
||||
public GraphActivity(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
StrInterpolator interp = new StrInterpolator(activityDef);
|
||||
String yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload").orElse("default");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initActivity() {
|
||||
logger.debug("initializing activity: " + this.activityDef.getAlias());
|
||||
exceptionMeterMetrics = new ExceptionMeterMetrics(activityDef);
|
||||
|
||||
stride = activityDef.getParams().getOptionalInteger("stride").orElse(1);
|
||||
cluster = createCluster();
|
||||
session = createSession();
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result");
|
||||
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
|
||||
logicalGraphOps = ActivityMetrics.timer(activityDef, "graphops");
|
||||
|
||||
this.opsequence = initSequencer();
|
||||
setDefaultsFromOpSequence(this.opsequence);
|
||||
|
||||
onActivityDefUpdate(activityDef);
|
||||
}
|
||||
|
||||
private OpSequence<ReadyGraphStatementTemplate> initSequencer() {
|
||||
SequencerType sequencerType = SequencerType.valueOf(
|
||||
getParams().getOptionalString("seq").orElse("bucket")
|
||||
);
|
||||
SequencePlanner<ReadyGraphStatementTemplate> planner = new SequencePlanner<>(sequencerType);
|
||||
|
||||
String yaml_loc = activityDef.getParams().getOptionalString("yaml","workload").orElse("default");
|
||||
StrInterpolator interp = new StrInterpolator(activityDef);
|
||||
StmtsDocList unfiltered = StatementsLoader.loadPath(logger, yaml_loc, interp, "activities");
|
||||
|
||||
// log tag filtering results
|
||||
String tagfilter = activityDef.getParams().getOptionalString("tags").orElse("");
|
||||
TagFilter tagFilter = new TagFilter(tagfilter);
|
||||
unfiltered.getStmts().stream().map(tagFilter::matchesTaggedResult).forEach(r -> logger.info(r.getLog()));
|
||||
|
||||
stmts = unfiltered.getStmts(tagfilter);
|
||||
|
||||
if (stmts.size() == 0) {
|
||||
throw new RuntimeException("There were no unfiltered statements found for this activity.");
|
||||
}
|
||||
|
||||
for (OpTemplate stmtDef : stmts) {
|
||||
|
||||
ParsedStmt parsed = stmtDef.getParsed().orError();
|
||||
|
||||
ReadyGraphStatementTemplate readyGraphStatement;
|
||||
long ratio = Long.valueOf(stmtDef.getParams().getOrDefault("ratio", "1").toString());
|
||||
Optional<Integer> repeat = Optional.ofNullable(stmtDef.getParams().get("repeat"))
|
||||
.map(String::valueOf)
|
||||
.map(Integer::valueOf);
|
||||
if (repeat.isPresent()) {
|
||||
readyGraphStatement = new ReadyGraphStatementTemplate(
|
||||
stmtDef.getName(),
|
||||
GraphStmtParser.getCookedRepeatedStatement(stmtDef.getStmt(), repeat.get()),
|
||||
stmtDef.getParsed().getBindPoints(),
|
||||
GraphStmtParser.getFields(stmtDef.getStmt(), stmtDef.getBindings()).toArray(new String[0]),
|
||||
repeat.get());
|
||||
} else {
|
||||
readyGraphStatement = new ReadyGraphStatementTemplate(
|
||||
stmtDef.getName(),
|
||||
GraphStmtParser.getCookedStatement(stmtDef.getStmt()),
|
||||
stmtDef.getParsed().getBindPoints(),
|
||||
GraphStmtParser.getFields(stmtDef.getStmt(), stmtDef.getBindings()).toArray(new String[0]));
|
||||
}
|
||||
planner.addOp(readyGraphStatement, ratio);
|
||||
}
|
||||
|
||||
if (getActivityDef().getCycleCount() == 0) {
|
||||
getActivityDef().setCycles(String.valueOf(stmts.size()));
|
||||
}
|
||||
|
||||
OpSequence<ReadyGraphStatementTemplate> sequencer = planner.resolve();
|
||||
return sequencer;
|
||||
}
|
||||
|
||||
public DseSession getSession() {
|
||||
return session;
|
||||
}
|
||||
|
||||
private DseCluster createCluster() {
|
||||
|
||||
String host = activityDef.getParams().getOptionalString("host").orElse("localhost");
|
||||
int port = activityDef.getParams().getOptionalInteger("port").orElse(9042);
|
||||
|
||||
DseCluster.Builder builder = DseCluster.builder()
|
||||
.withPort(port)
|
||||
.withCompression(ProtocolOptions.Compression.NONE);
|
||||
|
||||
DseCluster.Builder finalBuilder = builder;
|
||||
List<String> hosts = activityDef.getParams().getOptionalString("host", "hosts")
|
||||
.map(s -> Arrays.asList(s.split(",")))
|
||||
.orElse(List.of("localhost"));
|
||||
|
||||
for (String h : hosts) {
|
||||
logger.debug("adding host as contact point: " + h);
|
||||
builder.addContactPoint(h);
|
||||
}
|
||||
|
||||
Optional<String> usernameOpt = activityDef.getParams().getOptionalString("username");
|
||||
Optional<String> passwordOpt = activityDef.getParams().getOptionalString("password");
|
||||
Optional<String> passfileOpt = activityDef.getParams().getOptionalString("passfile");
|
||||
|
||||
if (usernameOpt.isPresent()) {
|
||||
String username = usernameOpt.get();
|
||||
String password;
|
||||
if (passwordOpt.isPresent()) {
|
||||
password = passwordOpt.get();
|
||||
} else if (passfileOpt.isPresent()) {
|
||||
Path path = Paths.get(passfileOpt.get());
|
||||
try {
|
||||
password = Files.readAllLines(path).get(0);
|
||||
} catch (IOException e) {
|
||||
String error = "Error while reading password from file:" + passfileOpt;
|
||||
logger.error(error, e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
} else {
|
||||
String error = "username is present, but neither password nor passfile are defined.";
|
||||
logger.error(error);
|
||||
throw new RuntimeException(error);
|
||||
}
|
||||
builder.withCredentials(username, password);
|
||||
}
|
||||
|
||||
Optional<String> clusteropts = activityDef.getParams().getOptionalString("cbopts");
|
||||
if (clusteropts.isPresent()) {
|
||||
try {
|
||||
logger.info("applying cbopts:" + clusteropts.get());
|
||||
NashornEvaluator<DseCluster.Builder> clusterEval = new NashornEvaluator<>(DseCluster.Builder.class);
|
||||
clusterEval.put("builder", builder);
|
||||
String importEnv =
|
||||
"load(\"nashorn:mozilla_compat.js\");\n" +
|
||||
" importPackage(com.google.common.collect.Lists);\n" +
|
||||
" importPackage(com.google.common.collect.Maps);\n" +
|
||||
" importPackage(com.datastax.driver);\n" +
|
||||
" importPackage(com.datastax.driver.core);\n" +
|
||||
" importPackage(com.datastax.driver.core.policies);\n" +
|
||||
"builder" + clusteropts.get() + "\n";
|
||||
clusterEval.script(importEnv);
|
||||
builder = clusterEval.eval();
|
||||
logger.info("successfully applied:" + clusteropts.get());
|
||||
} catch (Exception e) {
|
||||
logger.error("Unable to evaluate: " + clusteropts.get() + " in script context:" + e.getMessage());
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
cluster = builder.build();
|
||||
} catch (Exception e) {
|
||||
logger.error("Error while instantiating cluster from builder: " + e.toString(), e);
|
||||
throw e;
|
||||
}
|
||||
activityDef.getParams().getOptionalBoolean("defaultidempotence").map(
|
||||
b -> cluster.getConfiguration().getQueryOptions().setDefaultIdempotence(b)
|
||||
);
|
||||
|
||||
String graphson_version = activityDef.getParams().getOptionalString("graphson").orElse("2");
|
||||
switch (Integer.valueOf(graphson_version)) {
|
||||
case 1:
|
||||
cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_1_0);
|
||||
break;
|
||||
case 2:
|
||||
cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
|
||||
break;
|
||||
}
|
||||
cluster.getConfiguration().getGraphOptions().setGraphSubProtocol(GraphProtocol.GRAPHSON_2_0);
|
||||
return cluster;
|
||||
}
|
||||
|
||||
private DseSession createSession() {
|
||||
|
||||
try {
|
||||
DseSession session = cluster.newSession();
|
||||
logger.info("cluster-metadata-allhosts:\n" + session.getCluster().getMetadata().getAllHosts());
|
||||
return session;
|
||||
} catch (Exception e) {
|
||||
logger.error("Error while creating a session for dsegraph: " + e.toString(), e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
|
||||
ParameterMap params = activityDef.getParams();
|
||||
GraphOptions options = cluster.getConfiguration().getGraphOptions();
|
||||
|
||||
params.getOptionalString("graphlanguage").ifPresent(options::setGraphLanguage);
|
||||
params.getOptionalString("graphname").ifPresent(options::setGraphName);
|
||||
params.getOptionalString("graphsource").ifPresent(options::setGraphSource);
|
||||
|
||||
params.getOptionalString("graph_read_cl").ifPresent(
|
||||
s -> options.setGraphReadConsistencyLevel(ConsistencyLevel.valueOf(s))
|
||||
);
|
||||
|
||||
params.getOptionalString("graph_write_cl").ifPresent(
|
||||
s -> options.setGraphWriteConsistencyLevel(ConsistencyLevel.valueOf(s))
|
||||
);
|
||||
|
||||
params.getOptionalLong("graph_write_cl").ifPresent(
|
||||
i -> options.setReadTimeoutMillis(i.intValue())
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
public ExceptionMeterMetrics getExceptionCountMetrics() {
|
||||
return exceptionMeterMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the stride as configured in the activity parameters. This only
|
||||
* available activity init()
|
||||
*
|
||||
* @return long stride
|
||||
*/
|
||||
public long getStride() {
|
||||
return stride;
|
||||
}
|
||||
|
||||
public OpSequence<BindableGraphStatement> getOpSequence() {
|
||||
return this.opsequence.transform(ReadyGraphStatementTemplate::resolve);
|
||||
}
|
||||
}
|
@ -0,0 +1,25 @@
|
||||
package com.datastax.ebdrivers.dsegraph;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
|
||||
@Service(ActivityType.class)
|
||||
public class GraphActivityType implements ActivityType<GraphActivity> {
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return "dsegraph";
|
||||
}
|
||||
|
||||
@Override
|
||||
public GraphActivity getActivity(ActivityDef activityDef) {
|
||||
return new GraphActivity(activityDef);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionDispenser getActionDispenser(GraphActivity activity) {
|
||||
return new GraphActionDispenser(activity);
|
||||
}
|
||||
}
|
@ -0,0 +1,59 @@
|
||||
package com.datastax.ebdrivers.dsegraph;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class GraphStmtParser {
|
||||
|
||||
private final static Pattern stmtToken = Pattern.compile("\\?(\\w+[-_\\d\\w]*)|\\{(\\w+[-_\\d\\w.]*)}");
|
||||
|
||||
public static List<String> getFields(String statement, Map<String, String> bindings) {
|
||||
|
||||
ArrayList<String> fields = new ArrayList<>();
|
||||
Matcher m = stmtToken.matcher(statement);
|
||||
while (m.find()) {
|
||||
String namedAnchor = m.group(1);
|
||||
if (namedAnchor==null) {
|
||||
namedAnchor=m.group(2);
|
||||
if (namedAnchor==null) {
|
||||
throw new RuntimeException("Pattern '" + stmtToken.pattern() + "' failed to match '" + statement +"'");
|
||||
}
|
||||
}
|
||||
if (!bindings.containsKey(namedAnchor)) {
|
||||
throw new RuntimeException("Named anchor " + namedAnchor + " not found in bindings!");
|
||||
}
|
||||
fields.add(namedAnchor);
|
||||
}
|
||||
return fields;
|
||||
}
|
||||
|
||||
public static String getCookedRepeatedStatement(String statement, int repeat) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < repeat; i++) {
|
||||
String varSuffix = String.valueOf(i);
|
||||
String indexedStmt = getCookedSuffixedStatement(statement, varSuffix);
|
||||
sb.append(indexedStmt);
|
||||
sb.append("\n");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public static String getCookedStatement(String statement) {
|
||||
String replaced = stmtToken.matcher(statement).replaceAll("$1$2");
|
||||
return replaced;
|
||||
}
|
||||
|
||||
public static List<String> getCookedStatements(List<String> statements) {
|
||||
return statements.stream().map(GraphStmtParser::getCookedStatement).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public static String getCookedSuffixedStatement(String statement, String suffix) {
|
||||
String replaced = stmtToken.matcher(statement).replaceAll("$1" + suffix);
|
||||
return replaced;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,15 @@
|
||||
package com.datastax.ebdrivers.dsegraph.errorhandling;
|
||||
|
||||
/**
|
||||
* When an error filter allows us to see and handle an error in a specific way,
|
||||
* the ErrorResponse determine exactly how we handle it. Each level represents
|
||||
* a starting point in handling, including everything after the starting point.
|
||||
* The first enum is the most severe response.
|
||||
*/
|
||||
public enum ErrorResponse {
|
||||
stop, // Rethrow this error to the runtime, forcing it to handle the error or stop
|
||||
warn, // log a warning with some details about this error
|
||||
retry, // resubmit this operation up to the available tries
|
||||
count, // count this metric separatelycount, // count this metric separately
|
||||
ignore // do nothing
|
||||
}
|
@ -0,0 +1,112 @@
|
||||
package com.datastax.ebdrivers.dsegraph.errorhandling;
|
||||
|
||||
import com.datastax.driver.core.exceptions.*;
|
||||
import com.datastax.driver.dse.graph.GraphStatement;
|
||||
import io.nosqlbench.engine.api.metrics.ExceptionMeterMetrics;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@SuppressWarnings("Duplicates")
|
||||
public class GraphErrorHandler {
|
||||
private final static Logger logger = LoggerFactory.getLogger(GraphErrorHandler.class);
|
||||
|
||||
private final ErrorResponse realErrorResponse;
|
||||
// private final ErrorResponse unappliedResponse;
|
||||
private final ErrorResponse retryableResponse;
|
||||
private ExceptionMeterMetrics exceptionMeterMetrics;
|
||||
|
||||
public GraphErrorHandler(
|
||||
ErrorResponse realErrorResponse,
|
||||
// ErrorResponse unappliedResponse,
|
||||
ErrorResponse retryableResponse,
|
||||
ExceptionMeterMetrics exceptionMeterMetrics) {
|
||||
this.realErrorResponse = realErrorResponse;
|
||||
// this.unappliedResponse = unappliedResponse;
|
||||
this.retryableResponse = retryableResponse;
|
||||
this.exceptionMeterMetrics = exceptionMeterMetrics;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param e Exception to be handled
|
||||
* @param statement statement that yielded the exception
|
||||
* @param cycle the input cycle that made the statement
|
||||
* @return true, if the error handler determines that a retry is needed
|
||||
*/
|
||||
public boolean HandleError(Exception e, GraphStatement statement, long cycle) {
|
||||
boolean retry = false;
|
||||
|
||||
try {
|
||||
if (e != null) {
|
||||
throw e;
|
||||
}
|
||||
} catch ( ExecutionException |
|
||||
InvalidQueryException | ReadFailureException | WriteFailureException
|
||||
| SyntaxError realerror) {
|
||||
|
||||
if (e instanceof SyntaxError) {
|
||||
logger.error("Syntax error:" + GraphQueryStringMapper.getQueryString(statement));
|
||||
}
|
||||
|
||||
switch (realErrorResponse) {
|
||||
case stop:
|
||||
logger.error("error with cycle " + cycle + ": " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
throw new RuntimeException(realerror);
|
||||
case warn:
|
||||
logger.warn("error with cycle " + cycle + ": " + e.getMessage());
|
||||
case retry:
|
||||
retry = true;
|
||||
case count:
|
||||
exceptionMeterMetrics.count(realerror);
|
||||
case ignore:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
} catch (NoHostAvailableException | UnavailableException | OperationTimedOutException | OverloadedException
|
||||
| WriteTimeoutException | ReadTimeoutException retryable) {
|
||||
// retryable errors
|
||||
switch (retryableResponse) {
|
||||
case stop:
|
||||
logger.error("error with cycle " + cycle + ": " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
throw retryable;
|
||||
case warn:
|
||||
logger.warn("error with cycle " + cycle + ": " + e.getMessage());
|
||||
case retry:
|
||||
retry = true;
|
||||
case count:
|
||||
exceptionMeterMetrics.count(retryable);
|
||||
case ignore:
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
// catch (ChangeUnappliedException cua) {
|
||||
// boolean retry = false;
|
||||
// switch (retryableResponse) {
|
||||
// case stop:
|
||||
// throw cua;
|
||||
// case warn:
|
||||
// logger.warn("error with cycle " + cycle + ": " + e.getMessage());
|
||||
// case retry:
|
||||
// retry = true;
|
||||
// case count:
|
||||
// exceptionCountMetrics.count(cua);
|
||||
// case ignore:
|
||||
// default:
|
||||
// break;
|
||||
// }
|
||||
// return retry;
|
||||
// }
|
||||
catch (Exception unknown) {
|
||||
throw new RuntimeException(
|
||||
"Unrecognized exception in error handler:"
|
||||
+ unknown.getClass().getCanonicalName() + ": " + unknown.getMessage(), unknown
|
||||
);
|
||||
}
|
||||
return retry;
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.datastax.ebdrivers.dsegraph.errorhandling;
|
||||
|
||||
import com.datastax.driver.dse.graph.GraphStatement;
|
||||
import com.datastax.driver.dse.graph.RegularGraphStatement;
|
||||
import com.datastax.driver.dse.graph.SimpleGraphStatement;
|
||||
|
||||
public class GraphQueryStringMapper {
|
||||
public static String getQueryString(GraphStatement statement) {
|
||||
String queryString;
|
||||
if (statement instanceof RegularGraphStatement) {
|
||||
queryString = ((SimpleGraphStatement) statement).getQueryString();
|
||||
} else {
|
||||
queryString = "(ERROR) Unknown statement type: " + statement.getClass().getCanonicalName();
|
||||
}
|
||||
return queryString;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,7 @@
|
||||
package com.datastax.ebdrivers.dsegraph.statements;
|
||||
|
||||
import com.datastax.driver.dse.graph.SimpleGraphStatement;
|
||||
|
||||
public interface BindableGraphStatement {
|
||||
SimpleGraphStatement bind(long value);
|
||||
}
|
@ -0,0 +1,22 @@
|
||||
package com.datastax.ebdrivers.dsegraph.statements;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class BindableGraphStatementsTemplate {
|
||||
private List<ReadyGraphStatementTemplate> templateList = new ArrayList<>();
|
||||
|
||||
public void addTemplate(ReadyGraphStatementTemplate template) {
|
||||
this.templateList.add(template);
|
||||
}
|
||||
|
||||
public List<BindableGraphStatement> resolve() {
|
||||
return templateList.stream().map(ReadyGraphStatementTemplate::resolve)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return templateList.size();
|
||||
}
|
||||
}
|
@ -0,0 +1,18 @@
|
||||
package com.datastax.ebdrivers.dsegraph.statements;
|
||||
|
||||
import com.datastax.driver.dse.graph.SimpleGraphStatement;
|
||||
import io.nosqlbench.virtdata.core.bindings.ContextualBindings;
|
||||
|
||||
public class ReadyGraphStatement implements BindableGraphStatement {
|
||||
|
||||
private ContextualBindings<SimpleGraphStatement, SimpleGraphStatement> contextualBindings;
|
||||
|
||||
public ReadyGraphStatement(ContextualBindings<SimpleGraphStatement, SimpleGraphStatement> contextualBindings) {
|
||||
this.contextualBindings = contextualBindings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleGraphStatement bind(long value) {
|
||||
return contextualBindings.bind(value);
|
||||
}
|
||||
}
|
@ -0,0 +1,78 @@
|
||||
package com.datastax.ebdrivers.dsegraph.statements;
|
||||
|
||||
import com.datastax.driver.dse.graph.SimpleGraphStatement;
|
||||
import io.nosqlbench.virtdata.core.bindings.Bindings;
|
||||
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
|
||||
import io.nosqlbench.virtdata.core.bindings.ContextualBindingsTemplate;
|
||||
import io.nosqlbench.virtdata.core.bindings.ValuesBinder;
|
||||
import io.nosqlbench.virtdata.core.templates.BindPoint;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ReadyGraphStatementTemplate {
|
||||
|
||||
private ContextualBindingsTemplate<SimpleGraphStatement,SimpleGraphStatement> contextualBindingsTemplate;
|
||||
private String name;
|
||||
private String[] fields;
|
||||
|
||||
public ReadyGraphStatementTemplate(String name, String stmtTemplate, List<BindPoint> bindPoints, String[] fields) {
|
||||
this.name = name;
|
||||
SimpleGraphStatement simpleGraphStatement = new SimpleGraphStatement(stmtTemplate);
|
||||
BindingsTemplate bindingsTemplate = new BindingsTemplate(bindPoints);
|
||||
contextualBindingsTemplate = new ContextualBindingsTemplate<>(
|
||||
simpleGraphStatement, bindingsTemplate ,
|
||||
new ParameterizedGraphStatementValuesBinder(fields)
|
||||
);
|
||||
}
|
||||
|
||||
public ReadyGraphStatementTemplate(String name, String stmtTemplate, List<BindPoint> bindPoints, String[] fields, int repeat) {
|
||||
this.name = name;
|
||||
SimpleGraphStatement simpleGraphStatement = new SimpleGraphStatement(stmtTemplate);
|
||||
BindingsTemplate bindingsTemplate = new BindingsTemplate(bindPoints);
|
||||
|
||||
contextualBindingsTemplate = new ContextualBindingsTemplate<>(simpleGraphStatement, bindingsTemplate , new ParameterizedIteratedGraphStatementValuesBinder(fields, repeat));
|
||||
}
|
||||
|
||||
public static class ParameterizedIteratedGraphStatementValuesBinder implements ValuesBinder<SimpleGraphStatement, SimpleGraphStatement> {
|
||||
|
||||
private final String[] fields;
|
||||
private int repeat;
|
||||
|
||||
public ParameterizedIteratedGraphStatementValuesBinder(String[] fields, int repeat) {
|
||||
this.fields = fields;
|
||||
this.repeat = repeat;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SimpleGraphStatement bindValues(SimpleGraphStatement context, Bindings bindings, long cycle) {
|
||||
Map<String, Object> iteratedSuffixMap = bindings.getIteratedSuffixMap(cycle, repeat, fields);
|
||||
return new SimpleGraphStatement(context.getQueryString(), iteratedSuffixMap);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ParameterizedGraphStatementValuesBinder
|
||||
implements ValuesBinder<SimpleGraphStatement, SimpleGraphStatement> {
|
||||
private final String[] fields;
|
||||
private final Map valuesMap = new HashMap();
|
||||
private final ThreadLocal<Map<String, Object>> mapTL;
|
||||
|
||||
public ParameterizedGraphStatementValuesBinder(String[] fields){
|
||||
this.fields = fields;
|
||||
for (String field : fields) {
|
||||
valuesMap.put(field, null);
|
||||
}
|
||||
mapTL = ThreadLocal.withInitial(() -> new HashMap<String, Object>(valuesMap));
|
||||
}
|
||||
@Override
|
||||
public SimpleGraphStatement bindValues(SimpleGraphStatement context, Bindings bindings, long cycle) {
|
||||
bindings.updateMap(mapTL.get(), cycle);
|
||||
return new SimpleGraphStatement(context.getQueryString(), mapTL.get());
|
||||
}
|
||||
}
|
||||
|
||||
public ReadyGraphStatement resolve() {
|
||||
return new ReadyGraphStatement(contextualBindingsTemplate.resolveBindings());
|
||||
}
|
||||
}
|
@ -0,0 +1 @@
|
||||
io.nosqlbench.nb.annotations.ServiceProcessor
|
@ -0,0 +1,67 @@
|
||||
scenarios:
|
||||
default:
|
||||
creategraph: run driver=dsegraph graphname=graph_wheels tags=phase:graph-schema
|
||||
schema: run driver=dsegraph graphname=graph_wheels tags=phase:graph-schema
|
||||
main: run driver==dsegraph graphname=graph_wheels tags=name:main-add cycles=100000
|
||||
devmode: run driver=dsegraph graphname=graph_wheels tags=name:dev-mode
|
||||
prodmode: run driver=dsegraph graphname=graph_wheels tags=name:dev-mode
|
||||
|
||||
bindings:
|
||||
sessionid: ToEpochTimeUUID()->java.util.UUID; ToString();
|
||||
deviceid: Add(200000); Div(<<sessons_per_device:10>>); ToEpochTimeUUID()->java.util.UUID; ToString();
|
||||
type: WeightedStrings('phone:10;computer:10;')
|
||||
os: WeightedStrings('android:6;ios:4;linux:2;osx:7;windows:3')
|
||||
osversion: WeightedStrings('nougat:3;oreo:1;jellybean:2;4:1;4c:1;5:1;5c:1;trusty:1;xenial:1;yosemite:1;el capitan:2;sierra:3;high sierra:1;7:1;10:2')
|
||||
ipaddress: Combinations('1;7;0-3;.;0-2;0-2;0-5;.;0-2;0-2;0-5')
|
||||
createdtime: Add(1505256898)
|
||||
|
||||
blocks:
|
||||
- name: create-graph
|
||||
tags:
|
||||
phase: create-graph
|
||||
statements:
|
||||
- creategraph: >-
|
||||
system.graph('<<graphname:graph_wheels>>').ifNotExists().create()
|
||||
- name: create-schema
|
||||
tags:
|
||||
phase: graph-schema
|
||||
statements:
|
||||
- graph-schema: >-
|
||||
schema.propertyKey('sessionid').Uuid().ifNotExists().create();
|
||||
schema.propertyKey('deviceid').Uuid().ifNotExists().create();
|
||||
schema.propertyKey('ipaddress').Text().ifNotExists().create();
|
||||
schema.propertyKey('createdtime').Bigint().ifNotExists().create();
|
||||
schema.vertexLabel('session').partitionKey('sessionid').properties('ipaddress', 'deviceid', 'createdtime').ifNotExists().create();
|
||||
schema.propertyKey('type').Text().ifNotExists().create();
|
||||
schema.propertyKey('os').Text().ifNotExists().create();
|
||||
schema.propertyKey('osversion').Text().ifNotExists().create();
|
||||
schema.vertexLabel('device').partitionKey('deviceid').properties('type', 'os', 'osversion').ifNotExists().create();
|
||||
schema.edgeLabel('using').single().connection('session','device').ifNotExists().create();
|
||||
tags:
|
||||
name: graph-schema
|
||||
- name: dev-mode
|
||||
tags:
|
||||
phase: dev-mode
|
||||
statements:
|
||||
- dev-mode: >-
|
||||
schema.config().option('graph.schema_mode').set('Development');
|
||||
tags:
|
||||
name: dev-mode
|
||||
- name: prod-mode
|
||||
tags:
|
||||
phase: prod-mode
|
||||
statements:
|
||||
- prod-mode: >-
|
||||
schema.config().option('graph.schema_mode').set('Production');
|
||||
tags:
|
||||
name: prod-mode
|
||||
- name: main
|
||||
tags:
|
||||
phase: main
|
||||
statements:
|
||||
- main-add: >-
|
||||
device = graph.addVertex(label, 'device','deviceid', {deviceid}, 'type', {type}, 'os', {os}, 'osversion', {osversion});
|
||||
session = graph.addVertex(label, 'session', 'sessionid', {sessionid}, 'ipaddress', {ipaddress}, 'deviceid', {deviceid}, 'createdtime', {createdtime});
|
||||
session.addEdge('using', device);
|
||||
tags:
|
||||
name: main-add
|
242
driver-dsegraph-shaded/src/main/resources/dsegraph.md
Normal file
242
driver-dsegraph-shaded/src/main/resources/dsegraph.md
Normal file
@ -0,0 +1,242 @@
|
||||
# dsegraph activity type
|
||||
|
||||
# warning; These docs are a work in progress
|
||||
This is an activity type which allows for the execution of workloads
|
||||
using DSE Graph and the DSE Java Driver.
|
||||
|
||||
This activity type is wired synchronously within each client
|
||||
thread, however the async API is used in order to expose fine-grain
|
||||
metrics about op binding, op submission, and waiting for a result.
|
||||
|
||||
## Example activity definitions
|
||||
|
||||
Run a dsegraph activity named 'a1', with definitions from activities/graphs.yaml
|
||||
~~~
|
||||
... type=dsegraph alias=a1 yaml=graphs
|
||||
~~~
|
||||
|
||||
Run a dsegraph activity defined by graphs.yaml, but with shortcut naming
|
||||
~~~
|
||||
... type=dsegraph yaml=graphs
|
||||
~~~
|
||||
|
||||
Only run statement groups which match a tag regex
|
||||
~~~
|
||||
... type=dsegraph yaml=graphs tags=group:'ddl.*'
|
||||
~~~
|
||||
|
||||
Run the matching 'dml' statements, with 100 cycles, from [1000..1100)
|
||||
~~~
|
||||
... type=dsegraph yaml=graphs tags=group:'dml.*' cycles=1000..11000
|
||||
~~~
|
||||
This last example shows that the cycle range is [inclusive..exclusive),
|
||||
to allow for stacking test intervals. This is standard across all
|
||||
activity types.
|
||||
|
||||
## dsegraph ActivityType Parameters
|
||||
|
||||
- **yaml** - The file which holds the schema and statement defs.
|
||||
(no default, required)
|
||||
~~~
|
||||
DOCS TBD FOR THIS SECTION
|
||||
- **cl** - An override to consistency levels for the activity. If
|
||||
this option is used, then all consistency levels will be replaced
|
||||
by this one for the current activity, and a log line explaining
|
||||
the difference with respect to the yaml will be emitted.
|
||||
This is not a dynamic parameter. It will only be applied at
|
||||
activity start.
|
||||
~~~~
|
||||
- **cbopts** - this is how you customize the cluster settings for
|
||||
the client, including policies, compression, etc. This is
|
||||
a string of *Java*-like method calls just as you would use them
|
||||
in the Cluster.Builder fluent API. They are evaluated inline
|
||||
with the default Cluster.Builder options not covered below.
|
||||
Example: cbopts=".withCompression(ProtocolOptions.Compression.NONE)"
|
||||
- **maxtries** - how many times an operation may be attempted
|
||||
~~~
|
||||
DOCS TBD FOR THIS SECTION
|
||||
- **diagnose** - if this is set to true, then any exception for an
|
||||
operation are thrown instead of handled internally. This can
|
||||
be useful for diagnosing exceptions during scenario development.
|
||||
In this version of ebdse, this is a shortcut for setting all the
|
||||
exception handlers to **stop**.
|
||||
~~~
|
||||
- **cycles** - standard, however the cql activity type will default
|
||||
this to however many statements are included in the current
|
||||
activity, after tag filtering, etc.
|
||||
- **username** - the user to authenticate as. This option requires
|
||||
that one of **password** or **passfile** also be defined.
|
||||
- **password** - the password to authenticate with. This will be
|
||||
ignored if passfile is also present.
|
||||
- **passfile** - the file to read the password from. The first
|
||||
line of this file is used as the password.
|
||||
- **alias** - this is a standard engineblock parameter, however
|
||||
the cql type will use the yaml value also as the alias value
|
||||
when not specified.
|
||||
- **graphson** - the version of the graphson protocol to use:
|
||||
default: 2
|
||||
|
||||
## Statement Parameters
|
||||
|
||||
- **repeat** - if specified, causes the statement blocks to be
|
||||
lexically repeated before being evaluated as statements,
|
||||
including enumerated bindings.
|
||||
|
||||
## Error Handling
|
||||
|
||||
#### Error Handlers
|
||||
|
||||
When an error occurs, you can control how it is handled.
|
||||
|
||||
This is the error handler stack:
|
||||
|
||||
- **stop** - causes the exception to be thrown to the runtime, forcing a shutdown.
|
||||
- **warn** - log a warning in the log, with details about the error and associated statement.
|
||||
- **count** - keep a count in metrics for the exception, under the name
|
||||
exceptions.classname, using the simple class name, of course.
|
||||
- **retry** - Retry the operation if the number of retries hasn't been
|
||||
used up.
|
||||
- **ignore** - do nothing, do not even retry or count
|
||||
|
||||
They are ordered from the most extreme to the most oblivious starting
|
||||
at the top. With the exception of the **stop** handler, the rest of
|
||||
them will be applied to an error all the way to the bottom. One way
|
||||
to choose the right handler is to say "How serious is this to the test
|
||||
run or the results of the test if it happens?" In general, it is best
|
||||
to be more conservative and choose a more aggressive setting unless you
|
||||
are specifically wanting to measure how often a given error happens,
|
||||
for example.
|
||||
|
||||
#### Error Types
|
||||
|
||||
The errors that can be detected are sorted into three categories:
|
||||
~~~
|
||||
DOCS TBD FOR THIS SECTION
|
||||
|
||||
- **unapplied** - This was a LWT that did not get applied. All operations
|
||||
are checked, and a ChangeUnapplied exception is thrown.
|
||||
(This is a local exception to make error handling consistent)
|
||||
This is a separate category from retryable, because you have to
|
||||
have reactive logic to properly submit a valid request when it occurs.
|
||||
~~~
|
||||
- **retryable** - NoHostAvailable, Overloaded, WriteTimeout, and
|
||||
ReadTimeout exceptions. These are all exceptions that might
|
||||
succeed if tried again with the same payload.
|
||||
- **realerrors** - ReadFailure, WriteFailure, SyntaxError, InvalidQuery.
|
||||
These represent errors that are likely a persistent issue, and
|
||||
will not likely succeed if tried again.
|
||||
|
||||
To set the error handling behavior, simply pair these categories up with
|
||||
an entry point in the error handler stack. Here is an example, showing
|
||||
also the defaults that are used if you do not specify otherwise:
|
||||
|
||||
retryable=retry realerror=stop
|
||||
|
||||
## Generic Parameters
|
||||
|
||||
*provided by the runtime*
|
||||
- **targetrate** - The target rate in ops/s
|
||||
- **linkinput** - if the name of another activity is specified, this activity
|
||||
will only go as fast as that one.
|
||||
- **tags** - optional filter for matching tags in yaml sections (detailed help
|
||||
link needed)
|
||||
- **threads** - the number of client threads driving this activity
|
||||
|
||||
## Metrics
|
||||
- \<alias\>.cycles - (provided by core input) A timer around the whole cycle
|
||||
- \<alias\>.bind - A timer which tracks the performance of the statement
|
||||
binding logic, including the generation of data immediately prior
|
||||
- \<alias\>.execute - A timer which tracks the performance of op submission
|
||||
only. This is the async execution call, broken out as a separate step.
|
||||
- \<alias\>.result - A timer which tracks the performance of an op result only.
|
||||
This is the async get on the future, broken out as a separate step.
|
||||
- \<alias\>.tries - A histogram of how many tries were required to get a
|
||||
completed operation
|
||||
|
||||
## YAML Format
|
||||
|
||||
The YAML file for a DSE Graph activity has one or more logical yaml documents,
|
||||
each separted by tree dashes: --- the standard yaml document separator. Each
|
||||
yaml document may contain a tags section for the purpose of including or
|
||||
excluding statements for a given activity:
|
||||
|
||||
~~~ (optional)
|
||||
tags:
|
||||
tagname: value
|
||||
...
|
||||
~~~
|
||||
If no tags are provided in a document section, then it will be matched by
|
||||
all possible tag filters. Conversely, if no tag filter is applied in
|
||||
the activity definition, all tagged documents will match.
|
||||
|
||||
Statements can be specified at the top level or within named blocks. When
|
||||
you have simple needs to just put a few statements into the yaml, the top-level
|
||||
style will suffice:
|
||||
|
||||
~~~
|
||||
name: statement-top-level-example
|
||||
statements:
|
||||
- statement 1
|
||||
- statement 2
|
||||
~~~
|
||||
|
||||
If you need to represent multiple blocks of statements in the same activity,
|
||||
you might want to group them into blocks:
|
||||
~~~
|
||||
blocks:
|
||||
- name: statement-block-1
|
||||
statements:
|
||||
- statement 1
|
||||
- statement 2
|
||||
~~~
|
||||
|
||||
At any level that you can specify statements, you can also specify data bindings:
|
||||
|
||||
~~~
|
||||
statements:
|
||||
- statement 1
|
||||
- statement 2
|
||||
bindings:
|
||||
bindto1: foo
|
||||
bindto2: bar
|
||||
|
||||
blocks:
|
||||
- name: statement-block-1
|
||||
statements:
|
||||
- statement 1
|
||||
bindings:
|
||||
bindto1: foo
|
||||
~~~
|
||||
|
||||
Data bindings specify how values are generated to plug into each operation. More
|
||||
details on data bindings are available in the activity usage guide.
|
||||
|
||||
### Parameter Templating
|
||||
|
||||
Double angle brackets may be used to drop parameters into the YAML
|
||||
arbitrarily. When the YAML file is loaded, and only then, these parameters
|
||||
are interpolated from activity parameters like those above. This allows you
|
||||
to create activity templates that can be customized simply by providing
|
||||
additional parameters to the activity. There are two forms,
|
||||
\<\<some_var_name:default_value\>\> and \<\<some_var_name\>\>. The first
|
||||
form contains a default value. In any case, if one of these parameters is
|
||||
encountered and a qualifying value is not found, an error will be thrown.
|
||||
|
||||
### YAML Location
|
||||
|
||||
The YAML file referenced in the yaml= parameter will be searched for in the following places, in this order:
|
||||
1. A URL, if it starts with 'http:' or 'https:'
|
||||
2. The local filesystem, if it exists there
|
||||
3. The internal classpath and assets in the ebdse jar.
|
||||
|
||||
The '.yaml' suffix is not required in the yaml= parameter, however it is
|
||||
required on the actual file. As well, the logical search path "activities/"
|
||||
will be used if necessary to locate the file, both on the filesystem and in
|
||||
the classpath.
|
||||
|
||||
This is a basic example below that can be copied as a starting template.
|
||||
|
||||
## YAML Example
|
||||
---
|
||||
CONTENT TBD
|
||||
|
@ -0,0 +1,25 @@
|
||||
tags:
|
||||
type: testtag
|
||||
kind: somekind
|
||||
oevure: bananas
|
||||
name: outerblock
|
||||
statements:
|
||||
- some foo
|
||||
- some bar
|
||||
bindings:
|
||||
bar: set-the-bar
|
||||
|
||||
blocks:
|
||||
- name: block1
|
||||
statements:
|
||||
- some foo
|
||||
- some bar
|
||||
- some baz
|
||||
bindings:
|
||||
aleph: null
|
||||
tags:
|
||||
some: tag
|
||||
- name: block2
|
||||
statements:
|
||||
- some baz
|
||||
- some zab
|
@ -99,6 +99,12 @@
|
||||
<version>3.12.155-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>driver-dsegraph-shaded</artifactId>
|
||||
<version>3.12.155-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>driver-cql-shaded</artifactId>
|
||||
|
Loading…
Reference in New Issue
Block a user