Expose interal Pulsar Producer metrics

This commit is contained in:
Enrico Olivelli 2021-03-31 11:42:47 +02:00
parent ed222e43af
commit 9e04ef36b2
3 changed files with 68 additions and 8 deletions

View File

@ -1,7 +1,10 @@
package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Gauge;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -17,6 +20,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@ -43,17 +47,20 @@ public class PulsarSpace {
private PulsarClient pulsarClient = null;
private Schema<?> pulsarSchema = null;
private final ActivityDef activityDef;
public PulsarSpace(String name,
PulsarNBClientConf pulsarClientConf,
String pulsarSvcUrl,
String webSvcUrl,
PulsarAdmin pulsarAdmin) {
PulsarAdmin pulsarAdmin,
ActivityDef activityDef) {
this.spaceName = name;
this.pulsarNBClientConf = pulsarClientConf;
this.pulsarSvcUrl = pulsarSvcUrl;
this.webSvcUrl = webSvcUrl;
this.pulsarAdmin = pulsarAdmin;
this.activityDef = activityDef;
createPulsarClientFromConf();
createPulsarSchemaFromConf();
@ -169,29 +176,73 @@ public class PulsarSpace {
}
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
Producer<?> producer = producers.get(encodedStr);
Producer<?> producer = producers.computeIfAbsent(encodedStr, (pn -> {
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
String producerMetricsPrefix;
if (!StringUtils.isBlank(producerName)) {
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
producerMetricsPrefix = producerName + "_";
} else {
// we want a meaningful name for the producer
// we are not appending the topic name
producerMetricsPrefix = "producer" + producers.size() + "_" ;
}
producerMetricsPrefix += topicName + "_";
producerMetricsPrefix = producerMetricsPrefix
.replace("persistent://public/default/", "") // default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test
.replace("non-persistent://", "") // always remove topic type
.replace("persistent://", "")
.replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
Producer<?> newProducer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent",safeExtractMetric(newProducer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(newProducer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalsendfailed", safeExtractMetric(newProducer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalacksreceived", safeExtractMetric(newProducer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendbytesrate", safeExtractMetric(newProducer, ProducerStats::getSendBytesRate));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendmsgsrate", safeExtractMetric(newProducer, ProducerStats::getSendMsgsRate));
return newProducer;
} catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!");
throw new RuntimeException("Unable to create a Pulsar producer!", ple);
}
producers.put(encodedStr, producer);
}
}));
return producer;
}
static Gauge<Object> safeExtractMetric(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
return new GaugeImpl(producer, valueExtractor);
}
private static class GaugeImpl implements Gauge<Object> {
private final Producer<?> producer;
private final Function<ProducerStats, Object> valueExtractor;
GaugeImpl(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
this.producer = producer;
this.valueExtractor = valueExtractor;
}
@Override
public Object getValue() {
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
// we need to synchronize on producer otherwise we could receive corrupted data
synchronized(producer) {
return valueExtractor.apply(producer.getStats());
}
}
}
//////////////////////////////////////
// Producer Processing <-- end
//////////////////////////////////////

View File

@ -27,7 +27,8 @@ public class PulsarSpaceCache {
activity.getPulsarConf(),
activity.getPulsarSvcUrl(),
activity.getWebSvcUrl(),
activity.getPulsarAdmin()
activity.getPulsarAdmin(),
activity.getActivityDef()
));
}

View File

@ -127,6 +127,14 @@ public class ScenarioResult {
if (count > 0) {
NBMetricsSummary.summarize(sb, k, v);
}
} else if (v instanceof Gauge) {
Object value = ((Gauge) v).getValue();
if (value != null && value instanceof Number) {
Number n = (Number) value;
if (n.doubleValue() != 0) {
NBMetricsSummary.summarize(sb, k, v);
}
}
}
});