diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index ef4a9e4fc..c923be8a3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -401,11 +401,6 @@ public class PulsarSpace { SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); String consumerName = getEffectiveConsumerName(cycleConsumerName); - if ( subscriptionType.equals(SubscriptionType.Exclusive) && (activityDef.getThreads() > 1) ) { - throw new RuntimeException("Consumer:: trying to create multiple consumers of " + - "\"Exclusive\" subscription type under the same subscription name to the same topic!"); - } - if (StringUtils.isAnyBlank(cycleTopicName, subscriptionName)) { throw new RuntimeException("Consumer:: must specify a topic name and a subscription name"); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 4177f5cd2..cae3afd42 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -31,6 +31,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { private final LongFunction> consumerFunc; private final LongFunction topicMsgDedupFunc; + private final LongFunction subscriptionTypeFunc; private final boolean e2eMsProc; public PulsarConsumerMapper(CommandTemplate cmdTpl, @@ -42,10 +43,12 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { LongFunction> transactionSupplierFunc, LongFunction topicMsgDedupFunc, LongFunction> consumerFunc, + LongFunction subscriptionTypeFunc, boolean e2eMsgProc) { super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc); this.consumerFunc = consumerFunc; this.topicMsgDedupFunc = topicMsgDedupFunc; + this.subscriptionTypeFunc = subscriptionTypeFunc; this.e2eMsProc = e2eMsgProc; } @@ -57,6 +60,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { boolean seqTracking = seqTrackingFunc.apply(value); Supplier transactionSupplier = transactionSupplierFunc.apply(value); boolean topicMsgDedup = topicMsgDedupFunc.apply(value); + String subscriptionType = subscriptionTypeFunc.apply(value); return new PulsarConsumerOp( pulsarActivity, @@ -66,6 +70,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { transactionSupplier, topicMsgDedup, consumer, + subscriptionType, clientSpace.getPulsarSchema(), clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), value, diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index cdf929b96..24f61acac 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -31,6 +31,7 @@ public class PulsarConsumerOp implements PulsarOp { private final boolean topicMsgDedup; private final Consumer consumer; + private final String subscriptionType; private final Schema pulsarSchema; private final int timeoutSeconds; private final boolean e2eMsgProc; @@ -54,6 +55,7 @@ public class PulsarConsumerOp implements PulsarOp { Supplier transactionSupplier, boolean topicMsgDedup, Consumer consumer, + String subscriptionType, Schema schema, int timeoutSeconds, long curCycleNum, @@ -68,6 +70,7 @@ public class PulsarConsumerOp implements PulsarOp { this.topicMsgDedup = topicMsgDedup; this.consumer = consumer; + this.subscriptionType = subscriptionType; this.pulsarSchema = schema; this.timeoutSeconds = timeoutSeconds; this.curCycleNum = curCycleNum; @@ -149,17 +152,28 @@ public class PulsarConsumerOp implements PulsarOp { // normal case: message sequence id is monotonically increasing by 1 if ((curMsgSeqId - prevMsgSeqId) != 1) { // abnormal case: out of ordering + // - for any subscription type, this check should always hold if (curMsgSeqId < prevMsgSeqId) { throw new PulsarMsgOutOfOrderException( false, curCycleNum, curMsgSeqId, prevMsgSeqId); } - // abnormal case: message loss - else if ((curMsgSeqId - prevMsgSeqId) > 1) { - throw new PulsarMsgLossException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); - } else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) { - throw new PulsarMsgDuplicateException( - false, curCycleNum, curMsgSeqId, prevMsgSeqId); + // - this sequence based message loss and message duplicate check can't be used for + // "Shared" subscription (ignore this check) + // - TODO: for Key_Shared subscription type, this logic needs to be improved on + // per-key basis + else { + if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType, + PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label, + PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) { + // abnormal case: message loss + if ((curMsgSeqId - prevMsgSeqId) > 1) { + throw new PulsarMsgLossException( + false, curCycleNum, curMsgSeqId, prevMsgSeqId); + } else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) { + throw new PulsarMsgDuplicateException( + false, curCycleNum, curMsgSeqId, prevMsgSeqId); + } + } } } } @@ -249,17 +263,28 @@ public class PulsarConsumerOp implements PulsarOp { // normal case: message sequence id is monotonically increasing by 1 if ((curMsgSeqId - prevMsgSeqId) != 1) { // abnormal case: out of ordering + // - for any subscription type, this check should always hold if (curMsgSeqId < prevMsgSeqId) { throw new PulsarMsgOutOfOrderException( - true, curCycleNum, curMsgSeqId, prevMsgSeqId); + false, curCycleNum, curMsgSeqId, prevMsgSeqId); } - // abnormal case: message loss - else if ((curMsgSeqId - prevMsgSeqId) > 1) { - throw new PulsarMsgLossException( - true, curCycleNum, curMsgSeqId, prevMsgSeqId); - } else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) { - throw new PulsarMsgDuplicateException( - true, curCycleNum, curMsgSeqId, prevMsgSeqId); + // - this sequence based message loss and message duplicate check can't be used for + // "Shared" subscription (ignore this check) + // - TODO: for Key_Shared subscription type, this logic needs to be improved on + // per-key basis + else { + if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType, + PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label, + PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) { + // abnormal case: message loss + if ((curMsgSeqId - prevMsgSeqId) > 1) { + throw new PulsarMsgLossException( + false, curCycleNum, curMsgSeqId, prevMsgSeqId); + } else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) { + throw new PulsarMsgDuplicateException( + false, curCycleNum, curMsgSeqId, prevMsgSeqId); + } + } } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 85b998162..9bdf63881 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -108,8 +108,8 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { // simulate message out of order else if ( simulateMsgOutofOrder ) { int rndmOffset = 2; - if (value > rndmOffset) - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-rndmOffset)); + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, + String.valueOf((value > rndmOffset) ? (value-rndmOffset) : value)); } // simulate message duplication else { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 226f44f49..de26f15cc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -497,6 +497,7 @@ public class ReadyPulsarOp implements OpDispenser { transactionSupplierFunc, topicMsgDedupFunc, consumerFunc, + subscription_type_func, e2eMsgProc); } @@ -586,6 +587,7 @@ public class ReadyPulsarOp implements OpDispenser { // e.g. pulsarAdmin.getPulsarAdmin().topics().getDeduplicationStatus(message.getTopicName()) brokerMsgDupFunc, mtConsumerFunc, + subscription_type_func, false); }