Merge pull request #372 from lhotari/lh-detect-duplicates-after-gap

Pulsar driver: Improve message sequence tracking
This commit is contained in:
Jonathan Shook 2021-10-26 09:28:47 -05:00 committed by GitHub
commit fa78ca547c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 141 additions and 26 deletions

View File

@ -8,27 +8,39 @@ import java.util.TreeSet;
/** /**
* Detects message loss, message duplication and out-of-order message delivery * Detects message loss, message duplication and out-of-order message delivery
* based on a monotonic sequence number that each received message contains. * based on a monotonic sequence number that each received message contains.
* * <p>
* Out-of-order messages are detected with a maximum look behind of 20 sequence number entries. * Out-of-order messages are detected with a maximum look behind of 1000 sequence number entries.
* This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}. * This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
*/ */
class ReceivedMessageSequenceTracker implements AutoCloseable{ class ReceivedMessageSequenceTracker implements AutoCloseable {
public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20; private static final int DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 1000;
private static final int DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS = 1000;
// message out-of-sequence error counter // message out-of-sequence error counter
private final Counter msgErrOutOfSeqCounter; private final Counter msgErrOutOfSeqCounter;
// message out-of-sequence error counter // duplicate message error counter
private final Counter msgErrDuplicateCounter; private final Counter msgErrDuplicateCounter;
// message loss error counter // message loss error counter
private final Counter msgErrLossCounter; private final Counter msgErrLossCounter;
long expectedNumber = -1; private final SortedSet<Long> pendingOutOfSeqNumbers;
private final int maxTrackOutOfOrderSequenceNumbers;
private final SortedSet<Long> skippedSeqNumbers;
private final int maxTrackSkippedSequenceNumbers;
private long expectedNumber = -1;
SortedSet<Long> pendingOutOfSeqNumbers = new TreeSet<>(); public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
this(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter,
DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS, DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS);
}
public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter,
ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) { int maxTrackOutOfOrderSequenceNumbers, int maxTrackSkippedSequenceNumbers) {
this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter; this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
this.msgErrDuplicateCounter = msgErrDuplicateCounter; this.msgErrDuplicateCounter = msgErrDuplicateCounter;
this.msgErrLossCounter = msgErrLossCounter; this.msgErrLossCounter = msgErrLossCounter;
this.maxTrackOutOfOrderSequenceNumbers = maxTrackOutOfOrderSequenceNumbers;
this.maxTrackSkippedSequenceNumbers = maxTrackSkippedSequenceNumbers;
this.pendingOutOfSeqNumbers = new TreeSet<>();
this.skippedSeqNumbers = new TreeSet<>();
} }
/** /**
@ -43,16 +55,26 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
} }
if (sequenceNumber < expectedNumber) { if (sequenceNumber < expectedNumber) {
msgErrDuplicateCounter.inc(); if (skippedSeqNumbers.remove(sequenceNumber)) {
// late out-of-order delivery was detected
// decrease the loss counter
msgErrLossCounter.dec();
// increment the out-of-order counter
msgErrOutOfSeqCounter.inc();
} else {
msgErrDuplicateCounter.inc();
}
return; return;
} }
boolean messagesSkipped = false; boolean messagesSkipped = false;
if (sequenceNumber > expectedNumber) { if (sequenceNumber > expectedNumber) {
if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { if (pendingOutOfSeqNumbers.size() == maxTrackOutOfOrderSequenceNumbers) {
messagesSkipped = processEarliestPendingOutOfSequenceNumber(); messagesSkipped = processLowestPendingOutOfSequenceNumber();
}
if (!pendingOutOfSeqNumbers.add(sequenceNumber)) {
msgErrDuplicateCounter.inc();
} }
pendingOutOfSeqNumbers.add(sequenceNumber);
} else { } else {
// sequenceNumber == expectedNumber // sequenceNumber == expectedNumber
expectedNumber++; expectedNumber++;
@ -61,15 +83,22 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
cleanUpTooFarBehindOutOfSequenceNumbers(); cleanUpTooFarBehindOutOfSequenceNumbers();
} }
private boolean processEarliestPendingOutOfSequenceNumber() { private boolean processLowestPendingOutOfSequenceNumber() {
// remove the earliest pending out of sequence number // remove the lowest pending out of sequence number
Long earliestOutOfSeqNumber = pendingOutOfSeqNumbers.first(); Long lowestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
pendingOutOfSeqNumbers.remove(earliestOutOfSeqNumber); pendingOutOfSeqNumbers.remove(lowestOutOfSeqNumber);
if (earliestOutOfSeqNumber > expectedNumber) { if (lowestOutOfSeqNumber > expectedNumber) {
// skip the expected number ahead to the number after the earliest sequence number // skip the expected number ahead to the number after the lowest sequence number
// increment the counter with the amount of sequence numbers that got skipped // increment the counter with the amount of sequence numbers that got skipped
msgErrLossCounter.inc(earliestOutOfSeqNumber - expectedNumber); // keep track of the skipped sequence numbers to detect late out-of-order message delivery
expectedNumber = earliestOutOfSeqNumber + 1; for (long l = expectedNumber; l < lowestOutOfSeqNumber; l++) {
msgErrLossCounter.inc();
skippedSeqNumbers.add(l);
if (skippedSeqNumbers.size() > maxTrackSkippedSequenceNumbers) {
skippedSeqNumbers.remove(skippedSeqNumbers.first());
}
}
expectedNumber = lowestOutOfSeqNumber + 1;
return true; return true;
} else { } else {
msgErrLossCounter.inc(); msgErrLossCounter.inc();
@ -91,7 +120,7 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
// remove sequence numbers that are too far behind // remove sequence numbers that are too far behind
for (Iterator<Long> iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) { for (Iterator<Long> iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) {
Long number = iterator.next(); Long number = iterator.next();
if (number < expectedNumber - MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { if (number < expectedNumber - maxTrackOutOfOrderSequenceNumbers) {
msgErrLossCounter.inc(); msgErrLossCounter.inc();
iterator.remove(); iterator.remove();
} else { } else {
@ -107,7 +136,15 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
@Override @Override
public void close() { public void close() {
while (!pendingOutOfSeqNumbers.isEmpty()) { while (!pendingOutOfSeqNumbers.isEmpty()) {
processPendingOutOfSequenceNumbers(processEarliestPendingOutOfSequenceNumber()); processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber());
} }
} }
public int getMaxTrackOutOfOrderSequenceNumbers() {
return maxTrackOutOfOrderSequenceNumbers;
}
public int getMaxTrackSkippedSequenceNumbers() {
return maxTrackSkippedSequenceNumbers;
}
} }

View File

@ -11,7 +11,7 @@ class ReceivedMessageSequenceTrackerTest {
Counter msgErrOutOfSeqCounter = new Counter(); Counter msgErrOutOfSeqCounter = new Counter();
Counter msgErrDuplicateCounter = new Counter(); Counter msgErrDuplicateCounter = new Counter();
Counter msgErrLossCounter = new Counter(); Counter msgErrLossCounter = new Counter();
ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter); ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter, 20, 20);
@Test @Test
void shouldCountersBeZeroWhenSequenceDoesntContainGaps() { void shouldCountersBeZeroWhenSequenceDoesntContainGaps() {
@ -83,7 +83,7 @@ class ReceivedMessageSequenceTrackerTest {
if (totalMessages % 2 == 0) { if (totalMessages % 2 == 0) {
messageSequenceTracker.sequenceNumberReceived(totalMessages); messageSequenceTracker.sequenceNumberReceived(totalMessages);
} }
if (totalMessages < 2 * ReceivedMessageSequenceTracker.MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) { if (totalMessages < 2 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers()) {
messageSequenceTracker.close(); messageSequenceTracker.close();
} }
@ -149,4 +149,82 @@ class ReceivedMessageSequenceTrackerTest {
assertEquals(1, msgErrLossCounter.getCount()); assertEquals(1, msgErrLossCounter.getCount());
} }
@Test
void shouldDetectGapAndMessageDuplication() {
// when
for (long l = 0; l < 100L; l++) {
if (l != 11L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == 12L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(1, msgErrDuplicateCounter.getCount());
assertEquals(1, msgErrLossCounter.getCount());
}
@Test
void shouldDetectGapAndMessageDuplicationTimes2() {
// when
for (long l = 0; l < 100L; l++) {
if (l != 11L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == 12L) {
messageSequenceTracker.sequenceNumberReceived(l);
messageSequenceTracker.sequenceNumberReceived(l);
}
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(2, msgErrDuplicateCounter.getCount());
assertEquals(1, msgErrLossCounter.getCount());
}
@Test
void shouldDetectDelayedOutOfOrderDelivery() {
// when
for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) {
if (l != 10) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) {
messageSequenceTracker.sequenceNumberReceived(10);
}
}
messageSequenceTracker.close();
// then
assertEquals(1, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectDelayedOutOfOrderDeliveryOf2ConsecutiveSequenceNumbers() {
// when
for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) {
if (l != 10 && l != 11) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) {
messageSequenceTracker.sequenceNumberReceived(10);
messageSequenceTracker.sequenceNumberReceived(11);
}
}
messageSequenceTracker.close();
// then
assertEquals(2, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
} }