From e70952e50185b49a983bb8d4d8a4e9f22d6f3111 Mon Sep 17 00:00:00 2001 From: Massimiliano Mirelli Date: Thu, 23 Feb 2023 17:31:36 +0200 Subject: [PATCH] Add e2e message publish metrics to kafka consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The metrics are computed using the kafka client API record.timestamp()ˆ --- .../MessageConsumerOpDispenser.java | 14 +++++++- .../kafka/ops/OpTimeTrackKafkaConsumer.java | 32 ++++++++++++++++--- .../util/EndToEndStartingTimeSource.java | 23 +++++++++++++ .../kafka/util/KafkaAdapterMetrics.java | 5 +++ .../adapter/kafka/util/KafkaAdapterUtil.java | 3 +- .../src/main/resources/kafka_consumer.yaml | 3 +- 6 files changed, 73 insertions(+), 7 deletions(-) create mode 100644 adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/EndToEndStartingTimeSource.java diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java index bc3074a75..ef2511602 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java @@ -21,6 +21,7 @@ import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException; import io.nosqlbench.adapter.kafka.ops.KafkaOp; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer; +import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; @@ -49,6 +50,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser { // - This is only relevant when the effective setting (global level and statement level) // of "enable.auto.commit" is false protected final int maxMsgCntPerCommit; + private final LongFunction e2eStartTimeSrcParamStrFunc; protected boolean autoCommitEnabled; @@ -76,6 +78,8 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser { this.autoCommitEnabled = BooleanUtils.toBoolean(consumerClientConfMap.get("enable.auto.commit")); } } + this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc( + KafkaAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none"); } private String getEffectiveGroupId(long cycle) { @@ -119,7 +123,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser { } opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer( - kafkaSpace, asyncAPI, msgPollIntervalInSec, autoCommitEnabled, maxMsgCntPerCommit, consumer); + kafkaSpace, + asyncAPI, + msgPollIntervalInSec, + autoCommitEnabled, + maxMsgCntPerCommit, + consumer, + EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()), + kafkaAdapterMetrics + ); kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient); } diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java index 9638ca855..584c3a51a 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java @@ -17,7 +17,10 @@ package io.nosqlbench.adapter.kafka.ops; +import com.codahale.metrics.Histogram; import io.nosqlbench.adapter.kafka.KafkaSpace; +import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource; +import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; @@ -30,7 +33,7 @@ import java.util.Map; public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer"); - + private final EndToEndStartingTimeSource e2eStartingTimeSrc; private final int msgPoolIntervalInMs; private final boolean asyncMsgCommit; private final boolean autoCommitEnabled; @@ -40,19 +43,24 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { private final ThreadLocal manualCommitTrackingCnt = ThreadLocal.withInitial(() -> 0); private final KafkaConsumer consumer; + private Histogram e2eMsgProcLatencyHistogram; public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace, boolean asyncMsgCommit, int msgPoolIntervalInMs, boolean autoCommitEnabled, int maxMsgCntPerCommit, - KafkaConsumer consumer) { + KafkaConsumer consumer, + EndToEndStartingTimeSource e2eStartingTimeSrc, + KafkaAdapterMetrics kafkaAdapterMetrics) { super(kafkaSpace); this.msgPoolIntervalInMs = msgPoolIntervalInMs; this.asyncMsgCommit = asyncMsgCommit; this.autoCommitEnabled = autoCommitEnabled; this.maxMsgCntPerCommit = maxMsgCntPerCommit; this.consumer = consumer; + this.e2eStartingTimeSrc = e2eStartingTimeSrc; + this.e2eMsgProcLatencyHistogram = kafkaAdapterMetrics.getE2eMsgProcLatencyHistogram(); } public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); } @@ -128,7 +136,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { if (bCommitMsg) { if (!asyncMsgCommit) { consumer.commitSync(); - + updateE2ELatencyMetric(record); if (logger.isDebugEnabled()) { logger.debug( "Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})", @@ -145,6 +153,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { "Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})", cycle, maxMsgCntPerCommit); + updateE2ELatencyMetric(record); } else { logger.debug( "Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})", @@ -158,15 +167,30 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient { } resetManualCommitTrackingCnt(); - } else { + } else { + updateE2ELatencyMetric(record); incManualCommitTrackingCnt(); } } + updateE2ELatencyMetric(record); } } } } + private void updateE2ELatencyMetric(ConsumerRecord record) { + long startTimeStamp = 0L; + switch (e2eStartingTimeSrc) { + case MESSAGE_PUBLISH_TIME: + startTimeStamp = record.timestamp(); + break; + } + if (startTimeStamp != 0L) { + long e2eMsgLatency = System.currentTimeMillis() - startTimeStamp; + e2eMsgProcLatencyHistogram.update(e2eMsgLatency); + } + } + @Override public void close() { try { diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/EndToEndStartingTimeSource.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/EndToEndStartingTimeSource.java new file mode 100644 index 000000000..9db6f74f8 --- /dev/null +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/EndToEndStartingTimeSource.java @@ -0,0 +1,23 @@ +package io.nosqlbench.adapter.kafka.util; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +public enum EndToEndStartingTimeSource { + NONE, // no end-to-end latency calculation + MESSAGE_PUBLISH_TIME, // use message publish timestamp +} diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java index 3c3afa08b..d6aab0565 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java @@ -32,6 +32,11 @@ public class KafkaAdapterMetrics implements NBNamedElement { private Histogram messageSizeHistogram; private Timer bindTimer; private Timer executeTimer; + + public Histogram getE2eMsgProcLatencyHistogram() { + return e2eMsgProcLatencyHistogram; + } + // end-to-end latency private Histogram e2eMsgProcLatencyHistogram; private KafkaBaseOpDispenser kafkaBaseOpDispenser; diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java index b95309b3d..a9bd1a59d 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java @@ -41,7 +41,8 @@ public class KafkaAdapterUtil { // Valid document level parameters for JMS NB yaml file public enum DOC_LEVEL_PARAMS { // Blocking message producing or consuming - ASYNC_API("async_api"); + ASYNC_API("async_api"), + E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"); public final String label; DOC_LEVEL_PARAMS(String label) { diff --git a/adapter-kafka/src/main/resources/kafka_consumer.yaml b/adapter-kafka/src/main/resources/kafka_consumer.yaml index 2d88def3e..9d1e5fb61 100644 --- a/adapter-kafka/src/main/resources/kafka_consumer.yaml +++ b/adapter-kafka/src/main/resources/kafka_consumer.yaml @@ -3,7 +3,8 @@ params: # Whether to commit message asynchronously # - default: true # - only relevant for manual commit - async_api: "true" +# async_api: "true" + e2e_starting_time_source: "message_publish_time" blocks: msg-consume-block: