diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java index ef2511602..23051cae8 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java @@ -23,6 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer; import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; +import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.BooleanUtils; @@ -50,10 +51,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser { // - This is only relevant when the effective setting (global level and statement level) // of "enable.auto.commit" is false protected final int maxMsgCntPerCommit; - private final LongFunction e2eStartTimeSrcParamStrFunc; protected boolean autoCommitEnabled; + private final LongFunction e2eStartTimeSrcParamStrFunc; + + private final ThreadLocal> + receivedMessageSequenceTrackersForTopicThreadLocal = ThreadLocal.withInitial(HashMap::new); + protected final LongFunction seqTrackingFunc; + public MessageConsumerOpDispenser(DriverAdapter adapter, ParsedOp op, LongFunction tgtNameFunc, @@ -80,6 +86,9 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser { } this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc( KafkaAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none"); + this.seqTrackingFunc = lookupStaticBoolConfigValueFunc( + KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false); + ; } private String getEffectiveGroupId(long cycle) { @@ -129,15 +138,26 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser { autoCommitEnabled, maxMsgCntPerCommit, consumer, + kafkaAdapterMetrics, EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()), - kafkaAdapterMetrics - ); + this::getReceivedMessageSequenceTracker, + seqTrackingFunc.apply(cycle)); kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient); } return opTimeTrackKafkaClient; } + private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(String topicName) { + return receivedMessageSequenceTrackersForTopicThreadLocal.get() + .computeIfAbsent(topicName, k -> createReceivedMessageSequenceTracker()); + } + + private ReceivedMessageSequenceTracker createReceivedMessageSequenceTracker() { + return new ReceivedMessageSequenceTracker(kafkaAdapterMetrics.getMsgErrOutOfSeqCounter(), + kafkaAdapterMetrics.getMsgErrDuplicateCounter(), + kafkaAdapterMetrics.getMsgErrLossCounter()); + } protected List getEffectiveTopicNameList(long cycle) { String explicitTopicListStr = topicNameStrFunc.apply(cycle); diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java index 93f4027d3..13079125a 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java @@ -66,11 +66,6 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { LongFunction tgtNameFunc, KafkaSpace kafkaSpace) { super(adapter, op, tgtNameFunc, kafkaSpace); - // Doc-level parameter: seq_tracking - this.seqTrackingFunc = lookupStaticBoolConfigValueFunc( - PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false); - this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc(); - this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap()); producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr()); @@ -79,6 +74,11 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM); this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM); this.msgValueStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM); + + this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc(); + // Doc-level parameter: seq_tracking + this.seqTrackingFunc = lookupStaticBoolConfigValueFunc( + PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false); } private String getEffectiveClientId(long cycle) { diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java index 584c3a51a..4a6854517 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java @@ -22,6 +22,9 @@ import io.nosqlbench.adapter.kafka.KafkaSpace; import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource; import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; +import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; +import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Header; @@ -30,6 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Map; +import java.util.function.Function; public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer"); @@ -44,6 +48,8 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { private final KafkaConsumer consumer; private Histogram e2eMsgProcLatencyHistogram; + private final Function receivedMessageSequenceTrackerForTopic; + private final boolean seqTracking; public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace, boolean asyncMsgCommit, @@ -51,8 +57,10 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { boolean autoCommitEnabled, int maxMsgCntPerCommit, KafkaConsumer consumer, + KafkaAdapterMetrics kafkaAdapterMetrics, EndToEndStartingTimeSource e2eStartingTimeSrc, - KafkaAdapterMetrics kafkaAdapterMetrics) { + Function receivedMessageSequenceTrackerForTopic, + boolean seqTracking) { super(kafkaSpace); this.msgPoolIntervalInMs = msgPoolIntervalInMs; this.asyncMsgCommit = asyncMsgCommit; @@ -61,6 +69,8 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { this.consumer = consumer; this.e2eStartingTimeSrc = e2eStartingTimeSrc; this.e2eMsgProcLatencyHistogram = kafkaAdapterMetrics.getE2eMsgProcLatencyHistogram(); + this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic; + this.seqTracking = seqTracking; } public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); } @@ -123,12 +133,14 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { for (ConsumerRecord record : records) { if (record != null) { if (logger.isDebugEnabled()) { + Header msg_seq_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER); logger.debug( - "Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({})", + "Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})", printRecvedMsg(record), record.offset(), cycle, - System.currentTimeMillis() - record.timestamp()); + System.currentTimeMillis() - record.timestamp(), + (msg_seq_header != null ? new String(msg_seq_header.value()) : "null")); } if (!autoCommitEnabled) { @@ -136,7 +148,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { if (bCommitMsg) { if (!asyncMsgCommit) { consumer.commitSync(); - updateE2ELatencyMetric(record); + checkAndUpdateMessageE2EMetrics(record); if (logger.isDebugEnabled()) { logger.debug( "Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})", @@ -153,7 +165,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { "Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})", cycle, maxMsgCntPerCommit); - updateE2ELatencyMetric(record); + checkAndUpdateMessageE2EMetrics(record); } else { logger.debug( "Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})", @@ -168,16 +180,22 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { resetManualCommitTrackingCnt(); } else { - updateE2ELatencyMetric(record); + checkAndUpdateMessageE2EMetrics(record); incManualCommitTrackingCnt(); } } - updateE2ELatencyMetric(record); + checkAndUpdateMessageE2EMetrics(record); } } } } + private void checkAndUpdateMessageE2EMetrics(ConsumerRecord record) { + // keep track of message errors and update error counters + if(seqTracking) checkAndUpdateMessageErrorCounter(record); + updateE2ELatencyMetric(record); + } + private void updateE2ELatencyMetric(ConsumerRecord record) { long startTimeStamp = 0L; switch (e2eStartingTimeSrc) { @@ -191,6 +209,16 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { } } + private void checkAndUpdateMessageErrorCounter(ConsumerRecord record) { + String msgSeqIdStr = new String(record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER).value()); + if ( !StringUtils.isBlank(msgSeqIdStr) ) { + long sequenceNumber = Long.parseLong(msgSeqIdStr); + ReceivedMessageSequenceTracker receivedMessageSequenceTracker = + receivedMessageSequenceTrackerForTopic.apply(record.topic()); + receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber); + } + } + @Override public void close() { try { diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java index e7e7458bc..f5802f185 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java @@ -103,4 +103,28 @@ public class KafkaAdapterMetrics implements NBNamedElement { public Timer getBindTimer() { return bindTimer; } public Timer getExecuteTimer() { return executeTimer; } public Histogram getMessagesizeHistogram() { return messageSizeHistogram; } + + public Counter getMsgErrOutOfSeqCounter() { + return msgErrOutOfSeqCounter; + } + + public void setMsgErrOutOfSeqCounter(Counter msgErrOutOfSeqCounter) { + this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter; + } + + public Counter getMsgErrLossCounter() { + return msgErrLossCounter; + } + + public void setMsgErrLossCounter(Counter msgErrLossCounter) { + this.msgErrLossCounter = msgErrLossCounter; + } + + public Counter getMsgErrDuplicateCounter() { + return msgErrDuplicateCounter; + } + + public void setMsgErrDuplicateCounter(Counter msgErrDuplicateCounter) { + this.msgErrDuplicateCounter = msgErrDuplicateCounter; + } } diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java index a9bd1a59d..9fc0fb4ec 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java @@ -42,7 +42,8 @@ public class KafkaAdapterUtil { public enum DOC_LEVEL_PARAMS { // Blocking message producing or consuming ASYNC_API("async_api"), - E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"); + E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"), + SEQ_TRACKING("seq_tracking"); public final String label; DOC_LEVEL_PARAMS(String label) { diff --git a/adapter-kafka/src/main/resources/kafka_consumer.yaml b/adapter-kafka/src/main/resources/kafka_consumer.yaml index 9d1e5fb61..6e2869b69 100644 --- a/adapter-kafka/src/main/resources/kafka_consumer.yaml +++ b/adapter-kafka/src/main/resources/kafka_consumer.yaml @@ -5,6 +5,10 @@ params: # - only relevant for manual commit # async_api: "true" e2e_starting_time_source: "message_publish_time" + # activates e2e error metrics (message duplication, message loss and out-of-order detection) + # it needs to be enabled both on the producer and the consumer + # - default: false + seq_tracking: "true" blocks: msg-consume-block: diff --git a/adapter-kafka/src/main/resources/kafka_producer.yaml b/adapter-kafka/src/main/resources/kafka_producer.yaml index 7760bd19a..8091ef9e0 100644 --- a/adapter-kafka/src/main/resources/kafka_producer.yaml +++ b/adapter-kafka/src/main/resources/kafka_producer.yaml @@ -9,6 +9,10 @@ params: # whether to confirm message send ack. asynchronously # - default: true async_api: "true" + # activates e2e error metrics (message duplication, message loss and out-of-order detection) + # it needs to be enabled both on the producer and the consumer + # - default: false + seq_tracking: "true" blocks: msg-produce-block: