Merge pull request #368 from lhotari/lh-threadlocal-sequence-tracking

Pulsar driver: Use thread local and topic based sequence number tracking
This commit is contained in:
Jonathan Shook 2021-10-19 14:40:13 -05:00 committed by GitHub
commit bdb577f25d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 524 additions and 255 deletions

View File

@ -1,10 +0,0 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgDuplicateException extends RuntimeException {
public PulsarMsgDuplicateException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") +
" Detected duplicate message when message deduplication is enabled (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + ").");
}
}

View File

@ -1,11 +0,0 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgLossException extends RuntimeException {
public PulsarMsgLossException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") +
" Detected message sequence id gap (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Some published messages are not received!");
}
}

View File

@ -1,11 +0,0 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgOutOfOrderException extends RuntimeException {
public PulsarMsgOutOfOrderException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]" ) +
" Detected message ordering is not guaranteed (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Older messages are received earlier!");
}
}

View File

@ -0,0 +1,87 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.util.*;
import org.apache.commons.lang3.RandomUtils;
/**
* Handles adding a monotonic sequence number to message properties of sent messages
*/
class MessageSequenceNumberSendingHandler {
static final int SIMULATED_ERROR_PROBABILITY_PERCENTAGE = 10;
long number = 1;
Queue<Long> outOfOrderNumbers;
public long getNextSequenceNumber(Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes) {
return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE);
}
long getNextSequenceNumber(Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
simulateError(simulatedErrorTypes, errorProbabilityPercentage);
return nextNumber();
}
private void simulateError(Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) {
int selectIndex = 0;
int numberOfErrorTypes = simulatedErrorTypes.size();
if (numberOfErrorTypes > 1) {
// pick one of the simulated error type randomly
selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes);
}
PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream()
.skip(selectIndex)
.findFirst()
.get();
switch (errorType) {
case OutOfOrder:
// simulate message out of order
injectMessagesOutOfOrder();
break;
case MsgDup:
// simulate message duplication
injectMessageDuplication();
break;
case MsgLoss:
// simulate message loss
injectMessageLoss();
break;
}
}
}
private boolean shouldSimulateError(int errorProbabilityPercentage) {
// Simulate error with the specified probability
return RandomUtils.nextInt(0, 100) < errorProbabilityPercentage;
}
long nextNumber() {
if (outOfOrderNumbers != null) {
long nextNumber = outOfOrderNumbers.poll();
if (outOfOrderNumbers.isEmpty()) {
outOfOrderNumbers = null;
}
return nextNumber;
}
return number++;
}
void injectMessagesOutOfOrder() {
if (outOfOrderNumbers == null) {
outOfOrderNumbers = new ArrayDeque<>(Arrays.asList(number + 2, number, number + 1));
number += 3;
}
}
void injectMessageDuplication() {
if (outOfOrderNumbers == null) {
number--;
}
}
void injectMessageLoss() {
if (outOfOrderNumbers == null) {
number++;
}
}
}

View File

@ -3,6 +3,8 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
@ -30,14 +32,6 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final LongFunction<String> subscriptionTypeFunc;
private final boolean e2eMsProc;
// Used for message loss checking
private long prevMsgSeqId = -1;
// Used for early quiting when there are message loss
// Otherwise, sync API may unblock unnecessarily
private long totalMsgLossCnt = 0;
private long maxMsgSeqToExpect = -1;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
@ -56,24 +50,9 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
this.e2eMsProc = e2eMsgProc;
}
public long getPrevMsgSeqId() { return prevMsgSeqId; }
public void setPrevMsgSeqId(long prevMsgSeqId) { this.prevMsgSeqId = prevMsgSeqId; }
public long getTotalMsgLossCnt() { return totalMsgLossCnt; }
public void setTotalMsgLossCnt(long totalMsgLossCnt) { this.totalMsgLossCnt = totalMsgLossCnt; }
public long getMaxMsgSeqToExpect() { return maxMsgSeqToExpect; }
public void setMaxMsgSeqToExpect(long maxMsgSeqToExpect) { this.maxMsgSeqToExpect = maxMsgSeqToExpect; }
@Override
public PulsarOp apply(long value) {
boolean seqTracking = seqTrackingFunc.apply(value);
if ( seqTracking && (maxMsgSeqToExpect != -1) ) {
if ( (value + totalMsgLossCnt) > maxMsgSeqToExpect) {
return new PulsarConumerEmptyOp(pulsarActivity);
}
}
Consumer<?> consumer = consumerFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
@ -94,6 +73,23 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
clientSpace.getPulsarSchema(),
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
value,
e2eMsProc);
e2eMsProc,
this::getReceivedMessageSequenceTracker);
}
private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(String topicName) {
return receivedMessageSequenceTrackersForTopicThreadLocal.get()
.computeIfAbsent(topicName, k -> createReceivedMessageSequenceTracker());
}
private ReceivedMessageSequenceTracker createReceivedMessageSequenceTracker() {
return new ReceivedMessageSequenceTracker(pulsarActivity.getMsgErrOutOfSeqCounter(),
pulsarActivity.getMsgErrDuplicateCounter(),
pulsarActivity.getMsgErrLossCounter());
}
private ThreadLocal<Map<String, ReceivedMessageSequenceTracker>> receivedMessageSequenceTrackersForTopicThreadLocal =
ThreadLocal.withInitial(HashMap::new);
}

View File

@ -7,6 +7,7 @@ import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.*;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -44,16 +45,7 @@ public class PulsarConsumerOp implements PulsarOp {
// keep track of end-to-end message latency
private final Histogram e2eMsgProcLatencyHistogram;
// message out-of-sequence error counter
private final Counter msgErrOutOfSeqCounter;
// message out-of-sequence error counter
private final Counter msgErrDuplicateCounter;
// message loss error counter
private final Counter msgErrLossCounter;
// Used for message error tracking
private final boolean ignoreMsgLossCheck;
private final boolean ignoreMsgDupCheck;
private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
public PulsarConsumerOp(
PulsarConsumerMapper consumerMapper,
@ -68,7 +60,8 @@ public class PulsarConsumerOp implements PulsarOp {
Schema<?> schema,
int timeoutSeconds,
long curCycleNum,
boolean e2eMsgProc)
boolean e2eMsgProc,
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic)
{
this.consumerMapper = consumerMapper;
this.pulsarActivity = pulsarActivity;
@ -91,74 +84,16 @@ public class PulsarConsumerOp implements PulsarOp {
this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram();
this.msgErrOutOfSeqCounter = pulsarActivity.getMsgErrOutOfSeqCounter();
this.msgErrLossCounter = pulsarActivity.getMsgErrLossCounter();
this.msgErrDuplicateCounter = pulsarActivity.getMsgErrDuplicateCounter();
// When message deduplication configuration is not enable, ignore message
// duplication check
this.ignoreMsgDupCheck = !this.topicMsgDedup;
// Limitations of the message sequence based check:
// - For message out of sequence and message duplicate check, it works for
// all subscription types, including "Shared" and "Key_Shared"
// - For message loss, it doesn't work for "Shared" and "Key_Shared"
// subscription types
this.ignoreMsgLossCheck =
StringUtils.equalsAnyIgnoreCase(this.subscriptionType,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Shared.label,
PulsarActivityUtil.SUBSCRIPTION_TYPE.Key_Shared.label);
this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic;
}
private void checkAndUpdateMessageErrorCounter(Message message) {
long maxMsgSeqToExpect = consumerMapper.getMaxMsgSeqToExpect();
if (maxMsgSeqToExpect == -1) {
String msgSeqTgtMaxStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX);
if (!StringUtils.isBlank(msgSeqTgtMaxStr)) {
consumerMapper.setMaxMsgSeqToExpect(Long.valueOf(msgSeqTgtMaxStr));
}
}
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_ID);
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER);
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
long prevMsgSeqId = consumerMapper.getPrevMsgSeqId();
long curMsgSeqId = Long.parseLong(msgSeqIdStr);
// Skip out-of-sequence check on the first received message
// - This is because out-of-sequence check requires at least 2
// received messages for comparison
if ( (prevMsgSeqId != -1) && (curMsgSeqId < prevMsgSeqId) ) {
msgErrOutOfSeqCounter.inc();
}
// Similarly, when message duplicate check is needed, we also
// skip the first received message.
if ( !ignoreMsgDupCheck && (prevMsgSeqId != -1) && (curMsgSeqId == prevMsgSeqId) ) {
msgErrDuplicateCounter.inc();
}
// Note that message loss could be happened anywhere, E.g.
// - published messages: 0,1,2,3,4,5
// - message loss scenario:
// * scenario 1: first set of messages are lost - received 2,3,4
// * scenario 2: messages in the middle are lost - received 0,1,3,4
// * scenario 3: last set of messages are lost - received 0,1,2
if ( !ignoreMsgLossCheck ) {
// This check covers message loss scenarios 1 and 2
if ( (curMsgSeqId - prevMsgSeqId) > 1 ){
// there could be multiple published messages lost between
// 2 received messages
long msgLostCnt = (curMsgSeqId - prevMsgSeqId) - 1;
msgErrLossCounter.inc(msgLostCnt);
consumerMapper.setTotalMsgLossCnt(consumerMapper.getTotalMsgLossCnt() + msgLostCnt);
}
// TODO: how can we detect message loss scenario 3?
}
prevMsgSeqId = curMsgSeqId;
consumerMapper.setPrevMsgSeqId(prevMsgSeqId);
long sequenceNumber = Long.parseLong(msgSeqIdStr);
ReceivedMessageSequenceTracker receivedMessageSequenceTracker = receivedMessageSequenceTrackerForTopic.apply(message.getTopicName());
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
}
}
@ -323,4 +258,5 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
}
}

View File

@ -1,21 +0,0 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import io.nosqlbench.driver.pulsar.PulsarActivity;
public class PulsarConumerEmptyOp implements PulsarOp {
private final PulsarActivity pulsarActivity;
// message loss error counter
private final Counter msgErrLossCounter;
public PulsarConumerEmptyOp(PulsarActivity pulsarActivity) {
this.pulsarActivity = pulsarActivity;
this.msgErrLossCounter = pulsarActivity.getMsgErrDuplicateCounter();
}
@Override
public void run(Runnable timeTracker) {
}
}

View File

@ -1,20 +0,0 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class PulsarProducerEmptyOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarProducerEmptyOp.class);
private final PulsarActivity pulsarActivity;
public PulsarProducerEmptyOp(PulsarActivity pulsarActivity) {
this.pulsarActivity = pulsarActivity;
}
@Override
public void run(Runnable timeTracker) {
}
}

View File

@ -4,18 +4,17 @@ import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.RandomUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
@ -31,13 +30,11 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<String> seqErrSimuTypeFunc;
private final Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> seqErrSimuTypes;
private final LongFunction<String> keyFunc;
private final LongFunction<String> propFunc;
private final LongFunction<String> payloadFunc;
private final long totalCycleCount;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
@ -46,48 +43,27 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Producer<?>> producerFunc,
LongFunction<String> seqErrSimuTypeFunc,
Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> seqErrSimuTypes,
LongFunction<String> keyFunc,
LongFunction<String> propFunc,
LongFunction<String> payloadFunc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.producerFunc = producerFunc;
this.seqErrSimuTypeFunc = seqErrSimuTypeFunc;
this.seqErrSimuTypes = seqErrSimuTypes;
this.keyFunc = keyFunc;
this.propFunc = propFunc;
this.payloadFunc = payloadFunc;
this.totalCycleCount = pulsarActivity.getActivityDef().getCycleCount();
}
@Override
public PulsarOp apply(long value) {
boolean asyncApi = asyncApiFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
boolean seqTracking = seqTrackingFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
Producer<?> producer = producerFunc.apply(value);
boolean lastMsg = (value == (totalCycleCount-1));
// Simulate error 10% of the time, but always ignore
// the last message
float rndVal = RandomUtils.nextFloat(0, 1.0f);
boolean simulationError = (!lastMsg) && ((rndVal >= 0) && (rndVal < 0.2f));
String seqErrSimuTypesStr = seqErrSimuTypeFunc.apply(value);
boolean simulateMsgOutofOrder = simulationError &&
!StringUtils.isBlank(seqErrSimuTypesStr) &&
StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label);
boolean simulateMsgLoss = simulationError &&
!StringUtils.isBlank(seqErrSimuTypesStr) &&
StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label);
boolean simulateMsgDup = simulationError &&
!StringUtils.isBlank(seqErrSimuTypesStr) &&
StringUtils.containsIgnoreCase(seqErrSimuTypesStr, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label);
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
@ -107,47 +83,31 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
}
}
// Error simulation sequence:
// - message loss > message out of order > message duplication
if (!simulateMsgLoss) {
// Set message sequence tracking property
if (seqTracking) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_TGTMAX,
String.valueOf(pulsarActivity.getActivityDef().getCycleCount()-1));
// normal case
if (!simulateMsgOutofOrder && !simulateMsgDup) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value));
}
else {
// simulate message out of order
if (simulateMsgOutofOrder) {
int rndmOffset = 2;
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID,
String.valueOf((value > rndmOffset) ? (value - rndmOffset) : value));
}
// simulate message duplication
else if (simulateMsgDup) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value - 1));
}
}
}
return new PulsarProducerOp(
pulsarActivity,
asyncApi,
useTransaction,
transactionSupplier,
producer,
clientSpace.getPulsarSchema(),
msgKey,
msgProperties,
msgPayload);
}
else {
// Simulate message loss, but don't simulate the scenario where
// only the last set of message are lost
return new PulsarProducerEmptyOp(pulsarActivity);
boolean sequenceTrackingEnabled = seqTrackingFunc.apply(value);
if (sequenceTrackingEnabled) {
long nextSequenceNumber = getMessageSequenceNumberSendingHandler(producer.getTopic())
.getNextSequenceNumber(seqErrSimuTypes);
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber));
}
return new PulsarProducerOp(
pulsarActivity,
asyncApi,
useTransaction,
transactionSupplier,
producer,
clientSpace.getPulsarSchema(),
msgKey,
msgProperties,
msgPayload);
}
private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) {
return MessageSequenceNumberSendingHandlersThreadLocal.get()
.computeIfAbsent(topicName, k -> new MessageSequenceNumberSendingHandler());
}
private ThreadLocal<Map<String, MessageSequenceNumberSendingHandler>> MessageSequenceNumberSendingHandlersThreadLocal =
ThreadLocal.withInitial(HashMap::new);
}

View File

@ -7,6 +7,8 @@ import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -16,9 +18,6 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@ -352,10 +351,10 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
// check if we're going to simulate producer message out-of-sequence error
// - message ordering
// - message loss
LongFunction<String> seqErrSimuTypeFunc = (l) -> null;
Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> seqErrSimuTypes = Collections.emptySet();
if (cmdTpl.containsKey("seqerr_simu")) {
if (cmdTpl.isStatic("seqerr_simu")) {
seqErrSimuTypeFunc = (l) -> cmdTpl.getStatic("seqerr_simu");
seqErrSimuTypes = parseSimulatedErrorTypes(cmdTpl.getStatic("seqerr_simu"));
} else {
throw new PulsarDriverParamException("[resolveMsgSend()] \"seqerr_simu\" parameter cannot be dynamic!");
}
@ -403,12 +402,23 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
producerFunc,
seqErrSimuTypeFunc,
seqErrSimuTypes,
keyFunc,
propFunc,
valueFunc);
}
private Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> parseSimulatedErrorTypes(String sequenceErrorSimulatedTypeString) {
if (StringUtils.isBlank(sequenceErrorSimulatedTypeString)) {
return Collections.emptySet();
}
return Arrays.stream(StringUtils.split(sequenceErrorSimulatedTypeString, ','))
.map(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
}
private LongFunction<PulsarOp> resolveMsgConsume(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,

View File

@ -0,0 +1,113 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Detects message loss, message duplication and out-of-order message delivery
* based on a monotonic sequence number that each received message contains.
*
* Out-of-order messages are detected with a maximum look behind of 20 sequence number entries.
* This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
*/
class ReceivedMessageSequenceTracker implements AutoCloseable{
public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20;
// message out-of-sequence error counter
private final Counter msgErrOutOfSeqCounter;
// message out-of-sequence error counter
private final Counter msgErrDuplicateCounter;
// message loss error counter
private final Counter msgErrLossCounter;
long expectedNumber = -1;
SortedSet<Long> pendingOutOfSeqNumbers = new TreeSet<>();
ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
this.msgErrDuplicateCounter = msgErrDuplicateCounter;
this.msgErrLossCounter = msgErrLossCounter;
}
/**
* Notifies the tracker about a received sequence number
*
* @param sequenceNumber the sequence number of the received message
*/
public void sequenceNumberReceived(long sequenceNumber) {
if (expectedNumber == -1) {
expectedNumber = sequenceNumber + 1;
return;
}
if (sequenceNumber < expectedNumber) {
msgErrDuplicateCounter.inc();
return;
}
boolean messagesSkipped = false;
if (sequenceNumber > expectedNumber) {
if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
messagesSkipped = processEarliestPendingOutOfSequenceNumber();
}
pendingOutOfSeqNumbers.add(sequenceNumber);
} else {
// sequenceNumber == expectedNumber
expectedNumber++;
}
processPendingOutOfSequenceNumbers(messagesSkipped);
cleanUpTooFarBehindOutOfSequenceNumbers();
}
private boolean processEarliestPendingOutOfSequenceNumber() {
// remove the earliest pending out of sequence number
Long earliestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
pendingOutOfSeqNumbers.remove(earliestOutOfSeqNumber);
if (earliestOutOfSeqNumber > expectedNumber) {
// skip the expected number ahead to the number after the earliest sequence number
// increment the counter with the amount of sequence numbers that got skipped
msgErrLossCounter.inc(earliestOutOfSeqNumber - expectedNumber);
expectedNumber = earliestOutOfSeqNumber + 1;
return true;
} else {
msgErrLossCounter.inc();
}
return false;
}
private void processPendingOutOfSequenceNumbers(boolean messagesSkipped) {
// check if there are previously received out-of-order sequence number that have been received
while (pendingOutOfSeqNumbers.remove(expectedNumber)) {
expectedNumber++;
if (!messagesSkipped) {
msgErrOutOfSeqCounter.inc();
}
}
}
private void cleanUpTooFarBehindOutOfSequenceNumbers() {
// remove sequence numbers that are too far behind
for (Iterator<Long> iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) {
Long number = iterator.next();
if (number < expectedNumber - MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
msgErrLossCounter.inc();
iterator.remove();
} else {
break;
}
}
}
/**
* Handles the possible pending out of sequence numbers. Mainly needed in unit tests to assert the
* counter values.
*/
@Override
public void close() {
while (!pendingOutOfSeqNumbers.isEmpty()) {
processPendingOutOfSequenceNumbers(processEarliestPendingOutOfSequenceNumber());
}
}
}

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -12,9 +13,6 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -48,8 +46,7 @@ public class PulsarActivityUtil {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static final String MSG_SEQUENCE_ID = "sequence_id";
public static final String MSG_SEQUENCE_TGTMAX = "sequence_tgtmax";
public static final String MSG_SEQUENCE_NUMBER = "sequence_number";
///////
// Valid document level parameters for Pulsar NB yaml file
@ -314,6 +311,23 @@ public class PulsarActivityUtil {
SEQ_ERROR_SIMU_TYPE(String label) {
this.label = label;
}
private static final Map<String, SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
static {
for (SEQ_ERROR_SIMU_TYPE simuType : values()) {
MAPPING.put(simuType.label, simuType);
MAPPING.put(simuType.label.toLowerCase(), simuType);
MAPPING.put(simuType.label.toUpperCase(), simuType);
MAPPING.put(simuType.name(), simuType);
MAPPING.put(simuType.name().toLowerCase(), simuType);
MAPPING.put(simuType.name().toUpperCase(), simuType);
}
}
public static Optional<SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
}
}
public static boolean isValidSeqErrSimuType(String item) {
return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));

View File

@ -0,0 +1,74 @@
package io.nosqlbench.driver.pulsar.ops;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.junit.jupiter.api.Test;
class MessageSequenceNumberSendingHandlerTest {
MessageSequenceNumberSendingHandler sequenceNumberSendingHandler = new MessageSequenceNumberSendingHandler();
@Test
void shouldAddMonotonicSequence() {
for (long l = 1; l <= 100; l++) {
assertEquals(l, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
}
}
@Test
void shouldInjectMessageLoss() {
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss), 100));
}
@Test
void shouldInjectMessageDuplication() {
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup), 100));
}
@Test
void shouldInjectMessageOutOfOrder() {
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100));
assertEquals(2L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(5L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(6, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
}
@Test
void shouldInjectOneOfTheSimulatedErrorsRandomly() {
Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> allErrorTypes = new HashSet<>(Arrays.asList(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.values()));
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
long previousSequenceNumber = 1L;
int outOfSequenceInjectionCounter = 0;
int messageDupCounter = 0;
int messageLossCounter = 0;
int successCounter = 0;
for (int i = 0; i < 1000; i++) {
long nextSequenceNumber = sequenceNumberSendingHandler.getNextSequenceNumber(allErrorTypes);
if (nextSequenceNumber >= previousSequenceNumber + 3) {
outOfSequenceInjectionCounter++;
} else if (nextSequenceNumber <= previousSequenceNumber) {
messageDupCounter++;
} else if (nextSequenceNumber >= previousSequenceNumber + 2) {
messageLossCounter++;
} else if (nextSequenceNumber == previousSequenceNumber + 1) {
successCounter++;
}
previousSequenceNumber = nextSequenceNumber;
}
assertTrue(outOfSequenceInjectionCounter > 0);
assertTrue(messageDupCounter > 0);
assertTrue(messageLossCounter > 0);
assertEquals(1000, outOfSequenceInjectionCounter + messageDupCounter + messageLossCounter + successCounter);
}
}

View File

@ -0,0 +1,152 @@
package io.nosqlbench.driver.pulsar.ops;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.codahale.metrics.Counter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
class ReceivedMessageSequenceTrackerTest {
Counter msgErrOutOfSeqCounter = new Counter();
Counter msgErrDuplicateCounter = new Counter();
Counter msgErrLossCounter = new Counter();
ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter);
@Test
void shouldCountersBeZeroWhenSequenceDoesntContainGaps() {
// when
for (long l = 0; l < 100L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@ParameterizedTest
@ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L})
void shouldDetectMsgLossWhenEverySecondMessageIsLost(long totalMessages) {
doShouldDetectMsgLoss(totalMessages, 2);
}
@ParameterizedTest
@ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L})
void shouldDetectMsgLossWhenEveryThirdMessageIsLost(long totalMessages) {
doShouldDetectMsgLoss(totalMessages, 3);
}
@ParameterizedTest
@ValueSource(longs = {20L, 21L, 40L, 41L, 42L, 43L, 100L})
void shouldDetectMsgLossWhenEvery21stMessageIsLost(long totalMessages) {
doShouldDetectMsgLoss(totalMessages, 21);
}
private void doShouldDetectMsgLoss(long totalMessages, int looseEveryNthMessage) {
int messagesLost = 0;
// when
boolean lastMessageWasLost = false;
for (long l = 0; l < totalMessages; l++) {
if (l % looseEveryNthMessage == 1) {
messagesLost++;
lastMessageWasLost = true;
continue;
} else {
lastMessageWasLost = false;
}
messageSequenceTracker.sequenceNumberReceived(l);
}
if (lastMessageWasLost) {
messageSequenceTracker.sequenceNumberReceived(totalMessages);
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(messagesLost, msgErrLossCounter.getCount());
}
@ParameterizedTest
@ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L})
void shouldDetectMsgDuplication(long totalMessages) {
int messagesDuplicated = 0;
// when
for (long l = 0; l < totalMessages; l++) {
if (l % 2 == 1) {
messagesDuplicated++;
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.sequenceNumberReceived(l);
}
if (totalMessages % 2 == 0) {
messageSequenceTracker.sequenceNumberReceived(totalMessages);
}
if (totalMessages < 2 * ReceivedMessageSequenceTracker.MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
messageSequenceTracker.close();
}
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(messagesDuplicated, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectSingleMessageOutOfSequence() {
// when
for (long l = 0; l < 10L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.sequenceNumberReceived(10L);
messageSequenceTracker.sequenceNumberReceived(12L);
messageSequenceTracker.sequenceNumberReceived(11L);
for (long l = 13L; l < 100L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
// then
assertEquals(1, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectMultipleMessagesOutOfSequence() {
// when
for (long l = 0; l < 10L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.sequenceNumberReceived(10L);
messageSequenceTracker.sequenceNumberReceived(14L);
messageSequenceTracker.sequenceNumberReceived(13L);
messageSequenceTracker.sequenceNumberReceived(11L);
messageSequenceTracker.sequenceNumberReceived(12L);
for (long l = 15L; l < 100L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
// then
assertEquals(2, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectIndividualMessageLoss() {
// when
for (long l = 0; l < 100L; l++) {
if (l != 11L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(1, msgErrLossCounter.getCount());
}
}