From adf1ea03e6b32da94694156fdc53430ae4e8a7c8 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Sat, 6 Mar 2021 10:38:08 -0600 Subject: [PATCH 1/4] Pulsar Batch API, Async API (partial), yaml file structure, --- .../driver/pulsar/PulsarActivity.java | 12 +- .../driver/pulsar/PulsarConsumerSpace.java | 196 -------- .../driver/pulsar/PulsarProducerSpace.java | 81 ---- .../driver/pulsar/PulsarReaderSpace.java | 110 ----- .../nosqlbench/driver/pulsar/PulsarSpace.java | 417 +++++++++++++++++- .../driver/pulsar/PulsarSpaceCache.java | 19 +- .../ops/PulsarBatchProducerEndMapper.java | 17 + .../pulsar/ops/PulsarBatchProducerEndOp.java | 37 ++ .../pulsar/ops/PulsarBatchProducerMapper.java | 33 ++ .../pulsar/ops/PulsarBatchProducerOp.java | 62 +++ .../ops/PulsarBatchProducerStartMapper.java | 26 ++ .../ops/PulsarBatchProducerStartOp.java | 33 ++ .../pulsar/ops/PulsarConsumerMapper.java | 23 +- .../driver/pulsar/ops/PulsarConsumerOp.java | 27 +- .../driver/pulsar/ops/PulsarOpMapper.java | 19 + .../pulsar/ops/PulsarProducerMapper.java | 20 +- .../driver/pulsar/ops/PulsarProducerOp.java | 84 ++-- .../driver/pulsar/ops/PulsarReaderMapper.java | 24 +- .../driver/pulsar/ops/PulsarReaderOp.java | 19 +- .../driver/pulsar/ops/ReadyPulsarOp.java | 224 +++++++--- .../driver/pulsar/util/AvroUtil.java | 4 + .../pulsar/util/PulsarActivityUtil.java | 36 +- .../pulsar/util/PulsarNBClientConf.java | 47 +- .../resources/activities/config.properties | 19 +- .../src/main/resources/activities/pulsar.yaml | 96 ++-- 25 files changed, 1037 insertions(+), 648 deletions(-) delete mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java delete mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java delete mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index b720a59af..dfaeffd5a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; @@ -10,6 +11,7 @@ import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.metrics.ActivityMetrics; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -24,14 +26,13 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer executeTimer; private PulsarSpaceCache pulsarCache; - private NBErrorHandler errorhandler; - private PulsarNBClientConf clientConf; + private String serviceUrl; + private NBErrorHandler errorhandler; private OpSequence> sequencer; - // private PulsarClient activityClient; - private Supplier clientSupplier; + // private Supplier clientSupplier; // private ThreadLocal> tlClientSupplier; public PulsarActivity(ActivityDef activityDef) { @@ -48,6 +49,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); clientConf = new PulsarNBClientConf(pulsarClntConfFile); + serviceUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); + pulsarCache = new PulsarSpaceCache(this); this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache)); @@ -72,6 +75,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public PulsarNBClientConf getPulsarConf() { return clientConf; } + public String getPulsarServiceUrl() { return serviceUrl; } public Timer getBindTimer() { return bindTimer; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java deleted file mode 100644 index 56d21d2b0..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java +++ /dev/null @@ -1,196 +0,0 @@ -package io.nosqlbench.driver.pulsar; - -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; - -public class PulsarConsumerSpace extends PulsarSpace { - - private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); - - public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) { super(name, pulsarClientConf); } - - private String getEffectiveTopicNamesStr(String cycleTopicNames) { - if ( !StringUtils.isBlank(cycleTopicNames) ) { - return cycleTopicNames; - } - - String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); - if ( !StringUtils.isBlank(globalTopicNames) ) { - return globalTopicNames; - } - - return ""; - } - private List getEffectiveTopicNames(String cycleTopicNames) { - String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); - - String[] names = effectiveTopicNamesStr.split("[;,]"); - ArrayList effectiveTopicNameList = new ArrayList<>(); - - for (String name : names) { - if ( !StringUtils.isBlank(name) ) - effectiveTopicNameList.add(name.trim()); - } - - - return effectiveTopicNameList; - } - - private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { - if ( !StringUtils.isBlank(cycleTopicsPattern) ) { - return cycleTopicsPattern; - } - - String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); - if ( !StringUtils.isBlank(globalTopicsPattern) ) { - return globalTopicsPattern; - } - - return ""; - } - private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { - String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); - Pattern topicsPattern; - try { - if ( !StringUtils.isBlank(effectiveTopicsPatternStr) ) - topicsPattern = Pattern.compile(effectiveTopicsPatternStr); - else - topicsPattern = null; - } - catch (PatternSyntaxException pse) { - topicsPattern = null; - } - return topicsPattern; - } - - private String getEffectiveSubscriptionName(String cycleSubscriptionName) { - if ( !StringUtils.isBlank(cycleSubscriptionName) ) { - return cycleSubscriptionName; - } - - String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName(); - if ( !StringUtils.isBlank(globalSubscriptionName) ) { - return globalSubscriptionName; - } - - return "default-subs"; - } - - private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) { - if ( !StringUtils.isBlank(cycleSubscriptionType) ) { - return cycleSubscriptionType; - } - - String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType(); - if ( !StringUtils.isBlank(globalSubscriptionType) ) { - return globalSubscriptionType; - } - - return ""; - } - private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) { - String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType); - SubscriptionType subscriptionType; - - try { - subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr); - } - catch (IllegalArgumentException iae) { - subscriptionType = SubscriptionType.Exclusive; - } - - return subscriptionType; - } - - private String getEffectiveConsumerName(String cycleConsumerName) { - if ( !StringUtils.isBlank(cycleConsumerName) ) { - return cycleConsumerName; - } - - String globalConsumerName = pulsarNBClientConf.getConsumerName(); - if ( !StringUtils.isBlank(globalConsumerName) ) { - return globalConsumerName; - } - - return "default-cons"; - } - - public Consumer getConsumer(String cycleTopicNames, - String cycleTopicsPattern, - String cycleSubscriptionName, - String cycleSubscriptionType, - String cycleConsumerName) { - - String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); - List topicNames = getEffectiveTopicNames(cycleTopicNames); - String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); - Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern); - String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); - SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); - String consumerName = getEffectiveConsumerName(cycleConsumerName); - - if ( topicNames.isEmpty() && (topicsPattern == null) ) { - throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!"); - } - - String encodedStr; - if ( !topicNames.isEmpty() ) { - encodedStr = PulsarActivityUtil.encode( - consumerName, - subscriptionName, - StringUtils.join(topicNames, "|") ); - } - else { - encodedStr = PulsarActivityUtil.encode( - consumerName, - subscriptionName, - topicsPatternStr ); - } - Consumer consumer = consumers.get(encodedStr); - - if (consumer == null) { - PulsarClient pulsarClient = getPulsarClient(); - - // Get other possible producer settings that are set at global level - Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); - - // Explicit topic names will take precedence over topics pattern - if ( !topicNames.isEmpty() ) { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames); - } - else { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); - consumerConf.put( - PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, - getEffectiveTopicPattern(cycleTopicsPattern)); - } - - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName); - - try { - consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); - } - catch (PulsarClientException ple) { - ple.printStackTrace(); - throw new RuntimeException("Unable to create a Pulsar consumer!"); - } - - consumers.put(encodedStr, consumer); - } - - return consumer; - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java deleted file mode 100644 index 325b9e73b..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java +++ /dev/null @@ -1,81 +0,0 @@ -package io.nosqlbench.driver.pulsar; - -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PulsarProducerSpace extends PulsarSpace{ - - private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); - - public PulsarProducerSpace(String name, PulsarNBClientConf pulsarClientConf) { - super(name, pulsarClientConf); - } - - // Producer name is NOT mandatory - // - It can be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveProducerName(String cycleProducerName) { - if ( !StringUtils.isBlank(cycleProducerName) ) { - return cycleProducerName; - } - - String globalProducerName = pulsarNBClientConf.getProducerName(); - if ( !StringUtils.isBlank(globalProducerName) ) { - return globalProducerName; - } - - // Default Producer name when it is not set at either cycle or global level - return "default-prod"; - } - - // Topic name IS mandatory - // - It must be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveTopicName(String cycleTopicName) { - if ( !StringUtils.isBlank(cycleTopicName) ) { - return cycleTopicName; - } - - String globalTopicName = pulsarNBClientConf.getProducerTopicName(); - if ( !StringUtils.isBlank(globalTopicName) ) { - throw new RuntimeException("Topic name must be set at either global level or cycle level!"); - } - - return globalTopicName; - } - - public Producer getProducer(String cycleProducerName, String cycleTopicName) { - String producerName = getEffectiveProducerName(cycleProducerName); - String topicName = getEffectiveTopicName(cycleTopicName); - - String encodedStr = PulsarActivityUtil.encode(cycleProducerName, cycleTopicName); - Producer producer = producers.get(encodedStr); - - if (producer == null) { - PulsarClient pulsarClient = getPulsarClient(); - - // Get other possible producer settings that are set at global level - Map producerConf = pulsarNBClientConf.getProducerConfMap(); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName); - - try { - producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); - } - catch (PulsarClientException ple) { - throw new RuntimeException("Unable to create a Pulsar producer!"); - } - - producers.put(encodedStr, producer); - } - - return producer; - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java deleted file mode 100644 index f8e8de38d..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java +++ /dev/null @@ -1,110 +0,0 @@ -package io.nosqlbench.driver.pulsar; - -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.*; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PulsarReaderSpace extends PulsarSpace { - - private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); - - public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) { - super(name, pulsarClientConf); - } - - private String getEffectiveReaderTopicName(String cycleReaderTopicName) { - if ( !StringUtils.isBlank(cycleReaderTopicName) ) { - return cycleReaderTopicName; - } - - String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName(); - if ( !StringUtils.isBlank(globalReaderTopicName) ) { - return globalReaderTopicName; - } - - return ""; - } - - private String getEffectiveReaderName(String cycleReaderName) { - if ( !StringUtils.isBlank(cycleReaderName) ) { - return cycleReaderName; - } - - String globalReaderName = pulsarNBClientConf.getConsumerName(); - if ( !StringUtils.isBlank(globalReaderName) ) { - return globalReaderName; - } - - return "default-read"; - } - - private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) { - if ( !StringUtils.isBlank(cycleStartMsgPosStr) ) { - return cycleStartMsgPosStr; - } - - String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr(); - if ( !StringUtils.isBlank(globalStartMsgPosStr) ) { - return globalStartMsgPosStr; - } - - return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label; - } - - public Reader getReader(String cycleTopicName, - String cycleReaderName, - String cycleStartMsgPos) { - - String topicName = getEffectiveReaderTopicName(cycleTopicName); - String readerName = getEffectiveReaderName(cycleReaderName); - String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos); - - if ( StringUtils.isBlank(topicName) ) { - throw new RuntimeException("Must specify a \"topicName\" for a reader!"); - } - - String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos); - Reader reader = readers.get(encodedStr); - - if (reader == null) { - PulsarClient pulsarClient = getPulsarClient(); - - Map readerConf = pulsarNBClientConf.getReaderConfMap(); - readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName); - readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName); - // "reader.startMessagePos" is NOT a standard Pulsar reader conf - readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); - - try { - ReaderBuilder readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf); - - MessageId startMsgId = MessageId.latest; - if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) { - startMsgId = MessageId.earliest; - } - //TODO: custom start message position is NOT supported yet - //else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) { - // startMsgId = MessageId.latest; - //} - - if (startMsgId != null) { - readerBuilder = readerBuilder.startMessageId(startMsgId); - } - - reader = readerBuilder.create(); - } - catch (PulsarClientException ple) { - ple.printStackTrace(); - throw new RuntimeException("Unable to create a Pulsar reader!"); - } - - readers.put(encodedStr, reader); - } - - return reader; - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index 56b50947b..84d0aee97 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -2,12 +2,20 @@ package io.nosqlbench.driver.pulsar; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.BatchMessageContainerBase; +import org.apache.pulsar.client.impl.DefaultBatcherBuilder; +import org.apache.pulsar.client.impl.ProducerImpl; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; /** * An instance of a pulsar client, along with all the cached objects which are normally @@ -17,17 +25,22 @@ import java.util.concurrent.ConcurrentHashMap; public class PulsarSpace { private final static Logger logger = LogManager.getLogger(PulsarSpace.class); - // TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc. - protected final String name; + private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); + + protected final String spaceName; protected final PulsarNBClientConf pulsarNBClientConf; + protected final String pulsarSvcUrl; protected PulsarClient pulsarClient = null; protected Schema pulsarSchema = null; - public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf ) { - this.name = name; + public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf, String pulsarSvcUrl ) { + this.spaceName = name; this.pulsarNBClientConf = pulsarClientConf; + this.pulsarSvcUrl = pulsarSvcUrl; createPulsarClientFromConf(); createPulsarSchemaFromConf(); @@ -36,14 +49,15 @@ public class PulsarSpace { protected void createPulsarClientFromConf() { ClientBuilder clientBuilder = PulsarClient.builder(); - String dftSvcUrl = "pulsar://localhost:6650"; - if ( !pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString()) ) { - pulsarNBClientConf.setClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString(), dftSvcUrl); - } - try { Map clientConf = pulsarNBClientConf.getClientConfMap(); - pulsarClient = clientBuilder.loadConf(clientConf).build(); + // Override "client.serviceUrl" setting in config.properties + clientConf.remove("serviceUrl", pulsarSvcUrl); + + pulsarClient = clientBuilder + .loadConf(clientConf) + .serviceUrl(pulsarSvcUrl) + .build(); } catch (PulsarClientException pce) { logger.error("Fail to create PulsarClient from global configuration!"); @@ -72,4 +86,387 @@ public class PulsarSpace { return pulsarNBClientConf; } public Schema getPulsarSchema() { return pulsarSchema; } + + + + ////////////////////////////////////// + // Producer Processing --> start + ////////////////////////////////////// + // Topic name IS mandatory + // - It must be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveProducerTopicName(String cycleTopicName) { + if ( !StringUtils.isBlank(cycleTopicName) ) { + return cycleTopicName; + } + + String globalTopicName = pulsarNBClientConf.getProducerTopicName(); + if ( !StringUtils.isBlank(globalTopicName) ) { + return globalTopicName; + } + + throw new RuntimeException(" topic name must be set at either global level or cycle level!"); + } + + // Producer name is NOT mandatory + // - It can be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveProducerName(String cycleProducerName) { + if ( !StringUtils.isBlank(cycleProducerName) ) { + return cycleProducerName; + } + + String globalProducerName = pulsarNBClientConf.getProducerName(); + if ( !StringUtils.isBlank(globalProducerName) ) { + return globalProducerName; + } + + return ""; + } + + public Producer getProducer(String cycleTopicName, String cycleProducerName) { + String topicName = getEffectiveProducerTopicName(cycleTopicName); + String producerName = getEffectiveProducerName(cycleProducerName); + + if ( StringUtils.isBlank(topicName) ) { + throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level"); + } + + String encodedStr = PulsarActivityUtil.encode(producerName, topicName); + Producer producer = producers.get(encodedStr); + + if (producer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map producerConf = pulsarNBClientConf.getProducerConfMap(); + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName); + if ( !StringUtils.isBlank(producerName) ) { + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName); + } + + try { + producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); + } + catch (PulsarClientException ple) { + throw new RuntimeException("Unable to create a Pulsar producer!"); + } + + producers.put(encodedStr, producer); + } + + return producer; + } + ////////////////////////////////////// + // Producer Processing <-- end + ////////////////////////////////////// + + + + ////////////////////////////////////// + // Consumer Processing --> start + ////////////////////////////////////// + private String getEffectiveTopicNamesStr(String cycleTopicNames) { + if ( !StringUtils.isBlank(cycleTopicNames) ) { + return cycleTopicNames; + } + + String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); + if ( !StringUtils.isBlank(globalTopicNames) ) { + return globalTopicNames; + } + + return ""; + } + private List getEffectiveTopicNames(String cycleTopicNames) { + String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); + + String[] names = effectiveTopicNamesStr.split("[;,]"); + ArrayList effectiveTopicNameList = new ArrayList<>(); + + for (String name : names) { + if ( !StringUtils.isBlank(name) ) + effectiveTopicNameList.add(name.trim()); + } + + + return effectiveTopicNameList; + } + + private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { + if ( !StringUtils.isBlank(cycleTopicsPattern) ) { + return cycleTopicsPattern; + } + + String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); + if ( !StringUtils.isBlank(globalTopicsPattern) ) { + return globalTopicsPattern; + } + + return ""; + } + private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { + String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern; + try { + if ( !StringUtils.isBlank(effectiveTopicsPatternStr) ) + topicsPattern = Pattern.compile(effectiveTopicsPatternStr); + else + topicsPattern = null; + } + catch (PatternSyntaxException pse) { + topicsPattern = null; + } + return topicsPattern; + } + + private String getEffectiveSubscriptionName(String cycleSubscriptionName) { + if ( !StringUtils.isBlank(cycleSubscriptionName) ) { + return cycleSubscriptionName; + } + + String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName(); + if ( !StringUtils.isBlank(globalSubscriptionName) ) { + return globalSubscriptionName; + } + + throw new RuntimeException("Consumer::Subscription name must be set at either global level or cycle level!"); + } + + private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) { + if ( !StringUtils.isBlank(cycleSubscriptionType) ) { + return cycleSubscriptionType; + } + + String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType(); + if ( !StringUtils.isBlank(globalSubscriptionType) ) { + return globalSubscriptionType; + } + + return ""; + } + private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) { + String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType); + SubscriptionType subscriptionType = SubscriptionType.Exclusive; + + if ( !StringUtils.isBlank(effectiveSubscriptionStr) ) { + if ( !PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr) ) { + throw new RuntimeException("Consumer::Invalid subscription type: " + effectiveSubscriptionStr); + } + else { + subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr); + } + } + + return subscriptionType; + } + + private String getEffectiveConsumerName(String cycleConsumerName) { + if ( !StringUtils.isBlank(cycleConsumerName) ) { + return cycleConsumerName; + } + + String globalConsumerName = pulsarNBClientConf.getConsumerName(); + if ( !StringUtils.isBlank(globalConsumerName) ) { + return globalConsumerName; + } + + return ""; + } + + public Consumer getConsumer(String cycleTopicUri, + String cycleTopicNames, + String cycleTopicsPattern, + String cycleSubscriptionName, + String cycleSubscriptionType, + String cycleConsumerName) { + + List topicNames = getEffectiveTopicNames(cycleTopicNames); + String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern); + String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); + SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); + String consumerName = getEffectiveConsumerName(cycleConsumerName); + + if ( StringUtils.isBlank(cycleTopicUri) && topicNames.isEmpty() && (topicsPattern == null) ) { + throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!"); + } + + String encodedStr; + // precedence sequence: + // topic_names (consumer statement param) > + // topics_pattern (consumer statement param) > + // topic_uri (document level param) + if ( !topicNames.isEmpty() ) { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + StringUtils.join(topicNames, "|") ); + } + else if ( topicsPattern != null ){ + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + topicsPatternStr ); + } + else { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + cycleTopicUri ); + } + + Consumer consumer = consumers.get(encodedStr); + + if (consumer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); + + // Explicit topic names will take precedence over topics pattern + if ( !topicNames.isEmpty() ) { + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames); + } + else if ( topicsPattern != null) { + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); + consumerConf.put( + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, + getEffectiveTopicPattern(cycleTopicsPattern)); + } else { + topicNames.add(cycleTopicUri); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames); + } + + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType); + if ( !StringUtils.isBlank(consumerName) ) { + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName); + } + + try { + consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); + } + catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar consumer!"); + } + + consumers.put(encodedStr, consumer); + } + + return consumer; + } + ////////////////////////////////////// + // Consumer Processing <-- end + ////////////////////////////////////// + + + ////////////////////////////////////// + // Reader Processing --> Start + ////////////////////////////////////// + private String getEffectiveReaderTopicName(String cycleReaderTopicName) { + if ( !StringUtils.isBlank(cycleReaderTopicName) ) { + return cycleReaderTopicName; + } + + String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName(); + if ( !StringUtils.isBlank(globalReaderTopicName) ) { + return globalReaderTopicName; + } + + throw new RuntimeException("Reader topic name must be set at either global level or cycle level!"); + } + + private String getEffectiveReaderName(String cycleReaderName) { + if ( !StringUtils.isBlank(cycleReaderName) ) { + return cycleReaderName; + } + + String globalReaderName = pulsarNBClientConf.getConsumerName(); + if ( !StringUtils.isBlank(globalReaderName) ) { + return globalReaderName; + } + + return ""; + } + + private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) { + if ( !StringUtils.isBlank(cycleStartMsgPosStr) ) { + return cycleStartMsgPosStr; + } + + String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr(); + if ( !StringUtils.isBlank(globalStartMsgPosStr) ) { + return globalStartMsgPosStr; + } + + return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label; + } + + public Reader getReader(String cycleTopicName, + String cycleReaderName, + String cycleStartMsgPos) { + + String topicName = getEffectiveReaderTopicName(cycleTopicName); + if ( StringUtils.isBlank(topicName) ) { + throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level"); + } + + String readerName = getEffectiveReaderName(cycleReaderName); + + String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos); + if ( !PulsarActivityUtil.isValideReaderStartPosition(startMsgPosStr) ) { + throw new RuntimeException("Reader:: Invalid value for Reader start message position!"); + } + + String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr); + Reader reader = readers.get(encodedStr); + + if (reader == null) { + PulsarClient pulsarClient = getPulsarClient(); + + Map readerConf = pulsarNBClientConf.getReaderConfMap(); + readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName); + + if ( !StringUtils.isBlank(readerName) ) { + readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName); + } + + // "reader.startMessagePos" is NOT a standard Pulsar reader conf + readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); + + try { + ReaderBuilder readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf); + + MessageId startMsgId = MessageId.latest; + if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) { + startMsgId = MessageId.earliest; + } + //TODO: custom start message position is NOT supported yet + //else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) { + // startMsgId = MessageId.latest; + //} + + if (startMsgId != null) { + readerBuilder = readerBuilder.startMessageId(startMsgId); + } + + reader = readerBuilder.create(); + } + catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar reader!"); + } + + readers.put(encodedStr, reader); + } + + return reader; + } + ////////////////////////////////////// + // Reader Processing <-- end + ////////////////////////////////////// } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java index fe9a64bf7..598c5e110 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.commons.lang3.StringUtils; import java.util.concurrent.ConcurrentHashMap; @@ -23,22 +24,8 @@ public class PulsarSpaceCache { } public PulsarSpace getPulsarSpace(String name) { - - String pulsarClientType = activity.getPulsarConf().getPulsarClientType(); - - if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString())) { - return clientScopes.computeIfAbsent(name, spaceName -> new PulsarProducerSpace(spaceName, activity.getPulsarConf())); - } - else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString())) { - return clientScopes.computeIfAbsent(name, spaceName -> new PulsarConsumerSpace(spaceName, activity.getPulsarConf())); - } - else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString())) { - return clientScopes.computeIfAbsent(name, spaceName -> new PulsarReaderSpace(spaceName, activity.getPulsarConf())); - } - // TODO: add support for websocket-producer and managed-ledger - else { - throw new RuntimeException("Unsupported Pulsar client type: " + pulsarClientType); - } + return clientScopes.computeIfAbsent(name, spaceName -> + new PulsarSpace(spaceName, activity.getPulsarConf(), activity.getPulsarServiceUrl())); } public PulsarActivity getActivity() { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java new file mode 100644 index 000000000..764048748 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java @@ -0,0 +1,17 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; + +public class PulsarBatchProducerEndMapper extends PulsarOpMapper{ + + public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace) { + super(cmdTpl, clientSpace); + } + + @Override + public PulsarOp apply(long value) { + return new PulsarBatchProducerEndOp(); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java new file mode 100644 index 000000000..c7fc0902f --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java @@ -0,0 +1,37 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.nb.api.errors.BasicError; +import org.apache.pulsar.client.api.BatchMessageContainer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.BatchMessageContainerBase; +import org.apache.pulsar.client.impl.DefaultBatcherBuilder; +import org.apache.pulsar.common.util.FutureUtil; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class PulsarBatchProducerEndOp implements PulsarOp { + @Override + public void run() { + List> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get(); + Producer producer = PulsarBatchProducerStartOp.threadLocalProducer.get(); + + if ( (container != null) && (!container.isEmpty()) ) { + try { + // producer.flushAsync().get(); + FutureUtil.waitForAll(container).get(); + } + catch(Exception e) { + throw new RuntimeException("Batch Producer:: failed to send (some of) the batched messages!"); + } + + container.clear(); + PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.set(null); + } else { + throw new BasicError("You tried to end an empty batch message container. This means you" + + " did initiate the batch container properly, or there is an error in your" + + " pulsar op sequencing and ratios."); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java new file mode 100644 index 000000000..34f988447 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java @@ -0,0 +1,33 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Producer; + +import java.util.function.LongFunction; + +public class PulsarBatchProducerMapper extends PulsarOpMapper{ + private final LongFunction keyFunc; + private final LongFunction payloadFunc; + + public PulsarBatchProducerMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction keyFunc, + LongFunction payloadFunc) { + super(cmdTpl, clientSpace); + this.keyFunc = keyFunc; + this.payloadFunc = payloadFunc; + } + + @Override + public PulsarOp apply(long value) { + String msgKey = keyFunc.apply(value); + String msgPayload = payloadFunc.apply(value); + + return new PulsarBatchProducerOp( + clientSpace.getPulsarSchema(), + msgKey, + msgPayload + ); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java new file mode 100644 index 000000000..bbc9530ce --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java @@ -0,0 +1,62 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.util.AvroUtil; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class PulsarBatchProducerOp implements PulsarOp { + + private final Schema pulsarSchema; + private final String msgKey; + private final String msgPayload; + + public PulsarBatchProducerOp(Schema schema, + String key, + String payload) { + this.pulsarSchema = schema; + this.msgKey = key; + this.msgPayload = payload; + } + + + @Override + public void run() { + if ( (msgPayload == null) || msgPayload.isEmpty() ) { + throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); + } + + List> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get(); + Producer producer = PulsarBatchProducerStartOp.threadLocalProducer.get(); + assert (producer != null) && (container != null); + + TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); + if ( (msgKey != null) && (!msgKey.isEmpty()) ) { + typedMessageBuilder = typedMessageBuilder.key(msgKey); + } + + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( + (GenericAvroSchema) pulsarSchema, + pulsarSchema.getSchemaInfo().getSchemaDefinition(), + msgPayload + ); + typedMessageBuilder = typedMessageBuilder.value(payload); + } + else { + typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); + } + + container.add(typedMessageBuilder.sendAsync()); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java new file mode 100644 index 000000000..7b692ae98 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java @@ -0,0 +1,26 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Producer; + +import java.util.function.LongFunction; + +public class PulsarBatchProducerStartMapper extends PulsarOpMapper{ + + private final LongFunction> batchProducerFunc; + + public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction> batchProducerFunc) { + super(cmdTpl, clientSpace); + this.batchProducerFunc = batchProducerFunc; + } + + @Override + public PulsarOp apply(long value) { + Producer batchProducer = batchProducerFunc.apply(value); + + return new PulsarBatchProducerStartOp(batchProducer); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java new file mode 100644 index 000000000..301b49d66 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java @@ -0,0 +1,33 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.nb.api.errors.BasicError; +import org.apache.commons.compress.utils.Lists; +import org.apache.pulsar.client.api.*; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class PulsarBatchProducerStartOp implements PulsarOp { + + // TODO: ensure sane container lifecycle management + public final static ThreadLocal>> threadLocalBatchMsgContainer = new ThreadLocal<>(); + public final static ThreadLocal> threadLocalProducer = new ThreadLocal<>(); + + public PulsarBatchProducerStartOp(Producer batchProducer) { + threadLocalProducer.set(batchProducer); + } + + @Override + public void run() { + List> container = threadLocalBatchMsgContainer.get(); + + if ( container == null ) { + container = Lists.newArrayList(); + threadLocalBatchMsgContainer.set(container); + } else { + throw new BasicError("You tried to create a batch message container where one was already" + + " defined. This means you did not flush and unset the last container, or there is an error in your" + + " pulsar op sequencing and ratios."); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 4a69e6c9f..c1ff0332d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; @@ -16,22 +17,28 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarConsumerMapper implements LongFunction { - private final CommandTemplate cmdTpl; - private final Schema pulsarSchema; +public class PulsarConsumerMapper extends PulsarOpMapper { private final LongFunction> consumerFunc; + private final LongFunction asyncApiFunc; public PulsarConsumerMapper(CommandTemplate cmdTpl, - Schema pulsarSchema, - LongFunction> consumerFunc) { - this.cmdTpl = cmdTpl; - this.pulsarSchema = pulsarSchema; + PulsarSpace clientSpace, + LongFunction> consumerFunc, + LongFunction asyncApiFunc) { + super(cmdTpl, clientSpace); this.consumerFunc = consumerFunc; + this.asyncApiFunc = asyncApiFunc; } @Override public PulsarOp apply(long value) { Consumer consumer = consumerFunc.apply(value); - return new PulsarConsumerOp(consumer, pulsarSchema); + boolean asyncApi = asyncApiFunc.apply(value); + + return new PulsarConsumerOp( + consumer, + clientSpace.getPulsarSchema(), + asyncApi + ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 1833772e4..d3606a059 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -3,23 +3,20 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; -import java.nio.charset.StandardCharsets; - public class PulsarConsumerOp implements PulsarOp { private final Consumer consumer; private final Schema pulsarSchema; + private final boolean asyncPulsarOp; - public PulsarConsumerOp(Consumer consumer, Schema schema) { + public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp) { this.consumer = consumer; this.pulsarSchema = schema; + this.asyncPulsarOp = asyncPulsarOp; } - @Override - public void run() { + public void syncConsume() { try { Message message = consumer.receive(); @@ -30,15 +27,25 @@ public class PulsarConsumerOp implements PulsarOp { AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString()); - } - else { + } else { System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData())); } consumer.acknowledge(message.getMessageId()); - } catch (PulsarClientException e) { throw new RuntimeException(e); } } + + public void asyncConsume() { + //TODO: add support for async consume + } + + @Override + public void run() { + if (!asyncPulsarOp) + syncConsume(); + else + asyncConsume(); + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java new file mode 100644 index 000000000..06c239c8e --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java @@ -0,0 +1,19 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; + +import java.util.function.LongFunction; + +public abstract class PulsarOpMapper implements LongFunction { + protected final CommandTemplate cmdTpl; + protected final PulsarSpace clientSpace; + + public PulsarOpMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace) { + this.cmdTpl = cmdTpl; + this.clientSpace = clientSpace; + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index cebc8287c..7212356e0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -17,21 +17,21 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarProducerMapper implements LongFunction { - private final CommandTemplate cmdTpl; - private final Schema pulsarSchema; +public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction> producerFunc; + private final LongFunction asyncApiFunc; private final LongFunction keyFunc; private final LongFunction payloadFunc; public PulsarProducerMapper(CommandTemplate cmdTpl, - Schema pulsarSchema, + PulsarSpace clientSpace, LongFunction> producerFunc, + LongFunction asyncApiFunc, LongFunction keyFunc, LongFunction payloadFunc) { - this.cmdTpl = cmdTpl; - this.pulsarSchema = pulsarSchema; + super(cmdTpl, clientSpace); this.producerFunc = producerFunc; + this.asyncApiFunc = asyncApiFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; } @@ -39,9 +39,15 @@ public class PulsarProducerMapper implements LongFunction { @Override public PulsarOp apply(long value) { Producer producer = producerFunc.apply(value); + boolean asyncApi = asyncApiFunc.apply(value); String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); - return new PulsarProducerOp(producer, pulsarSchema, msgKey, msgPayload); + return new PulsarProducerOp( + producer, + clientSpace.getPulsarSchema(), + asyncApi, + msgKey, + msgPayload); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 9ee2d4d51..70494d9cc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -1,58 +1,90 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarAction; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; public class PulsarProducerOp implements PulsarOp { + + private final static Logger logger = LogManager.getLogger(PulsarProducerOp.class); + private final Producer producer; private final Schema pulsarSchema; private final String msgKey; private final String msgPayload; + private final boolean asyncPulsarOp; - public PulsarProducerOp(Producer producer, Schema schema, String key, String payload) { + public PulsarProducerOp(Producer producer, + Schema schema, + boolean asyncPulsarOp, + String key, + String payload) { this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; this.msgPayload = payload; + this.asyncPulsarOp = asyncPulsarOp; } @Override public void run() { - try { - if ( (msgPayload == null) || msgPayload.isEmpty() ) { - throw new RuntimeException("Message payload (\"msg-value\" can't be empty!"); + if ( (msgPayload == null) || msgPayload.isEmpty() ) { + throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); + } + + TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); + if ( (msgKey != null) && (!msgKey.isEmpty()) ) { + typedMessageBuilder = typedMessageBuilder.key(msgKey); + } + + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( + (GenericAvroSchema) pulsarSchema, + pulsarSchema.getSchemaInfo().getSchemaDefinition(), + msgPayload + ); + typedMessageBuilder = typedMessageBuilder.value(payload); + } + else { + typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); + } + + //TODO: add error handling with failed message production + if (!asyncPulsarOp) { + try { + logger.trace("sending message"); + typedMessageBuilder.send(); + } catch (PulsarClientException pce) { + logger.trace("failed sending message"); + throw new RuntimeException(pce); } + } + else { + try { + CompletableFuture future = typedMessageBuilder.sendAsync(); + future.get(); - TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); - if ( (msgKey != null) && (!msgKey.isEmpty()) ) { - typedMessageBuilder = typedMessageBuilder.key(msgKey); + /*.thenRun(() -> { +// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload); + }).exceptionally(ex -> { + System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload); + return ex; + })*/ + ; } - - SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); - if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.generic.GenericRecord avroGenericRecord = - AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, msgPayload); - - GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( - (GenericAvroSchema) pulsarSchema, avroGenericRecord); - - typedMessageBuilder = typedMessageBuilder.value(payload); + catch (Exception e) { + throw new RuntimeException(e); } - else { - typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); - } - - typedMessageBuilder.send(); - - } catch (PulsarClientException e) { - throw new RuntimeException(e); } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java index 7ae971a7e..cbd5d203b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java @@ -1,27 +1,35 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import java.util.function.LongFunction; -public class PulsarReaderMapper implements LongFunction { - private final CommandTemplate cmdTpl; - private final Schema pulsarSchema; +public class PulsarReaderMapper extends PulsarOpMapper { + private final LongFunction> readerFunc; + private final LongFunction asyncApiFunc; public PulsarReaderMapper(CommandTemplate cmdTpl, - Schema pulsarSchema, - LongFunction> readerFunc) { - this.cmdTpl = cmdTpl; - this.pulsarSchema = pulsarSchema; + PulsarSpace clientSpace, + LongFunction> readerFunc, + LongFunction asyncApiFunc) { + super(cmdTpl, clientSpace); this.readerFunc = readerFunc; + this.asyncApiFunc = asyncApiFunc; } @Override public PulsarOp apply(long value) { Reader reader = readerFunc.apply(value); - return new PulsarReaderOp(reader, pulsarSchema); + boolean asyncApi = asyncApiFunc.apply(value); + + return new PulsarReaderOp( + reader, + clientSpace.getPulsarSchema(), + asyncApi + ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java index 6551f3abf..1df68183c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java @@ -11,14 +11,15 @@ import org.apache.pulsar.common.schema.SchemaType; public class PulsarReaderOp implements PulsarOp { private final Reader reader; private final Schema pulsarSchema; + private final boolean asyncPulsarOp; - public PulsarReaderOp(Reader reader, Schema schema) { + public PulsarReaderOp(Reader reader, Schema schema, boolean asyncPulsarOp) { this.reader = reader; this.pulsarSchema = schema; + this.asyncPulsarOp = asyncPulsarOp; } - @Override - public void run() { + public void syncRead() { try { SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); @@ -42,4 +43,16 @@ public class PulsarReaderOp implements PulsarOp { throw new RuntimeException(e); } } + + public void asyncRead() { + //TODO: add support for async read + } + + @Override + public void run() { + if (!asyncPulsarOp) + syncRead(); + else + asyncRead(); + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 1a1086075..44d87ff13 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -5,6 +5,7 @@ import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.scoping.ScopedSupplier; import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; @@ -19,7 +20,6 @@ public class ReadyPulsarOp implements LongFunction { private final CommandTemplate cmdTpl; private final PulsarSpace clientSpace; private final LongFunction opFunc; - private final Schema pulsarSchema; // TODO: Add docs for the command template with respect to the OpTemplate @@ -44,59 +44,36 @@ public class ReadyPulsarOp implements LongFunction { this.clientSpace = pcache.getPulsarSpace("default"); } - this.pulsarSchema = clientSpace.getPulsarSchema(); - this.opFunc = resolve(); ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton")); Supplier> opSupplier = scope.supplier(this::resolve); } - private LongFunction resolve() { - String clientType = clientSpace.getPulsarClientConf().getPulsarClientType(); - - // TODO: Complete implementation for reader, websocket-producer and managed-ledger - if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) { - assert clientSpace instanceof PulsarProducerSpace; - return resolveProducer((PulsarProducerSpace) clientSpace); - } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString()) ) { - assert clientSpace instanceof PulsarConsumerSpace; - return resolveConsumer((PulsarConsumerSpace)clientSpace); - } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString()) ) { - assert clientSpace instanceof PulsarReaderSpace; - return resolveReader((PulsarReaderSpace)clientSpace); /* - } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.WSOKT_PRODUCER.toString()) ) { - } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.MANAGED_LEDGER.toString()) ) { - */ - } else { - throw new RuntimeException("Unsupported Pulsar client: " + clientType); - } + @Override + public PulsarOp apply(long value) { + return opFunc.apply(value); } - private LongFunction resolveProducer( - PulsarProducerSpace clientSpace - ) { + private boolean IsBoolean (String str) { + return StringUtils.equalsAnyIgnoreCase(str, "yes", "true"); + } + + private LongFunction resolve() { + if (cmdTpl.containsKey("topic_url")) { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); } - LongFunction cycle_producer_name_func; - if (cmdTpl.isStatic("producer-name")) { - cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer-name"); - } else if (cmdTpl.isDynamic("producer-name")) { - cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer-name", l); - } else { - cycle_producer_name_func = (l) -> null; - } - - LongFunction topic_uri_func; + // Global parameter: topic_uri + LongFunction topicUriFunc; if (cmdTpl.containsKey("topic_uri")) { if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) { throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'."); } else if (cmdTpl.isStatic("topic_uri")) { - topic_uri_func = (l) -> cmdTpl.getStatic("topic_uri"); + topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri"); } else { - topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l); + topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l); } } else if (cmdTpl.containsKey("topic")) { @@ -109,9 +86,9 @@ public class ReadyPulsarOp implements LongFunction { String topic = cmdTpl.getStaticOr("topic", ""); String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic; - topic_uri_func = (l) -> composited; + topicUriFunc = (l) -> composited; } else { // some or all dynamic fields, composite into a single dynamic call - topic_uri_func = (l) -> + topicUriFunc = (l) -> cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent") + "://" + cmdTpl.getOr("tenant", l, "public") + "/" + cmdTpl.getOr("namespace", l, "default") @@ -119,40 +96,102 @@ public class ReadyPulsarOp implements LongFunction { } } else { - topic_uri_func = (l) -> null; + topicUriFunc = (l) -> null; + } + + // Global parameter: async_api + LongFunction asyncApiFunc; + if ( cmdTpl.containsKey("async_api") ) { + if ( cmdTpl.isStatic("async_api") ) + asyncApiFunc = (l) -> IsBoolean(cmdTpl.getStatic("async_api")); + else + throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); + } + else { + asyncApiFunc = (l) -> false; + } + + if ( !cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype") ) { + throw new RuntimeException("Statement parameter \"optype\" must have a valid value!"); + } + String stmtOpType = cmdTpl.getStatic("optype"); + + // TODO: Complete implementation for websocket-producer and managed-ledger + if /*( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_TENANT.label) ) { + return resolveCreateTenant(clientSpace); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_NAMESPACE.label) ) { + return resolveCreateNameSpace(clientSpace); + } else if*/ ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label) ) { + return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label) ) { + return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label) ) { + return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label) ) { + return resolveMsgBatchSendStart(clientSpace, topicUriFunc); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label) ) { + return resolveMsgBatchSend(clientSpace); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label) ) { + return resolveMsgBatchSendEnd(clientSpace); + } else { + throw new RuntimeException("Unsupported Pulsar operation type" ); + } + } + + private LongFunction resolveMsgSend( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func + ) { + LongFunction cycle_producer_name_func; + if (cmdTpl.isStatic("producer_name")) { + cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name"); + } else if (cmdTpl.isDynamic("producer_name")) { + cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer_name", l); + } else { + cycle_producer_name_func = (l) -> null; } LongFunction> producerFunc = - (l) -> clientSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l)); + (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l)); LongFunction keyFunc; - if (cmdTpl.isStatic("msg-key")) { - keyFunc = (l) -> cmdTpl.getStatic("msg-key"); - } else if (cmdTpl.isDynamic("msg-key")) { - keyFunc = (l) -> cmdTpl.getDynamic("msg-key", l); + if (cmdTpl.isStatic("msg_key")) { + keyFunc = (l) -> cmdTpl.getStatic("msg_key"); + } else if (cmdTpl.isDynamic("msg_key")) { + keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l); } else { keyFunc = (l) -> null; } LongFunction valueFunc; - if (cmdTpl.containsKey("msg-value")) { - if (cmdTpl.isStatic("msg-value")) { - valueFunc = (l) -> cmdTpl.getStatic("msg-value"); - } else if (cmdTpl.isDynamic("msg-value")) { - valueFunc = (l) -> cmdTpl.getDynamic("msg-value", l); + if (cmdTpl.containsKey("msg_value")) { + if (cmdTpl.isStatic("msg_value")) { + valueFunc = (l) -> cmdTpl.getStatic("msg_value"); + } else if (cmdTpl.isDynamic("msg_value")) { + valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l); } else { valueFunc = (l) -> null; } } else { - throw new RuntimeException("\"msg-value\" field must be specified!"); + throw new RuntimeException("Producer:: \"msg_value\" field must be specified!"); } - return new PulsarProducerMapper(cmdTpl, pulsarSchema, producerFunc, keyFunc, valueFunc); + return new PulsarProducerMapper( + cmdTpl, + clientSpace, + producerFunc, + async_api_func, + keyFunc, + valueFunc); } - private LongFunction resolveConsumer( - PulsarConsumerSpace clientSpace + private LongFunction resolveMsgConsume( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func ) { + // Topic list (multi-topic) LongFunction topic_names_func; if (cmdTpl.isStatic("topic-names")) { topic_names_func = (l) -> cmdTpl.getStatic("topic-names"); @@ -162,6 +201,7 @@ public class ReadyPulsarOp implements LongFunction { topic_names_func = (l) -> null; } + // Topic pattern (multi-topic) LongFunction topics_pattern_func; if (cmdTpl.isStatic("topics-pattern")) { topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern"); @@ -200,6 +240,7 @@ public class ReadyPulsarOp implements LongFunction { LongFunction> consumerFunc = (l) -> clientSpace.getConsumer( + topic_uri_func.apply(l), topic_names_func.apply(l), topics_pattern_func.apply(l), subscription_name_func.apply(l), @@ -207,21 +248,14 @@ public class ReadyPulsarOp implements LongFunction { consumer_name_func.apply(l) ); - return new PulsarConsumerMapper(cmdTpl, pulsarSchema, consumerFunc); + return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func); } - private LongFunction resolveReader( - PulsarReaderSpace clientSpace + private LongFunction resolveMsgRead( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func ) { - LongFunction topic_name_func; - if (cmdTpl.isStatic("topic-name")) { - topic_name_func = (l) -> cmdTpl.getStatic("topic-name"); - } else if (cmdTpl.isDynamic("topic-name")) { - topic_name_func = (l) -> cmdTpl.getDynamic("topic-name", l); - } else { - topic_name_func = (l) -> null; - } - LongFunction reader_name_func; if (cmdTpl.isStatic("reader-name")) { reader_name_func = (l) -> cmdTpl.getStatic("reader-name"); @@ -242,16 +276,64 @@ public class ReadyPulsarOp implements LongFunction { LongFunction> readerFunc = (l) -> clientSpace.getReader( - topic_name_func.apply(l), + topic_uri_func.apply(l), reader_name_func.apply(l), start_msg_pos_str_func.apply(l) ); - return new PulsarReaderMapper(cmdTpl, pulsarSchema, readerFunc); + return new PulsarReaderMapper(cmdTpl, clientSpace, readerFunc, async_api_func); } - @Override - public PulsarOp apply(long value) { - return opFunc.apply(value); + private LongFunction resolveMsgBatchSendStart( + PulsarSpace clientSpace, + LongFunction topic_uri_func + ) { + LongFunction cycle_batch_producer_name_func; + if (cmdTpl.isStatic("batch_producer_name")) { + cycle_batch_producer_name_func = (l) -> cmdTpl.getStatic("batch_producer_name"); + } else if (cmdTpl.isDynamic("batch_producer_name")) { + cycle_batch_producer_name_func = (l) -> cmdTpl.getDynamic("batch_producer_name", l); + } else { + cycle_batch_producer_name_func = (l) -> null; + } + + LongFunction> batchProducerFunc = + (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l)); + + return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, batchProducerFunc); + } + + private LongFunction resolveMsgBatchSend(PulsarSpace clientSpace) { + LongFunction keyFunc; + if (cmdTpl.isStatic("msg_key")) { + keyFunc = (l) -> cmdTpl.getStatic("msg_key"); + } else if (cmdTpl.isDynamic("msg_key")) { + keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l); + } else { + keyFunc = (l) -> null; + } + + LongFunction valueFunc; + if (cmdTpl.containsKey("msg_value")) { + if (cmdTpl.isStatic("msg_value")) { + valueFunc = (l) -> cmdTpl.getStatic("msg_value"); + } else if (cmdTpl.isDynamic("msg_value")) { + valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l); + } else { + valueFunc = (l) -> null; + } + } else { + throw new RuntimeException("Batch Producer:: \"msg_value\" field must be specified!"); + } + + return new PulsarBatchProducerMapper( + cmdTpl, + clientSpace, + keyFunc, + valueFunc); + } + + private LongFunction resolveMsgBatchSendEnd(PulsarSpace clientSpace) { + return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java index 2a4f1f75e..d799312b4 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java @@ -80,6 +80,10 @@ public class AvroUtil { return recordBuilder.build(); } + public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDef, String jsonData) { + org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData); + return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord); + } public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) { GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef); org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index 7ddf858b5..ba0cd5718 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -23,21 +23,25 @@ public class PulsarActivityUtil { private final static Logger logger = LogManager.getLogger(PulsarActivityUtil.class); // Supported message operation types - public enum CLIENT_TYPES { - PRODUCER("producer"), - CONSUMER("consumer"), - READER("reader"), - WSOKT_PRODUCER("websocket-producer"), - MANAGED_LEDGER("managed-ledger") + // TODO: websocket-producer and managed-ledger + public enum OP_TYPES { + CREATE_TENANT("create-tenant"), + CREATE_NAMESPACE("create-namespace"), + BATCH_MSG_SEND_START("batch-msg-send-start"), + BATCH_MSG_SEND("batch-msg-send"), + BATCH_MSG_SEND_END("batch-msg-send-end"), + MSG_SEND("msg-send"), + MSG_CONSUME("msg-consume"), + MSG_READ("msg-read") ; public final String label; - CLIENT_TYPES(String label) { + OP_TYPES(String label) { this.label = label; } } public static boolean isValidClientType(String type) { - return Arrays.stream(CLIENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase())); + return Arrays.stream(OP_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase())); } @@ -157,6 +161,19 @@ public class PulsarActivityUtil { return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); } + public enum SUBSCRIPTION_TYPE { + exclusive("exclusive"), + failover("failover"), + shared("shared"), + key_shared("key_shared"); + + public final String label; + SUBSCRIPTION_TYPE(String label) { this.label = label; } + } + public static boolean isValidSubscriptionType(String item) { + return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } + /////// // Standard reader configuration (activity-level settings) // - https://pulsar.apache.org/docs/en/client-libraries-java/#reader @@ -202,6 +219,9 @@ public class PulsarActivityUtil { public final String label; READER_MSG_POSITION_TYPE(String label) { this.label = label; } } + public static boolean isValideReaderStartPosition(String item) { + return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } /////// // Valid websocket-producer configuration (activity-level settings) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index 9d656caac..e3d78ec0e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -7,7 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; import org.apache.commons.configuration2.builder.fluent.Parameters; import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler; import org.apache.commons.configuration2.ex.ConfigurationException; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -23,13 +23,11 @@ public class PulsarNBClientConf { private String canonicalFilePath = ""; - public static final String DRIVER_CONF_PREFIX = "driver"; public static final String SCHEMA_CONF_PREFIX = "schema"; public static final String CLIENT_CONF_PREFIX = "client"; public static final String PRODUCER_CONF_PREFIX = "producer"; public static final String CONSUMER_CONF_PREFIX = "consumer"; public static final String READER_CONF_PREFIX = "reader"; - private HashMap driverConfMap = new HashMap<>(); private HashMap schemaConfMap = new HashMap<>(); private HashMap clientConfMap = new HashMap<>(); private HashMap producerConfMap = new HashMap<>(); @@ -53,14 +51,6 @@ public class PulsarNBClientConf { Configuration config = builder.getConfiguration(); - // Get driver specific configuration settings - for (Iterator it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) { - String confKey = it.next(); - String confVal = config.getProperty(confKey).toString(); - if ( !StringUtils.isBlank(confVal) ) - driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey)); - } - // Get schema specific configuration settings for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); @@ -112,41 +102,6 @@ public class PulsarNBClientConf { } - ////////////////// - // Get NB Driver related config - public Map getDriverConfMap() { - return this.driverConfMap; - } - public boolean hasDriverConfKey(String key) { - if (key.contains(DRIVER_CONF_PREFIX)) - return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length()+1)); - else - return driverConfMap.containsKey(key); - } - public Object getDriverConfValue(String key) { - if (key.contains(DRIVER_CONF_PREFIX)) - return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length()+1)); - else - return driverConfMap.get(key); - } - public void setDriverConfValue(String key, Object value) { - if (key.contains(DRIVER_CONF_PREFIX)) - driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length()+1), value); - else - driverConfMap.put(key, value); - } - // other driver helper functions ... - public String getPulsarClientType() { - Object confValue = getDriverConfValue("driver.client-type"); - - // If not explicitly specifying Pulsar client type, "producer" is the default type - if (confValue == null) - return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString(); - else - return confValue.toString(); - } - - ////////////////// // Get Schema related config public Map getSchemaConfMap() { diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index ec954c961..589225a61 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -1,11 +1,3 @@ -### NB Pulsar driver related configuration - driver.xxx -driver.client-type = producer -driver.num-workers = 1 -# TODO: functionalities to be completed -driver.sync-mode = sync -driver.msg-recv-ouput = console - - ### Schema related configurations - schema.xxx # valid types: # - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type) @@ -17,21 +9,24 @@ driver.msg-recv-ouput = console # 1) primitive types, including bytearray (byte[]) which is default, for messages without schema # 2) Avro for messages with schema schema.type = avro -schema.definition = file://// +schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc + ### Pulsar client related configurations - client.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#client -client.serviceUrl: pulsar://localhost:6650 client.connectionTimeoutMs = 5000 + ### Producer related configurations (global) - producer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer producer.producerName = producer.topicName = persistent://public/default/mynbtest producer.sendTimeoutMs = + + ### Consumer related configurations (global) - consumer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer consumer.topicNames = @@ -41,10 +36,12 @@ consumer.subscriptionType = consumer.consumerName = consumer.receiverQueueSize = + + ### Reader related configurations (global) - reader.xxx # https://pulsar.apache.org/docs/en/client-libraries-java/#reader # - valid Pos: earliest, latest, custom::file://// reader.topicName = persistent://public/default/nbpulsar reader.receiverQueueSize = reader.readerName = -#reader.startMessagePos = earliest +reader.startMessagePos = earliest diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index c329e193f..b4da7635d 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -12,50 +12,82 @@ bindings: reading_value: ToFloat(100); topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L)); +# global parameters that apply to all Pulsar client types: +params: + #topic_uri: "persistent://public/default/{topic}" + topic_uri: "persistent://public/default/nbpulsar" + async_api: "false" + blocks: -# - create-tenant-namespace: -# tags: -# type: create-tenant-namespace -# statements: -# tenant: {tenant} -# namespace: {namespace} + - name: admin-block + tags: + phase: create-tenant-namespace + statements: + - name: s1 + optype: create-tenant + tenant: "{tenant}" + - name: s2 + optype: create-namespace + namespace: "{namespace}" + + - name: batch-producer-block + tags: + phase: batch-producer + statements: + - name: s1 + optype: batch-msg-send-start + # For batch producer, "producer_name" should be associated with batch start + # batch_producer_name: {batch_producer_name} + ratio: 1 + - name: s2 + optype: batch-msg-send + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } + ratio: 5 + - name: s3 + optype: batch-msg-send-end + ratio: 1 - name: producer-block tags: - op-type: producer + phase: producer statements: - - producer-stuff: - ####### - # NOTE: tenant and namespace must be static and pre-exist in Pulsar first - # topic_uri: "[persistent|non-persistent]:////" -# topic_uri: "persistent://public/default/{topic}" - topic_uri: "persistent://public/default/nbpulsar" - # producer-name: - msg-key: "{mykey}" - msg-value: | - { - "SensorID": "{sensor_id}", - "SensorType": "Temperature", - "ReadingTime": "{reading_time}", - "ReadingValue": {reading_value} - } + - name: s1 + # producer_name: {producer_name} + optype: msg-send + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } - name: consumer-block tags: - op-type: consumer + phase: consumer statements: - - consumer-stuff: - topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" - topics-pattern: "public/default/.*" - subscription-name: - subscription-type: - consumer-name: + - name: s1 + optype: msg-consume + topic_names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" + topics_pattern: "public/default/.*" + subscription_name: + subscription_type: + consumer_name: - reader: tags: - op-type: reader + phase: reader statements: - - reader-stuff: + - name: s1 + optype: msg-read # - websocket-producer: # tags: @@ -68,5 +100,3 @@ blocks: # type: managed-ledger # statement: # - managed-ledger-stuff: - - From 58a93bd7b0f7bd3170cbd6b7b05fcda64ad2d3da Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 10 Mar 2021 12:18:15 -0600 Subject: [PATCH 2/4] Document update --- .../resources/activities/config.properties | 6 +- .../src/main/resources/activities/pulsar.yaml | 9 +- driver-pulsar/src/main/resources/pulsar.md | 425 ++++++++++++++---- driver-pulsar/src/main/resources/topics.md | 3 - 4 files changed, 347 insertions(+), 96 deletions(-) delete mode 100644 driver-pulsar/src/main/resources/topics.md diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index 589225a61..e7182198b 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -9,7 +9,7 @@ # 1) primitive types, including bytearray (byte[]) which is default, for messages without schema # 2) Avro for messages with schema schema.type = avro -schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc +schema.definition = file:// @@ -22,7 +22,7 @@ client.connectionTimeoutMs = 5000 ### Producer related configurations (global) - producer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer producer.producerName = -producer.topicName = persistent://public/default/mynbtest +producer.topicName = producer.sendTimeoutMs = @@ -41,7 +41,7 @@ consumer.receiverQueueSize = ### Reader related configurations (global) - reader.xxx # https://pulsar.apache.org/docs/en/client-libraries-java/#reader # - valid Pos: earliest, latest, custom::file://// -reader.topicName = persistent://public/default/nbpulsar +reader.topicName = reader.receiverQueueSize = reader.readerName = reader.startMessagePos = earliest diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index b4da7635d..a27f74360 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -12,7 +12,7 @@ bindings: reading_value: ToFloat(100); topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L)); -# global parameters that apply to all Pulsar client types: +# document level parameters that apply to all Pulsar client types: params: #topic_uri: "persistent://public/default/{topic}" topic_uri: "persistent://public/default/nbpulsar" @@ -49,7 +49,7 @@ blocks: "ReadingTime": "{reading_time}", "ReadingValue": {reading_value} } - ratio: 5 + ratio: 100 - name: s3 optype: batch-msg-send-end ratio: 1 @@ -59,8 +59,8 @@ blocks: phase: producer statements: - name: s1 - # producer_name: {producer_name} optype: msg-send + # producer_name: {producer_name} msg_key: "{mykey}" msg_value: | { @@ -82,12 +82,13 @@ blocks: subscription_type: consumer_name: - - reader: + - name: reader-block tags: phase: reader statements: - name: s1 optype: msg-read + reader_name: # - websocket-producer: # tags: diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 00deabf03..927875e45 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -1,13 +1,19 @@ - [1. NoSQLBench (NB) Pulsar Driver Overview](#1-nosqlbench-nb-pulsar-driver-overview) - [1.1. Issues Tracker](#11-issues-tracker) - [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings) - - [1.3. Pulsar Driver Yaml File: Statement Blocks](#13-pulsar-driver-yaml-file-statement-blocks) - - [1.3.1. Producer Statement block](#131-producer-statement-block) - - [1.3.2. Consumer Statement block](#132-consumer-statement-block) - - [1.4. Schema Support](#14-schema-support) - - [1.5. Activity Parameters](#15-activity-parameters) - - [1.6. Pulsar NB Execution Example](#16-pulsar-nb-execution-example) -- [2. Advanced Driver Features -- TODO: Design Revisit](#2-advanced-driver-features----todo-design-revisit) + - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) + - [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters) + - [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details) + - [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block) + - [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block) + - [1.4.3. Producer Command Block](#143-producer-command-block) + - [1.4.4. Consumer Command Block](#144-consumer-command-block) + - [1.4.5. Reader Command Block](#145-reader-command-block) + - [1.5. Schema Support](#15-schema-support) + - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) + - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) + - [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) +- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - [2.1. Other Activity Parameters](#21-other-activity-parameters) - [2.2. API Caching](#22-api-caching) - [2.2.1. Instancing Controls](#221-instancing-controls) @@ -17,12 +23,10 @@ This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB). * Producer * Consumer -* (Future) Reader +* Reader * (Future) WebSocket Producer * (Future) Managed Ledger -**NOTE**: At the moment, only Producer workload type is fully supported in NB. The support for Consumer type is partially added but not completed yet; and the support for other types of workloads will be added in NB in future releases. - ## 1.1. Issues Tracker If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar). @@ -36,15 +40,11 @@ When creating these objects (e.g. PulsarClient, Producer), there are different c The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below: ```properties -### NB Pulsar driver related configuration - driver.xxx -driver.client-type = producer - ### Schema related configurations - schema.xxx schema.type = avro schema.definition = file:/// ### Pulsar client related configurations - client.xxx -client.serviceUrl = pulsar://:6650 client.connectionTimeoutMs = 5000 ### Producer related configurations (global) - producer.xxx @@ -54,10 +54,6 @@ producer.sendTimeoutMs = ``` There are multiple sections in this file that correspond to different groups of configuration settings: -* **NB pulsar driver related settings**: - * All settings under this section starts with **driver.** prefix. - * Right now there is only valid option under this section: - * *driver.client-type* determines what type of Pulsar workload to be simulated by NB. * **Schema related settings**: * All settings under this section starts with **schema.** prefix. * The NB Pulsar driver supports schema-based message publishing and consuming. This section defines configuration settings that are schema related. @@ -69,73 +65,283 @@ There are multiple sections in this file that correspond to different groups of * This section defines all configuration settings that are related with defining a PulsarClient object. * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) * **Pulsar Producer related settings**: - * All settings under this section starts with **producer.** prefix. + * All settings under this section starts with **producer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Producer object. * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) +* **Pulsar Consumer related settings**: + * All settings under this section starts with **consumer** prefix. + * This section defines all configuration settings that are related with defining a Pulsar Consumer object. + * See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) +* **Pulsar Reader related settings**: + * All settings under this section starts with **reader** prefix. + * This section defines all configuration settings that are related with defining a Pulsar Reader object. + * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration sections in this file as well. -## 1.3. Pulsar Driver Yaml File: Statement Blocks +## 1.3. NB Pulsar Driver Yaml File - High Level Structure Just like other NB driver types, the actual Pulsar workload generation is determined by the statement blocks in the NB driver Yaml file. Depending on the Pulsar workload type, the corresponding statement block may have different contents. -### 1.3.1. Producer Statement block - -An example of defining Pulsar **Producer** workload is as below: +At high level, Pulsar driver yaml file has the following structure: +* **description**: (optional) general description of the yaml file +* **bindings**: defines NB bindings +* **params**: document level Pulsar driver parameters that apply to all command blocks. Currently there are two valid parameters: + * **topic_url**: Pulsar topic uri ([persistent|non-persistent]:////). This can be statically assigned or dynamically generated via NB bindings. + * **async_api**: Whether to use asynchronous Pulsar API (**note**: more on this later) +* **blocks**: includes a series of command blocks. Each command block defines one major Pulsar operation such as *producer*, *consumer*, etc. Right now, the following command blocks are already supported or will be added in the near future. We'll go through each of these command blocks with more details in later sections. + * (future) **admin-block**: support for Pulsar Admin API, starting with using NB to create tenants and namespaces. + * **batch-producer-block**: Pulsar batch producer + * **producer-block**: Pulsar producer + * **consumer-block**: Pulsar consumer + * **reader-block**: Pulsar reader ```yaml +description: | + ... ... + +bindings: + ... ... + +# global parameters that apply to all Pulsar client types: +params: + topic_uri: "" + async_api: "false" + blocks: -- name: producer-block - tags: - type: producer - statements: - - producer-stuff: - # producer-name: - # topic_uri: "persistent://public/default/{topic}" - topic_uri: "persistent://public/default/nbpulsar" - msg-key: "{mykey}" - msg-value: | - { - "SensorID": "{sensor_id}", - "SensorType": "Temperature", - "ReadingTime": "{reading_time}", - "ReadingValue": {reading_value} - } + - name: + tags: + phase: + statements: + - name: + optype: + ... ... + - name: + ... ... + + - name: + tags: + ... + statements: + ... ``` -In the above statement block, there are 4 key statement parameters to provide values: -* **producer-name**: cycle-level Pulsar producer name (can be dynamically bound) - * **Optional** - * If not set, global level producer name in *config.properties* file will be used. - * Use a default producer name, "default", if it is neither set at global level. - * If set, cycle level producer name will take precedence over the global level setting +Each time when you execute the NB command, you can only choose one command block to execute. This is achieved by applying a filtering condition against **phase** tag, as below: +```bash + driver=pulsar tags=phase: ... +``` -* **topic_uri**: cycle-level Pulsar topic name (can be dynamically bound) - * **Optional** - * If not set, global level topic_uri in *config.properties* file will be used - * Throw a Runtime Error if it is neither set at global level - * If set, cycle level topic_uri will take precedence over the global level setting; and the provided value must follow several guidelines: - * It must be in valid Pulsar topic format as below: - ``` - [persistent|non-persistent]://// - ``` - * At the moment, only "**\**" part can be dynamically bound (e.g. through NB binding function). All other parts must be static values and the corresponding tenants and namespaces must be created in the Pulsar cluster in advance. +An example of executing Pulsar producer/consumer API using NB is like this: +```bash +# producer + driver=pulsar tags=phase:producer ... -**TODO**: allow dynamic binding for "\" and "\" after adding a phase for creating "\" and/or "\", similar to C* CQL schema creation phase! +# consumer + driver=pulsar tags=phase:consumer ... +``` -* **msg-key**: Pulsar message key - * **Optional** - * If not set, the generated Pulsar messages (to be published by the Producer) doesn't have **keys**. +### 1.3.1. NB Cycle Level Parameters vs. Global Level Parameters -* **msg-value**: Pulsar message payload - * **Mandatory** - * If not set, throw a Runtime Error. +Some parameters, especially topic name and producer/consumer/reader/etc. name, can be set at the global level in **config.properties** file, or at NB cycle level via **pulsar.yaml** file. An example of setting a topic name in both levels is as below: -### 1.3.2. Consumer Statement block +```bash +# Global level setting (config.properties): +producer.topicName = ... -**TBD ...** +# Cycle level setting (pulsar.yaml) +params: + topic_uri: ... +``` -## 1.4. Schema Support +In theory, all Pulsar client settings can be made as cycle level settings for maximum flexibility. But practically speaking (and also for simplicity purposes), only the following parameters are made to be configurable at both levels, listed by cycle level setting names with their corresponding global level setting names: +* topic_uri (Mandatory) + * producer.topicName + * consumer.topicNames + * reader.topicName +* topic_names (Optional for Consumer) + * consumer.topicNames +* subscription_name (Mandatory for Consumer) + * consumer.subscriptionName +* subscription_type (Mandatory for Consumer, default to **exclusive** type) + * consumer.subscriptionType +* topics_pattern (Optional for Consumer) + * consumer.topicsPattern +* producer_name (Optional) + * producer.producerName +* consumer_name (Optional) + * consumer.consumerName +* reader_name (Optional) + * reader.readerName + +One key difference between setting a parameter at the global level vs. at the cycle level is that the global level setting is always static and stays the same for all NB cycle execution. The cycle level setting, on the other side, can be dynamically bound and can be different from cycle to cycle. + +Because of this, setting these parameters at the NB cycle level allows us to run Pulsar testing against multiple topics and/or multiple producers/consumers/readers/etc all at once within one NB activity. This makes the testing more flexible and effective. + +**NOTE**, when a configuration is set at both the global level and the cycle level, **the ycle level setting will take priority!** + +## 1.4. Pulsar Driver Yaml File - Command Block Details + +### 1.4.1. Pulsar Admin API Command Block + +**NOTE**: this functionality is only partially implemented at the moment and doesn't function yet. + +Currently, the Pulsar Admin API Block is (planned) to only support creating Pulsar tenants and namespaces. It has the following format: + +```yaml + - name: admin-block + tags: + phase: create-tenant-namespace + statements: + - name: s1 + optype: create-tenant + tenant: "{tenant}" + - name: s2 + optype: create-namespace + namespace: "{namespace}" +``` + +In this command block, there are 2 statements (s1 and s2): +* Statement **s1** is used for creating a Pulsar tenant + * (Mandatory) **optype (create-tenant)** is the statement identifier for this statement + * (Mandatory) **tenant** is the only statement parameter that specifies the Pulsar tenant name which can either be dynamically bound or statically assigned. +* Statement **s2** is used for creating a Pulsar namespace + * (Mandatory) **optype (create-namespace)** is the statement identifier for this statement + * (Mandatory) **namespace** is the only statement parameter that specifies the Pulsar namespace under the tenant created by statement s1. Its name can either be dynamically bound or statically assigned. + +### 1.4.2. Batch Producer Command Block + +Batch producer command block is used to produce a batch of messages all at once by one NB cycle execution. A typical format of this command block is as below: + +```yaml +- name: batch-producer-block + tags: + phase: batch-producer + statements: + - name: s1 + optype: batch-msg-send-start + # For batch producer, "producer_name" should be associated with batch start + batch_producer_name: {batch_producer_name} + ratio: 1 + - name: s2 + optype: batch-msg-send + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } + ratio: 100 + - name: s3 + optype: batch-msg-send-end + ratio: 1 +``` + +This command block has 3 statements (s1, s2, and s3) with the following ratios: 1, , 1. +* Statement **s1** is used to mark the start of a batch of message production within one NB cycle + * (Mandatory) **optype (batch-msg-send-start)** is the statement identifier for this statement + * (Optional) **batch_producer_name**, when provided, specifies the Pulsar producer name that is associated with the batch production of the messages. + * (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1. +* Statement **s2** is the core statement that generates the message key and payload to be put in the batch. + * (Mandatory) **optype (batch-msg-send)** is the statement identifier for this statement + * (Optional) **msg-key**, when provided, specifies the key of the generated message + * (Mandatory) **msg-payload** specifies the payload of the generated message + * (Optional) **ratio**, when provided, specifies the batch size (how many messages to be put in one batch). If not provided, it is default to 1. +* Statement **s3** is used to mark the end of a batch within one NB cycle + * (Mandatory) **optype (batch-msg-send-end)** is the statement identifier for this statement + * (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1. + +### 1.4.3. Producer Command Block + +This is the regular Pulsar producer command block that produces one Pulsar message per NB cycle execution. A typical format of this command block is as below: + +```yaml + - name: producer-block + tags: + phase: producer + statements: + - name: s1 + optype: msg-send + # producer_name: {producer_name} + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } +``` + +This command block only has 1 statements (s1): +* Statement **s1** is used to generate the key and payload for one message + * (Mandatory) **optype (msg-send)** is the statement identifier for this statement + * (Optional) **producer_name**, when provided, specifies the Pulsar producer name that is associated with the message production. + * (Optional) **msg-key**, when provided, specifies the key of the generated message + * (Mandatory) **msg-payload** specifies the payload of the generated message + +### 1.4.4. Consumer Command Block + +This is the regular Pulsar consumer command block that consumes one Pulsar message per NB cycle execution. A typical format of this command block is as below: + +```yaml + - name: consumer-block + tags: + phase: consumer + statements: + - name: s1 + optype: msg-consume + topic_names: ", " + # topics_pattern: "" + subscription_name: + subscription_type: + consumer_name: +``` + +This command block only has 1 statements (s1): +* Statement **s1** is used to consume one message from the Pulsar cluster and acknowledge it. + * (Mandatory) **optype (msg-consume)** is the statement identifier for this statement + * (Optional) **topic_names**, when provided, specifies multiple topic names from which to consume messages. Default to document level parameter **topic_uri**. + * (Optional) **topics_pattern**, when provided, specifies pulsar topic regex pattern for multi-topic message consumption + * (Mandatory) **subscription_name** specifies subscription name. + * (Optional) **subscription_type**, when provided, specifies subscription type. Default to **exclusive** subscription type. + * (Optional) **consumer_name**, when provided, specifies the associated consumer name. + +### 1.4.5. Reader Command Block + +This is the regular Pulsar reader command block that reads one Pulsar message per NB cycle execution. A typical format of this command block is as below: + +```yaml + - name: reader-block + tags: + phase: reader + statements: + - name: s1 + optype: msg-read + reader_name: +``` + +This command block only has 1 statements (s1): +* Statement **s1** is used to consume one message from the Pulsar cluster and acknowledge it. + * (Mandatory) **optype (msg-consume)** is the statement identifier for this statement + * (Optional) **reader_name**, when provided, specifies the associated consumer name. + +**TBD**: at the moment, the NB Pulsar driver Reader API only supports reading from the following positions: +* MessageId.earliest +* MessageId.latest (default) + +A customized reader starting position, as below, is NOT supported yet! +```java +byte[] msgIdBytes = // Some message ID byte array +MessageId id = MessageId.fromByteArray(msgIdBytes); +Reader reader = pulsarClient.newReader() + .topic(topic) + .startMessageId(id) + .create(); +``` + +## 1.5. Schema Support Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, etc. At the moment, the NB Pulsar driver provides 2 schema support modes, via the global level schema related settings as below: * Avro schema: @@ -149,7 +355,7 @@ Pulsar has built-in schema support. Other than primitive types, Pulsar also supp schema.definition: ``` -For the previous Producer block statement example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. as in the sample schema definition file: **[iot-example.asvc](activities/iot-example.avsc)**) +Take the previous Producer command block as an example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. as in the sample schema definition file: **[iot-example.asvc](activities/iot-example.avsc)**) ```json { "type": "record", @@ -164,39 +370,91 @@ For the previous Producer block statement example, the **msg-value** parameter h } ``` -## 1.5. Activity Parameters +## 1.6. NB Activity Execution Parameters -At the moment, the following Pulsar driver specific Activity Parameter is supported: +At the moment, the following NB Pulsar driver **specific** activity parameters are supported: -- * config= +* service_url= +* config= -## 1.6. Pulsar NB Execution Example +Some other common NB activity parameters are listed as below. Please reference to NB documentation for more parameters -``` - run type=pulsar -vv cycles=10 config=/config.properties yaml=/pulsar.yaml +* driver=pulsar +* seq=concat (needed for **batch** producer) +* tags=phase: +* threads= +* cycles= +* --report-csv-to + +## 1.7. NB Pulsar Driver Execution Example + +1. Run Pulsar producer API to produce 100K messages using 100 NB threads +```bash + run driver=pulsar tags=phase:producer threads=100 cycles=100K config=/config.properties yaml=/pulsar.yaml ``` -**NOTE**: +2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads; put NB execution metrics in a specified metrics folder + +```bash + run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M config=/config.properties yaml=/pulsar.yaml --report-csv-to +``` + +3. Run Pulsar consumer API to consume (and acknowledge) 100 messages using one single NB thread. +```bash + run driver=pulsar tags=phase:consumer cycles=100 config=/config.properties yaml=/pulsar.yaml +``` + + +## 1.8. Appendix A. Template Global Setting File (config.properties) +```properties +schema.type = +schema.definition = + + +### Pulsar client related configurations - client.xxx +client.connectionTimeoutMs = + + +### Producer related configurations (global) - producer.xxx +producer.producerName = +producer.topicName = +producer.sendTimeoutMs = + + +### Consumer related configurations (global) - consumer.xxx +consumer.topicNames = +consumer.topicsPattern = +consumer.subscriptionName = +consumer.subscriptionType = +consumer.consumerName = +consumer.receiverQueueSize = + + +### Reader related configurations (global) - reader.xxx +reader.topicName = +reader.receiverQueueSize = +reader.readerName = +reader.startMessagePos = +``` -* An example of **config.properties** file is [here](activities/config.properties) -* An example of **pulsar.yaml** file is [here](activities/pulsar.yaml) --- -# 2. Advanced Driver Features -- TODO: Design Revisit +# 2. TODO : Design Revisit -- Advanced Driver Features **NOTE**: The following text is based on the original multi-layer API caching design which is not fully implemented at the moment. We need to revisit the original design at some point in order to achieve maximum testing flexibility. To summarize, the original caching design has the following key requirements: * **Requirement 1**: Each NB Pulsar activity is able to launch and cache multiple **client spaces** * **Requirement 2**: Each client space can launch and cache multiple Pulsar operators of the same type (producer, consumer, etc.) +* **Requirement 3**: The size of each Pulsar operator specific cached space can be configurable. -In the current implementation, only requirement 2 is implemented. Regarding requirement 1, the current implementation only supports one client space per NB Pulsar activity! +In the current implementation, only requirement 2 is implemented. +* For requirement 1, the current implementation only supports one client space per NB Pulsar activity +* For requirement 3, the cache space size is not configurable (no limit at the moment) ## 2.1. Other Activity Parameters -- **url** - The pulsar url to connect to. - - **default** - `url=pulsar://localhost:6650` - **maxcached** - A default value to be applied to `max_clients`, `max_producers`, `max_consumers`. - default: `max_cached=100` @@ -204,13 +462,8 @@ In the current implementation, only requirement 2 is implemented. Regarding requ instances which are allowed to be cached in the NoSQLBench client runtime. The clients cache automatically maintains a cache of unique client instances internally. default: _maxcached_ - -- **max_producers** - Producers cache size (per client instance). Limits - the number of producer instances which are allowed to be cached per - client instance. default: _maxcached_ -- **max_consumers** - Consumers cache size (per client instance). Limits - the number of consumer instances which are allowed to be cached per - client instance. +- **max_operators** - Producers/Consumers/Readers cache size (per client instance). Limits + the number of instances which are allowed to be cached per client instance. default: _maxcached_ ## 2.2. API Caching diff --git a/driver-pulsar/src/main/resources/topics.md b/driver-pulsar/src/main/resources/topics.md deleted file mode 100644 index 55a67f02a..000000000 --- a/driver-pulsar/src/main/resources/topics.md +++ /dev/null @@ -1,3 +0,0 @@ -# pulsar help topics - -- pulsar From 3ad3cc6b7016dd9b2f9187ba3cd859fa3f224e58 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 10 Mar 2021 12:29:28 -0600 Subject: [PATCH 3/4] Minor document update --- driver-pulsar/src/main/resources/pulsar.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 927875e45..3a51d1b92 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -12,7 +12,7 @@ - [1.5. Schema Support](#15-schema-support) - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) - - [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) + - [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) - [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - [2.1. Other Activity Parameters](#21-other-activity-parameters) - [2.2. API Caching](#22-api-caching) @@ -177,7 +177,7 @@ One key difference between setting a parameter at the global level vs. at the cy Because of this, setting these parameters at the NB cycle level allows us to run Pulsar testing against multiple topics and/or multiple producers/consumers/readers/etc all at once within one NB activity. This makes the testing more flexible and effective. -**NOTE**, when a configuration is set at both the global level and the cycle level, **the ycle level setting will take priority!** +**NOTE**: when a configuration is set at both the global level and the cycle level, **the ycle level setting will take priority!** ## 1.4. Pulsar Driver Yaml File - Command Block Details @@ -355,7 +355,7 @@ Pulsar has built-in schema support. Other than primitive types, Pulsar also supp schema.definition: ``` -Take the previous Producer command block as an example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. as in the sample schema definition file: **[iot-example.asvc](activities/iot-example.avsc)**) +Take the previous Producer command block as an example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition: ```json { "type": "record", @@ -405,7 +405,7 @@ Some other common NB activity parameters are listed as below. Please reference t ``` -## 1.8. Appendix A. Template Global Setting File (config.properties) +## Appendix A. Template Global Setting File (config.properties) ```properties schema.type = schema.definition = From f1a15230e8ef61d8a451f72b49c404f978764bcf Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 10 Mar 2021 12:34:02 -0600 Subject: [PATCH 4/4] Minor code change - change method name from IsBoolean() to isBoolean() --- .../java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 44d87ff13..270238f53 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -9,7 +9,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.Schema; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -55,7 +54,7 @@ public class ReadyPulsarOp implements LongFunction { return opFunc.apply(value); } - private boolean IsBoolean (String str) { + private boolean isBoolean(String str) { return StringUtils.equalsAnyIgnoreCase(str, "yes", "true"); } @@ -103,7 +102,7 @@ public class ReadyPulsarOp implements LongFunction { LongFunction asyncApiFunc; if ( cmdTpl.containsKey("async_api") ) { if ( cmdTpl.isStatic("async_api") ) - asyncApiFunc = (l) -> IsBoolean(cmdTpl.getStatic("async_api")); + asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); else throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); }