This commit is contained in:
Massimiliano Mirelli 2023-02-22 12:37:47 +02:00
parent 28e618413f
commit 675f1c99df
2 changed files with 19 additions and 4 deletions

View File

@ -22,11 +22,11 @@ import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -35,7 +35,7 @@ import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Predicate;
public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> implements NBNamedElement {
private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
@ -69,7 +69,7 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, Kaf
this.kafkaSpace = kafkaSpace;
String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
this.kafkaAdapterMetrics = new KafkaAdapterMetrics(defaultMetricsPrefix);
this.kafkaAdapterMetrics = new KafkaAdapterMetrics(this, defaultMetricsPrefix);
kafkaAdapterMetrics.initS4JAdapterInstrumentation();
this.asyncAPI =
@ -132,4 +132,9 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, Kaf
return stringLongFunction;
}
@Override
public String getName() {
return "MessageConsumerOpDispenser";
}
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.kafka.util;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.kafka.dispensers.KafkaBaseOpDispenser;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
@ -31,8 +32,12 @@ public class KafkaAdapterMetrics implements NBNamedElement {
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
// end-to-end latency
private Histogram e2eMsgProcLatencyHistogram;
private KafkaBaseOpDispenser kafkaBaseOpDispenser;
public KafkaAdapterMetrics(String defaultMetricsPrefix) {
public KafkaAdapterMetrics(KafkaBaseOpDispenser kafkaBaseOpDispenser, String defaultMetricsPrefix) {
this.kafkaBaseOpDispenser = kafkaBaseOpDispenser;
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
@ -60,6 +65,11 @@ public class KafkaAdapterMetrics implements NBNamedElement {
this,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.e2eMsgProcLatencyHistogram =
ActivityMetrics.histogram(
kafkaBaseOpDispenser,
defaultAdapterMetricsPrefix + "e2e_msg_latency",
ActivityMetrics.DEFAULT_HDRDIGITS);
}
public Timer getBindTimer() { return bindTimer; }