diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index e659ee377..37e8c26e0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -190,47 +190,47 @@ public class ReadyPulsarOp implements OpDispenser { ) { // Topic list (multi-topic) LongFunction topic_names_func; - if (cmdTpl.isStatic("topic-names")) { - topic_names_func = (l) -> cmdTpl.getStatic("topic-names"); - } else if (cmdTpl.isDynamic("topic-names")) { - topic_names_func = (l) -> cmdTpl.getDynamic("topic-names", l); + if (cmdTpl.isStatic("topic_names")) { + topic_names_func = (l) -> cmdTpl.getStatic("topic_names"); + } else if (cmdTpl.isDynamic("topic_names")) { + topic_names_func = (l) -> cmdTpl.getDynamic("topic_names", l); } else { topic_names_func = (l) -> null; } // Topic pattern (multi-topic) LongFunction topics_pattern_func; - if (cmdTpl.isStatic("topics-pattern")) { - topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern"); - } else if (cmdTpl.isDynamic("topics-pattern")) { - topics_pattern_func = (l) -> cmdTpl.getDynamic("topics-pattern", l); + if (cmdTpl.isStatic("topics_pattern")) { + topics_pattern_func = (l) -> cmdTpl.getStatic("topics_pattern"); + } else if (cmdTpl.isDynamic("topics_pattern")) { + topics_pattern_func = (l) -> cmdTpl.getDynamic("topics_pattern", l); } else { topics_pattern_func = (l) -> null; } LongFunction subscription_name_func; - if (cmdTpl.isStatic("subscription-name")) { - subscription_name_func = (l) -> cmdTpl.getStatic("subscription-name"); - } else if (cmdTpl.isDynamic("subscription-name")) { - subscription_name_func = (l) -> cmdTpl.getDynamic("subscription-name", l); + if (cmdTpl.isStatic("subscription_name")) { + subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name"); + } else if (cmdTpl.isDynamic("subscription_name")) { + subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l); } else { subscription_name_func = (l) -> null; } LongFunction subscription_type_func; - if (cmdTpl.isStatic("subscription-type")) { - subscription_type_func = (l) -> cmdTpl.getStatic("subscription-type"); - } else if (cmdTpl.isDynamic("subscription-type")) { - subscription_type_func = (l) -> cmdTpl.getDynamic("subscription-type", l); + if (cmdTpl.isStatic("subscription_type")) { + subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type"); + } else if (cmdTpl.isDynamic("subscription_type")) { + subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l); } else { subscription_type_func = (l) -> null; } LongFunction consumer_name_func; - if (cmdTpl.isStatic("consumer-name")) { - consumer_name_func = (l) -> cmdTpl.getStatic("consumer-name"); - } else if (cmdTpl.isDynamic("consumer-name")) { - consumer_name_func = (l) -> cmdTpl.getDynamic("consumer-name", l); + if (cmdTpl.isStatic("consumer_name")) { + consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name"); + } else if (cmdTpl.isDynamic("consumer_name")) { + consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l); } else { consumer_name_func = (l) -> null; } @@ -254,19 +254,19 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction async_api_func ) { LongFunction reader_name_func; - if (cmdTpl.isStatic("reader-name")) { - reader_name_func = (l) -> cmdTpl.getStatic("reader-name"); - } else if (cmdTpl.isDynamic("reader-name")) { - reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l); + if (cmdTpl.isStatic("reader_name")) { + reader_name_func = (l) -> cmdTpl.getStatic("reader_name"); + } else if (cmdTpl.isDynamic("reader_name")) { + reader_name_func = (l) -> cmdTpl.getDynamic("reader_name", l); } else { reader_name_func = (l) -> null; } LongFunction start_msg_pos_str_func; - if (cmdTpl.isStatic("start-msg-position")) { - start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position"); - } else if (cmdTpl.isDynamic("start-msg-position")) { - start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l); + if (cmdTpl.isStatic("start_msg_position")) { + start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start_msg_position"); + } else if (cmdTpl.isDynamic("start_msg_position")) { + start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start_msg_position", l); } else { start_msg_pos_str_func = (l) -> null; } diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index 68d51e3ca..58e8bf38c 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -10,14 +10,20 @@ # 2) Avro for messages with schema schema.type=avro schema.definition=file:// + + ### Pulsar client related configurations - client.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#client client.connectionTimeoutMs=5000 + + ### Producer related configurations (global) - producer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer producer.producerName= producer.topicName= producer.sendTimeoutMs= + + ### Consumer related configurations (global) - consumer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer consumer.topicNames= @@ -26,6 +32,8 @@ consumer.subscriptionName= consumer.subscriptionType= consumer.consumerName= consumer.receiverQueueSize= + + ### Reader related configurations (global) - reader.xxx # https://pulsar.apache.org/docs/en/client-libraries-java/#reader # - valid Pos: earliest, latest, custom::file://// diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index fc1ad899e..addbd1c2d 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -287,7 +287,7 @@ as below: - name: s1 optype: batch-msg-send-start # For batch producer, "producer_name" should be associated with batch start - batch_producer_name: {batch_producer_name} + # batch_producer_name: {batch_producer_name} ratio: 1 - name: s2 optype: batch-msg-send @@ -321,9 +321,9 @@ ratios: 1, , 1. and payload to be put in the batch. * (Mandatory) **optype (batch-msg-send)** is the statement identifier for this statement - * (Optional) **msg-key**, when provided, specifies the key of the + * (Optional) **msg_key**, when provided, specifies the key of the generated message - * (Mandatory) **msg-payload** specifies the payload of the generated + * (Mandatory) **msg_payload** specifies the payload of the generated message * (Optional) **ratio**, when provided, specifies the batch size (how many messages to be put in one batch). If not provided, it is @@ -365,9 +365,9 @@ This command block only has 1 statements (s1): this statement * (Optional) **producer_name**, when provided, specifies the Pulsar producer name that is associated with the message production. - * (Optional) **msg-key**, when provided, specifies the key of the + * (Optional) **msg_key**, when provided, specifies the key of the generated message - * (Mandatory) **msg-payload** specifies the payload of the generated + * (Mandatory) **msg_payload** specifies the payload of the generated message ### 1.4.4. Consumer Command Block @@ -397,8 +397,7 @@ This command block only has 1 statements (s1): * (Mandatory) **optype (msg-consume)** is the statement identifier for this statement * (Optional) **topic_names**, when provided, specifies multiple topic - names from which to consume messages. Default to document level - parameter **topic_uri**. + names from which to consume messages for multi-topic message consumption. * (Optional) **topics_pattern**, when provided, specifies pulsar topic regex pattern for multi-topic message consumption * (Mandatory) **subscription_name** specifies subscription name. @@ -407,6 +406,10 @@ This command block only has 1 statements (s1): * (Optional) **consumer_name**, when provided, specifies the associated consumer name. +**NOTE 1**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**. + +**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**. + ### 1.4.5. Reader Command Block This is the regular Pulsar reader command block that reads one Pulsar