post-merge fixups

This commit is contained in:
Jonathan Shook 2021-03-10 13:02:18 -06:00
commit 5639a54434
27 changed files with 1603 additions and 780 deletions

View File

@ -3,17 +3,19 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -24,14 +26,13 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer executeTimer;
private PulsarSpaceCache pulsarCache;
private NBErrorHandler errorhandler;
private PulsarNBClientConf clientConf;
private String serviceUrl;
private OpSequence<OpDispenser<PulsarOp>> sequencer;
// private PulsarClient activityClient;
private NBErrorHandler errorhandler;
private OpSequence<LongFunction<PulsarOp>> sequencer;
private Supplier<PulsarSpace> clientSupplier;
// private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
public PulsarActivity(ActivityDef activityDef) {
@ -48,6 +49,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile);
serviceUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
pulsarCache = new PulsarSpaceCache(this);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache));
@ -65,7 +68,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
super.onActivityDefUpdate(activityDef);
}
public OpSequence<OpDispenser<PulsarOp>> getSequencer() {
public OpSequence<LongFunction<PulsarOp>> getSequencer() {
return sequencer;
}
@ -73,6 +76,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
return clientConf;
}
public String getPulsarServiceUrl() {
return serviceUrl;
}
public Timer getBindTimer() {
return bindTimer;
}

View File

@ -1,196 +0,0 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
public class PulsarConsumerSpace extends PulsarSpace {
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) {
super(name, pulsarClientConf);
}
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
if (!StringUtils.isBlank(cycleTopicNames)) {
return cycleTopicNames;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if (!StringUtils.isBlank(globalTopicNames)) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveTopicNames(String cycleTopicNames) {
String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if (!StringUtils.isBlank(name))
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
if (!StringUtils.isBlank(cycleTopicsPattern)) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if (!StringUtils.isBlank(globalTopicsPattern)) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicsPatternStr))
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
} catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
if (!StringUtils.isBlank(cycleSubscriptionName)) {
return cycleSubscriptionName;
}
String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName();
if (!StringUtils.isBlank(globalSubscriptionName)) {
return globalSubscriptionName;
}
return "default-subs";
}
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
if (!StringUtils.isBlank(cycleSubscriptionType)) {
return cycleSubscriptionType;
}
String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType();
if (!StringUtils.isBlank(globalSubscriptionType)) {
return globalSubscriptionType;
}
return "";
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
SubscriptionType subscriptionType;
try {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
} catch (IllegalArgumentException iae) {
subscriptionType = SubscriptionType.Exclusive;
}
return subscriptionType;
}
private String getEffectiveConsumerName(String cycleConsumerName) {
if (!StringUtils.isBlank(cycleConsumerName)) {
return cycleConsumerName;
}
String globalConsumerName = pulsarNBClientConf.getConsumerName();
if (!StringUtils.isBlank(globalConsumerName)) {
return globalConsumerName;
}
return "default-cons";
}
public Consumer<?> getConsumer(String cycleTopicNames,
String cycleTopicsPattern,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if (topicNames.isEmpty() && (topicsPattern == null)) {
throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!");
}
String encodedStr;
if (!topicNames.isEmpty()) {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
StringUtils.join(topicNames, "|"));
} else {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
topicsPatternStr);
}
Consumer<?> consumer = consumers.get(encodedStr);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
// Explicit topic names will take precedence over topics pattern
if (!topicNames.isEmpty()) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames);
} else {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
getEffectiveTopicPattern(cycleTopicsPattern));
}
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
try {
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar consumer!");
}
consumers.put(encodedStr, consumer);
}
return consumer;
}
}

View File

@ -1,80 +0,0 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarProducerSpace extends PulsarSpace {
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
public PulsarProducerSpace(String name, PulsarNBClientConf pulsarClientConf) {
super(name, pulsarClientConf);
}
// Producer name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
if (!StringUtils.isBlank(cycleProducerName)) {
return cycleProducerName;
}
String globalProducerName = pulsarNBClientConf.getProducerName();
if (!StringUtils.isBlank(globalProducerName)) {
return globalProducerName;
}
// Default Producer name when it is not set at either cycle or global level
return "default-prod";
}
// Topic name IS mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveTopicName(String cycleTopicName) {
if (!StringUtils.isBlank(cycleTopicName)) {
return cycleTopicName;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if (!StringUtils.isBlank(globalTopicName)) {
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
}
return globalTopicName;
}
public Producer<?> getProducer(String cycleProducerName, String cycleTopicName) {
String producerName = getEffectiveProducerName(cycleProducerName);
String topicName = getEffectiveTopicName(cycleTopicName);
String encodedStr = PulsarActivityUtil.encode(cycleProducerName, cycleTopicName);
Producer<?> producer = producers.get(encodedStr);
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
} catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!");
}
producers.put(encodedStr, producer);
}
return producer;
}
}

View File

@ -1,109 +0,0 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarReaderSpace extends PulsarSpace {
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) {
super(name, pulsarClientConf);
}
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
if (!StringUtils.isBlank(cycleReaderTopicName)) {
return cycleReaderTopicName;
}
String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName();
if (!StringUtils.isBlank(globalReaderTopicName)) {
return globalReaderTopicName;
}
return "";
}
private String getEffectiveReaderName(String cycleReaderName) {
if (!StringUtils.isBlank(cycleReaderName)) {
return cycleReaderName;
}
String globalReaderName = pulsarNBClientConf.getConsumerName();
if (!StringUtils.isBlank(globalReaderName)) {
return globalReaderName;
}
return "default-read";
}
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
if (!StringUtils.isBlank(cycleStartMsgPosStr)) {
return cycleStartMsgPosStr;
}
String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr();
if (!StringUtils.isBlank(globalStartMsgPosStr)) {
return globalStartMsgPosStr;
}
return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
}
public Reader<?> getReader(String cycleTopicName,
String cycleReaderName,
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
if (StringUtils.isBlank(topicName)) {
throw new RuntimeException("Must specify a \"topicName\" for a reader!");
}
String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos);
Reader<?> reader = readers.get(encodedStr);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
try {
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
MessageId startMsgId = MessageId.latest;
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
startMsgId = MessageId.earliest;
}
//TODO: custom start message position is NOT supported yet
//else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) {
// startMsgId = MessageId.latest;
//}
if (startMsgId != null) {
readerBuilder = readerBuilder.startMessageId(startMsgId);
}
reader = readerBuilder.create();
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(encodedStr, reader);
}
return reader;
}
}

View File

@ -2,12 +2,20 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
/**
* An instance of a pulsar client, along with all the cached objects which are normally
@ -17,17 +25,22 @@ import java.util.concurrent.ConcurrentHashMap;
public class PulsarSpace {
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
// TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc.
protected final String name;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
protected final String spaceName;
protected final PulsarNBClientConf pulsarNBClientConf;
protected final String pulsarSvcUrl;
protected PulsarClient pulsarClient = null;
protected Schema<?> pulsarSchema = null;
public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf) {
this.name = name;
public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf, String pulsarSvcUrl) {
this.spaceName = name;
this.pulsarNBClientConf = pulsarClientConf;
this.pulsarSvcUrl = pulsarSvcUrl;
createPulsarClientFromConf();
createPulsarSchemaFromConf();
@ -36,14 +49,15 @@ public class PulsarSpace {
protected void createPulsarClientFromConf() {
ClientBuilder clientBuilder = PulsarClient.builder();
String dftSvcUrl = "pulsar://localhost:6650";
if (!pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString())) {
pulsarNBClientConf.setClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString(), dftSvcUrl);
}
try {
Map<String, Object> clientConf = pulsarNBClientConf.getClientConfMap();
pulsarClient = clientBuilder.loadConf(clientConf).build();
// Override "client.serviceUrl" setting in config.properties
clientConf.remove("serviceUrl", pulsarSvcUrl);
pulsarClient = clientBuilder
.loadConf(clientConf)
.serviceUrl(pulsarSvcUrl)
.build();
} catch (PulsarClientException pce) {
logger.error("Fail to create PulsarClient from global configuration!");
throw new RuntimeException("Fail to create PulsarClient from global configuration!");
@ -66,9 +80,391 @@ public class PulsarSpace {
}
}
public PulsarClient getPulsarClient() { return pulsarClient; }
public PulsarClient getPulsarClient() {
return pulsarClient;
}
public PulsarNBClientConf getPulsarClientConf() {
return pulsarNBClientConf;
}
public Schema<?> getPulsarSchema() { return pulsarSchema; }
public Schema<?> getPulsarSchema() {
return pulsarSchema;
}
//////////////////////////////////////
// Producer Processing --> start
//////////////////////////////////////
// Topic name IS mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerTopicName(String cycleTopicName) {
if (!StringUtils.isBlank(cycleTopicName)) {
return cycleTopicName;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if (!StringUtils.isBlank(globalTopicName)) {
return globalTopicName;
}
throw new RuntimeException(" topic name must be set at either global level or cycle level!");
}
// Producer name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
if (!StringUtils.isBlank(cycleProducerName)) {
return cycleProducerName;
}
String globalProducerName = pulsarNBClientConf.getProducerName();
if (!StringUtils.isBlank(globalProducerName)) {
return globalProducerName;
}
return "";
}
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
String topicName = getEffectiveProducerTopicName(cycleTopicName);
String producerName = getEffectiveProducerName(cycleProducerName);
if (StringUtils.isBlank(topicName)) {
throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level");
}
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
Producer<?> producer = producers.get(encodedStr);
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
if (!StringUtils.isBlank(producerName)) {
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
}
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
} catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!");
}
producers.put(encodedStr, producer);
}
return producer;
}
//////////////////////////////////////
// Producer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Consumer Processing --> start
//////////////////////////////////////
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
if (!StringUtils.isBlank(cycleTopicNames)) {
return cycleTopicNames;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if (!StringUtils.isBlank(globalTopicNames)) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveTopicNames(String cycleTopicNames) {
String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if (!StringUtils.isBlank(name))
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
if (!StringUtils.isBlank(cycleTopicsPattern)) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if (!StringUtils.isBlank(globalTopicsPattern)) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicsPatternStr))
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
} catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
if (!StringUtils.isBlank(cycleSubscriptionName)) {
return cycleSubscriptionName;
}
String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName();
if (!StringUtils.isBlank(globalSubscriptionName)) {
return globalSubscriptionName;
}
throw new RuntimeException("Consumer::Subscription name must be set at either global level or cycle level!");
}
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
if (!StringUtils.isBlank(cycleSubscriptionType)) {
return cycleSubscriptionType;
}
String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType();
if (!StringUtils.isBlank(globalSubscriptionType)) {
return globalSubscriptionType;
}
return "";
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
SubscriptionType subscriptionType = SubscriptionType.Exclusive;
if (!StringUtils.isBlank(effectiveSubscriptionStr)) {
if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr)) {
throw new RuntimeException("Consumer::Invalid subscription type: " + effectiveSubscriptionStr);
} else {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
}
}
return subscriptionType;
}
private String getEffectiveConsumerName(String cycleConsumerName) {
if (!StringUtils.isBlank(cycleConsumerName)) {
return cycleConsumerName;
}
String globalConsumerName = pulsarNBClientConf.getConsumerName();
if (!StringUtils.isBlank(globalConsumerName)) {
return globalConsumerName;
}
return "";
}
public Consumer<?> getConsumer(String cycleTopicUri,
String cycleTopicNames,
String cycleTopicsPattern,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if (StringUtils.isBlank(cycleTopicUri) && topicNames.isEmpty() && (topicsPattern == null)) {
throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
}
String encodedStr;
// precedence sequence:
// topic_names (consumer statement param) >
// topics_pattern (consumer statement param) >
// topic_uri (document level param)
if (!topicNames.isEmpty()) {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
StringUtils.join(topicNames, "|"));
} else if (topicsPattern != null) {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
topicsPatternStr);
} else {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
cycleTopicUri);
}
Consumer<?> consumer = consumers.get(encodedStr);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
// Explicit topic names will take precedence over topics pattern
if (!topicNames.isEmpty()) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames);
} else if (topicsPattern != null) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
getEffectiveTopicPattern(cycleTopicsPattern));
} else {
topicNames.add(cycleTopicUri);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames);
}
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
if (!StringUtils.isBlank(consumerName)) {
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
}
try {
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar consumer!");
}
consumers.put(encodedStr, consumer);
}
return consumer;
}
//////////////////////////////////////
// Consumer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Reader Processing --> Start
//////////////////////////////////////
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
if (!StringUtils.isBlank(cycleReaderTopicName)) {
return cycleReaderTopicName;
}
String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName();
if (!StringUtils.isBlank(globalReaderTopicName)) {
return globalReaderTopicName;
}
throw new RuntimeException("Reader topic name must be set at either global level or cycle level!");
}
private String getEffectiveReaderName(String cycleReaderName) {
if (!StringUtils.isBlank(cycleReaderName)) {
return cycleReaderName;
}
String globalReaderName = pulsarNBClientConf.getConsumerName();
if (!StringUtils.isBlank(globalReaderName)) {
return globalReaderName;
}
return "";
}
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
if (!StringUtils.isBlank(cycleStartMsgPosStr)) {
return cycleStartMsgPosStr;
}
String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr();
if (!StringUtils.isBlank(globalStartMsgPosStr)) {
return globalStartMsgPosStr;
}
return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
}
public Reader<?> getReader(String cycleTopicName,
String cycleReaderName,
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
if (StringUtils.isBlank(topicName)) {
throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level");
}
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
if (!PulsarActivityUtil.isValideReaderStartPosition(startMsgPosStr)) {
throw new RuntimeException("Reader:: Invalid value for Reader start message position!");
}
String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(encodedStr);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
if (!StringUtils.isBlank(readerName)) {
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
}
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
try {
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
MessageId startMsgId = MessageId.latest;
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
startMsgId = MessageId.earliest;
}
//TODO: custom start message position is NOT supported yet
//else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) {
// startMsgId = MessageId.latest;
//}
if (startMsgId != null) {
readerBuilder = readerBuilder.startMessageId(startMsgId);
}
reader = readerBuilder.create();
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(encodedStr, reader);
}
return reader;
}
//////////////////////////////////////
// Reader Processing <-- end
//////////////////////////////////////
}

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
@ -23,20 +24,8 @@ public class PulsarSpaceCache {
}
public PulsarSpace getPulsarSpace(String name) {
String pulsarClientType = activity.getPulsarConf().getPulsarClientType();
if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString())) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarProducerSpace(spaceName, activity.getPulsarConf()));
} else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString())) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarConsumerSpace(spaceName, activity.getPulsarConf()));
} else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString())) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarReaderSpace(spaceName, activity.getPulsarConf()));
}
// TODO: add support for websocket-producer and managed-ledger
else {
throw new RuntimeException("Unsupported Pulsar client type: " + pulsarClientType);
}
return clientScopes.computeIfAbsent(name, spaceName ->
new PulsarSpace(spaceName, activity.getPulsarConf(), activity.getPulsarServiceUrl()));
}
public PulsarActivity getActivity() {

View File

@ -0,0 +1,17 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
public class PulsarBatchProducerEndMapper extends PulsarOpMapper {
public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace) {
super(cmdTpl, clientSpace);
}
@Override
public PulsarOp apply(long value) {
return new PulsarBatchProducerEndOp();
}
}

View File

@ -0,0 +1,36 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerEndOp implements PulsarOp {
@Override
public void run() {
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();
Producer<?> producer = PulsarBatchProducerStartOp.threadLocalProducer.get();
if ((container != null) && (!container.isEmpty())) {
try {
// producer.flushAsync().get();
FutureUtil.waitForAll(container).get();
} catch (Exception e) {
throw new RuntimeException("Batch Producer:: failed to send (some of) the batched messages!");
}
container.clear();
PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.set(null);
} else {
throw new BasicError("You tried to end an empty batch message container. This means you" +
" did initiate the batch container properly, or there is an error in your" +
" pulsar op sequencing and ratios.");
}
}
}

View File

@ -0,0 +1,33 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
public class PulsarBatchProducerMapper extends PulsarOpMapper {
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
public PulsarBatchProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc) {
super(cmdTpl, clientSpace);
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
}
@Override
public PulsarOp apply(long value) {
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
return new PulsarBatchProducerOp(
clientSpace.getPulsarSchema(),
msgKey,
msgPayload
);
}
}

View File

@ -0,0 +1,61 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp implements PulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;
private final String msgPayload;
public PulsarBatchProducerOp(Schema<?> schema,
String key,
String payload) {
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
}
@Override
public void run() {
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();
Producer<?> producer = PulsarBatchProducerStartOp.threadLocalProducer.get();
assert (producer != null) && (container != null);
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ((msgKey != null) && (!msgKey.isEmpty())) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema,
pulsarSchema.getSchemaInfo().getSchemaDefinition(),
msgPayload
);
typedMessageBuilder = typedMessageBuilder.value(payload);
} else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
container.add(typedMessageBuilder.sendAsync());
}
}

View File

@ -0,0 +1,26 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
public class PulsarBatchProducerStartMapper extends PulsarOpMapper {
private final LongFunction<Producer<?>> batchProducerFunc;
public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Producer<?>> batchProducerFunc) {
super(cmdTpl, clientSpace);
this.batchProducerFunc = batchProducerFunc;
}
@Override
public PulsarOp apply(long value) {
Producer<?> batchProducer = batchProducerFunc.apply(value);
return new PulsarBatchProducerStartOp(batchProducer);
}
}

View File

@ -0,0 +1,33 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.commons.compress.utils.Lists;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerStartOp implements PulsarOp {
// TODO: ensure sane container lifecycle management
public final static ThreadLocal<List<CompletableFuture<MessageId>>> threadLocalBatchMsgContainer = new ThreadLocal<>();
public final static ThreadLocal<Producer<?>> threadLocalProducer = new ThreadLocal<>();
public PulsarBatchProducerStartOp(Producer<?> batchProducer) {
threadLocalProducer.set(batchProducer);
}
@Override
public void run() {
List<CompletableFuture<MessageId>> container = threadLocalBatchMsgContainer.get();
if (container == null) {
container = Lists.newArrayList();
threadLocalBatchMsgContainer.set(container);
} else {
throw new BasicError("You tried to create a batch message container where one was already" +
" defined. This means you did not flush and unset the last container, or there is an error in your" +
" pulsar op sequencing and ratios.");
}
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
@ -16,22 +17,28 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarConsumerMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final Schema<?> pulsarSchema;
public class PulsarConsumerMapper extends PulsarOpMapper {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> asyncApiFunc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
Schema<?> pulsarSchema,
LongFunction<Consumer<?>> consumerFunc) {
this.cmdTpl = cmdTpl;
this.pulsarSchema = pulsarSchema;
PulsarSpace clientSpace,
LongFunction<Consumer<?>> consumerFunc,
LongFunction<Boolean> asyncApiFunc) {
super(cmdTpl, clientSpace);
this.consumerFunc = consumerFunc;
this.asyncApiFunc = asyncApiFunc;
}
@Override
public PulsarOp apply(long value) {
Consumer<?> consumer = consumerFunc.apply(value);
return new PulsarConsumerOp(consumer, pulsarSchema);
boolean asyncApi = asyncApiFunc.apply(value);
return new PulsarConsumerOp(
consumer,
clientSpace.getPulsarSchema(),
asyncApi
);
}
}

View File

@ -3,23 +3,20 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
public class PulsarConsumerOp implements PulsarOp {
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema) {
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp) {
this.consumer = consumer;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
}
@Override
public void run() {
public void syncConsume() {
try {
Message<?> message = consumer.receive();
@ -35,9 +32,20 @@ public class PulsarConsumerOp implements PulsarOp {
}
consumer.acknowledge(message.getMessageId());
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
public void asyncConsume() {
//TODO: add support for async consume
}
@Override
public void run() {
if (!asyncPulsarOp)
syncConsume();
else
asyncConsume();
}
}

View File

@ -0,0 +1,19 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import java.util.function.LongFunction;
public abstract class PulsarOpMapper implements LongFunction<PulsarOp> {
protected final CommandTemplate cmdTpl;
protected final PulsarSpace clientSpace;
public PulsarOpMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace) {
this.cmdTpl = cmdTpl;
this.clientSpace = clientSpace;
}
}

View File

@ -17,21 +17,21 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarProducerMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final Schema<?> pulsarSchema;
public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
public PulsarProducerMapper(CommandTemplate cmdTpl,
Schema<?> pulsarSchema,
PulsarSpace clientSpace,
LongFunction<Producer<?>> producerFunc,
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc) {
this.cmdTpl = cmdTpl;
this.pulsarSchema = pulsarSchema;
super(cmdTpl, clientSpace);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
}
@ -39,9 +39,15 @@ public class PulsarProducerMapper implements LongFunction<PulsarOp> {
@Override
public PulsarOp apply(long value) {
Producer<?> producer = producerFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
return new PulsarProducerOp(producer, pulsarSchema, msgKey, msgPayload);
return new PulsarProducerOp(
producer,
clientSpace.getPulsarSchema(),
asyncApi,
msgKey,
msgPayload);
}
}

View File

@ -1,58 +1,86 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarAction;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
public class PulsarProducerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarProducerOp.class);
private final Producer<?> producer;
private final Schema<?> pulsarSchema;
private final String msgKey;
private final String msgPayload;
private final boolean asyncPulsarOp;
public PulsarProducerOp(Producer<?> producer, Schema<?> schema, String key, String payload) {
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
String key,
String payload) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
}
@Override
public void run() {
try {
if ( (msgPayload == null) || msgPayload.isEmpty() ) {
throw new RuntimeException("Message payload (\"msg-value\" can't be empty!");
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ((msgKey != null) && (!msgKey.isEmpty())) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema,
pulsarSchema.getSchemaInfo().getSchemaDefinition(),
msgPayload
);
typedMessageBuilder = typedMessageBuilder.value(payload);
} else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
//TODO: add error handling with failed message production
if (!asyncPulsarOp) {
try {
logger.trace("sending message");
typedMessageBuilder.send();
} catch (PulsarClientException pce) {
logger.trace("failed sending message");
throw new RuntimeException(pce);
}
} else {
try {
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
future.get();
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ( (msgKey != null) && (!msgKey.isEmpty()) ) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
/*.thenRun(() -> {
// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload);
}).exceptionally(ex -> {
System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
return ex;
})*/
} catch (Exception e) {
throw new RuntimeException(e);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, msgPayload);
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema, avroGenericRecord);
typedMessageBuilder = typedMessageBuilder.value(payload);
}
else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
typedMessageBuilder.send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,27 +1,35 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import java.util.function.LongFunction;
public class PulsarReaderMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final Schema<?> pulsarSchema;
public class PulsarReaderMapper extends PulsarOpMapper {
private final LongFunction<Reader<?>> readerFunc;
private final LongFunction<Boolean> asyncApiFunc;
public PulsarReaderMapper(CommandTemplate cmdTpl,
Schema<?> pulsarSchema,
LongFunction<Reader<?>> readerFunc) {
this.cmdTpl = cmdTpl;
this.pulsarSchema = pulsarSchema;
PulsarSpace clientSpace,
LongFunction<Reader<?>> readerFunc,
LongFunction<Boolean> asyncApiFunc) {
super(cmdTpl, clientSpace);
this.readerFunc = readerFunc;
this.asyncApiFunc = asyncApiFunc;
}
@Override
public PulsarOp apply(long value) {
Reader<?> reader = readerFunc.apply(value);
return new PulsarReaderOp(reader, pulsarSchema);
boolean asyncApi = asyncApiFunc.apply(value);
return new PulsarReaderOp(
reader,
clientSpace.getPulsarSchema(),
asyncApi
);
}
}

View File

@ -11,14 +11,15 @@ import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp implements PulsarOp {
private final Reader<?> reader;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
public PulsarReaderOp(Reader<?> reader, Schema<?> schema) {
public PulsarReaderOp(Reader<?> reader, Schema<?> schema, boolean asyncPulsarOp) {
this.reader = reader;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
}
@Override
public void run() {
public void syncRead() {
try {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
@ -40,4 +41,16 @@ public class PulsarReaderOp implements PulsarOp {
throw new RuntimeException(e);
}
}
public void asyncRead() {
//TODO: add support for async read
}
@Override
public void run() {
if (!asyncPulsarOp)
syncRead();
else
asyncRead();
}
}

View File

@ -1,36 +1,36 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.PulsarSpaceCache;
import io.nosqlbench.driver.pulsar.*;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
public class ReadyPulsarOp implements LongFunction<PulsarOp> {
private final OpTemplate opTpl;
private final CommandTemplate cmdTpl;
private final PulsarSpace clientSpace;
private final LongFunction<PulsarOp> opFunc;
private final PulsarSpaceCache pcache;
private final Schema pulsarSchema;
// TODO: Add docs for the command template with respect to the OpTemplate
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) {
// TODO: Consider parsing map structures into equivalent binding representation
this.opTpl = opTemplate;
this.cmdTpl = new CommandTemplate(opTemplate);
if (cmdTpl.isDynamic("op_scope")) {
throw new RuntimeException("op_scope must be static");
}
this.pcache = pcache;
// TODO: At the moment, only supports static "client"
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isDynamic("client")) {
@ -43,40 +43,38 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
this.clientSpace = pcache.getPulsarSpace("default");
}
this.pulsarSchema = clientSpace.getPulsarSchema();
this.opFunc = resolve();
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
}
@Override
public PulsarOp apply(long value) {
return opFunc.apply(value);
}
private boolean isBoolean(String str) {
return StringUtils.equalsAnyIgnoreCase(str, "yes", "true");
}
private LongFunction<PulsarOp> resolve() {
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
LongFunction<String> cycle_producer_name_func;
if (cmdTpl.isStatic("producer-name")) {
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer-name");
} else if (cmdTpl.isDynamic("producer-name")) {
cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer-name", l);
} else {
cycle_producer_name_func = (l) -> null;
}
LongFunction<String> topic_uri_func;
// Global parameter: topic_uri
LongFunction<String> topicUriFunc;
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topic_uri_func = (l) -> cmdTpl.getStatic("topic_uri");
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l);
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
}
else if (cmdTpl.containsKey("topic")) {
} else if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
@ -86,73 +84,252 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topic_uri_func = (l) -> composited;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topic_uri_func = (l) ->
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
}
else {
topic_uri_func = (l) -> null;
}
assert (clientSpace != null);
String clientType = clientSpace.getPulsarClientConf().getPulsarClientType();
// TODO: At the moment, only implements "Producer" functionality; add implementation for others later!
if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) {
return resolveProducer(clientSpace, cmdTpl, cycle_producer_name_func, topic_uri_func);/*
} else if ( msgOperation.equalsIgnoreCase(PulsarActivityUtil.MSGOP_TYPES.CONSUMER.toString()) ) {
return resolveConsumer(spaceFunc, cmdTpl, topic_uri_func);
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.READER.toString()) ) {
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.WSOKT_PRODUCER.toString()) ) {
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.MANAGED_LEDGER.toString()) ) {
*/
} else {
throw new RuntimeException("Unsupported Pulsar message operation type.");
topicUriFunc = (l) -> null;
}
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc;
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
} else {
asyncApiFunc = (l) -> false;
}
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
// TODO: Complete implementation for websocket-producer and managed-ledger
if /*( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_TENANT.label) ) {
return resolveCreateTenant(clientSpace);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_NAMESPACE.label) ) {
return resolveCreateNameSpace(clientSpace);
} else if*/ (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) {
return resolveMsgBatchSendStart(clientSpace, topicUriFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) {
return resolveMsgBatchSend(clientSpace);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) {
return resolveMsgBatchSendEnd(clientSpace);
} else {
throw new RuntimeException("Unsupported Pulsar operation type");
}
}
private LongFunction<PulsarOp> resolveProducer(
PulsarSpace pulsarSpace,
CommandTemplate cmdTpl,
LongFunction<String> cycle_producer_name_func,
LongFunction<String> topic_uri_func
private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
LongFunction<String> cycle_producer_name_func;
if (cmdTpl.isStatic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name");
} else if (cmdTpl.isDynamic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer_name", l);
} else {
cycle_producer_name_func = (l) -> null;
}
LongFunction<Producer<?>> producerFunc =
(l) -> pulsarSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l));
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l));
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg-key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg-key");
} else if (cmdTpl.isDynamic("msg-key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg-key", l);
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
} else if (cmdTpl.isDynamic("msg_key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l);
} else {
keyFunc = (l) -> null;
}
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg-value")) {
if (cmdTpl.isStatic("msg-value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg-value");
} else if (cmdTpl.isDynamic("msg-value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg-value", l);
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg_value");
} else if (cmdTpl.isDynamic("msg_value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l);
} else {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("\"msg-value\" field must be specified!");
throw new RuntimeException("Producer:: \"msg_value\" field must be specified!");
}
return new PulsarProducerMapper(producerFunc, keyFunc, valueFunc, pulsarSchema, cmdTpl);
return new PulsarProducerMapper(
cmdTpl,
clientSpace,
producerFunc,
async_api_func,
keyFunc,
valueFunc);
}
@Override
public PulsarOp apply(long value) {
PulsarOp op = opFunc.apply(value);
return op;
private LongFunction<PulsarOp> resolveMsgConsume(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
if (cmdTpl.isStatic("topic-names")) {
topic_names_func = (l) -> cmdTpl.getStatic("topic-names");
} else if (cmdTpl.isDynamic("topic-names")) {
topic_names_func = (l) -> cmdTpl.getDynamic("topic-names", l);
} else {
topic_names_func = (l) -> null;
}
// Topic pattern (multi-topic)
LongFunction<String> topics_pattern_func;
if (cmdTpl.isStatic("topics-pattern")) {
topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern");
} else if (cmdTpl.isDynamic("topics-pattern")) {
topics_pattern_func = (l) -> cmdTpl.getDynamic("topics-pattern", l);
} else {
topics_pattern_func = (l) -> null;
}
LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription-name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription-name");
} else if (cmdTpl.isDynamic("subscription-name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription-name", l);
} else {
subscription_name_func = (l) -> null;
}
LongFunction<String> subscription_type_func;
if (cmdTpl.isStatic("subscription-type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription-type");
} else if (cmdTpl.isDynamic("subscription-type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription-type", l);
} else {
subscription_type_func = (l) -> null;
}
LongFunction<String> consumer_name_func;
if (cmdTpl.isStatic("consumer-name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer-name");
} else if (cmdTpl.isDynamic("consumer-name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer-name", l);
} else {
consumer_name_func = (l) -> null;
}
LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer(
topic_uri_func.apply(l),
topic_names_func.apply(l),
topics_pattern_func.apply(l),
subscription_name_func.apply(l),
subscription_type_func.apply(l),
consumer_name_func.apply(l)
);
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func);
}
private LongFunction<PulsarOp> resolveMsgRead(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
LongFunction<String> reader_name_func;
if (cmdTpl.isStatic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader-name");
} else if (cmdTpl.isDynamic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l);
} else {
reader_name_func = (l) -> null;
}
LongFunction<String> start_msg_pos_str_func;
if (cmdTpl.isStatic("start-msg-position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position");
} else if (cmdTpl.isDynamic("start-msg-position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l);
} else {
start_msg_pos_str_func = (l) -> null;
}
LongFunction<Reader<?>> readerFunc = (l) ->
clientSpace.getReader(
topic_uri_func.apply(l),
reader_name_func.apply(l),
start_msg_pos_str_func.apply(l)
);
return new PulsarReaderMapper(cmdTpl, clientSpace, readerFunc, async_api_func);
}
private LongFunction<PulsarOp> resolveMsgBatchSendStart(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func
) {
LongFunction<String> cycle_batch_producer_name_func;
if (cmdTpl.isStatic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getStatic("batch_producer_name");
} else if (cmdTpl.isDynamic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getDynamic("batch_producer_name", l);
} else {
cycle_batch_producer_name_func = (l) -> null;
}
LongFunction<Producer<?>> batchProducerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l));
return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, batchProducerFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSend(PulsarSpace clientSpace) {
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
} else if (cmdTpl.isDynamic("msg_key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l);
} else {
keyFunc = (l) -> null;
}
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg_value");
} else if (cmdTpl.isDynamic("msg_value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l);
} else {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("Batch Producer:: \"msg_value\" field must be specified!");
}
return new PulsarBatchProducerMapper(
cmdTpl,
clientSpace,
keyFunc,
valueFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSendEnd(PulsarSpace clientSpace) {
return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace);
}
}

View File

@ -80,6 +80,12 @@ public class AvroUtil {
return recordBuilder.build();
}
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDef, String jsonData) {
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
}
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) {
GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);

View File

@ -23,22 +23,25 @@ public class PulsarActivityUtil {
private final static Logger logger = LogManager.getLogger(PulsarActivityUtil.class);
// Supported message operation types
public enum CLIENT_TYPES {
PRODUCER("producer"),
CONSUMER("consumer"),
READER("reader"),
WSOKT_PRODUCER("websocket-producer"),
MANAGED_LEDGER("managed-ledger")
;
// TODO: websocket-producer and managed-ledger
public enum OP_TYPES {
CREATE_TENANT("create-tenant"),
CREATE_NAMESPACE("create-namespace"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
MSG_SEND("msg-send"),
MSG_CONSUME("msg-consume"),
MSG_READ("msg-read");
public final String label;
CLIENT_TYPES(String label) {
OP_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidClientType(String type) {
return Arrays.stream(CLIENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
return Arrays.stream(OP_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
}
@ -50,7 +53,6 @@ public class PulsarActivityUtil {
;
public final String label;
PERSISTENT_TYPES(String label) {
this.label = label;
}
@ -87,7 +89,6 @@ public class PulsarActivityUtil {
;
public final String label;
CLNT_CONF_KEY(String label) {
this.label = label;
}
@ -120,7 +121,6 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isStandardProducerConfItem(String item) {
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
@ -162,6 +162,23 @@ public class PulsarActivityUtil {
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
public enum SUBSCRIPTION_TYPE {
exclusive("exclusive"),
failover("failover"),
shared("shared"),
key_shared("key_shared");
public final String label;
SUBSCRIPTION_TYPE(String label) {
this.label = label;
}
}
public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Standard reader configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#reader
@ -182,7 +199,6 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isStandardReaderConfItem(String item) {
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
@ -213,6 +229,10 @@ public class PulsarActivityUtil {
}
}
public static boolean isValideReaderStartPosition(String item) {
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Valid websocket-producer configuration (activity-level settings)
// TODO: to be added
@ -233,7 +253,6 @@ public class PulsarActivityUtil {
;
public final String label;
MANAGED_LEDGER_CONF_KEY(String label) {
this.label = label;
}
@ -262,7 +281,6 @@ public class PulsarActivityUtil {
return isPrimitive;
}
public static Schema<?> getPrimitiveTypeSchema(String typeStr) {
Schema<?> schema;
@ -331,7 +349,6 @@ public class PulsarActivityUtil {
}
return isAvroType;
}
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr;
String filePrefix = "file://";

View File

@ -7,7 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -23,13 +23,11 @@ public class PulsarNBClientConf {
private String canonicalFilePath = "";
public static final String DRIVER_CONF_PREFIX = "driver";
public static final String SCHEMA_CONF_PREFIX = "schema";
public static final String CLIENT_CONF_PREFIX = "client";
public static final String PRODUCER_CONF_PREFIX = "producer";
public static final String CONSUMER_CONF_PREFIX = "consumer";
public static final String READER_CONF_PREFIX = "reader";
private final HashMap<String, Object> driverConfMap = new HashMap<>();
private final HashMap<String, Object> schemaConfMap = new HashMap<>();
private final HashMap<String, Object> clientConfMap = new HashMap<>();
private final HashMap<String, Object> producerConfMap = new HashMap<>();
@ -53,14 +51,6 @@ public class PulsarNBClientConf {
Configuration config = builder.getConfiguration();
// Get driver specific configuration settings
for (Iterator<String> it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if (!StringUtils.isBlank(confVal))
driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
}
// Get schema specific configuration settings
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
@ -110,50 +100,11 @@ public class PulsarNBClientConf {
}
//////////////////
// Get NB Driver related config
public Map<String, Object> getDriverConfMap() {
return this.driverConfMap;
}
public boolean hasDriverConfKey(String key) {
if (key.contains(DRIVER_CONF_PREFIX))
return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length() + 1));
else
return driverConfMap.containsKey(key);
}
public Object getDriverConfValue(String key) {
if (key.contains(DRIVER_CONF_PREFIX))
return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length() + 1));
else
return driverConfMap.get(key);
}
public void setDriverConfValue(String key, Object value) {
if (key.contains(DRIVER_CONF_PREFIX))
driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length() + 1), value);
else
driverConfMap.put(key, value);
}
// other driver helper functions ...
public String getPulsarClientType() {
Object confValue = getDriverConfValue("driver.client-type");
// If not explicitly specifying Pulsar client type, "producer" is the default type
if (confValue == null)
return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString();
else
return confValue.toString();
}
//////////////////
// Get Schema related config
public Map<String, Object> getSchemaConfMap() {
return this.schemaConfMap;
}
public boolean hasSchemaConfKey(String key) {
if (key.contains(SCHEMA_CONF_PREFIX))
return schemaConfMap.containsKey(key.substring(SCHEMA_CONF_PREFIX.length() + 1));
@ -166,7 +117,6 @@ public class PulsarNBClientConf {
else
return schemaConfMap.get(key);
}
public void setSchemaConfValue(String key, Object value) {
if (key.contains(SCHEMA_CONF_PREFIX))
schemaConfMap.put(key.substring(SCHEMA_CONF_PREFIX.length() + 1), value);
@ -180,7 +130,6 @@ public class PulsarNBClientConf {
public Map<String, Object> getClientConfMap() {
return this.clientConfMap;
}
public boolean hasClientConfKey(String key) {
if (key.contains(CLIENT_CONF_PREFIX))
return clientConfMap.containsKey(key.substring(CLIENT_CONF_PREFIX.length() + 1));
@ -193,7 +142,6 @@ public class PulsarNBClientConf {
else
return clientConfMap.get(key);
}
public void setClientConfValue(String key, Object value) {
if (key.contains(CLIENT_CONF_PREFIX))
clientConfMap.put(key.substring(CLIENT_CONF_PREFIX.length() + 1), value);
@ -207,7 +155,6 @@ public class PulsarNBClientConf {
public Map<String, Object> getProducerConfMap() {
return this.producerConfMap;
}
public boolean hasProducerConfKey(String key) {
if (key.contains(PRODUCER_CONF_PREFIX))
return producerConfMap.containsKey(key.substring(PRODUCER_CONF_PREFIX.length() + 1));
@ -226,7 +173,6 @@ public class PulsarNBClientConf {
else
producerConfMap.put(key, value);
}
// other producer helper functions ...
public String getProducerName() {
Object confValue = getProducerConfValue("producer.producerName");
@ -235,7 +181,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getProducerTopicName() {
Object confValue = getProducerConfValue("producer.topicName");
if (confValue == null)
@ -250,28 +195,24 @@ public class PulsarNBClientConf {
public Map<String, Object> getConsumerConfMap() {
return this.consumerConfMap;
}
public boolean hasConsumerConfKey(String key) {
if (key.contains(CONSUMER_CONF_PREFIX))
return consumerConfMap.containsKey(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
else
return consumerConfMap.containsKey(key);
}
public Object getConsumerConfValue(String key) {
if (key.contains(CONSUMER_CONF_PREFIX))
return consumerConfMap.get(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
else
return consumerConfMap.get(key);
}
public void setConsumerConfValue(String key, Object value) {
if (key.contains(CONSUMER_CONF_PREFIX))
consumerConfMap.put(key.substring(CONSUMER_CONF_PREFIX.length() + 1), value);
else
consumerConfMap.put(key, value);
}
// Other consumer helper functions ...
public String getConsumerTopicNames() {
Object confValue = getConsumerConfValue("consumer.topicNames");
@ -280,7 +221,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getConsumerTopicPattern() {
Object confValue = getConsumerConfValue("consumer.topicsPattern");
if (confValue == null)
@ -288,7 +228,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getConsumerSubscriptionName() {
Object confValue = getConsumerConfValue("consumer.subscriptionName");
if (confValue == null)
@ -296,7 +235,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getConsumerSubscriptionType() {
Object confValue = getConsumerConfValue("consumer.subscriptionType");
if (confValue == null)
@ -304,7 +242,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getConsumerName() {
Object confValue = getConsumerConfValue("consumer.consumerName");
if (confValue == null)
@ -319,28 +256,24 @@ public class PulsarNBClientConf {
public Map<String, Object> getReaderConfMap() {
return this.readerConfMap;
}
public boolean hasReaderConfKey(String key) {
if (key.contains(READER_CONF_PREFIX))
return readerConfMap.containsKey(key.substring(READER_CONF_PREFIX.length() + 1));
else
return readerConfMap.containsKey(key);
}
public Object getReaderConfValue(String key) {
if (key.contains(READER_CONF_PREFIX))
return readerConfMap.get(key.substring(READER_CONF_PREFIX.length() + 1));
else
return readerConfMap.get(key);
}
public void setReaderConfValue(String key, Object value) {
if (key.contains(READER_CONF_PREFIX))
readerConfMap.put(key.substring(READER_CONF_PREFIX.length() + 1), value);
else
readerConfMap.put(key, value);
}
// Other consumer helper functions ...
public String getReaderTopicName() {
Object confValue = getReaderConfValue("reader.topicName");
@ -349,7 +282,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getReaderName() {
Object confValue = getReaderConfValue("reader.readerName");
if (confValue == null)
@ -357,7 +289,6 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public String getStartMsgPosStr() {
Object confValue = getReaderConfValue("reader.startMessagePos");
if (confValue == null)

View File

@ -1,9 +1,3 @@
### NB Pulsar driver related configuration - driver.xxx
driver.client-type=producer
driver.num-workers=1
# TODO: functionalities to be completed
driver.sync-mode=sync
driver.msg-recv-ouput=console
### Schema related configurations - schema.xxx
# valid types:
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
@ -15,15 +9,14 @@ driver.msg-recv-ouput=console
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# 2) Avro for messages with schema
schema.type=avro
schema.definition=file://<path>/<to>/<avro-definition-file>
schema.definition=file://<file/path/to/iot-example.avsc>
### Pulsar client related configurations - client.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
client.serviceUrl=pulsar://localhost:6650
client.connectionTimeoutMs=5000
### Producer related configurations (global) - producer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.producerName=
producer.topicName=persistent://public/default/mynbtest
producer.topicName=
producer.sendTimeoutMs=
### Consumer related configurations (global) - consumer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
@ -36,7 +29,7 @@ consumer.receiverQueueSize=
### Reader related configurations (global) - reader.xxx
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>
reader.topicName=persistent://public/default/nbpulsar
reader.topicName=
reader.receiverQueueSize=
reader.readerName=
#reader.startMessagePos = earliest
reader.startMessagePos=earliest

View File

@ -7,55 +7,88 @@ description: |
bindings:
mykey: NumberNameToString();
sensor_id: ToUUID();ToString();
# sensor_type:
# sensor_type:
reading_time: ToDateTime();
reading_value: ToFloat(100);
topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L));
# document level parameters that apply to all Pulsar client types:
params:
#topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
async_api: "false"
blocks:
# - create-tenant-namespace:
# tags:
# type: create-tenant-namespace
# statements:
# tenant: {tenant}
# namespace: {namespace}
- name: admin-block
tags:
phase: create-tenant-namespace
statements:
- name: s1
optype: create-tenant
tenant: "{tenant}"
- name: s2
optype: create-namespace
namespace: "{namespace}"
- name: batch-producer-block
tags:
phase: batch-producer
statements:
- name: s1
optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start
# batch_producer_name: {batch_producer_name}
ratio: 1
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
ratio: 100
- name: s3
optype: batch-msg-send-end
ratio: 1
- name: producer-block
tags:
op-type: producer
phase: producer
statements:
- producer-stuff:
#######
# NOTE: tenant and namespace must be static and pre-exist in Pulsar first
# topic_uri: "[persistent|non-persistent]://<tenant>/<namespace>/<topic>"
# topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
# producer-name:
msg-key: "{mykey}"
msg-value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
- name: s1
optype: msg-send
# producer_name: {producer_name}
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
- name: consumer-block
tags:
op-type: consumer
phase: consumer
statements:
- consumer-stuff:
topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest"
topics-pattern: "public/default/.*"
subscription-name:
subscription-type:
consumer-name:
- name: s1
optype: msg-consume
topic_names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest"
topics_pattern: "public/default/.*"
subscription_name:
subscription_type:
consumer_name:
- reader:
- name: reader-block
tags:
op-type: reader
phase: reader
statements:
- reader-stuff:
- name: s1
optype: msg-read
reader_name:
# - websocket-producer:
# tags:
@ -68,5 +101,3 @@ blocks:
# type: managed-ledger
# statement:
# - managed-ledger-stuff:

View File

@ -1,13 +1,19 @@
- [1. NoSQLBench (NB) Pulsar Driver Overview](#1-nosqlbench-nb-pulsar-driver-overview)
- [1.1. Issues Tracker](#11-issues-tracker)
- [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings)
- [1.3. Pulsar Driver Yaml File: Statement Blocks](#13-pulsar-driver-yaml-file-statement-blocks)
- [1.3.1. Producer Statement block](#131-producer-statement-block)
- [1.3.2. Consumer Statement block](#132-consumer-statement-block)
- [1.4. Schema Support](#14-schema-support)
- [1.5. Activity Parameters](#15-activity-parameters)
- [1.6. Pulsar NB Execution Example](#16-pulsar-nb-execution-example)
- [2. Advanced Driver Features -- TODO: Design Revisit](#2-advanced-driver-features----todo-design-revisit)
- [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure)
- [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters)
- [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details)
- [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block)
- [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block)
- [1.4.3. Producer Command Block](#143-producer-command-block)
- [1.4.4. Consumer Command Block](#144-consumer-command-block)
- [1.4.5. Reader Command Block](#145-reader-command-block)
- [1.5. Schema Support](#15-schema-support)
- [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters)
- [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example)
- [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties)
- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features)
- [2.1. Other Activity Parameters](#21-other-activity-parameters)
- [2.2. API Caching](#22-api-caching)
- [2.2.1. Instancing Controls](#221-instancing-controls)
@ -17,12 +23,10 @@
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
* Producer
* Consumer
* (Future) Reader
* Reader
* (Future) WebSocket Producer
* (Future) Managed Ledger
**NOTE**: At the moment, only Producer workload type is fully supported in NB. The support for Consumer type is partially added but not completed yet; and the support for other types of workloads will be added in NB in future releases.
## 1.1. Issues Tracker
If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
@ -36,15 +40,11 @@ When creating these objects (e.g. PulsarClient, Producer), there are different c
The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below:
```properties
### NB Pulsar driver related configuration - driver.xxx
driver.client-type = producer
### Schema related configurations - schema.xxx
schema.type = avro
schema.definition = file:///<path/to/avro/schema/definition/file>
### Pulsar client related configurations - client.xxx
client.serviceUrl = pulsar://<pulsar_broker_ip>:6650
client.connectionTimeoutMs = 5000
### Producer related configurations (global) - producer.xxx
@ -54,92 +54,402 @@ producer.sendTimeoutMs =
```
There are multiple sections in this file that correspond to different groups of configuration settings:
* **NB pulsar driver related settings**:
* All settings under this section starts with **driver.** prefix.
* Right now there is only valid option under this section:
* *driver.client-type* determines what type of Pulsar workload to be simulated by NB.
* **Schema related settings**:
* All settings under this section starts with **schema.** prefix.
* The NB Pulsar driver supports schema-based message publishing and consuming. This section defines configuration settings that are schema related.
* There are 2 valid options under this section.
* *shcema.type*: Pulsar message schema type. When unset or set as an empty string, Pulsar messages will be handled in raw *byte[]* format. The other valid option is **avro** which the Pulsar message will follow a specific Avro format.
* *schema.definition*: This only applies when an Avro schema type is specified and the value is the (full) file path that contains the Avro schema definition.
* *shcema.type*: Pulsar message schema type. When unset or set as
an empty string, Pulsar messages will be handled in raw *byte[]*
format. The other valid option is **avro** which the Pulsar
message will follow a specific Avro format.
* *schema.definition*: This only applies when an Avro schema type
is specified and the value is the (full) file path that contains
the Avro schema definition.
* **Pulsar Client related settings**:
* All settings under this section starts with **client.** prefix.
* This section defines all configuration settings that are related with defining a PulsarClient object.
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
* This section defines all configuration settings that are related
with defining a PulsarClient object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
* **Pulsar Producer related settings**:
* All settings under this section starts with **producer.** prefix.
* This section defines all configuration settings that are related with defining a Pulsar Producer object.
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
* All settings under this section starts with **producer** prefix.
* This section defines all configuration settings that are related
with defining a Pulsar Producer object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
* **Pulsar Consumer related settings**:
* All settings under this section starts with **consumer** prefix.
* This section defines all configuration settings that are related
with defining a Pulsar Consumer object.
*
See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
* **Pulsar Reader related settings**:
* All settings under this section starts with **reader** prefix.
* This section defines all configuration settings that are related
with defining a Pulsar Reader object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration sections in this file as well.
In the future, when the support for other types of Pulsar workloads is
added in NB Pulsar driver, there will be corresponding configuration
sections in this file as well.
## 1.3. Pulsar Driver Yaml File: Statement Blocks
## 1.3. NB Pulsar Driver Yaml File - High Level Structure
Just like other NB driver types, the actual Pulsar workload generation is determined by the statement blocks in the NB driver Yaml file. Depending on the Pulsar workload type, the corresponding statement block may have different contents.
Just like other NB driver types, the actual Pulsar workload generation is
determined by the statement blocks in the NB driver Yaml file. Depending
on the Pulsar workload type, the corresponding statement block may have
different contents.
### 1.3.1. Producer Statement block
At high level, Pulsar driver yaml file has the following structure:
An example of defining Pulsar **Producer** workload is as below:
* **description**: (optional) general description of the yaml file
* **bindings**: defines NB bindings
* **params**: document level Pulsar driver parameters that apply to all
command blocks. Currently there are two valid parameters:
* **topic_url**: Pulsar topic uri ([persistent|non-persistent]:
//<tenant>/<namespace>/<topic>). This can be statically assigned or
dynamically generated via NB bindings.
* **async_api**: Whether to use asynchronous Pulsar API (**note**:
more on this later)
* **blocks**: includes a series of command blocks. Each command block
defines one major Pulsar operation such as *producer*, *consumer*, etc.
Right now, the following command blocks are already supported or will be
added in the near future. We'll go through each of these command blocks
with more details in later sections.
* (future) **admin-block**: support for Pulsar Admin API, starting
with using NB to create tenants and namespaces.
* **batch-producer-block**: Pulsar batch producer
* **producer-block**: Pulsar producer
* **consumer-block**: Pulsar consumer
* **reader-block**: Pulsar reader
```yaml
description: |
... ...
bindings:
... ...
# global parameters that apply to all Pulsar client types:
params:
topic_uri: "<pulsar_topic_name>"
async_api: "false"
blocks:
- name: producer-block
tags:
type: producer
statements:
- producer-stuff:
# producer-name:
# topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
msg-key: "{mykey}"
msg-value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
- name: <command_block_1>
tags:
phase: <command_bock_filtering_identifier>
statements:
- name: <statement_name_1>
optype: <statement_filtering_identifier>
... <statement_specific_parameters> ...
- name: <statement_name_2>
... ...
- name: <command_block_2>
tags:
...
statements:
...
```
In the above statement block, there are 4 key statement parameters to provide values:
* **producer-name**: cycle-level Pulsar producer name (can be dynamically bound)
* **Optional**
* If not set, global level producer name in *config.properties* file will be used.
* Use a default producer name, "default", if it is neither set at global level.
* If set, cycle level producer name will take precedence over the global level setting
Each time when you execute the NB command, you can only choose one command
block to execute. This is achieved by applying a filtering condition
against **phase** tag, as below:
* **topic_uri**: cycle-level Pulsar topic name (can be dynamically bound)
* **Optional**
* If not set, global level topic_uri in *config.properties* file will be used
* Throw a Runtime Error if it is neither set at global level
* If set, cycle level topic_uri will take precedence over the global level setting; and the provided value must follow several guidelines:
* It must be in valid Pulsar topic format as below:
```
[persistent|non-persistent]://<tenant-name>/<namespace-name>/<short-topic-name>
```
* At the moment, only "**\<short-topic-name\>**" part can be dynamically bound (e.g. through NB binding function). All other parts must be static values and the corresponding tenants and namespaces must be created in the Pulsar cluster in advance.
```bash
<nb_cmd> driver=pulsar tags=phase:<command_bock_filtering_identifier> ...
```
**TODO**: allow dynamic binding for "\<tenant-name\>" and "
\<namespace-name\>" after adding a phase for creating "\<tenant-name\>"
and/or "\<namespace-name\>", similar to C* CQL schema creation phase!
An example of executing Pulsar producer/consumer API using NB is like
this:
* **msg-key**: Pulsar message key
* **Optional**
* If not set, the generated Pulsar messages (to be published by the Producer) doesn't have **keys**.
```bash
# producer
<nb_cmd> driver=pulsar tags=phase:producer ...
* **msg-value**: Pulsar message payload
* **Mandatory**
* If not set, throw a Runtime Error.
# consumer
<nb_cmd> driver=pulsar tags=phase:consumer ...
```
### 1.3.2. Consumer Statement block
### 1.3.1. NB Cycle Level Parameters vs. Global Level Parameters
**TBD ...**
Some parameters, especially topic name and producer/consumer/reader/etc.
name, can be set at the global level in **config.properties** file, or at
NB cycle level via **pulsar.yaml** file. An example of setting a topic
name in both levels is as below:
## 1.4. Schema Support
```bash
# Global level setting (config.properties):
producer.topicName = ...
# Cycle level setting (pulsar.yaml)
params:
topic_uri: ...
```
In theory, all Pulsar client settings can be made as cycle level settings
for maximum flexibility. But practically speaking (and also for simplicity
purposes), only the following parameters are made to be configurable at
both levels, listed by cycle level setting names with their corresponding
global level setting names:
* topic_uri (Mandatory)
* producer.topicName
* consumer.topicNames
* reader.topicName
* topic_names (Optional for Consumer)
* consumer.topicNames
* subscription_name (Mandatory for Consumer)
* consumer.subscriptionName
* subscription_type (Mandatory for Consumer, default to **exclusive**
type)
* consumer.subscriptionType
* topics_pattern (Optional for Consumer)
* consumer.topicsPattern
* producer_name (Optional)
* producer.producerName
* consumer_name (Optional)
* consumer.consumerName
* reader_name (Optional)
* reader.readerName
One key difference between setting a parameter at the global level vs. at
the cycle level is that the global level setting is always static and
stays the same for all NB cycle execution. The cycle level setting, on the
other side, can be dynamically bound and can be different from cycle to
cycle.
Because of this, setting these parameters at the NB cycle level allows us
to run Pulsar testing against multiple topics and/or multiple
producers/consumers/readers/etc all at once within one NB activity. This
makes the testing more flexible and effective.
**NOTE**: when a configuration is set at both the global level and the
cycle level, **the ycle level setting will take priority!**
## 1.4. Pulsar Driver Yaml File - Command Block Details
### 1.4.1. Pulsar Admin API Command Block
**NOTE**: this functionality is only partially implemented at the moment
and doesn't function yet.
Currently, the Pulsar Admin API Block is (planned) to only support
creating Pulsar tenants and namespaces. It has the following format:
```yaml
- name: admin-block
tags:
phase: create-tenant-namespace
statements:
- name: s1
optype: create-tenant
tenant: "{tenant}"
- name: s2
optype: create-namespace
namespace: "{namespace}"
```
In this command block, there are 2 statements (s1 and s2):
* Statement **s1** is used for creating a Pulsar tenant
* (Mandatory) **optype (create-tenant)** is the statement identifier
for this statement
* (Mandatory) **tenant** is the only statement parameter that
specifies the Pulsar tenant name which can either be dynamically
bound or statically assigned.
* Statement **s2** is used for creating a Pulsar namespace
* (Mandatory) **optype (create-namespace)** is the statement
identifier for this statement
* (Mandatory) **namespace** is the only statement parameter that
specifies the Pulsar namespace under the tenant created by statement
s1. Its name can either be dynamically bound or statically assigned.
### 1.4.2. Batch Producer Command Block
Batch producer command block is used to produce a batch of messages all at
once by one NB cycle execution. A typical format of this command block is
as below:
```yaml
- name: batch-producer-block
tags:
phase: batch-producer
statements:
- name: s1
optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start
batch_producer_name: {batch_producer_name}
ratio: 1
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
ratio: 100
- name: s3
optype: batch-msg-send-end
ratio: 1
```
This command block has 3 statements (s1, s2, and s3) with the following
ratios: 1, <batch_num>, 1.
* Statement **s1** is used to mark the start of a batch of message
production within one NB cycle
* (Mandatory) **optype (batch-msg-send-start)** is the statement
identifier for this statement
* (Optional) **batch_producer_name**, when provided, specifies the
Pulsar producer name that is associated with the batch production of
the messages.
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it
is default to 1.
* Statement **s2** is the core statement that generates the message key
and payload to be put in the batch.
* (Mandatory) **optype (batch-msg-send)** is the statement identifier
for this statement
* (Optional) **msg-key**, when provided, specifies the key of the
generated message
* (Mandatory) **msg-payload** specifies the payload of the generated
message
* (Optional) **ratio**, when provided, specifies the batch size (how
many messages to be put in one batch). If not provided, it is
default to 1.
* Statement **s3** is used to mark the end of a batch within one NB cycle
* (Mandatory) **optype (batch-msg-send-end)** is the statement
identifier for this statement
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it
is default to 1.
### 1.4.3. Producer Command Block
This is the regular Pulsar producer command block that produces one Pulsar
message per NB cycle execution. A typical format of this command block is
as below:
```yaml
- name: producer-block
tags:
phase: producer
statements:
- name: s1
optype: msg-send
# producer_name: {producer_name}
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
```
This command block only has 1 statements (s1):
* Statement **s1** is used to generate the key and payload for one message
* (Mandatory) **optype (msg-send)** is the statement identifier for
this statement
* (Optional) **producer_name**, when provided, specifies the Pulsar
producer name that is associated with the message production.
* (Optional) **msg-key**, when provided, specifies the key of the
generated message
* (Mandatory) **msg-payload** specifies the payload of the generated
message
### 1.4.4. Consumer Command Block
This is the regular Pulsar consumer command block that consumes one Pulsar
message per NB cycle execution. A typical format of this command block is
as below:
```yaml
- name: consumer-block
tags:
phase: consumer
statements:
- name: s1
optype: msg-consume
topic_names: "<pulsar_topic_1>, <pulsar_topic_2>"
# topics_pattern: "<pulsar_topic_regex_pattern>"
subscription_name:
subscription_type:
consumer_name:
```
This command block only has 1 statements (s1):
* Statement **s1** is used to consume one message from the Pulsar cluster
and acknowledge it.
* (Mandatory) **optype (msg-consume)** is the statement identifier for
this statement
* (Optional) **topic_names**, when provided, specifies multiple topic
names from which to consume messages. Default to document level
parameter **topic_uri**.
* (Optional) **topics_pattern**, when provided, specifies pulsar
topic regex pattern for multi-topic message consumption
* (Mandatory) **subscription_name** specifies subscription name.
* (Optional) **subscription_type**, when provided, specifies
subscription type. Default to **exclusive** subscription type.
* (Optional) **consumer_name**, when provided, specifies the
associated consumer name.
### 1.4.5. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar
message per NB cycle execution. A typical format of this command block is
as below:
```yaml
- name: reader-block
tags:
phase: reader
statements:
- name: s1
optype: msg-read
reader_name:
```
This command block only has 1 statements (s1):
* Statement **s1** is used to consume one message from the Pulsar cluster
and acknowledge it.
* (Mandatory) **optype (msg-consume)** is the statement identifier for
this statement
* (Optional) **reader_name**, when provided, specifies the associated
consumer name.
**TBD**: at the moment, the NB Pulsar driver Reader API only supports
reading from the following positions:
* MessageId.earliest
* MessageId.latest (default)
A customized reader starting position, as below, is NOT supported yet!
```java
byte[] msgIdBytes = // Some message ID byte array
MessageId id = MessageId.fromByteArray(msgIdBytes);
Reader reader = pulsarClient.newReader()
.topic(topic)
.startMessageId(id)
.create();
```
## 1.5. Schema Support
Pulsar has built-in schema support. Other than primitive types, Pulsar
also supports complex types like **Avro**, etc. At the moment, the NB
Pulsar driver provides 2 schema support modes, via the global level schema
related settings as below:
Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, etc. At the moment, the NB Pulsar driver provides 2 schema support modes, via the global level schema related settings as below:
* Avro schema:
```properties
shcema.type= avro
@ -151,10 +461,9 @@ Pulsar has built-in schema support. Other than primitive types, Pulsar also supp
schema.definition=
```
For the previous Producer block statement example, the **msg-value**
Take the previous Producer command block as an example, the **msg-value**
parameter has the value of a JSON string that follows the following Avro
schema definition (e.g. as in the sample schema definition
file: **[iot-example.asvc](activities/iot-example.avsc)**)
schema definition:
```json
{
"type": "record",
@ -169,43 +478,107 @@ file: **[iot-example.asvc](activities/iot-example.avsc)**)
}
```
## 1.5. Activity Parameters
## 1.6. NB Activity Execution Parameters
At the moment, the following Pulsar driver specific Activity Parameter is
supported:
At the moment, the following NB Pulsar driver **specific** activity
parameters are supported:
- * config=<file/path/to/global/configuration/properties/file>
* service_url=<pulsar_driver_url>
* config=<file/path/to/global/configuration/properties/file>
## 1.6. Pulsar NB Execution Example
Some other common NB activity parameters are listed as below. Please
reference to NB documentation for more parameters
```
<NB_Cmd> run type=pulsar -vv cycles=10 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
* driver=pulsar
* seq=concat (needed for **batch** producer)
* tags=phase:<command_block_identifier>
* threads=<NB_execution_thread_number>
* cycles=<total_NB_cycle_execution_number>
* --report-csv-to <metrics_output_dir_name>
## 1.7. NB Pulsar Driver Execution Example
1. Run Pulsar producer API to produce 100K messages using 100 NB threads
```bash
<nb_cmd> run driver=pulsar tags=phase:producer threads=100 cycles=100K config=<dir>/config.properties yaml=<dir>/pulsar.yaml
```
**NOTE**:
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads;
put NB execution metrics in a specified metrics folder
* An example of **config.properties** file is [here](activities/config.properties)
* An example of **pulsar.yaml** file is [here](activities/pulsar.yaml)
```bash
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
```
3. Run Pulsar consumer API to consume (and acknowledge) 100 messages using
one single NB thread.
```bash
<nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
```
## Appendix A. Template Global Setting File (config.properties)
```properties
schema.type =
schema.definition =
### Pulsar client related configurations - client.xxx
client.connectionTimeoutMs =
### Producer related configurations (global) - producer.xxx
producer.producerName =
producer.topicName =
producer.sendTimeoutMs =
### Consumer related configurations (global) - consumer.xxx
consumer.topicNames =
consumer.topicsPattern =
consumer.subscriptionName =
consumer.subscriptionType =
consumer.consumerName =
consumer.receiverQueueSize =
### Reader related configurations (global) - reader.xxx
reader.topicName =
reader.receiverQueueSize =
reader.readerName =
reader.startMessagePos =
```
---
# 2. Advanced Driver Features -- TODO: Design Revisit
# 2. TODO : Design Revisit -- Advanced Driver Features
**NOTE**: The following text is based on the original multi-layer API caching design which is not fully implemented at the moment. We need to revisit the original design at some point in order to achieve maximum testing flexibility.
**NOTE**: The following text is based on the original multi-layer API
caching design which is not fully implemented at the moment. We need to
revisit the original design at some point in order to achieve maximum
testing flexibility.
To summarize, the original caching design has the following key requirements:
To summarize, the original caching design has the following key
requirements:
* **Requirement 1**: Each NB Pulsar activity is able to launch and cache
multiple **client spaces**
* **Requirement 2**: Each client space can launch and cache multiple
Pulsar operators of the same type (producer, consumer, etc.)
* **Requirement 3**: The size of each Pulsar operator specific cached
space can be configurable.
In the current implementation, only requirement 2 is implemented. Regarding requirement 1, the current implementation only supports one client space per NB Pulsar activity!
In the current implementation, only requirement 2 is implemented.
* For requirement 1, the current implementation only supports one client
space per NB Pulsar activity
* For requirement 3, the cache space size is not configurable (no limit at
the moment)
## 2.1. Other Activity Parameters
- **url** - The pulsar url to connect to.
- **default** - `url=pulsar://localhost:6650`
- **maxcached** - A default value to be applied to `max_clients`,
`max_producers`, `max_consumers`.
- default: `max_cached=100`
@ -213,13 +586,9 @@ In the current implementation, only requirement 2 is implemented. Regarding requ
instances which are allowed to be cached in the NoSQLBench client
runtime. The clients cache automatically maintains a cache of unique
client instances internally. default: _maxcached_
- **max_producers** - Producers cache size (per client instance). Limits
the number of producer instances which are allowed to be cached per
client instance. default: _maxcached_
- **max_consumers** - Consumers cache size (per client instance). Limits
the number of consumer instances which are allowed to be cached per
client instance.
- **max_operators** - Producers/Consumers/Readers cache size (per client
instance). Limits the number of instances which are allowed to be cached
per client instance. default: _maxcached_
## 2.2. API Caching

View File

@ -1,3 +0,0 @@
# pulsar help topics
- pulsar