Merge pull request #290 from yabinmeng/main

Document update to reflect Admin API change
This commit is contained in:
Jonathan Shook 2021-03-30 09:49:59 -05:00 committed by GitHub
commit bb1f59b07a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 306 additions and 120 deletions

View File

@ -20,7 +20,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -77,18 +83,28 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr); boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) { if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder = adminBuilder.authentication(authPluginClassName, authParams); // String tokenFileName = StringUtils.removeStart(authParams, "file://");
// File tokenFile = new File(tokenFileName);
// String token;
// try {
// token = FileUtils.readFileToString(tokenFile, StandardCharsets.UTF_8);
// token = StringUtils.normalizeSpace(token);
// }
// catch (IOException ioe) {
// throw new RuntimeException("Failed to read the specified (\"client.authParams\") token file: " + tokenFileName + "!");
// }
// adminBuilder.authentication(AuthenticationFactory.token(token));
adminBuilder.authentication(authPluginClassName, authParams);
} }
if ( useTls ) { if ( useTls ) {
adminBuilder = adminBuilder adminBuilder
.useKeyStoreTls(useTls) .useKeyStoreTls(useTls)
.allowTlsInsecureConnection(tlsAllowInsecureConnection) .allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable); .enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath)) if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
} }
pulsarAdmin = adminBuilder.build(); pulsarAdmin = adminBuilder.build();

View File

@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
@ -58,11 +59,14 @@ public class PulsarSpace {
createPulsarSchemaFromConf(); createPulsarSchemaFromConf();
try { try {
List<String> stringList = pulsarAdmin.clusters().getClusters(); Clusters clusters = pulsarAdmin.clusters();
List<String> stringList = clusters.getClusters();
CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator()); CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator());
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get Pulsar cluster metadata!"); String errMsg = "Fail to create PulsarClient from global configuration: " + e.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
} }
} }
@ -79,8 +83,9 @@ public class PulsarSpace {
.serviceUrl(pulsarSvcUrl) .serviceUrl(pulsarSvcUrl)
.build(); .build();
} catch (PulsarClientException pce) { } catch (PulsarClientException pce) {
logger.error("Fail to create PulsarClient from global configuration!"); String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage();
throw new RuntimeException("Fail to create PulsarClient from global configuration!"); logger.error(errMsg);
throw new RuntimeException(errMsg);
} }
} }

View File

@ -2,7 +2,6 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate; import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.Set; import java.util.Set;
import java.util.function.LongFunction; import java.util.function.LongFunction;
@ -17,18 +16,18 @@ import java.util.function.LongFunction;
* *
* For additional parameterization, the command template is also provided. * For additional parameterization, the command template is also provided.
*/ */
public class PulsarAdminMapper extends PulsarOpMapper { public class PulsarAdminCrtTennamMapper extends PulsarOpMapper {
private final LongFunction<Set<String>> adminRolesFunc; private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc; private final LongFunction<Set<String>> allowedClustersFunc;
private final LongFunction<String> tenantFunc; private final LongFunction<String> tenantFunc;
private final LongFunction<String> namespaceFunc; private final LongFunction<String> namespaceFunc;
public PulsarAdminMapper(CommandTemplate cmdTpl, public PulsarAdminCrtTennamMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace, PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc, LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc, LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc, LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) { LongFunction<String> namespaceFunc) {
super(cmdTpl, clientSpace); super(cmdTpl, clientSpace);
this.adminRolesFunc = adminRolesFunc; this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc; this.allowedClustersFunc = allowedClustersFunc;
@ -43,7 +42,7 @@ public class PulsarAdminMapper extends PulsarOpMapper {
String tenant = tenantFunc.apply(value); String tenant = tenantFunc.apply(value);
String namespace = namespaceFunc.apply(value); String namespace = namespaceFunc.apply(value);
return new PulsarAdminOp( return new PulsarAdminCrtTennamOp(
clientSpace, clientSpace,
adminRoleSet, adminRoleSet,
allowedClusterSet, allowedClusterSet,

View File

@ -1,27 +1,18 @@
package io.nosqlbench.driver.pulsar.ops; package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminOp implements PulsarOp { public class PulsarAdminCrtTennamOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class); private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class);
private final PulsarSpace clientSpace; private final PulsarSpace clientSpace;
private final Set<String> adminRoleSet; private final Set<String> adminRoleSet;
@ -29,11 +20,11 @@ public class PulsarAdminOp implements PulsarOp {
private final String tenant; private final String tenant;
private final String namespace; private final String namespace;
public PulsarAdminOp(PulsarSpace clientSpace, public PulsarAdminCrtTennamOp(PulsarSpace clientSpace,
Set<String> adminRoleSet, Set<String> adminRoleSet,
Set<String> allowedClusterSet, Set<String> allowedClusterSet,
String tenant, String tenant,
String namespace) { String namespace) {
this.clientSpace = clientSpace; this.clientSpace = clientSpace;
this.adminRoleSet = adminRoleSet; this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet; this.allowedClusterSet = allowedClusterSet;

View File

@ -0,0 +1,66 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminCrtTopMapper extends PulsarOpMapper {
private final LongFunction<String> topicUriFunc;
private final LongFunction<String> enablePartionFunc;
private final LongFunction<String> partitionNumFunc;
public PulsarAdminCrtTopMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<String> topicUriFunc,
LongFunction<String> enablePartionFunc,
LongFunction<String> partitionNumFunc) {
super(cmdTpl, clientSpace);
this.topicUriFunc = topicUriFunc;
this.enablePartionFunc = enablePartionFunc;
this.partitionNumFunc = partitionNumFunc;
}
@Override
public PulsarOp apply(long value) {
String topicUri = topicUriFunc.apply(value);
String enablePartitionStr = enablePartionFunc.apply(value);
String partitionNumStr = partitionNumFunc.apply(value);
if ( StringUtils.isBlank(topicUri) ) {
throw new RuntimeException("\"topic_uri\" parameter can't be empty when creating a Pulsar topic!");
}
boolean partitionTopic = BooleanUtils.toBoolean(enablePartitionStr);
boolean invalidPartStr;
int partitionNum = 0;
if ( StringUtils.isBlank(partitionNumStr) || !StringUtils.isNumeric(partitionNumStr) ) {
invalidPartStr = true;
} else {
partitionNum = Integer.valueOf(partitionNumStr);
invalidPartStr = (partitionNum <= 0);
}
if (partitionTopic && invalidPartStr) {
throw new RuntimeException("Invalid specified value for \"partition_num\" parameter when creating partitioned topic!");
}
return new PulsarAdminCrtTopOp(
clientSpace,
topicUri,
partitionTopic,
partitionNum);
}
}

View File

@ -0,0 +1,60 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
public class PulsarAdminCrtTopOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTopOp.class);
private final PulsarSpace clientSpace;
private final String topicUri;
private final boolean partitionTopic;
private final int partitionNum;
public PulsarAdminCrtTopOp(PulsarSpace clientSpace,
String topicUri,
boolean partitionTopic,
int partitionNum) {
this.clientSpace = clientSpace;
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
try {
if (!partitionTopic) {
topics.createNonPartitionedTopic(topicUri);
}
else {
topics.createPartitionedTopic(topicUri, partitionNum);
}
} catch (PulsarAdminException e) {
String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d",
topicUri,
partitionTopic,
partitionNum);
processPulsarAdminException(e, errMsg);
}
}
}

View File

@ -75,53 +75,32 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
} }
// Global parameter: topic_uri (applies only to non-Admin API) // Global parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null; LongFunction<String> topicUriFunc = (l) -> null;
// Global parameter: async_api (applies only to non-Admin API)
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) { if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsKey("topic_uri")) { if (cmdTpl.isStatic("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) { topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
}
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
} else { } else {
asyncApiFunc = (l) -> false; topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
} }
} }
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
}
// TODO: Complete implementation for websocket-producer and managed-ledger // TODO: Complete implementation for websocket-producer and managed-ledger
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) { if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TENNAME.label) ) {
return resolveAdminRequest(clientSpace); return resolveAdminCrtTenname(clientSpace);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TOP.label)) {
return resolveAdminCrtParttop(clientSpace, topicUriFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
@ -139,7 +118,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
} }
} }
private LongFunction<PulsarOp> resolveAdminRequest(PulsarSpace clientSpace) { // Admin API: create tenant and namespace
private LongFunction<PulsarOp> resolveAdminCrtTenname(PulsarSpace clientSpace) {
if ( cmdTpl.isDynamic("admin_roles") || if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) { cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!"); throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
@ -187,7 +167,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc = (l) -> null; namespaceFunc = (l) -> null;
} }
return new PulsarAdminMapper( return new PulsarAdminCrtTennamMapper(
cmdTpl, cmdTpl,
clientSpace, clientSpace,
adminRolesFunc, adminRolesFunc,
@ -196,6 +176,33 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc); namespaceFunc);
} }
// Admin API: create partitioned topic
private LongFunction<PulsarOp> resolveAdminCrtParttop(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_fun
) {
LongFunction<String> enablePartionFunc = (l) -> null;
if (cmdTpl.isStatic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getStatic("enable_partition");
} else if (cmdTpl.isDynamic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getDynamic("enable_partition", l);
}
LongFunction<String> partitionNumFunc = (l) -> null;
if (cmdTpl.isStatic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getStatic("partition_num");
} else if (cmdTpl.isDynamic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l);
}
return new PulsarAdminCrtTopMapper(
cmdTpl,
clientSpace,
topic_uri_fun,
enablePartionFunc,
partitionNumFunc);
}
private LongFunction<PulsarOp> resolveMsgSend( private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace, PulsarSpace clientSpace,
LongFunction<String> topic_uri_func, LongFunction<String> topic_uri_func,

View File

@ -26,7 +26,8 @@ public class PulsarActivityUtil {
// Supported message operation types // Supported message operation types
// TODO: websocket-producer and managed-ledger // TODO: websocket-producer and managed-ledger
public enum OP_TYPES { public enum OP_TYPES {
ADMIN("admin"), ADMIN_CRT_TENNAME("admin-crt-tennam"),
ADMIN_CRT_TOP("admin-crt-top"),
BATCH_MSG_SEND_START("batch-msg-send-start"), BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"), BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"), BATCH_MSG_SEND_END("batch-msg-send-end"),
@ -68,7 +69,7 @@ public class PulsarActivityUtil {
public enum CLNT_CONF_KEY { public enum CLNT_CONF_KEY {
serviceUrl("serviceUrl"), serviceUrl("serviceUrl"),
authPulginClassName("authPluginClassName"), authPulginClassName("authPluginClassName"),
authParams("AuthParams"), authParams("authParams"),
pperationTimeoutMs("operationTimeoutMs"), pperationTimeoutMs("operationTimeoutMs"),
statsIntervalSeconds("statsIntervalSeconds"), statsIntervalSeconds("statsIntervalSeconds"),
numIoThreads("numIoThreads"), numIoThreads("numIoThreads"),

View File

@ -19,20 +19,32 @@ params:
async_api: "false" async_api: "false"
blocks: blocks:
- name: admin-block - name: create-tennam-block
tags: tags:
phase: admin-api phase: create-tenant-namespace
admin_task: true
statements: statements:
- name: s1 - name: s1
optype: admin optype: admin-crt-tennam
admin_roles: admin_roles:
allowed_clusters: allowed_clusters:
tenant: "{tenant}" tenant: "{tenant}"
namespace: "default" namespace: "default"
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
- name: batch-producer-block - name: batch-producer-block
tags: tags:
phase: batch-producer phase: batch-producer
admin_task: false
statements: statements:
- name: s1 - name: s1
optype: batch-msg-send-start optype: batch-msg-send-start
@ -57,6 +69,7 @@ blocks:
- name: producer-block - name: producer-block
tags: tags:
phase: producer phase: producer
admin_task: false
statements: statements:
- name: s1 - name: s1
optype: msg-send optype: msg-send
@ -73,6 +86,7 @@ blocks:
- name: consumer-block - name: consumer-block
tags: tags:
phase: consumer phase: consumer
admin_task: false
statements: statements:
- name: s1 - name: s1
optype: msg-consume optype: msg-consume
@ -85,6 +99,7 @@ blocks:
- name: reader-block - name: reader-block
tags: tags:
phase: reader phase: reader
admin_task: false
statements: statements:
- name: s1 - name: s1
optype: msg-read optype: msg-read

View File

@ -4,15 +4,16 @@
- [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure)
- [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters) - [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters)
- [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details) - [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details)
- [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block) - [1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace](#141-pulsar-admin-api-command-block---create-tenant-and-namespace)
- [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block) - [1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)](#142-pulsar-admin-api-command-block---create-topic-partitioned-or-regular)
- [1.4.3. Producer Command Block](#143-producer-command-block) - [1.4.3. Batch Producer Command Block](#143-batch-producer-command-block)
- [1.4.4. Consumer Command Block](#144-consumer-command-block) - [1.4.4. Producer Command Block](#144-producer-command-block)
- [1.4.5. Reader Command Block](#145-reader-command-block) - [1.4.5. Consumer Command Block](#145-consumer-command-block)
- [1.4.6. Reader Command Block](#146-reader-command-block)
- [1.5. Schema Support](#15-schema-support) - [1.5. Schema Support](#15-schema-support)
- [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters)
- [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example)
- [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) - [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties)
- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features)
- [2.1. Other Activity Parameters](#21-other-activity-parameters) - [2.1. Other Activity Parameters](#21-other-activity-parameters)
- [2.2. API Caching](#22-api-caching) - [2.2. API Caching](#22-api-caching)
@ -72,25 +73,25 @@ There are multiple sections in this file that correspond to different groups of
* This section defines all configuration settings that are related * This section defines all configuration settings that are related
with defining a PulsarClient object. with defining a PulsarClient object.
* *
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
* **Pulsar Producer related settings**: * **Pulsar Producer related settings**:
* All settings under this section starts with **producer** prefix. * All settings under this section starts with **producer** prefix.
* This section defines all configuration settings that are related * This section defines all configuration settings that are related
with defining a Pulsar Producer object. with defining a Pulsar Producer object.
* *
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
* **Pulsar Consumer related settings**: * **Pulsar Consumer related settings**:
* All settings under this section starts with **consumer** prefix. * All settings under this section starts with **consumer** prefix.
* This section defines all configuration settings that are related * This section defines all configuration settings that are related
with defining a Pulsar Consumer object. with defining a Pulsar Consumer object.
* *
See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
* **Pulsar Reader related settings**: * **Pulsar Reader related settings**:
* All settings under this section starts with **reader** prefix. * All settings under this section starts with **reader** prefix.
* This section defines all configuration settings that are related * This section defines all configuration settings that are related
with defining a Pulsar Reader object. with defining a Pulsar Reader object.
* *
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
In the future, when the support for other types of Pulsar workloads is In the future, when the support for other types of Pulsar workloads is
added in NB Pulsar driver, there will be corresponding configuration added in NB Pulsar driver, there will be corresponding configuration
@ -141,10 +142,10 @@ params:
blocks: blocks:
- name: <command_block_1> - name: <command_block_1>
tags: tags:
phase: <command_bock_filtering_identifier> phase: <command_bock_identifier>
statements: statements:
- name: <statement_name_1> - name: <statement_name_1>
optype: <statement_filtering_identifier> optype: <statement_identifier>
... <statement_specific_parameters> ... ... <statement_specific_parameters> ...
- name: <statement_name_2> - name: <statement_name_2>
... ... ... ...
@ -237,43 +238,68 @@ cycle level, **the ycle level setting will take priority!**
## 1.4. Pulsar Driver Yaml File - Command Block Details ## 1.4. Pulsar Driver Yaml File - Command Block Details
### 1.4.1. Pulsar Admin API Command Block ### 1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace
**NOTE**: this functionality is only partially implemented at the moment This Pulsar Admin API Block is used to create Pulsar tenants and namespaces. It has the following format:
and doesn't function yet.
Currently, the Pulsar Admin API Block is (planned) to only support
creating Pulsar tenants and namespaces. It has the following format:
```yaml ```yaml
- name: admin-block - name: create-tennam-block
tags: tags:
phase: create-tenant-namespace phase: create-tenant-namespace
admin_task: true
statements: statements:
- name: s1 - name: s1
optype: create-tenant optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}" tenant: "{tenant}"
- name: s2 namespace: "default"
optype: create-namespace
namespace: "{namespace}"
``` ```
In this command block, there are 2 statements (s1 and s2): In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar tenant * Statement **s1** is used for creating a Pulsar tenant and a namespace
* (Mandatory) **optype (create-tenant)** is the statement identifier * (Mandatory) **optype (admin-crt-tennam)** is the statement identifier
for this statement for this statement
* (Mandatory) **tenant** is the only statement parameter that * (Optional) **allowed_clusters** must be statically bound and it
specifies the Pulsar tenant name which can either be dynamically specifies the cluster list that is allowed for a tenant.
bound or statically assigned. * (Optional) **admin_roles** must be statically bound and it specifies
* Statement **s2** is used for creating a Pulsar namespace the super user role that is associated with a tenant.
* (Mandatory) **optype (create-namespace)** is the statement * (Mandatory) **tenant** is the Pulsar tenant name to be created. It
identifier for this statement can either be dynamically or statically bound.
* (Mandatory) **namespace** is the only statement parameter that * (Mandatory) **namespace** is the Pulsar namespace name to be created
specifies the Pulsar namespace under the tenant created by statement under the above tenant. It also can be dynamically or statically bound.
s1. Its name can either be dynamically bound or statically assigned.
### 1.4.2. Batch Producer Command Block ### 1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)
This Pulsar Admin API Block is used to create Pulsar topics. It has the following format:
```yaml
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
```
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar tenant and a namespace
* (Mandatory) **optype (admin-crt-top)** is the statement identifier
for this statement
* (Mandatory) **enable_partition** specifies whether to create a
partitioned topic. It can either be dynamically or statically bound.
* (Mandatory) **partition_num** specifies the number of partitions if
a partitioned topic is to be created. It also can be dynamically or
statically bound.
**NOTE**: The topic name is bounded by the document level parameter "topic_uri".
### 1.4.3. Batch Producer Command Block
Batch producer command block is used to produce a batch of messages all at Batch producer command block is used to produce a batch of messages all at
once by one NB cycle execution. A typical format of this command block is once by one NB cycle execution. A typical format of this command block is
@ -334,7 +360,7 @@ ratios: 1, <batch_num>, 1.
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it * (Optional) **ratio**, when provided, MUST be 1. If not provided, it
is default to 1. is default to 1.
### 1.4.3. Producer Command Block ### 1.4.4. Producer Command Block
This is the regular Pulsar producer command block that produces one Pulsar This is the regular Pulsar producer command block that produces one Pulsar
message per NB cycle execution. A typical format of this command block is message per NB cycle execution. A typical format of this command block is
@ -370,7 +396,7 @@ This command block only has 1 statements (s1):
* (Mandatory) **msg_payload** specifies the payload of the generated * (Mandatory) **msg_payload** specifies the payload of the generated
message message
### 1.4.4. Consumer Command Block ### 1.4.5. Consumer Command Block
This is the regular Pulsar consumer command block that consumes one Pulsar This is the regular Pulsar consumer command block that consumes one Pulsar
message per NB cycle execution. A typical format of this command block is message per NB cycle execution. A typical format of this command block is
@ -410,7 +436,7 @@ This command block only has 1 statements (s1):
**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**. **NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**.
### 1.4.5. Reader Command Block ### 1.4.6. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar This is the regular Pulsar reader command block that reads one Pulsar
message per NB cycle execution. A typical format of this command block is message per NB cycle execution. A typical format of this command block is
@ -528,7 +554,7 @@ environment.
``` ```
## Appendix A. Template Global Setting File (config.properties) ## 1.8. Appendix A. Template Global Setting File (config.properties)
```properties ```properties
schema.type = schema.type =
schema.definition = schema.definition =