From bc7dce5b04db6fa90cb75e6ca1ac6cd7b12ae5ae Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Mon, 29 Mar 2021 10:11:28 -0500 Subject: [PATCH 1/3] Document update to reflect Admin API change --- driver-pulsar/src/main/resources/pulsar.md | 40 ++++++++++------------ 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 4e2d253d9..ea08144a0 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -141,10 +141,10 @@ params: blocks: - name: tags: - phase: + phase: statements: - name: - optype: + optype: ... ... - name: ... ... @@ -242,36 +242,34 @@ cycle level, **the ycle level setting will take priority!** **NOTE**: this functionality is only partially implemented at the moment and doesn't function yet. -Currently, the Pulsar Admin API Block is (planned) to only support -creating Pulsar tenants and namespaces. It has the following format: +Currently, the Pulsar Admin API Block is only supporting creating Pulsar tenants and namespaces. It has the following format: ```yaml - name: admin-block tags: - phase: create-tenant-namespace + phase: admin-api statements: - name: s1 - optype: create-tenant + optype: admin + allowed_clusters: + admin_roles: tenant: "{tenant}" - - name: s2 - optype: create-namespace - namespace: "{namespace}" + namespace: "default" ``` -In this command block, there are 2 statements (s1 and s2): +In this command block, there is only 1 statement (s1): -* Statement **s1** is used for creating a Pulsar tenant - * (Mandatory) **optype (create-tenant)** is the statement identifier +* Statement **s1** is used for creating a Pulsar tenant and a namespace + * (Mandatory) **optype (admin)** is the statement identifier for this statement - * (Mandatory) **tenant** is the only statement parameter that - specifies the Pulsar tenant name which can either be dynamically - bound or statically assigned. -* Statement **s2** is used for creating a Pulsar namespace - * (Mandatory) **optype (create-namespace)** is the statement - identifier for this statement - * (Mandatory) **namespace** is the only statement parameter that - specifies the Pulsar namespace under the tenant created by statement - s1. Its name can either be dynamically bound or statically assigned. + * (Optional) **allowed_clusters** must be statically bound and it + specifies the cluster list that is allowed for a tenant. + * (Optional) **admin_roles** must be statically bound and it specifies + the super user role that is associated with a tenant. + * (Mandatory) **tenant** is the Pulsar tenant name to be created. It + can either be dynamically or statically bound. + * (Mandatory) **namespace** is the Pulsar namespace name to be created + under the above tenant. It also can be dynamically or statically bound. ### 1.4.2. Batch Producer Command Block From c97b57a4889df6ccfc674bb655f96175d1a9b987 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Mon, 29 Mar 2021 21:01:26 -0500 Subject: [PATCH 2/3] Add Pulsar Admin API support for creating topics (partitioned or not) --- .../driver/pulsar/PulsarActivity.java | 24 ++++- ...r.java => PulsarAdminCrtTennamMapper.java} | 17 ++-- ...minOp.java => PulsarAdminCrtTennamOp.java} | 23 ++--- .../pulsar/ops/PulsarAdminCrtTopMapper.java | 66 ++++++++++++++ .../pulsar/ops/PulsarAdminCrtTopOp.java | 60 ++++++++++++ .../driver/pulsar/ops/ReadyPulsarOp.java | 91 ++++++++++--------- .../pulsar/util/PulsarActivityUtil.java | 5 +- .../src/main/resources/activities/pulsar.yaml | 21 ++++- driver-pulsar/src/main/resources/pulsar.md | 78 +++++++++++----- 9 files changed, 284 insertions(+), 101 deletions(-) rename driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/{PulsarAdminMapper.java => PulsarAdminCrtTennamMapper.java} (74%) rename driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/{PulsarAdminOp.java => PulsarAdminCrtTennamOp.java} (76%) create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index 5feb202b1..e9909ded2 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -20,7 +20,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.shade.org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { @@ -77,18 +83,28 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr); if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) { - adminBuilder = adminBuilder.authentication(authPluginClassName, authParams); +// String tokenFileName = StringUtils.removeStart(authParams, "file://"); +// File tokenFile = new File(tokenFileName); +// String token; +// try { +// token = FileUtils.readFileToString(tokenFile, StandardCharsets.UTF_8); +// token = StringUtils.normalizeSpace(token); +// } +// catch (IOException ioe) { +// throw new RuntimeException("Failed to read the specified (\"client.authParams\") token file: " + tokenFileName + "!"); +// } +// adminBuilder.authentication(AuthenticationFactory.token(token)); + adminBuilder.authentication(authPluginClassName, authParams); } if ( useTls ) { - adminBuilder = adminBuilder + adminBuilder .useKeyStoreTls(useTls) .allowTlsInsecureConnection(tlsAllowInsecureConnection) .enableTlsHostnameVerification(tlsHostnameVerificationEnable); if (!StringUtils.isBlank(tlsTrustCertsFilePath)) - adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); - + adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); } pulsarAdmin = adminBuilder.build(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamMapper.java similarity index 74% rename from driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java rename to driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamMapper.java index ac47afeaa..f821061fc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamMapper.java @@ -2,7 +2,6 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; -import org.apache.pulsar.client.api.Producer; import java.util.Set; import java.util.function.LongFunction; @@ -17,18 +16,18 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarAdminMapper extends PulsarOpMapper { +public class PulsarAdminCrtTennamMapper extends PulsarOpMapper { private final LongFunction> adminRolesFunc; private final LongFunction> allowedClustersFunc; private final LongFunction tenantFunc; private final LongFunction namespaceFunc; - public PulsarAdminMapper(CommandTemplate cmdTpl, - PulsarSpace clientSpace, - LongFunction> adminRolesFunc, - LongFunction> allowedClustersFunc, - LongFunction tenantFunc, - LongFunction namespaceFunc) { + public PulsarAdminCrtTennamMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction> adminRolesFunc, + LongFunction> allowedClustersFunc, + LongFunction tenantFunc, + LongFunction namespaceFunc) { super(cmdTpl, clientSpace); this.adminRolesFunc = adminRolesFunc; this.allowedClustersFunc = allowedClustersFunc; @@ -43,7 +42,7 @@ public class PulsarAdminMapper extends PulsarOpMapper { String tenant = tenantFunc.apply(value); String namespace = namespaceFunc.apply(value); - return new PulsarAdminOp( + return new PulsarAdminCrtTennamOp( clientSpace, adminRoleSet, allowedClusterSet, diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java similarity index 76% rename from driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java rename to driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java index 8614b294c..7020c9924 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java @@ -1,27 +1,18 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarSpace; -import io.nosqlbench.driver.pulsar.util.AvroUtil; -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.schema.SchemaType; -import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -public class PulsarAdminOp implements PulsarOp { +public class PulsarAdminCrtTennamOp implements PulsarOp { - private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class); + private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class); private final PulsarSpace clientSpace; private final Set adminRoleSet; @@ -29,11 +20,11 @@ public class PulsarAdminOp implements PulsarOp { private final String tenant; private final String namespace; - public PulsarAdminOp(PulsarSpace clientSpace, - Set adminRoleSet, - Set allowedClusterSet, - String tenant, - String namespace) { + public PulsarAdminCrtTennamOp(PulsarSpace clientSpace, + Set adminRoleSet, + Set allowedClusterSet, + String tenant, + String namespace) { this.clientSpace = clientSpace; this.adminRoleSet = adminRoleSet; this.allowedClusterSet = allowedClusterSet; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java new file mode 100644 index 000000000..a236e4cd6 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java @@ -0,0 +1,66 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.function.LongFunction; + +/** + * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains + * enough state to define a pulsar operation such that it can be executed, measured, and possibly + * retried if needed. + * + * This function doesn't act *as* the operation. It merely maps the construction logic into + * a simple functional type, given the component functions. + * + * For additional parameterization, the command template is also provided. + */ +public class PulsarAdminCrtTopMapper extends PulsarOpMapper { + private final LongFunction topicUriFunc; + private final LongFunction enablePartionFunc; + private final LongFunction partitionNumFunc; + + public PulsarAdminCrtTopMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction topicUriFunc, + LongFunction enablePartionFunc, + LongFunction partitionNumFunc) { + super(cmdTpl, clientSpace); + this.topicUriFunc = topicUriFunc; + this.enablePartionFunc = enablePartionFunc; + this.partitionNumFunc = partitionNumFunc; + } + + @Override + public PulsarOp apply(long value) { + String topicUri = topicUriFunc.apply(value); + String enablePartitionStr = enablePartionFunc.apply(value); + String partitionNumStr = partitionNumFunc.apply(value); + + if ( StringUtils.isBlank(topicUri) ) { + throw new RuntimeException("\"topic_uri\" parameter can't be empty when creating a Pulsar topic!"); + } + + boolean partitionTopic = BooleanUtils.toBoolean(enablePartitionStr); + + boolean invalidPartStr; + int partitionNum = 0; + if ( StringUtils.isBlank(partitionNumStr) || !StringUtils.isNumeric(partitionNumStr) ) { + invalidPartStr = true; + } else { + partitionNum = Integer.valueOf(partitionNumStr); + invalidPartStr = (partitionNum <= 0); + } + if (partitionTopic && invalidPartStr) { + throw new RuntimeException("Invalid specified value for \"partition_num\" parameter when creating partitioned topic!"); + } + + return new PulsarAdminCrtTopOp( + clientSpace, + topicUri, + partitionTopic, + partitionNum); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java new file mode 100644 index 000000000..483580ba3 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java @@ -0,0 +1,60 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; + +public class PulsarAdminCrtTopOp implements PulsarOp { + + private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTopOp.class); + + private final PulsarSpace clientSpace; + private final String topicUri; + private final boolean partitionTopic; + private final int partitionNum; + + public PulsarAdminCrtTopOp(PulsarSpace clientSpace, + String topicUri, + boolean partitionTopic, + int partitionNum) { + this.clientSpace = clientSpace; + this.topicUri = topicUri; + this.partitionTopic = partitionTopic; + this.partitionNum = partitionNum; + } + + private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) { + int statusCode = e.getStatusCode(); + + // 409 conflict: resource already exists + if ( (statusCode >= 400) && (statusCode != 409) ) { + throw new RuntimeException(finalErrMsg); + } + } + + @Override + public void run() { + PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin(); + + Topics topics = pulsarAdmin.topics(); + + try { + if (!partitionTopic) { + topics.createNonPartitionedTopic(topicUri); + } + else { + topics.createPartitionedTopic(topicUri, partitionNum); + } + } catch (PulsarAdminException e) { + String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d", + topicUri, + partitionTopic, + partitionNum); + + processPulsarAdminException(e, errMsg); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8fabe1ef1..05e63be0a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -75,53 +75,32 @@ public class ReadyPulsarOp implements OpDispenser { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); } - // Global parameter: topic_uri (applies only to non-Admin API) + // Global parameter: topic_uri LongFunction topicUriFunc = (l) -> null; - // Global parameter: async_api (applies only to non-Admin API) - LongFunction asyncApiFunc = (l) -> false; - if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) { - if (cmdTpl.containsKey("topic_uri")) { - if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) { - throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'."); - } else if (cmdTpl.isStatic("topic_uri")) { - topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri"); - } else { - topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l); - } - } else if (cmdTpl.containsKey("topic")) { - if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) { - String persistence = cmdTpl.getStaticOr("persistence", "persistent") - .replaceAll("true", "persistent"); - - String tenant = cmdTpl.getStaticOr("tenant", "public"); - String namespace = cmdTpl.getStaticOr("namespace", "default"); - String topic = cmdTpl.getStaticOr("topic", ""); - - String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic; - topicUriFunc = (l) -> composited; - } else { // some or all dynamic fields, composite into a single dynamic call - topicUriFunc = (l) -> - cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent") - + "://" + cmdTpl.getOr("tenant", l, "public") - + "/" + cmdTpl.getOr("namespace", l, "default") - + "/" + cmdTpl.getOr("topic", l, ""); - } - } - - if (cmdTpl.containsKey("async_api")) { - if (cmdTpl.isStatic("async_api")) - asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); - else - throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); + if (cmdTpl.containsKey("topic_uri")) { + if (cmdTpl.isStatic("topic_uri")) { + topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri"); } else { - asyncApiFunc = (l) -> false; + topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l); } } + // Global parameter: async_api + LongFunction asyncApiFunc = (l) -> false; + + if (cmdTpl.containsKey("async_api")) { + if (cmdTpl.isStatic("async_api")) + asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); + else + throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); + } + // TODO: Complete implementation for websocket-producer and managed-ledger - if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) { - return resolveAdminRequest(clientSpace); + if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TENNAME.label) ) { + return resolveAdminCrtTenname(clientSpace); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TOP.label)) { + return resolveAdminCrtParttop(clientSpace, topicUriFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { @@ -139,7 +118,8 @@ public class ReadyPulsarOp implements OpDispenser { } } - private LongFunction resolveAdminRequest(PulsarSpace clientSpace) { + // Admin API: create tenant and namespace + private LongFunction resolveAdminCrtTenname(PulsarSpace clientSpace) { if ( cmdTpl.isDynamic("admin_roles") || cmdTpl.isDynamic("allowed_clusters") ) { throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!"); @@ -187,7 +167,7 @@ public class ReadyPulsarOp implements OpDispenser { namespaceFunc = (l) -> null; } - return new PulsarAdminMapper( + return new PulsarAdminCrtTennamMapper( cmdTpl, clientSpace, adminRolesFunc, @@ -196,6 +176,33 @@ public class ReadyPulsarOp implements OpDispenser { namespaceFunc); } + // Admin API: create partitioned topic + private LongFunction resolveAdminCrtParttop( + PulsarSpace clientSpace, + LongFunction topic_uri_fun + ) { + LongFunction enablePartionFunc = (l) -> null; + if (cmdTpl.isStatic("enable_partition")) { + enablePartionFunc = (l) -> cmdTpl.getStatic("enable_partition"); + } else if (cmdTpl.isDynamic("enable_partition")) { + enablePartionFunc = (l) -> cmdTpl.getDynamic("enable_partition", l); + } + + LongFunction partitionNumFunc = (l) -> null; + if (cmdTpl.isStatic("partition_num")) { + partitionNumFunc = (l) -> cmdTpl.getStatic("partition_num"); + } else if (cmdTpl.isDynamic("partition_num")) { + partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l); + } + + return new PulsarAdminCrtTopMapper( + cmdTpl, + clientSpace, + topic_uri_fun, + enablePartionFunc, + partitionNumFunc); + } + private LongFunction resolveMsgSend( PulsarSpace clientSpace, LongFunction topic_uri_func, diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index fa898b02f..060452115 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -26,7 +26,8 @@ public class PulsarActivityUtil { // Supported message operation types // TODO: websocket-producer and managed-ledger public enum OP_TYPES { - ADMIN("admin"), + ADMIN_CRT_TENNAME("admin-crt-tennam"), + ADMIN_CRT_TOP("admin-crt-top"), BATCH_MSG_SEND_START("batch-msg-send-start"), BATCH_MSG_SEND("batch-msg-send"), BATCH_MSG_SEND_END("batch-msg-send-end"), @@ -68,7 +69,7 @@ public class PulsarActivityUtil { public enum CLNT_CONF_KEY { serviceUrl("serviceUrl"), authPulginClassName("authPluginClassName"), - authParams("AuthParams"), + authParams("authParams"), pperationTimeoutMs("operationTimeoutMs"), statsIntervalSeconds("statsIntervalSeconds"), numIoThreads("numIoThreads"), diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index 4c20423f7..b89d120f3 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -19,20 +19,32 @@ params: async_api: "false" blocks: - - name: admin-block + - name: create-tennam-block tags: - phase: admin-api + phase: create-tenant-namespace + admin_task: true statements: - name: s1 - optype: admin + optype: admin-crt-tennam admin_roles: allowed_clusters: tenant: "{tenant}" namespace: "default" + - name: create-parttop-block + tags: + phase: create-topic + admin_task: true + statements: + - name: s1 + optype: admin-crt-top + enable_partition: "true" + partition_num: "5" + - name: batch-producer-block tags: phase: batch-producer + admin_task: false statements: - name: s1 optype: batch-msg-send-start @@ -57,6 +69,7 @@ blocks: - name: producer-block tags: phase: producer + admin_task: false statements: - name: s1 optype: msg-send @@ -73,6 +86,7 @@ blocks: - name: consumer-block tags: phase: consumer + admin_task: false statements: - name: s1 optype: msg-consume @@ -85,6 +99,7 @@ blocks: - name: reader-block tags: phase: reader + admin_task: false statements: - name: s1 optype: msg-read diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index ea08144a0..225a38eb2 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -4,15 +4,16 @@ - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) - [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters) - [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details) - - [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block) - - [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block) - - [1.4.3. Producer Command Block](#143-producer-command-block) - - [1.4.4. Consumer Command Block](#144-consumer-command-block) - - [1.4.5. Reader Command Block](#145-reader-command-block) + - [1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace](#141-pulsar-admin-api-command-block---create-tenant-and-namespace) + - [1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)](#142-pulsar-admin-api-command-block---create-topic-partitioned-or-regular) + - [1.4.3. Batch Producer Command Block](#143-batch-producer-command-block) + - [1.4.4. Producer Command Block](#144-producer-command-block) + - [1.4.5. Consumer Command Block](#145-consumer-command-block) + - [1.4.6. Reader Command Block](#146-reader-command-block) - [1.5. Schema Support](#15-schema-support) - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) - - [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) + - [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) - [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - [2.1. Other Activity Parameters](#21-other-activity-parameters) - [2.2. API Caching](#22-api-caching) @@ -72,25 +73,25 @@ There are multiple sections in this file that correspond to different groups of * This section defines all configuration settings that are related with defining a PulsarClient object. * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) * **Pulsar Producer related settings**: * All settings under this section starts with **producer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Producer object. * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) * **Pulsar Consumer related settings**: * All settings under this section starts with **consumer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Consumer object. * - See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) + See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) * **Pulsar Reader related settings**: * All settings under this section starts with **reader** prefix. * This section defines all configuration settings that are related with defining a Pulsar Reader object. * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration @@ -237,22 +238,20 @@ cycle level, **the ycle level setting will take priority!** ## 1.4. Pulsar Driver Yaml File - Command Block Details -### 1.4.1. Pulsar Admin API Command Block +### 1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace -**NOTE**: this functionality is only partially implemented at the moment -and doesn't function yet. - -Currently, the Pulsar Admin API Block is only supporting creating Pulsar tenants and namespaces. It has the following format: +This Pulsar Admin API Block is used to create Pulsar tenants and namespaces. It has the following format: ```yaml - - name: admin-block + - name: create-tennam-block tags: - phase: admin-api + phase: create-tenant-namespace + admin_task: true statements: - name: s1 - optype: admin - allowed_clusters: + optype: admin-crt-tennam admin_roles: + allowed_clusters: tenant: "{tenant}" namespace: "default" ``` @@ -260,7 +259,7 @@ Currently, the Pulsar Admin API Block is only supporting creating Pulsar tenants In this command block, there is only 1 statement (s1): * Statement **s1** is used for creating a Pulsar tenant and a namespace - * (Mandatory) **optype (admin)** is the statement identifier + * (Mandatory) **optype (admin-crt-tennam)** is the statement identifier for this statement * (Optional) **allowed_clusters** must be statically bound and it specifies the cluster list that is allowed for a tenant. @@ -271,7 +270,36 @@ In this command block, there is only 1 statement (s1): * (Mandatory) **namespace** is the Pulsar namespace name to be created under the above tenant. It also can be dynamically or statically bound. -### 1.4.2. Batch Producer Command Block +### 1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular) + +This Pulsar Admin API Block is used to create Pulsar topics. It has the following format: + +```yaml + - name: create-parttop-block + tags: + phase: create-topic + admin_task: true + statements: + - name: s1 + optype: admin-crt-top + enable_partition: "true" + partition_num: "5" +``` + +In this command block, there is only 1 statement (s1): + +* Statement **s1** is used for creating a Pulsar tenant and a namespace + * (Mandatory) **optype (admin-crt-top)** is the statement identifier + for this statement + * (Mandatory) **enable_partition** specifies whether to create a + partitioned topic. It can either be dynamically or statically bound. + * (Mandatory) **partition_num** specifies the number of partitions if + a partitioned topic is to be created. It also can be dynamically or + statically bound. + +**NOTE**: The topic name is bounded by the document level parameter "topic_uri". + +### 1.4.3. Batch Producer Command Block Batch producer command block is used to produce a batch of messages all at once by one NB cycle execution. A typical format of this command block is @@ -332,7 +360,7 @@ ratios: 1, , 1. * (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1. -### 1.4.3. Producer Command Block +### 1.4.4. Producer Command Block This is the regular Pulsar producer command block that produces one Pulsar message per NB cycle execution. A typical format of this command block is @@ -368,7 +396,7 @@ This command block only has 1 statements (s1): * (Mandatory) **msg_payload** specifies the payload of the generated message -### 1.4.4. Consumer Command Block +### 1.4.5. Consumer Command Block This is the regular Pulsar consumer command block that consumes one Pulsar message per NB cycle execution. A typical format of this command block is @@ -408,7 +436,7 @@ This command block only has 1 statements (s1): **NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**. -### 1.4.5. Reader Command Block +### 1.4.6. Reader Command Block This is the regular Pulsar reader command block that reads one Pulsar message per NB cycle execution. A typical format of this command block is @@ -526,7 +554,7 @@ environment. ``` -## Appendix A. Template Global Setting File (config.properties) +## 1.8. Appendix A. Template Global Setting File (config.properties) ```properties schema.type = schema.definition = From 4b46c7ef830709d1c5a2aac67c6e4a42a49def1d Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Mon, 29 Mar 2021 22:04:24 -0500 Subject: [PATCH 3/3] Minor merge conflict --- .../io/nosqlbench/driver/pulsar/PulsarSpace.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index 492a282e2..6014ae21d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.Clusters; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; @@ -58,11 +59,14 @@ public class PulsarSpace { createPulsarSchemaFromConf(); try { - List stringList = pulsarAdmin.clusters().getClusters(); + Clusters clusters = pulsarAdmin.clusters(); + List stringList = clusters.getClusters(); CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator()); } catch (PulsarAdminException e) { - throw new RuntimeException("Failed to get Pulsar cluster metadata!"); + String errMsg = "Fail to create PulsarClient from global configuration: " + e.getMessage(); + logger.error(errMsg); + throw new RuntimeException(errMsg); } } @@ -79,8 +83,9 @@ public class PulsarSpace { .serviceUrl(pulsarSvcUrl) .build(); } catch (PulsarClientException pce) { - logger.error("Fail to create PulsarClient from global configuration!"); - throw new RuntimeException("Fail to create PulsarClient from global configuration!"); + String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage(); + logger.error(errMsg); + throw new RuntimeException(errMsg); } }