From b1fe01e59160db8d9fdf301c07df5f1b3fc45bbd Mon Sep 17 00:00:00 2001 From: yabinmeng-gitee Date: Tue, 4 May 2021 15:51:20 -0500 Subject: [PATCH] Add support for JMS consumer --- .../io/nosqlbench/driver/jms/JmsActivity.java | 6 +- .../io/nosqlbench/driver/jms/ReadyJmsOp.java | 95 +-------- .../driver/jms/ReadyPulsarJmsOp.java | 197 ++++++++++++++++-- .../driver/jms/ops/JmsMsgReadMapper.java | 72 +++++++ .../driver/jms/ops/JmsMsgReadOp.java | 114 ++++++++++ .../driver/jms/ops/JmsMsgSendMapper.java | 7 +- .../driver/jms/ops/JmsMsgSendOp.java | 8 +- .../driver/jms/ops/JmsOpMapper.java | 8 +- .../nosqlbench/driver/jms/util/JmsUtil.java | 46 ++-- driver-jms/src/main/resources/jms.md | 1 + .../src/main/resources/pulsar_jms_bytes.yaml | 90 +++++--- 11 files changed, 476 insertions(+), 168 deletions(-) create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java create mode 100644 driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java create mode 100644 driver-jms/src/main/resources/jms.md diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index 4f5d12e18..4622210b1 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -62,7 +62,7 @@ public class JmsActivity extends SimpleActivity { jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, activityDef); } - PulsarConnectionFactory factory = null; + PulsarConnectionFactory factory; if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) { Map configuration = new HashMap<>(); configuration.put("webServiceUrl", ((JmsPulsarConnInfo)jmsConnInfo).getWebSvcUrl()); @@ -101,9 +101,9 @@ public class JmsActivity extends SimpleActivity { /** * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it */ - public Destination getOrCreateJmsDestination(String jmsDestinationType, JmsHeader jmsHeader, String destName) { + public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) { String encodedTopicStr = - JmsUtil.encode(jmsDestinationType, ("" + jmsHeader.getDeliveryMode()), destName); + JmsUtil.encode(jmsDestinationType, destName); Destination destination = jmsDestinations.get(encodedTopicStr); if ( destination == null ) { diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java index 8c1357bd3..70be982c8 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyJmsOp.java @@ -31,8 +31,6 @@ abstract public class ReadyJmsOp implements OpDispenser { protected final String stmtOpType; protected LongFunction asyncApiFunc; protected LongFunction jmsDestinationTypeFunc; - protected JmsHeaderLongFunc jmsHeaderLongFunc; - protected Map jmsMsgProperties = new HashMap<>(); protected final LongFunction opFunc; @@ -57,6 +55,8 @@ abstract public class ReadyJmsOp implements OpDispenser { } // Global/Doc-level parameter: jms_desitation_type + // - queue: point-to-point + // - topic: pub/sub if (cmdTpl.containsKey(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) { if (cmdTpl.isStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR)) { jmsDestinationTypeFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_DESTINATION_TYPE_KEY_STR); @@ -65,97 +65,6 @@ abstract public class ReadyJmsOp implements OpDispenser { } } - jmsHeaderLongFunc = new JmsHeaderLongFunc(); - - // JMS header: delivery mode - LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { - msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)); - } - else { - msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l)); - } - } - jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc); - - // JMS header: message priority - LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { - msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)); - } - else { - msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l)); - } - } - jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc); - - // JMS header: message TTL - LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { - msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)); - } - else { - msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l)); - } - } - jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc); - - // JMS header: message delivery delay - LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { - msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)); - } - else { - msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l)); - } - } - jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc); - - // JMS header: disable message timestamp - LongFunction disableMsgTimestampFunc = (l) -> false; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { - disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)); - } - else { - disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l)); - } - } - jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc); - - // JMS header: disable message ID - LongFunction disableMsgIdFunc = (l) -> false; - if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { - if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { - disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)); - } - else { - disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l)); - } - } - jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc); - - - // JMS message properties - String jmsMsgPropertyListStr = ""; - if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { - if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { - jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR); - } else { - throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!"); - } - } - - if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) { - jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";")) - .map(s -> s.split("=", 2)) - .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : null)); - } - this.opFunc = resolveJms(); } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java index 1753582fd..c6e8f431d 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ReadyPulsarJmsOp.java @@ -1,17 +1,27 @@ package io.nosqlbench.driver.jms; +import io.nosqlbench.driver.jms.ops.JmsMsgReadMapper; import io.nosqlbench.driver.jms.ops.JmsMsgSendMapper; import io.nosqlbench.driver.jms.ops.JmsOp; -import io.nosqlbench.driver.jms.util.JmsHeader; +import io.nosqlbench.driver.jms.util.JmsHeaderLongFunc; import io.nosqlbench.driver.jms.util.JmsUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; +import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSRuntimeException; +import javax.jms.Message; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.function.LongFunction; +import java.util.stream.Collectors; public class ReadyPulsarJmsOp extends ReadyJmsOp { + public ReadyPulsarJmsOp(OpTemplate opTemplate, JmsActivity jmsActivity) { super(opTemplate, jmsActivity); } @@ -27,25 +37,24 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp { } } - // Global: JMS destinaion + // Global: JMS destination LongFunction jmsDestinationFunc; try { LongFunction finalTopicUriFunc = topicUriFunc; jmsDestinationFunc = (l) -> jmsActivity.getOrCreateJmsDestination( jmsDestinationTypeFunc.apply(l), - (JmsHeader) jmsHeaderLongFunc.apply(l), finalTopicUriFunc.apply(l)); } catch (JMSRuntimeException ex) { - throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); + throw new RuntimeException("Unable to create JMS destination!"); } if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_SEND.label)) { return resolveMsgSend(asyncApiFunc, jmsDestinationFunc); - } /*else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) { - return resolveMsgConsume(asyncApiFunc, jmsDestinationFunc); - } */ else { - throw new RuntimeException("Unsupported Pulsar operation type"); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, JmsUtil.OP_TYPES.MSG_READ.label)) { + return resolveMsgRead(asyncApiFunc, jmsDestinationFunc); + } else { + throw new RuntimeException("Unsupported JMS operation type"); } } @@ -53,12 +62,103 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp { LongFunction async_api_func, LongFunction jmsDestinationFunc ) { + JmsHeaderLongFunc jmsHeaderLongFunc = new JmsHeaderLongFunc(); + + // JMS header: delivery mode + LongFunction msgDeliveryModeFunc = (l) -> DeliveryMode.PERSISTENT; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)) { + msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label)); + } + else { + msgDeliveryModeFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_MODE.label, l)); + } + } + jmsHeaderLongFunc.setDeliveryModeFunc(msgDeliveryModeFunc); + + // JMS header: message priority + LongFunction msgPriorityFunc = (l) -> Message.DEFAULT_PRIORITY; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)) { + msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label)); + } + else { + msgPriorityFunc = (l) -> NumberUtils.toInt(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.PRIORITY.label, l)); + } + } + jmsHeaderLongFunc.setMsgPriorityFunc(msgPriorityFunc); + + // JMS header: message TTL + LongFunction msgTtlFunc = (l) -> Message.DEFAULT_TIME_TO_LIVE; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)) { + msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label)); + } + else { + msgTtlFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.TTL.label, l)); + } + } + jmsHeaderLongFunc.setMsgTtlFunc(msgTtlFunc); + + // JMS header: message delivery delay + LongFunction msgDeliveryDelayFunc = (l) -> Message.DEFAULT_DELIVERY_DELAY; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)) { + msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label)); + } + else { + msgDeliveryDelayFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DELIVERY_DELAY.label, l)); + } + } + jmsHeaderLongFunc.setMsgDeliveryDelayFunc(msgDeliveryDelayFunc); + + // JMS header: disable message timestamp + LongFunction disableMsgTimestampFunc = (l) -> false; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)) { + disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label)); + } + else { + disableMsgTimestampFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_TIMESTAMP.label, l)); + } + } + jmsHeaderLongFunc.setDisableMsgTimestampFunc(disableMsgTimestampFunc); + + // JMS header: disable message ID + LongFunction disableMsgIdFunc = (l) -> false; + if (cmdTpl.containsKey(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { + if (cmdTpl.isStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)) { + disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label)); + } + else { + disableMsgIdFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_MSG_HEADER_KEYS.DISABLE_ID.label, l)); + } + } + jmsHeaderLongFunc.setDisableMsgIdFunc(disableMsgIdFunc); + + // JMS message properties + String jmsMsgPropertyListStr = ""; + if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR)) { + jmsMsgPropertyListStr = cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR); + } else { + throw new RuntimeException("\"" + JmsUtil.JMS_PRODUCER_MSG_PROPERTY_KEY_STR + "\" parameter cannot be dynamic!"); + } + } + + Map jmsMsgProperties = new HashMap<>(); + if ( !StringUtils.isEmpty(jmsMsgPropertyListStr) ) { + jmsMsgProperties = Arrays.stream(jmsMsgPropertyListStr.split(";")) + .map(s -> s.split("=", 2)) + .collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1] : "")); + } + LongFunction msgBodyFunc; - if (cmdTpl.containsKey("msg_body")) { - if (cmdTpl.isStatic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body"); - } else if (cmdTpl.isDynamic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l); + if (cmdTpl.containsKey(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) { + msgBodyFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR)) { + msgBodyFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_PRODUCER_MSG_BODY_KEY_STR, l); } else { msgBodyFunc = (l) -> null; } @@ -74,4 +174,75 @@ public class ReadyPulsarJmsOp extends ReadyJmsOp { jmsMsgProperties, msgBodyFunc); } + + private LongFunction resolveMsgRead( + LongFunction async_api_func, + LongFunction jmsDestinationFunc + ) { + // For Pulsar JMS, make "durable" as the default + LongFunction jmsConsumerDurableFunc = (l) -> true; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) { + jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR)) { + jmsConsumerDurableFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_DURABLE_KEY_STR, l)); + } + } + + LongFunction jmsConsumerSharedFunc = (l) -> true; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) { + jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR)) { + jmsConsumerSharedFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_SHARED_KEY_STR, l)); + } + } + + LongFunction jmsMsgSubscriptionFunc = (l) -> ""; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) { + jmsMsgSubscriptionFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR)) { + jmsMsgSubscriptionFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR, l); + } + } + + LongFunction jmsMsgReadSelectorFunc = (l) -> ""; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) { + jmsMsgReadSelectorFunc = (l) -> cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR)) { + jmsMsgReadSelectorFunc = (l) -> cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR, l); + } + } + + LongFunction jmsMsgNoLocalFunc = (l) -> true; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) { + jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR)) { + jmsMsgNoLocalFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_MSG_NOLOCAL_KEY_STR, l)); + } + } + + LongFunction jmsReadTimeoutFunc = (l) -> 0L; + if (cmdTpl.containsKey(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) { + if (cmdTpl.isStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) { + jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getStatic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)); + } else if (cmdTpl.isDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR)) { + jmsReadTimeoutFunc = (l) -> NumberUtils.toLong(cmdTpl.getDynamic(JmsUtil.JMS_CONSUMER_READ_TIMEOUT_KEY_STR, l)); + } + } + + return new JmsMsgReadMapper( + jmsActivity, + async_api_func, + jmsDestinationFunc, + jmsConsumerDurableFunc, + jmsConsumerSharedFunc, + jmsMsgSubscriptionFunc, + jmsMsgReadSelectorFunc, + jmsMsgNoLocalFunc, + jmsReadTimeoutFunc); + } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java new file mode 100644 index 000000000..91a5ca5ba --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadMapper.java @@ -0,0 +1,72 @@ +package io.nosqlbench.driver.jms.ops; + +import io.nosqlbench.driver.jms.JmsActivity; + +import javax.jms.Destination; +import java.util.function.LongFunction; + +/** + * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains + * enough state to define a pulsar operation such that it can be executed, measured, and possibly + * retried if needed. + * + * This function doesn't act *as* the operation. It merely maps the construction logic into + * a simple functional type, given the component functions. + * + * For additional parameterization, the command template is also provided. + */ +public class JmsMsgReadMapper extends JmsOpMapper { + + private final LongFunction jmsConsumerDurableFunc; + private final LongFunction jmsConsumerSharedFunc; + private final LongFunction jmsMsgSubscriptionFunc; + private final LongFunction jmsMsgReadSelectorFunc; + private final LongFunction jmsMsgNoLocalFunc; + private final LongFunction jmsReadTimeoutFunc; + + public JmsMsgReadMapper(JmsActivity jmsActivity, + LongFunction asyncApiFunc, + LongFunction jmsDestinationFunc, + LongFunction jmsConsumerDurableFunc, + LongFunction jmsConsumerSharedFunc, + LongFunction jmsMsgSubscriptionFunc, + LongFunction jmsMsgReadSelectorFunc, + LongFunction jmsMsgNoLocalFunc, + LongFunction jmsReadTimeoutFunc) { + super(jmsActivity, asyncApiFunc, jmsDestinationFunc); + + this.jmsConsumerDurableFunc = jmsConsumerDurableFunc; + this.jmsConsumerSharedFunc = jmsConsumerSharedFunc; + this.jmsMsgSubscriptionFunc = jmsMsgSubscriptionFunc; + this.jmsMsgReadSelectorFunc = jmsMsgReadSelectorFunc; + this.jmsMsgNoLocalFunc = jmsMsgNoLocalFunc; + this.jmsReadTimeoutFunc = jmsReadTimeoutFunc; + } + + @Override + public JmsOp apply(long value) { + boolean asyncApi = asyncApiFunc.apply(value); + Destination jmsDestination = jmsDestinationFunc.apply(value); + boolean jmsConsumerDurable = jmsConsumerDurableFunc.apply(value); + boolean jmsConsumerShared = jmsConsumerSharedFunc.apply(value); + String jmsMsgSubscription = jmsMsgSubscriptionFunc.apply(value); + String jmsMsgReadSelector = jmsMsgReadSelectorFunc.apply(value); + boolean jmsMsgNoLocal = jmsMsgNoLocalFunc.apply(value); + long jmsReadTimeout = jmsReadTimeoutFunc.apply(value); + + // Default to NO read timeout + if (jmsReadTimeout < 0) jmsReadTimeout = 0; + + return new JmsMsgReadOp( + jmsActivity, + asyncApi, + jmsDestination, + jmsConsumerDurable, + jmsConsumerShared, + jmsMsgSubscription, + jmsMsgReadSelector, + jmsMsgNoLocal, + jmsReadTimeout + ); + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java new file mode 100644 index 000000000..e83ff826c --- /dev/null +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgReadOp.java @@ -0,0 +1,114 @@ +package io.nosqlbench.driver.jms.ops; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; +import io.nosqlbench.driver.jms.JmsActivity; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; + +public class JmsMsgReadOp extends JmsTimeTrackOp { + + private final static Logger logger = LogManager.getLogger(JmsMsgReadOp.class); + + private final JmsActivity jmsActivity; + private final boolean asyncJmsOp; + private final Destination jmsDestination; + + private final JMSContext jmsContext; + private final JMSConsumer jmsConsumer; + private final boolean jmsConsumerDurable; + private final boolean jmsConsumerShared; + private final String jmsMsgSubscrption; + private final String jmsMsgReadSelector; + private final boolean jmsMsgNoLocal; + private final long jmsReadTimeout; + + private final Counter bytesCounter; + private final Histogram messagesizeHistogram; + + public JmsMsgReadOp(JmsActivity jmsActivity, + boolean asyncJmsOp, + Destination jmsDestination, + boolean jmsConsumerDurable, + boolean jmsConsumerShared, + String jmsMsgSubscrption, + String jmsMsgReadSelector, + boolean jmsMsgNoLocal, + long jmsReadTimeout) { + this.jmsActivity = jmsActivity; + this.asyncJmsOp = asyncJmsOp; + this.jmsDestination = jmsDestination; + this.jmsConsumerDurable = jmsConsumerDurable; + this.jmsConsumerShared = jmsConsumerShared; + this.jmsMsgReadSelector = jmsMsgReadSelector; + this.jmsMsgSubscrption = jmsMsgSubscrption; + this.jmsMsgNoLocal = jmsMsgNoLocal; + this.jmsReadTimeout = jmsReadTimeout; + + this.jmsContext = jmsActivity.getJmsContext(); + this.jmsConsumer = createJmsConsumer(); + + this.bytesCounter = jmsActivity.getBytesCounter(); + this.messagesizeHistogram = jmsActivity.getMessagesizeHistogram(); + } + + private JMSConsumer createJmsConsumer() { + JMSConsumer jmsConsumer; + + try { + if (jmsConsumerDurable) { + if (jmsConsumerShared) + jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector); + else + jmsConsumer = jmsContext.createDurableConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector, jmsMsgNoLocal); + } else { + if (jmsConsumerShared) + jmsConsumer = jmsContext.createSharedConsumer((Topic) jmsDestination, jmsMsgSubscrption, jmsMsgReadSelector); + else + jmsConsumer = jmsContext.createConsumer(jmsDestination, jmsMsgReadSelector, jmsMsgNoLocal); + } + } + catch (InvalidDestinationRuntimeException invalidDestinationRuntimeException) { + throw new RuntimeException("Failed to create JMS consumer: invalid destination!"); + } + catch (InvalidSelectorRuntimeException invalidSelectorRuntimeException) { + throw new RuntimeException("Failed to create JMS consumer: invalid message selector!"); + } + catch (JMSRuntimeException jmsRuntimeException) { + jmsRuntimeException.printStackTrace(); + throw new RuntimeException("Failed to create JMS consumer: runtime internal error!"); + } + + // TODO: async consumer +// if (this.asyncJmsOp) { +// jmsConsumer.setMessageListener(); +// } + + return jmsConsumer; + } + + @Override + public void run() { + // FIXME: jmsReadTimeout being 0 behaves like receiveNoWait() instead of waiting indefinitley + Message receivedMsg = jmsConsumer.receive(jmsReadTimeout); + try { + if (receivedMsg != null) { + receivedMsg.acknowledge(); + byte[] receivedMsgBody = receivedMsg.getBody(byte[].class); + + if (logger.isDebugEnabled()) { + logger.debug("received msg-payload={}", new String(receivedMsgBody)); + } + + int messagesize = receivedMsgBody.length; + bytesCounter.inc(messagesize); + messagesizeHistogram.update(messagesize); + } + } catch (JMSException e) { + e.printStackTrace(); + throw new RuntimeException("Failed to acknowledge the received JMS message."); + } + } +} diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java index 206f2f184..1714c4212 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendMapper.java @@ -19,6 +19,8 @@ import java.util.function.LongFunction; * For additional parameterization, the command template is also provided. */ public class JmsMsgSendMapper extends JmsOpMapper { + private final JmsHeaderLongFunc jmsHeaderLongFunc; + private final Map jmsMsgProperties; private final LongFunction msgBodyFunc; public JmsMsgSendMapper(JmsActivity jmsActivity, @@ -27,7 +29,10 @@ public class JmsMsgSendMapper extends JmsOpMapper { JmsHeaderLongFunc jmsHeaderLongFunc, Map jmsMsgProperties, LongFunction msgBodyFunc) { - super(jmsActivity, asyncApiFunc, jmsDestinationFunc, jmsHeaderLongFunc, jmsMsgProperties); + super(jmsActivity, asyncApiFunc, jmsDestinationFunc); + + this.jmsHeaderLongFunc = jmsHeaderLongFunc; + this.jmsMsgProperties = jmsMsgProperties; this.msgBodyFunc = msgBodyFunc; } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java index f954c66dd..c0f935771 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsMsgSendOp.java @@ -45,7 +45,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { this.jmsMsgProperties = jmsMsgProperties; this.msgBody = msgBody; - if (jmsHeader.isValidHeader()) { + if (!jmsHeader.isValidHeader()) { throw new RuntimeException(jmsHeader.getInvalidJmsHeaderMsgText()); } @@ -62,6 +62,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { private JMSProducer createJmsProducer() { JMSProducer jmsProducer = this.jmsContext.createProducer(); + jmsProducer.setDeliveryMode(this.jmsHeader.getDeliveryMode()); jmsProducer.setPriority(this.jmsHeader.getMsgPriority()); jmsProducer.setDeliveryDelay(this.jmsHeader.getMsgDeliveryDelay()); @@ -73,9 +74,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { // jmsProducer.setAsync(); // } - Iterator> itr = jmsMsgProperties.entrySet().iterator(); - while(itr.hasNext()) { - Map.Entry entry = itr.next(); + for (Map.Entry entry : jmsMsgProperties.entrySet()) { jmsProducer.setProperty(entry.getKey(), entry.getValue()); } @@ -89,6 +88,7 @@ public class JmsMsgSendOp extends JmsTimeTrackOp { byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8); messageSize = msgBytes.length; jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8)); + messagesizeHistogram.update(messageSize); bytesCounter.inc(messageSize); } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java index c67cbd691..04dac1450 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/ops/JmsOpMapper.java @@ -11,19 +11,13 @@ public abstract class JmsOpMapper implements LongFunction { protected final JmsActivity jmsActivity; protected final LongFunction asyncApiFunc; protected final LongFunction jmsDestinationFunc; - protected final JmsHeaderLongFunc jmsHeaderLongFunc; - protected final Map jmsMsgProperties; public JmsOpMapper(JmsActivity jmsActivity, LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc, - JmsHeaderLongFunc jmsHeaderLongFunc, - Map jmsMsgProperties) + LongFunction jmsDestinationFunc) { this.jmsActivity = jmsActivity; this.asyncApiFunc = asyncApiFunc; this.jmsDestinationFunc = jmsDestinationFunc; - this.jmsHeaderLongFunc = jmsHeaderLongFunc; - this.jmsMsgProperties = jmsMsgProperties; } } diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java index be10f0f79..99fcd1b2e 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java @@ -15,7 +15,35 @@ public class JmsUtil { public final static String JMS_PROVIDER_TYPE_KEY_STR = "jms_provider_type"; public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type"; + ///// JMS Producer + // Supported JMS provider type + public enum JMS_MSG_HEADER_KEYS { + DELIVERY_MODE("jms_producer_header_msg_delivery_mode"), + PRIORITY("jms_producer_header_msg_priority"), + TTL("jms_producer_header_msg_ttl"), + DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"), + DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"), + DISABLE_ID("jms_producer_header_disable_msg_id"); + + public final String label; + JMS_MSG_HEADER_KEYS(String label) { + this.label = label; + } + } + public static boolean isValidJmsHeaderKey(String type) { + return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type)); + } public final static String JMS_PRODUCER_MSG_PROPERTY_KEY_STR = "jms_producer_msg_properties"; + public final static String JMS_PRODUCER_MSG_BODY_KEY_STR = "msg_body"; + + ///// JMS Consumer + public final static String JMS_CONSUMER_DURABLE_KEY_STR = "jms_consumer_msg_durable"; + public final static String JMS_CONSUMER_SHARED_KEY_STR = "jms_consumer_msg_shared"; + public final static String JMS_CONSUMER_MSG_SUBSCRIPTIOn_KEY_STR = "jms_consumer_subscription"; + public final static String JMS_CONSUMER_MSG_READ_SELECTOR_KEY_STR = "jms_consumer_msg_read_selector"; + public final static String JMS_CONSUMER_MSG_NOLOCAL_KEY_STR = "jms_consumer_msg_nolocal"; + public final static String JMS_CONSUMER_READ_TIMEOUT_KEY_STR = "jms_consumer_msg_read_timeout"; + // Only applicable to Pulsar JMS provider public final static String PULSAR_JMS_TOPIC_URI_KEY_STR = "pulsar_topic_uri"; @@ -61,24 +89,6 @@ public class JmsUtil { return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type)); } - // Supported JMS provider type - public enum JMS_MSG_HEADER_KEYS { - DELIVERY_MODE("jms_producer_header_msg_delivery_mode"), - PRIORITY("jms_producer_header_msg_priority"), - TTL("jms_producer_header_msg_ttl"), - DELIVERY_DELAY("jms_producer_header_msg_delivery_delay"), - DISABLE_TIMESTAMP("jms_producer_header_disable_msg_timestamp"), - DISABLE_ID("jms_producer_header_disable_msg_id"); - - public final String label; - JMS_MSG_HEADER_KEYS(String label) { - this.label = label; - } - } - public static boolean isValidJmsHeaderKey(String type) { - return Arrays.stream(JMS_MSG_HEADER_KEYS.values()).anyMatch(t -> t.label.equals(type)); - } - public static String encode(String... strings) { StringBuilder stringBuilder = new StringBuilder(); for (String str : strings) { diff --git a/driver-jms/src/main/resources/jms.md b/driver-jms/src/main/resources/jms.md new file mode 100644 index 000000000..07dd0c5c7 --- /dev/null +++ b/driver-jms/src/main/resources/jms.md @@ -0,0 +1 @@ +# Overview diff --git a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml index 21f3b788e..c69010d36 100644 --- a/driver-jms/src/main/resources/pulsar_jms_bytes.yaml +++ b/driver-jms/src/main/resources/pulsar_jms_bytes.yaml @@ -11,31 +11,7 @@ params: ### Static only # Valid values: queue (point-to-point) or topic (pub-sub) - jms_desitation_type: "queue" - - ### JMS producer message header - ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT - # - static or dynamic - # - Producer only - # Valid values: non-persistent(1), or persistent(2) - default - jms_producer_header_msg_delivery_mode: "2" - # Valid values: 0~9 (4 as default) - jms_producer_header_msg_priority: "4" - # Valid values: non-negative long; default 0 (never expires) - jms_producer_header_msg_ttl: "0" - # Valid values: non-negative long; default 0 (no delay) - jms_producer_header_msg_delivery_delay: "0" - # Valid values: true/false; default false (message timestamp is enabled) - jms_producer_header_disable_msg_timestamp: "false" - # Valid values: true/false; default false (message ID is enabled) - jms_producer_header_disable_msg_id: "false" - - ### JMS producer message properties - # - static only - # - Producer only - # - In format: "key1=value1;key2=value2;..." - jms_producer_msg_properties: "" - + jms_desitation_type: "topic" ### Static Only # NOTE: ONLY relevant when the JMS provider is Pulsar @@ -43,10 +19,66 @@ params: pulsar_topic_uri: "persistent://public/default/t0" blocks: - - name: producer-block + - name: "producer-block" tags: - phase: jms_producer + phase: "jms_producer" statements: - - name: s1 - optype: msg_send + - name: "s1" + optype: "msg_send" + + ### JMS PRODUCER message header + ### https://docs.oracle.com/javaee/7/api/constant-values.html#javax.jms.DeliveryMode.NON_PERSISTENT + # - static or dynamic + # - Producer only + # Valid values: non-persistent(1), or persistent(2) - default + jms_producer_header_msg_delivery_mode: "2" + # Valid values: 0~9 (4 as default) + jms_producer_header_msg_priority: "4" + # Valid values: non-negative long; default 0 (never expires) + jms_producer_header_msg_ttl: "0" + # Valid values: non-negative long; default 0 (no delay) + jms_producer_header_msg_delivery_delay: "0" + # Valid values: true/false; default false (message timestamp is enabled) + jms_producer_header_disable_msg_timestamp: "false" + # Valid values: true/false; default false (message ID is enabled) + jms_producer_header_disable_msg_id: "false" + + ### JMS PRODUCER message properties + # - static only + # - Producer only + # - In format: "key1=value1;key2=value2;..." + jms_producer_msg_properties: "key1=value1;key2=value2" + + ### JMS PRODUCER message body msg_body: "{payload}" + + - name: "consumer-block" + tags: + phase: "jms_consumer" + statements: + - name: "s1" + optype: "msg_read" + + ### JMS CONSUMER durable and shared + jms_consumer_msg_durable: "true" + jms_consumer_msg_shared: "true" + + ### JMS CONSUMER subscription name + # - only relevant for durable consumer + jms_consumer_subscription: "mysub" + + ### JMS CONSUMER subscription name + # - only relevant for unshared consumer + jms_consumer_nolocal: "false" + + ### JMS CONSUMER message read timeout + # - unit: milliseconds + # - 0 means call blocks indefinitely + # - FIXME: 0 supposes to wait indefinitly; but + # it actually behaves like no wait at all + jms_consumer_msg_read_timeout: "10000" + + ### JMS CONSUMER message selector + # - empty string means no message selector + # - https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html + jms_consumer_msg_read_selector: ""