mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Add support for specifying an haproxy header for new cql connections
This commit is contained in:
parent
ee51f4cee3
commit
863f8f24e5
@ -64,6 +64,12 @@
|
||||
<version>4.8</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec-haproxy</artifactId>
|
||||
<version>4.1.54.Final</version>
|
||||
</dependency>
|
||||
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>io.netty</groupId>-->
|
||||
<!-- <artifactId>netty-transport-native-epoll</artifactId>-->
|
||||
|
@ -1,11 +1,38 @@
|
||||
package io.nosqlbench.activitytype.cql.statements.core;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.Inet6Address;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import com.datastax.driver.core.Cluster;
|
||||
import com.datastax.driver.core.NettyOptions;
|
||||
import com.datastax.driver.core.ProtocolOptions;
|
||||
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
|
||||
import com.datastax.driver.core.Session;
|
||||
import com.datastax.driver.core.policies.*;
|
||||
import com.datastax.driver.core.policies.DefaultRetryPolicy;
|
||||
import com.datastax.driver.core.policies.LoadBalancingPolicy;
|
||||
import com.datastax.driver.core.policies.LoggingRetryPolicy;
|
||||
import com.datastax.driver.core.policies.RetryPolicy;
|
||||
import com.datastax.driver.core.policies.RoundRobinPolicy;
|
||||
import com.datastax.driver.core.policies.WhiteListPolicy;
|
||||
import com.datastax.driver.dse.DseCluster;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.handler.codec.haproxy.HAProxyCommand;
|
||||
import io.netty.handler.codec.haproxy.HAProxyMessage;
|
||||
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
||||
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
|
||||
import io.nosqlbench.activitytype.cql.core.CQLOptions;
|
||||
import io.nosqlbench.activitytype.cql.core.ProxyTranslator;
|
||||
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
|
||||
@ -18,15 +45,6 @@ import io.nosqlbench.nb.api.errors.BasicError;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
|
||||
public class CQLSessionCache implements Shutdownable {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(CQLSessionCache.class);
|
||||
@ -254,6 +272,48 @@ public class CQLSessionCache implements Shutdownable {
|
||||
builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
|
||||
}
|
||||
|
||||
activityDef.getParams().getOptionalString("haproxy_source_ip").map(
|
||||
ip -> {
|
||||
return new NettyOptions()
|
||||
{
|
||||
@Override
|
||||
public void afterChannelInitialized(SocketChannel channel) throws Exception
|
||||
{
|
||||
try
|
||||
{
|
||||
InetAddress sourceIp = InetAddress.getByName(ip);
|
||||
InetAddress destIp = activityDef.getParams().getOptionalString("haproxy_dest_ip").map(destip -> {
|
||||
try
|
||||
{
|
||||
return InetAddress.getByName(destip);
|
||||
}
|
||||
catch (UnknownHostException e)
|
||||
{
|
||||
logger.warn("Invalid haproxy_dest_ip {}", destip);
|
||||
return sourceIp;
|
||||
}
|
||||
}
|
||||
).orElse(sourceIp);
|
||||
|
||||
channel.pipeline().addFirst("proxyProtocol", new ProxyProtocolHander(
|
||||
new HAProxyMessage(
|
||||
HAProxyProtocolVersion.V1,
|
||||
HAProxyCommand.PROXY,
|
||||
sourceIp instanceof Inet6Address ? HAProxyProxiedProtocol.TCP6 : HAProxyProxiedProtocol.TCP4,
|
||||
sourceIp.getHostAddress(),
|
||||
destIp.getHostAddress(),
|
||||
8000,
|
||||
8000)));
|
||||
}
|
||||
catch (UnknownHostException e)
|
||||
{
|
||||
logger.warn("Invalid haproxy_source_ip {}", ip);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
).ifPresent(builder::withNettyOptions);
|
||||
|
||||
Cluster cl = builder.build();
|
||||
|
||||
// Apply default idempotence, if set
|
||||
|
@ -0,0 +1,27 @@
|
||||
package io.nosqlbench.activitytype.cql.statements.core;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.handler.codec.haproxy.HAProxyMessage;
|
||||
import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
|
||||
|
||||
class ProxyProtocolHander extends ChannelOutboundHandlerAdapter
|
||||
{
|
||||
private final AtomicBoolean sent = new AtomicBoolean(false);
|
||||
private final HAProxyMessage message;
|
||||
|
||||
ProxyProtocolHander(HAProxyMessage message) {
|
||||
this.message = message;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||
if (sent.compareAndSet(false, true))
|
||||
HAProxyMessageEncoder.INSTANCE.write(ctx, message, ctx.voidPromise());
|
||||
|
||||
super.write(ctx, msg, promise);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user