Merge of nb5-preview onto main; main is now the feed for nb5-preview work

This commit is contained in:
Jonathan Shook 2022-03-07 16:41:25 -06:00
commit 4ffc05254e
723 changed files with 7120 additions and 25081 deletions

15
.github/workflows/blocking_issues.yml vendored Normal file
View File

@ -0,0 +1,15 @@
name: Blocking Issues
on:
issues:
types: [closed]
pull_request_target:
types: [opened, edited]
jobs:
blocking_issues:
runs-on: ubuntu-latest
name: Checks for blocking issues
steps:
- uses: Levi-Lesches/blocking-issues@v1.1

View File

@ -13,7 +13,7 @@ jobs:
- uses: actions/setup-java@v1
name: setup java
with:
java-version: '15'
java-version: '17'
java-package: jdk
architecture: x64

View File

@ -3,7 +3,7 @@ name: release
on:
push:
branches:
- nb4-maintenance
- nb5-preview
paths:
- RELEASENOTES.**
@ -18,7 +18,7 @@ jobs:
- name: setup java
uses: actions/setup-java@v1
with:
java-version: '15'
java-version: '17'
java-package: jdk
architecture: x64

View File

@ -1,4 +1,4 @@
FROM adoptopenjdk/openjdk15:alpine-slim
FROM eclipse-temurin:17-alpine
RUN apk --no-cache add curl
COPY nb/target/nb.jar nb.jar

View File

@ -1,6 +1 @@
- 2d833091a (HEAD -> main) unapply uniqueness constraints on template values
- 70789ee99 properly initialize grafana client for annotations
- 440df9541 (origin/main, origin/HEAD) Merge pull request #426 from nosqlbench/dependabot/maven/driver-cockroachdb/org.postgresql-postgresql-42.3.3
- 0ff75f20d (origin/dependabot/maven/driver-cockroachdb/org.postgresql-postgresql-42.3.3) Bump postgresql from 42.2.25 to 42.3.3 in /driver-cockroachdb
- a3c445ba1 Merge pull request #420 from eolivelli/fix/pulsar-remove-debug
- fcf5fd5ed Pulsar Driver: remove debug log that breaks Schema.AUTO_CONSUME() usage
- 4b6019e7d (HEAD -> j17-preview) add missing list templating logic to fix imports bug

View File

@ -1,16 +1,14 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.81-SNAPSHOT</version>
<version>4.17.10-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-cqld4</artifactId>
<artifactId>adapter-cqld4</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
@ -21,56 +19,59 @@
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.9</version>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>gremlin-core</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>tinkergraph-gremlin</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.81-SNAPSHOT</version>
<version>4.17.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.81-SNAPSHOT</version>
<version>4.17.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.15.81-SNAPSHOT</version>
<version>4.17.10-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.12.0</version>
<version>4.13.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.datastax.oss</groupId>-->
<!-- <artifactId>java-driver-core-shaded</artifactId>-->
<!-- <version>4.12.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.12.0</version>
<version>4.13.0</version>
</dependency>
<!-- &lt;!&ndash; For CQL compression option &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.lz4</groupId>-->
<!-- <artifactId>lz4-java</artifactId>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; For CQL compression option &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.xerial.snappy</groupId>-->
<!-- <artifactId>snappy-java</artifactId>-->
<!-- </dependency>-->
<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
<version>1.1.7.3</version>
</dependency>
</dependencies>

View File

@ -0,0 +1,23 @@
package io.nosqlbench.adapter.cqld4;

import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Back-compat alias for the legacy {@code cql} driver selector. All behavior is
 * delegated to {@link Cqld4DriverAdapter}; the only addition is an advisory logged
 * whenever the op mapper is requested, telling users the driver-4 implementation
 * now backs this selector.
 */
@Service(value = DriverAdapter.class, selector = "cql")
public class CqlDriverAdapterStub extends Cqld4DriverAdapter {

    private static final Logger logger = LogManager.getLogger(CqlDriverAdapterStub.class);

    // Advisory shown on each op-mapper request; suppressed by selecting driver=cqld4 directly.
    private static final String PREVIEW_WARNING =
        "This version of NoSQLBench uses the DataStax Java Driver version 4 for all CQL workloads. In this preview version, advanced testing features present in the previous cql and cqld3 drivers are being back-ported. If you need those features before the porting is complete, please use only the release artifacts from the 4.15.x branch. To suppress this message, use driver=cqld4. This warning will be removed in a future version when all features are completely back-ported.";

    public CqlDriverAdapterStub() {
        super();
    }

    /**
     * Emit the back-port advisory, then defer entirely to the parent adapter's mapper.
     */
    @Override
    public OpMapper<Op> getOpMapper() {
        logger.warn(PREVIEW_WARNING);
        return super.getOpMapper();
    }
}

View File

@ -0,0 +1,23 @@
package io.nosqlbench.adapter.cqld4;

import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Back-compat alias for the legacy {@code cqld3} driver selector. All behavior is
 * delegated to {@link Cqld4DriverAdapter}; the only addition is an advisory logged
 * whenever the op mapper is requested, telling users the driver-4 implementation
 * now backs this selector.
 */
@Service(value = DriverAdapter.class, selector = "cqld3")
public class Cqld3DriverAdapterStub extends Cqld4DriverAdapter {

    private static final Logger logger = LogManager.getLogger(Cqld3DriverAdapterStub.class);

    // Advisory shown on each op-mapper request; suppressed by selecting driver=cqld4 directly.
    private static final String PREVIEW_WARNING =
        "This version of NoSQLBench uses the DataStax Java Driver version 4 for all CQL workloads. In this preview version, advanced testing features present in the previous cql and cqld3 drivers are being back-ported. If you need those features before the porting is complete, please use only the release artifacts from the 4.15.x branch. To suppress this message, use driver=cqld4. This warning will be removed in a future version when all features are completely back-ported.";

    public Cqld3DriverAdapterStub() {
        super();
    }

    /**
     * Emit the back-port advisory, then defer entirely to the parent adapter's mapper.
     */
    @Override
    public OpMapper<Op> getOpMapper() {
        logger.warn(PREVIEW_WARNING);
        return super.getOpMapper();
    }
}

View File

@ -3,12 +3,13 @@ package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
public class Cqld4ReboundStatement extends Cqld4Op {
public class Cqld4CqlReboundStatement extends Cqld4CqlOp {
private final BoundStatement stmt;
public Cqld4ReboundStatement(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, BoundStatement rebound, RSProcessors processors) {
super(session,maxpages,retryreplace,metrics,processors);
public Cqld4CqlReboundStatement(CqlSession session, int maxpages, boolean retryreplace, BoundStatement rebound, RSProcessors processors) {
super(session,maxpages,retryreplace,processors);
this.stmt = rebound;
}

View File

@ -1,34 +1,88 @@
package io.nosqlbench.adapter.cqld4;
import io.nosqlbench.adapter.cqld4.opmappers.Cqld4CoreOpMapper;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "cqld4")
public class Cqld4DriverAdapter extends BaseDriverAdapter<Cqld4Op, Cqld4Space> {
public class Cqld4DriverAdapter extends BaseDriverAdapter<Op, Cqld4Space> {
private final static Logger logger = LogManager.getLogger(Cqld4DriverAdapter.class);
@Override
public OpMapper<Cqld4Op> getOpMapper() {
public OpMapper<Op> getOpMapper() {
DriverSpaceCache<? extends Cqld4Space> spaceCache = getSpaceCache();
NBConfiguration config = getConfiguration();
return new Cqld4OpMapper(config, spaceCache);
return new Cqld4CoreOpMapper(config, spaceCache);
}
@Override
public Function<String, ? extends Cqld4Space> getSpaceInitializer(NBConfiguration cfg) {
return s -> new Cqld4Space(cfg);
return s -> new Cqld4Space(s,cfg);
}
@Override
public NBConfigModel getConfigModel() {
return Cqld4Space.getConfigModel();
return super.getConfigModel().add(Cqld4Space.getConfigModel());
}
@Override
public List<Function<Map<String, Object>, Map<String, Object>>> getOpFieldRemappers() {
List<Function<Map<String, Object>, Map<String, Object>>> remappers = new ArrayList<>();
remappers.addAll(super.getOpFieldRemappers());
// Simplify to the modern form and provide a helpful warning to the user
// This auto updates to 'simple: <stmt>' or 'prepared: <stmt>' for cql types
remappers.add(m -> {
Map<String,Object> map = new LinkedHashMap<>(m);
if (map.containsKey("stmt")) {
String type = map.containsKey("type") ? map.get("type").toString() : "cql";
if (type.equals("cql")){
boolean prepared = (!map.containsKey("prepared")) || map.get("prepared").equals(true);
map.put(prepared?"prepared":"simple",map.get("stmt"));
map.remove("stmt");
map.remove("type");
}
}
if (map.containsKey("type")) {
String type = map.get("type").toString();
if (type.equals("gremlin")&&map.containsKey("script")) {
map.put("gremlin",map.get("script").toString());
map.remove("script");
map.remove("type");
}
if (type.equals("gremlin")&&map.containsKey("stmt")) {
map.put("gremlin",map.get("stmt"));
map.remove("type");
map.remove("stmt");
}
if (type.equals("fluent")&&map.containsKey("fluent")) {
map.remove("type");
}
if (type.equals("fluent")&&map.containsKey("stmt")) {
map.put("fluent",map.get("stmt"));
map.remove("stmt");
}
}
return map;
});
return remappers;
}
}

View File

@ -1,81 +0,0 @@
package io.nosqlbench.adapter.cqld4;

import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.opdispensers.CqlD4PreparedBatchOpDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4BatchStatementDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4SimpleCqlStmtDispenser;
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.config.params.ParamsParser;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Maps a parsed op template to a concrete CQL op dispenser, selecting among
 * prepared/simple and batch/non-batch variants based on static op configuration,
 * and attaching any configured result-set processors.
 */
public class Cqld4OpMapper implements OpMapper<Cqld4Op> {

    private final DriverSpaceCache<? extends Cqld4Space> cache;
    private final NBConfiguration cfg;

    /**
     * @param config the adapter-level configuration
     * @param cache  per-space session cache; the op's "space" field selects the entry
     */
    public Cqld4OpMapper(NBConfiguration config, DriverSpaceCache<? extends Cqld4Space> cache) {
        this.cfg = config;
        this.cache = cache;
    }

    /**
     * Resolve a dispenser for the given op template.
     *
     * @throws BasicError if the op template contains no statement
     */
    public OpDispenser<Cqld4Op> apply(ParsedOp cmd) {
        ParsedTemplate stmtTpl = cmd.getStmtAsTemplate().orElseThrow(() -> new BasicError(
            "No statement was found in the op template:" + cmd
        ));

        RSProcessors processors = new RSProcessors();
        // Capture points in the statement template imply a field-capture processor.
        if (stmtTpl.getCaptures().size() > 0) {
            processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures()));
        }

        // Any explicitly configured processors are resolved by their "type" discriminator.
        Optional<List> processorList = cmd.getOptionalStaticConfig("processors", List.class);
        processorList.ifPresent(l -> {
            l.forEach(m -> {
                Map<String, String> pconfig = ParamsParser.parseToMap(m, "type");
                ResultSetProcessor processor = Cqld4Processors.resolve(pconfig);
                processors.add(() -> processor);
            });
        });

        Cqld4Space cqld4Space = cache.get(cmd.getStaticConfigOr("space", "default"));
        boolean prepared = cmd.getStaticConfigOr("prepared", true);
        // BUG FIX: this previously read the "batch" flag from the key "boolean",
        // so batch dispensers could never be selected via the documented option name.
        boolean batch = cmd.getStaticConfigOr("batch", false);
        CqlSession session = cqld4Space.getSession();

        if (prepared && batch) {
            return new CqlD4PreparedBatchOpDispenser(session, cmd);
        } else if (prepared) {
            return new Cqld4PreparedStmtDispenser(session, cmd, processors);
        } else if (batch) {
            return new Cqld4BatchStatementDispenser(session, cmd);
        } else {
            return new Cqld4SimpleCqlStmtDispenser(session, cmd);
        }
    }
}

View File

@ -2,7 +2,15 @@ package io.nosqlbench.adapter.cqld4;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfig;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.internal.core.config.composite.CompositeDriverConfigLoader;
import com.datastax.oss.driver.internal.core.loadbalancing.helper.NodeFilterToDistanceEvaluatorAdapter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import io.nosqlbench.adapter.cqld4.optionhelpers.OptionHelpers;
import io.nosqlbench.engine.api.util.SSLKsFactory;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.content.Content;
@ -18,23 +26,59 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class Cqld4Space {
private final static Logger logger = LogManager.getLogger(Cqld4Space.class);
private final String space;
CqlSession session;
public Cqld4Space(NBConfiguration cfg) {
public Cqld4Space(String space, NBConfiguration cfg) {
this.space = space;
session = createSession(cfg);
}
/**
 * Expose every built-in DataStax driver option as an optional "driver.&lt;path&gt;"
 * config parameter, typed with the driver's own expected raw type for that option.
 */
private static NBConfigModel getDriverOptionsModel() {
    ConfigModel model = ConfigModel.of(DriverConfig.class);
    for (TypedDriverOption<?> option : TypedDriverOption.builtInValues()) {
        String optionPath = option.getRawOption().getPath();
        Class<?> optionType = option.getExpectedType().getRawType();
        model.add(Param.optional("driver." + optionPath, optionType));
    }
    return model.asReadOnly();
}
private CqlSession createSession(NBConfiguration cfg) {
CqlSessionBuilder builder = new CqlSessionBuilder();
resolveConfigLoader(cfg).ifPresent(builder::withConfigLoader);
// stop insights for testing
OptionsMap defaults = new OptionsMap();
defaults.put(TypedDriverOption.MONITOR_REPORTING_ENABLED, false); // We don't need to do this every time we run a test or sanity check
DriverConfigLoader dcl = DriverConfigLoader.fromMap(defaults);
// add streamlined cql parameters
OptionHelpers helpers = new OptionHelpers(defaults);
NBConfiguration cqlHelperCfg = helpers.getConfigModel().extractConfig(cfg);
helpers.applyConfig(cqlHelperCfg);
// add user-provided parameters
NBConfiguration driverCfg = getDriverOptionsModel().extractConfig(cfg);
if (!driverCfg.isEmpty()) {
Map<String, Object> remapped = new LinkedHashMap<>();
driverCfg.getMap().forEach((k, v) -> remapped.put(k.substring("driver.".length()), v));
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String remappedViaSerdesToSatisfyObtuseConfigAPI = gson.toJson(remapped);
DriverConfigLoader userProvidedOptions = DriverConfigLoader.fromString(remappedViaSerdesToSatisfyObtuseConfigAPI);
dcl = new CompositeDriverConfigLoader(dcl, userProvidedOptions);
}
// add referenced config from 'cfg' activity parameter
DriverConfigLoader cfgDefaults = resolveConfigLoader(cfg).orElse(DriverConfigLoader.fromMap(OptionsMap.driverDefaults()));
dcl = new CompositeDriverConfigLoader(dcl, cfgDefaults);
builder.withConfigLoader(dcl);
int port = cfg.getOrDefault("port", 9042);
@ -91,8 +135,27 @@ public class Cqld4Space {
builder.withAuthCredentials(username, password);
}
cfg.getOptional("whitelist").ifPresent(wl -> {
List<InetSocketAddress> addrs = Arrays
.stream(wl.split(","))
.map(this::toSocketAddr)
.toList();
builder.withNodeDistanceEvaluator(new NodeFilterToDistanceEvaluatorAdapter(n -> {
return (n.getBroadcastAddress().isPresent() && addrs.contains(n.getBroadcastAddress().get()))
||(n.getBroadcastRpcAddress().isPresent() && addrs.contains(n.getBroadcastRpcAddress().get()))
||(n.getListenAddress().isPresent() && addrs.contains(n.getListenAddress().get()));
}));
});
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extract(cfg);
cfg.getOptional("cloud_proxy_address").ifPresent(cpa -> {
String[] addr = cpa.split(":",2);
if (addr.length==1) {
throw new RuntimeException("cloud_proxy_address must be specified in host:port form.");
}
builder.withCloudProxyAddress(InetSocketAddress.createUnresolved(addr[0],Integer.parseInt(addr[1])));
});
NBConfiguration sslCfg = SSLKsFactory.get().getConfigModel().extractConfig(cfg);
SSLContext ctx = SSLKsFactory.get().getContext(sslCfg);
if (ctx != null) {
@ -103,42 +166,14 @@ public class Cqld4Space {
return session;
}
/**
* Split off any clearly separate config loader specifications from the beginning or end,
* so they can be composed as an ordered set of config loaders.
*
* @param driverconfig The string containing driver config specs as described in the cqld4.md
* documentation.
* @return A list of zero or more strings, each representing a config source
*/
// for testing
public static List<String> splitConfigLoaders(String driverconfig) {
List<String> configs = new ArrayList<>();
Pattern preconfig = Pattern.compile("(?<pre>((\\w+://.+?)|[a-zA-z0-9_:'/\\\\]+?))\\s*,\\s*(?<rest>.+)");
Matcher matcher = preconfig.matcher(driverconfig);
while (matcher.matches()) {
configs.add(matcher.group("pre"));
driverconfig = matcher.group("rest");
matcher = preconfig.matcher(driverconfig);
}
Pattern postconfig = Pattern.compile("(?<head>.+?)\\s*,\\s*(?<post>(\\w+://.+?)|([a-zA-Z0-9_:'/\\\\]+?))");
matcher = postconfig.matcher(driverconfig);
LinkedList<String> tail = new LinkedList<>();
while (matcher.matches()) {
tail.push(matcher.group("post"));
driverconfig = matcher.group("head");
matcher = postconfig.matcher(driverconfig);
}
if (!driverconfig.isEmpty()) {
configs.add(driverconfig);
}
while (tail.size() > 0) {
configs.add(tail.pop());
}
return configs;
/**
 * Parse a "host" or "host:port" spec into an InetSocketAddress,
 * defaulting to the standard CQL port 9042 when no port is given.
 */
private InetSocketAddress toSocketAddr(String addr) {
    String[] parts = addr.split(":", 2);
    String host = parts[0];
    int port = 9042; // default CQL native-protocol port
    if (parts.length == 2) {
        port = Integer.parseInt(parts[1]);
    }
    return new InetSocketAddress(host, port);
}
private Optional<DriverConfigLoader> resolveConfigLoader(NBConfiguration cfg) {
Optional<String> maybeDriverConfig = cfg.getOptional("driverconfig");
@ -148,7 +183,7 @@ public class Cqld4Space {
String driverconfig = maybeDriverConfig.get();
List<String> loaderspecs = splitConfigLoaders(driverconfig);
List<String> loaderspecs = NBConfigSplitter.splitConfigLoaders(driverconfig);
LinkedList<DriverConfigLoader> loaders = new LinkedList<>();
for (String loaderspec : loaderspecs) {
@ -204,15 +239,19 @@ public class Cqld4Space {
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(Cqld4DriverAdapter.class)
return ConfigModel.of(Cqld4Space.class)
.add(Param.optional("localdc"))
.add(Param.optional("secureconnectbundle"))
.add(Param.optional("hosts"))
.add(Param.optional("driverconfig"))
.add(Param.optional("username"))
.add(Param.optional("password"))
.add(Param.optional("passfile"))
.add(Param.optional("driverconfig",String.class))
.add(Param.optional("username",String.class,"user name (see also password and passfile)"))
.add(Param.optional("password", String.class, "password (see also passfile)"))
.add(Param.optional("passfile",String.class,"file to load the password from"))
.add(Param.optional("whitelist",String.class,"list of whitelist hosts addresses"))
.add(Param.optional("cloud_proxy_address",String.class,"Cloud Proxy Address"))
.add(SSLKsFactory.get().getConfigModel())
.add(getDriverOptionsModel())
.add(new OptionHelpers(new OptionsMap()).getConfigModel())
.asReadOnly();
}

View File

@ -1,9 +1,11 @@
package io.nosqlbench.adapter.cqld4;
package io.nosqlbench.adapter.cqld4.exceptions;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
public class UndefinedResultSetException extends RuntimeException {
private final Cqld4Op cqld4op;
private final Cqld4CqlOp cqld4op;
public UndefinedResultSetException(Cqld4Op cqld4Op) {
public UndefinedResultSetException(Cqld4CqlOp cqld4Op) {
this.cqld4op = cqld4Op;
}

View File

@ -0,0 +1,59 @@
package io.nosqlbench.adapter.cqld4.opdispensers;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;

import java.util.function.LongFunction;

/**
 * Common base for CQL statement dispensers. Captures the shared static options
 * (maxpages, retryreplace), holds the per-cycle session function, and provides
 * uniform statement-level decoration via {@link #getEnhancedStmtFunc}.
 *
 * NOTE(review): an unused private Cqld4OpMetrics field (and its import) was removed;
 * it was initialized but never read anywhere in this class.
 */
public abstract class BaseCqlStmtDispenser extends BaseOpDispenser<Cqld4CqlOp> {

    // Maximum number of result pages an op will consume (default 1).
    private final int maxpages;
    private final LongFunction<CqlSession> sessionFunc;
    // Whether failed ops should be retried with a rebound statement.
    private final boolean isRetryReplace;

    public BaseCqlStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp op) {
        super(op);
        this.sessionFunc = sessionFunc;
        this.maxpages = op.getStaticConfigOr("maxpages", 1);
        this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
    }

    /** @return the configured page limit for result iteration */
    public int getMaxPages() {
        return maxpages;
    }

    /** @return true when retry-with-replace behavior is enabled for this op */
    public boolean isRetryReplace() {
        return isRetryReplace;
    }

    /** @return the cycle-to-session function shared by all ops from this dispenser */
    public LongFunction<CqlSession> getSessionFunc() {
        return sessionFunc;
    }

    /**
     * All implementations of a CQL Statement Dispenser should be using the method
     * provided by this function. This ensures that {@link Statement}-level attributes
     * are handled uniformly and in one place.
     *
     * This takes the base statement function and decorates it optionally with each
     * additional qualified modifier, short-circuiting those which are not specified.
     * This allows default behavior to take precedence as well as avoids unnecessary calling
     * overhead for implicit attributes. This should be called when the stmt function is
     * initialized within each dispenser, not for each time dispensing occurs.
     */
    protected LongFunction<Statement> getEnhancedStmtFunc(LongFunction<Statement> basefunc, ParsedOp op) {
        LongFunction<Statement> partial = basefunc;
        partial = op.enhanceEnum(partial, "cl", DefaultConsistencyLevel.class, Statement::setConsistencyLevel);
        partial = op.enhanceEnum(partial, "scl", DefaultConsistencyLevel.class, Statement::setSerialConsistencyLevel);
        partial = op.enhance(partial, "idempotent", Boolean.class, Statement::setIdempotent);
        return partial;
    }
}

View File

@ -1,23 +0,0 @@
package io.nosqlbench.adapter.cqld4.opdispensers;

import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.errors.BasicError;

/**
 * Placeholder dispenser for prepared batch statements. Construction succeeds so
 * op mapping can proceed, but dispensing an op always fails until this path
 * is actually implemented.
 */
public class CqlD4PreparedBatchOpDispenser implements OpDispenser<Cqld4Op> {

    private final CqlSession cqlSession;
    private final ParsedOp parsedOp;

    public CqlD4PreparedBatchOpDispenser(CqlSession session, ParsedOp cmd) {
        this.cqlSession = session;
        this.parsedOp = cmd;
    }

    /** Always throws: prepared batches are not supported yet. */
    @Override
    public Cqld4Op apply(long value) {
        throw new BasicError("this is not implemented yet.");
    }
}

View File

@ -1,22 +0,0 @@
package io.nosqlbench.adapter.cqld4.opdispensers;

import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;

/**
 * Stub dispenser for (non-prepared) CQL batch statements.
 *
 * NOTE(review): apply() returns null, which will NPE downstream if this dispenser
 * is ever selected at runtime — confirm whether this path is intentionally inert
 * or should throw a not-implemented error like the prepared-batch dispenser.
 */
public class Cqld4BatchStatementDispenser implements OpDispenser<Cqld4Op> {

    private final CqlSession cqlSession;
    private final ParsedOp parsedOp;

    public Cqld4BatchStatementDispenser(CqlSession session, ParsedOp cmd) {
        this.cqlSession = session;
        this.parsedOp = cmd;
    }

    /** Behavior intentionally preserved: this stub produces no op. */
    @Override
    public Cqld4Op apply(long value) {
        return null;
    }
}

View File

@ -0,0 +1,53 @@
package io.nosqlbench.adapter.cqld4.opdispensers;

import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.dse.driver.api.core.graph.FluentGraphStatementBuilder;
import com.datastax.oss.driver.api.core.CqlSession;
import groovy.lang.Script;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4FluentGraphOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.bindings.Bindings;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Vertex;

import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;

/**
 * Dispenses fluent-graph ops: per cycle, generated bindings are installed as Groovy
 * script variables, the script is evaluated to a Gremlin traversal, and the traversal
 * is wrapped in a FluentGraphStatement targeting the cycle's graph name.
 */
public class Cqld4FluentGraphOpDispenser extends BaseOpDispenser<Op> {

    private final LongFunction<? extends String> graphnameFunc;
    private final LongFunction<CqlSession> sessionFunc;
    private final Bindings virtdataBindings;
    // One Script instance per thread: script bindings are mutated on every cycle.
    private final ThreadLocal<Script> tlScript;

    public Cqld4FluentGraphOpDispenser(
        ParsedOp optpl,
        LongFunction<? extends String> graphnameFunc,
        LongFunction<CqlSession> sessionFunc,
        Bindings virtdataBindings,
        Supplier<Script> scriptSource
    ) {
        super(optpl);
        this.graphnameFunc = graphnameFunc;
        this.sessionFunc = sessionFunc;
        this.virtdataBindings = virtdataBindings;
        this.tlScript = ThreadLocal.withInitial(scriptSource);
    }

    @Override
    public Op apply(long value) {
        String targetGraph = graphnameFunc.apply(value);
        Script perThreadScript = tlScript.get();

        // Install this cycle's generated values as variables visible to the script.
        for (Map.Entry<String, Object> entry : virtdataBindings.getAllMap(value).entrySet()) {
            perThreadScript.getBinding().setVariable(entry.getKey(), entry.getValue());
        }

        // The script contract is to yield a vertex traversal; the cast is unchecked by nature.
        @SuppressWarnings("unchecked")
        GraphTraversal<Vertex, Vertex> traversal = (GraphTraversal<Vertex, Vertex>) perThreadScript.run();
        FluentGraphStatement statement =
            new FluentGraphStatementBuilder(traversal).setGraphName(targetGraph).build();
        return new Cqld4FluentGraphOp(sessionFunc.apply(value), statement);
    }
}

View File

@ -0,0 +1,48 @@
package io.nosqlbench.adapter.cqld4.opdispensers;

import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatementBuilder;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4ScriptGraphOp;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.templating.ParsedOp;

import java.util.Optional;
import java.util.function.LongFunction;

/**
 * Dispenses script-graph (Gremlin) ops. The statement function is composed once at
 * construction: a builder per cycle, an optional per-cycle graph name, and the
 * rendered script text from the op's target function.
 */
public class Cqld4GremlinOpDispenser extends BaseOpDispenser<Cqld4ScriptGraphOp> {

    private final LongFunction<? extends ScriptGraphStatement> stmtFunc;
    private final LongFunction<CqlSession> sessionFunc;
    // diag>0 enables console echo of each rendered statement.
    private final LongFunction<Long> diagFunc;

    public Cqld4GremlinOpDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
        super(cmd);
        this.sessionFunc = sessionFunc;
        this.diagFunc = cmd.getAsFunctionOr("diag", 0L);

        // Start from a bare builder, then optionally layer on a per-cycle graph name.
        LongFunction<ScriptGraphStatementBuilder> builderFunc = l -> new ScriptGraphStatementBuilder();
        Optional<LongFunction<String>> graphnameFunc = cmd.getAsOptionalFunction("graphname");
        if (graphnameFunc.isPresent()) {
            LongFunction<ScriptGraphStatementBuilder> unnamed = builderFunc;
            LongFunction<? extends String> nameFunc = graphnameFunc.get();
            builderFunc = l -> unnamed.apply(l).setGraphName(nameFunc.apply(l));
        }
        LongFunction<ScriptGraphStatementBuilder> composed = builderFunc;
        this.stmtFunc = l -> composed.apply(l).setScript(targetFunction.apply(l)).build();
    }

    @Override
    public Cqld4ScriptGraphOp apply(long value) {
        ScriptGraphStatement stmt = stmtFunc.apply(value);
        if (diagFunc.apply(value) > 0L) {
            System.out.println("## GREMLIN DIAG: ScriptGraphStatement on graphname(" + stmt.getGraphName() + "):\n" + stmt.getScript());
        }
        return new Cqld4ScriptGraphOp(sessionFunc.apply(value), stmt);
    }
}

View File

@ -3,46 +3,59 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlPreparedStatement;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import java.util.function.LongFunction;
public class Cqld4PreparedStmtDispenser implements OpDispenser<Cqld4Op> {
public class Cqld4PreparedStmtDispenser extends BaseCqlStmtDispenser {
private final CqlSession session;
private final LongFunction<Object[]> varbinder;
private final PreparedStatement preparedStmt;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
private final RSProcessors processors;
private final LongFunction<Statement> stmtFunc;
private final ParsedTemplate stmtTpl;
private PreparedStatement preparedStmt;
private CqlSession boundSession;
public Cqld4PreparedStmtDispenser(CqlSession session, ParsedOp cmd, RSProcessors processors) {
this.session = session;
public Cqld4PreparedStmtDispenser(LongFunction<CqlSession> sessionFunc, ParsedOp cmd, ParsedTemplate stmtTpl, RSProcessors processors) {
super(sessionFunc, cmd);
if (cmd.isDynamic("space")) {
throw new RuntimeException("Prepared statements and dynamic space values are not supported." +
" This would churn the prepared statement cache, defeating the purpose of prepared statements.");
}
this.processors = processors;
this.stmtTpl = stmtTpl;
stmtFunc = createStmtFunc(cmd);
}
ParsedTemplate parsed = cmd.getStmtAsTemplate().orElseThrow();
varbinder = cmd.newArrayBinderFromBindPoints(parsed.getBindPoints());
protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {
String preparedQueryString = parsed.getPositionalStatement(s -> "?");
preparedStmt = session.prepare(preparedQueryString);
LongFunction<Object[]> varbinder;
varbinder = cmd.newArrayBinderFromBindPoints(stmtTpl.getBindPoints());
String preparedQueryString = stmtTpl.getPositionalStatement(s -> "?");
boundSession = getSessionFunc().apply(0);
preparedStmt = boundSession.prepare(preparedQueryString);
this.maxpages = cmd.getStaticConfigOr("maxpages",1);
this.retryreplace = cmd.getStaticConfigOr("retryreplace", false);
this.metrics = new Cqld4OpMetrics();
LongFunction<Statement> boundStmtFunc = c -> {
Object[] apply = varbinder.apply(c);
return preparedStmt.bind(apply);
};
return super.getEnhancedStmtFunc(boundStmtFunc, cmd);
}
@Override
public Cqld4Op apply(long value) {
Object[] parameters = varbinder.apply(value);
BoundStatement stmt = preparedStmt.bind(parameters);
return new Cqld4PreparedStatement(session, stmt, maxpages, retryreplace, metrics, processors);
public Cqld4CqlOp apply(long value) {
return new Cqld4CqlPreparedStatement(
boundSession,
(BoundStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace(),
processors
);
}
}

View File

@ -0,0 +1,39 @@
package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
/**
 * Dispenses "raw" CQL operations: the statement text is rendered per cycle with all
 * binding values already substituted into the string, and is submitted as an
 * unparameterized {@link SimpleStatement}. This is intentionally not the efficient
 * path; see the {@code raw} op type documentation for when it is appropriate.
 */
public class Cqld4RawStmtDispenser extends BaseCqlStmtDispenser {

    // Renders the complete statement text for a given cycle.
    private final LongFunction<String> targetFunction;
    // Produces the fully-enhanced driver statement for a given cycle.
    private final LongFunction<Statement> stmtFunc;

    public Cqld4RawStmtDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
        super(sessionFunc, cmd);
        this.targetFunction = targetFunction;
        this.stmtFunc = createStmtFunc(cmd);
    }

    /**
     * Compose the base statement-building function with the common statement
     * enhancements applied by the base dispenser.
     */
    protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {
        LongFunction<Statement> rendered =
            cycle -> new SimpleStatementBuilder(targetFunction.apply(cycle)).build();
        return getEnhancedStmtFunc(rendered, cmd);
    }

    @Override
    public Cqld4CqlOp apply(long value) {
        CqlSession session = getSessionFunc().apply(value);
        SimpleStatement statement = (SimpleStatement) stmtFunc.apply(value);
        return new Cqld4CqlSimpleStatement(session, statement, getMaxPages(), isRetryReplace());
    }
}

View File

@ -2,32 +2,35 @@ package io.nosqlbench.adapter.cqld4.opdispensers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4SimpleCqlStatement;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlSimpleStatement;
import io.nosqlbench.engine.api.templating.ParsedOp;
public class Cqld4SimpleCqlStmtDispenser implements OpDispenser<Cqld4Op> {
import java.util.function.LongFunction;
private final CqlSession session;
private final ParsedOp cmd;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
public class Cqld4SimpleCqlStmtDispenser extends BaseCqlStmtDispenser {
public Cqld4SimpleCqlStmtDispenser(CqlSession session, ParsedOp cmd) {
this.session = session;
this.cmd = cmd;
this.maxpages = cmd.getStaticConfigOr("maxpages",1);
this.retryreplace = cmd.getStaticConfigOr("retryreplace",false);
this.metrics = new Cqld4OpMetrics();
private final LongFunction<Statement> stmtFunc;
private final LongFunction<String> targetFunction;
public Cqld4SimpleCqlStmtDispenser(LongFunction<CqlSession> sessionFunc, LongFunction<String> targetFunction, ParsedOp cmd) {
super(sessionFunc,cmd);
this.targetFunction=targetFunction;
this.stmtFunc =createStmtFunc(cmd);
}
protected LongFunction<Statement> createStmtFunc(ParsedOp op) {
return super.getEnhancedStmtFunc(l -> SimpleStatement.newInstance(targetFunction.apply(l)),op);
}
@Override
public Cqld4SimpleCqlStatement apply(long value) {
String stmtBody = cmd.get("stmt",value);
SimpleStatement simpleStatement = SimpleStatement.newInstance(stmtBody);
return new Cqld4SimpleCqlStatement(session,simpleStatement,maxpages,retryreplace,metrics);
public Cqld4CqlSimpleStatement apply(long value) {
return new Cqld4CqlSimpleStatement(
getSessionFunc().apply(value),
(SimpleStatement) stmtFunc.apply(value),
getMaxPages(),
isRetryReplace()
);
}
}

View File

@ -0,0 +1,25 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4SimpleCqlStmtDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
/**
 * Maps a parsed op template to a dispenser of simple (non-prepared) CQL statements.
 * This mapper only captures the session and statement-text functions; all per-op
 * configuration is handled by the dispenser it creates.
 */
public class CqlD4CqlSimpleStmtMapper implements OpMapper<Cqld4CqlOp> {

    // Per-cycle session lookup.
    private final LongFunction<CqlSession> sessionFunc;
    // Per-cycle statement text lookup.
    private final LongFunction<String> targetFunction;

    public CqlD4CqlSimpleStmtMapper(LongFunction<CqlSession> sessionFunc,
                                    LongFunction<String> targetFunction) {
        this.sessionFunc = sessionFunc;
        this.targetFunction = targetFunction;
    }

    @Override
    public OpDispenser<? extends Cqld4CqlOp> apply(ParsedOp cmd) {
        return new Cqld4SimpleCqlStmtDispenser(sessionFunc, targetFunction, cmd);
    }
}

View File

@ -0,0 +1,51 @@
package io.nosqlbench.adapter.cqld4.opmappers;
/**
 * The operation types supported by the CQL driver 4 adapter. The type selects
 * which statement form and execution path is used for an op template.
 */
public enum CqlD4OpType {

    /**
     * uses {@link com.datastax.oss.driver.api.core.cql.SimpleStatement}
     * <em>does not</em> parameterize values via the SimpleStatement API.
     * Pre-renders the statement string with values included. This is neither
     * efficient nor recommended for production use, although it is useful
     * for certain testing scenarios in which you need to create a lot
     * of DDL or other statements which require non-parameterizable fields
     * to be present in binding values.
     */
    raw,

    /**
     * uses {@link com.datastax.oss.driver.api.core.cql.SimpleStatement}
     * This parameterizes values and applies them as positional fields,
     * where the binding points are aligned by binding name.
     */
    simple,

    /**
     * uses {@link com.datastax.oss.driver.api.core.cql.SimpleStatement}
     * This type does everything that the {@link #simple} mode does, and
     * additionally uses prepared statements.
     */
    prepared,

    /**
     * uses {@link com.datastax.dse.driver.api.core.graph.ScriptGraphStatement}
     * This is the "raw" mode of using gremlin. It is not as efficient, and thus
     * is only recommended for testing or legacy apps.
     */
    gremlin,

    /**
     * uses {@link com.datastax.dse.driver.api.core.graph.FluentGraphStatement}
     * This mode is the recommended mode for gremlin execution. It uses the fluent
     * API on the client side. The fluent syntax is compiled and cached as bytecode
     * within a per-thread execution environment (for each op template). For each
     * cycle, the bindings are rendered, injected into that execution environment,
     * and then the bytecode is executed to render the current operation, which is
     * then sent to the server. Although this is arguably more involved, the result
     * is quite efficient and provides the closest idiomatic experience <em>AND</em>
     * the best performance.
     *
     * <p>This is the mode that is recommended for all graph usage.</p>
     */
    fluent
}

View File

@ -0,0 +1,57 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Processors;
import io.nosqlbench.adapter.cqld4.RSProcessors;
import io.nosqlbench.adapter.cqld4.ResultSetProcessor;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4PreparedStmtDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.adapter.cqld4.processors.CqlFieldCaptureProcessor;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.nb.api.config.params.ParamsParser;
import io.nosqlbench.nb.api.errors.BasicError;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongFunction;
public class CqlD4PreparedStmtMapper implements OpMapper<Cqld4CqlOp> {
private final LongFunction<CqlSession> sessionFunc;
private final TypeAndTarget<CqlD4OpType, String> target;
public CqlD4PreparedStmtMapper(LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType,String> target) {
this.sessionFunc=sessionFunc;
this.target = target;
}
public OpDispenser<Cqld4CqlOp> apply(ParsedOp cmd) {
ParsedTemplate stmtTpl = cmd.getAsTemplate(target.field).orElseThrow(() -> new BasicError(
"No statement was found in the op template:" + cmd
));
RSProcessors processors = new RSProcessors();
if (stmtTpl.getCaptures().size()>0) {
processors.add(() -> new CqlFieldCaptureProcessor(stmtTpl.getCaptures()));
}
Optional<List> processorList = cmd.getOptionalStaticConfig("processors", List.class);
processorList.ifPresent(l -> {
l.forEach(m -> {
Map<String, String> pconfig = ParamsParser.parseToMap(m, "type");
ResultSetProcessor processor = Cqld4Processors.resolve(pconfig);
processors.add(() -> processor);
});
});
return new Cqld4PreparedStmtDispenser(sessionFunc, cmd, stmtTpl, processors);
}
}

View File

@ -0,0 +1,26 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4RawStmtDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4CqlOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
/**
 * Maps a parsed op template to a dispenser of "raw" CQL statements, in which the
 * statement text is fully rendered per cycle rather than parameterized.
 */
public class CqlD4RawStmtMapper implements OpMapper<Cqld4CqlOp> {

    // Per-cycle session lookup.
    private final LongFunction<CqlSession> sessionFunc;
    // Per-cycle statement text lookup.
    private final LongFunction<String> targetFunction;

    public CqlD4RawStmtMapper(LongFunction<CqlSession> sessionFunc,
                              LongFunction<String> targetFunction) {
        this.sessionFunc = sessionFunc;
        this.targetFunction = targetFunction;
    }

    @Override
    public OpDispenser<? extends Cqld4CqlOp> apply(ParsedOp cmd) {
        return new Cqld4RawStmtDispenser(sessionFunc, targetFunction, cmd);
    }
}

View File

@ -0,0 +1,57 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.Cqld4Space;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import java.util.function.LongFunction;
public class Cqld4CoreOpMapper implements OpMapper<Op> {
private final DriverSpaceCache<? extends Cqld4Space> cache;
private final NBConfiguration cfg;
public Cqld4CoreOpMapper(NBConfiguration config, DriverSpaceCache<? extends Cqld4Space> cache) {
this.cfg = config;
this.cache = cache;
}
/**
* Determine what type of op dispenser to use for a given parsed op template, and return a new instance
* for it. Since the operations under the CQL driver 4.* do not follow a common type structure, we use the
* base types in the NoSQLBench APIs and treat them somewhat more generically than with other drivers.
*
* @param cmd The {@link ParsedOp} which is the parsed version of the user-provided op template.
* This contains all the fields provided by the user, as well as explicit knowledge of
* which ones are static and dynamic.
* @return An op dispenser for each provided op command
*/
public OpDispenser<? extends Op> apply(ParsedOp cmd) {
LongFunction<String> spaceName = cmd.getAsFunctionOr("space", "default");
// Since the only needed thing in the Cqld4Space is the session, we can short-circuit
// to it here instead of stepping down from the cycle to the space to the session
LongFunction<CqlSession> sessionFunc = l -> cache.get(spaceName.apply(l)).getSession();
CqlD4OpType opType = CqlD4OpType.prepared;
TypeAndTarget<CqlD4OpType, String> target = cmd.getTargetEnum(CqlD4OpType.class, String.class, "type", "stmt");
return switch (target.enumId) {
case raw -> new CqlD4RawStmtMapper(sessionFunc, target.targetFunction).apply(cmd);
case simple -> new CqlD4CqlSimpleStmtMapper(sessionFunc, target.targetFunction).apply(cmd);
case prepared -> new CqlD4PreparedStmtMapper(sessionFunc, target).apply(cmd);
case gremlin -> new Cqld4GremlinOpMapper(sessionFunc, target.targetFunction).apply(cmd);
case fluent -> new Cqld4FluentGraphOpMapper(sessionFunc, target).apply(cmd);
};
}
}

View File

@ -0,0 +1,91 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.dse.driver.api.core.graph.DseGraph;
import com.datastax.oss.driver.api.core.CqlSession;
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4FluentGraphOpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.virtdata.core.bindings.Bindings;
import io.nosqlbench.virtdata.core.bindings.BindingsTemplate;
import io.nosqlbench.virtdata.core.templates.ParsedTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.control.customizers.ImportCustomizer;
import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Supplier;
/**
 * Maps a parsed op template to a dispenser of fluent graph operations.
 * The fluent gremlin script body is compiled via Groovy with a configurable
 * import list, and each dispensed op executes the compiled script against a
 * per-thread binding environment anchored at the traversal source {@code g}.
 */
public class Cqld4FluentGraphOpMapper implements OpMapper<Op> {
    private final static Logger logger = LogManager.getLogger(Cqld4FluentGraphOpMapper.class);

    private final LongFunction<CqlSession> sessionFunc;
    private final TypeAndTarget<CqlD4OpType, String> target;

    public Cqld4FluentGraphOpMapper(LongFunction<CqlSession> sessionFunc, TypeAndTarget<CqlD4OpType, String> target) {
        this.sessionFunc = sessionFunc;
        this.target = target;
    }

    @Override
    public OpDispenser<? extends Op> apply(ParsedOp cmd) {
        GraphTraversalSource g = DseGraph.g;

        ParsedTemplate fluent = cmd.getAsTemplate(target.field).orElseThrow();
        String scriptBodyWithRawVarRefs = fluent.getPositionalStatement();

        CompilerConfiguration compilerConfiguration = new CompilerConfiguration();
        // Imports must be static so they can be verified once at mapping time.
        if (cmd.isDynamic("imports")) {
            throw new OpConfigError("You may only define imports as a static list. Dynamic values are not allowed.");
        }

        List<?> imports = cmd.getOptionalStaticValue("imports", List.class)
            .orElse(List.of("org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__"));
        String[] verifiedClasses = expandClassNames(imports);
        ImportCustomizer importer = new ImportCustomizer();
        importer.addImports(verifiedClasses);
        compilerConfiguration.addCompilationCustomizers(importer);

        // Each invocation of the supplier yields a fresh shell and parsed script,
        // bound to a map containing the traversal source anchor 'g'.
        Supplier<Script> supplier = () -> {
            groovy.lang.Binding groovyBindings = new Binding(new LinkedHashMap<String, Object>(Map.of("g", g)));
            GroovyShell gshell = new GroovyShell(groovyBindings, compilerConfiguration);
            return gshell.parse(scriptBodyWithRawVarRefs);
        };

        LongFunction<? extends String> graphnameFunc = cmd.getAsRequiredFunction("graphname");
        Bindings virtdataBindings = new BindingsTemplate(fluent.getBindPoints()).resolveBindings();

        return new Cqld4FluentGraphOpDispenser(cmd, graphnameFunc, sessionFunc, virtdataBindings, supplier);
    }

    /**
     * Verify that every named import is individually loadable, returning the verified
     * class names. Wildcard package imports are rejected because each class must be
     * loadable by name for verification.
     *
     * @param l the configured import names (any element type; converted via toString)
     * @return the verified class names
     * @throws RuntimeException if a class can not be found, preserving the original cause
     */
    private String[] expandClassNames(List<?> l) {
        ClassLoader loader = Cqld4FluentGraphOpMapper.class.getClassLoader();

        List<String> classNames = new ArrayList<>();
        for (Object name : l) {
            String candidateName = name.toString();
            if (candidateName.endsWith(".*")) {
                throw new RuntimeException("You can not use wildcard package imports like '" + candidateName + "'");
            }
            try {
                loader.loadClass(candidateName);
                classNames.add(candidateName);
                logger.debug("added import " + candidateName);
            } catch (ClassNotFoundException e) {
                // Preserve the cause so the underlying load failure is diagnosable.
                throw new RuntimeException("Class '" + candidateName + "' was not found for fluent imports.", e);
            }
        }
        return classNames.toArray(new String[0]);
    }
}

View File

@ -0,0 +1,24 @@
package io.nosqlbench.adapter.cqld4.opmappers;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.adapter.cqld4.opdispensers.Cqld4GremlinOpDispenser;
import io.nosqlbench.adapter.cqld4.optypes.Cqld4ScriptGraphOp;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.templating.ParsedOp;
import java.util.function.LongFunction;
public class Cqld4GremlinOpMapper implements OpMapper<Cqld4ScriptGraphOp> {
private final LongFunction<CqlSession> sessionFunc;
private final LongFunction<String> targetFunction;
public Cqld4GremlinOpMapper(LongFunction<CqlSession> session, LongFunction<String> targetFunction) {
this.sessionFunc = session;
this.targetFunction = targetFunction;
}
public OpDispenser<Cqld4ScriptGraphOp> apply(ParsedOp cmd) {
return new Cqld4GremlinOpDispenser(sessionFunc, targetFunction, cmd);
}
}

View File

@ -0,0 +1,271 @@
package io.nosqlbench.adapter.cqld4.optionhelpers;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.datastax.oss.driver.internal.core.connection.ExponentialReconnectionPolicy;
import com.datastax.oss.driver.internal.core.specex.ConstantSpeculativeExecutionPolicy;
import io.nosqlbench.nb.api.config.standard.*;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Provide a concise way to express configuration option helpers which simplify
* usage of the long-form options with the latest driver. Essentially, make
* lambda-style definition of optional parameter handling _readable_, and
* provide a working blueprint for how to handle config helpers.
*/
/**
 * Provide a concise way to express configuration option helpers which simplify
 * usage of the long-form options with the latest driver. Essentially, make
 * lambda-style definition of optional parameter handling _readable_, and
 * provide a working blueprint for how to handle config helpers.
 */
public class OptionHelpers implements NBConfigurable {

    private final static Logger logger = LogManager.getLogger(OptionHelpers.class);

    /** Named option handlers, keyed by the short parameter name a user may provide. */
    private final Map<String, Modifier> modifiers = new LinkedHashMap<>();
    /** Human-readable descriptions, parallel to {@link #modifiers}. */
    private final Map<String, String> descriptions = new LinkedHashMap<>();
    /** The driver options map which each modifier writes into. */
    private final OptionsMap options;

    public OptionHelpers(OptionsMap options) {
        this.options = options;
        addModifiers();
    }

    /** Register all supported short-form option helpers. */
    private void addModifiers() {

        add("insights", "Insights Reporting", (m, v) -> {
            m.put(TypedDriverOption.MONITOR_REPORTING_ENABLED, Boolean.parseBoolean(v));
        });

        // e.g. "500ms:3" -> constant delay of 500ms with up to 3 executions
        Pattern CONSTANT_EAGER_PATTERN = Pattern.compile("^((?<msThreshold>\\d++)ms)(:(?<executions>\\d+))?$");
        // e.g. "p99:3:100ms" -> percentile form, which driver 4 does not support
        Pattern PERCENTILE_EAGER_PATTERN = Pattern.compile("^p(?<pctile>[^:]+)(:(?<executions>\\d+))?(:(?<tracked>\\d+)ms)?$");

        add("speculative", "Speculative Execution", (m, v) -> {
            if (PERCENTILE_EAGER_PATTERN.matcher(v).matches()) {
                throw new RuntimeException("Option 'speculative' with percentile thresholds (" + v + ") is not supported in driver 4." +
                    " If you want to provide a custom speculative execution policy, you can configure it directly via the Java driver options.");
            }
            Matcher constantMatcher = CONSTANT_EAGER_PATTERN.matcher(v);
            if (constantMatcher.matches()) {
                int threshold = Integer.parseInt(constantMatcher.group("msThreshold"));
                String executionsSpec = constantMatcher.group("executions");
                int executions = (executionsSpec != null && !executionsSpec.isEmpty()) ? Integer.parseInt(executionsSpec) : 5;
                // BUGFIX: these two values were previously swapped. MAX is the maximum
                // number of executions (a count), DELAY is the constant delay duration.
                m.put(TypedDriverOption.SPECULATIVE_EXECUTION_MAX, executions);
                m.put(TypedDriverOption.SPECULATIVE_EXECUTION_DELAY, Duration.ofMillis(threshold));
                m.put(TypedDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, ConstantSpeculativeExecutionPolicy.class.getSimpleName());
            }
        });

        add("protocol_version", "Protocol Version", (m, v) -> {
            String version = v.toUpperCase(Locale.ROOT);
            try {
                DefaultProtocolVersion defaultProtocolVersion = DefaultProtocolVersion.valueOf(version);
                version = defaultProtocolVersion.toString();
            } catch (IllegalArgumentException iae) {
                try {
                    // Existence check only: a DSE-specific version may be declared as a
                    // constant field on ProtocolVersion rather than in the default enum.
                    Field field = ProtocolVersion.class.getField(version);
                } catch (NoSuchFieldException e) {
                    Set<String> known = new HashSet<>();
                    for (DefaultProtocolVersion value : DefaultProtocolVersion.values()) {
                        known.add(value.toString());
                    }
                    for (Field field : ProtocolVersion.class.getFields()) {
                        known.add(field.getName());
                    }
                    throw new RuntimeException("There was no protocol name that matched '" + v + "'. The known values are " + known.stream().sorted().toList().toString());
                }
            }
            m.put(TypedDriverOption.PROTOCOL_VERSION, version);
        });

        add("socket_options", "Socket Options", (m, v) -> {
            // Accept a compact "name=value,name=value" form with ':' or '=' and ',' or ';'.
            String[] assignments = v.split("[,;]");
            Map<String, String> values = new HashMap<>();
            for (String assignment : assignments) {
                String[] namevalue = assignment.split("[:=]", 2);
                String name = namevalue[0];
                String value = namevalue[1];
                values.put(name, value);
            }

            Optional.ofNullable(values.remove("read_timeout_ms")).map(Integer::parseInt)
                .ifPresent(i -> {
                    logger.warn("Since the driver parameters do not map directly from previous versions to driver 4," +
                        " the 'read_timeout_ms' option is being applied to all configurable timeout parameters. If you want" +
                        " to customize this, do not set read_timeout_ms directly, but instead set the individual timeouts" +
                        " as documented for CQL Java driver 4.*");
                    m.put(TypedDriverOption.GRAPH_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.REQUEST_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.HEARTBEAT_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.REPREPARE_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.CONTROL_CONNECTION_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.CONNECTION_CONNECT_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.NETTY_ADMIN_SHUTDOWN_TIMEOUT, i);
                    m.put(TypedDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.CONNECTION_SET_KEYSPACE_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, Duration.ofMillis(i));
                    m.put(TypedDriverOption.CONTINUOUS_PAGING_TIMEOUT_FIRST_PAGE, Duration.ofMillis(i));
                    m.put(TypedDriverOption.CONTINUOUS_PAGING_TIMEOUT_OTHER_PAGES, Duration.ofMillis(i));
                });

            Optional.ofNullable(values.remove("connect_timeout_ms")).map(Integer::parseInt)
                .ifPresent(i -> m.put(TypedDriverOption.CONNECTION_CONNECT_TIMEOUT, Duration.ofMillis(i)));
            Optional.ofNullable(values.remove("keep_alive")).map(Boolean::parseBoolean)
                .ifPresent(ka -> m.put(TypedDriverOption.SOCKET_KEEP_ALIVE, ka));
            Optional.ofNullable(values.remove("reuse_address")).map(Boolean::parseBoolean)
                .ifPresent(ru -> m.put(TypedDriverOption.SOCKET_REUSE_ADDRESS, ru));
            Optional.ofNullable(values.remove("so_linger")).map(Integer::parseInt)
                .ifPresent(li -> m.put(TypedDriverOption.SOCKET_LINGER_INTERVAL, li));
            Optional.ofNullable(values.remove("tcp_no_delay")).map(Boolean::parseBoolean)
                .ifPresent(nd -> m.put(TypedDriverOption.SOCKET_TCP_NODELAY, nd));
            Optional.ofNullable(values.remove("receive_buffer_size")).map(Integer::parseInt)
                .ifPresent(bs -> m.put(TypedDriverOption.SOCKET_RECEIVE_BUFFER_SIZE, bs));
            Optional.ofNullable(values.remove("send_buffer_size")).map(Integer::parseInt)
                .ifPresent(bs -> m.put(TypedDriverOption.SOCKET_SEND_BUFFER_SIZE, bs));

            // Anything left over was not a recognized field; fail fast on the first one.
            for (String s : values.keySet()) {
                throw new RuntimeException("socket_options field '" + s + "' was not recognized.");
            }
        });

        add("reconnect_policy", "Reconnect Policy", (m, spec) -> {
            if (spec.startsWith("exponential(")) {
                // form: exponential(<baseDelayMs>,<maxDelayMs>)
                String argsString = spec.substring(12);
                String[] args = argsString.substring(0, argsString.length() - 1).split("[,;]");
                if (args.length != 2) {
                    throw new BasicError("Invalid reconnect_policy, try reconnect_policy=exponential(<baseDelay>, <maxDelay>)");
                }
                long baseDelay = Long.parseLong(args[0]);
                long maxDelay = Long.parseLong(args[1]);
                m.put(TypedDriverOption.RECONNECTION_POLICY_CLASS, ExponentialReconnectionPolicy.class.getSimpleName());
                m.put(TypedDriverOption.RECONNECTION_BASE_DELAY, Duration.ofMillis(baseDelay));
                m.put(TypedDriverOption.RECONNECTION_MAX_DELAY, Duration.ofMillis(maxDelay));
            }
        });

        add("pooling", "Pooling Options", (m, spec) -> {
            // form: core[:max[:rq]][,rcore[:rmax[:rrq]]][,heartbeat_interval_s:n]...
            Pattern CORE_AND_MAX_RQ_PATTERN = Pattern.compile(
                "(?<core>\\d+)(:(?<max>\\d+)(:(?<rq>\\d+))?)?(,(?<rcore>\\d+)(:(?<rmax>\\d+)(:(?<rrq>\\d+))?)?)?(,?heartbeat_interval_s:(?<heartbeatinterval>\\d+))?(,?heartbeat_timeout_s:(?<heartbeattimeout>\\d+))?(,?idle_timeout_s:(?<idletimeout>\\d+))?(,?pool_timeout_ms:(?<pooltimeout>\\d+))?"
            );

            Matcher matcher = CORE_AND_MAX_RQ_PATTERN.matcher(spec);
            if (matcher.matches()) {
                Optional<Integer> coreLocal = Optional.ofNullable(matcher.group("core")).map(Integer::valueOf);
                Optional<Integer> maxLocal = Optional.ofNullable(matcher.group("max")).map(Integer::valueOf);
                Optional<Integer> localRq = Optional.ofNullable(matcher.group("rq")).map(Integer::valueOf);

                // Driver 4 collapses core/max into a single pool size; reject mismatches.
                if (coreLocal.isPresent() && maxLocal.isPresent() && !coreLocal.get().equals(maxLocal.get())) {
                    throw new RuntimeException("In CQL Java driver 4, core and max connections have been reduced to a single value." +
                        " You have two different values in (" + spec + "). If you make them the same, you can continue to use the 'pooling' helper option.");
                }
                coreLocal.ifPresent(i -> m.put(TypedDriverOption.CONNECTION_POOL_LOCAL_SIZE, i));
                localRq.ifPresent(r -> m.put(TypedDriverOption.CONNECTION_MAX_REQUESTS, r));

                Optional<Integer> coreRemote = Optional.ofNullable(matcher.group("rcore")).map(Integer::valueOf);
                Optional<Integer> maxRemote = Optional.ofNullable(matcher.group("rmax")).map(Integer::valueOf);
                Optional<Integer> rqRemote = Optional.ofNullable(matcher.group("rrq")).map(Integer::valueOf);

                if (coreRemote.isPresent() && maxRemote.isPresent() && !coreRemote.get().equals(maxRemote.get())) {
                    throw new RuntimeException("In CQL Java driver 4, rcore and rmax connections have been reduced to a single value." +
                        " You have two different values in (" + spec + "). If you make them the same, you can continue to use the 'pooling' helper option." +
                        " Otherwise, set the driver options directly according to driver 4 docs.");
                }
                if (localRq.isPresent() && rqRemote.isPresent() && !localRq.get().equals(rqRemote.get())) {
                    throw new RuntimeException("In CQL Java driver 4, remote and local max requests per connection have been reduced to a single value." +
                        " You have two different values in (" + spec + "). If you make them the same, you can continue to use the 'pooling' helper option." +
                        " Otherwise, set the driver options directly according to driver 4 docs.");
                }
                coreRemote.ifPresent(i -> m.put(TypedDriverOption.CONNECTION_POOL_REMOTE_SIZE, i));
                // BUGFIX: previously the local rq value was applied a second time here,
                // and the remote value was dropped. (Equality with localRq, when both
                // are present, has already been enforced above.)
                rqRemote.ifPresent(r -> m.put(TypedDriverOption.CONNECTION_MAX_REQUESTS, r));

                Optional.ofNullable(matcher.group("heartbeatinterval")).map(Integer::valueOf)
                    .ifPresent(hbi -> m.put(TypedDriverOption.HEARTBEAT_INTERVAL, Duration.ofMillis(hbi)));
                Optional.ofNullable(matcher.group("heartbeattimeout")).map(Integer::valueOf)
                    .ifPresent(ito -> m.put(TypedDriverOption.HEARTBEAT_TIMEOUT, Duration.ofMillis(ito)));
                Optional.ofNullable(matcher.group("idletimeout")).map(Integer::valueOf)
                    .ifPresent(ito -> {
                        logger.warn(("Since this parameter doesn't have a direct equivalent in CQL driver 4, it is being applied to HEARTBEAT_TIMEOUT."));
                        m.put(TypedDriverOption.HEARTBEAT_TIMEOUT, Duration.ofMillis(ito));
                    });
            } else {
                throw new RuntimeException("No pooling options could be parsed from spec: " + spec);
            }
        });

        add("lbp","load balancer policy (deprecated)",(m,v) -> {
            throw new RuntimeException("Composable load balancers have been removed in Java driver 4 unless you provide a custom implementation.");
        });

        add("loadbalancingpolicy","load balancing policy (deprecated)",(m,v) -> {
            throw new RuntimeException("Composable load balancers have been removed in Java driver 4 unless you provide a custom implementation.");
        });

        add("tickduration","Netty Tick Duration",(m,v) -> {
            m.put(TypedDriverOption.NETTY_TIMER_TICK_DURATION,Duration.ofMillis(Long.parseLong(v)));
        });

        add("compression","Compression",(m,v) -> {
            m.put(TypedDriverOption.PROTOCOL_COMPRESSION,v);
        });

        add("retrypolicy","Retry Policy",(m,v) -> {
            m.put(TypedDriverOption.RETRY_POLICY_CLASS,v);
        });

        add("jmxreporting","",(m,v) -> {
            throw new RuntimeException("enabling or disabling JMX reporting is not supported in Java driver 4.*");
        });

        add("single-endpoint","",(m,v) -> {
            throw new RuntimeException("the proxy translator setting has been removed from CQL driver 4. You might be interested in setting cloud_proxy_address.");
        });

        // Recognized-but-ignored legacy options, accepted for compatibility.
        add("haproxy_source_ip","",(m,v) -> {});
        add("defaultidempotence","",(m,v) -> {});
        add("drivermetrics","",(m,v) -> {});
    }

    /** Register one helper by name, with a description and the modifier to apply. */
    public void add(String name, String description, Modifier modifier) {
        this.modifiers.put(name, modifier);
        this.descriptions.put(name,description);
    }

    /** Apply every configured parameter which has a registered helper; others are ignored here. */
    @Override
    public void applyConfig(NBConfiguration cfg) {
        Map<String, Object> values = cfg.getMap();
        for (String paramName : values.keySet()) {
            if (modifiers.containsKey(paramName)) {
                modifiers.get(paramName).accept(options, values.get(paramName).toString());
            }
        }
    }

    /** Every registered helper name is exposed as an optional String parameter. */
    @Override
    public NBConfigModel getConfigModel() {
        ConfigModel model = ConfigModel.of(OptionHelpers.class);
        modifiers.forEach((k, v) -> model.add(Param.optional(k, String.class)));
        return model.asReadOnly();
    }

    /** A helper which writes one short-form option value into the driver options map. */
    public interface Modifier extends BiConsumer<OptionsMap, String> { }
}

View File

@ -2,15 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4BatchStatement extends Cqld4Op {
public class Cqld4CqlBatchStatement extends Cqld4CqlOp {
private final BatchStatement stmt;
public Cqld4BatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
super(session,maxpages,retryreplace,metrics);
public Cqld4CqlBatchStatement(CqlSession session, BatchStatement stmt, int maxpages, boolean retryreplace) {
super(session,maxpages,retryreplace,new RSProcessors());
this.stmt = stmt;
}

View File

@ -1,11 +1,13 @@
package io.nosqlbench.adapter.cqld4;
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.adapter.cqld4.*;
import io.nosqlbench.adapter.cqld4.exceptions.ChangeUnappliedCycleException;
import io.nosqlbench.adapter.cqld4.exceptions.UndefinedResultSetException;
import io.nosqlbench.adapter.cqld4.exceptions.UnexpectedPagingException;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
@ -30,44 +32,29 @@ import java.util.Map;
// TODO: add rows histogram resultSetSizeHisto
public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
public abstract class Cqld4CqlOp implements CycleOp<ResultSet>, VariableCapture, OpGenerator {
private final CqlSession session;
private final int maxpages;
private final boolean retryreplace;
private final Cqld4OpMetrics metrics;
private ResultSet rs;
private Cqld4Op nextOp;
private Cqld4CqlOp nextOp;
private final RSProcessors processors;
public Cqld4Op(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
this.session = session;
this.maxpages = maxpages;
this.retryreplace = retryreplace;
this.processors = new RSProcessors();
this.metrics = metrics;
}
public Cqld4Op(CqlSession session, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
public Cqld4CqlOp(CqlSession session, int maxpages, boolean retryreplace, RSProcessors processors) {
this.session = session;
this.maxpages = maxpages;
this.retryreplace = retryreplace;
this.processors = processors;
this.metrics = metrics;
}
public final ResultSet apply(long cycle) {
metrics.onStart();
Statement<?> stmt = getStmt();
rs = session.execute(stmt);
processors.start(cycle, rs);
int totalRows=0;
int totalRows = 0;
if (!rs.wasApplied()) {
if (!retryreplace) {
@ -96,10 +83,9 @@ public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, Op
if (rs.isFullyFetched()) {
break;
}
totalRows+=pageRows;
totalRows += pageRows;
}
processors.flush();
metrics.onSuccess();
return rs;
}
@ -122,9 +108,9 @@ public abstract class Cqld4Op implements CycleOp<ResultSet>, VariableCapture, Op
public abstract String getQueryString();
private Cqld4Op rebindLwt(Statement<?> stmt, Row row) {
private Cqld4CqlOp rebindLwt(Statement<?> stmt, Row row) {
BoundStatement rebound = LWTRebinder.rebindUnappliedStatement(stmt, row);
return new Cqld4ReboundStatement(session,maxpages,retryreplace,metrics,rebound,processors);
return new Cqld4CqlReboundStatement(session, maxpages, retryreplace, rebound, processors);
}
}

View File

@ -2,16 +2,14 @@ package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4PreparedStatement extends Cqld4Op {
public class Cqld4CqlPreparedStatement extends Cqld4CqlOp {
private final BoundStatement stmt;
public Cqld4PreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics, RSProcessors processors) {
super(session,maxpages,retryreplace,metrics,processors);
public Cqld4CqlPreparedStatement(CqlSession session, BoundStatement stmt, int maxpages, boolean retryreplace, RSProcessors processors) {
super(session,maxpages,retryreplace,processors);
this.stmt = stmt;
}

View File

@ -0,0 +1,25 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.RSProcessors;
public class Cqld4CqlSimpleStatement extends Cqld4CqlOp {
private final SimpleStatement stmt;
public Cqld4CqlSimpleStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace) {
super(session, maxpages,retryreplace, new RSProcessors());
this.stmt = stmt;
}
@Override
public SimpleStatement getStmt() {
return stmt;
}
@Override
public String getQueryString() {
return stmt.getQuery();
}
}

View File

@ -0,0 +1,29 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.dse.driver.api.core.graph.FluentGraphStatement;
import com.datastax.dse.driver.api.core.graph.GraphResultSet;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public class Cqld4FluentGraphOp implements CycleOp<GraphResultSet> {
private final CqlSession session;
private final FluentGraphStatement stmt;
private int resultSize=0;
public Cqld4FluentGraphOp(CqlSession session, FluentGraphStatement stmt) {
this.session = session;
this.stmt = stmt;
}
@Override
public GraphResultSet apply(long value) {
GraphResultSet result = session.execute(stmt);
this.resultSize = result.all().size();
return result;
}
@Override
public long getResultSize() {
return resultSize;
}
}

View File

@ -0,0 +1,29 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.dse.driver.api.core.graph.GraphResultSet;
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
import com.datastax.oss.driver.api.core.CqlSession;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public class Cqld4ScriptGraphOp implements CycleOp<GraphResultSet> {
private final CqlSession session;
private final ScriptGraphStatement stmt;
private int resultSize=0;
public Cqld4ScriptGraphOp(CqlSession session, ScriptGraphStatement stmt) {
this.session = session;
this.stmt = stmt;
}
@Override
public GraphResultSet apply(long value) {
GraphResultSet result = session.execute(stmt);
this.resultSize = result.all().size();
return result;
}
@Override
public long getResultSize() {
return resultSize;
}
}

View File

@ -1,29 +0,0 @@
package io.nosqlbench.adapter.cqld4.optypes;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import io.nosqlbench.adapter.cqld4.Cqld4Op;
import io.nosqlbench.adapter.cqld4.Cqld4OpMetrics;
import io.nosqlbench.virtdata.core.templates.CapturePoint;
import java.util.Map;
public class Cqld4SimpleCqlStatement extends Cqld4Op {
private final SimpleStatement stmt;
public Cqld4SimpleCqlStatement(CqlSession session, SimpleStatement stmt, int maxpages, boolean retryreplace, Cqld4OpMetrics metrics) {
super(session, maxpages,retryreplace,metrics);
this.stmt = stmt;
}
@Override
public SimpleStatement getStmt() {
return stmt;
}
@Override
public String getQueryString() {
return stmt.getQuery();
}
}

View File

@ -0,0 +1,87 @@
package io.nosqlbench.datamappers.functions.diagnostics;
import com.datastax.oss.driver.api.core.type.DataTypes;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.*;
import java.util.function.Function;
/**
* Shows the compatible CQL type most associated with the incoming Java type.
*/
//@ThreadSafeMapper
//@Categories({Category.diagnostics})
public class ToCqlType implements Function<Object, String> {
private final static Map<String, String> typemap = new HashMap<String, String>() {{
for (Field field : DataTypes.class.getFields()) {
int mods = field.getModifiers();
int req = Modifier.STATIC & Modifier.FINAL & Modifier.PUBLIC;
if ((mods&req)<req) {
continue;
}
if (!field.getName().toUpperCase(Locale.ROOT).equals(field.getName())) {
continue;
}
}
put("unsupported in this version"," additional work required ");
}};
private final ThreadLocal<StringBuilder> tlsb = ThreadLocal.withInitial(StringBuilder::new);
@Override
public String apply(Object o) {
String canonicalName = o.getClass().getCanonicalName();
String cqlTypeName = typemap.get(canonicalName);
StringBuilder sb = tlsb.get();
sb.setLength(0);
if (cqlTypeName!=null) {
return sb.append(canonicalName).append(" -> ").append(cqlTypeName).toString();
}
return findAlternates(o,canonicalName);
}
private String findAlternates(Object o, String canonicalName) {
StringBuilder sb = tlsb.get();
if (List.class.isAssignableFrom(o.getClass())) {
sb.append(canonicalName).append("<");
if (((List)o).size()>0) {
Object o1 = ((List) o).get(0);
String elementType = o1.getClass().getCanonicalName();
sb.append(elementType).append("> -> List<");
sb.append(typemap.getOrDefault(elementType,"UNKNOWN")).append(">");
return sb.toString();
}
return sb.append("?> -> List<?>").toString();
}
if (Map.class.isAssignableFrom(o.getClass())) {
sb.append(canonicalName).append("<");
if (((Map)o).size()>0) {
Map.Entry next = (Map.Entry) ((Map) o).entrySet().iterator().next();
String keyType = next.getKey().getClass().getCanonicalName();
String valType = next.getValue().getClass().getCanonicalName();
sb.append(keyType).append(",").append(valType).append("> -> Map<");
sb.append(typemap.getOrDefault(keyType,"UNKNOWN")).append(",");
sb.append(typemap.getOrDefault(valType,"UNKNOWN")).append(">");
return sb.toString();
}
return sb.append("?,?> -> Map<?,?>").toString();
}
if (Set.class.isAssignableFrom(o.getClass())) {
sb.append(canonicalName).append("<");
if (((Set)o).size()>0) {
Object o1=((Set)o).iterator().next();
String elementType = o1.getClass().getCanonicalName();
sb.append(elementType).append("> -> Set<");
sb.append(typemap.getOrDefault(elementType,"UNKNOWN")).append(">");
return sb.toString();
}
return sb.append("?> -> Set<?>").toString();
}
return typemap.getOrDefault(o.getClass().getSuperclass().getCanonicalName(), "UNKNOWN");
}
}

View File

@ -0,0 +1,21 @@
package io.nosqlbench.datamappers.functions.double_to_cqlduration;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import java.util.function.DoubleFunction;
/**
* Convert the input double value into a CQL {@link CqlDuration} object,
* by setting months to zero, and using the fractional part as part
* of a day, assuming 24-hour days.
*/
public class ToCqlDuration implements DoubleFunction<CqlDuration> {
private final static double NS_PER_DAY = 1_000_000_000L * 60 * 60 * 24;
@Override
public CqlDuration apply(double value) {
double fraction = value - (long) value;
return CqlDuration.newInstance(0,(int)value,(long)(fraction*NS_PER_DAY));
}
}

View File

@ -1,6 +1,5 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.geometry;
package io.nosqlbench.datamappers.functions.geometry;
import com.datastax.driver.dse.geometry.Point;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -14,56 +13,56 @@ import java.util.function.LongToDoubleFunction;
*/
@ThreadSafeMapper
@Categories({Category.objects})
public class Distance implements LongFunction<com.datastax.driver.dse.geometry.Distance> {
public class Distance implements LongFunction<com.datastax.dse.driver.internal.core.data.geometry.Distance> {
private final io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point pointfunc;
private final Point pointfunc;
private final LongToDoubleFunction rfunc;
public Distance(LongToDoubleFunction xfunc, LongToDoubleFunction yfunc, LongToDoubleFunction rfunc) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point(xfunc,yfunc);
pointfunc = new Point(xfunc,yfunc);
this.rfunc = rfunc;
}
public Distance(double x, LongToDoubleFunction yfunc, LongToDoubleFunction rfunc) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point((u)->x,yfunc);
pointfunc = new Point((u)->x,yfunc);
this.rfunc = rfunc;
}
public Distance(LongToDoubleFunction xfunc, double y, LongToDoubleFunction rfunc) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point(xfunc,(v)->y);
pointfunc = new Point(xfunc,(v)->y);
this.rfunc = rfunc;
}
public Distance(double x, double y, LongToDoubleFunction rfunc) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point((u)->x,(v)->y);
pointfunc = new Point((u)->x,(v)->y);
this.rfunc = rfunc;
}
public Distance(LongToDoubleFunction xfunc, LongToDoubleFunction yfunc, double r) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point(xfunc,yfunc);
pointfunc = new Point(xfunc,yfunc);
this.rfunc = (w) -> r;
}
public Distance(double x, LongToDoubleFunction yfunc, double r) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point((u)->x,yfunc);
pointfunc = new Point((u)->x,yfunc);
this.rfunc = (w) -> r;
}
public Distance(LongToDoubleFunction xfunc, double y, double r) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point(xfunc,(v)->y);
pointfunc = new Point(xfunc,(v)->y);
this.rfunc = (w) -> r;
}
public Distance(double x, double y, double r) {
pointfunc = new io.nosqlbench.activitytype.cql.datamappers.functions.geometry.Point((u) -> x, (v) -> y);
pointfunc = new Point((u) -> x, (v) -> y);
this.rfunc = (w) -> r;
}
@Override
public com.datastax.driver.dse.geometry.Distance apply(long value) {
Point apoint = pointfunc.apply(value);
public com.datastax.dse.driver.internal.core.data.geometry.Distance apply(long value) {
com.datastax.dse.driver.api.core.data.geometry.Point apoint = pointfunc.apply(value);
double aradius = rfunc.applyAsDouble(value);
return new com.datastax.driver.dse.geometry.Distance(apoint,aradius);
return new com.datastax.dse.driver.internal.core.data.geometry.Distance(apoint,aradius);
}
}

View File

@ -1,7 +1,8 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.geometry;
package io.nosqlbench.datamappers.functions.geometry;
//import com.datastax.driver.dse.geometry.Point;
import com.datastax.dse.driver.internal.core.data.geometry.DefaultLineString;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -16,12 +17,12 @@ import java.util.function.LongToIntFunction;
@SuppressWarnings("Duplicates")
@ThreadSafeMapper
@Categories({Category.objects})
public class LineString implements LongFunction<com.datastax.driver.dse.geometry.LineString> {
public class LineString implements LongFunction<com.datastax.dse.driver.api.core.data.geometry.LineString > {
private final LongFunction<com.datastax.driver.dse.geometry.Point> pointfunc;
private final LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point > pointfunc;
private final LongToIntFunction lenfunc;
public LineString(LongToIntFunction lenfunc, LongFunction<com.datastax.driver.dse.geometry.Point> pointfunc) {
public LineString(LongToIntFunction lenfunc, LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point > pointfunc) {
this.pointfunc = pointfunc;
this.lenfunc = lenfunc;
}
@ -31,22 +32,23 @@ public class LineString implements LongFunction<com.datastax.driver.dse.geometry
this.pointfunc=new Point(xfunc,yfunc);
}
public LineString(int len, LongFunction<com.datastax.driver.dse.geometry.Point> pointfunc) {
public LineString(int len, LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point > pointfunc) {
this.lenfunc = (i) -> len;
this.pointfunc = pointfunc;
}
@Override
public com.datastax.driver.dse.geometry.LineString apply(long value) {
public com.datastax.dse.driver.api.core.data.geometry.LineString apply(long value) {
int linelen = Math.max(lenfunc.applyAsInt(value),2);
com.datastax.driver.dse.geometry.Point p0 = pointfunc.apply(value);
com.datastax.driver.dse.geometry.Point p1 = pointfunc.apply(value+1);
com.datastax.dse.driver.api.core.data.geometry.Point p0 = pointfunc.apply(value);
com.datastax.dse.driver.api.core.data.geometry.Point p1 = pointfunc.apply(value+1);
com.datastax.driver.dse.geometry.Point[] points = new com.datastax.driver.dse.geometry.Point[linelen-2];
com.datastax.dse.driver.api.core.data.geometry.Point[] points =
new com.datastax.dse.driver.api.core.data.geometry.Point [linelen-2];
for (int i = 2; i < linelen; i++) {
points[i-2]=pointfunc.apply(value+i);
}
return new com.datastax.driver.dse.geometry.LineString(p0,p1,points);
return new DefaultLineString(p0,p1,points);
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.geometry;
package io.nosqlbench.datamappers.functions.geometry;
import com.datastax.dse.driver.internal.core.data.geometry.DefaultPoint;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -13,7 +14,7 @@ import java.util.function.LongToDoubleFunction;
*/
@ThreadSafeMapper
@Categories({Category.objects})
public class Point implements LongFunction<com.datastax.driver.dse.geometry.Point> {
public class Point implements LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point> {
private final LongToDoubleFunction xfunc;
private final LongToDoubleFunction yfunc;
@ -41,7 +42,7 @@ public class Point implements LongFunction<com.datastax.driver.dse.geometry.Poin
@Override
public com.datastax.driver.dse.geometry.Point apply(long value) {
return new com.datastax.driver.dse.geometry.Point(xfunc.applyAsDouble(value), yfunc.applyAsDouble(value));
public com.datastax.dse.driver.api.core.data.geometry.Point apply(long value) {
return new DefaultPoint(xfunc.applyAsDouble(value), yfunc.applyAsDouble(value));
}
}

View File

@ -0,0 +1,52 @@
package io.nosqlbench.datamappers.functions.geometry;
import com.datastax.dse.driver.internal.core.data.geometry.DefaultPolygon;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.util.function.LongFunction;
import java.util.function.LongToDoubleFunction;
import java.util.function.LongToIntFunction;
/**
* Create a com.datastax.driver.dse.geometry.Polygon
*/
@SuppressWarnings("ALL")
@ThreadSafeMapper
@Categories({Category.objects})
public class Polygon implements LongFunction<com.datastax.dse.driver.api.core.data.geometry.Polygon > {
private final LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point> pointfunc;
private final LongToIntFunction lenfunc;
public Polygon(LongToIntFunction lenfunc, LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point> pointfunc) {
this.pointfunc = pointfunc;
this.lenfunc = lenfunc;
}
public Polygon(LongToIntFunction lenfunc, LongToDoubleFunction xfunc, LongToDoubleFunction yfunc) {
this.lenfunc = lenfunc;
this.pointfunc=new Point(xfunc,yfunc);
}
public Polygon(int len, LongFunction<com.datastax.dse.driver.api.core.data.geometry.Point> pointfunc) {
this.lenfunc = (i) -> len;
this.pointfunc = pointfunc;
}
@Override
public com.datastax.dse.driver.api.core.data.geometry.Polygon apply(long value) {
int linelen = Math.max(lenfunc.applyAsInt(value),3);
com.datastax.dse.driver.api.core.data.geometry.Point p0 = pointfunc.apply(value);
com.datastax.dse.driver.api.core.data.geometry.Point p1 = pointfunc.apply(value+1);
com.datastax.dse.driver.api.core.data.geometry.Point p2 = pointfunc.apply(value+2);
com.datastax.dse.driver.api.core.data.geometry.Point[] points =
new com.datastax.dse.driver.api.core.data.geometry.Point[linelen-3];
for (int i = 3; i < linelen; i++) {
points[i-3]=pointfunc.apply(value+i);
}
return new DefaultPolygon(p0,p1,p2,points);
}
}

View File

@ -1,7 +1,8 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.geometry;
package io.nosqlbench.datamappers.functions.geometry;
import com.datastax.driver.dse.geometry.Point;
import com.datastax.driver.dse.geometry.Polygon;
import com.datastax.dse.driver.api.core.data.geometry.Polygon;
import com.datastax.dse.driver.internal.core.data.geometry.DefaultPoint;
import com.datastax.dse.driver.internal.core.data.geometry.DefaultPolygon;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
@ -28,7 +29,7 @@ import java.util.function.LongFunction;
@SuppressWarnings("ALL")
@ThreadSafeMapper
@Categories({Category.objects})
public class PolygonOnGrid implements LongFunction<Polygon> {
public class PolygonOnGrid implements LongFunction<com.datastax.dse.driver.api.core.data.geometry.Polygon> {
private final double rows;
private final double columns;
@ -76,11 +77,11 @@ public class PolygonOnGrid implements LongFunction<Polygon> {
double right = left+xwidth;
double bottom = top - yheight;
Polygon polygon = new Polygon(
new Point(left, bottom),
new Point(left, top),
new Point(right, top),
new Point(right, bottom)
com.datastax.dse.driver.api.core.data.geometry.Polygon polygon = new DefaultPolygon(
new DefaultPoint(left, bottom),
new DefaultPoint(left, top),
new DefaultPoint(right, top),
new DefaultPoint(right, bottom)
);
return polygon;

View File

@ -0,0 +1,42 @@
package io.nosqlbench.datamappers.functions.long_localdate;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.util.function.LongFunction;
/**
* Converts epoch millis to a java.time.LocalDate, which takes the place
* of the previous CQL-specific LocalDate. if a zoneid of 'default' is provided,
* then the time zone is set by the default for the JVM. If not, then
* a valid ZoneId is looked up. The no-args version uses GMT.
*/
@ThreadSafeMapper
@Categories({Category.datetime})
public class EpochMillisToCqlLocalDate implements LongFunction<LocalDate> {
private final ZoneId zoneId;
public EpochMillisToCqlLocalDate(String zoneid) {
if (zoneid.equals("default")) {
this.zoneId = ZoneId.systemDefault();
} else {
this.zoneId = ZoneId.of(zoneid);
}
}
@Example({"EpochMillisToJavaLocalDate()", "Yields the LocalDate for the millis in GMT"})
public EpochMillisToCqlLocalDate() {
this.zoneId = ZoneId.of("GMT");
}
@Override
public LocalDate apply(long value) {
return LocalDate.ofInstant(Instant.ofEpochMilli(value), ZoneId.systemDefault());
}
}

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_localdate;
package io.nosqlbench.datamappers.functions.long_localdate;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_localdate;
package io.nosqlbench.datamappers.functions.long_localdate;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -1,11 +1,11 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_localdate;
package io.nosqlbench.datamappers.functions.long_localdate;
import com.datastax.driver.core.LocalDate;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.time.LocalDate;
import java.util.function.LongFunction;
/**
@ -14,12 +14,14 @@ import java.util.function.LongFunction;
@ThreadSafeMapper
@Categories({Category.datetime})
public class LongToLocalDateDays implements LongFunction<LocalDate> {
@Override
public LocalDate apply(long value) {
return LocalDate.fromDaysSinceEpoch((int) value % Integer.MAX_VALUE);
}
@Example({"LongToLocalDateDays()","take the cycle number and turn it into a LocalDate based on days since 1970"})
public LongToLocalDateDays (){
}
@Override
public LocalDate apply(long value) {
return LocalDate.ofEpochDay((int) value & Integer.MAX_VALUE);
}
}

View File

@ -1,6 +1,6 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_to_cqlduration;
package io.nosqlbench.datamappers.functions.long_to_cqlduration;
import com.datastax.driver.core.Duration;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -15,7 +15,7 @@ import java.util.function.LongUnaryOperator;
*/
@ThreadSafeMapper
@Categories({Category.datetime})
public class CqlDurationFunctions implements LongFunction<Duration> {
public class CqlDurationFunctions implements LongFunction<CqlDuration> {
private final LongToIntFunction monthsfunc;
private final LongToIntFunction daysfunc;
@ -48,10 +48,10 @@ public class CqlDurationFunctions implements LongFunction<Duration> {
@Override
public Duration apply(long value) {
public CqlDuration apply(long value) {
int months = monthsfunc.applyAsInt(value);
int days = daysfunc.applyAsInt(value);
long nanos = nanosfunc.applyAsLong(value);
return Duration.newInstance(months,days,nanos);
return CqlDuration.newInstance(months,days,nanos);
}
}

View File

@ -1,6 +1,6 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_to_cqlduration;
package io.nosqlbench.datamappers.functions.long_to_cqlduration;
import com.datastax.driver.core.Duration;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -8,24 +8,24 @@ import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.util.function.LongFunction;
/**
* Convert the input value into a {@link com.datastax.driver.core.Duration}
* Convert the input value into a {@link CqlDuration}
* by reading the input as total nanoseconds, assuming 30-month days.
*/
@ThreadSafeMapper
@Categories({Category.conversion,Category.datetime})
public class ToCqlDurationNanos implements LongFunction<Duration> {
public class ToCqlDurationNanos implements LongFunction<CqlDuration> {
private final static long NS_PER_S = 1_000_000_000L;
private final static long NS_PER_DAY = NS_PER_S * 60*60*24;
private final static long NS_PER_MONTH = NS_PER_DAY * 30;
@Override
public Duration apply(long value) {
public CqlDuration apply(long value) {
long nanos = value % NS_PER_DAY;
value -= nanos;
long days = value / NS_PER_DAY;
value -= days*NS_PER_DAY;
long months = value / NS_PER_MONTH;
return Duration.newInstance((int) months,(int) days, nanos);
return CqlDuration.newInstance((int) months,(int) days, nanos);
}
}

View File

@ -1,6 +1,7 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_uuid;
package io.nosqlbench.datamappers.functions.long_uuid;
import com.datastax.driver.core.utils.UUIDs;
//import com.datastax.driver.core.utils.UUIDs;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -11,13 +12,14 @@ import java.util.function.LongFunction;
/**
* Converts a long timestamp in epoch millis form into a Version 1 TimeUUID
* according to <a href="https://www.ietf.org/rfc/rfc4122.txt">RFC 4122</a>.
* This form uses {@link UUIDs#startOf(long)}
* This form uses {@link Uuids#endOf(long)} (long)}
*/
@Categories({Category.datetime})
@ThreadSafeMapper
public class ToTimeUUIDMax implements LongFunction<UUID> {
@Override
public UUID apply(long value) {
return UUIDs.endOf(value);
return Uuids.endOf(value);
}
}

View File

@ -1,6 +1,6 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_uuid;
package io.nosqlbench.datamappers.functions.long_uuid;
import com.datastax.driver.core.utils.UUIDs;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
@ -11,13 +11,13 @@ import java.util.function.LongFunction;
/**
* Converts a long timestamp in epoch millis form into a Version 1 TimeUUID
* according to <a href="https://www.ietf.org/rfc/rfc4122.txt">RFC 4122</a>.
* This form uses {@link UUIDs#startOf(long)}
* This form uses {@link Uuids#startOf(long)}
*/
@Categories({Category.datetime})
@ThreadSafeMapper
public class ToTimeUUIDMin implements LongFunction<UUID> {
@Override
public UUID apply(long value) {
return UUIDs.startOf(value);
return Uuids.startOf(value);
}
}

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
@ -25,14 +25,14 @@ public class TokenMapFileAPIService {
private final ByteBuffer buffer;
private final int RECORD_LEN = Long.BYTES * 2;
private int recordPosition;
private final int recordPosition;
private long token;
private int TOKEN_OFFSET = 0;
private final int TOKEN_OFFSET = 0;
private long cycle;
private int CYCLE_OFFSET = Long.BYTES;
private final int CYCLE_OFFSET = Long.BYTES;
private boolean loopdata;
private final boolean loopdata;
/**
* Create a new binary cursor for data in a binary file which consists of a (long,long) tuple of
@ -49,7 +49,7 @@ public class TokenMapFileAPIService {
public TokenMapFileAPIService(String datafile, boolean loopdata, boolean instanced, boolean ascending) {
this.loopdata = loopdata;
buffer = TokenMapFileSharedBuffers.getByteBuffer(datafile,instanced,ascending).asReadOnlyBuffer();
this.recordCount = (int) (buffer.capacity() / RECORD_LEN);
this.recordCount = buffer.capacity() / RECORD_LEN;
this.recordPosition = 0;
}

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import java.util.function.IntToLongFunction;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.rainbow;
package io.nosqlbench.datamappers.functions.rainbow;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.string_string;
package io.nosqlbench.datamappers.functions.string_string;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;

View File

@ -0,0 +1,58 @@
package io.nosqlbench.datamappers.functions.to_daterange;
import com.datastax.dse.driver.api.core.data.time.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRangeBound;
import com.datastax.dse.driver.api.core.data.time.DateRangePrecision;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.LongFunction;
/**
 * Converts an input epoch-millisecond value into a {@link DateRange} whose lower
 * and upper bounds are both aligned to the configured precision around that
 * instant. Any of these precisions may be used to control the bounds around the
 * provided timestamp: millisecond, second, minute, hour, day, month, or year.
 *
 * If the zoneid is not specified, it defaults to "GMT". If the zoneid is set to "default",
 * then the zoneid is set to the default for the JVM. Otherwise, the specified zone is used.
 */
@ThreadSafeMapper
@Categories(Category.datetime)
public class DateRangeDuring implements LongFunction<DateRange> {

    // alignment unit applied to both bounds of the produced range
    private final DateRangePrecision precision;
    // zone used to interpret the incoming epoch-millis value
    private final ZoneId zoneid;

    @Example({"DateRangeDuring('millisecond')}","Convert the incoming millisecond to an equivalent DateRange"})
    @Example({"DateRangeDuring('minute')}","Convert the incoming millisecond to a DateRange for the minute in which the " +
        "millisecond falls"})
    public DateRangeDuring(String precision) {
        this(precision, "GMT");
    }

    public DateRangeDuring(String precision, String zoneid) {
        this.precision = DateRangePrecision.valueOf(precision.toUpperCase());
        this.zoneid = zoneid.equals("default") ? ZoneId.systemDefault() : ZoneId.of(zoneid);
    }

    @Override
    public DateRange apply(long value) {
        // Anchor the input instant in the configured zone, then align both bounds to it.
        ZonedDateTime anchor = ZonedDateTime.ofInstant(Instant.ofEpochMilli(value), zoneid);
        return new DateRange(
            DateRangeBound.lowerBound(anchor, precision),
            DateRangeBound.upperBound(anchor, precision));
    }
}

View File

@ -0,0 +1,62 @@
package io.nosqlbench.datamappers.functions.to_daterange;
import com.datastax.dse.driver.api.core.data.time.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRangeBound;
import com.datastax.dse.driver.api.core.data.time.DateRangePrecision;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import io.nosqlbench.virtdata.api.bindings.VirtDataFunctions;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.LongFunction;
import java.util.function.LongUnaryOperator;
/**
 * Uses the precision and the two functions provided to create a DateRange.
 * You can use any of these precisions to control the bounds
 * around the provided timestamp: millisecond, second, minute, hour, day, month, or year.
 *
 * If the zoneid is not specified, it defaults to "GMT". If the zoneid is set to "default",
 * then the zoneid is set to the default for the JVM. Otherwise, the specified zone is used.
 */
@ThreadSafeMapper
@Categories(Category.datetime)
public class DateRangeFunc implements LongFunction<DateRange> {

    private final DateRangePrecision precision;
    // maps the cycle value to the lower-bound epoch millis
    private final LongUnaryOperator lower;
    // maps the cycle value to the upper-bound epoch millis
    private final LongUnaryOperator upper;
    private final ZoneId zoneid;

    // Example fixed: Add(3600000L) adds 3,600,000 ms (one hour), not one minute,
    // and the example call was missing its closing paren.
    @Example({
        "StartingEpochMillis('2017-01-01 23:59:59'); DateRangeFunc('second',Identity(),Add(3600000L))",
        "Create 1-hour date ranges starting at 2017-01-01 23:59:59"})
    public DateRangeFunc(String precision, Object lowerFunc, Object upperFunc) {
        this(precision, lowerFunc, upperFunc, "GMT");
    }

    public DateRangeFunc(String precision, Object lowerFunc, Object upperFunc, String zoneid) {
        this.precision = DateRangePrecision.valueOf(precision.toUpperCase());
        // Adapt the user-provided binding functions into LongUnaryOperator form.
        this.lower = VirtDataFunctions.adapt(lowerFunc, LongUnaryOperator.class, long.class, false);
        this.upper = VirtDataFunctions.adapt(upperFunc, LongUnaryOperator.class, long.class, false);
        if (zoneid.equals("default")) {
            this.zoneid = ZoneId.systemDefault();
        } else {
            this.zoneid = ZoneId.of(zoneid);
        }
    }

    @Override
    public DateRange apply(long value) {
        // Locals renamed so they no longer shadow the 'lower'/'upper' operator fields.
        ZonedDateTime lowerDate = ZonedDateTime.ofInstant(Instant.ofEpochMilli(lower.applyAsLong(value)), zoneid);
        DateRangeBound lowerBound = DateRangeBound.lowerBound(lowerDate, precision);
        ZonedDateTime upperDate = ZonedDateTime.ofInstant(Instant.ofEpochMilli(upper.applyAsLong(value)), zoneid);
        DateRangeBound upperBound = DateRangeBound.upperBound(upperDate, precision);
        return new DateRange(lowerBound, upperBound);
    }
}

View File

@ -1,13 +1,16 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.to_daterange;
package io.nosqlbench.datamappers.functions.to_daterange;
import com.datastax.driver.dse.search.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRangeBound;
import com.datastax.dse.driver.api.core.data.time.DateRangePrecision;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.util.Date;
import java.util.function.Function;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.LongFunction;
/**
@ -16,26 +19,40 @@ import java.util.function.LongFunction;
* provided, and with no upper bound.
* You can use any of these precisions to control the bounds
* around the provided timestamp: millisecond, second, minute, hour, day, month, or year.
*
* If the zoneid is not specified, it defaults to "GMT". If the zoneid is set to "default",
* then the zoneid is set to the default for the JVM. Otherwise, the specified zone is used.
*/
@ThreadSafeMapper
@Categories(Category.datetime)
public class DateRangeOnOrAfter implements LongFunction<DateRange> {
private final DateRange.DateRangeBound.Precision precision;
private final DateRangePrecision precision;
private final ZoneId zoneid;
public DateRangeOnOrAfter(String precision, String zoneid) {
this.precision = DateRangePrecision.valueOf(precision.toUpperCase());
if (zoneid.equals("default")) {
this.zoneid = ZoneId.systemDefault();
} else {
this.zoneid = ZoneId.of(zoneid);
}
}
@Example({"DateRangeOnOrAfter('millisecond')}","Convert the incoming millisecond to an match any time on or after"})
@Example({"DateRangeOnOrAfter('minute')}","Convert the incoming millisecond to mach any time on or after the" +
" minute in which the " +
"millisecond falls"})
public DateRangeOnOrAfter(String precision) {
this.precision = DateRange.DateRangeBound.Precision.valueOf(precision.toUpperCase());
this(precision,"GMT");
}
@Override
public DateRange apply(long value) {
Date date = new Date(value);
DateRange.DateRangeBound lower = DateRange.DateRangeBound.lowerBound(date, precision);
DateRange.DateRangeBound upper = DateRange.DateRangeBound.UNBOUNDED;
ZonedDateTime date = ZonedDateTime.ofInstant(Instant.ofEpochMilli(value), zoneid);
DateRangeBound lower = DateRangeBound.lowerBound(date, precision);
DateRangeBound upper = DateRangeBound.UNBOUNDED;
DateRange dateRange = new DateRange(lower, upper);
return dateRange;
}

View File

@ -1,13 +1,16 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.to_daterange;
package io.nosqlbench.datamappers.functions.to_daterange;
import com.datastax.driver.dse.search.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRangeBound;
import com.datastax.dse.driver.api.core.data.time.DateRangePrecision;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.util.Date;
import java.util.function.Function;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.function.LongFunction;
/**
@ -16,25 +19,40 @@ import java.util.function.LongFunction;
* provided, and with no lower bound.
* You can use any of these precisions to control the bounds
* around the provided timestamp: millisecond, second, minute, hour, day, month, or year.
*
* If the zoneid is not specified, it defaults to "GMT". If the zoneid is set to "default",
* then the zoneid is set to the default for the JVM. Otherwise, the specified zone is used.
*/
@ThreadSafeMapper
@Categories(Category.datetime)
public class DateRangeOnOrBefore implements LongFunction<DateRange> {
private final DateRange.DateRangeBound.Precision precision;
private final DateRangePrecision precision;
private final ZoneId zoneid;
@Example({"DateRangeOnOrBefore('millisecond')}","Convert the incoming millisecond to match anything on or before it."})
@Example({"DateRangeOnOrBefore('minute')}","Convert the incoming millisecond to match anything on or before the minute in" +
" which the millisecond falls"})
public DateRangeOnOrBefore(String precision) {
this.precision = DateRange.DateRangeBound.Precision.valueOf(precision.toUpperCase());
this(precision,"GMT");
}
public DateRangeOnOrBefore(String precision, String zoneid) {
this.precision = DateRangePrecision.valueOf(precision.toUpperCase());
if (zoneid.equals("default")) {
this.zoneid = ZoneId.systemDefault();
} else {
this.zoneid = ZoneId.of(zoneid);
}
}
@Override
public DateRange apply(long value) {
Date date = new Date(value);
DateRange.DateRangeBound lower = DateRange.DateRangeBound.UNBOUNDED;
DateRange.DateRangeBound upper = DateRange.DateRangeBound.upperBound(date,precision);
ZonedDateTime date = ZonedDateTime.ofInstant(Instant.ofEpochMilli(value),zoneid);
DateRangeBound lower = DateRangeBound.UNBOUNDED;
DateRangeBound upper = DateRangeBound.upperBound(date,precision);
DateRange dateRange = new DateRange(lower, upper);
return dateRange;
}

View File

@ -1,13 +1,13 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.to_daterange;
package io.nosqlbench.datamappers.functions.to_daterange;
import com.datastax.driver.dse.search.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRange;
import com.datastax.dse.driver.api.core.data.time.DateRangePrecision;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.Example;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import java.text.ParseException;
import java.util.Date;
import java.util.function.Function;
/**
@ -19,13 +19,13 @@ import java.util.function.Function;
@Categories(Category.datetime)
public class DateRangeParser implements Function<String, DateRange> {
private final DateRange.DateRangeBound.Precision precision;
private final DateRangePrecision precision;
@Example({"DateRangeParser()}","Convert inputs like '[1970-01-01T00:00:00 TO 1970-01-01T00:00:00]' into " +
"DateRanges" +
" "})
public DateRangeParser(String precision) {
this.precision = DateRange.DateRangeBound.Precision.valueOf(precision.toUpperCase());
this.precision = DateRangePrecision.valueOf(precision.toUpperCase());
}
@Override

View File

@ -0,0 +1,130 @@
description: creates local graphs which resemble a wagon-wheel topology, using
DSE Graph, version 6.8 or newer
scenarios:
drop-graph: run driver=cqld4 graphname=graph_wheels tags=block:drop-graph cycles===UNDEF
creategraph: run driver=cqld4 graphname=graph_wheels tags=phase:create-graph cycles===UNDEF
creategraph-classic: run driver=cqld4 graphname=graph_wheels tags=block:create-graph-classic cycles===UNDEF
schema: run driver=cqld4 graphname=graph_wheels tags=phase:graph-schema cycles===UNDEF
disable-verify: run driver=cqld4 graphname=graph_wheels tags=phase:disable-verify cycles===UNDEF
rampup: run driver==cqld4 graphname=graph_wheels tags=phase:rampup cycles=1000
fluent: run driver=cqld4 graphname=graph_wheels tags=block:fluent cycles=10
default:
creategraph: run driver=cqld4 graphname=graph_wheels tags=phase:create-graph cycles===UNDEF
schema: run driver=cqld4 graphname=graph_wheels tags=phase:graph-schema cycles===UNDEF
rampup: run driver==cqld4 graphname=graph_wheels tags=phase:rampup cycles=1
devmode: run driver=cqld4 graphname=graph_wheels tags=name:dev-mode
prodmode: run driver=cqld4 graphname=graph_wheels tags=name:prod-mode
bindings:
sessionid: ToEpochTimeUUID()->java.util.UUID; ToString();
deviceid: Add(200000); Div(<<sessons_per_device:10>>); ToEpochTimeUUID()->java.util.UUID; ToString();
type: WeightedStrings('phone:10;computer:10;')
os: WeightedStrings('android:6;ios:4;linux:2;osx:7;windows:3')
osversion: WeightedStrings('nougat:3;oreo:1;jellybean:2;4:1;4c:1;5:1;5c:1;trusty:1;xenial:1;yosemite:1;el capitan:2;sierra:3;high sierra:1;7:1;10:2')
ipaddress: Combinations('1;7;0-3;.;0-2;0-2;0-5;.;0-2;0-2;0-5')
createdtime: Add(1505256898)
diag_ten_pct: WeightedLongs('1:1;0:9')
diag_one_pct: WeightedLongs('1:1;0:99')
blocks:
drop-graph:
statements:
drop-graph:
type: gremlin
script: "system.graph('<<graphname:graph_wheels>>').ifExists().drop();"
create-graph-classic:
statements:
creategraph:
type: gremlin
script: >-
system.graph('<<graphname:graph_wheels>>')
.classicEngine()
.create()
create-graph:
tags:
phase: create-graph
statements:
creategraph:
type: gremlin
script: >-
system.graph('<<graphname:graph_wheels>>').ifNotExists().create()
create-schema:
tags:
phase: graph-schema
statements:
graph-schema:
type: gremlin
graphname: <<graphname:graph_wheels>>
script: >-
schema.vertexLabel('session')
.ifNotExists()
.partitionBy('sessionid', Uuid)
.property('ipaddress', Text)
.property('deviceid', Uuid)
.property('createdtime', Bigint)
.create();
schema.vertexLabel('device')
.ifNotExists()
.partitionBy('deviceid', Uuid)
.property('type', Text)
.property('os', Text)
.property('osversion', Text)
.create();
schema.edgeLabel('using')
.ifNotExists()
.from('session')
.to('device')
.create()
dev-mode:
tags:
phase: dev-mode
statements:
dev-mode:
type: gremlin
graphname: <<graphname:graph_wheels>>
script: >-
schema.config().option('graph.schema_mode').set('Development');
prod-mode:
tags:
phase: prod-mode
statements:
prod-mode:
type: gremlin
graphname: <<graphname:graph_wheels>>
script: >-
schema.config().option('graph.schema_mode').set('Production');
rampup:
tags:
phase: rampup
statements:
main-add:
type: gremlin
diag: "{diag_one_pct}"
graphname: <<graphname:graph_wheels>>
script: >-
device = g.addV('device')
.property('deviceid', '{deviceid}' as UUID)
.property('type', '{type}')
.property('os', '{os}')
.property('osversion', '{osversion}')
.as('d')
.addV('session')
.property('sessionid', '{sessionid}' as UUID)
.property('ipaddress', '{ipaddress}')
.property('deviceid', '{deviceid}' as UUID)
.property('createdtime', {createdtime})
.as('s')
.addE('using').from('s').to('d');
fluent:
statements:
read:
type: fluent
graphname: <<graphname:graph_wheels>>
imports:
- "org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__"
fluent: >-
g.V().hasLabel("device").has("deviceid", UUID.fromString({deviceid}))

View File

@ -156,7 +156,7 @@ blocks:
tags:
name: main-select-01234567
- main-select: |
select * from <<keyspace:baselines>>.<<table:tabular>> where part={part_read} limit {limit};
select data0,data1,data2,data3,data4,data5,data6,data7 from <<keyspace:baselines>>.<<table:tabular>> where part={part_read} limit {limit};
tags:
name: main-select
- name: main-write

View File

@ -1,19 +1,145 @@
# cqld4 driver
This is the newly revamped (alpha) driver for cql which uses
the OSS Driver version 4. As there was a significant restructuring
of the APIs between CQL driver 4 and previous versions, this driver
is not a derivative of the previous NoSQLBench CQL driver which
was based on the version 1.9 native driver. Instead, it is a
clean and separate implementation which aims to use the features
of version 4* of the native driver directly.
This is the newly revamped (beta) driver for CQL which uses the DataStax OSS Driver version 4. As
there was a significant restructuring of the APIs between CQL driver 4.x and previous versions, this
driver is a clean and separate implementation which aims to use the features of version 4.x of the
native driver directly as well as new internal NoSQLBench APIs.
This means that many features that advanced testers may have been used to (the syntactical sugar,
the surfacing of advanced configuration properties in simple ways, and so on) will have to be
redesigned to fit with version 4 of the driver. Most users who do basic testing with direct CQL
syntax should see few issues, but advanced testers will need to consult this documentation
specifically to understand the differences between `cqld4` driver features and `cql` driver
features.
Notably, these features need to be re-built on the cqld4 driver to bring it up to parity with
previous advanced testing features:
- verify
- result set size metrics
- explicit paging metrics
## Configuration
The DataStax Java Driver 4.* has a much different configuration system than previous versions. For
changing driver settings with this version it is **highly recommended** that users use the built-in
driver settings and configuration file/profile options, just as they would for an application. This
serves two goals: 1) to ensure that the settings you test with are portable from test environment to
application, and 2) to allow you to configure driver settings directly, without depending on
internal helper logic provided by NoSQLBench. This means that the driver options exposed are those
provided by the low-level driver, thus removing another dependency from your test setup.
### Config Sources
By using the option `driverconfig`, you can have as many configuration sources as you like, even
mixing in JSON or remote URLs.
**examples**
Configure directly from a config file, or classpath resource:
# If this isn't found in the file system, the classpath will also be checked.
driverconfig=myconfig.json
Configure directly from JSON:
driverconfig='{basic.request.timeout:"2 seconds"}'
Configure directly from a remote URL:
driverconfig='http://gist.github.com...'
Configure from multiple sources:
driverconfig=myconfig.json
### Activity level Driver Config
The activity parameters which are provided by the driver are exposed as `driver.<name>`. Any
configuration option that is specified this way will be applied directly to the driver through the
type-safe configuration layer. For example, specifying `driver.basic.request.timeout='2 seconds'`
has the same effect as setting `basic.request.timeout` in a driver configuration file.
## Backwards Compatibility with `cql` and `cqld3`
Many driver options were provided in a more convenient form for testing in previous CQL drivers with
NoSQLBench. Due to the changes in driver 4.x, the implementation of these options had to change as
well. Where possible, a backwards-compatible option helper was provided so that test defined for
`cql` and `cqld3` drivers would just work with the `cqld4` driver. In some cases, this simply was
not possible as some options were no longer supported, or changed so much that there was no longer a
direct mapping that would work consistently across versions. You can try to use the previous
options, like `pooling` and so on. If the option is not supported as such, it will cause an error
with an explanation. Otherwise, these helper options will simply set the equivalent options
in the driver profile to achieve the same effect. As stated above, it is highly recommended that
driver settings be captured in a configuration file and set with `driverconfig=<file>.json`
## Statement Forms
The CQLd4 driver supports idiomatic usage of all the main statement APIs within the native Java
driver. The syntax for specifying these types is simplified as well, using only a single
`type` field which allows values of simple, prepared, raw, gremlin, fluent, and so on.
The previous form of specifying `type: cql` and optional modifiers like `prepared` and
`parameterized` is deprecated now, since all the forms are explicitly supported by a
well-defined type name.
The previous form will work, but you will get a warning, as these should be deprecated
going forward. It is best to use the forms in the examples below. The defaults and field
names for the classic form have not changed.
## CQLd4 Op Template Examples
ops:
# prepared statement
# allows for parameterization via bindings, and uses prepared statements internally
example-prepared-cql-stmt:
prepared: |
select one, two from buckle.myshoe where ...
# prepared statement (verbose form)
example-prepared-cql-stmt-verbose:
type: prepared
stmt: |
select one, two from buckle.myshoe where ...
# simple statement
# allows for parameterization via bindings, but does not use prepared statements internally
example-simple-cql-stmt:
simple: |
select three, four from knock.onthedoor where ...
# raw statement
# pre-renders the statement into a string, with no driver-supervised parameterization
# useful for testing variant DDL where some fields are not parameterizable
# NOTE: the raw form does its best to quote non-literals where needed, but you may
# have to inject single or double quotes in special cases.
example-raw-cql-stmt:
raw: |
create table if not exist {ksname}.{tblname} ...
# gremlin statement using the fluent API, as it would be written in a client application
example-fluent-graph-stmt:
fluent: >-
g.V().hasLabel("device").has("deviceid", UUID.fromString({deviceid}))
# if imports are not specified, the following is auto imported.
# if imports are specified, you must also provide the __ class if needed
imports:
- org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__
# gremlin statement using string API (not recommended)
example-raw-gremlin-stmt:
gremlin: >-
g.V().hasLabel("device").has("deviceid", UUID.fromString('{deviceid})')
## Driver Cache
Like all driver adapters, the CQLd4 driver has the ability to use multiple low-level
driver instances for the purposes of advanced testing. To take advantage of this,
simply set a `space` parameter in your op templates, with a dynamic value.
__WARNING__: If you use the driver cache feature, be aware that creating a large
number of driver instances will be very expensive. Generally driver instances are meant
to be initialized and then shared throughout the life-cycle of an application process.
This means that many features that advanced testers may have been
used to (the syntactical sugar, the surfacing of advanced configuration
properties in simple ways, and so on) will have to be redesigned to
fit with version 4 of the driver. Most users who do basic testing with
direct CQL syntax should see few issues, but advanced testers will need
to consult this documentation specifically to understand the differences
between `cqld4` NB features and `cql` NB features.

View File

@ -0,0 +1,3 @@
These docs are carried over from the prior cql 1.9 and cql 3.* drivers. They do not describe
current behavior, but are here as a reference point for closing the implementation gap
in the new cqld4 driver before it is moved from prerelease status to mainline releases.

View File

@ -1,23 +0,0 @@
package io.nosqlbench.adapter.cqld4;
import org.junit.jupiter.api.Test;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class Cqld4SpaceTest {
@Test
public void testSplitConfigs() {
List<String> strings = Cqld4Space.splitConfigLoaders("http://config.example.com/asdf,file:foo,{inline config},c:\\file,/tmp/test");
assertThat(strings).containsExactly(
"http://config.example.com/asdf",
"file:foo",
"{inline config}",
"c:\\file",
"/tmp/test"
);
}
}

View File

@ -1,8 +1,8 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.double_to_cqlduration;
package io.nosqlbench.datamappers.functions.double_to_cqlduration;
import com.datastax.driver.core.Duration;
import io.nosqlbench.activitytype.cql.datamappers.functions.long_to_cqlduration.CqlDurationFunctions;
import io.nosqlbench.activitytype.cql.datamappers.functions.long_to_cqlduration.ToCqlDurationNanos;
import com.datastax.oss.driver.api.core.data.CqlDuration;
import io.nosqlbench.datamappers.functions.long_to_cqlduration.CqlDurationFunctions;
import io.nosqlbench.datamappers.functions.long_to_cqlduration.ToCqlDurationNanos;
import org.junit.jupiter.api.Test;
import java.util.function.LongToIntFunction;
@ -16,15 +16,15 @@ public class CqlDurationTests {
public void testFractionalCqlDuration() {
ToCqlDuration cd = new ToCqlDuration();
// only precise enough on the unit interval for this type of test
Duration oneDayPlusOneHour = cd.apply(1.0d + (1d/24D));
assertThat(oneDayPlusOneHour).isEqualTo(Duration.newInstance(0,1,1_000_000_000L*60*60));
CqlDuration oneDayPlusOneHour = cd.apply(1.0d + (1d/24D));
assertThat(oneDayPlusOneHour).isEqualTo(CqlDuration.newInstance(0,1,1_000_000_000L*60*60));
}
@Test
public void testLongToCqlDuration() {
ToCqlDurationNanos toNanos = new ToCqlDurationNanos();
// assertThat(toNanos.apply(1_000_000_000l * 2)).isEqualTo(Duration.newInstance(0,0,1_000_000_000*2));
assertThat(toNanos.apply(1_000_000_000L*86401L)).isEqualTo(Duration.newInstance(0,1,1_000_000_000));
assertThat(toNanos.apply(1_000_000_000L*86401L)).isEqualTo(CqlDuration.newInstance(0,1,1_000_000_000));
}
@Test
@ -34,9 +34,9 @@ public class CqlDurationTests {
(LongToIntFunction) d -> (int) (d * 2),
(LongUnaryOperator) n -> n * 10
);
Duration d2y10mo34d170ns = composed.apply(17);
CqlDuration d2y10mo34d170ns = composed.apply(17);
assertThat(d2y10mo34d170ns).isEqualTo(
Duration.newInstance(34,34,170));
CqlDuration.newInstance(34,34,170));
}

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.long_localdate;
package io.nosqlbench.datamappers.functions.long_localdate;
import org.joda.time.DateTime;
import org.junit.jupiter.api.Test;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.activitytype.cql.datamappers.functions.to_daterange;
package io.nosqlbench.datamappers.functions.to_daterange;
import org.junit.jupiter.api.Test;

View File

@ -4,17 +4,16 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.89-SNAPSHOT</version>
<version>4.17.10-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-cqlverify</artifactId>
<artifactId>adapter-dynamodb</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A CQL content verifier ActivityType, based on the CQL ActivityType
built on http://nosqlbench.io/
A DriverAdapter driver for dynamodb
</description>
<dependencies>
@ -23,14 +22,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cql-shaded</artifactId>
<version>4.15.89-SNAPSHOT</version>
<artifactId>adapters-api</artifactId>
<version>4.17.10-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.89-SNAPSHOT</version>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.12.129</version>
</dependency>
</dependencies>

Some files were not shown because too many files have changed in this diff Show More