Re-add tests for MessageSequenceNumberSendingHandler and ReceivedMessageSequenceTracker

- migrate tests from NB4 to NB5
This commit is contained in:
Lari Hotari 2023-02-07 08:45:02 +02:00
parent 435872dc7f
commit f9fe79818a
2 changed files with 337 additions and 0 deletions

View File

@ -0,0 +1,90 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pulsar.util;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
class MessageSequenceNumberSendingHandlerTest {
MessageSequenceNumberSendingHandler sequenceNumberSendingHandler = new MessageSequenceNumberSendingHandler();
@Test
void shouldAddMonotonicSequence() {
for (long l = 1; l <= 100; l++) {
assertEquals(l, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
}
}
@Test
void shouldInjectMessageLoss() {
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgLoss), 100));
}
@Test
void shouldInjectMessageDuplication() {
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgDup), 100));
}
@Test
void shouldInjectMessageOutOfOrder() {
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100));
assertEquals(2L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(5L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
assertEquals(6, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
}
@Test
void shouldInjectOneOfTheSimulatedErrorsRandomly() {
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> allErrorTypes = new HashSet<>(Arrays.asList(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.values()));
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
long previousSequenceNumber = 1L;
int outOfSequenceInjectionCounter = 0;
int messageDupCounter = 0;
int messageLossCounter = 0;
int successCounter = 0;
for (int i = 0; i < 1000; i++) {
long nextSequenceNumber = sequenceNumberSendingHandler.getNextSequenceNumber(allErrorTypes);
if (nextSequenceNumber >= previousSequenceNumber + 3) {
outOfSequenceInjectionCounter++;
} else if (nextSequenceNumber <= previousSequenceNumber) {
messageDupCounter++;
} else if (nextSequenceNumber >= previousSequenceNumber + 2) {
messageLossCounter++;
} else if (nextSequenceNumber == previousSequenceNumber + 1) {
successCounter++;
}
previousSequenceNumber = nextSequenceNumber;
}
assertTrue(outOfSequenceInjectionCounter > 0);
assertTrue(messageDupCounter > 0);
assertTrue(messageLossCounter > 0);
assertEquals(1000, outOfSequenceInjectionCounter + messageDupCounter + messageLossCounter + successCounter);
}
}

View File

@ -0,0 +1,247 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.pulsar.util;
import com.codahale.metrics.Counter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import static org.junit.jupiter.api.Assertions.assertEquals;
class ReceivedMessageSequenceTrackerTest {
Counter msgErrOutOfSeqCounter = new Counter();
Counter msgErrDuplicateCounter = new Counter();
Counter msgErrLossCounter = new Counter();
ReceivedMessageSequenceTracker messageSequenceTracker = new ReceivedMessageSequenceTracker(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter, 20, 20);
@Test
void shouldCountersBeZeroWhenSequenceDoesntContainGaps() {
// when
for (long l = 0; l < 100L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@ParameterizedTest
@ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L})
void shouldDetectMsgLossWhenEverySecondMessageIsLost(long totalMessages) {
doShouldDetectMsgLoss(totalMessages, 2);
}
@ParameterizedTest
@ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L})
void shouldDetectMsgLossWhenEveryThirdMessageIsLost(long totalMessages) {
doShouldDetectMsgLoss(totalMessages, 3);
}
@ParameterizedTest
@ValueSource(longs = {20L, 21L, 40L, 41L, 42L, 43L, 100L})
void shouldDetectMsgLossWhenEvery21stMessageIsLost(long totalMessages) {
doShouldDetectMsgLoss(totalMessages, 21);
}
private void doShouldDetectMsgLoss(long totalMessages, int looseEveryNthMessage) {
int messagesLost = 0;
// when
boolean lastMessageWasLost = false;
for (long l = 0; l < totalMessages; l++) {
if (l % looseEveryNthMessage == 1) {
messagesLost++;
lastMessageWasLost = true;
continue;
} else {
lastMessageWasLost = false;
}
messageSequenceTracker.sequenceNumberReceived(l);
}
if (lastMessageWasLost) {
messageSequenceTracker.sequenceNumberReceived(totalMessages);
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(messagesLost, msgErrLossCounter.getCount());
}
@ParameterizedTest
@ValueSource(longs = {10L, 11L, 19L, 20L, 21L, 100L})
void shouldDetectMsgDuplication(long totalMessages) {
int messagesDuplicated = 0;
// when
for (long l = 0; l < totalMessages; l++) {
if (l % 2 == 1) {
messagesDuplicated++;
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.sequenceNumberReceived(l);
}
if (totalMessages % 2 == 0) {
messageSequenceTracker.sequenceNumberReceived(totalMessages);
}
if (totalMessages < 2 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers()) {
messageSequenceTracker.close();
}
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(messagesDuplicated, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectSingleMessageOutOfSequence() {
// when
for (long l = 0; l < 10L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.sequenceNumberReceived(10L);
messageSequenceTracker.sequenceNumberReceived(12L);
messageSequenceTracker.sequenceNumberReceived(11L);
for (long l = 13L; l < 100L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
// then
assertEquals(1, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectMultipleMessagesOutOfSequence() {
// when
for (long l = 0; l < 10L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
messageSequenceTracker.sequenceNumberReceived(10L);
messageSequenceTracker.sequenceNumberReceived(14L);
messageSequenceTracker.sequenceNumberReceived(13L);
messageSequenceTracker.sequenceNumberReceived(11L);
messageSequenceTracker.sequenceNumberReceived(12L);
for (long l = 15L; l < 100L; l++) {
messageSequenceTracker.sequenceNumberReceived(l);
}
// then
assertEquals(2, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectIndividualMessageLoss() {
// when
for (long l = 0; l < 100L; l++) {
if (l != 11L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(1, msgErrLossCounter.getCount());
}
@Test
void shouldDetectGapAndMessageDuplication() {
// when
for (long l = 0; l < 100L; l++) {
if (l != 11L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == 12L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(1, msgErrDuplicateCounter.getCount());
assertEquals(1, msgErrLossCounter.getCount());
}
@Test
void shouldDetectGapAndMessageDuplicationTimes2() {
// when
for (long l = 0; l < 100L; l++) {
if (l != 11L) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == 12L) {
messageSequenceTracker.sequenceNumberReceived(l);
messageSequenceTracker.sequenceNumberReceived(l);
}
}
messageSequenceTracker.close();
// then
assertEquals(0, msgErrOutOfSeqCounter.getCount());
assertEquals(2, msgErrDuplicateCounter.getCount());
assertEquals(1, msgErrLossCounter.getCount());
}
@Test
void shouldDetectDelayedOutOfOrderDelivery() {
// when
for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) {
if (l != 10) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) {
messageSequenceTracker.sequenceNumberReceived(10);
}
}
messageSequenceTracker.close();
// then
assertEquals(1, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
@Test
void shouldDetectDelayedOutOfOrderDeliveryOf2ConsecutiveSequenceNumbers() {
// when
for (long l = 0; l < 5 * messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers(); l++) {
if (l != 10 && l != 11) {
messageSequenceTracker.sequenceNumberReceived(l);
}
if (l == messageSequenceTracker.getMaxTrackOutOfOrderSequenceNumbers() * 2) {
messageSequenceTracker.sequenceNumberReceived(10);
messageSequenceTracker.sequenceNumberReceived(11);
}
}
messageSequenceTracker.close();
// then
assertEquals(2, msgErrOutOfSeqCounter.getCount());
assertEquals(0, msgErrDuplicateCounter.getCount());
assertEquals(0, msgErrLossCounter.getCount());
}
}