mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
removed all ActivityMetrics constructions
This commit is contained in:
@@ -57,7 +57,7 @@ public class AmqpAdapterMetrics {
|
||||
public void initS4JAdapterInstrumentation() {
|
||||
// Histogram metrics
|
||||
messageSizeHistogram =
|
||||
ActivityMetrics.histogram(this.amqpBaseOpDispenser,
|
||||
amqpBaseOpDispenser.create().histogram(
|
||||
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
|
||||
// Timer metrics
|
||||
@@ -70,14 +70,14 @@ public class AmqpAdapterMetrics {
|
||||
// End-to-end metrics
|
||||
// Latency
|
||||
e2eMsgProcLatencyHistogram =
|
||||
ActivityMetrics.histogram(this.amqpBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
amqpBaseOpDispenser.create().histogram("e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
// Error metrics
|
||||
msgErrOutOfSeqCounter =
|
||||
ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_oos");
|
||||
amqpBaseOpDispenser.create().counter("err_msg_oos");
|
||||
msgErrLossCounter =
|
||||
ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_loss");
|
||||
amqpBaseOpDispenser.create().counter("err_msg_loss");
|
||||
msgErrDuplicateCounter =
|
||||
ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_dup");
|
||||
amqpBaseOpDispenser.create().counter("err_msg_dup");
|
||||
}
|
||||
|
||||
public Timer getBindTimer() { return bindTimer; }
|
||||
|
||||
@@ -57,9 +57,9 @@ public abstract class Cqld4BaseOpDispenser extends BaseOpDispenser<Cqld4CqlOp, C
|
||||
this.maxpages = op.getStaticConfigOr("maxpages", 1);
|
||||
this.isRetryReplace = op.getStaticConfigOr("retryreplace", false);
|
||||
this.maxLwtRetries = op.getStaticConfigOr("maxlwtretries", 1);
|
||||
this.rowsHistogram = ActivityMetrics.histogram(op, "rows", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.pagesHistogram = ActivityMetrics.histogram(op, "pages", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.payloadBytesHistogram = ActivityMetrics.histogram(op, "payload_bytes", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.rowsHistogram = create().histogram("rows", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.pagesHistogram = create().histogram("pages", op.getStaticConfigOr("hdr_digits", 3));
|
||||
this.payloadBytesHistogram = create().histogram("payload_bytes", op.getStaticConfigOr("hdr_digits", 3));
|
||||
}
|
||||
|
||||
public int getMaxPages() {
|
||||
|
||||
@@ -136,7 +136,7 @@ public class DiagTask_gauge extends BaseDiagTask implements Gauge<Double> {
|
||||
}
|
||||
|
||||
logger.info("Registering gauge for diag task with labels:" + getParentLabels().getLabels() + " label:" + label);
|
||||
this.gauge=ActivityMetrics.gauge(this, label, this);
|
||||
this.gauge=parent.create().gauge(label,() -> this.sampleValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -40,8 +40,8 @@ import java.util.function.Function;
|
||||
@Service(value = DriverAdapter.class, selector = "http")
|
||||
public class HttpDriverAdapter extends BaseDriverAdapter<HttpOp, HttpSpace> {
|
||||
|
||||
public HttpDriverAdapter(NBComponent parentComponent) {
|
||||
super(parentComponent);
|
||||
public HttpDriverAdapter(NBComponent parent) {
|
||||
super(parent);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -53,7 +53,7 @@ public class HttpDriverAdapter extends BaseDriverAdapter<HttpOp, HttpSpace> {
|
||||
|
||||
@Override
|
||||
public Function<String, ? extends HttpSpace> getSpaceInitializer(NBConfiguration cfg) {
|
||||
return spaceName -> new HttpSpace(spaceName, cfg);
|
||||
return spaceName -> new HttpSpace(getParent(), spaceName, cfg);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -20,14 +20,17 @@ import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
|
||||
public class HttpMetrics implements NBLabeledElement {
|
||||
private final NBComponent parent;
|
||||
private final HttpSpace space;
|
||||
final Histogram statusCodeHistogram;
|
||||
|
||||
public HttpMetrics(HttpSpace space) {
|
||||
public HttpMetrics(NBComponent parent, HttpSpace space) {
|
||||
this.parent = parent;
|
||||
this.space = space;
|
||||
statusCodeHistogram = ActivityMetrics.histogram(this, "statuscode",space.getHdrDigits());
|
||||
statusCodeHistogram = parent.create().histogram("statuscode",space.getHdrDigits());
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
|
||||
@@ -22,6 +22,7 @@ import io.nosqlbench.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.api.config.standard.Param;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@@ -38,6 +39,7 @@ import java.util.Locale;
|
||||
public class HttpSpace implements NBLabeledElement {
|
||||
private final static Logger logger = LogManager.getLogger(HttpSpace.class);
|
||||
|
||||
private final NBComponent parent;
|
||||
private final String name;
|
||||
private final NBConfiguration cfg;
|
||||
private HttpConsoleFormats console;
|
||||
@@ -50,7 +52,8 @@ public class HttpSpace implements NBLabeledElement {
|
||||
private boolean diagnosticsEnabled;
|
||||
|
||||
|
||||
public HttpSpace(String spaceName, NBConfiguration cfg) {
|
||||
public HttpSpace(NBComponent parent, String spaceName, NBConfiguration cfg) {
|
||||
this.parent = parent;
|
||||
this.name = spaceName;
|
||||
this.cfg = cfg;
|
||||
applyConfig(cfg);
|
||||
@@ -76,7 +79,7 @@ public class HttpSpace implements NBLabeledElement {
|
||||
);
|
||||
this.timeout = Duration.ofMillis(cfg.get("timeout", long.class));
|
||||
this.timeoutMillis = cfg.get("timeout", long.class);
|
||||
this.httpMetrics = new HttpMetrics(this);
|
||||
this.httpMetrics = new HttpMetrics(parent, this);
|
||||
|
||||
this.console = cfg.getOptional("diag").map(s -> HttpConsoleFormats.apply(s, this.console))
|
||||
.orElseGet(() -> HttpConsoleFormats.apply(null,null));
|
||||
|
||||
@@ -56,8 +56,7 @@ public class KafkaAdapterMetrics {
|
||||
|
||||
public void initS4JAdapterInstrumentation() {
|
||||
// Histogram metrics
|
||||
messageSizeHistogram =
|
||||
ActivityMetrics.histogram(this.kafkaBaseOpDispenser,
|
||||
messageSizeHistogram = kafkaBaseOpDispenser.create().histogram(
|
||||
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
|
||||
// Timer metrics
|
||||
@@ -71,14 +70,14 @@ public class KafkaAdapterMetrics {
|
||||
// End-to-end metrics
|
||||
// Latency
|
||||
e2eMsgProcLatencyHistogram =
|
||||
ActivityMetrics.histogram(this.kafkaBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
kafkaBaseOpDispenser.create().histogram("e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
// Error metrics
|
||||
msgErrOutOfSeqCounter =
|
||||
ActivityMetrics.counter(this.kafkaBaseOpDispenser, "err_msg_oos");
|
||||
kafkaBaseOpDispenser.create().counter("err_msg_oos");
|
||||
msgErrLossCounter =
|
||||
ActivityMetrics.counter(this.kafkaBaseOpDispenser, "err_msg_loss");
|
||||
kafkaBaseOpDispenser.create().counter("err_msg_loss");
|
||||
msgErrDuplicateCounter =
|
||||
ActivityMetrics.counter(this.kafkaBaseOpDispenser, "err_msg_dup");
|
||||
kafkaBaseOpDispenser.create().counter( "err_msg_dup");
|
||||
}
|
||||
|
||||
public Timer getBindTimer() { return bindTimer; }
|
||||
|
||||
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.ProducerStats;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class PulsarAdapterMetrics {
|
||||
|
||||
@@ -67,20 +68,22 @@ public class PulsarAdapterMetrics {
|
||||
public void initPulsarAdapterInstrumentation() {
|
||||
// Counter metrics
|
||||
msgErrOutOfSeqCounter =
|
||||
ActivityMetrics.counter(this.pulsarBaseOpDispenser,"err_msg_oos");
|
||||
pulsarBaseOpDispenser.create().counter("err_msg_oos");
|
||||
msgErrLossCounter =
|
||||
ActivityMetrics.counter(this.pulsarBaseOpDispenser, "err_msg_loss");
|
||||
pulsarBaseOpDispenser.create().counter("err_msg_loss");
|
||||
msgErrDuplicateCounter =
|
||||
ActivityMetrics.counter(this.pulsarBaseOpDispenser, "err_msg_dup");
|
||||
pulsarBaseOpDispenser.create().counter("err_msg_dup");
|
||||
|
||||
// Histogram metrics
|
||||
messageSizeHistogram =
|
||||
ActivityMetrics.histogram(this.pulsarBaseOpDispenser,
|
||||
pulsarBaseOpDispenser.create().histogram(
|
||||
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(this.pulsarBaseOpDispenser,
|
||||
"e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
payloadRttHistogram = ActivityMetrics.histogram(this.pulsarBaseOpDispenser,
|
||||
"payload_rtt", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
e2eMsgProcLatencyHistogram =
|
||||
pulsarBaseOpDispenser.create().histogram(
|
||||
"e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
payloadRttHistogram =
|
||||
pulsarBaseOpDispenser.create().histogram(
|
||||
"payload_rtt", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
|
||||
// Timer metrics
|
||||
bindTimer =
|
||||
@@ -96,23 +99,52 @@ public class PulsarAdapterMetrics {
|
||||
"commit_transaction", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
}
|
||||
|
||||
public Counter getMsgErrOutOfSeqCounter() { return msgErrOutOfSeqCounter; }
|
||||
public Counter getMsgErrLossCounter() { return msgErrLossCounter; }
|
||||
public Counter getMsgErrDuplicateCounter() { return msgErrDuplicateCounter; }
|
||||
public Histogram getMessageSizeHistogram() { return messageSizeHistogram; }
|
||||
public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; }
|
||||
public Histogram getPayloadRttHistogram() { return this.payloadRttHistogram; }
|
||||
public Timer getBindTimer() { return this.bindTimer; }
|
||||
public Timer getExecuteTimer() { return this.executeTimer; }
|
||||
public Timer getCreateTransactionTimer() { return this.createTransactionTimer; }
|
||||
public Timer getCommitTransactionTimer() { return this.commitTransactionTimer; }
|
||||
public Counter getMsgErrOutOfSeqCounter() {
|
||||
return msgErrOutOfSeqCounter;
|
||||
}
|
||||
|
||||
public Counter getMsgErrLossCounter() {
|
||||
return msgErrLossCounter;
|
||||
}
|
||||
|
||||
public Counter getMsgErrDuplicateCounter() {
|
||||
return msgErrDuplicateCounter;
|
||||
}
|
||||
|
||||
public Histogram getMessageSizeHistogram() {
|
||||
return messageSizeHistogram;
|
||||
}
|
||||
|
||||
public Histogram getE2eMsgProcLatencyHistogram() {
|
||||
return e2eMsgProcLatencyHistogram;
|
||||
}
|
||||
|
||||
public Histogram getPayloadRttHistogram() {
|
||||
return this.payloadRttHistogram;
|
||||
}
|
||||
|
||||
public Timer getBindTimer() {
|
||||
return this.bindTimer;
|
||||
}
|
||||
|
||||
public Timer getExecuteTimer() {
|
||||
return this.executeTimer;
|
||||
}
|
||||
|
||||
public Timer getCreateTransactionTimer() {
|
||||
return this.createTransactionTimer;
|
||||
}
|
||||
|
||||
public Timer getCommitTransactionTimer() {
|
||||
return this.commitTransactionTimer;
|
||||
}
|
||||
|
||||
|
||||
//////////////////////////////////////
|
||||
// Pulsar client producer API metrics
|
||||
//////////////////////////////////////
|
||||
//
|
||||
private static class ProducerGaugeImpl implements Gauge<Double> {
|
||||
private static class ProducerGaugeImpl implements Supplier<Double> {
|
||||
private final Producer<?> producer;
|
||||
private final Function<ProducerStats, Double> valueExtractor;
|
||||
|
||||
@@ -122,32 +154,27 @@ public class PulsarAdapterMetrics {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getValue() {
|
||||
public Double get() {
|
||||
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
|
||||
// we need to synchronize on producer otherwise we could receive corrupted data
|
||||
synchronized(this.producer) {
|
||||
synchronized (this.producer) {
|
||||
return this.valueExtractor.apply(this.producer.getStats());
|
||||
}
|
||||
}
|
||||
}
|
||||
private static Gauge<Double> producerSafeExtractMetric(final Producer<?> producer, final Function<ProducerStats, Double> valueExtractor) {
|
||||
|
||||
private static Supplier<Double> producerSafeExtractMetric(final Producer<?> producer, final Function<ProducerStats, Double> valueExtractor) {
|
||||
return new ProducerGaugeImpl(producer, valueExtractor);
|
||||
}
|
||||
|
||||
public void registerProducerApiMetrics(final Producer<?> producer) {
|
||||
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_bytes_sent",
|
||||
PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalBytesSent() + s.getNumBytesSent()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_msg_sent",
|
||||
PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalMsgsSent() + s.getNumMsgsSent()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_send_failed",
|
||||
PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalSendFailed() + s.getNumSendFailed()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_ack_received",
|
||||
PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalAcksReceived() + s.getNumAcksReceived()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "send_bytes_rate",
|
||||
PulsarAdapterMetrics.producerSafeExtractMetric(producer, ProducerStats::getSendBytesRate));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "send_msg_rate",
|
||||
PulsarAdapterMetrics.producerSafeExtractMetric(producer, ProducerStats::getSendMsgsRate));
|
||||
pulsarBaseOpDispenser.create().gauge("total_bytes_sent", PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalBytesSent() + s.getNumBytesSent()));
|
||||
pulsarBaseOpDispenser.create().gauge("total_msg_sent", PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalMsgsSent() + s.getNumMsgsSent()));
|
||||
pulsarBaseOpDispenser.create().gauge("total_send_failed", PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalSendFailed() + s.getNumSendFailed()));
|
||||
pulsarBaseOpDispenser.create().gauge("total_ack_received", PulsarAdapterMetrics.producerSafeExtractMetric(producer, s -> (double) s.getTotalAcksReceived() + s.getNumAcksReceived()));
|
||||
pulsarBaseOpDispenser.create().gauge("send_bytes_rate", PulsarAdapterMetrics.producerSafeExtractMetric(producer, ProducerStats::getSendBytesRate));
|
||||
pulsarBaseOpDispenser.create().gauge("send_msg_rate", PulsarAdapterMetrics.producerSafeExtractMetric(producer, ProducerStats::getSendMsgsRate));
|
||||
}
|
||||
|
||||
|
||||
@@ -155,7 +182,7 @@ public class PulsarAdapterMetrics {
|
||||
// Pulsar client consumer API metrics
|
||||
//////////////////////////////////////
|
||||
//
|
||||
private static class ConsumerGaugeImpl implements Gauge<Double> {
|
||||
private static class ConsumerGaugeImpl implements Supplier<Double> {
|
||||
private final Consumer<?> consumer;
|
||||
private final Function<ConsumerStats, Double> valueExtractor;
|
||||
|
||||
@@ -165,32 +192,33 @@ public class PulsarAdapterMetrics {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getValue() {
|
||||
public Double get() {
|
||||
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
|
||||
// - this is a bug report for producer stats.
|
||||
// - assume this also applies to consumer stats.
|
||||
synchronized(this.consumer) {
|
||||
synchronized (this.consumer) {
|
||||
return this.valueExtractor.apply(this.consumer.getStats());
|
||||
}
|
||||
}
|
||||
}
|
||||
static Gauge<Double> consumerSafeExtractMetric(final Consumer<?> consumer, final Function<ConsumerStats, Double> valueExtractor) {
|
||||
|
||||
static Supplier<Double> consumerSafeExtractMetric(final Consumer<?> consumer, final Function<ConsumerStats, Double> valueExtractor) {
|
||||
return new ConsumerGaugeImpl(consumer, valueExtractor);
|
||||
}
|
||||
|
||||
public void registerConsumerApiMetrics(final Consumer<?> consumer, final String pulsarApiMetricsPrefix) {
|
||||
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_bytes_recv",
|
||||
pulsarBaseOpDispenser.create().gauge("total_bytes_recv",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double) s.getTotalBytesReceived() + s.getNumBytesReceived()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_msg_recv",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double)s.getTotalMsgsReceived() + s.getNumMsgsReceived()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_recv_failed",
|
||||
pulsarBaseOpDispenser.create().gauge("total_msg_recv",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double) s.getTotalMsgsReceived() + s.getNumMsgsReceived()));
|
||||
pulsarBaseOpDispenser.create().gauge("total_recv_failed",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double) s.getTotalReceivedFailed() + s.getNumReceiveFailed()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "total_acks_sent",
|
||||
pulsarBaseOpDispenser.create().gauge("total_acks_sent",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double) s.getTotalAcksSent() + s.getNumAcksSent()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "recv_bytes_rate",
|
||||
pulsarBaseOpDispenser.create().gauge("recv_bytes_rate",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double) s.getRateBytesReceived()));
|
||||
ActivityMetrics.gauge(this.pulsarBaseOpDispenser, "recv_msg_rate",
|
||||
pulsarBaseOpDispenser.create().gauge("recv_msg_rate",
|
||||
PulsarAdapterMetrics.consumerSafeExtractMetric(consumer, s -> (double) s.getRateMsgsReceived()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ public class S4JAdapterMetrics {
|
||||
public void initS4JAdapterInstrumentation() {
|
||||
// Histogram metrics
|
||||
this.messageSizeHistogram =
|
||||
ActivityMetrics.histogram(this.s4jBaseOpDispenser,
|
||||
s4jBaseOpDispenser.create().histogram(
|
||||
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
|
||||
|
||||
// Timer metrics
|
||||
|
||||
@@ -155,9 +155,9 @@ public class HybridRateLimiter extends NBBaseComponent implements RateLimiter {
|
||||
|
||||
|
||||
protected void init(final NBLabeledElement activityDef) {
|
||||
delayGauge = ActivityMetrics.gauge(activityDef, this.label + "_waittime", new RateLimiters.WaitTimeGauge(this));
|
||||
avgRateGauge = ActivityMetrics.gauge(activityDef, this.label + "_config_cyclerate", new RateLimiters.RateGauge(this));
|
||||
burstRateGauge = ActivityMetrics.gauge(activityDef, this.label + "_config_burstrate", new RateLimiters.BurstRateGauge(this));
|
||||
delayGauge = create().gauge( this.label + "_waittime", () -> (double)getTotalWaitTime());
|
||||
avgRateGauge = create().gauge(this.label + "_config_cyclerate", () -> getRateSpec().opsPerSec);
|
||||
burstRateGauge = create().gauge(this.label + "_config_burstrate", () -> getRateSpec().getBurstRatio() * getRateSpec().getRate());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -62,14 +62,14 @@ public class AtomicInput extends NBBaseComponent implements Input, ActivityDefOb
|
||||
super(parent);
|
||||
this.activityDef = activityDef;
|
||||
onActivityDefUpdate(activityDef);
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.cycles_min.get(), "input_cycles_first"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.cycles_max.get(), "input_cycles_last"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.cycle_value.get(), "input_cycle"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, this::getTotalCycles, "input_cycles_total"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.recycles_min.get(), "input_recycles_first"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.recycles_max.get(), "input_recycles_last"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, () -> (double) this.recycle_value.get(), "input_recycle"));
|
||||
ActivityMetrics.gauge(new NBFunctionGauge(this, this::getTotalRecycles, "input_recycles_total"));
|
||||
create().gauge("input_cycles_first",() -> (double) this.cycles_min.get());
|
||||
create().gauge("input_cycles_last",() -> (double) this.cycles_max.get());
|
||||
create().gauge("input_cycle",() -> (double) this.cycle_value.get());
|
||||
create().gauge("input_cycles_total",this::getTotalCycles);
|
||||
create().gauge("input_recycles_first",() -> (double) this.recycles_min.get());
|
||||
create().gauge("input_recycles_last",() -> (double) this.recycles_max.get());
|
||||
create().gauge("input_recycle",() -> (double) this.recycle_value.get());
|
||||
create().gauge("input_recycles_total",this::getTotalRecycles);
|
||||
}
|
||||
|
||||
private double getTotalRecycles() {
|
||||
|
||||
@@ -152,14 +152,12 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
|
||||
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
}
|
||||
|
||||
this.pendingOpsGauge = ActivityMetrics.gauge(
|
||||
new NBFunctionGauge(this,() -> this.getProgressMeter().getSummary().pending(), "ops_pending")
|
||||
);
|
||||
this.activeOpsGauge = ActivityMetrics.gauge(
|
||||
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().current(),"ops_active")
|
||||
);
|
||||
this.completeOpsGauge = ActivityMetrics.gauge(
|
||||
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().complete(),"ops_complete"));
|
||||
this.pendingOpsGauge = create().gauge(
|
||||
"ops_pending",() -> this.getProgressMeter().getSummary().pending());
|
||||
this.activeOpsGauge = create().gauge(
|
||||
"ops_active",() -> this.getProgressMeter().getSummary().current());
|
||||
this.completeOpsGauge = create().gauge(
|
||||
"ops_complete",() -> this.getProgressMeter().getSummary().complete());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -17,8 +17,8 @@
|
||||
package io.nosqlbench.engine.api.metrics;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -30,11 +30,11 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class ExceptionCountMetrics {
|
||||
private final ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
|
||||
private final Counter allerrors;
|
||||
private final NBLabeledElement parentLabels;
|
||||
private final NBComponent parent;
|
||||
|
||||
public ExceptionCountMetrics(final NBLabeledElement parentLabels) {
|
||||
this.parentLabels = parentLabels;
|
||||
this.allerrors =ActivityMetrics.counter(parentLabels, "errors_ALL");
|
||||
public ExceptionCountMetrics(final NBComponent parent) {
|
||||
this.parent = parent;
|
||||
this.allerrors=parent.create().counter( "errors_ALL");
|
||||
}
|
||||
|
||||
public void count(final String name) {
|
||||
@@ -42,7 +42,7 @@ public class ExceptionCountMetrics {
|
||||
if (null == c) synchronized (this.counters) {
|
||||
c = this.counters.computeIfAbsent(
|
||||
name,
|
||||
k -> ActivityMetrics.counter(this.parentLabels, "errors_" + name)
|
||||
k -> parent.create().counter("errors_" + name)
|
||||
);
|
||||
}
|
||||
c.inc();
|
||||
|
||||
@@ -17,22 +17,21 @@
|
||||
package io.nosqlbench.engine.api.metrics;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
|
||||
|
||||
/**
|
||||
* Use this to provide exception metering during expected result verification.
|
||||
*/
|
||||
public class ExceptionExpectedResultVerificationMetrics {
|
||||
private final NBLabeledElement parentLabels;
|
||||
private final NBComponent parent;
|
||||
private final Counter verificationErrors;
|
||||
private final Counter verificationRetries;
|
||||
|
||||
public ExceptionExpectedResultVerificationMetrics(final NBLabeledElement parentLabels) {
|
||||
this.parentLabels = parentLabels;
|
||||
verificationRetries = ActivityMetrics.counter(parentLabels, "verificationcounts_RETRIES");
|
||||
verificationErrors = ActivityMetrics.counter(parentLabels, "verificationcounts_ERRORS");
|
||||
public ExceptionExpectedResultVerificationMetrics(final NBComponent parent) {
|
||||
this.parent = parent;
|
||||
this.verificationRetries=parent.create().counter("verificationcounts_RETRIES");
|
||||
this.verificationErrors=parent.create().counter( "verificationcounts_ERRORS");
|
||||
}
|
||||
|
||||
public void countVerificationRetries() {
|
||||
|
||||
@@ -20,6 +20,7 @@ import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -33,13 +34,13 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class ExceptionHistoMetrics {
|
||||
private final ConcurrentHashMap<String, Histogram> histos = new ConcurrentHashMap<>();
|
||||
private final Histogram allerrors;
|
||||
private final NBLabeledElement parentLabels;
|
||||
private final NBComponent parent;
|
||||
private final ActivityDef activityDef;
|
||||
|
||||
public ExceptionHistoMetrics(final NBLabeledElement parentLabels, final ActivityDef activityDef) {
|
||||
this.parentLabels = parentLabels;
|
||||
public ExceptionHistoMetrics(final NBComponent parent, final ActivityDef activityDef) {
|
||||
this.parent = parent;
|
||||
this.activityDef = activityDef;
|
||||
this.allerrors = ActivityMetrics.histogram(parentLabels, "errorhistos_ALL", activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4));
|
||||
this.allerrors = parent.create().histogram( "errorhistos_ALL", activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4));
|
||||
}
|
||||
|
||||
public void update(final String name, final long magnitude) {
|
||||
@@ -47,7 +48,7 @@ public class ExceptionHistoMetrics {
|
||||
if (null == h) synchronized (this.histos) {
|
||||
h = this.histos.computeIfAbsent(
|
||||
name,
|
||||
errName -> ActivityMetrics.histogram(this.parentLabels, "errorhistos_"+errName, this.activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
|
||||
errName -> parent.create().histogram( "errorhistos_"+errName, this.activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
|
||||
);
|
||||
}
|
||||
h.update(magnitude);
|
||||
|
||||
@@ -19,6 +19,7 @@ package io.nosqlbench.engine.api.metrics;
|
||||
import com.codahale.metrics.Meter;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -30,11 +31,11 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class ExceptionMeterMetrics {
|
||||
private final ConcurrentHashMap<String, Meter> meters = new ConcurrentHashMap<>();
|
||||
private final Meter allerrors;
|
||||
private final NBLabeledElement parentLabels;
|
||||
private final NBComponent parent;
|
||||
|
||||
public ExceptionMeterMetrics(final NBLabeledElement parentLabels) {
|
||||
this.parentLabels = parentLabels;
|
||||
this.allerrors = ActivityMetrics.meter(parentLabels, "errormeters_ALL");
|
||||
public ExceptionMeterMetrics(final NBComponent parent) {
|
||||
this.parent = parent;
|
||||
this.allerrors = parent.create().meter("errormeters_ALL");
|
||||
}
|
||||
|
||||
public void mark(final String name) {
|
||||
@@ -42,7 +43,7 @@ public class ExceptionMeterMetrics {
|
||||
if (null == c) synchronized (this.meters) {
|
||||
c = this.meters.computeIfAbsent(
|
||||
name,
|
||||
k -> ActivityMetrics.meter(this.parentLabels, "errormeters_" + name)
|
||||
k -> parent.create().meter("errormeters_" + name)
|
||||
);
|
||||
}
|
||||
c.mark();
|
||||
|
||||
@@ -463,8 +463,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
|
||||
// }
|
||||
|
||||
private void registerMetrics() {
|
||||
NBMetricGauge gauge = new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads");
|
||||
this.threadsGauge = ActivityMetrics.gauge(gauge);
|
||||
this.activity.create().gauge("threads",() -> (double) this.motors.size());
|
||||
}
|
||||
|
||||
private void unregisterMetrics() {
|
||||
|
||||
@@ -39,21 +39,20 @@ public class ScriptingMetrics {
|
||||
|
||||
public ScriptingGauge newStaticGauge(final String name, final double initialValue) {
|
||||
final ScriptingGauge scriptingGauge = new ScriptingGauge(name, initialValue);
|
||||
ActivityMetrics.gauge(this.scriptContext,name, scriptingGauge);
|
||||
this.logger.info(() -> "registered scripting gauge:" + name);
|
||||
return scriptingGauge;
|
||||
throw new RuntimeException("replace me when merging");
|
||||
}
|
||||
|
||||
public DoubleSummaryGauge newSummaryGauge(final String name) {
|
||||
final DoubleSummaryGauge summaryGauge = ActivityMetrics.summaryGauge(scriptContext,name);
|
||||
this.logger.info(() -> "registered summmary gauge:" + name);
|
||||
return summaryGauge;
|
||||
throw new RuntimeException("replace me after merge");
|
||||
// this.logger.info(() -> "registered summmary gauge:" + name);
|
||||
// return summaryGauge;
|
||||
}
|
||||
|
||||
public DoubleSummaryGauge newSummaryGauge(NBLabeledElement context, final String name) {
|
||||
final DoubleSummaryGauge summaryGauge = ActivityMetrics.summaryGauge(context,name);
|
||||
this.logger.info(() -> "registered summmary gauge:" + name);
|
||||
return summaryGauge;
|
||||
throw new RuntimeException("replace me after merge");
|
||||
// final DoubleSummaryGauge summaryGauge = ActivityMetrics.summaryGauge(context,name);
|
||||
// this.logger.info(() -> "registered summmary gauge:" + name);
|
||||
// return summaryGauge;
|
||||
}
|
||||
|
||||
// public RelevancyMeasures newRelevancyMeasures(NBLabeledElement parent, Map<String,String> labels) {
|
||||
|
||||
@@ -142,65 +142,6 @@ public class ActivityMetrics {
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* <p>Create an HDR histogram associated with an activity.</p>
|
||||
*
|
||||
* <p>If the provide ActivityDef contains a parameter "hdr_digits", then it will be used to set the number of
|
||||
* significant digits on the histogram's precision.</p>
|
||||
*
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param named
|
||||
* an associated activity def
|
||||
* @param metricFamilyName
|
||||
* a simple, descriptive name for the histogram
|
||||
* @return the histogram, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Histogram histogram(NBLabeledElement labeled, String metricFamilyName, int hdrdigits) {
|
||||
final NBLabels labels = labeled.getLabels().and("name", sanitize(metricFamilyName));
|
||||
return (Histogram) register(labels, () ->
|
||||
new NBMetricHistogram(
|
||||
labels,
|
||||
new DeltaHdrHistogramReservoir(
|
||||
labels,
|
||||
hdrdigits
|
||||
)
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Create a counter associated with an activity.</p>
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param named
|
||||
* an associated activity def
|
||||
* @param name
|
||||
* a simple, descriptive name for the counter
|
||||
* @return the counter, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Counter counter(NBLabeledElement parent, String metricFamilyName) {
|
||||
final NBLabels labels = parent.getLabels().and("name", metricFamilyName);
|
||||
return (Counter) register(labels, () -> new NBMetricCounter(labels));
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Create a meter associated with an activity.</p>
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param named
|
||||
* an associated activity def
|
||||
* @param metricFamilyName
|
||||
* a simple, descriptive name for the meter
|
||||
* @return the meter, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Meter meter(NBLabeledElement parent, String metricFamilyName) {
|
||||
final NBLabels labels = parent.getLabels().and("name", sanitize(metricFamilyName));
|
||||
return (Meter) register(labels, () -> new NBMetricMeter(labels));
|
||||
}
|
||||
|
||||
private static MetricRegistry get() {
|
||||
if (null != ActivityMetrics.registry) {
|
||||
return registry;
|
||||
@@ -213,42 +154,6 @@ public class ActivityMetrics {
|
||||
return registry;
|
||||
}
|
||||
|
||||
/**
|
||||
* This variant creates a named metric for all of the stats which may be needed, name with metricname_average,
|
||||
* and so on. It uses the same data reservoir for all views, but only returns one of them as a handle to the metric.
|
||||
* This has the effect of leaving some of the metric objects unreferencable from the caller side. This may need
|
||||
* to be changed in a future update in the even that full inventory management is required on metric objects here.
|
||||
*
|
||||
* @param parent
|
||||
* The labeled element the metric pertains to
|
||||
* @param metricFamilyName
|
||||
* The name of the measurement
|
||||
* @return One of the created metrics, suitable for calling {@link DoubleSummaryGauge#accept(double)} on.
|
||||
*/
|
||||
public static DoubleSummaryGauge summaryGauge(NBLabeledElement parent, String metricFamilyName) {
|
||||
DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
|
||||
DoubleSummaryGauge anyGauge = null;
|
||||
for (DoubleSummaryGauge.Stat statName : DoubleSummaryGauge.Stat.values()) {
|
||||
final NBLabels labels = parent.getLabels()
|
||||
.and("name", sanitize(metricFamilyName))
|
||||
.modifyValue("name", n -> n + "_" + statName.name().toLowerCase());
|
||||
anyGauge = (DoubleSummaryGauge) register(labels, () -> new DoubleSummaryGauge(labels, statName, stats));
|
||||
}
|
||||
return anyGauge;
|
||||
}
|
||||
|
||||
|
||||
public static NBMetricGauge gauge(NBMetricGauge gauge) {
|
||||
final NBLabels labels = gauge.getLabels();
|
||||
return (NBMetricGauge) register(labels, () -> new NBMetricGaugeWrapper(labels, gauge));
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static Gauge<Double> gauge(NBLabeledElement parent, String metricFamilyName, Gauge<Double> gauge) {
|
||||
final NBLabels labels = parent.getLabels().and("name", sanitize(metricFamilyName));
|
||||
return (Gauge<Double>) register(labels, () -> new NBMetricGaugeWrapper(labels, gauge));
|
||||
}
|
||||
|
||||
private static MetricRegistry lookupRegistry() {
|
||||
ServiceLoader<MetricRegistryService> metricRegistryServices =
|
||||
|
||||
@@ -58,8 +58,9 @@ public class RelevancyMeasures implements NBLabeledElement {
|
||||
for (RelevancyFunction function : f) {
|
||||
this.functions.add(function);
|
||||
function.prependLabels(this);
|
||||
DoubleSummaryGauge gauge = ActivityMetrics.summaryGauge(function, function.getUniqueName());
|
||||
this.gauges.add(gauge);
|
||||
throw new RuntimeException("replace me after merge");
|
||||
// DoubleSummaryGauge gauge = ActivityMetrics.summaryGauge(function, function.getUniqueName());
|
||||
// this.gauges.add(gauge);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -16,12 +16,10 @@
|
||||
|
||||
package io.nosqlbench.components;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
|
||||
import io.nosqlbench.api.engine.metrics.DoubleSummaryGauge;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetricCounter;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetricHistogram;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.*;
|
||||
import io.nosqlbench.api.engine.metrics.reporters.PromPushReporterComponent;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@@ -49,6 +47,14 @@ public class NBBuilders {
|
||||
return timer;
|
||||
}
|
||||
|
||||
public Meter meter(String metricFamilyName) {
|
||||
NBLabels labels = base.getLabels().and("name", metricFamilyName);
|
||||
NBMetricMeter meter = new NBMetricMeter(labels);
|
||||
base.addMetric(meter);
|
||||
return meter;
|
||||
}
|
||||
|
||||
|
||||
public NBMetricCounter counter(String metricFamilyName) {
|
||||
NBLabels labels = base.getLabels().and("name", metricFamilyName);
|
||||
NBMetricCounter counter = new NBMetricCounter(labels);
|
||||
|
||||
@@ -17,8 +17,10 @@
|
||||
package io.nosqlbench.engine.core.script;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.api.config.standard.TestComponent;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
@@ -29,8 +31,8 @@ public class MetricsIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void testHistogramLogger() {
|
||||
final NBLabeledElement labeled = NBLabeledElement.forKV("alias","foo","driver","diag","op","noop");
|
||||
final Histogram testhistogram = ActivityMetrics.histogram(labeled, "testhistogram", 3);
|
||||
NBComponent parent = new TestComponent("metricstest","metricstest","alias","foo","driver","diag","op","noop");
|
||||
final Histogram testhistogram = parent.create().histogram("testhistogram", 3);
|
||||
ActivityMetrics.addHistoLogger("testsession", ".*","testhisto.log","1s");
|
||||
testhistogram.update(400);
|
||||
testhistogram.getSnapshot();
|
||||
|
||||
Reference in New Issue
Block a user