Merge pull request #362 from yabinmeng/main

Ignore message out-of-order and duplicate check for Shared and Key_Shared
This commit is contained in:
Jonathan Shook 2021-10-05 09:52:25 -05:00 committed by GitHub
commit 3b67498320
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 22 deletions

View File

@ -401,11 +401,6 @@ public class PulsarSpace {
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if ( subscriptionType.equals(SubscriptionType.Exclusive) && (activityDef.getThreads() > 1) ) {
throw new RuntimeException("Consumer:: trying to create multiple consumers of " +
"\"Exclusive\" subscription type under the same subscription name to the same topic!");
}
if (StringUtils.isAnyBlank(cycleTopicName, subscriptionName)) {
throw new RuntimeException("Consumer:: must specify a topic name and a subscription name");
}

View File

@ -31,6 +31,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> topicMsgDedupFunc;
private final LongFunction<String> subscriptionTypeFunc;
private final boolean e2eMsProc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
@ -42,10 +43,12 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Boolean> topicMsgDedupFunc,
LongFunction<Consumer<?>> consumerFunc,
LongFunction<String> subscriptionTypeFunc,
boolean e2eMsgProc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.consumerFunc = consumerFunc;
this.topicMsgDedupFunc = topicMsgDedupFunc;
this.subscriptionTypeFunc = subscriptionTypeFunc;
this.e2eMsProc = e2eMsgProc;
}
@ -57,6 +60,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
boolean seqTracking = seqTrackingFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
boolean topicMsgDedup = topicMsgDedupFunc.apply(value);
String subscriptionType = subscriptionTypeFunc.apply(value);
return new PulsarConsumerOp(
pulsarActivity,
@ -66,6 +70,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
transactionSupplier,
topicMsgDedup,
consumer,
subscriptionType,
clientSpace.getPulsarSchema(),
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
value,

View File

@ -31,6 +31,7 @@ public class PulsarConsumerOp implements PulsarOp {
private final boolean topicMsgDedup;
private final Consumer<?> consumer;
private final String subscriptionType;
private final Schema<?> pulsarSchema;
private final int timeoutSeconds;
private final boolean e2eMsgProc;
@ -54,6 +55,7 @@ public class PulsarConsumerOp implements PulsarOp {
Supplier<Transaction> transactionSupplier,
boolean topicMsgDedup,
Consumer<?> consumer,
String subscriptionType,
Schema<?> schema,
int timeoutSeconds,
long curCycleNum,
@ -68,6 +70,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.topicMsgDedup = topicMsgDedup;
this.consumer = consumer;
this.subscriptionType = subscriptionType;
this.pulsarSchema = schema;
this.timeoutSeconds = timeoutSeconds;
this.curCycleNum = curCycleNum;
@ -149,17 +152,28 @@ public class PulsarConsumerOp implements PulsarOp {
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
// - for any subscription type, this check should always hold
if (curMsgSeqId < prevMsgSeqId) {
throw new PulsarMsgOutOfOrderException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// abnormal case: message loss
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
// - this sequence based message loss and message duplicate check can't be used for
// "Shared" subscription (ignore this check)
// - TODO: for Key_Shared subscription type, this logic needs to be improved on
// per-key basis
else {
if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) {
// abnormal case: message loss
if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
}
}
@ -249,17 +263,28 @@ public class PulsarConsumerOp implements PulsarOp {
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
// - for any subscription type, this check should always hold
if (curMsgSeqId < prevMsgSeqId) {
throw new PulsarMsgOutOfOrderException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// abnormal case: message loss
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
// - this sequence based message loss and message duplicate check can't be used for
// "Shared" subscription (ignore this check)
// - TODO: for Key_Shared subscription type, this logic needs to be improved on
// per-key basis
else {
if ( !StringUtils.equalsAnyIgnoreCase(subscriptionType,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label)) {
// abnormal case: message loss
if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
}
}

View File

@ -108,8 +108,8 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
// simulate message out of order
else if ( simulateMsgOutofOrder ) {
int rndmOffset = 2;
if (value > rndmOffset)
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-rndmOffset));
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID,
String.valueOf((value > rndmOffset) ? (value-rndmOffset) : value));
}
// simulate message duplication
else {

View File

@ -497,6 +497,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
transactionSupplierFunc,
topicMsgDedupFunc,
consumerFunc,
subscription_type_func,
e2eMsgProc);
}
@ -586,6 +587,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
// e.g. pulsarAdmin.getPulsarAdmin().topics().getDeduplicationStatus(message.getTopicName())
brokerMsgDupFunc,
mtConsumerFunc,
subscription_type_func,
false);
}