diff --git a/RELEASENOTES.md b/RELEASENOTES.md
index 74a1c6d61..12e374f0b 100644
--- a/RELEASENOTES.md
+++ b/RELEASENOTES.md
@@ -1,4 +1,2 @@
-- 5ecda12f (HEAD -> main) use built-in activity error handler wiring for http
-- d905c352 base exception metrics on name only
-- fed7a9e1 allow activities to provide an error name mapper
-- c45662ab allow NBErrorHandler to map more detailed error names
+- b0dc6ca8 (HEAD -> main, origin/main, origin/HEAD) Merge pull request #319 from yabinmeng/main
+- adfbcacb Maven central pom update
diff --git a/docsys/pom.xml b/docsys/pom.xml
index a3b4da370..c2f342d86 100644
--- a/docsys/pom.xml
+++ b/docsys/pom.xml
@@ -9,7 +9,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -18,7 +18,7 @@
io.nosqlbench
nb-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
@@ -114,7 +114,7 @@
io.nosqlbench
virtdata-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-cockroachdb/pom.xml b/driver-cockroachdb/pom.xml
index c77964208..c6644a101 100644
--- a/driver-cockroachdb/pom.xml
+++ b/driver-cockroachdb/pom.xml
@@ -5,7 +5,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -21,7 +21,7 @@
io.nosqlbench
driver-jdbc
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
org.postgresql
diff --git a/driver-cockroachdb/src/main/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivity.java b/driver-cockroachdb/src/main/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivity.java
index d818d77ed..8a8bf6e9f 100644
--- a/driver-cockroachdb/src/main/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivity.java
+++ b/driver-cockroachdb/src/main/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivity.java
@@ -7,7 +7,6 @@ import org.apache.logging.log4j.Logger;
import org.postgresql.ds.PGSimpleDataSource;
import javax.sql.DataSource;
-import java.sql.SQLException;
import java.util.Arrays;
public class CockroachActivity extends JDBCActivity {
@@ -17,6 +16,14 @@ public class CockroachActivity extends JDBCActivity {
super(activityDef);
}
+ // TODO provide an error handler with sane defaults including
+ // * retry on 40001 SQL state code (CockroachDB txn retry)
+ // * retry (implement exponential, to avoid stampeding herd) on timeout getting connection from connection pool
+ //
+ //@Override
+ //public NBErrorHandler getErrorHandler() {
+ //}
+
@Override
protected DataSource newDataSource() {
PGSimpleDataSource ds = new PGSimpleDataSource();
@@ -26,13 +33,17 @@ public class CockroachActivity extends JDBCActivity {
getOptionalString("serverName").
orElseThrow(() -> new RuntimeException("serverName parameter required"));
- // portNumber, user, password are optional
+ // portNumber, databaseName, user, password are optional
Integer portNumber = getParams().getOptionalInteger("portNumber").orElse(26257);
+ String databaseName = getParams().getOptionalString("databaseName").orElse(null);
String user = getParams().getOptionalString("user").orElse(null);
String password = getParams().getOptionalString("password").orElse(null);
ds.setServerNames(new String[]{serverName});
ds.setPortNumbers(new int[]{portNumber});
+ if (databaseName != null) {
+ ds.setDatabaseName(databaseName);
+ }
if (user != null) {
ds.setUser(user);
}
@@ -40,17 +51,13 @@ public class CockroachActivity extends JDBCActivity {
ds.setPassword(password);
}
- LOGGER.debug("Final DataSource fields"
+ LOGGER.debug("Final DataSource fields:"
+ " serverNames=" + Arrays.toString(ds.getServerNames())
+ " portNumbers=" + Arrays.toString(ds.getPortNumbers())
+ + " databaseName=" + ds.getDatabaseName()
+ " user=" + ds.getUser()
+ " password=" + ds.getPassword());
return ds;
}
-
- @Override
- public boolean isRetryable(SQLException sqlException) {
- return sqlException.getSQLState().equals("40001"); // sql state code for transaction conflict
- }
}
diff --git a/driver-cockroachdb/src/main/resources/cockroachdb.md b/driver-cockroachdb/src/main/resources/cockroachdb.md
index 69eaba8ba..ca239328e 100644
--- a/driver-cockroachdb/src/main/resources/cockroachdb.md
+++ b/driver-cockroachdb/src/main/resources/cockroachdb.md
@@ -10,11 +10,17 @@ the
[DataSource Configuration Properties](https://jdbc.postgresql.org/documentation/81/ds-ds.html)
section for detailed parameter documentation.
-* **serverName** (required) - database hostname
-* **portNumber** (optional) - database listen port; defaults to 26257.
-* **user** (optional) - database account username; defaults to empty.
-* **password** (optional) - database account password; defaults to empty.
-* **connectionpool** (optional) - connection pool implementation; defaults
- to no connection pool. Allowed values:
+* **serverName** (required) - database hostname.
+* **databaseName** (optional) - database namespace to use; Default *null*.
+* **portNumber** (optional) - database listen port; Default *26257*.
+* **user** (optional) - database account username; Default *null*.
+* **password** (optional) - database account password; Default *null*.
+* **connectionpool** (optional) - connection pool implementation; Default
+ no connection pool, in other words create a connection per statement execution.
+ Allowed values:
* *hikari* -
use [HikariCP](https://github.com/brettwooldridge/HikariCP)
+* **maxtries** (optional) - number of times to retry retry-able errors; Default *3*.
+* **errors** (optional) - expression which specifies how to handle SQL state error codes.
+ Expression syntax and behavior is explained in the `error-handlers` topic. Default
+ *stop*, in other words exit on any error.
diff --git a/driver-cockroachdb/src/test/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivityTest.java b/driver-cockroachdb/src/test/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivityTest.java
new file mode 100644
index 000000000..cefe4a6dc
--- /dev/null
+++ b/driver-cockroachdb/src/test/java/io/nosqlbench/activitytype/cockroachdb/CockroachActivityTest.java
@@ -0,0 +1,33 @@
+package io.nosqlbench.activitytype.cockroachdb;
+
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.engine.api.activityimpl.ParameterMap;
+import org.junit.Test;
+import org.postgresql.util.PSQLException;
+import org.postgresql.util.PSQLState;
+
+import java.net.SocketTimeoutException;
+import java.sql.SQLException;
+
+import static org.junit.Assert.*;
+
+public class CockroachActivityTest {
+ @Test
+ public void testErrorNameMapper() {
+ ActivityDef activityDef = new ActivityDef(ParameterMap.parseParams("").orElseThrow());
+ CockroachActivity activity = new CockroachActivity(activityDef);
+
+ // When the Throwable is a SQLException, the error name should be getSQLState()
+ Throwable sqlException = new SQLException("my test exception", "my-test-sql-state");
+ assertEquals("my-test-sql-state", activity.errorNameMapper(sqlException));
+
+ // See PSQLState to string code mapping at the Github source code website
+ // https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/util/PSQLState.java
+ Throwable psqlException = new PSQLException("retry transaction", PSQLState.CONNECTION_FAILURE);
+ assertEquals("08006", activity.errorNameMapper(psqlException));
+
+ // When Throwable is not a SQLException, the error name should be the class name
+ Throwable runtimeException = new SocketTimeoutException("my test runtime exception");
+ assertEquals("SocketTimeoutException", activity.errorNameMapper(runtimeException));
+ }
+}
diff --git a/driver-cql-shaded/pom.xml b/driver-cql-shaded/pom.xml
index f478754c2..39b2438ab 100644
--- a/driver-cql-shaded/pom.xml
+++ b/driver-cql-shaded/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
@@ -206,6 +206,7 @@
io.nosqlbench.engine.cli.NBCLI
+
diff --git a/driver-cqld3-shaded/pom.xml b/driver-cqld3-shaded/pom.xml
index f48c1b325..031043d06 100644
--- a/driver-cqld3-shaded/pom.xml
+++ b/driver-cqld3-shaded/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -24,13 +24,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
@@ -184,10 +184,22 @@
com.google.common
com.datastax.internal.com_google_common
-
-
-
-
+
+ com.datastax
+ com.datastax.cql3.shaded
+
+
+ io.nosqlbench.activitytype.cql
+ io.nosqlbench.activitytype.cql3.shaded
+
+
+ io.nosqlbench.endpoints.cql
+ io.nosqlbench.endpoints.cql3.shaded
+
+
+ io.nosqlbench.generators.cql
+ io.nosqlbench.generators.cql3.shaded
+
@@ -202,6 +214,7 @@
io.nosqlbench.engine.cli.NBCLI
+
diff --git a/driver-cqlverify/pom.xml b/driver-cqlverify/pom.xml
index f7ce54efe..1124c1302 100644
--- a/driver-cqlverify/pom.xml
+++ b/driver-cqlverify/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -24,13 +24,13 @@
io.nosqlbench
driver-cql-shaded
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-diag/pom.xml b/driver-diag/pom.xml
index 915ee6257..9f62495ed 100644
--- a/driver-diag/pom.xml
+++ b/driver-diag/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -20,13 +20,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-dsegraph-shaded/pom.xml b/driver-dsegraph-shaded/pom.xml
index 8c9783dcf..7ad70f41e 100644
--- a/driver-dsegraph-shaded/pom.xml
+++ b/driver-dsegraph-shaded/pom.xml
@@ -4,7 +4,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-http/pom.xml b/driver-http/pom.xml
index cbf0155c1..a83d45745 100644
--- a/driver-http/pom.xml
+++ b/driver-http/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-jdbc/pom.xml b/driver-jdbc/pom.xml
index 0c4452a9f..a430b47f0 100644
--- a/driver-jdbc/pom.xml
+++ b/driver-jdbc/pom.xml
@@ -3,7 +3,7 @@
nosqlbench
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
4.0.0
@@ -18,7 +18,7 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
compile
diff --git a/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/api/JDBCActivity.java b/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/api/JDBCActivity.java
index baa42302e..1b5fd3600 100644
--- a/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/api/JDBCActivity.java
+++ b/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/api/JDBCActivity.java
@@ -17,7 +17,7 @@ import org.apache.logging.log4j.Logger;
import javax.sql.DataSource;
import java.sql.SQLException;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
// This should not be exposed as as service directly unless it can
// be used with a modular JDBC configuration.
@@ -27,8 +27,7 @@ public abstract class JDBCActivity extends SimpleActivity {
private Timer resultTimer;
private Timer resultSuccessTimer;
private Histogram triesHisto;
- private ExceptionCountMetrics exceptionCount;
- private SQLExceptionCountMetrics sqlExceptionCount;
+ private int maxTries;
protected DataSource dataSource;
protected OpSequence> opSequence;
@@ -47,6 +46,8 @@ public abstract class JDBCActivity extends SimpleActivity {
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
+ this.maxTries = getParams().getOptionalInteger("maxtries").orElse(3);
+
LOGGER.debug("initializing data source");
dataSource = newDataSource();
@@ -70,8 +71,6 @@ public abstract class JDBCActivity extends SimpleActivity {
resultTimer = ActivityMetrics.timer(getActivityDef(), "result");
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success");
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries");
- exceptionCount = new ExceptionCountMetrics(getActivityDef());
- sqlExceptionCount = new SQLExceptionCountMetrics(getActivityDef());
opSequence = createOpSequence(ReadyJDBCOp::new);
setDefaultsFromOpSequence(opSequence);
@@ -79,12 +78,20 @@ public abstract class JDBCActivity extends SimpleActivity {
onActivityDefUpdate(getActivityDef());
}
- public int getMaxTries() {
- return 3;
+ public String errorNameMapper(Throwable e) {
+ if (e instanceof SQLException) {
+ return ((SQLException) e).getSQLState();
+ }
+ return e.getClass().getSimpleName();
}
- public boolean isRetryable(SQLException sqlException) {
- return true;
+ @Override
+ public Function getErrorNameMapper() {
+ return this::errorNameMapper;
+ }
+
+ public int getMaxTries() {
+ return this.maxTries;
}
public DataSource getDataSource() {
@@ -110,29 +117,4 @@ public abstract class JDBCActivity extends SimpleActivity {
public Histogram getTriesHisto() {
return triesHisto;
}
-
- public ExceptionCountMetrics getExceptionCount() {
- return exceptionCount;
- }
-
- public SQLExceptionCountMetrics getSQLExceptionCount() {
- return sqlExceptionCount;
- }
-
- public static class SQLExceptionCountMetrics {
- private final ConcurrentHashMap counters = new ConcurrentHashMap<>();
- private final ActivityDef activityDef;
-
- private SQLExceptionCountMetrics(ActivityDef activityDef) {
- this.activityDef = activityDef;
- }
-
- public void inc(SQLException e) {
- Counter c = counters.computeIfAbsent(
- e.getErrorCode(),
- k -> ActivityMetrics.counter(activityDef, "errorcodecounts." + k)
- );
- c.inc();
- }
- }
}
diff --git a/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/impl/JDBCAction.java b/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/impl/JDBCAction.java
index e29889dab..bf287ff6c 100644
--- a/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/impl/JDBCAction.java
+++ b/driver-jdbc/src/main/java/io/nosqlbench/activitytype/jdbc/impl/JDBCAction.java
@@ -3,13 +3,13 @@ package io.nosqlbench.activitytype.jdbc.impl;
import com.codahale.metrics.Timer;
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
+import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
-import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
@@ -40,59 +40,37 @@ public class JDBCAction implements SyncAction {
}
int maxTries = activity.getMaxTries();
- int errorCode = 0;
for (int tries = 1; tries <= maxTries; tries++) {
- errorCode = execute(boundStmt, tries);
- if (errorCode == 0) return 0;
- }
+ Exception error = null;
+ long startTimeNanos = System.nanoTime();
- LOGGER.debug("Max tries " + maxTries + " exceeded for executing statement " + boundStmt);
- return errorCode;
- }
+ try (Connection conn = activity.getDataSource().getConnection()) {
+ Statement jdbcStmt = conn.createStatement();
+ jdbcStmt.execute(boundStmt);
- private int execute(String sql, int tries) {
- long startTimeNano = System.nanoTime();
- Long resultTime = null;
-
- try (Connection conn = activity.getDataSource().getConnection()) {
- Statement jdbcStmt = conn.createStatement();
- jdbcStmt.execute(sql);
-
- resultTime = System.nanoTime() - startTimeNano;
- activity.getResultSuccessTimer().update(resultTime, TimeUnit.NANOSECONDS);
-
- } catch (Exception e) {
- LOGGER.debug("Try " + tries + ": failed to execute statement: " + sql, e);
-
- activity.getExceptionCount().count(e.getClass().getSimpleName());
-
- if (e instanceof SQLException) {
- SQLException sqle = (SQLException) e;
-
- activity.getSQLExceptionCount().inc(sqle);
-
- // TODO non-retryable exception should return its non-zero error code to runCycle() caller
- if (!activity.isRetryable(sqle)) {
- return 0;
- }
-
- return sqle.getErrorCode();
+ } catch (Exception e) {
+ error = e;
}
- return 1;
+ long executionTimeNanos = System.nanoTime() - startTimeNanos;
- } finally {
- if (resultTime == null) {
- resultTime = System.nanoTime() - startTimeNano;
- }
-
- activity.getResultTimer().update(resultTime, TimeUnit.NANOSECONDS);
+ activity.getResultTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
activity.getTriesHisto().update(tries);
+
+ if (error == null) {
+ activity.getResultSuccessTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
+ return 0;
+ } else {
+ ErrorDetail detail = activity.getErrorHandler().handleError(error, cycle, executionTimeNanos);
+ if (!detail.isRetryable()) {
+ LOGGER.debug("Exit failure after non-retryable error");
+ return 1;
+ }
+ }
}
- LOGGER.trace("Try " + tries + ": successfully executed statement: " + sql);
- return 0;
+ LOGGER.debug("Exit failure after maxretries=" + maxTries);
+ return 1;
}
-
}
diff --git a/driver-jms/pom.xml b/driver-jms/pom.xml
new file mode 100644
index 000000000..e1f0e6739
--- /dev/null
+++ b/driver-jms/pom.xml
@@ -0,0 +1,90 @@
+
+ 4.0.0
+
+
+ mvn-defaults
+ io.nosqlbench
+ 4.15.47-SNAPSHOT
+ ../mvn-defaults
+
+
+ driver-jms
+ jar
+ ${project.artifactId}
+
+
+ A JMS driver for nosqlbench. This provides the ability to inject synthetic data
+ into a pulsar system via JMS 2.0 compatibile APIs.
+
+ NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster
+ as the potential JMS Destination
+
+
+
+
+
+ datastax-releases-local
+ DataStax Local Releases
+ https://repo.sjc.dsinternal.org/artifactory/datastax-snapshots-local/
+
+ false
+
+
+ true
+
+
+
+
+
+
+
+ io.nosqlbench
+ engine-api
+ 4.15.47-SNAPSHOT
+
+
+
+ io.nosqlbench
+ driver-stdout
+ 4.15.47-SNAPSHOT
+
+
+
+
+ org.apache.commons
+ commons-lang3
+ 3.12.0
+
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.20
+ provided
+
+
+
+
+ commons-beanutils
+ commons-beanutils
+ 1.9.4
+
+
+
+
+ org.apache.commons
+ commons-configuration2
+ 2.7
+
+
+
+
+ com.datastax.oss
+ pulsar-jms
+ 1.0.0
+
+
+
+
+
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java
new file mode 100644
index 000000000..4c3273627
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java
@@ -0,0 +1,68 @@
+package io.nosqlbench.driver.jms;
+
+import com.codahale.metrics.Timer;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.engine.api.activityapi.core.SyncAction;
+import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.LongFunction;
+
+public class JmsAction implements SyncAction {
+
+ private final static Logger logger = LogManager.getLogger(JmsAction.class);
+
+ private final JmsActivity activity;
+ private final int slot;
+
+ int maxTries;
+
+ public JmsAction(JmsActivity activity, int slot) {
+ this.activity = activity;
+ this.slot = slot;
+ this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
+ }
+
+ @Override
+ public void init() { }
+
+ @Override
+ public int runCycle(long cycle) {
+ // let's fail the action if some async operation failed
+ activity.failOnAsyncOperationFailure();
+
+ long start = System.nanoTime();
+
+ JmsOp jmsOp;
+ try (Timer.Context ctx = activity.getBindTimer().time()) {
+ LongFunction readyJmsOp = activity.getSequencer().get(cycle);
+ jmsOp = readyJmsOp.apply(cycle);
+ } catch (Exception bindException) {
+ // if diagnostic mode ...
+ activity.getErrorhandler().handleError(bindException, cycle, 0);
+ throw new RuntimeException(
+ "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
+ );
+ }
+
+ for (int i = 0; i < maxTries; i++) {
+ Timer.Context ctx = activity.getExecuteTimer().time();
+ try {
+ // it is up to the jmsOp to call Context#close when the activity is executed
+ // this allows us to track time for async operations
+ jmsOp.run(ctx::close);
+ break;
+ } catch (RuntimeException err) {
+ ErrorDetail errorDetail = activity
+ .getErrorhandler()
+ .handleError(err, cycle, System.nanoTime() - start);
+ if (!errorDetail.isRetryable()) {
+ break;
+ }
+ }
+ }
+
+ return 0;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java
new file mode 100644
index 000000000..396982bbf
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java
@@ -0,0 +1,165 @@
+package io.nosqlbench.driver.jms;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
+import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
+import io.nosqlbench.driver.jms.conn.JmsConnInfo;
+import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.driver.jms.util.JmsUtil;
+import io.nosqlbench.driver.jms.util.PulsarConfig;
+import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
+import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.engine.api.activityimpl.OpDispenser;
+import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
+import io.nosqlbench.engine.api.metrics.ActivityMetrics;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.jms.Destination;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class JmsActivity extends SimpleActivity {
+
+ private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>();
+
+ private String jmsProviderType;
+ private JmsConnInfo jmsConnInfo;
+
+ private JMSContext jmsContext;
+
+ private OpSequence> sequence;
+ private volatile Throwable asyncOperationFailure;
+ private NBErrorHandler errorhandler;
+
+ private Timer bindTimer;
+ private Timer executeTimer;
+ private Counter bytesCounter;
+ private Histogram messagesizeHistogram;
+
+ public JmsActivity(ActivityDef activityDef) {
+ super(activityDef);
+ }
+
+ @Override
+ public void initActivity() {
+ super.initActivity();
+
+ // default JMS type: Pulsar
+ // - currently this is the only supported JMS provider
+ jmsProviderType =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR)
+ .orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label);
+
+ // "Pulsar" as the JMS provider
+ if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
+
+ String webSvcUrl =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR)
+ .orElse("http://localhost:8080");
+ String pulsarSvcUrl =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR)
+ .orElse("pulsar://localhost:6650");
+
+ if (StringUtils.isAnyBlank(webSvcUrl, pulsarSvcUrl)) {
+ throw new RuntimeException("For \"" + JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label + "\" type, " +
+ "\"" + JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR + "\" and " +
+ "\"" + JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR + "\" parameters are manadatory!");
+ }
+
+ // Check if extra Pulsar config. file is in place
+ // - default file: "pulsar_config.properties" under the current directory
+ String pulsarCfgFile =
+ activityDef.getParams()
+ .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR)
+ .orElse(JmsUtil.JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME);
+
+ PulsarConfig pulsarConfig = new PulsarConfig(pulsarCfgFile);
+
+ jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, webSvcUrl, pulsarSvcUrl, pulsarConfig);
+ }
+ else {
+ throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType);
+ }
+
+ PulsarConnectionFactory factory;
+ try {
+ factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig());
+ this.jmsContext = factory.createContext();
+ } catch (JMSException e) {
+ throw new RuntimeException(
+ "Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!");
+ }
+
+ bindTimer = ActivityMetrics.timer(activityDef, "bind");
+ executeTimer = ActivityMetrics.timer(activityDef, "execute");
+ bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
+ messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
+
+ if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
+ this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this));
+ }
+
+ setDefaultsFromOpSequence(sequence);
+ onActivityDefUpdate(activityDef);
+
+ this.errorhandler = new NBErrorHandler(
+ () -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
+ this::getExceptionMetrics
+ );
+ }
+
+ /**
+ * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
+ */
+ public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
+ String encodedTopicStr =
+ JmsUtil.encode(jmsDestinationType, destName);
+ Destination destination = jmsDestinations.get(encodedTopicStr);
+
+ if ( destination == null ) {
+ // TODO: should we match Persistent/Non-peristent JMS Delivery mode with
+ // Pulsar Persistent/Non-prsistent topic?
+ if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.QUEUE.label)) {
+ destination = jmsContext.createQueue(destName);
+ } else if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.TOPIC.label)) {
+ destination = jmsContext.createTopic(destName);
+ }
+
+ jmsDestinations.put(encodedTopicStr, destination);
+ }
+
+ return destination;
+ }
+
+ @Override
+ public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
+ public OpSequence> getSequencer() { return sequence; }
+
+ public String getJmsProviderType() { return jmsProviderType; }
+ public JmsConnInfo getJmsConnInfo() { return jmsConnInfo; }
+ public JMSContext getJmsContext() { return jmsContext; }
+
+ public Timer getBindTimer() { return bindTimer; }
+ public Timer getExecuteTimer() { return this.executeTimer; }
+ public Counter getBytesCounter() { return bytesCounter; }
+ public Histogram getMessagesizeHistogram() { return messagesizeHistogram; }
+
+ public NBErrorHandler getErrorhandler() { return errorhandler; }
+
+ public void failOnAsyncOperationFailure() {
+ if (asyncOperationFailure != null) {
+ throw new RuntimeException(asyncOperationFailure);
+ }
+ }
+ public void asyncOperationFailed(Throwable ex) {
+ this.asyncOperationFailure = ex;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java
new file mode 100644
index 000000000..0a49a8717
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java
@@ -0,0 +1,32 @@
+package io.nosqlbench.driver.jms;
+
+import io.nosqlbench.engine.api.activityapi.core.Action;
+import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
+import io.nosqlbench.engine.api.activityapi.core.ActivityType;
+import io.nosqlbench.engine.api.activityimpl.ActivityDef;
+import io.nosqlbench.nb.annotations.Service;
+
+@Service(value = ActivityType.class, selector = "jms")
+public class JmsActivityType implements ActivityType {
+ @Override
+ public ActionDispenser getActionDispenser(JmsActivity activity) {
+ return new PulsarJmsActionDispenser(activity);
+ }
+
+ @Override
+ public JmsActivity getActivity(ActivityDef activityDef) {
+ return new JmsActivity(activityDef);
+ }
+
+ private static class PulsarJmsActionDispenser implements ActionDispenser {
+ private final JmsActivity activity;
+ public PulsarJmsActionDispenser(JmsActivity activity) {
+ this.activity = activity;
+ }
+
+ @Override
+ public Action getAction(int slot) {
+ return new JmsAction(activity, slot);
+ }
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java
new file mode 100644
index 000000000..70be982c8
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java
@@ -0,0 +1,74 @@
+package io.nosqlbench.driver.jms;
+
+import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.driver.jms.util.JmsHeader;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+import io.nosqlbench.driver.jms.util.JmsUtil;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import io.nosqlbench.engine.api.activityimpl.OpDispenser;
+import io.nosqlbench.engine.api.templating.CommandTemplate;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.LongFunction;
+import java.util.stream.Collectors;
+
+abstract public class ReadyJmsOp implements OpDispenser {
+
+ protected final OpTemplate opTpl;
+ protected final CommandTemplate cmdTpl;
+ protected final JmsActivity jmsActivity;
+
+ protected final String stmtOpType;
+ protected LongFunction asyncApiFunc;
+ protected LongFunction jmsDestinationTypeFunc;
+
+ protected final LongFunction opFunc;
+
+ public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
+ this.opTpl = opTemplate;
+ this.cmdTpl = new CommandTemplate(opTpl);
+ this.jmsActivity = jmsActivity;
+
+ if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
+ throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
+ }
+ this.stmtOpType = cmdTpl.getStatic("optype");
+
+ // Global/Doc-level parameter: async_api
+ if (cmdTpl.containsKey(JmsUtil.ASYNC_API_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.ASYNC_API_KEY_STR)) {
+ boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.ASYNC_API_KEY_STR));
+ this.asyncApiFunc = (l) -> value;
+ } else {
+ throw new RuntimeException("\"" + JmsUtil.ASYNC_API_KEY_STR + "\" parameter cannot be dynamic!");
+ }
+ }
+
+ // Global/Doc-level parameter: jms_desitation_type
+ // - queue: point-to-point
+ // - topic: pub/sub
+ if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
+ jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR);
+ } else {
+ throw new RuntimeException("\"" + JmsUtil.JMS_DESTINATION_TYPE_KEY_STR + "\" parameter cannot be dynamic!");
+ }
+ }
+
+ this.opFunc = resolveJms();
+ }
+
+ public JmsOp apply(long value) { return opFunc.apply(value); }
+
+ abstract LongFunction resolveJms();
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java
new file mode 100644
index 000000000..c6e8f431d
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java
@@ -0,0 +1,248 @@
+package io.nosqlbench.driver.jms;
+
+import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper;
+import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
+import io.nosqlbench.driver.jms.ops.JmsOp;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+import io.nosqlbench.driver.jms.util.JmsUtil;
+import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.LongFunction;
+import java.util.stream.Collectors;
+
+public class ReadyPulsarJmsOp extends ReadyJmsOp {
+
+ public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
+ super(opTemplate, jmsActivity);
+ }
+
+ public LongFunction resolveJms() {
+ // Global/Doc-level parameter: topic_uri
+ LongFunction topicUriFunc = (l) -> null;
+ if (cmdTpl.containsKey(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
+ topicUriFunc = (l) -> cmdTpl.getStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR);
+ } else {
+ topicUriFunc = (l) -> cmdTpl.getDynamic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR, l);
+ }
+ }
+
+ // Global: JMS destination
+ LongFunction jmsDestinationFunc;
+ try {
+ LongFunction finalTopicUriFunc = topicUriFunc;
+ jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(
+ jmsDestinationTypeFunc.apply(l),
+ finalTopicUriFunc.apply(l));
+ }
+ catch (JMSRuntimeException ex) {
+ throw new RuntimeException("Unable to create JMS destination!");
+ }
+
+ if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) {
+ return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
+ } else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) {
+ return resolveMsgRead(asyncApiFunc, jmsDestinationFunc);
+ } else {
+ throw new RuntimeException("Unsupported JMS operation type");
+ }
+ }
+
+ private LongFunction resolveMsgSend(
+ LongFunction async_api_func,
+ LongFunction jmsDestinationFunc
+ ) {
+ JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc();
+
+ // JMS header: delivery mode
+ LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
+ msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label));
+ }
+ else {
+ msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc);
+
+ // JMS header: message priority
+ LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
+ msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label));
+ }
+ else {
+ msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc);
+
+ // JMS header: message TTL
+ LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
+ msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label));
+ }
+ else {
+ msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc);
+
+ // JMS header: message delivery delay
+ LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
+ msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label));
+ }
+ else {
+ msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc);
+
+ // JMS header: disable message timestamp
+ LongFunction disableMsgTimestampFunc = (l) -> false;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
+ disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label));
+ }
+ else {
+ disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc);
+
+ // JMS header: disable message ID
+ LongFunction disableMsgIdFunc = (l) -> false;
+ if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
+ disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label));
+ }
+ else {
+ disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l));
+ }
+ }
+ jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc);
+
+ // JMS message properties
+ String jmsMsgPropertyListStr = "";
+ if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
+ jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR);
+ } else {
+ throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!");
+ }
+ }
+
+ Map jmsMsgProperties = new HashMap<>();
+ if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) {
+ jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";"))
+ .map(s -> s.split("=", 2))
+ .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : ""));
+ }
+
+ LongFunction msgBodyFunc;
+ if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
+ msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR);
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
+ msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l);
+ } else {
+ msgBodyFunc = (l) -> null;
+ }
+ } else {
+ throw new RuntimeException("JMS message send:: \"msg_body\" field must be specified!");
+ }
+
+ return new JmsMsgSendMapper(
+ jmsActivity,
+ async_api_func,
+ jmsDestinationFunc,
+ jmsHeaderLongFunc,
+ jmsMsgProperties,
+ msgBodyFunc);
+ }
+
+ private LongFunction resolveMsgRead(
+ LongFunction async_api_func,
+ LongFunction jmsDestinationFunc
+ ) {
+ // For Pulsar JMS, make "durable" as the default
+ LongFunction jmsConsumerDurableFunc = (l) -> true;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
+ jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
+ jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l));
+ }
+ }
+
+ LongFunction jmsConsumerSharedFunc = (l) -> true;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
+ jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
+ jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l));
+ }
+ }
+
+ LongFunction jmsMsgSubscriptionFunc = (l) -> "";
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
+ jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR);
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
+ jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l);
+ }
+ }
+
+ LongFunction jmsMsgReadSelectorFunc = (l) -> "";
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
+ jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR);
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
+ jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l);
+ }
+ }
+
+ LongFunction jmsMsgNoLocalFunc = (l) -> true;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
+ jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
+ jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l));
+ }
+ }
+
+ LongFunction jmsReadTimeoutFunc = (l) -> 0L;
+ if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
+ if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
+ jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR));
+ } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
+ jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l));
+ }
+ }
+
+ return new JmsMsgReadMapper(
+ jmsActivity,
+ async_api_func,
+ jmsDestinationFunc,
+ jmsConsumerDurableFunc,
+ jmsConsumerSharedFunc,
+ jmsMsgSubscriptionFunc,
+ jmsMsgReadSelectorFunc,
+ jmsMsgNoLocalFunc,
+ jmsReadTimeoutFunc);
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java
new file mode 100644
index 000000000..e52408106
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java
@@ -0,0 +1,21 @@
+package io.nosqlbench.driver.jms.conn;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JmsConnInfo {
+
+ protected final String jmsProviderType;
+ protected final Map jmsConnConfig;
+
+ protected JmsConnInfo(String jmsProviderType) {
+ this.jmsProviderType = jmsProviderType;
+ this.jmsConnConfig = new HashMap<>();
+ }
+
+ public Map getJmsConnConfig() { return this.jmsConnConfig; }
+ public void resetJmsConnConfig() { this.jmsConnConfig.clear(); }
+ public void addJmsConnConfigItems(Map cfgItems) { this.jmsConnConfig.putAll(cfgItems); }
+ public void addJmsConnConfigItem(String key, Object value) { this.jmsConnConfig.put(key, value); }
+ public void removeJmsConnConfigItem(String key) { this.jmsConnConfig.remove(key); }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java
new file mode 100644
index 000000000..4d623527b
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java
@@ -0,0 +1,42 @@
+package io.nosqlbench.driver.jms.conn;
+
+import io.nosqlbench.driver.jms.util.PulsarConfig;
+
+import java.util.Map;
+
+public class JmsPulsarConnInfo extends JmsConnInfo {
+
+ private final String webSvcUrl;
+ private final String pulsarSvcUrl;
+ private final PulsarConfig extraPulsarConfig;
+
+ public JmsPulsarConnInfo(String jmsProviderType, String webSvcUrl, String pulsarSvcUrl, PulsarConfig pulsarConfig) {
+ super(jmsProviderType);
+
+ this.webSvcUrl = webSvcUrl;
+ this.pulsarSvcUrl = pulsarSvcUrl;
+ this.extraPulsarConfig = pulsarConfig;
+
+ this.addJmsConnConfigItem("webServiceUrl", this.webSvcUrl);
+ this.addJmsConnConfigItem("brokerServiceUrl", this.pulsarSvcUrl);
+
+ Map clientCfgMap = this.extraPulsarConfig.getClientConfMap();
+ if (!clientCfgMap.isEmpty()) {
+ this.addJmsConnConfigItems(clientCfgMap);
+ }
+
+ Map producerCfgMap = this.extraPulsarConfig.getProducerConfMap();
+ if (!producerCfgMap.isEmpty()) {
+ this.addJmsConnConfigItem("producerConfig", producerCfgMap);
+ }
+
+ Map consumerCfgMap = this.extraPulsarConfig.getConsumerConfMap();
+ if (!consumerCfgMap.isEmpty()) {
+ this.addJmsConnConfigItem("consumerConfig", consumerCfgMap);
+ }
+ }
+
+ public String getWebSvcUrl() { return this.webSvcUrl; }
+ public String getPulsarSvcUrl() { return this.pulsarSvcUrl; }
+ public PulsarConfig getExtraPulsarConfig() { return this.extraPulsarConfig; }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java
new file mode 100644
index 000000000..91a5ca5ba
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java
@@ -0,0 +1,72 @@
+package io.nosqlbench.driver.jms.ops;
+
+import io.nosqlbench.driver.jms.JmsActivity;
+
+import javax.jms.Destination;
+import java.util.function.LongFunction;
+
+/**
+ * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
+ * enough state to define a pulsar operation such that it can be executed, measured, and possibly
+ * retried if needed.
+ *
+ * This function doesn't act *as* the operation. It merely maps the construction logic into
+ * a simple functional type, given the component functions.
+ *
+ * For additional parameterization, the command template is also provided.
+ */
+public class JmsMsgReadMapper extends JmsOpMapper {
+
+ private final LongFunction jmsConsumerDurableFunc;
+ private final LongFunction jmsConsumerSharedFunc;
+ private final LongFunction jmsMsgSubscriptionFunc;
+ private final LongFunction jmsMsgReadSelectorFunc;
+ private final LongFunction jmsMsgNoLocalFunc;
+ private final LongFunction jmsReadTimeoutFunc;
+
+ public JmsMsgReadMapper(JmsActivity jmsActivity,
+ LongFunction asyncApiFunc,
+ LongFunction jmsDestinationFunc,
+ LongFunction jmsConsumerDurableFunc,
+ LongFunction jmsConsumerSharedFunc,
+ LongFunction jmsMsgSubscriptionFunc,
+ LongFunction jmsMsgReadSelectorFunc,
+ LongFunction jmsMsgNoLocalFunc,
+ LongFunction jmsReadTimeoutFunc) {
+ super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
+
+ this.jmsConsumerDurableFunc = jmsConsumerDurableFunc;
+ this.jmsConsumerSharedFunc = jmsConsumerSharedFunc;
+ this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc;
+ this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc;
+ this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc;
+ this.jmsReadTimeoutFunc = jmsReadTimeoutFunc;
+ }
+
+ @Override
+ public JmsOp apply(long value) {
+ boolean asyncApi = asyncApiFunc.apply(value);
+ Destination jmsDestination = jmsDestinationFunc.apply(value);
+ boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value);
+ boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value);
+ String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value);
+ String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value);
+ boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value);
+ long jmsReadTimeout = jmsReadTimeoutFunc.apply(value);
+
+ // Default to NO read timeout
+ if (jmsReadTimeout < 0) jmsReadTimeout = 0;
+
+ return new JmsMsgReadOp(
+ jmsActivity,
+ asyncApi,
+ jmsDestination,
+ jmsConsumerDurable,
+ jmsConsumerShared,
+ jmsMsgSubscription,
+ jmsMsgReadSelector,
+ jmsMsgNoLocal,
+ jmsReadTimeout
+ );
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java
new file mode 100644
index 000000000..e83ff826c
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java
@@ -0,0 +1,114 @@
+package io.nosqlbench.driver.jms.ops;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import io.nosqlbench.driver.jms.JmsActivity;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.jms.*;
+
+public class JmsMsgReadOp extends JmsTimeTrackOp {
+
+ private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class);
+
+ private final JmsActivity jmsActivity;
+ private final boolean asyncJmsOp;
+ private final Destination jmsDestination;
+
+ private final JMSContext jmsContext;
+ private final JMSConsumer jmsConsumer;
+ private final boolean jmsConsumerDurable;
+ private final boolean jmsConsumerShared;
+ private final String jmsMsgSubscrption;
+ private final String jmsMsgReadSelector;
+ private final boolean jmsMsgNoLocal;
+ private final long jmsReadTimeout;
+
+ private final Counter bytesCounter;
+ private final Histogram messagesizeHistogram;
+
+ public JmsMsgReadOp(JmsActivity jmsActivity,
+ boolean asyncJmsOp,
+ Destination jmsDestination,
+ boolean jmsConsumerDurable,
+ boolean jmsConsumerShared,
+ String jmsMsgSubscrption,
+ String jmsMsgReadSelector,
+ boolean jmsMsgNoLocal,
+ long jmsReadTimeout) {
+ this.jmsActivity = jmsActivity;
+ this.asyncJmsOp = asyncJmsOp;
+ this.jmsDestination = jmsDestination;
+ this.jmsConsumerDurable = jmsConsumerDurable;
+ this.jmsConsumerShared = jmsConsumerShared;
+ this.jmsMsgReadSelector = jmsMsgReadSelector;
+ this.jmsMsgSubscrption = jmsMsgSubscrption;
+ this.jmsMsgNoLocal = jmsMsgNoLocal;
+ this.jmsReadTimeout = jmsReadTimeout;
+
+ this.jmsContext = jmsActivity.getJmsContext();
+ this.jmsConsumer = createJmsConsumer();
+
+ this.bytesCounter = jmsActivity.getBytesCounter();
+ this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
+ }
+
+ private JMSConsumer createJmsConsumer() {
+ JMSConsumer jmsConsumer;
+
+ try {
+ if (jmsConsumerDurable) {
+ if (jmsConsumerShared)
+ jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
+ else
+ jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal);
+ } else {
+ if (jmsConsumerShared)
+ jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
+ else
+ jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal);
+ }
+ }
+ catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) {
+ throw new RuntimeException("Failed to create JMS consumer: invalid destination!");
+ }
+ catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) {
+ throw new RuntimeException("Failed to create JMS consumer: invalid message selector!");
+ }
+ catch (JMSRuntimeException jmsRuntimeException) {
+ jmsRuntimeException.printStackTrace();
+ throw new RuntimeException("Failed to create JMS consumer: runtime internal error!");
+ }
+
+ // TODO: async consumer
+// if (this.asyncJmsOp) {
+// jmsConsumer.setMessageListener();
+// }
+
+ return jmsConsumer;
+ }
+
+ @Override
+ public void run() {
+ // FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley
+ Message receivedMsg = jmsConsumer.receive(jmsReadTimeout);
+ try {
+ if (receivedMsg != null) {
+ receivedMsg.acknowledge();
+ byte[] receivedMsgBody = receivedMsg.getBody(byte[].class);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("received msg-payload={}", new String(receivedMsgBody));
+ }
+
+ int messagesize = receivedMsgBody.length;
+ bytesCounter.inc(messagesize);
+ messagesizeHistogram.update(messagesize);
+ }
+ } catch (JMSException e) {
+ e.printStackTrace();
+ throw new RuntimeException("Failed to acknowledge the received JMS message.");
+ }
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java
new file mode 100644
index 000000000..1714c4212
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java
@@ -0,0 +1,55 @@
+package io.nosqlbench.driver.jms.ops;
+
+import io.nosqlbench.driver.jms.JmsActivity;
+import io.nosqlbench.driver.jms.util.JmsHeader;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+
+import javax.jms.Destination;
+import java.util.Map;
+import java.util.function.LongFunction;
+
+/**
+ * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
+ * enough state to define a pulsar operation such that it can be executed, measured, and possibly
+ * retried if needed.
+ *
+ * This function doesn't act *as* the operation. It merely maps the construction logic into
+ * a simple functional type, given the component functions.
+ *
+ * For additional parameterization, the command template is also provided.
+ */
+public class JmsMsgSendMapper extends JmsOpMapper {
+ private final JmsHeaderLongFunc jmsHeaderLongFunc;
+ private final Map jmsMsgProperties;
+ private final LongFunction msgBodyFunc;
+
+ public JmsMsgSendMapper(JmsActivity jmsActivity,
+ LongFunction asyncApiFunc,
+ LongFunction jmsDestinationFunc,
+ JmsHeaderLongFunc jmsHeaderLongFunc,
+ Map jmsMsgProperties,
+ LongFunction msgBodyFunc) {
+ super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
+
+ this.jmsHeaderLongFunc = jmsHeaderLongFunc;
+ this.jmsMsgProperties = jmsMsgProperties;
+ this.msgBodyFunc = msgBodyFunc;
+ }
+
+ @Override
+ public JmsOp apply(long value) {
+ boolean asyncApi = asyncApiFunc.apply(value);
+ Destination jmsDestination = jmsDestinationFunc.apply(value);
+ JmsHeader jmsHeader = (JmsHeader)jmsHeaderLongFunc.apply(value);
+ String msgBody = msgBodyFunc.apply(value);
+
+ return new JmsMsgSendOp(
+ jmsActivity,
+ asyncApi,
+ jmsDestination,
+ jmsHeader,
+ jmsMsgProperties,
+ msgBody
+ );
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java
new file mode 100644
index 000000000..f23ac403e
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java
@@ -0,0 +1,124 @@
+package io.nosqlbench.driver.jms.ops;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+import io.nosqlbench.driver.jms.JmsActivity;
+import io.nosqlbench.driver.jms.util.JmsHeader;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import javax.jms.*;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.Map;
+
+public class JmsMsgSendOp extends JmsTimeTrackOp {
+
+ private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class);
+
+ private final JmsActivity jmsActivity;
+ private final boolean asyncJmsOp;
+ private final Destination jmsDestination;
+ private final JmsHeader jmsHeader;
+ private final Map jmsMsgProperties;
+
+ private final JMSContext jmsContext;
+ private final JMSProducer jmsProducer;
+ private final String msgBody;
+
+ private final Counter bytesCounter;
+ private final Histogram messagesizeHistogram;
+
+ public JmsMsgSendOp(JmsActivity jmsActivity,
+ boolean asyncJmsOp,
+ Destination jmsDestination,
+ JmsHeader jmsHeader,
+ Map jmsMsgProperties,
+ String msgBody) {
+ this.jmsActivity = jmsActivity;
+ this.asyncJmsOp = asyncJmsOp;
+ this.jmsDestination = jmsDestination;
+
+ this.jmsHeader = jmsHeader;
+ this.jmsMsgProperties = jmsMsgProperties;
+ this.msgBody = msgBody;
+
+ if (!jmsHeader.isValidHeader()) {
+ throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText());
+ }
+
+ if ((msgBody == null) || msgBody.isEmpty()) {
+ throw new RuntimeException("JMS message body can't be empty!");
+ }
+
+ this.jmsContext = jmsActivity.getJmsContext();
+ this.jmsProducer = createJmsProducer();
+
+ this.bytesCounter = jmsActivity.getBytesCounter();
+ this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
+ }
+
+ private JMSProducer createJmsProducer() {
+ JMSProducer jmsProducer = this.jmsContext.createProducer();
+
+ jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode());
+ jmsProducer.setPriority(this.jmsHeader.getMsgPriority());
+ jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay());
+ jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp());
+ jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId());
+
+ if (this.asyncJmsOp) {
+ jmsProducer.setAsync(new CompletionListener() {
+ @Override
+ public void onCompletion(Message msg) {
+ try {
+ byte[] msgBody = msg.getBody(byte[].class);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Async message send success - message body: " + new String(msgBody));
+ }
+ }
+ catch (JMSException jmsException) {
+ jmsException.printStackTrace();
+ logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
+ }
+ }
+
+ @Override
+ public void onException(Message msg, Exception e) {
+ try {
+ byte[] msgBody = msg.getBody(byte[].class);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Async message send failure - message body: " + new String(msgBody));
+ }
+ }
+ catch (JMSException jmsException) {
+ jmsException.printStackTrace();
+ logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
+ }
+ }
+ });
+ }
+
+ for (Map.Entry entry : jmsMsgProperties.entrySet()) {
+ jmsProducer.setProperty(entry.getKey(), entry.getValue());
+ }
+
+ return jmsProducer;
+ }
+
+ @Override
+ public void run() {
+ int messageSize;
+ try {
+ byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8);
+ messageSize = msgBytes.length;
+ jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8));
+
+ messagesizeHistogram.update(messageSize);
+ bytesCounter.inc(messageSize);
+ }
+ catch (Exception ex) {
+ logger.error("Failed to send JMS message - " + msgBody);
+ }
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java
new file mode 100644
index 000000000..bd06c6bca
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java
@@ -0,0 +1,13 @@
+package io.nosqlbench.driver.jms.ops;
+
+/**
+ * Base type of all Pulsar Operations including Producers and Consumers.
+ */
+public interface JmsOp {
+
+ /**
+ * Execute the operation, invoke the timeTracker when the operation ended.
+ * The timeTracker can be invoked in a separate thread, it is only used for metrics.
+ */
+ void run(Runnable timeTracker);
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java
new file mode 100644
index 000000000..04dac1450
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java
@@ -0,0 +1,23 @@
+package io.nosqlbench.driver.jms.ops;
+
+import io.nosqlbench.driver.jms.JmsActivity;
+import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
+
+import javax.jms.Destination;
+import java.util.Map;
+import java.util.function.LongFunction;
+
+public abstract class JmsOpMapper implements LongFunction {
+ protected final JmsActivity jmsActivity;
+ protected final LongFunction asyncApiFunc;
+ protected final LongFunction jmsDestinationFunc;
+
+ public JmsOpMapper(JmsActivity jmsActivity,
+ LongFunction asyncApiFunc,
+ LongFunction jmsDestinationFunc)
+ {
+ this.jmsActivity = jmsActivity;
+ this.asyncApiFunc = asyncApiFunc;
+ this.jmsDestinationFunc = jmsDestinationFunc;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java
new file mode 100644
index 000000000..58bd578d6
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java
@@ -0,0 +1,17 @@
+package io.nosqlbench.driver.jms.ops;
+
+/**
+ * Base type of all Sync Pulsar Operations including Producers and Consumers.
+ */
+public abstract class JmsTimeTrackOp implements JmsOp {
+
+ public void run(Runnable timeTracker) {
+ try {
+ this.run();
+ } finally {
+ timeTracker.run();
+ }
+ }
+
+ public abstract void run();
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java
new file mode 100644
index 000000000..236eb95ee
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java
@@ -0,0 +1,66 @@
+package io.nosqlbench.driver.jms.util;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.commons.lang.StringUtils;
+
+import javax.jms.DeliveryMode;
+
+@Setter
+@Getter
+@AllArgsConstructor
+@ToString
+public class JmsHeader {
+ private int deliveryMode;
+ private int msgPriority;
+ private long msgTtl;
+ private long msgDeliveryDelay;
+ private boolean disableMsgTimestamp;
+ private boolean disableMsgId;
+
+ public boolean isValidDeliveryMode() {
+ return (deliveryMode == DeliveryMode.NON_PERSISTENT) || (deliveryMode == DeliveryMode.PERSISTENT);
+ }
+
+ public boolean isValidPriority() {
+ return (msgPriority >= 0) && (msgPriority <= 9);
+ }
+
+ public boolean isValidTtl() {
+ return msgTtl >= 0;
+ }
+
+ public boolean isValidDeliveryDelay() {
+ return msgTtl >= 0;
+ }
+
+ public boolean isValidHeader() {
+ return isValidDeliveryMode()
+ && isValidPriority()
+ && isValidTtl()
+ && isValidDeliveryDelay();
+ }
+
+ public String getInvalidJmsHeaderMsgText() {
+ StringBuilder sb = new StringBuilder();
+
+ if (!isValidDeliveryMode())
+ sb.append("delivery mode - " + deliveryMode + "; ");
+ if (!isValidPriority())
+ sb.append("message priority - " + msgPriority + "; ");
+ if (!isValidTtl())
+ sb.append("message TTL - " + msgTtl + "; ");
+ if (!isValidDeliveryDelay())
+ sb.append("message delivery delay - " + msgDeliveryDelay + "; ");
+
+ String invalidMsgText = sb.toString();
+ if (StringUtils.length(invalidMsgText) > 0)
+ invalidMsgText = StringUtils.substringBeforeLast(invalidMsgText, ";");
+ else
+ invalidMsgText = "none";
+
+ return "Invalid JMS header values: " + invalidMsgText;
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java
new file mode 100644
index 000000000..c094a6c81
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java
@@ -0,0 +1,31 @@
+package io.nosqlbench.driver.jms.util;
+
+import lombok.*;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import java.util.function.LongFunction;
+
+@Setter
+@Getter
+@NoArgsConstructor
+public class JmsHeaderLongFunc implements LongFunction {
+ private LongFunction deliveryModeFunc;
+ private LongFunction msgPriorityFunc;
+ private LongFunction msgTtlFunc;
+ private LongFunction msgDeliveryDelayFunc;
+ private LongFunction disableMsgTimestampFunc;
+ private LongFunction disableMsgIdFunc;
+
+ @Override
+ public Object apply(long value) {
+ return new JmsHeader(
+ (deliveryModeFunc != null) ? deliveryModeFunc.apply(value) : DeliveryMode.PERSISTENT,
+ (msgPriorityFunc != null) ? msgPriorityFunc.apply(value) : Message.DEFAULT_PRIORITY,
+ (msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_TIME_TO_LIVE,
+ (msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_DELIVERY_DELAY,
+ (disableMsgTimestampFunc != null) ? disableMsgTimestampFunc.apply(value) : false,
+ (disableMsgIdFunc != null) ? disableMsgIdFunc.apply(value) : false
+ );
+ }
+}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java
new file mode 100644
index 000000000..456939417
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java
@@ -0,0 +1,119 @@
+package io.nosqlbench.driver.jms.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.Base64;
+
+public class JmsUtil {
+
+ private final static Logger logger = LogManager.getLogger(JmsUtil.class);
+
+ // Supported JMS provider type
+ public enum JMS_PROVIDER_TYPES {
+ PULSAR("pulsar");
+
+ public final String label;
+ JMS_PROVIDER_TYPES(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidJmsProviderType(String type) {
+ return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type));
+ }
+
+ /////
+ // NB command line parameters
+ // - JMS provider type
+ public final static String JMS_PROVIDER_TYPE_KEY_STR = "provider_type";
+
+ /// Only applicable when the provider is "Pulsar"
+ // - Pulsar configuration properties file
+ public final static String JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR = "pulsar_cfg_file";
+ public final static String JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME = "pulsar_config.properties";
+ // - Pulsar web url
+ public final static String JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR = "web_url";
+ // - Pulsar service url
+ public final static String JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR = "service_url";
+
+
+ public final static String ASYNC_API_KEY_STR = "async_api";
+ public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type";
+
+ ///// JMS Producer
+ // Supported JMS provider type
+ public enum JMS_MSG_HEADER_KEYS {
+ DELIVERY_MODE("jms_producer_header_msg_delivery_mode"),
+ PRIORITY("jms_producer_header_msg_priority"),
+ TTL("jms_producer_header_msg_ttl"),
+ DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"),
+ DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"),
+ DISABLE_ID("jms_producer_header_disable_msg_id");
+
+ public final String label;
+ JMS_MSG_HEADER_KEYS(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidJmsHeaderKey(String type) {
+ return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type));
+ }
+ public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties";
+ public final static String JMS_PRODUCER_MSG_BODY_KEY_STR = "msg_body";
+
+ ///// JMS Consumer
+ public final static String JMS_CONSUMER_DURABLE_KEY_STR = "jms_consumer_msg_durable";
+ public final static String JMS_CONSUMER_SHARED_KEY_STR = "jms_consumer_msg_shared";
+ public final static String JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR = "jms_consumer_subscription";
+ public final static String JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR = "jms_consumer_msg_read_selector";
+ public final static String JMS_CONSUMER_MSG_NOLOCAL_KEY_STR = "jms_consumer_msg_nolocal";
+ public final static String JMS_CONSUMER_READ_TIMEOUT_KEY_STR = "jms_consumer_msg_read_timeout";
+
+
+ // Only applicable to Pulsar JMS provider
+ public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri";
+
+ // Supported message operation types
+ public enum OP_TYPES {
+ MSG_SEND("msg_send"),
+ MSG_READ("msg_read");
+
+ public final String label;
+ OP_TYPES(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidClientType(String type) {
+ return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
+ }
+
+ // JMS Destination Types
+ public enum JMS_DESTINATION_TYPES {
+ QUEUE("queue"),
+ TOPIC("topic");
+
+ public final String label;
+ JMS_DESTINATION_TYPES(String label) {
+ this.label = label;
+ }
+ }
+ public static boolean isValidJmsDestinationType(String type) {
+ return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type));
+ }
+
+ public static String encode(String... strings) {
+ StringBuilder stringBuilder = new StringBuilder();
+ for (String str : strings) {
+ if (!StringUtils.isBlank(str))
+ stringBuilder.append(str).append("::");
+ }
+
+ String concatenatedStr =
+ StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
+
+ return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
+ }
+}
+
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java
new file mode 100644
index 000000000..1d5cc6d5b
--- /dev/null
+++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java
@@ -0,0 +1,99 @@
+package io.nosqlbench.driver.jms.util;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.FileBasedConfiguration;
+import org.apache.commons.configuration2.PropertiesConfiguration;
+import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
+import org.apache.commons.configuration2.builder.fluent.Parameters;
+import org.apache.commons.configuration2.ex.ConfigurationException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class PulsarConfig {
+ private final static Logger logger = LogManager.getLogger(PulsarConfig.class);
+
+ public static final String SCHEMA_CONF_PREFIX = "schema";
+ public static final String CLIENT_CONF_PREFIX = "client";
+ public static final String PRODUCER_CONF_PREFIX = "producer";
+ public static final String CONSUMER_CONF_PREFIX = "consumer";
+
+ private final Map schemaConfMap = new HashMap<>();
+ private final Map clientConfMap = new HashMap<>();
+ private final Map producerConfMap = new HashMap<>();
+ private final Map consumerConfMap = new HashMap<>();
+
+ public PulsarConfig(String fileName) {
+ File file = new File(fileName);
+
+ try {
+ String canonicalFilePath = file.getCanonicalPath();
+
+ Parameters params = new Parameters();
+
+ FileBasedConfigurationBuilder builder =
+ new FileBasedConfigurationBuilder(PropertiesConfiguration.class)
+ .configure(params.properties()
+ .setFileName(fileName));
+
+ Configuration config = builder.getConfiguration();
+
+ // Get schema specific configuration settings
+ for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+
+ // Get client connection specific configuration settings
+ for (Iterator it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+
+ // Get producer specific configuration settings
+ for (Iterator it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+
+ // Get consumer specific configuration settings
+ for (Iterator it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
+ String confKey = it.next();
+ String confVal = config.getProperty(confKey).toString();
+ if (!StringUtils.isBlank(confVal))
+ consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
+ }
+ } catch (IOException ioe) {
+ logger.error("Can't read the specified config properties file: " + fileName);
+ ioe.printStackTrace();
+ } catch (ConfigurationException cex) {
+ logger.error("Error loading configuration items from the specified config properties file: " + fileName);
+ cex.printStackTrace();
+ }
+ }
+
+ public Map getSchemaConfMap() {
+ return this.schemaConfMap;
+ }
+ public Map getClientConfMap() {
+ return this.clientConfMap;
+ }
+ public Map getProducerConfMap() {
+ return this.producerConfMap;
+ }
+ public Map getConsumerConfMap() {
+ return this.consumerConfMap;
+ }
+}
diff --git a/driver-jms/src/main/resources/jms.md b/driver-jms/src/main/resources/jms.md
new file mode 100644
index 000000000..07dd0c5c7
--- /dev/null
+++ b/driver-jms/src/main/resources/jms.md
@@ -0,0 +1 @@
+# Overview
diff --git a/driver-jms/src/main/resources/pulsar_config.properties b/driver-jms/src/main/resources/pulsar_config.properties
new file mode 100644
index 000000000..f711535ac
--- /dev/null
+++ b/driver-jms/src/main/resources/pulsar_config.properties
@@ -0,0 +1,33 @@
+### Schema related configurations - schema.xxx
+# valid types:
+# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
+# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
+# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
+# avro, json, protobuf
+#
+# NOTE: for JMS client, Pulsar "schema" is NOT supported yet
+schema.type=
+schema.definition=
+
+
+### Pulsar client related configurations - client.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#client
+client.connectionTimeoutMs=5000
+#client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
+#client.authParams=
+#client.tlsAllowInsecureConnection=true
+client.numIoThreads=10
+client.numListenerThreads=10
+
+
+### Producer related configurations (global) - producer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
+producer.sendTimeoutMs=
+producer.blockIfQueueFull=true
+producer.maxPendingMessages=10000
+producer.batchingMaxMessages=10000
+
+
+### Consumer related configurations (global) - consumer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
+consumer.receiverQueueSize=2000
diff --git a/driver-jms/src/main/resources/pulsar_jms.yaml b/driver-jms/src/main/resources/pulsar_jms.yaml
new file mode 100644
index 000000000..755f05ed6
--- /dev/null
+++ b/driver-jms/src/main/resources/pulsar_jms.yaml
@@ -0,0 +1,89 @@
+bindings:
+ payload: NumberNameToString() #AlphaNumericString(20)
+ tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
+ namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
+ core_topic_name: Mod(5); ToString(); Prefix("t")
+
+# document level parameters that apply to all Pulsar client types:
+params:
+ ### static only
+ async_api: "true"
+
+ ### Static only
+ # Valid values: queue (point-to-point) or topic (pub-sub)
+ jms_desitation_type: "topic"
+
+ ### Static Only
+ # NOTE: ONLY relevant when the JMS provider is Pulsar
+ #pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
+ #pulsar_topic_uri: "persistent://public/default/pt100"
+ #pulsar_topic_uri: "persistent://public/default/t0"
+ pulsar_topic_uri: "persistent://public/default/pt100_10"
+ #pulsar_topic_uri: "persistent://public/default/pt200_10"
+ #pulsar_topic_uri: "persistent://public/default/pt300_10"
+ #pulsar_topic_uri: "persistent://public/default/pt400_10"
+
+blocks:
+ - name: "producer-block"
+ tags:
+ phase: "jms_producer"
+ statements:
+ - name: "s1"
+ optype: "msg_send"
+
+ ### JMS PRODUCER message header
+ ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT
+ # - static or dynamic
+ # - Producer only
+ # Valid values: non-persistent(1), or persistent(2) - default
+ jms_producer_header_msg_delivery_mode: "2"
+ # Valid values: 0~9 (4 as default)
+ jms_producer_header_msg_priority: "4"
+ # Valid values: non-negative long; default 0 (never expires)
+ jms_producer_header_msg_ttl: "0"
+ # Valid values: non-negative long; default 0 (no delay)
+ jms_producer_header_msg_delivery_delay: "0"
+ # Valid values: true/false; default false (message timestamp is enabled)
+ jms_producer_header_disable_msg_timestamp: "false"
+ # Valid values: true/false; default false (message ID is enabled)
+ jms_producer_header_disable_msg_id: "false"
+
+ ### JMS PRODUCER message properties
+ # - static only
+ # - Producer only
+ # - In format: "key1=value1;key2=value2;..."
+ jms_producer_msg_properties: "key1=value1;key2=value2"
+
+ ### JMS PRODUCER message body
+ msg_body: "{payload}"
+
+ - name: "consumer-block"
+ tags:
+ phase: "jms_consumer"
+ statements:
+ - name: "s1"
+ optype: "msg_read"
+
+ ### JMS CONSUMER durable and shared
+ jms_consumer_msg_durable: "true"
+ jms_consumer_msg_shared: "true"
+
+ ### JMS CONSUMER subscription name
+ # - only relevant for durable consumer
+ jms_consumer_subscription: "mysub"
+
+ ### JMS CONSUMER subscription name
+ # - only relevant for unshared consumer
+ jms_consumer_nolocal: "false"
+
+ ### JMS CONSUMER message read timeout
+ # - unit: milliseconds
+ # - 0 means call blocks indefinitely
+ # - FIXME: 0 supposes to wait indefinitly; but
+ # it actually behaves like no wait at all
+ jms_consumer_msg_read_timeout: "10000"
+
+ ### JMS CONSUMER message selector
+ # - empty string means no message selector
+ # - https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
+ jms_consumer_msg_read_selector: ""
diff --git a/driver-jmx/pom.xml b/driver-jmx/pom.xml
index d4e218c6e..64b295e4c 100644
--- a/driver-jmx/pom.xml
+++ b/driver-jmx/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-kafka/pom.xml b/driver-kafka/pom.xml
index 2c27a933d..99a04f375 100644
--- a/driver-kafka/pom.xml
+++ b/driver-kafka/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -44,13 +44,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-stdout
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-mongodb/pom.xml b/driver-mongodb/pom.xml
index 904981efa..3087af763 100644
--- a/driver-mongodb/pom.xml
+++ b/driver-mongodb/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -21,13 +21,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-pulsar/pom.xml b/driver-pulsar/pom.xml
index 45e416d97..9585687c9 100644
--- a/driver-pulsar/pom.xml
+++ b/driver-pulsar/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -40,13 +40,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-stdout
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java
index b442cf032..0bbabd8c4 100644
--- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java
+++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java
@@ -5,7 +5,6 @@ import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
-import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
diff --git a/driver-stdout/pom.xml b/driver-stdout/pom.xml
index 518cdf2d3..74b2f0dd8 100644
--- a/driver-stdout/pom.xml
+++ b/driver-stdout/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-tcp/pom.xml b/driver-tcp/pom.xml
index 1f435b489..aba33fd24 100644
--- a/driver-tcp/pom.xml
+++ b/driver-tcp/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -24,19 +24,19 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-stdout
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/driver-web/pom.xml b/driver-web/pom.xml
index 94d0ce903..a0c1ee066 100644
--- a/driver-web/pom.xml
+++ b/driver-web/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/drivers-api/pom.xml b/drivers-api/pom.xml
index 7f2bf8a7d..1484547f2 100644
--- a/drivers-api/pom.xml
+++ b/drivers-api/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbench
nb-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-userlibs
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-api/pom.xml b/engine-api/pom.xml
index 70635778d..3d576cd94 100644
--- a/engine-api/pom.xml
+++ b/engine-api/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -23,25 +23,25 @@
io.nosqlbench
nb-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
nb-annotations
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-userlibs
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-cli/pom.xml b/engine-cli/pom.xml
index 4a4f77a12..810af15a0 100644
--- a/engine-cli/pom.xml
+++ b/engine-cli/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -23,13 +23,13 @@
io.nosqlbench
engine-core
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
engine-docker
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-clients/pom.xml b/engine-clients/pom.xml
index a741ede12..8131da3cd 100644
--- a/engine-clients/pom.xml
+++ b/engine-clients/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -21,7 +21,7 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-core/pom.xml b/engine-core/pom.xml
index ca3622c50..6d10878e4 100644
--- a/engine-core/pom.xml
+++ b/engine-core/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -28,13 +28,13 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
drivers-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
@@ -85,7 +85,7 @@
io.nosqlbench
engine-clients
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
compile
diff --git a/engine-docker/pom.xml b/engine-docker/pom.xml
index d00200b2f..1b77f3c3d 100644
--- a/engine-docker/pom.xml
+++ b/engine-docker/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -56,7 +56,7 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-docs/pom.xml b/engine-docs/pom.xml
index 7843b7dd5..aeb1259e6 100644
--- a/engine-docs/pom.xml
+++ b/engine-docs/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -28,7 +28,7 @@
io.nosqlbench
docsys
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-extensions/pom.xml b/engine-extensions/pom.xml
index 1b8651739..6a74bf9f8 100644
--- a/engine-extensions/pom.xml
+++ b/engine-extensions/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -22,7 +22,7 @@
io.nosqlbench
engine-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/engine-rest/pom.xml b/engine-rest/pom.xml
index b24a33836..972d493b8 100644
--- a/engine-rest/pom.xml
+++ b/engine-rest/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -47,7 +47,7 @@
io.nosqlbench
engine-cli
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/mvn-defaults/pom.xml b/mvn-defaults/pom.xml
index f0514fd4c..116fb8727 100644
--- a/mvn-defaults/pom.xml
+++ b/mvn-defaults/pom.xml
@@ -3,7 +3,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
pom
diff --git a/nb-annotations/pom.xml b/nb-annotations/pom.xml
index 4286950cd..7fe9a6048 100644
--- a/nb-annotations/pom.xml
+++ b/nb-annotations/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
diff --git a/nb-api/pom.xml b/nb-api/pom.xml
index fd0c913e7..17fc0ec91 100644
--- a/nb-api/pom.xml
+++ b/nb-api/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -31,7 +31,7 @@
io.nosqlbench
nb-annotations
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/nb/pom.xml b/nb/pom.xml
index 88bfa0e08..01d541934 100644
--- a/nb/pom.xml
+++ b/nb/pom.xml
@@ -5,7 +5,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -24,115 +24,121 @@
io.nosqlbench
engine-rest
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
engine-cli
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
engine-docs
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
engine-core
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
engine-extensions
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-web
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-kafka
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-stdout
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-diag
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-tcp
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-http
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-jmx
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-dsegraph-shaded
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-cql-shaded
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-cqld3-shaded
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-cqlverify
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-mongodb
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-pulsar
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
driver-cockroachdb
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
+
+
+
+ io.nosqlbench
+ driver-jms
+ 4.15.47-SNAPSHOT
@@ -232,7 +238,7 @@
io.nosqlbench
driver-mongodb
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index 4f9201df9..99036d220 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
mvn-defaults
@@ -50,6 +50,7 @@
driver-jdbc
driver-cockroachdb
driver-pulsar
+ driver-jms
driver-grpc
driver-direct
diff --git a/virtdata-api/pom.xml b/virtdata-api/pom.xml
index 0be64047a..54a3cc238 100644
--- a/virtdata-api/pom.xml
+++ b/virtdata-api/pom.xml
@@ -7,7 +7,7 @@
io.nosqlbench
mvn-defaults
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -23,14 +23,14 @@
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
nb-api
io.nosqlbench
virtdata-lang
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/virtdata-lang/pom.xml b/virtdata-lang/pom.xml
index 71fcb8661..1daecba24 100644
--- a/virtdata-lang/pom.xml
+++ b/virtdata-lang/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
diff --git a/virtdata-lib-basics/pom.xml b/virtdata-lib-basics/pom.xml
index c9dbb9d49..a00451fea 100644
--- a/virtdata-lib-basics/pom.xml
+++ b/virtdata-lib-basics/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -20,7 +20,7 @@
io.nosqlbench
virtdata-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/virtdata-lib-curves4/pom.xml b/virtdata-lib-curves4/pom.xml
index dc327a175..f49df2608 100644
--- a/virtdata-lib-curves4/pom.xml
+++ b/virtdata-lib-curves4/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -22,13 +22,13 @@
io.nosqlbench
virtdata-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-lib-basics
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/virtdata-lib-random/pom.xml b/virtdata-lib-random/pom.xml
index 988ac5d7a..6e055c582 100644
--- a/virtdata-lib-random/pom.xml
+++ b/virtdata-lib-random/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -20,13 +20,13 @@
io.nosqlbench
virtdata-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-lib-basics
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/virtdata-lib-realer/pom.xml b/virtdata-lib-realer/pom.xml
index 501f6e2fe..1667684a6 100644
--- a/virtdata-lib-realer/pom.xml
+++ b/virtdata-lib-realer/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -20,7 +20,7 @@
io.nosqlbench
virtdata-lib-basics
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/virtdata-realdata/pom.xml b/virtdata-realdata/pom.xml
index 266658e9a..fc86b40f4 100644
--- a/virtdata-realdata/pom.xml
+++ b/virtdata-realdata/pom.xml
@@ -7,7 +7,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -18,7 +18,7 @@
io.nosqlbench
virtdata-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
diff --git a/virtdata-userlibs/pom.xml b/virtdata-userlibs/pom.xml
index 1ad314aa5..972c42c28 100644
--- a/virtdata-userlibs/pom.xml
+++ b/virtdata-userlibs/pom.xml
@@ -4,7 +4,7 @@
mvn-defaults
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
../mvn-defaults
@@ -18,36 +18,36 @@
io.nosqlbench
virtdata-realdata
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-lib-realer
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-api
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
virtdata-lib-random
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
virtdata-lib-basics
io.nosqlbench
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT
virtdata-lib-curves4
@@ -55,7 +55,7 @@
io.nosqlbench
docsys
- 4.15.45-SNAPSHOT
+ 4.15.47-SNAPSHOT