Add support for JMS consumer

This commit is contained in:
yabinmeng-gitee 2021-05-04 15:51:20 -05:00
parent 94cbd9c22c
commit b1fe01e591
11 changed files with 476 additions and 168 deletions

View File

@ -62,7 +62,7 @@ public class JmsActivity extends SimpleActivity {
jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, activityDef);
}
PulsarConnectionFactory factory = null;
PulsarConnectionFactory factory;
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
Map<String, Object> configuration = new HashMap<>();
configuration.put("webServiceUrl", ((JmsPulsarConnInfo)jmsConnInfo).getWebSvcUrl());
@ -101,9 +101,9 @@ public class JmsActivity extends SimpleActivity {
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/
public Destination getOrCreateJmsDestination(String jmsDestinationType, JmsHeader jmsHeader, String destName) {
public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
String encodedTopicStr =
JmsUtil.encode(jmsDestinationType, ("" + jmsHeader.getDeliveryMode()), destName);
JmsUtil.encode(jmsDestinationType, destName);
Destination destination = jmsDestinations.get(encodedTopicStr);
if ( destination == null ) {

View File

@ -31,8 +31,6 @@ abstract public class ReadyJmsOp implements OpDispenser<JmsOp> {
protected final String stmtOpType;
protected LongFunction<Boolean> asyncApiFunc;
protected LongFunction<String> jmsDestinationTypeFunc;
protected JmsHeaderLongFunc jmsHeaderLongFunc;
protected Map<String, Object> jmsMsgProperties = new HashMap<>();
protected final LongFunction<JmsOp> opFunc;
@ -57,6 +55,8 @@ abstract public class ReadyJmsOp implements OpDispenser<JmsOp> {
}
// Global/Doc-level parameter: jms_desitation_type
// - queue: point-to-point
// - topic: pub/sub
if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) {
jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR);
@ -65,97 +65,6 @@ abstract public class ReadyJmsOp implements OpDispenser<JmsOp> {
}
}
jmsHeaderLongFunc = new JmsHeaderLongFunc();
// JMS header: delivery mode
LongFunction<Integer> msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label));
}
else {
msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l));
}
}
jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc);
// JMS header: message priority
LongFunction<Integer> msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label));
}
else {
msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l));
}
}
jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc);
// JMS header: message TTL
LongFunction<Long> msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label));
}
else {
msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l));
}
}
jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc);
// JMS header: message delivery delay
LongFunction<Long> msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label));
}
else {
msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l));
}
}
jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc);
// JMS header: disable message timestamp
LongFunction<Boolean> disableMsgTimestampFunc = (l) -> false;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label));
}
else {
disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l));
}
}
jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc);
// JMS header: disable message ID
LongFunction<Boolean> disableMsgIdFunc = (l) -> false;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label));
}
else {
disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l));
}
}
jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc);
// JMS message properties
String jmsMsgPropertyListStr = "";
if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR);
} else {
throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!");
}
}
if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) {
jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";"))
.map(s -> s.split("=", 2))
.collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : null));
}
this.opFunc = resolveJms();
}

View File

@ -1,17 +1,27 @@
package io.nosqlbench.driver.jms;
import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper;
import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper;
import io.nosqlbench.driver.jms.ops.JmsOp;
import io.nosqlbench.driver.jms.util.JmsHeader;
import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc;
import io.nosqlbench.driver.jms.util.JmsUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
public class ReadyPulsarJmsOp extends ReadyJmsOp {
public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) {
super(opTemplate, jmsActivity);
}
@ -27,25 +37,24 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp {
}
}
// Global: JMS destinaion
// Global: JMS destination
LongFunction<Destination> jmsDestinationFunc;
try {
LongFunction<String> finalTopicUriFunc = topicUriFunc;
jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination(
jmsDestinationTypeFunc.apply(l),
(JmsHeader) jmsHeaderLongFunc.apply(l),
finalTopicUriFunc.apply(l));
}
catch (JMSRuntimeException ex) {
throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!");
throw new RuntimeException("Unable to create JMS destination!");
}
if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
} /*else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgConsume(asyncApiFunc, jmsDestinationFunc);
} */ else {
throw new RuntimeException("Unsupported Pulsar operation type");
} else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgRead(asyncApiFunc, jmsDestinationFunc);
} else {
throw new RuntimeException("Unsupported JMS operation type");
}
}
@ -53,12 +62,103 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp {
LongFunction<Boolean> async_api_func,
LongFunction<Destination> jmsDestinationFunc
) {
JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc();
// JMS header: delivery mode
LongFunction<Integer> msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) {
msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label));
}
else {
msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l));
}
}
jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc);
// JMS header: message priority
LongFunction<Integer> msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) {
msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label));
}
else {
msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l));
}
}
jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc);
// JMS header: message TTL
LongFunction<Long> msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) {
msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label));
}
else {
msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l));
}
}
jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc);
// JMS header: message delivery delay
LongFunction<Long> msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) {
msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label));
}
else {
msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l));
}
}
jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc);
// JMS header: disable message timestamp
LongFunction<Boolean> disableMsgTimestampFunc = (l) -> false;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) {
disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label));
}
else {
disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l));
}
}
jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc);
// JMS header: disable message ID
LongFunction<Boolean> disableMsgIdFunc = (l) -> false;
if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) {
disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label));
}
else {
disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l));
}
}
jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc);
// JMS message properties
String jmsMsgPropertyListStr = "";
if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) {
jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR);
} else {
throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!");
}
}
Map<String, Object> jmsMsgProperties = new HashMap<>();
if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) {
jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";"))
.map(s -> s.split("=", 2))
.collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : ""));
}
LongFunction<String> msgBodyFunc;
if (cmdTpl.containsKey("msg_body")) {
if (cmdTpl.isStatic("msg_body")) {
msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body");
} else if (cmdTpl.isDynamic("msg_body")) {
msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l);
if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR);
} else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) {
msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l);
} else {
msgBodyFunc = (l) -> null;
}
@ -74,4 +174,75 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp {
jmsMsgProperties,
msgBodyFunc);
}
private LongFunction<JmsOp> resolveMsgRead(
LongFunction<Boolean> async_api_func,
LongFunction<Destination> jmsDestinationFunc
) {
// For Pulsar JMS, make "durable" as the default
LongFunction<Boolean> jmsConsumerDurableFunc = (l) -> true;
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR));
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) {
jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l));
}
}
LongFunction<Boolean> jmsConsumerSharedFunc = (l) -> true;
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR));
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) {
jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l));
}
}
LongFunction<String> jmsMsgSubscriptionFunc = (l) -> "";
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR);
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) {
jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l);
}
}
LongFunction<String> jmsMsgReadSelectorFunc = (l) -> "";
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR);
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) {
jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l);
}
}
LongFunction<Boolean> jmsMsgNoLocalFunc = (l) -> true;
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR));
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) {
jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l));
}
}
LongFunction<Long> jmsReadTimeoutFunc = (l) -> 0L;
if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR));
} else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) {
jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l));
}
}
return new JmsMsgReadMapper(
jmsActivity,
async_api_func,
jmsDestinationFunc,
jmsConsumerDurableFunc,
jmsConsumerSharedFunc,
jmsMsgSubscriptionFunc,
jmsMsgReadSelectorFunc,
jmsMsgNoLocalFunc,
jmsReadTimeoutFunc);
}
}

View File

@ -0,0 +1,72 @@
package io.nosqlbench.driver.jms.ops;
import io.nosqlbench.driver.jms.JmsActivity;
import javax.jms.Destination;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class JmsMsgReadMapper extends JmsOpMapper {
private final LongFunction<Boolean> jmsConsumerDurableFunc;
private final LongFunction<Boolean> jmsConsumerSharedFunc;
private final LongFunction<String> jmsMsgSubscriptionFunc;
private final LongFunction<String> jmsMsgReadSelectorFunc;
private final LongFunction<Boolean> jmsMsgNoLocalFunc;
private final LongFunction<Long> jmsReadTimeoutFunc;
public JmsMsgReadMapper(JmsActivity jmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc,
LongFunction<Boolean> jmsConsumerDurableFunc,
LongFunction<Boolean> jmsConsumerSharedFunc,
LongFunction<String> jmsMsgSubscriptionFunc,
LongFunction<String> jmsMsgReadSelectorFunc,
LongFunction<Boolean> jmsMsgNoLocalFunc,
LongFunction<Long> jmsReadTimeoutFunc) {
super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
this.jmsConsumerDurableFunc = jmsConsumerDurableFunc;
this.jmsConsumerSharedFunc = jmsConsumerSharedFunc;
this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc;
this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc;
this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc;
this.jmsReadTimeoutFunc = jmsReadTimeoutFunc;
}
@Override
public JmsOp apply(long value) {
boolean asyncApi = asyncApiFunc.apply(value);
Destination jmsDestination = jmsDestinationFunc.apply(value);
boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value);
boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value);
String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value);
String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value);
boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value);
long jmsReadTimeout = jmsReadTimeoutFunc.apply(value);
// Default to NO read timeout
if (jmsReadTimeout < 0) jmsReadTimeout = 0;
return new JmsMsgReadOp(
jmsActivity,
asyncApi,
jmsDestination,
jmsConsumerDurable,
jmsConsumerShared,
jmsMsgSubscription,
jmsMsgReadSelector,
jmsMsgNoLocal,
jmsReadTimeout
);
}
}

View File

@ -0,0 +1,114 @@
package io.nosqlbench.driver.jms.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.jms.JmsActivity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
public class JmsMsgReadOp extends JmsTimeTrackOp {
private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class);
private final JmsActivity jmsActivity;
private final boolean asyncJmsOp;
private final Destination jmsDestination;
private final JMSContext jmsContext;
private final JMSConsumer jmsConsumer;
private final boolean jmsConsumerDurable;
private final boolean jmsConsumerShared;
private final String jmsMsgSubscrption;
private final String jmsMsgReadSelector;
private final boolean jmsMsgNoLocal;
private final long jmsReadTimeout;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public JmsMsgReadOp(JmsActivity jmsActivity,
boolean asyncJmsOp,
Destination jmsDestination,
boolean jmsConsumerDurable,
boolean jmsConsumerShared,
String jmsMsgSubscrption,
String jmsMsgReadSelector,
boolean jmsMsgNoLocal,
long jmsReadTimeout) {
this.jmsActivity = jmsActivity;
this.asyncJmsOp = asyncJmsOp;
this.jmsDestination = jmsDestination;
this.jmsConsumerDurable = jmsConsumerDurable;
this.jmsConsumerShared = jmsConsumerShared;
this.jmsMsgReadSelector = jmsMsgReadSelector;
this.jmsMsgSubscrption = jmsMsgSubscrption;
this.jmsMsgNoLocal = jmsMsgNoLocal;
this.jmsReadTimeout = jmsReadTimeout;
this.jmsContext = jmsActivity.getJmsContext();
this.jmsConsumer = createJmsConsumer();
this.bytesCounter = jmsActivity.getBytesCounter();
this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram();
}
private JMSConsumer createJmsConsumer() {
JMSConsumer jmsConsumer;
try {
if (jmsConsumerDurable) {
if (jmsConsumerShared)
jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
else
jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal);
} else {
if (jmsConsumerShared)
jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector);
else
jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal);
}
}
catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) {
throw new RuntimeException("Failed to create JMS consumer: invalid destination!");
}
catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) {
throw new RuntimeException("Failed to create JMS consumer: invalid message selector!");
}
catch (JMSRuntimeException jmsRuntimeException) {
jmsRuntimeException.printStackTrace();
throw new RuntimeException("Failed to create JMS consumer: runtime internal error!");
}
// TODO: async consumer
// if (this.asyncJmsOp) {
// jmsConsumer.setMessageListener();
// }
return jmsConsumer;
}
@Override
public void run() {
// FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley
Message receivedMsg = jmsConsumer.receive(jmsReadTimeout);
try {
if (receivedMsg != null) {
receivedMsg.acknowledge();
byte[] receivedMsgBody = receivedMsg.getBody(byte[].class);
if (logger.isDebugEnabled()) {
logger.debug("received msg-payload={}", new String(receivedMsgBody));
}
int messagesize = receivedMsgBody.length;
bytesCounter.inc(messagesize);
messagesizeHistogram.update(messagesize);
}
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException("Failed to acknowledge the received JMS message.");
}
}
}

View File

@ -19,6 +19,8 @@ import java.util.function.LongFunction;
* For additional parameterization, the command template is also provided.
*/
public class JmsMsgSendMapper extends JmsOpMapper {
private final JmsHeaderLongFunc jmsHeaderLongFunc;
private final Map<String, Object> jmsMsgProperties;
private final LongFunction<String> msgBodyFunc;
public JmsMsgSendMapper(JmsActivity jmsActivity,
@ -27,7 +29,10 @@ public class JmsMsgSendMapper extends JmsOpMapper {
JmsHeaderLongFunc jmsHeaderLongFunc,
Map<String, Object> jmsMsgProperties,
LongFunction<String> msgBodyFunc) {
super(jmsActivity, asyncApiFunc, jmsDestinationFunc, jmsHeaderLongFunc, jmsMsgProperties);
super(jmsActivity, asyncApiFunc, jmsDestinationFunc);
this.jmsHeaderLongFunc = jmsHeaderLongFunc;
this.jmsMsgProperties = jmsMsgProperties;
this.msgBodyFunc = msgBodyFunc;
}

View File

@ -45,7 +45,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp {
this.jmsMsgProperties = jmsMsgProperties;
this.msgBody = msgBody;
if (jmsHeader.isValidHeader()) {
if (!jmsHeader.isValidHeader()) {
throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText());
}
@ -62,6 +62,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp {
private JMSProducer createJmsProducer() {
JMSProducer jmsProducer = this.jmsContext.createProducer();
jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode());
jmsProducer.setPriority(this.jmsHeader.getMsgPriority());
jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay());
@ -73,9 +74,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp {
// jmsProducer.setAsync();
// }
Iterator<Map.Entry<String, Object>> itr = jmsMsgProperties.entrySet().iterator();
while(itr.hasNext()) {
Map.Entry<String, Object> entry = itr.next();
for (Map.Entry<String, Object> entry : jmsMsgProperties.entrySet()) {
jmsProducer.setProperty(entry.getKey(), entry.getValue());
}
@ -89,6 +88,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp {
byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8);
messageSize = msgBytes.length;
jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8));
messagesizeHistogram.update(messageSize);
bytesCounter.inc(messageSize);
}

View File

@ -11,19 +11,13 @@ public abstract class JmsOpMapper implements LongFunction<JmsOp> {
protected final JmsActivity jmsActivity;
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<Destination> jmsDestinationFunc;
protected final JmsHeaderLongFunc jmsHeaderLongFunc;
protected final Map<String, Object> jmsMsgProperties;
public JmsOpMapper(JmsActivity jmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc,
JmsHeaderLongFunc jmsHeaderLongFunc,
Map<String, Object> jmsMsgProperties)
LongFunction<Destination> jmsDestinationFunc)
{
this.jmsActivity = jmsActivity;
this.asyncApiFunc = asyncApiFunc;
this.jmsDestinationFunc = jmsDestinationFunc;
this.jmsHeaderLongFunc = jmsHeaderLongFunc;
this.jmsMsgProperties = jmsMsgProperties;
}
}

View File

@ -15,7 +15,35 @@ public class JmsUtil {
public final static String JMS_PROVIDER_TYPE_KEY_STR = "jms_provider_type";
public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type";
///// JMS Producer
// Supported JMS provider type
public enum JMS_MSG_HEADER_KEYS {
DELIVERY_MODE("jms_producer_header_msg_delivery_mode"),
PRIORITY("jms_producer_header_msg_priority"),
TTL("jms_producer_header_msg_ttl"),
DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"),
DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"),
DISABLE_ID("jms_producer_header_disable_msg_id");
public final String label;
JMS_MSG_HEADER_KEYS(String label) {
this.label = label;
}
}
public static boolean isValidJmsHeaderKey(String type) {
return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type));
}
public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties";
public final static String JMS_PRODUCER_MSG_BODY_KEY_STR = "msg_body";
///// JMS Consumer
public final static String JMS_CONSUMER_DURABLE_KEY_STR = "jms_consumer_msg_durable";
public final static String JMS_CONSUMER_SHARED_KEY_STR = "jms_consumer_msg_shared";
public final static String JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR = "jms_consumer_subscription";
public final static String JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR = "jms_consumer_msg_read_selector";
public final static String JMS_CONSUMER_MSG_NOLOCAL_KEY_STR = "jms_consumer_msg_nolocal";
public final static String JMS_CONSUMER_READ_TIMEOUT_KEY_STR = "jms_consumer_msg_read_timeout";
// Only applicable to Pulsar JMS provider
public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri";
@ -61,24 +89,6 @@ public class JmsUtil {
return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
// Supported JMS provider type
public enum JMS_MSG_HEADER_KEYS {
DELIVERY_MODE("jms_producer_header_msg_delivery_mode"),
PRIORITY("jms_producer_header_msg_priority"),
TTL("jms_producer_header_msg_ttl"),
DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"),
DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"),
DISABLE_ID("jms_producer_header_disable_msg_id");
public final String label;
JMS_MSG_HEADER_KEYS(String label) {
this.label = label;
}
}
public static boolean isValidJmsHeaderKey(String type) {
return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type));
}
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {

View File

@ -0,0 +1 @@
# Overview

View File

@ -11,9 +11,22 @@ params:
### Static only
# Valid values: queue (point-to-point) or topic (pub-sub)
jms_desitation_type: "queue"
jms_desitation_type: "topic"
### JMS producer message header
### Static Only
# NOTE: ONLY relevant when the JMS provider is Pulsar
#pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
pulsar_topic_uri: "persistent://public/default/t0"
blocks:
- name: "producer-block"
tags:
phase: "jms_producer"
statements:
- name: "s1"
optype: "msg_send"
### JMS PRODUCER message header
### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT
# - static or dynamic
# - Producer only
@ -30,23 +43,42 @@ params:
# Valid values: true/false; default false (message ID is enabled)
jms_producer_header_disable_msg_id: "false"
### JMS producer message properties
### JMS PRODUCER message properties
# - static only
# - Producer only
# - In format: "key1=value1;key2=value2;..."
jms_producer_msg_properties: ""
jms_producer_msg_properties: "key1=value1;key2=value2"
### Static Only
# NOTE: ONLY relevant when the JMS provider is Pulsar
#pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
pulsar_topic_uri: "persistent://public/default/t0"
blocks:
- name: producer-block
tags:
phase: jms_producer
statements:
- name: s1
optype: msg_send
### JMS PRODUCER message body
msg_body: "{payload}"
- name: "consumer-block"
tags:
phase: "jms_consumer"
statements:
- name: "s1"
optype: "msg_read"
### JMS CONSUMER durable and shared
jms_consumer_msg_durable: "true"
jms_consumer_msg_shared: "true"
### JMS CONSUMER subscription name
# - only relevant for durable consumer
jms_consumer_subscription: "mysub"
### JMS CONSUMER subscription name
# - only relevant for unshared consumer
jms_consumer_nolocal: "false"
### JMS CONSUMER message read timeout
# - unit: milliseconds
# - 0 means call blocks indefinitely
# - FIXME: 0 supposes to wait indefinitly; but
# it actually behaves like no wait at all
jms_consumer_msg_read_timeout: "10000"
### JMS CONSUMER message selector
# - empty string means no message selector
# - https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
jms_consumer_msg_read_selector: ""