diff --git a/driver-jms/pom.xml b/driver-jms/pom.xml
new file mode 100644
index 000000000..0b9e28275
--- /dev/null
+++ b/driver-jms/pom.xml
@@ -0,0 +1,91 @@
+
+ 4.0.0
+
+
+ mvn-defaults
+ io.nosqlbench
+ 4.15.45-SNAPSHOT
+ ../mvn-defaults
+
+
+ driver-jms
+ jar
+ ${project.artifactId}
+
+
+ A JMS driver for nosqlbench. This provides the ability to inject synthetic data
+ into a pulsar system via JMS 2.0 compatibile APIs.
+
+ NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster
+ as the potential JMS Destination
+
+
+
+
+
+ datastax-releases-local
+ DataStax Local Releases
+ https://repo.sjc.dsinternal.org/artifactory/datastax-snapshots-local/
+
+ false
+
+
+ true
+
+
+
+
+
+
+
+ io.nosqlbench
+ engine-api
+ 4.15.45-SNAPSHOT
+
+
+
+ io.nosqlbench
+ driver-stdout
+ 4.15.45-SNAPSHOT
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+ provided
+
+
+
+
+ commons-beanutils
+ commons-beanutils
+ 1.9.4
+
+
+
+
+ org.apache.commons
+ commons-configuration2
+ 2.7
+
+
+
+
+ com.datastax.oss
+ pulsar-jms
+ 1.0.0-ALPHA
+
+
+
+
+
+
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java
new file mode 100644
index 000000000..4c3273627
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java
@@ -0,0 +1,68 @@
+package io.nosqlbench.driver.jms;
+
+import com.codahale.metrics.Timer;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.engine.api.activityapi.core.SyncAction;
+import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.LongFunction;
+
+public class JmsAction implements SyncAction {
+
+ private final static Logger logger = LogManager.getLogger(JmsAction.class);
+
+ private final JmsActivity activity;
+ private final int slot;
+
+ int maxTries;
+
+ public JmsAction(JmsActivity activity, int slot) {
+ this.activity = activity;
+ this.slot = slot;
+ this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
+ }
+
+ @Override
+ public void init() { }
+
+ @Override
+ public int runCycle(long cycle) {
+ // let's fail the action if some async operation failed
+ activity.failOnAsyncOperationFailure();
+
+ long start = System.nanoTime();
+
+ JmsOp jmsOp;
+ try (Timer.Context ctx = activity.getBindTimer().time()) {
+ LongFunction readyJmsOp = activity.getSequencer().get(cycle);
+ jmsOp = readyJmsOp.apply(cycle);
+ } catch (Exception bindException) {
+ // if diagnostic mode ...
+ activity.getErrorhandler().handleError(bindException, cycle, 0);
+ throw new RuntimeException(
+ "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
+ );
+ }
+
+ for (int i = 0; i < maxTries; i++) {
+ Timer.Context ctx = activity.getExecuteTimer().time();
+ try {
+ // it is up to the jmsOp to call Context#close when the activity is executed
+ // this allows us to track time for async operations
+ jmsOp.run(ctx::close);
+ break;
+ } catch (RuntimeException err) {
+ ErrorDetail errorDetail = activity
+ .getErrorhandler()
+ .handleError(err, cycle, System.nanoTime() - start);
+ if (!errorDetail.isRetryable()) {
+ break;
+ }
+ }
+ }
+
+ return 0;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java
new file mode 100644
index 000000000..396982bbf
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java
@@ -0,0 +1,165 @@
+package io.nosqlbench.driver.jms;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
+import io.nosqlbench.driver.jms.conn.JmsConnInfo;
+import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.driver.jms.util.JmsUtil;
+import io.nosqlbench.driver.jms.util.PulsarConfig;
+import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
+import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.engine.api.activityimpl.OpDispenser;
+import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
+import io.nosqlbench.engine.api.metrics.ActivityMetrics;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.jms.Destination;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class JmsActivity extends SimpleActivity {
+
+ private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>();
+
+ private String jmsProviderType;
+ private JmsConnInfo jmsConnInfo;
+
+ private JMSContext jmsContext;
+
+ private OpSequence> sequence;
+ private volatile Throwable asyncOperationFailure;
+ private NBErrorHandler errorhandler;
+
+ private Timer bindTimer;
+ private Timer executeTimer;
+ private Counter bytesCounter;
+ private Histogram messagesizeHistogram;
+
+ public JmsActivity(ActivityDef activityDef) {
+ super(activityDef);
+ }
+
+ @Override
+ public void initActivity() {
+ super.initActivity();
+
+ // default JMS type: Pulsar
+ // - currently this is the only supported JMS provider
+ jmsProviderType =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR)
+ .orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label);
+
+ // "Pulsar" as the JMS provider
+ if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
+
+ String webSvcUrl =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR)
+ .orElse("http://localhost:8080");
+ String pulsarSvcUrl =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR)
+ .orElse("pulsar://localhost:6650");
+
+ if (StringUtils.isAnyBlank(webSvcUrl, pulsarSvcUrl)) {
+ throw new RuntimeException("For \"" + JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label + "\" type, " +
+ "\"" + JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR + "\" and " +
+ "\"" + JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR + "\" parameters are manadatory!");
+ }
+
+ // Check if extra Pulsar config. file is in place
+ // - default file: "pulsar_config.properties" under the current directory
+ String pulsarCfgFile =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR)
+ .orElse(JmsUtil.JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME);
+
+ PulsarConfig pulsarConfig = new PulsarConfig(pulsarCfgFile);
+
+ jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, webSvcUrl, pulsarSvcUrl, pulsarConfig);
+ }
+ else {
+ throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType);
+ }
+
+ PulsarConnectionFactory factory;
+ try {
+ factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig());
+ this.jmsContext = factory.createContext();
+ } catch (JMSException e) {
+ throw new RuntimeException(
+ "Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!");
+ }
+
+ bindTimer = ActivityMetrics.timer(activityDef, "bind");
+ executeTimer = ActivityMetrics.timer(activityDef, "execute");
+ bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
+ messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
+
+ if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
+ this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this));
+ }
+
+ setDefaultsFromOpSequence(sequence);
+ onActivityDefUpdate(activityDef);
+
+ this.errorhandler = new NBErrorHandler(
+ () -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
+ this::getExceptionMetrics
+ );
+ }
+
+ /**
+ * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
+ */
+ public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
+ String encodedTopicStr =
+ JmsUtil.encode(jmsDestinationType, destName);
+ Destination destination = jmsDestinations.get(encodedTopicStr);
+
+ if ( destination == null ) {
+ // TODO: should we match Persistent/Non-peristent JMS Delivery mode with
+ // Pulsar Persistent/Non-prsistent topic?
+ if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.QUEUE.label)) {
+ destination = jmsContext.createQueue(destName);
+ } else if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.TOPIC.label)) {
+ destination = jmsContext.createTopic(destName);
+ }
+
+ jmsDestinations.put(encodedTopicStr, destination);
+ }
+
+ return destination;
+ }
+
+ @Override
+ public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
+ public OpSequence> getSequencer() { return sequence; }
+
+ public String getJmsProviderType() { return jmsProviderType; }
+ public JmsConnInfo getJmsConnInfo() { return jmsConnInfo; }
+ public JMSContext getJmsContext() { return jmsContext; }
+
+ public Timer getBindTimer() { return bindTimer; }
+ public Timer getExecuteTimer() { return this.executeTimer; }
+ public Counter getBytesCounter() { return bytesCounter; }
+ public Histogram getMessagesizeHistogram() { return messagesizeHistogram; }
+
+ public NBErrorHandler getErrorhandler() { return errorhandler; }
+
+ public void failOnAsyncOperationFailure() {
+ if (asyncOperationFailure != null) {
+ throw new RuntimeException(asyncOperationFailure);
+ }
+ }
+ public void asyncOperationFailed(Throwable ex) {
+ this.asyncOperationFailure = ex;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java
new file mode 100644
index 000000000..0a49a8717
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java
@@ -0,0 +1,32 @@
+package io.nosqlbench.driver.jms;
+
+import io.nosqlbench.engine.api.activityapi.core.Action;
+import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
+import io.nosqlbench.engine.api.activityapi.core.ActivityType;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.nb.annotations.Service;
+
+@Service(value = ActivityType.class, selector = "jms")
+public class JmsActivityType implements ActivityType {
+ @Override
+ public ActionDispenser getActionDispenser(JmsActivity activity) {
+ return new PulsarJmsActionDispenser(activity);
+ }
+
+ @Override
+ public JmsActivity getActivity(ActivityDef activityDef) {
+ return new JmsActivity(activityDef);
+ }
+
+ private static class PulsarJmsActionDispenser implements ActionDispenser {
+ private final JmsActivity activity;
+ public PulsarJmsActionDispenser(JmsActivity activity) {
+ this.activity = activity;
+ }
+
+ @Override
+ public Action getAction(int slot) {
+ return new JmsAction(activity, slot);
+ }
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java
new file mode 100644
index 000000000..70be982c8
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java
@@ -0,0 +1,74 @@
+package io.nosqlbench.driver.jms;
+
+import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.driver.jms.util.JmsHeader;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+import io.nosqlbench.driver.jms.util.JmsUtil;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import io.nosqlbench.engine.api.activityimpl.OpDispenser;
+import io.nosqlbench.engine.api.templating.CommandTemplate;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.LongFunction;
+import java.util.stream.Collectors;
+
+abstract public class ReadyJmsOp implements OpDispenser {
+
+ protected final OpTemplate opTpl;
+ protected final CommandTemplate cmdTpl;
+ protected final JmsActivity jmsActivity;
+
+ protected final String stmtOpType;
+ protected LongFunction asyncApiFunc;
+ protected LongFunction jmsDestinationTypeFunc;
+
+ protected final LongFunction opFunc;
+
+ public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
+ this.opTpl = opTemplate;
+ this.cmdTpl = new CommandTemplate(opTpl);
+ this.jmsActivity = jmsActivity;
+
+ if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
+ throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
+ }
+ this.stmtOpType = cmdTpl.getStatic("optype");
+
+ // Global/Doc-level parameter: async_api
+ if (cmdTpl.containsKey(JmsUtil.ASYNC_API_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.ASYNC_API_KEY_STR)) {
+ boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.ASYNC_API_KEY_STR));
+ this.asyncApiFunc = (l) -> value;
+ } else {
+ throw new RuntimeException("\"" + JmsUtil.ASYNC_API_KEY_STR + "\" parameter cannot be dynamic!");
+ }
+ }
+
+ // Global/Doc-level parameter: jms_desitation_type
+ // - queue: point-to-point
+ // - topic: pub/sub
+ if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
+ jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR);
+ } else {
+ throw new RuntimeException("\"" + JmsUtil.JMS_DESTINATION_TYPE_KEY_STR + "\" parameter cannot be dynamic!");
+ }
+ }
+
+ this.opFunc = resolveJms();
+ }
+
+ public JmsOp apply(long value) { return opFunc.apply(value); }
+
+ abstract LongFunction resolveJms();
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java
new file mode 100644
index 000000000..c6e8f431d
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java
@@ -0,0 +1,248 @@
+package io.nosqlbench.driver.jms;
+
+import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper;
+import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+import io.nosqlbench.driver.jms.util.JmsUtil;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.LongFunction;
+import java.util.stream.Collectors;
+
+public class ReadyPulsarJmsOp extends ReadyJmsOp {
+
+ public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
+ super(opTemplate, jmsActivity);
+ }
+
+ public LongFunction resolveJms() {
+ // Global/Doc-level parameter: topic_uri
+ LongFunction topicUriFunc = (l) -> null;
+ if (cmdTpl.containsKey(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
+ topicUriFunc = (l) -> cmdTpl.getStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR);
+ } else {
+ topicUriFunc = (l) -> cmdTpl.getDynamic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR, l);
+ }
+ }
+
+ // Global: JMS destination
+ LongFunction jmsDestinationFunc;
+ try {
+ LongFunction finalTopicUriFunc = topicUriFunc;
+ jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(
+ jmsDestinationTypeFunc.apply(l),
+ finalTopicUriFunc.apply(l));
+ }
+ catch (JMSRuntimeException ex) {
+ throw new RuntimeException("Unable to create JMS destination!");
+ }
+
+ if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) {
+ return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
+ } else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) {
+ return resolveMsgRead(asyncApiFunc, jmsDestinationFunc);
+ } else {
+ throw new RuntimeException("Unsupported JMS operation type");
+ }
+ }
+
+ private LongFunction resolveMsgSend(
+ LongFunction async_api_func,
+ LongFunction jmsDestinationFunc
+ ) {
+ JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc();
+
+ // JMS header: delivery mode
+ LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
+ msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label));
+ }
+ else {
+ msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc);
+
+ // JMS header: message priority
+ LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
+ msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label));
+ }
+ else {
+ msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc);
+
+ // JMS header: message TTL
+ LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
+ msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label));
+ }
+ else {
+ msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc);
+
+ // JMS header: message delivery delay
+ LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
+ msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label));
+ }
+ else {
+ msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc);
+
+ // JMS header: disable message timestamp
+ LongFunction disableMsgTimestampFunc = (l) -> false;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
+ disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label));
+ }
+ else {
+ disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc);
+
+ // JMS header: disable message ID
+ LongFunction disableMsgIdFunc = (l) -> false;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
+ disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label));
+ }
+ else {
+ disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc);
+
+ // JMS message properties
+ String jmsMsgPropertyListStr = "";
+ if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
+ jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR);
+ } else {
+ throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!");
+ }
+ }
+
+ Map jmsMsgProperties = new HashMap<>();
+ if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) {
+ jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";"))
+ .map(s -> s.split("=", 2))
+ .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : ""));
+ }
+
+ LongFunction msgBodyFunc;
+ if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
+ msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR);
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
+ msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l);
+ } else {
+ msgBodyFunc = (l) -> null;
+ }
+ } else {
+ throw new RuntimeException("JMS message send:: \"msg_body\" field must be specified!");
+ }
+
+ return new JmsMsgSendMapper(
+ jmsActivity,
+ async_api_func,
+ jmsDestinationFunc,
+ jmsHeaderLongFunc,
+ jmsMsgProperties,
+ msgBodyFunc);
+ }
+
+ private LongFunction resolveMsgRead(
+ LongFunction async_api_func,
+ LongFunction jmsDestinationFunc
+ ) {
+ // For Pulsar JMS, make "durable" as the default
+ LongFunction jmsConsumerDurableFunc = (l) -> true;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
+ jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
+ jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l));
+ }
+ }
+
+ LongFunction jmsConsumerSharedFunc = (l) -> true;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
+ jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
+ jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l));
+ }
+ }
+
+ LongFunction jmsMsgSubscriptionFunc = (l) -> "";
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
+ jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR);
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
+ jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l);
+ }
+ }
+
+ LongFunction jmsMsgReadSelectorFunc = (l) -> "";
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
+ jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR);
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
+ jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l);
+ }
+ }
+
+ LongFunction jmsMsgNoLocalFunc = (l) -> true;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
+ jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
+ jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l));
+ }
+ }
+
+ LongFunction jmsReadTimeoutFunc = (l) -> 0L;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
+ jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
+ jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l));
+ }
+ }
+
+ return new JmsMsgReadMapper(
+ jmsActivity,
+ async_api_func,
+ jmsDestinationFunc,
+ jmsConsumerDurableFunc,
+ jmsConsumerSharedFunc,
+ jmsMsgSubscriptionFunc,
+ jmsMsgReadSelectorFunc,
+ jmsMsgNoLocalFunc,
+ jmsReadTimeoutFunc);
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java
new file mode 100644
index 000000000..e52408106
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java
@@ -0,0 +1,21 @@
+package io.nosqlbench.driver.jms.conn;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JmsConnInfo {
+
+ protected final String jmsProviderType;
+ protected final Map jmsConnConfig;
+
+ protected JmsConnInfo(String jmsProviderType) {
+ this.jmsProviderType = jmsProviderType;
+ this.jmsConnConfig = new HashMap<>();
+ }
+
+ public Map getJmsConnConfig() { return this.jmsConnConfig; }
+ public void resetJmsConnConfig() { this.jmsConnConfig.clear(); }
+ public void addJmsConnConfigItems(Map cfgItems) { this.jmsConnConfig.putAll(cfgItems); }
+ public void addJmsConnConfigItem(String key, Object value) { this.jmsConnConfig.put(key, value); }
+ public void removeJmsConnConfigItem(String key) { this.jmsConnConfig.remove(key); }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java
new file mode 100644
index 000000000..4d623527b
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java
@@ -0,0 +1,42 @@
+package io.nosqlbench.driver.jms.conn;
+
+import io.nosqlbench.driver.jms.util.PulsarConfig;
+
+import java.util.Map;
+
+public class JmsPulsarConnInfo extends JmsConnInfo {
+
+ private final String webSvcUrl;
+ private final String pulsarSvcUrl;
+ private final PulsarConfig extraPulsarConfig;
+
+ public JmsPulsarConnInfo(String jmsProviderType, String webSvcUrl, String pulsarSvcUrl, PulsarConfig pulsarConfig) {
+ super(jmsProviderType);
+
+ this.webSvcUrl = webSvcUrl;
+ this.pulsarSvcUrl = pulsarSvcUrl;
+ this.extraPulsarConfig = pulsarConfig;
+
+ this.addJmsConnConfigItem("webServiceUrl", this.webSvcUrl);
+ this.addJmsConnConfigItem("brokerServiceUrl", this.pulsarSvcUrl);
+
+ Map clientCfgMap = this.extraPulsarConfig.getClientConfMap();
+ if (!clientCfgMap.isEmpty()) {
+ this.addJmsConnConfigItems(clientCfgMap);
+ }
+
+ Map producerCfgMap = this.extraPulsarConfig.getProducerConfMap();
+ if (!producerCfgMap.isEmpty()) {
+ this.addJmsConnConfigItem("producerConfig", producerCfgMap);
+ }
+
+ Map consumerCfgMap = this.extraPulsarConfig.getConsumerConfMap();
+ if (!consumerCfgMap.isEmpty()) {
+ this.addJmsConnConfigItem("consumerConfig", consumerCfgMap);
+ }
+ }
+
+ public String getWebSvcUrl() { return this.webSvcUrl; }
+ public String getPulsarSvcUrl() { return this.pulsarSvcUrl; }
+ public PulsarConfig getExtraPulsarConfig() { return this.extraPulsarConfig; }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java
new file mode 100644
index 000000000..91a5ca5ba
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java
@@ -0,0 +1,72 @@
+package io.nosqlbench.driver.jms.ops;
+
+import io.nosqlbench.driver.jms.JmsActivity;
+
+import javax.jms.Destination;
+import java.util.function.LongFunction;
+
+/**
+ * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
+ * enough state to define a pulsar operation such that it can be executed, measured, and possibly
+ * retried if needed.
+ *
+ * This function doesn't act *as* the operation. It merely maps the construction logic into
+ * a simple functional type, given the component functions.
+ *
+ * For additional parameterization, the command template is also provided.
+ */
+public class JmsMsgReadMapper extends JmsOpMapper {
+
+ private final LongFunction jmsConsumerDurableFunc;
+ private final LongFunction jmsConsumerSharedFunc;
+ private final LongFunction jmsMsgSubscriptionFunc;
+ private final LongFunction jmsMsgReadSelectorFunc;
+ private final LongFunction jmsMsgNoLocalFunc;
+ private final LongFunction jmsReadTimeoutFunc;
+
+ public JmsMsgReadMapper(JmsActivity jmsActivity,
+ LongFunction asyncApiFunc,
+ LongFunction jmsDestinationFunc,
+ LongFunction jmsConsumerDurableFunc,
+ LongFunction jmsConsumerSharedFunc,
+ LongFunction jmsMsgSubscriptionFunc,
+ LongFunction jmsMsgReadSelectorFunc,
+ LongFunction jmsMsgNoLocalFunc,
+ LongFunction jmsReadTimeoutFunc) {
+ super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
+
+ this.jmsConsumerDurableFunc = jmsConsumerDurableFunc;
+ this.jmsConsumerSharedFunc = jmsConsumerSharedFunc;
+ this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc;
+ this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc;
+ this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc;
+ this.jmsReadTimeoutFunc = jmsReadTimeoutFunc;
+ }
+
+ @Override
+ public JmsOp apply(long value) {
+ boolean asyncApi = asyncApiFunc.apply(value);
+ Destination jmsDestination = jmsDestinationFunc.apply(value);
+ boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value);
+ boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value);
+ String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value);
+ String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value);
+ boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value);
+ long jmsReadTimeout = jmsReadTimeoutFunc.apply(value);
+
+ // Default to NO read timeout
+ if (jmsReadTimeout < 0) jmsReadTimeout = 0;
+
+ return new JmsMsgReadOp(
+ jmsActivity,
+ asyncApi,
+ jmsDestination,
+ jmsConsumerDurable,
+ jmsConsumerShared,
+ jmsMsgSubscription,
+ jmsMsgReadSelector,
+ jmsMsgNoLocal,
+ jmsReadTimeout
+ );
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java
new file mode 100644
index 000000000..e83ff826c
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java
@@ -0,0 +1,114 @@
+package io.nosqlbench.driver.jms.ops;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import io.nosqlbench.driver.jms.JmsActivity;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.jms.*;
+
+public class JmsMsgReadOp extends JmsTimeTrackOp {
+
+ private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class);
+
+ private final JmsActivity jmsActivity;
+ private final boolean asyncJmsOp;
+ private final Destination jmsDestination;
+
+ private final JMSContext jmsContext;
+ private final JMSConsumer jmsConsumer;
+ private final boolean jmsConsumerDurable;
+ private final boolean jmsConsumerShared;
+ private final String jmsMsgSubscrption;
+ private final String jmsMsgReadSelector;
+ private final boolean jmsMsgNoLocal;
+ private final long jmsReadTimeout;
+
+ private final Counter bytesCounter;
+ private final Histogram messagesizeHistogram;
+
+ public JmsMsgReadOp(JmsActivity jmsActivity,
+ boolean asyncJmsOp,
+ Destination jmsDestination,
+ boolean jmsConsumerDurable,
+ boolean jmsConsumerShared,
+ String jmsMsgSubscrption,
+ String jmsMsgReadSelector,
+ boolean jmsMsgNoLocal,
+ long jmsReadTimeout) {
+ this.jmsActivity = jmsActivity;
+ this.asyncJmsOp = asyncJmsOp;
+ this.jmsDestination = jmsDestination;
+ this.jmsConsumerDurable = jmsConsumerDurable;
+ this.jmsConsumerShared = jmsConsumerShared;
+ this.jmsMsgReadSelector = jmsMsgReadSelector;
+ this.jmsMsgSubscrption = jmsMsgSubscrption;
+ this.jmsMsgNoLocal = jmsMsgNoLocal;
+ this.jmsReadTimeout = jmsReadTimeout;
+
+ this.jmsContext = jmsActivity.getJmsContext();
+ this.jmsConsumer = createJmsConsumer();
+
+ this.bytesCounter = jmsActivity.getBytesCounter();
+ this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
+ }
+
+ private JMSConsumer createJmsConsumer() {
+ JMSConsumer jmsConsumer;
+
+ try {
+ if (jmsConsumerDurable) {
+ if (jmsConsumerShared)
+ jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
+ else
+ jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal);
+ } else {
+ if (jmsConsumerShared)
+ jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
+ else
+ jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal);
+ }
+ }
+ catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) {
+ throw new RuntimeException("Failed to create JMS consumer: invalid destination!");
+ }
+ catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) {
+ throw new RuntimeException("Failed to create JMS consumer: invalid message selector!");
+ }
+ catch (JMSRuntimeException jmsRuntimeException) {
+ jmsRuntimeException.printStackTrace();
+ throw new RuntimeException("Failed to create JMS consumer: runtime internal error!");
+ }
+
+ // TODO: async consumer
+// if (this.asyncJmsOp) {
+// jmsConsumer.setMessageListener();
+// }
+
+ return jmsConsumer;
+ }
+
+ @Override
+ public void run() {
+ // FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley
+ Message receivedMsg = jmsConsumer.receive(jmsReadTimeout);
+ try {
+ if (receivedMsg != null) {
+ receivedMsg.acknowledge();
+ byte[] receivedMsgBody = receivedMsg.getBody(byte[].class);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("received msg-payload={}", new String(receivedMsgBody));
+ }
+
+ int messagesize = receivedMsgBody.length;
+ bytesCounter.inc(messagesize);
+ messagesizeHistogram.update(messagesize);
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
+ throw new RuntimeException("Failed to acknowledge the received JMS message.");
+ }
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java
new file mode 100644
index 000000000..1714c4212
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java
@@ -0,0 +1,55 @@
+package io.nosqlbench.driver.jms.ops;
+
+import io.nosqlbench.driver.jms.JmsActivity;
+import io.nosqlbench.driver.jms.util.JmsHeader;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+
+import javax.jms.Destination;
+import java.util.Map;
+import java.util.function.LongFunction;
+
+/**
+ * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
+ * enough state to define a pulsar operation such that it can be executed, measured, and possibly
+ * retried if needed.
+ *
+ * This function doesn't act *as* the operation. It merely maps the construction logic into
+ * a simple functional type, given the component functions.
+ *
+ * For additional parameterization, the command template is also provided.
+ */
+public class JmsMsgSendMapper extends JmsOpMapper {
+ private final JmsHeaderLongFunc jmsHeaderLongFunc;
+ private final Map jmsMsgProperties;
+ private final LongFunction msgBodyFunc;
+
+ public JmsMsgSendMapper(JmsActivity jmsActivity,
+ LongFunction asyncApiFunc,
+ LongFunction jmsDestinationFunc,
+ JmsHeaderLongFunc jmsHeaderLongFunc,
+ Map jmsMsgProperties,
+ LongFunction msgBodyFunc) {
+ super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
+
+ this.jmsHeaderLongFunc = jmsHeaderLongFunc;
+ this.jmsMsgProperties = jmsMsgProperties;
+ this.msgBodyFunc = msgBodyFunc;
+ }
+
+ @Override
+ public JmsOp apply(long value) {
+ boolean asyncApi = asyncApiFunc.apply(value);
+ Destination jmsDestination = jmsDestinationFunc.apply(value);
+ JmsHeader jmsHeader = (JmsHeader)jmsHeaderLongFunc.apply(value);
+ String msgBody = msgBodyFunc.apply(value);
+
+ return new JmsMsgSendOp(
+ jmsActivity,
+ asyncApi,
+ jmsDestination,
+ jmsHeader,
+ jmsMsgProperties,
+ msgBody
+ );
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java
new file mode 100644
index 000000000..f23ac403e
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java
@@ -0,0 +1,124 @@
+package io.nosqlbench.driver.jms.ops;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import io.nosqlbench.driver.jms.JmsActivity;
+import io.nosqlbench.driver.jms.util.JmsHeader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.jms.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+public class JmsMsgSendOp extends JmsTimeTrackOp {
+
+ private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class);
+
+ private final JmsActivity jmsActivity;
+ private final boolean asyncJmsOp;
+ private final Destination jmsDestination;
+ private final JmsHeader jmsHeader;
+ private final Map jmsMsgProperties;
+
+ private final JMSContext jmsContext;
+ private final JMSProducer jmsProducer;
+ private final String msgBody;
+
+ private final Counter bytesCounter;
+ private final Histogram messagesizeHistogram;
+
+ public JmsMsgSendOp(JmsActivity jmsActivity,
+ boolean asyncJmsOp,
+ Destination jmsDestination,
+ JmsHeader jmsHeader,
+ Map jmsMsgProperties,
+ String msgBody) {
+ this.jmsActivity = jmsActivity;
+ this.asyncJmsOp = asyncJmsOp;
+ this.jmsDestination = jmsDestination;
+
+ this.jmsHeader = jmsHeader;
+ this.jmsMsgProperties = jmsMsgProperties;
+ this.msgBody = msgBody;
+
+ if (!jmsHeader.isValidHeader()) {
+ throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText());
+ }
+
+ if ((msgBody == null) || msgBody.isEmpty()) {
+ throw new RuntimeException("JMS message body can't be empty!");
+ }
+
+ this.jmsContext = jmsActivity.getJmsContext();
+ this.jmsProducer = createJmsProducer();
+
+ this.bytesCounter = jmsActivity.getBytesCounter();
+ this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
+ }
+
+ private JMSProducer createJmsProducer() {
+ JMSProducer jmsProducer = this.jmsContext.createProducer();
+
+ jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode());
+ jmsProducer.setPriority(this.jmsHeader.getMsgPriority());
+ jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay());
+ jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp());
+ jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId());
+
+ if (this.asyncJmsOp) {
+ jmsProducer.setAsync(new CompletionListener() {
+ @Override
+ public void onCompletion(Message msg) {
+ try {
+ byte[] msgBody = msg.getBody(byte[].class);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Async message send success - message body: " + new String(msgBody));
+ }
+ }
+ catch (JMSException jmsException) {
+ jmsException.printStackTrace();
+ logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
+ }
+ }
+
+ @Override
+ public void onException(Message msg, Exception e) {
+ try {
+ byte[] msgBody = msg.getBody(byte[].class);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Async message send failure - message body: " + new String(msgBody));
+ }
+ }
+ catch (JMSException jmsException) {
+ jmsException.printStackTrace();
+ logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
+ }
+ }
+ });
+ }
+
+ for (Map.Entry entry : jmsMsgProperties.entrySet()) {
+ jmsProducer.setProperty(entry.getKey(), entry.getValue());
+ }
+
+ return jmsProducer;
+ }
+
+ @Override
+ public void run() {
+ int messageSize;
+ try {
+ byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8);
+ messageSize = msgBytes.length;
+ jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8));
+
+ messagesizeHistogram.update(messageSize);
+ bytesCounter.inc(messageSize);
+ }
+ catch (Exception ex) {
+ logger.error("Failed to send JMS message - " + msgBody);
+ }
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java
new file mode 100644
index 000000000..bd06c6bca
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java
@@ -0,0 +1,13 @@
+package io.nosqlbench.driver.jms.ops;
+
+/**
+ * Base type of all Pulsar Operations including Producers and Consumers.
+ */
+public interface JmsOp {
+
+ /**
+ * Execute the operation, invoke the timeTracker when the operation ended.
+ * The timeTracker can be invoked in a separate thread, it is only used for metrics.
+ */
+ void run(Runnable timeTracker);
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java
new file mode 100644
index 000000000..04dac1450
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java
@@ -0,0 +1,23 @@
+package io.nosqlbench.driver.jms.ops;
+
+import io.nosqlbench.driver.jms.JmsActivity;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+
+import javax.jms.Destination;
+import java.util.Map;
+import java.util.function.LongFunction;
+
+public abstract class JmsOpMapper implements LongFunction {
+ protected final JmsActivity jmsActivity;
+ protected final LongFunction asyncApiFunc;
+ protected final LongFunction jmsDestinationFunc;
+
+ public JmsOpMapper(JmsActivity jmsActivity,
+ LongFunction asyncApiFunc,
+ LongFunction jmsDestinationFunc)
+ {
+ this.jmsActivity = jmsActivity;
+ this.asyncApiFunc = asyncApiFunc;
+ this.jmsDestinationFunc = jmsDestinationFunc;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java
new file mode 100644
index 000000000..58bd578d6
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java
@@ -0,0 +1,17 @@
+package io.nosqlbench.driver.jms.ops;
+
+/**
+ * Base type of all Sync Pulsar Operations including Producers and Consumers.
+ */
+public abstract class JmsTimeTrackOp implements JmsOp {
+
+ public void run(Runnable timeTracker) {
+ try {
+ this.run();
+ } finally {
+ timeTracker.run();
+ }
+ }
+
+ public abstract void run();
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java
new file mode 100644
index 000000000..236eb95ee
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java
@@ -0,0 +1,66 @@
+package io.nosqlbench.driver.jms.util;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.commons.lang.StringUtils;
+
+import javax.jms.DeliveryMode;
+
+@Setter
+@Getter
+@AllArgsConstructor
+@ToString
+public class JmsHeader {
+ private int deliveryMode;
+ private int msgPriority;
+ private long msgTtl;
+ private long msgDeliveryDelay;
+ private boolean disableMsgTimestamp;
+ private boolean disableMsgId;
+
+ public boolean isValidDeliveryMode() {
+ return (deliveryMode == DeliveryMode.NON_PERSISTENT) || (deliveryMode == DeliveryMode.PERSISTENT);
+ }
+
+ public boolean isValidPriority() {
+ return (msgPriority >= 0) && (msgPriority <= 9);
+ }
+
+ public boolean isValidTtl() {
+ return msgTtl >= 0;
+ }
+
+ public boolean isValidDeliveryDelay() {
+ return msgTtl >= 0;
+ }
+
+ public boolean isValidHeader() {
+ return isValidDeliveryMode()
+ && isValidPriority()
+ && isValidTtl()
+ && isValidDeliveryDelay();
+ }
+
+ public String getInvalidJmsHeaderMsgText() {
+ StringBuilder sb = new StringBuilder();
+
+ if (!isValidDeliveryMode())
+ sb.append("delivery mode - " + deliveryMode + "; ");
+ if (!isValidPriority())
+ sb.append("message priority - " + msgPriority + "; ");
+ if (!isValidTtl())
+ sb.append("message TTL - " + msgTtl + "; ");
+ if (!isValidDeliveryDelay())
+ sb.append("message delivery delay - " + msgDeliveryDelay + "; ");
+
+ String invalidMsgText = sb.toString();
+ if (StringUtils.length(invalidMsgText) > 0)
+ invalidMsgText = StringUtils.substringBeforeLast(invalidMsgText, ";");
+ else
+ invalidMsgText = "none";
+
+ return "Invalid JMS header values: " + invalidMsgText;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java
new file mode 100644
index 000000000..c094a6c81
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java
@@ -0,0 +1,31 @@
+package io.nosqlbench.driver.jms.util;
+
+import lombok.*;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import java.util.function.LongFunction;
+
+@Setter
+@Getter
+@NoArgsConstructor
+public class JmsHeaderLongFunc implements LongFunction {
+ private LongFunction deliveryModeFunc;
+ private LongFunction msgPriorityFunc;
+ private LongFunction msgTtlFunc;
+ private LongFunction msgDeliveryDelayFunc;
+ private LongFunction disableMsgTimestampFunc;
+ private LongFunction disableMsgIdFunc;
+
+ @Override
+ public Object apply(long value) {
+ return new JmsHeader(
+ (deliveryModeFunc != null) ? deliveryModeFunc.apply(value) : DeliveryMode.PERSISTENT,
+ (msgPriorityFunc != null) ? msgPriorityFunc.apply(value) : Message.DEFAULT_PRIORITY,
+ (msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_TIME_TO_LIVE,
+ (msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_DELIVERY_DELAY,
+ (disableMsgTimestampFunc != null) ? disableMsgTimestampFunc.apply(value) : false,
+ (disableMsgIdFunc != null) ? disableMsgIdFunc.apply(value) : false
+ );
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java
new file mode 100644
index 000000000..456939417
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java
@@ -0,0 +1,119 @@
+package io.nosqlbench.driver.jms.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Base64;
+
+public class JmsUtil {
+
+ private final static Logger logger = LogManager.getLogger(JmsUtil.class);
+
+ // Supported JMS provider type
+ public enum JMS_PROVIDER_TYPES {
+ PULSAR("pulsar");
+
+ public final String label;
+ JMS_PROVIDER_TYPES(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidJmsProviderType(String type) {
+ return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type));
+ }
+
+ /////
+ // NB command line parameters
+ // - JMS provider type
+ public final static String JMS_PROVIDER_TYPE_KEY_STR = "provider_type";
+
+ /// Only applicable when the provider is "Pulsar"
+ // - Pulsar configuration properties file
+ public final static String JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR = "pulsar_cfg_file";
+ public final static String JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME = "pulsar_config.properties";
+ // - Pulsar web url
+ public final static String JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR = "web_url";
+ // - Pulsar service url
+ public final static String JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR = "service_url";
+
+
+ public final static String ASYNC_API_KEY_STR = "async_api";
+ public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type";
+
+ ///// JMS Producer
+ // Supported JMS provider type
+ public enum JMS_MSG_HEADER_KEYS {
+ DELIVERY_MODE("jms_producer_header_msg_delivery_mode"),
+ PRIORITY("jms_producer_header_msg_priority"),
+ TTL("jms_producer_header_msg_ttl"),
+ DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"),
+ DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"),
+ DISABLE_ID("jms_producer_header_disable_msg_id");
+
+ public final String label;
+ JMS_MSG_HEADER_KEYS(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidJmsHeaderKey(String type) {
+ return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type));
+ }
+ public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties";
+ public final static String JMS_PRODUCER_MSG_BODY_KEY_STR = "msg_body";
+
+ ///// JMS Consumer
+ public final static String JMS_CONSUMER_DURABLE_KEY_STR = "jms_consumer_msg_durable";
+ public final static String JMS_CONSUMER_SHARED_KEY_STR = "jms_consumer_msg_shared";
+ public final static String JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR = "jms_consumer_subscription";
+ public final static String JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR = "jms_consumer_msg_read_selector";
+ public final static String JMS_CONSUMER_MSG_NOLOCAL_KEY_STR = "jms_consumer_msg_nolocal";
+ public final static String JMS_CONSUMER_READ_TIMEOUT_KEY_STR = "jms_consumer_msg_read_timeout";
+
+
+ // Only applicable to Pulsar JMS provider
+ public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri";
+
+ // Supported message operation types
+ public enum OP_TYPES {
+ MSG_SEND("msg_send"),
+ MSG_READ("msg_read");
+
+ public final String label;
+ OP_TYPES(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidClientType(String type) {
+ return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
+ }
+
+ // JMS Destination Types
+ public enum JMS_DESTINATION_TYPES {
+ QUEUE("queue"),
+ TOPIC("topic");
+
+ public final String label;
+ JMS_DESTINATION_TYPES(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidJmsDestinationType(String type) {
+ return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type));
+ }
+
+ public static String encode(String... strings) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String str : strings) {
+ if (!StringUtils.isBlank(str))
+ stringBuilder.append(str).append("::");
+ }
+
+ String concatenatedStr =
+ StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
+
+ return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
+ }
+}
+
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java
new file mode 100644
index 000000000..1d5cc6d5b
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java
@@ -0,0 +1,99 @@
+package io.nosqlbench.driver.jms.util;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.FileBasedConfiguration;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
+import org.apache.commons.configuration2.builder.fluent.Parameters;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class PulsarConfig {
+ private final static Logger logger = LogManager.getLogger(PulsarConfig.class);
+
+ public static final String SCHEMA_CONF_PREFIX = "schema";
+ public static final String CLIENT_CONF_PREFIX = "client";
+ public static final String PRODUCER_CONF_PREFIX = "producer";
+ public static final String CONSUMER_CONF_PREFIX = "consumer";
+
+ private final Map schemaConfMap = new HashMap<>();
+ private final Map clientConfMap = new HashMap<>();
+ private final Map producerConfMap = new HashMap<>();
+ private final Map consumerConfMap = new HashMap<>();
+
+ public PulsarConfig(String fileName) {
+ File file = new File(fileName);
+
+ try {
+ String canonicalFilePath = file.getCanonicalPath();
+
+ Parameters params = new Parameters();
+
+ FileBasedConfigurationBuilder builder =
+ new FileBasedConfigurationBuilder(PropertiesConfiguration.class)
+ .configure(params.properties()
+ .setFileName(fileName));
+
+ Configuration config = builder.getConfiguration();
+
+ // Get schema specific configuration settings
+ for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+
+ // Get client connection specific configuration settings
+ for (Iterator it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+
+ // Get producer specific configuration settings
+ for (Iterator it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+
+ // Get consumer specific configuration settings
+ for (Iterator it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+ } catch (IOException ioe) {
+ logger.error("Can't read the specified config properties file: " + fileName);
+ ioe.printStackTrace();
+ } catch (ConfigurationException cex) {
+ logger.error("Error loading configuration items from the specified config properties file: " + fileName);
+ cex.printStackTrace();
+ }
+ }
+
+ public Map getSchemaConfMap() {
+ return this.schemaConfMap;
+ }
+ public Map getClientConfMap() {
+ return this.clientConfMap;
+ }
+ public Map getProducerConfMap() {
+ return this.producerConfMap;
+ }
+ public Map getConsumerConfMap() {
+ return this.consumerConfMap;
+ }
+}
diff --git a/driver-jms/src/main/resources/jms.md b/driver-jms/src/main/resources/jms.md
new file mode 100644
index 000000000..07dd0c5c7
--- /dev/null
+++ b/driver-jms/src/main/resources/jms.md
@@ -0,0 +1 @@
+# Overview
diff --git a/driver-jms/src/main/resources/pulsar_config.properties b/driver-jms/src/main/resources/pulsar_config.properties
new file mode 100644
index 000000000..f711535ac
--- /dev/null
+++ b/driver-jms/src/main/resources/pulsar_config.properties
@@ -0,0 +1,33 @@
+### Schema related configurations - schema.xxx
+# valid types:
+# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
+# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
+# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
+# avro, json, protobuf
+#
+# NOTE: for JMS client, Pulsar "schema" is NOT supported yet
+schema.type=
+schema.definition=
+
+
+### Pulsar client related configurations - client.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#client
+client.connectionTimeoutMs=5000
+#client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
+#client.authParams=
+#client.tlsAllowInsecureConnection=true
+client.numIoThreads=10
+client.numListenerThreads=10
+
+
+### Producer related configurations (global) - producer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
+producer.sendTimeoutMs=
+producer.blockIfQueueFull=true
+producer.maxPendingMessages=10000
+producer.batchingMaxMessages=10000
+
+
+### Consumer related configurations (global) - consumer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
+consumer.receiverQueueSize=2000
diff --git a/driver-jms/src/main/resources/pulsar_jms.yaml b/driver-jms/src/main/resources/pulsar_jms.yaml
new file mode 100644
index 000000000..755f05ed6
--- /dev/null
+++ b/driver-jms/src/main/resources/pulsar_jms.yaml
@@ -0,0 +1,89 @@
+bindings:
+ payload: NumberNameToString() #AlphaNumericString(20)
+ tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
+ namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
+ core_topic_name: Mod(5); ToString(); Prefix("t")
+
+# document level parameters that apply to all Pulsar client types:
+params:
+ ### static only
+ async_api: "true"
+
+ ### Static only
+ # Valid values: queue (point-to-point) or topic (pub-sub)
+ jms_desitation_type: "topic"
+
+ ### Static Only
+ # NOTE: ONLY relevant when the JMS provider is Pulsar
+ #pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
+ #pulsar_topic_uri: "persistent://public/default/pt100"
+ #pulsar_topic_uri: "persistent://public/default/t0"
+ pulsar_topic_uri: "persistent://public/default/pt100_10"
+ #pulsar_topic_uri: "persistent://public/default/pt200_10"
+ #pulsar_topic_uri: "persistent://public/default/pt300_10"
+ #pulsar_topic_uri: "persistent://public/default/pt400_10"
+
+blocks:
+ - name: "producer-block"
+ tags:
+ phase: "jms_producer"
+ statements:
+ - name: "s1"
+ optype: "msg_send"
+
+ ### JMS PRODUCER message header
+ ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT
+ # - static or dynamic
+ # - Producer only
+ # Valid values: non-persistent(1), or persistent(2) - default
+ jms_producer_header_msg_delivery_mode: "2"
+ # Valid values: 0~9 (4 as default)
+ jms_producer_header_msg_priority: "4"
+ # Valid values: non-negative long; default 0 (never expires)
+ jms_producer_header_msg_ttl: "0"
+ # Valid values: non-negative long; default 0 (no delay)
+ jms_producer_header_msg_delivery_delay: "0"
+ # Valid values: true/false; default false (message timestamp is enabled)
+ jms_producer_header_disable_msg_timestamp: "false"
+ # Valid values: true/false; default false (message ID is enabled)
+ jms_producer_header_disable_msg_id: "false"
+
+ ### JMS PRODUCER message properties
+ # - static only
+ # - Producer only
+ # - In format: "key1=value1;key2=value2;..."
+ jms_producer_msg_properties: "key1=value1;key2=value2"
+
+ ### JMS PRODUCER message body
+ msg_body: "{payload}"
+
+ - name: "consumer-block"
+ tags:
+ phase: "jms_consumer"
+ statements:
+ - name: "s1"
+ optype: "msg_read"
+
+ ### JMS CONSUMER durable and shared
+ jms_consumer_msg_durable: "true"
+ jms_consumer_msg_shared: "true"
+
+ ### JMS CONSUMER subscription name
+ # - only relevant for durable consumer
+ jms_consumer_subscription: "mysub"
+
+ ### JMS CONSUMER subscription name
+ # - only relevant for unshared consumer
+ jms_consumer_nolocal: "false"
+
+ ### JMS CONSUMER message read timeout
+ # - unit: milliseconds
+ # - 0 means call blocks indefinitely
+ # - FIXME: 0 supposes to wait indefinitly; but
+ # it actually behaves like no wait at all
+ jms_consumer_msg_read_timeout: "10000"
+
+ ### JMS CONSUMER message selector
+ # - empty string means no message selector
+ # - https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
+ jms_consumer_msg_read_selector: ""
diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java
index b442cf032..0bbabd8c4 100644
--- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java
+++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java
@@ -5,7 +5,6 @@ import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
-import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
diff --git a/nb/pom.xml b/nb/pom.xml
index ed98a0f6d..ab70b88b9 100644
--- a/nb/pom.xml
+++ b/nb/pom.xml
@@ -135,6 +135,12 @@
4.15.45-SNAPSHOT
+
+ io.nosqlbench
+ driver-jms
+ 4.15.45-SNAPSHOT
+
+
diff --git a/pom.xml b/pom.xml
index 486c4c90e..83d8b467b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
driver-jdbc
driver-cockroachdb
driver-pulsar
+ driver-jms