Handle NPE thrown on empty message sequence number header

This commit is contained in:
Massimiliano Mirelli 2023-03-02 10:09:28 +02:00
parent 31fe98b402
commit 97d2a01675

View File

@ -210,12 +210,17 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
}
private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
String msgSeqIdStr = new String(record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER).value());
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
long sequenceNumber = Long.parseLong(msgSeqIdStr);
ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
receivedMessageSequenceTrackerForTopic.apply(record.topic());
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
if (msg_seq_number_header == null) {
logger.warn("Message sequence number header is null, skipping e2e message error metrics generation.");
} else {
String msgSeqIdStr = new String(msg_seq_number_header.value());
if (!StringUtils.isBlank(msgSeqIdStr)) {
long sequenceNumber = Long.parseLong(msgSeqIdStr);
ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
receivedMessageSequenceTrackerForTopic.apply(record.topic());
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
}
}
}