mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge pull request #293 from eolivelli/impl/pulsar-metrics
Expose internal Pulsar Producer metrics
This commit is contained in:
commit
ca7fb7b839
@ -1,7 +1,10 @@
|
|||||||
package io.nosqlbench.driver.pulsar;
|
package io.nosqlbench.driver.pulsar;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Gauge;
|
||||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||||
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
|
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||||
import org.apache.commons.collections.CollectionUtils;
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
@ -17,6 +20,7 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.function.Function;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.regex.PatternSyntaxException;
|
import java.util.regex.PatternSyntaxException;
|
||||||
|
|
||||||
@ -43,17 +47,20 @@ public class PulsarSpace {
|
|||||||
|
|
||||||
private PulsarClient pulsarClient = null;
|
private PulsarClient pulsarClient = null;
|
||||||
private Schema<?> pulsarSchema = null;
|
private Schema<?> pulsarSchema = null;
|
||||||
|
private final ActivityDef activityDef;
|
||||||
|
|
||||||
public PulsarSpace(String name,
|
public PulsarSpace(String name,
|
||||||
PulsarNBClientConf pulsarClientConf,
|
PulsarNBClientConf pulsarClientConf,
|
||||||
String pulsarSvcUrl,
|
String pulsarSvcUrl,
|
||||||
String webSvcUrl,
|
String webSvcUrl,
|
||||||
PulsarAdmin pulsarAdmin) {
|
PulsarAdmin pulsarAdmin,
|
||||||
|
ActivityDef activityDef) {
|
||||||
this.spaceName = name;
|
this.spaceName = name;
|
||||||
this.pulsarNBClientConf = pulsarClientConf;
|
this.pulsarNBClientConf = pulsarClientConf;
|
||||||
this.pulsarSvcUrl = pulsarSvcUrl;
|
this.pulsarSvcUrl = pulsarSvcUrl;
|
||||||
this.webSvcUrl = webSvcUrl;
|
this.webSvcUrl = webSvcUrl;
|
||||||
this.pulsarAdmin = pulsarAdmin;
|
this.pulsarAdmin = pulsarAdmin;
|
||||||
|
this.activityDef = activityDef;
|
||||||
|
|
||||||
createPulsarClientFromConf();
|
createPulsarClientFromConf();
|
||||||
createPulsarSchemaFromConf();
|
createPulsarSchemaFromConf();
|
||||||
@ -169,29 +176,73 @@ public class PulsarSpace {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
|
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
|
||||||
Producer<?> producer = producers.get(encodedStr);
|
Producer<?> producer = producers.computeIfAbsent(encodedStr, (pn -> {
|
||||||
|
|
||||||
|
|
||||||
if (producer == null) {
|
|
||||||
PulsarClient pulsarClient = getPulsarClient();
|
PulsarClient pulsarClient = getPulsarClient();
|
||||||
|
|
||||||
// Get other possible producer settings that are set at global level
|
// Get other possible producer settings that are set at global level
|
||||||
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
|
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
|
||||||
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
|
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
|
||||||
|
String producerMetricsPrefix;
|
||||||
if (!StringUtils.isBlank(producerName)) {
|
if (!StringUtils.isBlank(producerName)) {
|
||||||
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
|
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
|
||||||
|
producerMetricsPrefix = producerName + "_";
|
||||||
|
} else {
|
||||||
|
// we want a meaningful name for the producer
|
||||||
|
// we are not appending the topic name
|
||||||
|
producerMetricsPrefix = "producer" + producers.size() + "_" ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
producerMetricsPrefix += topicName + "_";
|
||||||
|
|
||||||
|
producerMetricsPrefix = producerMetricsPrefix
|
||||||
|
.replace("persistent://public/default/", "") // default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test
|
||||||
|
.replace("non-persistent://", "") // always remove topic type
|
||||||
|
.replace("persistent://", "")
|
||||||
|
.replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname
|
||||||
|
|
||||||
try {
|
try {
|
||||||
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
|
Producer<?> newProducer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
|
||||||
|
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent",safeExtractMetric(newProducer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
|
||||||
|
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(newProducer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
|
||||||
|
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalsendfailed", safeExtractMetric(newProducer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
|
||||||
|
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalacksreceived", safeExtractMetric(newProducer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
|
||||||
|
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendbytesrate", safeExtractMetric(newProducer, ProducerStats::getSendBytesRate));
|
||||||
|
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendmsgsrate", safeExtractMetric(newProducer, ProducerStats::getSendMsgsRate));
|
||||||
|
return newProducer;
|
||||||
} catch (PulsarClientException ple) {
|
} catch (PulsarClientException ple) {
|
||||||
throw new RuntimeException("Unable to create a Pulsar producer!");
|
throw new RuntimeException("Unable to create a Pulsar producer!", ple);
|
||||||
}
|
}
|
||||||
|
|
||||||
producers.put(encodedStr, producer);
|
}));
|
||||||
}
|
|
||||||
|
|
||||||
return producer;
|
return producer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static Gauge<Object> safeExtractMetric(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
|
||||||
|
return new GaugeImpl(producer, valueExtractor);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class GaugeImpl implements Gauge<Object> {
|
||||||
|
private final Producer<?> producer;
|
||||||
|
private final Function<ProducerStats, Object> valueExtractor;
|
||||||
|
|
||||||
|
GaugeImpl(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
|
||||||
|
this.producer = producer;
|
||||||
|
this.valueExtractor = valueExtractor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object getValue() {
|
||||||
|
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
|
||||||
|
// we need to synchronize on producer otherwise we could receive corrupted data
|
||||||
|
synchronized(producer) {
|
||||||
|
return valueExtractor.apply(producer.getStats());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//////////////////////////////////////
|
//////////////////////////////////////
|
||||||
// Producer Processing <-- end
|
// Producer Processing <-- end
|
||||||
//////////////////////////////////////
|
//////////////////////////////////////
|
||||||
|
@ -27,7 +27,8 @@ public class PulsarSpaceCache {
|
|||||||
activity.getPulsarConf(),
|
activity.getPulsarConf(),
|
||||||
activity.getPulsarSvcUrl(),
|
activity.getPulsarSvcUrl(),
|
||||||
activity.getWebSvcUrl(),
|
activity.getWebSvcUrl(),
|
||||||
activity.getPulsarAdmin()
|
activity.getPulsarAdmin(),
|
||||||
|
activity.getActivityDef()
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -127,6 +127,14 @@ public class ScenarioResult {
|
|||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
NBMetricsSummary.summarize(sb, k, v);
|
NBMetricsSummary.summarize(sb, k, v);
|
||||||
}
|
}
|
||||||
|
} else if (v instanceof Gauge) {
|
||||||
|
Object value = ((Gauge) v).getValue();
|
||||||
|
if (value != null && value instanceof Number) {
|
||||||
|
Number n = (Number) value;
|
||||||
|
if (n.doubleValue() != 0) {
|
||||||
|
NBMetricsSummary.summarize(sb, k, v);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user