From b33389824980c41c7d227a465ffa5875287763d1 Mon Sep 17 00:00:00 2001 From: Massimiliano Mirelli Date: Tue, 7 Mar 2023 14:06:15 +0200 Subject: [PATCH] Decouple e2e error metrics from adapter modules Add classes implementing e2e error metrics to adapters-api module in `import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker` --- adapter-kafka/pom.xml | 7 ---- .../MessageConsumerOpDispenser.java | 2 +- .../MessageProducerOpDispenser.java | 18 ++++----- .../kafka/ops/OpTimeTrackKafkaConsumer.java | 7 ++-- .../kafka/ops/OpTimeTrackKafkaProducer.java | 10 ++--- .../adapter/kafka/util/KafkaAdapterUtil.java | 3 +- .../MessageConsumerOpDispenser.java | 2 +- .../dispensers/PulsarClientOpDispenser.java | 11 +++--- .../adapter/pulsar/ops/MessageConsumerOp.java | 1 + .../adapter/pulsar/ops/MessageProducerOp.java | 7 ++-- .../pulsar/util/PulsarAdapterUtil.java | 31 ---------------- .../metrics/EndToEndMetricsAdapterUtil.java | 37 +++++++++++++++++++ .../MessageSequenceNumberSendingHandler.java | 10 ++--- .../ReceivedMessageSequenceTracker.java | 2 +- ...ssageSequenceNumberSendingHandlerTest.java | 10 ++--- .../ReceivedMessageSequenceTrackerTest.java | 2 +- 16 files changed, 81 insertions(+), 79 deletions(-) create mode 100644 adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/EndToEndMetricsAdapterUtil.java rename {adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util => adapters-api/src/main/java/io/nosqlbench/engine/api/metrics}/MessageSequenceNumberSendingHandler.java (85%) rename {adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util => adapters-api/src/main/java/io/nosqlbench/engine/api/metrics}/ReceivedMessageSequenceTracker.java (99%) rename {adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util => adapters-api/src/test/java/io/nosqlbench/engine/api/metrics}/MessageSequenceNumberSendingHandlerTest.java (87%) rename {adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util => adapters-api/src/test/java/io/nosqlbench/engine/api/metrics}/ReceivedMessageSequenceTrackerTest.java (99%) diff --git a/adapter-kafka/pom.xml b/adapter-kafka/pom.xml index 68124f4b6..22f77558e 100644 --- a/adapter-kafka/pom.xml +++ b/adapter-kafka/pom.xml @@ -76,13 +76,6 @@ commons-configuration2 2.8.0 - - - ${project.groupId} - adapter-pulsar - ${project.version} - compile - diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java index 23051cae8..3cc62595f 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java @@ -23,7 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer; import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; -import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker; +import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.BooleanUtils; diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java index 13079125a..0a8170717 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java @@ -22,8 +22,8 @@ import io.nosqlbench.adapter.kafka.ops.KafkaOp; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; -import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.KafkaProducer; @@ -59,7 +59,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { private final LongFunction msgKeyStrFunc; private final LongFunction msgValueStrFunc; protected final LongFunction seqTrackingFunc; - protected final LongFunction> msgSeqErrSimuTypeSetFunc; + protected final LongFunction> msgSeqErrSimuTypeSetFunc; public MessageProducerOpDispenser(DriverAdapter adapter, ParsedOp op, @@ -78,7 +78,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc(); // Doc-level parameter: seq_tracking this.seqTrackingFunc = lookupStaticBoolConfigValueFunc( - PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false); + KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false); } private String getEffectiveClientId(long cycle) { @@ -226,17 +226,17 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { message); } - protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() { - LongFunction> setStringLongFunction; + protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() { + LongFunction> setStringLongFunction; setStringLongFunction = (l) -> - parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class) + parsedOp.getOptionalStaticValue(KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class) .filter(Predicate.not(String::isEmpty)) .map(value -> { - Set set = new HashSet<>(); + Set set = new HashSet<>(); if (StringUtils.contains(value,',')) { set = Arrays.stream(value.split(",")) - .map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType) + .map(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toCollection(LinkedHashSet::new)); @@ -245,7 +245,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { return set; }).orElse(Collections.emptySet()); logger.info( - PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}", + KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}", setStringLongFunction.apply(0)); return setStringLongFunction; } diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java index 9492ff331..91bd54f40 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java @@ -22,8 +22,7 @@ import io.nosqlbench.adapter.kafka.KafkaSpace; import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource; import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; -import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; -import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker; +import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; @@ -133,7 +132,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { for (ConsumerRecord record : records) { if (record != null) { if (logger.isDebugEnabled()) { - Header msg_seq_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER); + Header msg_seq_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER); logger.debug( "Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})", printRecvedMsg(record), @@ -210,7 +209,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { } private void checkAndUpdateMessageErrorCounter(ConsumerRecord record) { - Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER); + Header msg_seq_number_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER); String msgSeqIdStr = msg_seq_number_header != null ? new String(msg_seq_number_header.value()) : StringUtils.EMPTY; if (!StringUtils.isBlank(msgSeqIdStr)) { long sequenceNumber = Long.parseLong(msgSeqIdStr); diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java index b3be470fb..c5e032427 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java @@ -20,8 +20,8 @@ package io.nosqlbench.adapter.kafka.ops; import io.nosqlbench.adapter.kafka.KafkaSpace; import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; -import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler; -import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; +import io.nosqlbench.engine.api.metrics.MessageSequenceNumberSendingHandler; +import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -53,7 +53,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { private ThreadLocal> MessageSequenceNumberSendingHandlersThreadLocal = ThreadLocal.withInitial(HashMap::new); private final boolean seqTracking; - private final Set errSimuTypeSet; + private final Set errSimuTypeSet; enum TxnProcResult { SUCCESS, @@ -77,7 +77,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { boolean transactEnabledConfig, int txnBatchNum, boolean seqTracking, - Set errSimuTypeSet, + Set errSimuTypeSet, KafkaProducer producer) { super(kafkaSpace); this.asyncMsgAck = asyncMsgAck; @@ -209,7 +209,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { if (seqTracking) { long nextSequenceNumber = getMessageSequenceNumberSendingHandler(message.topic()) .getNextSequenceNumber(errSimuTypeSet); - message.headers().add(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes()); + message.headers().add(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes()); } try { if (result == TxnProcResult.SUCCESS) { diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java index 9fc0fb4ec..0c659ad6e 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java @@ -31,7 +31,7 @@ import java.util.Map; import java.util.stream.Collectors; public class KafkaAdapterUtil { - + public static final String MSG_SEQUENCE_NUMBER = "sequence_number"; private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class); public static String DFT_CONSUMER_GROUP_NAME_PREFIX = "nbKafkaGrp"; @@ -42,6 +42,7 @@ public class KafkaAdapterUtil { public enum DOC_LEVEL_PARAMS { // Blocking message producing or consuming ASYNC_API("async_api"), + SEQERR_SIMU("seqerr_simu"), E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"), SEQ_TRACKING("seq_tracking"); public final String label; diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java index 4a60adeb8..b2bbeeb24 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/MessageConsumerOpDispenser.java @@ -20,7 +20,7 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace; import io.nosqlbench.adapter.pulsar.ops.MessageConsumerOp; import io.nosqlbench.adapter.pulsar.util.EndToEndStartingTimeSource; import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; -import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker; +import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.logging.log4j.LogManager; diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java index 87e3c32fb..9d407b089 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java @@ -20,6 +20,7 @@ import com.codahale.metrics.Timer; import io.nosqlbench.adapter.pulsar.PulsarSpace; import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -49,7 +50,7 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser { protected final LongFunction seqTrackingFunc; protected final LongFunction payloadRttFieldFunc; protected final LongFunction> transactSupplierFunc; - protected final LongFunction> msgSeqErrSimuTypeSetFunc; + protected final LongFunction> msgSeqErrSimuTypeSetFunc; public PulsarClientOpDispenser(DriverAdapter adapter, ParsedOp op, @@ -101,17 +102,17 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser { }; } - protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() { - LongFunction> setStringLongFunction; + protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() { + LongFunction> setStringLongFunction; setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class) .filter(Predicate.not(String::isEmpty)) .map(value -> { - Set set = new HashSet<>(); + Set set = new HashSet<>(); if (StringUtils.contains(value,',')) { set = Arrays.stream(value.split(",")) - .map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType) + .map(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType) .filter(Optional::isPresent) .map(Optional::get) .collect(Collectors.toCollection(LinkedHashSet::new)); diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java index 40b81ceac..f71978d3d 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageConsumerOp.java @@ -20,6 +20,7 @@ import com.codahale.metrics.Timer; import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException; import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException; import io.nosqlbench.adapter.pulsar.util.*; +import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java index 0a226429c..f1de93ddf 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/ops/MessageProducerOp.java @@ -19,10 +19,11 @@ package io.nosqlbench.adapter.pulsar.ops; import com.codahale.metrics.Timer; import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException; import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException; -import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler; +import io.nosqlbench.engine.api.metrics.MessageSequenceNumberSendingHandler; import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics; import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; import io.nosqlbench.adapter.pulsar.util.PulsarAvroSchemaUtil; +import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -49,7 +50,7 @@ public class MessageProducerOp extends PulsarClientOp { private final boolean useTransact; private final boolean seqTracking; private final Supplier transactSupplier; - private final Set errSimuTypeSet; + private final Set errSimuTypeSet; private final Producer producer; private final String msgKey; private final String msgPropRawJsonStr; @@ -66,7 +67,7 @@ public class MessageProducerOp extends PulsarClientOp { boolean useTransact, boolean seqTracking, Supplier transactSupplier, - Set errSimuTypeSet, + Set errSimuTypeSet, Producer producer, String msgKey, String msgProp, diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java index 890957ff4..dc95192b8 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java @@ -33,9 +33,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.Base64; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -68,35 +66,6 @@ public class PulsarAdapterUtil { } } - /////// - // Message processing sequence error simulation types - public enum MSG_SEQ_ERROR_SIMU_TYPE { - OutOfOrder("out_of_order"), - MsgLoss("msg_loss"), - MsgDup("msg_dup"); - - public final String label; - - MSG_SEQ_ERROR_SIMU_TYPE(String label) { - this.label = label; - } - - private static final Map MAPPING = Stream.of(values()) - .flatMap(simuType -> - Stream.of(simuType.label, - simuType.label.toLowerCase(), - simuType.label.toUpperCase(), - simuType.name(), - simuType.name().toLowerCase(), - simuType.name().toUpperCase()) - .distinct().map(key -> Map.entry(key, simuType))) - .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); - - public static Optional parseSimuType(String simuTypeString) { - return Optional.ofNullable(MAPPING.get(simuTypeString.trim())); - } - } - /////// // Valid Pulsar API type public enum PULSAR_API_TYPE { diff --git a/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/EndToEndMetricsAdapterUtil.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/EndToEndMetricsAdapterUtil.java new file mode 100644 index 000000000..f99d7f0ca --- /dev/null +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/EndToEndMetricsAdapterUtil.java @@ -0,0 +1,37 @@ +package io.nosqlbench.engine.api.metrics; + +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class EndToEndMetricsAdapterUtil { + /////// + // Message processing sequence error simulation types + public enum MSG_SEQ_ERROR_SIMU_TYPE { + OutOfOrder("out_of_order"), + MsgLoss("msg_loss"), + MsgDup("msg_dup"); + + public final String label; + + MSG_SEQ_ERROR_SIMU_TYPE(String label) { + this.label = label; + } + + private static final Map MAPPING = Stream.of(values()) + .flatMap(simuType -> + Stream.of(simuType.label, + simuType.label.toLowerCase(), + simuType.label.toUpperCase(), + simuType.name(), + simuType.name().toLowerCase(), + simuType.name().toUpperCase()) + .distinct().map(key -> Map.entry(key, simuType))) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); + + public static Optional parseSimuType(String simuTypeString) { + return Optional.ofNullable(MAPPING.get(simuTypeString.trim())); + } + } +} diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/MessageSequenceNumberSendingHandler.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/MessageSequenceNumberSendingHandler.java similarity index 85% rename from adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/MessageSequenceNumberSendingHandler.java rename to adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/MessageSequenceNumberSendingHandler.java index 21208642a..8c24faa31 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/MessageSequenceNumberSendingHandler.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/MessageSequenceNumberSendingHandler.java @@ -1,4 +1,4 @@ -package io.nosqlbench.adapter.pulsar.util; +package io.nosqlbench.engine.api.metrics; /* * Copyright (c) 2022 nosqlbench @@ -33,16 +33,16 @@ public class MessageSequenceNumberSendingHandler { long number = 1; Queue outOfOrderNumbers; - public long getNextSequenceNumber(Set simulatedErrorTypes) { + public long getNextSequenceNumber(Set simulatedErrorTypes) { return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE); } - long getNextSequenceNumber(Set simulatedErrorTypes, int errorProbabilityPercentage) { + long getNextSequenceNumber(Set simulatedErrorTypes, int errorProbabilityPercentage) { simulateError(simulatedErrorTypes, errorProbabilityPercentage); return nextNumber(); } - private void simulateError(Set simulatedErrorTypes, int errorProbabilityPercentage) { + private void simulateError(Set simulatedErrorTypes, int errorProbabilityPercentage) { if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) { int selectIndex = 0; int numberOfErrorTypes = simulatedErrorTypes.size(); @@ -50,7 +50,7 @@ public class MessageSequenceNumberSendingHandler { // pick one of the simulated error type randomly selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes); } - PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream() + EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream() .skip(selectIndex) .findFirst() .get(); diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java b/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/ReceivedMessageSequenceTracker.java similarity index 99% rename from adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java rename to adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/ReceivedMessageSequenceTracker.java index f929ab25a..b18c12ab2 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTracker.java +++ b/adapters-api/src/main/java/io/nosqlbench/engine/api/metrics/ReceivedMessageSequenceTracker.java @@ -1,4 +1,4 @@ -package io.nosqlbench.adapter.pulsar.util; +package io.nosqlbench.engine.api.metrics; /* * Copyright (c) 2022 nosqlbench diff --git a/adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util/MessageSequenceNumberSendingHandlerTest.java b/adapters-api/src/test/java/io/nosqlbench/engine/api/metrics/MessageSequenceNumberSendingHandlerTest.java similarity index 87% rename from adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util/MessageSequenceNumberSendingHandlerTest.java rename to adapters-api/src/test/java/io/nosqlbench/engine/api/metrics/MessageSequenceNumberSendingHandlerTest.java index 9b16a7892..16cd5074a 100644 --- a/adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util/MessageSequenceNumberSendingHandlerTest.java +++ b/adapters-api/src/test/java/io/nosqlbench/engine/api/metrics/MessageSequenceNumberSendingHandlerTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.nosqlbench.adapter.pulsar.util; +package io.nosqlbench.engine.api.metrics; import org.junit.jupiter.api.Test; @@ -39,19 +39,19 @@ class MessageSequenceNumberSendingHandlerTest { @Test void shouldInjectMessageLoss() { assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); - assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgLoss), 100)); + assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgLoss), 100)); } @Test void shouldInjectMessageDuplication() { assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); - assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgDup), 100)); + assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgDup), 100)); } @Test void shouldInjectMessageOutOfOrder() { assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); - assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100)); + assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100)); assertEquals(2L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); assertEquals(5L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); @@ -60,7 +60,7 @@ class MessageSequenceNumberSendingHandlerTest { @Test void shouldInjectOneOfTheSimulatedErrorsRandomly() { - Set allErrorTypes = new HashSet<>(Arrays.asList(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.values())); + Set allErrorTypes = new HashSet<>(Arrays.asList(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.values())); assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet())); long previousSequenceNumber = 1L; diff --git a/adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTrackerTest.java b/adapters-api/src/test/java/io/nosqlbench/engine/api/metrics/ReceivedMessageSequenceTrackerTest.java similarity index 99% rename from adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTrackerTest.java rename to adapters-api/src/test/java/io/nosqlbench/engine/api/metrics/ReceivedMessageSequenceTrackerTest.java index 6b66e5f4d..505982d76 100644 --- a/adapter-pulsar/src/test/java/io/nosqlbench/adapter/pulsar/util/ReceivedMessageSequenceTrackerTest.java +++ b/adapters-api/src/test/java/io/nosqlbench/engine/api/metrics/ReceivedMessageSequenceTrackerTest.java @@ -15,7 +15,7 @@ * under the License. */ -package io.nosqlbench.adapter.pulsar.util; +package io.nosqlbench.engine.api.metrics; import com.codahale.metrics.Counter; import org.junit.jupiter.api.Test;