diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java index 4a6854517..53bba79c7 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java @@ -210,12 +210,17 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { } private void checkAndUpdateMessageErrorCounter(ConsumerRecord record) { - String msgSeqIdStr = new String(record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER).value()); - if ( !StringUtils.isBlank(msgSeqIdStr) ) { - long sequenceNumber = Long.parseLong(msgSeqIdStr); - ReceivedMessageSequenceTracker receivedMessageSequenceTracker = - receivedMessageSequenceTrackerForTopic.apply(record.topic()); - receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber); + Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER); + if (msg_seq_number_header == null) { + logger.warn("Message sequence number header is null, skipping e2e message error metrics generation."); + } else { + String msgSeqIdStr = new String(msg_seq_number_header.value()); + if (!StringUtils.isBlank(msgSeqIdStr)) { + long sequenceNumber = Long.parseLong(msgSeqIdStr); + ReceivedMessageSequenceTracker receivedMessageSequenceTracker = + receivedMessageSequenceTrackerForTopic.apply(record.topic()); + receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber); + } } }