mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
commit
4f065a7347
91
driver-jms/pom.xml
Normal file
91
driver-jms/pom.xml
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||||
|
<modelVersion>4.0.0</modelVersion>
|
||||||
|
|
||||||
|
<parent>
|
||||||
|
<artifactId>mvn-defaults</artifactId>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<version>4.15.45-SNAPSHOT</version>
|
||||||
|
<relativePath>../mvn-defaults</relativePath>
|
||||||
|
</parent>
|
||||||
|
|
||||||
|
<artifactId>driver-jms</artifactId>
|
||||||
|
<packaging>jar</packaging>
|
||||||
|
<name>${project.artifactId}</name>
|
||||||
|
|
||||||
|
<description>
|
||||||
|
A JMS driver for nosqlbench. This provides the ability to inject synthetic data
|
||||||
|
into a pulsar system via JMS 2.0 compatibile APIs.
|
||||||
|
|
||||||
|
NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster
|
||||||
|
as the potential JMS Destination
|
||||||
|
</description>
|
||||||
|
|
||||||
|
<repositories>
|
||||||
|
<!-- Tempoarily needed for Pulsar JMS Java library -->
|
||||||
|
<repository>
|
||||||
|
<id>datastax-releases-local</id>
|
||||||
|
<name>DataStax Local Releases</name>
|
||||||
|
<url>https://repo.sjc.dsinternal.org/artifactory/datastax-snapshots-local/</url>
|
||||||
|
<releases>
|
||||||
|
<enabled>false</enabled>
|
||||||
|
</releases>
|
||||||
|
<snapshots>
|
||||||
|
<enabled>true</enabled>
|
||||||
|
</snapshots>
|
||||||
|
</repository>
|
||||||
|
</repositories>
|
||||||
|
|
||||||
|
<dependencies>
|
||||||
|
<!-- core dependencies -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<artifactId>engine-api</artifactId>
|
||||||
|
<version>4.15.45-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<artifactId>driver-stdout</artifactId>
|
||||||
|
<version>4.15.45-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-lang3</artifactId>
|
||||||
|
<version>3.12.0</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>1.18.20</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-beanutils</groupId>
|
||||||
|
<artifactId>commons-beanutils</artifactId>
|
||||||
|
<version>1.9.4</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.commons</groupId>
|
||||||
|
<artifactId>commons-configuration2</artifactId>
|
||||||
|
<version>2.7</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<!-- https://mvnrepository.com/artifact/com.datastax.oss/pulsar-jms -->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.datastax.oss</groupId>
|
||||||
|
<artifactId>pulsar-jms</artifactId>
|
||||||
|
<version>1.0.0-ALPHA</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
</dependencies>
|
||||||
|
|
||||||
|
</project>
|
@ -0,0 +1,68 @@
|
|||||||
|
package io.nosqlbench.driver.jms;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsOp;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
public class JmsAction implements SyncAction {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(JmsAction.class);
|
||||||
|
|
||||||
|
private final JmsActivity activity;
|
||||||
|
private final int slot;
|
||||||
|
|
||||||
|
int maxTries;
|
||||||
|
|
||||||
|
public JmsAction(JmsActivity activity, int slot) {
|
||||||
|
this.activity = activity;
|
||||||
|
this.slot = slot;
|
||||||
|
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init() { }
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int runCycle(long cycle) {
|
||||||
|
// let's fail the action if some async operation failed
|
||||||
|
activity.failOnAsyncOperationFailure();
|
||||||
|
|
||||||
|
long start = System.nanoTime();
|
||||||
|
|
||||||
|
JmsOp jmsOp;
|
||||||
|
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||||
|
LongFunction<JmsOp> readyJmsOp = activity.getSequencer().get(cycle);
|
||||||
|
jmsOp = readyJmsOp.apply(cycle);
|
||||||
|
} catch (Exception bindException) {
|
||||||
|
// if diagnostic mode ...
|
||||||
|
activity.getErrorhandler().handleError(bindException, cycle, 0);
|
||||||
|
throw new RuntimeException(
|
||||||
|
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < maxTries; i++) {
|
||||||
|
Timer.Context ctx = activity.getExecuteTimer().time();
|
||||||
|
try {
|
||||||
|
// it is up to the jmsOp to call Context#close when the activity is executed
|
||||||
|
// this allows us to track time for async operations
|
||||||
|
jmsOp.run(ctx::close);
|
||||||
|
break;
|
||||||
|
} catch (RuntimeException err) {
|
||||||
|
ErrorDetail errorDetail = activity
|
||||||
|
.getErrorhandler()
|
||||||
|
.handleError(err, cycle, System.nanoTime() - start);
|
||||||
|
if (!errorDetail.isRetryable()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,165 @@
|
|||||||
|
package io.nosqlbench.driver.jms;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Counter;
|
||||||
|
import com.codahale.metrics.Histogram;
|
||||||
|
import com.codahale.metrics.Timer;
|
||||||
|
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
|
||||||
|
import io.nosqlbench.driver.jms.conn.JmsConnInfo;
|
||||||
|
import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo;
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsOp;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsUtil;
|
||||||
|
import io.nosqlbench.driver.jms.util.PulsarConfig;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||||
|
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSContext;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
|
||||||
|
public class JmsActivity extends SimpleActivity {
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
private String jmsProviderType;
|
||||||
|
private JmsConnInfo jmsConnInfo;
|
||||||
|
|
||||||
|
private JMSContext jmsContext;
|
||||||
|
|
||||||
|
private OpSequence<OpDispenser<JmsOp>> sequence;
|
||||||
|
private volatile Throwable asyncOperationFailure;
|
||||||
|
private NBErrorHandler errorhandler;
|
||||||
|
|
||||||
|
private Timer bindTimer;
|
||||||
|
private Timer executeTimer;
|
||||||
|
private Counter bytesCounter;
|
||||||
|
private Histogram messagesizeHistogram;
|
||||||
|
|
||||||
|
public JmsActivity(ActivityDef activityDef) {
|
||||||
|
super(activityDef);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initActivity() {
|
||||||
|
super.initActivity();
|
||||||
|
|
||||||
|
// default JMS type: Pulsar
|
||||||
|
// - currently this is the only supported JMS provider
|
||||||
|
jmsProviderType =
|
||||||
|
activityDef.getParams()
|
||||||
|
.getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR)
|
||||||
|
.orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label);
|
||||||
|
|
||||||
|
// "Pulsar" as the JMS provider
|
||||||
|
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
|
||||||
|
|
||||||
|
String webSvcUrl =
|
||||||
|
activityDef.getParams()
|
||||||
|
.getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR)
|
||||||
|
.orElse("http://localhost:8080");
|
||||||
|
String pulsarSvcUrl =
|
||||||
|
activityDef.getParams()
|
||||||
|
.getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR)
|
||||||
|
.orElse("pulsar://localhost:6650");
|
||||||
|
|
||||||
|
if (StringUtils.isAnyBlank(webSvcUrl, pulsarSvcUrl)) {
|
||||||
|
throw new RuntimeException("For \"" + JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label + "\" type, " +
|
||||||
|
"\"" + JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR + "\" and " +
|
||||||
|
"\"" + JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR + "\" parameters are manadatory!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if extra Pulsar config. file is in place
|
||||||
|
// - default file: "pulsar_config.properties" under the current directory
|
||||||
|
String pulsarCfgFile =
|
||||||
|
activityDef.getParams()
|
||||||
|
.getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR)
|
||||||
|
.orElse(JmsUtil.JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME);
|
||||||
|
|
||||||
|
PulsarConfig pulsarConfig = new PulsarConfig(pulsarCfgFile);
|
||||||
|
|
||||||
|
jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, webSvcUrl, pulsarSvcUrl, pulsarConfig);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType);
|
||||||
|
}
|
||||||
|
|
||||||
|
PulsarConnectionFactory factory;
|
||||||
|
try {
|
||||||
|
factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig());
|
||||||
|
this.jmsContext = factory.createContext();
|
||||||
|
} catch (JMSException e) {
|
||||||
|
throw new RuntimeException(
|
||||||
|
"Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!");
|
||||||
|
}
|
||||||
|
|
||||||
|
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||||
|
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||||
|
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||||
|
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
|
||||||
|
|
||||||
|
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
|
||||||
|
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this));
|
||||||
|
}
|
||||||
|
|
||||||
|
setDefaultsFromOpSequence(sequence);
|
||||||
|
onActivityDefUpdate(activityDef);
|
||||||
|
|
||||||
|
this.errorhandler = new NBErrorHandler(
|
||||||
|
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
|
||||||
|
this::getExceptionMetrics
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
|
||||||
|
*/
|
||||||
|
public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
|
||||||
|
String encodedTopicStr =
|
||||||
|
JmsUtil.encode(jmsDestinationType, destName);
|
||||||
|
Destination destination = jmsDestinations.get(encodedTopicStr);
|
||||||
|
|
||||||
|
if ( destination == null ) {
|
||||||
|
// TODO: should we match Persistent/Non-peristent JMS Delivery mode with
|
||||||
|
// Pulsar Persistent/Non-prsistent topic?
|
||||||
|
if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.QUEUE.label)) {
|
||||||
|
destination = jmsContext.createQueue(destName);
|
||||||
|
} else if (StringUtils.equalsIgnoreCase(jmsDestinationType, JmsUtil.JMS_DESTINATION_TYPES.TOPIC.label)) {
|
||||||
|
destination = jmsContext.createTopic(destName);
|
||||||
|
}
|
||||||
|
|
||||||
|
jmsDestinations.put(encodedTopicStr, destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
return destination;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
|
||||||
|
public OpSequence<OpDispenser<JmsOp>> getSequencer() { return sequence; }
|
||||||
|
|
||||||
|
public String getJmsProviderType() { return jmsProviderType; }
|
||||||
|
public JmsConnInfo getJmsConnInfo() { return jmsConnInfo; }
|
||||||
|
public JMSContext getJmsContext() { return jmsContext; }
|
||||||
|
|
||||||
|
public Timer getBindTimer() { return bindTimer; }
|
||||||
|
public Timer getExecuteTimer() { return this.executeTimer; }
|
||||||
|
public Counter getBytesCounter() { return bytesCounter; }
|
||||||
|
public Histogram getMessagesizeHistogram() { return messagesizeHistogram; }
|
||||||
|
|
||||||
|
public NBErrorHandler getErrorhandler() { return errorhandler; }
|
||||||
|
|
||||||
|
public void failOnAsyncOperationFailure() {
|
||||||
|
if (asyncOperationFailure != null) {
|
||||||
|
throw new RuntimeException(asyncOperationFailure);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public void asyncOperationFailed(Throwable ex) {
|
||||||
|
this.asyncOperationFailure = ex;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,32 @@
|
|||||||
|
package io.nosqlbench.driver.jms;
|
||||||
|
|
||||||
|
import io.nosqlbench.engine.api.activityapi.core.Action;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
|
import io.nosqlbench.nb.annotations.Service;
|
||||||
|
|
||||||
|
@Service(value = ActivityType.class, selector = "jms")
|
||||||
|
public class JmsActivityType implements ActivityType<JmsActivity> {
|
||||||
|
@Override
|
||||||
|
public ActionDispenser getActionDispenser(JmsActivity activity) {
|
||||||
|
return new PulsarJmsActionDispenser(activity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JmsActivity getActivity(ActivityDef activityDef) {
|
||||||
|
return new JmsActivity(activityDef);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class PulsarJmsActionDispenser implements ActionDispenser {
|
||||||
|
private final JmsActivity activity;
|
||||||
|
public PulsarJmsActionDispenser(JmsActivity activity) {
|
||||||
|
this.activity = activity;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Action getAction(int slot) {
|
||||||
|
return new JmsAction(activity, slot);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,74 @@
|
|||||||
|
package io.nosqlbench.driver.jms;
|
||||||
|
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsOp;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeader;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsUtil;
|
||||||
|
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||||
|
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||||
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSRuntimeException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
abstract public class ReadyJmsOp implements OpDispenser<JmsOp> {
|
||||||
|
|
||||||
|
protected final OpTemplate opTpl;
|
||||||
|
protected final CommandTemplate cmdTpl;
|
||||||
|
protected final JmsActivity jmsActivity;
|
||||||
|
|
||||||
|
protected final String stmtOpType;
|
||||||
|
protected LongFunction<Boolean> asyncApiFunc;
|
||||||
|
protected LongFunction<String> jmsDestinationTypeFunc;
|
||||||
|
|
||||||
|
protected final LongFunction<JmsOp> opFunc;
|
||||||
|
|
||||||
|
public ReadyJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
|
||||||
|
this.opTpl = opTemplate;
|
||||||
|
this.cmdTpl = new CommandTemplate(opTpl);
|
||||||
|
this.jmsActivity = jmsActivity;
|
||||||
|
|
||||||
|
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
|
||||||
|
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
|
||||||
|
}
|
||||||
|
this.stmtOpType = cmdTpl.getStatic("optype");
|
||||||
|
|
||||||
|
// Global/Doc-level parameter: async_api
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.ASYNC_API_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.ASYNC_API_KEY_STR)) {
|
||||||
|
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.ASYNC_API_KEY_STR));
|
||||||
|
this.asyncApiFunc = (l) -> value;
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("\"" + JmsUtil.ASYNC_API_KEY_STR + "\" parameter cannot be dynamic!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Global/Doc-level parameter: jms_desitation_type
|
||||||
|
// - queue: point-to-point
|
||||||
|
// - topic: pub/sub
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
|
||||||
|
jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("\"" + JmsUtil.JMS_DESTINATION_TYPE_KEY_STR + "\" parameter cannot be dynamic!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.opFunc = resolveJms();
|
||||||
|
}
|
||||||
|
|
||||||
|
public JmsOp apply(long value) { return opFunc.apply(value); }
|
||||||
|
|
||||||
|
abstract LongFunction<JmsOp> resolveJms();
|
||||||
|
}
|
@ -0,0 +1,248 @@
|
|||||||
|
package io.nosqlbench.driver.jms;
|
||||||
|
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper;
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
|
||||||
|
import io.nosqlbench.driver.jms.ops.JmsOp;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsUtil;
|
||||||
|
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||||
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.JMSRuntimeException;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class ReadyPulsarJmsOp extends ReadyJmsOp {
|
||||||
|
|
||||||
|
public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
|
||||||
|
super(opTemplate, jmsActivity);
|
||||||
|
}
|
||||||
|
|
||||||
|
public LongFunction<JmsOp> resolveJms() {
|
||||||
|
// Global/Doc-level parameter: topic_uri
|
||||||
|
LongFunction<String> topicUriFunc = (l) -> null;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR)) {
|
||||||
|
topicUriFunc = (l) -> cmdTpl.getStatic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR);
|
||||||
|
} else {
|
||||||
|
topicUriFunc = (l) -> cmdTpl.getDynamic(JmsUtil.PULSAR_JMS_TOPIC_URI_KEY_STR, l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Global: JMS destination
|
||||||
|
LongFunction<Destination> jmsDestinationFunc;
|
||||||
|
try {
|
||||||
|
LongFunction<String> finalTopicUriFunc = topicUriFunc;
|
||||||
|
jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(
|
||||||
|
jmsDestinationTypeFunc.apply(l),
|
||||||
|
finalTopicUriFunc.apply(l));
|
||||||
|
}
|
||||||
|
catch (JMSRuntimeException ex) {
|
||||||
|
throw new RuntimeException("Unable to create JMS destination!");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) {
|
||||||
|
return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
|
||||||
|
} else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) {
|
||||||
|
return resolveMsgRead(asyncApiFunc, jmsDestinationFunc);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("Unsupported JMS operation type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private LongFunction<JmsOp> resolveMsgSend(
|
||||||
|
LongFunction<Boolean> async_api_func,
|
||||||
|
LongFunction<Destination> jmsDestinationFunc
|
||||||
|
) {
|
||||||
|
JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc();
|
||||||
|
|
||||||
|
// JMS header: delivery mode
|
||||||
|
LongFunction<Integer> msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
|
||||||
|
msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc);
|
||||||
|
|
||||||
|
// JMS header: message priority
|
||||||
|
LongFunction<Integer> msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
|
||||||
|
msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc);
|
||||||
|
|
||||||
|
// JMS header: message TTL
|
||||||
|
LongFunction<Long> msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
|
||||||
|
msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc);
|
||||||
|
|
||||||
|
// JMS header: message delivery delay
|
||||||
|
LongFunction<Long> msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
|
||||||
|
msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc);
|
||||||
|
|
||||||
|
// JMS header: disable message timestamp
|
||||||
|
LongFunction<Boolean> disableMsgTimestampFunc = (l) -> false;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
|
||||||
|
disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc);
|
||||||
|
|
||||||
|
// JMS header: disable message ID
|
||||||
|
LongFunction<Boolean> disableMsgIdFunc = (l) -> false;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
|
||||||
|
disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc);
|
||||||
|
|
||||||
|
// JMS message properties
|
||||||
|
String jmsMsgPropertyListStr = "";
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
|
||||||
|
jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> jmsMsgProperties = new HashMap<>();
|
||||||
|
if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) {
|
||||||
|
jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";"))
|
||||||
|
.map(s -> s.split("=", 2))
|
||||||
|
.collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : ""));
|
||||||
|
}
|
||||||
|
|
||||||
|
LongFunction<String> msgBodyFunc;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
|
||||||
|
msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR);
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
|
||||||
|
msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l);
|
||||||
|
} else {
|
||||||
|
msgBodyFunc = (l) -> null;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException("JMS message send:: \"msg_body\" field must be specified!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new JmsMsgSendMapper(
|
||||||
|
jmsActivity,
|
||||||
|
async_api_func,
|
||||||
|
jmsDestinationFunc,
|
||||||
|
jmsHeaderLongFunc,
|
||||||
|
jmsMsgProperties,
|
||||||
|
msgBodyFunc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private LongFunction<JmsOp> resolveMsgRead(
|
||||||
|
LongFunction<Boolean> async_api_func,
|
||||||
|
LongFunction<Destination> jmsDestinationFunc
|
||||||
|
) {
|
||||||
|
// For Pulsar JMS, make "durable" as the default
|
||||||
|
LongFunction<Boolean> jmsConsumerDurableFunc = (l) -> true;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
|
||||||
|
jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR));
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
|
||||||
|
jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LongFunction<Boolean> jmsConsumerSharedFunc = (l) -> true;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
|
||||||
|
jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR));
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
|
||||||
|
jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LongFunction<String> jmsMsgSubscriptionFunc = (l) -> "";
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
|
||||||
|
jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR);
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
|
||||||
|
jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LongFunction<String> jmsMsgReadSelectorFunc = (l) -> "";
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
|
||||||
|
jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR);
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
|
||||||
|
jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LongFunction<Boolean> jmsMsgNoLocalFunc = (l) -> true;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
|
||||||
|
jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR));
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
|
||||||
|
jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LongFunction<Long> jmsReadTimeoutFunc = (l) -> 0L;
|
||||||
|
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
|
||||||
|
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
|
||||||
|
jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR));
|
||||||
|
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
|
||||||
|
jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new JmsMsgReadMapper(
|
||||||
|
jmsActivity,
|
||||||
|
async_api_func,
|
||||||
|
jmsDestinationFunc,
|
||||||
|
jmsConsumerDurableFunc,
|
||||||
|
jmsConsumerSharedFunc,
|
||||||
|
jmsMsgSubscriptionFunc,
|
||||||
|
jmsMsgReadSelectorFunc,
|
||||||
|
jmsMsgNoLocalFunc,
|
||||||
|
jmsReadTimeoutFunc);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,21 @@
|
|||||||
|
package io.nosqlbench.driver.jms.conn;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class JmsConnInfo {
|
||||||
|
|
||||||
|
protected final String jmsProviderType;
|
||||||
|
protected final Map<String, Object> jmsConnConfig;
|
||||||
|
|
||||||
|
protected JmsConnInfo(String jmsProviderType) {
|
||||||
|
this.jmsProviderType = jmsProviderType;
|
||||||
|
this.jmsConnConfig = new HashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getJmsConnConfig() { return this.jmsConnConfig; }
|
||||||
|
public void resetJmsConnConfig() { this.jmsConnConfig.clear(); }
|
||||||
|
public void addJmsConnConfigItems(Map<String, Object> cfgItems) { this.jmsConnConfig.putAll(cfgItems); }
|
||||||
|
public void addJmsConnConfigItem(String key, Object value) { this.jmsConnConfig.put(key, value); }
|
||||||
|
public void removeJmsConnConfigItem(String key) { this.jmsConnConfig.remove(key); }
|
||||||
|
}
|
@ -0,0 +1,42 @@
|
|||||||
|
package io.nosqlbench.driver.jms.conn;
|
||||||
|
|
||||||
|
import io.nosqlbench.driver.jms.util.PulsarConfig;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class JmsPulsarConnInfo extends JmsConnInfo {
|
||||||
|
|
||||||
|
private final String webSvcUrl;
|
||||||
|
private final String pulsarSvcUrl;
|
||||||
|
private final PulsarConfig extraPulsarConfig;
|
||||||
|
|
||||||
|
public JmsPulsarConnInfo(String jmsProviderType, String webSvcUrl, String pulsarSvcUrl, PulsarConfig pulsarConfig) {
|
||||||
|
super(jmsProviderType);
|
||||||
|
|
||||||
|
this.webSvcUrl = webSvcUrl;
|
||||||
|
this.pulsarSvcUrl = pulsarSvcUrl;
|
||||||
|
this.extraPulsarConfig = pulsarConfig;
|
||||||
|
|
||||||
|
this.addJmsConnConfigItem("webServiceUrl", this.webSvcUrl);
|
||||||
|
this.addJmsConnConfigItem("brokerServiceUrl", this.pulsarSvcUrl);
|
||||||
|
|
||||||
|
Map<String, Object> clientCfgMap = this.extraPulsarConfig.getClientConfMap();
|
||||||
|
if (!clientCfgMap.isEmpty()) {
|
||||||
|
this.addJmsConnConfigItems(clientCfgMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> producerCfgMap = this.extraPulsarConfig.getProducerConfMap();
|
||||||
|
if (!producerCfgMap.isEmpty()) {
|
||||||
|
this.addJmsConnConfigItem("producerConfig", producerCfgMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> consumerCfgMap = this.extraPulsarConfig.getConsumerConfMap();
|
||||||
|
if (!consumerCfgMap.isEmpty()) {
|
||||||
|
this.addJmsConnConfigItem("consumerConfig", consumerCfgMap);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getWebSvcUrl() { return this.webSvcUrl; }
|
||||||
|
public String getPulsarSvcUrl() { return this.pulsarSvcUrl; }
|
||||||
|
public PulsarConfig getExtraPulsarConfig() { return this.extraPulsarConfig; }
|
||||||
|
}
|
@ -0,0 +1,72 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
import io.nosqlbench.driver.jms.JmsActivity;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
|
||||||
|
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
|
||||||
|
* retried if needed.
|
||||||
|
*
|
||||||
|
* This function doesn't act *as* the operation. It merely maps the construction logic into
|
||||||
|
* a simple functional type, given the component functions.
|
||||||
|
*
|
||||||
|
* For additional parameterization, the command template is also provided.
|
||||||
|
*/
|
||||||
|
public class JmsMsgReadMapper extends JmsOpMapper {
|
||||||
|
|
||||||
|
private final LongFunction<Boolean> jmsConsumerDurableFunc;
|
||||||
|
private final LongFunction<Boolean> jmsConsumerSharedFunc;
|
||||||
|
private final LongFunction<String> jmsMsgSubscriptionFunc;
|
||||||
|
private final LongFunction<String> jmsMsgReadSelectorFunc;
|
||||||
|
private final LongFunction<Boolean> jmsMsgNoLocalFunc;
|
||||||
|
private final LongFunction<Long> jmsReadTimeoutFunc;
|
||||||
|
|
||||||
|
public JmsMsgReadMapper(JmsActivity jmsActivity,
|
||||||
|
LongFunction<Boolean> asyncApiFunc,
|
||||||
|
LongFunction<Destination> jmsDestinationFunc,
|
||||||
|
LongFunction<Boolean> jmsConsumerDurableFunc,
|
||||||
|
LongFunction<Boolean> jmsConsumerSharedFunc,
|
||||||
|
LongFunction<String> jmsMsgSubscriptionFunc,
|
||||||
|
LongFunction<String> jmsMsgReadSelectorFunc,
|
||||||
|
LongFunction<Boolean> jmsMsgNoLocalFunc,
|
||||||
|
LongFunction<Long> jmsReadTimeoutFunc) {
|
||||||
|
super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
|
||||||
|
|
||||||
|
this.jmsConsumerDurableFunc = jmsConsumerDurableFunc;
|
||||||
|
this.jmsConsumerSharedFunc = jmsConsumerSharedFunc;
|
||||||
|
this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc;
|
||||||
|
this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc;
|
||||||
|
this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc;
|
||||||
|
this.jmsReadTimeoutFunc = jmsReadTimeoutFunc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JmsOp apply(long value) {
|
||||||
|
boolean asyncApi = asyncApiFunc.apply(value);
|
||||||
|
Destination jmsDestination = jmsDestinationFunc.apply(value);
|
||||||
|
boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value);
|
||||||
|
boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value);
|
||||||
|
String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value);
|
||||||
|
String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value);
|
||||||
|
boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value);
|
||||||
|
long jmsReadTimeout = jmsReadTimeoutFunc.apply(value);
|
||||||
|
|
||||||
|
// Default to NO read timeout
|
||||||
|
if (jmsReadTimeout < 0) jmsReadTimeout = 0;
|
||||||
|
|
||||||
|
return new JmsMsgReadOp(
|
||||||
|
jmsActivity,
|
||||||
|
asyncApi,
|
||||||
|
jmsDestination,
|
||||||
|
jmsConsumerDurable,
|
||||||
|
jmsConsumerShared,
|
||||||
|
jmsMsgSubscription,
|
||||||
|
jmsMsgReadSelector,
|
||||||
|
jmsMsgNoLocal,
|
||||||
|
jmsReadTimeout
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,114 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Counter;
|
||||||
|
import com.codahale.metrics.Histogram;
|
||||||
|
import io.nosqlbench.driver.jms.JmsActivity;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import javax.jms.*;
|
||||||
|
|
||||||
|
public class JmsMsgReadOp extends JmsTimeTrackOp {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class);
|
||||||
|
|
||||||
|
private final JmsActivity jmsActivity;
|
||||||
|
private final boolean asyncJmsOp;
|
||||||
|
private final Destination jmsDestination;
|
||||||
|
|
||||||
|
private final JMSContext jmsContext;
|
||||||
|
private final JMSConsumer jmsConsumer;
|
||||||
|
private final boolean jmsConsumerDurable;
|
||||||
|
private final boolean jmsConsumerShared;
|
||||||
|
private final String jmsMsgSubscrption;
|
||||||
|
private final String jmsMsgReadSelector;
|
||||||
|
private final boolean jmsMsgNoLocal;
|
||||||
|
private final long jmsReadTimeout;
|
||||||
|
|
||||||
|
private final Counter bytesCounter;
|
||||||
|
private final Histogram messagesizeHistogram;
|
||||||
|
|
||||||
|
public JmsMsgReadOp(JmsActivity jmsActivity,
|
||||||
|
boolean asyncJmsOp,
|
||||||
|
Destination jmsDestination,
|
||||||
|
boolean jmsConsumerDurable,
|
||||||
|
boolean jmsConsumerShared,
|
||||||
|
String jmsMsgSubscrption,
|
||||||
|
String jmsMsgReadSelector,
|
||||||
|
boolean jmsMsgNoLocal,
|
||||||
|
long jmsReadTimeout) {
|
||||||
|
this.jmsActivity = jmsActivity;
|
||||||
|
this.asyncJmsOp = asyncJmsOp;
|
||||||
|
this.jmsDestination = jmsDestination;
|
||||||
|
this.jmsConsumerDurable = jmsConsumerDurable;
|
||||||
|
this.jmsConsumerShared = jmsConsumerShared;
|
||||||
|
this.jmsMsgReadSelector = jmsMsgReadSelector;
|
||||||
|
this.jmsMsgSubscrption = jmsMsgSubscrption;
|
||||||
|
this.jmsMsgNoLocal = jmsMsgNoLocal;
|
||||||
|
this.jmsReadTimeout = jmsReadTimeout;
|
||||||
|
|
||||||
|
this.jmsContext = jmsActivity.getJmsContext();
|
||||||
|
this.jmsConsumer = createJmsConsumer();
|
||||||
|
|
||||||
|
this.bytesCounter = jmsActivity.getBytesCounter();
|
||||||
|
this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
|
||||||
|
}
|
||||||
|
|
||||||
|
private JMSConsumer createJmsConsumer() {
|
||||||
|
JMSConsumer jmsConsumer;
|
||||||
|
|
||||||
|
try {
|
||||||
|
if (jmsConsumerDurable) {
|
||||||
|
if (jmsConsumerShared)
|
||||||
|
jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
|
||||||
|
else
|
||||||
|
jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal);
|
||||||
|
} else {
|
||||||
|
if (jmsConsumerShared)
|
||||||
|
jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
|
||||||
|
else
|
||||||
|
jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) {
|
||||||
|
throw new RuntimeException("Failed to create JMS consumer: invalid destination!");
|
||||||
|
}
|
||||||
|
catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) {
|
||||||
|
throw new RuntimeException("Failed to create JMS consumer: invalid message selector!");
|
||||||
|
}
|
||||||
|
catch (JMSRuntimeException jmsRuntimeException) {
|
||||||
|
jmsRuntimeException.printStackTrace();
|
||||||
|
throw new RuntimeException("Failed to create JMS consumer: runtime internal error!");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: async consumer
|
||||||
|
// if (this.asyncJmsOp) {
|
||||||
|
// jmsConsumer.setMessageListener();
|
||||||
|
// }
|
||||||
|
|
||||||
|
return jmsConsumer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
// FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley
|
||||||
|
Message receivedMsg = jmsConsumer.receive(jmsReadTimeout);
|
||||||
|
try {
|
||||||
|
if (receivedMsg != null) {
|
||||||
|
receivedMsg.acknowledge();
|
||||||
|
byte[] receivedMsgBody = receivedMsg.getBody(byte[].class);
|
||||||
|
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("received msg-payload={}", new String(receivedMsgBody));
|
||||||
|
}
|
||||||
|
|
||||||
|
int messagesize = receivedMsgBody.length;
|
||||||
|
bytesCounter.inc(messagesize);
|
||||||
|
messagesizeHistogram.update(messagesize);
|
||||||
|
}
|
||||||
|
} catch (JMSException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
throw new RuntimeException("Failed to acknowledge the received JMS message.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,55 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
import io.nosqlbench.driver.jms.JmsActivity;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeader;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
|
||||||
|
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
|
||||||
|
* retried if needed.
|
||||||
|
*
|
||||||
|
* This function doesn't act *as* the operation. It merely maps the construction logic into
|
||||||
|
* a simple functional type, given the component functions.
|
||||||
|
*
|
||||||
|
* For additional parameterization, the command template is also provided.
|
||||||
|
*/
|
||||||
|
public class JmsMsgSendMapper extends JmsOpMapper {
|
||||||
|
private final JmsHeaderLongFunc jmsHeaderLongFunc;
|
||||||
|
private final Map<String, Object> jmsMsgProperties;
|
||||||
|
private final LongFunction<String> msgBodyFunc;
|
||||||
|
|
||||||
|
public JmsMsgSendMapper(JmsActivity jmsActivity,
|
||||||
|
LongFunction<Boolean> asyncApiFunc,
|
||||||
|
LongFunction<Destination> jmsDestinationFunc,
|
||||||
|
JmsHeaderLongFunc jmsHeaderLongFunc,
|
||||||
|
Map<String, Object> jmsMsgProperties,
|
||||||
|
LongFunction<String> msgBodyFunc) {
|
||||||
|
super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
|
||||||
|
|
||||||
|
this.jmsHeaderLongFunc = jmsHeaderLongFunc;
|
||||||
|
this.jmsMsgProperties = jmsMsgProperties;
|
||||||
|
this.msgBodyFunc = msgBodyFunc;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public JmsOp apply(long value) {
|
||||||
|
boolean asyncApi = asyncApiFunc.apply(value);
|
||||||
|
Destination jmsDestination = jmsDestinationFunc.apply(value);
|
||||||
|
JmsHeader jmsHeader = (JmsHeader)jmsHeaderLongFunc.apply(value);
|
||||||
|
String msgBody = msgBodyFunc.apply(value);
|
||||||
|
|
||||||
|
return new JmsMsgSendOp(
|
||||||
|
jmsActivity,
|
||||||
|
asyncApi,
|
||||||
|
jmsDestination,
|
||||||
|
jmsHeader,
|
||||||
|
jmsMsgProperties,
|
||||||
|
msgBody
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,124 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Counter;
|
||||||
|
import com.codahale.metrics.Histogram;
|
||||||
|
import io.nosqlbench.driver.jms.JmsActivity;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeader;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import javax.jms.*;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class JmsMsgSendOp extends JmsTimeTrackOp {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(JmsMsgSendOp.class);
|
||||||
|
|
||||||
|
private final JmsActivity jmsActivity;
|
||||||
|
private final boolean asyncJmsOp;
|
||||||
|
private final Destination jmsDestination;
|
||||||
|
private final JmsHeader jmsHeader;
|
||||||
|
private final Map<String, Object> jmsMsgProperties;
|
||||||
|
|
||||||
|
private final JMSContext jmsContext;
|
||||||
|
private final JMSProducer jmsProducer;
|
||||||
|
private final String msgBody;
|
||||||
|
|
||||||
|
private final Counter bytesCounter;
|
||||||
|
private final Histogram messagesizeHistogram;
|
||||||
|
|
||||||
|
public JmsMsgSendOp(JmsActivity jmsActivity,
|
||||||
|
boolean asyncJmsOp,
|
||||||
|
Destination jmsDestination,
|
||||||
|
JmsHeader jmsHeader,
|
||||||
|
Map<String, Object> jmsMsgProperties,
|
||||||
|
String msgBody) {
|
||||||
|
this.jmsActivity = jmsActivity;
|
||||||
|
this.asyncJmsOp = asyncJmsOp;
|
||||||
|
this.jmsDestination = jmsDestination;
|
||||||
|
|
||||||
|
this.jmsHeader = jmsHeader;
|
||||||
|
this.jmsMsgProperties = jmsMsgProperties;
|
||||||
|
this.msgBody = msgBody;
|
||||||
|
|
||||||
|
if (!jmsHeader.isValidHeader()) {
|
||||||
|
throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText());
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((msgBody == null) || msgBody.isEmpty()) {
|
||||||
|
throw new RuntimeException("JMS message body can't be empty!");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.jmsContext = jmsActivity.getJmsContext();
|
||||||
|
this.jmsProducer = createJmsProducer();
|
||||||
|
|
||||||
|
this.bytesCounter = jmsActivity.getBytesCounter();
|
||||||
|
this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
|
||||||
|
}
|
||||||
|
|
||||||
|
private JMSProducer createJmsProducer() {
|
||||||
|
JMSProducer jmsProducer = this.jmsContext.createProducer();
|
||||||
|
|
||||||
|
jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode());
|
||||||
|
jmsProducer.setPriority(this.jmsHeader.getMsgPriority());
|
||||||
|
jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay());
|
||||||
|
jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp());
|
||||||
|
jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId());
|
||||||
|
|
||||||
|
if (this.asyncJmsOp) {
|
||||||
|
jmsProducer.setAsync(new CompletionListener() {
|
||||||
|
@Override
|
||||||
|
public void onCompletion(Message msg) {
|
||||||
|
try {
|
||||||
|
byte[] msgBody = msg.getBody(byte[].class);
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Async message send success - message body: " + new String(msgBody));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (JMSException jmsException) {
|
||||||
|
jmsException.printStackTrace();
|
||||||
|
logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Message msg, Exception e) {
|
||||||
|
try {
|
||||||
|
byte[] msgBody = msg.getBody(byte[].class);
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Async message send failure - message body: " + new String(msgBody));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (JMSException jmsException) {
|
||||||
|
jmsException.printStackTrace();
|
||||||
|
logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<String, Object> entry : jmsMsgProperties.entrySet()) {
|
||||||
|
jmsProducer.setProperty(entry.getKey(), entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
return jmsProducer;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
int messageSize;
|
||||||
|
try {
|
||||||
|
byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8);
|
||||||
|
messageSize = msgBytes.length;
|
||||||
|
jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8));
|
||||||
|
|
||||||
|
messagesizeHistogram.update(messageSize);
|
||||||
|
bytesCounter.inc(messageSize);
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
logger.error("Failed to send JMS message - " + msgBody);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,13 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base type of all Pulsar Operations including Producers and Consumers.
|
||||||
|
*/
|
||||||
|
public interface JmsOp {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the operation, invoke the timeTracker when the operation ended.
|
||||||
|
* The timeTracker can be invoked in a separate thread, it is only used for metrics.
|
||||||
|
*/
|
||||||
|
void run(Runnable timeTracker);
|
||||||
|
}
|
@ -0,0 +1,23 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
import io.nosqlbench.driver.jms.JmsActivity;
|
||||||
|
import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
|
||||||
|
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
public abstract class JmsOpMapper implements LongFunction<JmsOp> {
|
||||||
|
protected final JmsActivity jmsActivity;
|
||||||
|
protected final LongFunction<Boolean> asyncApiFunc;
|
||||||
|
protected final LongFunction<Destination> jmsDestinationFunc;
|
||||||
|
|
||||||
|
public JmsOpMapper(JmsActivity jmsActivity,
|
||||||
|
LongFunction<Boolean> asyncApiFunc,
|
||||||
|
LongFunction<Destination> jmsDestinationFunc)
|
||||||
|
{
|
||||||
|
this.jmsActivity = jmsActivity;
|
||||||
|
this.asyncApiFunc = asyncApiFunc;
|
||||||
|
this.jmsDestinationFunc = jmsDestinationFunc;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,17 @@
|
|||||||
|
package io.nosqlbench.driver.jms.ops;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base type of all Sync Pulsar Operations including Producers and Consumers.
|
||||||
|
*/
|
||||||
|
public abstract class JmsTimeTrackOp implements JmsOp {
|
||||||
|
|
||||||
|
public void run(Runnable timeTracker) {
|
||||||
|
try {
|
||||||
|
this.run();
|
||||||
|
} finally {
|
||||||
|
timeTracker.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract void run();
|
||||||
|
}
|
@ -0,0 +1,66 @@
|
|||||||
|
package io.nosqlbench.driver.jms.util;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import lombok.ToString;
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
|
||||||
|
@Setter
|
||||||
|
@Getter
|
||||||
|
@AllArgsConstructor
|
||||||
|
@ToString
|
||||||
|
public class JmsHeader {
|
||||||
|
private int deliveryMode;
|
||||||
|
private int msgPriority;
|
||||||
|
private long msgTtl;
|
||||||
|
private long msgDeliveryDelay;
|
||||||
|
private boolean disableMsgTimestamp;
|
||||||
|
private boolean disableMsgId;
|
||||||
|
|
||||||
|
public boolean isValidDeliveryMode() {
|
||||||
|
return (deliveryMode == DeliveryMode.NON_PERSISTENT) || (deliveryMode == DeliveryMode.PERSISTENT);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isValidPriority() {
|
||||||
|
return (msgPriority >= 0) && (msgPriority <= 9);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isValidTtl() {
|
||||||
|
return msgTtl >= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isValidDeliveryDelay() {
|
||||||
|
return msgTtl >= 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isValidHeader() {
|
||||||
|
return isValidDeliveryMode()
|
||||||
|
&& isValidPriority()
|
||||||
|
&& isValidTtl()
|
||||||
|
&& isValidDeliveryDelay();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getInvalidJmsHeaderMsgText() {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
|
||||||
|
if (!isValidDeliveryMode())
|
||||||
|
sb.append("delivery mode - " + deliveryMode + "; ");
|
||||||
|
if (!isValidPriority())
|
||||||
|
sb.append("message priority - " + msgPriority + "; ");
|
||||||
|
if (!isValidTtl())
|
||||||
|
sb.append("message TTL - " + msgTtl + "; ");
|
||||||
|
if (!isValidDeliveryDelay())
|
||||||
|
sb.append("message delivery delay - " + msgDeliveryDelay + "; ");
|
||||||
|
|
||||||
|
String invalidMsgText = sb.toString();
|
||||||
|
if (StringUtils.length(invalidMsgText) > 0)
|
||||||
|
invalidMsgText = StringUtils.substringBeforeLast(invalidMsgText, ";");
|
||||||
|
else
|
||||||
|
invalidMsgText = "none";
|
||||||
|
|
||||||
|
return "Invalid JMS header values: " + invalidMsgText;
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,31 @@
|
|||||||
|
package io.nosqlbench.driver.jms.util;
|
||||||
|
|
||||||
|
import lombok.*;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
@Setter
|
||||||
|
@Getter
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class JmsHeaderLongFunc implements LongFunction {
|
||||||
|
private LongFunction<Integer> deliveryModeFunc;
|
||||||
|
private LongFunction<Integer> msgPriorityFunc;
|
||||||
|
private LongFunction<Long> msgTtlFunc;
|
||||||
|
private LongFunction<Long> msgDeliveryDelayFunc;
|
||||||
|
private LongFunction<Boolean> disableMsgTimestampFunc;
|
||||||
|
private LongFunction<Boolean> disableMsgIdFunc;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object apply(long value) {
|
||||||
|
return new JmsHeader(
|
||||||
|
(deliveryModeFunc != null) ? deliveryModeFunc.apply(value) : DeliveryMode.PERSISTENT,
|
||||||
|
(msgPriorityFunc != null) ? msgPriorityFunc.apply(value) : Message.DEFAULT_PRIORITY,
|
||||||
|
(msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_TIME_TO_LIVE,
|
||||||
|
(msgTtlFunc != null) ? msgTtlFunc.apply(value) : Message.DEFAULT_DELIVERY_DELAY,
|
||||||
|
(disableMsgTimestampFunc != null) ? disableMsgTimestampFunc.apply(value) : false,
|
||||||
|
(disableMsgIdFunc != null) ? disableMsgIdFunc.apply(value) : false
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,119 @@
|
|||||||
|
package io.nosqlbench.driver.jms.util;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Base64;
|
||||||
|
|
||||||
|
public class JmsUtil {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(JmsUtil.class);
|
||||||
|
|
||||||
|
// Supported JMS provider type
|
||||||
|
public enum JMS_PROVIDER_TYPES {
|
||||||
|
PULSAR("pulsar");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
JMS_PROVIDER_TYPES(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static boolean isValidJmsProviderType(String type) {
|
||||||
|
return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||||
|
}
|
||||||
|
|
||||||
|
/////
|
||||||
|
// NB command line parameters
|
||||||
|
// - JMS provider type
|
||||||
|
public final static String JMS_PROVIDER_TYPE_KEY_STR = "provider_type";
|
||||||
|
|
||||||
|
/// Only applicable when the provider is "Pulsar"
|
||||||
|
// - Pulsar configuration properties file
|
||||||
|
public final static String JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR = "pulsar_cfg_file";
|
||||||
|
public final static String JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME = "pulsar_config.properties";
|
||||||
|
// - Pulsar web url
|
||||||
|
public final static String JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR = "web_url";
|
||||||
|
// - Pulsar service url
|
||||||
|
public final static String JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR = "service_url";
|
||||||
|
|
||||||
|
|
||||||
|
public final static String ASYNC_API_KEY_STR = "async_api";
|
||||||
|
public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type";
|
||||||
|
|
||||||
|
///// JMS Producer
|
||||||
|
// Supported JMS provider type
|
||||||
|
public enum JMS_MSG_HEADER_KEYS {
|
||||||
|
DELIVERY_MODE("jms_producer_header_msg_delivery_mode"),
|
||||||
|
PRIORITY("jms_producer_header_msg_priority"),
|
||||||
|
TTL("jms_producer_header_msg_ttl"),
|
||||||
|
DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"),
|
||||||
|
DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"),
|
||||||
|
DISABLE_ID("jms_producer_header_disable_msg_id");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
JMS_MSG_HEADER_KEYS(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static boolean isValidJmsHeaderKey(String type) {
|
||||||
|
return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type));
|
||||||
|
}
|
||||||
|
public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties";
|
||||||
|
public final static String JMS_PRODUCER_MSG_BODY_KEY_STR = "msg_body";
|
||||||
|
|
||||||
|
///// JMS Consumer
|
||||||
|
public final static String JMS_CONSUMER_DURABLE_KEY_STR = "jms_consumer_msg_durable";
|
||||||
|
public final static String JMS_CONSUMER_SHARED_KEY_STR = "jms_consumer_msg_shared";
|
||||||
|
public final static String JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR = "jms_consumer_subscription";
|
||||||
|
public final static String JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR = "jms_consumer_msg_read_selector";
|
||||||
|
public final static String JMS_CONSUMER_MSG_NOLOCAL_KEY_STR = "jms_consumer_msg_nolocal";
|
||||||
|
public final static String JMS_CONSUMER_READ_TIMEOUT_KEY_STR = "jms_consumer_msg_read_timeout";
|
||||||
|
|
||||||
|
|
||||||
|
// Only applicable to Pulsar JMS provider
|
||||||
|
public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri";
|
||||||
|
|
||||||
|
// Supported message operation types
|
||||||
|
public enum OP_TYPES {
|
||||||
|
MSG_SEND("msg_send"),
|
||||||
|
MSG_READ("msg_read");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
OP_TYPES(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static boolean isValidClientType(String type) {
|
||||||
|
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||||
|
}
|
||||||
|
|
||||||
|
// JMS Destination Types
|
||||||
|
public enum JMS_DESTINATION_TYPES {
|
||||||
|
QUEUE("queue"),
|
||||||
|
TOPIC("topic");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
JMS_DESTINATION_TYPES(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static boolean isValidJmsDestinationType(String type) {
|
||||||
|
return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String encode(String... strings) {
|
||||||
|
StringBuilder stringBuilder = new StringBuilder();
|
||||||
|
for (String str : strings) {
|
||||||
|
if (!StringUtils.isBlank(str))
|
||||||
|
stringBuilder.append(str).append("::");
|
||||||
|
}
|
||||||
|
|
||||||
|
String concatenatedStr =
|
||||||
|
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
|
||||||
|
|
||||||
|
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,99 @@
|
|||||||
|
package io.nosqlbench.driver.jms.util;
|
||||||
|
|
||||||
|
import org.apache.commons.configuration2.Configuration;
|
||||||
|
import org.apache.commons.configuration2.FileBasedConfiguration;
|
||||||
|
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||||
|
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||||
|
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||||
|
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class PulsarConfig {
|
||||||
|
private final static Logger logger = LogManager.getLogger(PulsarConfig.class);
|
||||||
|
|
||||||
|
public static final String SCHEMA_CONF_PREFIX = "schema";
|
||||||
|
public static final String CLIENT_CONF_PREFIX = "client";
|
||||||
|
public static final String PRODUCER_CONF_PREFIX = "producer";
|
||||||
|
public static final String CONSUMER_CONF_PREFIX = "consumer";
|
||||||
|
|
||||||
|
private final Map<String, Object> schemaConfMap = new HashMap<>();
|
||||||
|
private final Map<String, Object> clientConfMap = new HashMap<>();
|
||||||
|
private final Map<String, Object> producerConfMap = new HashMap<>();
|
||||||
|
private final Map<String, Object> consumerConfMap = new HashMap<>();
|
||||||
|
|
||||||
|
public PulsarConfig(String fileName) {
|
||||||
|
File file = new File(fileName);
|
||||||
|
|
||||||
|
try {
|
||||||
|
String canonicalFilePath = file.getCanonicalPath();
|
||||||
|
|
||||||
|
Parameters params = new Parameters();
|
||||||
|
|
||||||
|
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
|
||||||
|
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
|
||||||
|
.configure(params.properties()
|
||||||
|
.setFileName(fileName));
|
||||||
|
|
||||||
|
Configuration config = builder.getConfiguration();
|
||||||
|
|
||||||
|
// Get schema specific configuration settings
|
||||||
|
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
|
||||||
|
String confKey = it.next();
|
||||||
|
String confVal = config.getProperty(confKey).toString();
|
||||||
|
if (!StringUtils.isBlank(confVal))
|
||||||
|
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get client connection specific configuration settings
|
||||||
|
for (Iterator<String> it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
|
||||||
|
String confKey = it.next();
|
||||||
|
String confVal = config.getProperty(confKey).toString();
|
||||||
|
if (!StringUtils.isBlank(confVal))
|
||||||
|
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get producer specific configuration settings
|
||||||
|
for (Iterator<String> it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
|
||||||
|
String confKey = it.next();
|
||||||
|
String confVal = config.getProperty(confKey).toString();
|
||||||
|
if (!StringUtils.isBlank(confVal))
|
||||||
|
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get consumer specific configuration settings
|
||||||
|
for (Iterator<String> it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
|
||||||
|
String confKey = it.next();
|
||||||
|
String confVal = config.getProperty(confKey).toString();
|
||||||
|
if (!StringUtils.isBlank(confVal))
|
||||||
|
consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
logger.error("Can't read the specified config properties file: " + fileName);
|
||||||
|
ioe.printStackTrace();
|
||||||
|
} catch (ConfigurationException cex) {
|
||||||
|
logger.error("Error loading configuration items from the specified config properties file: " + fileName);
|
||||||
|
cex.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getSchemaConfMap() {
|
||||||
|
return this.schemaConfMap;
|
||||||
|
}
|
||||||
|
public Map<String, Object> getClientConfMap() {
|
||||||
|
return this.clientConfMap;
|
||||||
|
}
|
||||||
|
public Map<String, Object> getProducerConfMap() {
|
||||||
|
return this.producerConfMap;
|
||||||
|
}
|
||||||
|
public Map<String, Object> getConsumerConfMap() {
|
||||||
|
return this.consumerConfMap;
|
||||||
|
}
|
||||||
|
}
|
1
driver-jms/src/main/resources/jms.md
Normal file
1
driver-jms/src/main/resources/jms.md
Normal file
@ -0,0 +1 @@
|
|||||||
|
# Overview
|
33
driver-jms/src/main/resources/pulsar_config.properties
Normal file
33
driver-jms/src/main/resources/pulsar_config.properties
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
### Schema related configurations - schema.xxx
|
||||||
|
# valid types:
|
||||||
|
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
|
||||||
|
# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
|
||||||
|
# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
|
||||||
|
# avro, json, protobuf
|
||||||
|
#
|
||||||
|
# NOTE: for JMS client, Pulsar "schema" is NOT supported yet
|
||||||
|
schema.type=
|
||||||
|
schema.definition=
|
||||||
|
|
||||||
|
|
||||||
|
### Pulsar client related configurations - client.xxx
|
||||||
|
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
|
||||||
|
client.connectionTimeoutMs=5000
|
||||||
|
#client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
|
||||||
|
#client.authParams=
|
||||||
|
#client.tlsAllowInsecureConnection=true
|
||||||
|
client.numIoThreads=10
|
||||||
|
client.numListenerThreads=10
|
||||||
|
|
||||||
|
|
||||||
|
### Producer related configurations (global) - producer.xxx
|
||||||
|
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
|
||||||
|
producer.sendTimeoutMs=
|
||||||
|
producer.blockIfQueueFull=true
|
||||||
|
producer.maxPendingMessages=10000
|
||||||
|
producer.batchingMaxMessages=10000
|
||||||
|
|
||||||
|
|
||||||
|
### Consumer related configurations (global) - consumer.xxx
|
||||||
|
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
|
||||||
|
consumer.receiverQueueSize=2000
|
89
driver-jms/src/main/resources/pulsar_jms.yaml
Normal file
89
driver-jms/src/main/resources/pulsar_jms.yaml
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
bindings:
|
||||||
|
payload: NumberNameToString() #AlphaNumericString(20)
|
||||||
|
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
|
||||||
|
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
|
||||||
|
core_topic_name: Mod(5); ToString(); Prefix("t")
|
||||||
|
|
||||||
|
# document level parameters that apply to all Pulsar client types:
|
||||||
|
params:
|
||||||
|
### static only
|
||||||
|
async_api: "true"
|
||||||
|
|
||||||
|
### Static only
|
||||||
|
# Valid values: queue (point-to-point) or topic (pub-sub)
|
||||||
|
jms_desitation_type: "topic"
|
||||||
|
|
||||||
|
### Static Only
|
||||||
|
# NOTE: ONLY relevant when the JMS provider is Pulsar
|
||||||
|
#pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
|
||||||
|
#pulsar_topic_uri: "persistent://public/default/pt100"
|
||||||
|
#pulsar_topic_uri: "persistent://public/default/t0"
|
||||||
|
pulsar_topic_uri: "persistent://public/default/pt100_10"
|
||||||
|
#pulsar_topic_uri: "persistent://public/default/pt200_10"
|
||||||
|
#pulsar_topic_uri: "persistent://public/default/pt300_10"
|
||||||
|
#pulsar_topic_uri: "persistent://public/default/pt400_10"
|
||||||
|
|
||||||
|
blocks:
|
||||||
|
- name: "producer-block"
|
||||||
|
tags:
|
||||||
|
phase: "jms_producer"
|
||||||
|
statements:
|
||||||
|
- name: "s1"
|
||||||
|
optype: "msg_send"
|
||||||
|
|
||||||
|
### JMS PRODUCER message header
|
||||||
|
### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT
|
||||||
|
# - static or dynamic
|
||||||
|
# - Producer only
|
||||||
|
# Valid values: non-persistent(1), or persistent(2) - default
|
||||||
|
jms_producer_header_msg_delivery_mode: "2"
|
||||||
|
# Valid values: 0~9 (4 as default)
|
||||||
|
jms_producer_header_msg_priority: "4"
|
||||||
|
# Valid values: non-negative long; default 0 (never expires)
|
||||||
|
jms_producer_header_msg_ttl: "0"
|
||||||
|
# Valid values: non-negative long; default 0 (no delay)
|
||||||
|
jms_producer_header_msg_delivery_delay: "0"
|
||||||
|
# Valid values: true/false; default false (message timestamp is enabled)
|
||||||
|
jms_producer_header_disable_msg_timestamp: "false"
|
||||||
|
# Valid values: true/false; default false (message ID is enabled)
|
||||||
|
jms_producer_header_disable_msg_id: "false"
|
||||||
|
|
||||||
|
### JMS PRODUCER message properties
|
||||||
|
# - static only
|
||||||
|
# - Producer only
|
||||||
|
# - In format: "key1=value1;key2=value2;..."
|
||||||
|
jms_producer_msg_properties: "key1=value1;key2=value2"
|
||||||
|
|
||||||
|
### JMS PRODUCER message body
|
||||||
|
msg_body: "{payload}"
|
||||||
|
|
||||||
|
- name: "consumer-block"
|
||||||
|
tags:
|
||||||
|
phase: "jms_consumer"
|
||||||
|
statements:
|
||||||
|
- name: "s1"
|
||||||
|
optype: "msg_read"
|
||||||
|
|
||||||
|
### JMS CONSUMER durable and shared
|
||||||
|
jms_consumer_msg_durable: "true"
|
||||||
|
jms_consumer_msg_shared: "true"
|
||||||
|
|
||||||
|
### JMS CONSUMER subscription name
|
||||||
|
# - only relevant for durable consumer
|
||||||
|
jms_consumer_subscription: "mysub"
|
||||||
|
|
||||||
|
### JMS CONSUMER subscription name
|
||||||
|
# - only relevant for unshared consumer
|
||||||
|
jms_consumer_nolocal: "false"
|
||||||
|
|
||||||
|
### JMS CONSUMER message read timeout
|
||||||
|
# - unit: milliseconds
|
||||||
|
# - 0 means call blocks indefinitely
|
||||||
|
# - FIXME: 0 supposes to wait indefinitly; but
|
||||||
|
# it actually behaves like no wait at all
|
||||||
|
jms_consumer_msg_read_timeout: "10000"
|
||||||
|
|
||||||
|
### JMS CONSUMER message selector
|
||||||
|
# - empty string means no message selector
|
||||||
|
# - https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
|
||||||
|
jms_consumer_msg_read_selector: ""
|
@ -5,7 +5,6 @@ import org.apache.commons.configuration2.FileBasedConfiguration;
|
|||||||
import org.apache.commons.configuration2.PropertiesConfiguration;
|
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||||
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||||
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||||
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
|
|
||||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
@ -135,6 +135,12 @@
|
|||||||
<version>4.15.45-SNAPSHOT</version>
|
<version>4.15.45-SNAPSHOT</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.nosqlbench</groupId>
|
||||||
|
<artifactId>driver-jms</artifactId>
|
||||||
|
<version>4.15.45-SNAPSHOT</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
1
pom.xml
1
pom.xml
@ -50,6 +50,7 @@
|
|||||||
<module>driver-jdbc</module>
|
<module>driver-jdbc</module>
|
||||||
<module>driver-cockroachdb</module>
|
<module>driver-cockroachdb</module>
|
||||||
<module>driver-pulsar</module>
|
<module>driver-pulsar</module>
|
||||||
|
<module>driver-jms</module>
|
||||||
|
|
||||||
<!-- VIRTDATA MODULES -->
|
<!-- VIRTDATA MODULES -->
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user