reconnectpolicy

This commit is contained in:
phact 2020-04-06 11:52:49 -04:00
parent 952e6f6ee8
commit f650d2e9be
4 changed files with 42 additions and 0 deletions

View File

@ -3,6 +3,7 @@ package io.nosqlbench.activitytype.cql.core;
import com.datastax.driver.core.*;
import com.datastax.driver.core.policies.*;
import io.netty.util.HashedWheelTimer;
import io.nosqlbench.engine.api.exceptions.BasicError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -83,6 +84,24 @@ public class CQLOptions {
return retryPolicy;
}
public static ReconnectionPolicy reconnectPolicyFor(String spec) {
if(spec.startsWith("exponential(")){
String argsString = spec.substring(12);
String[] args = argsString.substring(0, argsString.length() - 1).split("[,;]");
if (args.length != 2){
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>)");
}
long baseDelay = Long.parseLong(args[0]);
long maxDelay = Long.parseLong(args[1]);
return new ExponentialReconnectionPolicy(baseDelay,maxDelay);
}else if(spec.startsWith("constant(")){
String argsString = spec.substring(9);
long constantDelayMs= Long.parseLong(argsString.substring(0, argsString.length() - 1));
return new ConstantReconnectionPolicy(constantDelayMs);
}
throw new BasicError("Invalid reconnectionpolicy, try reconnectionpolicy=exponential(<baseDelay>, <maxDelay>) or constant(<constantDelayMs>)");
}
public static SocketOptions socketOptionsFor(String spec) {
String[] assignments = spec.split("[,;]");
Map<String, String> values = new HashMap<>();

View File

@ -162,6 +162,15 @@ public class CQLSessionCache implements Shutdownable {
.map(CQLOptions::socketOptionsFor)
.ifPresent(builder::withSocketOptions);
activityDef.getParams().getOptionalString("reconnectpolicy")
.map(reconnectpolicy-> {
logger.info("reconnectpolicy=>" + reconnectpolicy);
return reconnectpolicy;
})
.map(CQLOptions::reconnectPolicyFor)
.ifPresent(builder::withReconnectionPolicy);
activityDef.getParams().getOptionalString("pooling")
.map(pooling -> {
logger.info("pooling=>" + pooling);

View File

@ -66,6 +66,11 @@ activity types.
The only option supported for this version is `retrypolicy=logging`,
which uses the default retry policy, but with logging added.
- **reconnectpolicy** default: none - Applies a reconnection policy in the driver
Supports either `reconnectpolicy=exponential(minDelayInMs,maxDelayInMs)` or `reconnectpolicy=constant(delayInMs)`.
The driver reconnects using this policy when the entire cluster becomes unavailable.
- **pooling** default: none - Applies the connection pooling options
to the policy.
Examples:

View File

@ -4,6 +4,8 @@ import com.datastax.driver.core.HostDistance;
import com.datastax.driver.core.PoolingOptions;
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
import io.nosqlbench.activitytype.cql.core.CQLOptions;
import org.junit.Test;
@ -32,6 +34,13 @@ public class CQLOptionsTest {
assertThat(lbp).isNotNull();
}
@Test
public void testReconnectPolicyPatterns() {
ReconnectionPolicy rp = CQLOptions.reconnectPolicyFor("exponential(123,321)");
rp = CQLOptions.reconnectPolicyFor("constant(123)");
}
@Test
public void testSocketOptionPatterns() {
SocketOptions so = CQLOptions.socketOptionsFor("read_timeout_ms=23423,connect_timeout_ms=2344;keep_alive:true,reuse_address:true;so_linger:323;tcp_no_delay=true;receive_buffer_size:100,send_buffer_size=1000");