This commit is contained in:
Jonathan Shook 2021-02-23 15:47:29 -06:00
commit 69ad2b54e4
53 changed files with 1122 additions and 332 deletions

View File

@ -9,7 +9,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -18,7 +18,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
@ -98,7 +98,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -21,7 +21,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-jdbc</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -24,13 +24,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cql-shaded</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,13 +20,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -3,7 +3,7 @@
<parent>
<artifactId>nosqlbench</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
@ -18,7 +18,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -44,13 +44,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -21,13 +21,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -42,13 +42,40 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.1</version>
</dependency>
</dependencies>

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
@ -11,26 +12,27 @@ import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
private final static Logger logger = LogManager.getLogger(PulsarAction.class);
private final static Logger logger = LogManager.getLogger(PulsarActivity.class);
public Timer bindTimer;
public Timer executeTimer;
private PulsarSpaceCache pulsarCache;
private NBErrorHandler errorhandler;
private String pulsarUrl;
private PulsarNBClientConf clientConf;
private OpSequence<LongFunction<PulsarOp>> sequencer;
private PulsarClient activityClient;
// private PulsarClient activityClient;
private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
public PulsarActivity(ActivityDef activityDef) {
super(activityDef);
@ -43,8 +45,11 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
pulsarUrl = activityDef.getParams().getOptionalString("url").orElse("pulsar://localhost:6650");
pulsarCache = new PulsarSpaceCache(this, this::newClient);
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile);
pulsarCache = new PulsarSpaceCache(this);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache));
setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef);
@ -60,21 +65,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
super.onActivityDefUpdate(activityDef);
}
public PulsarClient newClient() {
try {
PulsarClient newClient = PulsarClient.builder().
serviceUrl(this.pulsarUrl)
.build();
return newClient;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public OpSequence<LongFunction<PulsarOp>> getSequencer() {
return sequencer;
}
public PulsarNBClientConf getPulsarConf() {
return clientConf;
}
public Timer getBindTimer() {
return bindTimer;
}

View File

@ -23,7 +23,6 @@ public class PulsarActivityType implements ActivityType<PulsarActivity> {
}
private static class PulsarActionDispenser implements ActionDispenser {
private final PulsarActivity activity;
public PulsarActionDispenser(PulsarActivity activity) {
this.activity = activity;

View File

@ -1,12 +1,13 @@
package io.nosqlbench.driver.pulsar;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* An instance of a pulsar client, along with all the cached objects which are normally
@ -14,51 +15,141 @@ import java.util.function.Supplier;
* A PulsarSpace is simply a named and cached set of objects which must be used together.
*/
public class PulsarSpace {
private final String name;
private final Supplier<PulsarClient> clientFunc;
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
// TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc.
private PulsarClient _client;
private final String name;
private final PulsarNBClientConf pulsarNBClientConf;
public PulsarSpace(String name, Supplier<PulsarClient> clientFunc) {
private PulsarClient pulsarClient = null;
private Schema<?> pulsarSchema = null;
public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf ) {
this.name = name;
this.clientFunc = clientFunc;
this.pulsarNBClientConf = pulsarClientConf;
createPulsarClientFromConf();
createPulsarSchemaFromConf();
}
public PulsarClient getClient() {
if (_client == null) {
_client = clientFunc.get();
private void createPulsarClientFromConf() {
ClientBuilder clientBuilder = PulsarClient.builder();
String dftSvcUrl = "pulsar://localhost:6650";
if ( !pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString()) ) {
pulsarNBClientConf.setClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString(), dftSvcUrl);
}
try {
Map<String, Object> clientConf = pulsarNBClientConf.getClientConfMap();
pulsarClient = clientBuilder.loadConf(clientConf).build();
}
catch (PulsarClientException pce) {
logger.error("Fail to create PulsarClient from global configuration!");
throw new RuntimeException("Fail to create PulsarClient from global configuration!");
}
return _client;
}
public Producer<?> getProducer(String pname, String topicName) {
Producer<?> producer = producers.computeIfAbsent(
pname, n -> {
try {
// TODO: parameterize producer settings
return getClient().newProducer().topic(topicName).create();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
);
private void createPulsarSchemaFromConf() {
String schemaType = pulsarNBClientConf.getSchemaConfValue("schema.type").toString();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) {
String schemaDefStr = pulsarNBClientConf.getSchemaConfValue("schema.definition").toString();
pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr);
} else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) {
pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType));
} else {
throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " +
"Only primitive type and Avro type are supported at the moment!");
}
}
public PulsarClient getPulsarClient() {
return pulsarClient;
}
public PulsarNBClientConf getPulsarClientConf() {
return pulsarNBClientConf;
}
public Schema<?> getPulsarSchema() {
return pulsarSchema;
}
// Producer name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
// TODO: Maybe using NB run specific string as the producer name?
String producerName = "default";
String globalProducerName = pulsarNBClientConf.getProducerName();
if ((globalProducerName != null) && (!globalProducerName.isEmpty())) {
producerName = globalProducerName;
}
if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) {
producerName = cycleProducerName;
}
return producerName;
}
// Topic name is mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveTopicName(String cycleTopicName) {
String globalTopicName = pulsarNBClientConf.getTopicName();
String topicName = globalTopicName;
if ( ((globalTopicName == null) || (globalTopicName.isEmpty())) &&
((cycleTopicName == null) || (cycleTopicName.isEmpty())) ) {
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
} else if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) {
topicName = cycleTopicName;
}
return topicName;
}
private Producer createPulsarProducer(String cycleTopicName, String cycleProducerName) {
PulsarClient pulsarClient = getPulsarClient();
Producer producer = null;
String producerName = getEffectiveProducerName(cycleProducerName);
String topicName = getEffectiveTopicName(cycleTopicName);
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put("topicName", topicName);
if ((producerName != null) && (!producerName.isEmpty())) {
producerConf.put("producerName", producerName);
}
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
}
catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a client to connect to the Pulsar cluster!");
}
return producer;
}
public Consumer<?> getConsumer(String pname, String topicName) {
Consumer<?> consumer = consumers.computeIfAbsent(
pname, n -> {
try {
// TODO: parameterize subscription name and other settings
return getClient().newConsumer().topic(topicName).subscriptionName("testsub").subscribe();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
);
return consumer;
public Producer<?> getProducer(String cycleProducerName, String cycleTopicName) {
String producerName = getEffectiveProducerName(cycleProducerName);
String topicName = getEffectiveTopicName(cycleTopicName);
String identifierStr = producerName.toLowerCase() + "::" + topicName.toLowerCase();
Producer producer = producers.get(identifierStr);
if (producer == null) {
producer = createPulsarProducer(cycleTopicName, cycleProducerName);
producers.put(identifierStr, producer);
}
return producer;
}
}

View File

@ -1,9 +1,6 @@
package io.nosqlbench.driver.pulsar;
import org.apache.pulsar.client.api.PulsarClient;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
/**
* To enable flexibility in testing methods, each object graph which is used within
@ -17,22 +14,18 @@ public class PulsarSpaceCache {
// TODO: Implement variant cache eviction behaviors (halt, warn, LRU)
private final PulsarActivity activity;
private final Supplier<PulsarClient> clientFunc;
private final ConcurrentHashMap<String, PulsarSpace> clientScopes = new ConcurrentHashMap<>();
public PulsarSpaceCache(PulsarActivity pulsarActivity, Supplier<PulsarClient> newClient) {
public PulsarSpaceCache(PulsarActivity pulsarActivity) {
this.activity = pulsarActivity;
this.clientFunc = newClient;
}
public PulsarSpace getPulsarSpace(String name) {
PulsarSpace cspace = clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, clientFunc));
PulsarSpace cspace = clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, activity.getPulsarConf()));
return cspace;
}
public PulsarActivity getActivity() {
return activity;
}
}

View File

@ -15,14 +15,14 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarRecvMapper implements LongFunction<PulsarOp> {
public class PulsarConsumerMapper implements LongFunction<PulsarOp> {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<String> recvInstructions;
private final CommandTemplate cmdTpl;
public PulsarRecvMapper(LongFunction<Consumer<?>> consumerFunc,
LongFunction<String> recvMsg,
CommandTemplate cmdTpl) {
public PulsarConsumerMapper(LongFunction<Consumer<?>> consumerFunc,
LongFunction<String> recvMsg,
CommandTemplate cmdTpl) {
this.consumerFunc = consumerFunc;
this.recvInstructions = recvMsg;
this.cmdTpl = cmdTpl;
@ -31,6 +31,6 @@ public class PulsarRecvMapper implements LongFunction<PulsarOp> {
@Override
public PulsarOp apply(long value) {
return new PulsarRecvOp((Consumer<byte[]>) consumerFunc.apply(value), recvInstructions.apply(value));
return new PulsarConsumerOp((Consumer<byte[]>) consumerFunc.apply(value), recvInstructions.apply(value));
}
}

View File

@ -6,11 +6,11 @@ import org.apache.pulsar.client.api.PulsarClientException;
import java.nio.charset.StandardCharsets;
public class PulsarRecvOp implements PulsarOp {
public class PulsarConsumerOp implements PulsarOp {
private final Consumer<byte[]> consumer;
private final String recvInstructions;
public PulsarRecvOp(Consumer<byte[]> consumer, String recvInstructions) {
public PulsarConsumerOp(Consumer<byte[]> consumer, String recvInstructions) {
this.consumer = consumer;
this.recvInstructions = recvInstructions;
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
@ -15,29 +16,32 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarSendMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
public class PulsarProducerMapper implements LongFunction<PulsarOp> {
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<String> payloadFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
private final PulsarSpace clientSpace;
private final CommandTemplate cmdTpl;
public PulsarSendMapper(
public PulsarProducerMapper(
LongFunction<Producer<?>> producerFunc,
LongFunction<String> msgFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
PulsarSpace clientSpace,
CommandTemplate cmdTpl) {
this.producerFunc = producerFunc;
this.payloadFunc = msgFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
this.clientSpace = clientSpace;
this.cmdTpl = cmdTpl;
// TODO: add schema support
}
@Override
public PulsarOp apply(long value) {
Producer<?> producer = producerFunc.apply(value);
String msg = payloadFunc.apply(value);
String key = keyFunc != null ? keyFunc.apply(value) : null;
return new PulsarSendOp(key, (Producer<byte[]>) producer, msg);
String msgKey = keyFunc != null ? keyFunc.apply(value) : null;
String msgPayload = payloadFunc.apply(value);
return new PulsarProducerOp(producer, clientSpace.getPulsarSchema(), msgKey, msgPayload);
}
}

View File

@ -0,0 +1,55 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
public class PulsarProducerOp implements PulsarOp {
private final Producer<?> producer;
private final Schema<?> pulsarSchema;
private final String msgKey;
private final String msgPayload;
public PulsarProducerOp(Producer<?> producer, Schema<?> schema, String key, String payload) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
}
@Override
public void run() {
try {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ( (msgKey != null) && (!msgKey.isEmpty()) ) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, msgPayload);
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema, avroGenericRecord);
typedMessageBuilder = typedMessageBuilder.value(payload);
}
else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
typedMessageBuilder.send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,29 +0,0 @@
package io.nosqlbench.driver.pulsar.ops;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.nio.charset.StandardCharsets;
public class PulsarSendOp implements PulsarOp {
private final Producer<byte[]> producer;
private final String msg;
private final String key;
public PulsarSendOp(String key, Producer<byte[]> producer, String msg) {
this.producer = producer;
this.msg = msg;
this.key = key;
}
// TODO: figure out how to add a key when it is non-null
@Override
public void run() {
try {
MessageId mid = producer.send(msg.getBytes(StandardCharsets.UTF_8));
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,11 +1,11 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.PulsarSpaceCache;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
@ -38,115 +38,112 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
if (cmdTpl.containsKey("send") && cmdTpl.containsKey("recv")) {
throw new RuntimeException("You must specify either send or recv, but not both.");
}
if (!cmdTpl.containsKey("send") && !cmdTpl.containsKey("recv")) {
throw new RuntimeException("You must specify either send or recv, but none was provided.");
}
LongFunction<String> topic_uri_func;
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
String topic_uri = cmdTpl.getStatic("topic_uri");
topic_uri_func = (l) -> topic_uri;
topic_uri_func = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "default");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topic_uri_func = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topic_uri_func = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "default");
}
else {
if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topic_uri_func = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topic_uri_func = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
}
else {
topic_uri_func = null;
}
}
LongFunction<PulsarSpace> spaceFunc;
// TODO: At the moment, only supports static "client"
PulsarSpace clientSpace;
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isStatic("client")) {
String client_name = cmdTpl.getStatic("client");
PulsarSpace clientSpace = pcache.getPulsarSpace(client_name);
spaceFunc = l -> clientSpace;
if (cmdTpl.isDynamic("client")) {
throw new RuntimeException("\"client\" can't be made dynamic!");
} else {
spaceFunc = l -> pcache.getPulsarSpace(cmdTpl.getDynamic("client", l));
String client_name = cmdTpl.getStatic("client");
clientSpace = pcache.getPulsarSpace(client_name);
}
} else {
spaceFunc = l -> pcache.getPulsarSpace("default");
clientSpace = pcache.getPulsarSpace("default");
}
// TODO: Add batch operation types to pulsar
if (cmdTpl.containsKey("send")) {
return resolveSend(spaceFunc, cmdTpl, topic_uri_func);
} else if (cmdTpl.containsKey("recv")) {
return resolveRecv(spaceFunc, cmdTpl, topic_uri_func);
assert (clientSpace != null);
String clientType = clientSpace.getPulsarClientConf().getPulsarClientType();
// TODO: At the moment, only implements "Producer" functionality; add implementation for others later!
if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) {
return resolveProducer(clientSpace, cmdTpl, topic_uri_func);/*
} else if ( msgOperation.equalsIgnoreCase(PulsarActivityUtil.MSGOP_TYPES.CONSUMER.toString()) ) {
return resolveConsumer(spaceFunc, cmdTpl, topic_uri_func);
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.READER.toString()) ) {
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.WSOKT_PRODUCER.toString()) ) {
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.MANAGED_LEDGER.toString()) ) {
*/
} else {
throw new RuntimeException("Neither send nor recv were found in the op template.");
throw new RuntimeException("Unsupported Pulsar message operation type.");
}
}
private LongFunction<PulsarOp> resolveRecv(
LongFunction<PulsarSpace> spaceFunc,
CommandTemplate cmdTpl,
LongFunction<String> topic_uri_func) {
LongFunction<Consumer<?>> consumerFunc;
if (cmdTpl.isStatic("consumer")) {
String consumerName = cmdTpl.getStatic("consumer");
consumerFunc = (l) -> spaceFunc.apply(l).getConsumer(consumerName, topic_uri_func.apply(l));
} else if (cmdTpl.isDynamic("consumer")) {
consumerFunc = (l) -> spaceFunc.apply(l)
.getConsumer(cmdTpl.getDynamic("consumer", l), topic_uri_func.apply(l));
} else {
consumerFunc = (l) -> spaceFunc.apply(l)
.getConsumer(topic_uri_func.apply(l), topic_uri_func.apply(l));
}
return new PulsarRecvMapper(consumerFunc, (l) -> cmdTpl.get("recv", l), cmdTpl);
}
private LongFunction<PulsarOp> resolveSend(
LongFunction<PulsarSpace> spaceFunc,
private LongFunction<PulsarOp> resolveProducer(
PulsarSpace pulsarSpace,
CommandTemplate cmdTpl,
LongFunction<String> topic_uri_func
) {
LongFunction<Producer<?>> producerFunc;
if (cmdTpl.isStatic("producer")) {
String producerName = cmdTpl.getStatic("producer");
producerFunc = (l) -> spaceFunc.apply(l).getProducer(producerName, topic_uri_func.apply(l));
} else if (cmdTpl.isDynamic("producer")) {
producerFunc = (l) -> spaceFunc.apply(l)
.getProducer(cmdTpl.getDynamic("producer", l), topic_uri_func.apply(l));
if (cmdTpl.isStatic("producer-name")) {
producerFunc = (l) -> pulsarSpace.getProducer(cmdTpl.getStatic("producer-name"),
(topic_uri_func == null) ? null : topic_uri_func.apply(l));
} else if (cmdTpl.isDynamic("producer-name")) {
producerFunc = (l) -> pulsarSpace.getProducer(cmdTpl.getDynamic("producer-name", l),
(topic_uri_func == null) ? null : topic_uri_func.apply(l));
} else {
producerFunc = (l) -> spaceFunc.apply(l)
.getProducer(topic_uri_func.apply(l), topic_uri_func.apply(l));
producerFunc = (l) -> pulsarSpace.getProducer(null,
(topic_uri_func == null) ? null : topic_uri_func.apply(l));
}
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("key")) {
String keyName = cmdTpl.getStatic("key");
keyFunc = (l) -> keyName;
} else if (cmdTpl.isDynamic("key")) {
keyFunc = (l) -> cmdTpl.getDynamic("key", l);
if (cmdTpl.isStatic("msg-key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg-key");
} else if (cmdTpl.isDynamic("msg-key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg-key", l);
} else {
keyFunc = null;
}
return new PulsarSendMapper(producerFunc, (l) -> cmdTpl.get("send", l), keyFunc, cmdTpl);
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg-value")) {
if (cmdTpl.isStatic("msg-value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg-value");
} else if (cmdTpl.isDynamic("msg-value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg-value", l);
} else {
valueFunc = null;
}
} else {
throw new RuntimeException("\"msg-value\" field must be specified!");
}
return new PulsarProducerMapper(producerFunc, keyFunc, valueFunc, pulsarSpace, cmdTpl);
}
@Override

View File

@ -0,0 +1,68 @@
package io.nosqlbench.driver.pulsar.util;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
public class AvroUtil {
public static org.apache.avro.Schema GetSchema_ApacheAvro(String avroSchemDef) {
return new org.apache.avro.Schema.Parser().parse(avroSchemDef);
}
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, String jsonData) {
org.apache.avro.generic.GenericRecord record = null;
try {
org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef);
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, jsonData);
record = reader.read(null, decoder);
}
catch (IOException ioe) {
ioe.printStackTrace();
}
return record;
}
public static GenericAvroSchema GetSchema_PulsarAvro(String schemaName, String avroSchemDef) {
SchemaInfo schemaInfo = SchemaInfo.builder()
.schema(avroSchemDef.getBytes(StandardCharsets.UTF_8))
.type(SchemaType.AVRO)
.properties(new HashMap<>())
.name(schemaName)
.build();
return new GenericAvroSchema(schemaInfo);
}
public static GenericRecord GetGenericRecord_PulsarAvro(
GenericAvroSchema pulsarGenericAvroSchema,
org.apache.avro.generic.GenericRecord apacheAvroGenericRecord)
{
GenericRecordBuilder recordBuilder = pulsarGenericAvroSchema.newRecordBuilder();
List<Field> fieldList = pulsarGenericAvroSchema.getFields();
for (Field field : fieldList) {
String fieldName = field.getName();
recordBuilder.set(fieldName, apacheAvroGenericRecord.get(fieldName));
}
return recordBuilder.build();
}
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) {
GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
}
}

View File

@ -0,0 +1,300 @@
package io.nosqlbench.driver.pulsar.util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
public class PulsarActivityUtil {
private final static Logger logger = LogManager.getLogger(PulsarActivityUtil.class);
// Supported message operation types
public enum CLIENT_TYPES {
PRODUCER("producer"),
CONSUMER("consumer"),
READER("reader"),
WSOKT_PRODUCER("websocket-producer"),
MANAGED_LEDGER("managed-ledger")
;
public final String label;
private CLIENT_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidClientType(String type) {
return Arrays.stream(CLIENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
}
///////
// Valid persistence type
public enum PERSISTENT_TYPES {
PERSISTENT("persistent"),
NON_PERSISTENT("non-persistent")
;
public final String label;
private PERSISTENT_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidPersistenceType(String type) {
return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
}
///////
// Valid Pulsar client configuration (activity-level settings)
public enum CLNT_CONF_KEY {
serviceUrl("serviceUrl"),
authPulginClassName("authPluginClassName"),
authParams("AuthParams"),
pperationTimeoutMs("operationTimeoutMs"),
statsIntervalSeconds("statsIntervalSeconds"),
numIoThreads("numIoThreads"),
numListenerThreads("numListenerThreads"),
useTcpNoDelay("useTcpNoDelay"),
useTls("useTls"),
tlsTrustCertsFilePath("tlsTrustCertsFilePath"),
tlsAllowInsecureConnection("tlsAllowInsecureConnection"),
tlsHostnameVerificationEnable("tlsHostnameVerificationEnable"),
concurrentLookupRequest("concurrentLookupRequest"),
maxLookupRequest("maxLookupRequest"),
maxNumberOfRejectedRequestPerConnection("maxNumberOfRejectedRequestPerConnection"),
keepAliveIntervalSeconds("keepAliveIntervalSeconds"),
connectionTimeoutMs("connectionTimeoutMs"),
requestTimeoutMs("requestTimeoutMs"),
defaultBackoffIntervalNanos("defaultBackoffIntervalNanos"),
maxBackoffIntervalNanos("maxBackoffIntervalNanos")
;
public final String label;
private CLNT_CONF_KEY(String label) {
this.label = label;
}
}
public static boolean isValidClientConfItem(String item) {
return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Valid producer configuration (activity-level settings)
public enum PRODUCER_CONF_KEY {
// NOTE:
// For "topicName" and "producerName", they're ignore at activity-level.
// Instead, op-level settings are respected
// topicName("topicName"),
// producerName("producerName"),
sendTimeoutMs("sendTimeoutMs"),
blockIfQueueFull("blockIfQueueFull"),
maxPendingMessages("maxPendingMessages"),
maxPendingMessagesAcrossPartitions("maxPendingMessagesAcrossPartitions"),
messageRoutingMode("messageRoutingMode"),
hashingScheme("hashingScheme"),
cryptoFailureAction("cryptoFailureAction"),
batchingMaxPublishDelayMicros("batchingMaxPublishDelayMicros"),
batchingMaxMessages("batchingMaxMessages"),
batchingEnabled("batchingEnabled"),
compressionType("compressionType")
;
public final String label;
private PRODUCER_CONF_KEY(String label) {
this.label = label;
}
}
public static boolean isValidProducerConfItem(String item) {
return Arrays.stream(PRODUCER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Valid consumer configuration (activity-level settings)
// TODO: to be added
public enum CONSUMER_CONF_KEY {
;
public final String label;
private CONSUMER_CONF_KEY(String label) {
this.label = label;
}
}
///////
// Valid reader configuration (activity-level settings)
// TODO: to be added
public enum READER_CONF_KEY {
;
public final String label;
private READER_CONF_KEY(String label) {
this.label = label;
}
}
///////
// Valid websocket-producer configuration (activity-level settings)
// TODO: to be added
public enum WEBSKT_PRODUCER_CONF_KEY {
;
public final String label;
private WEBSKT_PRODUCER_CONF_KEY(String label) {
this.label = label;
}
}
///////
// Valid managed-ledger configuration (activity-level settings)
// TODO: to be added
public enum MANAGED_LEDGER_CONF_KEY {
;
public final String label;
private MANAGED_LEDGER_CONF_KEY(String label) {
this.label = label;
}
}
///////
// Primitive Schema type
public static boolean isPrimitiveSchemaTypeStr(String typeStr) {
boolean isPrimitive = false;
// Use "BYTES" as the default type if the type string is not explicitly specified
if ((typeStr == null) || typeStr.isEmpty()) {
typeStr = "BYTES";
}
if ( typeStr.toUpperCase().equals("BOOLEAN") || typeStr.toUpperCase().equals("INT8") ||
typeStr.toUpperCase().equals("INT16") || typeStr.toUpperCase().equals("INT32") ||
typeStr.toUpperCase().equals("INT64") || typeStr.toUpperCase().equals("FLOAT") ||
typeStr.toUpperCase().equals("DOUBLE") || typeStr.toUpperCase().equals("BYTES") ||
typeStr.toUpperCase().equals("DATE") || typeStr.toUpperCase().equals("TIME") ||
typeStr.toUpperCase().equals("TIMESTAMP") || typeStr.toUpperCase().equals("INSTANT") ||
typeStr.toUpperCase().equals("LOCAL_DATE") || typeStr.toUpperCase().equals("LOCAL_TIME") ||
typeStr.toUpperCase().equals("LOCAL_DATE_TIME") ) {
isPrimitive = true;
}
return isPrimitive;
}
public static Schema getPrimitiveTypeSchema(String typeStr) {
Schema schema = null;
switch (typeStr.toUpperCase()) {
case "BOOLEAN":
schema = Schema.BOOL;
break;
case "INT8":
schema = Schema.INT8;
break;
case "INT16":
schema = Schema.INT16;
break;
case "INT32":
schema = Schema.INT32;
break;
case "INT64":
schema = Schema.INT64;
break;
case "FLOAT":
schema = Schema.FLOAT;
break;
case "DOUBLE":
schema = Schema.DOUBLE;
break;
case "DATE":
schema = Schema.DATE;
break;
case "TIME":
schema = Schema.TIME;
break;
case "TIMESTAMP":
schema = Schema.TIMESTAMP;
break;
case "INSTANT":
schema = Schema.INSTANT;
break;
case "LOCAL_DATE":
schema = Schema.LOCAL_DATE;
break;
case "LOCAL_TIME":
schema = Schema.LOCAL_TIME;
break;
case "LOCAL_DATE_TIME":
schema = Schema.LOCAL_DATE_TIME;
break;
// Use BYTES as the default schema type if the type string is not specified
case "":
case "BTYES":
schema = Schema.BYTES;
break;
// Report an error if non-valid, non-empty schema type string is provided
default:
throw new RuntimeException("Invalid Pulsar primitive schema type string : " + typeStr);
}
return schema;
}
///////
// Complex strut type: Avro or Json
public static boolean isAvroSchemaTypeStr(String typeStr) {
boolean isAvroType = false;
if ( typeStr.toUpperCase().equals("AVRO") ) {
isAvroType = true;
}
return isAvroType;
}
public static Schema getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr;
String filePrefix = "file://";
Schema schema = null;
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
if (isAvroSchemaTypeStr(typeStr)) {
if ( (schemaDefinitionStr == null) || schemaDefinitionStr.isEmpty()) {
throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!");
} else if (schemaDefinitionStr.startsWith(filePrefix)) {
try {
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
} catch (IOException ioe) {
throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr);
}
}
System.out.println(schemaDefinitionStr);
SchemaInfo schemaInfo = SchemaInfo.builder()
.schema(schemaDefinitionStr.getBytes(StandardCharsets.UTF_8))
.type(SchemaType.AVRO)
.properties(new HashMap<>())
//TODO: A unique name for each NB run?
.name("NBAvro")
.build();
schema = new GenericAvroSchema(schemaInfo);
}
else {
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
}
return schema;
}
}

View File

@ -0,0 +1,206 @@
package io.nosqlbench.driver.pulsar.util;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class PulsarNBClientConf {
private final static Logger logger = LogManager.getLogger(PulsarNBClientConf.class);
private String canonicalFilePath = "";
private static final String DRIVER_CONF_PREFIX = "driver";
private static final String SCHEMA_CONF_PREFIX = "schema";
private static final String CLIENT_CONF_PREFIX = "client";
private static final String PRODUCER_CONF_PREFIX = "producer";
private HashMap<String, Object> driverConfMap = new HashMap<>();
private HashMap<String, Object> schemaConfMap = new HashMap<>();
private HashMap<String, Object> clientConfMap = new HashMap<>();
private HashMap<String, Object> producerConfMap = new HashMap<>();
// TODO: add support for other operation types: consumer, reader, websocket-producer, managed-ledger
public PulsarNBClientConf(String fileName) {
File file = new File(fileName);
try {
canonicalFilePath = file.getCanonicalPath();
Parameters params = new Parameters();
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(params.properties()
.setFileName(fileName)
.setListDelimiterHandler(new DefaultListDelimiterHandler(',')));
Configuration config = builder.getConfiguration();
// Get driver specific configuration settings
for (Iterator<String> it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
// Get schema specific configuration settings
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
// Get client connection specific configuration settings
for (Iterator<String> it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
// Get producer specific configuration settings
for (Iterator<String> it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
}
catch (IOException ioe) {
logger.error("Can't read the specified config properties file!");
ioe.printStackTrace();
}
catch (ConfigurationException cex) {
logger.error("Error loading configuration items from the specified config properties file!");
cex.printStackTrace();
}
}
// Get NB Driver related config
public Map<String, Object> getDriverConfMap() {
return this.driverConfMap;
}
public boolean hasDriverConfKey(String key) {
if (key.contains(DRIVER_CONF_PREFIX))
return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length()+1));
else
return driverConfMap.containsKey(key);
}
public Object getDriverConfValue(String key) {
if (key.contains(DRIVER_CONF_PREFIX))
return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length()+1));
else
return driverConfMap.get(key);
}
public void setDriverConfValue(String key, Object value) {
if (key.contains(DRIVER_CONF_PREFIX))
driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length()+1), value);
else
driverConfMap.put(key, value);
}
// Get Schema related config
public Map<String, Object> getSchemaConfMap() {
return this.schemaConfMap;
}
public boolean hasSchemaConfKey(String key) {
if (key.contains(SCHEMA_CONF_PREFIX))
return schemaConfMap.containsKey(key.substring(SCHEMA_CONF_PREFIX.length()+1));
else
return schemaConfMap.containsKey(key);
}
public Object getSchemaConfValue(String key) {
if (key.contains(SCHEMA_CONF_PREFIX))
return schemaConfMap.get(key.substring(SCHEMA_CONF_PREFIX.length()+1));
else
return schemaConfMap.get(key);
}
public void setSchemaConfValue(String key, Object value) {
if (key.contains(SCHEMA_CONF_PREFIX))
schemaConfMap.put(key.substring(SCHEMA_CONF_PREFIX.length()+1), value);
else
schemaConfMap.put(key, value);
}
// Get Pulsar client related config
public Map<String, Object> getClientConfMap() {
return this.clientConfMap;
}
public boolean hasClientConfKey(String key) {
if (key.contains(CLIENT_CONF_PREFIX))
return clientConfMap.containsKey(key.substring(CLIENT_CONF_PREFIX.length()+1));
else
return clientConfMap.containsKey(key);
}
public Object getClientConfValue(String key) {
if (key.contains(CLIENT_CONF_PREFIX))
return clientConfMap.get(key.substring(CLIENT_CONF_PREFIX.length()+1));
else
return clientConfMap.get(key);
}
public void setClientConfValue(String key, Object value) {
if (key.contains(CLIENT_CONF_PREFIX))
clientConfMap.put(key.substring(CLIENT_CONF_PREFIX.length()+1), value);
else
clientConfMap.put(key, value);
}
// Get Pulsar producer related config
public Map<String, Object> getProducerConfMap() {
return this.producerConfMap;
}
public boolean hasProducerConfKey(String key) {
if (key.contains(PRODUCER_CONF_PREFIX))
return producerConfMap.containsKey(key.substring(PRODUCER_CONF_PREFIX.length()+1));
else
return producerConfMap.containsKey(key);
}
public Object getProducerConfValue(String key) {
if (key.contains(PRODUCER_CONF_PREFIX))
return producerConfMap.get(key.substring(PRODUCER_CONF_PREFIX.length()+1));
else
return producerConfMap.get(key);
}
public void setProducerConfValue(String key, Object value) {
if (key.contains(PRODUCER_CONF_PREFIX))
producerConfMap.put(key.substring(PRODUCER_CONF_PREFIX.length()+1), value);
else
producerConfMap.put(key, value);
}
public String getPulsarClientType() {
Object confValue = getDriverConfValue("driver.client-type");
// If not explicitly specifying Pulsar client type, "producer" is the default type
if (confValue == null)
return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString();
else
return confValue.toString();
}
public String getProducerName() {
Object confValue = getProducerConfValue("producer.producerName");
// If not explicitly specifying Pulsar client type, "producer" is the default type
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getTopicName() {
Object confValue = getProducerConfValue("producer.topicName");
// If not explicitly specifying Pulsar client type, "producer" is the default type
if (confValue == null)
return "";
else
return confValue.toString();
}
}

View File

@ -0,0 +1,39 @@
### NB Pulsar driver related configuration - driver.xxx
driver.client-type = producer
# TODO: At the moment, only one producer is publishing messages to one single topic through NB
# TODO - consider allowing multiple producers to publish to the same topic
# TODO - but this relies on the NB cycle to be able to run Pulsar client asynchronously!
driver.num-workers = 1
### Schema related configurations - schema.xxx
# valid types:
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
# avro, json, protobuf
#
# TODO: as a starting point, only supports:
# TODO: 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# TODO: 2) Avro for messages with schema
schema.type = avro
schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc
### Pulsar client related configurations - client.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
# default: pulsar://localhost:6550
client.serviceUrl = pulsar://10.101.34.64:6650
# default: 10000
client.connectionTimeoutMs = 5000
### Producer related configurations (global) - producer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.topicName = persistent://public/default/mynbtest
producer.producerName =
#producer.sendTimeoutMs =
### Consumer related configurations (global) - consumer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer

View File

@ -0,0 +1,11 @@
{
"type": "record",
"name": "IotSensor",
"namespace": "TestNS",
"fields" : [
{"name": "SensorID", "type": "string"},
{"name": "SensorType", "type": "string"},
{"name": "ReadingTime", "type": "string"},
{"name": "ReadingValue", "type": "float"}
]
}

View File

@ -4,35 +4,66 @@ description: |
below like send100 or recv100.
You can specify the number of tenants like `tenants=100`. This is the default.
scenarios:
send: run driver=pulsar cycles=1000 tags=type:send url=TEMPLATE(url,pulsar://localhost:6650)
recv: run driver=pulsar cycles=1000 tags=type:recv url=TEMPLATE(url,pulsar://localhost:6650)
bindings:
cycle: ToString();
numbername: NumberNameToString();
clientinstance: Mod(1);
tenant: Mod(TEMPLATE(tenants,100)L);
# tenant2: Template("tenant-{}",Mod(TEMPLATE(tenants,100)L)->String);
mykey: WeightedString('key1:5,key2:23');
namespace: Template("ns-{}",Mod(TEMPLATE(namespaces,3)L));
mykey: NumberNameToString();
sensor_id: ToUUID();ToString();
# sensor_type:
reading_time: ToDateTime();
reading_value: ToFloat(100);
topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L));
blocks:
- name: send-block
# - create-tenant-namespace:
# tags:
# type: create-tenant-namespace
# statements:
# tenant: {tenant}
# namespace: {namespace}
- name: producer-block
tags:
type: send
type: producer
statements:
- send-stuff:
send: |
{"number": {cycle}, "name": "{numbername}"}
tenant: "public"
namespace: "{namespace}"
# tenant: "tenant-{tenant}"
- name: recv-block
tags:
type: recv
statements:
- recv-stuff:
recv: stuff
- producer-stuff:
#######
# NOTE: tenant and namespace must be static and pre-exist in Pulsar first
# topic_uri: "[persistent|non-persistent]://<tenant>/<namespace>/<topic>"
# topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
# producer-name:
msg-key: "{mykey}"
msg-value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
# - name: consumer-block
# tags:
# type: consumer
# statements:
# - consumer-stuff:
# subscription-name:
# subscription-type:
#
# - reader:
# tags:
# type: reader
# statements:
# - reader-stuff:
#
# - websocket-producer:
# tags:
# type: websocket-produer
# statements:
# - websocket-producer-stuff:
#
# - managed-ledger:
# tags:
# type: managed-ledger
# statement:
# - managed-ledger-stuff:

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -24,19 +24,19 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-userlibs</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,25 +23,25 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-userlibs</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,13 +23,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-core</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-docker</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -21,7 +21,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -28,13 +28,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>drivers-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
@ -85,7 +85,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-clients</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<scope>compile</scope>
</dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -65,7 +65,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -28,7 +28,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>docsys</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,7 +22,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -47,7 +47,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -3,7 +3,7 @@
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<packaging>pom</packaging>
<properties>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -31,7 +31,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -5,7 +5,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -24,31 +24,31 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-rest</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-docs</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-core</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-extensions</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
@ -60,73 +60,73 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-web</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-kafka</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-diag</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-tcp</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-http</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-jmx</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-dsegraph-shaded</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cql-shaded</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cqlverify</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-mongodb</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-pulsar</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
@ -138,7 +138,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-cockroachdb</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
@ -269,7 +269,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-mongodb</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>mvn-defaults</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -23,14 +23,14 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<artifactId>nb-api</artifactId>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lang</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,7 +20,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -22,13 +22,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,13 +20,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -20,7 +20,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-basics</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -18,7 +18,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>

View File

@ -4,7 +4,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -18,36 +18,36 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-realdata</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-realer</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-api</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>virtdata-lib-random</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<artifactId>virtdata-lib-basics</artifactId>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
<artifactId>virtdata-lib-curves4</artifactId>
</dependency>
@ -55,7 +55,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>docsys</artifactId>
<version>4.15.17-SNAPSHOT</version>
<version>4.15.18-SNAPSHOT</version>
</dependency>
</dependencies>