diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java
index 48e0c0d5e..fe3a27d6d 100644
--- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java
+++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java
@@ -8,27 +8,39 @@ import java.util.TreeSet;
/**
* Detects message loss, message duplication and out-of-order message delivery
* based on a monotonic sequence number that each received message contains.
- *
- * Out-of-order messages are detected with a maximum look behind of 20 sequence number entries.
- * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
+ *
+ * Out-of-order messages are detected with a maximum look behind of 1000 sequence number entries.
+ * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
*/
-class ReceivedMessageSequenceTracker implements AutoCloseable{
- public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20;
+class ReceivedMessageSequenceTracker implements AutoCloseable {
+ private static final int DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 1000;
+ private static final int DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS = 1000;
// message out-of-sequence error counter
private final Counter msgErrOutOfSeqCounter;
- // message out-of-sequence error counter
+ // duplicate message error counter
private final Counter msgErrDuplicateCounter;
// message loss error counter
private final Counter msgErrLossCounter;
- long expectedNumber = -1;
+ private final SortedSet pendingOutOfSeqNumbers;
+ private final int maxTrackOutOfOrderSequenceNumbers;
+ private final SortedSet skippedSeqNumbers;
+ private final int maxTrackSkippedSequenceNumbers;
+ private long expectedNumber = -1;
- SortedSet pendingOutOfSeqNumbers = new TreeSet<>();
+ public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
+ this(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter,
+ DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS, DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS);
+ }
-
- ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
+ public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter,
+ int maxTrackOutOfOrderSequenceNumbers, int maxTrackSkippedSequenceNumbers) {
this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
this.msgErrDuplicateCounter = msgErrDuplicateCounter;
this.msgErrLossCounter = msgErrLossCounter;
+ this.maxTrackOutOfOrderSequenceNumbers = maxTrackOutOfOrderSequenceNumbers;
+ this.maxTrackSkippedSequenceNumbers = maxTrackSkippedSequenceNumbers;
+ this.pendingOutOfSeqNumbers = new TreeSet<>();
+ this.skippedSeqNumbers = new TreeSet<>();
}
/**
@@ -43,16 +55,26 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
}
if (sequenceNumber < expectedNumber) {
- msgErrDuplicateCounter.inc();
+ if (skippedSeqNumbers.remove(sequenceNumber)) {
+ // late out-of-order delivery was detected
+ // decrease the loss counter
+ msgErrLossCounter.dec();
+ // increment the out-of-order counter
+ msgErrOutOfSeqCounter.inc();
+ } else {
+ msgErrDuplicateCounter.inc();
+ }
return;
}
boolean messagesSkipped = false;
if (sequenceNumber > expectedNumber) {
- if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
- messagesSkipped = processEarliestPendingOutOfSequenceNumber();
+ if (pendingOutOfSeqNumbers.size() == maxTrackOutOfOrderSequenceNumbers) {
+ messagesSkipped = processLowestPendingOutOfSequenceNumber();
+ }
+ if (!pendingOutOfSeqNumbers.add(sequenceNumber)) {
+ msgErrDuplicateCounter.inc();
}
- pendingOutOfSeqNumbers.add(sequenceNumber);
} else {
// sequenceNumber == expectedNumber
expectedNumber++;
@@ -61,15 +83,22 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
cleanUpTooFarBehindOutOfSequenceNumbers();
}
- private boolean processEarliestPendingOutOfSequenceNumber() {
- // remove the earliest pending out of sequence number
- Long earliestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
- pendingOutOfSeqNumbers.remove(earliestOutOfSeqNumber);
- if (earliestOutOfSeqNumber > expectedNumber) {
- // skip the expected number ahead to the number after the earliest sequence number
+ private boolean processLowestPendingOutOfSequenceNumber() {
+ // remove the lowest pending out of sequence number
+ Long lowestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
+ pendingOutOfSeqNumbers.remove(lowestOutOfSeqNumber);
+ if (lowestOutOfSeqNumber > expectedNumber) {
+ // skip the expected number ahead to the number after the lowest sequence number
// increment the counter with the amount of sequence numbers that got skipped
- msgErrLossCounter.inc(earliestOutOfSeqNumber - expectedNumber);
- expectedNumber = earliestOutOfSeqNumber + 1;
+ // keep track of the skipped sequence numbers to detect late out-of-order message delivery
+ for (long l = expectedNumber; l < lowestOutOfSeqNumber; l++) {
+ msgErrLossCounter.inc();
+ skippedSeqNumbers.add(l);
+ if (skippedSeqNumbers.size() > maxTrackSkippedSequenceNumbers) {
+ skippedSeqNumbers.remove(skippedSeqNumbers.first());
+ }
+ }
+ expectedNumber = lowestOutOfSeqNumber + 1;
return true;
} else {
msgErrLossCounter.inc();
@@ -91,7 +120,7 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
// remove sequence numbers that are too far behind
for (Iterator iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) {
Long number = iterator.next();
- if (number < expectedNumber - MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
+ if (number < expectedNumber - maxTrackOutOfOrderSequenceNumbers) {
msgErrLossCounter.inc();
iterator.remove();
} else {
@@ -107,7 +136,15 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
@Override
public void close() {
while (!pendingOutOfSeqNumbers.isEmpty()) {
- processPendingOutOfSequenceNumbers(processEarliestPendingOutOfSequenceNumber());
+ processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber());
}
}
+
+ public int getMaxTrackOutOfOrderSequenceNumbers() {
+ return maxTrackOutOfOrderSequenceNumbers;
+ }
+
+ public int getMaxTrackSkippedSequenceNumbers() {
+ return maxTrackSkippedSequenceNumbers;
+ }
}
diff --git a/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java
index f79c29ebd..d2391d67a 100644
--- a/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java
+++ b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java
@@ -11,7 +11,7 @@ class ReceivedMessageSequenceTrackerTest {
Counter msgErrOutOfSeqCounter = new Counter();
Counter msgErrDuplicateCounter = new Counter();
Counter msgErrLossCounter = new Counter();
- ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter);
+ ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter, 20, 20);
@Test
void shouldCountersBeZeroWhenSequenceDoesntContainGaps() {
@@ -83,7 +83,7 @@ class ReceivedMessageSequenceTrackerTest {
if (totalMessages % 2 == 0) {
messageSequenceTracker.sequenceNumberReceived(totalMessages);
}
- if (totalMessages < 2 * ReceivedMessageSequenceTracker.MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
+ if (totalMessages < 2 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers()) {
messageSequenceTracker.close();
}
@@ -149,4 +149,82 @@ class ReceivedMessageSequenceTrackerTest {
assertEquals(1, msgErrLossCounter.getCount());
}
+ @Test
+ void shouldDetectGapAndMessageDuplication() {
+ // when
+ for (long l = 0; l < 100L; l++) {
+ if (l != 11L) {
+ messageSequenceTracker.sequenceNumberReceived(l);
+ }
+ if (l == 12L) {
+ messageSequenceTracker.sequenceNumberReceived(l);
+ }
+ }
+ messageSequenceTracker.close();
+
+ // then
+ assertEquals(0, msgErrOutOfSeqCounter.getCount());
+ assertEquals(1, msgErrDuplicateCounter.getCount());
+ assertEquals(1, msgErrLossCounter.getCount());
+ }
+
+ @Test
+ void shouldDetectGapAndMessageDuplicationTimes2() {
+ // when
+ for (long l = 0; l < 100L; l++) {
+ if (l != 11L) {
+ messageSequenceTracker.sequenceNumberReceived(l);
+ }
+ if (l == 12L) {
+ messageSequenceTracker.sequenceNumberReceived(l);
+ messageSequenceTracker.sequenceNumberReceived(l);
+ }
+ }
+ messageSequenceTracker.close();
+
+ // then
+ assertEquals(0, msgErrOutOfSeqCounter.getCount());
+ assertEquals(2, msgErrDuplicateCounter.getCount());
+ assertEquals(1, msgErrLossCounter.getCount());
+ }
+
+
+ @Test
+ void shouldDetectDelayedOutOfOrderDelivery() {
+ // when
+ for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) {
+ if (l != 10) {
+ messageSequenceTracker.sequenceNumberReceived(l);
+ }
+ if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) {
+ messageSequenceTracker.sequenceNumberReceived(10);
+ }
+ }
+ messageSequenceTracker.close();
+
+ // then
+ assertEquals(1, msgErrOutOfSeqCounter.getCount());
+ assertEquals(0, msgErrDuplicateCounter.getCount());
+ assertEquals(0, msgErrLossCounter.getCount());
+ }
+
+ @Test
+ void shouldDetectDelayedOutOfOrderDeliveryOf2ConsecutiveSequenceNumbers() {
+ // when
+ for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) {
+ if (l != 10 && l != 11) {
+ messageSequenceTracker.sequenceNumberReceived(l);
+ }
+ if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) {
+ messageSequenceTracker.sequenceNumberReceived(10);
+ messageSequenceTracker.sequenceNumberReceived(11);
+ }
+ }
+ messageSequenceTracker.close();
+
+ // then
+ assertEquals(2, msgErrOutOfSeqCounter.getCount());
+ assertEquals(0, msgErrDuplicateCounter.getCount());
+ assertEquals(0, msgErrLossCounter.getCount());
+ }
}