Pulsar JMS driver type name change and package name typo fix

This commit is contained in:
yabinmeng-gitee 2021-05-03 13:33:59 -05:00
parent 136c920e24
commit f4aafb70c3
16 changed files with 144 additions and 119 deletions

View File

@ -8,16 +8,20 @@
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-pulsarjms</artifactId>
<artifactId>driver-jms</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A PulsarJMS driver for nosqlbench. This provides the ability to inject synthetic data
into a pulsar system via JMS 2.0 compatibile APIs
A JMS driver for nosqlbench. This provides the ability to inject synthetic data
into a pulsar system via JMS 2.0 compatibile APIs.
NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster
as the potential JMS Destination
</description>
<repositories>
<!-- Tempoarily needed for Pulsar JMS Java library -->
<repository>
<id>datastax-releases-local</id>
<name>DataStax Local Releases</name>
@ -32,7 +36,6 @@
</repositories>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>io.nosqlbench</groupId>

View File

@ -1,7 +1,7 @@
package io.nosqlbench.driver.pulsarjms;
package io.nosqlbench.driver.jms;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp;
import io.nosqlbench.driver.jms.ops.JmsOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import org.apache.logging.log4j.LogManager;
@ -9,16 +9,16 @@ import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class PulsarJmsAction implements SyncAction {
public class JmsAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class);
private final static Logger logger = LogManager.getLogger(JmsAction.class);
private final PulsarJmsActivity activity;
private final JmsActivity activity;
private final int slot;
int maxTries;
public PulsarJmsAction(PulsarJmsActivity activity, int slot) {
public JmsAction(JmsActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
@ -36,10 +36,10 @@ public class PulsarJmsAction implements SyncAction {
long start = System.nanoTime();
PulsarJmsOp pulsarJmsOp;
JmsOp jmsOp;
try (Timer.Context ctx = activity.getBindTimer().time()) {
LongFunction<PulsarJmsOp> readyPulsarJmsOp = activity.getSequencer().get(cycle);
pulsarJmsOp = readyPulsarJmsOp.apply(cycle);
LongFunction<JmsOp> readyPulsarJmsOp = activity.getSequencer().get(cycle);
jmsOp = readyPulsarJmsOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...
activity.getErrorhandler().handleError(bindException, cycle, 0);
@ -53,7 +53,7 @@ public class PulsarJmsAction implements SyncAction {
try {
// it is up to the pulsarOp to call Context#close when the activity is executed
// this allows us to track time for async operations
pulsarJmsOp.run(ctx::close);
jmsOp.run(ctx::close);
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity

View File

@ -1,11 +1,11 @@
package io.nosqlbench.driver.pulsarjms;
package io.nosqlbench.driver.jms;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp;
import io.nosqlbench.driver.pulsarjms.util.PulsarJmsActivityUtil;
import io.nosqlbench.driver.jms.ops.JmsOp;
import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
@ -21,7 +21,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarJmsActivity extends SimpleActivity {
public class JmsActivity extends SimpleActivity {
private final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
@ -32,7 +32,7 @@ public class PulsarJmsActivity extends SimpleActivity {
private JMSContext jmsContext;
private OpSequence<OpDispenser<PulsarJmsOp>> sequence;
private OpSequence<OpDispenser<JmsOp>> sequence;
private volatile Throwable asyncOperationFailure;
private NBErrorHandler errorhandler;
@ -41,7 +41,7 @@ public class PulsarJmsActivity extends SimpleActivity {
private Counter bytesCounter;
private Histogram messagesizeHistogram;
public PulsarJmsActivity(ActivityDef activityDef) {
public JmsActivity(ActivityDef activityDef) {
super(activityDef);
}
@ -71,7 +71,7 @@ public class PulsarJmsActivity extends SimpleActivity {
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this));
this.sequence = createOpSequence((ot) -> new ReadyJmsOp(ot, this));
setDefaultsFromOpSequence(sequence);
onActivityDefUpdate(activityDef);
@ -98,7 +98,7 @@ public class PulsarJmsActivity extends SimpleActivity {
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
public OpSequence<OpDispenser<PulsarJmsOp>> getSequencer() { return sequence; }
public OpSequence<OpDispenser<JmsOp>> getSequencer() { return sequence; }
public String getPulsarSvcUrl() {
return pulsarSvcUrl;

View File

@ -1,4 +1,4 @@
package io.nosqlbench.driver.pulsarjms;
package io.nosqlbench.driver.jms;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
@ -7,26 +7,26 @@ import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class, selector = "pulsarjms")
public class PulsarJmsActivityType implements ActivityType<PulsarJmsActivity> {
public class JmsActivityType implements ActivityType<JmsActivity> {
@Override
public ActionDispenser getActionDispenser(PulsarJmsActivity activity) {
public ActionDispenser getActionDispenser(JmsActivity activity) {
return new PulsarJmsActionDispenser(activity);
}
@Override
public PulsarJmsActivity getActivity(ActivityDef activityDef) {
return new PulsarJmsActivity(activityDef);
public JmsActivity getActivity(ActivityDef activityDef) {
return new JmsActivity(activityDef);
}
private static class PulsarJmsActionDispenser implements ActionDispenser {
private final PulsarJmsActivity activity;
public PulsarJmsActionDispenser(PulsarJmsActivity activity) {
private final JmsActivity activity;
public PulsarJmsActionDispenser(JmsActivity activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new PulsarJmsAction(activity, slot);
return new JmsAction(activity, slot);
}
}
}

View File

@ -1,8 +1,8 @@
package io.nosqlbench.driver.pulsarjms;
package io.nosqlbench.driver.jms;
import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsMsgSendMapper;
import io.nosqlbench.driver.pulsarjms.ops.PulsarJmsOp;
import io.nosqlbench.driver.pulsarjms.util.PulsarJmsActivityUtil;
import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
import io.nosqlbench.driver.jms.ops.JmsOp;
import io.nosqlbench.driver.jms.util.PulsarJmsActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.CommandTemplate;
@ -13,26 +13,26 @@ import javax.jms.Destination;
import javax.jms.JMSRuntimeException;
import java.util.function.LongFunction;
public class ReadyPulsarJmsOp implements OpDispenser<PulsarJmsOp> {
public class ReadyJmsOp implements OpDispenser<JmsOp> {
private final OpTemplate opTpl;
private final CommandTemplate cmdTpl;
private final LongFunction<PulsarJmsOp> opFunc;
private final PulsarJmsActivity pulsarJmsActivity;
private final LongFunction<JmsOp> opFunc;
private final JmsActivity jmsActivity;
public ReadyPulsarJmsOp(OpTemplate opTemplate, PulsarJmsActivity pulsarJmsActivity) {
public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
this.opTpl = opTemplate;
this.cmdTpl = new CommandTemplate(opTpl);
this.pulsarJmsActivity = pulsarJmsActivity;
this.jmsActivity = jmsActivity;
this.opFunc = resolve();
}
public PulsarJmsOp apply(long value) {
public JmsOp apply(long value) {
return opFunc.apply(value);
}
public LongFunction<PulsarJmsOp> resolve() {
public LongFunction<JmsOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
}
@ -63,7 +63,7 @@ public class ReadyPulsarJmsOp implements OpDispenser<PulsarJmsOp> {
LongFunction<Destination> jmsDestinationFunc;
try {
LongFunction<String> finalTopicUriFunc = topicUriFunc;
jmsDestinationFunc = (l) -> pulsarJmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l));
jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l));
}
catch (JMSRuntimeException ex) {
throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!");
@ -79,7 +79,7 @@ public class ReadyPulsarJmsOp implements OpDispenser<PulsarJmsOp> {
}
}
private LongFunction<PulsarJmsOp> resolveMsgSend(
private LongFunction<JmsOp> resolveMsgSend(
LongFunction<Boolean> async_api_func,
LongFunction<Destination> jmsDestinationFunc
) {
@ -96,8 +96,8 @@ public class ReadyPulsarJmsOp implements OpDispenser<PulsarJmsOp> {
throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!");
}
return new PulsarJmsMsgSendMapper(
pulsarJmsActivity,
return new JmsMsgSendMapper(
jmsActivity,
async_api_func,
jmsDestinationFunc,
msgBodyFunc);

View File

@ -1,6 +1,6 @@
package io.nosqlbench.driver.pulsarjms.ops;
package io.nosqlbench.driver.jms.ops;
import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity;
import io.nosqlbench.driver.jms.JmsActivity;
import javax.jms.Destination;
import java.util.function.LongFunction;
@ -15,25 +15,25 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarJmsMsgSendMapper extends PulsarJmsOpMapper {
public class JmsMsgSendMapper extends JmsOpMapper {
private final LongFunction<String> msgBodyFunc;
public PulsarJmsMsgSendMapper(PulsarJmsActivity pulsarJmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc,
LongFunction<String> msgBodyFunc) {
super(pulsarJmsActivity, asyncApiFunc, jmsDestinationFunc);
public JmsMsgSendMapper(JmsActivity jmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc,
LongFunction<String> msgBodyFunc) {
super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
this.msgBodyFunc = msgBodyFunc;
}
@Override
public PulsarJmsOp apply(long value) {
public JmsOp apply(long value) {
Destination jmsDestination = jmsDestinationFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
String msgBody = msgBodyFunc.apply(value);
return new PulsarJmsMsgSendOp(
pulsarJmsActivity,
return new JmsMsgSendOp(
jmsActivity,
asyncApi,
jmsDestination,
msgBody

View File

@ -1,8 +1,8 @@
package io.nosqlbench.driver.pulsarjms.ops;
package io.nosqlbench.driver.jms.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity;
import io.nosqlbench.driver.jms.JmsActivity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -11,12 +11,12 @@ import javax.jms.JMSContext;
import javax.jms.JMSProducer;
import java.nio.charset.StandardCharsets;
public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp {
public class JmsMsgSendOp extends JmsTimeTrackOp {
private final static Logger logger = LogManager.getLogger(PulsarJmsMsgSendOp.class);
private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class);
private final PulsarJmsActivity pulsarActivity;
private final boolean asyncPulsarOp;
private final JmsActivity jmsActivity;
private final boolean asyncJmsOp;
private final Destination jmsDestination;
private final JMSContext jmsContext;
private final JMSProducer jmsProducer;
@ -25,18 +25,18 @@ public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp {
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarJmsMsgSendOp(PulsarJmsActivity pulsarActivity,
boolean asyncPulsarOp,
Destination jmsDestination,
String msgBody) {
this.pulsarActivity = pulsarActivity;
this.asyncPulsarOp = asyncPulsarOp;
public JmsMsgSendOp(JmsActivity jmsActivity,
boolean asyncJmsOp,
Destination jmsDestination,
String msgBody) {
this.jmsActivity = jmsActivity;
this.asyncJmsOp = asyncJmsOp;
this.jmsDestination = jmsDestination;
this.jmsContext = pulsarActivity.getJmsContext();
this.jmsContext = jmsActivity.getJmsContext();
this.jmsProducer = jmsContext.createProducer();
this.msgBody = msgBody;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
this.bytesCounter = jmsActivity.getBytesCounter();
this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
}
@Override

View File

@ -1,9 +1,9 @@
package io.nosqlbench.driver.pulsarjms.ops;
package io.nosqlbench.driver.jms.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public interface PulsarJmsOp {
public interface JmsOp {
/**
* Execute the operation, invoke the timeTracker when the operation ended.

View File

@ -0,0 +1,21 @@
package io.nosqlbench.driver.jms.ops;
import io.nosqlbench.driver.jms.JmsActivity;
import javax.jms.Destination;
import java.util.function.LongFunction;
public abstract class JmsOpMapper implements LongFunction<JmsOp> {
protected final JmsActivity jmsActivity;
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<Destination> jmsDestinationFunc;
public JmsOpMapper(JmsActivity jmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc)
{
this.jmsActivity = jmsActivity;
this.asyncApiFunc = asyncApiFunc;
this.jmsDestinationFunc = jmsDestinationFunc;
}
}

View File

@ -1,9 +1,9 @@
package io.nosqlbench.driver.pulsarjms.ops;
package io.nosqlbench.driver.jms.ops;
/**
* Base type of all Sync Pulsar Operations including Producers and Consumers.
*/
public abstract class PulsarJmsTimeTrackOp implements PulsarJmsOp {
public abstract class JmsTimeTrackOp implements JmsOp {
public void run(Runnable timeTracker) {
try {

View File

@ -1,4 +1,4 @@
package io.nosqlbench.driver.pulsarjms.util;
package io.nosqlbench.driver.jms.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;

View File

@ -0,0 +1,37 @@
bindings:
payload: NumberNameToString() #AlphaNumericString(20)
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
### static only
async_api: "false"
### Static Only
# Using Enrico's JMS library (https://github.com/riptano/pulsar-jms),
# we can use "Pulsar" as the JMS provider
jms_provider_type: "pulsar"
### Static only
# Valid values: queue (point-to-point) or topic (pub-sub)
jms_desitation_type: "queue"
### Static only
# Valid values: persistent or non-persistent
jms_delivery_mode: "persistent"
### Static Only
# Only relevant when the JMS provider is Pulsar
#topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
pulsar_topic_uri: "persistent://public/default/t0"
blocks:
- name: producer-block
tags:
phase: jms_producer
statements:
- name: s1
optype: msg_send
msg_body: "{payload}"

View File

@ -1,21 +0,0 @@
package io.nosqlbench.driver.pulsarjms.ops;
import io.nosqlbench.driver.pulsarjms.PulsarJmsActivity;
import javax.jms.Destination;
import java.util.function.LongFunction;
public abstract class PulsarJmsOpMapper implements LongFunction<PulsarJmsOp> {
protected final PulsarJmsActivity pulsarJmsActivity;
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<Destination> jmsDestinationFunc;
public PulsarJmsOpMapper(PulsarJmsActivity pulsarJmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc)
{
this.pulsarJmsActivity = pulsarJmsActivity;
this.asyncApiFunc = asyncApiFunc;
this.jmsDestinationFunc = jmsDestinationFunc;
}
}

View File

@ -1,21 +0,0 @@
bindings:
payload: NumberNameToString() #AlphaNumericString(20)
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
# topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
topic_uri: "persistent://public/default/t0"
async_api: "false"
blocks:
- name: producer-block
tags:
phase: jms_producer
admin_task: false
statements:
- name: s1
optype: msg_send
msg_body: "{payload}"

View File

@ -135,6 +135,12 @@
<version>4.15.45-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-jms</artifactId>
<version>4.15.45-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

View File

@ -50,7 +50,7 @@
<module>driver-jdbc</module>
<module>driver-cockroachdb</module>
<module>driver-pulsar</module>
<module>driver-pulsarjms</module>
<module>driver-jms</module>
<!-- VIRTDATA MODULES -->