Admin API update - check tenant/namespace/topic existence before creating it.

This commit is contained in:
Yabin Meng 2021-03-31 20:18:00 -05:00
parent ca7fb7b839
commit a130ffb8dd
5 changed files with 238 additions and 29 deletions

View File

@ -4,10 +4,14 @@ import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class PulsarAdminCrtTennamOp extends SyncPulsarOp {
@ -49,28 +53,60 @@ public class PulsarAdminCrtTennamOp extends SyncPulsarOp {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
if (!StringUtils.isBlank(tenant)) {
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
}
else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
Tenants tenants = pulsarAdmin.tenants();
// Check if the tenant already exists
TenantInfo tenantInfo = null;
try {
pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant);
tenantInfo = pulsarAdmin.tenants().getTenantInfo(tenant);
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to retrieve tenant info. for pulsar tenant: " + tenant);
}
if (tenantInfo == null) {
tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
} else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
try {
tenants.createTenant(tenant, tenantInfo);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant);
}
}
}
if (!StringUtils.isBlank(namespace)) {
Namespaces namespaces = pulsarAdmin.namespaces();
List<String> nsListWorkingArea = new ArrayList<>();
try {
pulsarAdmin.namespaces().createNamespace(tenant + "/" + namespace);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar namespace: " + tenant + "/" + namespace);
nsListWorkingArea = namespaces.getNamespaces(tenant);
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to retrieve namespace info. for pulsar tenant: " + tenant);
}
// If te specified namespace doesn't exist yet, create it!
String fullNsName = tenant + "/" + namespace;
if (nsListWorkingArea.isEmpty() || !nsListWorkingArea.contains(fullNsName)) {
try {
namespaces.createNamespace(fullNsName);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar namespace: " + fullNsName);
}
}
}
}

View File

@ -1,12 +1,16 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import java.util.ArrayList;
import java.util.List;
public class PulsarAdminCrtTopOp extends SyncPulsarOp {
@ -16,6 +20,7 @@ public class PulsarAdminCrtTopOp extends SyncPulsarOp {
private final String topicUri;
private final boolean partitionTopic;
private final int partitionNum;
private final String fullNsName;
public PulsarAdminCrtTopOp(PulsarSpace clientSpace,
String topicUri,
@ -25,6 +30,13 @@ public class PulsarAdminCrtTopOp extends SyncPulsarOp {
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
// Get tenant/namespace string
// - topicUri : persistent://<tenant>/<namespace>/<topic>
// - tmpStr : <tenant>/<namespace>/<topic>
// - fullNsName : <tenant>/<namespace>
String tmpStr = StringUtils.substringAfter(this.topicUri,"://");
this.fullNsName = StringUtils.substringBeforeLast(tmpStr, "/");
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
@ -41,21 +53,40 @@ public class PulsarAdminCrtTopOp extends SyncPulsarOp {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
List<String> topicListWorkingArea = new ArrayList<>();
try {
if (!partitionTopic) {
topics.createNonPartitionedTopic(topicUri);
topicListWorkingArea = topics.getList(fullNsName);
}
else {
topics.createPartitionedTopic(topicUri, partitionNum);
topicListWorkingArea = topics.getPartitionedTopicList(fullNsName);
}
} catch (PulsarAdminException e) {
String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d",
topicUri,
partitionTopic,
partitionNum);
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to retrieve topic info.for pulsar namespace: " + fullNsName);
}
processPulsarAdminException(e, errMsg);
// If the topic doesn't exist, create it.
if (topicListWorkingArea.isEmpty() || !topicListWorkingArea.contains(topicUri)) {
try {
if (!partitionTopic) {
topics.createNonPartitionedTopic(topicUri);
}
else {
topics.createPartitionedTopic(topicUri, partitionNum);
}
} catch (PulsarAdminException e) {
String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d",
topicUri,
partitionTopic,
partitionNum);
processPulsarAdminException(e, errMsg);
}
}
}
}

View File

@ -0,0 +1,34 @@
#
# Results:
# - 2 namespaces per tanant
# - 5 topics per namespace
#------------------------------------------------------
#tenant=tenant_0 namespace=default_0 core_topic_name=t0
#tenant=tenant_0 namespace=default_0 core_topic_name=t1
#tenant=tenant_0 namespace=default_0 core_topic_name=t2
#tenant=tenant_0 namespace=default_0 core_topic_name=t3
#tenant=tenant_0 namespace=default_0 core_topic_name=t4
#tenant=tenant_0 namespace=default_1 core_topic_name=t0
#tenant=tenant_0 namespace=default_1 core_topic_name=t1
#tenant=tenant_0 namespace=default_1 core_topic_name=t2
#tenant=tenant_0 namespace=default_1 core_topic_name=t3
#tenant=tenant_0 namespace=default_1 core_topic_name=t4
#tenant=tenant_1 namespace=default_0 core_topic_name=t0
#tenant=tenant_1 namespace=default_0 core_topic_name=t1
#tenant=tenant_1 namespace=default_0 core_topic_name=t2
#tenant=tenant_1 namespace=default_0 core_topic_name=t3
#tenant=tenant_1 namespace=default_0 core_topic_name=t4
#tenant=tenant_1 namespace=default_1 core_topic_name=t0
#tenant=tenant_1 namespace=default_1 core_topic_name=t1
#tenant=tenant_1 namespace=default_1 core_topic_name=t2
#tenant=tenant_1 namespace=default_1 core_topic_name=t3
#tenant=tenant_1 namespace=default_1 core_topic_name=t4
bindings:
# message key and value
#mykey: NumberNameToString()
#myvalue: AlphaNumericString(20)
# Admin API - create tenant, namespace, and topic
tenant: Mod(1000); Div(10L); ToString(); Prefix("tenant_")
namespace: Mod(10); Div(5L); ToString(); Prefix("default_")
core_topic_name: Mod(5); ToString(); Prefix("t")

View File

@ -0,0 +1,106 @@
description: |
Test workload for new pulsar driver.
There is no default scenario. You must specify one of the named scenarios
below like send100 or recv100.
You can specify the number of tenants like `tenants=100`. This is the default.
bindings:
# message key and value
mykey: NumberNameToString()
myvalue: AlphaNumericString(20)
# Admin API - create tenant, namespace, and topic
tenant: Mod(1000); Div(10L); ToString(); Prefix("tenant_")
namespace: Mod(10); Div(5L); ToString(); Prefix("default_")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
async_api: "false"
blocks:
- name: create-tennam-block
tags:
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
namespace: "{namespace}"
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
- name: batch-producer-block
tags:
phase: batch-producer
admin_task: false
statements:
- name: s1
optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start
# batch_producer_name: {batch_producer_name}
ratio: 1
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_value: "{myvalue}"
ratio: 100
- name: s3
optype: batch-msg-send-end
ratio: 1
- name: producer-block
tags:
phase: producer
admin_task: false
statements:
- name: s1
optype: msg-send
# producer_name: {producer_name}
msg_key: "{mykey}"
msg_value: "{myvalue}"
- name: consumer-block
tags:
phase: consumer
admin_task: false
statements:
- name: s1
optype: msg-consume
topic_names:
topics_pattern:
subscription_name: "mynbsub_test"
subscription_type:
consumer_name:
- name: reader-block
tags:
phase: reader
admin_task: false
statements:
- name: s1
optype: msg-read
reader_name:
# - websocket-producer:
# tags:
# type: websocket-produer
# statements:
# - websocket-producer-stuff:
#
# - managed-ledger:
# tags:
# type: managed-ledger
# statement:
# - managed-ledger-stuff:

View File

@ -22,6 +22,8 @@
# 1. NoSQLBench (NB) Pulsar Driver Overview
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
* Admin API - create tenant and namespace
* Admin API - create topic, partition or not
* Producer
* Consumer
* Reader
@ -61,7 +63,7 @@ There are multiple sections in this file that correspond to different groups of
consuming. This section defines configuration settings that are
schema related.
* There are 2 valid options under this section.
* *shcema.type*: Pulsar message schema type. When unset or set as
* *schema.type*: Pulsar message schema type. When unset or set as
an empty string, Pulsar messages will be handled in raw *byte[]*
format. The other valid option is **avro** which the Pulsar
message will follow a specific Avro format.
@ -234,7 +236,7 @@ producers/consumers/readers/etc all at once within one NB activity. This
makes the testing more flexible and effective.
**NOTE**: when a configuration is set at both the global level and the
cycle level, **the ycle level setting will take priority!**
cycle level, **the cycle level setting will take priority!**
## 1.4. Pulsar Driver Yaml File - Command Block Details
@ -537,20 +539,20 @@ environment.
1. Run Pulsar producer API to produce 100K messages using 100 NB threads
```bash
<nb_cmd> run driver=pulsar tags=phase:producer threads=100 cycles=100K service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
<nb_cmd> run driver=pulsar tags=phase:producer threads=100 cycles=100K web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
```
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads;
put NB execution metrics in a specified metrics folder
```bash
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
```
3. Run Pulsar consumer API to consume (and acknowledge) 100 messages using
one single NB thread.
```bash
<nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
<nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
```
@ -640,7 +642,7 @@ form.
### 2.2.1. Instancing Controls
Normative usage of the Apache Pulsar API follows a strictly enforced
binding of topics to produces and consumers. As well, clients may be
binding of topics to producers and consumers. As well, clients may be
customized with different behavior for advanced testing scenarios. There
is a significant variety of messaging and concurrency schemes seen in
modern architectures. Thus, it is important that testing tools rise to the