partial progress on cql-d4

This commit is contained in:
Jonathan Shook 2020-05-18 10:19:20 -05:00
parent 2ee74355bd
commit 93644b76fc
12 changed files with 371 additions and 429 deletions

View File

@ -1,5 +1,16 @@
package com.datastax.driver.core;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.TokenMap;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
import org.jetbrains.annotations.NotNull;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -7,40 +18,32 @@ import java.util.OptionalLong;
import java.util.Set;
public class M3PTokenFilter {
private final TokenRange[] ranges;
private final ProtocolVersion protocolVersion;
private final CodecRegistry codecRegistry;
private final Metadata clusterMetadata;
private final Token.Factory factory;
public M3PTokenFilter(Set<TokenRange> ranges, Cluster cluster) {
protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
codecRegistry = cluster.getConfiguration().getCodecRegistry();
clusterMetadata = cluster.getMetadata();
factory = Token.getFactory(clusterMetadata.partitioner);
List<TokenRange> rangeList = new ArrayList<>();
private final TokenRange[] ranges;
public M3PTokenFilter(Set<TokenRange> ranges, Session session) {
TokenMap tokenMap = session.getMetadata().getTokenMap().orElseThrow();
List<TokenRange> rangelist = new ArrayList<>();
for (TokenRange range : ranges) {
if (!range.getStart().getType().equals(DataType.bigint())) {
throw new RuntimeException("This filter only works with bigint valued token types");
}
rangeList.add(range);
rangelist.add(range);
}
this.ranges=rangeList.toArray(new TokenRange[0]);
this.ranges = rangelist.toArray(new TokenRange[0]);
if (this.ranges.length<1) {
throw new RuntimeException("There were no tokens found. Please check your keyspace and cluster settings.");
}
}
public OptionalLong matches(Statement statement) {
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);
Token token = factory.hash(routingKey);
public boolean matches(Statement statement) {
Token token = statement.getRoutingToken();
for (TokenRange range : ranges) {
if (range.contains(token)) {
return OptionalLong.of((long)token.getValue());
return true;
}
}
return OptionalLong.empty();
return false;
}

View File

@ -1,5 +1,15 @@
package com.datastax.driver.core;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.api.core.metadata.token.TokenRange;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3Token;
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3TokenFactory;
import com.datastax.oss.driver.internal.core.metadata.token.Murmur3TokenRange;
import io.nosqlbench.activitytype.cqld4.api.StatementFilter;
import java.nio.ByteBuffer;
@ -13,41 +23,41 @@ public class TokenRangeStmtFilter implements StatementFilter {
private final Metadata clusterMetadata;
private final ProtocolVersion protocolVersion;
private final CodecRegistry codecRegistry;
private final Token.Factory factory;
// private final Token.Factory factory;
private TokenRange[] ranges;
public TokenRangeStmtFilter(Cluster cluster, String rangesSpec) {
clusterMetadata = cluster.getMetadata();
protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
codecRegistry = cluster.getConfiguration().getCodecRegistry();
factory = Token.getFactory(clusterMetadata.partitioner);
ranges = parseRanges(factory, rangesSpec);
public TokenRangeStmtFilter(Session session, String rangesSpec) {
clusterMetadata = session.getMetadata();
protocolVersion = session.getContext().getProtocolVersion();
codecRegistry = session.getContext().getCodecRegistry();
ranges = parseRanges(session, rangesSpec);
}
private TokenRange[] parseRanges(Token.Factory factory, String rangesStr) {
private TokenRange[] parseRanges(Session session, String rangesStr) {
String[] ranges = rangesStr.split(",");
List<TokenRange> tr = new ArrayList<>();
for (String range : ranges) {
String[] interval = range.split(":");
Token start = factory.fromString(interval[0]);
Token end = factory.fromString(interval[1]);
TokenRange tokenRange = new TokenRange(start, end, factory);
Murmur3TokenFactory m3f = new Murmur3TokenFactory();
Token start = m3f.parse(interval[0]);
Token end = m3f.parse(interval[1]);
TokenRange tokenRange = m3f.range(start,end);
tr.add(tokenRange);
}
return tr.toArray(new TokenRange[tr.size()]);
return tr.toArray(new TokenRange[0]);
}
@Override
public boolean matches(Statement statement) {
ByteBuffer routingKey = statement.getRoutingKey(protocolVersion, codecRegistry);
Token token = factory.hash(routingKey);
public boolean matches(Statement<?> statement) {
Token routingToken = statement.getRoutingToken();
for (TokenRange range : ranges) {
if (range.contains(token)) {
if (range.contains(routingToken)) {
return true;
}
}
return false;
}
@Override

View File

@ -1,71 +0,0 @@
package com.datastax.driver.core;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Comparator;
import java.util.Set;
public class TokenRangeUtil {
private final Metadata clusterMetadata;
private final ProtocolVersion protocolVersion;
private final CodecRegistry codecRegistry;
private final Token.Factory factory;
private final Cluster cluster;
public TokenRangeUtil(Cluster cluster) {
this.cluster= cluster;
clusterMetadata = cluster.getMetadata();
protocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
codecRegistry = cluster.getConfiguration().getCodecRegistry();
factory = Token.getFactory(clusterMetadata.partitioner);
}
public Set<TokenRange> getTokenRangesFor(String keyspace, String hostaddress) {
Host host=null;
if (hostaddress.matches("\\d+")) {
int hostenum = Integer.parseInt(hostaddress);
host = clusterMetadata.getAllHosts().stream()
.sorted(Comparator.comparing(h -> h.getAddress().toString()))
.skip(hostenum)
.findFirst()
.orElseThrow();
} else if (!hostaddress.isEmpty()) {
host = clusterMetadata.getAllHosts().stream()
.filter(h -> h.getAddress().toString().replaceAll("/","").equals(hostaddress))
.findFirst()
.orElseThrow();
} else {
throw new RuntimeException("You must specify a host enum in order or a host address.");
}
return clusterMetadata.getTokenRanges(keyspace,host);
}
public void printRanges(String tokensks) {
Set<Host> hosts = clusterMetadata.getAllHosts();
for (Host host : hosts) {
String address = host.getAddress().toString().substring(1);
BufferedWriter writer = null;
try {
writer = new BufferedWriter(new FileWriter("ranges-"+address));
String ranges = getTokenRangesFor(tokensks, address).toString();
writer.write(ranges);
writer.close();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException("Can't write token range files");
}
}
}
public M3PTokenFilter getFilterFor(Set<TokenRange> ranges) {
return new M3PTokenFilter(ranges, this.cluster);
}
}

View File

@ -3,5 +3,5 @@ package io.nosqlbench.activitytype.cqld4.api;
import com.datastax.oss.driver.api.core.cql.Statement;
public interface StatementFilter {
boolean matches(Statement statement);
boolean matches(Statement<?> statement);
}

View File

@ -1,226 +1,202 @@
package io.nosqlbench.activitytype.cqld4.core;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
import com.datastax.oss.driver.internal.core.connection.ConstantReconnectionPolicy;
import com.datastax.oss.driver.internal.core.connection.ExponentialReconnectionPolicy;
import com.datastax.oss.driver.internal.core.context.NettyOptions;
import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy;
import com.datastax.oss.driver.internal.core.specex.ConstantSpeculativeExecutionPolicy;
import io.netty.util.HashedWheelTimer;
import io.nosqlbench.nb.api.errors.BasicError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class CQLOptions {
private final static Logger logger = LoggerFactory.getLogger(CQLOptions.class);
private final static Pattern CORE_AND_MAX_RQ_PATTERN = Pattern.compile("(?<core>\\d+)(:(?<max>\\d+)(:(?<rq>\\d+))?)?(,(?<rcore>\\d+)(:(?<rmax>\\d+)(:(?<rrq>\\d+))?)?)?(,?heartbeat_interval_s:(?<heartbeatinterval>\\d+))?(,?idle_timeout_s:(?<idletimeout>\\d+))?(,?pool_timeout_ms:(?<pooltimeout>\\d+))?");
private final static Pattern PERCENTILE_EAGER_PATTERN = Pattern.compile("^p(?<pctile>[^:]+)(:(?<executions>\\d+))?(:(?<tracked>\\d+)ms)?$");
private final static Pattern CONSTANT_EAGER_PATTERN = Pattern.compile("^((?<msThreshold>\\d++)ms)(:(?<executions>\\d+))?$");
private static ConstantSpeculativeExecutionPolicy constantPolicy(DriverContext context, int threshold, int executions) {
return new ConstantSpeculativeExecutionPolicy(threshold, executions);
}
private static SpeculativeExecutionPolicy percentilePolicy(long tracked, double threshold, int executions) {
PerHostPercentileTracker tracker = newTracker(tracked);
return new PercentileSpeculativeExecutionPolicy(tracker, threshold, executions);
}
private static PerHostPercentileTracker newTracker(long millis) {
return PerHostPercentileTracker.builder(millis).build();
}
public static PoolingOptions poolingOptionsFor(String spec) {
Matcher matcher = CORE_AND_MAX_RQ_PATTERN.matcher(spec);
if (matcher.matches()) {
PoolingOptions poolingOptions = new PoolingOptions();
Optional.ofNullable(matcher.group("core")).map(Integer::valueOf)
.ifPresent(core -> poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, core));
Optional.ofNullable(matcher.group("max")).map(Integer::valueOf)
.ifPresent(max -> poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, max));
Optional.ofNullable(matcher.group("rq")).map(Integer::valueOf)
.ifPresent(rq -> poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, rq));
Optional.ofNullable(matcher.group("rcore")).map(Integer::valueOf)
.ifPresent(rcore -> poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, rcore));
Optional.ofNullable(matcher.group("rmax")).map(Integer::valueOf)
.ifPresent(rmax -> poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, rmax));
Optional.ofNullable(matcher.group("rrq")).map(Integer::valueOf)
.ifPresent(rrq -> poolingOptions.setMaxRequestsPerConnection(HostDistance.REMOTE, rrq));
Optional.ofNullable(matcher.group("heartbeatinterval")).map(Integer::valueOf)
.ifPresent(poolingOptions::setHeartbeatIntervalSeconds);
Optional.ofNullable(matcher.group("idletimeout")).map(Integer::valueOf)
.ifPresent(poolingOptions::setIdleTimeoutSeconds);
Optional.ofNullable(matcher.group("pooltimeout")).map(Integer::valueOf)
.ifPresent(poolingOptions::setPoolTimeoutMillis);
return poolingOptions;
}
throw new RuntimeException("No pooling options could be parsed from spec: " + spec);
}
public static RetryPolicy retryPolicyFor(String spec, Session session) {
Set<String> retryBehaviors = Arrays.stream(spec.split(",")).map(String::toLowerCase).collect(Collectors.toSet());
RetryPolicy retryPolicy = new DefaultRetryPolicy(session.getContext(),"default");
if (retryBehaviors.contains("default")) {
return retryPolicy;
} // add other mutually-exclusive behaviors here with checks, if we want to extend beyond "default"
if (retryBehaviors.contains("logging")) {
retryPolicy = new LoggingRetryPolicy(retryPolicy);
}
return retryPolicy;
}
public static ReconnectionPolicy reconnectPolicyFor(String spec, Session session) {
if(spec.startsWith("exponential(")){
String argsString = spec.substring(12);
String[] args = argsString.substring(0, argsString.length() - 1).split("[,;]");
if (args.length != 2){
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>)");
}
long baseDelay = Long.parseLong(args[0]);
long maxDelay = Long.parseLong(args[1]);
ExponentialReconnectionPolicy exponentialReconnectionPolicy = new ExponentialReconnectionPolicy(session.getContext());
}else if(spec.startsWith("constant(")){
String argsString = spec.substring(9);
long constantDelayMs= Long.parseLong(argsString.substring(0, argsString.length() - 1));
return new ConstantReconnectionPolicy(constantDelayMs);
}
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>) or constant(<constantDelayMs>)");
}
public static SocketOptions socketOptionsFor(String spec) {
String[] assignments = spec.split("[,;]");
Map<String, String> values = new HashMap<>();
for (String assignment : assignments) {
String[] namevalue = assignment.split("[:=]", 2);
String name = namevalue[0];
String value = namevalue[1];
values.put(name, value);
}
SocketOptions options = new SocketOptions();
Optional.ofNullable(values.get("read_timeout_ms")).map(Integer::parseInt).ifPresent(
options::setReadTimeoutMillis
);
Optional.ofNullable(values.get("connect_timeout_ms")).map(Integer::parseInt).ifPresent(
options::setConnectTimeoutMillis
);
Optional.ofNullable(values.get("keep_alive")).map(Boolean::parseBoolean).ifPresent(
options::setKeepAlive
);
Optional.ofNullable(values.get("reuse_address")).map(Boolean::parseBoolean).ifPresent(
options::setReuseAddress
);
Optional.ofNullable(values.get("so_linger")).map(Integer::parseInt).ifPresent(
options::setSoLinger
);
Optional.ofNullable(values.get("tcp_no_delay")).map(Boolean::parseBoolean).ifPresent(
options::setTcpNoDelay
);
Optional.ofNullable(values.get("receive_buffer_size")).map(Integer::parseInt).ifPresent(
options::setReceiveBufferSize
);
Optional.ofNullable(values.get("send_buffer_size")).map(Integer::parseInt).ifPresent(
options::setSendBufferSize
);
return options;
}
public static SpeculativeExecutionPolicy defaultSpeculativePolicy() {
PerHostPercentileTracker tracker = PerHostPercentileTracker
.builder(15000)
.build();
PercentileSpeculativeExecutionPolicy defaultSpecPolicy =
new PercentileSpeculativeExecutionPolicy(tracker, 99.0, 5);
return defaultSpecPolicy;
}
public static SpeculativeExecutionPolicy speculativeFor(String spec) {
Matcher pctileMatcher = PERCENTILE_EAGER_PATTERN.matcher(spec);
Matcher constantMatcher = CONSTANT_EAGER_PATTERN.matcher(spec);
if (pctileMatcher.matches()) {
double pctile = Double.valueOf(pctileMatcher.group("pctile"));
if (pctile > 100.0 || pctile < 0.0) {
throw new RuntimeException("pctile must be between 0.0 and 100.0");
}
String executionsSpec = pctileMatcher.group("executions");
String trackedSpec = pctileMatcher.group("tracked");
int executions = (executionsSpec != null && !executionsSpec.isEmpty()) ? Integer.valueOf(executionsSpec) : 5;
int tracked = (trackedSpec != null && !trackedSpec.isEmpty()) ? Integer.valueOf(trackedSpec) : 15000;
logger.debug("speculative: Creating new percentile tracker policy from spec '" + spec + "'");
return percentilePolicy(tracked, pctile, executions);
} else if (constantMatcher.matches()) {
int threshold = Integer.valueOf(constantMatcher.group("msThreshold"));
String executionsSpec = constantMatcher.group("executions");
int executions = (executionsSpec != null && !executionsSpec.isEmpty()) ? Integer.valueOf(executionsSpec) : 5;
logger.debug("speculative: Creating new constant policy from spec '" + spec + "'");
return constantPolicy(threshold, executions);
} else {
throw new RuntimeException("Unable to parse pattern for speculative option: " + spec + ", it must be in " +
"an accepted form, like p99.0:5:15000, or p99.0:5, or 5000ms:5");
}
}
public static LoadBalancingPolicy whitelistFor(String s, LoadBalancingPolicy innerPolicy) {
String[] addrSpecs = s.split(",");
List<InetSocketAddress> sockAddrs = Arrays.stream(addrSpecs)
.map(CQLOptions::toSocketAddr)
.collect(Collectors.toList());
if (innerPolicy == null) {
innerPolicy = new RoundRobinPolicy();
}
return new WhiteListPolicy(innerPolicy, sockAddrs);
}
public static NettyOptions withTickDuration(String tick) {
logger.info("Cluster builder using custom tick duration value for HashedWheelTimer: " + tick + " milliseconds");
int tickDuration = Integer.valueOf(tick);
return new NettyOptions() {
public io.netty.util.Timer timer(ThreadFactory threadFactory) {
return new HashedWheelTimer(
threadFactory, tickDuration, TimeUnit.MILLISECONDS);
}
};
}
private static InetSocketAddress toSocketAddr(String addr) {
String[] addrs = addr.split(":", 2);
String inetHost = addrs[0];
String inetPort = (addrs.length == 2) ? addrs[1] : "9042";
return new InetSocketAddress(inetHost, Integer.valueOf(inetPort));
}
public static ProtocolOptions.Compression withCompression(String compspec) {
try {
return ProtocolOptions.Compression.valueOf(compspec);
} catch (IllegalArgumentException iae) {
throw new RuntimeException("Compression option '" + compspec + "' was specified, but only " +
Arrays.toString(ProtocolOptions.Compression.values()) + " are available.");
}
}
// private final static Logger logger = LoggerFactory.getLogger(CQLOptions.class);
//
// private final static Pattern CORE_AND_MAX_RQ_PATTERN = Pattern.compile("(?<core>\\d+)(:(?<max>\\d+)(:(?<rq>\\d+))?)?(,(?<rcore>\\d+)(:(?<rmax>\\d+)(:(?<rrq>\\d+))?)?)?(,?heartbeat_interval_s:(?<heartbeatinterval>\\d+))?(,?idle_timeout_s:(?<idletimeout>\\d+))?(,?pool_timeout_ms:(?<pooltimeout>\\d+))?");
// private final static Pattern PERCENTILE_EAGER_PATTERN = Pattern.compile("^p(?<pctile>[^:]+)(:(?<executions>\\d+))?(:(?<tracked>\\d+)ms)?$");
// private final static Pattern CONSTANT_EAGER_PATTERN = Pattern.compile("^((?<msThreshold>\\d++)ms)(:(?<executions>\\d+))?$");
//
// private static ConstantSpeculativeExecutionPolicy constantPolicy(DriverContext context, int threshold, int executions) {
// return new ConstantSpeculativeExecutionPolicy(threshold, executions);
// }
//
// private static SpeculativeExecutionPolicy percentilePolicy(long tracked, double threshold, int executions) {
// PerHostPercentileTracker tracker = newTracker(tracked);
// return new PercentileSpeculativeExecutionPolicy(tracker, threshold, executions);
// }
//
// private static PerHostPercentileTracker newTracker(long millis) {
// return PerHostPercentileTracker.builder(millis).build();
// }
//
// public static PoolingOptions poolingOptionsFor(String spec) {
// Matcher matcher = CORE_AND_MAX_RQ_PATTERN.matcher(spec);
// if (matcher.matches()) {
// PoolingOptions poolingOptions = new PoolingOptions();
//
// Optional.ofNullable(matcher.group("core")).map(Integer::valueOf)
// .ifPresent(core -> poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, core));
// Optional.ofNullable(matcher.group("max")).map(Integer::valueOf)
// .ifPresent(max -> poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, max));
// Optional.ofNullable(matcher.group("rq")).map(Integer::valueOf)
// .ifPresent(rq -> poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, rq));
//
// Optional.ofNullable(matcher.group("rcore")).map(Integer::valueOf)
// .ifPresent(rcore -> poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, rcore));
// Optional.ofNullable(matcher.group("rmax")).map(Integer::valueOf)
// .ifPresent(rmax -> poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, rmax));
// Optional.ofNullable(matcher.group("rrq")).map(Integer::valueOf)
// .ifPresent(rrq -> poolingOptions.setMaxRequestsPerConnection(HostDistance.REMOTE, rrq));
//
// Optional.ofNullable(matcher.group("heartbeatinterval")).map(Integer::valueOf)
// .ifPresent(poolingOptions::setHeartbeatIntervalSeconds);
//
// Optional.ofNullable(matcher.group("idletimeout")).map(Integer::valueOf)
// .ifPresent(poolingOptions::setIdleTimeoutSeconds);
//
// Optional.ofNullable(matcher.group("pooltimeout")).map(Integer::valueOf)
// .ifPresent(poolingOptions::setPoolTimeoutMillis);
//
// return poolingOptions;
// }
// throw new RuntimeException("No pooling options could be parsed from spec: " + spec);
//
// }
//
// public static RetryPolicy retryPolicyFor(String spec, Session session) {
// Set<String> retryBehaviors = Arrays.stream(spec.split(",")).map(String::toLowerCase).collect(Collectors.toSet());
// RetryPolicy retryPolicy = new DefaultRetryPolicy(session.getContext(),"default");
//
// if (retryBehaviors.contains("default")) {
// return retryPolicy;
// } // add other mutually-exclusive behaviors here with checks, if we want to extend beyond "default"
//
// if (retryBehaviors.contains("logging")) {
// retryPolicy = new LoggingRetryPolicy(retryPolicy);
// }
//
// return retryPolicy;
// }
//
// public static ReconnectionPolicy reconnectPolicyFor(String spec, Session session) {
// if(spec.startsWith("exponential(")){
// String argsString = spec.substring(12);
// String[] args = argsString.substring(0, argsString.length() - 1).split("[,;]");
// if (args.length != 2){
// throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>)");
// }
// long baseDelay = Long.parseLong(args[0]);
// long maxDelay = Long.parseLong(args[1]);
// ExponentialReconnectionPolicy exponentialReconnectionPolicy = new ExponentialReconnectionPolicy(session.getContext());
// }else if(spec.startsWith("constant(")){
// String argsString = spec.substring(9);
// long constantDelayMs= Long.parseLong(argsString.substring(0, argsString.length() - 1));
// return new ConstantReconnectionPolicy(constantDelayMs);
// }
// throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>) or constant(<constantDelayMs>)");
// }
//
// public static SocketOptions socketOptionsFor(String spec) {
// String[] assignments = spec.split("[,;]");
// Map<String, String> values = new HashMap<>();
// for (String assignment : assignments) {
// String[] namevalue = assignment.split("[:=]", 2);
// String name = namevalue[0];
// String value = namevalue[1];
// values.put(name, value);
// }
//
// SocketOptions options = new SocketOptions();
// Optional.ofNullable(values.get("read_timeout_ms")).map(Integer::parseInt).ifPresent(
// options::setReadTimeoutMillis
// );
// Optional.ofNullable(values.get("connect_timeout_ms")).map(Integer::parseInt).ifPresent(
// options::setConnectTimeoutMillis
// );
// Optional.ofNullable(values.get("keep_alive")).map(Boolean::parseBoolean).ifPresent(
// options::setKeepAlive
// );
// Optional.ofNullable(values.get("reuse_address")).map(Boolean::parseBoolean).ifPresent(
// options::setReuseAddress
// );
// Optional.ofNullable(values.get("so_linger")).map(Integer::parseInt).ifPresent(
// options::setSoLinger
// );
// Optional.ofNullable(values.get("tcp_no_delay")).map(Boolean::parseBoolean).ifPresent(
// options::setTcpNoDelay
// );
// Optional.ofNullable(values.get("receive_buffer_size")).map(Integer::parseInt).ifPresent(
// options::setReceiveBufferSize
// );
// Optional.ofNullable(values.get("send_buffer_size")).map(Integer::parseInt).ifPresent(
// options::setSendBufferSize
// );
//
// return options;
// }
//
// public static SpeculativeExecutionPolicy defaultSpeculativePolicy() {
// PerHostPercentileTracker tracker = PerHostPercentileTracker
// .builder(15000)
// .build();
// PercentileSpeculativeExecutionPolicy defaultSpecPolicy =
// new PercentileSpeculativeExecutionPolicy(tracker, 99.0, 5);
// return defaultSpecPolicy;
// }
//
// public static SpeculativeExecutionPolicy speculativeFor(String spec) {
// Matcher pctileMatcher = PERCENTILE_EAGER_PATTERN.matcher(spec);
// Matcher constantMatcher = CONSTANT_EAGER_PATTERN.matcher(spec);
// if (pctileMatcher.matches()) {
// double pctile = Double.valueOf(pctileMatcher.group("pctile"));
// if (pctile > 100.0 || pctile < 0.0) {
// throw new RuntimeException("pctile must be between 0.0 and 100.0");
// }
// String executionsSpec = pctileMatcher.group("executions");
// String trackedSpec = pctileMatcher.group("tracked");
// int executions = (executionsSpec != null && !executionsSpec.isEmpty()) ? Integer.valueOf(executionsSpec) : 5;
// int tracked = (trackedSpec != null && !trackedSpec.isEmpty()) ? Integer.valueOf(trackedSpec) : 15000;
// logger.debug("speculative: Creating new percentile tracker policy from spec '" + spec + "'");
// return percentilePolicy(tracked, pctile, executions);
// } else if (constantMatcher.matches()) {
// int threshold = Integer.valueOf(constantMatcher.group("msThreshold"));
// String executionsSpec = constantMatcher.group("executions");
// int executions = (executionsSpec != null && !executionsSpec.isEmpty()) ? Integer.valueOf(executionsSpec) : 5;
// logger.debug("speculative: Creating new constant policy from spec '" + spec + "'");
// return constantPolicy(threshold, executions);
// } else {
// throw new RuntimeException("Unable to parse pattern for speculative option: " + spec + ", it must be in " +
// "an accepted form, like p99.0:5:15000, or p99.0:5, or 5000ms:5");
// }
//
// }
//
// public static LoadBalancingPolicy whitelistFor(String s, LoadBalancingPolicy innerPolicy) {
// String[] addrSpecs = s.split(",");
// List<InetSocketAddress> sockAddrs = Arrays.stream(addrSpecs)
// .map(CQLOptions::toSocketAddr)
// .collect(Collectors.toList());
// if (innerPolicy == null) {
// innerPolicy = new RoundRobinPolicy();
// }
// return new WhiteListPolicy(innerPolicy, sockAddrs);
// }
//
// public static NettyOptions withTickDuration(String tick) {
// logger.info("Cluster builder using custom tick duration value for HashedWheelTimer: " + tick + " milliseconds");
// int tickDuration = Integer.valueOf(tick);
// return new NettyOptions() {
// public io.netty.util.Timer timer(ThreadFactory threadFactory) {
// return new HashedWheelTimer(
// threadFactory, tickDuration, TimeUnit.MILLISECONDS);
// }
// };
// }
//
// private static InetSocketAddress toSocketAddr(String addr) {
// String[] addrs = addr.split(":", 2);
// String inetHost = addrs[0];
// String inetPort = (addrs.length == 2) ? addrs[1] : "9042";
// return new InetSocketAddress(inetHost, Integer.valueOf(inetPort));
// }
//
// public static ProtocolOptions.Compression withCompression(String compspec) {
// try {
// return ProtocolOptions.Compression.valueOf(compspec);
// } catch (IllegalArgumentException iae) {
// throw new RuntimeException("Compression option '" + compspec + "' was specified, but only " +
// Arrays.toString(ProtocolOptions.Compression.values()) + " are available.");
// }
// }
}

View File

@ -1,8 +1,9 @@
package io.nosqlbench.activitytype.cqld4.core;
import com.codahale.metrics.Timer;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.cql.*;
import com.datastax.oss.driver.api.core.session.Session;
import io.nosqlbench.activitytype.cqld4.api.ResultSetCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.RowCycleOperator;
import io.nosqlbench.activitytype.cqld4.api.StatementFilter;
@ -22,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("Duplicates")
@ -81,7 +83,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
totalRowsFetchedForQuery = 0L;
Statement statement;
ResultSetFuture resultSetFuture;
CompletionStage<AsyncResultSet> resultSetFuture;
ReadyCQLStatement readyCQLStatement;
int tries = 0;
@ -124,7 +126,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
}
try (Timer.Context executeTime = cqlActivity.executeTimer.time()) {
resultSetFuture = cqlActivity.getSession().executeAsync(statement);
CompletionStage<AsyncResultSet> completion = cqlActivity.getSession().executeAsync(statement);
}
Timer.Context resultTime = cqlActivity.resultTimer.time();
@ -149,7 +151,8 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
Row row = resultSet.one();
ColumnDefinitions defs = row.getColumnDefinitions();
if (retryReplace) {
statement = CQLBindHelper.rebindUnappliedStatement(statement, defs, row);
statement =
new CQLBindHelper(getCqlActivity().getSession()).rebindUnappliedStatement(statement, defs,row);
}
logger.trace(readyCQLStatement.getQueryString(cycleValue));
@ -212,7 +215,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
readyCQLStatement.getQueryString(cycleValue),
1,
cqlActivity.maxpages,
cqlActivity.getSession().getCluster().getConfiguration().getQueryOptions().getFetchSize()
cqlActivity.getSession().getContext().getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE)
);
}
}
@ -302,7 +305,7 @@ public class CqlAction implements SyncAction, MultiPhaseAction, ActivityDefObser
pagingReadyStatement.getQueryString(cycleValue),
pagesFetched,
cqlActivity.maxpages,
cqlActivity.getSession().getCluster().getConfiguration().getQueryOptions().getFetchSize()
cqlActivity.getSession().getContext().getConfig().getDefaultProfile().getInt(DefaultDriverOption.REQUEST_PAGE_SIZE)
);
}
pagingResultSet = resultSet;

View File

@ -4,6 +4,7 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.*;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.session.Session;
import io.nosqlbench.activitytype.cqld4.codecsupport.UDTCodecInjector;
import com.datastax.driver.core.TokenRangeStmtFilter;
@ -73,7 +74,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
Meter rowsCounter;
private HashedCQLErrorHandler errorHandler;
private OpSequence<ReadyCQLStatement> opsequence;
private Session session;
private CqlSession session;
private int maxTries;
private StatementFilter statementFilter;
private Boolean showcql;
@ -125,7 +126,7 @@ public class CqlActivity extends SimpleActivity implements Activity, ActivityDef
logger.debug("activity fully initialized: " + this.activityDef.getAlias());
}
public synchronized Session getSession() {
public synchronized CqlSession getSession() {
if (session == null) {
session = CQLSessionCache.get().getSession(this.getActivityDef());
}

View File

@ -1,10 +1,8 @@
package io.nosqlbench.activitytype.cqld4.core;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.oss.driver.api.core.cql.Statement;
import io.nosqlbench.activitytype.cqld4.statements.core.ReadyCQLStatement;
import com.google.common.util.concurrent.FutureCallback;
import io.nosqlbench.engine.api.activityapi.core.ops.fluent.opfacets.StartedOp;
public class CqlOpData implements FutureCallback<ResultSet> {

View File

@ -49,7 +49,9 @@ public class UnexpectedPagingException extends CqlGenericCycleException {
sb.append("Additional paging would be required to read the results from this query fully" +
", but the user has not explicitly indicated that paging was expected.")
.append(" fetched/allowed: ").append(fetchedPages).append("/").append(maxpages)
.append(" fetchSize(").append(fetchSize).append("): ").append(queryString);
.append(" fetchSize(").append(fetchSize).append("): ").append(queryString).append(", note this value " +
"is shown for reference from the default driver profile. If you are using a custom profile, it may be " +
"different.");
return sb.toString();
}
}

View File

@ -1,6 +1,10 @@
package io.nosqlbench.activitytype.cqld4.statements.core;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.retry.RetryPolicy;
import com.datastax.oss.driver.api.core.session.Session;
import com.datastax.oss.driver.api.core.specex.SpeculativeExecutionPolicy;
@ -29,7 +33,8 @@ public class CQLSessionCache implements Shutdownable {
private final static Logger logger = LoggerFactory.getLogger(CQLSessionCache.class);
private final static String DEFAULT_SESSION_ID = "default";
private static CQLSessionCache instance = new CQLSessionCache();
private Map<String, Session> sessionCache = new HashMap<>();
private Map<String, CqlSession> sessionCache = new HashMap<>();
private Map<String, Map<String, DriverOption>> rawConfigs = new HashMap<>();
private CQLSessionCache() {
}
@ -44,61 +49,62 @@ public class CQLSessionCache implements Shutdownable {
session.close();
}
public Session getSession(ActivityDef activityDef) {
public CqlSession getSession(ActivityDef activityDef) {
String key = activityDef.getParams().getOptionalString("clusterid").orElse(DEFAULT_SESSION_ID);
return sessionCache.computeIfAbsent(key, (cid) -> createSession(activityDef, key));
}
// cbopts=\".withLoadBalancingPolicy(LatencyAwarePolicy.builder(new TokenAwarePolicy(new DCAwareRoundRobinPolicy(\"dc1-us-east\", 0, false))).build()).withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE))\"
private Session createSession(ActivityDef activityDef, String sessid) {
private CqlSession createSession(ActivityDef activityDef, String sessid) {
String host = activityDef.getParams().getOptionalString("host").orElse("localhost");
int port = activityDef.getParams().getOptionalInteger("port").orElse(9042);
String driverType = activityDef.getParams().getOptionalString("cqldriver").orElse("dse");
Cluster.Builder builder =
driverType.toLowerCase().equals("dse") ? DseCluster.builder() :
driverType.toLowerCase().equals("oss") ? Cluster.builder() : null;
activityDef.getParams().getOptionalString("cqldriver").ifPresent(v -> {
logger.warn("The cqldriver parameter is not needed in this version of the driver.");
});
if (builder==null) {
throw new RuntimeException("The driver type '" + driverType + "' is not recognized");
CqlSessionBuilder builder = CqlSession.builder();
Optional<Path> scb = activityDef.getParams().getOptionalString("secureconnectbundle")
.map(Path::of);
Optional<List<String>> hosts = activityDef.getParams().getOptionalString("host", "hosts")
.map(h -> h.split(",")).map(Arrays::asList);
Optional<Integer> port1 = activityDef.getParams().getOptionalInteger("port");
if (scb.isPresent()) {
scb.map(b -> {
logger.debug("adding secureconnectbundle: " + b.toString());
return b;
}).ifPresent(builder::withCloudSecureConnectBundle);
if (hosts.isPresent()) {
logger.warn("The host parameter is not valid when using secureconnectbundle=");
}
if (port1.isPresent()) {
logger.warn("the port parameter is not used with CQL when using secureconnectbundle=");
}
} else {
hosts.orElse(List.of("localhost"))
.stream()
.map(h -> InetSocketAddress.createUnresolved(h,port))
.peek(h-> logger.debug("adding contact endpoint: " + h.getHostName()+":"+h.getPort()))
.forEachOrdered(builder::addContactPoint);
}
logger.info("Using driver type '" + driverType.toUpperCase() + "'");
Optional<String> scb = activityDef.getParams()
.getOptionalString("secureconnectbundle");
scb.map(File::new)
.ifPresent(builder::withCloudSecureConnectBundle);
activityDef.getParams()
.getOptionalString("insights").map(Boolean::parseBoolean)
.ifPresent(builder::withMonitorReporting);
String[] contactPoints = activityDef.getParams().getOptionalString("host")
.map(h -> h.split(",")).orElse(null);
if (contactPoints == null) {
contactPoints = activityDef.getParams().getOptionalString("hosts")
.map(h -> h.split(",")).orElse(null);
}
if (contactPoints == null && scb.isEmpty()) {
contactPoints = new String[]{"localhost"};
}
if (contactPoints != null) {
builder.addContactPoints(contactPoints);
}
activityDef.getParams().getOptionalInteger("port").ifPresent(builder::withPort);
builder.withCompression(ProtocolOptions.Compression.NONE);
// builder.withCompression(ProtocolOptions.Compression.NONE);
// TODO add map based configuration with compression defaults
Optional<String> usernameOpt = activityDef.getParams().getOptionalString("username");
Optional<String> passwordOpt = activityDef.getParams().getOptionalString("password");
Optional<String> passfileOpt = activityDef.getParams().getOptionalString("passfile");
Optional<String> authIdOpt = activityDef.getParams().getOptionalString("authid");
if (usernameOpt.isPresent()) {
String username = usernameOpt.get();
@ -119,7 +125,11 @@ public class CQLSessionCache implements Shutdownable {
logger.error(error);
throw new RuntimeException(error);
}
builder.withCredentials(username, password);
if (authIdOpt.isPresent()) {
builder.withAuthCredentials(username, password, authIdOpt.get());
} else {
builder.withAuthCredentials(username, password);
}
}
Optional<String> clusteropts = activityDef.getParams().getOptionalString("cbopts");

View File

@ -0,0 +1,18 @@
package io.nosqlbench.activitytype.cqld4.statements.core;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.internal.core.config.map.MapBasedDriverConfigLoader;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Map;
public class NBCqlDriverConfigLoader extends MapBasedDriverConfigLoader {
public NBCqlDriverConfigLoader(
@NonNull OptionsMap source,
@NonNull Map<String, Map<DriverOption, Object>> rawMap) {
super(source, rawMap);
}
}

View File

@ -4,55 +4,52 @@ This is the CQL version 4 driver for NoSQLBench. As it gets more use, we will ma
name. For now, the 'cql' refers to the version 1.9 driver, while 'cqld4' refers to this one. The drivers will have
identical features where possible, but new enhancements will be targeted at this one first.
In the alpha release of this NoSQLBench CQL driver, some of the options previously available on the CQL 1.9 driver will
not be supported. We are working to add these in an idiomatic way ASAP.
This is an driver which allows for the execution of CQL statements. This driver supports both sync and async modes, with
detailed metrics provided for both.
### Example activity definitions
Run a cql activity named 'cql1', with definitions from activities/cqldefs.yaml
~~~
... driver=cql alias=cql1 workload=cqldefs
~~~
Run a cql activity defined by cqldefs.yaml, but with shortcut naming
~~~
... driver=cql workload=cqldefs
~~~
Only run statement groups which match a tag regex
~~~
... driver=cql workload=cqldefs tags=group:'ddl.*'
~~~
Run the matching 'dml' statements, with 100 cycles, from [1000..1100)
~~~
... driver=cql workload=cqldefs tags=group:'dml.*' cycles=1000..1100
~~~
This last example shows that the cycle range is [inclusive..exclusive),
to allow for stacking test intervals. This is standard across all
activity types.
TEMPORARY EDITORS NOTE: This will use a more consistent layout as shown below. The topics are meant to be searchable in
the newer doc system scheme.
### CQL ActivityType Parameters
- **cqldriver** - default: dse - The type of driver to use, either dse, or oss. If you need DSE-specific features, use
the dse driver. If you are connecting to an OSS Apache Cassandra cluster, you must use the oss driver. The oss driver
option is only available in nosqlbench.
- **host** - The host or hosts to use for connection points to
the cluster. If you specify multiple values here, use commas
with no spaces.
Examples:
- `host=192.168.1.25`
- `host=`192.168.1.25,testhost42`
- **workload** - The workload definition which holds the schema and statement defs.
see workload yaml location for additional details
(no default, required)
- **port** - The port to connect with
- **cl** - An override to consistency levels for the activity. If
this option is used, then all consistency levels will be replaced
by this one for the current activity, and a log line explaining
the difference with respect to the yaml will be emitted.
This is not a dynamic parameter. It will only be applied at
activity start.
#### secureconnectbundle
This parameter is used to connect to Astra Database as a Service. This option accepts a path to the secure connect
bundle that is downloaded from the Astra UI.
- Examples:
- `secureconnectbundle=/tmp/secure-connect-my_db.zip`
- `secureconnectbundle="/home/automaton/secure-connect-my_db.zip"`
#### hosts
The host or hosts to use for connection points to the cluster. If you specify multiple values here, use commas with no
spaces. This option is not valid when the **secureconnectbundle** option is used.
* Examples:
- `host=192.168.1.25`
- `host=`192.168.1.25,testhost42`
#### port
The port to connect with. This option is not valid when the **secureconnectbundle** option is used.
Default
: 9042
---- below this line needs to be curated for the new driver ----
Examples:
- `port=9042`
- **cl** - An override to consistency levels for the activity. If this option is used, then all consistency levels will
be replaced by this one for the current activity, and a log line explaining the difference with respect to the yaml
will be emitted. This is not a dynamic parameter. It will only be applied at activity start.
-
- **cbopts** - default: none - this is how you customize the cluster
settings for the client, including policies, compression, etc. This
is a string of *Java*-like method calls just as you would use them
@ -238,11 +235,6 @@ activity types.
code base. This is for dynamic codec loading with user-provided codecs mapped
via the internal UDT APIs.
default: false
- **secureconnectbundle** - used to connect to CaaS, accepts a path to the secure connect bundle
that is downloaded from the CaaS UI.
Examples:
- `secureconnectbundle=/tmp/secure-connect-my_db.zip`
- `secureconnectbundle="/home/automaton/secure-connect-my_db.zip"`
- **insights** - Set to false to disable the driver from sending insights monitoring information
- `insights=false`
- **tickduration** - sets the tickDuration (milliseconds) of HashedWheelTimer of the