From f4aafb70c38b7b2655e915e64fbc0c807466f641 Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Mon, 3 May 2021 13:33:59 -0500 Subject: [PATCH] Pulsar JMS driver type name change and package name typo fix --- {driver-pulsarjms => driver-jms}/pom.xml | 11 ++++-- .../io/nosqlbench/driver/jms/JmsAction.java | 20 +++++----- .../io/nosqlbench/driver/jms/JmsActivity.java | 16 ++++---- .../driver/jms/JmsActivityType.java | 16 ++++---- .../io/nosqlbench/driver/jms/ReadyJmsOp.java | 30 +++++++-------- .../driver/jms/ops/JmsMsgSendMapper.java | 22 +++++------ .../driver/jms/ops/JmsMsgSendOp.java | 30 +++++++-------- .../io/nosqlbench/driver/jms/ops/JmsOp.java | 4 +- .../driver/jms/ops/JmsOpMapper.java | 21 +++++++++++ .../driver/jms/ops/JmsTimeTrackOp.java | 4 +- .../jms}/util/PulsarJmsActivityUtil.java | 2 +- .../src/main/resources/pulsar_jms_bytes.yaml | 37 +++++++++++++++++++ .../pulsarjms/ops/PulsarJmsOpMapper.java | 21 ----------- .../src/main/resources/pulsar_jms_bytes.yaml | 21 ----------- nb/pom.xml | 6 +++ pom.xml | 2 +- 16 files changed, 144 insertions(+), 119 deletions(-) rename {driver-pulsarjms => driver-jms}/pom.xml (83%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java (76%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java (89%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java (57%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java (80%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java (58%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java (60%) rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java (80%) create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java rename driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java => driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java (70%) rename {driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms => driver-jms/src/main/java/io/nosqlbench/driver/jms}/util/PulsarJmsActivityUtil.java (97%) create mode 100644 driver-jms/src/main/resources/pulsar_jms_bytes.yaml delete mode 100644 driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java delete mode 100644 driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml diff --git a/driver-pulsarjms/pom.xml b/driver-jms/pom.xml similarity index 83% rename from driver-pulsarjms/pom.xml rename to driver-jms/pom.xml index e024cb73a..129f1d978 100644 --- a/driver-pulsarjms/pom.xml +++ b/driver-jms/pom.xml @@ -8,16 +8,20 @@ ../mvn-defaults - driver-pulsarjms + driver-jms jar ${project.artifactId} - A PulsarJMS driver for nosqlbench. This provides the ability to inject synthetic data - into a pulsar system via JMS 2.0 compatibile APIs + A JMS driver for nosqlbench. This provides the ability to inject synthetic data + into a pulsar system via JMS 2.0 compatibile APIs. + + NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster + as the potential JMS Destination + datastax-releases-local DataStax Local Releases @@ -32,7 +36,6 @@ - io.nosqlbench diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java similarity index 76% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java index 660c68b5b..54769a243 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsAction.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsAction.java @@ -1,7 +1,7 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; import com.codahale.metrics.Timer; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp; +import io.nosqlbench.driver.jms.ops.JmsOp; import io.nosqlbench.engine.api.activityapi.core.SyncAction; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import org.apache.logging.log4j.LogManager; @@ -9,16 +9,16 @@ import org.apache.logging.log4j.Logger; import java.util.function.LongFunction; -public class PulsarJmsAction implements SyncAction { +public class JmsAction implements SyncAction { - private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class); + private final static Logger logger = LogManager.getLogger(JmsAction.class); - private final PulsarJmsActivity activity; + private final JmsActivity activity; private final int slot; int maxTries; - public PulsarJmsAction(PulsarJmsActivity activity, int slot) { + public JmsAction(JmsActivity activity, int slot) { this.activity = activity; this.slot = slot; this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10); @@ -36,10 +36,10 @@ public class PulsarJmsAction implements SyncAction { long start = System.nanoTime(); - PulsarJmsOp pulsarJmsOp; + JmsOp jmsOp; try (Timer.Context ctx = activity.getBindTimer().time()) { - LongFunction readyPulsarJmsOp = activity.getSequencer().get(cycle); - pulsarJmsOp = readyPulsarJmsOp.apply(cycle); + LongFunction readyPulsarJmsOp = activity.getSequencer().get(cycle); + jmsOp = readyPulsarJmsOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... activity.getErrorhandler().handleError(bindException, cycle, 0); @@ -53,7 +53,7 @@ public class PulsarJmsAction implements SyncAction { try { // it is up to the pulsarOp to call Context#close when the activity is executed // this allows us to track time for async operations - pulsarJmsOp.run(ctx::close); + jmsOp.run(ctx::close); break; } catch (RuntimeException err) { ErrorDetail errorDetail = activity diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java similarity index 89% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index fc87f80cf..e6d0a9a9f 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -1,11 +1,11 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp; -import io.nosqlbench.driver.pulsarjms.util.PulsarJmsActivityUtil; +import io.nosqlbench.driver.jms.ops.JmsOp; +import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityimpl.ActivityDef; @@ -21,7 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class PulsarJmsActivity extends SimpleActivity { +public class JmsActivity extends SimpleActivity { private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>(); @@ -32,7 +32,7 @@ public class PulsarJmsActivity extends SimpleActivity { private JMSContext jmsContext; - private OpSequence> sequence; + private OpSequence> sequence; private volatile Throwable asyncOperationFailure; private NBErrorHandler errorhandler; @@ -41,7 +41,7 @@ public class PulsarJmsActivity extends SimpleActivity { private Counter bytesCounter; private Histogram messagesizeHistogram; - public PulsarJmsActivity(ActivityDef activityDef) { + public JmsActivity(ActivityDef activityDef) { super(activityDef); } @@ -71,7 +71,7 @@ public class PulsarJmsActivity extends SimpleActivity { bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); - this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this)); + this.sequence = createOpSequence((ot) -> new ReadyJmsOp(ot, this)); setDefaultsFromOpSequence(sequence); onActivityDefUpdate(activityDef); @@ -98,7 +98,7 @@ public class PulsarJmsActivity extends SimpleActivity { @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); } - public OpSequence> getSequencer() { return sequence; } + public OpSequence> getSequencer() { return sequence; } public String getPulsarSvcUrl() { return pulsarSvcUrl; diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java similarity index 57% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java index 395504475..026383147 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/PulsarJmsActivityType.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivityType.java @@ -1,4 +1,4 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; import io.nosqlbench.engine.api.activityapi.core.Action; import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; @@ -7,26 +7,26 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.nb.annotations.Service; @Service(value = ActivityType.class, selector = "pulsarjms") -public class PulsarJmsActivityType implements ActivityType { +public class JmsActivityType implements ActivityType { @Override - public ActionDispenser getActionDispenser(PulsarJmsActivity activity) { + public ActionDispenser getActionDispenser(JmsActivity activity) { return new PulsarJmsActionDispenser(activity); } @Override - public PulsarJmsActivity getActivity(ActivityDef activityDef) { - return new PulsarJmsActivity(activityDef); + public JmsActivity getActivity(ActivityDef activityDef) { + return new JmsActivity(activityDef); } private static class PulsarJmsActionDispenser implements ActionDispenser { - private final PulsarJmsActivity activity; - public PulsarJmsActionDispenser(PulsarJmsActivity activity) { + private final JmsActivity activity; + public PulsarJmsActionDispenser(JmsActivity activity) { this.activity = activity; } @Override public Action getAction(int slot) { - return new PulsarJmsAction(activity, slot); + return new JmsAction(activity, slot); } } } diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java similarity index 80% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java index fab8901e0..5f4972480 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ReadyPulsarJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java @@ -1,8 +1,8 @@ -package io.nosqlbench.driver.pulsarjms; +package io.nosqlbench.driver.jms; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsMsgSendMapper; -import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp; -import io.nosqlbench.driver.pulsarjms.util.PulsarJmsActivityUtil; +import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper; +import io.nosqlbench.driver.jms.ops.JmsOp; +import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.templating.CommandTemplate; @@ -13,26 +13,26 @@ import javax.jms.Destination; import javax.jms.JMSRuntimeException; import java.util.function.LongFunction; -public class ReadyPulsarJmsOp implements OpDispenser { +public class ReadyJmsOp implements OpDispenser { private final OpTemplate opTpl; private final CommandTemplate cmdTpl; - private final LongFunction opFunc; - private final PulsarJmsActivity pulsarJmsActivity; + private final LongFunction opFunc; + private final JmsActivity jmsActivity; - public ReadyPulsarJmsOp(OpTemplate opTemplate, PulsarJmsActivity pulsarJmsActivity) { + public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) { this.opTpl = opTemplate; this.cmdTpl = new CommandTemplate(opTpl); - this.pulsarJmsActivity = pulsarJmsActivity; + this.jmsActivity = jmsActivity; this.opFunc = resolve(); } - public PulsarJmsOp apply(long value) { + public JmsOp apply(long value) { return opFunc.apply(value); } - public LongFunction resolve() { + public LongFunction resolve() { if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) { throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!"); } @@ -63,7 +63,7 @@ public class ReadyPulsarJmsOp implements OpDispenser { LongFunction jmsDestinationFunc; try { LongFunction finalTopicUriFunc = topicUriFunc; - jmsDestinationFunc = (l) -> pulsarJmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l)); + jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l)); } catch (JMSRuntimeException ex) { throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); @@ -79,7 +79,7 @@ public class ReadyPulsarJmsOp implements OpDispenser { } } - private LongFunction resolveMsgSend( + private LongFunction resolveMsgSend( LongFunction async_api_func, LongFunction jmsDestinationFunc ) { @@ -96,8 +96,8 @@ public class ReadyPulsarJmsOp implements OpDispenser { throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!"); } - return new PulsarJmsMsgSendMapper( - pulsarJmsActivity, + return new JmsMsgSendMapper( + jmsActivity, async_api_func, jmsDestinationFunc, msgBodyFunc); diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java similarity index 58% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java index bc3d3e7ca..a65e4df1c 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java @@ -1,6 +1,6 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; -import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity; +import io.nosqlbench.driver.jms.JmsActivity; import javax.jms.Destination; import java.util.function.LongFunction; @@ -15,25 +15,25 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarJmsMsgSendMapper extends PulsarJmsOpMapper { +public class JmsMsgSendMapper extends JmsOpMapper { private final LongFunction msgBodyFunc; - public PulsarJmsMsgSendMapper(PulsarJmsActivity pulsarJmsActivity, - LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc, - LongFunction msgBodyFunc) { - super(pulsarJmsActivity, asyncApiFunc, jmsDestinationFunc); + public JmsMsgSendMapper(JmsActivity jmsActivity, + LongFunction asyncApiFunc, + LongFunction jmsDestinationFunc, + LongFunction msgBodyFunc) { + super(jmsActivity, asyncApiFunc, jmsDestinationFunc); this.msgBodyFunc = msgBodyFunc; } @Override - public PulsarJmsOp apply(long value) { + public JmsOp apply(long value) { Destination jmsDestination = jmsDestinationFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); String msgBody = msgBodyFunc.apply(value); - return new PulsarJmsMsgSendOp( - pulsarJmsActivity, + return new JmsMsgSendOp( + jmsActivity, asyncApi, jmsDestination, msgBody diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java similarity index 60% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java index e46f3a856..bc44d7951 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsMsgSendOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java @@ -1,8 +1,8 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; -import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity; +import io.nosqlbench.driver.jms.JmsActivity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -11,12 +11,12 @@ import javax.jms.JMSContext; import javax.jms.JMSProducer; import java.nio.charset.StandardCharsets; -public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp { +public class JmsMsgSendOp extends JmsTimeTrackOp { - private final static Logger logger = LogManager.getLogger(PulsarJmsMsgSendOp.class); + private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class); - private final PulsarJmsActivity pulsarActivity; - private final boolean asyncPulsarOp; + private final JmsActivity jmsActivity; + private final boolean asyncJmsOp; private final Destination jmsDestination; private final JMSContext jmsContext; private final JMSProducer jmsProducer; @@ -25,18 +25,18 @@ public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp { private final Counter bytesCounter; private final Histogram messagesizeHistogram; - public PulsarJmsMsgSendOp(PulsarJmsActivity pulsarActivity, - boolean asyncPulsarOp, - Destination jmsDestination, - String msgBody) { - this.pulsarActivity = pulsarActivity; - this.asyncPulsarOp = asyncPulsarOp; + public JmsMsgSendOp(JmsActivity jmsActivity, + boolean asyncJmsOp, + Destination jmsDestination, + String msgBody) { + this.jmsActivity = jmsActivity; + this.asyncJmsOp = asyncJmsOp; this.jmsDestination = jmsDestination; - this.jmsContext = pulsarActivity.getJmsContext(); + this.jmsContext = jmsActivity.getJmsContext(); this.jmsProducer = jmsContext.createProducer(); this.msgBody = msgBody; - this.bytesCounter = pulsarActivity.getBytesCounter(); - this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram(); + this.bytesCounter = jmsActivity.getBytesCounter(); + this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram(); } @Override diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java similarity index 80% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java index 3cda16850..bd06c6bca 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOp.java @@ -1,9 +1,9 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; /** * Base type of all Pulsar Operations including Producers and Consumers. */ -public interface PulsarJmsOp { +public interface JmsOp { /** * Execute the operation, invoke the timeTracker when the operation ended. diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java new file mode 100644 index 000000000..78bc6133c --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java @@ -0,0 +1,21 @@ +package io.nosqlbench.driver.jms.ops; + +import io.nosqlbench.driver.jms.JmsActivity; + +import javax.jms.Destination; +import java.util.function.LongFunction; + +public abstract class JmsOpMapper implements LongFunction { + protected final JmsActivity jmsActivity; + protected final LongFunction asyncApiFunc; + protected final LongFunction jmsDestinationFunc; + + public JmsOpMapper(JmsActivity jmsActivity, + LongFunction asyncApiFunc, + LongFunction jmsDestinationFunc) + { + this.jmsActivity = jmsActivity; + this.asyncApiFunc = asyncApiFunc; + this.jmsDestinationFunc = jmsDestinationFunc; + } +} diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java similarity index 70% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java index 38b0f9305..58bd578d6 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsTimeTrackOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsTimeTrackOp.java @@ -1,9 +1,9 @@ -package io.nosqlbench.driver.pulsarjms.ops; +package io.nosqlbench.driver.jms.ops; /** * Base type of all Sync Pulsar Operations including Producers and Consumers. */ -public abstract class PulsarJmsTimeTrackOp implements PulsarJmsOp { +public abstract class JmsTimeTrackOp implements JmsOp { public void run(Runnable timeTracker) { try { diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/util/PulsarJmsActivityUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java similarity index 97% rename from driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/util/PulsarJmsActivityUtil.java rename to driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java index 3440e793f..8f068a810 100644 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/util/PulsarJmsActivityUtil.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/PulsarJmsActivityUtil.java @@ -1,4 +1,4 @@ -package io.nosqlbench.driver.pulsarjms.util; +package io.nosqlbench.driver.jms.util; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; diff --git a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml new file mode 100644 index 000000000..f5b9845f3 --- /dev/null +++ b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml @@ -0,0 +1,37 @@ +bindings: + payload: NumberNameToString() #AlphaNumericString(20) + tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") + namespace: Mod(10); Div(5L); ToString(); Prefix("ns") + core_topic_name: Mod(5); ToString(); Prefix("t") + +# document level parameters that apply to all Pulsar client types: +params: + ### static only + async_api: "false" + + ### Static Only + # Using Enrico's JMS library (https://github.com/riptano/pulsar-jms), + # we can use "Pulsar" as the JMS provider + jms_provider_type: "pulsar" + + ### Static only + # Valid values: queue (point-to-point) or topic (pub-sub) + jms_desitation_type: "queue" + + ### Static only + # Valid values: persistent or non-persistent + jms_delivery_mode: "persistent" + + ### Static Only + # Only relevant when the JMS provider is Pulsar + #topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" + pulsar_topic_uri: "persistent://public/default/t0" + +blocks: + - name: producer-block + tags: + phase: jms_producer + statements: + - name: s1 + optype: msg_send + msg_body: "{payload}" diff --git a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java b/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java deleted file mode 100644 index 1b0a64d5f..000000000 --- a/driver-pulsarjms/src/main/java/io/nosqlbench/driver/pulsarjms/ops/PulsarJmsOpMapper.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.nosqlbench.driver.pulsarjms.ops; - -import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity; - -import javax.jms.Destination; -import java.util.function.LongFunction; - -public abstract class PulsarJmsOpMapper implements LongFunction { - protected final PulsarJmsActivity pulsarJmsActivity; - protected final LongFunction asyncApiFunc; - protected final LongFunction jmsDestinationFunc; - - public PulsarJmsOpMapper(PulsarJmsActivity pulsarJmsActivity, - LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc) - { - this.pulsarJmsActivity = pulsarJmsActivity; - this.asyncApiFunc = asyncApiFunc; - this.jmsDestinationFunc = jmsDestinationFunc; - } -} diff --git a/driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml b/driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml deleted file mode 100644 index 138585d29..000000000 --- a/driver-pulsarjms/src/main/resources/pulsar_jms_bytes.yaml +++ /dev/null @@ -1,21 +0,0 @@ -bindings: - payload: NumberNameToString() #AlphaNumericString(20) - tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") - namespace: Mod(10); Div(5L); ToString(); Prefix("ns") - core_topic_name: Mod(5); ToString(); Prefix("t") - -# document level parameters that apply to all Pulsar client types: -params: -# topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" - topic_uri: "persistent://public/default/t0" - async_api: "false" - -blocks: - - name: producer-block - tags: - phase: jms_producer - admin_task: false - statements: - - name: s1 - optype: msg_send - msg_body: "{payload}" diff --git a/nb/pom.xml b/nb/pom.xml index ed98a0f6d..ab70b88b9 100644 --- a/nb/pom.xml +++ b/nb/pom.xml @@ -135,6 +135,12 @@ 4.15.45-SNAPSHOT + + io.nosqlbench + driver-jms + 4.15.45-SNAPSHOT + + diff --git a/pom.xml b/pom.xml index 0c94390b2..83d8b467b 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ driver-jdbc driver-cockroachdb driver-pulsar - driver-pulsarjms + driver-jms