diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgDuplicateException.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgDuplicateException.java deleted file mode 100644 index 8847ee6a4..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgDuplicateException.java +++ /dev/null @@ -1,10 +0,0 @@ -package io.nosqlbench.driver.pulsar.exception; - -public class PulsarMsgDuplicateException extends RuntimeException { - - public PulsarMsgDuplicateException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) { - super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") + - " Detected duplicate message when message deduplication is enabled (curCycleNum=" + nbCycleNum + - ", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + ")."); - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgLossException.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgLossException.java deleted file mode 100644 index 268d0651f..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgLossException.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.nosqlbench.driver.pulsar.exception; - -public class PulsarMsgLossException extends RuntimeException { - - public PulsarMsgLossException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) { - super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") + - " Detected message sequence id gap (curCycleNum=" + nbCycleNum + - ", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " + - "Some published messages are not received!"); - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgOutOfOrderException.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgOutOfOrderException.java deleted file mode 100644 index 88812b493..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/exception/PulsarMsgOutOfOrderException.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.nosqlbench.driver.pulsar.exception; - -public class PulsarMsgOutOfOrderException extends RuntimeException { - - public PulsarMsgOutOfOrderException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) { - super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]" ) + - " Detected message ordering is not guaranteed (curCycleNum=" + nbCycleNum + - ", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " + - "Older messages are received earlier!"); - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/MessageSequenceNumberSendingHandler.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/MessageSequenceNumberSendingHandler.java new file mode 100644 index 000000000..a7d813de2 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/MessageSequenceNumberSendingHandler.java @@ -0,0 +1,87 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import java.util.*; +import org.apache.commons.lang3.RandomUtils; + +/** + * Handles adding a monotonic sequence number to message properties of sent messages + */ +class MessageSequenceNumberSendingHandler { + static final int SIMULATED_ERROR_PROBABILITY_PERCENTAGE = 10; + long number = 1; + Queue outOfOrderNumbers; + + public long getNextSequenceNumber(Set simulatedErrorTypes) { + return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE); + } + + long getNextSequenceNumber(Set simulatedErrorTypes, int errorProbabilityPercentage) { + simulateError(simulatedErrorTypes, errorProbabilityPercentage); + return nextNumber(); + } + + private void simulateError(Set simulatedErrorTypes, int errorProbabilityPercentage) { + if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) { + int selectIndex = 0; + int numberOfErrorTypes = simulatedErrorTypes.size(); + if (numberOfErrorTypes > 1) { + // pick one of the simulated error type randomly + selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes); + } + PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream() + .skip(selectIndex) + .findFirst() + .get(); + switch (errorType) { + case OutOfOrder: + // simulate message out of order + injectMessagesOutOfOrder(); + break; + case MsgDup: + // simulate message duplication + injectMessageDuplication(); + break; + case MsgLoss: + // simulate message loss + injectMessageLoss(); + break; + } + } + } + + private boolean shouldSimulateError(int errorProbabilityPercentage) { + // Simulate error with the specified probability + return RandomUtils.nextInt(0, 100) < errorProbabilityPercentage; + } + + long nextNumber() { + if (outOfOrderNumbers != null) { + long nextNumber = outOfOrderNumbers.poll(); + if (outOfOrderNumbers.isEmpty()) { + outOfOrderNumbers = null; + } + return nextNumber; + } + return number++; + } + + void injectMessagesOutOfOrder() { + if (outOfOrderNumbers == null) { + outOfOrderNumbers = new ArrayDeque<>(Arrays.asList(number + 2, number, number + 1)); + number += 3; + } + } + + void injectMessageDuplication() { + if (outOfOrderNumbers == null) { + number--; + } + } + + void injectMessageLoss() { + if (outOfOrderNumbers == null) { + number++; + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 25af5d8ac..e40da3ad4 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -3,6 +3,8 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; +import java.util.HashMap; +import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Consumer; @@ -30,14 +32,6 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { private final LongFunction subscriptionTypeFunc; private final boolean e2eMsProc; - // Used for message loss checking - private long prevMsgSeqId = -1; - - // Used for early quiting when there are message loss - // Otherwise, sync API may unblock unnecessarily - private long totalMsgLossCnt = 0; - private long maxMsgSeqToExpect = -1; - public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, PulsarActivity pulsarActivity, @@ -56,24 +50,9 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { this.e2eMsProc = e2eMsgProc; } - public long getPrevMsgSeqId() { return prevMsgSeqId; } - public void setPrevMsgSeqId(long prevMsgSeqId) { this.prevMsgSeqId = prevMsgSeqId; } - - public long getTotalMsgLossCnt() { return totalMsgLossCnt; } - public void setTotalMsgLossCnt(long totalMsgLossCnt) { this.totalMsgLossCnt = totalMsgLossCnt; } - - public long getMaxMsgSeqToExpect() { return maxMsgSeqToExpect; } - public void setMaxMsgSeqToExpect(long maxMsgSeqToExpect) { this.maxMsgSeqToExpect = maxMsgSeqToExpect; } - @Override public PulsarOp apply(long value) { boolean seqTracking = seqTrackingFunc.apply(value); - if ( seqTracking && (maxMsgSeqToExpect != -1) ) { - if ( (value + totalMsgLossCnt) > maxMsgSeqToExpect) { - return new PulsarConumerEmptyOp(pulsarActivity); - } - } - Consumer consumer = consumerFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); boolean useTransaction = useTransactionFunc.apply(value); @@ -94,6 +73,23 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { clientSpace.getPulsarSchema(), clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), value, - e2eMsProc); + e2eMsProc, + this::getReceivedMessageSequenceTracker); } + + + private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(String topicName) { + return receivedMessageSequenceTrackersForTopicThreadLocal.get() + .computeIfAbsent(topicName, k -> createReceivedMessageSequenceTracker()); + } + + private ReceivedMessageSequenceTracker createReceivedMessageSequenceTracker() { + return new ReceivedMessageSequenceTracker(pulsarActivity.getMsgErrOutOfSeqCounter(), + pulsarActivity.getMsgErrDuplicateCounter(), + pulsarActivity.getMsgErrLossCounter()); + } + + private ThreadLocal> receivedMessageSequenceTrackersForTopicThreadLocal = + ThreadLocal.withInitial(HashMap::new); + } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 7b2e5057d..cf5155c51 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -7,6 +7,7 @@ import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.exception.*; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -44,16 +45,7 @@ public class PulsarConsumerOp implements PulsarOp { // keep track of end-to-end message latency private final Histogram e2eMsgProcLatencyHistogram; - // message out-of-sequence error counter - private final Counter msgErrOutOfSeqCounter; - // message out-of-sequence error counter - private final Counter msgErrDuplicateCounter; - // message loss error counter - private final Counter msgErrLossCounter; - - // Used for message error tracking - private final boolean ignoreMsgLossCheck; - private final boolean ignoreMsgDupCheck; + private final Function receivedMessageSequenceTrackerForTopic; public PulsarConsumerOp( PulsarConsumerMapper consumerMapper, @@ -68,7 +60,8 @@ public class PulsarConsumerOp implements PulsarOp { Schema schema, int timeoutSeconds, long curCycleNum, - boolean e2eMsgProc) + boolean e2eMsgProc, + Function receivedMessageSequenceTrackerForTopic) { this.consumerMapper = consumerMapper; this.pulsarActivity = pulsarActivity; @@ -91,74 +84,16 @@ public class PulsarConsumerOp implements PulsarOp { this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer(); this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram(); - this.msgErrOutOfSeqCounter = pulsarActivity.getMsgErrOutOfSeqCounter(); - this.msgErrLossCounter = pulsarActivity.getMsgErrLossCounter(); - this.msgErrDuplicateCounter = pulsarActivity.getMsgErrDuplicateCounter(); - - // When message deduplication configuration is not enable, ignore message - // duplication check - this.ignoreMsgDupCheck = !this.topicMsgDedup; - - // Limitations of the message sequence based check: - // - For message out of sequence and message duplicate check, it works for - // all subscription types, including "Shared" and "Key_Shared" - // - For message loss, it doesn't work for "Shared" and "Key_Shared" - // subscription types - this.ignoreMsgLossCheck = - StringUtils.equalsAnyIgnoreCase(this.subscriptionType, - PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label, - PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label); + this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic; } private void checkAndUpdateMessageErrorCounter(Message message) { - long maxMsgSeqToExpect = consumerMapper.getMaxMsgSeqToExpect(); - if (maxMsgSeqToExpect == -1) { - String msgSeqTgtMaxStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX); - if (!StringUtils.isBlank(msgSeqTgtMaxStr)) { - consumerMapper.setMaxMsgSeqToExpect(Long.valueOf(msgSeqTgtMaxStr)); - } - } - - String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_ID); + String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER); if ( !StringUtils.isBlank(msgSeqIdStr) ) { - long prevMsgSeqId = consumerMapper.getPrevMsgSeqId(); - long curMsgSeqId = Long.parseLong(msgSeqIdStr); - - // Skip out-of-sequence check on the first received message - // - This is because out-of-sequence check requires at least 2 - // received messages for comparison - if ( (prevMsgSeqId != -1) && (curMsgSeqId < prevMsgSeqId) ) { - msgErrOutOfSeqCounter.inc(); - } - - // Similarly, when message duplicate check is needed, we also - // skip the first received message. - if ( !ignoreMsgDupCheck && (prevMsgSeqId != -1) && (curMsgSeqId == prevMsgSeqId) ) { - msgErrDuplicateCounter.inc(); - } - - // Note that message loss could be happened anywhere, E.g. - // - published messages: 0,1,2,3,4,5 - // - message loss scenario: - // * scenario 1: first set of messages are lost - received 2,3,4 - // * scenario 2: messages in the middle are lost - received 0,1,3,4 - // * scenario 3: last set of messages are lost - received 0,1,2 - if ( !ignoreMsgLossCheck ) { - // This check covers message loss scenarios 1 and 2 - if ( (curMsgSeqId - prevMsgSeqId) > 1 ){ - // there could be multiple published messages lost between - // 2 received messages - long msgLostCnt = (curMsgSeqId - prevMsgSeqId) - 1; - msgErrLossCounter.inc(msgLostCnt); - consumerMapper.setTotalMsgLossCnt(consumerMapper.getTotalMsgLossCnt() + msgLostCnt); - } - - // TODO: how can we detect message loss scenario 3? - } - - prevMsgSeqId = curMsgSeqId; - consumerMapper.setPrevMsgSeqId(prevMsgSeqId); + long sequenceNumber = Long.parseLong(msgSeqIdStr); + ReceivedMessageSequenceTracker receivedMessageSequenceTracker = receivedMessageSequenceTrackerForTopic.apply(message.getTopicName()); + receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber); } } @@ -323,4 +258,5 @@ public class PulsarConsumerOp implements PulsarOp { } } } + } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConumerEmptyOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConumerEmptyOp.java deleted file mode 100644 index 255a81351..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConumerEmptyOp.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.nosqlbench.driver.pulsar.ops; - -import com.codahale.metrics.Counter; -import io.nosqlbench.driver.pulsar.PulsarActivity; - -public class PulsarConumerEmptyOp implements PulsarOp { - - private final PulsarActivity pulsarActivity; - - // message loss error counter - private final Counter msgErrLossCounter; - - public PulsarConumerEmptyOp(PulsarActivity pulsarActivity) { - this.pulsarActivity = pulsarActivity; - this.msgErrLossCounter = pulsarActivity.getMsgErrDuplicateCounter(); - } - - @Override - public void run(Runnable timeTracker) { - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerEmptyOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerEmptyOp.java deleted file mode 100644 index f94af3982..000000000 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerEmptyOp.java +++ /dev/null @@ -1,20 +0,0 @@ -package io.nosqlbench.driver.pulsar.ops; - -import io.nosqlbench.driver.pulsar.PulsarActivity; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class PulsarProducerEmptyOp implements PulsarOp { - - private final static Logger logger = LogManager.getLogger(PulsarProducerEmptyOp.class); - - private final PulsarActivity pulsarActivity; - - public PulsarProducerEmptyOp(PulsarActivity pulsarActivity) { - this.pulsarActivity = pulsarActivity; - } - - @Override - public void run(Runnable timeTracker) { - } -} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 594f40406..9c6459663 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -4,18 +4,17 @@ import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.templating.CommandTemplate; -import org.apache.commons.lang3.RandomUtils; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.function.LongFunction; +import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.transaction.Transaction; -import java.util.HashMap; -import java.util.Map; -import java.util.function.LongFunction; -import java.util.function.Supplier; - /** * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains * enough state to define a pulsar operation such that it can be executed, measured, and possibly @@ -31,13 +30,11 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class); private final LongFunction> producerFunc; - private final LongFunction seqErrSimuTypeFunc; + private final Set seqErrSimuTypes; private final LongFunction keyFunc; private final LongFunction propFunc; private final LongFunction payloadFunc; - private final long totalCycleCount; - public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, PulsarActivity pulsarActivity, @@ -46,48 +43,27 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { LongFunction seqTrackingFunc, LongFunction> transactionSupplierFunc, LongFunction> producerFunc, - LongFunction seqErrSimuTypeFunc, + Set seqErrSimuTypes, LongFunction keyFunc, LongFunction propFunc, LongFunction payloadFunc) { super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc); this.producerFunc = producerFunc; - this.seqErrSimuTypeFunc = seqErrSimuTypeFunc; + this.seqErrSimuTypes = seqErrSimuTypes; this.keyFunc = keyFunc; this.propFunc = propFunc; this.payloadFunc = payloadFunc; - - this.totalCycleCount = pulsarActivity.getActivityDef().getCycleCount(); } @Override public PulsarOp apply(long value) { boolean asyncApi = asyncApiFunc.apply(value); boolean useTransaction = useTransactionFunc.apply(value); - boolean seqTracking = seqTrackingFunc.apply(value); Supplier transactionSupplier = transactionSupplierFunc.apply(value); Producer producer = producerFunc.apply(value); - boolean lastMsg = (value == (totalCycleCount-1)); - - // Simulate error 10% of the time, but always ignore - // the last message - float rndVal = RandomUtils.nextFloat(0, 1.0f); - boolean simulationError = (!lastMsg) && ((rndVal >= 0) && (rndVal < 0.2f)); - - String seqErrSimuTypesStr = seqErrSimuTypeFunc.apply(value); - boolean simulateMsgOutofOrder = simulationError && - !StringUtils.isBlank(seqErrSimuTypesStr) && - StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label); - boolean simulateMsgLoss = simulationError && - !StringUtils.isBlank(seqErrSimuTypesStr) && - StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label); - boolean simulateMsgDup = simulationError && - !StringUtils.isBlank(seqErrSimuTypesStr) && - StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label); - String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); @@ -107,47 +83,31 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper { } } - // Error simulation sequence: - // - message loss > message out of order > message duplication - if (!simulateMsgLoss) { - // Set message sequence tracking property - if (seqTracking) { - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX, - String.valueOf(pulsarActivity.getActivityDef().getCycleCount()-1)); - - // normal case - if (!simulateMsgOutofOrder && !simulateMsgDup) { - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value)); - } - else { - // simulate message out of order - if (simulateMsgOutofOrder) { - int rndmOffset = 2; - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, - String.valueOf((value > rndmOffset) ? (value - rndmOffset) : value)); - } - // simulate message duplication - else if (simulateMsgDup) { - msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value - 1)); - } - } - } - - return new PulsarProducerOp( - pulsarActivity, - asyncApi, - useTransaction, - transactionSupplier, - producer, - clientSpace.getPulsarSchema(), - msgKey, - msgProperties, - msgPayload); - } - else { - // Simulate message loss, but don't simulate the scenario where - // only the last set of message are lost - return new PulsarProducerEmptyOp(pulsarActivity); + boolean sequenceTrackingEnabled = seqTrackingFunc.apply(value); + if (sequenceTrackingEnabled) { + long nextSequenceNumber = getMessageSequenceNumberSendingHandler(producer.getTopic()) + .getNextSequenceNumber(seqErrSimuTypes); + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber)); } + + return new PulsarProducerOp( + pulsarActivity, + asyncApi, + useTransaction, + transactionSupplier, + producer, + clientSpace.getPulsarSchema(), + msgKey, + msgProperties, + msgPayload); } + + private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) { + return MessageSequenceNumberSendingHandlersThreadLocal.get() + .computeIfAbsent(topicName, k -> new MessageSequenceNumberSendingHandler()); + } + + private ThreadLocal> MessageSequenceNumberSendingHandlersThreadLocal = + ThreadLocal.withInitial(HashMap::new); + } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 892d35f50..ecb14d8cf 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -7,6 +7,8 @@ import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.templating.CommandTemplate; +import java.util.*; +import java.util.stream.Collectors; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -16,9 +18,6 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.transaction.Transaction; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -352,10 +351,10 @@ public class ReadyPulsarOp implements OpDispenser { // check if we're going to simulate producer message out-of-sequence error // - message ordering // - message loss - LongFunction seqErrSimuTypeFunc = (l) -> null; + Set seqErrSimuTypes = Collections.emptySet(); if (cmdTpl.containsKey("seqerr_simu")) { if (cmdTpl.isStatic("seqerr_simu")) { - seqErrSimuTypeFunc = (l) -> cmdTpl.getStatic("seqerr_simu"); + seqErrSimuTypes = parseSimulatedErrorTypes(cmdTpl.getStatic("seqerr_simu")); } else { throw new PulsarDriverParamException("[resolveMsgSend()] \"seqerr_simu\" parameter cannot be dynamic!"); } @@ -403,12 +402,23 @@ public class ReadyPulsarOp implements OpDispenser { seqTrackingFunc, transactionSupplierFunc, producerFunc, - seqErrSimuTypeFunc, + seqErrSimuTypes, keyFunc, propFunc, valueFunc); } + private Set parseSimulatedErrorTypes(String sequenceErrorSimulatedTypeString) { + if (StringUtils.isBlank(sequenceErrorSimulatedTypeString)) { + return Collections.emptySet(); + } + return Arrays.stream(StringUtils.split(sequenceErrorSimulatedTypeString, ',')) + .map(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE::parseSimuType) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toSet()); + } + private LongFunction resolveMsgConsume( PulsarSpace clientSpace, LongFunction topic_uri_func, diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java new file mode 100644 index 000000000..48e0c0d5e --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java @@ -0,0 +1,113 @@ +package io.nosqlbench.driver.pulsar.ops; + +import com.codahale.metrics.Counter; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.TreeSet; + +/** + * Detects message loss, message duplication and out-of-order message delivery + * based on a monotonic sequence number that each received message contains. + * + * Out-of-order messages are detected with a maximum look behind of 20 sequence number entries. + * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}. + */ +class ReceivedMessageSequenceTracker implements AutoCloseable{ + public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20; + // message out-of-sequence error counter + private final Counter msgErrOutOfSeqCounter; + // message out-of-sequence error counter + private final Counter msgErrDuplicateCounter; + // message loss error counter + private final Counter msgErrLossCounter; + long expectedNumber = -1; + + SortedSet pendingOutOfSeqNumbers = new TreeSet<>(); + + + ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) { + this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter; + this.msgErrDuplicateCounter = msgErrDuplicateCounter; + this.msgErrLossCounter = msgErrLossCounter; + } + + /** + * Notifies the tracker about a received sequence number + * + * @param sequenceNumber the sequence number of the received message + */ + public void sequenceNumberReceived(long sequenceNumber) { + if (expectedNumber == -1) { + expectedNumber = sequenceNumber + 1; + return; + } + + if (sequenceNumber < expectedNumber) { + msgErrDuplicateCounter.inc(); + return; + } + + boolean messagesSkipped = false; + if (sequenceNumber > expectedNumber) { + if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { + messagesSkipped = processEarliestPendingOutOfSequenceNumber(); + } + pendingOutOfSeqNumbers.add(sequenceNumber); + } else { + // sequenceNumber == expectedNumber + expectedNumber++; + } + processPendingOutOfSequenceNumbers(messagesSkipped); + cleanUpTooFarBehindOutOfSequenceNumbers(); + } + + private boolean processEarliestPendingOutOfSequenceNumber() { + // remove the earliest pending out of sequence number + Long earliestOutOfSeqNumber = pendingOutOfSeqNumbers.first(); + pendingOutOfSeqNumbers.remove(earliestOutOfSeqNumber); + if (earliestOutOfSeqNumber > expectedNumber) { + // skip the expected number ahead to the number after the earliest sequence number + // increment the counter with the amount of sequence numbers that got skipped + msgErrLossCounter.inc(earliestOutOfSeqNumber - expectedNumber); + expectedNumber = earliestOutOfSeqNumber + 1; + return true; + } else { + msgErrLossCounter.inc(); + } + return false; + } + + private void processPendingOutOfSequenceNumbers(boolean messagesSkipped) { + // check if there are previously received out-of-order sequence number that have been received + while (pendingOutOfSeqNumbers.remove(expectedNumber)) { + expectedNumber++; + if (!messagesSkipped) { + msgErrOutOfSeqCounter.inc(); + } + } + } + + private void cleanUpTooFarBehindOutOfSequenceNumbers() { + // remove sequence numbers that are too far behind + for (Iterator iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) { + Long number = iterator.next(); + if (number < expectedNumber - MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { + msgErrLossCounter.inc(); + iterator.remove(); + } else { + break; + } + } + } + + /** + * Handles the possible pending out of sequence numbers. Mainly needed in unit tests to assert the + * counter values. + */ + @Override + public void close() { + while (!pendingOutOfSeqNumbers.isEmpty()) { + processPendingOutOfSequenceNumbers(processEarliestPendingOutOfSequenceNumber()); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index 56b0e5f64..d47a856f6 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar.util; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.*; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,9 +13,6 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Base64; -import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -48,8 +46,7 @@ public class PulsarActivityUtil { return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type)); } - public static final String MSG_SEQUENCE_ID = "sequence_id"; - public static final String MSG_SEQUENCE_TGTMAX = "sequence_tgtmax"; + public static final String MSG_SEQUENCE_NUMBER = "sequence_number"; /////// // Valid document level parameters for Pulsar NB yaml file @@ -314,6 +311,23 @@ public class PulsarActivityUtil { SEQ_ERROR_SIMU_TYPE(String label) { this.label = label; } + + private static final Map MAPPING = new HashMap<>(); + + static { + for (SEQ_ERROR_SIMU_TYPE simuType : values()) { + MAPPING.put(simuType.label, simuType); + MAPPING.put(simuType.label.toLowerCase(), simuType); + MAPPING.put(simuType.label.toUpperCase(), simuType); + MAPPING.put(simuType.name(), simuType); + MAPPING.put(simuType.name().toLowerCase(), simuType); + MAPPING.put(simuType.name().toUpperCase(), simuType); + } + } + + public static Optional parseSimuType(String simuTypeString) { + return Optional.ofNullable(MAPPING.get(simuTypeString.trim())); + } } public static boolean isValidSeqErrSimuType(String item) { return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item)); diff --git a/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/MessageSequenceNumberSendingHandlerTest.java b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/MessageSequenceNumberSendingHandlerTest.java new file mode 100644 index 000000000..ded7971d8 --- /dev/null +++ b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/MessageSequenceNumberSendingHandlerTest.java @@ -0,0 +1,74 @@ +package io.nosqlbench.driver.pulsar.ops; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import org.junit.jupiter.api.Test; + +class MessageSequenceNumberSendingHandlerTest { + MessageSequenceNumberSendingHandler sequenceNumberSendingHandler = new MessageSequenceNumberSendingHandler(); + + @Test + void shouldAddMonotonicSequence() { + for (long l = 1; l <= 100; l++) { + assertEquals(l, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + } + } + + @Test + void shouldInjectMessageLoss() { + assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss), 100)); + } + + @Test + void shouldInjectMessageDuplication() { + assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup), 100)); + } + + @Test + void shouldInjectMessageOutOfOrder() { + assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100)); + assertEquals(2L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + assertEquals(5L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + assertEquals(6, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + } + + @Test + void shouldInjectOneOfTheSimulatedErrorsRandomly() { + Set allErrorTypes = new HashSet<>(Arrays.asList(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.values())); + + assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); + long previousSequenceNumber = 1L; + int outOfSequenceInjectionCounter = 0; + int messageDupCounter = 0; + int messageLossCounter = 0; + int successCounter = 0; + for (int i = 0; i < 1000; i++) { + long nextSequenceNumber = sequenceNumberSendingHandler.getNextSequenceNumber(allErrorTypes); + if (nextSequenceNumber >= previousSequenceNumber + 3) { + outOfSequenceInjectionCounter++; + } else if (nextSequenceNumber <= previousSequenceNumber) { + messageDupCounter++; + } else if (nextSequenceNumber >= previousSequenceNumber + 2) { + messageLossCounter++; + } else if (nextSequenceNumber == previousSequenceNumber + 1) { + successCounter++; + } + previousSequenceNumber = nextSequenceNumber; + } + assertTrue(outOfSequenceInjectionCounter > 0); + assertTrue(messageDupCounter > 0); + assertTrue(messageLossCounter > 0); + assertEquals(1000, outOfSequenceInjectionCounter + messageDupCounter + messageLossCounter + successCounter); + } + +} diff --git a/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java new file mode 100644 index 000000000..f79c29ebd --- /dev/null +++ b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java @@ -0,0 +1,152 @@ +package io.nosqlbench.driver.pulsar.ops; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.codahale.metrics.Counter; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +class ReceivedMessageSequenceTrackerTest { + Counter msgErrOutOfSeqCounter = new Counter(); + Counter msgErrDuplicateCounter = new Counter(); + Counter msgErrLossCounter = new Counter(); + ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter); + + @Test + void shouldCountersBeZeroWhenSequenceDoesntContainGaps() { + // when + for (long l = 0; l < 100L; l++) { + messageSequenceTracker.sequenceNumberReceived(l); + } + messageSequenceTracker.close(); + // then + assertEquals(0, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(0, msgErrLossCounter.getCount()); + } + + @ParameterizedTest + @ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L}) + void shouldDetectMsgLossWhenEverySecondMessageIsLost(long totalMessages) { + doShouldDetectMsgLoss(totalMessages, 2); + } + + @ParameterizedTest + @ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L}) + void shouldDetectMsgLossWhenEveryThirdMessageIsLost(long totalMessages) { + doShouldDetectMsgLoss(totalMessages, 3); + } + + @ParameterizedTest + @ValueSource(longs = {20L, 21L, 40L, 41L, 42L, 43L, 100L}) + void shouldDetectMsgLossWhenEvery21stMessageIsLost(long totalMessages) { + doShouldDetectMsgLoss(totalMessages, 21); + } + + private void doShouldDetectMsgLoss(long totalMessages, int looseEveryNthMessage) { + int messagesLost = 0; + // when + boolean lastMessageWasLost = false; + for (long l = 0; l < totalMessages; l++) { + if (l % looseEveryNthMessage == 1) { + messagesLost++; + lastMessageWasLost = true; + continue; + } else { + lastMessageWasLost = false; + } + messageSequenceTracker.sequenceNumberReceived(l); + } + if (lastMessageWasLost) { + messageSequenceTracker.sequenceNumberReceived(totalMessages); + } + messageSequenceTracker.close(); + // then + assertEquals(0, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(messagesLost, msgErrLossCounter.getCount()); + } + + @ParameterizedTest + @ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L}) + void shouldDetectMsgDuplication(long totalMessages) { + int messagesDuplicated = 0; + // when + for (long l = 0; l < totalMessages; l++) { + if (l % 2 == 1) { + messagesDuplicated++; + messageSequenceTracker.sequenceNumberReceived(l); + } + messageSequenceTracker.sequenceNumberReceived(l); + } + if (totalMessages % 2 == 0) { + messageSequenceTracker.sequenceNumberReceived(totalMessages); + } + if (totalMessages < 2 * ReceivedMessageSequenceTracker.MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { + messageSequenceTracker.close(); + } + + // then + assertEquals(0, msgErrOutOfSeqCounter.getCount()); + assertEquals(messagesDuplicated, msgErrDuplicateCounter.getCount()); + assertEquals(0, msgErrLossCounter.getCount()); + } + + @Test + void shouldDetectSingleMessageOutOfSequence() { + // when + for (long l = 0; l < 10L; l++) { + messageSequenceTracker.sequenceNumberReceived(l); + } + messageSequenceTracker.sequenceNumberReceived(10L); + messageSequenceTracker.sequenceNumberReceived(12L); + messageSequenceTracker.sequenceNumberReceived(11L); + for (long l = 13L; l < 100L; l++) { + messageSequenceTracker.sequenceNumberReceived(l); + } + + // then + assertEquals(1, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(0, msgErrLossCounter.getCount()); + } + + @Test + void shouldDetectMultipleMessagesOutOfSequence() { + // when + for (long l = 0; l < 10L; l++) { + messageSequenceTracker.sequenceNumberReceived(l); + } + messageSequenceTracker.sequenceNumberReceived(10L); + messageSequenceTracker.sequenceNumberReceived(14L); + messageSequenceTracker.sequenceNumberReceived(13L); + messageSequenceTracker.sequenceNumberReceived(11L); + messageSequenceTracker.sequenceNumberReceived(12L); + for (long l = 15L; l < 100L; l++) { + messageSequenceTracker.sequenceNumberReceived(l); + } + + // then + assertEquals(2, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(0, msgErrLossCounter.getCount()); + } + + @Test + void shouldDetectIndividualMessageLoss() { + // when + for (long l = 0; l < 100L; l++) { + if (l != 11L) { + messageSequenceTracker.sequenceNumberReceived(l); + } + } + messageSequenceTracker.close(); + + // then + assertEquals(0, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(1, msgErrLossCounter.getCount()); + } + +}