diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index cfef64528..466b79f3a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -3,17 +3,19 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.ActivityDef; -import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.metrics.ActivityMetrics; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.function.LongFunction; import java.util.function.Supplier; public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { @@ -24,14 +26,13 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer executeTimer; private PulsarSpaceCache pulsarCache; - private NBErrorHandler errorhandler; - private PulsarNBClientConf clientConf; + private String serviceUrl; - private OpSequence> sequencer; - // private PulsarClient activityClient; + private NBErrorHandler errorhandler; + private OpSequence> sequencer; - private Supplier clientSupplier; + // private Supplier clientSupplier; // private ThreadLocal> tlClientSupplier; public PulsarActivity(ActivityDef activityDef) { @@ -48,6 +49,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); clientConf = new PulsarNBClientConf(pulsarClntConfFile); + serviceUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); + pulsarCache = new PulsarSpaceCache(this); this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache)); @@ -65,7 +68,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve super.onActivityDefUpdate(activityDef); } - public OpSequence> getSequencer() { + public OpSequence> getSequencer() { return sequencer; } @@ -73,6 +76,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve return clientConf; } + public String getPulsarServiceUrl() { + return serviceUrl; + } + public Timer getBindTimer() { return bindTimer; } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java deleted file mode 100644 index 9b7cea7c8..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java +++ /dev/null @@ -1,196 +0,0 @@ -package io.nosqlbench.driver.pulsar; - -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; - -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; - -public class PulsarConsumerSpace extends PulsarSpace { - - private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); - - public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) { - super(name, pulsarClientConf); - } - - private String getEffectiveTopicNamesStr(String cycleTopicNames) { - if (!StringUtils.isBlank(cycleTopicNames)) { - return cycleTopicNames; - } - - String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); - if (!StringUtils.isBlank(globalTopicNames)) { - return globalTopicNames; - } - - return ""; - } - - private List getEffectiveTopicNames(String cycleTopicNames) { - String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); - - String[] names = effectiveTopicNamesStr.split("[;,]"); - ArrayList effectiveTopicNameList = new ArrayList<>(); - - for (String name : names) { - if (!StringUtils.isBlank(name)) - effectiveTopicNameList.add(name.trim()); - } - - - return effectiveTopicNameList; - } - - private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { - if (!StringUtils.isBlank(cycleTopicsPattern)) { - return cycleTopicsPattern; - } - - String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); - if (!StringUtils.isBlank(globalTopicsPattern)) { - return globalTopicsPattern; - } - - return ""; - } - - private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { - String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); - Pattern topicsPattern; - try { - if (!StringUtils.isBlank(effectiveTopicsPatternStr)) - topicsPattern = Pattern.compile(effectiveTopicsPatternStr); - else - topicsPattern = null; - } catch (PatternSyntaxException pse) { - topicsPattern = null; - } - return topicsPattern; - } - - private String getEffectiveSubscriptionName(String cycleSubscriptionName) { - if (!StringUtils.isBlank(cycleSubscriptionName)) { - return cycleSubscriptionName; - } - - String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName(); - if (!StringUtils.isBlank(globalSubscriptionName)) { - return globalSubscriptionName; - } - - return "default-subs"; - } - - private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) { - if (!StringUtils.isBlank(cycleSubscriptionType)) { - return cycleSubscriptionType; - } - - String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType(); - if (!StringUtils.isBlank(globalSubscriptionType)) { - return globalSubscriptionType; - } - - return ""; - } - - private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) { - String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType); - SubscriptionType subscriptionType; - - try { - subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr); - } catch (IllegalArgumentException iae) { - subscriptionType = SubscriptionType.Exclusive; - } - - return subscriptionType; - } - - private String getEffectiveConsumerName(String cycleConsumerName) { - if (!StringUtils.isBlank(cycleConsumerName)) { - return cycleConsumerName; - } - - String globalConsumerName = pulsarNBClientConf.getConsumerName(); - if (!StringUtils.isBlank(globalConsumerName)) { - return globalConsumerName; - } - - return "default-cons"; - } - - public Consumer getConsumer(String cycleTopicNames, - String cycleTopicsPattern, - String cycleSubscriptionName, - String cycleSubscriptionType, - String cycleConsumerName) { - - String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); - List topicNames = getEffectiveTopicNames(cycleTopicNames); - String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); - Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern); - String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); - SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); - String consumerName = getEffectiveConsumerName(cycleConsumerName); - - if (topicNames.isEmpty() && (topicsPattern == null)) { - throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!"); - } - - String encodedStr; - if (!topicNames.isEmpty()) { - encodedStr = PulsarActivityUtil.encode( - consumerName, - subscriptionName, - StringUtils.join(topicNames, "|")); - } else { - encodedStr = PulsarActivityUtil.encode( - consumerName, - subscriptionName, - topicsPatternStr); - } - Consumer consumer = consumers.get(encodedStr); - - if (consumer == null) { - PulsarClient pulsarClient = getPulsarClient(); - - // Get other possible producer settings that are set at global level - Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); - - // Explicit topic names will take precedence over topics pattern - if (!topicNames.isEmpty()) { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames); - } else { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); - consumerConf.put( - PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, - getEffectiveTopicPattern(cycleTopicsPattern)); - } - - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName); - - try { - consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); - } catch (PulsarClientException ple) { - ple.printStackTrace(); - throw new RuntimeException("Unable to create a Pulsar consumer!"); - } - - consumers.put(encodedStr, consumer); - } - - return consumer; - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java deleted file mode 100644 index 460ad523b..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java +++ /dev/null @@ -1,80 +0,0 @@ -package io.nosqlbench.driver.pulsar; - -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.commons.lang.StringUtils; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PulsarProducerSpace extends PulsarSpace { - - private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); - - public PulsarProducerSpace(String name, PulsarNBClientConf pulsarClientConf) { - super(name, pulsarClientConf); - } - - // Producer name is NOT mandatory - // - It can be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveProducerName(String cycleProducerName) { - if (!StringUtils.isBlank(cycleProducerName)) { - return cycleProducerName; - } - - String globalProducerName = pulsarNBClientConf.getProducerName(); - if (!StringUtils.isBlank(globalProducerName)) { - return globalProducerName; - } - - // Default Producer name when it is not set at either cycle or global level - return "default-prod"; - } - - // Topic name IS mandatory - // - It must be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveTopicName(String cycleTopicName) { - if (!StringUtils.isBlank(cycleTopicName)) { - return cycleTopicName; - } - - String globalTopicName = pulsarNBClientConf.getProducerTopicName(); - if (!StringUtils.isBlank(globalTopicName)) { - throw new RuntimeException("Topic name must be set at either global level or cycle level!"); - } - - return globalTopicName; - } - - public Producer getProducer(String cycleProducerName, String cycleTopicName) { - String producerName = getEffectiveProducerName(cycleProducerName); - String topicName = getEffectiveTopicName(cycleTopicName); - - String encodedStr = PulsarActivityUtil.encode(cycleProducerName, cycleTopicName); - Producer producer = producers.get(encodedStr); - - if (producer == null) { - PulsarClient pulsarClient = getPulsarClient(); - - // Get other possible producer settings that are set at global level - Map producerConf = pulsarNBClientConf.getProducerConfMap(); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName); - - try { - producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); - } catch (PulsarClientException ple) { - throw new RuntimeException("Unable to create a Pulsar producer!"); - } - - producers.put(encodedStr, producer); - } - - return producer; - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java deleted file mode 100644 index 4e7550437..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java +++ /dev/null @@ -1,109 +0,0 @@ -package io.nosqlbench.driver.pulsar; - -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.*; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PulsarReaderSpace extends PulsarSpace { - - private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); - - public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) { - super(name, pulsarClientConf); - } - - private String getEffectiveReaderTopicName(String cycleReaderTopicName) { - if (!StringUtils.isBlank(cycleReaderTopicName)) { - return cycleReaderTopicName; - } - - String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName(); - if (!StringUtils.isBlank(globalReaderTopicName)) { - return globalReaderTopicName; - } - - return ""; - } - - private String getEffectiveReaderName(String cycleReaderName) { - if (!StringUtils.isBlank(cycleReaderName)) { - return cycleReaderName; - } - - String globalReaderName = pulsarNBClientConf.getConsumerName(); - if (!StringUtils.isBlank(globalReaderName)) { - return globalReaderName; - } - - return "default-read"; - } - - private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) { - if (!StringUtils.isBlank(cycleStartMsgPosStr)) { - return cycleStartMsgPosStr; - } - - String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr(); - if (!StringUtils.isBlank(globalStartMsgPosStr)) { - return globalStartMsgPosStr; - } - - return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label; - } - - public Reader getReader(String cycleTopicName, - String cycleReaderName, - String cycleStartMsgPos) { - - String topicName = getEffectiveReaderTopicName(cycleTopicName); - String readerName = getEffectiveReaderName(cycleReaderName); - String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos); - - if (StringUtils.isBlank(topicName)) { - throw new RuntimeException("Must specify a \"topicName\" for a reader!"); - } - - String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos); - Reader reader = readers.get(encodedStr); - - if (reader == null) { - PulsarClient pulsarClient = getPulsarClient(); - - Map readerConf = pulsarNBClientConf.getReaderConfMap(); - readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName); - readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName); - // "reader.startMessagePos" is NOT a standard Pulsar reader conf - readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); - - try { - ReaderBuilder readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf); - - MessageId startMsgId = MessageId.latest; - if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) { - startMsgId = MessageId.earliest; - } - //TODO: custom start message position is NOT supported yet - //else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) { - // startMsgId = MessageId.latest; - //} - - if (startMsgId != null) { - readerBuilder = readerBuilder.startMessageId(startMsgId); - } - - reader = readerBuilder.create(); - } catch (PulsarClientException ple) { - ple.printStackTrace(); - throw new RuntimeException("Unable to create a Pulsar reader!"); - } - - readers.put(encodedStr, reader); - } - - return reader; - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index 483b69913..ee58f6848 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -2,12 +2,20 @@ package io.nosqlbench.driver.pulsar; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.impl.BatchMessageContainerBase; +import org.apache.pulsar.client.impl.DefaultBatcherBuilder; +import org.apache.pulsar.client.impl.ProducerImpl; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; /** * An instance of a pulsar client, along with all the cached objects which are normally @@ -17,17 +25,22 @@ import java.util.concurrent.ConcurrentHashMap; public class PulsarSpace { private final static Logger logger = LogManager.getLogger(PulsarSpace.class); - // TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc. - protected final String name; + private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); + + protected final String spaceName; protected final PulsarNBClientConf pulsarNBClientConf; + protected final String pulsarSvcUrl; protected PulsarClient pulsarClient = null; protected Schema pulsarSchema = null; - public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf) { - this.name = name; + public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf, String pulsarSvcUrl) { + this.spaceName = name; this.pulsarNBClientConf = pulsarClientConf; + this.pulsarSvcUrl = pulsarSvcUrl; createPulsarClientFromConf(); createPulsarSchemaFromConf(); @@ -36,14 +49,15 @@ public class PulsarSpace { protected void createPulsarClientFromConf() { ClientBuilder clientBuilder = PulsarClient.builder(); - String dftSvcUrl = "pulsar://localhost:6650"; - if (!pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString())) { - pulsarNBClientConf.setClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString(), dftSvcUrl); - } - try { Map clientConf = pulsarNBClientConf.getClientConfMap(); - pulsarClient = clientBuilder.loadConf(clientConf).build(); + // Override "client.serviceUrl" setting in config.properties + clientConf.remove("serviceUrl", pulsarSvcUrl); + + pulsarClient = clientBuilder + .loadConf(clientConf) + .serviceUrl(pulsarSvcUrl) + .build(); } catch (PulsarClientException pce) { logger.error("Fail to create PulsarClient from global configuration!"); throw new RuntimeException("Fail to create PulsarClient from global configuration!"); @@ -66,9 +80,391 @@ public class PulsarSpace { } } - public PulsarClient getPulsarClient() { return pulsarClient; } + public PulsarClient getPulsarClient() { + return pulsarClient; + } + public PulsarNBClientConf getPulsarClientConf() { return pulsarNBClientConf; } - public Schema getPulsarSchema() { return pulsarSchema; } + + public Schema getPulsarSchema() { + return pulsarSchema; + } + + + ////////////////////////////////////// + // Producer Processing --> start + ////////////////////////////////////// + // Topic name IS mandatory + // - It must be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveProducerTopicName(String cycleTopicName) { + if (!StringUtils.isBlank(cycleTopicName)) { + return cycleTopicName; + } + + String globalTopicName = pulsarNBClientConf.getProducerTopicName(); + if (!StringUtils.isBlank(globalTopicName)) { + return globalTopicName; + } + + throw new RuntimeException(" topic name must be set at either global level or cycle level!"); + } + + // Producer name is NOT mandatory + // - It can be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveProducerName(String cycleProducerName) { + if (!StringUtils.isBlank(cycleProducerName)) { + return cycleProducerName; + } + + String globalProducerName = pulsarNBClientConf.getProducerName(); + if (!StringUtils.isBlank(globalProducerName)) { + return globalProducerName; + } + + return ""; + } + + public Producer getProducer(String cycleTopicName, String cycleProducerName) { + String topicName = getEffectiveProducerTopicName(cycleTopicName); + String producerName = getEffectiveProducerName(cycleProducerName); + + if (StringUtils.isBlank(topicName)) { + throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level"); + } + + String encodedStr = PulsarActivityUtil.encode(producerName, topicName); + Producer producer = producers.get(encodedStr); + + if (producer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map producerConf = pulsarNBClientConf.getProducerConfMap(); + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName); + if (!StringUtils.isBlank(producerName)) { + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName); + } + + try { + producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); + } catch (PulsarClientException ple) { + throw new RuntimeException("Unable to create a Pulsar producer!"); + } + + producers.put(encodedStr, producer); + } + + return producer; + } + ////////////////////////////////////// + // Producer Processing <-- end + ////////////////////////////////////// + + + ////////////////////////////////////// + // Consumer Processing --> start + ////////////////////////////////////// + private String getEffectiveTopicNamesStr(String cycleTopicNames) { + if (!StringUtils.isBlank(cycleTopicNames)) { + return cycleTopicNames; + } + + String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); + if (!StringUtils.isBlank(globalTopicNames)) { + return globalTopicNames; + } + + return ""; + } + + private List getEffectiveTopicNames(String cycleTopicNames) { + String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); + + String[] names = effectiveTopicNamesStr.split("[;,]"); + ArrayList effectiveTopicNameList = new ArrayList<>(); + + for (String name : names) { + if (!StringUtils.isBlank(name)) + effectiveTopicNameList.add(name.trim()); + } + + + return effectiveTopicNameList; + } + + private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { + if (!StringUtils.isBlank(cycleTopicsPattern)) { + return cycleTopicsPattern; + } + + String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); + if (!StringUtils.isBlank(globalTopicsPattern)) { + return globalTopicsPattern; + } + + return ""; + } + + private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { + String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern; + try { + if (!StringUtils.isBlank(effectiveTopicsPatternStr)) + topicsPattern = Pattern.compile(effectiveTopicsPatternStr); + else + topicsPattern = null; + } catch (PatternSyntaxException pse) { + topicsPattern = null; + } + return topicsPattern; + } + + private String getEffectiveSubscriptionName(String cycleSubscriptionName) { + if (!StringUtils.isBlank(cycleSubscriptionName)) { + return cycleSubscriptionName; + } + + String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName(); + if (!StringUtils.isBlank(globalSubscriptionName)) { + return globalSubscriptionName; + } + + throw new RuntimeException("Consumer::Subscription name must be set at either global level or cycle level!"); + } + + private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) { + if (!StringUtils.isBlank(cycleSubscriptionType)) { + return cycleSubscriptionType; + } + + String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType(); + if (!StringUtils.isBlank(globalSubscriptionType)) { + return globalSubscriptionType; + } + + return ""; + } + + private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) { + String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType); + SubscriptionType subscriptionType = SubscriptionType.Exclusive; + + if (!StringUtils.isBlank(effectiveSubscriptionStr)) { + if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr)) { + throw new RuntimeException("Consumer::Invalid subscription type: " + effectiveSubscriptionStr); + } else { + subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr); + } + } + + return subscriptionType; + } + + private String getEffectiveConsumerName(String cycleConsumerName) { + if (!StringUtils.isBlank(cycleConsumerName)) { + return cycleConsumerName; + } + + String globalConsumerName = pulsarNBClientConf.getConsumerName(); + if (!StringUtils.isBlank(globalConsumerName)) { + return globalConsumerName; + } + + return ""; + } + + public Consumer getConsumer(String cycleTopicUri, + String cycleTopicNames, + String cycleTopicsPattern, + String cycleSubscriptionName, + String cycleSubscriptionType, + String cycleConsumerName) { + + List topicNames = getEffectiveTopicNames(cycleTopicNames); + String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern); + String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); + SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); + String consumerName = getEffectiveConsumerName(cycleConsumerName); + + if (StringUtils.isBlank(cycleTopicUri) && topicNames.isEmpty() && (topicsPattern == null)) { + throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!"); + } + + String encodedStr; + // precedence sequence: + // topic_names (consumer statement param) > + // topics_pattern (consumer statement param) > + // topic_uri (document level param) + if (!topicNames.isEmpty()) { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + StringUtils.join(topicNames, "|")); + } else if (topicsPattern != null) { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + topicsPatternStr); + } else { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + cycleTopicUri); + } + + Consumer consumer = consumers.get(encodedStr); + + if (consumer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); + + // Explicit topic names will take precedence over topics pattern + if (!topicNames.isEmpty()) { + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames); + } else if (topicsPattern != null) { + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); + consumerConf.put( + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, + getEffectiveTopicPattern(cycleTopicsPattern)); + } else { + topicNames.add(cycleTopicUri); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames); + } + + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType); + if (!StringUtils.isBlank(consumerName)) { + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName); + } + + try { + consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); + } catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar consumer!"); + } + + consumers.put(encodedStr, consumer); + } + + return consumer; + } + ////////////////////////////////////// + // Consumer Processing <-- end + ////////////////////////////////////// + + + ////////////////////////////////////// + // Reader Processing --> Start + ////////////////////////////////////// + private String getEffectiveReaderTopicName(String cycleReaderTopicName) { + if (!StringUtils.isBlank(cycleReaderTopicName)) { + return cycleReaderTopicName; + } + + String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName(); + if (!StringUtils.isBlank(globalReaderTopicName)) { + return globalReaderTopicName; + } + + throw new RuntimeException("Reader topic name must be set at either global level or cycle level!"); + } + + private String getEffectiveReaderName(String cycleReaderName) { + if (!StringUtils.isBlank(cycleReaderName)) { + return cycleReaderName; + } + + String globalReaderName = pulsarNBClientConf.getConsumerName(); + if (!StringUtils.isBlank(globalReaderName)) { + return globalReaderName; + } + + return ""; + } + + private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) { + if (!StringUtils.isBlank(cycleStartMsgPosStr)) { + return cycleStartMsgPosStr; + } + + String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr(); + if (!StringUtils.isBlank(globalStartMsgPosStr)) { + return globalStartMsgPosStr; + } + + return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label; + } + + public Reader getReader(String cycleTopicName, + String cycleReaderName, + String cycleStartMsgPos) { + + String topicName = getEffectiveReaderTopicName(cycleTopicName); + if (StringUtils.isBlank(topicName)) { + throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level"); + } + + String readerName = getEffectiveReaderName(cycleReaderName); + + String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos); + if (!PulsarActivityUtil.isValideReaderStartPosition(startMsgPosStr)) { + throw new RuntimeException("Reader:: Invalid value for Reader start message position!"); + } + + String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr); + Reader reader = readers.get(encodedStr); + + if (reader == null) { + PulsarClient pulsarClient = getPulsarClient(); + + Map readerConf = pulsarNBClientConf.getReaderConfMap(); + readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName); + + if (!StringUtils.isBlank(readerName)) { + readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName); + } + + // "reader.startMessagePos" is NOT a standard Pulsar reader conf + readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); + + try { + ReaderBuilder readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf); + + MessageId startMsgId = MessageId.latest; + if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) { + startMsgId = MessageId.earliest; + } + //TODO: custom start message position is NOT supported yet + //else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) { + // startMsgId = MessageId.latest; + //} + + if (startMsgId != null) { + readerBuilder = readerBuilder.startMessageId(startMsgId); + } + + reader = readerBuilder.create(); + } catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar reader!"); + } + + readers.put(encodedStr, reader); + } + + return reader; + } + ////////////////////////////////////// + // Reader Processing <-- end + ////////////////////////////////////// } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java index 54533caee..598c5e110 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.commons.lang3.StringUtils; import java.util.concurrent.ConcurrentHashMap; @@ -23,20 +24,8 @@ public class PulsarSpaceCache { } public PulsarSpace getPulsarSpace(String name) { - - String pulsarClientType = activity.getPulsarConf().getPulsarClientType(); - - if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString())) { - return clientScopes.computeIfAbsent(name, spaceName -> new PulsarProducerSpace(spaceName, activity.getPulsarConf())); - } else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString())) { - return clientScopes.computeIfAbsent(name, spaceName -> new PulsarConsumerSpace(spaceName, activity.getPulsarConf())); - } else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString())) { - return clientScopes.computeIfAbsent(name, spaceName -> new PulsarReaderSpace(spaceName, activity.getPulsarConf())); - } - // TODO: add support for websocket-producer and managed-ledger - else { - throw new RuntimeException("Unsupported Pulsar client type: " + pulsarClientType); - } + return clientScopes.computeIfAbsent(name, spaceName -> + new PulsarSpace(spaceName, activity.getPulsarConf(), activity.getPulsarServiceUrl())); } public PulsarActivity getActivity() { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java new file mode 100644 index 000000000..c690914a1 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java @@ -0,0 +1,17 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; + +public class PulsarBatchProducerEndMapper extends PulsarOpMapper { + + public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace) { + super(cmdTpl, clientSpace); + } + + @Override + public PulsarOp apply(long value) { + return new PulsarBatchProducerEndOp(); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java new file mode 100644 index 000000000..c9061e4f5 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java @@ -0,0 +1,36 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.nb.api.errors.BasicError; +import org.apache.pulsar.client.api.BatchMessageContainer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.impl.BatchMessageContainerBase; +import org.apache.pulsar.client.impl.DefaultBatcherBuilder; +import org.apache.pulsar.common.util.FutureUtil; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class PulsarBatchProducerEndOp implements PulsarOp { + @Override + public void run() { + List> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get(); + Producer producer = PulsarBatchProducerStartOp.threadLocalProducer.get(); + + if ((container != null) && (!container.isEmpty())) { + try { + // producer.flushAsync().get(); + FutureUtil.waitForAll(container).get(); + } catch (Exception e) { + throw new RuntimeException("Batch Producer:: failed to send (some of) the batched messages!"); + } + + container.clear(); + PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.set(null); + } else { + throw new BasicError("You tried to end an empty batch message container. This means you" + + " did initiate the batch container properly, or there is an error in your" + + " pulsar op sequencing and ratios."); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java new file mode 100644 index 000000000..301fd9377 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java @@ -0,0 +1,33 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Producer; + +import java.util.function.LongFunction; + +public class PulsarBatchProducerMapper extends PulsarOpMapper { + private final LongFunction keyFunc; + private final LongFunction payloadFunc; + + public PulsarBatchProducerMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction keyFunc, + LongFunction payloadFunc) { + super(cmdTpl, clientSpace); + this.keyFunc = keyFunc; + this.payloadFunc = payloadFunc; + } + + @Override + public PulsarOp apply(long value) { + String msgKey = keyFunc.apply(value); + String msgPayload = payloadFunc.apply(value); + + return new PulsarBatchProducerOp( + clientSpace.getPulsarSchema(), + msgKey, + msgPayload + ); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java new file mode 100644 index 000000000..e16a9bfa3 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java @@ -0,0 +1,61 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.util.AvroUtil; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class PulsarBatchProducerOp implements PulsarOp { + + private final Schema pulsarSchema; + private final String msgKey; + private final String msgPayload; + + public PulsarBatchProducerOp(Schema schema, + String key, + String payload) { + this.pulsarSchema = schema; + this.msgKey = key; + this.msgPayload = payload; + } + + + @Override + public void run() { + if ((msgPayload == null) || msgPayload.isEmpty()) { + throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); + } + + List> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get(); + Producer producer = PulsarBatchProducerStartOp.threadLocalProducer.get(); + assert (producer != null) && (container != null); + + TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); + if ((msgKey != null) && (!msgKey.isEmpty())) { + typedMessageBuilder = typedMessageBuilder.key(msgKey); + } + + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( + (GenericAvroSchema) pulsarSchema, + pulsarSchema.getSchemaInfo().getSchemaDefinition(), + msgPayload + ); + typedMessageBuilder = typedMessageBuilder.value(payload); + } else { + typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); + } + + container.add(typedMessageBuilder.sendAsync()); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java new file mode 100644 index 000000000..73a3133a6 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java @@ -0,0 +1,26 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Producer; + +import java.util.function.LongFunction; + +public class PulsarBatchProducerStartMapper extends PulsarOpMapper { + + private final LongFunction> batchProducerFunc; + + public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction> batchProducerFunc) { + super(cmdTpl, clientSpace); + this.batchProducerFunc = batchProducerFunc; + } + + @Override + public PulsarOp apply(long value) { + Producer batchProducer = batchProducerFunc.apply(value); + + return new PulsarBatchProducerStartOp(batchProducer); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java new file mode 100644 index 000000000..1798df3a5 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java @@ -0,0 +1,33 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.nb.api.errors.BasicError; +import org.apache.commons.compress.utils.Lists; +import org.apache.pulsar.client.api.*; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +public class PulsarBatchProducerStartOp implements PulsarOp { + + // TODO: ensure sane container lifecycle management + public final static ThreadLocal>> threadLocalBatchMsgContainer = new ThreadLocal<>(); + public final static ThreadLocal> threadLocalProducer = new ThreadLocal<>(); + + public PulsarBatchProducerStartOp(Producer batchProducer) { + threadLocalProducer.set(batchProducer); + } + + @Override + public void run() { + List> container = threadLocalBatchMsgContainer.get(); + + if (container == null) { + container = Lists.newArrayList(); + threadLocalBatchMsgContainer.set(container); + } else { + throw new BasicError("You tried to create a batch message container where one was already" + + " defined. This means you did not flush and unset the last container, or there is an error in your" + + " pulsar op sequencing and ratios."); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 4a69e6c9f..c1ff0332d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; @@ -16,22 +17,28 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarConsumerMapper implements LongFunction { - private final CommandTemplate cmdTpl; - private final Schema pulsarSchema; +public class PulsarConsumerMapper extends PulsarOpMapper { private final LongFunction> consumerFunc; + private final LongFunction asyncApiFunc; public PulsarConsumerMapper(CommandTemplate cmdTpl, - Schema pulsarSchema, - LongFunction> consumerFunc) { - this.cmdTpl = cmdTpl; - this.pulsarSchema = pulsarSchema; + PulsarSpace clientSpace, + LongFunction> consumerFunc, + LongFunction asyncApiFunc) { + super(cmdTpl, clientSpace); this.consumerFunc = consumerFunc; + this.asyncApiFunc = asyncApiFunc; } @Override public PulsarOp apply(long value) { Consumer consumer = consumerFunc.apply(value); - return new PulsarConsumerOp(consumer, pulsarSchema); + boolean asyncApi = asyncApiFunc.apply(value); + + return new PulsarConsumerOp( + consumer, + clientSpace.getPulsarSchema(), + asyncApi + ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 1e9cab4d9..d3606a059 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -3,23 +3,20 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; -import java.nio.charset.StandardCharsets; - public class PulsarConsumerOp implements PulsarOp { private final Consumer consumer; private final Schema pulsarSchema; + private final boolean asyncPulsarOp; - public PulsarConsumerOp(Consumer consumer, Schema schema) { + public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp) { this.consumer = consumer; this.pulsarSchema = schema; + this.asyncPulsarOp = asyncPulsarOp; } - @Override - public void run() { + public void syncConsume() { try { Message message = consumer.receive(); @@ -35,9 +32,20 @@ public class PulsarConsumerOp implements PulsarOp { } consumer.acknowledge(message.getMessageId()); - } catch (PulsarClientException e) { throw new RuntimeException(e); } } + + public void asyncConsume() { + //TODO: add support for async consume + } + + @Override + public void run() { + if (!asyncPulsarOp) + syncConsume(); + else + asyncConsume(); + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java new file mode 100644 index 000000000..06c239c8e --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java @@ -0,0 +1,19 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; + +import java.util.function.LongFunction; + +public abstract class PulsarOpMapper implements LongFunction { + protected final CommandTemplate cmdTpl; + protected final PulsarSpace clientSpace; + + public PulsarOpMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace) { + this.cmdTpl = cmdTpl; + this.clientSpace = clientSpace; + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index cebc8287c..7212356e0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -17,21 +17,21 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarProducerMapper implements LongFunction { - private final CommandTemplate cmdTpl; - private final Schema pulsarSchema; +public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction> producerFunc; + private final LongFunction asyncApiFunc; private final LongFunction keyFunc; private final LongFunction payloadFunc; public PulsarProducerMapper(CommandTemplate cmdTpl, - Schema pulsarSchema, + PulsarSpace clientSpace, LongFunction> producerFunc, + LongFunction asyncApiFunc, LongFunction keyFunc, LongFunction payloadFunc) { - this.cmdTpl = cmdTpl; - this.pulsarSchema = pulsarSchema; + super(cmdTpl, clientSpace); this.producerFunc = producerFunc; + this.asyncApiFunc = asyncApiFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; } @@ -39,9 +39,15 @@ public class PulsarProducerMapper implements LongFunction { @Override public PulsarOp apply(long value) { Producer producer = producerFunc.apply(value); + boolean asyncApi = asyncApiFunc.apply(value); String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); - return new PulsarProducerOp(producer, pulsarSchema, msgKey, msgPayload); + return new PulsarProducerOp( + producer, + clientSpace.getPulsarSchema(), + asyncApi, + msgKey, + msgPayload); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 9ee2d4d51..07827ceb2 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -1,58 +1,86 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarAction; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; public class PulsarProducerOp implements PulsarOp { + + private final static Logger logger = LogManager.getLogger(PulsarProducerOp.class); + private final Producer producer; private final Schema pulsarSchema; private final String msgKey; private final String msgPayload; + private final boolean asyncPulsarOp; - public PulsarProducerOp(Producer producer, Schema schema, String key, String payload) { + public PulsarProducerOp(Producer producer, + Schema schema, + boolean asyncPulsarOp, + String key, + String payload) { this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; this.msgPayload = payload; + this.asyncPulsarOp = asyncPulsarOp; } @Override public void run() { - try { - if ( (msgPayload == null) || msgPayload.isEmpty() ) { - throw new RuntimeException("Message payload (\"msg-value\" can't be empty!"); + if ((msgPayload == null) || msgPayload.isEmpty()) { + throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); + } + + TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); + if ((msgKey != null) && (!msgKey.isEmpty())) { + typedMessageBuilder = typedMessageBuilder.key(msgKey); + } + + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( + (GenericAvroSchema) pulsarSchema, + pulsarSchema.getSchemaInfo().getSchemaDefinition(), + msgPayload + ); + typedMessageBuilder = typedMessageBuilder.value(payload); + } else { + typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); + } + + //TODO: add error handling with failed message production + if (!asyncPulsarOp) { + try { + logger.trace("sending message"); + typedMessageBuilder.send(); + } catch (PulsarClientException pce) { + logger.trace("failed sending message"); + throw new RuntimeException(pce); } + } else { + try { + CompletableFuture future = typedMessageBuilder.sendAsync(); + future.get(); - TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); - if ( (msgKey != null) && (!msgKey.isEmpty()) ) { - typedMessageBuilder = typedMessageBuilder.key(msgKey); + /*.thenRun(() -> { +// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload); + }).exceptionally(ex -> { + System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload); + return ex; + })*/ + } catch (Exception e) { + throw new RuntimeException(e); } - - SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); - if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.generic.GenericRecord avroGenericRecord = - AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, msgPayload); - - GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( - (GenericAvroSchema) pulsarSchema, avroGenericRecord); - - typedMessageBuilder = typedMessageBuilder.value(payload); - } - else { - typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); - } - - typedMessageBuilder.send(); - - } catch (PulsarClientException e) { - throw new RuntimeException(e); } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java index 7ae971a7e..cbd5d203b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java @@ -1,27 +1,35 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import java.util.function.LongFunction; -public class PulsarReaderMapper implements LongFunction { - private final CommandTemplate cmdTpl; - private final Schema pulsarSchema; +public class PulsarReaderMapper extends PulsarOpMapper { + private final LongFunction> readerFunc; + private final LongFunction asyncApiFunc; public PulsarReaderMapper(CommandTemplate cmdTpl, - Schema pulsarSchema, - LongFunction> readerFunc) { - this.cmdTpl = cmdTpl; - this.pulsarSchema = pulsarSchema; + PulsarSpace clientSpace, + LongFunction> readerFunc, + LongFunction asyncApiFunc) { + super(cmdTpl, clientSpace); this.readerFunc = readerFunc; + this.asyncApiFunc = asyncApiFunc; } @Override public PulsarOp apply(long value) { Reader reader = readerFunc.apply(value); - return new PulsarReaderOp(reader, pulsarSchema); + boolean asyncApi = asyncApiFunc.apply(value); + + return new PulsarReaderOp( + reader, + clientSpace.getPulsarSchema(), + asyncApi + ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java index 36770a2fc..a29fd9721 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java @@ -11,14 +11,15 @@ import org.apache.pulsar.common.schema.SchemaType; public class PulsarReaderOp implements PulsarOp { private final Reader reader; private final Schema pulsarSchema; + private final boolean asyncPulsarOp; - public PulsarReaderOp(Reader reader, Schema schema) { + public PulsarReaderOp(Reader reader, Schema schema, boolean asyncPulsarOp) { this.reader = reader; this.pulsarSchema = schema; + this.asyncPulsarOp = asyncPulsarOp; } - @Override - public void run() { + public void syncRead() { try { SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); @@ -40,4 +41,16 @@ public class PulsarReaderOp implements PulsarOp { throw new RuntimeException(e); } } + + public void asyncRead() { + //TODO: add support for async read + } + + @Override + public void run() { + if (!asyncPulsarOp) + syncRead(); + else + asyncRead(); + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 6ef854c7a..54b101e24 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -1,36 +1,36 @@ package io.nosqlbench.driver.pulsar.ops; -import io.nosqlbench.driver.pulsar.PulsarSpace; -import io.nosqlbench.driver.pulsar.PulsarSpaceCache; +import io.nosqlbench.driver.pulsar.*; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; -import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.scoping.ScopedSupplier; import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Reader; import java.util.function.LongFunction; import java.util.function.Supplier; -public class ReadyPulsarOp implements OpDispenser { +public class ReadyPulsarOp implements LongFunction { + private final OpTemplate opTpl; private final CommandTemplate cmdTpl; private final PulsarSpace clientSpace; private final LongFunction opFunc; - private final PulsarSpaceCache pcache; - private final Schema pulsarSchema; // TODO: Add docs for the command template with respect to the OpTemplate public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) { // TODO: Consider parsing map structures into equivalent binding representation + this.opTpl = opTemplate; this.cmdTpl = new CommandTemplate(opTemplate); + if (cmdTpl.isDynamic("op_scope")) { throw new RuntimeException("op_scope must be static"); } - this.pcache = pcache; // TODO: At the moment, only supports static "client" if (cmdTpl.containsKey("client")) { if (cmdTpl.isDynamic("client")) { @@ -43,40 +43,38 @@ public class ReadyPulsarOp implements OpDispenser { this.clientSpace = pcache.getPulsarSpace("default"); } - this.pulsarSchema = clientSpace.getPulsarSchema(); - this.opFunc = resolve(); ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton")); Supplier> opSupplier = scope.supplier(this::resolve); } + @Override + public PulsarOp apply(long value) { + return opFunc.apply(value); + } + + private boolean isBoolean(String str) { + return StringUtils.equalsAnyIgnoreCase(str, "yes", "true"); + } + private LongFunction resolve() { if (cmdTpl.containsKey("topic_url")) { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); } - LongFunction cycle_producer_name_func; - if (cmdTpl.isStatic("producer-name")) { - cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer-name"); - } else if (cmdTpl.isDynamic("producer-name")) { - cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer-name", l); - } else { - cycle_producer_name_func = (l) -> null; - } - - LongFunction topic_uri_func; + // Global parameter: topic_uri + LongFunction topicUriFunc; if (cmdTpl.containsKey("topic_uri")) { if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) { throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'."); } else if (cmdTpl.isStatic("topic_uri")) { - topic_uri_func = (l) -> cmdTpl.getStatic("topic_uri"); + topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri"); } else { - topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l); + topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l); } - } - else if (cmdTpl.containsKey("topic")) { + } else if (cmdTpl.containsKey("topic")) { if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) { String persistence = cmdTpl.getStaticOr("persistence", "persistent") .replaceAll("true", "persistent"); @@ -86,73 +84,252 @@ public class ReadyPulsarOp implements OpDispenser { String topic = cmdTpl.getStaticOr("topic", ""); String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic; - topic_uri_func = (l) -> composited; + topicUriFunc = (l) -> composited; } else { // some or all dynamic fields, composite into a single dynamic call - topic_uri_func = (l) -> + topicUriFunc = (l) -> cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent") + "://" + cmdTpl.getOr("tenant", l, "public") + "/" + cmdTpl.getOr("namespace", l, "default") + "/" + cmdTpl.getOr("topic", l, ""); } - } - else { - topic_uri_func = (l) -> null; - } - - assert (clientSpace != null); - String clientType = clientSpace.getPulsarClientConf().getPulsarClientType(); - - // TODO: At the moment, only implements "Producer" functionality; add implementation for others later! - if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) { - return resolveProducer(clientSpace, cmdTpl, cycle_producer_name_func, topic_uri_func);/* - } else if ( msgOperation.equalsIgnoreCase(PulsarActivityUtil.MSGOP_TYPES.CONSUMER.toString()) ) { - return resolveConsumer(spaceFunc, cmdTpl, topic_uri_func); - } else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.READER.toString()) ) { - } else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.WSOKT_PRODUCER.toString()) ) { - } else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.MANAGED_LEDGER.toString()) ) { - */ } else { - throw new RuntimeException("Unsupported Pulsar message operation type."); + topicUriFunc = (l) -> null; + } + + // Global parameter: async_api + LongFunction asyncApiFunc; + if (cmdTpl.containsKey("async_api")) { + if (cmdTpl.isStatic("async_api")) + asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); + else + throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); + } else { + asyncApiFunc = (l) -> false; + } + + if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) { + throw new RuntimeException("Statement parameter \"optype\" must have a valid value!"); + } + String stmtOpType = cmdTpl.getStatic("optype"); + + // TODO: Complete implementation for websocket-producer and managed-ledger + if /*( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_TENANT.label) ) { + return resolveCreateTenant(clientSpace); + } else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_NAMESPACE.label) ) { + return resolveCreateNameSpace(clientSpace); + } else if*/ (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { + return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { + return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { + return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) { + return resolveMsgBatchSendStart(clientSpace, topicUriFunc); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) { + return resolveMsgBatchSend(clientSpace); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) { + return resolveMsgBatchSendEnd(clientSpace); + } else { + throw new RuntimeException("Unsupported Pulsar operation type"); } } - private LongFunction resolveProducer( - PulsarSpace pulsarSpace, - CommandTemplate cmdTpl, - LongFunction cycle_producer_name_func, - LongFunction topic_uri_func + private LongFunction resolveMsgSend( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func ) { + LongFunction cycle_producer_name_func; + if (cmdTpl.isStatic("producer_name")) { + cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name"); + } else if (cmdTpl.isDynamic("producer_name")) { + cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer_name", l); + } else { + cycle_producer_name_func = (l) -> null; + } + LongFunction> producerFunc = - (l) -> pulsarSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l)); + (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l)); LongFunction keyFunc; - if (cmdTpl.isStatic("msg-key")) { - keyFunc = (l) -> cmdTpl.getStatic("msg-key"); - } else if (cmdTpl.isDynamic("msg-key")) { - keyFunc = (l) -> cmdTpl.getDynamic("msg-key", l); + if (cmdTpl.isStatic("msg_key")) { + keyFunc = (l) -> cmdTpl.getStatic("msg_key"); + } else if (cmdTpl.isDynamic("msg_key")) { + keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l); } else { keyFunc = (l) -> null; } LongFunction valueFunc; - if (cmdTpl.containsKey("msg-value")) { - if (cmdTpl.isStatic("msg-value")) { - valueFunc = (l) -> cmdTpl.getStatic("msg-value"); - } else if (cmdTpl.isDynamic("msg-value")) { - valueFunc = (l) -> cmdTpl.getDynamic("msg-value", l); + if (cmdTpl.containsKey("msg_value")) { + if (cmdTpl.isStatic("msg_value")) { + valueFunc = (l) -> cmdTpl.getStatic("msg_value"); + } else if (cmdTpl.isDynamic("msg_value")) { + valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l); } else { valueFunc = (l) -> null; } } else { - throw new RuntimeException("\"msg-value\" field must be specified!"); + throw new RuntimeException("Producer:: \"msg_value\" field must be specified!"); } - return new PulsarProducerMapper(producerFunc, keyFunc, valueFunc, pulsarSchema, cmdTpl); + return new PulsarProducerMapper( + cmdTpl, + clientSpace, + producerFunc, + async_api_func, + keyFunc, + valueFunc); } - @Override - public PulsarOp apply(long value) { - PulsarOp op = opFunc.apply(value); - return op; + private LongFunction resolveMsgConsume( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func + ) { + // Topic list (multi-topic) + LongFunction topic_names_func; + if (cmdTpl.isStatic("topic-names")) { + topic_names_func = (l) -> cmdTpl.getStatic("topic-names"); + } else if (cmdTpl.isDynamic("topic-names")) { + topic_names_func = (l) -> cmdTpl.getDynamic("topic-names", l); + } else { + topic_names_func = (l) -> null; + } + + // Topic pattern (multi-topic) + LongFunction topics_pattern_func; + if (cmdTpl.isStatic("topics-pattern")) { + topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern"); + } else if (cmdTpl.isDynamic("topics-pattern")) { + topics_pattern_func = (l) -> cmdTpl.getDynamic("topics-pattern", l); + } else { + topics_pattern_func = (l) -> null; + } + + LongFunction subscription_name_func; + if (cmdTpl.isStatic("subscription-name")) { + subscription_name_func = (l) -> cmdTpl.getStatic("subscription-name"); + } else if (cmdTpl.isDynamic("subscription-name")) { + subscription_name_func = (l) -> cmdTpl.getDynamic("subscription-name", l); + } else { + subscription_name_func = (l) -> null; + } + + LongFunction subscription_type_func; + if (cmdTpl.isStatic("subscription-type")) { + subscription_type_func = (l) -> cmdTpl.getStatic("subscription-type"); + } else if (cmdTpl.isDynamic("subscription-type")) { + subscription_type_func = (l) -> cmdTpl.getDynamic("subscription-type", l); + } else { + subscription_type_func = (l) -> null; + } + + LongFunction consumer_name_func; + if (cmdTpl.isStatic("consumer-name")) { + consumer_name_func = (l) -> cmdTpl.getStatic("consumer-name"); + } else if (cmdTpl.isDynamic("consumer-name")) { + consumer_name_func = (l) -> cmdTpl.getDynamic("consumer-name", l); + } else { + consumer_name_func = (l) -> null; + } + + LongFunction> consumerFunc = (l) -> + clientSpace.getConsumer( + topic_uri_func.apply(l), + topic_names_func.apply(l), + topics_pattern_func.apply(l), + subscription_name_func.apply(l), + subscription_type_func.apply(l), + consumer_name_func.apply(l) + ); + + return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func); + } + + private LongFunction resolveMsgRead( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func + ) { + LongFunction reader_name_func; + if (cmdTpl.isStatic("reader-name")) { + reader_name_func = (l) -> cmdTpl.getStatic("reader-name"); + } else if (cmdTpl.isDynamic("reader-name")) { + reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l); + } else { + reader_name_func = (l) -> null; + } + + LongFunction start_msg_pos_str_func; + if (cmdTpl.isStatic("start-msg-position")) { + start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position"); + } else if (cmdTpl.isDynamic("start-msg-position")) { + start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l); + } else { + start_msg_pos_str_func = (l) -> null; + } + + LongFunction> readerFunc = (l) -> + clientSpace.getReader( + topic_uri_func.apply(l), + reader_name_func.apply(l), + start_msg_pos_str_func.apply(l) + ); + + return new PulsarReaderMapper(cmdTpl, clientSpace, readerFunc, async_api_func); + } + + private LongFunction resolveMsgBatchSendStart( + PulsarSpace clientSpace, + LongFunction topic_uri_func + ) { + LongFunction cycle_batch_producer_name_func; + if (cmdTpl.isStatic("batch_producer_name")) { + cycle_batch_producer_name_func = (l) -> cmdTpl.getStatic("batch_producer_name"); + } else if (cmdTpl.isDynamic("batch_producer_name")) { + cycle_batch_producer_name_func = (l) -> cmdTpl.getDynamic("batch_producer_name", l); + } else { + cycle_batch_producer_name_func = (l) -> null; + } + + LongFunction> batchProducerFunc = + (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l)); + + return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, batchProducerFunc); + } + + private LongFunction resolveMsgBatchSend(PulsarSpace clientSpace) { + LongFunction keyFunc; + if (cmdTpl.isStatic("msg_key")) { + keyFunc = (l) -> cmdTpl.getStatic("msg_key"); + } else if (cmdTpl.isDynamic("msg_key")) { + keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l); + } else { + keyFunc = (l) -> null; + } + + LongFunction valueFunc; + if (cmdTpl.containsKey("msg_value")) { + if (cmdTpl.isStatic("msg_value")) { + valueFunc = (l) -> cmdTpl.getStatic("msg_value"); + } else if (cmdTpl.isDynamic("msg_value")) { + valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l); + } else { + valueFunc = (l) -> null; + } + } else { + throw new RuntimeException("Batch Producer:: \"msg_value\" field must be specified!"); + } + + return new PulsarBatchProducerMapper( + cmdTpl, + clientSpace, + keyFunc, + valueFunc); + } + + private LongFunction resolveMsgBatchSendEnd(PulsarSpace clientSpace) { + return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java index 37b8bfef9..6cf085f9f 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java @@ -80,6 +80,12 @@ public class AvroUtil { return recordBuilder.build(); } + + public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDef, String jsonData) { + org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData); + return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord); + } + public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) { GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef); org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index eca58ef55..da141875c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -23,22 +23,25 @@ public class PulsarActivityUtil { private final static Logger logger = LogManager.getLogger(PulsarActivityUtil.class); // Supported message operation types - public enum CLIENT_TYPES { - PRODUCER("producer"), - CONSUMER("consumer"), - READER("reader"), - WSOKT_PRODUCER("websocket-producer"), - MANAGED_LEDGER("managed-ledger") - ; + // TODO: websocket-producer and managed-ledger + public enum OP_TYPES { + CREATE_TENANT("create-tenant"), + CREATE_NAMESPACE("create-namespace"), + BATCH_MSG_SEND_START("batch-msg-send-start"), + BATCH_MSG_SEND("batch-msg-send"), + BATCH_MSG_SEND_END("batch-msg-send-end"), + MSG_SEND("msg-send"), + MSG_CONSUME("msg-consume"), + MSG_READ("msg-read"); public final String label; - CLIENT_TYPES(String label) { + OP_TYPES(String label) { this.label = label; } } public static boolean isValidClientType(String type) { - return Arrays.stream(CLIENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase())); + return Arrays.stream(OP_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase())); } @@ -50,7 +53,6 @@ public class PulsarActivityUtil { ; public final String label; - PERSISTENT_TYPES(String label) { this.label = label; } @@ -87,7 +89,6 @@ public class PulsarActivityUtil { ; public final String label; - CLNT_CONF_KEY(String label) { this.label = label; } @@ -120,7 +121,6 @@ public class PulsarActivityUtil { this.label = label; } } - public static boolean isStandardProducerConfItem(String item) { return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); } @@ -162,6 +162,23 @@ public class PulsarActivityUtil { return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); } + public enum SUBSCRIPTION_TYPE { + exclusive("exclusive"), + failover("failover"), + shared("shared"), + key_shared("key_shared"); + + public final String label; + + SUBSCRIPTION_TYPE(String label) { + this.label = label; + } + } + + public static boolean isValidSubscriptionType(String item) { + return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } + /////// // Standard reader configuration (activity-level settings) // - https://pulsar.apache.org/docs/en/client-libraries-java/#reader @@ -182,7 +199,6 @@ public class PulsarActivityUtil { this.label = label; } } - public static boolean isStandardReaderConfItem(String item) { return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); } @@ -213,6 +229,10 @@ public class PulsarActivityUtil { } } + public static boolean isValideReaderStartPosition(String item) { + return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } + /////// // Valid websocket-producer configuration (activity-level settings) // TODO: to be added @@ -233,7 +253,6 @@ public class PulsarActivityUtil { ; public final String label; - MANAGED_LEDGER_CONF_KEY(String label) { this.label = label; } @@ -262,7 +281,6 @@ public class PulsarActivityUtil { return isPrimitive; } - public static Schema getPrimitiveTypeSchema(String typeStr) { Schema schema; @@ -331,7 +349,6 @@ public class PulsarActivityUtil { } return isAvroType; } - public static Schema getAvroSchema(String typeStr, String definitionStr) { String schemaDefinitionStr = definitionStr; String filePrefix = "file://"; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index 3fa31301a..5b5814759 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -7,7 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; import org.apache.commons.configuration2.builder.fluent.Parameters; import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler; import org.apache.commons.configuration2.ex.ConfigurationException; -import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -23,13 +23,11 @@ public class PulsarNBClientConf { private String canonicalFilePath = ""; - public static final String DRIVER_CONF_PREFIX = "driver"; public static final String SCHEMA_CONF_PREFIX = "schema"; public static final String CLIENT_CONF_PREFIX = "client"; public static final String PRODUCER_CONF_PREFIX = "producer"; public static final String CONSUMER_CONF_PREFIX = "consumer"; public static final String READER_CONF_PREFIX = "reader"; - private final HashMap driverConfMap = new HashMap<>(); private final HashMap schemaConfMap = new HashMap<>(); private final HashMap clientConfMap = new HashMap<>(); private final HashMap producerConfMap = new HashMap<>(); @@ -53,14 +51,6 @@ public class PulsarNBClientConf { Configuration config = builder.getConfiguration(); - // Get driver specific configuration settings - for (Iterator it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) { - String confKey = it.next(); - String confVal = config.getProperty(confKey).toString(); - if (!StringUtils.isBlank(confVal)) - driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length() + 1), config.getProperty(confKey)); - } - // Get schema specific configuration settings for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); @@ -110,50 +100,11 @@ public class PulsarNBClientConf { } - ////////////////// - // Get NB Driver related config - public Map getDriverConfMap() { - return this.driverConfMap; - } - - public boolean hasDriverConfKey(String key) { - if (key.contains(DRIVER_CONF_PREFIX)) - return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length() + 1)); - else - return driverConfMap.containsKey(key); - } - public Object getDriverConfValue(String key) { - if (key.contains(DRIVER_CONF_PREFIX)) - return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length() + 1)); - else - return driverConfMap.get(key); - } - - public void setDriverConfValue(String key, Object value) { - if (key.contains(DRIVER_CONF_PREFIX)) - driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length() + 1), value); - else - driverConfMap.put(key, value); - } - - // other driver helper functions ... - public String getPulsarClientType() { - Object confValue = getDriverConfValue("driver.client-type"); - - // If not explicitly specifying Pulsar client type, "producer" is the default type - if (confValue == null) - return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString(); - else - return confValue.toString(); - } - - ////////////////// // Get Schema related config public Map getSchemaConfMap() { return this.schemaConfMap; } - public boolean hasSchemaConfKey(String key) { if (key.contains(SCHEMA_CONF_PREFIX)) return schemaConfMap.containsKey(key.substring(SCHEMA_CONF_PREFIX.length() + 1)); @@ -166,7 +117,6 @@ public class PulsarNBClientConf { else return schemaConfMap.get(key); } - public void setSchemaConfValue(String key, Object value) { if (key.contains(SCHEMA_CONF_PREFIX)) schemaConfMap.put(key.substring(SCHEMA_CONF_PREFIX.length() + 1), value); @@ -180,7 +130,6 @@ public class PulsarNBClientConf { public Map getClientConfMap() { return this.clientConfMap; } - public boolean hasClientConfKey(String key) { if (key.contains(CLIENT_CONF_PREFIX)) return clientConfMap.containsKey(key.substring(CLIENT_CONF_PREFIX.length() + 1)); @@ -193,7 +142,6 @@ public class PulsarNBClientConf { else return clientConfMap.get(key); } - public void setClientConfValue(String key, Object value) { if (key.contains(CLIENT_CONF_PREFIX)) clientConfMap.put(key.substring(CLIENT_CONF_PREFIX.length() + 1), value); @@ -207,7 +155,6 @@ public class PulsarNBClientConf { public Map getProducerConfMap() { return this.producerConfMap; } - public boolean hasProducerConfKey(String key) { if (key.contains(PRODUCER_CONF_PREFIX)) return producerConfMap.containsKey(key.substring(PRODUCER_CONF_PREFIX.length() + 1)); @@ -226,7 +173,6 @@ public class PulsarNBClientConf { else producerConfMap.put(key, value); } - // other producer helper functions ... public String getProducerName() { Object confValue = getProducerConfValue("producer.producerName"); @@ -235,7 +181,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getProducerTopicName() { Object confValue = getProducerConfValue("producer.topicName"); if (confValue == null) @@ -250,28 +195,24 @@ public class PulsarNBClientConf { public Map getConsumerConfMap() { return this.consumerConfMap; } - public boolean hasConsumerConfKey(String key) { if (key.contains(CONSUMER_CONF_PREFIX)) return consumerConfMap.containsKey(key.substring(CONSUMER_CONF_PREFIX.length() + 1)); else return consumerConfMap.containsKey(key); } - public Object getConsumerConfValue(String key) { if (key.contains(CONSUMER_CONF_PREFIX)) return consumerConfMap.get(key.substring(CONSUMER_CONF_PREFIX.length() + 1)); else return consumerConfMap.get(key); } - public void setConsumerConfValue(String key, Object value) { if (key.contains(CONSUMER_CONF_PREFIX)) consumerConfMap.put(key.substring(CONSUMER_CONF_PREFIX.length() + 1), value); else consumerConfMap.put(key, value); } - // Other consumer helper functions ... public String getConsumerTopicNames() { Object confValue = getConsumerConfValue("consumer.topicNames"); @@ -280,7 +221,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getConsumerTopicPattern() { Object confValue = getConsumerConfValue("consumer.topicsPattern"); if (confValue == null) @@ -288,7 +228,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getConsumerSubscriptionName() { Object confValue = getConsumerConfValue("consumer.subscriptionName"); if (confValue == null) @@ -296,7 +235,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getConsumerSubscriptionType() { Object confValue = getConsumerConfValue("consumer.subscriptionType"); if (confValue == null) @@ -304,7 +242,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getConsumerName() { Object confValue = getConsumerConfValue("consumer.consumerName"); if (confValue == null) @@ -319,28 +256,24 @@ public class PulsarNBClientConf { public Map getReaderConfMap() { return this.readerConfMap; } - public boolean hasReaderConfKey(String key) { if (key.contains(READER_CONF_PREFIX)) return readerConfMap.containsKey(key.substring(READER_CONF_PREFIX.length() + 1)); else return readerConfMap.containsKey(key); } - public Object getReaderConfValue(String key) { if (key.contains(READER_CONF_PREFIX)) return readerConfMap.get(key.substring(READER_CONF_PREFIX.length() + 1)); else return readerConfMap.get(key); } - public void setReaderConfValue(String key, Object value) { if (key.contains(READER_CONF_PREFIX)) readerConfMap.put(key.substring(READER_CONF_PREFIX.length() + 1), value); else readerConfMap.put(key, value); } - // Other consumer helper functions ... public String getReaderTopicName() { Object confValue = getReaderConfValue("reader.topicName"); @@ -349,7 +282,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getReaderName() { Object confValue = getReaderConfValue("reader.readerName"); if (confValue == null) @@ -357,7 +289,6 @@ public class PulsarNBClientConf { else return confValue.toString(); } - public String getStartMsgPosStr() { Object confValue = getReaderConfValue("reader.startMessagePos"); if (confValue == null) diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index 7fae8891b..68d51e3ca 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -1,9 +1,3 @@ -### NB Pulsar driver related configuration - driver.xxx -driver.client-type=producer -driver.num-workers=1 -# TODO: functionalities to be completed -driver.sync-mode=sync -driver.msg-recv-ouput=console ### Schema related configurations - schema.xxx # valid types: # - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type) @@ -15,15 +9,14 @@ driver.msg-recv-ouput=console # 1) primitive types, including bytearray (byte[]) which is default, for messages without schema # 2) Avro for messages with schema schema.type=avro -schema.definition=file://// +schema.definition=file:// ### Pulsar client related configurations - client.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#client -client.serviceUrl=pulsar://localhost:6650 client.connectionTimeoutMs=5000 ### Producer related configurations (global) - producer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer producer.producerName= -producer.topicName=persistent://public/default/mynbtest +producer.topicName= producer.sendTimeoutMs= ### Consumer related configurations (global) - consumer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer @@ -36,7 +29,7 @@ consumer.receiverQueueSize= ### Reader related configurations (global) - reader.xxx # https://pulsar.apache.org/docs/en/client-libraries-java/#reader # - valid Pos: earliest, latest, custom::file://// -reader.topicName=persistent://public/default/nbpulsar +reader.topicName= reader.receiverQueueSize= reader.readerName= -#reader.startMessagePos = earliest +reader.startMessagePos=earliest diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index c329e193f..a4a739fad 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -7,55 +7,88 @@ description: | bindings: mykey: NumberNameToString(); sensor_id: ToUUID();ToString(); -# sensor_type: + # sensor_type: reading_time: ToDateTime(); reading_value: ToFloat(100); topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L)); +# document level parameters that apply to all Pulsar client types: +params: + #topic_uri: "persistent://public/default/{topic}" + topic_uri: "persistent://public/default/nbpulsar" + async_api: "false" + blocks: -# - create-tenant-namespace: -# tags: -# type: create-tenant-namespace -# statements: -# tenant: {tenant} -# namespace: {namespace} + - name: admin-block + tags: + phase: create-tenant-namespace + statements: + - name: s1 + optype: create-tenant + tenant: "{tenant}" + - name: s2 + optype: create-namespace + namespace: "{namespace}" + + - name: batch-producer-block + tags: + phase: batch-producer + statements: + - name: s1 + optype: batch-msg-send-start + # For batch producer, "producer_name" should be associated with batch start + # batch_producer_name: {batch_producer_name} + ratio: 1 + - name: s2 + optype: batch-msg-send + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } + ratio: 100 + - name: s3 + optype: batch-msg-send-end + ratio: 1 - name: producer-block tags: - op-type: producer + phase: producer statements: - - producer-stuff: - ####### - # NOTE: tenant and namespace must be static and pre-exist in Pulsar first - # topic_uri: "[persistent|non-persistent]:////" -# topic_uri: "persistent://public/default/{topic}" - topic_uri: "persistent://public/default/nbpulsar" - # producer-name: - msg-key: "{mykey}" - msg-value: | - { - "SensorID": "{sensor_id}", - "SensorType": "Temperature", - "ReadingTime": "{reading_time}", - "ReadingValue": {reading_value} - } + - name: s1 + optype: msg-send + # producer_name: {producer_name} + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } - name: consumer-block tags: - op-type: consumer + phase: consumer statements: - - consumer-stuff: - topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" - topics-pattern: "public/default/.*" - subscription-name: - subscription-type: - consumer-name: + - name: s1 + optype: msg-consume + topic_names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" + topics_pattern: "public/default/.*" + subscription_name: + subscription_type: + consumer_name: - - reader: + - name: reader-block tags: - op-type: reader + phase: reader statements: - - reader-stuff: + - name: s1 + optype: msg-read + reader_name: # - websocket-producer: # tags: @@ -68,5 +101,3 @@ blocks: # type: managed-ledger # statement: # - managed-ledger-stuff: - - diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 589bee015..a762ebd30 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -1,13 +1,19 @@ - [1. NoSQLBench (NB) Pulsar Driver Overview](#1-nosqlbench-nb-pulsar-driver-overview) - [1.1. Issues Tracker](#11-issues-tracker) - [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings) - - [1.3. Pulsar Driver Yaml File: Statement Blocks](#13-pulsar-driver-yaml-file-statement-blocks) - - [1.3.1. Producer Statement block](#131-producer-statement-block) - - [1.3.2. Consumer Statement block](#132-consumer-statement-block) - - [1.4. Schema Support](#14-schema-support) - - [1.5. Activity Parameters](#15-activity-parameters) - - [1.6. Pulsar NB Execution Example](#16-pulsar-nb-execution-example) -- [2. Advanced Driver Features -- TODO: Design Revisit](#2-advanced-driver-features----todo-design-revisit) + - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) + - [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters) + - [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details) + - [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block) + - [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block) + - [1.4.3. Producer Command Block](#143-producer-command-block) + - [1.4.4. Consumer Command Block](#144-consumer-command-block) + - [1.4.5. Reader Command Block](#145-reader-command-block) + - [1.5. Schema Support](#15-schema-support) + - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) + - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) + - [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) +- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - [2.1. Other Activity Parameters](#21-other-activity-parameters) - [2.2. API Caching](#22-api-caching) - [2.2.1. Instancing Controls](#221-instancing-controls) @@ -17,12 +23,10 @@ This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB). * Producer * Consumer -* (Future) Reader +* Reader * (Future) WebSocket Producer * (Future) Managed Ledger -**NOTE**: At the moment, only Producer workload type is fully supported in NB. The support for Consumer type is partially added but not completed yet; and the support for other types of workloads will be added in NB in future releases. - ## 1.1. Issues Tracker If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar). @@ -36,15 +40,11 @@ When creating these objects (e.g. PulsarClient, Producer), there are different c The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below: ```properties -### NB Pulsar driver related configuration - driver.xxx -driver.client-type = producer - ### Schema related configurations - schema.xxx schema.type = avro schema.definition = file:/// ### Pulsar client related configurations - client.xxx -client.serviceUrl = pulsar://:6650 client.connectionTimeoutMs = 5000 ### Producer related configurations (global) - producer.xxx @@ -54,92 +54,402 @@ producer.sendTimeoutMs = ``` There are multiple sections in this file that correspond to different groups of configuration settings: -* **NB pulsar driver related settings**: - * All settings under this section starts with **driver.** prefix. - * Right now there is only valid option under this section: - * *driver.client-type* determines what type of Pulsar workload to be simulated by NB. * **Schema related settings**: * All settings under this section starts with **schema.** prefix. * The NB Pulsar driver supports schema-based message publishing and consuming. This section defines configuration settings that are schema related. * There are 2 valid options under this section. - * *shcema.type*: Pulsar message schema type. When unset or set as an empty string, Pulsar messages will be handled in raw *byte[]* format. The other valid option is **avro** which the Pulsar message will follow a specific Avro format. - * *schema.definition*: This only applies when an Avro schema type is specified and the value is the (full) file path that contains the Avro schema definition. + * *shcema.type*: Pulsar message schema type. When unset or set as + an empty string, Pulsar messages will be handled in raw *byte[]* + format. The other valid option is **avro** which the Pulsar + message will follow a specific Avro format. + * *schema.definition*: This only applies when an Avro schema type + is specified and the value is the (full) file path that contains + the Avro schema definition. * **Pulsar Client related settings**: * All settings under this section starts with **client.** prefix. - * This section defines all configuration settings that are related with defining a PulsarClient object. - * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) + * This section defines all configuration settings that are related + with defining a PulsarClient object. + * + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) * **Pulsar Producer related settings**: - * All settings under this section starts with **producer.** prefix. - * This section defines all configuration settings that are related with defining a Pulsar Producer object. - * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) + * All settings under this section starts with **producer** prefix. + * This section defines all configuration settings that are related + with defining a Pulsar Producer object. + * + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) +* **Pulsar Consumer related settings**: + * All settings under this section starts with **consumer** prefix. + * This section defines all configuration settings that are related + with defining a Pulsar Consumer object. + * + See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) +* **Pulsar Reader related settings**: + * All settings under this section starts with **reader** prefix. + * This section defines all configuration settings that are related + with defining a Pulsar Reader object. + * + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) -In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration sections in this file as well. +In the future, when the support for other types of Pulsar workloads is +added in NB Pulsar driver, there will be corresponding configuration +sections in this file as well. -## 1.3. Pulsar Driver Yaml File: Statement Blocks +## 1.3. NB Pulsar Driver Yaml File - High Level Structure -Just like other NB driver types, the actual Pulsar workload generation is determined by the statement blocks in the NB driver Yaml file. Depending on the Pulsar workload type, the corresponding statement block may have different contents. +Just like other NB driver types, the actual Pulsar workload generation is +determined by the statement blocks in the NB driver Yaml file. Depending +on the Pulsar workload type, the corresponding statement block may have +different contents. -### 1.3.1. Producer Statement block +At high level, Pulsar driver yaml file has the following structure: -An example of defining Pulsar **Producer** workload is as below: +* **description**: (optional) general description of the yaml file +* **bindings**: defines NB bindings +* **params**: document level Pulsar driver parameters that apply to all + command blocks. Currently there are two valid parameters: + * **topic_url**: Pulsar topic uri ([persistent|non-persistent]: + ////). This can be statically assigned or + dynamically generated via NB bindings. + * **async_api**: Whether to use asynchronous Pulsar API (**note**: + more on this later) +* **blocks**: includes a series of command blocks. Each command block + defines one major Pulsar operation such as *producer*, *consumer*, etc. + Right now, the following command blocks are already supported or will be + added in the near future. We'll go through each of these command blocks + with more details in later sections. + * (future) **admin-block**: support for Pulsar Admin API, starting + with using NB to create tenants and namespaces. + * **batch-producer-block**: Pulsar batch producer + * **producer-block**: Pulsar producer + * **consumer-block**: Pulsar consumer + * **reader-block**: Pulsar reader ```yaml +description: | + ... ... + +bindings: + ... ... + +# global parameters that apply to all Pulsar client types: +params: + topic_uri: "" + async_api: "false" + blocks: -- name: producer-block - tags: - type: producer - statements: - - producer-stuff: - # producer-name: - # topic_uri: "persistent://public/default/{topic}" - topic_uri: "persistent://public/default/nbpulsar" - msg-key: "{mykey}" - msg-value: | - { - "SensorID": "{sensor_id}", - "SensorType": "Temperature", - "ReadingTime": "{reading_time}", - "ReadingValue": {reading_value} - } + - name: + tags: + phase: + statements: + - name: + optype: + ... ... + - name: + ... ... + + - name: + tags: + ... + statements: + ... ``` -In the above statement block, there are 4 key statement parameters to provide values: -* **producer-name**: cycle-level Pulsar producer name (can be dynamically bound) - * **Optional** - * If not set, global level producer name in *config.properties* file will be used. - * Use a default producer name, "default", if it is neither set at global level. - * If set, cycle level producer name will take precedence over the global level setting +Each time when you execute the NB command, you can only choose one command +block to execute. This is achieved by applying a filtering condition +against **phase** tag, as below: -* **topic_uri**: cycle-level Pulsar topic name (can be dynamically bound) - * **Optional** - * If not set, global level topic_uri in *config.properties* file will be used - * Throw a Runtime Error if it is neither set at global level - * If set, cycle level topic_uri will take precedence over the global level setting; and the provided value must follow several guidelines: - * It must be in valid Pulsar topic format as below: - ``` - [persistent|non-persistent]://// - ``` - * At the moment, only "**\**" part can be dynamically bound (e.g. through NB binding function). All other parts must be static values and the corresponding tenants and namespaces must be created in the Pulsar cluster in advance. +```bash + driver=pulsar tags=phase: ... +``` -**TODO**: allow dynamic binding for "\" and " -\" after adding a phase for creating "\" -and/or "\", similar to C* CQL schema creation phase! +An example of executing Pulsar producer/consumer API using NB is like +this: -* **msg-key**: Pulsar message key - * **Optional** - * If not set, the generated Pulsar messages (to be published by the Producer) doesn't have **keys**. +```bash +# producer + driver=pulsar tags=phase:producer ... -* **msg-value**: Pulsar message payload - * **Mandatory** - * If not set, throw a Runtime Error. +# consumer + driver=pulsar tags=phase:consumer ... +``` -### 1.3.2. Consumer Statement block +### 1.3.1. NB Cycle Level Parameters vs. Global Level Parameters -**TBD ...** +Some parameters, especially topic name and producer/consumer/reader/etc. +name, can be set at the global level in **config.properties** file, or at +NB cycle level via **pulsar.yaml** file. An example of setting a topic +name in both levels is as below: -## 1.4. Schema Support +```bash +# Global level setting (config.properties): +producer.topicName = ... + +# Cycle level setting (pulsar.yaml) +params: + topic_uri: ... +``` + +In theory, all Pulsar client settings can be made as cycle level settings +for maximum flexibility. But practically speaking (and also for simplicity +purposes), only the following parameters are made to be configurable at +both levels, listed by cycle level setting names with their corresponding +global level setting names: + +* topic_uri (Mandatory) + * producer.topicName + * consumer.topicNames + * reader.topicName +* topic_names (Optional for Consumer) + * consumer.topicNames +* subscription_name (Mandatory for Consumer) + * consumer.subscriptionName +* subscription_type (Mandatory for Consumer, default to **exclusive** + type) + * consumer.subscriptionType +* topics_pattern (Optional for Consumer) + * consumer.topicsPattern +* producer_name (Optional) + * producer.producerName +* consumer_name (Optional) + * consumer.consumerName +* reader_name (Optional) + * reader.readerName + +One key difference between setting a parameter at the global level vs. at +the cycle level is that the global level setting is always static and +stays the same for all NB cycle execution. The cycle level setting, on the +other side, can be dynamically bound and can be different from cycle to +cycle. + +Because of this, setting these parameters at the NB cycle level allows us +to run Pulsar testing against multiple topics and/or multiple +producers/consumers/readers/etc all at once within one NB activity. This +makes the testing more flexible and effective. + +**NOTE**: when a configuration is set at both the global level and the +cycle level, **the ycle level setting will take priority!** + +## 1.4. Pulsar Driver Yaml File - Command Block Details + +### 1.4.1. Pulsar Admin API Command Block + +**NOTE**: this functionality is only partially implemented at the moment +and doesn't function yet. + +Currently, the Pulsar Admin API Block is (planned) to only support +creating Pulsar tenants and namespaces. It has the following format: + +```yaml + - name: admin-block + tags: + phase: create-tenant-namespace + statements: + - name: s1 + optype: create-tenant + tenant: "{tenant}" + - name: s2 + optype: create-namespace + namespace: "{namespace}" +``` + +In this command block, there are 2 statements (s1 and s2): + +* Statement **s1** is used for creating a Pulsar tenant + * (Mandatory) **optype (create-tenant)** is the statement identifier + for this statement + * (Mandatory) **tenant** is the only statement parameter that + specifies the Pulsar tenant name which can either be dynamically + bound or statically assigned. +* Statement **s2** is used for creating a Pulsar namespace + * (Mandatory) **optype (create-namespace)** is the statement + identifier for this statement + * (Mandatory) **namespace** is the only statement parameter that + specifies the Pulsar namespace under the tenant created by statement + s1. Its name can either be dynamically bound or statically assigned. + +### 1.4.2. Batch Producer Command Block + +Batch producer command block is used to produce a batch of messages all at +once by one NB cycle execution. A typical format of this command block is +as below: + +```yaml +- name: batch-producer-block + tags: + phase: batch-producer + statements: + - name: s1 + optype: batch-msg-send-start + # For batch producer, "producer_name" should be associated with batch start + batch_producer_name: {batch_producer_name} + ratio: 1 + - name: s2 + optype: batch-msg-send + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } + ratio: 100 + - name: s3 + optype: batch-msg-send-end + ratio: 1 +``` + +This command block has 3 statements (s1, s2, and s3) with the following +ratios: 1, , 1. + +* Statement **s1** is used to mark the start of a batch of message + production within one NB cycle + * (Mandatory) **optype (batch-msg-send-start)** is the statement + identifier for this statement + * (Optional) **batch_producer_name**, when provided, specifies the + Pulsar producer name that is associated with the batch production of + the messages. + * (Optional) **ratio**, when provided, MUST be 1. If not provided, it + is default to 1. +* Statement **s2** is the core statement that generates the message key + and payload to be put in the batch. + * (Mandatory) **optype (batch-msg-send)** is the statement identifier + for this statement + * (Optional) **msg-key**, when provided, specifies the key of the + generated message + * (Mandatory) **msg-payload** specifies the payload of the generated + message + * (Optional) **ratio**, when provided, specifies the batch size (how + many messages to be put in one batch). If not provided, it is + default to 1. +* Statement **s3** is used to mark the end of a batch within one NB cycle + * (Mandatory) **optype (batch-msg-send-end)** is the statement + identifier for this statement + * (Optional) **ratio**, when provided, MUST be 1. If not provided, it + is default to 1. + +### 1.4.3. Producer Command Block + +This is the regular Pulsar producer command block that produces one Pulsar +message per NB cycle execution. A typical format of this command block is +as below: + +```yaml + - name: producer-block + tags: + phase: producer + statements: + - name: s1 + optype: msg-send + # producer_name: {producer_name} + msg_key: "{mykey}" + msg_value: | + { + "SensorID": "{sensor_id}", + "SensorType": "Temperature", + "ReadingTime": "{reading_time}", + "ReadingValue": {reading_value} + } +``` + +This command block only has 1 statements (s1): + +* Statement **s1** is used to generate the key and payload for one message + * (Mandatory) **optype (msg-send)** is the statement identifier for + this statement + * (Optional) **producer_name**, when provided, specifies the Pulsar + producer name that is associated with the message production. + * (Optional) **msg-key**, when provided, specifies the key of the + generated message + * (Mandatory) **msg-payload** specifies the payload of the generated + message + +### 1.4.4. Consumer Command Block + +This is the regular Pulsar consumer command block that consumes one Pulsar +message per NB cycle execution. A typical format of this command block is +as below: + +```yaml + - name: consumer-block + tags: + phase: consumer + statements: + - name: s1 + optype: msg-consume + topic_names: ", " + # topics_pattern: "" + subscription_name: + subscription_type: + consumer_name: +``` + +This command block only has 1 statements (s1): + +* Statement **s1** is used to consume one message from the Pulsar cluster + and acknowledge it. + * (Mandatory) **optype (msg-consume)** is the statement identifier for + this statement + * (Optional) **topic_names**, when provided, specifies multiple topic + names from which to consume messages. Default to document level + parameter **topic_uri**. + * (Optional) **topics_pattern**, when provided, specifies pulsar + topic regex pattern for multi-topic message consumption + * (Mandatory) **subscription_name** specifies subscription name. + * (Optional) **subscription_type**, when provided, specifies + subscription type. Default to **exclusive** subscription type. + * (Optional) **consumer_name**, when provided, specifies the + associated consumer name. + +### 1.4.5. Reader Command Block + +This is the regular Pulsar reader command block that reads one Pulsar +message per NB cycle execution. A typical format of this command block is +as below: + +```yaml + - name: reader-block + tags: + phase: reader + statements: + - name: s1 + optype: msg-read + reader_name: +``` + +This command block only has 1 statements (s1): + +* Statement **s1** is used to consume one message from the Pulsar cluster + and acknowledge it. + * (Mandatory) **optype (msg-consume)** is the statement identifier for + this statement + * (Optional) **reader_name**, when provided, specifies the associated + consumer name. + +**TBD**: at the moment, the NB Pulsar driver Reader API only supports +reading from the following positions: + +* MessageId.earliest +* MessageId.latest (default) + +A customized reader starting position, as below, is NOT supported yet! + +```java +byte[] msgIdBytes = // Some message ID byte array +MessageId id = MessageId.fromByteArray(msgIdBytes); +Reader reader = pulsarClient.newReader() + .topic(topic) + .startMessageId(id) + .create(); +``` + +## 1.5. Schema Support + +Pulsar has built-in schema support. Other than primitive types, Pulsar +also supports complex types like **Avro**, etc. At the moment, the NB +Pulsar driver provides 2 schema support modes, via the global level schema +related settings as below: -Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, etc. At the moment, the NB Pulsar driver provides 2 schema support modes, via the global level schema related settings as below: * Avro schema: ```properties shcema.type= avro @@ -151,10 +461,9 @@ Pulsar has built-in schema support. Other than primitive types, Pulsar also supp schema.definition= ``` -For the previous Producer block statement example, the **msg-value** +Take the previous Producer command block as an example, the **msg-value** parameter has the value of a JSON string that follows the following Avro -schema definition (e.g. as in the sample schema definition -file: **[iot-example.asvc](activities/iot-example.avsc)**) +schema definition: ```json { "type": "record", @@ -169,43 +478,107 @@ file: **[iot-example.asvc](activities/iot-example.avsc)**) } ``` -## 1.5. Activity Parameters +## 1.6. NB Activity Execution Parameters -At the moment, the following Pulsar driver specific Activity Parameter is -supported: +At the moment, the following NB Pulsar driver **specific** activity +parameters are supported: -- * config= +* service_url= +* config= -## 1.6. Pulsar NB Execution Example +Some other common NB activity parameters are listed as below. Please +reference to NB documentation for more parameters -``` - run type=pulsar -vv cycles=10 config=/config.properties yaml=/pulsar.yaml +* driver=pulsar +* seq=concat (needed for **batch** producer) +* tags=phase: +* threads= +* cycles= +* --report-csv-to + +## 1.7. NB Pulsar Driver Execution Example + +1. Run Pulsar producer API to produce 100K messages using 100 NB threads + +```bash + run driver=pulsar tags=phase:producer threads=100 cycles=100K config=/config.properties yaml=/pulsar.yaml ``` -**NOTE**: +2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads; + put NB execution metrics in a specified metrics folder -* An example of **config.properties** file is [here](activities/config.properties) -* An example of **pulsar.yaml** file is [here](activities/pulsar.yaml) +```bash + run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M config=/config.properties yaml=/pulsar.yaml --report-csv-to +``` + +3. Run Pulsar consumer API to consume (and acknowledge) 100 messages using + one single NB thread. + +```bash + run driver=pulsar tags=phase:consumer cycles=100 config=/config.properties yaml=/pulsar.yaml +``` + +## Appendix A. Template Global Setting File (config.properties) + +```properties +schema.type = +schema.definition = + + +### Pulsar client related configurations - client.xxx +client.connectionTimeoutMs = + + +### Producer related configurations (global) - producer.xxx +producer.producerName = +producer.topicName = +producer.sendTimeoutMs = + + +### Consumer related configurations (global) - consumer.xxx +consumer.topicNames = +consumer.topicsPattern = +consumer.subscriptionName = +consumer.subscriptionType = +consumer.consumerName = +consumer.receiverQueueSize = + + +### Reader related configurations (global) - reader.xxx +reader.topicName = +reader.receiverQueueSize = +reader.readerName = +reader.startMessagePos = +``` --- -# 2. Advanced Driver Features -- TODO: Design Revisit +# 2. TODO : Design Revisit -- Advanced Driver Features -**NOTE**: The following text is based on the original multi-layer API caching design which is not fully implemented at the moment. We need to revisit the original design at some point in order to achieve maximum testing flexibility. +**NOTE**: The following text is based on the original multi-layer API +caching design which is not fully implemented at the moment. We need to +revisit the original design at some point in order to achieve maximum +testing flexibility. -To summarize, the original caching design has the following key requirements: +To summarize, the original caching design has the following key +requirements: * **Requirement 1**: Each NB Pulsar activity is able to launch and cache multiple **client spaces** * **Requirement 2**: Each client space can launch and cache multiple Pulsar operators of the same type (producer, consumer, etc.) +* **Requirement 3**: The size of each Pulsar operator specific cached + space can be configurable. -In the current implementation, only requirement 2 is implemented. Regarding requirement 1, the current implementation only supports one client space per NB Pulsar activity! +In the current implementation, only requirement 2 is implemented. + +* For requirement 1, the current implementation only supports one client + space per NB Pulsar activity +* For requirement 3, the cache space size is not configurable (no limit at + the moment) ## 2.1. Other Activity Parameters -- **url** - The pulsar url to connect to. - - **default** - `url=pulsar://localhost:6650` - **maxcached** - A default value to be applied to `max_clients`, `max_producers`, `max_consumers`. - default: `max_cached=100` @@ -213,13 +586,9 @@ In the current implementation, only requirement 2 is implemented. Regarding requ instances which are allowed to be cached in the NoSQLBench client runtime. The clients cache automatically maintains a cache of unique client instances internally. default: _maxcached_ - -- **max_producers** - Producers cache size (per client instance). Limits - the number of producer instances which are allowed to be cached per - client instance. default: _maxcached_ -- **max_consumers** - Consumers cache size (per client instance). Limits - the number of consumer instances which are allowed to be cached per - client instance. +- **max_operators** - Producers/Consumers/Readers cache size (per client + instance). Limits the number of instances which are allowed to be cached + per client instance. default: _maxcached_ ## 2.2. API Caching diff --git a/driver-pulsar/src/main/resources/topics.md b/driver-pulsar/src/main/resources/topics.md deleted file mode 100644 index 55a67f02a..000000000 --- a/driver-pulsar/src/main/resources/topics.md +++ /dev/null @@ -1,3 +0,0 @@ -# pulsar help topics - -- pulsar