do not set speculative on cql when it is not provided

This commit is contained in:
Jonathan Shook 2020-09-08 17:47:13 -05:00
parent a2a4a5eacc
commit 7ba14409ad
2 changed files with 97 additions and 90 deletions

View File

@ -153,7 +153,11 @@ public class CQLOptions {
public static SpeculativeExecutionPolicy speculativeFor(String spec) { public static SpeculativeExecutionPolicy speculativeFor(String spec) {
Matcher pctileMatcher = PERCENTILE_EAGER_PATTERN.matcher(spec); Matcher pctileMatcher = PERCENTILE_EAGER_PATTERN.matcher(spec);
Matcher constantMatcher = CONSTANT_EAGER_PATTERN.matcher(spec); Matcher constantMatcher = CONSTANT_EAGER_PATTERN.matcher(spec);
if (pctileMatcher.matches()) { if (spec.toLowerCase().trim().matches("disabled|none")) {
return null;
} else if (spec.toLowerCase().trim().equals("default")) {
return defaultSpeculativePolicy();
} else if (pctileMatcher.matches()) {
double pctile = Double.valueOf(pctileMatcher.group("pctile")); double pctile = Double.valueOf(pctileMatcher.group("pctile"));
if (pctile > 100.0 || pctile < 0.0) { if (pctile > 100.0 || pctile < 0.0) {
throw new RuntimeException("pctile must be between 0.0 and 100.0"); throw new RuntimeException("pctile must be between 0.0 and 100.0");

View File

@ -1,33 +1,10 @@
package io.nosqlbench.activitytype.cql.statements.core; package io.nosqlbench.activitytype.cql.statements.core;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import io.nosqlbench.nb.api.errors.BasicError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.driver.core.Cluster; import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ProtocolOptions; import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions; import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.driver.core.Session; import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DefaultRetryPolicy; import com.datastax.driver.core.policies.*;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.LoggingRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.RoundRobinPolicy;
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
import com.datastax.driver.core.policies.WhiteListPolicy;
import com.datastax.driver.dse.DseCluster; import com.datastax.driver.dse.DseCluster;
import io.nosqlbench.activitytype.cql.core.CQLOptions; import io.nosqlbench.activitytype.cql.core.CQLOptions;
import io.nosqlbench.activitytype.cql.core.ProxyTranslator; import io.nosqlbench.activitytype.cql.core.ProxyTranslator;
@ -36,13 +13,25 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics; import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.scripting.NashornEvaluator; import io.nosqlbench.engine.api.scripting.NashornEvaluator;
import io.nosqlbench.engine.api.util.SSLKsFactory; import io.nosqlbench.engine.api.util.SSLKsFactory;
import io.nosqlbench.nb.api.errors.BasicError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLContext;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
public class CQLSessionCache implements Shutdownable { public class CQLSessionCache implements Shutdownable {
private final static Logger logger = LoggerFactory.getLogger(CQLSessionCache.class); private final static Logger logger = LoggerFactory.getLogger(CQLSessionCache.class);
private final static String DEFAULT_SESSION_ID = "default"; private final static String DEFAULT_SESSION_ID = "default";
private static final CQLSessionCache instance = new CQLSessionCache(); private static final CQLSessionCache instance = new CQLSessionCache();
private Map<String, Session> sessionCache = new HashMap<>(); private final Map<String, Session> sessionCache = new HashMap<>();
private CQLSessionCache() { private CQLSessionCache() {
} }
@ -73,30 +62,30 @@ public class CQLSessionCache implements Shutdownable {
String driverType = activityDef.getParams().getOptionalString("cqldriver").orElse("dse"); String driverType = activityDef.getParams().getOptionalString("cqldriver").orElse("dse");
Cluster.Builder builder = Cluster.Builder builder =
driverType.toLowerCase().equals("dse") ? DseCluster.builder() : driverType.toLowerCase().equals("dse") ? DseCluster.builder() :
driverType.toLowerCase().equals("oss") ? Cluster.builder() : null; driverType.toLowerCase().equals("oss") ? Cluster.builder() : null;
if (builder==null) { if (builder == null) {
throw new RuntimeException("The driver type '" + driverType + "' is not recognized"); throw new RuntimeException("The driver type '" + driverType + "' is not recognized");
} }
logger.info("Using driver type '" + driverType.toUpperCase() + "'"); logger.info("Using driver type '" + driverType.toUpperCase() + "'");
Optional<String> scb = activityDef.getParams() Optional<String> scb = activityDef.getParams()
.getOptionalString("secureconnectbundle"); .getOptionalString("secureconnectbundle");
scb.map(File::new) scb.map(File::new)
.ifPresent(builder::withCloudSecureConnectBundle); .ifPresent(builder::withCloudSecureConnectBundle);
activityDef.getParams() activityDef.getParams()
.getOptionalString("insights").map(Boolean::parseBoolean) .getOptionalString("insights").map(Boolean::parseBoolean)
.ifPresent(builder::withMonitorReporting); .ifPresent(builder::withMonitorReporting);
String[] contactPoints = activityDef.getParams().getOptionalString("host") String[] contactPoints = activityDef.getParams().getOptionalString("host")
.map(h -> h.split(",")).orElse(null); .map(h -> h.split(",")).orElse(null);
if (contactPoints == null) { if (contactPoints == null) {
contactPoints = activityDef.getParams().getOptionalString("hosts") contactPoints = activityDef.getParams().getOptionalString("hosts")
.map(h -> h.split(",")).orElse(null); .map(h -> h.split(",")).orElse(null);
} }
if (contactPoints == null && scb.isEmpty()) { if (contactPoints == null && scb.isEmpty()) {
contactPoints = new String[]{"localhost"}; contactPoints = new String[]{"localhost"};
@ -143,47 +132,61 @@ public class CQLSessionCache implements Shutdownable {
NashornEvaluator<DseCluster.Builder> clusterEval = new NashornEvaluator<>(DseCluster.Builder.class); NashornEvaluator<DseCluster.Builder> clusterEval = new NashornEvaluator<>(DseCluster.Builder.class);
clusterEval.put("builder", builder); clusterEval.put("builder", builder);
String importEnv = String importEnv =
"load(\"nashorn:mozilla_compat.js\");\n" + "load(\"nashorn:mozilla_compat.js\");\n" +
" importPackage(com.google.common.collect.Lists);\n" + " importPackage(com.google.common.collect.Lists);\n" +
" importPackage(com.google.common.collect.Maps);\n" + " importPackage(com.google.common.collect.Maps);\n" +
" importPackage(com.datastax.driver);\n" + " importPackage(com.datastax.driver);\n" +
" importPackage(com.datastax.driver.core);\n" + " importPackage(com.datastax.driver.core);\n" +
" importPackage(com.datastax.driver.core.policies);\n" + " importPackage(com.datastax.driver.core.policies);\n" +
"builder" + clusteropts.get() + "\n"; "builder" + clusteropts.get() + "\n";
clusterEval.script(importEnv); clusterEval.script(importEnv);
builder = clusterEval.eval(); builder = clusterEval.eval();
logger.info("successfully applied:" + clusteropts.get()); logger.info("successfully applied:" + clusteropts.get());
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Unable to evaluate: " + clusteropts.get() + " in script context:",e); throw new RuntimeException("Unable to evaluate: " + clusteropts.get() + " in script context:", e);
} }
} }
if (activityDef.getParams().getOptionalString("whitelist").isPresent() && if (activityDef.getParams().getOptionalString("whitelist").isPresent() &&
activityDef.getParams().getOptionalString("lbp","loadbalancingpolicy").isPresent()) { activityDef.getParams().getOptionalString("lbp", "loadbalancingpolicy").isPresent()) {
throw new BasicError("You specified both whitelist=.. and lbp=..., if you need whitelist and other policies together," + throw new BasicError("You specified both whitelist=.. and lbp=..., if you need whitelist and other policies together," +
" be sure to use the lbp option only with a whitelist policy included."); " be sure to use the lbp option only with a whitelist policy included.");
} }
SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams() Optional<String> specSpec = activityDef.getParams()
.getOptionalString("speculative") .getOptionalString("speculative");
if (specSpec.isPresent()) {
specSpec
.map(speculative -> { .map(speculative -> {
logger.info("speculative=>" + speculative); logger.info("speculative=>" + speculative);
return speculative; return speculative;
}) })
.map(CQLOptions::speculativeFor) .map(CQLOptions::speculativeFor)
.orElse(CQLOptions.defaultSpeculativePolicy()); .ifPresent(builder::withSpeculativeExecutionPolicy);
builder.withSpeculativeExecutionPolicy(speculativePolicy); } else {
logger.warn(
"If the cql speculative parameter is not provided, it uses a default speculative\n" +
"policy, as in speculative=default, rather than leaving speculative off.\n" +
"If you want to keep this behavior or silence this warning, add one of\n" +
"speculative=none OR speculative=default to your activity params.\n" +
"This release, the default is speculative=default if the parameter is not specified.\n" +
"After Oct/2020, the default will be speculative=none if the parameter is not specified.\n" +
"Note: speculative=default is the same as speculative=p99.0:5:15000\n" +
"which is considered aggressive for some systems. This warning will go away after Oct/2020.\n");
builder.withSpeculativeExecutionPolicy(CQLOptions.defaultSpeculativePolicy());
}
activityDef.getParams().getOptionalString("socketoptions") activityDef.getParams().getOptionalString("socketoptions")
.map(sockopts -> { .map(sockopts -> {
logger.info("socketoptions=>" + sockopts); logger.info("socketoptions=>" + sockopts);
return sockopts; return sockopts;
}) })
.map(CQLOptions::socketOptionsFor) .map(CQLOptions::socketOptionsFor)
.ifPresent(builder::withSocketOptions); .ifPresent(builder::withSocketOptions);
activityDef.getParams().getOptionalString("reconnectpolicy") activityDef.getParams().getOptionalString("reconnectpolicy")
.map(reconnectpolicy-> { .map(reconnectpolicy -> {
logger.info("reconnectpolicy=>" + reconnectpolicy); logger.info("reconnectpolicy=>" + reconnectpolicy);
return reconnectpolicy; return reconnectpolicy;
}) })
@ -192,44 +195,44 @@ public class CQLSessionCache implements Shutdownable {
activityDef.getParams().getOptionalString("pooling") activityDef.getParams().getOptionalString("pooling")
.map(pooling -> { .map(pooling -> {
logger.info("pooling=>" + pooling); logger.info("pooling=>" + pooling);
return pooling; return pooling;
}) })
.map(CQLOptions::poolingOptionsFor) .map(CQLOptions::poolingOptionsFor)
.ifPresent(builder::withPoolingOptions); .ifPresent(builder::withPoolingOptions);
activityDef.getParams().getOptionalString("whitelist") activityDef.getParams().getOptionalString("whitelist")
.map(whitelist -> { .map(whitelist -> {
logger.info("whitelist=>" + whitelist); logger.info("whitelist=>" + whitelist);
return whitelist; return whitelist;
}) })
.map(p -> CQLOptions.whitelistFor(p, null)) .map(p -> CQLOptions.whitelistFor(p, null))
.ifPresent(builder::withLoadBalancingPolicy); .ifPresent(builder::withLoadBalancingPolicy);
activityDef.getParams().getOptionalString("lbp") activityDef.getParams().getOptionalString("lbp")
.map(lbp -> { .map(lbp -> {
logger.info("lbp=>" + lbp); logger.info("lbp=>" + lbp);
return lbp; return lbp;
}) })
.map(p -> CQLOptions.lbpolicyFor(p,null)) .map(p -> CQLOptions.lbpolicyFor(p, null))
.ifPresent(builder::withLoadBalancingPolicy); .ifPresent(builder::withLoadBalancingPolicy);
activityDef.getParams().getOptionalString("tickduration") activityDef.getParams().getOptionalString("tickduration")
.map(tickduration -> { .map(tickduration -> {
logger.info("tickduration=>" + tickduration); logger.info("tickduration=>" + tickduration);
return tickduration; return tickduration;
}) })
.map(CQLOptions::withTickDuration) .map(CQLOptions::withTickDuration)
.ifPresent(builder::withNettyOptions); .ifPresent(builder::withNettyOptions);
activityDef.getParams().getOptionalString("compression") activityDef.getParams().getOptionalString("compression")
.map(compression -> { .map(compression -> {
logger.info("compression=>" + compression); logger.info("compression=>" + compression);
return compression; return compression;
}) })
.map(CQLOptions::withCompression) .map(CQLOptions::withCompression)
.ifPresent(builder::withCompression); .ifPresent(builder::withCompression);
SSLContext context = SSLKsFactory.get().getContext(activityDef); SSLContext context = SSLKsFactory.get().getContext(activityDef);
@ -238,8 +241,8 @@ public class CQLSessionCache implements Shutdownable {
} }
RetryPolicy retryPolicy = activityDef.getParams() RetryPolicy retryPolicy = activityDef.getParams()
.getOptionalString("retrypolicy") .getOptionalString("retrypolicy")
.map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE); .map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE);
if (retryPolicy instanceof LoggingRetryPolicy) { if (retryPolicy instanceof LoggingRetryPolicy) {
logger.info("using LoggingRetryPolicy"); logger.info("using LoggingRetryPolicy");
@ -265,7 +268,7 @@ public class CQLSessionCache implements Shutdownable {
// Apply default idempotence, if set // Apply default idempotence, if set
activityDef.getParams().getOptionalBoolean("defaultidempotence").map( activityDef.getParams().getOptionalBoolean("defaultidempotence").map(
b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b) b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b)
); );
Session session = cl.newSession(); Session session = cl.newSession();