diff --git a/driver-cql-shaded/pom.xml b/driver-cql-shaded/pom.xml
index 957c45869..00a6c0b85 100644
--- a/driver-cql-shaded/pom.xml
+++ b/driver-cql-shaded/pom.xml
@@ -64,6 +64,12 @@
4.8
+
+ io.netty
+ netty-codec-haproxy
+ 4.1.54.Final
+
+
diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java
index 82787d3bb..7db1c24bd 100644
--- a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java
+++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java
@@ -1,11 +1,38 @@
package io.nosqlbench.activitytype.cql.statements.core;
+import java.io.File;
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.policies.*;
+import com.datastax.driver.core.policies.DefaultRetryPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
import com.datastax.driver.dse.DseCluster;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.nosqlbench.activitytype.cql.core.CQLOptions;
import io.nosqlbench.activitytype.cql.core.ProxyTranslator;
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
@@ -18,15 +45,6 @@ import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.*;
-
public class CQLSessionCache implements Shutdownable {
private final static Logger logger = LogManager.getLogger(CQLSessionCache.class);
@@ -254,6 +272,48 @@ public class CQLSessionCache implements Shutdownable {
builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
}
+ activityDef.getParams().getOptionalString("haproxy_source_ip").map(
+ ip -> {
+ return new NettyOptions()
+ {
+ @Override
+ public void afterChannelInitialized(SocketChannel channel) throws Exception
+ {
+ try
+ {
+ InetAddress sourceIp = InetAddress.getByName(ip);
+ InetAddress destIp = activityDef.getParams().getOptionalString("haproxy_dest_ip").map(destip -> {
+ try
+ {
+ return InetAddress.getByName(destip);
+ }
+ catch (UnknownHostException e)
+ {
+ logger.warn("Invalid haproxy_dest_ip {}", destip);
+ return sourceIp;
+ }
+ }
+ ).orElse(sourceIp);
+
+ channel.pipeline().addFirst("proxyProtocol", new ProxyProtocolHander(
+ new HAProxyMessage(
+ HAProxyProtocolVersion.V1,
+ HAProxyCommand.PROXY,
+ sourceIp instanceof Inet6Address ? HAProxyProxiedProtocol.TCP6 : HAProxyProxiedProtocol.TCP4,
+ sourceIp.getHostAddress(),
+ destIp.getHostAddress(),
+ 8000,
+ 8000)));
+ }
+ catch (UnknownHostException e)
+ {
+ logger.warn("Invalid haproxy_source_ip {}", ip);
+ }
+ }
+ };
+ }
+ ).ifPresent(builder::withNettyOptions);
+
Cluster cl = builder.build();
// Apply default idempotence, if set
diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/ProxyProtocolHandler.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/ProxyProtocolHandler.java
new file mode 100644
index 000000000..41f02c62d
--- /dev/null
+++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/ProxyProtocolHandler.java
@@ -0,0 +1,27 @@
+package io.nosqlbench.activitytype.cql.statements.core;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+
+class ProxyProtocolHander extends ChannelOutboundHandlerAdapter
+{
+ private final AtomicBoolean sent = new AtomicBoolean(false);
+ private final HAProxyMessage message;
+
+ ProxyProtocolHander(HAProxyMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (sent.compareAndSet(false, true))
+ HAProxyMessageEncoder.INSTANCE.write(ctx, message, ctx.voidPromise());
+
+ super.write(ctx, msg, promise);
+ }
+}
\ No newline at end of file