Add support for Pulsar Admin API, currently only supporting:

- create tenant
- create namespace
This commit is contained in:
Yabin Meng 2021-03-25 16:08:58 -05:00
parent a725bcc11d
commit 5fd0992e30
10 changed files with 396 additions and 99 deletions

View File

@ -18,7 +18,7 @@
</description>
<properties>
<pulsar.version>2.7.0</pulsar.version>
<pulsar.version>2.7.1</pulsar.version>
</properties>
<dependencies>
@ -31,13 +31,11 @@
<version>${pulsar.version}</version>
</dependency>
<!-- &lt;!&ndash; https://mvnrepository.com/artifact/org.apache.avro/avro &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>org.apache.avro</groupId>-->
<!-- <artifactId>avro</artifactId>-->
<!-- <version>1.10.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
@ -11,8 +12,13 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -21,9 +27,13 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer bindTimer;
public Timer executeTimer;
private PulsarSpaceCache pulsarCache;
private PulsarAdmin pulsarAdmin;
private PulsarNBClientConf clientConf;
private String serviceUrl;
// e.g. pulsar://localhost:6650
private String pulsarSvcUrl;
// e.g. http://localhost:8080
private String webSvcUrl;
private NBErrorHandler errorhandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer;
@ -35,6 +45,55 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
super(activityDef);
}
private void initPulsarAdmin() {
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder()
.serviceHttpUrl(webSvcUrl);
try {
String authPluginClassName =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
String useTlsStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label);
boolean useTls = BooleanUtils.toBoolean(useTlsStr);
String tlsTrustCertsFilePath =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
String tlsAllowInsecureConnectionStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
String tlsHostnameVerificationEnableStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder = adminBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
adminBuilder = adminBuilder
.useKeyStoreTls(useTls)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
pulsarAdmin = adminBuilder.build();
} catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin from global configuration!");
throw new RuntimeException("Fail to create PulsarAdmin from global configuration!");
}
}
@Override
public void initActivity() {
super.initActivity();
@ -42,10 +101,16 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
String pulsarClntConfFile =
activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile);
serviceUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
pulsarSvcUrl =
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
webSvcUrl =
activityDef.getParams().getOptionalString("web_url").orElse("pulsar://localhost:8080");
initPulsarAdmin();
pulsarCache = new PulsarSpaceCache(this);
@ -72,10 +137,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
return clientConf;
}
public String getPulsarServiceUrl() {
return serviceUrl;
public String getPulsarSvcUrl() {
return pulsarSvcUrl;
}
public String getWebSvcUrl() { return webSvcUrl; }
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public Timer getBindTimer() {
return bindTimer;
}

View File

@ -2,14 +2,15 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@ -27,23 +28,41 @@ public class PulsarSpace {
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
protected final String spaceName;
protected final PulsarNBClientConf pulsarNBClientConf;
protected final String pulsarSvcUrl;
private final String spaceName;
private final PulsarNBClientConf pulsarNBClientConf;
private final String pulsarSvcUrl;
private final String webSvcUrl;
private final PulsarAdmin pulsarAdmin;
protected PulsarClient pulsarClient = null;
protected Schema<?> pulsarSchema = null;
private final Set<String> pulsarClusterMetadata = new HashSet<>();
public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf, String pulsarSvcUrl) {
private PulsarClient pulsarClient = null;
private Schema<?> pulsarSchema = null;
public PulsarSpace(String name,
PulsarNBClientConf pulsarClientConf,
String pulsarSvcUrl,
String webSvcUrl,
PulsarAdmin pulsarAdmin) {
this.spaceName = name;
this.pulsarNBClientConf = pulsarClientConf;
this.pulsarSvcUrl = pulsarSvcUrl;
this.webSvcUrl = webSvcUrl;
this.pulsarAdmin = pulsarAdmin;
createPulsarClientFromConf();
createPulsarSchemaFromConf();
try {
List<String> stringList = pulsarAdmin.clusters().getClusters();
CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator());
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get Pulsar cluster metadata!");
}
}
protected void createPulsarClientFromConf() {
private void createPulsarClientFromConf() {
ClientBuilder clientBuilder = PulsarClient.builder();
try {
@ -61,7 +80,7 @@ public class PulsarSpace {
}
}
protected void createPulsarSchemaFromConf() {
private void createPulsarSchemaFromConf() {
Object value = pulsarNBClientConf.getSchemaConfValue("schema.type");
String schemaType = (value != null) ? value.toString() : "";
@ -77,9 +96,7 @@ public class PulsarSpace {
}
}
public PulsarClient getPulsarClient() {
return pulsarClient;
}
public PulsarClient getPulsarClient() { return pulsarClient; }
public PulsarNBClientConf getPulsarClientConf() {
return pulsarNBClientConf;
@ -89,6 +106,15 @@ public class PulsarSpace {
return pulsarSchema;
}
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public String getPulsarSvcUrl() {
return pulsarSvcUrl;
}
public String getWebSvcUrl() { return webSvcUrl; }
public Set<String> getPulsarClusterMetadata() { return pulsarClusterMetadata; }
//////////////////////////////////////
// Producer Processing --> start
@ -252,7 +278,8 @@ public class PulsarSpace {
if (!StringUtils.isBlank(effectiveSubscriptionStr)) {
if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr)) {
throw new RuntimeException("Consumer::Invalid subscription type: " + effectiveSubscriptionStr);
throw new RuntimeException("Consumer::Invalid subscription type (\"" +
effectiveSubscriptionStr + "\"). \nValid subscription types: " + PulsarActivityUtil.getValidSubscriptionTypeList());
} else {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
}

View File

@ -1,8 +1,5 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -25,7 +22,13 @@ public class PulsarSpaceCache {
public PulsarSpace getPulsarSpace(String name) {
return clientScopes.computeIfAbsent(name, spaceName ->
new PulsarSpace(spaceName, activity.getPulsarConf(), activity.getPulsarServiceUrl()));
new PulsarSpace(
spaceName,
activity.getPulsarConf(),
activity.getPulsarSvcUrl(),
activity.getWebSvcUrl(),
activity.getPulsarAdmin()
));
}
public PulsarActivity getActivity() {

View File

@ -0,0 +1,53 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.Set;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminMapper extends PulsarOpMapper {
private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc;
private final LongFunction<String> tenantFunc;
private final LongFunction<String> namespaceFunc;
public PulsarAdminMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
super(cmdTpl, clientSpace);
this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc;
this.tenantFunc = tenantFunc;
this.namespaceFunc = namespaceFunc;
}
@Override
public PulsarOp apply(long value) {
Set<String> adminRoleSet = adminRolesFunc.apply(value);
Set<String> allowedClusterSet = allowedClustersFunc.apply(value);
String tenant = tenantFunc.apply(value);
String namespace = namespaceFunc.apply(value);
return new PulsarAdminOp(
clientSpace,
adminRoleSet,
allowedClusterSet,
tenant,
namespace);
}
}

View File

@ -0,0 +1,86 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);
private final PulsarSpace clientSpace;
private final Set<String> adminRoleSet;
private final Set<String> allowedClusterSet;
private final String tenant;
private final String namespace;
public PulsarAdminOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
this.clientSpace = clientSpace;
this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet;
this.tenant = tenant;
this.namespace = namespace;
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
if (StringUtils.isBlank(tenant) && !StringUtils.isBlank(namespace)) {
throw new RuntimeException("Can't create a namespace without a tenant!");
}
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
if (!StringUtils.isBlank(tenant)) {
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
}
else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
try {
pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant);
}
}
if (!StringUtils.isBlank(namespace)) {
try {
pulsarAdmin.namespaces().createNamespace(tenant + "/" + namespace);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar namespace: " + tenant + "/" + namespace);
}
}
}
}

View File

@ -11,6 +11,9 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@ -61,64 +64,63 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private LongFunction<PulsarOp> resolve() {
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
// Global parameter: topic_uri
LongFunction<String> topicUriFunc;
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
} else {
topicUriFunc = (l) -> null;
}
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc;
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
} else {
asyncApiFunc = (l) -> false;
}
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
// Global parameter: topic_uri (applies only to non-Admin API)
LongFunction<String> topicUriFunc = (l) -> null;
// Global parameter: async_api (applies only to non-Admin API)
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) {
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
}
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
} else {
asyncApiFunc = (l) -> false;
}
}
// TODO: Complete implementation for websocket-producer and managed-ledger
if /*( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_TENANT.label) ) {
return resolveCreateTenant(clientSpace);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_NAMESPACE.label) ) {
return resolveCreateNameSpace(clientSpace);
} else if*/ (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) {
return resolveAdminRequest(clientSpace);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc);
@ -135,6 +137,63 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
}
private LongFunction<PulsarOp> resolveAdminRequest(PulsarSpace clientSpace) {
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
}
LongFunction<Set<String>> adminRolesFunc;
Set<String> roleSet = new HashSet<>();
if (cmdTpl.isStatic("admin_roles")) {
// "admin_roles" includes comma-separated admin roles:
// e.g. role1, role2
String adminRolesStr = cmdTpl.getStatic("admin_roles");
String[] roleArr = adminRolesStr.split(",");
Set<String> stringSet = new HashSet<>(Arrays.asList(roleArr));
roleSet.addAll(stringSet);
}
adminRolesFunc = (l) -> roleSet;
LongFunction<Set<String>> allowedClustersFunc;
Set<String> clusterSet = new HashSet<>();
if (cmdTpl.isStatic("allowed_clusters")) {
// "allowed_cluster" includes comma-separated cluster names:
// e.g. cluster1, cluster2
String allowedClustersStr = cmdTpl.getStatic("allowed_clusters");
String[] clusterArr = allowedClustersStr.split(",");
Set<String> stringSet = new HashSet<>(Arrays.asList(clusterArr));
clusterSet.addAll(stringSet);
}
allowedClustersFunc = (l) -> clusterSet;
LongFunction<String> tenantFunc;
if (cmdTpl.isStatic("tenant")) {
tenantFunc = (l) -> cmdTpl.getStatic("tenant");
} else if (cmdTpl.isDynamic("tenant")) {
tenantFunc = (l) -> cmdTpl.getDynamic("tenant", l);
} else {
tenantFunc = (l) -> null;
}
LongFunction<String> namespaceFunc;
if (cmdTpl.isStatic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getStatic("namespace");
} else if (cmdTpl.isDynamic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getDynamic("namespace", l);
} else {
namespaceFunc = (l) -> null;
}
return new PulsarAdminMapper(
cmdTpl,
clientSpace,
adminRolesFunc,
allowedClustersFunc,
tenantFunc,
namespaceFunc);
}
private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,

View File

@ -17,6 +17,7 @@ import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.stream.Collectors;
public class PulsarActivityUtil {
@ -25,8 +26,7 @@ public class PulsarActivityUtil {
// Supported message operation types
// TODO: websocket-producer and managed-ledger
public enum OP_TYPES {
CREATE_TENANT("create-tenant"),
CREATE_NAMESPACE("create-namespace"),
ADMIN("admin"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
@ -163,10 +163,10 @@ public class PulsarActivityUtil {
}
public enum SUBSCRIPTION_TYPE {
exclusive("exclusive"),
failover("failover"),
shared("shared"),
key_shared("key_shared");
Exclusive("Exclusive"),
Failover("Failover"),
Shared("Shared"),
Key_Shared("Key_Shared");
public final String label;
@ -176,7 +176,10 @@ public class PulsarActivityUtil {
}
public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item));
}
public static String getValidSubscriptionTypeList() {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(Object::toString).collect(Collectors.joining(", "));
}
///////

View File

@ -46,8 +46,7 @@ public class PulsarNBClientConf {
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(params.properties()
.setFileName(fileName)
.setListDelimiterHandler(new DefaultListDelimiterHandler(',')));
.setFileName(fileName));
Configuration config = builder.getConfiguration();

View File

@ -10,7 +10,7 @@ bindings:
# sensor_type:
reading_time: ToDateTime();
reading_value: ToFloat(100);
topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L));
# tenant:
# document level parameters that apply to all Pulsar client types:
params:
@ -21,14 +21,14 @@ params:
blocks:
- name: admin-block
tags:
phase: create-tenant-namespace
phase: admin-api
statements:
- name: s1
optype: create-tenant
optype: admin
admin_roles:
allowed_clusters:
tenant: "{tenant}"
- name: s2
optype: create-namespace
namespace: "{namespace}"
namespace: "default"
- name: batch-producer-block
tags: