Merge pull request #287 from yabinmeng/main

Add Admin API support in NB Pulsar driver + merge latest changes from Enrico
This commit is contained in:
Jonathan Shook 2021-03-26 10:26:21 -05:00 committed by GitHub
commit 865c772e0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 458 additions and 146 deletions

View File

@ -18,7 +18,7 @@
</description> </description>
<properties> <properties>
<pulsar.version>2.7.0</pulsar.version> <pulsar.version>2.7.1</pulsar.version>
</properties> </properties>
<dependencies> <dependencies>
@ -31,13 +31,11 @@
<version>${pulsar.version}</version> <version>${pulsar.version}</version>
</dependency> </dependency>
<!-- &lt;!&ndash; https://mvnrepository.com/artifact/org.apache.avro/avro &ndash;&gt;--> <dependency>
<!-- <dependency>--> <groupId>org.apache.pulsar</groupId>
<!-- <groupId>org.apache.avro</groupId>--> <artifactId>pulsar-client-admin</artifactId>
<!-- <artifactId>avro</artifactId>--> <version>${pulsar.version}</version>
<!-- <version>1.10.0</version>--> </dependency>
<!-- </dependency>-->
<dependency> <dependency>
<groupId>io.nosqlbench</groupId> <groupId>io.nosqlbench</groupId>
@ -64,12 +62,6 @@
<artifactId>commons-configuration2</artifactId> <artifactId>commons-configuration2</artifactId>
<version>2.7</version> <version>2.7</version>
</dependency> </dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.10</version>
<scope>compile</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro --> <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency> <dependency>
@ -78,6 +70,14 @@
<version>1.10.1</version> <version>1.10.1</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -5,6 +5,7 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
@ -13,8 +14,13 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics; import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -24,10 +30,15 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer executeTimer; public Timer executeTimer;
public Counter bytesCounter; public Counter bytesCounter;
public Histogram messagesizeHistogram; public Histogram messagesizeHistogram;
private PulsarSpaceCache pulsarCache; private PulsarSpaceCache pulsarCache;
private PulsarAdmin pulsarAdmin;
private PulsarNBClientConf clientConf; private PulsarNBClientConf clientConf;
private String serviceUrl; // e.g. pulsar://localhost:6650
private String pulsarSvcUrl;
// e.g. http://localhost:8080
private String webSvcUrl;
private NBErrorHandler errorhandler; private NBErrorHandler errorhandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer; private OpSequence<OpDispenser<PulsarOp>> sequencer;
@ -39,6 +50,55 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
super(activityDef); super(activityDef);
} }
private void initPulsarAdmin() {
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder()
.serviceHttpUrl(webSvcUrl);
try {
String authPluginClassName =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
String useTlsStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label);
boolean useTls = BooleanUtils.toBoolean(useTlsStr);
String tlsTrustCertsFilePath =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
String tlsAllowInsecureConnectionStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
String tlsHostnameVerificationEnableStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder = adminBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
adminBuilder = adminBuilder
.useKeyStoreTls(useTls)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
pulsarAdmin = adminBuilder.build();
} catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin from global configuration!");
throw new RuntimeException("Fail to create PulsarAdmin from global configuration!");
}
}
@Override @Override
public void initActivity() { public void initActivity() {
super.initActivity(); super.initActivity();
@ -47,10 +107,17 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
executeTimer = ActivityMetrics.timer(activityDef, "execute"); executeTimer = ActivityMetrics.timer(activityDef, "execute");
bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
String pulsarClntConfFile =
activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile); clientConf = new PulsarNBClientConf(pulsarClntConfFile);
serviceUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); pulsarSvcUrl =
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
webSvcUrl =
activityDef.getParams().getOptionalString("web_url").orElse("pulsar://localhost:8080");
initPulsarAdmin();
pulsarCache = new PulsarSpaceCache(this); pulsarCache = new PulsarSpaceCache(this);
@ -81,10 +148,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
return clientConf; return clientConf;
} }
public String getPulsarServiceUrl() { public String getPulsarSvcUrl() {
return serviceUrl; return pulsarSvcUrl;
} }
public String getWebSvcUrl() { return webSvcUrl; }
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public Timer getBindTimer() { public Timer getBindTimer() {
return bindTimer; return bindTimer;
} }

View File

@ -2,14 +2,15 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang.StringUtils; import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.client.impl.ProducerImpl;
import java.util.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -31,23 +32,41 @@ public class PulsarSpace {
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
protected final String spaceName; private final String spaceName;
protected final PulsarNBClientConf pulsarNBClientConf; private final PulsarNBClientConf pulsarNBClientConf;
protected final String pulsarSvcUrl; private final String pulsarSvcUrl;
private final String webSvcUrl;
private final PulsarAdmin pulsarAdmin;
protected PulsarClient pulsarClient = null; private final Set<String> pulsarClusterMetadata = new HashSet<>();
protected Schema<?> pulsarSchema = null;
public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf, String pulsarSvcUrl) { private PulsarClient pulsarClient = null;
private Schema<?> pulsarSchema = null;
public PulsarSpace(String name,
PulsarNBClientConf pulsarClientConf,
String pulsarSvcUrl,
String webSvcUrl,
PulsarAdmin pulsarAdmin) {
this.spaceName = name; this.spaceName = name;
this.pulsarNBClientConf = pulsarClientConf; this.pulsarNBClientConf = pulsarClientConf;
this.pulsarSvcUrl = pulsarSvcUrl; this.pulsarSvcUrl = pulsarSvcUrl;
this.webSvcUrl = webSvcUrl;
this.pulsarAdmin = pulsarAdmin;
createPulsarClientFromConf(); createPulsarClientFromConf();
createPulsarSchemaFromConf(); createPulsarSchemaFromConf();
try {
List<String> stringList = pulsarAdmin.clusters().getClusters();
CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator());
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get Pulsar cluster metadata!");
}
} }
protected void createPulsarClientFromConf() { private void createPulsarClientFromConf() {
ClientBuilder clientBuilder = PulsarClient.builder(); ClientBuilder clientBuilder = PulsarClient.builder();
try { try {
@ -65,7 +84,7 @@ public class PulsarSpace {
} }
} }
protected void createPulsarSchemaFromConf() { private void createPulsarSchemaFromConf() {
Object value = pulsarNBClientConf.getSchemaConfValue("schema.type"); Object value = pulsarNBClientConf.getSchemaConfValue("schema.type");
String schemaType = (value != null) ? value.toString() : ""; String schemaType = (value != null) ? value.toString() : "";
@ -81,9 +100,7 @@ public class PulsarSpace {
} }
} }
public PulsarClient getPulsarClient() { public PulsarClient getPulsarClient() { return pulsarClient; }
return pulsarClient;
}
public PulsarNBClientConf getPulsarClientConf() { public PulsarNBClientConf getPulsarClientConf() {
return pulsarNBClientConf; return pulsarNBClientConf;
@ -93,6 +110,15 @@ public class PulsarSpace {
return pulsarSchema; return pulsarSchema;
} }
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public String getPulsarSvcUrl() {
return pulsarSvcUrl;
}
public String getWebSvcUrl() { return webSvcUrl; }
public Set<String> getPulsarClusterMetadata() { return pulsarClusterMetadata; }
////////////////////////////////////// //////////////////////////////////////
// Producer Processing --> start // Producer Processing --> start
@ -256,7 +282,8 @@ public class PulsarSpace {
if (!StringUtils.isBlank(effectiveSubscriptionStr)) { if (!StringUtils.isBlank(effectiveSubscriptionStr)) {
if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr)) { if (!PulsarActivityUtil.isValidSubscriptionType(effectiveSubscriptionStr)) {
throw new RuntimeException("Consumer::Invalid subscription type: " + effectiveSubscriptionStr); throw new RuntimeException("Consumer::Invalid subscription type (\"" +
effectiveSubscriptionStr + "\"). \nValid subscription types: " + PulsarActivityUtil.getValidSubscriptionTypeList());
} else { } else {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr); subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
} }

View File

@ -1,8 +1,5 @@
package io.nosqlbench.driver.pulsar; package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -25,7 +22,13 @@ public class PulsarSpaceCache {
public PulsarSpace getPulsarSpace(String name) { public PulsarSpace getPulsarSpace(String name) {
return clientScopes.computeIfAbsent(name, spaceName -> return clientScopes.computeIfAbsent(name, spaceName ->
new PulsarSpace(spaceName, activity.getPulsarConf(), activity.getPulsarServiceUrl())); new PulsarSpace(
spaceName,
activity.getPulsarConf(),
activity.getPulsarSvcUrl(),
activity.getWebSvcUrl(),
activity.getPulsarAdmin()
));
} }
public PulsarActivity getActivity() { public PulsarActivity getActivity() {

View File

@ -0,0 +1,53 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.Set;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminMapper extends PulsarOpMapper {
private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc;
private final LongFunction<String> tenantFunc;
private final LongFunction<String> namespaceFunc;
public PulsarAdminMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
super(cmdTpl, clientSpace);
this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc;
this.tenantFunc = tenantFunc;
this.namespaceFunc = namespaceFunc;
}
@Override
public PulsarOp apply(long value) {
Set<String> adminRoleSet = adminRolesFunc.apply(value);
Set<String> allowedClusterSet = allowedClustersFunc.apply(value);
String tenant = tenantFunc.apply(value);
String namespace = namespaceFunc.apply(value);
return new PulsarAdminOp(
clientSpace,
adminRoleSet,
allowedClusterSet,
tenant,
namespace);
}
}

View File

@ -0,0 +1,86 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);
private final PulsarSpace clientSpace;
private final Set<String> adminRoleSet;
private final Set<String> allowedClusterSet;
private final String tenant;
private final String namespace;
public PulsarAdminOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
this.clientSpace = clientSpace;
this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet;
this.tenant = tenant;
this.namespace = namespace;
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
if (StringUtils.isBlank(tenant) && !StringUtils.isBlank(namespace)) {
throw new RuntimeException("Can't create a namespace without a tenant!");
}
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
if (!StringUtils.isBlank(tenant)) {
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
}
else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
try {
pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant);
}
}
if (!StringUtils.isBlank(namespace)) {
try {
pulsarAdmin.namespaces().createNamespace(tenant + "/" + namespace);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar namespace: " + tenant + "/" + namespace);
}
}
}
}

View File

@ -11,6 +11,9 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Reader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongFunction; import java.util.function.LongFunction;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -63,12 +66,21 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private LongFunction<PulsarOp> resolve() { private LongFunction<PulsarOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
if (cmdTpl.containsKey("topic_url")) { if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
} }
// Global parameter: topic_uri // Global parameter: topic_uri (applies only to non-Admin API)
LongFunction<String> topicUriFunc; LongFunction<String> topicUriFunc = (l) -> null;
// Global parameter: async_api (applies only to non-Admin API)
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) {
if (cmdTpl.containsKey("topic_uri")) { if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) { if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'."); throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
@ -95,12 +107,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
+ "/" + cmdTpl.getOr("namespace", l, "default") + "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, ""); + "/" + cmdTpl.getOr("topic", l, "");
} }
} else {
topicUriFunc = (l) -> null;
} }
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc;
if (cmdTpl.containsKey("async_api")) { if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api")) if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
@ -109,18 +117,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
} else { } else {
asyncApiFunc = (l) -> false; asyncApiFunc = (l) -> false;
} }
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must have a valid value!");
} }
String stmtOpType = cmdTpl.getStatic("optype");
// TODO: Complete implementation for websocket-producer and managed-ledger // TODO: Complete implementation for websocket-producer and managed-ledger
if /*( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_TENANT.label) ) { if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) {
return resolveCreateTenant(clientSpace); return resolveAdminRequest(clientSpace);
} else if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.CREATE_NAMESPACE.label) ) { } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveCreateNameSpace(clientSpace);
} else if*/ (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc); return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc);
@ -137,6 +139,63 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
} }
} }
private LongFunction<PulsarOp> resolveAdminRequest(PulsarSpace clientSpace) {
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
}
LongFunction<Set<String>> adminRolesFunc;
Set<String> roleSet = new HashSet<>();
if (cmdTpl.isStatic("admin_roles")) {
// "admin_roles" includes comma-separated admin roles:
// e.g. role1, role2
String adminRolesStr = cmdTpl.getStatic("admin_roles");
String[] roleArr = adminRolesStr.split(",");
Set<String> stringSet = new HashSet<>(Arrays.asList(roleArr));
roleSet.addAll(stringSet);
}
adminRolesFunc = (l) -> roleSet;
LongFunction<Set<String>> allowedClustersFunc;
Set<String> clusterSet = new HashSet<>();
if (cmdTpl.isStatic("allowed_clusters")) {
// "allowed_cluster" includes comma-separated cluster names:
// e.g. cluster1, cluster2
String allowedClustersStr = cmdTpl.getStatic("allowed_clusters");
String[] clusterArr = allowedClustersStr.split(",");
Set<String> stringSet = new HashSet<>(Arrays.asList(clusterArr));
clusterSet.addAll(stringSet);
}
allowedClustersFunc = (l) -> clusterSet;
LongFunction<String> tenantFunc;
if (cmdTpl.isStatic("tenant")) {
tenantFunc = (l) -> cmdTpl.getStatic("tenant");
} else if (cmdTpl.isDynamic("tenant")) {
tenantFunc = (l) -> cmdTpl.getDynamic("tenant", l);
} else {
tenantFunc = (l) -> null;
}
LongFunction<String> namespaceFunc;
if (cmdTpl.isStatic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getStatic("namespace");
} else if (cmdTpl.isDynamic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getDynamic("namespace", l);
} else {
namespaceFunc = (l) -> null;
}
return new PulsarAdminMapper(
cmdTpl,
clientSpace,
adminRolesFunc,
allowedClustersFunc,
tenantFunc,
namespaceFunc);
}
private LongFunction<PulsarOp> resolveMsgSend( private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace, PulsarSpace clientSpace,
LongFunction<String> topic_uri_func, LongFunction<String> topic_uri_func,
@ -194,47 +253,47 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
) { ) {
// Topic list (multi-topic) // Topic list (multi-topic)
LongFunction<String> topic_names_func; LongFunction<String> topic_names_func;
if (cmdTpl.isStatic("topic-names")) { if (cmdTpl.isStatic("topic_names")) {
topic_names_func = (l) -> cmdTpl.getStatic("topic-names"); topic_names_func = (l) -> cmdTpl.getStatic("topic_names");
} else if (cmdTpl.isDynamic("topic-names")) { } else if (cmdTpl.isDynamic("topic_names")) {
topic_names_func = (l) -> cmdTpl.getDynamic("topic-names", l); topic_names_func = (l) -> cmdTpl.getDynamic("topic_names", l);
} else { } else {
topic_names_func = (l) -> null; topic_names_func = (l) -> null;
} }
// Topic pattern (multi-topic) // Topic pattern (multi-topic)
LongFunction<String> topics_pattern_func; LongFunction<String> topics_pattern_func;
if (cmdTpl.isStatic("topics-pattern")) { if (cmdTpl.isStatic("topics_pattern")) {
topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern"); topics_pattern_func = (l) -> cmdTpl.getStatic("topics_pattern");
} else if (cmdTpl.isDynamic("topics-pattern")) { } else if (cmdTpl.isDynamic("topics_pattern")) {
topics_pattern_func = (l) -> cmdTpl.getDynamic("topics-pattern", l); topics_pattern_func = (l) -> cmdTpl.getDynamic("topics_pattern", l);
} else { } else {
topics_pattern_func = (l) -> null; topics_pattern_func = (l) -> null;
} }
LongFunction<String> subscription_name_func; LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription-name")) { if (cmdTpl.isStatic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription-name"); subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name");
} else if (cmdTpl.isDynamic("subscription-name")) { } else if (cmdTpl.isDynamic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription-name", l); subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l);
} else { } else {
subscription_name_func = (l) -> null; subscription_name_func = (l) -> null;
} }
LongFunction<String> subscription_type_func; LongFunction<String> subscription_type_func;
if (cmdTpl.isStatic("subscription-type")) { if (cmdTpl.isStatic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription-type"); subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type");
} else if (cmdTpl.isDynamic("subscription-type")) { } else if (cmdTpl.isDynamic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription-type", l); subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l);
} else { } else {
subscription_type_func = (l) -> null; subscription_type_func = (l) -> null;
} }
LongFunction<String> consumer_name_func; LongFunction<String> consumer_name_func;
if (cmdTpl.isStatic("consumer-name")) { if (cmdTpl.isStatic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer-name"); consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name");
} else if (cmdTpl.isDynamic("consumer-name")) { } else if (cmdTpl.isDynamic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer-name", l); consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l);
} else { } else {
consumer_name_func = (l) -> null; consumer_name_func = (l) -> null;
} }
@ -259,19 +318,19 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> async_api_func LongFunction<Boolean> async_api_func
) { ) {
LongFunction<String> reader_name_func; LongFunction<String> reader_name_func;
if (cmdTpl.isStatic("reader-name")) { if (cmdTpl.isStatic("reader_name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader-name"); reader_name_func = (l) -> cmdTpl.getStatic("reader_name");
} else if (cmdTpl.isDynamic("reader-name")) { } else if (cmdTpl.isDynamic("reader_name")) {
reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l); reader_name_func = (l) -> cmdTpl.getDynamic("reader_name", l);
} else { } else {
reader_name_func = (l) -> null; reader_name_func = (l) -> null;
} }
LongFunction<String> start_msg_pos_str_func; LongFunction<String> start_msg_pos_str_func;
if (cmdTpl.isStatic("start-msg-position")) { if (cmdTpl.isStatic("start_msg_position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position"); start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start_msg_position");
} else if (cmdTpl.isDynamic("start-msg-position")) { } else if (cmdTpl.isDynamic("start_msg_position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l); start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start_msg_position", l);
} else { } else {
start_msg_pos_str_func = (l) -> null; start_msg_pos_str_func = (l) -> null;
} }

View File

@ -17,6 +17,7 @@ import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64; import java.util.Base64;
import java.util.HashMap; import java.util.HashMap;
import java.util.stream.Collectors;
public class PulsarActivityUtil { public class PulsarActivityUtil {
@ -25,8 +26,7 @@ public class PulsarActivityUtil {
// Supported message operation types // Supported message operation types
// TODO: websocket-producer and managed-ledger // TODO: websocket-producer and managed-ledger
public enum OP_TYPES { public enum OP_TYPES {
CREATE_TENANT("create-tenant"), ADMIN("admin"),
CREATE_NAMESPACE("create-namespace"),
BATCH_MSG_SEND_START("batch-msg-send-start"), BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"), BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"), BATCH_MSG_SEND_END("batch-msg-send-end"),
@ -163,10 +163,10 @@ public class PulsarActivityUtil {
} }
public enum SUBSCRIPTION_TYPE { public enum SUBSCRIPTION_TYPE {
exclusive("exclusive"), Exclusive("Exclusive"),
failover("failover"), Failover("Failover"),
shared("shared"), Shared("Shared"),
key_shared("key_shared"); Key_Shared("Key_Shared");
public final String label; public final String label;
@ -176,7 +176,10 @@ public class PulsarActivityUtil {
} }
public static boolean isValidSubscriptionType(String item) { public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item));
}
public static String getValidSubscriptionTypeList() {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(Object::toString).collect(Collectors.joining(", "));
} }
/////// ///////

View File

@ -46,8 +46,7 @@ public class PulsarNBClientConf {
FileBasedConfigurationBuilder<FileBasedConfiguration> builder = FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class) new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(params.properties() .configure(params.properties()
.setFileName(fileName) .setFileName(fileName));
.setListDelimiterHandler(new DefaultListDelimiterHandler(',')));
Configuration config = builder.getConfiguration(); Configuration config = builder.getConfiguration();

View File

@ -10,14 +10,20 @@
# 2) Avro for messages with schema # 2) Avro for messages with schema
schema.type=avro schema.type=avro
schema.definition=file://<file/path/to/iot-example.avsc> schema.definition=file://<file/path/to/iot-example.avsc>
### Pulsar client related configurations - client.xxx ### Pulsar client related configurations - client.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#client # http://pulsar.apache.org/docs/en/client-libraries-java/#client
client.connectionTimeoutMs=5000 client.connectionTimeoutMs=5000
### Producer related configurations (global) - producer.xxx ### Producer related configurations (global) - producer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
producer.producerName= producer.producerName=
producer.topicName= producer.topicName=
producer.sendTimeoutMs= producer.sendTimeoutMs=
### Consumer related configurations (global) - consumer.xxx ### Consumer related configurations (global) - consumer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
consumer.topicNames= consumer.topicNames=
@ -26,6 +32,8 @@ consumer.subscriptionName=
consumer.subscriptionType= consumer.subscriptionType=
consumer.consumerName= consumer.consumerName=
consumer.receiverQueueSize= consumer.receiverQueueSize=
### Reader related configurations (global) - reader.xxx ### Reader related configurations (global) - reader.xxx
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader # https://pulsar.apache.org/docs/en/client-libraries-java/#reader
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file> # - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>

View File

@ -10,7 +10,7 @@ bindings:
# sensor_type: # sensor_type:
reading_time: ToDateTime(); reading_time: ToDateTime();
reading_value: ToFloat(100); reading_value: ToFloat(100);
topic: Template("topic-{}",Mod(TEMPLATE(tenants,10)L)); # tenant:
# document level parameters that apply to all Pulsar client types: # document level parameters that apply to all Pulsar client types:
params: params:
@ -21,14 +21,14 @@ params:
blocks: blocks:
- name: admin-block - name: admin-block
tags: tags:
phase: create-tenant-namespace phase: admin-api
statements: statements:
- name: s1 - name: s1
optype: create-tenant optype: admin
admin_roles:
allowed_clusters:
tenant: "{tenant}" tenant: "{tenant}"
- name: s2 namespace: "default"
optype: create-namespace
namespace: "{namespace}"
- name: batch-producer-block - name: batch-producer-block
tags: tags:
@ -76,9 +76,9 @@ blocks:
statements: statements:
- name: s1 - name: s1
optype: msg-consume optype: msg-consume
topic_names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" topic_names:
topics_pattern: "public/default/.*" topics_pattern:
subscription_name: subscription_name: "mynbsub_test"
subscription_type: subscription_type:
consumer_name: consumer_name:

View File

@ -287,7 +287,7 @@ as below:
- name: s1 - name: s1
optype: batch-msg-send-start optype: batch-msg-send-start
# For batch producer, "producer_name" should be associated with batch start # For batch producer, "producer_name" should be associated with batch start
batch_producer_name: {batch_producer_name} # batch_producer_name: {batch_producer_name}
ratio: 1 ratio: 1
- name: s2 - name: s2
optype: batch-msg-send optype: batch-msg-send
@ -321,9 +321,9 @@ ratios: 1, <batch_num>, 1.
and payload to be put in the batch. and payload to be put in the batch.
* (Mandatory) **optype (batch-msg-send)** is the statement identifier * (Mandatory) **optype (batch-msg-send)** is the statement identifier
for this statement for this statement
* (Optional) **msg-key**, when provided, specifies the key of the * (Optional) **msg_key**, when provided, specifies the key of the
generated message generated message
* (Mandatory) **msg-payload** specifies the payload of the generated * (Mandatory) **msg_payload** specifies the payload of the generated
message message
* (Optional) **ratio**, when provided, specifies the batch size (how * (Optional) **ratio**, when provided, specifies the batch size (how
many messages to be put in one batch). If not provided, it is many messages to be put in one batch). If not provided, it is
@ -365,9 +365,9 @@ This command block only has 1 statements (s1):
this statement this statement
* (Optional) **producer_name**, when provided, specifies the Pulsar * (Optional) **producer_name**, when provided, specifies the Pulsar
producer name that is associated with the message production. producer name that is associated with the message production.
* (Optional) **msg-key**, when provided, specifies the key of the * (Optional) **msg_key**, when provided, specifies the key of the
generated message generated message
* (Mandatory) **msg-payload** specifies the payload of the generated * (Mandatory) **msg_payload** specifies the payload of the generated
message message
### 1.4.4. Consumer Command Block ### 1.4.4. Consumer Command Block
@ -397,8 +397,7 @@ This command block only has 1 statements (s1):
* (Mandatory) **optype (msg-consume)** is the statement identifier for * (Mandatory) **optype (msg-consume)** is the statement identifier for
this statement this statement
* (Optional) **topic_names**, when provided, specifies multiple topic * (Optional) **topic_names**, when provided, specifies multiple topic
names from which to consume messages. Default to document level names from which to consume messages for multi-topic message consumption.
parameter **topic_uri**.
* (Optional) **topics_pattern**, when provided, specifies pulsar * (Optional) **topics_pattern**, when provided, specifies pulsar
topic regex pattern for multi-topic message consumption topic regex pattern for multi-topic message consumption
* (Mandatory) **subscription_name** specifies subscription name. * (Mandatory) **subscription_name** specifies subscription name.
@ -407,6 +406,10 @@ This command block only has 1 statements (s1):
* (Optional) **consumer_name**, when provided, specifies the * (Optional) **consumer_name**, when provided, specifies the
associated consumer name. associated consumer name.
**NOTE 1**: when both **topic_names** and **topics_pattern** are provided, **topic_names** takes precedence over **topics_pattern**.
**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**.
### 1.4.5. Reader Command Block ### 1.4.5. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar This is the regular Pulsar reader command block that reads one Pulsar
@ -455,12 +458,12 @@ Pulsar driver provides 2 schema support modes, via the global level schema
related settings as below: related settings as below:
* Avro schema: * Avro schema:
```properties ```properties
shcema.type= avro schema.type= avro
schema.definition= file:///<file/path/to/the/definition/file> schema.definition= file:///<file/path/to/the/definition/file>
``` ```
* Default byte[] schema: * Default byte[] schema:
```properties ```properties
shcema.type= schema.type=
schema.definition= schema.definition=
``` ```