Add e2e message publish metrics to kafka consumer

The metrics are computed using the Kafka client API record.timestamp().
This commit is contained in:
Massimiliano Mirelli 2023-02-23 17:31:36 +02:00
parent c516baaedd
commit e70952e501
6 changed files with 73 additions and 7 deletions

View File

@ -21,6 +21,7 @@ import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp; import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp; import io.nosqlbench.engine.api.templating.ParsedOp;
@ -49,6 +50,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
// - This is only relevant when the effective setting (global level and statement level) // - This is only relevant when the effective setting (global level and statement level)
// of "enable.auto.commit" is false // of "enable.auto.commit" is false
protected final int maxMsgCntPerCommit; protected final int maxMsgCntPerCommit;
private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
protected boolean autoCommitEnabled; protected boolean autoCommitEnabled;
@ -76,6 +78,8 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
this.autoCommitEnabled = BooleanUtils.toBoolean(consumerClientConfMap.get("enable.auto.commit")); this.autoCommitEnabled = BooleanUtils.toBoolean(consumerClientConfMap.get("enable.auto.commit"));
} }
} }
this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc(
KafkaAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none");
} }
private String getEffectiveGroupId(long cycle) { private String getEffectiveGroupId(long cycle) {
@ -119,7 +123,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
} }
opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer( opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
kafkaSpace, asyncAPI, msgPollIntervalInSec, autoCommitEnabled, maxMsgCntPerCommit, consumer); kafkaSpace,
asyncAPI,
msgPollIntervalInSec,
autoCommitEnabled,
maxMsgCntPerCommit,
consumer,
EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
kafkaAdapterMetrics
);
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient); kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
} }

View File

@ -17,7 +17,10 @@
package io.nosqlbench.adapter.kafka.ops; package io.nosqlbench.adapter.kafka.ops;
import com.codahale.metrics.Histogram;
import io.nosqlbench.adapter.kafka.KafkaSpace; import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartition;
@ -30,7 +33,7 @@ import java.util.Map;
public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer"); private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer");
private final EndToEndStartingTimeSource e2eStartingTimeSrc;
private final int msgPoolIntervalInMs; private final int msgPoolIntervalInMs;
private final boolean asyncMsgCommit; private final boolean asyncMsgCommit;
private final boolean autoCommitEnabled; private final boolean autoCommitEnabled;
@ -40,19 +43,24 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final ThreadLocal<Integer> manualCommitTrackingCnt = ThreadLocal.withInitial(() -> 0); private final ThreadLocal<Integer> manualCommitTrackingCnt = ThreadLocal.withInitial(() -> 0);
private final KafkaConsumer<String, String> consumer; private final KafkaConsumer<String, String> consumer;
private Histogram e2eMsgProcLatencyHistogram;
public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace, public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace,
boolean asyncMsgCommit, boolean asyncMsgCommit,
int msgPoolIntervalInMs, int msgPoolIntervalInMs,
boolean autoCommitEnabled, boolean autoCommitEnabled,
int maxMsgCntPerCommit, int maxMsgCntPerCommit,
KafkaConsumer<String, String> consumer) { KafkaConsumer<String, String> consumer,
EndToEndStartingTimeSource e2eStartingTimeSrc,
KafkaAdapterMetrics kafkaAdapterMetrics) {
super(kafkaSpace); super(kafkaSpace);
this.msgPoolIntervalInMs = msgPoolIntervalInMs; this.msgPoolIntervalInMs = msgPoolIntervalInMs;
this.asyncMsgCommit = asyncMsgCommit; this.asyncMsgCommit = asyncMsgCommit;
this.autoCommitEnabled = autoCommitEnabled; this.autoCommitEnabled = autoCommitEnabled;
this.maxMsgCntPerCommit = maxMsgCntPerCommit; this.maxMsgCntPerCommit = maxMsgCntPerCommit;
this.consumer = consumer; this.consumer = consumer;
this.e2eStartingTimeSrc = e2eStartingTimeSrc;
this.e2eMsgProcLatencyHistogram = kafkaAdapterMetrics.getE2eMsgProcLatencyHistogram();
} }
public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); } public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); }
@ -128,7 +136,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
if (bCommitMsg) { if (bCommitMsg) {
if (!asyncMsgCommit) { if (!asyncMsgCommit) {
consumer.commitSync(); consumer.commitSync();
updateE2ELatencyMetric(record);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug( logger.debug(
"Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})", "Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})",
@ -145,6 +153,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
"Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})", "Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})",
cycle, cycle,
maxMsgCntPerCommit); maxMsgCntPerCommit);
updateE2ELatencyMetric(record);
} else { } else {
logger.debug( logger.debug(
"Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})", "Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})",
@ -159,14 +168,29 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
resetManualCommitTrackingCnt(); resetManualCommitTrackingCnt();
} else { } else {
updateE2ELatencyMetric(record);
incManualCommitTrackingCnt(); incManualCommitTrackingCnt();
} }
} }
updateE2ELatencyMetric(record);
} }
} }
} }
} }
/**
 * Records the end-to-end message latency for the given consumer record.
 * The latency is the wall-clock delta between now and the configured
 * starting-time source; when the source is NONE (or the resolved start
 * timestamp is 0), no measurement is recorded.
 */
private void updateE2ELatencyMetric(ConsumerRecord<String, String> record) {
    // Only MESSAGE_PUBLISH_TIME currently provides a usable start timestamp.
    long publishTimeStamp =
        (e2eStartingTimeSrc == EndToEndStartingTimeSource.MESSAGE_PUBLISH_TIME)
            ? record.timestamp()
            : 0L;
    if (publishTimeStamp != 0L) {
        e2eMsgProcLatencyHistogram.update(System.currentTimeMillis() - publishTimeStamp);
    }
}
@Override @Override
public void close() { public void close() {
try { try {

View File

@ -0,0 +1,23 @@
package io.nosqlbench.adapter.kafka.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
 * Selects the starting point used when computing end-to-end (E2E) message
 * latency in the Kafka consumer. The consumer subtracts the chosen start
 * timestamp from the current wall-clock time when recording the metric.
 */
public enum EndToEndStartingTimeSource {
NONE, // no end-to-end latency calculation
MESSAGE_PUBLISH_TIME, // use the message publish timestamp (record.timestamp())
}

View File

@ -32,6 +32,11 @@ public class KafkaAdapterMetrics implements NBNamedElement {
private Histogram messageSizeHistogram; private Histogram messageSizeHistogram;
private Timer bindTimer; private Timer bindTimer;
private Timer executeTimer; private Timer executeTimer;
/**
 * @return the histogram tracking end-to-end message processing latency,
 *         i.e. the delta between message publish time and consume time
 *         (milliseconds, per the consumer's use of System.currentTimeMillis())
 */
public Histogram getE2eMsgProcLatencyHistogram() {
return e2eMsgProcLatencyHistogram;
}
// end-to-end latency // end-to-end latency
private Histogram e2eMsgProcLatencyHistogram; private Histogram e2eMsgProcLatencyHistogram;
private KafkaBaseOpDispenser kafkaBaseOpDispenser; private KafkaBaseOpDispenser kafkaBaseOpDispenser;

View File

@ -41,7 +41,8 @@ public class KafkaAdapterUtil {
// Valid document level parameters for JMS NB yaml file // Valid document level parameters for JMS NB yaml file
public enum DOC_LEVEL_PARAMS { public enum DOC_LEVEL_PARAMS {
// Blocking message producing or consuming // Blocking message producing or consuming
ASYNC_API("async_api"); ASYNC_API("async_api"),
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
public final String label; public final String label;
DOC_LEVEL_PARAMS(String label) { DOC_LEVEL_PARAMS(String label) {

View File

@ -3,7 +3,8 @@ params:
# Whether to commit message asynchronously # Whether to commit message asynchronously
# - default: true # - default: true
# - only relevant for manual commit # - only relevant for manual commit
async_api: "true" # async_api: "true"
e2e_starting_time_source: "message_publish_time"
blocks: blocks:
msg-consume-block: msg-consume-block: