From f4aafb70c38b7b2655e915e64fbc0c807466f641 Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Mon, 3 May 2021 13:33:59 -0500 Subject: [PATCH 1/5] Pulsar JMS driver type name change and package name typo fix --- {driver-pulsarjms => driver-jms}/pom.xml | 11 ++++-- .../io/nosqlbench/driver/jms/JmsAction.java | 20 +++++----- .../io/nosqlbench/driver/jms/JmsActivity.java | 16 ++++---- .../driver/jms/JmsActivityType.java | 16 ++++---- .../io/nosqlbench/driver/jms/ReadyJmsOp.java | 30 +++++++-------- .../driver/jms/ops/JmsMsgSendMapper.java | 22 +++++------ .../driver/jms/ops/JmsMsgSendOp.java | 30 +++++++-------- .../io/nosqlbench/driver/jms/ops/JmsOp.java | 4 +- .../driver/jms/ops/JmsOpMapper.java | 21 +++++++++++ .../driver/jms/ops/JmsTimeTrackOp.java | 4 +- .../jms}/util/PulsarJmsActivityUtil.java | 2 +- .../src/main/resources/pulsar_jms_bytes.yaml | 37 +++++++++++++++++++ .../pulsarjms/ops/PulsarJmsOpMapper.java | 21 ----------- .../src/main/resources/pulsar_jms_bytes.yaml | 21 ----------- nb/pom.xml | 6 +++ pom.xml | 2 +- 16 files changed, 144 insertions(+), 119 deletions(-) rename {driver-pulsarjms => driver-jms}/pom.xml (83%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java (76%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java (89%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java (57%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java (80%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java (58%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java (60%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java (80%) create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java (70%) rename {driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms => driver-jms/src/main/java/io/nosqlbench/driver/jms}/util/PulsarJmsActivityUtil.java (97%) create mode 100644 driver-jms/src/main/resources/pulsar_jms_bytes.yaml delete mode 100644 driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java delete mode 100644 driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml diff --git a/driver-pulsarjms/pom.xml b/driver-jms/pom.xml similarity index 83% rename from driver-pulsarjms/pom.xml rename to driver-jms/pom.xml index e024cb73a..129f1d978 100644 --- a/driver-pulsarjms/pom.xml +++ b/driver-jms/pom.xml @@ -8,16 +8,20 @@ ../mvn-defaults - driver-pulsarjms + driver-jms jar ${project.artifactId} - A PulsarJMS driver for nosqlbench. This provides the ability to inject synthetic data - into a pulsar system via JMS 2.0 compatibile APIs + A JMS driver for nosqlbench. This provides the ability to inject synthetic data + into a pulsar system via JMS 2.0 compatibile APIs. + + NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster + as the potential JMS Destination + datastax-releases-local DataStax Local Releases @@ -32,7 +36,6 @@ - io.nosqlbench diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java similarity index 76% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java index 660c68b5b..54769a243 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java @@ -1,7 +1,7 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; import com.codahale.metrics.Timer; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp; +import io.nosqlbench.driver.jms.ops.JmsOp; import io.nosqlbench.engine.api.activityapi.core.SyncAction; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import org.apache.logging.log4j.LogManager; @@ -9,16 +9,16 @@ import org.apache.logging.log4j.Logger; import java.util.function.LongFunction; -public class PulsarJmsAction implements SyncAction { +public class JmsAction implements SyncAction { - private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class); + private final static Logger logger = LogManager.getLogger(JmsAction.class); - private final PulsarJmsActivity activity; + private final JmsActivity activity; private final int slot; int maxTries; - public PulsarJmsAction(PulsarJmsActivity activity, int slot) { + public JmsAction(JmsActivity activity, int slot) { this.activity = activity; this.slot = slot; this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10); @@ -36,10 +36,10 @@ public class PulsarJmsAction implements SyncAction { long start = System.nanoTime(); - PulsarJmsOp pulsarJmsOp; + JmsOp jmsOp; try (Timer.Context ctx = activity.getBindTimer().time()) { - LongFunction readyPulsarJmsOp = activity.getSequencer().get(cycle); - pulsarJmsOp = readyPulsarJmsOp.apply(cycle); + LongFunction readyPulsarJmsOp = activity.getSequencer().get(cycle); + jmsOp = readyPulsarJmsOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... activity.getErrorhandler().handleError(bindException, cycle, 0); @@ -53,7 +53,7 @@ public class PulsarJmsAction implements SyncAction { try { // it is up to the pulsarOp to call Context#close when the activity is executed // this allows us to track time for async operations - pulsarJmsOp.run(ctx::close); + jmsOp.run(ctx::close); break; } catch (RuntimeException err) { ErrorDetail errorDetail = activity diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java similarity index 89% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index fc87f80cf..e6d0a9a9f 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -1,11 +1,11 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp; -import io.nosqlbench.driver.pulsarjms.util.PulsarJmsActivityUtil; +import io.nosqlbench.driver.jms.ops.JmsOp; +import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.ActivityDef; @@ -21,7 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class PulsarJmsActivity extends SimpleActivity { +public class JmsActivity extends SimpleActivity { private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>(); @@ -32,7 +32,7 @@ public class PulsarJmsActivity extends SimpleActivity { private JMSContext jmsContext; - private OpSequence> sequence; + private OpSequence> sequence; private volatile Throwable asyncOperationFailure; private NBErrorHandler errorhandler; @@ -41,7 +41,7 @@ public class PulsarJmsActivity extends SimpleActivity { private Counter bytesCounter; private Histogram messagesizeHistogram; - public PulsarJmsActivity(ActivityDef activityDef) { + public JmsActivity(ActivityDef activityDef) { super(activityDef); } @@ -71,7 +71,7 @@ public class PulsarJmsActivity extends SimpleActivity { bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); - this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this)); + this.sequence = createOpSequence((ot) -> new ReadyJmsOp(ot, this)); setDefaultsFromOpSequence(sequence); onActivityDefUpdate(activityDef); @@ -98,7 +98,7 @@ public class PulsarJmsActivity extends SimpleActivity { @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); } - public OpSequence> getSequencer() { return sequence; } + public OpSequence> getSequencer() { return sequence; } public String getPulsarSvcUrl() { return pulsarSvcUrl; diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java similarity index 57% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java index 395504475..026383147 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java @@ -1,4 +1,4 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; import io.nosqlbench.engine.api.activityapi.core.Action; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; @@ -7,26 +7,26 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.nb.annotations.Service; @Service(value = ActivityType.class, selector = "pulsarjms") -public class PulsarJmsActivityType implements ActivityType { +public class JmsActivityType implements ActivityType { @Override - public ActionDispenser getActionDispenser(PulsarJmsActivity activity) { + public ActionDispenser getActionDispenser(JmsActivity activity) { return new PulsarJmsActionDispenser(activity); } @Override - public PulsarJmsActivity getActivity(ActivityDef activityDef) { - return new PulsarJmsActivity(activityDef); + public JmsActivity getActivity(ActivityDef activityDef) { + return new JmsActivity(activityDef); } private static class PulsarJmsActionDispenser implements ActionDispenser { - private final PulsarJmsActivity activity; - public PulsarJmsActionDispenser(PulsarJmsActivity activity) { + private final JmsActivity activity; + public PulsarJmsActionDispenser(JmsActivity activity) { this.activity = activity; } @Override public Action getAction(int slot) { - return new PulsarJmsAction(activity, slot); + return new JmsAction(activity, slot); } } } diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java similarity index 80% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java index fab8901e0..5f4972480 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java @@ -1,8 +1,8 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsMsgSendMapper; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp; -import io.nosqlbench.driver.pulsarjms.util.PulsarJmsActivityUtil; +import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper; +import io.nosqlbench.driver.jms.ops.JmsOp; +import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.templating.CommandTemplate; @@ -13,26 +13,26 @@ import javax.jms.Destination; import javax.jms.JMSRuntimeException; import java.util.function.LongFunction; -public class ReadyPulsarJmsOp implements OpDispenser { +public class ReadyJmsOp implements OpDispenser { private final OpTemplate opTpl; private final CommandTemplate cmdTpl; - private final LongFunction opFunc; - private final PulsarJmsActivity pulsarJmsActivity; + private final LongFunction opFunc; + private final JmsActivity jmsActivity; - public ReadyPulsarJmsOp(OpTemplate opTemplate, PulsarJmsActivity pulsarJmsActivity) { + public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) { this.opTpl = opTemplate; this.cmdTpl = new CommandTemplate(opTpl); - this.pulsarJmsActivity = pulsarJmsActivity; + this.jmsActivity = jmsActivity; this.opFunc = resolve(); } - public PulsarJmsOp apply(long value) { + public JmsOp apply(long value) { return opFunc.apply(value); } - public LongFunction resolve() { + public LongFunction resolve() { if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) { throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!"); } @@ -63,7 +63,7 @@ public class ReadyPulsarJmsOp implements OpDispenser { LongFunction jmsDestinationFunc; try { LongFunction finalTopicUriFunc = topicUriFunc; - jmsDestinationFunc = (l) -> pulsarJmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l)); + jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l)); } catch (JMSRuntimeException ex) { throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); @@ -79,7 +79,7 @@ public class ReadyPulsarJmsOp implements OpDispenser { } } - private LongFunction resolveMsgSend( + private LongFunction resolveMsgSend( LongFunction async_api_func, LongFunction jmsDestinationFunc ) { @@ -96,8 +96,8 @@ public class ReadyPulsarJmsOp implements OpDispenser { throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!"); } - return new PulsarJmsMsgSendMapper( - pulsarJmsActivity, + return new JmsMsgSendMapper( + jmsActivity, async_api_func, jmsDestinationFunc, msgBodyFunc); diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java similarity index 58% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java index bc3d3e7ca..a65e4df1c 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java @@ -1,6 +1,6 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; -import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity; +import io.nosqlbench.driver.jms.JmsActivity; import javax.jms.Destination; import java.util.function.LongFunction; @@ -15,25 +15,25 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarJmsMsgSendMapper extends PulsarJmsOpMapper { +public class JmsMsgSendMapper extends JmsOpMapper { private final LongFunction msgBodyFunc; - public PulsarJmsMsgSendMapper(PulsarJmsActivity pulsarJmsActivity, - LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc, - LongFunction msgBodyFunc) { - super(pulsarJmsActivity, asyncApiFunc, jmsDestinationFunc); + public JmsMsgSendMapper(JmsActivity jmsActivity, + LongFunction asyncApiFunc, + LongFunction jmsDestinationFunc, + LongFunction msgBodyFunc) { + super(jmsActivity, asyncApiFunc, jmsDestinationFunc); this.msgBodyFunc = msgBodyFunc; } @Override - public PulsarJmsOp apply(long value) { + public JmsOp apply(long value) { Destination jmsDestination = jmsDestinationFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); String msgBody = msgBodyFunc.apply(value); - return new PulsarJmsMsgSendOp( - pulsarJmsActivity, + return new JmsMsgSendOp( + jmsActivity, asyncApi, jmsDestination, msgBody diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java similarity index 60% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java index e46f3a856..bc44d7951 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java @@ -1,8 +1,8 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; -import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity; +import io.nosqlbench.driver.jms.JmsActivity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -11,12 +11,12 @@ import javax.jms.JMSContext; import javax.jms.JMSProducer; import java.nio.charset.StandardCharsets; -public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp { +public class JmsMsgSendOp extends JmsTimeTrackOp { - private final static Logger logger = LogManager.getLogger(PulsarJmsMsgSendOp.class); + private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class); - private final PulsarJmsActivity pulsarActivity; - private final boolean asyncPulsarOp; + private final JmsActivity jmsActivity; + private final boolean asyncJmsOp; private final Destination jmsDestination; private final JMSContext jmsContext; private final JMSProducer jmsProducer; @@ -25,18 +25,18 @@ public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp { private final Counter bytesCounter; private final Histogram messagesizeHistogram; - public PulsarJmsMsgSendOp(PulsarJmsActivity pulsarActivity, - boolean asyncPulsarOp, - Destination jmsDestination, - String msgBody) { - this.pulsarActivity = pulsarActivity; - this.asyncPulsarOp = asyncPulsarOp; + public JmsMsgSendOp(JmsActivity jmsActivity, + boolean asyncJmsOp, + Destination jmsDestination, + String msgBody) { + this.jmsActivity = jmsActivity; + this.asyncJmsOp = asyncJmsOp; this.jmsDestination = jmsDestination; - this.jmsContext = pulsarActivity.getJmsContext(); + this.jmsContext = jmsActivity.getJmsContext(); this.jmsProducer = jmsContext.createProducer(); this.msgBody = msgBody; - this.bytesCounter = pulsarActivity.getBytesCounter(); - this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram(); + this.bytesCounter = jmsActivity.getBytesCounter(); + this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram(); } @Override diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java similarity index 80% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java index 3cda16850..bd06c6bca 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java @@ -1,9 +1,9 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; /** * Base type of all Pulsar Operations including Producers and Consumers. */ -public interface PulsarJmsOp { +public interface JmsOp { /** * Execute the operation, invoke the timeTracker when the operation ended. diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java new file mode 100644 index 000000000..78bc6133c --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java @@ -0,0 +1,21 @@ +package io.nosqlbench.driver.jms.ops; + +import io.nosqlbench.driver.jms.JmsActivity; + +import javax.jms.Destination; +import java.util.function.LongFunction; + +public abstract class JmsOpMapper implements LongFunction { + protected final JmsActivity jmsActivity; + protected final LongFunction asyncApiFunc; + protected final LongFunction jmsDestinationFunc; + + public JmsOpMapper(JmsActivity jmsActivity, + LongFunction asyncApiFunc, + LongFunction jmsDestinationFunc) + { + this.jmsActivity = jmsActivity; + this.asyncApiFunc = asyncApiFunc; + this.jmsDestinationFunc = jmsDestinationFunc; + } +} diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java similarity index 70% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java index 38b0f9305..58bd578d6 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java @@ -1,9 +1,9 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; /** * Base type of all Sync Pulsar Operations including Producers and Consumers. */ -public abstract class PulsarJmsTimeTrackOp implements PulsarJmsOp { +public abstract class JmsTimeTrackOp implements JmsOp { public void run(Runnable timeTracker) { try { diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/util/PulsarJmsActivityUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java similarity index 97% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/util/PulsarJmsActivityUtil.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java index 3440e793f..8f068a810 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/util/PulsarJmsActivityUtil.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java @@ -1,4 +1,4 @@ -package io.nosqlbench.driver.pulsarjms.util; +package io.nosqlbench.driver.jms.util; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; diff --git a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml new file mode 100644 index 000000000..f5b9845f3 --- /dev/null +++ b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml @@ -0,0 +1,37 @@ +bindings: + payload: NumberNameToString() #AlphaNumericString(20) + tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") + namespace: Mod(10); Div(5L); ToString(); Prefix("ns") + core_topic_name: Mod(5); ToString(); Prefix("t") + +# document level parameters that apply to all Pulsar client types: +params: + ### static only + async_api: "false" + + ### Static Only + # Using Enrico's JMS library (https://github.com/riptano/pulsar-jms), + # we can use "Pulsar" as the JMS provider + jms_provider_type: "pulsar" + + ### Static only + # Valid values: queue (point-to-point) or topic (pub-sub) + jms_desitation_type: "queue" + + ### Static only + # Valid values: persistent or non-persistent + jms_delivery_mode: "persistent" + + ### Static Only + # Only relevant when the JMS provider is Pulsar + #topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" + pulsar_topic_uri: "persistent://public/default/t0" + +blocks: + - name: producer-block + tags: + phase: jms_producer + statements: + - name: s1 + optype: msg_send + msg_body: "{payload}" diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java b/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java deleted file mode 100644 index 1b0a64d5f..000000000 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.nosqlbench.driver.pulsarjms.ops; - -import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity; - -import javax.jms.Destination; -import java.util.function.LongFunction; - -public abstract class PulsarJmsOpMapper implements LongFunction { - protected final PulsarJmsActivity pulsarJmsActivity; - protected final LongFunction asyncApiFunc; - protected final LongFunction jmsDestinationFunc; - - public PulsarJmsOpMapper(PulsarJmsActivity pulsarJmsActivity, - LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc) - { - this.pulsarJmsActivity = pulsarJmsActivity; - this.asyncApiFunc = asyncApiFunc; - this.jmsDestinationFunc = jmsDestinationFunc; - } -} diff --git a/driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml b/driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml deleted file mode 100644 index 138585d29..000000000 --- a/driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml +++ /dev/null @@ -1,21 +0,0 @@ -bindings: - payload: NumberNameToString() #AlphaNumericString(20) - tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") - namespace: Mod(10); Div(5L); ToString(); Prefix("ns") - core_topic_name: Mod(5); ToString(); Prefix("t") - -# document level parameters that apply to all Pulsar client types: -params: -# topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" - topic_uri: "persistent://public/default/t0" - async_api: "false" - -blocks: - - name: producer-block - tags: - phase: jms_producer - admin_task: false - statements: - - name: s1 - optype: msg_send - msg_body: "{payload}" diff --git a/nb/pom.xml b/nb/pom.xml index ed98a0f6d..ab70b88b9 100644 --- a/nb/pom.xml +++ b/nb/pom.xml @@ -135,6 +135,12 @@ 4.15.45-SNAPSHOT + + io.nosqlbench + driver-jms + 4.15.45-SNAPSHOT + + diff --git a/pom.xml b/pom.xml index 0c94390b2..83d8b467b 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ driver-jdbc driver-cockroachdb driver-pulsar - driver-pulsarjms + driver-jms From 94cbd9c22c7a1feb613ce1b9989829463cf55923 Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Mon, 3 May 2021 22:50:15 -0500 Subject: [PATCH 2/5] Add support for JMS header fields and message properties --- driver-jms/pom.xml | 15 ++ .../io/nosqlbench/driver/jms/JmsAction.java | 10 +- .../io/nosqlbench/driver/jms/JmsActivity.java | 95 +++++---- .../driver/jms/JmsActivityType.java | 2 +- .../io/nosqlbench/driver/jms/ReadyJmsOp.java | 198 ++++++++++++------ .../driver/jms/ReadyPulsarJmsOp.java | 77 +++++++ .../driver/jms/conn/JmsConnInfo.java | 10 + .../driver/jms/conn/JmsPulsarConnInfo.java | 24 +++ .../driver/jms/ops/JmsMsgSendMapper.java | 12 +- .../driver/jms/ops/JmsMsgSendOp.java | 55 ++++- .../driver/jms/ops/JmsOpMapper.java | 10 +- .../nosqlbench/driver/jms/util/JmsHeader.java | 66 ++++++ .../driver/jms/util/JmsHeaderLongFunc.java | 31 +++ .../nosqlbench/driver/jms/util/JmsUtil.java | 95 +++++++++ .../jms/util/PulsarJmsActivityUtil.java | 56 ----- .../src/main/resources/pulsar_jms_bytes.yaml | 35 +++- 16 files changed, 597 insertions(+), 194 deletions(-) create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java delete mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java diff --git a/driver-jms/pom.xml b/driver-jms/pom.xml index 129f1d978..7d0b5b0bc 100644 --- a/driver-jms/pom.xml +++ b/driver-jms/pom.xml @@ -56,6 +56,21 @@ 3.12.0 + + + org.projectlombok + lombok + 1.18.20 + provided + + + + + + + + + pulsar-jms com.datastax.oss diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java index 54769a243..4c3273627 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java @@ -25,9 +25,7 @@ public class JmsAction implements SyncAction { } @Override - public void init() { - - } + public void init() { } @Override public int runCycle(long cycle) { @@ -38,8 +36,8 @@ public class JmsAction implements SyncAction { JmsOp jmsOp; try (Timer.Context ctx = activity.getBindTimer().time()) { - LongFunction readyPulsarJmsOp = activity.getSequencer().get(cycle); - jmsOp = readyPulsarJmsOp.apply(cycle); + LongFunction readyJmsOp = activity.getSequencer().get(cycle); + jmsOp = readyJmsOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... activity.getErrorhandler().handleError(bindException, cycle, 0); @@ -51,7 +49,7 @@ public class JmsAction implements SyncAction { for (int i = 0; i < maxTries; i++) { Timer.Context ctx = activity.getExecuteTimer().time(); try { - // it is up to the pulsarOp to call Context#close when the activity is executed + // it is up to the jmsOp to call Context#close when the activity is executed // this allows us to track time for async operations jmsOp.run(ctx::close); break; diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index e6d0a9a9f..4f5d12e18 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -4,14 +4,19 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; +import io.nosqlbench.driver.jms.conn.JmsConnInfo; +import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo; import io.nosqlbench.driver.jms.ops.JmsOp; -import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil; +import io.nosqlbench.driver.jms.util.JmsHeader; +import io.nosqlbench.driver.jms.util.JmsUtil; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.metrics.ActivityMetrics; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; import javax.jms.Destination; import javax.jms.JMSContext; @@ -25,10 +30,8 @@ public class JmsActivity extends SimpleActivity { private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>(); - // e.g. pulsar://localhost:6650 - private String pulsarSvcUrl; - // e.g. http://localhost:8080 - private String webSvcUrl; + private String jmsProviderType; + private JmsConnInfo jmsConnInfo; private JMSContext jmsContext; @@ -49,21 +52,32 @@ public class JmsActivity extends SimpleActivity { public void initActivity() { super.initActivity(); - webSvcUrl = - activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080"); - pulsarSvcUrl = - activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); + // default JMS type: Pulsar + jmsProviderType = + activityDef.getParams() + .getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR) + .orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label); - Map configuration = new HashMap<>(); - configuration.put("webServiceUrl", webSvcUrl); - configuration.put("brokerServiceUrl", pulsarSvcUrl); + if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { + jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, activityDef); + } - PulsarConnectionFactory factory; - try { - factory = new PulsarConnectionFactory(configuration); - this.jmsContext = factory.createContext(); - } catch (JMSException e) { - throw new RuntimeException("PulsarJMS message send:: Unable to initialize Pulsar connection factory!"); + PulsarConnectionFactory factory = null; + if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { + Map configuration = new HashMap<>(); + configuration.put("webServiceUrl", ((JmsPulsarConnInfo)jmsConnInfo).getWebSvcUrl()); + configuration.put("brokerServiceUrl",((JmsPulsarConnInfo)jmsConnInfo).getPulsarSvcUrl()); + + try { + factory = new PulsarConnectionFactory(configuration); + this.jmsContext = factory.createContext(); + } catch (JMSException e) { + throw new RuntimeException( + "Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!"); + } + } + else { + throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType); } bindTimer = ActivityMetrics.timer(activityDef, "bind"); @@ -71,7 +85,10 @@ public class JmsActivity extends SimpleActivity { bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); - this.sequence = createOpSequence((ot) -> new ReadyJmsOp(ot, this)); + if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { + this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this)); + } + setDefaultsFromOpSequence(sequence); onActivityDefUpdate(activityDef); @@ -84,12 +101,20 @@ public class JmsActivity extends SimpleActivity { /** * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it */ - public Destination getOrCreateJmsDestination(String pulsarTopic) { - String encodedTopicStr = PulsarJmsActivityUtil.encode(pulsarTopic); + public Destination getOrCreateJmsDestination(String jmsDestinationType, JmsHeader jmsHeader, String destName) { + String encodedTopicStr = + JmsUtil.encode(jmsDestinationType, ("" + jmsHeader.getDeliveryMode()), destName); Destination destination = jmsDestinations.get(encodedTopicStr); if ( destination == null ) { - destination = jmsContext.createQueue(pulsarTopic); + // TODO: should we match Persistent/Non-peristent JMS Delivery mode with + // Pulsar Persistent/Non-prsistent topic? + if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.QUEUE.label)) { + destination = jmsContext.createQueue(destName); + } else if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.TOPIC.label)) { + destination = jmsContext.createTopic(destName); + } + jmsDestinations.put(encodedTopicStr, destination); } @@ -100,28 +125,16 @@ public class JmsActivity extends SimpleActivity { public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); } public OpSequence> getSequencer() { return sequence; } - public String getPulsarSvcUrl() { - return pulsarSvcUrl; - } - public String getWebSvcUrl() { return webSvcUrl; } + public String getJmsProviderType() { return jmsProviderType; } + public JmsConnInfo getJmsConnInfo() { return jmsConnInfo; } public JMSContext getJmsContext() { return jmsContext; } - public Timer getBindTimer() { - return bindTimer; - } - public Timer getExecuteTimer() { - return this.executeTimer; - } - public Counter getBytesCounter() { - return bytesCounter; - } - public Histogram getMessagesizeHistogram() { - return messagesizeHistogram; - } + public Timer getBindTimer() { return bindTimer; } + public Timer getExecuteTimer() { return this.executeTimer; } + public Counter getBytesCounter() { return bytesCounter; } + public Histogram getMessagesizeHistogram() { return messagesizeHistogram; } - public NBErrorHandler getErrorhandler() { - return errorhandler; - } + public NBErrorHandler getErrorhandler() { return errorhandler; } public void failOnAsyncOperationFailure() { if (asyncOperationFailure != null) { diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java index 026383147..0a49a8717 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java @@ -6,7 +6,7 @@ import io.nosqlbench.engine.api.activityapi.core.ActivityType; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.nb.annotations.Service; -@Service(value = ActivityType.class, selector = "pulsarjms") +@Service(value = ActivityType.class, selector = "jms") public class JmsActivityType implements ActivityType { @Override public ActionDispenser getActionDispenser(JmsActivity activity) { diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java index 5f4972480..8c1357bd3 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java @@ -2,104 +2,164 @@ package io.nosqlbench.driver.jms; import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper; import io.nosqlbench.driver.jms.ops.JmsOp; -import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil; +import io.nosqlbench.driver.jms.util.JmsHeader; +import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc; +import io.nosqlbench.driver.jms.util.JmsUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSRuntimeException; +import javax.jms.Message; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.function.LongFunction; +import java.util.stream.Collectors; -public class ReadyJmsOp implements OpDispenser { +abstract public class ReadyJmsOp implements OpDispenser { - private final OpTemplate opTpl; - private final CommandTemplate cmdTpl; - private final LongFunction opFunc; - private final JmsActivity jmsActivity; + protected final OpTemplate opTpl; + protected final CommandTemplate cmdTpl; + protected final JmsActivity jmsActivity; + + protected final String stmtOpType; + protected LongFunction asyncApiFunc; + protected LongFunction jmsDestinationTypeFunc; + protected JmsHeaderLongFunc jmsHeaderLongFunc; + protected Map jmsMsgProperties = new HashMap<>(); + + protected final LongFunction opFunc; public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) { this.opTpl = opTemplate; this.cmdTpl = new CommandTemplate(opTpl); this.jmsActivity = jmsActivity; - this.opFunc = resolve(); - } - - public JmsOp apply(long value) { - return opFunc.apply(value); - } - - public LongFunction resolve() { if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) { throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!"); } - String stmtOpType = cmdTpl.getStatic("optype"); - - // Global/Doc-level parameter: topic_uri - LongFunction topicUriFunc = (l) -> null; - if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) { - if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) { - topicUriFunc = (l) -> cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label); - } else { - topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l); - } - } + this.stmtOpType = cmdTpl.getStatic("optype"); // Global/Doc-level parameter: async_api - LongFunction asyncApiFunc = (l) -> false; - if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { - if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { - boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)); - asyncApiFunc = (l) -> value; + if (cmdTpl.containsKey(JmsUtil.ASYNC_API_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.ASYNC_API_KEY_STR)) { + boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.ASYNC_API_KEY_STR)); + this.asyncApiFunc = (l) -> value; } else { - throw new RuntimeException("\"" + PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!"); + throw new RuntimeException("\"" + JmsUtil.ASYNC_API_KEY_STR + "\" parameter cannot be dynamic!"); } } - // Global: JMS destinaion - LongFunction jmsDestinationFunc; - try { - LongFunction finalTopicUriFunc = topicUriFunc; - jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l)); - } - catch (JMSRuntimeException ex) { - throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); - } - - if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_SEND.label)) { - return resolveMsgSend(asyncApiFunc, jmsDestinationFunc); - } /*else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_READ.label)) { - return resolveMsgConsume(topicUriFunc, asyncApiFunc); - } */ - else { - throw new RuntimeException("Unsupported Pulsar operation type"); - } - } - - private LongFunction resolveMsgSend( - LongFunction async_api_func, - LongFunction jmsDestinationFunc - ) { - LongFunction msgBodyFunc; - if (cmdTpl.containsKey("msg_body")) { - if (cmdTpl.isStatic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body"); - } else if (cmdTpl.isDynamic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l); + // Global/Doc-level parameter: jms_desitation_type + if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) { + jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR); } else { - msgBodyFunc = (l) -> null; + throw new RuntimeException("\"" + JmsUtil.JMS_DESTINATION_TYPE_KEY_STR + "\" parameter cannot be dynamic!"); } - } else { - throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!"); } - return new JmsMsgSendMapper( - jmsActivity, - async_api_func, - jmsDestinationFunc, - msgBodyFunc); + jmsHeaderLongFunc = new JmsHeaderLongFunc(); + + // JMS header: delivery mode + LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { + msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)); + } + else { + msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l)); + } + } + jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc); + + // JMS header: message priority + LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { + msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)); + } + else { + msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l)); + } + } + jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc); + + // JMS header: message TTL + LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { + msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)); + } + else { + msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l)); + } + } + jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc); + + // JMS header: message delivery delay + LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { + msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)); + } + else { + msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l)); + } + } + jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc); + + // JMS header: disable message timestamp + LongFunction disableMsgTimestampFunc = (l) -> false; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { + disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)); + } + else { + disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l)); + } + } + jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc); + + // JMS header: disable message ID + LongFunction disableMsgIdFunc = (l) -> false; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { + disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)); + } + else { + disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l)); + } + } + jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc); + + + // JMS message properties + String jmsMsgPropertyListStr = ""; + if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { + jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR); + } else { + throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!"); + } + } + + if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) { + jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";")) + .map(s -> s.split("=", 2)) + .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : null)); + } + + this.opFunc = resolveJms(); } + + public JmsOp apply(long value) { return opFunc.apply(value); } + + abstract LongFunction resolveJms(); } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java new file mode 100644 index 000000000..1753582fd --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java @@ -0,0 +1,77 @@ +package io.nosqlbench.driver.jms; + +import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper; +import io.nosqlbench.driver.jms.ops.JmsOp; +import io.nosqlbench.driver.jms.util.JmsHeader; +import io.nosqlbench.driver.jms.util.JmsUtil; +import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; +import org.apache.commons.lang3.StringUtils; + +import javax.jms.Destination; +import javax.jms.JMSRuntimeException; +import java.util.function.LongFunction; + +public class ReadyPulsarJmsOp extends ReadyJmsOp { + public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) { + super(opTemplate, jmsActivity); + } + + public LongFunction resolveJms() { + // Global/Doc-level parameter: topic_uri + LongFunction topicUriFunc = (l) -> null; + if (cmdTpl.containsKey(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) { + topicUriFunc = (l) -> cmdTpl.getStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR); + } else { + topicUriFunc = (l) -> cmdTpl.getDynamic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR, l); + } + } + + // Global: JMS destinaion + LongFunction jmsDestinationFunc; + try { + LongFunction finalTopicUriFunc = topicUriFunc; + jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination( + jmsDestinationTypeFunc.apply(l), + (JmsHeader) jmsHeaderLongFunc.apply(l), + finalTopicUriFunc.apply(l)); + } + catch (JMSRuntimeException ex) { + throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); + } + + if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) { + return resolveMsgSend(asyncApiFunc, jmsDestinationFunc); + } /*else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) { + return resolveMsgConsume(asyncApiFunc, jmsDestinationFunc); + } */ else { + throw new RuntimeException("Unsupported Pulsar operation type"); + } + } + + private LongFunction resolveMsgSend( + LongFunction async_api_func, + LongFunction jmsDestinationFunc + ) { + LongFunction msgBodyFunc; + if (cmdTpl.containsKey("msg_body")) { + if (cmdTpl.isStatic("msg_body")) { + msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body"); + } else if (cmdTpl.isDynamic("msg_body")) { + msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l); + } else { + msgBodyFunc = (l) -> null; + } + } else { + throw new RuntimeException("JMS message send:: \"msg_body\" field must be specified!"); + } + + return new JmsMsgSendMapper( + jmsActivity, + async_api_func, + jmsDestinationFunc, + jmsHeaderLongFunc, + jmsMsgProperties, + msgBodyFunc); + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java new file mode 100644 index 000000000..fcd9776d3 --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java @@ -0,0 +1,10 @@ +package io.nosqlbench.driver.jms.conn; + +public class JmsConnInfo { + + private final String jmsProviderType; + + public JmsConnInfo(String jmsProviderType) { + this.jmsProviderType = jmsProviderType; + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java new file mode 100644 index 000000000..10f279124 --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java @@ -0,0 +1,24 @@ +package io.nosqlbench.driver.jms.conn; + +import io.nosqlbench.engine.api.activityimpl.ActivityDef; + +public class JmsPulsarConnInfo extends JmsConnInfo { + + private String pulsarSvcUrl; + private String webSvcUrl; + + public JmsPulsarConnInfo(String jmsProviderType, ActivityDef activityDef) { + super(jmsProviderType); + + webSvcUrl = + activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080"); + pulsarSvcUrl = + activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); + } + + public void setPulsarSvcUrl(String pulsarSvcUrl) { this.pulsarSvcUrl = pulsarSvcUrl; } + public String getPulsarSvcUrl() { return this.pulsarSvcUrl; } + + public void setWebSvcUrl(String webSvcUrl) { this.webSvcUrl = webSvcUrl; } + public String getWebSvcUrl() { return this.webSvcUrl; } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java index a65e4df1c..206f2f184 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java @@ -1,8 +1,11 @@ package io.nosqlbench.driver.jms.ops; import io.nosqlbench.driver.jms.JmsActivity; +import io.nosqlbench.driver.jms.util.JmsHeader; +import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc; import javax.jms.Destination; +import java.util.Map; import java.util.function.LongFunction; /** @@ -21,21 +24,26 @@ public class JmsMsgSendMapper extends JmsOpMapper { public JmsMsgSendMapper(JmsActivity jmsActivity, LongFunction asyncApiFunc, LongFunction jmsDestinationFunc, + JmsHeaderLongFunc jmsHeaderLongFunc, + Map jmsMsgProperties, LongFunction msgBodyFunc) { - super(jmsActivity, asyncApiFunc, jmsDestinationFunc); + super(jmsActivity, asyncApiFunc, jmsDestinationFunc, jmsHeaderLongFunc, jmsMsgProperties); this.msgBodyFunc = msgBodyFunc; } @Override public JmsOp apply(long value) { - Destination jmsDestination = jmsDestinationFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); + Destination jmsDestination = jmsDestinationFunc.apply(value); + JmsHeader jmsHeader = (JmsHeader)jmsHeaderLongFunc.apply(value); String msgBody = msgBodyFunc.apply(value); return new JmsMsgSendOp( jmsActivity, asyncApi, jmsDestination, + jmsHeader, + jmsMsgProperties, msgBody ); } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java index bc44d7951..f954c66dd 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.jms.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import io.nosqlbench.driver.jms.JmsActivity; +import io.nosqlbench.driver.jms.util.JmsHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -10,6 +11,8 @@ import javax.jms.Destination; import javax.jms.JMSContext; import javax.jms.JMSProducer; import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.Map; public class JmsMsgSendOp extends JmsTimeTrackOp { @@ -18,6 +21,9 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { private final JmsActivity jmsActivity; private final boolean asyncJmsOp; private final Destination jmsDestination; + private final JmsHeader jmsHeader; + private final Map jmsMsgProperties; + private final JMSContext jmsContext; private final JMSProducer jmsProducer; private final String msgBody; @@ -28,23 +34,56 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { public JmsMsgSendOp(JmsActivity jmsActivity, boolean asyncJmsOp, Destination jmsDestination, + JmsHeader jmsHeader, + Map jmsMsgProperties, String msgBody) { this.jmsActivity = jmsActivity; this.asyncJmsOp = asyncJmsOp; this.jmsDestination = jmsDestination; - this.jmsContext = jmsActivity.getJmsContext(); - this.jmsProducer = jmsContext.createProducer(); - this.msgBody = msgBody; - this.bytesCounter = jmsActivity.getBytesCounter(); - this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram(); - } - @Override - public void run() { + this.jmsHeader = jmsHeader; + this.jmsMsgProperties = jmsMsgProperties; + this.msgBody = msgBody; + + if (jmsHeader.isValidHeader()) { + throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText()); + } + if ((msgBody == null) || msgBody.isEmpty()) { throw new RuntimeException("JMS message body can't be empty!"); } + this.jmsContext = jmsActivity.getJmsContext(); + this.jmsProducer = createJmsProducer(); + + this.bytesCounter = jmsActivity.getBytesCounter(); + this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram(); + } + + private JMSProducer createJmsProducer() { + JMSProducer jmsProducer = this.jmsContext.createProducer(); + jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode()); + jmsProducer.setPriority(this.jmsHeader.getMsgPriority()); + jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay()); + jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp()); + jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId()); + + // TODO: async producer +// if (this.asyncJmsOp) { +// jmsProducer.setAsync(); +// } + + Iterator> itr = jmsMsgProperties.entrySet().iterator(); + while(itr.hasNext()) { + Map.Entry entry = itr.next(); + jmsProducer.setProperty(entry.getKey(), entry.getValue()); + } + + return jmsProducer; + } + + @Override + public void run() { int messageSize; try { byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8); diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java index 78bc6133c..c67cbd691 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java @@ -1,21 +1,29 @@ package io.nosqlbench.driver.jms.ops; import io.nosqlbench.driver.jms.JmsActivity; +import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc; import javax.jms.Destination; +import java.util.Map; import java.util.function.LongFunction; public abstract class JmsOpMapper implements LongFunction { protected final JmsActivity jmsActivity; protected final LongFunction asyncApiFunc; protected final LongFunction jmsDestinationFunc; + protected final JmsHeaderLongFunc jmsHeaderLongFunc; + protected final Map jmsMsgProperties; public JmsOpMapper(JmsActivity jmsActivity, LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc) + LongFunction jmsDestinationFunc, + JmsHeaderLongFunc jmsHeaderLongFunc, + Map jmsMsgProperties) { this.jmsActivity = jmsActivity; this.asyncApiFunc = asyncApiFunc; this.jmsDestinationFunc = jmsDestinationFunc; + this.jmsHeaderLongFunc = jmsHeaderLongFunc; + this.jmsMsgProperties = jmsMsgProperties; } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java new file mode 100644 index 000000000..236eb95ee --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeader.java @@ -0,0 +1,66 @@ +package io.nosqlbench.driver.jms.util; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.apache.commons.lang.StringUtils; + +import javax.jms.DeliveryMode; + +@Setter +@Getter +@AllArgsConstructor +@ToString +public class JmsHeader { + private int deliveryMode; + private int msgPriority; + private long msgTtl; + private long msgDeliveryDelay; + private boolean disableMsgTimestamp; + private boolean disableMsgId; + + public boolean isValidDeliveryMode() { + return (deliveryMode == DeliveryMode.NON_PERSISTENT) || (deliveryMode == DeliveryMode.PERSISTENT); + } + + public boolean isValidPriority() { + return (msgPriority >= 0) && (msgPriority <= 9); + } + + public boolean isValidTtl() { + return msgTtl >= 0; + } + + public boolean isValidDeliveryDelay() { + return msgTtl >= 0; + } + + public boolean isValidHeader() { + return isValidDeliveryMode() + && isValidPriority() + && isValidTtl() + && isValidDeliveryDelay(); + } + + public String getInvalidJmsHeaderMsgText() { + StringBuilder sb = new StringBuilder(); + + if (!isValidDeliveryMode()) + sb.append("delivery mode - " + deliveryMode + "; "); + if (!isValidPriority()) + sb.append("message priority - " + msgPriority + "; "); + if (!isValidTtl()) + sb.append("message TTL - " + msgTtl + "; "); + if (!isValidDeliveryDelay()) + sb.append("message delivery delay - " + msgDeliveryDelay + "; "); + + String invalidMsgText = sb.toString(); + if (StringUtils.length(invalidMsgText) > 0) + invalidMsgText = StringUtils.substringBeforeLast(invalidMsgText, ";"); + else + invalidMsgText = "none"; + + return "Invalid JMS header values: " + invalidMsgText; + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java new file mode 100644 index 000000000..c094a6c81 --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsHeaderLongFunc.java @@ -0,0 +1,31 @@ +package io.nosqlbench.driver.jms.util; + +import lombok.*; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import java.util.function.LongFunction; + +@Setter +@Getter +@NoArgsConstructor +public class JmsHeaderLongFunc implements LongFunction { + private LongFunction deliveryModeFunc; + private LongFunction msgPriorityFunc; + private LongFunction msgTtlFunc; + private LongFunction msgDeliveryDelayFunc; + private LongFunction disableMsgTimestampFunc; + private LongFunction disableMsgIdFunc; + + @Override + public Object apply(long value) { + return new JmsHeader( + (deliveryModeFunc != null) ? deliveryModeFunc.apply(value) : DeliveryMode.PERSISTENT, + (msgPriorityFunc != null) ? msgPriorityFunc.apply(value) : Message.DEFAULT_PRIORITY, + (msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_TIME_TO_LIVE, + (msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_DELIVERY_DELAY, + (disableMsgTimestampFunc != null) ? disableMsgTimestampFunc.apply(value) : false, + (disableMsgIdFunc != null) ? disableMsgIdFunc.apply(value) : false + ); + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java new file mode 100644 index 000000000..be10f0f79 --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java @@ -0,0 +1,95 @@ +package io.nosqlbench.driver.jms.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Arrays; +import java.util.Base64; + +public class JmsUtil { + + private final static Logger logger = LogManager.getLogger(JmsUtil.class); + + public final static String ASYNC_API_KEY_STR = "async_api"; + public final static String JMS_PROVIDER_TYPE_KEY_STR = "jms_provider_type"; + public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type"; + + public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties"; + + // Only applicable to Pulsar JMS provider + public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri"; + + // Supported message operation types + public enum OP_TYPES { + MSG_SEND("msg_send"), + MSG_READ("msg_read"); + + public final String label; + OP_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidClientType(String type) { + return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + + // Supported JMS provider type + public enum JMS_PROVIDER_TYPES { + PULSAR("pulsar"); + + public final String label; + JMS_PROVIDER_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidJmsProviderType(String type) { + return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + + // JMS Destination Types + public enum JMS_DESTINATION_TYPES { + QUEUE("queue"), + TOPIC("topic"); + + public final String label; + JMS_DESTINATION_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidJmsDestinationType(String type) { + return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + + // Supported JMS provider type + public enum JMS_MSG_HEADER_KEYS { + DELIVERY_MODE("jms_producer_header_msg_delivery_mode"), + PRIORITY("jms_producer_header_msg_priority"), + TTL("jms_producer_header_msg_ttl"), + DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"), + DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"), + DISABLE_ID("jms_producer_header_disable_msg_id"); + + public final String label; + JMS_MSG_HEADER_KEYS(String label) { + this.label = label; + } + } + public static boolean isValidJmsHeaderKey(String type) { + return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type)); + } + + public static String encode(String... strings) { + StringBuilder stringBuilder = new StringBuilder(); + for (String str : strings) { + if (!StringUtils.isBlank(str)) + stringBuilder.append(str).append("::"); + } + + String concatenatedStr = + StringUtils.substringBeforeLast(stringBuilder.toString(), "::"); + + return Base64.getEncoder().encodeToString(concatenatedStr.getBytes()); + } +} + diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java deleted file mode 100644 index 8f068a810..000000000 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java +++ /dev/null @@ -1,56 +0,0 @@ -package io.nosqlbench.driver.jms.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Arrays; -import java.util.Base64; - -public class PulsarJmsActivityUtil { - - private final static Logger logger = LogManager.getLogger(PulsarJmsActivityUtil.class); - - // Supported message operation types - public enum OP_TYPES { - MSG_SEND("msg_send"), - MSG_READ("msg_read"); - - public final String label; - - OP_TYPES(String label) { - this.label = label; - } - } - public static boolean isValidClientType(String type) { - return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type)); - } - - public enum DOC_LEVEL_PARAMS { - TOPIC_URI("topic_uri"), - ASYNC_API("async_api"); - - public final String label; - - DOC_LEVEL_PARAMS(String label) { - this.label = label; - } - } - public static boolean isValidDocLevelParam(String param) { - return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param)); - } - - public static String encode(String... strings) { - StringBuilder stringBuilder = new StringBuilder(); - for (String str : strings) { - if (!StringUtils.isBlank(str)) - stringBuilder.append(str).append("::"); - } - - String concatenatedStr = - StringUtils.substringBeforeLast(stringBuilder.toString(), "::"); - - return Base64.getEncoder().encodeToString(concatenatedStr.getBytes()); - } -} - diff --git a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml index f5b9845f3..21f3b788e 100644 --- a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml +++ b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml @@ -9,22 +9,37 @@ params: ### static only async_api: "false" - ### Static Only - # Using Enrico's JMS library (https://github.com/riptano/pulsar-jms), - # we can use "Pulsar" as the JMS provider - jms_provider_type: "pulsar" - ### Static only # Valid values: queue (point-to-point) or topic (pub-sub) jms_desitation_type: "queue" - ### Static only - # Valid values: persistent or non-persistent - jms_delivery_mode: "persistent" + ### JMS producer message header + ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT + # - static or dynamic + # - Producer only + # Valid values: non-persistent(1), or persistent(2) - default + jms_producer_header_msg_delivery_mode: "2" + # Valid values: 0~9 (4 as default) + jms_producer_header_msg_priority: "4" + # Valid values: non-negative long; default 0 (never expires) + jms_producer_header_msg_ttl: "0" + # Valid values: non-negative long; default 0 (no delay) + jms_producer_header_msg_delivery_delay: "0" + # Valid values: true/false; default false (message timestamp is enabled) + jms_producer_header_disable_msg_timestamp: "false" + # Valid values: true/false; default false (message ID is enabled) + jms_producer_header_disable_msg_id: "false" + + ### JMS producer message properties + # - static only + # - Producer only + # - In format: "key1=value1;key2=value2;..." + jms_producer_msg_properties: "" + ### Static Only - # Only relevant when the JMS provider is Pulsar - #topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" + # NOTE: ONLY relevant when the JMS provider is Pulsar + #pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" pulsar_topic_uri: "persistent://public/default/t0" blocks: From b1fe01e59160db8d9fdf301c07df5f1b3fc45bbd Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Tue, 4 May 2021 15:51:20 -0500 Subject: [PATCH 3/5] Add support for JMS consumer --- .../io/nosqlbench/driver/jms/JmsActivity.java | 6 +- .../io/nosqlbench/driver/jms/ReadyJmsOp.java | 95 +-------- .../driver/jms/ReadyPulsarJmsOp.java | 197 ++++++++++++++++-- .../driver/jms/ops/JmsMsgReadMapper.java | 72 +++++++ .../driver/jms/ops/JmsMsgReadOp.java | 114 ++++++++++ .../driver/jms/ops/JmsMsgSendMapper.java | 7 +- .../driver/jms/ops/JmsMsgSendOp.java | 8 +- .../driver/jms/ops/JmsOpMapper.java | 8 +- .../nosqlbench/driver/jms/util/JmsUtil.java | 46 ++-- driver-jms/src/main/resources/jms.md | 1 + .../src/main/resources/pulsar_jms_bytes.yaml | 90 +++++--- 11 files changed, 476 insertions(+), 168 deletions(-) create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java create mode 100644 driver-jms/src/main/resources/jms.md diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index 4f5d12e18..4622210b1 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -62,7 +62,7 @@ public class JmsActivity extends SimpleActivity { jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, activityDef); } - PulsarConnectionFactory factory = null; + PulsarConnectionFactory factory; if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { Map configuration = new HashMap<>(); configuration.put("webServiceUrl", ((JmsPulsarConnInfo)jmsConnInfo).getWebSvcUrl()); @@ -101,9 +101,9 @@ public class JmsActivity extends SimpleActivity { /** * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it */ - public Destination getOrCreateJmsDestination(String jmsDestinationType, JmsHeader jmsHeader, String destName) { + public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) { String encodedTopicStr = - JmsUtil.encode(jmsDestinationType, ("" + jmsHeader.getDeliveryMode()), destName); + JmsUtil.encode(jmsDestinationType, destName); Destination destination = jmsDestinations.get(encodedTopicStr); if ( destination == null ) { diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java index 8c1357bd3..70be982c8 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java @@ -31,8 +31,6 @@ abstract public class ReadyJmsOp implements OpDispenser { protected final String stmtOpType; protected LongFunction asyncApiFunc; protected LongFunction jmsDestinationTypeFunc; - protected JmsHeaderLongFunc jmsHeaderLongFunc; - protected Map jmsMsgProperties = new HashMap<>(); protected final LongFunction opFunc; @@ -57,6 +55,8 @@ abstract public class ReadyJmsOp implements OpDispenser { } // Global/Doc-level parameter: jms_desitation_type + // - queue: point-to-point + // - topic: pub/sub if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) { if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) { jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR); @@ -65,97 +65,6 @@ abstract public class ReadyJmsOp implements OpDispenser { } } - jmsHeaderLongFunc = new JmsHeaderLongFunc(); - - // JMS header: delivery mode - LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { - msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)); - } - else { - msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l)); - } - } - jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc); - - // JMS header: message priority - LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { - msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)); - } - else { - msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l)); - } - } - jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc); - - // JMS header: message TTL - LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { - msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)); - } - else { - msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l)); - } - } - jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc); - - // JMS header: message delivery delay - LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { - msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)); - } - else { - msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l)); - } - } - jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc); - - // JMS header: disable message timestamp - LongFunction disableMsgTimestampFunc = (l) -> false; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { - disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)); - } - else { - disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l)); - } - } - jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc); - - // JMS header: disable message ID - LongFunction disableMsgIdFunc = (l) -> false; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { - disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)); - } - else { - disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l)); - } - } - jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc); - - - // JMS message properties - String jmsMsgPropertyListStr = ""; - if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { - if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { - jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR); - } else { - throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!"); - } - } - - if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) { - jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";")) - .map(s -> s.split("=", 2)) - .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : null)); - } - this.opFunc = resolveJms(); } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java index 1753582fd..c6e8f431d 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java @@ -1,17 +1,27 @@ package io.nosqlbench.driver.jms; +import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper; import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper; import io.nosqlbench.driver.jms.ops.JmsOp; -import io.nosqlbench.driver.jms.util.JmsHeader; +import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc; import io.nosqlbench.driver.jms.util.JmsUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSRuntimeException; +import javax.jms.Message; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.function.LongFunction; +import java.util.stream.Collectors; public class ReadyPulsarJmsOp extends ReadyJmsOp { + public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) { super(opTemplate, jmsActivity); } @@ -27,25 +37,24 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp { } } - // Global: JMS destinaion + // Global: JMS destination LongFunction jmsDestinationFunc; try { LongFunction finalTopicUriFunc = topicUriFunc; jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination( jmsDestinationTypeFunc.apply(l), - (JmsHeader) jmsHeaderLongFunc.apply(l), finalTopicUriFunc.apply(l)); } catch (JMSRuntimeException ex) { - throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); + throw new RuntimeException("Unable to create JMS destination!"); } if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) { return resolveMsgSend(asyncApiFunc, jmsDestinationFunc); - } /*else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) { - return resolveMsgConsume(asyncApiFunc, jmsDestinationFunc); - } */ else { - throw new RuntimeException("Unsupported Pulsar operation type"); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) { + return resolveMsgRead(asyncApiFunc, jmsDestinationFunc); + } else { + throw new RuntimeException("Unsupported JMS operation type"); } } @@ -53,12 +62,103 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp { LongFunction async_api_func, LongFunction jmsDestinationFunc ) { + JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc(); + + // JMS header: delivery mode + LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { + msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)); + } + else { + msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l)); + } + } + jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc); + + // JMS header: message priority + LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { + msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)); + } + else { + msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l)); + } + } + jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc); + + // JMS header: message TTL + LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { + msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)); + } + else { + msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l)); + } + } + jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc); + + // JMS header: message delivery delay + LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { + msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)); + } + else { + msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l)); + } + } + jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc); + + // JMS header: disable message timestamp + LongFunction disableMsgTimestampFunc = (l) -> false; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { + disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)); + } + else { + disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l)); + } + } + jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc); + + // JMS header: disable message ID + LongFunction disableMsgIdFunc = (l) -> false; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { + disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)); + } + else { + disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l)); + } + } + jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc); + + // JMS message properties + String jmsMsgPropertyListStr = ""; + if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { + jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR); + } else { + throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!"); + } + } + + Map jmsMsgProperties = new HashMap<>(); + if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) { + jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";")) + .map(s -> s.split("=", 2)) + .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : "")); + } + LongFunction msgBodyFunc; - if (cmdTpl.containsKey("msg_body")) { - if (cmdTpl.isStatic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body"); - } else if (cmdTpl.isDynamic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l); + if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) { + msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) { + msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l); } else { msgBodyFunc = (l) -> null; } @@ -74,4 +174,75 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp { jmsMsgProperties, msgBodyFunc); } + + private LongFunction resolveMsgRead( + LongFunction async_api_func, + LongFunction jmsDestinationFunc + ) { + // For Pulsar JMS, make "durable" as the default + LongFunction jmsConsumerDurableFunc = (l) -> true; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) { + jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) { + jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l)); + } + } + + LongFunction jmsConsumerSharedFunc = (l) -> true; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) { + jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) { + jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l)); + } + } + + LongFunction jmsMsgSubscriptionFunc = (l) -> ""; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) { + jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) { + jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l); + } + } + + LongFunction jmsMsgReadSelectorFunc = (l) -> ""; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) { + jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) { + jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l); + } + } + + LongFunction jmsMsgNoLocalFunc = (l) -> true; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) { + jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) { + jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l)); + } + } + + LongFunction jmsReadTimeoutFunc = (l) -> 0L; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) { + jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) { + jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l)); + } + } + + return new JmsMsgReadMapper( + jmsActivity, + async_api_func, + jmsDestinationFunc, + jmsConsumerDurableFunc, + jmsConsumerSharedFunc, + jmsMsgSubscriptionFunc, + jmsMsgReadSelectorFunc, + jmsMsgNoLocalFunc, + jmsReadTimeoutFunc); + } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java new file mode 100644 index 000000000..91a5ca5ba --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java @@ -0,0 +1,72 @@ +package io.nosqlbench.driver.jms.ops; + +import io.nosqlbench.driver.jms.JmsActivity; + +import javax.jms.Destination; +import java.util.function.LongFunction; + +/** + * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains + * enough state to define a pulsar operation such that it can be executed, measured, and possibly + * retried if needed. + * + * This function doesn't act *as* the operation. It merely maps the construction logic into + * a simple functional type, given the component functions. + * + * For additional parameterization, the command template is also provided. + */ +public class JmsMsgReadMapper extends JmsOpMapper { + + private final LongFunction jmsConsumerDurableFunc; + private final LongFunction jmsConsumerSharedFunc; + private final LongFunction jmsMsgSubscriptionFunc; + private final LongFunction jmsMsgReadSelectorFunc; + private final LongFunction jmsMsgNoLocalFunc; + private final LongFunction jmsReadTimeoutFunc; + + public JmsMsgReadMapper(JmsActivity jmsActivity, + LongFunction asyncApiFunc, + LongFunction jmsDestinationFunc, + LongFunction jmsConsumerDurableFunc, + LongFunction jmsConsumerSharedFunc, + LongFunction jmsMsgSubscriptionFunc, + LongFunction jmsMsgReadSelectorFunc, + LongFunction jmsMsgNoLocalFunc, + LongFunction jmsReadTimeoutFunc) { + super(jmsActivity, asyncApiFunc, jmsDestinationFunc); + + this.jmsConsumerDurableFunc = jmsConsumerDurableFunc; + this.jmsConsumerSharedFunc = jmsConsumerSharedFunc; + this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc; + this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc; + this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc; + this.jmsReadTimeoutFunc = jmsReadTimeoutFunc; + } + + @Override + public JmsOp apply(long value) { + boolean asyncApi = asyncApiFunc.apply(value); + Destination jmsDestination = jmsDestinationFunc.apply(value); + boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value); + boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value); + String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value); + String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value); + boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value); + long jmsReadTimeout = jmsReadTimeoutFunc.apply(value); + + // Default to NO read timeout + if (jmsReadTimeout < 0) jmsReadTimeout = 0; + + return new JmsMsgReadOp( + jmsActivity, + asyncApi, + jmsDestination, + jmsConsumerDurable, + jmsConsumerShared, + jmsMsgSubscription, + jmsMsgReadSelector, + jmsMsgNoLocal, + jmsReadTimeout + ); + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java new file mode 100644 index 000000000..e83ff826c --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java @@ -0,0 +1,114 @@ +package io.nosqlbench.driver.jms.ops; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import io.nosqlbench.driver.jms.JmsActivity; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; + +public class JmsMsgReadOp extends JmsTimeTrackOp { + + private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class); + + private final JmsActivity jmsActivity; + private final boolean asyncJmsOp; + private final Destination jmsDestination; + + private final JMSContext jmsContext; + private final JMSConsumer jmsConsumer; + private final boolean jmsConsumerDurable; + private final boolean jmsConsumerShared; + private final String jmsMsgSubscrption; + private final String jmsMsgReadSelector; + private final boolean jmsMsgNoLocal; + private final long jmsReadTimeout; + + private final Counter bytesCounter; + private final Histogram messagesizeHistogram; + + public JmsMsgReadOp(JmsActivity jmsActivity, + boolean asyncJmsOp, + Destination jmsDestination, + boolean jmsConsumerDurable, + boolean jmsConsumerShared, + String jmsMsgSubscrption, + String jmsMsgReadSelector, + boolean jmsMsgNoLocal, + long jmsReadTimeout) { + this.jmsActivity = jmsActivity; + this.asyncJmsOp = asyncJmsOp; + this.jmsDestination = jmsDestination; + this.jmsConsumerDurable = jmsConsumerDurable; + this.jmsConsumerShared = jmsConsumerShared; + this.jmsMsgReadSelector = jmsMsgReadSelector; + this.jmsMsgSubscrption = jmsMsgSubscrption; + this.jmsMsgNoLocal = jmsMsgNoLocal; + this.jmsReadTimeout = jmsReadTimeout; + + this.jmsContext = jmsActivity.getJmsContext(); + this.jmsConsumer = createJmsConsumer(); + + this.bytesCounter = jmsActivity.getBytesCounter(); + this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram(); + } + + private JMSConsumer createJmsConsumer() { + JMSConsumer jmsConsumer; + + try { + if (jmsConsumerDurable) { + if (jmsConsumerShared) + jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector); + else + jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal); + } else { + if (jmsConsumerShared) + jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector); + else + jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal); + } + } + catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) { + throw new RuntimeException("Failed to create JMS consumer: invalid destination!"); + } + catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) { + throw new RuntimeException("Failed to create JMS consumer: invalid message selector!"); + } + catch (JMSRuntimeException jmsRuntimeException) { + jmsRuntimeException.printStackTrace(); + throw new RuntimeException("Failed to create JMS consumer: runtime internal error!"); + } + + // TODO: async consumer +// if (this.asyncJmsOp) { +// jmsConsumer.setMessageListener(); +// } + + return jmsConsumer; + } + + @Override + public void run() { + // FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley + Message receivedMsg = jmsConsumer.receive(jmsReadTimeout); + try { + if (receivedMsg != null) { + receivedMsg.acknowledge(); + byte[] receivedMsgBody = receivedMsg.getBody(byte[].class); + + if (logger.isDebugEnabled()) { + logger.debug("received msg-payload={}", new String(receivedMsgBody)); + } + + int messagesize = receivedMsgBody.length; + bytesCounter.inc(messagesize); + messagesizeHistogram.update(messagesize); + } + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException("Failed to acknowledge the received JMS message."); + } + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java index 206f2f184..1714c4212 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java @@ -19,6 +19,8 @@ import java.util.function.LongFunction; * For additional parameterization, the command template is also provided. */ public class JmsMsgSendMapper extends JmsOpMapper { + private final JmsHeaderLongFunc jmsHeaderLongFunc; + private final Map jmsMsgProperties; private final LongFunction msgBodyFunc; public JmsMsgSendMapper(JmsActivity jmsActivity, @@ -27,7 +29,10 @@ public class JmsMsgSendMapper extends JmsOpMapper { JmsHeaderLongFunc jmsHeaderLongFunc, Map jmsMsgProperties, LongFunction msgBodyFunc) { - super(jmsActivity, asyncApiFunc, jmsDestinationFunc, jmsHeaderLongFunc, jmsMsgProperties); + super(jmsActivity, asyncApiFunc, jmsDestinationFunc); + + this.jmsHeaderLongFunc = jmsHeaderLongFunc; + this.jmsMsgProperties = jmsMsgProperties; this.msgBodyFunc = msgBodyFunc; } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java index f954c66dd..c0f935771 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java @@ -45,7 +45,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { this.jmsMsgProperties = jmsMsgProperties; this.msgBody = msgBody; - if (jmsHeader.isValidHeader()) { + if (!jmsHeader.isValidHeader()) { throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText()); } @@ -62,6 +62,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { private JMSProducer createJmsProducer() { JMSProducer jmsProducer = this.jmsContext.createProducer(); + jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode()); jmsProducer.setPriority(this.jmsHeader.getMsgPriority()); jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay()); @@ -73,9 +74,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { // jmsProducer.setAsync(); // } - Iterator> itr = jmsMsgProperties.entrySet().iterator(); - while(itr.hasNext()) { - Map.Entry entry = itr.next(); + for (Map.Entry entry : jmsMsgProperties.entrySet()) { jmsProducer.setProperty(entry.getKey(), entry.getValue()); } @@ -89,6 +88,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8); messageSize = msgBytes.length; jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8)); + messagesizeHistogram.update(messageSize); bytesCounter.inc(messageSize); } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java index c67cbd691..04dac1450 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java @@ -11,19 +11,13 @@ public abstract class JmsOpMapper implements LongFunction { protected final JmsActivity jmsActivity; protected final LongFunction asyncApiFunc; protected final LongFunction jmsDestinationFunc; - protected final JmsHeaderLongFunc jmsHeaderLongFunc; - protected final Map jmsMsgProperties; public JmsOpMapper(JmsActivity jmsActivity, LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc, - JmsHeaderLongFunc jmsHeaderLongFunc, - Map jmsMsgProperties) + LongFunction jmsDestinationFunc) { this.jmsActivity = jmsActivity; this.asyncApiFunc = asyncApiFunc; this.jmsDestinationFunc = jmsDestinationFunc; - this.jmsHeaderLongFunc = jmsHeaderLongFunc; - this.jmsMsgProperties = jmsMsgProperties; } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java index be10f0f79..99fcd1b2e 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java @@ -15,7 +15,35 @@ public class JmsUtil { public final static String JMS_PROVIDER_TYPE_KEY_STR = "jms_provider_type"; public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type"; + ///// JMS Producer + // Supported JMS provider type + public enum JMS_MSG_HEADER_KEYS { + DELIVERY_MODE("jms_producer_header_msg_delivery_mode"), + PRIORITY("jms_producer_header_msg_priority"), + TTL("jms_producer_header_msg_ttl"), + DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"), + DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"), + DISABLE_ID("jms_producer_header_disable_msg_id"); + + public final String label; + JMS_MSG_HEADER_KEYS(String label) { + this.label = label; + } + } + public static boolean isValidJmsHeaderKey(String type) { + return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type)); + } public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties"; + public final static String JMS_PRODUCER_MSG_BODY_KEY_STR = "msg_body"; + + ///// JMS Consumer + public final static String JMS_CONSUMER_DURABLE_KEY_STR = "jms_consumer_msg_durable"; + public final static String JMS_CONSUMER_SHARED_KEY_STR = "jms_consumer_msg_shared"; + public final static String JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR = "jms_consumer_subscription"; + public final static String JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR = "jms_consumer_msg_read_selector"; + public final static String JMS_CONSUMER_MSG_NOLOCAL_KEY_STR = "jms_consumer_msg_nolocal"; + public final static String JMS_CONSUMER_READ_TIMEOUT_KEY_STR = "jms_consumer_msg_read_timeout"; + // Only applicable to Pulsar JMS provider public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri"; @@ -61,24 +89,6 @@ public class JmsUtil { return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type)); } - // Supported JMS provider type - public enum JMS_MSG_HEADER_KEYS { - DELIVERY_MODE("jms_producer_header_msg_delivery_mode"), - PRIORITY("jms_producer_header_msg_priority"), - TTL("jms_producer_header_msg_ttl"), - DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"), - DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"), - DISABLE_ID("jms_producer_header_disable_msg_id"); - - public final String label; - JMS_MSG_HEADER_KEYS(String label) { - this.label = label; - } - } - public static boolean isValidJmsHeaderKey(String type) { - return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type)); - } - public static String encode(String... strings) { StringBuilder stringBuilder = new StringBuilder(); for (String str : strings) { diff --git a/driver-jms/src/main/resources/jms.md b/driver-jms/src/main/resources/jms.md new file mode 100644 index 000000000..07dd0c5c7 --- /dev/null +++ b/driver-jms/src/main/resources/jms.md @@ -0,0 +1 @@ +# Overview diff --git a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml index 21f3b788e..c69010d36 100644 --- a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml +++ b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml @@ -11,31 +11,7 @@ params: ### Static only # Valid values: queue (point-to-point) or topic (pub-sub) - jms_desitation_type: "queue" - - ### JMS producer message header - ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT - # - static or dynamic - # - Producer only - # Valid values: non-persistent(1), or persistent(2) - default - jms_producer_header_msg_delivery_mode: "2" - # Valid values: 0~9 (4 as default) - jms_producer_header_msg_priority: "4" - # Valid values: non-negative long; default 0 (never expires) - jms_producer_header_msg_ttl: "0" - # Valid values: non-negative long; default 0 (no delay) - jms_producer_header_msg_delivery_delay: "0" - # Valid values: true/false; default false (message timestamp is enabled) - jms_producer_header_disable_msg_timestamp: "false" - # Valid values: true/false; default false (message ID is enabled) - jms_producer_header_disable_msg_id: "false" - - ### JMS producer message properties - # - static only - # - Producer only - # - In format: "key1=value1;key2=value2;..." - jms_producer_msg_properties: "" - + jms_desitation_type: "topic" ### Static Only # NOTE: ONLY relevant when the JMS provider is Pulsar @@ -43,10 +19,66 @@ params: pulsar_topic_uri: "persistent://public/default/t0" blocks: - - name: producer-block + - name: "producer-block" tags: - phase: jms_producer + phase: "jms_producer" statements: - - name: s1 - optype: msg_send + - name: "s1" + optype: "msg_send" + + ### JMS PRODUCER message header + ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT + # - static or dynamic + # - Producer only + # Valid values: non-persistent(1), or persistent(2) - default + jms_producer_header_msg_delivery_mode: "2" + # Valid values: 0~9 (4 as default) + jms_producer_header_msg_priority: "4" + # Valid values: non-negative long; default 0 (never expires) + jms_producer_header_msg_ttl: "0" + # Valid values: non-negative long; default 0 (no delay) + jms_producer_header_msg_delivery_delay: "0" + # Valid values: true/false; default false (message timestamp is enabled) + jms_producer_header_disable_msg_timestamp: "false" + # Valid values: true/false; default false (message ID is enabled) + jms_producer_header_disable_msg_id: "false" + + ### JMS PRODUCER message properties + # - static only + # - Producer only + # - In format: "key1=value1;key2=value2;..." + jms_producer_msg_properties: "key1=value1;key2=value2" + + ### JMS PRODUCER message body msg_body: "{payload}" + + - name: "consumer-block" + tags: + phase: "jms_consumer" + statements: + - name: "s1" + optype: "msg_read" + + ### JMS CONSUMER durable and shared + jms_consumer_msg_durable: "true" + jms_consumer_msg_shared: "true" + + ### JMS CONSUMER subscription name + # - only relevant for durable consumer + jms_consumer_subscription: "mysub" + + ### JMS CONSUMER subscription name + # - only relevant for unshared consumer + jms_consumer_nolocal: "false" + + ### JMS CONSUMER message read timeout + # - unit: milliseconds + # - 0 means call blocks indefinitely + # - FIXME: 0 supposes to wait indefinitly; but + # it actually behaves like no wait at all + jms_consumer_msg_read_timeout: "10000" + + ### JMS CONSUMER message selector + # - empty string means no message selector + # - https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html + jms_consumer_msg_read_selector: "" From e6491433aeb01da58ca152e2d2a0f4d3d462b7de Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Wed, 5 May 2021 20:18:26 -0500 Subject: [PATCH 4/5] Add Pulsar client, producer, and consumer configuration support --- driver-jms/pom.xml | 19 ++-- .../io/nosqlbench/driver/jms/JmsActivity.java | 54 ++++++---- .../driver/jms/conn/JmsConnInfo.java | 15 ++- .../driver/jms/conn/JmsPulsarConnInfo.java | 42 +++++--- .../driver/jms/ops/JmsMsgSendOp.java | 39 ++++++-- .../nosqlbench/driver/jms/util/JmsUtil.java | 42 +++++--- .../driver/jms/util/PulsarConfig.java | 99 +++++++++++++++++++ .../main/resources/pulsar_config.properties | 33 +++++++ ...{pulsar_jms_bytes.yaml => pulsar_jms.yaml} | 9 +- .../pulsar/util/PulsarNBClientConf.java | 1 - 10 files changed, 291 insertions(+), 62 deletions(-) create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java create mode 100644 driver-jms/src/main/resources/pulsar_config.properties rename driver-jms/src/main/resources/{pulsar_jms_bytes.yaml => pulsar_jms.yaml} (88%) diff --git a/driver-jms/pom.xml b/driver-jms/pom.xml index 7d0b5b0bc..53cc52caa 100644 --- a/driver-jms/pom.xml +++ b/driver-jms/pom.xml @@ -64,12 +64,19 @@ provided - - - - - - + + + commons-beanutils + commons-beanutils + 1.9.4 + + + + + org.apache.commons + commons-configuration2 + 2.7 + pulsar-jms diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index 4622210b1..396982bbf 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -7,22 +7,19 @@ import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; import io.nosqlbench.driver.jms.conn.JmsConnInfo; import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo; import io.nosqlbench.driver.jms.ops.JmsOp; -import io.nosqlbench.driver.jms.util.JmsHeader; import io.nosqlbench.driver.jms.util.JmsUtil; +import io.nosqlbench.driver.jms.util.PulsarConfig; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.metrics.ActivityMetrics; -import org.apache.commons.lang.StringUtils; -import org.apache.commons.lang3.math.NumberUtils; +import org.apache.commons.lang3.StringUtils; import javax.jms.Destination; import javax.jms.JMSContext; import javax.jms.JMSException; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -53,33 +50,54 @@ public class JmsActivity extends SimpleActivity { super.initActivity(); // default JMS type: Pulsar + // - currently this is the only supported JMS provider jmsProviderType = activityDef.getParams() .getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR) .orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label); + // "Pulsar" as the JMS provider if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { - jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, activityDef); - } - PulsarConnectionFactory factory; - if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { - Map configuration = new HashMap<>(); - configuration.put("webServiceUrl", ((JmsPulsarConnInfo)jmsConnInfo).getWebSvcUrl()); - configuration.put("brokerServiceUrl",((JmsPulsarConnInfo)jmsConnInfo).getPulsarSvcUrl()); + String webSvcUrl = + activityDef.getParams() + .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR) + .orElse("http://localhost:8080"); + String pulsarSvcUrl = + activityDef.getParams() + .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR) + .orElse("pulsar://localhost:6650"); - try { - factory = new PulsarConnectionFactory(configuration); - this.jmsContext = factory.createContext(); - } catch (JMSException e) { - throw new RuntimeException( - "Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!"); + if (StringUtils.isAnyBlank(webSvcUrl, pulsarSvcUrl)) { + throw new RuntimeException("For \"" + JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label + "\" type, " + + "\"" + JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR + "\" and " + + "\"" + JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR + "\" parameters are manadatory!"); } + + // Check if extra Pulsar config. file is in place + // - default file: "pulsar_config.properties" under the current directory + String pulsarCfgFile = + activityDef.getParams() + .getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR) + .orElse(JmsUtil.JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME); + + PulsarConfig pulsarConfig = new PulsarConfig(pulsarCfgFile); + + jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, webSvcUrl, pulsarSvcUrl, pulsarConfig); } else { throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType); } + PulsarConnectionFactory factory; + try { + factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig()); + this.jmsContext = factory.createContext(); + } catch (JMSException e) { + throw new RuntimeException( + "Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!"); + } + bindTimer = ActivityMetrics.timer(activityDef, "bind"); executeTimer = ActivityMetrics.timer(activityDef, "execute"); bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java index fcd9776d3..e52408106 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsConnInfo.java @@ -1,10 +1,21 @@ package io.nosqlbench.driver.jms.conn; +import java.util.HashMap; +import java.util.Map; + public class JmsConnInfo { - private final String jmsProviderType; + protected final String jmsProviderType; + protected final Map jmsConnConfig; - public JmsConnInfo(String jmsProviderType) { + protected JmsConnInfo(String jmsProviderType) { this.jmsProviderType = jmsProviderType; + this.jmsConnConfig = new HashMap<>(); } + + public Map getJmsConnConfig() { return this.jmsConnConfig; } + public void resetJmsConnConfig() { this.jmsConnConfig.clear(); } + public void addJmsConnConfigItems(Map cfgItems) { this.jmsConnConfig.putAll(cfgItems); } + public void addJmsConnConfigItem(String key, Object value) { this.jmsConnConfig.put(key, value); } + public void removeJmsConnConfigItem(String key) { this.jmsConnConfig.remove(key); } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java index 10f279124..4d623527b 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/conn/JmsPulsarConnInfo.java @@ -1,24 +1,42 @@ package io.nosqlbench.driver.jms.conn; -import io.nosqlbench.engine.api.activityimpl.ActivityDef; +import io.nosqlbench.driver.jms.util.PulsarConfig; + +import java.util.Map; public class JmsPulsarConnInfo extends JmsConnInfo { - private String pulsarSvcUrl; - private String webSvcUrl; + private final String webSvcUrl; + private final String pulsarSvcUrl; + private final PulsarConfig extraPulsarConfig; - public JmsPulsarConnInfo(String jmsProviderType, ActivityDef activityDef) { + public JmsPulsarConnInfo(String jmsProviderType, String webSvcUrl, String pulsarSvcUrl, PulsarConfig pulsarConfig) { super(jmsProviderType); - webSvcUrl = - activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080"); - pulsarSvcUrl = - activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); + this.webSvcUrl = webSvcUrl; + this.pulsarSvcUrl = pulsarSvcUrl; + this.extraPulsarConfig = pulsarConfig; + + this.addJmsConnConfigItem("webServiceUrl", this.webSvcUrl); + this.addJmsConnConfigItem("brokerServiceUrl", this.pulsarSvcUrl); + + Map clientCfgMap = this.extraPulsarConfig.getClientConfMap(); + if (!clientCfgMap.isEmpty()) { + this.addJmsConnConfigItems(clientCfgMap); + } + + Map producerCfgMap = this.extraPulsarConfig.getProducerConfMap(); + if (!producerCfgMap.isEmpty()) { + this.addJmsConnConfigItem("producerConfig", producerCfgMap); + } + + Map consumerCfgMap = this.extraPulsarConfig.getConsumerConfMap(); + if (!consumerCfgMap.isEmpty()) { + this.addJmsConnConfigItem("consumerConfig", consumerCfgMap); + } } - public void setPulsarSvcUrl(String pulsarSvcUrl) { this.pulsarSvcUrl = pulsarSvcUrl; } - public String getPulsarSvcUrl() { return this.pulsarSvcUrl; } - - public void setWebSvcUrl(String webSvcUrl) { this.webSvcUrl = webSvcUrl; } public String getWebSvcUrl() { return this.webSvcUrl; } + public String getPulsarSvcUrl() { return this.pulsarSvcUrl; } + public PulsarConfig getExtraPulsarConfig() { return this.extraPulsarConfig; } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java index c0f935771..f23ac403e 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java @@ -7,9 +7,7 @@ import io.nosqlbench.driver.jms.util.JmsHeader; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import javax.jms.Destination; -import javax.jms.JMSContext; -import javax.jms.JMSProducer; +import javax.jms.*; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Map; @@ -69,10 +67,37 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp()); jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId()); - // TODO: async producer -// if (this.asyncJmsOp) { -// jmsProducer.setAsync(); -// } + if (this.asyncJmsOp) { + jmsProducer.setAsync(new CompletionListener() { + @Override + public void onCompletion(Message msg) { + try { + byte[] msgBody = msg.getBody(byte[].class); + if (logger.isTraceEnabled()) { + logger.trace("Async message send success - message body: " + new String(msgBody)); + } + } + catch (JMSException jmsException) { + jmsException.printStackTrace(); + logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage()); + } + } + + @Override + public void onException(Message msg, Exception e) { + try { + byte[] msgBody = msg.getBody(byte[].class); + if (logger.isTraceEnabled()) { + logger.trace("Async message send failure - message body: " + new String(msgBody)); + } + } + catch (JMSException jmsException) { + jmsException.printStackTrace(); + logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage()); + } + } + }); + } for (Map.Entry entry : jmsMsgProperties.entrySet()) { jmsProducer.setProperty(entry.getKey(), entry.getValue()); diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java index 99fcd1b2e..456939417 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java @@ -11,8 +11,35 @@ public class JmsUtil { private final static Logger logger = LogManager.getLogger(JmsUtil.class); + // Supported JMS provider type + public enum JMS_PROVIDER_TYPES { + PULSAR("pulsar"); + + public final String label; + JMS_PROVIDER_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidJmsProviderType(String type) { + return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + + ///// + // NB command line parameters + // - JMS provider type + public final static String JMS_PROVIDER_TYPE_KEY_STR = "provider_type"; + + /// Only applicable when the provider is "Pulsar" + // - Pulsar configuration properties file + public final static String JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR = "pulsar_cfg_file"; + public final static String JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME = "pulsar_config.properties"; + // - Pulsar web url + public final static String JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR = "web_url"; + // - Pulsar service url + public final static String JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR = "service_url"; + + public final static String ASYNC_API_KEY_STR = "async_api"; - public final static String JMS_PROVIDER_TYPE_KEY_STR = "jms_provider_type"; public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type"; ///// JMS Producer @@ -62,19 +89,6 @@ public class JmsUtil { return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type)); } - // Supported JMS provider type - public enum JMS_PROVIDER_TYPES { - PULSAR("pulsar"); - - public final String label; - JMS_PROVIDER_TYPES(String label) { - this.label = label; - } - } - public static boolean isValidJmsProviderType(String type) { - return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type)); - } - // JMS Destination Types public enum JMS_DESTINATION_TYPES { QUEUE("queue"), diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java new file mode 100644 index 000000000..1d5cc6d5b --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarConfig.java @@ -0,0 +1,99 @@ +package io.nosqlbench.driver.jms.util; + +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.FileBasedConfiguration; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; +import org.apache.commons.configuration2.builder.fluent.Parameters; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class PulsarConfig { + private final static Logger logger = LogManager.getLogger(PulsarConfig.class); + + public static final String SCHEMA_CONF_PREFIX = "schema"; + public static final String CLIENT_CONF_PREFIX = "client"; + public static final String PRODUCER_CONF_PREFIX = "producer"; + public static final String CONSUMER_CONF_PREFIX = "consumer"; + + private final Map schemaConfMap = new HashMap<>(); + private final Map clientConfMap = new HashMap<>(); + private final Map producerConfMap = new HashMap<>(); + private final Map consumerConfMap = new HashMap<>(); + + public PulsarConfig(String fileName) { + File file = new File(fileName); + + try { + String canonicalFilePath = file.getCanonicalPath(); + + Parameters params = new Parameters(); + + FileBasedConfigurationBuilder builder = + new FileBasedConfigurationBuilder(PropertiesConfiguration.class) + .configure(params.properties() + .setFileName(fileName)); + + Configuration config = builder.getConfiguration(); + + // Get schema specific configuration settings + for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + if (!StringUtils.isBlank(confVal)) + schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey)); + } + + // Get client connection specific configuration settings + for (Iterator it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + if (!StringUtils.isBlank(confVal)) + clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey)); + } + + // Get producer specific configuration settings + for (Iterator it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + if (!StringUtils.isBlank(confVal)) + producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey)); + } + + // Get consumer specific configuration settings + for (Iterator it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + if (!StringUtils.isBlank(confVal)) + consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey)); + } + } catch (IOException ioe) { + logger.error("Can't read the specified config properties file: " + fileName); + ioe.printStackTrace(); + } catch (ConfigurationException cex) { + logger.error("Error loading configuration items from the specified config properties file: " + fileName); + cex.printStackTrace(); + } + } + + public Map getSchemaConfMap() { + return this.schemaConfMap; + } + public Map getClientConfMap() { + return this.clientConfMap; + } + public Map getProducerConfMap() { + return this.producerConfMap; + } + public Map getConsumerConfMap() { + return this.consumerConfMap; + } +} diff --git a/driver-jms/src/main/resources/pulsar_config.properties b/driver-jms/src/main/resources/pulsar_config.properties new file mode 100644 index 000000000..f711535ac --- /dev/null +++ b/driver-jms/src/main/resources/pulsar_config.properties @@ -0,0 +1,33 @@ +### Schema related configurations - schema.xxx +# valid types: +# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type) +# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue) +# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct) +# avro, json, protobuf +# +# NOTE: for JMS client, Pulsar "schema" is NOT supported yet +schema.type= +schema.definition= + + +### Pulsar client related configurations - client.xxx +# http://pulsar.apache.org/docs/en/client-libraries-java/#client +client.connectionTimeoutMs=5000 +#client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken +#client.authParams= +#client.tlsAllowInsecureConnection=true +client.numIoThreads=10 +client.numListenerThreads=10 + + +### Producer related configurations (global) - producer.xxx +# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer +producer.sendTimeoutMs= +producer.blockIfQueueFull=true +producer.maxPendingMessages=10000 +producer.batchingMaxMessages=10000 + + +### Consumer related configurations (global) - consumer.xxx +# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer +consumer.receiverQueueSize=2000 diff --git a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-jms/src/main/resources/pulsar_jms.yaml similarity index 88% rename from driver-jms/src/main/resources/pulsar_jms_bytes.yaml rename to driver-jms/src/main/resources/pulsar_jms.yaml index c69010d36..755f05ed6 100644 --- a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml +++ b/driver-jms/src/main/resources/pulsar_jms.yaml @@ -7,7 +7,7 @@ bindings: # document level parameters that apply to all Pulsar client types: params: ### static only - async_api: "false" + async_api: "true" ### Static only # Valid values: queue (point-to-point) or topic (pub-sub) @@ -16,7 +16,12 @@ params: ### Static Only # NOTE: ONLY relevant when the JMS provider is Pulsar #pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" - pulsar_topic_uri: "persistent://public/default/t0" + #pulsar_topic_uri: "persistent://public/default/pt100" + #pulsar_topic_uri: "persistent://public/default/t0" + pulsar_topic_uri: "persistent://public/default/pt100_10" + #pulsar_topic_uri: "persistent://public/default/pt200_10" + #pulsar_topic_uri: "persistent://public/default/pt300_10" + #pulsar_topic_uri: "persistent://public/default/pt400_10" blocks: - name: "producer-block" diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index b442cf032..0bbabd8c4 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -5,7 +5,6 @@ import org.apache.commons.configuration2.FileBasedConfiguration; import org.apache.commons.configuration2.PropertiesConfiguration; import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; import org.apache.commons.configuration2.builder.fluent.Parameters; -import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; From 87385c15d7ace73afd03093e9c8e2667a141a93e Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Thu, 6 May 2021 16:44:39 -0500 Subject: [PATCH 5/5] Maven central pom update --- driver-jms/pom.xml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/driver-jms/pom.xml b/driver-jms/pom.xml index 53cc52caa..0b9e28275 100644 --- a/driver-jms/pom.xml +++ b/driver-jms/pom.xml @@ -78,12 +78,14 @@ 2.7 + - pulsar-jms com.datastax.oss - 1.0.0-SNAPSHOT + pulsar-jms + 1.0.0-ALPHA +