Add empty e2e error metrics

This commit is contained in:
Massimiliano Mirelli 2023-02-24 15:44:29 +02:00
parent e832e5a589
commit a85ef30a76

View File

@ -15,6 +15,7 @@
*/ */
package io.nosqlbench.adapter.kafka.util; package io.nosqlbench.adapter.kafka.util;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.kafka.dispensers.KafkaBaseOpDispenser; import io.nosqlbench.adapter.kafka.dispensers.KafkaBaseOpDispenser;
@ -32,6 +33,12 @@ public class KafkaAdapterMetrics implements NBNamedElement {
private Histogram messageSizeHistogram; private Histogram messageSizeHistogram;
private Timer bindTimer; private Timer bindTimer;
private Timer executeTimer; private Timer executeTimer;
// - message out of sequence error counter
private Counter msgErrOutOfSeqCounter;
// - message loss counter
private Counter msgErrLossCounter;
// - message duplicate error counter
private Counter msgErrDuplicateCounter;
public Histogram getE2eMsgProcLatencyHistogram() { public Histogram getE2eMsgProcLatencyHistogram() {
return e2eMsgProcLatencyHistogram; return e2eMsgProcLatencyHistogram;
@ -70,11 +77,27 @@ public class KafkaAdapterMetrics implements NBNamedElement {
this, this,
defaultAdapterMetricsPrefix + "execute", defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS); ActivityMetrics.DEFAULT_HDRDIGITS);
// End-to-end metrics
// Latency
this.e2eMsgProcLatencyHistogram = this.e2eMsgProcLatencyHistogram =
ActivityMetrics.histogram( ActivityMetrics.histogram(
kafkaBaseOpDispenser, kafkaBaseOpDispenser,
defaultAdapterMetricsPrefix + "e2e_msg_latency", defaultAdapterMetricsPrefix + "e2e_msg_latency",
ActivityMetrics.DEFAULT_HDRDIGITS); ActivityMetrics.DEFAULT_HDRDIGITS);
// Error metrics
this.msgErrOutOfSeqCounter =
ActivityMetrics.counter(
kafkaBaseOpDispenser,
defaultAdapterMetricsPrefix + "err_msg_oos");
this.msgErrLossCounter =
ActivityMetrics.counter(
kafkaBaseOpDispenser,
defaultAdapterMetricsPrefix + "err_msg_loss");
this.msgErrDuplicateCounter =
ActivityMetrics.counter(
kafkaBaseOpDispenser,
defaultAdapterMetricsPrefix + "err_msg_dup");
} }
public Timer getBindTimer() { return bindTimer; } public Timer getBindTimer() { return bindTimer; }