From a130ffb8ddf6c89b9ce4fa342589b5eba9322c28 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 31 Mar 2021 20:18:00 -0500 Subject: [PATCH] Admin API update - check tenant/namespace/topic existence before creating it. --- .../pulsar/ops/PulsarAdminCrtTennamOp.java | 66 ++++++++--- .../pulsar/ops/PulsarAdminCrtTopOp.java | 47 ++++++-- .../resources/activities/bindingtest.yaml | 34 ++++++ .../main/resources/activities/pulsar2.yaml | 106 ++++++++++++++++++ driver-pulsar/src/main/resources/pulsar.md | 14 ++- 5 files changed, 238 insertions(+), 29 deletions(-) create mode 100644 driver-pulsar/src/main/resources/activities/bindingtest.yaml create mode 100644 driver-pulsar/src/main/resources/activities/pulsar2.yaml diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java index 30c0f717e..15dc4b727 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java @@ -4,10 +4,14 @@ import io.nosqlbench.driver.pulsar.PulsarSpace; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.common.policies.data.TenantInfo; +import java.util.ArrayList; +import java.util.List; import java.util.Set; public class PulsarAdminCrtTennamOp extends SyncPulsarOp { @@ -49,28 +53,60 @@ public class PulsarAdminCrtTennamOp extends SyncPulsarOp { PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin(); if (!StringUtils.isBlank(tenant)) { - TenantInfo tenantInfo = new TenantInfo(); - tenantInfo.setAdminRoles(adminRoleSet); - - if ( !allowedClusterSet.isEmpty() ) { - tenantInfo.setAllowedClusters(allowedClusterSet); - } - else { - tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata()); - } + Tenants tenants = pulsarAdmin.tenants(); + // Check if the tenant already exists + TenantInfo tenantInfo = null; try { - pulsarAdmin.tenants().createTenant(tenant, tenantInfo); - } catch (PulsarAdminException e) { - processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant); + tenantInfo = pulsarAdmin.tenants().getTenantInfo(tenant); + } + catch (PulsarAdminException.NotFoundException nfe) { + // do nothing + } + catch (PulsarAdminException e) { + processPulsarAdminException(e, "Failed to retrieve tenant info. for pulsar tenant: " + tenant); + } + + if (tenantInfo == null) { + tenantInfo = new TenantInfo(); + tenantInfo.setAdminRoles(adminRoleSet); + + if ( !allowedClusterSet.isEmpty() ) { + tenantInfo.setAllowedClusters(allowedClusterSet); + } else { + tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata()); + } + + try { + tenants.createTenant(tenant, tenantInfo); + } catch (PulsarAdminException e) { + processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant); + } } } if (!StringUtils.isBlank(namespace)) { + Namespaces namespaces = pulsarAdmin.namespaces(); + + List nsListWorkingArea = new ArrayList<>(); try { - pulsarAdmin.namespaces().createNamespace(tenant + "/" + namespace); - } catch (PulsarAdminException e) { - processPulsarAdminException(e, "Failed to create pulsar namespace: " + tenant + "/" + namespace); + nsListWorkingArea = namespaces.getNamespaces(tenant); + } + catch (PulsarAdminException.NotFoundException nfe) { + // do nothing + } + catch (PulsarAdminException e) { + processPulsarAdminException(e, "Failed to retrieve namespace info. for pulsar tenant: " + tenant); + } + + // If te specified namespace doesn't exist yet, create it! + String fullNsName = tenant + "/" + namespace; + if (nsListWorkingArea.isEmpty() || !nsListWorkingArea.contains(fullNsName)) { + try { + namespaces.createNamespace(fullNsName); + } catch (PulsarAdminException e) { + processPulsarAdminException(e, "Failed to create pulsar namespace: " + fullNsName); + } } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java index 5165d333d..8e30a804a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java @@ -1,12 +1,16 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarSpace; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; +import java.util.ArrayList; +import java.util.List; + public class PulsarAdminCrtTopOp extends SyncPulsarOp { @@ -16,6 +20,7 @@ public class PulsarAdminCrtTopOp extends SyncPulsarOp { private final String topicUri; private final boolean partitionTopic; private final int partitionNum; + private final String fullNsName; public PulsarAdminCrtTopOp(PulsarSpace clientSpace, String topicUri, @@ -25,6 +30,13 @@ public class PulsarAdminCrtTopOp extends SyncPulsarOp { this.topicUri = topicUri; this.partitionTopic = partitionTopic; this.partitionNum = partitionNum; + + // Get tenant/namespace string + // - topicUri : persistent://// + // - tmpStr : // + // - fullNsName : / + String tmpStr = StringUtils.substringAfter(this.topicUri,"://"); + this.fullNsName = StringUtils.substringBeforeLast(tmpStr, "/"); } private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) { @@ -41,21 +53,40 @@ public class PulsarAdminCrtTopOp extends SyncPulsarOp { PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin(); Topics topics = pulsarAdmin.topics(); + List topicListWorkingArea = new ArrayList<>(); try { if (!partitionTopic) { - topics.createNonPartitionedTopic(topicUri); + topicListWorkingArea = topics.getList(fullNsName); } else { - topics.createPartitionedTopic(topicUri, partitionNum); + topicListWorkingArea = topics.getPartitionedTopicList(fullNsName); } - } catch (PulsarAdminException e) { - String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d", - topicUri, - partitionTopic, - partitionNum); + } + catch (PulsarAdminException.NotFoundException nfe) { + // do nothing + } + catch (PulsarAdminException e) { + processPulsarAdminException(e, "Failed to retrieve topic info.for pulsar namespace: " + fullNsName); + } - processPulsarAdminException(e, errMsg); + // If the topic doesn't exist, create it. + if (topicListWorkingArea.isEmpty() || !topicListWorkingArea.contains(topicUri)) { + try { + if (!partitionTopic) { + topics.createNonPartitionedTopic(topicUri); + } + else { + topics.createPartitionedTopic(topicUri, partitionNum); + } + } catch (PulsarAdminException e) { + String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d", + topicUri, + partitionTopic, + partitionNum); + + processPulsarAdminException(e, errMsg); + } } } } diff --git a/driver-pulsar/src/main/resources/activities/bindingtest.yaml b/driver-pulsar/src/main/resources/activities/bindingtest.yaml new file mode 100644 index 000000000..50c4c293e --- /dev/null +++ b/driver-pulsar/src/main/resources/activities/bindingtest.yaml @@ -0,0 +1,34 @@ +# +# Results: +# - 2 namespaces per tanant +# - 5 topics per namespace +#------------------------------------------------------ +#tenant=tenant_0 namespace=default_0 core_topic_name=t0 +#tenant=tenant_0 namespace=default_0 core_topic_name=t1 +#tenant=tenant_0 namespace=default_0 core_topic_name=t2 +#tenant=tenant_0 namespace=default_0 core_topic_name=t3 +#tenant=tenant_0 namespace=default_0 core_topic_name=t4 +#tenant=tenant_0 namespace=default_1 core_topic_name=t0 +#tenant=tenant_0 namespace=default_1 core_topic_name=t1 +#tenant=tenant_0 namespace=default_1 core_topic_name=t2 +#tenant=tenant_0 namespace=default_1 core_topic_name=t3 +#tenant=tenant_0 namespace=default_1 core_topic_name=t4 +#tenant=tenant_1 namespace=default_0 core_topic_name=t0 +#tenant=tenant_1 namespace=default_0 core_topic_name=t1 +#tenant=tenant_1 namespace=default_0 core_topic_name=t2 +#tenant=tenant_1 namespace=default_0 core_topic_name=t3 +#tenant=tenant_1 namespace=default_0 core_topic_name=t4 +#tenant=tenant_1 namespace=default_1 core_topic_name=t0 +#tenant=tenant_1 namespace=default_1 core_topic_name=t1 +#tenant=tenant_1 namespace=default_1 core_topic_name=t2 +#tenant=tenant_1 namespace=default_1 core_topic_name=t3 +#tenant=tenant_1 namespace=default_1 core_topic_name=t4 + +bindings: + # message key and value + #mykey: NumberNameToString() + #myvalue: AlphaNumericString(20) + # Admin API - create tenant, namespace, and topic + tenant: Mod(1000); Div(10L); ToString(); Prefix("tenant_") + namespace: Mod(10); Div(5L); ToString(); Prefix("default_") + core_topic_name: Mod(5); ToString(); Prefix("t") diff --git a/driver-pulsar/src/main/resources/activities/pulsar2.yaml b/driver-pulsar/src/main/resources/activities/pulsar2.yaml new file mode 100644 index 000000000..cf1839da8 --- /dev/null +++ b/driver-pulsar/src/main/resources/activities/pulsar2.yaml @@ -0,0 +1,106 @@ +description: | + Test workload for new pulsar driver. + There is no default scenario. You must specify one of the named scenarios + below like send100 or recv100. + You can specify the number of tenants like `tenants=100`. This is the default. + +bindings: + # message key and value + mykey: NumberNameToString() + myvalue: AlphaNumericString(20) + # Admin API - create tenant, namespace, and topic + tenant: Mod(1000); Div(10L); ToString(); Prefix("tenant_") + namespace: Mod(10); Div(5L); ToString(); Prefix("default_") + core_topic_name: Mod(5); ToString(); Prefix("t") + +# document level parameters that apply to all Pulsar client types: +params: + topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" + async_api: "false" + +blocks: + - name: create-tennam-block + tags: + phase: create-tenant-namespace + admin_task: true + statements: + - name: s1 + optype: admin-crt-tennam + admin_roles: + allowed_clusters: + tenant: "{tenant}" + namespace: "{namespace}" + + - name: create-parttop-block + tags: + phase: create-topic + admin_task: true + statements: + - name: s1 + optype: admin-crt-top + enable_partition: "true" + partition_num: "5" + + - name: batch-producer-block + tags: + phase: batch-producer + admin_task: false + statements: + - name: s1 + optype: batch-msg-send-start + # For batch producer, "producer_name" should be associated with batch start + # batch_producer_name: {batch_producer_name} + ratio: 1 + - name: s2 + optype: batch-msg-send + msg_key: "{mykey}" + msg_value: "{myvalue}" + ratio: 100 + - name: s3 + optype: batch-msg-send-end + ratio: 1 + + - name: producer-block + tags: + phase: producer + admin_task: false + statements: + - name: s1 + optype: msg-send + # producer_name: {producer_name} + msg_key: "{mykey}" + msg_value: "{myvalue}" + + - name: consumer-block + tags: + phase: consumer + admin_task: false + statements: + - name: s1 + optype: msg-consume + topic_names: + topics_pattern: + subscription_name: "mynbsub_test" + subscription_type: + consumer_name: + + - name: reader-block + tags: + phase: reader + admin_task: false + statements: + - name: s1 + optype: msg-read + reader_name: + +# - websocket-producer: +# tags: +# type: websocket-produer +# statements: +# - websocket-producer-stuff: +# +# - managed-ledger: +# tags: +# type: managed-ledger +# statement: +# - managed-ledger-stuff: diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 225a38eb2..ff2da2cea 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -22,6 +22,8 @@ # 1. NoSQLBench (NB) Pulsar Driver Overview This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB). +* Admin API - create tenant and namespace +* Admin API - create topic, partition or not * Producer * Consumer * Reader @@ -61,7 +63,7 @@ There are multiple sections in this file that correspond to different groups of consuming. This section defines configuration settings that are schema related. * There are 2 valid options under this section. - * *shcema.type*: Pulsar message schema type. When unset or set as + * *schema.type*: Pulsar message schema type. When unset or set as an empty string, Pulsar messages will be handled in raw *byte[]* format. The other valid option is **avro** which the Pulsar message will follow a specific Avro format. @@ -234,7 +236,7 @@ producers/consumers/readers/etc all at once within one NB activity. This makes the testing more flexible and effective. **NOTE**: when a configuration is set at both the global level and the -cycle level, **the ycle level setting will take priority!** +cycle level, **the cycle level setting will take priority!** ## 1.4. Pulsar Driver Yaml File - Command Block Details @@ -537,20 +539,20 @@ environment. 1. Run Pulsar producer API to produce 100K messages using 100 NB threads ```bash - run driver=pulsar tags=phase:producer threads=100 cycles=100K service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml + run driver=pulsar tags=phase:producer threads=100 cycles=100K web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml ``` 2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads; put NB execution metrics in a specified metrics folder ```bash - run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml --report-csv-to + run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml --report-csv-to ``` 3. Run Pulsar consumer API to consume (and acknowledge) 100 messages using one single NB thread. ```bash - run driver=pulsar tags=phase:consumer cycles=100 service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml + run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=/config.properties yaml=/pulsar.yaml ``` @@ -640,7 +642,7 @@ form. ### 2.2.1. Instancing Controls Normative usage of the Apache Pulsar API follows a strictly enforced -binding of topics to produces and consumers. As well, clients may be +binding of topics to producers and consumers. As well, clients may be customized with different behavior for advanced testing scenarios. There is a significant variety of messaging and concurrency schemes seen in modern architectures. Thus, it is important that testing tools rise to the