merge fixups

This commit is contained in:
Jonathan Shook 2022-08-09 13:11:55 -05:00
commit e5f717e6de
10 changed files with 41 additions and 115 deletions

View File

@ -42,7 +42,7 @@
<dependency> <dependency>
<groupId>org.postgresql</groupId> <groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId> <artifactId>postgresql</artifactId>
<version>42.4.0</version> <version>42.4.1</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -92,7 +92,7 @@
<dependency> <dependency>
<groupId>com.datastax.oss</groupId> <groupId>com.datastax.oss</groupId>
<artifactId>pulsar-jms</artifactId> <artifactId>pulsar-jms</artifactId>
<version>2.4.1</version> <version>2.4.2</version>
</dependency> </dependency>
</dependencies> </dependencies>

View File

@ -47,7 +47,7 @@
<dependency> <dependency>
<groupId>org.apache.avro</groupId> <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId> <artifactId>avro</artifactId>
<version>1.11.0</version> <version>1.11.1</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer --> <!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->

View File

@ -77,7 +77,7 @@
<dependency> <dependency>
<groupId>org.apache.avro</groupId> <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId> <artifactId>avro</artifactId>
<version>1.11.0</version> <version>1.11.1</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 --> <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->

View File

@ -150,18 +150,18 @@ public class ReadyPulsarOp extends BaseOpDispenser<PulsarOp> {
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc); return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc);
} }
// Regular/non-admin operation: batch message processing - batch start // // Regular/non-admin operation: batch message processing - batch start
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) { // else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) {
return resolveMsgBatchSendStart(clientSpace, topicUriFunc, asyncApiFunc); // return resolveMsgBatchSendStart(clientSpace, topicUriFunc, asyncApiFunc);
} // }
// Regular/non-admin operation: batch message processing - message sending (producer) // // Regular/non-admin operation: batch message processing - message sending (producer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) { // else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) {
return resolveMsgBatchSend(clientSpace, asyncApiFunc); // return resolveMsgBatchSend(clientSpace, asyncApiFunc);
} // }
// Regular/non-admin operation: batch message processing - batch send // // Regular/non-admin operation: batch message processing - batch send
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) { // else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) {
return resolveMsgBatchSendEnd(clientSpace, asyncApiFunc); // return resolveMsgBatchSendEnd(clientSpace, asyncApiFunc);
} // }
// Regular/non-admin operation: end-to-end message processing - sending message // Regular/non-admin operation: end-to-end message processing - sending message
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_SEND.label)) { else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc); return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc);

View File

@ -42,11 +42,11 @@ public class PulsarActivityUtil {
ADMIN_TENANT("admin-tenant"), ADMIN_TENANT("admin-tenant"),
ADMIN_NAMESPACE("admin-namespace"), ADMIN_NAMESPACE("admin-namespace"),
ADMIN_TOPIC("admin-topic"), ADMIN_TOPIC("admin-topic"),
E2E_MSG_PROC_SEND("ec2-msg-proc-send"), E2E_MSG_PROC_SEND("e22-msg-proc-send"),
E2E_MSG_PROC_CONSUME("ec2-msg-proc-consume"), E2E_MSG_PROC_CONSUME("e22-msg-proc-consume"),
BATCH_MSG_SEND_START("batch-msg-send-start"), // BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"), // BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"), // BATCH_MSG_SEND_END("batch-msg-send-end"),
MSG_SEND("msg-send"), MSG_SEND("msg-send"),
MSG_CONSUME("msg-consume"), MSG_CONSUME("msg-consume"),
MSG_READ("msg-read"), MSG_READ("msg-read"),

View File

@ -10,12 +10,11 @@
- [5.1. Pulsar Admin API Statement Block - Create/Delete Tenants](#51-pulsar-admin-api-statement-block---createdelete-tenants) - [5.1. Pulsar Admin API Statement Block - Create/Delete Tenants](#51-pulsar-admin-api-statement-block---createdelete-tenants)
- [5.2. Pulsar Admin API Command Block - Create/Delete Namespaces](#52-pulsar-admin-api-command-block---createdelete-namespaces) - [5.2. Pulsar Admin API Command Block - Create/Delete Namespaces](#52-pulsar-admin-api-command-block---createdelete-namespaces)
- [5.3. Pulsar Admin API Command Block - Create/Delete Topics, Partitioned or Not](#53-pulsar-admin-api-command-block---createdelete-topics-partitioned-or-not) - [5.3. Pulsar Admin API Command Block - Create/Delete Topics, Partitioned or Not](#53-pulsar-admin-api-command-block---createdelete-topics-partitioned-or-not)
- [5.4. Batch Producer Command Block (Only Applicable with Pulsar Synchronous API)](#54-batch-producer-command-block-only-applicable-with-pulsar-synchronous-api) - [5.4. Producer Statement Block](#54-producer-statement-block)
- [5.5. Producer Statement Block](#55-producer-statement-block) - [5.5. Consumer Statement Block](#55-consumer-statement-block)
- [5.6. Consumer Statement Block](#56-consumer-statement-block) - [5.6. Reader Statement Block](#56-reader-statement-block)
- [5.7. Reader Statement Block](#57-reader-statement-block) - [5.7. Multi-topic Consumer Statement Block](#57-multi-topic-consumer-statement-block)
- [5.8. Multi-topic Consumer Statement Block](#58-multi-topic-consumer-statement-block) - [5.8. End-to-end Message Latency Statement Block](#58-end-to-end-message-latency-statement-block)
- [5.9. End-to-end Message Latency Statement Block](#59-end-to-end-message-latency-statement-block)
- [6. Generate Message Content](#6-generate-message-content) - [6. Generate Message Content](#6-generate-message-content)
- [7. Message Schema Support](#7-message-schema-support) - [7. Message Schema Support](#7-message-schema-support)
- [8. Measure End-to-end Message Processing Latency](#8-measure-end-to-end-message-processing-latency) - [8. Measure End-to-end Message Processing Latency](#8-measure-end-to-end-message-processing-latency)
@ -53,7 +52,6 @@ At high level, Pulsar driver yaml file has the following structure:
* (Pulsar Admin API) **create-tenant-block**: create/delete tenants * (Pulsar Admin API) **create-tenant-block**: create/delete tenants
* (Pulsar Admin API) **create-namespace-block**: create/delete namespaces * (Pulsar Admin API) **create-namespace-block**: create/delete namespaces
* (Pulsar Admin API) **create-topic-block**: create/delete topics * (Pulsar Admin API) **create-topic-block**: create/delete topics
* (Pulsar Client API) **batch-producer-block**: batch producer
* (Pulsar Client API) **producer-block**: producer * (Pulsar Client API) **producer-block**: producer
* (Pulsar Client API) **consumer-block**: consumer (single topic) * (Pulsar Client API) **consumer-block**: consumer (single topic)
* (Pulsar Client API) **reader-block**: reader * (Pulsar Client API) **reader-block**: reader
@ -140,9 +138,9 @@ Currently, the following configuration parameters are available at this level:
* **admin_delop**: For Admin tasks, whether to execute delete operation instead of the default create operation. This can only be statically bound. * **admin_delop**: For Admin tasks, whether to execute delete operation instead of the default create operation. This can only be statically bound.
* **seq_tracking**: Whether to do message sequence tracking. This is used for abnormal message processing error detection such as message loss, message duplication, or message out-of-order. This can only be statically bound. * **seq_tracking**: Whether to do message sequence tracking. This is used for abnormal message processing error detection such as message loss, message duplication, or message out-of-order. This can only be statically bound.
* **e2e_starting_time_source**: Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch. The possible values for `e2e_starting_time_source`: * **e2e_starting_time_source**: Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch. The possible values for `e2e_starting_time_source`:
* `message_publish_time` - uses the message publishing timestamp as the starting time * `message_publish_time` - uses the message publishing timestamp as the starting time
* `message_event_time` - uses the message event timestamp as the starting time * `message_event_time` - uses the message event timestamp as the starting time
* `message_property_e2e_starting_time` - uses a message property `e2e_starting_time` as the starting time. * `message_property_e2e_starting_time` - uses a message property `e2e_starting_time` as the starting time.
## 3.3. Statement Level Parameters ## 3.3. Statement Level Parameters
@ -266,66 +264,7 @@ In this statement block, there is only one statement (s1):
**NOTE**: The topic name is bound by the document level parameter "topic_uri". **NOTE**: The topic name is bound by the document level parameter "topic_uri".
## 5.4. Batch Producer Command Block (Only Applicable with Pulsar Synchronous API) ## 5.4. Producer Statement Block
**NOTE**: This statement block is only applicable when Pulsar Synchronous API is used as defined by the document level setting ***async_api***.
```
params:
async_api: "false"
```
Batch producer statement block is used to send(produce) a batch of messages all at once to the Pulsar cluster. It has a typical format as below:
```yaml
- name: batch-producer-block
tags:
phase: batch-producer
statements:
- name: s1
optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start
# batch_producer_name: {batch_producer_name}
ratio: 1
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_property: |
{
"prop1": "{myprop1}",
"prop2": "{myprop2}"
}
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
ratio: 100
- name: s3
optype: batch-msg-send-end
ratio: 1
```
This statement block has three statements (s1, s2, and s3) with the following ratios: 1, <batch_num>, 1.
* Statement **s1** is used to mark the start of a message batch
* (Mandatory) **optype (batch-msg-send-start)** is the statement identifier for this statement
* (Optional) **batch_producer_name**, when provided, specifies the Pulsar producer name that is associated with the batch production of the messages.
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1.
* Statement **s2** is the core statement that generates each message that needs to be put in a batch.
* (Mandatory) **optype (batch-msg-send)** is the statement identifier for this statement
* (Optional) **msg_key**, when provided, specifies the key of the generated message
* (Optional) **msg_property**, when provided, specifies the properties of the generated message. It must be a JSON string that contains a series of key-value pairs.
* (Mandatory) **msg_payload** specifies the payload of the generated message
* (Optional) **ratio**, when provided, specifies the batch size (how many messages to be put in one batch). If not provided, it is default to 1.
* Statement **s3** is used to mark the end of a batch
* (Mandatory) **optype (batch-msg-send-end)** is the statement identifier for this statement
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1.
**NOTE**: the topic that the producer needs to publish messages to is specified by the document level parameter ***topic_uri***.
## 5.5. Producer Statement Block
This is the regular Pulsar producer statement block that produces one Pulsar message per NB execution cycle. A typical format of this statement block is as below: This is the regular Pulsar producer statement block that produces one Pulsar message per NB execution cycle. A typical format of this statement block is as below:
@ -363,7 +302,7 @@ This statement block only has one statement (s1):
**NOTE**: the topic that the producer needs to publish messages to is specified by the document level parameter ***topic_uri***. **NOTE**: the topic that the producer needs to publish messages to is specified by the document level parameter ***topic_uri***.
## 5.6. Consumer Statement Block ## 5.5. Consumer Statement Block
This is the regular Pulsar consumer statement block that consumes one message per NB execution cycle. A typical format of this statement block is as below: This is the regular Pulsar consumer statement block that consumes one message per NB execution cycle. A typical format of this statement block is as below:
@ -389,7 +328,7 @@ This statement block only has one statement (s1):
**NOTE**: the topic that the consumer receives messages from is specified by the document level parameter ***topic_uri***. **NOTE**: the topic that the consumer receives messages from is specified by the document level parameter ***topic_uri***.
## 5.7. Reader Statement Block ## 5.6. Reader Statement Block
This is the regular Pulsar reader statement block that reads one message per NB cycle execution. It has a typical format as below: This is the regular Pulsar reader statement block that reads one message per NB cycle execution. It has a typical format as below:
@ -416,7 +355,7 @@ This statement block only has one statement (s1):
**NOTE**: the topic that the reader needs to read messages from is specified by the document level parameter ***topic_uri***. **NOTE**: the topic that the reader needs to read messages from is specified by the document level parameter ***topic_uri***.
## 5.8. Multi-topic Consumer Statement Block ## 5.7. Multi-topic Consumer Statement Block
This is the Pulsar consumer statement block that consumes messages from multiple Pulsar topics per NB execution. It has a typical format as below: This is the Pulsar consumer statement block that consumes messages from multiple Pulsar topics per NB execution. It has a typical format as below:
@ -449,7 +388,7 @@ This statement block only has one statement (s1):
**NOTE 2**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**. **NOTE 2**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**.
## 5.9. End-to-end Message Latency Statement Block ## 5.8. End-to-end Message Latency Statement Block
End-to-end message latency statement block is used to simplify the task of measuring the end-to-end message processing (from being published to being consumed)latency. It has a typical format as below: End-to-end message latency statement block is used to simplify the task of measuring the end-to-end message processing (from being published to being consumed)latency. It has a typical format as below:
@ -585,7 +524,6 @@ At the moment, the following Pulsar driver specific** NB activity parameters are
Some other common NB activity parameters are listed as below. Please refer to NB documentation for more information. Some other common NB activity parameters are listed as below. Please refer to NB documentation for more information.
* driver=pulsar * driver=pulsar
* seq=concat (needed for **batch** producer)
* tags=phase:<command_block_identifier> * tags=phase:<command_block_identifier>
* threads=<NB_execution_thread_number> * threads=<NB_execution_thread_number>
* cycles=<total_NB_cycle_execution_number> * cycles=<total_NB_cycle_execution_number>
@ -599,20 +537,15 @@ Some other common NB activity parameters are listed as below. Please refer to NB
<nb_cmd> run driver=pulsar tags=phase:producer threads=100 cycles=100K web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml <nb_cmd> run driver=pulsar tags=phase:producer threads=100 cycles=100K web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
``` ```
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads. 2. Run Pulsar consumer API to consume (and acknowledge) 100 messages using
**NOTE**: *seq=* must have **concat** value in order to make the batch API working properly!
```bash
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
```
3. Run Pulsar consumer API to consume (and acknowledge) 100 messages using
one single NB thread. one single NB thread.
```bash ```bash
<nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml <nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
``` ```
# 12. Appendix A. Template Global Setting File (config.properties) # 12. Appendix A. Template Global Setting File (config.properties)
```properties
```
schema.type = schema.type =
schema.definition = schema.definition =

View File

@ -100,12 +100,12 @@
<dependency> <dependency>
<groupId>net.java.dev.jna</groupId> <groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId> <artifactId>jna</artifactId>
<version>5.11.0</version> <version>5.12.1</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.java.dev.jna</groupId> <groupId>net.java.dev.jna</groupId>
<artifactId>jna-platform</artifactId> <artifactId>jna-platform</artifactId>
<version>5.11.0</version> <version>5.12.1</version>
</dependency> </dependency>
<dependency> <dependency>
@ -260,7 +260,7 @@
<dependency> <dependency>
<groupId>com.github.oshi</groupId> <groupId>com.github.oshi</groupId>
<artifactId>oshi-core-java11</artifactId> <artifactId>oshi-core-java11</artifactId>
<version>6.1.6</version> <version>6.2.1</version>
</dependency> </dependency>
<dependency> <dependency>
@ -408,7 +408,7 @@
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId> <artifactId>log4j-jcl</artifactId>
<version>2.17.2</version> <version>2.18.0</version>
</dependency> </dependency>
<!-- Removed due to a possible conflict--> <!-- Removed due to a possible conflict-->

View File

@ -114,12 +114,6 @@
<version>4.17.21-SNAPSHOT</version> <version>4.17.21-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-jms</artifactId>
<version>4.17.21-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>

View File

@ -92,7 +92,6 @@
<module>driver-jdbc</module> <module>driver-jdbc</module>
<module>driver-cockroachdb</module> <module>driver-cockroachdb</module>
<module>driver-pulsar</module> <module>driver-pulsar</module>
<module>driver-jms</module>
</modules> </modules>
</profile> </profile>