mirror of
				https://github.com/nosqlbench/nosqlbench.git
				synced 2025-02-25 18:55:28 -06:00 
			
		
		
		
	Detect duplicates after a gap
- rename earliest -> lowest to clarify the real intention - polish code - add unit test - increase duplicate counter if pending messages already contains the sequence number
This commit is contained in:
		@@ -16,13 +16,12 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
 | 
				
			|||||||
    public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20;
 | 
					    public static final int MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 20;
 | 
				
			||||||
    // message out-of-sequence error counter
 | 
					    // message out-of-sequence error counter
 | 
				
			||||||
    private final Counter msgErrOutOfSeqCounter;
 | 
					    private final Counter msgErrOutOfSeqCounter;
 | 
				
			||||||
    // message out-of-sequence error counter
 | 
					    // duplicate message error counter
 | 
				
			||||||
    private final Counter msgErrDuplicateCounter;
 | 
					    private final Counter msgErrDuplicateCounter;
 | 
				
			||||||
    // message loss error counter
 | 
					    // message loss error counter
 | 
				
			||||||
    private final Counter msgErrLossCounter;
 | 
					    private final Counter msgErrLossCounter;
 | 
				
			||||||
    long expectedNumber = -1;
 | 
					    private final SortedSet<Long> pendingOutOfSeqNumbers = new TreeSet<>();
 | 
				
			||||||
 | 
					    private long expectedNumber = -1;
 | 
				
			||||||
    SortedSet<Long> pendingOutOfSeqNumbers = new TreeSet<>();
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
 | 
					    ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
 | 
				
			||||||
@@ -50,9 +49,11 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
 | 
				
			|||||||
        boolean messagesSkipped = false;
 | 
					        boolean messagesSkipped = false;
 | 
				
			||||||
        if (sequenceNumber > expectedNumber) {
 | 
					        if (sequenceNumber > expectedNumber) {
 | 
				
			||||||
            if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
 | 
					            if (pendingOutOfSeqNumbers.size() == MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS) {
 | 
				
			||||||
                messagesSkipped = processEarliestPendingOutOfSequenceNumber();
 | 
					                messagesSkipped = processLowestPendingOutOfSequenceNumber();
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            if(!pendingOutOfSeqNumbers.add(sequenceNumber)) {
 | 
				
			||||||
 | 
					                msgErrDuplicateCounter.inc();
 | 
				
			||||||
            }
 | 
					            }
 | 
				
			||||||
            pendingOutOfSeqNumbers.add(sequenceNumber);
 | 
					 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            // sequenceNumber == expectedNumber
 | 
					            // sequenceNumber == expectedNumber
 | 
				
			||||||
            expectedNumber++;
 | 
					            expectedNumber++;
 | 
				
			||||||
@@ -61,15 +62,15 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
 | 
				
			|||||||
        cleanUpTooFarBehindOutOfSequenceNumbers();
 | 
					        cleanUpTooFarBehindOutOfSequenceNumbers();
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private boolean processEarliestPendingOutOfSequenceNumber() {
 | 
					    private boolean processLowestPendingOutOfSequenceNumber() {
 | 
				
			||||||
        // remove the earliest pending out of sequence number
 | 
					        // remove the lowest pending out of sequence number
 | 
				
			||||||
        Long earliestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
 | 
					        Long lowestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
 | 
				
			||||||
        pendingOutOfSeqNumbers.remove(earliestOutOfSeqNumber);
 | 
					        pendingOutOfSeqNumbers.remove(lowestOutOfSeqNumber);
 | 
				
			||||||
        if (earliestOutOfSeqNumber > expectedNumber) {
 | 
					        if (lowestOutOfSeqNumber > expectedNumber) {
 | 
				
			||||||
            // skip the expected number ahead to the number after the earliest sequence number
 | 
					            // skip the expected number ahead to the number after the lowest sequence number
 | 
				
			||||||
            // increment the counter with the amount of sequence numbers that got skipped
 | 
					            // increment the counter with the amount of sequence numbers that got skipped
 | 
				
			||||||
            msgErrLossCounter.inc(earliestOutOfSeqNumber - expectedNumber);
 | 
					            msgErrLossCounter.inc(lowestOutOfSeqNumber - expectedNumber);
 | 
				
			||||||
            expectedNumber = earliestOutOfSeqNumber + 1;
 | 
					            expectedNumber = lowestOutOfSeqNumber + 1;
 | 
				
			||||||
            return true;
 | 
					            return true;
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            msgErrLossCounter.inc();
 | 
					            msgErrLossCounter.inc();
 | 
				
			||||||
@@ -107,7 +108,7 @@ class ReceivedMessageSequenceTracker implements AutoCloseable{
 | 
				
			|||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    public void close() {
 | 
					    public void close() {
 | 
				
			||||||
        while (!pendingOutOfSeqNumbers.isEmpty()) {
 | 
					        while (!pendingOutOfSeqNumbers.isEmpty()) {
 | 
				
			||||||
            processPendingOutOfSequenceNumbers(processEarliestPendingOutOfSequenceNumber());
 | 
					            processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber());
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -149,4 +149,43 @@ class ReceivedMessageSequenceTrackerTest {
 | 
				
			|||||||
        assertEquals(1, msgErrLossCounter.getCount());
 | 
					        assertEquals(1, msgErrLossCounter.getCount());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    void shouldDetectGapAndMessageDuplication() {
 | 
				
			||||||
 | 
					        // when
 | 
				
			||||||
 | 
					        for (long l = 0; l < 100L; l++) {
 | 
				
			||||||
 | 
					            if (l != 11L) {
 | 
				
			||||||
 | 
					                messageSequenceTracker.sequenceNumberReceived(l);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            if (l == 12L) {
 | 
				
			||||||
 | 
					                messageSequenceTracker.sequenceNumberReceived(l);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        messageSequenceTracker.close();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // then
 | 
				
			||||||
 | 
					        assertEquals(0, msgErrOutOfSeqCounter.getCount());
 | 
				
			||||||
 | 
					        assertEquals(1, msgErrDuplicateCounter.getCount());
 | 
				
			||||||
 | 
					        assertEquals(1, msgErrLossCounter.getCount());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    @Test
 | 
				
			||||||
 | 
					    void shouldDetectGapAndMessageDuplicationTimes2() {
 | 
				
			||||||
 | 
					        // when
 | 
				
			||||||
 | 
					        for (long l = 0; l < 100L; l++) {
 | 
				
			||||||
 | 
					            if (l != 11L) {
 | 
				
			||||||
 | 
					                messageSequenceTracker.sequenceNumberReceived(l);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            if (l == 12L) {
 | 
				
			||||||
 | 
					                messageSequenceTracker.sequenceNumberReceived(l);
 | 
				
			||||||
 | 
					                messageSequenceTracker.sequenceNumberReceived(l);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        messageSequenceTracker.close();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        // then
 | 
				
			||||||
 | 
					        assertEquals(0, msgErrOutOfSeqCounter.getCount());
 | 
				
			||||||
 | 
					        assertEquals(2, msgErrDuplicateCounter.getCount());
 | 
				
			||||||
 | 
					        assertEquals(1, msgErrLossCounter.getCount());
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user