diff --git a/RELEASENOTES.md b/RELEASENOTES.md
index 1ec46b7c9..85087b733 100644
--- a/RELEASENOTES.md
+++ b/RELEASENOTES.md
@@ -23,3 +23,8 @@
- fc42ff47 disable hashed file function for now
- 83a40b01 provide additional signature for HashedFileExtract
- cdf2bd59 make info less chatty by pushing some things to debug
+- 863f8f24 Add support for specifying an haproxy header for new cql connections
+- 6f56c134 Add ByteBufferSizedHashed
+- 4620f312 (origin/http-docs-nit) http docs
+- e5de8714 reorder build
+- cd86ea75 improve error handling
diff --git a/driver-cql-shaded/pom.xml b/driver-cql-shaded/pom.xml
index 957c45869..00a6c0b85 100644
--- a/driver-cql-shaded/pom.xml
+++ b/driver-cql-shaded/pom.xml
@@ -64,6 +64,12 @@
4.8
+
+ io.netty
+ netty-codec-haproxy
+ 4.1.54.Final
+
+
diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java
index 82787d3bb..7db1c24bd 100644
--- a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java
+++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/CQLSessionCache.java
@@ -1,11 +1,38 @@
package io.nosqlbench.activitytype.cql.statements.core;
+import java.io.File;
+import java.io.IOException;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+
import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
import com.datastax.driver.core.ProtocolOptions;
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.policies.*;
+import com.datastax.driver.core.policies.DefaultRetryPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.WhiteListPolicy;
import com.datastax.driver.dse.DseCluster;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.haproxy.HAProxyCommand;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
+import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
import io.nosqlbench.activitytype.cql.core.CQLOptions;
import io.nosqlbench.activitytype.cql.core.ProxyTranslator;
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
@@ -18,15 +45,6 @@ import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.*;
-
public class CQLSessionCache implements Shutdownable {
private final static Logger logger = LogManager.getLogger(CQLSessionCache.class);
@@ -254,6 +272,48 @@ public class CQLSessionCache implements Shutdownable {
builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
}
+ activityDef.getParams().getOptionalString("haproxy_source_ip").map(
+ ip -> {
+ return new NettyOptions()
+ {
+ @Override
+ public void afterChannelInitialized(SocketChannel channel) throws Exception
+ {
+ try
+ {
+ InetAddress sourceIp = InetAddress.getByName(ip);
+ InetAddress destIp = activityDef.getParams().getOptionalString("haproxy_dest_ip").map(destip -> {
+ try
+ {
+ return InetAddress.getByName(destip);
+ }
+ catch (UnknownHostException e)
+ {
+ logger.warn("Invalid haproxy_dest_ip {}", destip);
+ return sourceIp;
+ }
+ }
+ ).orElse(sourceIp);
+
+ channel.pipeline().addFirst("proxyProtocol", new ProxyProtocolHander(
+ new HAProxyMessage(
+ HAProxyProtocolVersion.V1,
+ HAProxyCommand.PROXY,
+ sourceIp instanceof Inet6Address ? HAProxyProxiedProtocol.TCP6 : HAProxyProxiedProtocol.TCP4,
+ sourceIp.getHostAddress(),
+ destIp.getHostAddress(),
+ 8000,
+ 8000)));
+ }
+ catch (UnknownHostException e)
+ {
+ logger.warn("Invalid haproxy_source_ip {}", ip);
+ }
+ }
+ };
+ }
+ ).ifPresent(builder::withNettyOptions);
+
Cluster cl = builder.build();
// Apply default idempotence, if set
diff --git a/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/ProxyProtocolHandler.java b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/ProxyProtocolHandler.java
new file mode 100644
index 000000000..41f02c62d
--- /dev/null
+++ b/driver-cql-shaded/src/main/java/io/nosqlbench/activitytype/cql/statements/core/ProxyProtocolHandler.java
@@ -0,0 +1,27 @@
+package io.nosqlbench.activitytype.cql.statements.core;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.haproxy.HAProxyMessage;
+import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
+
+class ProxyProtocolHander extends ChannelOutboundHandlerAdapter
+{
+ private final AtomicBoolean sent = new AtomicBoolean(false);
+ private final HAProxyMessage message;
+
+ ProxyProtocolHander(HAProxyMessage message) {
+ this.message = message;
+ }
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (sent.compareAndSet(false, true))
+ HAProxyMessageEncoder.INSTANCE.write(ctx, message, ctx.voidPromise());
+
+ super.write(ctx, msg, promise);
+ }
+}
\ No newline at end of file
diff --git a/driver-http/src/main/resources/http.md b/driver-http/src/main/resources/http.md
index 821d7e00b..445cb2242 100644
--- a/driver-http/src/main/resources/http.md
+++ b/driver-http/src/main/resources/http.md
@@ -45,7 +45,7 @@ You can even make a detailed request with custom headers and result verification
```yaml
# Require that the result be status code 200-299 match regex "OK, account id is .*" in the body
statements:
- - name: 'get-from-google'
+ - get-from-google:
method: GET
uri: "https://google.com/"
version: "HTTP/1.1"
diff --git a/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_bytebuffer/ByteBufferSizedHashed.java b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_bytebuffer/ByteBufferSizedHashed.java
new file mode 100644
index 000000000..0f7327547
--- /dev/null
+++ b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_bytebuffer/ByteBufferSizedHashed.java
@@ -0,0 +1,63 @@
+package io.nosqlbench.virtdata.library.basics.shared.from_long.to_bytebuffer;
+
+import io.nosqlbench.virtdata.api.annotations.Categories;
+import io.nosqlbench.virtdata.api.annotations.Category;
+import io.nosqlbench.virtdata.api.annotations.Example;
+import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
+import io.nosqlbench.virtdata.api.bindings.VirtDataConversions;
+import io.nosqlbench.virtdata.library.basics.shared.from_long.to_long.Hash;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.function.LongFunction;
+import java.util.function.LongToIntFunction;
+/**
+ * Create a ByteBuffer from a long input based on a provided size function.
+ *
+ * As a 'Sized' function, the first argument is a function which determines the size of the resulting ByteBuffer.
+ *
+ * As a 'Hashed' function, the input value is hashed again before being used as value.
+ */
+@Categories(Category.conversion)
+@ThreadSafeMapper
+public class ByteBufferSizedHashed implements LongFunction {
+
+ private final LongToIntFunction sizeFunc;
+ private final Hash hash = new Hash();
+
+
+ @Example({
+ "ByteBufferSizedHashed(16)",
+ "Functionally identical to HashedtoByteBuffer(16) but using dynamic sizing implementation"
+
+
+ })
+ @Example({
+ "ByteBufferSizedHashed(HashRange(10, 14))",
+ "Create a ByteBuffer with variable limit (10 to 14)"
+ })
+ public ByteBufferSizedHashed(int size) {
+ this.sizeFunc = s -> size;
+ }
+ public ByteBufferSizedHashed(Object sizeFunc) {
+ this.sizeFunc = VirtDataConversions.adaptFunction(sizeFunc, LongToIntFunction.class);
+ }
+
+
+ @Override
+ public ByteBuffer apply(long input) {
+ int length = sizeFunc.applyAsInt(input);
+
+ int longs = (length / Long.BYTES) +1;
+ int bytes = longs * Long.BYTES;
+
+ ByteBuffer buffer = ByteBuffer.allocate(bytes);
+ for (int i = 0; i < longs; i++) {
+ long l = hash.applyAsLong(input + i);
+ buffer.putLong(l);
+ }
+ buffer.flip();
+ buffer.limit(length);
+ return buffer;
+ }
+}
diff --git a/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_bytebuffer/ByteBufferSizedHashedTest.java b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_bytebuffer/ByteBufferSizedHashedTest.java
new file mode 100644
index 000000000..7a2cdaab6
--- /dev/null
+++ b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_bytebuffer/ByteBufferSizedHashedTest.java
@@ -0,0 +1,23 @@
+package io.nosqlbench.virtdata.library.basics.shared.from_long.to_bytebuffer;
+
+import io.nosqlbench.virtdata.library.basics.shared.from_long.to_int.HashRange;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.function.LongToIntFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ByteBufferSizedHashedTest {
+
+ @Test
+ public void testWithHashRange() {
+ LongToIntFunction sizeFunc = new HashRange(100, 1000);
+ long input = 233423L;
+
+ ByteBufferSizedHashed d1 = new ByteBufferSizedHashed(sizeFunc);
+ ByteBuffer buf = d1.apply(input);
+ assertThat(sizeFunc.applyAsInt(233423L)).isEqualTo(buf.remaining());
+ }
+
+}