Add support for end-to-end error metrics on the consumer

Massimiliano Mirelli 2023-03-01 16:47:58 +02:00
parent e6c39e6f5b
commit 7d8c84c567
7 changed files with 97 additions and 16 deletions

MessageConsumerOpDispenser.java

@@ -23,6 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
 import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
 import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
 import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
 import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
 import io.nosqlbench.engine.api.templating.ParsedOp;
 import org.apache.commons.lang3.BooleanUtils;
@@ -50,10 +51,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
     // - This is only relevant when the effective setting (global level and statement level)
     //   of "enable.auto.commit" is false
     protected final int maxMsgCntPerCommit;
-    private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
     protected boolean autoCommitEnabled;
+    private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
+    private final ThreadLocal<Map<String, ReceivedMessageSequenceTracker>>
+        receivedMessageSequenceTrackersForTopicThreadLocal = ThreadLocal.withInitial(HashMap::new);
+    protected final LongFunction<Boolean> seqTrackingFunc;

     public MessageConsumerOpDispenser(DriverAdapter adapter,
                                       ParsedOp op,
                                       LongFunction<String> tgtNameFunc,
@@ -80,6 +86,9 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
         }
         this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc(
             KafkaAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none");
+        this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
+            KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
     }

     private String getEffectiveGroupId(long cycle) {
@@ -129,15 +138,26 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
                 autoCommitEnabled,
                 maxMsgCntPerCommit,
                 consumer,
+                kafkaAdapterMetrics,
                 EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
-                kafkaAdapterMetrics
-            );
+                this::getReceivedMessageSequenceTracker,
+                seqTrackingFunc.apply(cycle));
             kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
         }
         return opTimeTrackKafkaClient;
     }

+    private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(String topicName) {
+        return receivedMessageSequenceTrackersForTopicThreadLocal.get()
+            .computeIfAbsent(topicName, k -> createReceivedMessageSequenceTracker());
+    }
+
+    private ReceivedMessageSequenceTracker createReceivedMessageSequenceTracker() {
+        return new ReceivedMessageSequenceTracker(kafkaAdapterMetrics.getMsgErrOutOfSeqCounter(),
+            kafkaAdapterMetrics.getMsgErrDuplicateCounter(),
+            kafkaAdapterMetrics.getMsgErrLossCounter());
+    }
+
     protected List<String> getEffectiveTopicNameList(long cycle) {
         String explicitTopicListStr = topicNameStrFunc.apply(cycle);
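Rationale for the ThreadLocal above: the trackers are presumably not meant to be shared across threads, and each benchmark thread drives its own consumer, so giving every thread a private topic-to-tracker map sidesteps locking entirely. Below is a minimal, self-contained sketch of that pattern; the Tracker class and all names are stand-ins, not the adapter's real types.

import java.util.HashMap;
import java.util.Map;

public class PerThreadTrackers {

    // Stand-in for ReceivedMessageSequenceTracker.
    static class Tracker { }

    // One lazily-built topic->tracker map per thread: no locks, no sharing.
    private static final ThreadLocal<Map<String, Tracker>> TRACKERS =
        ThreadLocal.withInitial(HashMap::new);

    static Tracker trackerFor(String topic) {
        // computeIfAbsent creates the tracker on first use of a topic on this thread.
        return TRACKERS.get().computeIfAbsent(topic, k -> new Tracker());
    }

    public static void main(String[] args) throws InterruptedException {
        Runnable probe = () -> System.out.println(
            Thread.currentThread().getName() + " uses " + trackerFor("topic-a"));
        Thread t1 = new Thread(probe, "consumer-1");
        Thread t2 = new Thread(probe, "consumer-2");
        t1.start(); t2.start();
        t1.join(); t2.join();
        // Prints two distinct Tracker identities: each thread owns its own instance.
    }
}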

MessageProducerOpDispenser.java

@@ -66,11 +66,6 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
                                      LongFunction<String> tgtNameFunc,
                                      KafkaSpace kafkaSpace) {
         super(adapter, op, tgtNameFunc, kafkaSpace);
-        // Doc-level parameter: seq_tracking
-        this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
-            PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
-        this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
-
         this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
         producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
@@ -79,6 +74,11 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
         this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
         this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
         this.msgValueStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
+        this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
+        // Doc-level parameter: seq_tracking
+        this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
+            PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
     }

     private String getEffectiveClientId(long cycle) {

OpTimeTrackKafkaConsumer.java

@@ -22,6 +22,9 @@ import io.nosqlbench.adapter.kafka.KafkaSpace;
 import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
 import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
 import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
+import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
@@ -30,6 +33,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.util.Map;
+import java.util.function.Function;

 public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
     private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer");
@@ -44,6 +48,8 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
     private final KafkaConsumer<String, String> consumer;
     private Histogram e2eMsgProcLatencyHistogram;
+    private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
+    private final boolean seqTracking;

     public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace,
                                     boolean asyncMsgCommit,
@@ -51,8 +57,10 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
                                     boolean autoCommitEnabled,
                                     int maxMsgCntPerCommit,
                                     KafkaConsumer<String, String> consumer,
+                                    KafkaAdapterMetrics kafkaAdapterMetrics,
                                     EndToEndStartingTimeSource e2eStartingTimeSrc,
-                                    KafkaAdapterMetrics kafkaAdapterMetrics) {
+                                    Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
+                                    boolean seqTracking) {
         super(kafkaSpace);
         this.msgPoolIntervalInMs = msgPoolIntervalInMs;
         this.asyncMsgCommit = asyncMsgCommit;
@@ -61,6 +69,8 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
         this.consumer = consumer;
         this.e2eStartingTimeSrc = e2eStartingTimeSrc;
         this.e2eMsgProcLatencyHistogram = kafkaAdapterMetrics.getE2eMsgProcLatencyHistogram();
+        this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic;
+        this.seqTracking = seqTracking;
     }

     public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); }
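Note the constructor takes this::getReceivedMessageSequenceTracker as a Function<String, ReceivedMessageSequenceTracker> rather than a concrete tracker: topic names are only known once records arrive, so the consumer resolves a tracker lazily per topic without depending on the dispenser's cache. A small illustrative sketch of that wiring follows; Tracker, TrackingConsumer, and Dispenser are hypothetical stand-ins for the adapter's types.

import java.util.function.Function;

class Tracker { }

class TrackingConsumer {
    private final Function<String, Tracker> trackerForTopic;
    private final boolean seqTracking;

    TrackingConsumer(Function<String, Tracker> trackerForTopic, boolean seqTracking) {
        this.trackerForTopic = trackerForTopic;
        this.seqTracking = seqTracking;
    }

    void onRecord(String topic, long seq) {
        if (seqTracking) {
            Tracker tracker = trackerForTopic.apply(topic); // resolved lazily, per topic
            // ... feed seq into the tracker here
        }
    }
}

class Dispenser {
    private Tracker lookupOrCreate(String topic) {
        return new Tracker(); // the real dispenser caches per thread and topic
    }

    TrackingConsumer buildConsumer() {
        // The method reference hands the consumer a lookup capability,
        // not a concrete tracker and not the dispenser itself.
        return new TrackingConsumer(this::lookupOrCreate, true);
    }
}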
@@ -123,12 +133,14 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
         for (ConsumerRecord<String, String> record : records) {
             if (record != null) {
                 if (logger.isDebugEnabled()) {
+                    Header msg_seq_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
                     logger.debug(
-                        "Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({})",
+                        "Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})",
                         printRecvedMsg(record),
                         record.offset(),
                         cycle,
-                        System.currentTimeMillis() - record.timestamp());
+                        System.currentTimeMillis() - record.timestamp(),
+                        (msg_seq_header != null ? new String(msg_seq_header.value()) : "null"));
                 }

                 if (!autoCommitEnabled) {
@@ -136,7 +148,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
                     if (bCommitMsg) {
                         if (!asyncMsgCommit) {
                             consumer.commitSync();
-                            updateE2ELatencyMetric(record);
+                            checkAndUpdateMessageE2EMetrics(record);
                             if (logger.isDebugEnabled()) {
                                 logger.debug(
                                     "Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})",
@@ -153,7 +165,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
                                         "Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})",
                                         cycle,
                                         maxMsgCntPerCommit);
-                                    updateE2ELatencyMetric(record);
+                                    checkAndUpdateMessageE2EMetrics(record);
                                 } else {
                                     logger.debug(
                                         "Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})",
@@ -168,16 +180,22 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
                             resetManualCommitTrackingCnt();
                         } else {
-                            updateE2ELatencyMetric(record);
+                            checkAndUpdateMessageE2EMetrics(record);
                             incManualCommitTrackingCnt();
                         }
                     }
-                    updateE2ELatencyMetric(record);
+                    checkAndUpdateMessageE2EMetrics(record);
                 }
             }
         }
     }

+    private void checkAndUpdateMessageE2EMetrics(ConsumerRecord<String, String> record) {
+        // Keep track of message errors and update the error counters
+        if (seqTracking) checkAndUpdateMessageErrorCounter(record);
+        updateE2ELatencyMetric(record);
+    }
+
     private void updateE2ELatencyMetric(ConsumerRecord<String, String> record) {
         long startTimeStamp = 0L;
         switch (e2eStartingTimeSrc) {
@@ -191,6 +209,16 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
         }
     }

+    private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
+        // Guard against messages that were published without a sequence-number header
+        Header msgSeqIdHeader = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
+        String msgSeqIdStr = (msgSeqIdHeader != null) ? new String(msgSeqIdHeader.value()) : "";
+        if (!StringUtils.isBlank(msgSeqIdStr)) {
+            long sequenceNumber = Long.parseLong(msgSeqIdStr);
+            ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
+                receivedMessageSequenceTrackerForTopic.apply(record.topic());
+            receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
+        }
+    }
+
     @Override
     public void close() {
         try {
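The detection logic itself lives in ReceivedMessageSequenceTracker, reused from the Pulsar adapter. To make the three counters concrete, here is a deliberately naive, self-contained model of sequence-based error detection; it is not the real tracker's algorithm (which is more careful, e.g. tolerating some reordering before charging a gap to the loss counter), just an illustration of what each counter means.

import java.util.HashSet;
import java.util.Set;

public class NaiveSequenceChecker {

    private long highestSeen = -1;
    private final Set<Long> seen = new HashSet<>(); // unbounded here; real code must cap this

    long outOfSeqCount;
    long duplicateCount;
    long lossCount;

    public void sequenceNumberReceived(long seq) {
        if (!seen.add(seq)) {
            duplicateCount++;                       // exact sequence number delivered twice
            return;
        }
        if (seq < highestSeen) {
            outOfSeqCount++;                        // arrived after a higher sequence number
        } else {
            lossCount += seq - highestSeen - 1;     // gap: intervening numbers presumed lost
            highestSeen = seq;
        }
    }

    public static void main(String[] args) {
        NaiveSequenceChecker checker = new NaiveSequenceChecker();
        for (long s : new long[] {0, 1, 3, 2, 2, 6}) {
            checker.sequenceNumberReceived(s);
        }
        // 3 before 2 -> provisional loss of 2 (never reconciled in this naive model);
        // late 2 -> out-of-seq; second 2 -> duplicate; 6 after 3 -> 4 and 5 presumed lost.
        // Prints: outOfSeq=1 dup=1 loss=3
        System.out.printf("outOfSeq=%d dup=%d loss=%d%n",
            checker.outOfSeqCount, checker.duplicateCount, checker.lossCount);
    }
}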

KafkaAdapterMetrics.java

@@ -103,4 +103,28 @@ public class KafkaAdapterMetrics implements NBNamedElement {
     public Timer getBindTimer() { return bindTimer; }
     public Timer getExecuteTimer() { return executeTimer; }
     public Histogram getMessagesizeHistogram() { return messageSizeHistogram; }
+
+    public Counter getMsgErrOutOfSeqCounter() {
+        return msgErrOutOfSeqCounter;
+    }
+
+    public void setMsgErrOutOfSeqCounter(Counter msgErrOutOfSeqCounter) {
+        this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
+    }
+
+    public Counter getMsgErrLossCounter() {
+        return msgErrLossCounter;
+    }
+
+    public void setMsgErrLossCounter(Counter msgErrLossCounter) {
+        this.msgErrLossCounter = msgErrLossCounter;
+    }
+
+    public Counter getMsgErrDuplicateCounter() {
+        return msgErrDuplicateCounter;
+    }
+
+    public void setMsgErrDuplicateCounter(Counter msgErrDuplicateCounter) {
+        this.msgErrDuplicateCounter = msgErrDuplicateCounter;
+    }
 }
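These appear to be plain Dropwizard (Codahale) Counter instances, which nosqlbench's metrics infrastructure builds on, so each tracker callback amounts to an inc() on the matching counter. A minimal standalone sketch of that usage, with a made-up metric name:

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public class CounterSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        // "kafka.consumer.e2e_msg_loss" is a hypothetical metric name for this sketch
        Counter lossCounter = registry.counter("kafka.consumer.e2e_msg_loss");

        lossCounter.inc();      // one presumed-lost message
        lossCounter.inc(2);     // a gap of two sequence numbers

        System.out.println("loss count: " + lossCounter.getCount()); // 3
    }
}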

KafkaAdapterUtil.java

@@ -42,7 +42,8 @@ public class KafkaAdapterUtil {
     public enum DOC_LEVEL_PARAMS {
         // Blocking message producing or consuming
         ASYNC_API("async_api"),
-        E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
+        E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"),
+        SEQ_TRACKING("seq_tracking");

         public final String label;

         DOC_LEVEL_PARAMS(String label) {

Consumer workload YAML

@@ -5,6 +5,10 @@ params:
   #   - only relevant for manual commit
   # async_api: "true"
   e2e_starting_time_source: "message_publish_time"
+  # activates e2e error metrics (message duplication, message loss, and out-of-order detection)
+  # it must be enabled on both the producer and the consumer
+  # - default: false
+  seq_tracking: "true"

 blocks:
   msg-consume-block:
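seq_tracking only works end to end because the producer stamps a sequence-number header that the consumer reads back, which is why the comment above says it must be on for both sides. The handshake looks roughly like the sketch below; the header key "msg_seq_number" is a stand-in for the adapter's PulsarAdapterUtil.MSG_SEQUENCE_NUMBER constant.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

public class SeqHeaderHandshake {

    static final String SEQ_HEADER = "msg_seq_number"; // stand-in header key

    // Producer side: stamp a monotonically increasing sequence number per topic.
    static ProducerRecord<String, String> stamp(String topic, String value, long seq) {
        ProducerRecord<String, String> rec = new ProducerRecord<>(topic, value);
        rec.headers().add(SEQ_HEADER, Long.toString(seq).getBytes(StandardCharsets.UTF_8));
        return rec;
    }

    // Consumer side: read the header back; a missing header means tracking was off upstream.
    static Long extractSeq(ConsumerRecord<String, String> rec) {
        Header h = rec.headers().lastHeader(SEQ_HEADER);
        return (h == null) ? null : Long.parseLong(new String(h.value(), StandardCharsets.UTF_8));
    }
}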

Producer workload YAML

@@ -9,6 +9,10 @@ params:
   # whether to confirm message send ack. asynchronously
   # - default: true
   async_api: "true"
+  # activates e2e error metrics (message duplication, message loss, and out-of-order detection)
+  # it must be enabled on both the producer and the consumer
+  # - default: false
+  seq_tracking: "true"

 blocks:
   msg-produce-block:
msg-produce-block: msg-produce-block: