diff --git a/.run/NBCLI web foreground dryrun.run.xml b/.run/NBCLI web foreground dryrun.run.xml
deleted file mode 100644
index 4180a9a89..000000000
--- a/.run/NBCLI web foreground dryrun.run.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/driver-jms/pom.xml b/adapter-pulsar/pom.xml
similarity index 58%
rename from driver-jms/pom.xml
rename to adapter-pulsar/pom.xml
index c3cbcc803..2428c54fe 100644
--- a/driver-jms/pom.xml
+++ b/adapter-pulsar/pom.xml
@@ -17,61 +17,49 @@
4.0.0
+ adapter-pulsar
+ jar
+
mvn-defaultsio.nosqlbench
- 4.17.22-SNAPSHOT
+ 4.17.31-SNAPSHOT../mvn-defaults
- driver-jms
- jar${project.artifactId}
-
- A JMS driver for nosqlbench. This provides the ability to inject synthetic data
- into a pulsar system via JMS 2.0 compatibile APIs.
-
- NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster
- as the potential JMS Destination
+ A Pulsar driver for nosqlbench. This provides the ability to inject synthetic data
+ into a pulsar system.
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ 2.10.1
+
-
io.nosqlbenchengine-api
- 4.17.22-SNAPSHOT
+ 4.17.31-SNAPSHOT
-
- org.apache.commons
- commons-lang3
- 3.12.0
+ io.nosqlbench
+ adapters-api
+ 4.17.31-SNAPSHOT
-
- org.projectlombok
- lombok
- 1.18.24
- provided
+ org.apache.pulsar
+ pulsar-client
+ ${pulsar.version}
+
+
+
+ org.apache.pulsar
+ pulsar-client-admin
+ ${pulsar.version}
@@ -88,13 +76,19 @@
2.8.0
-
+
- com.datastax.oss
- pulsar-jms
- 2.4.11
+ org.apache.avro
+ avro
+ 1.11.1
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarDriverAdapter.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarDriverAdapter.java
new file mode 100644
index 000000000..b121ca5e3
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarDriverAdapter.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar;
+
+import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
+import io.nosqlbench.engine.api.activityimpl.OpMapper;
+import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
+import io.nosqlbench.nb.annotations.Maturity;
+import io.nosqlbench.nb.annotations.Service;
+import io.nosqlbench.api.config.standard.NBConfigModel;
+import io.nosqlbench.api.config.standard.NBConfiguration;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.Function;
+
+@Service(value = DriverAdapter.class, selector = "pulsar-nb5", maturity = Maturity.Experimental)
+public class PulsarDriverAdapter extends BaseDriverAdapter {
+
+ private final static Logger logger = LogManager.getLogger(PulsarDriverAdapter.class);
+
+ @Override
+ public OpMapper getOpMapper() {
+ DriverSpaceCache extends PulsarSpace> spaceCache = getSpaceCache();
+ NBConfiguration adapterConfig = getConfiguration();
+ return new PulsarOpMapper(this, adapterConfig, spaceCache);
+ }
+
+ @Override
+ public Function getSpaceInitializer(NBConfiguration cfg) {
+ return (s) -> new PulsarSpace(s, cfg);
+ }
+
+ @Override
+ public NBConfigModel getConfigModel() {
+ return super.getConfigModel().add(PulsarSpace.getConfigModel());
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarOpMapper.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarOpMapper.java
new file mode 100644
index 000000000..babcb0349
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarOpMapper.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar;
+
+import io.nosqlbench.adapter.pulsar.dispensers.*;
+import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
+import io.nosqlbench.api.config.standard.NBConfiguration;
+import io.nosqlbench.engine.api.activityimpl.OpDispenser;
+import io.nosqlbench.engine.api.activityimpl.OpMapper;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import io.nosqlbench.engine.api.templating.TypeAndTarget;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class PulsarOpMapper implements OpMapper {
+
+ private final static Logger logger = LogManager.getLogger(PulsarOpMapper.class);
+
+ private final NBConfiguration cfg;
+ private final DriverSpaceCache extends PulsarSpace> cache;
+ private final DriverAdapter adapter;
+
+ public PulsarOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache extends PulsarSpace> cache) {
+ this.cfg = cfg;
+ this.cache = cache;
+ this.adapter = adapter;
+ }
+
+ @Override
+ public OpDispenser extends PulsarOp> apply(ParsedOp op) {
+ String space = op.getStaticConfigOr("space", "default");
+
+ PulsarClient pulsarClient = cache.get(space).getPulsarClient();
+ PulsarAdmin pulsarAdmin = cache.get(space).getPulsarAdmin();
+ Schema> pulsarSchema = cache.get(space).getPulsarSchema();
+
+
+
+ /*
+ * If the user provides a body element, then they want to provide the JSON or
+ * a data structure that can be converted into JSON, bypassing any further
+ * specialized type-checking or op-type specific features
+ */
+ if (op.isDefined("body")) {
+ throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
+ }
+ else {
+ TypeAndTarget opType = op.getTypeAndTarget(PulsarOpType.class, String.class);
+
+ return switch (opType.enumId) {
+ case AdminTenant ->
+ new AdminTenantOpDispenser(adapter, op, opType.targetFunction, pulsarAdmin);
+ case AdminNamespace ->
+ new AdminNamespaceOpDispenser(adapter, op, opType.targetFunction, pulsarAdmin);
+ case AdminTopic ->
+ new AdminTopicOpDispenser(adapter, op, opType.targetFunction, pulsarAdmin);
+ case MessageProduce ->
+ new MessageProducerOpDispenser(adapter, op, opType.targetFunction, pulsarClient, pulsarSchema);
+ case MessageConsume ->
+ new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, pulsarClient, pulsarSchema);
+ case MessageRead ->
+ new MessageReaderOpDispenser(adapter, op, opType.targetFunction, pulsarClient, pulsarSchema);
+ };
+ }
+ }
+
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarOpType.java
similarity index 62%
rename from driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java
rename to adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarOpType.java
index 8d588faff..c4bf6c3ee 100644
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarOpType.java
@@ -14,20 +14,13 @@
* limitations under the License.
*/
-package io.nosqlbench.driver.jms.ops;
+package io.nosqlbench.adapter.pulsar;
-/**
- * Base type of all Sync Pulsar Operations including Producers and Consumers.
- */
-public abstract class JmsTimeTrackOp implements JmsOp {
-
- public void run(Runnable timeTracker) {
- try {
- this.run();
- } finally {
- timeTracker.run();
- }
- }
-
- public abstract void run();
+public enum PulsarOpType {
+ AdminTenant,
+ AdminNamespace,
+ AdminTopic,
+ MessageProduce,
+ MessageConsume,
+ MessageRead
}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java
new file mode 100644
index 000000000..708007527
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar;
+
+import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
+import io.nosqlbench.adapter.pulsar.util.PulsarNBClientConf;
+import io.nosqlbench.api.config.standard.ConfigModel;
+import io.nosqlbench.api.config.standard.NBConfigModel;
+import io.nosqlbench.api.config.standard.NBConfiguration;
+import io.nosqlbench.api.config.standard.Param;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+
+import java.util.Map;
+
+public class PulsarSpace {
+
+ private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
+
+ private final String name;
+ private final NBConfiguration cfg;
+
+ private final String pulsarSvcUrl;
+ private final String webSvcUrl;
+
+ private PulsarNBClientConf pulsarNBClientConf;
+ private PulsarClient pulsarClient;
+ private PulsarAdmin pulsarAdmin;
+ private Schema> pulsarSchema;
+
+ public PulsarSpace(String name, NBConfiguration cfg) {
+ this.name = name;
+ this.cfg = cfg;
+
+ this.pulsarSvcUrl = cfg.get("service_url");
+ this.webSvcUrl = cfg.get("web_url");
+
+ this.pulsarNBClientConf = new PulsarNBClientConf(cfg.get("config"));
+
+ initPulsarAdminAndClientObj();
+ createPulsarSchemaFromConf();
+ }
+
+ public static NBConfigModel getConfigModel() {
+ return ConfigModel.of(PulsarSpace.class)
+ .add(Param.defaultTo("service_url", "pulsar://localhost:6650")
+ .setDescription("Pulsar broker service URL."))
+ .add(Param.defaultTo("web_url", "http://localhost:8080")
+ .setDescription("Pulsar web service URL."))
+ .add(Param.defaultTo("config", "config.properties")
+ .setDescription("Pulsar client connection configuration property file."))
+ .add(Param.defaultTo("cyclerate_per_thread", false)
+ .setDescription("Apply cycle rate per NB thread"))
+ .asReadOnly();
+ }
+
+ public String getPulsarSvcUrl() { return pulsarSvcUrl; }
+ public String getWebSvcUrl() { return webSvcUrl; }
+ public PulsarNBClientConf getPulsarNBClientConf() { return pulsarNBClientConf; }
+ public PulsarClient getPulsarClient() { return pulsarClient; }
+ public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
+ public Schema> getPulsarSchema() { return pulsarSchema; }
+
+ /**
+ * Initialize
+ * - PulsarAdmin object for adding/deleting tenant, namespace, and topic
+ * - PulsarClient object for message publishing and consuming
+ */
+ private void initPulsarAdminAndClientObj() {
+ PulsarAdminBuilder adminBuilder =
+ PulsarAdmin.builder()
+ .serviceHttpUrl(webSvcUrl);
+
+ ClientBuilder clientBuilder = PulsarClient.builder();
+
+ try {
+ Map clientConfMap = pulsarNBClientConf.getClientConfMap();
+
+ // Override "client.serviceUrl" setting in config.properties
+ clientConfMap.remove("serviceUrl");
+ clientBuilder.loadConf(clientConfMap).serviceUrl(pulsarSvcUrl);
+
+ // Pulsar Authentication
+ String authPluginClassName =
+ (String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authPulginClassName.label);
+ String authParams =
+ (String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authParams.label);
+
+ if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
+ adminBuilder.authentication(authPluginClassName, authParams);
+ clientBuilder.authentication(authPluginClassName, authParams);
+ }
+
+ boolean useTls = StringUtils.contains(pulsarSvcUrl, "pulsar+ssl");
+ if ( useTls ) {
+ String tlsHostnameVerificationEnableStr =
+ (String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
+ boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
+
+ adminBuilder
+ .enableTlsHostnameVerification(tlsHostnameVerificationEnable);
+ clientBuilder
+ .enableTlsHostnameVerification(tlsHostnameVerificationEnable);
+
+ String tlsTrustCertsFilePath =
+ (String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
+ if (!StringUtils.isBlank(tlsTrustCertsFilePath)) {
+ adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+ clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
+ }
+
+ String tlsAllowInsecureConnectionStr =
+ (String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
+ boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
+ adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
+ clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
+ }
+
+ pulsarAdmin = adminBuilder.build();
+ pulsarClient = clientBuilder.build();
+
+ } catch (PulsarClientException e) {
+ logger.error("Fail to create PulsarAdmin and/or PulsarClient object from the global configuration!");
+ throw new RuntimeException("Fail to create PulsarAdmin and/or PulsarClient object from global configuration!");
+ }
+ }
+
+ /**
+ * Get Pulsar schema from the definition string
+ */
+
+ private Schema> buildSchemaFromDefinition(String schemaTypeConfEntry,
+ String schemaDefinitionConfEntry) {
+ Object value = pulsarNBClientConf.getSchemaConfValue(schemaTypeConfEntry);
+ Object schemaDefinition = pulsarNBClientConf.getSchemaConfValue(schemaDefinitionConfEntry);
+ String schemaType = (value != null) ? value.toString() : "";
+
+ Schema> result;
+ if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType)) {
+ String schemaDefStr = (schemaDefinition != null) ? schemaDefinition.toString() : "";
+ result = PulsarAdapterUtil.getAvroSchema(schemaType, schemaDefStr);
+ } else if (PulsarAdapterUtil.isPrimitiveSchemaTypeStr(schemaType)) {
+ result = PulsarAdapterUtil.getPrimitiveTypeSchema(schemaType);
+ } else if (PulsarAdapterUtil.isAutoConsumeSchemaTypeStr(schemaType)) {
+ result = Schema.AUTO_CONSUME();
+ } else {
+ throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " +
+ "Only primitive type, Avro type and AUTO_CONSUME are supported at the moment!");
+ }
+ return result;
+ }
+ private void createPulsarSchemaFromConf() {
+ pulsarSchema = buildSchemaFromDefinition("schema.type", "schema.definition");
+
+ // this is to allow KEY_VALUE schema
+ if (pulsarNBClientConf.hasSchemaConfKey("schema.key.type")) {
+ Schema> pulsarKeySchema = buildSchemaFromDefinition("schema.key.type", "schema.key.definition");
+ Object encodingType = pulsarNBClientConf.getSchemaConfValue("schema.keyvalue.encodingtype");
+ KeyValueEncodingType keyValueEncodingType = KeyValueEncodingType.SEPARATED;
+ if (encodingType != null) {
+ keyValueEncodingType = KeyValueEncodingType.valueOf(encodingType.toString());
+ }
+ pulsarSchema = Schema.KeyValue(pulsarKeySchema, pulsarSchema, keyValueEncodingType);
+ }
+ }
+}
+
+
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminNamespaceOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminNamespaceOpDispenser.java
new file mode 100644
index 000000000..89514d4e6
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminNamespaceOpDispenser.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.adapter.pulsar.ops.AdminNamespaceOp;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+import java.util.function.LongFunction;
+
+public class AdminNamespaceOpDispenser extends PulsarAdminOpDispenser {
+
+ public AdminNamespaceOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarAdmin pulsarAdmin) {
+ super(adapter, op, tgtNameFunc, pulsarAdmin);
+ }
+
+ @Override
+ public AdminNamespaceOp apply(long cycle) {
+ return new AdminNamespaceOp(
+ pulsarAdmin,
+ asyncApiFunc.apply(cycle),
+ adminDelOpFunc.apply(cycle),
+ tgtNameFunc.apply(cycle));
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminTenantOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminTenantOpDispenser.java
new file mode 100644
index 000000000..5bca4ec78
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminTenantOpDispenser.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.adapter.pulsar.ops.AdminTenantOp;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+import java.util.*;
+import java.util.function.LongFunction;
+
+public class AdminTenantOpDispenser extends PulsarAdminOpDispenser {
+
+ private final LongFunction> adminRolesFunc;
+ private final LongFunction> allowedClustersFunc;
+ public AdminTenantOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarAdmin pulsarAdmin) {
+ super(adapter, op, tgtNameFunc, pulsarAdmin);
+
+ adminRolesFunc = lookupStaticStrSetOpValueFunc("admin_roles");
+ allowedClustersFunc = lookupStaticStrSetOpValueFunc("allowed_clusters");
+ }
+
+ @Override
+ public AdminTenantOp apply(long cycle) {
+ return new AdminTenantOp(
+ pulsarAdmin,
+ asyncApiFunc.apply(cycle),
+ adminDelOpFunc.apply(cycle),
+ tgtNameFunc.apply(cycle),
+ adminRolesFunc.apply(cycle),
+ allowedClustersFunc.apply(cycle));
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminTopicOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminTopicOpDispenser.java
new file mode 100644
index 000000000..70af09180
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/AdminTopicOpDispenser.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.adapter.pulsar.ops.AdminTopicOp;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+import java.util.function.LongFunction;
+
+public class AdminTopicOpDispenser extends PulsarAdminOpDispenser {
+
+ private final LongFunction enablePartFunc;
+ private final LongFunction partNumFunc;
+
+ public AdminTopicOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarAdmin pulsarAdmin) {
+ super(adapter, op, tgtNameFunc, pulsarAdmin);
+
+ // Non-partitioned topic is default
+ enablePartFunc = lookupStaticBoolConfigValueFunc("enable_partition", false);
+ partNumFunc = lookupStaticIntOpValueFunc("partition_num", 1);
+ }
+
+ @Override
+ public AdminTopicOp apply(long cycle) {
+
+ return new AdminTopicOp(
+ pulsarAdmin,
+ asyncApiFunc.apply(cycle),
+ adminDelOpFunc.apply(cycle),
+ tgtNameFunc.apply(cycle),
+ enablePartFunc.apply(cycle),
+ partNumFunc.apply(cycle)
+ );
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java
new file mode 100644
index 000000000..8a0cb6ba6
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.adapter.pulsar.ops.MessageConsumerOp;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.function.LongFunction;
+
+public class MessageConsumerOpDispenser extends PulsarClientOpDispenser {
+
+ public MessageConsumerOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarClient pulsarClient,
+ Schema> pulsarSchema) {
+ super(adapter, op, tgtNameFunc, pulsarClient, pulsarSchema);
+ }
+
+ @Override
+ public MessageConsumerOp apply(long cycle) {
+ return new MessageConsumerOp(pulsarClient, pulsarSchema);
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageProducerOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageProducerOpDispenser.java
new file mode 100644
index 000000000..ef9b2547d
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageProducerOpDispenser.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.adapter.pulsar.ops.MessageProducerOp;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.function.LongFunction;
+
+public class MessageProducerOpDispenser extends PulsarClientOpDispenser {
+
+ public MessageProducerOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarClient pulsarClient,
+ Schema> pulsarSchema) {
+ super(adapter, op, tgtNameFunc, pulsarClient, pulsarSchema);
+ }
+
+ @Override
+ public MessageProducerOp apply(long cycle) {
+ return new MessageProducerOp(pulsarClient, pulsarSchema);
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageReaderOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageReaderOpDispenser.java
new file mode 100644
index 000000000..3e9f819fd
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageReaderOpDispenser.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.adapter.pulsar.ops.MessageReaderOp;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.function.LongFunction;
+
+public class MessageReaderOpDispenser extends PulsarClientOpDispenser {
+
+ public MessageReaderOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarClient pulsarClient,
+ Schema> pulsarSchema) {
+ super(adapter, op, tgtNameFunc, pulsarClient, pulsarSchema);
+ }
+
+ @Override
+ public MessageReaderOp apply(long cycle) {
+ return new MessageReaderOp(pulsarClient, pulsarSchema);
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarAdminOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarAdminOpDispenser.java
new file mode 100644
index 000000000..d18eee305
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarAdminOpDispenser.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+import java.util.function.LongFunction;
+
+public abstract class PulsarAdminOpDispenser extends PulsarBaseOpDispenser {
+
+ protected final PulsarAdmin pulsarAdmin;
+ protected final LongFunction adminDelOpFunc;
+
+ public PulsarAdminOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarAdmin pulsarAdmin) {
+ super(adapter, op, tgtNameFunc);
+ this.pulsarAdmin = pulsarAdmin;
+
+ // Creating admin objects (tenant, namespace, topic) is the default
+ this.adminDelOpFunc = lookupStaticBoolConfigValueFunc("admin_delop", false);
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java
new file mode 100644
index 000000000..1a0350ac1
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java
@@ -0,0 +1,85 @@
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import io.nosqlbench.adapter.pulsar.PulsarSpace;
+import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
+import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.*;
+import java.util.function.LongFunction;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public abstract class PulsarBaseOpDispenser extends BaseOpDispenser {
+
+ private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
+ protected final ParsedOp parsedOp;
+ protected final LongFunction asyncApiFunc;
+ protected final LongFunction tgtNameFunc;
+
+ public PulsarBaseOpDispenser(DriverAdapter adapter, ParsedOp op, LongFunction tgtNameFunc) {
+
+ super(adapter, op);
+
+ this.parsedOp = op;
+ this.tgtNameFunc = tgtNameFunc;
+ // Async API is the default
+ this.asyncApiFunc = lookupStaticBoolConfigValueFunc("async_api", true);
+ }
+
+ protected LongFunction lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) {
+ return (l) -> parsedOp.getOptionalStaticConfig(paramName, String.class)
+ .filter(Predicate.not(String::isEmpty))
+ .map(value -> BooleanUtils.toBoolean(value))
+ .orElse(defaultValue);
+ }
+
+ protected LongFunction lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
+ return (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
+ .filter(Predicate.not(String::isEmpty))
+ .map(value -> NumberUtils.toInt(value))
+ .map(value -> {
+ if (value < 0) return 0;
+ else return value;
+ }).orElse(defaultValue);
+ }
+
+ protected LongFunction> lookupStaticStrSetOpValueFunc(String paramName) {
+ return (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
+ .filter(Predicate.not(String::isEmpty))
+ .map(value -> {
+ Set set = new HashSet<>();
+
+ if (StringUtils.contains(value,',')) {
+ set = Arrays.stream(value.split(","))
+ .map(String::trim)
+ .filter(Predicate.not(String::isEmpty))
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ return set;
+ }).orElse(Collections.emptySet());
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java
new file mode 100644
index 000000000..43acd0bb8
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.dispensers;
+
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.function.LongFunction;
+
+public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
+
+ protected final PulsarClient pulsarClient;
+ protected final Schema> pulsarSchema;
+
+ public PulsarClientOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction tgtNameFunc,
+ PulsarClient pulsarClient,
+ Schema> pulsarSchema) {
+ super(adapter, op, tgtNameFunc);
+ this.pulsarClient = pulsarClient;
+ this.pulsarSchema = pulsarSchema;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterInvalidParamException.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterInvalidParamException.java
new file mode 100644
index 000000000..1004a7a72
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterInvalidParamException.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.exception;
+
+public class PulsarAdapterInvalidParamException extends RuntimeException {
+
+ public PulsarAdapterInvalidParamException(String paramName, String errDesc) {
+ super("Invalid setting for parameter (" + paramName + "): " + errDesc);
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterUnexpectedException.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterUnexpectedException.java
new file mode 100644
index 000000000..4f0031fce
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterUnexpectedException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.exception;
+
+public class PulsarAdapterUnexpectedException extends RuntimeException {
+
+ public PulsarAdapterUnexpectedException(String message) {
+ super(message);
+ printStackTrace();
+ }
+ public PulsarAdapterUnexpectedException(Exception e) {
+ super(e);
+ printStackTrace();
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterUnsupportedOpException.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterUnsupportedOpException.java
new file mode 100644
index 000000000..475d358ea
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/exception/PulsarAdapterUnsupportedOpException.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.exception;
+
+public class PulsarAdapterUnsupportedOpException extends RuntimeException {
+
+ public PulsarAdapterUnsupportedOpException(String pulsarOpType) {
+ super("Unsupported Pulsar adapter operation type: \"" + pulsarOpType + "\"");
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminNamespaceOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminNamespaceOp.java
new file mode 100644
index 000000000..71e6f79ee
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminNamespaceOp.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+
+import java.util.concurrent.CompletableFuture;
+
+public class AdminNamespaceOp extends PulsarAdminOp {
+
+ private final static Logger logger = LogManager.getLogger(AdminNamespaceOp.class);
+
+ // in format: /
+ private final String nsName;
+
+ public AdminNamespaceOp(PulsarAdmin pulsarAdmin,
+ boolean asyncApi,
+ boolean adminDelOp,
+ String nsName) {
+ super(pulsarAdmin, asyncApi, adminDelOp);
+ this.nsName = nsName;
+ }
+
+ @Override
+ public Void apply(long value) {
+
+ // Do nothing if the namespace name is empty
+ if ( !StringUtils.isBlank(nsName) ) {
+
+ Namespaces namespaces = pulsarAdmin.namespaces();
+
+ // Admin API - create tenants and namespaces
+ if (!adminDelOp) {
+ try {
+ if (!asyncApi) {
+ namespaces.createNamespace(nsName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful sync creation of namespace \"{}\"", nsName);
+ }
+ } else {
+ CompletableFuture future = namespaces.createNamespaceAsync(nsName);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.trace("Successful async creation of namespace \"{}\"", nsName);
+ }
+ }).exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error("Failed async creation of namespace \"{}\"", nsName);
+ }
+ return null;
+ });
+ }
+ }
+ catch (PulsarAdminException.ConflictException ce) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Namespace \"{}\" already exists - skip creation!", nsName);
+ }
+ }
+ catch (PulsarAdminException e) {
+ throw new PulsarAdapterUnexpectedException(
+ "Unexpected error when creating pulsar namespace \"" + nsName + "\"");
+ }
+ }
+ // Admin API - delete tenants and namespaces
+ else {
+ try {
+ if (!asyncApi) {
+ namespaces.deleteNamespace(nsName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful sync deletion of namespace \"{}\"", nsName);
+ }
+ } else {
+ CompletableFuture future = namespaces.deleteNamespaceAsync(nsName, true);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful sync deletion of namespace \"{}\"", nsName);
+ }
+ }).exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error("Failed async deletion of namespace \"{}\"", nsName);
+ }
+ return null;
+ });
+ }
+ }
+ catch (PulsarAdminException.NotFoundException nfe) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Namespace \"{}\" doesn't exists - skip deletion!", nsName);
+ }
+ }
+ catch (PulsarAdminException e) {
+ throw new PulsarAdapterUnexpectedException(
+ "Unexpected error when deleting pulsar namespace \"" + nsName + "\"");
+ }
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminTenantOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminTenantOp.java
new file mode 100644
index 000000000..b40013ff7
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminTenantOp.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import io.nosqlbench.adapter.pulsar.PulsarDriverAdapter;
+import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.pulsar.client.admin.*;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class AdminTenantOp extends PulsarAdminOp {
+
+ private final static Logger logger = LogManager.getLogger(AdminTenantOp.class);
+
+ private final Set adminRoles;
+ private final Set allowedClusters;
+ private final String tntName;
+
+ public AdminTenantOp(PulsarAdmin pulsarAdmin,
+ boolean asyncApi,
+ boolean adminDelOp,
+ String tntName,
+ Set adminRoles,
+ Set allowedClusters) {
+ super(pulsarAdmin, asyncApi, adminDelOp);
+ this.tntName = tntName;
+ this.adminRoles = adminRoles;
+ this.allowedClusters = allowedClusters;
+ }
+
+ @Override
+ public Void apply(long value) {
+
+ // Do nothing if the tenant name is empty
+ if ( !StringUtils.isBlank(tntName) ) {
+ Tenants tenants = pulsarAdmin.tenants();
+
+ // Admin API - create tenants and namespaces
+ if (!adminDelOp) {
+ try {
+ Set existingPulsarClusters = new HashSet<>();
+ Clusters clusters = pulsarAdmin.clusters();
+ CollectionUtils.addAll(existingPulsarClusters, clusters.getClusters().listIterator());
+
+ TenantInfo tenantInfo = TenantInfo.builder()
+ .adminRoles(adminRoles)
+ .allowedClusters(!allowedClusters.isEmpty() ? allowedClusters : existingPulsarClusters)
+ .build();
+
+ if (!asyncApi) {
+ tenants.createTenant(tntName, tenantInfo);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful sync creation of tenant \"{}\"", tntName);
+ }
+ }
+ else {
+ CompletableFuture future = tenants.createTenantAsync(tntName, tenantInfo);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful async creation of tenant \"{}\"", tntName);
+ }
+ }).exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error("Failed async creation of tenant \"{}\"", tntName);
+ }
+ return null;
+ });
+ }
+ }
+ catch (PulsarAdminException.ConflictException ce) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Tenant \"{}\" already exists - skip creation!", tntName);
+ }
+ }
+ catch (PulsarAdminException e) {
+ throw new PulsarAdapterUnexpectedException(
+ "Unexpected error when creating pulsar tenant \"" + tntName + "\"");
+ }
+ }
+ // Admin API - delete tenants and namespaces
+ else {
+ try {
+ Namespaces namespaces = pulsarAdmin.namespaces();
+ int nsNum = namespaces.getNamespaces(tntName).size();
+
+ // Only delete a tenant when there is no underlying namespaces
+ if ( nsNum == 0 ) {
+ if (!asyncApi) {
+ tenants.deleteTenant(tntName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful sync deletion of tenant \"{}\"", tntName);
+ }
+ }
+ else {
+ CompletableFuture future = tenants.deleteTenantAsync(tntName);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful async deletion of tenant \"{}\"", tntName);
+ }
+ }).exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error("Failed async deletion of tenant \"{}\"", tntName);
+ }
+ return null;
+ });
+ }
+ }
+ }
+ catch (PulsarAdminException.NotFoundException nfe) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Tenant \"{}\" doesn't exists - skip deletion!", tntName);
+ }
+ }
+ catch (PulsarAdminException e) {
+ throw new PulsarAdapterUnexpectedException(
+ "Unexpected error when deleting pulsar tenant \"" + tntName + "\"");
+ }
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminTopicOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminTopicOp.java
new file mode 100644
index 000000000..2c3735658
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/AdminTopicOp.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class AdminTopicOp extends PulsarAdminOp {
+
+ private final static Logger logger = LogManager.getLogger(AdminTopicOp.class);
+
+ private final String topicName;
+ private final boolean enablePart;
+ private final int partNum;
+
+ public AdminTopicOp(PulsarAdmin pulsarAdmin,
+ boolean asyncApi,
+ boolean adminDelOp,
+ String topicName,
+ boolean enablePart,
+ int partNum) {
+ super(pulsarAdmin, asyncApi, adminDelOp);
+ this.topicName = topicName;
+ this.enablePart = enablePart;
+ this.partNum = partNum;
+ }
+
+ @Override
+ public Void apply(long value) {
+
+ // Do nothing if the topic name is empty
+ if ( !StringUtils.isBlank(topicName) ) {
+ Topics topics = pulsarAdmin.topics();
+
+ try {
+ // Create the topic
+ if (!adminDelOp) {
+ if (!enablePart) {
+ if (!asyncApi) {
+ topics.createNonPartitionedTopic(topicName);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful sync creation of non-partitioned topic \"{}\"", topicName);
+ }
+ } else {
+ CompletableFuture future = topics.createNonPartitionedTopicAsync(topicName);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Successful async creation of non-partitioned topic \"{}\"", topicName);
+ }
+ }).exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error("Failed async creation non-partitioned topic \"{}\"", topicName);
+ return null;
+ }
+ return null;
+ });
+ }
+ } else {
+ if (!asyncApi) {
+ topics.createPartitionedTopic(topicName, partNum);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Successful sync creation of partitioned topic \"{} (partition_num: {}\")",
+ topicName, partNum);
+ }
+ } else {
+ CompletableFuture future = topics.createPartitionedTopicAsync(topicName, partNum);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Successful async creation of partitioned topic \"{} (partition_num: {}\")",
+ topicName, partNum);
+ }
+ })
+ .exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error(
+ "Successful async creation of partitioned topic \"{} (partition_num: {}\")",
+ topicName, partNum);
+ }
+ return null;
+ });
+ }
+ }
+ }
+ // Delete the topic
+ else {
+ if (!enablePart) {
+ if (!asyncApi) {
+ topics.delete(topicName);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Successful sync deletion of non-partitioned topic \"{}\"",
+ topicName);
+ }
+ } else {
+ CompletableFuture future = topics.deleteAsync(topicName);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Successful async deletion of non-partitioned topic \"{}\"",
+ topicName);
+ }
+ })
+ .exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error(
+ "Failed async deletion of non-partitioned topic \"{}\"",
+ topicName);
+ }
+ return null;
+ });
+ }
+ } else {
+ if (!asyncApi) {
+ topics.deletePartitionedTopic(topicName);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Successful sync deletion of partitioned topic \"{} (partition_num: {}\")",
+ topicName, partNum);
+ }
+ } else {
+ CompletableFuture future = topics.deletePartitionedTopicAsync(topicName);
+ future.whenComplete((unused, throwable) -> {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Successful async deletion of partitioned topic \"{} (partition_num: {}\")",
+ topicName, partNum);
+ }
+ }).exceptionally(ex -> {
+ if (logger.isDebugEnabled()) {
+ logger.error(
+ "Failed async deletion of partitioned topic \"{} (partition_num: {}\")",
+ topicName, partNum);
+ }
+ return null;
+ });
+ }
+ }
+ }
+ }
+ catch (PulsarAdminException.NotFoundException nfe) {
+ if (logger.isDebugEnabled()) {
+ logger.error("Topic \"{}\" doesn't exists - skip deletion!", topicName);
+ }
+ }
+ catch (PulsarAdminException e) {
+ String errMsg = String.format("Unexpected error when %s pulsar topic: %s (partition enabled: %b; partition number: %d)",
+ (!adminDelOp ? "creating" : "deleting"),
+ topicName,
+ enablePart,
+ partNum);
+ throw new PulsarAdapterUnexpectedException(errMsg);
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java
new file mode 100644
index 000000000..d9f28c7d3
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class MessageConsumerOp extends PulsarClientOp {
+ public MessageConsumerOp(PulsarClient pulsarClient, Schema> pulsarSchema) {
+ super(pulsarClient, pulsarSchema);
+ }
+
+ @Override
+ public Object apply(long value) {
+ return null;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java
new file mode 100644
index 000000000..664509ec8
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public class MessageProducerOp extends PulsarClientOp {
+
+ public MessageProducerOp(PulsarClient pulsarClient, Schema> pulsarSchema) {
+ super(pulsarClient, pulsarSchema);
+ }
+
+ @Override
+ public Object apply(long value) {
+ return null;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageReaderOp.java
similarity index 60%
rename from driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java
rename to adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageReaderOp.java
index b8f227ffd..c9d47ec30 100644
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageReaderOp.java
@@ -14,16 +14,19 @@
* limitations under the License.
*/
-package io.nosqlbench.driver.jms.ops;
+package io.nosqlbench.adapter.pulsar.ops;
-/**
- * Base type of all Pulsar Operations including Producers and Consumers.
- */
-public interface JmsOp {
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
- /**
- * Execute the operation, invoke the timeTracker when the operation ended.
- * The timeTracker can be invoked in a separate thread, it is only used for metrics.
- */
- void run(Runnable timeTracker);
+public class MessageReaderOp extends PulsarClientOp {
+
+ public MessageReaderOp(PulsarClient pulsarClient, Schema> pulsarSchema) {
+ super(pulsarClient, pulsarSchema);
+ }
+
+ @Override
+ public Object apply(long value) {
+ return null;
+ }
}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarAdminOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarAdminOp.java
new file mode 100644
index 000000000..4729a8cc5
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarAdminOp.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+
+public abstract class PulsarAdminOp extends PulsarOp {
+ protected PulsarAdmin pulsarAdmin;
+ protected boolean asyncApi;
+ protected boolean adminDelOp;
+
+ public PulsarAdminOp(PulsarAdmin pulsarAdmin, boolean asyncApi, boolean adminDelOp) {
+ this.pulsarAdmin = pulsarAdmin;
+ this.asyncApi = asyncApi;
+ this.adminDelOp = adminDelOp;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarClientOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarClientOp.java
new file mode 100644
index 000000000..957616681
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarClientOp.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+
+public abstract class PulsarClientOp extends PulsarOp {
+ protected PulsarClient pulsarClient;
+ protected Schema> pulsarScheam;
+
+ public PulsarClientOp(PulsarClient pulsarClient, Schema> pulsarScheam) {
+ this.pulsarClient = pulsarClient;
+ this.pulsarScheam = pulsarScheam;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarOp.java
new file mode 100644
index 000000000..0c68c52d7
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/PulsarOp.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.pulsar.ops;
+
+import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
+
+public abstract class PulsarOp implements CycleOp