Merge pull request #301 from yabinmeng/main

Bug fix of Pulsar Async API
This commit is contained in:
Jonathan Shook 2021-04-09 20:47:46 -05:00 committed by GitHub
commit 44d3920c4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 16 additions and 10 deletions

View File

@ -7,10 +7,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminNamespaceOp extends PulsarAdminOp {

View File

@ -81,7 +81,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label))
asyncApiFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic("PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label"));
asyncApiFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
}

View File

@ -1,5 +1,6 @@
#
# Results:
# - 10 tenants
# - 2 namespaces per tanant
# - 5 topics per namespace
#------------------------------------------------------
@ -23,12 +24,13 @@
#tenant=tenant_1 namespace=default_1 core_topic_name=t2
#tenant=tenant_1 namespace=default_1 core_topic_name=t3
#tenant=tenant_1 namespace=default_1 core_topic_name=t4
# ... ...
bindings:
# message key and value
#mykey: NumberNameToString()
#myvalue: AlphaNumericString(20)
# Admin API - create tenant, namespace, and topic
tenant: Mod(1000); Div(10L); ToString(); Prefix("tenant_")
tenant: Mod(100); Div(10L); ToString(); Prefix("tenant_")
namespace: Mod(10); Div(5L); ToString(); Prefix("default_")
core_topic_name: Mod(5); ToString(); Prefix("t")

View File

@ -8,13 +8,17 @@
# TODO: as a starting point, only supports the following types
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# 2) Avro for messages with schema
schema.type=avro
schema.definition=file://<file/path/to/iot-example.avsc>
schema.type=
schema.definition=
### Pulsar client related configurations - client.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
client.connectionTimeoutMs=5000
client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
# Cluster admin
client.authParams=
client.tlsAllowInsecureConnection=true
### Producer related configurations (global) - producer.xxx
@ -22,6 +26,9 @@ client.connectionTimeoutMs=5000
producer.producerName=
producer.topicName=
producer.sendTimeoutMs=
producer.blockIfQueueFull=true
producer.maxPendingMessages=5000
producer.batchingMaxMessages=5000
### Consumer related configurations (global) - consumer.xxx

View File

@ -1,7 +1,7 @@
bindings:
# message key and value
mykey: NumberNameToString()
myvalue: AlphaNumericString(20)
mykey:
myvalue: NumberNameToString() #AlphaNumericString(20)
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
@ -39,7 +39,7 @@ blocks:
- name: s1
optype: msg-send
# producer_name: {producer_name}
msg_key: "{mykey}"
msg_key:
msg_value: "{myvalue}"
- name: consumer-block