mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge branch 'main' into pulsar
This commit is contained in:
commit
49eba0eaef
@ -23,3 +23,8 @@
|
|||||||
- fc42ff47 disable hashed file function for now
|
- fc42ff47 disable hashed file function for now
|
||||||
- 83a40b01 provide additional signature for HashedFileExtract
|
- 83a40b01 provide additional signature for HashedFileExtract
|
||||||
- cdf2bd59 make info less chatty by pushing some things to debug
|
- cdf2bd59 make info less chatty by pushing some things to debug
|
||||||
|
- 863f8f24 Add support for specifying an haproxy header for new cql connections
|
||||||
|
- 6f56c134 Add ByteBufferSizedHashed
|
||||||
|
- 4620f312 (origin/http-docs-nit) http docs
|
||||||
|
- e5de8714 reorder build
|
||||||
|
- cd86ea75 improve error handling
|
||||||
|
@ -64,6 +64,12 @@
|
|||||||
<version>4.8</version>
|
<version>4.8</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.netty</groupId>
|
||||||
|
<artifactId>netty-codec-haproxy</artifactId>
|
||||||
|
<version>4.1.54.Final</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- <dependency>-->
|
<!-- <dependency>-->
|
||||||
<!-- <groupId>io.netty</groupId>-->
|
<!-- <groupId>io.netty</groupId>-->
|
||||||
<!-- <artifactId>netty-transport-native-epoll</artifactId>-->
|
<!-- <artifactId>netty-transport-native-epoll</artifactId>-->
|
||||||
|
@ -1,11 +1,38 @@
|
|||||||
package io.nosqlbench.activitytype.cql.statements.core;
|
package io.nosqlbench.activitytype.cql.statements.core;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.Inet6Address;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import com.datastax.driver.core.Cluster;
|
import com.datastax.driver.core.Cluster;
|
||||||
|
import com.datastax.driver.core.NettyOptions;
|
||||||
import com.datastax.driver.core.ProtocolOptions;
|
import com.datastax.driver.core.ProtocolOptions;
|
||||||
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
|
import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
|
||||||
import com.datastax.driver.core.Session;
|
import com.datastax.driver.core.Session;
|
||||||
import com.datastax.driver.core.policies.*;
|
import com.datastax.driver.core.policies.DefaultRetryPolicy;
|
||||||
|
import com.datastax.driver.core.policies.LoadBalancingPolicy;
|
||||||
|
import com.datastax.driver.core.policies.LoggingRetryPolicy;
|
||||||
|
import com.datastax.driver.core.policies.RetryPolicy;
|
||||||
|
import com.datastax.driver.core.policies.RoundRobinPolicy;
|
||||||
|
import com.datastax.driver.core.policies.WhiteListPolicy;
|
||||||
import com.datastax.driver.dse.DseCluster;
|
import com.datastax.driver.dse.DseCluster;
|
||||||
|
import io.netty.channel.socket.SocketChannel;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyCommand;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyMessage;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyProxiedProtocol;
|
||||||
import io.nosqlbench.activitytype.cql.core.CQLOptions;
|
import io.nosqlbench.activitytype.cql.core.CQLOptions;
|
||||||
import io.nosqlbench.activitytype.cql.core.ProxyTranslator;
|
import io.nosqlbench.activitytype.cql.core.ProxyTranslator;
|
||||||
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
|
import io.nosqlbench.engine.api.activityapi.core.Shutdownable;
|
||||||
@ -18,15 +45,6 @@ import io.nosqlbench.nb.api.errors.BasicError;
|
|||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.*;
|
|
||||||
|
|
||||||
public class CQLSessionCache implements Shutdownable {
|
public class CQLSessionCache implements Shutdownable {
|
||||||
|
|
||||||
private final static Logger logger = LogManager.getLogger(CQLSessionCache.class);
|
private final static Logger logger = LogManager.getLogger(CQLSessionCache.class);
|
||||||
@ -254,6 +272,48 @@ public class CQLSessionCache implements Shutdownable {
|
|||||||
builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
|
builder.withAddressTranslator(new ProxyTranslator(inetHost)).withLoadBalancingPolicy(whitelistPolicy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
activityDef.getParams().getOptionalString("haproxy_source_ip").map(
|
||||||
|
ip -> {
|
||||||
|
return new NettyOptions()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void afterChannelInitialized(SocketChannel channel) throws Exception
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
InetAddress sourceIp = InetAddress.getByName(ip);
|
||||||
|
InetAddress destIp = activityDef.getParams().getOptionalString("haproxy_dest_ip").map(destip -> {
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return InetAddress.getByName(destip);
|
||||||
|
}
|
||||||
|
catch (UnknownHostException e)
|
||||||
|
{
|
||||||
|
logger.warn("Invalid haproxy_dest_ip {}", destip);
|
||||||
|
return sourceIp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
).orElse(sourceIp);
|
||||||
|
|
||||||
|
channel.pipeline().addFirst("proxyProtocol", new ProxyProtocolHander(
|
||||||
|
new HAProxyMessage(
|
||||||
|
HAProxyProtocolVersion.V1,
|
||||||
|
HAProxyCommand.PROXY,
|
||||||
|
sourceIp instanceof Inet6Address ? HAProxyProxiedProtocol.TCP6 : HAProxyProxiedProtocol.TCP4,
|
||||||
|
sourceIp.getHostAddress(),
|
||||||
|
destIp.getHostAddress(),
|
||||||
|
8000,
|
||||||
|
8000)));
|
||||||
|
}
|
||||||
|
catch (UnknownHostException e)
|
||||||
|
{
|
||||||
|
logger.warn("Invalid haproxy_source_ip {}", ip);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
).ifPresent(builder::withNettyOptions);
|
||||||
|
|
||||||
Cluster cl = builder.build();
|
Cluster cl = builder.build();
|
||||||
|
|
||||||
// Apply default idempotence, if set
|
// Apply default idempotence, if set
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
package io.nosqlbench.activitytype.cql.statements.core;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import io.netty.channel.ChannelHandlerContext;
|
||||||
|
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||||
|
import io.netty.channel.ChannelPromise;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyMessage;
|
||||||
|
import io.netty.handler.codec.haproxy.HAProxyMessageEncoder;
|
||||||
|
|
||||||
|
class ProxyProtocolHander extends ChannelOutboundHandlerAdapter
|
||||||
|
{
|
||||||
|
private final AtomicBoolean sent = new AtomicBoolean(false);
|
||||||
|
private final HAProxyMessage message;
|
||||||
|
|
||||||
|
ProxyProtocolHander(HAProxyMessage message) {
|
||||||
|
this.message = message;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||||
|
if (sent.compareAndSet(false, true))
|
||||||
|
HAProxyMessageEncoder.INSTANCE.write(ctx, message, ctx.voidPromise());
|
||||||
|
|
||||||
|
super.write(ctx, msg, promise);
|
||||||
|
}
|
||||||
|
}
|
@ -45,7 +45,7 @@ You can even make a detailed request with custom headers and result verification
|
|||||||
```yaml
|
```yaml
|
||||||
# Require that the result be status code 200-299 match regex "OK, account id is .*" in the body
|
# Require that the result be status code 200-299 match regex "OK, account id is .*" in the body
|
||||||
statements:
|
statements:
|
||||||
- name: 'get-from-google'
|
- get-from-google:
|
||||||
method: GET
|
method: GET
|
||||||
uri: "https://google.com/"
|
uri: "https://google.com/"
|
||||||
version: "HTTP/1.1"
|
version: "HTTP/1.1"
|
||||||
|
@ -0,0 +1,63 @@
|
|||||||
|
package io.nosqlbench.virtdata.library.basics.shared.from_long.to_bytebuffer;
|
||||||
|
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.Categories;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.Category;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.Example;
|
||||||
|
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
|
||||||
|
import io.nosqlbench.virtdata.api.bindings.VirtDataConversions;
|
||||||
|
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_long.Hash;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
import java.util.function.LongToIntFunction;
|
||||||
|
/**
|
||||||
|
* Create a ByteBuffer from a long input based on a provided size function.
|
||||||
|
*
|
||||||
|
* As a 'Sized' function, the first argument is a function which determines the size of the resulting ByteBuffer.
|
||||||
|
*
|
||||||
|
* As a 'Hashed' function, the input value is hashed again before being used as value.
|
||||||
|
*/
|
||||||
|
@Categories(Category.conversion)
|
||||||
|
@ThreadSafeMapper
|
||||||
|
public class ByteBufferSizedHashed implements LongFunction<ByteBuffer> {
|
||||||
|
|
||||||
|
private final LongToIntFunction sizeFunc;
|
||||||
|
private final Hash hash = new Hash();
|
||||||
|
|
||||||
|
|
||||||
|
@Example({
|
||||||
|
"ByteBufferSizedHashed(16)",
|
||||||
|
"Functionally identical to HashedtoByteBuffer(16) but using dynamic sizing implementation"
|
||||||
|
|
||||||
|
|
||||||
|
})
|
||||||
|
@Example({
|
||||||
|
"ByteBufferSizedHashed(HashRange(10, 14))",
|
||||||
|
"Create a ByteBuffer with variable limit (10 to 14)"
|
||||||
|
})
|
||||||
|
public ByteBufferSizedHashed(int size) {
|
||||||
|
this.sizeFunc = s -> size;
|
||||||
|
}
|
||||||
|
public ByteBufferSizedHashed(Object sizeFunc) {
|
||||||
|
this.sizeFunc = VirtDataConversions.adaptFunction(sizeFunc, LongToIntFunction.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ByteBuffer apply(long input) {
|
||||||
|
int length = sizeFunc.applyAsInt(input);
|
||||||
|
|
||||||
|
int longs = (length / Long.BYTES) +1;
|
||||||
|
int bytes = longs * Long.BYTES;
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(bytes);
|
||||||
|
for (int i = 0; i < longs; i++) {
|
||||||
|
long l = hash.applyAsLong(input + i);
|
||||||
|
buffer.putLong(l);
|
||||||
|
}
|
||||||
|
buffer.flip();
|
||||||
|
buffer.limit(length);
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,23 @@
|
|||||||
|
package io.nosqlbench.virtdata.library.basics.shared.from_long.to_bytebuffer;
|
||||||
|
|
||||||
|
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_int.HashRange;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.function.LongToIntFunction;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
public class ByteBufferSizedHashedTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithHashRange() {
|
||||||
|
LongToIntFunction sizeFunc = new HashRange(100, 1000);
|
||||||
|
long input = 233423L;
|
||||||
|
|
||||||
|
ByteBufferSizedHashed d1 = new ByteBufferSizedHashed(sizeFunc);
|
||||||
|
ByteBuffer buf = d1.apply(input);
|
||||||
|
assertThat(sizeFunc.applyAsInt(233423L)).isEqualTo(buf.remaining());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user