confKeyList;
+
+ // All configuration items with "String" as the value type
+ confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "String");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, confVal);
+ }
+ }
+ }
+
+ // All configuration items with "long" as the value type
+ confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "long");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, Long.valueOf(confVal));
+ }
+ }
+ }
+
+ // All configuration items with "int" as the value type
+ confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "int");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, Integer.valueOf(confVal));
+ }
+ }
+ }
+
+ // All configuration items with "boolean" as the value type
+ confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "boolean");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, Boolean.valueOf(confVal));
+ }
+ }
+ }
+
+ // TODO: So far the above primitive types should be good enough.
+ // Add support for other types when needed
+ }
+
+ /**
+  * Builds the message describing an invalid Pulsar configuration value.
+  *
+  * @param confKey        name of the offending configuration item
+  * @param confVal        the value that was rejected
+  * @param configCategory configuration category inserted after "Pulsar" in the message
+  * @param expectedVal    description of the expected value (format)
+  * @return the assembled, human-readable error message
+  */
+ private static String getInvalidConfValStr(String confKey, String confVal, String configCategory, String expectedVal) {
+     StringBuilder message = new StringBuilder();
+     message.append("Incorrect value \"").append(confVal);
+     message.append("\" for Pulsar ").append(configCategory);
+     message.append(" configuration item of \"").append(confKey);
+     message.append("\". Expecting the following value (format): ").append(expectedVal);
+     return message.toString();
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java
new file mode 100644
index 000000000..f929ab25a
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java
@@ -0,0 +1,169 @@
+package io.nosqlbench.adapter.pulsar.util;
+
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import com.codahale.metrics.Counter;
+
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Detects message loss, message duplication and out-of-order message delivery
+ * based on a monotonic sequence number that each received message contains.
+ *
+ * Out-of-order messages are detected with a maximum look behind of 1000 sequence number entries.
+ * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
+ */
+public class ReceivedMessageSequenceTracker implements AutoCloseable {
+ private static final int DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 1000;
+ private static final int DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS = 1000;
+ // message out-of-sequence error counter
+ private final Counter msgErrOutOfSeqCounter;
+ // duplicate message error counter
+ private final Counter msgErrDuplicateCounter;
+ // message loss error counter
+ private final Counter msgErrLossCounter;
+ private final SortedSet pendingOutOfSeqNumbers;
+ private final int maxTrackOutOfOrderSequenceNumbers;
+ private final SortedSet skippedSeqNumbers;
+ private final int maxTrackSkippedSequenceNumbers;
+ private long expectedNumber = -1;
+
+ public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
+ this(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter,
+ DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS, DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS);
+ }
+
+ public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter,
+ int maxTrackOutOfOrderSequenceNumbers, int maxTrackSkippedSequenceNumbers) {
+ this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
+ this.msgErrDuplicateCounter = msgErrDuplicateCounter;
+ this.msgErrLossCounter = msgErrLossCounter;
+ this.maxTrackOutOfOrderSequenceNumbers = maxTrackOutOfOrderSequenceNumbers;
+ this.maxTrackSkippedSequenceNumbers = maxTrackSkippedSequenceNumbers;
+ this.pendingOutOfSeqNumbers = new TreeSet<>();
+ this.skippedSeqNumbers = new TreeSet<>();
+ }
+
+ /**
+ * Notifies the tracker about a received sequence number
+ *
+ * @param sequenceNumber the sequence number of the received message
+ */
+ public void sequenceNumberReceived(long sequenceNumber) {
+ if (expectedNumber == -1) {
+ expectedNumber = sequenceNumber + 1;
+ return;
+ }
+
+ if (sequenceNumber < expectedNumber) {
+ if (skippedSeqNumbers.remove(sequenceNumber)) {
+ // late out-of-order delivery was detected
+ // decrease the loss counter
+ msgErrLossCounter.dec();
+ // increment the out-of-order counter
+ msgErrOutOfSeqCounter.inc();
+ } else {
+ msgErrDuplicateCounter.inc();
+ }
+ return;
+ }
+
+ boolean messagesSkipped = false;
+ if (sequenceNumber > expectedNumber) {
+ if (pendingOutOfSeqNumbers.size() == maxTrackOutOfOrderSequenceNumbers) {
+ messagesSkipped = processLowestPendingOutOfSequenceNumber();
+ }
+ if (!pendingOutOfSeqNumbers.add(sequenceNumber)) {
+ msgErrDuplicateCounter.inc();
+ }
+ } else {
+ // sequenceNumber == expectedNumber
+ expectedNumber++;
+ }
+ processPendingOutOfSequenceNumbers(messagesSkipped);
+ cleanUpTooFarBehindOutOfSequenceNumbers();
+ }
+
+ private boolean processLowestPendingOutOfSequenceNumber() {
+ // remove the lowest pending out of sequence number
+ Long lowestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
+ pendingOutOfSeqNumbers.remove(lowestOutOfSeqNumber);
+ if (lowestOutOfSeqNumber > expectedNumber) {
+ // skip the expected number ahead to the number after the lowest sequence number
+ // increment the counter with the amount of sequence numbers that got skipped
+ // keep track of the skipped sequence numbers to detect late out-of-order message delivery
+ for (long l = expectedNumber; l < lowestOutOfSeqNumber; l++) {
+ msgErrLossCounter.inc();
+ skippedSeqNumbers.add(l);
+ if (skippedSeqNumbers.size() > maxTrackSkippedSequenceNumbers) {
+ skippedSeqNumbers.remove(skippedSeqNumbers.first());
+ }
+ }
+ expectedNumber = lowestOutOfSeqNumber + 1;
+ return true;
+ } else {
+ msgErrLossCounter.inc();
+ }
+ return false;
+ }
+
+ private void processPendingOutOfSequenceNumbers(boolean messagesSkipped) {
+ // check if there are previously received out-of-order sequence number that have been received
+ while (pendingOutOfSeqNumbers.remove(expectedNumber)) {
+ expectedNumber++;
+ if (!messagesSkipped) {
+ msgErrOutOfSeqCounter.inc();
+ }
+ }
+ }
+
+ private void cleanUpTooFarBehindOutOfSequenceNumbers() {
+ // remove sequence numbers that are too far behind
+ for (Iterator iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) {
+ Long number = iterator.next();
+ if (number < expectedNumber - maxTrackOutOfOrderSequenceNumbers) {
+ msgErrLossCounter.inc();
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Handles the possible pending out of sequence numbers. Mainly needed in unit tests to assert the
+ * counter values.
+ */
+ @Override
+ public void close() {
+ while (!pendingOutOfSeqNumbers.isEmpty()) {
+ processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber());
+ }
+ }
+
+ public int getMaxTrackOutOfOrderSequenceNumbers() {
+ return maxTrackOutOfOrderSequenceNumbers;
+ }
+
+ public int getMaxTrackSkippedSequenceNumbers() {
+ return maxTrackSkippedSequenceNumbers;
+ }
+}
diff --git a/adapter-pulsar/src/main/resources/bindingtest.yaml b/adapter-pulsar/src/main/resources/bindingtest.yaml
new file mode 100644
index 000000000..e687f5fa4
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/bindingtest.yaml
@@ -0,0 +1,4 @@
+bindings:
+ tenant: Mod(100); Div(10); ToString(); Prefix("tnt")
+ namespace: Mod(10); Div(5); ToString(); Prefix("ns")
+ topic: Mod(5); ToString(); Prefix("tp")
diff --git a/adapter-pulsar/src/main/resources/config.properties b/adapter-pulsar/src/main/resources/config.properties
new file mode 100644
index 000000000..7afb0252b
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/config.properties
@@ -0,0 +1,43 @@
+### Schema related configurations - schema.xxx
+# valid types:
+# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
+# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
+# - struct (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
+# avro, json, protobuf
+#
+# TODO: as a starting point, only supports the following types
+# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
+# 2) Avro for messages with schema
+#schema.key.type=avro
+#schema.key.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-key-example.avsc
+#schema.type=avro
+#schema.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-value-example.avsc
+schema.type=
+schema.definition=
+
+
+### Pulsar client related configurations - client.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#client
+client.connectionTimeoutMs=5000
+client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
+# Cluster admin
+client.authParams=
+
+
+### Producer related configurations (global) - producer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
+producer.sendTimeoutMs=
+producer.blockIfQueueFull=true
+
+
+### Consumer related configurations (global) - consumer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
+consumer.subscriptionInitialPosition=Earliest
+consumer.ackTimeoutMillis=10000
+consumer.regexSubscriptionMode=AllTopics
+consumer.deadLetterPolicy={"maxRedeliverCount":"5","retryLetterTopic":"public/default/retry","deadLetterTopic":"public/default/dlq","initialSubscriptionName":"dlq-sub"}
+consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"10","maxDelayMs":"20","multiplier":"1.2"}
+
+
+### Reader related configurations (global) - reader.xxx
+# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
diff --git a/adapter-pulsar/src/main/resources/iot-key-example.avsc b/adapter-pulsar/src/main/resources/iot-key-example.avsc
new file mode 100644
index 000000000..f36b52bc3
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/iot-key-example.avsc
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "name": "IotSensorKey",
+ "namespace": "TestNS",
+ "fields" : [
+ {"name": "Location", "type": "string"},
+ {"name": "WellID", "type": "string"}
+ ]
+}
diff --git a/adapter-pulsar/src/main/resources/iot-value-example.avsc b/adapter-pulsar/src/main/resources/iot-value-example.avsc
new file mode 100644
index 000000000..20bb894fd
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/iot-value-example.avsc
@@ -0,0 +1,11 @@
+{
+ "type": "record",
+ "name": "IotSensor",
+ "namespace": "TestNS",
+ "fields" : [
+ {"name": "SensorID", "type": "string"},
+ {"name": "SensorType", "type": "string"},
+ {"name": "ReadingTime", "type": "string"},
+ {"name": "ReadingValue", "type": "float"}
+ ]
+}
diff --git a/adapter-pulsar/src/main/resources/pulsar.md b/adapter-pulsar/src/main/resources/pulsar.md
new file mode 100644
index 000000000..17b57dc6e
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/pulsar.md
@@ -0,0 +1,253 @@
+- [1. Overview](#1-overview)
+ - [1.1. Issues Tracker](#11-issues-tracker)
+- [2. Execute the NB Pulsar Driver Workload](#2-execute-the-nb-pulsar-driver-workload)
+ - [2.1. NB Pulsar Driver Yaml File High Level Structure](#21-nb-pulsar-driver-yaml-file-high-level-structure)
+ - [2.2. NB Pulsar Driver Configuration Parameters](#22-nb-pulsar-driver-configuration-parameters)
+ - [2.2.1. Global Level Parameters](#221-global-level-parameters)
+ - [2.2.2. Document Level Parameters](#222-document-level-parameters)
+- [3. NB Pulsar Driver OpTemplates](#3-nb-pulsar-driver-optemplates)
+- [4. Message Generation and Schema Support](#4-message-generation-and-schema-support)
+ - [4.1. Message Generation](#41-message-generation)
+ - [4.2. Schema Support](#42-schema-support)
+
+# 1. Overview
+
+This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
+* Admin API - create/delete tenants
+* Admin API - create/delete namespaces
+* Admin API - create/delete topics
+ * Topics can be partitioned or non-partitioned
+* Producer - publish messages with schema support
+ * Default schema type is byte[]
+ * Avro schema and KeyValue schema are also supported
+* Consumer - consume messages with schema support and the following support
+ * Different subscription types
+ * Multi-topic subscription (including Topic patterns)
+ * Subscription initial position
+ * Dead letter topic policy
+ * Negative acknowledgement and acknowledgement timeout redelivery backoff policy
+
+
+## 1.1. Issues Tracker
+
+If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
+
+# 2. Execute the NB Pulsar Driver Workload
+
+Running an NB Pulsar driver workload uses a command similar to that of other NB driver types, but with its own unique execution parameters. The general command has the following format:
+
+```shell
+ run driver=pulsar threads= cycles= web_url= service_url= config= yaml= []
+```
+
+In the above command, make sure the driver type is **pulsar** and provide the following Pulsar driver specific parameters:
+* ***web_url***: Pulsar web service url and default to "http://localhost:8080"
+* ***service_url***: Pulsar native protocol service url and default to "pulsar://localhost:6650"
+* ***config***: Pulsar schema/client/producer/consumer related configuration (as a property file)
+
+## 2.1. NB Pulsar Driver Yaml File High Level Structure
+
+Just like other NB driver types, the actual NB Pulsar workload is defined in a YAML file with the following high level structure:
+
+```yaml
+description: |
+ ...
+
+bindings:
+ ...
+
+params:
+ ...
+
+blocks:
+ :
+ ops:
+ op1:
+ : ""
+ : ""
+ : ""
+ ...
+
+ :
+ ...
+```
+
+* ***description***: This is an (optional) section that provides a general description of the Pulsar NB workload defined in this file.
+* ***bindings***: This section defines all NB bindings that are required in all OpTemplate blocks
+* ***params***: This section defines **Document level** configuration parameters that apply to all OpTemplate blocks.
+* ***blocks***: This section defines the OpTemplate blocks that are needed to execute Pulsar specific workloads. Each OpTemplate block may contain multiple OpTemplates.
+
+## 2.2. NB Pulsar Driver Configuration Parameters
+
+The NB Pulsar driver configuration parameters can be set at 3 different levels:
+* Global level
+* Document level
+ * The parameters at this level are those within a NB yaml file that impact all OpTemplates
+* Op level (or Cycle level)
+ * The parameters at this level are those within a NB yaml file that are associated with each individual OpTemplate
+
+Please **NOTE** that when a parameter is specified at multiple levels, the one at the lowest level takes precedence.
+
+### 2.2.1. Global Level Parameters
+
+The parameters at this level are those listed in the command line config properties file.
+
+The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to complete its workloads such as creating/deleting tenants/namespaces/topics, generating messages, creating producers to send messages, and creating consumers to receive messages. The Pulsar client API has different configuration parameters to control the execution behavior. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration parameters for how a Pulsar producer can be created.
+
+All these Pulsar "native" parameters are supported by the NB Pulsar driver, via the global configuration properties file (e.g. **config.properties**). An example of the structure of this file looks like below:
+
+```properties
+### Schema related configurations - MUST start with prefix "schema."
+#schema.key.type=avro
+#schema.key.definition=
+schema.type=avro
+schema.definition=
+
+### Pulsar client related configurations - MUST start with prefix "client."
+# http://pulsar.apache.org/docs/en/client-libraries-java/#client
+client.connectionTimeoutMs=5000
+client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
+client.authParams=
+# ...
+
+### Producer related configurations (global) - MUST start with prefix "producer."
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
+producer.sendTimeoutMs=
+producer.blockIfQueueFull=true
+# ...
+
+### Consumer related configurations (global) - MUST start with prefix "consumer."
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
+consumer.subscriptionInitialPosition=Earliest
+consumer.deadLetterPolicy={"maxRedeliverCount":"5","retryLetterTopic":"public/default/retry","deadLetterTopic":"public/default/dlq","initialSubscriptionName":"dlq-sub"}
+consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"10","maxDelayMs":"20","multiplier":"1.2"}
+# ...
+```
+
+There are multiple sections in this file that correspond to different
+categories of the configuration parameters:
+* **`Pulsar Schema` related settings**:
+ * All settings under this section start with the **schema.** prefix.
+ * At the moment, there are 3 schema types supported
+ * Default raw ***byte[]***
+ * Avro schema for the message payload
+ * KeyValue based Avro schema for both message key and message payload
+* **`Pulsar Client` related settings**:
+ * All settings under this section start with the **client.** prefix.
+ * This section defines all configuration parameters that are related with defining a PulsarClient object.
+ * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
+* **`Pulsar Producer` related settings**:
+ * All settings under this section start with the **producer.** prefix.
+ * This section defines all configuration parameters that are related with defining a Pulsar Producer object.
+ * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
+* **`Pulsar Consumer` related settings**:
+ * All settings under this section start with the **consumer.** prefix.
+ * This section defines all configuration parameters that are related with defining a Pulsar Consumer object.
+ * See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
+
+### 2.2.2. Document Level Parameters
+
+For the Pulsar NB driver, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported:
+
+* ***async_api*** (boolean):
+ * When true, use async Pulsar client API.
+* ***use_transaction*** (boolean):
+ * When true, use Pulsar transaction.
+* ***admin_delop*** (boolean):
+ * When true, delete Tenants/Namespaces/Topics. Otherwise, create them.
+ * Only applicable to administration related operations
+* ***seq_tracking*** (boolean):
+ * When true, a sequence number is created as part of each message's properties
+ * This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully.
+* ***seqerr_simu***:
+ * A list of error simulation types separated by comma (,)
+ * Valid error simulation types
+ * `out_of_order`: simulate message out of sequence
+ * `msg_loss`: simulate message loss
+ * `msg_dup`: simulate message duplication
+* ***e2e_starting_time_source***:
+ * Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch.
+ * The possible values for `e2e_starting_time_source`:
+ * `message_publish_time` : uses the message publishing timestamp as the starting time
+ * `message_event_time` : uses the message event timestamp as the starting time
+ * `message_property_e2e_starting_time` : uses a message property `e2e_starting_time` as the starting time.
+
+# 3. NB Pulsar Driver OpTemplates
+
+For the NB Pulsar driver, each OpTemplate has the following format:
+```yaml
+blocks:
+ :
+ ops:
+ :
+ :
+ : ""
+ : ""
+ ...
+```
+
+The `OpTypeIdentifier` determines which NB Pulsar workload type (`OpType`) to run, and it must be one of the following values:
+```java
+public enum PulsarOpType {
+ AdminTenant,
+ AdminNamespace,
+ AdminTopic,
+ MessageProduce,
+ MessageConsume;
+}
+```
+
+Its value is mandatory and depending on the actual identifier, its value can be one of the following:
+* ***Tenant name***: for `AdminTenant` type
+* ***Namespace name***: for `AdminNamespace` type and in format `<tenant>/<namespace>`
+* ***Topic name***: for the rest of the types and in format `[(persistent|non-persistent)://]<tenant>/<namespace>/<topic>`
+ `OpTypeIdentifier` is mandatory for each NB Pulsar operation type
+
+Each Pulsar `OpType` may have optional Op specific parameters. Please refer to [here](yaml_examples) for the example NB Pulsar YAML files for each OpType
+
+# 4. Message Generation and Schema Support
+
+## 4.1. Message Generation
+
+A Pulsar message has three main components: message key, message properties, and message payload. Among them, message payload is mandatory when creating a message.
+
+When running the "message producing" workload, the NB Pulsar driver is able to generate a message with its full content via the following OpTemplate level parameters:
+* `msg_key`: defines message key value
+* `msg_property`: defines message property values
+* `msg_value`: defines message payload value
+
+The actual values of them can be static or dynamic (which are determined by NB data binding rules)
+
+For `msg_key`, its value can be either
+* a plain text string, or
+* a JSON string that follows the specified "key" Avro schema (when KeyValue schema is used)
+
+For `msg_property`, its value needs to be a JSON string that contains a list of key-value pairs. An example is as below. Please **NOTE** that if the provided value is not a valid JSON string, the NB Pulsar driver will ignore it and treat the message as having no properties.
+```
+ msg_property: |
+ {
+ "prop1": "{myprop1}",
+ "prop2": "{myprop2}"
+ }
+```
+
+For `msg_value`, its value can be either
+* a plain simple text, or
+* a JSON string that follows the specified "value" Avro schema (when Avro schema or KeyValue schema is used)
+
+## 4.2. Schema Support
+
+The NB Pulsar driver supports the following Pulsar schema types:
+* Primitive schema types
+* Avro schema type (only for message payload - `msg_value`)
+* KeyValue schema type (with both key and value following an Avro schema)
+
+The following 2 global configuration parameters define the required schema type
+* `schema.key.type`: defines message key type
+* `schema.type`: defines message value type
+ For them, if the parameter value is not specified, it means using the default `byte[]/BYTES` type as the schema type. Otherwise, if it is specified as "avro", it means using Avro as the schema type.
+
+The following 2 global configuration parameters define the schema specification (**ONLY** needed when Avro is the schema type)
+* `schema.key.definition`: a file path that defines the message key Avro schema specification
+* `schema.definition`: a file path that defines the message value Avro schema specification
+ The NB Pulsar driver will throw an error if the schema type is Avro but the schema specification definition file is not provided or is not valid.
diff --git a/adapter-pulsar/src/main/resources/yaml_examples/admin_namespace.yaml b/adapter-pulsar/src/main/resources/yaml_examples/admin_namespace.yaml
new file mode 100644
index 000000000..b99073d31
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/yaml_examples/admin_namespace.yaml
@@ -0,0 +1,14 @@
+bindings:
+ # 20 namespaces: 10 tenants, 2 namespaces/tenant
+ tenant: Mod(20); Div(2); ToString(); Prefix("tnt")
+ namespace: Mod(2); ToString(); Prefix("ns")
+
+params:
+ async_api: "false"
+ admin_delop: "false"
+
+blocks:
+ admin-namespace-block:
+ ops:
+ op1:
+ AdminNamespace: "{tenant}/{namespace}"
diff --git a/adapter-pulsar/src/main/resources/yaml_examples/admin_tenant.yaml b/adapter-pulsar/src/main/resources/yaml_examples/admin_tenant.yaml
new file mode 100644
index 000000000..8564fb2f0
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/yaml_examples/admin_tenant.yaml
@@ -0,0 +1,15 @@
+bindings:
+  # 10 tenants
+  tenant: Mod(10); ToString(); Prefix("tnt")
+
+params:
+  async_api: "false"
+  admin_delop: "false"
+
+blocks:
+  admin-tenant-block:
+    ops:
+      op1:
+        # tenant administration op; the op value is the tenant name
+        AdminTenant: "{tenant}"
+        admin_roles: ""
+        allowed_clusters: ""
diff --git a/adapter-pulsar/src/main/resources/yaml_examples/admin_topic.yaml b/adapter-pulsar/src/main/resources/yaml_examples/admin_topic.yaml
new file mode 100644
index 000000000..ce742a782
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/yaml_examples/admin_topic.yaml
@@ -0,0 +1,17 @@
+bindings:
+ # 100 topics: 10 tenants, 2 namespaces/tenant, 5 topics/namespace
+ tenant: Mod(100); Div(10); ToString(); Prefix("tnt")
+ namespace: Mod(10); Div(5); ToString(); Prefix("ns")
+ topic: Mod(5); ToString(); Prefix("tp")
+
+params:
+ async_api: "false"
+ admin_delop: "false"
+
+blocks:
+ admin-topic-block:
+ ops:
+ op1:
+ AdminTopic: "{tenant}/{namespace}/{topic}"
+ enable_partition: "false"
+ partition_num: "5"
diff --git a/adapter-pulsar/src/main/resources/yaml_examples/msg_recv.yaml b/adapter-pulsar/src/main/resources/yaml_examples/msg_recv.yaml
new file mode 100644
index 000000000..364de373c
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/yaml_examples/msg_recv.yaml
@@ -0,0 +1,11 @@
+params:
+ async_api: "true"
+
+blocks:
+ msg-consume-block:
+ ops:
+ op1:
+ MessageConsume: "tnt0/ns0/tp0"
+ consumerName: ""
+ subscriptionName: "mynbsub"
+ subscriptionType: "shared"
diff --git a/adapter-pulsar/src/main/resources/yaml_examples/msg_send_avro.yaml b/adapter-pulsar/src/main/resources/yaml_examples/msg_send_avro.yaml
new file mode 100644
index 000000000..b0a585ccd
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/yaml_examples/msg_send_avro.yaml
@@ -0,0 +1,29 @@
+bindings:
+ location: Cities();
+ well_id: ToUUID();ToString();
+ sensor_id: ToUUID();ToString();
+ reading_time: ToDateTime();
+ reading_value: ToFloat(100);
+
+params:
+ async_api: "true"
+
+blocks:
+ msg-produce-block:
+ ops:
+ op1:
+ MessageProduce: "tnt0/ns0/tp1"
+ producerName: ""
+ msg_key: |
+ {
+ "Location": "{location}",
+ "WellID": "{well_id}"
+ }
+ msg_properties: ""
+ msg_value: |
+ {
+ "SensorID": "{sensor_id}",
+ "SensorType": "Temperature",
+ "ReadingTime": "{reading_time}",
+ "ReadingValue": {reading_value}
+ }
diff --git a/adapter-pulsar/src/main/resources/yaml_examples/msg_send_kvraw.yaml b/adapter-pulsar/src/main/resources/yaml_examples/msg_send_kvraw.yaml
new file mode 100644
index 000000000..0886948cc
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/yaml_examples/msg_send_kvraw.yaml
@@ -0,0 +1,23 @@
+bindings:
+ # message key, property and value
+ mykey: NumberNameToString()
+ int_prop_val: ToString(); Prefix("IntProp_")
+ text_prop_val: AlphaNumericString(5); Prefix("TextProp_")
+ myvalue: AlphaNumericString(20)
+
+# document level parameters that apply to all Pulsar client types:
+params:
+ async_api: "true"
+
+blocks:
+ msg-produce-block:
+ ops:
+ op1:
+ MessageProduce: "tnt0/ns0/tp0"
+ msg_key: "{mykey}"
+ msg_prop: |
+ {
+ "prop1": "{int_prop_val}",
+ "prop2": "{text_prop_val}"
+ }
+ msg_value: "{myvalue}"
diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
index 740ef3348..d3a592e76 100644
--- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
+++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
@@ -83,12 +83,16 @@ public abstract class BaseOpDispenser implements OpDispenser
@Override
public abstract T apply(long cycle);
+ /**
+  * Builds the default prefix for metric names of the given op: the static config value
+  * {@code "alias"} (or {@code "UNKNOWN"} when it is not set), a dash, the op name, and a
+  * trailing double dash.
+  *
+  * @param pop the parsed op whose alias and name make up the prefix
+  * @return the metric name prefix, ending in {@code "--"}
+  */
+ protected String getDefaultMetricsPrefix(ParsedOp pop) {
+ return pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--";
+ }
+
private void configureInstrumentation(ParsedOp pop) {
this.instrument = pop.takeStaticConfigOr("instrument", false);
if (instrument) {
- this.successTimer = ActivityMetrics.timer(pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--success");
- this.errorTimer = ActivityMetrics.timer(pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--error");
- this.resultSizeHistogram = ActivityMetrics.histogram(pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--resultset-size");
+ this.successTimer = ActivityMetrics.timer(getDefaultMetricsPrefix(pop) + "success");
+ this.errorTimer = ActivityMetrics.timer(getDefaultMetricsPrefix(pop) + "error");
+ this.resultSizeHistogram = ActivityMetrics.histogram(getDefaultMetricsPrefix(pop) + "resultset-size");
}
}
diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java
index 1eba69f2a..3e4ffef9d 100644
--- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java
+++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/uniform/BaseDriverAdapter.java
@@ -20,6 +20,8 @@ import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.engine.api.activityimpl.uniform.fieldmappers.FieldDestructuringMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -29,7 +31,8 @@ import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
-public abstract class BaseDriverAdapter implements DriverAdapter, NBConfigurable, NBReconfigurable {
+public abstract class BaseDriverAdapter implements DriverAdapter, NBConfigurable, NBReconfigurable {
+ private final static Logger logger = LogManager.getLogger("ADAPTER");
private DriverSpaceCache extends S> spaceCache;
private NBConfiguration cfg;
@@ -43,22 +46,22 @@ public abstract class BaseDriverAdapter implements DriverAdapter
*/
@Override
public final Function