From 6903a230496d8c652ffe12bfc5dca1c9a5f7d0a5 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Fri, 24 Sep 2021 09:27:43 -0500 Subject: [PATCH] README update --- .../pulsar/ops/PulsarAdminTenantOp.java | 30 +- .../activities/pulsar_admin_tenant.yaml | 4 +- driver-pulsar/src/main/resources/pulsar.md | 387 +++++++++++++----- 3 files changed, 304 insertions(+), 117 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java index 77ad3c8fe..5d0bd3508 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java @@ -52,13 +52,17 @@ public class PulsarAdminTenantOp extends PulsarAdminOp { try { if (!asyncApi) { tenants.createTenant(tenant, tenantInfo); - logger.trace("Successfully created tenant \"" + tenant + "\" synchronously!"); + if (logger.isDebugEnabled()) { + logger.debug("Successful sync creation of tenant {}", tenant); + } } else { CompletableFuture future = tenants.createTenantAsync(tenant, tenantInfo); - future.whenComplete((unused, throwable) -> - logger.trace("Successfully created tenant \"" + tenant + "\" asynchronously!")) - .exceptionally(ex -> { - logger.error("Failed to create tenant \"" + tenant + "\" asynchronously!"); + future.whenComplete((unused, throwable) -> { + if (logger.isDebugEnabled()) { + logger.debug("Successful async creation of tenant {}", tenant); + } + }).exceptionally(ex -> { + logger.error("Failed async creation of tenant {}", tenant); return null; }); } @@ -80,13 +84,19 @@ public class PulsarAdminTenantOp extends PulsarAdminOp { if ( nsNum == 0 ) { if (!asyncApi) { tenants.deleteTenant(tenant); - logger.trace("Successfully deleted tenant \"" + tenant + "\" synchronously!"); + if (logger.isDebugEnabled()) { + logger.debug("Successful sync deletion of tenant {}", tenant); + 
} } else { CompletableFuture future = tenants.deleteTenantAsync(tenant); - future.whenComplete((unused, throwable) - -> logger.trace("Successfully deleted tenant \"" + tenant + "\" asynchronously!")) - .exceptionally(ex -> { - logger.error("Failed to delete tenant \"" + tenant + "\" asynchronously!"); + future.whenComplete((unused, throwable) -> { + if (logger.isDebugEnabled()) { + logger.debug("Successful async deletion of tenant {}", tenant); + } + }).exceptionally(ex -> { + if (logger.isDebugEnabled()) { + logger.error("Failed async deletion of tenant {}", tenant); + } return null; }); } diff --git a/driver-pulsar/src/main/resources/activities/pulsar_admin_tenant.yaml b/driver-pulsar/src/main/resources/activities/pulsar_admin_tenant.yaml index fa579a31a..90a697ba6 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar_admin_tenant.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar_admin_tenant.yaml @@ -5,10 +5,10 @@ bindings: params: # "true" - asynchronous Pulsar Admin API # "false" - synchronous Pulsar Admin API - async_api: "true" + async_api: "false" # "true" - delete tenant # "false" - create tenant - admin_delop: "false" + admin_delop: "true" blocks: - name: create-tenant-block diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 9355cc0b0..799c8656c 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -1,24 +1,29 @@ - [1. NoSQLBench (NB) Pulsar Driver Overview](#1-nosqlbench-nb-pulsar-driver-overview) - - [1.1. Issues Tracker](#11-issues-tracker) - - [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings) - - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) - - [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters) - - [1.4. 
Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details) - - [1.4.1. Pulsar Admin API Command Block - Create Tenants](#141-pulsar-admin-api-command-block---create-tenants) - - [1.4.2. Pulsar Admin API Command Block - Create Namespaces](#142-pulsar-admin-api-command-block---create-namespaces) - - [1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)](#143-pulsar-admin-api-command-block---create-topics-partitioned-or-regular) - - [1.4.4. Batch Producer Command Block](#144-batch-producer-command-block) - - [1.4.5. Producer Command Block](#145-producer-command-block) - - [1.4.6. Consumer Command Block](#146-consumer-command-block) - - [1.4.7. Reader Command Block](#147-reader-command-block) - - [1.5. Schema Support](#15-schema-support) - - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) - - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) - - [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) + - [1.1. Issues Tracker](#11-issues-tracker) + - [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings) + - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) + - [1.3.1. Configuration Parameter Levels](#131-configuration-parameter-levels) + - [1.4. Pulsar Driver Yaml File - Command Blocks](#14-pulsar-driver-yaml-file---command-blocks) + - [1.4.1. Pulsar Admin API Command Block - Create Tenants](#141-pulsar-admin-api-command-block---create-tenants) + - [1.4.2. Pulsar Admin API Command Block - Create Namespaces](#142-pulsar-admin-api-command-block---create-namespaces) + - [1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)](#143-pulsar-admin-api-command-block---create-topics-partitioned-or-regular) + - [1.4.4. 
Batch Producer Command Block](#144-batch-producer-command-block) + - [1.4.5. Producer Command Block](#145-producer-command-block) + - [1.4.6. (Single-Topic) Consumer Command Block](#146-single-topic-consumer-command-block) + - [1.4.7. Reader Command Block](#147-reader-command-block) + - [1.4.8. Multi-topic Consumer Command Block](#148-multi-topic-consumer-command-block) + - [1.4.9. End-to-end Message Processing Command Block](#149-end-to-end-message-processing-command-block) + - [1.5. Message Properties](#15-message-properties) + - [1.6. Schema Support](#16-schema-support) + - [1.7. Measure End-to-end Message Processing Latency](#17-measure-end-to-end-message-processing-latency) + - [1.8. Detect Message Out-of-order Error and Message Loss](#18-detect-message-out-of-order-error-and-message-loss) + - [1.9. NB Activity Execution Parameters](#19-nb-activity-execution-parameters) + - [1.10. NB Pulsar Driver Execution Example](#110-nb-pulsar-driver-execution-example) + - [1.11. Appendix A. Template Global Setting File (config.properties)](#111-appendix-a-template-global-setting-file-configproperties) - [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - - [2.1. Other Activity Parameters](#21-other-activity-parameters) - - [2.2. API Caching](#22-api-caching) - - [2.2.1. Instancing Controls](#221-instancing-controls) + - [2.1. Other Activity Parameters](#21-other-activity-parameters) + - [2.2. API Caching](#22-api-caching) + - [2.2.1. Instancing Controls](#221-instancing-controls) # 1. NoSQLBench (NB) Pulsar Driver Overview @@ -38,7 +43,7 @@ If you have issues or new requirements for this driver, please add them at the [ ## 1.2. Global Level Pulsar Configuration Settings -The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish and consume messages from the Pulsar cluster. 
In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload-specific actions (e.g. publishing or consuming messages). +The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish messages to and consume messages from a Pulsar cluster. In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload-specific actions (e.g. publishing or consuming messages). When creating these objects (e.g. PulsarClient, Producer), there are different configuration options that can be used. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration options when creating a Pulsar Producer object. @@ -70,32 +75,28 @@ There are multiple sections in this file that correspond to different groups of format. The other valid option is **avro** which the Pulsar message will follow a specific Avro format. 
* *schema.definition*: This only applies when an Avro schema type - is specified and the value is the (full) file path that contains - the Avro schema definition. + is specified. The value of this configuration is the (full) file + path that contains the Avro schema definition. * **Pulsar Client related settings**: * All settings under this section starts with **client.** prefix. * This section defines all configuration settings that are related with defining a PulsarClient object. - * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) + * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) * **Pulsar Producer related settings**: * All settings under this section starts with **producer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Producer object. - * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) + * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) * **Pulsar Consumer related settings**: * All settings under this section starts with **consumer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Consumer object. - * - See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) + * See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) * **Pulsar Reader related settings**: * All settings under this section starts with **reader** prefix. * This section defines all configuration settings that are related with defining a Pulsar Reader object. 
- * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) + * See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration @@ -104,7 +105,7 @@ sections in this file as well. ## 1.3. NB Pulsar Driver Yaml File - High Level Structure Just like other NB driver types, the actual Pulsar workload generation is -determined by the statement blocks in the NB driver Yaml file. Depending +determined by the statement blocks in an NB driver Yaml file. Depending on the Pulsar workload type, the corresponding statement block may have different contents. @@ -113,12 +114,19 @@ At high level, Pulsar driver yaml file has the following structure: * **description**: (optional) general description of the yaml file * **bindings**: defines NB bindings * **params**: document level Pulsar driver parameters that apply to all - command blocks. Currently there are two valid parameters: + command blocks. Currently, the following parameters are valid at this + level: * **topic_url**: Pulsar topic uri ([persistent|non-persistent]: ////). This can be statically assigned or dynamically generated via NB bindings. * **async_api**: Whether to use asynchronous Pulsar API (**note**: more on this later) + * **use_transaction**: Whether to simulate Pulsar transaction + * **admin_delop**: For Admin tasks, whether to execute delete operation + instead of the default create operation. + * **seq_tracking**: Whether to do message sequence tracking. This is + used for message out-of-order and message loss detection (more on + this later). * **blocks**: includes a series of command blocks. Each command block defines one major Pulsar operation such as *producer*, *consumer*, etc. 
Right now, the following command blocks are already supported or will be @@ -129,8 +137,12 @@ At high level, Pulsar driver yaml file has the following structure: * (Pulsar Admin API) **create-topic-block**: create/delete topics * (Pulsar Client API) **batch-producer-block**: batch producer * (Pulsar Client API) **producer-block**: producer - * (Pulsar Client API) **consumer-block**: consumer + * (Pulsar Client API) **consumer-block**: consumer (single topic) * (Pulsar Client API) **reader-block**: reader + * (Pulsar Client API) **e2e-msg-proc-block**: keep track of end-to-end + message latency (histogram) + * (Pulsar Client API) **multi-topic-consumer-block**: consumer (multi- + topic) ```yaml description: | @@ -186,66 +198,43 @@ multiple Pulsar operations in one run! But if we want to focus the testing on one particular operation, we can use the tag to filter the command block as listed above! -### 1.3.1. NB Cycle Level Parameters vs. Global Level Parameters +### 1.3.1. Configuration Parameter Levels -Some parameters, especially topic name and producer/consumer/reader/etc. -name, can be set at the global level in **config.properties** file, or at -NB cycle level via **pulsar.yaml** file. An example of setting a topic -name in both levels is as below: +The NB Pulsar driver configuration parameters can be set at 3 different +levels: -```bash -# Global level setting (config.properties): -producer.topicName = ... - -# Cycle level setting (pulsar.yaml) +* **global level**: parameters that are set in ***config.properties*** file +``` +schema.type= +``` +* **document level**: parameters that are set within NB yaml file and under + the ***params*** section +``` params: topic_uri: ... ``` +* **statement level**: parameters that are set within NB yaml file, but +under different block statements +``` +- name: producer-block + statements: + - name: s1 + msg_key: +``` -In theory, all Pulsar client settings can be made as cycle level settings -for maximum flexibility. 
But practically speaking (and also for simplicity -purposes), only the following parameters are made to be configurable at -both levels, listed by cycle level setting names with their corresponding -global level setting names: -* topic_uri (Mandatory) - * producer.topicName - * consumer.topicNames - * reader.topicName -* topic_names (Optional for Consumer) - * consumer.topicNames -* subscription_name (Mandatory for Consumer) - * consumer.subscriptionName -* subscription_type (Mandatory for Consumer, default to **exclusive** - type) - * consumer.subscriptionType -* topics_pattern (Optional for Consumer) - * consumer.topicsPattern -* producer_name (Optional) - * producer.producerName -* consumer_name (Optional) - * consumer.consumerName -* reader_name (Optional) - * reader.readerName +**NOTE**: If one parameter is set at multiple levels (e.g. producer name), +the parameter at lower level will take precedence. -One key difference between setting a parameter at the global level vs. at -the cycle level is that the global level setting is always static and -stays the same for all NB cycle execution. The cycle level setting, on the -other side, can be dynamically bound and can be different from cycle to -cycle. - -Because of this, setting these parameters at the NB cycle level allows us -to run Pulsar testing against multiple topics and/or multiple -producers/consumers/readers/etc all at once within one NB activity. This -makes the testing more flexible and effective. - -**NOTE**: when a configuration is set at both the global level and the -cycle level, **the cycle level setting will take priority!** - -## 1.4. Pulsar Driver Yaml File - Command Block Details +## 1.4. Pulsar Driver Yaml File - Command Blocks ### 1.4.1. Pulsar Admin API Command Block - Create Tenants -This Pulsar Admin API Block is used to create Pulsar tenants. It has the following format: +This Pulsar Admin API Block is used to create or delete Pulsar tenants. It +has the following format. 
+ +Please note that when document level parameter **admin_delop** is set to be +true, then this command block will delete Pulsar tenants instead. Similarly +this applies to other Admin API blocks for namespace and topic management. ```yaml - name: create-tenant-block @@ -265,10 +254,10 @@ In this command block, there is only 1 statement (s1): * Statement **s1** is used for creating a Pulsar tenant * (Mandatory) **optype (admin-tenant)** is the statement identifier for this statement - * (Optional) **allowed_clusters** must be statically bound and it + * (Optional) **allowed_clusters** must be statically bound, and it specifies the cluster list that is allowed for a tenant. - * (Optional) **admin_roles** must be statically bound and it specifies - the super user role that is associated with a tenant. + * (Optional) **admin_roles** must be statically bound, and it specifies + the superuser role that is associated with a tenant. * (Mandatory) **tenant** is the Pulsar tenant name to be created. It can either be dynamically or statically bound. @@ -293,7 +282,7 @@ In this command block, there is only 1 statement (s1): * (Mandatory) **optype (admin-namespace)** is the statement identifier for this statement * (Mandatory) **namespace** is the Pulsar namespace name to be created - under the above tenant. It also can be dynamically or statically bound. + under a tenant. It can be either statically or dynamically bound. ### 1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular) @@ -322,7 +311,7 @@ In this command block, there is only 1 statement (s1): a partitioned topic is to be created. It also can be dynamically or statically bound. -**NOTE**: The topic name is bounded by the document level parameter "topic_uri". +**NOTE**: The topic name is bound by the document level parameter "topic_uri". ### 1.4.4. Batch Producer Command Block @@ -331,7 +320,7 @@ once by one NB cycle execution. 
A typical format of this command block is as below: ```yaml -- name: batch-producer-block + - name: batch-producer-block tags: phase: batch-producer statements: @@ -343,6 +332,11 @@ as below: - name: s2 optype: batch-msg-send msg_key: "{mykey}" + msg_property: | + { + "prop1": "{myprop1}", + "prop2": "{myprop2}" + } msg_value: | { "SensorID": "{sensor_id}", @@ -374,6 +368,9 @@ ratios: 1, , 1. for this statement * (Optional) **msg_key**, when provided, specifies the key of the generated message + * (Optional) **msg_property**, when provided, specifies the properties + of the generated message. It must be a JSON string that contains a + series of key-value pairs. * (Mandatory) **msg_payload** specifies the payload of the generated message * (Optional) **ratio**, when provided, specifies the batch size (how @@ -385,6 +382,9 @@ ratios: 1, , 1. * (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1. +**NOTE**: the topic that the producer needs to publish messages to is +specified by the document level parameter ***topic_uri***. + ### 1.4.5. Producer Command Block This is the regular Pulsar producer command block that produces one Pulsar @@ -400,6 +400,11 @@ as below: optype: msg-send # producer_name: {producer_name} msg_key: "{mykey}" + msg_property: | + { + "prop1": "{myprop1}", + "prop2": "{myprop2}" + } msg_value: | { "SensorID": "{sensor_id}", @@ -418,14 +423,20 @@ This command block only has 1 statements (s1): producer name that is associated with the message production. * (Optional) **msg_key**, when provided, specifies the key of the generated message + * (Optional) **msg_property**, when provided, specifies the properties + of the generated message. It must be a JSON string that contains a + series of key-value pairs. * (Mandatory) **msg_payload** specifies the payload of the generated message -### 1.4.6. 
Consumer Command Block +**NOTE**: the topic that the producer needs to publish messages to is +specified by the document level parameter ***topic_uri***. + +### 1.4.6. (Single-Topic) Consumer Command Block This is the regular Pulsar consumer command block that consumes one Pulsar -message per NB cycle execution. A typical format of this command block is -as below: +message from one single Pulsar topic per NB cycle execution. A typical +format of this command block is as below: ```yaml - name: consumer-block @@ -434,8 +445,6 @@ as below: statements: - name: s1 optype: msg-consume - topic_names: ", " - # topics_pattern: "" subscription_name: subscription_type: consumer_name: @@ -447,19 +456,14 @@ This command block only has 1 statements (s1): and acknowledge it. * (Mandatory) **optype (msg-consume)** is the statement identifier for this statement - * (Optional) **topic_names**, when provided, specifies multiple topic - names from which to consume messages for multi-topic message consumption. - * (Optional) **topics_pattern**, when provided, specifies pulsar - topic regex pattern for multi-topic message consumption * (Mandatory) **subscription_name** specifies subscription name. * (Optional) **subscription_type**, when provided, specifies subscription type. Default to **exclusive** subscription type. * (Optional) **consumer_name**, when provided, specifies the associated consumer name. -**NOTE 1**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**. - -**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**. +**NOTE**: the single topic that the consumer needs to consume messages from +is specified by the document level parameter ***topic_uri***. ### 1.4.7. 
Reader Command Block @@ -486,6 +490,9 @@ This command block only has 1 statements (s1): * (Optional) **reader_name**, when provided, specifies the associated consumer name. +**NOTE**: the single topic that the reader needs to read messages from +is specified by the document level parameter ***topic_uri***. + **TBD**: at the moment, the NB Pulsar driver Reader API only supports reading from the following positions: * MessageId.earliest @@ -501,7 +508,121 @@ Reader reader = pulsarClient.newReader() .create(); ``` -## 1.5. Schema Support +### 1.4.8. Multi-topic Consumer Command Block + +This is the regular Pulsar consumer command block that consumes one Pulsar +message from multiple Pulsar topics per NB cycle execution. A typical format +of this command block is as below: + +```yaml + - name: consumer-block + tags: + phase: consumer + statements: + - name: s1 + optype: msg-consume + subscription_name: + subscription_type: + consumer_name: +``` + +This command block only has 1 statements (s1): + +* Statement **s1** is used to consume one message from the Pulsar cluster + and acknowledge it. + * (Mandatory) **optype (msg-consume)** is the statement identifier for + this statement + * (Optional) **topic_names**, when provided, specifies multiple topic + names from which to consume messages for multi-topic message consumption. + * (Optional) **topics_pattern**, when provided, specifies pulsar + topic regex pattern for multi-topic message consumption + * (Mandatory) **subscription_name** specifies subscription name. + * (Optional) **subscription_type**, when provided, specifies + subscription type. Default to **exclusive** subscription type. + * (Optional) **consumer_name**, when provided, specifies the + associated consumer name. + +**NOTE 1**: when both **topic_names** and **topics_pattern** are provided, +**topic_names** takes precedence over **topics_pattern**. 
+ +**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, +consumer topic name is default to the document level parameter **topic_uri**. + +### 1.4.9. End-to-end Message Processing Command Block + +End-to-end message processing command block is used to simplify measuring +the end-to-end message processing (from being published to being consumed) +latency. A typical format of this command block is as below: + +```yaml + - name: e2e-msg-proc-block + tags: + phase: e2e-msg-proc + admin_task: false + statements: + - name: s1 + optype: ec2-msg-proc-send + msg_key: + msg_property: | + { + "prop1": "{myprop1}" + } + msg_value: "{myvalue}" + ratio: 1 + - name: s2 + optype: ec2-msg-proc-consume + subscription_name: "mysub" + subscription_type: + ratio: 1 +``` + +This command block has 2 statements (s1 and s2) with the following +ratios: 1, 1. + +* Statement **s1** is used to publish a message to a topic + * (Mandatory) **optype (ec2-msg-proc-send)** is the statement + identifier for this statement + * (Optional) **msg_key**, when provided, specifies the key of the + generated message + * (Optional) **msg_property**, when provided, specifies the properties + of the generated message. It must be a JSON string that contains a + series of key-value pairs. + * (Mandatory) **msg_payload** specifies the payload of the generated + message + * (Optional) **ratio**, must be 1 when provided. + Otherwise, default to 1. +* Statement **s2** is used to consume the message that just got published +from the same topic + * (Mandatory) **optype (ec2-msg-proc-consume)** is the statement + identifier for this statement + * (Mandatory) **subscription_name** specifies subscription name. + * (Optional) **subscription_type**, when provided, specifies + subscription type. Default to **exclusive** subscription type. + * (Optional) **ratio**, must be 1 when provided. + Otherwise, default to 1. 
+ +**NOTE**: the topic that the producer needs to publish messages to is +specified by the document level parameter ***topic_uri***. + +## 1.5. Message Properties + +In the producer command block, it is optional to specify message properties: +``` + statements: + - name: s1 + msg_property: | + { + "prop1": "{myprop1}", + "prop2": "{myprop2}" + } +``` + +The provided message property string must be a valid JSON string that +contains a list of key-value pairs. Otherwise, if it is not a valid +JSON string as expected, the driver will ignore it and treat the +message as having no properties. + +## 1.6. Schema Support Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, etc. At the moment, the NB @@ -535,7 +656,63 @@ schema definition: } ``` -## 1.6. NB Activity Execution Parameters +## 1.7. Measure End-to-end Message Processing Latency + +**e2e-msg-proc-block** measures the end-to-end message latency metrics. It +contains one message producing statement and one message consuming statement. +When the message that is published by the producer is received by the consumer, +the consumer calculates the time difference between when the message is received +and when the message is published. + +The measured end-to-end message processing latency is captured as a histogram +metric named "e2e_msg_latency". + +This command block uses one single machine to act as both a producer and a +consumer. We do so just for convenience purposes. In reality, we can use +**producer-block** and **consumer-block** command blocks on separate machines +to achieve the same goal, which is probably closer to the actual use case and +probably gives a more accurate measurement (to avoid the situation of always reading +messages from the managed ledger cache).
+ +One thing to remember, though: if we're using multiple machines to measure the +end-to-end message processing latency, we need to make sure: +1) The clocks of the two machines are synced up with each other, e.g. through +NTP protocol. +2) If there is some time lag of starting the consumer, we need to take that +into consideration when interpreting the end-to-end message processing latency. + +## 1.8. Detect Message Out-of-order Error and Message Loss + +In order to detect errors like message out-of-order and message loss through +the NB Pulsar driver, we need to set the following document level parameter +to be true. +``` +params: + # Only applicable to producer and consumer + # - used for message ordering and message loss check + seq_tracking: "true" +``` + +The logic of how this works is based on the fact that NB execution cycle number +is monotonically increasing by 1 for every cycle moving forward. When publishing +a series of messages, we use the current NB cycle number as one message property +which is also monotonically increasing by 1. + +When receiving the messages, if the message sequence number stored in the message +property is not monotonically increasing or if there is a gap larger than 1, then +it means the messages are either delivered out of order or there is some message +loss. Either way, the consumer NB execution will throw runtime exceptions, with the +following messages respectively: + +```text + "Detected message ordering is not guaranteed. Older messages are received earlier!" +``` + +```text + "Detected message sequence id gap. Some published messages are not received!" +``` + +## 1.9. NB Activity Execution Parameters At the moment, the following NB Pulsar driver **specific** activity parameters are supported: @@ -553,7 +730,7 @@ reference to NB documentation for more parameters * cycles= * --report-csv-to -## 1.7. NB Pulsar Driver Execution Example +## 1.10.
NB Pulsar Driver Execution Example **NOTE**: in the following examples, the Pulsar service URL is **pulsar: //localhost:6650**, please change it accordingly for your own Pulsar @@ -578,7 +755,7 @@ environment. ``` -## 1.8. Appendix A. Template Global Setting File (config.properties) +## 1.11. Appendix A. Template Global Setting File (config.properties) ```properties schema.type = schema.definition =