Minor code fix regarding consumer subscription parameter name: e.g. from subscription-name changes to subscription_name

This commit is contained in:
Yabin Meng 2021-03-10 17:19:55 -06:00
parent dbdbf6e84a
commit 15a7685022
3 changed files with 46 additions and 35 deletions

View File

@ -190,47 +190,47 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
if (cmdTpl.isStatic("topic-names")) {
topic_names_func = (l) -> cmdTpl.getStatic("topic-names");
} else if (cmdTpl.isDynamic("topic-names")) {
topic_names_func = (l) -> cmdTpl.getDynamic("topic-names", l);
if (cmdTpl.isStatic("topic_names")) {
topic_names_func = (l) -> cmdTpl.getStatic("topic_names");
} else if (cmdTpl.isDynamic("topic_names")) {
topic_names_func = (l) -> cmdTpl.getDynamic("topic_names", l);
} else {
topic_names_func = (l) -> null;
}
// Topic pattern (multi-topic)
LongFunction<String> topics_pattern_func;
if (cmdTpl.isStatic("topics-pattern")) {
topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern");
} else if (cmdTpl.isDynamic("topics-pattern")) {
topics_pattern_func = (l) -> cmdTpl.getDynamic("topics-pattern", l);
if (cmdTpl.isStatic("topics_pattern")) {
topics_pattern_func = (l) -> cmdTpl.getStatic("topics_pattern");
} else if (cmdTpl.isDynamic("topics_pattern")) {
topics_pattern_func = (l) -> cmdTpl.getDynamic("topics_pattern", l);
} else {
topics_pattern_func = (l) -> null;
}
LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription-name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription-name");
} else if (cmdTpl.isDynamic("subscription-name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription-name", l);
if (cmdTpl.isStatic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name");
} else if (cmdTpl.isDynamic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l);
} else {
subscription_name_func = (l) -> null;
}
LongFunction<String> subscription_type_func;
if (cmdTpl.isStatic("subscription-type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription-type");
} else if (cmdTpl.isDynamic("subscription-type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription-type", l);
if (cmdTpl.isStatic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type");
} else if (cmdTpl.isDynamic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l);
} else {
subscription_type_func = (l) -> null;
}
LongFunction<String> consumer_name_func;
if (cmdTpl.isStatic("consumer-name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer-name");
} else if (cmdTpl.isDynamic("consumer-name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer-name", l);
if (cmdTpl.isStatic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name");
} else if (cmdTpl.isDynamic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l);
} else {
consumer_name_func = (l) -> null;
}
@ -254,19 +254,19 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> async_api_func
) {
LongFunction<String> reader_name_func;
if (cmdTpl.isStatic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader-name");
} else if (cmdTpl.isDynamic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l);
if (cmdTpl.isStatic("reader_name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader_name");
} else if (cmdTpl.isDynamic("reader_name")) {
reader_name_func = (l) -> cmdTpl.getDynamic("reader_name", l);
} else {
reader_name_func = (l) -> null;
}
LongFunction<String> start_msg_pos_str_func;
if (cmdTpl.isStatic("start-msg-position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position");
} else if (cmdTpl.isDynamic("start-msg-position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l);
if (cmdTpl.isStatic("start_msg_position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start_msg_position");
} else if (cmdTpl.isDynamic("start_msg_position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start_msg_position", l);
} else {
start_msg_pos_str_func = (l) -> null;
}

View File

@ -10,14 +10,20 @@
# 2) Avro for messages with schema
schema.type=avro
schema.definition=file://<file/path/to/iot-example.avsc>
### Pulsar client related configurations - client.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
client.connectionTimeoutMs=5000
### Producer related configurations (global) - producer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.producerName=
producer.topicName=
producer.sendTimeoutMs=
### Consumer related configurations (global) - consumer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
consumer.topicNames=
@ -26,6 +32,8 @@ consumer.subscriptionName=
consumer.subscriptionType=
consumer.consumerName=
consumer.receiverQueueSize=
### Reader related configurations (global) - reader.xxx
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>

View File

@ -287,7 +287,7 @@ as below:
- name: s1
optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start
batch_producer_name: {batch_producer_name}
# batch_producer_name: {batch_producer_name}
ratio: 1
- name: s2
optype: batch-msg-send
@ -321,9 +321,9 @@ ratios: 1, <batch_num>, 1.
and payload to be put in the batch.
* (Mandatory) **optype (batch-msg-send)** is the statement identifier
for this statement
* (Optional) **msg-key**, when provided, specifies the key of the
* (Optional) **msg_key**, when provided, specifies the key of the
generated message
* (Mandatory) **msg-payload** specifies the payload of the generated
* (Mandatory) **msg_payload** specifies the payload of the generated
message
* (Optional) **ratio**, when provided, specifies the batch size (how
many messages to be put in one batch). If not provided, it is
@ -365,9 +365,9 @@ This command block only has 1 statements (s1):
this statement
* (Optional) **producer_name**, when provided, specifies the Pulsar
producer name that is associated with the message production.
* (Optional) **msg-key**, when provided, specifies the key of the
* (Optional) **msg_key**, when provided, specifies the key of the
generated message
* (Mandatory) **msg-payload** specifies the payload of the generated
* (Mandatory) **msg_payload** specifies the payload of the generated
message
### 1.4.4. Consumer Command Block
@ -397,8 +397,7 @@ This command block only has 1 statements (s1):
* (Mandatory) **optype (msg-consume)** is the statement identifier for
this statement
* (Optional) **topic_names**, when provided, specifies multiple topic
names from which to consume messages. Default to document level
parameter **topic_uri**.
names from which to consume messages for multi-topic message consumption.
* (Optional) **topics_pattern**, when provided, specifies pulsar
topic regex pattern for multi-topic message consumption
* (Mandatory) **subscription_name** specifies subscription name.
@ -407,6 +406,10 @@ This command block only has 1 statements (s1):
* (Optional) **consumer_name**, when provided, specifies the
associated consumer name.
**NOTE 1**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**.
**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**.
### 1.4.5. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar