From a85ef30a76e47af5808ba5532a5509341322cd60 Mon Sep 17 00:00:00 2001 From: Massimiliano Mirelli Date: Fri, 24 Feb 2023 15:44:29 +0200 Subject: [PATCH] Add empty e2e error metrics --- .../kafka/util/KafkaAdapterMetrics.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java index d6aab0565..e7e7458bc 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java @@ -15,6 +15,7 @@ */ package io.nosqlbench.adapter.kafka.util; +import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import io.nosqlbench.adapter.kafka.dispensers.KafkaBaseOpDispenser; @@ -32,6 +33,12 @@ public class KafkaAdapterMetrics implements NBNamedElement { private Histogram messageSizeHistogram; private Timer bindTimer; private Timer executeTimer; + // - message out of sequence error counter + private Counter msgErrOutOfSeqCounter; + // - message loss counter + private Counter msgErrLossCounter; + // - message duplicate error counter + private Counter msgErrDuplicateCounter; public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; @@ -70,11 +77,27 @@ public class KafkaAdapterMetrics implements NBNamedElement { this, defaultAdapterMetricsPrefix + "execute", ActivityMetrics.DEFAULT_HDRDIGITS); + + // End-to-end metrics + // Latency this.e2eMsgProcLatencyHistogram = ActivityMetrics.histogram( kafkaBaseOpDispenser, defaultAdapterMetricsPrefix + "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS); + // Error metrics + this.msgErrOutOfSeqCounter = + ActivityMetrics.counter( + kafkaBaseOpDispenser, + defaultAdapterMetricsPrefix + "err_msg_oos"); + this.msgErrLossCounter = + ActivityMetrics.counter( + kafkaBaseOpDispenser, + defaultAdapterMetricsPrefix + "err_msg_loss"); + this.msgErrDuplicateCounter = + ActivityMetrics.counter( + kafkaBaseOpDispenser, + defaultAdapterMetricsPrefix + "err_msg_dup"); } public Timer getBindTimer() { return bindTimer; }