diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java index cd33e12de..fe3a27d6d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTracker.java @@ -8,26 +8,39 @@ import java.util.TreeSet; /** * Detects message loss, message duplication and out-of-order message delivery * based on a monotonic sequence number that each received message contains. - * - * Out-of-order messages are detected with a maximum look behind of 20 sequence number entries. - * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}. + *

+ * Out-of-order messages are detected with a maximum look behind of 1000 sequence number entries. + * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}. */ -class ReceivedMessageSequenceTracker implements AutoCloseable{ - public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20; +class ReceivedMessageSequenceTracker implements AutoCloseable { + private static final int DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 1000; + private static final int DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS = 1000; // message out-of-sequence error counter private final Counter msgErrOutOfSeqCounter; // duplicate message error counter private final Counter msgErrDuplicateCounter; // message loss error counter private final Counter msgErrLossCounter; - private final SortedSet pendingOutOfSeqNumbers = new TreeSet<>(); + private final SortedSet pendingOutOfSeqNumbers; + private final int maxTrackOutOfOrderSequenceNumbers; + private final SortedSet skippedSeqNumbers; + private final int maxTrackSkippedSequenceNumbers; private long expectedNumber = -1; + public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) { + this(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter, + DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS, DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS); + } - ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) { + public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter, + int maxTrackOutOfOrderSequenceNumbers, int maxTrackSkippedSequenceNumbers) { this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter; this.msgErrDuplicateCounter = msgErrDuplicateCounter; this.msgErrLossCounter = msgErrLossCounter; + this.maxTrackOutOfOrderSequenceNumbers = maxTrackOutOfOrderSequenceNumbers; + this.maxTrackSkippedSequenceNumbers = maxTrackSkippedSequenceNumbers; + this.pendingOutOfSeqNumbers = new TreeSet<>(); + this.skippedSeqNumbers = new TreeSet<>(); } /** @@ -42,16 +55,24 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{ } if (sequenceNumber < expectedNumber) { - msgErrDuplicateCounter.inc(); + if (skippedSeqNumbers.remove(sequenceNumber)) { + // late out-of-order delivery was detected + // decrease the loss counter + msgErrLossCounter.dec(); + // increment the out-of-order counter + msgErrOutOfSeqCounter.inc(); + } else { + msgErrDuplicateCounter.inc(); + } return; } boolean messagesSkipped = false; if (sequenceNumber > expectedNumber) { - if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { + if (pendingOutOfSeqNumbers.size() == maxTrackOutOfOrderSequenceNumbers) { messagesSkipped = processLowestPendingOutOfSequenceNumber(); } - if(!pendingOutOfSeqNumbers.add(sequenceNumber)) { + if (!pendingOutOfSeqNumbers.add(sequenceNumber)) { msgErrDuplicateCounter.inc(); } } else { @@ -69,7 +90,14 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{ if (lowestOutOfSeqNumber > expectedNumber) { // skip the expected number ahead to the number after the lowest sequence number // increment the counter with the amount of sequence numbers that got skipped - msgErrLossCounter.inc(lowestOutOfSeqNumber - expectedNumber); + // keep track of the skipped sequence numbers to detect late out-of-order message delivery + for (long l = expectedNumber; l < lowestOutOfSeqNumber; l++) { + msgErrLossCounter.inc(); + skippedSeqNumbers.add(l); + if (skippedSeqNumbers.size() > maxTrackSkippedSequenceNumbers) { + skippedSeqNumbers.remove(skippedSeqNumbers.first()); + } + } expectedNumber = lowestOutOfSeqNumber + 1; return true; } else { @@ -92,7 +120,7 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{ // remove sequence numbers that are too far behind for (Iterator iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) { Long number = iterator.next(); - if (number < expectedNumber - MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { + if (number < expectedNumber - maxTrackOutOfOrderSequenceNumbers) { msgErrLossCounter.inc(); iterator.remove(); } else { @@ -111,4 +139,12 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{ processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber()); } } + + public int getMaxTrackOutOfOrderSequenceNumbers() { + return maxTrackOutOfOrderSequenceNumbers; + } + + public int getMaxTrackSkippedSequenceNumbers() { + return maxTrackSkippedSequenceNumbers; + } } diff --git a/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java index 4ea3b531e..d2391d67a 100644 --- a/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java +++ b/driver-pulsar/src/test/java/io/nosqlbench/driver/pulsar/ops/ReceivedMessageSequenceTrackerTest.java @@ -11,7 +11,7 @@ class ReceivedMessageSequenceTrackerTest { Counter msgErrOutOfSeqCounter = new Counter(); Counter msgErrDuplicateCounter = new Counter(); Counter msgErrLossCounter = new Counter(); - ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter); + ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter, 20, 20); @Test void shouldCountersBeZeroWhenSequenceDoesntContainGaps() { @@ -83,7 +83,7 @@ class ReceivedMessageSequenceTrackerTest { if (totalMessages % 2 == 0) { messageSequenceTracker.sequenceNumberReceived(totalMessages); } - if (totalMessages < 2 * ReceivedMessageSequenceTracker.MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { + if (totalMessages < 2 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers()) { messageSequenceTracker.close(); } @@ -188,4 +188,43 @@ class ReceivedMessageSequenceTrackerTest { assertEquals(1, msgErrLossCounter.getCount()); } + + @Test + void shouldDetectDelayedOutOfOrderDelivery() { + // when + for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) { + if (l != 10) { + messageSequenceTracker.sequenceNumberReceived(l); + } + if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) { + messageSequenceTracker.sequenceNumberReceived(10); + } + } + messageSequenceTracker.close(); + + // then + assertEquals(1, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(0, msgErrLossCounter.getCount()); + } + + @Test + void shouldDetectDelayedOutOfOrderDeliveryOf2ConsecutiveSequenceNumbers() { + // when + for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) { + if (l != 10 && l != 11) { + messageSequenceTracker.sequenceNumberReceived(l); + } + if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) { + messageSequenceTracker.sequenceNumberReceived(10); + messageSequenceTracker.sequenceNumberReceived(11); + } + } + messageSequenceTracker.close(); + + // then + assertEquals(2, msgErrOutOfSeqCounter.getCount()); + assertEquals(0, msgErrDuplicateCounter.getCount()); + assertEquals(0, msgErrLossCounter.getCount()); + } }