From 7ba14409ade08f92ded6bb8d64b35802c4d1be4c Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Tue, 8 Sep 2020 17:47:13 -0500 Subject: [PATCH] do not set speculative on cql when it is not provided --- .../activitytype/cql/core/CQLOptions.java | 6 +- .../cql/statements/core/CQLSessionCache.java | 181 +++++++++--------- 2 files changed, 97 insertions(+), 90 deletions(-) diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CQLOptions.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CQLOptions.java index 3952bc961..660bc5fc5 100644 --- a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CQLOptions.java +++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/core/CQLOptions.java @@ -153,7 +153,11 @@ public class CQLOptions { public static SpeculativeExecutionPolicy speculativeFor(String spec) { Matcher pctileMatcher = PERCENTILE_EAGER_PATTERN.matcher(spec); Matcher constantMatcher = CONSTANT_EAGER_PATTERN.matcher(spec); - if (pctileMatcher.matches()) { + if (spec.toLowerCase().trim().matches("disabled|none")) { + return null; + } else if (spec.toLowerCase().trim().equals("default")) { + return defaultSpeculativePolicy(); + } else if (pctileMatcher.matches()) { double pctile = Double.valueOf(pctileMatcher.group("pctile")); if (pctile > 100.0 || pctile < 0.0) { throw new RuntimeException("pctile must be between 0.0 and 100.0"); diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java index 1dc4261c2..8aa3fe0db 100644 --- a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java +++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java @@ -1,33 +1,10 @@ package io.nosqlbench.activitytype.cql.statements.core; -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import javax.net.ssl.SSLContext; - -import io.nosqlbench.nb.api.errors.BasicError; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ProtocolOptions; import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions; import com.datastax.driver.core.Session; -import com.datastax.driver.core.policies.DefaultRetryPolicy; -import com.datastax.driver.core.policies.LoadBalancingPolicy; -import com.datastax.driver.core.policies.LoggingRetryPolicy; -import com.datastax.driver.core.policies.RetryPolicy; -import com.datastax.driver.core.policies.RoundRobinPolicy; -import com.datastax.driver.core.policies.SpeculativeExecutionPolicy; -import com.datastax.driver.core.policies.WhiteListPolicy; +import com.datastax.driver.core.policies.*; import com.datastax.driver.dse.DseCluster; import io.nosqlbench.activitytype.cql.core.CQLOptions; import io.nosqlbench.activitytype.cql.core.ProxyTranslator; @@ -36,13 +13,25 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.metrics.ActivityMetrics; import io.nosqlbench.engine.api.scripting.NashornEvaluator; import io.nosqlbench.engine.api.util.SSLKsFactory; +import io.nosqlbench.nb.api.errors.BasicError; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; public class CQLSessionCache implements Shutdownable { private final static Logger logger = LoggerFactory.getLogger(CQLSessionCache.class); private final static String DEFAULT_SESSION_ID = "default"; private static final CQLSessionCache instance = new CQLSessionCache(); - private Map sessionCache = new HashMap<>(); + private final Map sessionCache = new HashMap<>(); private CQLSessionCache() { } @@ -73,30 +62,30 @@ public class CQLSessionCache implements Shutdownable { String driverType = activityDef.getParams().getOptionalString("cqldriver").orElse("dse"); Cluster.Builder builder = - driverType.toLowerCase().equals("dse") ? DseCluster.builder() : - driverType.toLowerCase().equals("oss") ? Cluster.builder() : null; + driverType.toLowerCase().equals("dse") ? DseCluster.builder() : + driverType.toLowerCase().equals("oss") ? Cluster.builder() : null; - if (builder==null) { + if (builder == null) { throw new RuntimeException("The driver type '" + driverType + "' is not recognized"); } logger.info("Using driver type '" + driverType.toUpperCase() + "'"); Optional scb = activityDef.getParams() - .getOptionalString("secureconnectbundle"); - scb.map(File::new) - .ifPresent(builder::withCloudSecureConnectBundle); + .getOptionalString("secureconnectbundle"); + scb.map(File::new) + .ifPresent(builder::withCloudSecureConnectBundle); activityDef.getParams() - .getOptionalString("insights").map(Boolean::parseBoolean) - .ifPresent(builder::withMonitorReporting); + .getOptionalString("insights").map(Boolean::parseBoolean) + .ifPresent(builder::withMonitorReporting); String[] contactPoints = activityDef.getParams().getOptionalString("host") - .map(h -> h.split(",")).orElse(null); + .map(h -> h.split(",")).orElse(null); if (contactPoints == null) { contactPoints = activityDef.getParams().getOptionalString("hosts") - .map(h -> h.split(",")).orElse(null); + .map(h -> h.split(",")).orElse(null); } if (contactPoints == null && scb.isEmpty()) { contactPoints = new String[]{"localhost"}; @@ -143,47 +132,61 @@ public class CQLSessionCache implements Shutdownable { NashornEvaluator clusterEval = new NashornEvaluator<>(DseCluster.Builder.class); clusterEval.put("builder", builder); String importEnv = - "load(\"nashorn:mozilla_compat.js\");\n" + - " importPackage(com.google.common.collect.Lists);\n" + - " importPackage(com.google.common.collect.Maps);\n" + - " importPackage(com.datastax.driver);\n" + - " importPackage(com.datastax.driver.core);\n" + - " importPackage(com.datastax.driver.core.policies);\n" + - "builder" + clusteropts.get() + "\n"; + "load(\"nashorn:mozilla_compat.js\");\n" + + " importPackage(com.google.common.collect.Lists);\n" + + " importPackage(com.google.common.collect.Maps);\n" + + " importPackage(com.datastax.driver);\n" + + " importPackage(com.datastax.driver.core);\n" + + " importPackage(com.datastax.driver.core.policies);\n" + + "builder" + clusteropts.get() + "\n"; clusterEval.script(importEnv); builder = clusterEval.eval(); logger.info("successfully applied:" + clusteropts.get()); } catch (Exception e) { - throw new RuntimeException("Unable to evaluate: " + clusteropts.get() + " in script context:",e); + throw new RuntimeException("Unable to evaluate: " + clusteropts.get() + " in script context:", e); } } if (activityDef.getParams().getOptionalString("whitelist").isPresent() && - activityDef.getParams().getOptionalString("lbp","loadbalancingpolicy").isPresent()) { + activityDef.getParams().getOptionalString("lbp", "loadbalancingpolicy").isPresent()) { throw new BasicError("You specified both whitelist=.. and lbp=..., if you need whitelist and other policies together," + - " be sure to use the lbp option only with a whitelist policy included."); + " be sure to use the lbp option only with a whitelist policy included."); } - SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams() - .getOptionalString("speculative") + Optional specSpec = activityDef.getParams() + .getOptionalString("speculative"); + + if (specSpec.isPresent()) { + specSpec .map(speculative -> { logger.info("speculative=>" + speculative); return speculative; }) .map(CQLOptions::speculativeFor) - .orElse(CQLOptions.defaultSpeculativePolicy()); - builder.withSpeculativeExecutionPolicy(speculativePolicy); + .ifPresent(builder::withSpeculativeExecutionPolicy); + } else { + logger.warn( + "If the cql speculative parameter is not provided, it uses a default speculative\n" + + "policy, as in speculative=default, rather than leaving speculative off.\n" + + "If you want to keep this behavior or silence this warning, add one of\n" + + "speculative=none OR speculative=default to your activity params.\n" + + "This release, the default is speculative=default if the parameter is not specified.\n" + + "After Oct/2020, the default will be speculative=none if the parameter is not specified.\n" + + "Note: speculative=default is the same as speculative=p99.0:5:15000\n" + + "which is considered aggressive for some systems. This warning will go away after Oct/2020.\n"); + builder.withSpeculativeExecutionPolicy(CQLOptions.defaultSpeculativePolicy()); + } activityDef.getParams().getOptionalString("socketoptions") - .map(sockopts -> { - logger.info("socketoptions=>" + sockopts); - return sockopts; - }) - .map(CQLOptions::socketOptionsFor) - .ifPresent(builder::withSocketOptions); + .map(sockopts -> { + logger.info("socketoptions=>" + sockopts); + return sockopts; + }) + .map(CQLOptions::socketOptionsFor) + .ifPresent(builder::withSocketOptions); activityDef.getParams().getOptionalString("reconnectpolicy") - .map(reconnectpolicy-> { + .map(reconnectpolicy -> { logger.info("reconnectpolicy=>" + reconnectpolicy); return reconnectpolicy; }) @@ -192,44 +195,44 @@ public class CQLSessionCache implements Shutdownable { activityDef.getParams().getOptionalString("pooling") - .map(pooling -> { - logger.info("pooling=>" + pooling); - return pooling; - }) - .map(CQLOptions::poolingOptionsFor) - .ifPresent(builder::withPoolingOptions); + .map(pooling -> { + logger.info("pooling=>" + pooling); + return pooling; + }) + .map(CQLOptions::poolingOptionsFor) + .ifPresent(builder::withPoolingOptions); activityDef.getParams().getOptionalString("whitelist") - .map(whitelist -> { - logger.info("whitelist=>" + whitelist); - return whitelist; - }) - .map(p -> CQLOptions.whitelistFor(p, null)) - .ifPresent(builder::withLoadBalancingPolicy); + .map(whitelist -> { + logger.info("whitelist=>" + whitelist); + return whitelist; + }) + .map(p -> CQLOptions.whitelistFor(p, null)) + .ifPresent(builder::withLoadBalancingPolicy); activityDef.getParams().getOptionalString("lbp") - .map(lbp -> { - logger.info("lbp=>" + lbp); - return lbp; - }) - .map(p -> CQLOptions.lbpolicyFor(p,null)) - .ifPresent(builder::withLoadBalancingPolicy); + .map(lbp -> { + logger.info("lbp=>" + lbp); + return lbp; + }) + .map(p -> CQLOptions.lbpolicyFor(p, null)) + .ifPresent(builder::withLoadBalancingPolicy); activityDef.getParams().getOptionalString("tickduration") - .map(tickduration -> { - logger.info("tickduration=>" + tickduration); - return tickduration; - }) - .map(CQLOptions::withTickDuration) - .ifPresent(builder::withNettyOptions); + .map(tickduration -> { + logger.info("tickduration=>" + tickduration); + return tickduration; + }) + .map(CQLOptions::withTickDuration) + .ifPresent(builder::withNettyOptions); activityDef.getParams().getOptionalString("compression") - .map(compression -> { - logger.info("compression=>" + compression); - return compression; - }) - .map(CQLOptions::withCompression) - .ifPresent(builder::withCompression); + .map(compression -> { + logger.info("compression=>" + compression); + return compression; + }) + .map(CQLOptions::withCompression) + .ifPresent(builder::withCompression); SSLContext context = SSLKsFactory.get().getContext(activityDef); @@ -238,8 +241,8 @@ public class CQLSessionCache implements Shutdownable { } RetryPolicy retryPolicy = activityDef.getParams() - .getOptionalString("retrypolicy") - .map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE); + .getOptionalString("retrypolicy") + .map(CQLOptions::retryPolicyFor).orElse(DefaultRetryPolicy.INSTANCE); if (retryPolicy instanceof LoggingRetryPolicy) { logger.info("using LoggingRetryPolicy"); @@ -265,7 +268,7 @@ public class CQLSessionCache implements Shutdownable { // Apply default idempotence, if set activityDef.getParams().getOptionalBoolean("defaultidempotence").map( - b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b) + b -> cl.getConfiguration().getQueryOptions().setDefaultIdempotence(b) ); Session session = cl.newSession();