confKeyList = new ArrayList<>();
+
+ // All configuration items with "String" as the value type
+ confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "String");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, confVal);
+ }
+ }
+ }
+
+ // All configuration items with "long" as the value type
+ confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "long");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, Long.valueOf(confVal));
+ }
+ }
+ }
+
+ // All configuration items with "int" as the value type
+ confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "int");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, Integer.valueOf(confVal));
+ }
+ }
+ }
+
+ // All configuration items with "boolean" as the value type
+ confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "boolean");
+ for (String confKey : confKeyList) {
+ if (srcConfMapRaw.containsKey(confKey)) {
+ String confVal = srcConfMapRaw.get(confKey);
+ if (StringUtils.isNotBlank(confVal)) {
+ tgtConfObjMap.put(confKey, Boolean.valueOf(confVal));
+ }
+ }
+ }
+
+ // TODO: So far the above primitive types should be good enough.
+ // Add support for other types when needed
+ }
+
+ private static String getInvalidConfValStr(String confKey, String confVal, String configCategory, String expectedVal) {
+ return "Incorrect value \"" + confVal + "\" for Pulsar " + configCategory +
+ " configuration item of \"" + confKey + "\". Expecting the following value (format): " + expectedVal;
+ }
+}
diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java
new file mode 100644
index 000000000..f929ab25a
--- /dev/null
+++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java
@@ -0,0 +1,169 @@
+package io.nosqlbench.adapter.pulsar.util;
+
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import com.codahale.metrics.Counter;
+
+import java.util.Iterator;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Detects message loss, message duplication and out-of-order message delivery
+ * based on a monotonic sequence number that each received message contains.
+ *
+ * Out-of-order messages are detected with a maximum look behind of 1000 sequence number entries.
+ * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
+ */
+public class ReceivedMessageSequenceTracker implements AutoCloseable {
+ private static final int DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 1000;
+ private static final int DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS = 1000;
+ // message out-of-sequence error counter
+ private final Counter msgErrOutOfSeqCounter;
+ // duplicate message error counter
+ private final Counter msgErrDuplicateCounter;
+ // message loss error counter
+ private final Counter msgErrLossCounter;
+ private final SortedSet pendingOutOfSeqNumbers;
+ private final int maxTrackOutOfOrderSequenceNumbers;
+ private final SortedSet skippedSeqNumbers;
+ private final int maxTrackSkippedSequenceNumbers;
+ private long expectedNumber = -1;
+
+ public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
+ this(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter,
+ DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS, DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS);
+ }
+
+ public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter,
+ int maxTrackOutOfOrderSequenceNumbers, int maxTrackSkippedSequenceNumbers) {
+ this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
+ this.msgErrDuplicateCounter = msgErrDuplicateCounter;
+ this.msgErrLossCounter = msgErrLossCounter;
+ this.maxTrackOutOfOrderSequenceNumbers = maxTrackOutOfOrderSequenceNumbers;
+ this.maxTrackSkippedSequenceNumbers = maxTrackSkippedSequenceNumbers;
+ this.pendingOutOfSeqNumbers = new TreeSet<>();
+ this.skippedSeqNumbers = new TreeSet<>();
+ }
+
+ /**
+ * Notifies the tracker about a received sequence number
+ *
+ * @param sequenceNumber the sequence number of the received message
+ */
+ public void sequenceNumberReceived(long sequenceNumber) {
+ if (expectedNumber == -1) {
+ expectedNumber = sequenceNumber + 1;
+ return;
+ }
+
+ if (sequenceNumber < expectedNumber) {
+ if (skippedSeqNumbers.remove(sequenceNumber)) {
+ // late out-of-order delivery was detected
+ // decrease the loss counter
+ msgErrLossCounter.dec();
+ // increment the out-of-order counter
+ msgErrOutOfSeqCounter.inc();
+ } else {
+ msgErrDuplicateCounter.inc();
+ }
+ return;
+ }
+
+ boolean messagesSkipped = false;
+ if (sequenceNumber > expectedNumber) {
+ if (pendingOutOfSeqNumbers.size() == maxTrackOutOfOrderSequenceNumbers) {
+ messagesSkipped = processLowestPendingOutOfSequenceNumber();
+ }
+ if (!pendingOutOfSeqNumbers.add(sequenceNumber)) {
+ msgErrDuplicateCounter.inc();
+ }
+ } else {
+ // sequenceNumber == expectedNumber
+ expectedNumber++;
+ }
+ processPendingOutOfSequenceNumbers(messagesSkipped);
+ cleanUpTooFarBehindOutOfSequenceNumbers();
+ }
+
+ private boolean processLowestPendingOutOfSequenceNumber() {
+ // remove the lowest pending out of sequence number
+ Long lowestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
+ pendingOutOfSeqNumbers.remove(lowestOutOfSeqNumber);
+ if (lowestOutOfSeqNumber > expectedNumber) {
+ // skip the expected number ahead to the number after the lowest sequence number
+ // increment the counter with the amount of sequence numbers that got skipped
+ // keep track of the skipped sequence numbers to detect late out-of-order message delivery
+ for (long l = expectedNumber; l < lowestOutOfSeqNumber; l++) {
+ msgErrLossCounter.inc();
+ skippedSeqNumbers.add(l);
+ if (skippedSeqNumbers.size() > maxTrackSkippedSequenceNumbers) {
+ skippedSeqNumbers.remove(skippedSeqNumbers.first());
+ }
+ }
+ expectedNumber = lowestOutOfSeqNumber + 1;
+ return true;
+ } else {
+ msgErrLossCounter.inc();
+ }
+ return false;
+ }
+
+ private void processPendingOutOfSequenceNumbers(boolean messagesSkipped) {
+ // check if there are previously received out-of-order sequence number that have been received
+ while (pendingOutOfSeqNumbers.remove(expectedNumber)) {
+ expectedNumber++;
+ if (!messagesSkipped) {
+ msgErrOutOfSeqCounter.inc();
+ }
+ }
+ }
+
+ private void cleanUpTooFarBehindOutOfSequenceNumbers() {
+ // remove sequence numbers that are too far behind
+ for (Iterator iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) {
+ Long number = iterator.next();
+ if (number < expectedNumber - maxTrackOutOfOrderSequenceNumbers) {
+ msgErrLossCounter.inc();
+ iterator.remove();
+ } else {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Handles the possible pending out of sequence numbers. Mainly needed in unit tests to assert the
+ * counter values.
+ */
+ @Override
+ public void close() {
+ while (!pendingOutOfSeqNumbers.isEmpty()) {
+ processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber());
+ }
+ }
+
+ public int getMaxTrackOutOfOrderSequenceNumbers() {
+ return maxTrackOutOfOrderSequenceNumbers;
+ }
+
+ public int getMaxTrackSkippedSequenceNumbers() {
+ return maxTrackSkippedSequenceNumbers;
+ }
+}
diff --git a/adapter-pulsar/src/main/resources/admin_namespace.yaml b/adapter-pulsar/src/main/resources/admin_namespace.yaml
new file mode 100644
index 000000000..4fd9b18ac
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/admin_namespace.yaml
@@ -0,0 +1,16 @@
+bindings:
+ # 20 topics: 10 tenants, 2 namespaces/tenant
+ tenant: Mod(20); Div(2); ToString(); Prefix("tnt")
+ namespace: Mod(2); ToString(); Prefix("ns")
+
+params:
+ async_api: "false"
+ admin_delop: "false"
+
+blocks:
+ admin-namespace-block:
+ tags:
+ phase: admin-namespace
+ ops:
+ op1:
+ AdminNamespace: "{tenant}/{namespace}"
diff --git a/adapter-pulsar/src/main/resources/admin_tenant.yaml b/adapter-pulsar/src/main/resources/admin_tenant.yaml
new file mode 100644
index 000000000..865643fde
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/admin_tenant.yaml
@@ -0,0 +1,17 @@
+bindings:
+ # 10 tenants
+ tenant: Mod(10); ToString(); Prefix("tnt")
+
+params:
+ async_api: "false"
+ admin_delop: "false"
+
+blocks:
+ admin-tenant-block:
+ tags:
+ phase: admin-tenant
+ ops:
+ op1:
+ AdminTopic: "{tenant}"
+ admin_roles: ""
+ allowed_clusters: ""
diff --git a/adapter-pulsar/src/main/resources/admin_topic.yaml b/adapter-pulsar/src/main/resources/admin_topic.yaml
new file mode 100644
index 000000000..6e30ad2bf
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/admin_topic.yaml
@@ -0,0 +1,19 @@
+bindings:
+ # 100 topics: 10 tenants, 2 namespaces/tenant, 5 topics/namespace
+ tenant: Mod(100); Div(10); ToString(); Prefix("tnt")
+ namespace: Mod(10); Div(5); ToString(); Prefix("ns")
+ topic: Mod(5); ToString(); Prefix("tp")
+
+params:
+ async_api: "false"
+ admin_delop: "false"
+
+blocks:
+ admin-topic-block:
+ tags:
+ phase: admin-topic
+ ops:
+ op1:
+ AdminTopic: "{tenant}/{namespace}/{topic}"
+ enable_partition: "false"
+ partition_num: "5"
diff --git a/adapter-pulsar/src/main/resources/bindingtest.yaml b/adapter-pulsar/src/main/resources/bindingtest.yaml
new file mode 100644
index 000000000..e687f5fa4
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/bindingtest.yaml
@@ -0,0 +1,4 @@
+bindings:
+ tenant: Mod(100); Div(10); ToString(); Prefix("tnt")
+ namespace: Mod(10); Div(5); ToString(); Prefix("ns")
+ topic: Mod(5); ToString(); Prefix("tp")
diff --git a/adapter-pulsar/src/main/resources/config.properties b/adapter-pulsar/src/main/resources/config.properties
new file mode 100644
index 000000000..9cc804c1f
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/config.properties
@@ -0,0 +1,52 @@
+9### Schema related configurations - schema.xxx
+# valid types:
+# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
+# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
+# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
+# avro, json, protobuf
+#
+# TODO: as a starting point, only supports the following types
+# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
+# 2) Avro for messages with schema
+#schema.key.type=avro
+#schema.key.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-key-example.avsc
+#schema.type=avro
+#schema.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-value-example.avsc
+schema.type=
+schema.definition=
+
+
+### Pulsar client related configurations - client.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#client
+client.connectionTimeoutMs=5000
+client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
+# Cluster admin
+client.authParams=
+client.tlsAllowInsecureConnection=true
+
+
+### Producer related configurations (global) - producer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
+producer.producerName=
+producer.topicName=
+producer.sendTimeoutMs=
+producer.blockIfQueueFull=true
+
+
+### Consumer related configurations (global) - consumer.xxx
+# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
+consumer.topicNames=
+consumer.topicsPattern=
+consumer.subscriptionName=
+consumer.subscriptionType=
+consumer.consumerName=
+consumer.receiverQueueSize=
+
+
+### Reader related configurations (global) - reader.xxx
+# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
+# - valid Pos: earliest, latest, custom::file:////
+reader.topicName=
+reader.receiverQueueSize=
+reader.readerName=
+reader.startMessagePos=earliest
diff --git a/adapter-pulsar/src/main/resources/iot-key-example.avsc b/adapter-pulsar/src/main/resources/iot-key-example.avsc
new file mode 100644
index 000000000..f36b52bc3
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/iot-key-example.avsc
@@ -0,0 +1,9 @@
+{
+ "type": "record",
+ "name": "IotSensorKey",
+ "namespace": "TestNS",
+ "fields" : [
+ {"name": "Location", "type": "string"},
+ {"name": "WellID", "type": "string"}
+ ]
+}
diff --git a/adapter-pulsar/src/main/resources/iot-value-example.avsc b/adapter-pulsar/src/main/resources/iot-value-example.avsc
new file mode 100644
index 000000000..20bb894fd
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/iot-value-example.avsc
@@ -0,0 +1,11 @@
+{
+ "type": "record",
+ "name": "IotSensor",
+ "namespace": "TestNS",
+ "fields" : [
+ {"name": "SensorID", "type": "string"},
+ {"name": "SensorType", "type": "string"},
+ {"name": "ReadingTime", "type": "string"},
+ {"name": "ReadingValue", "type": "float"}
+ ]
+}
diff --git a/adapter-pulsar/src/main/resources/msg_proc_avro.yaml b/adapter-pulsar/src/main/resources/msg_proc_avro.yaml
new file mode 100644
index 000000000..bfc07a176
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/msg_proc_avro.yaml
@@ -0,0 +1,41 @@
+bindings:
+ # message key and value
+ mykey: NumberNameToString()
+ location: Cities();
+ well_id: ToUUID();ToString();
+ sensor_id: ToUUID();ToString();
+ reading_time: ToDateTime();
+ reading_value: ToFloat(100);
+
+# document level parameters that apply to all Pulsar client types:
+params:
+ async_api: "true"
+
+blocks:
+ msg-produce-block:
+ tags:
+ phase: msg-send
+ ops:
+ op1:
+ MessageProduce: "tnt0/ns0/tp1"
+ msg_key: |
+ {
+ "Location": "{location}",
+ "WellID": "{well_id}"
+ }
+ msg_value: |
+ {
+ "SensorID": "{sensor_id}",
+ "SensorType": "Temperature",
+ "ReadingTime": "{reading_time}",
+ "ReadingValue": {reading_value}
+ }
+
+ msg-consume-block:
+ tags:
+ phase: msg-recv
+ ops:
+ op1:
+ MessageConsume: "tnt0/ns0/tp0"
+ subscription_name: "mynbsub"
+# subscription_type: "shared"
diff --git a/adapter-pulsar/src/main/resources/msg_proc_kvraw.yaml b/adapter-pulsar/src/main/resources/msg_proc_kvraw.yaml
new file mode 100644
index 000000000..4aea8f44a
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/msg_proc_kvraw.yaml
@@ -0,0 +1,34 @@
+bindings:
+ # message key, property and value
+ mykey: NumberNameToString()
+ int_prop_val: ToString(); Prefix("IntProp_")
+ text_prop_val: AlphaNumericString(5); Prefix("TextProp_")
+ myvalue: AlphaNumericString(20)
+
+# document level parameters that apply to all Pulsar client types:
+params:
+ async_api: "true"
+
+blocks:
+ msg-produce-block:
+ tags:
+ phase: msg-send
+ ops:
+ op1:
+ MessageProduce: "tnt0/ns0/tp0"
+ msg_key: "{mykey}"
+ msg_prop: |
+ {
+ "prop1": "{int_prop_val}",
+ "prop2": "{text_prop_val}"
+ }
+ msg_value: "{myvalue}"
+
+ msg-consume-block:
+ tags:
+ phase: msg-recv
+ ops:
+ op1:
+ MessageConsume: "tnt0/ns0/tp0"
+ subscription_name: "mynbsub"
+# subscription_type: "shared"
diff --git a/adapter-pulsar/src/main/resources/pulsar.md b/adapter-pulsar/src/main/resources/pulsar.md
new file mode 100644
index 000000000..586fecbb2
--- /dev/null
+++ b/adapter-pulsar/src/main/resources/pulsar.md
@@ -0,0 +1 @@
+<< to be added ... >>
diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
index 740ef3348..7bfc2d60a 100644
--- a/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
+++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/activityimpl/BaseOpDispenser.java
@@ -39,7 +39,7 @@ public abstract class BaseOpDispenser implements OpDispenser
private final String name;
protected final DriverAdapter adapter;
- private boolean instrument;
+ protected boolean instrument;
private Histogram resultSizeHistogram;
private Timer successTimer;
private Timer errorTimer;
@@ -83,12 +83,16 @@ public abstract class BaseOpDispenser implements OpDispenser
@Override
public abstract T apply(long cycle);
+ protected String getDefaultMetricsPrefix(ParsedOp pop) {
+ return pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--";
+ }
+
private void configureInstrumentation(ParsedOp pop) {
this.instrument = pop.takeStaticConfigOr("instrument", false);
if (instrument) {
- this.successTimer = ActivityMetrics.timer(pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--success");
- this.errorTimer = ActivityMetrics.timer(pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--error");
- this.resultSizeHistogram = ActivityMetrics.histogram(pop.getStaticConfigOr("alias", "UNKNOWN") + "-" + pop.getName() + "--resultset-size");
+ this.successTimer = ActivityMetrics.timer(getDefaultMetricsPrefix(pop) + "success");
+ this.errorTimer = ActivityMetrics.timer(getDefaultMetricsPrefix(pop) + "error");
+ this.resultSizeHistogram = ActivityMetrics.histogram(getDefaultMetricsPrefix(pop) + "resultset-size");
}
}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java
deleted file mode 100644
index 3a3ef9b2a..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms;
-
-import com.codahale.metrics.Timer;
-import io.nosqlbench.driver.jms.ops.JmsOp;
-import io.nosqlbench.engine.api.activityapi.core.SyncAction;
-import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.function.LongFunction;
-
-public class JmsAction implements SyncAction {
-
- private final static Logger logger = LogManager.getLogger(JmsAction.class);
-
- private final JmsActivity activity;
- private final int slot;
-
- int maxTries;
-
- public JmsAction(JmsActivity activity, int slot) {
- this.activity = activity;
- this.slot = slot;
- this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
- }
-
- @Override
- public void init() { }
-
- @Override
- public int runCycle(long cycle) {
- // let's fail the action if some async operation failed
- activity.failOnAsyncOperationFailure();
-
- long start = System.nanoTime();
-
- JmsOp jmsOp;
- try (Timer.Context ctx = activity.getBindTimer().time()) {
- LongFunction extends JmsOp> readyJmsOp = activity.getSequencer().apply(cycle);
- jmsOp = readyJmsOp.apply(cycle);
- } catch (Exception bindException) {
- // if diagnostic mode ...
- activity.getErrorhandler().handleError(bindException, cycle, 0);
- throw new RuntimeException(
- "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
- );
- }
-
- for (int i = 0; i < maxTries; i++) {
- Timer.Context ctx = activity.getExecuteTimer().time();
- try {
- // it is up to the jmsOp to call Context#close when the activity is executed
- // this allows us to track time for async operations
- jmsOp.run(ctx::close);
- break;
- } catch (RuntimeException err) {
- ErrorDetail errorDetail = activity
- .getErrorhandler()
- .handleError(err, cycle, System.nanoTime() - start);
- if (!errorDetail.isRetryable()) {
- break;
- }
- }
- }
-
- return 0;
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java
deleted file mode 100644
index 2da3a440c..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
-import io.nosqlbench.driver.jms.conn.JmsConnInfo;
-import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo;
-import io.nosqlbench.driver.jms.ops.JmsOp;
-import io.nosqlbench.driver.jms.util.JmsUtil;
-import io.nosqlbench.driver.jms.util.PulsarConfig;
-import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
-import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
-import io.nosqlbench.api.engine.activityimpl.ActivityDef;
-import io.nosqlbench.engine.api.activityimpl.OpDispenser;
-import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
-import io.nosqlbench.api.engine.metrics.ActivityMetrics;
-import org.apache.commons.lang3.StringUtils;
-
-import javax.jms.Destination;
-import javax.jms.JMSContext;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-public class JmsActivity extends SimpleActivity {
-
- private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>();
-
- private String jmsProviderType;
- private JmsConnInfo jmsConnInfo;
-
- private JMSContext jmsContext;
-
- private OpSequence> sequence;
- private volatile Throwable asyncOperationFailure;
- private NBErrorHandler errorhandler;
-
- private Timer bindTimer;
- private Timer executeTimer;
- private Counter bytesCounter;
- private Histogram messagesizeHistogram;
-
- public JmsActivity(ActivityDef activityDef) {
- super(activityDef);
- }
-
- @Override
- public void initActivity() {
- super.initActivity();
-
- // default JMS type: Pulsar
- // - currently this is the only supported JMS provider
- jmsProviderType =
- activityDef.getParams()
- .getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR)
- .orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label);
-
- // "Pulsar" as the JMS provider
- if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
-
- String webSvcUrl =
- activityDef.getParams()
- .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR)
- .orElse("http://localhost:8080");
- String pulsarSvcUrl =
- activityDef.getParams()
- .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR)
- .orElse("pulsar://localhost:6650");
-
- if (StringUtils.isAnyBlank(webSvcUrl, pulsarSvcUrl)) {
- throw new RuntimeException("For \"" + JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label + "\" type, " +
- "\"" + JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR + "\" and " +
- "\"" + JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR + "\" parameters are manadatory!");
- }
-
- // Check if extra Pulsar config. file is in place
- // - default file: "pulsar_config.properties" under the current directory
- String pulsarCfgFile =
- activityDef.getParams()
- .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR)
- .orElse(JmsUtil.JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME);
-
- PulsarConfig pulsarConfig = new PulsarConfig(pulsarCfgFile);
-
- jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, webSvcUrl, pulsarSvcUrl, pulsarConfig);
- }
- else {
- throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType);
- }
-
- PulsarConnectionFactory factory;
- factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig());
- this.jmsContext = factory.createContext();
-
- bindTimer = ActivityMetrics.timer(activityDef, "bind", this.getHdrDigits());
- executeTimer = ActivityMetrics.timer(activityDef, "execute", this.getHdrDigits());
- bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
- messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize", this.getHdrDigits());
-
- if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
- this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this), false, Optional.empty());
- }
-
- setDefaultsFromOpSequence(sequence);
- onActivityDefUpdate(activityDef);
-
- this.errorhandler = new NBErrorHandler(
- () -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
- this::getExceptionMetrics
- );
- }
-
- private static String buildCacheKey(String... keyParts) {
- return String.join("::", keyParts);
- }
-
- /**
- * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
- */
- public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
- String destinationCacheKey = buildCacheKey(jmsDestinationType, destName);
- Destination destination = jmsDestinations.get(destinationCacheKey);
-
- if ( destination == null ) {
- // TODO: should we match Persistent/Non-peristent JMS Delivery mode with
- // Pulsar Persistent/Non-prsistent topic?
- if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.QUEUE.label)) {
- destination = jmsContext.createQueue(destName);
- } else if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.TOPIC.label)) {
- destination = jmsContext.createTopic(destName);
- }
-
- jmsDestinations.put(destinationCacheKey, destination);
- }
-
- return destination;
- }
-
- @Override
- public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
- public OpSequence> getSequencer() { return sequence; }
-
- public String getJmsProviderType() { return jmsProviderType; }
- public JmsConnInfo getJmsConnInfo() { return jmsConnInfo; }
- public JMSContext getJmsContext() { return jmsContext; }
-
- public Timer getBindTimer() { return bindTimer; }
- public Timer getExecuteTimer() { return this.executeTimer; }
- public Counter getBytesCounter() { return bytesCounter; }
- public Histogram getMessagesizeHistogram() { return messagesizeHistogram; }
-
- public NBErrorHandler getErrorhandler() { return errorhandler; }
-
- public void failOnAsyncOperationFailure() {
- if (asyncOperationFailure != null) {
- throw new RuntimeException(asyncOperationFailure);
- }
- }
- public void asyncOperationFailed(Throwable ex) {
- this.asyncOperationFailure = ex;
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java
deleted file mode 100644
index b964516ea..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms;
-
-import io.nosqlbench.engine.api.activityapi.core.Action;
-import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
-import io.nosqlbench.engine.api.activityapi.core.ActivityType;
-import io.nosqlbench.api.engine.activityimpl.ActivityDef;
-import io.nosqlbench.nb.annotations.Service;
-
-@Service(value = ActivityType.class, selector = "jms")
-public class JmsActivityType implements ActivityType {
- @Override
- public ActionDispenser getActionDispenser(JmsActivity activity) {
- return new PulsarJmsActionDispenser(activity);
- }
-
- @Override
- public JmsActivity getActivity(ActivityDef activityDef) {
- return new JmsActivity(activityDef);
- }
-
- private static class PulsarJmsActionDispenser implements ActionDispenser {
- private final JmsActivity activity;
- public PulsarJmsActionDispenser(JmsActivity activity) {
- this.activity = activity;
- }
-
- @Override
- public Action getAction(int slot) {
- return new JmsAction(activity, slot);
- }
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java
deleted file mode 100644
index bd532033f..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms;
-
-import io.nosqlbench.driver.jms.ops.JmsOp;
-import io.nosqlbench.driver.jms.util.JmsUtil;
-import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
-import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
-import io.nosqlbench.engine.api.templating.CommandTemplate;
-import org.apache.commons.lang3.BooleanUtils;
-
-import java.util.function.LongFunction;
-
-abstract public class ReadyJmsOp extends BaseOpDispenser {
-
- protected final OpTemplate optpl;
- protected final CommandTemplate cmdTpl;
- protected final JmsActivity jmsActivity;
-
- protected final String stmtOpType;
- protected LongFunction asyncApiFunc;
- protected LongFunction jmsDestinationTypeFunc;
-
- protected final LongFunction opFunc;
-
- public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
- super(opTemplate);
- this.optpl = opTemplate;
- this.cmdTpl = new CommandTemplate(optpl);
- this.jmsActivity = jmsActivity;
-
- if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
- throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
- }
- this.stmtOpType = cmdTpl.getStatic("optype");
-
- // Global/Doc-level parameter: async_api
- if (cmdTpl.containsKey(JmsUtil.ASYNC_API_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.ASYNC_API_KEY_STR)) {
- boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.ASYNC_API_KEY_STR));
- this.asyncApiFunc = (l) -> value;
- } else {
- throw new RuntimeException("\"" + JmsUtil.ASYNC_API_KEY_STR + "\" parameter cannot be dynamic!");
- }
- }
-
- // Global/Doc-level parameter: jms_desitation_type
- // - queue: point-to-point
- // - topic: pub/sub
- if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
- jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR);
- } else {
- throw new RuntimeException("\"" + JmsUtil.JMS_DESTINATION_TYPE_KEY_STR + "\" parameter cannot be dynamic!");
- }
- }
-
- this.opFunc = resolveJms();
- }
-
- public JmsOp apply(long value) { return opFunc.apply(value); }
-
- abstract LongFunction resolveJms();
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java
deleted file mode 100644
index 72c397a97..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms;
-
-import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper;
-import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
-import io.nosqlbench.driver.jms.ops.JmsOp;
-import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
-import io.nosqlbench.driver.jms.util.JmsUtil;
-import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.JMSRuntimeException;
-import javax.jms.Message;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.LongFunction;
-import java.util.stream.Collectors;
-
-public class ReadyPulsarJmsOp extends ReadyJmsOp {
-
- public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
- super(opTemplate, jmsActivity);
- }
-
- public LongFunction resolveJms() {
- // Global/Doc-level parameter: topic_uri
- LongFunction topicUriFunc = (l) -> null;
- if (cmdTpl.containsKey(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
- topicUriFunc = (l) -> cmdTpl.getStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR);
- } else {
- topicUriFunc = (l) -> cmdTpl.getDynamic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR, l);
- }
- }
-
- // Global: JMS destination
- LongFunction jmsDestinationFunc;
- try {
- LongFunction finalTopicUriFunc = topicUriFunc;
- jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(
- jmsDestinationTypeFunc.apply(l),
- finalTopicUriFunc.apply(l));
- }
- catch (JMSRuntimeException ex) {
- throw new RuntimeException("Unable to create JMS destination!");
- }
-
- if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) {
- return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
- } else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) {
- return resolveMsgRead(asyncApiFunc, jmsDestinationFunc);
- } else {
- throw new RuntimeException("Unsupported JMS operation type");
- }
- }
-
- private LongFunction resolveMsgSend(
- LongFunction async_api_func,
- LongFunction jmsDestinationFunc
- ) {
- JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc();
-
- // JMS header: delivery mode
- LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT;
- if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
- msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label));
- }
- else {
- msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l));
- }
- }
- jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc);
-
- // JMS header: message priority
- LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY;
- if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
- msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label));
- }
- else {
- msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l));
- }
- }
- jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc);
-
- // JMS header: message TTL
- LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE;
- if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
- msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label));
- }
- else {
- msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l));
- }
- }
- jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc);
-
- // JMS header: message delivery delay
- LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY;
- if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
- msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label));
- }
- else {
- msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l));
- }
- }
- jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc);
-
- // JMS header: disable message timestamp
- LongFunction disableMsgTimestampFunc = (l) -> false;
- if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
- disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label));
- }
- else {
- disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l));
- }
- }
- jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc);
-
- // JMS header: disable message ID
- LongFunction disableMsgIdFunc = (l) -> false;
- if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
- disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label));
- }
- else {
- disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l));
- }
- }
- jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc);
-
- // JMS message properties
- String jmsMsgPropertyListStr = "";
- if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
- jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR);
- } else {
- throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!");
- }
- }
-
- Map jmsMsgProperties = new HashMap<>();
- if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) {
- jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";"))
- .map(s -> s.split("=", 2))
- .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : ""));
- }
-
- LongFunction msgBodyFunc;
- if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
- msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR);
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
- msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l);
- } else {
- msgBodyFunc = (l) -> null;
- }
- } else {
- throw new RuntimeException("JMS message send:: \"msg_body\" field must be specified!");
- }
-
- return new JmsMsgSendMapper(
- jmsActivity,
- async_api_func,
- jmsDestinationFunc,
- jmsHeaderLongFunc,
- jmsMsgProperties,
- msgBodyFunc);
- }
-
- private LongFunction resolveMsgRead(
- LongFunction async_api_func,
- LongFunction jmsDestinationFunc
- ) {
- // For Pulsar JMS, make "durable" as the default
- LongFunction jmsConsumerDurableFunc = (l) -> true;
- if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
- jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR));
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
- jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l));
- }
- }
-
- LongFunction jmsConsumerSharedFunc = (l) -> true;
- if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
- jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR));
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
- jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l));
- }
- }
-
- LongFunction jmsMsgSubscriptionFunc = (l) -> "";
- if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
- jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR);
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
- jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l);
- }
- }
-
- LongFunction jmsMsgReadSelectorFunc = (l) -> "";
- if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
- jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR);
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
- jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l);
- }
- }
-
- LongFunction jmsMsgNoLocalFunc = (l) -> true;
- if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
- jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR));
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
- jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l));
- }
- }
-
- LongFunction jmsReadTimeoutFunc = (l) -> 0L;
- if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
- if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
- jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR));
- } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
- jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l));
- }
- }
-
- return new JmsMsgReadMapper(
- jmsActivity,
- async_api_func,
- jmsDestinationFunc,
- jmsConsumerDurableFunc,
- jmsConsumerSharedFunc,
- jmsMsgSubscriptionFunc,
- jmsMsgReadSelectorFunc,
- jmsMsgNoLocalFunc,
- jmsReadTimeoutFunc);
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java
deleted file mode 100644
index 738f83678..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms.conn;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class JmsConnInfo {
-
- protected final String jmsProviderType;
- protected final Map jmsConnConfig;
-
- protected JmsConnInfo(String jmsProviderType) {
- this.jmsProviderType = jmsProviderType;
- this.jmsConnConfig = new HashMap<>();
- }
-
- public Map getJmsConnConfig() { return this.jmsConnConfig; }
- public void resetJmsConnConfig() { this.jmsConnConfig.clear(); }
- public void addJmsConnConfigItems(Map cfgItems) { this.jmsConnConfig.putAll(cfgItems); }
- public void addJmsConnConfigItem(String key, Object value) { this.jmsConnConfig.put(key, value); }
- public void removeJmsConnConfigItem(String key) { this.jmsConnConfig.remove(key); }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java
deleted file mode 100644
index f1e09fe78..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms.conn;
-
-import io.nosqlbench.driver.jms.util.PulsarConfig;
-
-import java.util.Map;
-
-public class JmsPulsarConnInfo extends JmsConnInfo {
-
- private final String webSvcUrl;
- private final String pulsarSvcUrl;
- private final PulsarConfig extraPulsarConfig;
-
- public JmsPulsarConnInfo(String jmsProviderType, String webSvcUrl, String pulsarSvcUrl, PulsarConfig pulsarConfig) {
- super(jmsProviderType);
-
- this.webSvcUrl = webSvcUrl;
- this.pulsarSvcUrl = pulsarSvcUrl;
- this.extraPulsarConfig = pulsarConfig;
-
- this.addJmsConnConfigItem("webServiceUrl", this.webSvcUrl);
- this.addJmsConnConfigItem("brokerServiceUrl", this.pulsarSvcUrl);
-
- Map clientCfgMap = this.extraPulsarConfig.getClientConfMap();
- if (!clientCfgMap.isEmpty()) {
- this.addJmsConnConfigItems(clientCfgMap);
- }
-
- Map producerCfgMap = this.extraPulsarConfig.getProducerConfMap();
- if (!producerCfgMap.isEmpty()) {
- this.addJmsConnConfigItem("producerConfig", producerCfgMap);
- }
-
- Map consumerCfgMap = this.extraPulsarConfig.getConsumerConfMap();
- if (!consumerCfgMap.isEmpty()) {
- this.addJmsConnConfigItem("consumerConfig", consumerCfgMap);
- }
- }
-
- public String getWebSvcUrl() { return this.webSvcUrl; }
- public String getPulsarSvcUrl() { return this.pulsarSvcUrl; }
- public PulsarConfig getExtraPulsarConfig() { return this.extraPulsarConfig; }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java
deleted file mode 100644
index 0108c0608..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms.ops;
-
-import io.nosqlbench.driver.jms.JmsActivity;
-
-import javax.jms.Destination;
-import java.util.function.LongFunction;
-
-/**
- * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
- * enough state to define a pulsar operation such that it can be executed, measured, and possibly
- * retried if needed.
- *
- * This function doesn't act *as* the operation. It merely maps the construction logic into
- * a simple functional type, given the component functions.
- *
- * For additional parameterization, the command template is also provided.
- */
-public class JmsMsgReadMapper extends JmsOpMapper {
-
- private final LongFunction jmsConsumerDurableFunc;
- private final LongFunction jmsConsumerSharedFunc;
- private final LongFunction jmsMsgSubscriptionFunc;
- private final LongFunction jmsMsgReadSelectorFunc;
- private final LongFunction jmsMsgNoLocalFunc;
- private final LongFunction jmsReadTimeoutFunc;
-
- public JmsMsgReadMapper(JmsActivity jmsActivity,
- LongFunction asyncApiFunc,
- LongFunction jmsDestinationFunc,
- LongFunction jmsConsumerDurableFunc,
- LongFunction jmsConsumerSharedFunc,
- LongFunction jmsMsgSubscriptionFunc,
- LongFunction jmsMsgReadSelectorFunc,
- LongFunction jmsMsgNoLocalFunc,
- LongFunction jmsReadTimeoutFunc) {
- super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
-
- this.jmsConsumerDurableFunc = jmsConsumerDurableFunc;
- this.jmsConsumerSharedFunc = jmsConsumerSharedFunc;
- this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc;
- this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc;
- this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc;
- this.jmsReadTimeoutFunc = jmsReadTimeoutFunc;
- }
-
- @Override
- public JmsOp apply(long value) {
- boolean asyncApi = asyncApiFunc.apply(value);
- Destination jmsDestination = jmsDestinationFunc.apply(value);
- boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value);
- boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value);
- String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value);
- String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value);
- boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value);
- long jmsReadTimeout = jmsReadTimeoutFunc.apply(value);
-
- // Default to NO read timeout
- if (jmsReadTimeout < 0) jmsReadTimeout = 0;
-
- return new JmsMsgReadOp(
- jmsActivity,
- asyncApi,
- jmsDestination,
- jmsConsumerDurable,
- jmsConsumerShared,
- jmsMsgSubscription,
- jmsMsgReadSelector,
- jmsMsgNoLocal,
- jmsReadTimeout
- );
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java
deleted file mode 100644
index de92f73c5..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms.ops;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import io.nosqlbench.driver.jms.JmsActivity;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import javax.jms.*;
-
-public class JmsMsgReadOp extends JmsTimeTrackOp {
-
- private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class);
-
- private final JmsActivity jmsActivity;
- private final boolean asyncJmsOp;
- private final Destination jmsDestination;
-
- private final JMSContext jmsContext;
- private final JMSConsumer jmsConsumer;
- private final boolean jmsConsumerDurable;
- private final boolean jmsConsumerShared;
- private final String jmsMsgSubscrption;
- private final String jmsMsgReadSelector;
- private final boolean jmsMsgNoLocal;
- private final long jmsReadTimeout;
-
- private final Counter bytesCounter;
- private final Histogram messagesizeHistogram;
-
- public JmsMsgReadOp(JmsActivity jmsActivity,
- boolean asyncJmsOp,
- Destination jmsDestination,
- boolean jmsConsumerDurable,
- boolean jmsConsumerShared,
- String jmsMsgSubscrption,
- String jmsMsgReadSelector,
- boolean jmsMsgNoLocal,
- long jmsReadTimeout) {
- this.jmsActivity = jmsActivity;
- this.asyncJmsOp = asyncJmsOp;
- this.jmsDestination = jmsDestination;
- this.jmsConsumerDurable = jmsConsumerDurable;
- this.jmsConsumerShared = jmsConsumerShared;
- this.jmsMsgReadSelector = jmsMsgReadSelector;
- this.jmsMsgSubscrption = jmsMsgSubscrption;
- this.jmsMsgNoLocal = jmsMsgNoLocal;
- this.jmsReadTimeout = jmsReadTimeout;
-
- this.jmsContext = jmsActivity.getJmsContext();
- this.jmsConsumer = createJmsConsumer();
-
- this.bytesCounter = jmsActivity.getBytesCounter();
- this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
- }
-
- private JMSConsumer createJmsConsumer() {
- JMSConsumer jmsConsumer;
-
- try {
- if (jmsConsumerDurable) {
- if (jmsConsumerShared)
- jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
- else
- jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal);
- } else {
- if (jmsConsumerShared)
- jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
- else
- jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal);
- }
- }
- catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) {
- throw new RuntimeException("Failed to create JMS consumer: invalid destination!");
- }
- catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) {
- throw new RuntimeException("Failed to create JMS consumer: invalid message selector!");
- }
- catch (JMSRuntimeException jmsRuntimeException) {
- jmsRuntimeException.printStackTrace();
- throw new RuntimeException("Failed to create JMS consumer: runtime internal error!");
- }
-
- // TODO: async consumer
-// if (this.asyncJmsOp) {
-// jmsConsumer.setMessageListener();
-// }
-
- return jmsConsumer;
- }
-
- @Override
- public void run() {
- // FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley
- Message receivedMsg = jmsConsumer.receive(jmsReadTimeout);
- try {
- if (receivedMsg != null) {
- receivedMsg.acknowledge();
- byte[] receivedMsgBody = receivedMsg.getBody(byte[].class);
-
- if (logger.isDebugEnabled()) {
- logger.debug("received msg-payload={}", new String(receivedMsgBody));
- }
-
- int messagesize = receivedMsgBody.length;
- bytesCounter.inc(messagesize);
- messagesizeHistogram.update(messagesize);
- }
- } catch (JMSException e) {
- e.printStackTrace();
- throw new RuntimeException("Failed to acknowledge the received JMS message.");
- }
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java
deleted file mode 100644
index fb649f013..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms.ops;
-
-import io.nosqlbench.driver.jms.JmsActivity;
-import io.nosqlbench.driver.jms.util.JmsHeader;
-import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
-
-import javax.jms.Destination;
-import java.util.Map;
-import java.util.function.LongFunction;
-
-/**
- * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
- * enough state to define a pulsar operation such that it can be executed, measured, and possibly
- * retried if needed.
- *
- * This function doesn't act *as* the operation. It merely maps the construction logic into
- * a simple functional type, given the component functions.
- *
- * For additional parameterization, the command template is also provided.
- */
-public class JmsMsgSendMapper extends JmsOpMapper {
- private final JmsHeaderLongFunc jmsHeaderLongFunc;
- private final Map jmsMsgProperties;
- private final LongFunction msgBodyFunc;
-
- public JmsMsgSendMapper(JmsActivity jmsActivity,
- LongFunction asyncApiFunc,
- LongFunction jmsDestinationFunc,
- JmsHeaderLongFunc jmsHeaderLongFunc,
- Map jmsMsgProperties,
- LongFunction msgBodyFunc) {
- super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
-
- this.jmsHeaderLongFunc = jmsHeaderLongFunc;
- this.jmsMsgProperties = jmsMsgProperties;
- this.msgBodyFunc = msgBodyFunc;
- }
-
- @Override
- public JmsOp apply(long value) {
- boolean asyncApi = asyncApiFunc.apply(value);
- Destination jmsDestination = jmsDestinationFunc.apply(value);
- JmsHeader jmsHeader = (JmsHeader)jmsHeaderLongFunc.apply(value);
- String msgBody = msgBodyFunc.apply(value);
-
- return new JmsMsgSendOp(
- jmsActivity,
- asyncApi,
- jmsDestination,
- jmsHeader,
- jmsMsgProperties,
- msgBody
- );
- }
-}
diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java
deleted file mode 100644
index 2f432502c..000000000
--- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (c) 2022 nosqlbench
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.nosqlbench.driver.jms.ops;
-
-import com.codahale.metrics.Counter;
-import com.codahale.metrics.Histogram;
-import io.nosqlbench.driver.jms.JmsActivity;
-import io.nosqlbench.driver.jms.util.JmsHeader;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import javax.jms.*;
-import java.nio.charset.StandardCharsets;
-import java.util.Iterator;
-import java.util.Map;
-
-public class JmsMsgSendOp extends JmsTimeTrackOp {
-
- private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class);
-
- private final JmsActivity jmsActivity;
- private final boolean asyncJmsOp;
- private final Destination jmsDestination;
- private final JmsHeader jmsHeader;
- private final Map jmsMsgProperties;
-
- private final JMSContext jmsContext;
- private final JMSProducer jmsProducer;
- private final String msgBody;
-
- private final Counter bytesCounter;
- private final Histogram messagesizeHistogram;
-
- public JmsMsgSendOp(JmsActivity jmsActivity,
- boolean asyncJmsOp,
- Destination jmsDestination,
- JmsHeader jmsHeader,
- Map jmsMsgProperties,
- String msgBody) {
- this.jmsActivity = jmsActivity;
- this.asyncJmsOp = asyncJmsOp;
- this.jmsDestination = jmsDestination;
-
- this.jmsHeader = jmsHeader;
- this.jmsMsgProperties = jmsMsgProperties;
- this.msgBody = msgBody;
-
- if (!jmsHeader.isValidHeader()) {
- throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText());
- }
-
- if ((msgBody == null) || msgBody.isEmpty()) {
- throw new RuntimeException("JMS message body can't be empty!");
- }
-
- this.jmsContext = jmsActivity.getJmsContext();
- this.jmsProducer = createJmsProducer();
-
- this.bytesCounter = jmsActivity.getBytesCounter();
- this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
- }
-
- private JMSProducer createJmsProducer() {
- JMSProducer jmsProducer = this.jmsContext.createProducer();
-
- jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode());
- jmsProducer.setPriority(this.jmsHeader.getMsgPriority());
- jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay());
- jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp());
- jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId());
-
- if (this.asyncJmsOp) {
- jmsProducer.setAsync(new CompletionListener() {
- @Override
- public void onCompletion(Message msg) {
- try {
- byte[] msgBody = msg.getBody(byte[].class);
- if (logger.isTraceEnabled()) {
- logger.trace("Async message send success - message body: " + new String(msgBody));
- }
- }
- catch (JMSException jmsException) {
- jmsException.printStackTrace();
- logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
- }
- }
-
- @Override
- public void onException(Message msg, Exception e) {
- try {
- byte[] msgBody = msg.getBody(byte[].class);
- if (logger.isTraceEnabled()) {
- logger.trace("Async message send failure - message body: " + new String(msgBody));
- }
- }
- catch (JMSException jmsException) {
- jmsException.printStackTrace();
- logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
- }
- }
- });
- }
-
- for (Map.Entry