mirror of
synced 2025-02-25 18:55:28 -06:00
Allow easy load balancer configuration for CQL driver #173
This commit is contained in:
@ -85,21 +85,21 @@ public class CQLOptions {
public static ReconnectionPolicy reconnectPolicyFor(String spec) {
String argsString = spec.substring(12);
String[] args = argsString.substring(0, argsString.length() - 1).split("[,;]");
if (args.length != 2){
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>)");
long baseDelay = Long.parseLong(args[0]);
long maxDelay = Long.parseLong(args[1]);
return new ExponentialReconnectionPolicy(baseDelay,maxDelay);
}else if(spec.startsWith("constant(")){
String argsString = spec.substring(9);
long constantDelayMs= Long.parseLong(argsString.substring(0, argsString.length() - 1));
return new ConstantReconnectionPolicy(constantDelayMs);
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>) or constant(<constantDelayMs>)");
if (spec.startsWith("exponential(")) {
String argsString = spec.substring(12);
String[] args = argsString.substring(0, argsString.length() - 1).split("[,;]");
if (args.length != 2) {
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>)");
long baseDelay = Long.parseLong(args[0]);
long maxDelay = Long.parseLong(args[1]);
return new ExponentialReconnectionPolicy(baseDelay, maxDelay);
} else if (spec.startsWith("constant(")) {
String argsString = spec.substring(9);
long constantDelayMs = Long.parseLong(argsString.substring(0, argsString.length() - 1));
return new ConstantReconnectionPolicy(constantDelayMs);
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>) or constant(<constantDelayMs>)");
public static SocketOptions socketOptionsFor(String spec) {
@ -188,6 +188,146 @@ public class CQLOptions {
return new WhiteListPolicy(innerPolicy, sockAddrs);
public static LoadBalancingPolicy lbpolicyFor(String polspec, LoadBalancingPolicy policy) {
Pattern polcall = Pattern.compile(",?(?<policyname>\\w+)\\((?<args>[^)]+)?\\)");
Matcher matcher = polcall.matcher(polspec);
Deque<List<String>> policies = new ArrayDeque<>();
while (matcher.find()) {
String policyname = matcher.group("policyname");
String argsgroup = matcher.group("args");
String args = argsgroup==null ? "" : argsgroup;
logger.debug("policyname=" + policyname);
logger.debug("args=" + args);
// reverse order for proper nesting
while (policies.size()>0) {
List<String> nextpolicy = policies.pop();
String policyname = nextpolicy.get(0)
.replaceAll("_", "")
.replaceAll("policy", "");
String argslist = nextpolicy.get(1);
String[] args= argslist.isBlank() ? new String[0] : argslist.split(",");
switch (policyname) {
case "WLP":
case "whitelist":
List<InetSocketAddress> sockAddrs = Arrays.stream(args)
policy = new WhiteListPolicy(policy, sockAddrs);
case "TAP":
case "tokenaware":
TokenAwarePolicy.ReplicaOrdering ordering = TokenAwarePolicy.ReplicaOrdering.NEUTRAL;
if (args.length==1) {
if (args[0].startsWith("ordering=") || args[0].startsWith("ordering:")) {
String orderingSpec = args[0].substring("ordering=".length()).toUpperCase();
} else {
throw new BasicError("Unrecognized option for " + TokenAwarePolicy.class.getCanonicalName());
policy = new TokenAwarePolicy(policy, ordering);
case "LAP":
case "latencyaware":
policy = latencyAwarePolicyFor(args,policy);
case "DCARRP":
case "dcawareroundrobin":
case "datacenterawareroundrobin":
if (policy!=null) {
throw new BasicError(DCAwareRoundRobinPolicy.class.getCanonicalName() + " can not wrap another policy.");
policy = dcAwareRoundRobinPolicyFor(args);
throw new BasicError("Unrecognized policy selector '" + policyname + "', please select one of WLP,TAP,LAP,DCARRP, or " +
"one of whitelist, tokenaware, latencyaware, dcawareroundrobin.");
return policy;
private static LoadBalancingPolicy dcAwareRoundRobinPolicyFor(String[] args) {
if (args.length==0){
throw new BasicError(DCAwareRoundRobinPolicy.class.getCanonicalName() + " requires a local DC name.");
DCAwareRoundRobinPolicy.Builder builder = DCAwareRoundRobinPolicy.builder();
for (String arg : args) {
String[] kv = arg.split("[:=]", 2);
if (kv.length != 2) {
throw new BasicError("LatencyAwarePolicy specifier requires named parameters like `exclusion_threshold=23.0`");
switch(kv[0]) {
case "local":
case "localdc":
throw new BasicError("Unknown option for " + DCAwareRoundRobinPolicy.class.getSimpleName() + ": '" + kv[0] + "'");
return builder.build();
private static LoadBalancingPolicy latencyAwarePolicyFor(String[] args, LoadBalancingPolicy childPolicy) {
LatencyAwarePolicy.Builder builder = LatencyAwarePolicy.builder(childPolicy);
for (String arg : args) {
String[] kv = arg.split("[:=]", 2);
if (kv.length != 2) {
throw new BasicError("LatencyAwarePolicy specifier requires named parameters like `exclusion_threshold=23.0`");
switch (kv[0]) {
case "exclusion_threshold":
case "et":
builder = builder.withExclusionThreshold(Double.parseDouble(kv[1]));
case "minimum_measurements":
case "mm":
builder = builder.withMininumMeasurements(Integer.parseInt(kv[1]));
case "retry_period_ms":
case "rp_ms":
builder = builder.withRetryPeriod(Long.parseLong(kv[1]), TimeUnit.MILLISECONDS);
case "retry_period":
case "rp":
builder = builder.withRetryPeriod(Long.parseLong(kv[1]), TimeUnit.SECONDS);
case "scale":
case "s":
builder = builder.withScale(Long.parseLong(kv[1]), TimeUnit.SECONDS);
case "scale_ms":
case "s_ms":
builder = builder.withScale(Long.parseLong(kv[1]), TimeUnit.MILLISECONDS);
case "update_rate":
case "ur":
builder.withUpdateRate(Long.parseLong(kv[1]), TimeUnit.SECONDS);
case "update_rate_ms":
case "ur_ms":
builder.withUpdateRate(Long.parseLong(kv[1]), TimeUnit.MILLISECONDS);
return builder.build();
public static NettyOptions withTickDuration(String tick) {
logger.info("Cluster builder using custom tick duration value for HashedWheelTimer: " + tick + " milliseconds");
int tickDuration = Integer.valueOf(tick);
@ -13,6 +13,7 @@ import java.util.Map;
import java.util.Optional;
import javax.net.ssl.SSLContext;
import io.nosqlbench.nb.api.errors.BasicError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -158,6 +159,12 @@ public class CQLSessionCache implements Shutdownable {
if (activityDef.getParams().getOptionalString("whitelist").isPresent() &&
activityDef.getParams().getOptionalString("lbp","loadbalancingpolicy").isPresent()) {
throw new BasicError("You specified both whitelist=.. and lbp=..., if you need whitelist and other policies together," +
" be sure to use the lbp option only with a whitelist policy included.");
SpeculativeExecutionPolicy speculativePolicy = activityDef.getParams()
.map(speculative -> {
@ -201,6 +208,14 @@ public class CQLSessionCache implements Shutdownable {
.map(p -> CQLOptions.whitelistFor(p, null))
.map(lbp -> {
logger.info("lbp=>" + lbp);
return lbp;
.map(p -> CQLOptions.lbpolicyFor(p,null))
.map(tickduration -> {
logger.info("tickduration=>" + tickduration);
@ -217,6 +232,7 @@ public class CQLSessionCache implements Shutdownable {
SSLContext context = SSLKsFactory.get().getContext(activityDef);
if (context != null) {
Normal file
Normal file
@ -0,0 +1,77 @@
# CQL Load Balancing Options
WIth the CQL driver, you may configure the load balancing with the same options you might use in
client code. However, they are expressed here in a command-line friendly form.
## Combining Policies
To apply these load balancer policies, set the activity parameter `lbp` with a comma-separated list
of policies from the examples below.
They are build as a nested set of polices, with the semantics of "and then". For example, the
TokenAwarePolicy followed by the LatencyAwarePolicy looks like `TAP(...),LAP(...)` which means
`TokenAwarePolicy(...)` and then `LatencyAwarePolicy(...)`. This is equivalent to Java code which
first constructs a LatencyAwarePolicy and then wraps it with a TokenAwarePolicy. This follows the
notion that the outer-most policy has primary control over options presented to child policies, and
thus you can think of the routing process as "TokenAwarePolicy decides ... " *and then* with what it
shares with the wrapped child policy, "LatencyAwarePolicy decides...", and so on.
Even though you can use the simple pollicy descriptions above, they are constructed in the same
programmatic way in Java that you would use to nest them in the specified order.
For example, a token aware policy wrapping a white list policy might look like this on your command
## Supported Load Balancer Policies
Each supported policy is described in detail below, with the options supported.
### WLP: White List Policy
Format: `WLP(addr,...)`
### TAP: Token Aware Policy
Format: `TAP()`
### LAP: Latency Aware Policy
This policy has many optional parameters, so if you use it you must set them by name.
Format: `LAP(options...)`, where each option is one of the following:
- `exclusion_threshold` (or `et`) - The exclusion threshold, or how much worse a node has to be to
be excluded for awhile. Javadoc: The default exclusion threshold (if this method is not called) is
`2`. In other words, the resulting policy excludes nodes that are more than twice slower than the
fastest node.
- `minimum_measurements` (or `mm`) - The minimum number of measurements to take before penalizing a
host. Javadoc: The default for this option (if this method is not called) is `50`. Note that it is
probably not a good idea to put this option too low if only to avoid the influence of JVM warm-up
on newly restarted nodes.
- `retry_period` (or `rp`) - The retry period, in seconds. Javadoc: The retry period defines how
long a node may be penalized by the policy before it is given a 2nd chance. This is 10 seconds by
- `retry_period_ms` (or `rp_ms`) - The retry period, in milliseconds. This is the same as above, but
allows you to have more precise control if needed.
- `scale` (or `s`) - The scale parameter adjusts how abruptly the most recent measurements are
scaled down in the moving average over time. 100ms is the default. Higher values reduce the
significance of more recent measurements, lower values increase it. The default is 100ms.
- `scale_ms` - The scale parameter, in milliseconds. This is the same as above, but allows you to
have more prcise control if needed.
- `update_rate` (or `ur`) - How often a node's latency average is computed. The default is 1/10
- `update_rate_ms` (or `ur_ms`) - The update rate, in milliseconds.
- `lbp="LAP(mm=10,rp_ms=10000)"`
- `lbp="LatencyAwarePolicy(minimum_measurements=10,retry_period_ms=10000)"`
### DCARRP: DC-Aware Round Robin Policy
Format: `DCARRP(localdc=somedcname)`
This load balancing policy does not expose other non-deprecated options in the bundled version of
the driver, and the datacenter name is required.
@ -225,6 +225,19 @@ activity types.
with EBDSE.
- **showcql** - logs cql statements as INFO (to see INFO messages in stdout use -v or greater) Note: this is expensive
and should only be done to troubleshoot workloads. Do not use `showcql` for your tests.
- **lbp** - configures the load balancing policies for the Java driver. With this parameter, you can
configure nested load balancing policies in short-hand form.
The policies available are documented in detail under the help topic `cql-loadbalancing`. See that
guide if you need more than the examples below.
- `lbp=LAP(retry_period=3,scale=10)` - Latency aware policy with retry period of 3 seconds.
(Seconds is the default time unit, unless _ms parameter is used) and scale 10.
- `lbp=LAP(rp=3,s=10)` - Same as above, using the equivalent but terser form.
- `lbp=LAP(rp_ms=3000,s_ms=10000)` - Same as above, with milliseconds instead of
- `loadbalancing=LAP(s=10),TAP(
### CQL YAML Parameters
Reference in New Issue
Block a user