diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index e40da3ad4..669bc3c74 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -28,8 +28,6 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class); private final LongFunction> consumerFunc; - private final LongFunction topicMsgDedupFunc; - private final LongFunction subscriptionTypeFunc; private final boolean e2eMsProc; public PulsarConsumerMapper(CommandTemplate cmdTpl, @@ -39,14 +37,10 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { LongFunction useTransactionFunc, LongFunction seqTrackingFunc, LongFunction> transactionSupplierFunc, - LongFunction topicMsgDedupFunc, LongFunction> consumerFunc, - LongFunction subscriptionTypeFunc, boolean e2eMsgProc) { super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc); this.consumerFunc = consumerFunc; - this.topicMsgDedupFunc = topicMsgDedupFunc; - this.subscriptionTypeFunc = subscriptionTypeFunc; this.e2eMsProc = e2eMsgProc; } @@ -57,22 +51,16 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { boolean asyncApi = asyncApiFunc.apply(value); boolean useTransaction = useTransactionFunc.apply(value); Supplier transactionSupplier = transactionSupplierFunc.apply(value); - boolean topicMsgDedup = topicMsgDedupFunc.apply(value); - String subscriptionType = subscriptionTypeFunc.apply(value); return new PulsarConsumerOp( - this, pulsarActivity, asyncApi, useTransaction, seqTracking, transactionSupplier, - topicMsgDedup, consumer, - subscriptionType, clientSpace.getPulsarSchema(), 
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), - value, e2eMsProc, this::getReceivedMessageSequenceTracker); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index cf5155c51..67eb472c5 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -23,7 +23,6 @@ public class PulsarConsumerOp implements PulsarOp { private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class); - private final PulsarConsumerMapper consumerMapper; private final PulsarActivity pulsarActivity; private final boolean asyncPulsarOp; @@ -31,13 +30,10 @@ public class PulsarConsumerOp implements PulsarOp { private final boolean seqTracking; private final Supplier transactionSupplier; - private final boolean topicMsgDedup; private final Consumer consumer; - private final String subscriptionType; private final Schema pulsarSchema; private final int timeoutSeconds; private final boolean e2eMsgProc; - private final long curCycleNum; private final Counter bytesCounter; private final Histogram messageSizeHistogram; @@ -48,22 +44,17 @@ public class PulsarConsumerOp implements PulsarOp { private final Function receivedMessageSequenceTrackerForTopic; public PulsarConsumerOp( - PulsarConsumerMapper consumerMapper, PulsarActivity pulsarActivity, boolean asyncPulsarOp, boolean useTransaction, boolean seqTracking, Supplier transactionSupplier, - boolean topicMsgDedup, Consumer consumer, - String subscriptionType, Schema schema, int timeoutSeconds, - long curCycleNum, boolean e2eMsgProc, Function receivedMessageSequenceTrackerForTopic) { - this.consumerMapper = consumerMapper; this.pulsarActivity = pulsarActivity; this.asyncPulsarOp = asyncPulsarOp; @@ -71,12 +62,9 @@ public class PulsarConsumerOp implements PulsarOp { 
this.seqTracking = seqTracking; this.transactionSupplier = transactionSupplier; - this.topicMsgDedup = topicMsgDedup; this.consumer = consumer; - this.subscriptionType = subscriptionType; this.pulsarSchema = schema; this.timeoutSeconds = timeoutSeconds; - this.curCycleNum = curCycleNum; this.e2eMsgProc = e2eMsgProc; this.bytesCounter = pulsarActivity.getBytesCounter(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index ecb14d8cf..6842a8af7 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -129,17 +129,6 @@ public class ReadyPulsarOp implements OpDispenser { } logger.info("seq_tracking: {}", seqTrackingFunc.apply(0)); - // Doc-level parameter: msg_dedup_broker - LongFunction brokerMsgDedupFunc = (l) -> false; - if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label)) { - if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label)) - brokerMsgDedupFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label)); - else - throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label + "\" parameter cannot be dynamic!"); - } - logger.info("msg_dedup_broker: {}", seqTrackingFunc.apply(0)); - - // TODO: Complete implementation for websocket-producer and managed-ledger // Admin operation: create/delete tenant if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) { @@ -165,7 +154,6 @@ public class ReadyPulsarOp implements OpDispenser { asyncApiFunc, useTransactionFunc, seqTrackingFunc, - brokerMsgDedupFunc, false); } // Regular/non-admin operation: single message consuming from multiple-topics (consumer) @@ -175,8 +163,7 @@ public 
class ReadyPulsarOp implements OpDispenser { topicUriFunc, asyncApiFunc, useTransactionFunc, - seqTrackingFunc, - brokerMsgDedupFunc); + seqTrackingFunc); } // Regular/non-admin operation: single message consuming a single topic (reader) else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { @@ -206,7 +193,6 @@ public class ReadyPulsarOp implements OpDispenser { asyncApiFunc, useTransactionFunc, seqTrackingFunc, - brokerMsgDedupFunc, true); } // Invalid operation type @@ -425,7 +411,6 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction async_api_func, LongFunction useTransactionFunc, LongFunction seqTrackingFunc, - LongFunction brokerMsgDupFunc, boolean e2eMsgProc ) { LongFunction subscription_name_func; @@ -458,12 +443,6 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction> transactionSupplierFunc = (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? - // TODO: Ignore namespace and topic level dedup check on the fly - // this will impact the consumer performance significantly - // Consider using caching or Memoizer in the future? 
- // (https://www.baeldung.com/guava-memoizer) - LongFunction topicMsgDedupFunc = brokerMsgDupFunc; - LongFunction> consumerFunc = (l) -> clientSpace.getConsumer( topic_uri_func.apply(l), @@ -480,9 +459,7 @@ public class ReadyPulsarOp implements OpDispenser { useTransactionFunc, seqTrackingFunc, transactionSupplierFunc, - topicMsgDedupFunc, consumerFunc, - subscription_type_func, e2eMsgProc); } @@ -491,8 +468,7 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction topic_uri_func, LongFunction async_api_func, LongFunction useTransactionFunc, - LongFunction seqTrackingFunc, - LongFunction brokerMsgDupFunc + LongFunction seqTrackingFunc ) { // Topic list (multi-topic) LongFunction topic_names_func; @@ -562,17 +538,7 @@ public class ReadyPulsarOp implements OpDispenser { useTransactionFunc, seqTrackingFunc, transactionSupplierFunc, - // For multi-topic subscription message consumption, - // - Only consider broker-level message deduplication setting - // - Ignore namespace- and topic-level message deduplication setting - // - // This is because Pulsar is able to specify a list of topics from - // different namespaces. In theory, we can get topic deduplication - // status from each message, but this will be too much overhead. - // e.g. 
pulsarAdmin.getPulsarAdmin().topics().getDeduplicationStatus(message.getTopicName()) - brokerMsgDupFunc, mtConsumerFunc, - subscription_type_func, false); } diff --git a/driver-pulsar/src/main/resources/activities/pulsar_admin_namespace.yaml b/driver-pulsar/src/main/resources/activities/pulsar_admin_namespace.yaml index 49ca4e90d..0a75413d3 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar_admin_namespace.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar_admin_namespace.yaml @@ -6,7 +6,7 @@ bindings: params: # "true" - asynchronous Pulsar Admin API # "false" - synchronous Pulsar Admin API - async_api: "true" + async_api: "false" # "true" - delete namespace # "false" - create namespace admin_delop: "false" diff --git a/driver-pulsar/src/main/resources/activities/pulsar_admin_topic.yaml b/driver-pulsar/src/main/resources/activities/pulsar_admin_topic.yaml index 186983690..5d2dd567d 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar_admin_topic.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar_admin_topic.yaml @@ -8,7 +8,7 @@ params: topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" # "true" - asynchronous Pulsar Admin API # "false" - synchronous Pulsar Admin API - async_api: "true" + async_api: "false" # "true" - delete topic # "false" - create topic admin_delop: "false" diff --git a/driver-pulsar/src/main/resources/design_revisit.md b/driver-pulsar/src/main/resources/design_revisit.md new file mode 100644 index 000000000..4070a39d3 --- /dev/null +++ b/driver-pulsar/src/main/resources/design_revisit.md @@ -0,0 +1,89 @@ +# TODO : Design Revisit -- Advanced Driver Features + +**NOTE**: The following text is based on the original multi-layer API +caching design which is not fully implemented at the moment. We need to +revisit the original design at some point in order to achieve maximum +testing flexibility. 
+ +To summarize, the original caching design has the following key +requirements: + +* **Requirement 1**: Each NB Pulsar activity is able to launch and cache + multiple **client spaces** +* **Requirement 2**: Each client space can launch and cache multiple + Pulsar operators of the same type (producer, consumer, etc.) +* **Requirement 3**: The size of each Pulsar operator specific cached + space can be configurable. + +In the current implementation, only requirement 2 is implemented. + +* For requirement 1, the current implementation only supports one client + space per NB Pulsar activity +* For requirement 3, the cache space size is not configurable (no limit at + the moment) + +## Other Activity Parameters + +- **maxcached** - A default value to be applied to `max_clients`, + `max_producers`, `max_consumers`. + - default: `max_cached=100` +- **max_clients** - Clients cache size. This is the number of client + instances which are allowed to be cached in the NoSQLBench client + runtime. The clients cache automatically maintains a cache of unique + client instances internally. default: _maxcached_ +- **max_operators** - Producers/Consumers/Readers cache size (per client + instance). Limits the number of instances which are allowed to be cached + per client instance. default: _maxcached_ + +## API Caching + +This driver is tailored around the multi-tenancy and topic naming scheme +that is part of Apache Pulsar. Specifically, you can create an arbitrary +number of client instances, producers (per client), and consumers (per +client) depending on your testing requirements. + +Further, the topic URI is composed from the provided qualifiers of +`persistence`, `tenant`, `namespace`, and `topic`, or you can provide a +fully-composed value in the `persistence://tenant/namespace/topic` +form. + +### Instancing Controls + +Normative usage of the Apache Pulsar API follows a strictly enforced +binding of topics to producers and consumers. 
As well, clients may be +customized with different behavior for advanced testing scenarios. There +is a significant variety of messaging and concurrency schemes seen in +modern architectures. Thus, it is important that testing tools rise to the +occasion by letting users configure their testing runtimes to emulate +applications as they are found in practice. To this end, the NoSQLBench +driver for Apache Pulsar provides a set of controls within its op template +format which allow for flexible yet intuitive instancing in the client +runtime. This is enabled directly by using nominative variables for +instance names where needed. When the instance names are not provided for +an operation, defaults are used to emulate a simple configuration. + +Since this is a new capability in a NoSQLBench driver, how it works is +explained below: + +When a Pulsar cycle is executed, the operation is synthesized from the op +template fields as explained below under _Op Fields_. This happens in a +specific order: + +1. The client instance name is resolved. If a `client` field is provided, + this is taken as the client instance name. If not, it is set + to `default`. +2. The named client instance is fetched from the cache, or created and + cached if it does not yet exist. +3. The topic_uri is resolved. This is the value to be used with + `.topic(...)` calls in the API. The op fields below explain how to + control this value. +4. For _send_ operations, a producer is named and created if needed. By + default, the producer is named after the topic_uri above. You can + override this by providing a value for `producer`. +5. For _recv_ operations, a consumer is named and created if needed. By + default, the consumer is named after the topic_uri above. You can + override this by providing a value for `consumer`. + +The most important detail for understanding the instancing controls is +that clients, producers, and consumers are all named and cached in the +specific order above. 
diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index f8d50602f..56ecab80c 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -1,38 +1,34 @@ -- [1. NoSQLBench (NB) Pulsar Driver Overview](#1-nosqlbench-nb-pulsar-driver-overview) - - [1.1. Issues Tracker](#11-issues-tracker) - - [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings) - - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) - - [1.3.1. Configuration Parameter Levels](#131-configuration-parameter-levels) - - [1.4. Pulsar Driver Yaml File - Command Blocks](#14-pulsar-driver-yaml-file---command-blocks) - - [1.4.1. Pulsar Admin API Command Block - Create Tenants](#141-pulsar-admin-api-command-block---create-tenants) - - [1.4.2. Pulsar Admin API Command Block - Create Namespaces](#142-pulsar-admin-api-command-block---create-namespaces) - - [1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)](#143-pulsar-admin-api-command-block---create-topics-partitioned-or-regular) - - [1.4.4. Batch Producer Command Block](#144-batch-producer-command-block) - - [1.4.5. Producer Command Block](#145-producer-command-block) - - [1.4.6. (Single-Topic) Consumer Command Block](#146-single-topic-consumer-command-block) - - [1.4.7. Reader Command Block](#147-reader-command-block) - - [1.4.8. Multi-topic Consumer Command Block](#148-multi-topic-consumer-command-block) - - [1.4.9. End-to-end Message Processing Command Block](#149-end-to-end-message-processing-command-block) - - [1.5. Message Properties](#15-message-properties) - - [1.6. Schema Support](#16-schema-support) - - [1.7. Measure End-to-end Message Processing Latency](#17-measure-end-to-end-message-processing-latency) - - [1.8. 
Detect Message Out-of-order, Message Loss, and Message Duplication](#18-detect-message-out-of-order-message-loss-and-message-duplication) - - [1.9. NB Activity Execution Parameters](#19-nb-activity-execution-parameters) - - [1.10. NB Pulsar Driver Execution Example](#110-nb-pulsar-driver-execution-example) - - [1.11. Appendix A. Template Global Setting File (config.properties)](#111-appendix-a-template-global-setting-file-configproperties) -- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - - [2.1. Other Activity Parameters](#21-other-activity-parameters) - - [2.2. API Caching](#22-api-caching) - - [2.2.1. Instancing Controls](#221-instancing-controls) +- [1. Overview](#1-overview) + - [1.1. Issues Tracker](#11-issues-tracker) +- [2. NB Pulsar Driver Yaml File - High Level Structure](#2-nb-pulsar-driver-yaml-file---high-level-structure) +- [3. Global Level Pulsar Configuration Settings](#3-global-level-pulsar-configuration-settings) +- [4. Global, Document, and Statement Level Configuration Items](#4-global-document-and-statement-level-configuration-items) +- [5. NB Pulsar Driver Yaml File - Command Blocks](#5-nb-pulsar-driver-yaml-file---command-blocks) + - [5.1. Pulsar Admin API Command Block - Create/Delete Tenants](#51-pulsar-admin-api-command-block---createdelete-tenants) + - [5.2. Pulsar Admin API Command Block - Create/Delete Namespaces](#52-pulsar-admin-api-command-block---createdelete-namespaces) + - [5.3. Pulsar Admin API Command Block - Create/Delete Topics (Partitioned or Regular)](#53-pulsar-admin-api-command-block---createdelete-topics-partitioned-or-regular) + - [5.4. Batch Producer Command Block](#54-batch-producer-command-block) + - [5.5. Producer Command Block](#55-producer-command-block) + - [5.6. (Single-Topic) Consumer Command Block](#56-single-topic-consumer-command-block) + - [5.7. Reader Command Block](#57-reader-command-block) + - [5.8. 
Multi-topic Consumer Command Block](#58-multi-topic-consumer-command-block) + - [5.9. End-to-end Message Processing Command Block](#59-end-to-end-message-processing-command-block) +- [6. Message Properties](#6-message-properties) +- [7. Schema Support](#7-schema-support) +- [8. Measure End-to-end Message Processing Latency](#8-measure-end-to-end-message-processing-latency) +- [9. Detect Message Out-of-order, Message Loss, and Message Duplication](#9-detect-message-out-of-order-message-loss-and-message-duplication) +- [10. NB Activity Execution Parameters](#10-nb-activity-execution-parameters) +- [11. NB Pulsar Driver Execution Example](#11-nb-pulsar-driver-execution-example) +- [12. Appendix A. Template Global Setting File (config.properties)](#12-appendix-a-template-global-setting-file-configproperties) -# 1. NoSQLBench (NB) Pulsar Driver Overview +# 1. Overview This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB). -* Admin API - create tenants -* Admin API - create namespaces -* Admin API - create topics -* Producer -* Consumer +* Admin API - create/delete tenants +* Admin API - create/delete namespaces +* Admin API - create/delete topics, partitioned or not +* Producer - publish messages with Avro schema support +* Consumer - consume messages with all subscription types * Reader * (Future) WebSocket Producer * (Future) Managed Ledger @@ -41,68 +37,7 @@ This driver allows you to simulate and run different types of workloads (as belo If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar). -## 1.2. Global Level Pulsar Configuration Settings - -The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish messages to and consume messages from a Pulsar cluster. 
In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload-specific actions (e.g. publishing or consuming messages). - -When creating these objects (e.g. PulsarClient, Producer), there are different configuration options that can be used. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration options when creating a Pulsar Producer object. - -The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below: - -```properties -### Schema related configurations - schema.xxx -schema.type = avro -schema.definition = file:/// - -### Pulsar client related configurations - client.xxx -client.connectionTimeoutMs = 5000 - -### Producer related configurations (global) - producer.xxx -producer.topicName = persistent://public/default/mynbtest -producer.producerName = -producer.sendTimeoutMs = -``` - -There are multiple sections in this file that correspond to different groups of configuration settings: -* **Schema related settings**: - * All settings under this section starts with **schema.** prefix. - * The NB Pulsar driver supports schema-based message publishing and - consuming. This section defines configuration settings that are - schema related. - * There are 2 valid options under this section. - * *schema.type*: Pulsar message schema type. When unset or set as - an empty string, Pulsar messages will be handled in raw *byte[]* - format. 
The other valid option is **avro** which the Pulsar - message will follow a specific Avro format. - * *schema.definition*: This only applies when an Avro schema type - is specified. The value of this configuration is the (full) file - path that contains the Avro schema definition. -* **Pulsar Client related settings**: - * All settings under this section starts with **client.** prefix. - * This section defines all configuration settings that are related - with defining a PulsarClient object. - * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) -* **Pulsar Producer related settings**: - * All settings under this section starts with **producer** prefix. - * This section defines all configuration settings that are related - with defining a Pulsar Producer object. - * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) -* **Pulsar Consumer related settings**: - * All settings under this section starts with **consumer** prefix. - * This section defines all configuration settings that are related - with defining a Pulsar Consumer object. - * See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) -* **Pulsar Reader related settings**: - * All settings under this section starts with **reader** prefix. - * This section defines all configuration settings that are related - with defining a Pulsar Reader object. - * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) - -In the future, when the support for other types of Pulsar workloads is -added in NB Pulsar driver, there will be corresponding configuration -sections in this file as well. - -## 1.3. NB Pulsar Driver Yaml File - High Level Structure +# 2. 
NB Pulsar Driver Yaml File - High Level Structure Just like other NB driver types, the actual Pulsar workload generation is determined by the statement blocks in an NB driver Yaml file. Depending @@ -127,9 +62,7 @@ At high level, Pulsar driver yaml file has the following structure: * **seq_tracking**: Whether to do message sequence tracking. This is used for message out-of-order and message loss detection (more on this later). - * **msg_dedup_broker**: Whether or not broker level message deduplication - is enabled. -* **blocks**: includes a series of command blocks. Each command block +* **statement blocks**: includes a series of command blocks. Each command block defines one major Pulsar operation such as *producer*, *consumer*, etc. Right now, the following command blocks are already supported or will be added in the near future. We'll go through each of these command blocks @@ -203,7 +136,68 @@ multiple Pulsar operations in one run! But if we want to focus the testing on one particular operation, we can use the tag to filter the command block as listed above! -### 1.3.1. Configuration Parameter Levels +# 3. Global Level Pulsar Configuration Settings + +The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish messages to and consume messages from a Pulsar cluster. In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload related actions (e.g. publishing or consuming messages). + +When creating these objects (e.g. 
PulsarClient, Producer), there are different configuration options that can be used. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration options when creating a Pulsar Producer object. + +The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below: + +```properties +### Schema related configurations - schema.xxx +schema.type = avro +schema.definition = file:/// + +### Pulsar client related configurations - client.xxx +client.connectionTimeoutMs = 5000 + +### Producer related configurations (global) - producer.xxx +producer.topicName = persistent://public/default/mynbtest +producer.producerName = +producer.sendTimeoutMs = +``` + +There are multiple sections in this file that correspond to different groups of configuration settings: +* **Schema related settings**: + * All settings under this section starts with **schema.** prefix. + * The NB Pulsar driver supports schema-based message publishing and + consuming. This section defines configuration settings that are + schema related. + * There are 2 valid options under this section. + * *schema.type*: Pulsar message schema type. When unset or set as + an empty string, Pulsar messages will be handled in raw *byte[]* + format. The other valid option is **avro** which the Pulsar + message will follow Avro schema format. + * *schema.definition*: This only applies when an Avro schema type + is specified. The value of this configuration is the (full) file + path that contains the Avro schema definition. +* **Pulsar Client related settings**: + * All settings under this section starts with **client.** prefix. + * This section defines all configuration settings that are related + with defining a PulsarClient object. 
+ * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) +* **Pulsar Producer related settings**: + * All settings under this section starts with **producer** prefix. + * This section defines all configuration settings that are related + with defining a Pulsar Producer object. + * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) +* **Pulsar Consumer related settings**: + * All settings under this section starts with **consumer** prefix. + * This section defines all configuration settings that are related + with defining a Pulsar Consumer object. + * See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) +* **Pulsar Reader related settings**: + * All settings under this section starts with **reader** prefix. + * This section defines all configuration settings that are related + with defining a Pulsar Reader object. + * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) + +In the future, when the support for other types of Pulsar workloads is +added in NB Pulsar driver, there will be corresponding configuration +sections in this file as well. + +# 4. Global, Document, and Statement Level Configuration Items The NB Pulsar driver configuration parameters can be set at 3 different levels: @@ -213,13 +207,13 @@ levels: schema.type= ``` * **document level**: parameters that are set within NB yaml file and under - the ***params*** section + the ***params*** section ``` params: topic_uri: ... ``` * **statement level**: parameters that are set within NB yaml file, but -under different block statements + under different block statements ``` - name: producer-block statements: @@ -230,15 +224,15 @@ under different block statements **NOTE**: If one parameter is set at multiple levels (e.g. producer name), the parameter at lower level will take precedence. 
-## 1.4. Pulsar Driver Yaml File - Command Blocks +# 5. NB Pulsar Driver Yaml File - Command Blocks -### 1.4.1. Pulsar Admin API Command Block - Create Tenants +## 5.1. Pulsar Admin API Command Block - Create/Delete Tenants This Pulsar Admin API Block is used to create or delete Pulsar tenants. It has the following format. Please note that when document level parameter **admin_delop** is set to be -true, then this command block will delete Pulsar tenants instead. Similarly +true, then this command block will delete Pulsar tenants instead. Similarly, this applies to other Admin API blocks for namespace and topic management. ```yaml @@ -266,7 +260,7 @@ In this command block, there is only 1 statement (s1): * (Mandatory) **tenant** is the Pulsar tenant name to be created. It can either be dynamically or statically bound. -### 1.4.2. Pulsar Admin API Command Block - Create Namespaces +## 5.2. Pulsar Admin API Command Block - Create/Delete Namespaces This Pulsar Admin API Block is used to create Pulsar namespaces. It has the following format: @@ -289,7 +283,7 @@ In this command block, there is only 1 statement (s1): * (Mandatory) **namespace** is the Pulsar namespace name to be created under a tenant. It can be either statically or dynamically bound. -### 1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular) +## 5.3. Pulsar Admin API Command Block - Create/Delete Topics (Partitioned or Regular) This Pulsar Admin API Block is used to create Pulsar topics. It has the following format: @@ -318,7 +312,7 @@ In this command block, there is only 1 statement (s1): **NOTE**: The topic name is bound by the document level parameter "topic_uri". -### 1.4.4. Batch Producer Command Block +## 5.4. Batch Producer Command Block Batch producer command block is used to produce a batch of messages all at once by one NB cycle execution. A typical format of this command block is @@ -390,7 +384,7 @@ ratios: 1, , 1. 
**NOTE**: the topic that the producer needs to publish messages to is specified by the document level parameter ***topic_uri***. -### 1.4.5. Producer Command Block +## 5.5. Producer Command Block This is the regular Pulsar producer command block that produces one Pulsar message per NB cycle execution. A typical format of this command block is @@ -432,15 +426,16 @@ This command block only has 1 statements (s1): of the generated message. It must be a JSON string that contains a series of key-value pairs. * (Mandatory) **msg_payload** specifies the payload of the generated - message + message. If the message schema type is specified as Avro schema type, + then the message value format must be in proper Avro format. **NOTE**: the topic that the producer needs to publish messages to is specified by the document level parameter ***topic_uri***. -### 1.4.6. (Single-Topic) Consumer Command Block +## 5.6. (Single-Topic) Consumer Command Block -This is the regular Pulsar consumer command block that consumes one Pulsar -message from one single Pulsar topic per NB cycle execution. A typical +This is the regular Pulsar consumer command block that consumes messages +from one single Pulsar topic per NB cycle execution. A typical format of this command block is as below: ```yaml @@ -463,18 +458,18 @@ This command block only has 1 statements (s1): this statement * (Mandatory) **subscription_name** specifies subscription name. * (Optional) **subscription_type**, when provided, specifies - subscription type. Default to **exclusive** subscription type. + subscription type. Default to **Exclusive** subscription type. * (Optional) **consumer_name**, when provided, specifies the associated consumer name. -**NOTE**: the single topic that the consumer needs to consume messages from -is specified by the document level parameter ***topic_uri***. +**NOTE**: the single topic that the consumer receives messages from is +specified by the document level parameter ***topic_uri***. -### 1.4.7. 
Reader Command Block +## 5.7. Reader Command Block -This is the regular Pulsar reader command block that reads one Pulsar -message per NB cycle execution. A typical format of this command block is -as below: +This is the regular Pulsar reader command block that reads messages from +one Pulsar topic per NB cycle execution. A typical format of this command +block is as below: ```yaml - name: reader-block @@ -513,11 +508,11 @@ Reader reader = pulsarClient.newReader() .create(); ``` -### 1.4.8. Multi-topic Consumer Command Block +## 5.8. Multi-topic Consumer Command Block -This is the regular Pulsar consumer command block that consumes one Pulsar -message from multiple Pulsar topics per NB cycle execution. A typical format -of this command block is as below: +This is the Pulsar consumer command block that consumes messages from +multiple Pulsar topics per NB cycle execution. A typical format of +this command block is as below: ```yaml - name: multi-topic-consumer-block @@ -541,22 +536,23 @@ This command block only has 1 statements (s1): * (Mandatory) **optype (msg-consume)** is the statement identifier for this statement * (Optional) **topic_names**, when provided, specifies multiple topic - names from which to consume messages for multi-topic message consumption. + names from which to consume messages. * (Optional) **topics_pattern**, when provided, specifies pulsar topic regex pattern for multi-topic message consumption * (Mandatory) **subscription_name** specifies subscription name. * (Optional) **subscription_type**, when provided, specifies - subscription type. Default to **exclusive** subscription type. + subscription type. Default to **Exclusive** subscription type. * (Optional) **consumer_name**, when provided, specifies the associated consumer name. 
-**NOTE 1**: when both **topic_names** and **topics_pattern** are provided, +**NOTE 1**: if neither **topic_names** nor **topics_pattern** is provided, +consumer topic name defaults to the document level parameter **topic_uri**. +Otherwise, the document level parameter **topic_uri** is ignored. + +**NOTE 2**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**. -**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, -consumer topic name is default to the document level parameter **topic_uri**. - -### 1.4.9. End-to-end Message Processing Command Block +## 5.9. End-to-end Message Processing Command Block End-to-end message processing command block is used to simplify measuring the end-to-end message processing (from being published to being consumed) @@ -600,19 +596,19 @@ ratios: 1, 1. * (Optional) **ratio**, must be 1 when provided. Otherwise, default to 1. * Statement **s2** is used to consume the message that just got published -from the same topic + from the same topic * (Mandatory) **optype (ec2-msg-proc-consume)** is the statement identifier for this statement * (Mandatory) **subscription_name** specifies subscription name. * (Optional) **subscription_type**, when provided, specifies - subscription type. Default to **exclusive** subscription type. + subscription type. Default to **Exclusive** subscription type. * (Optional) **ratio**, must be 1 when provided. Otherwise, default to 1. **NOTE**: the topic that the producer needs to publish messages to is specified by the document level parameter ***topic_uri***. -## 1.5. Message Properties +# 6. Message Properties In the producer command block, it is optional to specify message properties: ``` @@ -630,7 +626,7 @@ contains a list of key value pairs. Otherwise, if it is not a valid JSON string as expected, the driver will ignore it and treat the message as having no properties. -## 1.6. Schema Support +# 7.
Schema Support Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, etc. At the moment, the NB @@ -648,8 +644,8 @@ related settings as below: ``` Take the previous Producer command block as an example, the **msg-value** -parameter has the value of a JSON string that follows the following Avro -schema definition: +parameter has the value of a JSON string that follows the specified Avro +schema definition, an example of which is as below: ```json { "type": "record", @@ -664,10 +660,10 @@ schema definition: } ``` -## 1.7. Measure End-to-end Message Processing Latency +# 8. Measure End-to-end Message Processing Latency -**e2e-msg-proc-block** measures the end-to-end message latency metrics. It -contains one message producing statement and one message consuming statement. +The built-in **e2e-msg-proc-block** measures the end-to-end message latency metrics. +It contains one message producing statement and one message consuming statement. When the message that is published by the producer is received by the consumer, the consumer calculates the time difference between when the time is received and when the time is published. @@ -675,8 +671,8 @@ and when the time is published. The measured end-to-end message processing latency is captured as a histogram metrics name "e2e_msg_latency". -This command block uses one single machine to act as both a producer and a -consumer. We do so just for convenience purposes. In reality, we can use +This built-in command block uses one single machine to act as both a producer and +a consumer. We do so just for convenience purposes. In reality, we can use **producer-block** and **consumer-block** command blocks on separate machines to achieve the same goal, which is probably closer to the actual use case and probably more accurate measurement (to avoid the situation of always reading @@ -685,11 +681,11 @@ messages from the managed ledger cache). 
One thing to remember though if we're using multiple machines to measure the end-to-end message processing latency, we need to make sure: 1) The time of the two machines are synced up with each other, e.g. through -NTP protocol. + NTP protocol. 2) If there is some time lag of starting the consumer, we need to count that -into consideration when interpreting the end-to-end message processing latency. + into consideration when interpreting the end-to-end message processing latency. -## 1.8. Detect Message Out-of-order, Message Loss, and Message Duplication +# 9. Detect Message Out-of-order, Message Loss, and Message Duplication In order to detect errors like message out-of-order and message loss through the NB Pulsar driver, we need to set the following document level parameter @@ -701,44 +697,7 @@ params: seq_tracking: "true" ``` -For message duplication detection, if broker level message dedup configuration -is enabled ("brokerDeduplicationEnabled=true" in broker.conf), we also need to -enable this document level parameter: -``` -params: - msg_dedup_broker: "true" -``` - -However, since message dedup. can be also enabled or disabled at namespace level -or topic level, the NB Pulsar driver will also check the settings at these layers -through API. Basically, the final message dedup setting for a topic is determined -by the following rules: -* if topic level message dedup is not set, check namespace level setting -* if namespace level message dedup is not set, check broker level setting which - in turn is determined by the document level NB parameter **msg_dedup_broker** -* if message dedup is enabled at multiple levels, the priority sequence follows: - * topic level > namespace level > broker level - -The logic of how this works is based on the fact that NB execution cycle number -is monotonically increasing by 1 for every cycle moving forward. 
When publishing -a series of messages, we use the current NB cycle number as one message property -which is also monotonically increasing by 1. - -When receiving the messages, if the message sequence number stored in the message -property is not monotonically increasing or if there is a gap larger than 1, then -it must be one of the following errors: -* If the current message sequence ID is less than the previous message sequence ID, - then it is message out-of-order error. Exception **PulsarMsgOutOfOrderException** - will be thrown out. -* if the current message sequence ID is more than 1 bigger than the previous message - sequence ID, then it is message loss error. Exception **PulsarMsgLossException** - will be thrown out. -* if message dedup is enabled and the current message sequence ID is equal to the - previous message sequence ID, then it is message duplication error. Exception **PulsarMsgDuplicateException** will be thrown out. - -In either case, a runtime error will be thrown out with corresponding error messages. - -## 1.9. NB Activity Execution Parameters +# 10. NB Activity Execution Parameters At the moment, the following NB Pulsar driver **specific** activity parameters are supported: @@ -756,7 +715,7 @@ reference to NB documentation for more parameters * cycles= * --report-csv-to -## 1.10. NB Pulsar Driver Execution Example +# 11. NB Pulsar Driver Execution Example **NOTE**: in the following examples, the Pulsar service URL is **pulsar: //localhost:6650**, please change it accordingly for your own Pulsar @@ -769,7 +728,7 @@ environment. ``` 2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads. -**NOTE**: *seq=* must have **concat** value in order to make the batch API working properly! + **NOTE**: *seq=* must have **concat** value in order to make the batch API work properly!
```bash run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml --report-csv-to ``` @@ -780,8 +739,7 @@ environment. run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml ``` - -## 1.11. Appendix A. Template Global Setting File (config.properties) +# 12. Appendix A. Template Global Setting File (config.properties) ```properties schema.type = schema.definition = @@ -812,95 +770,3 @@ reader.receiverQueueSize = reader.readerName = reader.startMessagePos = ``` - ---- - -# 2. TODO : Design Revisit -- Advanced Driver Features - -**NOTE**: The following text is based on the original multi-layer API -caching design which is not fully implemented at the moment. We need to -revisit the original design at some point in order to achieve maximum -testing flexibility. - -To summarize, the original caching design has the following key -requirements: - -* **Requirement 1**: Each NB Pulsar activity is able to launch and cache - multiple **client spaces** -* **Requirement 2**: Each client space can launch and cache multiple - Pulsar operators of the same type (producer, consumer, etc.) -* **Requirement 3**: The size of each Pulsar operator specific cached - space can be configurable. - -In the current implementation, only requirement 2 is implemented. - -* For requirement 1, the current implementation only supports one client - space per NB Pulsar activity -* For requirement 3, the cache space size is not configurable (no limit at - the moment) - -## 2.1. Other Activity Parameters - -- **maxcached** - A default value to be applied to `max_clients`, - `max_producers`, `max_consumers`. - - default: `max_cached=100` -- **max_clients** - Clients cache size. 
This is the number of client - instances which are allowed to be cached in the NoSQLBench client - runtime. The clients cache automatically maintains a cache of unique - client instances internally. default: _maxcached_ -- **max_operators** - Producers/Consumers/Readers cache size (per client - instance). Limits the number of instances which are allowed to be cached - per client instance. default: _maxcached_ - -## 2.2. API Caching - -This driver is tailored around the multi-tenancy and topic naming scheme -that is part of Apache Pulsar. Specifically, you can create an arbitrary -number of client instances, producers (per client), and consumers (per -client) depending on your testing requirements. - -Further, the topic URI is composed from the provided qualifiers of -`persistence`, `tenant`, `namespace`, and `topic`, or you can provide a -fully-composed value in the `persistence://tenant/namespace/topic` -form. - -### 2.2.1. Instancing Controls - -Normative usage of the Apache Pulsar API follows a strictly enforced -binding of topics to producers and consumers. As well, clients may be -customized with different behavior for advanced testing scenarios. There -is a significant variety of messaging and concurrency schemes seen in -modern architectures. Thus, it is important that testing tools rise to the -occasion by letting users configure their testing runtimes to emulate -applications as they are found in practice. To this end, the NoSQLBench -driver for Apache Pulsar provides a set controls within its op template -format which allow for flexible yet intuitive instancing in the client -runtime. This is enabled directly by using nominative variables for -instance names where needed. When the instance names are not provided for -an operation, defaults are used to emulate a simple configuration. 
- -Since this is a new capability in a NoSQLBench driver, how it works is -explained below: - -When a pulsar cycles is executed, the operation is synthesized from the op -template fields as explained below under _Op Fields_. This happens in a -specific order: - -1. The client instance name is resolved. If a `client` field is provided, - this is taken as the client instance name. If not, it is set - to `default`. -2. The named client instance is fetched from the cache, or created and - cached if it does not yet exist. -3. The topic_uri is resolved. This is the value to be used with - `.topic(...)` calls in the API. The op fields below explain how to - control this value. -4. For _send_ operations, a producer is named and created if needed. By - default, the producer is named after the topic_uri above. You can - override this by providing a value for `producer`. -5. For _recv_ operations, a consumer is named and created if needed. By - default, the consumer is named after the topic_uri above. You can - override this by providing a value for `consumer`. - -The most important detail for understanding the instancing controls is -that clients, producers, and consumers are all named and cached in the -specific order above.