Pulsar Batch API, Async API (partial), yaml file structure,

This commit is contained in:
Yabin Meng 2021-03-06 10:38:08 -06:00
parent 305a6046b3
commit adf1ea03e6
25 changed files with 1037 additions and 648 deletions

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
@ -10,6 +11,7 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -24,14 +26,13 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer executeTimer;
private PulsarSpaceCache pulsarCache;
private NBErrorHandler errorhandler;
private PulsarNBClientConf clientConf;
private String serviceUrl;
private NBErrorHandler errorhandler;
private OpSequence<LongFunction<PulsarOp>> sequencer;
// private PulsarClient activityClient;
private Supplier<PulsarSpace> clientSupplier;
// private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
public PulsarActivity(ActivityDef activityDef) {
@ -48,6 +49,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile);
serviceUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
pulsarCache = new PulsarSpaceCache(this);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache));
@ -72,6 +75,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public PulsarNBClientConf getPulsarConf() {
return clientConf;
}
public String getPulsarServiceUrl() { return serviceUrl; }
public Timer getBindTimer() {
return bindTimer;

View File

@ -1,196 +0,0 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
public class PulsarConsumerSpace extends PulsarSpace {
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) { super(name, pulsarClientConf); }
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
if ( !StringUtils.isBlank(cycleTopicNames) ) {
return cycleTopicNames;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if ( !StringUtils.isBlank(globalTopicNames) ) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveTopicNames(String cycleTopicNames) {
String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if ( !StringUtils.isBlank(name) )
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
if ( !StringUtils.isBlank(cycleTopicsPattern) ) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if ( !StringUtils.isBlank(globalTopicsPattern) ) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
if ( !StringUtils.isBlank(effectiveTopicsPatternStr) )
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
}
catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
if ( !StringUtils.isBlank(cycleSubscriptionName) ) {
return cycleSubscriptionName;
}
String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName();
if ( !StringUtils.isBlank(globalSubscriptionName) ) {
return globalSubscriptionName;
}
return "default-subs";
}
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
if ( !StringUtils.isBlank(cycleSubscriptionType) ) {
return cycleSubscriptionType;
}
String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType();
if ( !StringUtils.isBlank(globalSubscriptionType) ) {
return globalSubscriptionType;
}
return "";
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
SubscriptionType subscriptionType;
try {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
}
catch (IllegalArgumentException iae) {
subscriptionType = SubscriptionType.Exclusive;
}
return subscriptionType;
}
private String getEffectiveConsumerName(String cycleConsumerName) {
if ( !StringUtils.isBlank(cycleConsumerName) ) {
return cycleConsumerName;
}
String globalConsumerName = pulsarNBClientConf.getConsumerName();
if ( !StringUtils.isBlank(globalConsumerName) ) {
return globalConsumerName;
}
return "default-cons";
}
public Consumer<?> getConsumer(String cycleTopicNames,
String cycleTopicsPattern,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if ( topicNames.isEmpty() && (topicsPattern == null) ) {
throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!");
}
String encodedStr;
if ( !topicNames.isEmpty() ) {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
StringUtils.join(topicNames, "|") );
}
else {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
topicsPatternStr );
}
Consumer<?> consumer = consumers.get(encodedStr);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
// Explicit topic names will take precedence over topics pattern
if ( !topicNames.isEmpty() ) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames);
}
else {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
getEffectiveTopicPattern(cycleTopicsPattern));
}
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
try {
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();
}
catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar consumer!");
}
consumers.put(encodedStr, consumer);
}
return consumer;
}
}

View File

@ -1,81 +0,0 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarProducerSpace extends PulsarSpace{
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
public PulsarProducerSpace(String name, PulsarNBClientConf pulsarClientConf) {
super(name, pulsarClientConf);
}
// Producer name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
if ( !StringUtils.isBlank(cycleProducerName) ) {
return cycleProducerName;
}
String globalProducerName = pulsarNBClientConf.getProducerName();
if ( !StringUtils.isBlank(globalProducerName) ) {
return globalProducerName;
}
// Default Producer name when it is not set at either cycle or global level
return "default-prod";
}
// Topic name IS mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveTopicName(String cycleTopicName) {
if ( !StringUtils.isBlank(cycleTopicName) ) {
return cycleTopicName;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if ( !StringUtils.isBlank(globalTopicName) ) {
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
}
return globalTopicName;
}
public Producer<?> getProducer(String cycleProducerName, String cycleTopicName) {
String producerName = getEffectiveProducerName(cycleProducerName);
String topicName = getEffectiveTopicName(cycleTopicName);
String encodedStr = PulsarActivityUtil.encode(cycleProducerName, cycleTopicName);
Producer<?> producer = producers.get(encodedStr);
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
}
catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!");
}
producers.put(encodedStr, producer);
}
return producer;
}
}

View File

@ -1,110 +0,0 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarReaderSpace extends PulsarSpace {
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) {
super(name, pulsarClientConf);
}
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
if ( !StringUtils.isBlank(cycleReaderTopicName) ) {
return cycleReaderTopicName;
}
String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName();
if ( !StringUtils.isBlank(globalReaderTopicName) ) {
return globalReaderTopicName;
}
return "";
}
private String getEffectiveReaderName(String cycleReaderName) {
if ( !StringUtils.isBlank(cycleReaderName) ) {
return cycleReaderName;
}
String globalReaderName = pulsarNBClientConf.getConsumerName();
if ( !StringUtils.isBlank(globalReaderName) ) {
return globalReaderName;
}
return "default-read";
}
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
if ( !StringUtils.isBlank(cycleStartMsgPosStr) ) {
return cycleStartMsgPosStr;
}
String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr();
if ( !StringUtils.isBlank(globalStartMsgPosStr) ) {
return globalStartMsgPosStr;
}
return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
}
public Reader<?> getReader(String cycleTopicName,
String cycleReaderName,
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
if ( StringUtils.isBlank(topicName) ) {
throw new RuntimeException("Must specify a \"topicName\" for a reader!");
}
String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos);
Reader<?> reader = readers.get(encodedStr);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
try {
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
MessageId startMsgId = MessageId.latest;
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
startMsgId = MessageId.earliest;
}
//TODO: custom start message position is NOT supported yet
//else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) {
// startMsgId = MessageId.latest;
//}
if (startMsgId != null) {
readerBuilder = readerBuilder.startMessageId(startMsgId);
}
reader = readerBuilder.create();
}
catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(encodedStr, reader);
}
return reader;
}
}

View File

@ -2,12 +2,20 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
/**
* An instance of a pulsar client, along with all the cached objects which are normally
@ -17,17 +25,22 @@ import java.util.concurrent.ConcurrentHashMap;
public class PulsarSpace {
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
// TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc.
protected final String name;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
protected final String spaceName;
protected final PulsarNBClientConf pulsarNBClientConf;
protected final String pulsarSvcUrl;
protected PulsarClient pulsarClient = null;
protected Schema<?> pulsarSchema = null;
public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf ) {
this.name = name;
public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf, String pulsarSvcUrl ) {
this.spaceName = name;
this.pulsarNBClientConf = pulsarClientConf;
this.pulsarSvcUrl = pulsarSvcUrl;
createPulsarClientFromConf();
createPulsarSchemaFromConf();
@ -36,14 +49,15 @@ public class PulsarSpace {
protected void createPulsarClientFromConf() {
ClientBuilder clientBuilder = PulsarClient.builder();
String dftSvcUrl = "pulsar://localhost:6650";
if ( !pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString()) ) {
pulsarNBClientConf.setClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString(), dftSvcUrl);
}
try {
Map<String, Object> clientConf = pulsarNBClientConf.getClientConfMap();
pulsarClient = clientBuilder.loadConf(clientConf).build();
// Override "client.serviceUrl" setting in config.properties
clientConf.remove("serviceUrl", pulsarSvcUrl);
pulsarClient = clientBuilder
.loadConf(clientConf)
.serviceUrl(pulsarSvcUrl)
.build();
}
catch (PulsarClientException pce) {
logger.error("Fail to create PulsarClient from global configuration!");
@ -72,4 +86,387 @@ public class PulsarSpace {
return pulsarNBClientConf;
}
public Schema<?> getPulsarSchema() { return pulsarSchema; }
//////////////////////////////////////
// Producer Processing --> start
//////////////////////////////////////
// Topic name IS mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerTopicName(String cycleTopicName) {
if ( !StringUtils.isBlank(cycleTopicName) ) {
return cycleTopicName;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if ( !StringUtils.isBlank(globalTopicName) ) {
return globalTopicName;
}
throw new RuntimeException(" topic name must be set at either global level or cycle level!");
}
// Producer name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
if ( !StringUtils.isBlank(cycleProducerName) ) {
return cycleProducerName;
}
String globalProducerName = pulsarNBClientConf.getProducerName();
if ( !StringUtils.isBlank(globalProducerName) ) {
return globalProducerName;
}
return "";
}
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
String topicName = getEffectiveProducerTopicName(cycleTopicName);
String producerName = getEffectiveProducerName(cycleProducerName);
if ( StringUtils.isBlank(topicName) ) {
throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level");
}
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
Producer<?> producer = producers.get(encodedStr);
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
if ( !StringUtils.isBlank(producerName) ) {
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
}
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
}
catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!");
}
producers.put(encodedStr, producer);
}
return producer;
}
//////////////////////////////////////
// Producer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Consumer Processing --> start
//////////////////////////////////////
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
if ( !StringUtils.isBlank(cycleTopicNames) ) {
return cycleTopicNames;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if ( !StringUtils.isBlank(globalTopicNames) ) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveTopicNames(String cycleTopicNames) {
String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if ( !StringUtils.isBlank(name) )
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
if ( !StringUtils.isBlank(cycleTopicsPattern) ) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if ( !StringUtils.isBlank(globalTopicsPattern) ) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
if ( !StringUtils.isBlank(effectiveTopicsPatternStr) )
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
}
catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
if ( !StringUtils.isBlank(cycleSubscriptionName) ) {
return cycleSubscriptionName;
}
String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName();
if ( !StringUtils.isBlank(globalSubscriptionName) ) {
return globalSubscriptionName;
}
throw new RuntimeException("Consumer::Subscription name must be set at either global level or cycle level!");
}
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
if ( !StringUtils.isBlank(cycleSubscriptionType) ) {
return cycleSubscriptionType;
}
String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType();
if ( !StringUtils.isBlank(globalSubscriptionType) ) {
return globalSubscriptionType;
}
return "";
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
SubscriptionType subscriptionType = SubscriptionType.Exclusive;
if ( !StringUtils.isBlank(effectiveSubscriptionStr) ) {
if ( !PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr) ) {
throw new RuntimeException("Consumer::Invalid subscription type: " + effectiveSubscriptionStr);
}
else {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
}
}
return subscriptionType;
}
private String getEffectiveConsumerName(String cycleConsumerName) {
if ( !StringUtils.isBlank(cycleConsumerName) ) {
return cycleConsumerName;
}
String globalConsumerName = pulsarNBClientConf.getConsumerName();
if ( !StringUtils.isBlank(globalConsumerName) ) {
return globalConsumerName;
}
return "";
}
public Consumer<?> getConsumer(String cycleTopicUri,
String cycleTopicNames,
String cycleTopicsPattern,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if ( StringUtils.isBlank(cycleTopicUri) && topicNames.isEmpty() && (topicsPattern == null) ) {
throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
}
String encodedStr;
// precedence sequence:
// topic_names (consumer statement param) >
// topics_pattern (consumer statement param) >
// topic_uri (document level param)
if ( !topicNames.isEmpty() ) {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
StringUtils.join(topicNames, "|") );
}
else if ( topicsPattern != null ){
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
topicsPatternStr );
}
else {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
cycleTopicUri );
}
Consumer<?> consumer = consumers.get(encodedStr);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
// Explicit topic names will take precedence over topics pattern
if ( !topicNames.isEmpty() ) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames);
}
else if ( topicsPattern != null) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
getEffectiveTopicPattern(cycleTopicsPattern));
} else {
topicNames.add(cycleTopicUri);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames);
}
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
if ( !StringUtils.isBlank(consumerName) ) {
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
}
try {
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();
}
catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar consumer!");
}
consumers.put(encodedStr, consumer);
}
return consumer;
}
//////////////////////////////////////
// Consumer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Reader Processing --> Start
//////////////////////////////////////
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
if ( !StringUtils.isBlank(cycleReaderTopicName) ) {
return cycleReaderTopicName;
}
String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName();
if ( !StringUtils.isBlank(globalReaderTopicName) ) {
return globalReaderTopicName;
}
throw new RuntimeException("Reader topic name must be set at either global level or cycle level!");
}
private String getEffectiveReaderName(String cycleReaderName) {
if ( !StringUtils.isBlank(cycleReaderName) ) {
return cycleReaderName;
}
String globalReaderName = pulsarNBClientConf.getConsumerName();
if ( !StringUtils.isBlank(globalReaderName) ) {
return globalReaderName;
}
return "";
}
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
if ( !StringUtils.isBlank(cycleStartMsgPosStr) ) {
return cycleStartMsgPosStr;
}
String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr();
if ( !StringUtils.isBlank(globalStartMsgPosStr) ) {
return globalStartMsgPosStr;
}
return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
}
public Reader<?> getReader(String cycleTopicName,
String cycleReaderName,
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
if ( StringUtils.isBlank(topicName) ) {
throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level");
}
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
if ( !PulsarActivityUtil.isValideReaderStartPosition(startMsgPosStr) ) {
throw new RuntimeException("Reader:: Invalid value for Reader start message position!");
}
String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(encodedStr);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
if ( !StringUtils.isBlank(readerName) ) {
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
}
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
try {
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
MessageId startMsgId = MessageId.latest;
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
startMsgId = MessageId.earliest;
}
//TODO: custom start message position is NOT supported yet
//else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) {
// startMsgId = MessageId.latest;
//}
if (startMsgId != null) {
readerBuilder = readerBuilder.startMessageId(startMsgId);
}
reader = readerBuilder.create();
}
catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(encodedStr, reader);
}
return reader;
}
//////////////////////////////////////
// Reader Processing <-- end
//////////////////////////////////////
}

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
@ -23,22 +24,8 @@ public class PulsarSpaceCache {
}
public PulsarSpace getPulsarSpace(String name) {
String pulsarClientType = activity.getPulsarConf().getPulsarClientType();
if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString())) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarProducerSpace(spaceName, activity.getPulsarConf()));
}
else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString())) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarConsumerSpace(spaceName, activity.getPulsarConf()));
}
else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString())) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarReaderSpace(spaceName, activity.getPulsarConf()));
}
// TODO: add support for websocket-producer and managed-ledger
else {
throw new RuntimeException("Unsupported Pulsar client type: " + pulsarClientType);
}
return clientScopes.computeIfAbsent(name, spaceName ->
new PulsarSpace(spaceName, activity.getPulsarConf(), activity.getPulsarServiceUrl()));
}
public PulsarActivity getActivity() {

View File

@ -0,0 +1,17 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
public class PulsarBatchProducerEndMapper extends PulsarOpMapper{
public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace) {
super(cmdTpl, clientSpace);
}
@Override
public PulsarOp apply(long value) {
return new PulsarBatchProducerEndOp();
}
}

View File

@ -0,0 +1,37 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerEndOp implements PulsarOp {
@Override
public void run() {
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();
Producer<?> producer = PulsarBatchProducerStartOp.threadLocalProducer.get();
if ( (container != null) && (!container.isEmpty()) ) {
try {
// producer.flushAsync().get();
FutureUtil.waitForAll(container).get();
}
catch(Exception e) {
throw new RuntimeException("Batch Producer:: failed to send (some of) the batched messages!");
}
container.clear();
PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.set(null);
} else {
throw new BasicError("You tried to end an empty batch message container. This means you" +
" did initiate the batch container properly, or there is an error in your" +
" pulsar op sequencing and ratios.");
}
}
}

View File

@ -0,0 +1,33 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
public class PulsarBatchProducerMapper extends PulsarOpMapper{
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
public PulsarBatchProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc) {
super(cmdTpl, clientSpace);
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
}
@Override
public PulsarOp apply(long value) {
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
return new PulsarBatchProducerOp(
clientSpace.getPulsarSchema(),
msgKey,
msgPayload
);
}
}

View File

@ -0,0 +1,62 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp implements PulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;
private final String msgPayload;
public PulsarBatchProducerOp(Schema<?> schema,
String key,
String payload) {
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
}
@Override
public void run() {
if ( (msgPayload == null) || msgPayload.isEmpty() ) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();
Producer<?> producer = PulsarBatchProducerStartOp.threadLocalProducer.get();
assert (producer != null) && (container != null);
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ( (msgKey != null) && (!msgKey.isEmpty()) ) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema,
pulsarSchema.getSchemaInfo().getSchemaDefinition(),
msgPayload
);
typedMessageBuilder = typedMessageBuilder.value(payload);
}
else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
container.add(typedMessageBuilder.sendAsync());
}
}

View File

@ -0,0 +1,26 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.function.LongFunction;
public class PulsarBatchProducerStartMapper extends PulsarOpMapper{
private final LongFunction<Producer<?>> batchProducerFunc;
public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Producer<?>> batchProducerFunc) {
super(cmdTpl, clientSpace);
this.batchProducerFunc = batchProducerFunc;
}
@Override
public PulsarOp apply(long value) {
Producer<?> batchProducer = batchProducerFunc.apply(value);
return new PulsarBatchProducerStartOp(batchProducer);
}
}

View File

@ -0,0 +1,33 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.commons.compress.utils.Lists;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerStartOp implements PulsarOp {
// TODO: ensure sane container lifecycle management
public final static ThreadLocal<List<CompletableFuture<MessageId>>> threadLocalBatchMsgContainer = new ThreadLocal<>();
public final static ThreadLocal<Producer<?>> threadLocalProducer = new ThreadLocal<>();
public PulsarBatchProducerStartOp(Producer<?> batchProducer) {
threadLocalProducer.set(batchProducer);
}
@Override
public void run() {
List<CompletableFuture<MessageId>> container = threadLocalBatchMsgContainer.get();
if ( container == null ) {
container = Lists.newArrayList();
threadLocalBatchMsgContainer.set(container);
} else {
throw new BasicError("You tried to create a batch message container where one was already" +
" defined. This means you did not flush and unset the last container, or there is an error in your" +
" pulsar op sequencing and ratios.");
}
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
@ -16,22 +17,28 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarConsumerMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final Schema<?> pulsarSchema;
public class PulsarConsumerMapper extends PulsarOpMapper {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> asyncApiFunc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
Schema<?> pulsarSchema,
LongFunction<Consumer<?>> consumerFunc) {
this.cmdTpl = cmdTpl;
this.pulsarSchema = pulsarSchema;
PulsarSpace clientSpace,
LongFunction<Consumer<?>> consumerFunc,
LongFunction<Boolean> asyncApiFunc) {
super(cmdTpl, clientSpace);
this.consumerFunc = consumerFunc;
this.asyncApiFunc = asyncApiFunc;
}
@Override
public PulsarOp apply(long value) {
Consumer<?> consumer = consumerFunc.apply(value);
return new PulsarConsumerOp(consumer, pulsarSchema);
boolean asyncApi = asyncApiFunc.apply(value);
return new PulsarConsumerOp(
consumer,
clientSpace.getPulsarSchema(),
asyncApi
);
}
}

View File

@ -3,23 +3,20 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
public class PulsarConsumerOp implements PulsarOp {
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema) {
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp) {
this.consumer = consumer;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
}
@Override
public void run() {
public void syncConsume() {
try {
Message<?> message = consumer.receive();
@ -30,15 +27,25 @@ public class PulsarConsumerOp implements PulsarOp {
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
}
else {
} else {
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));
}
consumer.acknowledge(message.getMessageId());
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
public void asyncConsume() {
//TODO: add support for async consume
}
@Override
public void run() {
if (!asyncPulsarOp)
syncConsume();
else
asyncConsume();
}
}

View File

@ -0,0 +1,19 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import java.util.function.LongFunction;
public abstract class PulsarOpMapper implements LongFunction<PulsarOp> {
protected final CommandTemplate cmdTpl;
protected final PulsarSpace clientSpace;
public PulsarOpMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace) {
this.cmdTpl = cmdTpl;
this.clientSpace = clientSpace;
}
}

View File

@ -17,21 +17,21 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarProducerMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final Schema<?> pulsarSchema;
public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
public PulsarProducerMapper(CommandTemplate cmdTpl,
Schema<?> pulsarSchema,
PulsarSpace clientSpace,
LongFunction<Producer<?>> producerFunc,
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc) {
this.cmdTpl = cmdTpl;
this.pulsarSchema = pulsarSchema;
super(cmdTpl, clientSpace);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
}
@ -39,9 +39,15 @@ public class PulsarProducerMapper implements LongFunction<PulsarOp> {
@Override
public PulsarOp apply(long value) {
Producer<?> producer = producerFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
return new PulsarProducerOp(producer, pulsarSchema, msgKey, msgPayload);
return new PulsarProducerOp(
producer,
clientSpace.getPulsarSchema(),
asyncApi,
msgKey,
msgPayload);
}
}

View File

@ -1,58 +1,90 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarAction;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
public class PulsarProducerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarProducerOp.class);
private final Producer<?> producer;
private final Schema<?> pulsarSchema;
private final String msgKey;
private final String msgPayload;
private final boolean asyncPulsarOp;
public PulsarProducerOp(Producer<?> producer, Schema<?> schema, String key, String payload) {
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
String key,
String payload) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
}
@Override
public void run() {
try {
if ( (msgPayload == null) || msgPayload.isEmpty() ) {
throw new RuntimeException("Message payload (\"msg-value\" can't be empty!");
if ( (msgPayload == null) || msgPayload.isEmpty() ) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ( (msgKey != null) && (!msgKey.isEmpty()) ) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema,
pulsarSchema.getSchemaInfo().getSchemaDefinition(),
msgPayload
);
typedMessageBuilder = typedMessageBuilder.value(payload);
}
else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
//TODO: add error handling with failed message production
if (!asyncPulsarOp) {
try {
logger.trace("sending message");
typedMessageBuilder.send();
} catch (PulsarClientException pce) {
logger.trace("failed sending message");
throw new RuntimeException(pce);
}
}
else {
try {
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
future.get();
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
if ( (msgKey != null) && (!msgKey.isEmpty()) ) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
/*.thenRun(() -> {
// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload);
}).exceptionally(ex -> {
System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
return ex;
})*/
;
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, msgPayload);
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema, avroGenericRecord);
typedMessageBuilder = typedMessageBuilder.value(payload);
catch (Exception e) {
throw new RuntimeException(e);
}
else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8));
}
typedMessageBuilder.send();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,27 +1,35 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import java.util.function.LongFunction;
public class PulsarReaderMapper implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final Schema<?> pulsarSchema;
public class PulsarReaderMapper extends PulsarOpMapper {
private final LongFunction<Reader<?>> readerFunc;
private final LongFunction<Boolean> asyncApiFunc;
public PulsarReaderMapper(CommandTemplate cmdTpl,
Schema<?> pulsarSchema,
LongFunction<Reader<?>> readerFunc) {
this.cmdTpl = cmdTpl;
this.pulsarSchema = pulsarSchema;
PulsarSpace clientSpace,
LongFunction<Reader<?>> readerFunc,
LongFunction<Boolean> asyncApiFunc) {
super(cmdTpl, clientSpace);
this.readerFunc = readerFunc;
this.asyncApiFunc = asyncApiFunc;
}
@Override
public PulsarOp apply(long value) {
Reader<?> reader = readerFunc.apply(value);
return new PulsarReaderOp(reader, pulsarSchema);
boolean asyncApi = asyncApiFunc.apply(value);
return new PulsarReaderOp(
reader,
clientSpace.getPulsarSchema(),
asyncApi
);
}
}

View File

@ -11,14 +11,15 @@ import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp implements PulsarOp {
private final Reader<?> reader;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
public PulsarReaderOp(Reader<?> reader, Schema<?> schema) {
public PulsarReaderOp(Reader<?> reader, Schema<?> schema, boolean asyncPulsarOp) {
this.reader = reader;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
}
@Override
public void run() {
public void syncRead() {
try {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
@ -42,4 +43,16 @@ public class PulsarReaderOp implements PulsarOp {
throw new RuntimeException(e);
}
}
public void asyncRead() {
//TODO: add support for async read
}
@Override
public void run() {
if (!asyncPulsarOp)
syncRead();
else
asyncRead();
}
}

View File

@ -5,6 +5,7 @@ import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
@ -19,7 +20,6 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
private final CommandTemplate cmdTpl;
private final PulsarSpace clientSpace;
private final LongFunction<PulsarOp> opFunc;
private final Schema<?> pulsarSchema;
// TODO: Add docs for the command template with respect to the OpTemplate
@ -44,59 +44,36 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
this.clientSpace = pcache.getPulsarSpace("default");
}
this.pulsarSchema = clientSpace.getPulsarSchema();
this.opFunc = resolve();
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
}
private LongFunction<PulsarOp> resolve() {
String clientType = clientSpace.getPulsarClientConf().getPulsarClientType();
// TODO: Complete implementation for reader, websocket-producer and managed-ledger
if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) {
assert clientSpace instanceof PulsarProducerSpace;
return resolveProducer((PulsarProducerSpace) clientSpace);
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString()) ) {
assert clientSpace instanceof PulsarConsumerSpace;
return resolveConsumer((PulsarConsumerSpace)clientSpace);
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString()) ) {
assert clientSpace instanceof PulsarReaderSpace;
return resolveReader((PulsarReaderSpace)clientSpace); /*
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.WSOKT_PRODUCER.toString()) ) {
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.MANAGED_LEDGER.toString()) ) {
*/
} else {
throw new RuntimeException("Unsupported Pulsar client: " + clientType);
}
@Override
public PulsarOp apply(long value) {
return opFunc.apply(value);
}
private LongFunction<PulsarOp> resolveProducer(
PulsarProducerSpace clientSpace
) {
private boolean IsBoolean (String str) {
return StringUtils.equalsAnyIgnoreCase(str, "yes", "true");
}
private LongFunction<PulsarOp> resolve() {
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
LongFunction<String> cycle_producer_name_func;
if (cmdTpl.isStatic("producer-name")) {
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer-name");
} else if (cmdTpl.isDynamic("producer-name")) {
cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer-name", l);
} else {
cycle_producer_name_func = (l) -> null;
}
LongFunction<String> topic_uri_func;
// Global parameter: topic_uri
LongFunction<String> topicUriFunc;
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topic_uri_func = (l) -> cmdTpl.getStatic("topic_uri");
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l);
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
}
else if (cmdTpl.containsKey("topic")) {
@ -109,9 +86,9 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topic_uri_func = (l) -> composited;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topic_uri_func = (l) ->
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
@ -119,40 +96,102 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
}
}
else {
topic_uri_func = (l) -> null;
topicUriFunc = (l) -> null;
}
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc;
if ( cmdTpl.containsKey("async_api") ) {
if ( cmdTpl.isStatic("async_api") )
asyncApiFunc = (l) -> IsBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
}
else {
asyncApiFunc = (l) -> false;
}
if ( !cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype") ) {
throw new RuntimeException("Statement parameter \"optype\" must have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
// TODO: Complete implementation for websocket-producer and managed-ledger
if /*( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_TENANT.label) ) {
return resolveCreateTenant(clientSpace);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_NAMESPACE.label) ) {
return resolveCreateNameSpace(clientSpace);
} else if*/ ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label) ) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label) ) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label) ) {
return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label) ) {
return resolveMsgBatchSendStart(clientSpace, topicUriFunc);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label) ) {
return resolveMsgBatchSend(clientSpace);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label) ) {
return resolveMsgBatchSendEnd(clientSpace);
} else {
throw new RuntimeException("Unsupported Pulsar operation type" );
}
}
private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
LongFunction<String> cycle_producer_name_func;
if (cmdTpl.isStatic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name");
} else if (cmdTpl.isDynamic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer_name", l);
} else {
cycle_producer_name_func = (l) -> null;
}
LongFunction<Producer<?>> producerFunc =
(l) -> clientSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l));
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l));
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg-key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg-key");
} else if (cmdTpl.isDynamic("msg-key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg-key", l);
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
} else if (cmdTpl.isDynamic("msg_key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l);
} else {
keyFunc = (l) -> null;
}
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg-value")) {
if (cmdTpl.isStatic("msg-value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg-value");
} else if (cmdTpl.isDynamic("msg-value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg-value", l);
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg_value");
} else if (cmdTpl.isDynamic("msg_value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l);
} else {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("\"msg-value\" field must be specified!");
throw new RuntimeException("Producer:: \"msg_value\" field must be specified!");
}
return new PulsarProducerMapper(cmdTpl, pulsarSchema, producerFunc, keyFunc, valueFunc);
return new PulsarProducerMapper(
cmdTpl,
clientSpace,
producerFunc,
async_api_func,
keyFunc,
valueFunc);
}
private LongFunction<PulsarOp> resolveConsumer(
PulsarConsumerSpace clientSpace
private LongFunction<PulsarOp> resolveMsgConsume(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
if (cmdTpl.isStatic("topic-names")) {
topic_names_func = (l) -> cmdTpl.getStatic("topic-names");
@ -162,6 +201,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
topic_names_func = (l) -> null;
}
// Topic pattern (multi-topic)
LongFunction<String> topics_pattern_func;
if (cmdTpl.isStatic("topics-pattern")) {
topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern");
@ -200,6 +240,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer(
topic_uri_func.apply(l),
topic_names_func.apply(l),
topics_pattern_func.apply(l),
subscription_name_func.apply(l),
@ -207,21 +248,14 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
consumer_name_func.apply(l)
);
return new PulsarConsumerMapper(cmdTpl, pulsarSchema, consumerFunc);
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func);
}
private LongFunction<PulsarOp> resolveReader(
PulsarReaderSpace clientSpace
private LongFunction<PulsarOp> resolveMsgRead(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
LongFunction<String> topic_name_func;
if (cmdTpl.isStatic("topic-name")) {
topic_name_func = (l) -> cmdTpl.getStatic("topic-name");
} else if (cmdTpl.isDynamic("topic-name")) {
topic_name_func = (l) -> cmdTpl.getDynamic("topic-name", l);
} else {
topic_name_func = (l) -> null;
}
LongFunction<String> reader_name_func;
if (cmdTpl.isStatic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader-name");
@ -242,16 +276,64 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
LongFunction<Reader<?>> readerFunc = (l) ->
clientSpace.getReader(
topic_name_func.apply(l),
topic_uri_func.apply(l),
reader_name_func.apply(l),
start_msg_pos_str_func.apply(l)
);
return new PulsarReaderMapper(cmdTpl, pulsarSchema, readerFunc);
return new PulsarReaderMapper(cmdTpl, clientSpace, readerFunc, async_api_func);
}
@Override
public PulsarOp apply(long value) {
return opFunc.apply(value);
private LongFunction<PulsarOp> resolveMsgBatchSendStart(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func
) {
LongFunction<String> cycle_batch_producer_name_func;
if (cmdTpl.isStatic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getStatic("batch_producer_name");
} else if (cmdTpl.isDynamic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getDynamic("batch_producer_name", l);
} else {
cycle_batch_producer_name_func = (l) -> null;
}
LongFunction<Producer<?>> batchProducerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l));
return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, batchProducerFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSend(PulsarSpace clientSpace) {
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
} else if (cmdTpl.isDynamic("msg_key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l);
} else {
keyFunc = (l) -> null;
}
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg_value");
} else if (cmdTpl.isDynamic("msg_value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l);
} else {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("Batch Producer:: \"msg_value\" field must be specified!");
}
return new PulsarBatchProducerMapper(
cmdTpl,
clientSpace,
keyFunc,
valueFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSendEnd(PulsarSpace clientSpace) {
return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace);
}
}

View File

@ -80,6 +80,10 @@ public class AvroUtil {
return recordBuilder.build();
}
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDef, String jsonData) {
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
}
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) {
GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);

View File

@ -23,21 +23,25 @@ public class PulsarActivityUtil {
private final static Logger logger = LogManager.getLogger(PulsarActivityUtil.class);
// Supported message operation types
public enum CLIENT_TYPES {
PRODUCER("producer"),
CONSUMER("consumer"),
READER("reader"),
WSOKT_PRODUCER("websocket-producer"),
MANAGED_LEDGER("managed-ledger")
// TODO: websocket-producer and managed-ledger
public enum OP_TYPES {
CREATE_TENANT("create-tenant"),
CREATE_NAMESPACE("create-namespace"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
MSG_SEND("msg-send"),
MSG_CONSUME("msg-consume"),
MSG_READ("msg-read")
;
public final String label;
CLIENT_TYPES(String label) {
OP_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidClientType(String type) {
return Arrays.stream(CLIENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
return Arrays.stream(OP_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
}
@ -157,6 +161,19 @@ public class PulsarActivityUtil {
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
public enum SUBSCRIPTION_TYPE {
exclusive("exclusive"),
failover("failover"),
shared("shared"),
key_shared("key_shared");
public final String label;
SUBSCRIPTION_TYPE(String label) { this.label = label; }
}
public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Standard reader configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#reader
@ -202,6 +219,9 @@ public class PulsarActivityUtil {
public final String label;
READER_MSG_POSITION_TYPE(String label) { this.label = label; }
}
public static boolean isValideReaderStartPosition(String item) {
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Valid websocket-producer configuration (activity-level settings)

View File

@ -7,7 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -23,13 +23,11 @@ public class PulsarNBClientConf {
private String canonicalFilePath = "";
public static final String DRIVER_CONF_PREFIX = "driver";
public static final String SCHEMA_CONF_PREFIX = "schema";
public static final String CLIENT_CONF_PREFIX = "client";
public static final String PRODUCER_CONF_PREFIX = "producer";
public static final String CONSUMER_CONF_PREFIX = "consumer";
public static final String READER_CONF_PREFIX = "reader";
private HashMap<String, Object> driverConfMap = new HashMap<>();
private HashMap<String, Object> schemaConfMap = new HashMap<>();
private HashMap<String, Object> clientConfMap = new HashMap<>();
private HashMap<String, Object> producerConfMap = new HashMap<>();
@ -53,14 +51,6 @@ public class PulsarNBClientConf {
Configuration config = builder.getConfiguration();
// Get driver specific configuration settings
for (Iterator<String> it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( !StringUtils.isBlank(confVal) )
driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
// Get schema specific configuration settings
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
@ -112,41 +102,6 @@ public class PulsarNBClientConf {
}
//////////////////
// Get NB Driver related config
public Map<String, Object> getDriverConfMap() {
return this.driverConfMap;
}
public boolean hasDriverConfKey(String key) {
if (key.contains(DRIVER_CONF_PREFIX))
return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length()+1));
else
return driverConfMap.containsKey(key);
}
public Object getDriverConfValue(String key) {
if (key.contains(DRIVER_CONF_PREFIX))
return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length()+1));
else
return driverConfMap.get(key);
}
public void setDriverConfValue(String key, Object value) {
if (key.contains(DRIVER_CONF_PREFIX))
driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length()+1), value);
else
driverConfMap.put(key, value);
}
// other driver helper functions ...
public String getPulsarClientType() {
Object confValue = getDriverConfValue("driver.client-type");
// If not explicitly specifying Pulsar client type, "producer" is the default type
if (confValue == null)
return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString();
else
return confValue.toString();
}
//////////////////
// Get Schema related config
public Map<String, Object> getSchemaConfMap() {

View File

@ -1,11 +1,3 @@
### NB Pulsar driver related configuration - driver.xxx
driver.client-type = producer
driver.num-workers = 1
# TODO: functionalities to be completed
driver.sync-mode = sync
driver.msg-recv-ouput = console
### Schema related configurations - schema.xxx
# valid types:
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
@ -17,21 +9,24 @@ driver.msg-recv-ouput = console
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# 2) Avro for messages with schema
schema.type = avro
schema.definition = file://<path>/<to>/<avro-definition-file>
schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc
### Pulsar client related configurations - client.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
client.serviceUrl: pulsar://localhost:6650
client.connectionTimeoutMs = 5000
### Producer related configurations (global) - producer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.producerName =
producer.topicName = persistent://public/default/mynbtest
producer.sendTimeoutMs =
### Consumer related configurations (global) - consumer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
consumer.topicNames =
@ -41,10 +36,12 @@ consumer.subscriptionType =
consumer.consumerName =
consumer.receiverQueueSize =
### Reader related configurations (global) - reader.xxx
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>
reader.topicName = persistent://public/default/nbpulsar
reader.receiverQueueSize =
reader.readerName =
#reader.startMessagePos = earliest
reader.startMessagePos = earliest

View File

@ -12,50 +12,82 @@ bindings:
reading_value: ToFloat(100);
topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L));
# global parameters that apply to all Pulsar client types:
params:
#topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
async_api: "false"
blocks:
# - create-tenant-namespace:
# tags:
# type: create-tenant-namespace
# statements:
# tenant: {tenant}
# namespace: {namespace}
- name: admin-block
tags:
phase: create-tenant-namespace
statements:
- name: s1
optype: create-tenant
tenant: "{tenant}"
- name: s2
optype: create-namespace
namespace: "{namespace}"
- name: batch-producer-block
tags:
phase: batch-producer
statements:
- name: s1
optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start
# batch_producer_name: {batch_producer_name}
ratio: 1
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
ratio: 5
- name: s3
optype: batch-msg-send-end
ratio: 1
- name: producer-block
tags:
op-type: producer
phase: producer
statements:
- producer-stuff:
#######
# NOTE: tenant and namespace must be static and pre-exist in Pulsar first
# topic_uri: "[persistent|non-persistent]://<tenant>/<namespace>/<topic>"
# topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
# producer-name:
msg-key: "{mykey}"
msg-value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
- name: s1
# producer_name: {producer_name}
optype: msg-send
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
- name: consumer-block
tags:
op-type: consumer
phase: consumer
statements:
- consumer-stuff:
topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest"
topics-pattern: "public/default/.*"
subscription-name:
subscription-type:
consumer-name:
- name: s1
optype: msg-consume
topic_names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest"
topics_pattern: "public/default/.*"
subscription_name:
subscription_type:
consumer_name:
- reader:
tags:
op-type: reader
phase: reader
statements:
- reader-stuff:
- name: s1
optype: msg-read
# - websocket-producer:
# tags:
@ -68,5 +100,3 @@ blocks:
# type: managed-ledger
# statement:
# - managed-ledger-stuff: