mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Simplify handling NPE thrown on empty message sequence number header
This commit is contained in:
@@ -211,16 +211,14 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
|
|||||||
|
|
||||||
private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
|
private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
|
||||||
Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
|
Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
|
||||||
if (msg_seq_number_header == null) {
|
String msgSeqIdStr = msg_seq_number_header != null ? new String(msg_seq_number_header.value()) : StringUtils.EMPTY;
|
||||||
logger.warn("Message sequence number header is null, skipping e2e message error metrics generation.");
|
if (!StringUtils.isBlank(msgSeqIdStr)) {
|
||||||
|
long sequenceNumber = Long.parseLong(msgSeqIdStr);
|
||||||
|
ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
|
||||||
|
receivedMessageSequenceTrackerForTopic.apply(record.topic());
|
||||||
|
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
|
||||||
} else {
|
} else {
|
||||||
String msgSeqIdStr = new String(msg_seq_number_header.value());
|
logger.warn("Message sequence number header is null, skipping e2e message error metrics generation.");
|
||||||
if (!StringUtils.isBlank(msgSeqIdStr)) {
|
|
||||||
long sequenceNumber = Long.parseLong(msgSeqIdStr);
|
|
||||||
ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
|
|
||||||
receivedMessageSequenceTrackerForTopic.apply(record.topic());
|
|
||||||
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user