Merge pull request #286 from eolivelli/pulsar-bytes-counter

Pulsar: add bytes counters
This commit is contained in:
Jonathan Shook 2021-03-23 10:44:56 -05:00 committed by GitHub
commit dcbea0eb5b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 76 additions and 15 deletions

View File

@ -1,5 +1,7 @@
package io.nosqlbench.driver.pulsar; package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp;
@ -20,6 +22,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer bindTimer; public Timer bindTimer;
public Timer executeTimer; public Timer executeTimer;
public Counter bytesCounter;
public Histogram messagesizeHistogram;
private PulsarSpaceCache pulsarCache; private PulsarSpaceCache pulsarCache;
private PulsarNBClientConf clientConf; private PulsarNBClientConf clientConf;
@ -41,7 +45,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
bindTimer = ActivityMetrics.timer(activityDef, "bind"); bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute"); executeTimer = ActivityMetrics.timer(activityDef, "execute");
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile); clientConf = new PulsarNBClientConf(pulsarClntConfFile);
@ -49,7 +54,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
pulsarCache = new PulsarSpaceCache(this); pulsarCache = new PulsarSpaceCache(this);
this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache)); this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this));
setDefaultsFromOpSequence(sequencer); setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef); onActivityDefUpdate(activityDef);
@ -87,4 +92,12 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer getExecuteTimer() { public Timer getExecuteTimer() {
return this.executeTimer; return this.executeTimer;
} }
public Counter getBytesCounter() {
return bytesCounter;
}
public Histogram getMessagesizeHistogram() {
return messagesizeHistogram;
}
} }

View File

@ -1,5 +1,7 @@
package io.nosqlbench.driver.pulsar.ops; package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate; import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
@ -20,14 +22,20 @@ import java.util.function.LongFunction;
public class PulsarConsumerMapper extends PulsarOpMapper { public class PulsarConsumerMapper extends PulsarOpMapper {
private final LongFunction<Consumer<?>> consumerFunc; private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> asyncApiFunc; private final LongFunction<Boolean> asyncApiFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarConsumerMapper(CommandTemplate cmdTpl, public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace, PulsarSpace clientSpace,
LongFunction<Consumer<?>> consumerFunc, LongFunction<Consumer<?>> consumerFunc,
LongFunction<Boolean> asyncApiFunc) { LongFunction<Boolean> asyncApiFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
super(cmdTpl, clientSpace); super(cmdTpl, clientSpace);
this.consumerFunc = consumerFunc; this.consumerFunc = consumerFunc;
this.asyncApiFunc = asyncApiFunc; this.asyncApiFunc = asyncApiFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
} }
@Override @Override
@ -39,7 +47,9 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
consumer, consumer,
clientSpace.getPulsarSchema(), clientSpace.getPulsarSchema(),
asyncApi, asyncApi,
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds() clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
bytesCounter,
messagesizeHistogram
); );
} }
} }

View File

@ -1,5 +1,7 @@
package io.nosqlbench.driver.pulsar.ops; package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -18,12 +20,18 @@ public class PulsarConsumerOp implements PulsarOp {
private final Schema<?> pulsarSchema; private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp; private final boolean asyncPulsarOp;
private final int timeoutSeconds; private final int timeoutSeconds;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds) { public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds,
Counter bytesCounter,
Histogram messagesizeHistogram) {
this.consumer = consumer; this.consumer = consumer;
this.pulsarSchema = schema; this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp; this.asyncPulsarOp = asyncPulsarOp;
this.timeoutSeconds = timeoutSeconds; this.timeoutSeconds = timeoutSeconds;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
} }
public void syncConsume() { public void syncConsume() {
@ -54,7 +62,9 @@ public class PulsarConsumerOp implements PulsarOp {
logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData())); logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData()));
} }
} }
int messagesize = message.getData().length;
bytesCounter.inc(messagesize);
messagesizeHistogram.update(messagesize);
consumer.acknowledge(message.getMessageId()); consumer.acknowledge(message.getMessageId());
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -1,5 +1,7 @@
package io.nosqlbench.driver.pulsar.ops; package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate; import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
@ -22,18 +24,24 @@ public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Boolean> asyncApiFunc; private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc; private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc; private final LongFunction<String> payloadFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarProducerMapper(CommandTemplate cmdTpl, public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace, PulsarSpace clientSpace,
LongFunction<Producer<?>> producerFunc, LongFunction<Producer<?>> producerFunc,
LongFunction<Boolean> asyncApiFunc, LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc, LongFunction<String> keyFunc,
LongFunction<String> payloadFunc) { LongFunction<String> payloadFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
super(cmdTpl, clientSpace); super(cmdTpl, clientSpace);
this.producerFunc = producerFunc; this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc; this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc; this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc; this.payloadFunc = payloadFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
} }
@Override @Override
@ -48,6 +56,8 @@ public class PulsarProducerMapper extends PulsarOpMapper {
clientSpace.getPulsarSchema(), clientSpace.getPulsarSchema(),
asyncApi, asyncApi,
msgKey, msgKey,
msgPayload); msgPayload,
bytesCounter,
messagesizeHistogram);
} }
} }

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.ops; package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarAction; import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
@ -22,17 +23,23 @@ public class PulsarProducerOp implements PulsarOp {
private final String msgKey; private final String msgKey;
private final String msgPayload; private final String msgPayload;
private final boolean asyncPulsarOp; private final boolean asyncPulsarOp;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarProducerOp(Producer<?> producer, public PulsarProducerOp(Producer<?> producer,
Schema<?> schema, Schema<?> schema,
boolean asyncPulsarOp, boolean asyncPulsarOp,
String key, String key,
String payload) { String payload,
Counter bytesCounter,
Histogram messagesizeHistogram) {
this.producer = producer; this.producer = producer;
this.pulsarSchema = schema; this.pulsarSchema = schema;
this.msgKey = key; this.msgKey = key;
this.msgPayload = payload; this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp; this.asyncPulsarOp = asyncPulsarOp;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
} }
@Override @Override
@ -45,7 +52,7 @@ public class PulsarProducerOp implements PulsarOp {
if ((msgKey != null) && (!msgKey.isEmpty())) { if ((msgKey != null) && (!msgKey.isEmpty())) {
typedMessageBuilder = typedMessageBuilder.key(msgKey); typedMessageBuilder = typedMessageBuilder.key(msgKey);
} }
int messagesize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
@ -54,9 +61,15 @@ public class PulsarProducerOp implements PulsarOp {
msgPayload msgPayload
); );
typedMessageBuilder = typedMessageBuilder.value(payload); typedMessageBuilder = typedMessageBuilder.value(payload);
// TODO: add a way to calculate the message size for AVRO messages
messagesize = msgPayload.length();
} else { } else {
typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8);
typedMessageBuilder = typedMessageBuilder.value(array);
messagesize = array.length;
} }
messagesizeHistogram.update(messagesize);
bytesCounter.inc(messagesize);
//TODO: add error handling with failed message production //TODO: add error handling with failed message production
if (!asyncPulsarOp) { if (!asyncPulsarOp) {

View File

@ -20,11 +20,13 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private final CommandTemplate cmdTpl; private final CommandTemplate cmdTpl;
private final PulsarSpace clientSpace; private final PulsarSpace clientSpace;
private final LongFunction<PulsarOp> opFunc; private final LongFunction<PulsarOp> opFunc;
private final PulsarActivity pulsarActivity;
// TODO: Add docs for the command template with respect to the OpTemplate // TODO: Add docs for the command template with respect to the OpTemplate
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) { public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache, PulsarActivity pulsarActivity) {
// TODO: Consider parsing map structures into equivalent binding representation // TODO: Consider parsing map structures into equivalent binding representation
this.pulsarActivity = pulsarActivity;
this.opTpl = opTemplate; this.opTpl = opTemplate;
this.cmdTpl = new CommandTemplate(opTemplate); this.cmdTpl = new CommandTemplate(opTemplate);
@ -180,7 +182,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
producerFunc, producerFunc,
async_api_func, async_api_func,
keyFunc, keyFunc,
valueFunc); valueFunc,
pulsarActivity.getBytesCounter(),
pulsarActivity.getMessagesizeHistogram());
} }
private LongFunction<PulsarOp> resolveMsgConsume( private LongFunction<PulsarOp> resolveMsgConsume(
@ -245,7 +249,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
consumer_name_func.apply(l) consumer_name_func.apply(l)
); );
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func); return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func,
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram());
} }
private LongFunction<PulsarOp> resolveMsgRead( private LongFunction<PulsarOp> resolveMsgRead(