From c166b479ca0bc05e1c8f71fea5f1b5b2b49a5719 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 23 Mar 2021 12:17:46 +0100 Subject: [PATCH] Add messagesize histogram --- driver-pulsar/pom.xml | 4 ++-- .../nosqlbench/driver/pulsar/PulsarActivity.java | 9 +++++++-- .../driver/pulsar/ops/PulsarProducerMapper.java | 9 +++++++-- .../driver/pulsar/ops/PulsarProducerOp.java | 14 ++++++++++---- .../driver/pulsar/ops/ReadyPulsarOp.java | 3 ++- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/driver-pulsar/pom.xml b/driver-pulsar/pom.xml index 13078519f..2d778d803 100644 --- a/driver-pulsar/pom.xml +++ b/driver-pulsar/pom.xml @@ -27,7 +27,7 @@ org.apache.pulsar - pulsar-client-original + pulsar-client ${pulsar.version} @@ -75,7 +75,7 @@ org.apache.avro avro - 1.9.1 + 1.10.1 diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index f7624a98e..fe01bad6c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -1,7 +1,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; +import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; @@ -23,6 +23,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer bindTimer; public Timer executeTimer; public Counter bytesCounter; + public Histogram messagesizeHistogram; private PulsarSpaceCache pulsarCache; private PulsarNBClientConf clientConf; @@ -45,7 +46,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve bindTimer = ActivityMetrics.timer(activityDef, "bind"); executeTimer = ActivityMetrics.timer(activityDef, "execute"); bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); - + messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); clientConf = new PulsarNBClientConf(pulsarClntConfFile); @@ -95,4 +96,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Counter getBytesCounter() { return bytesCounter; } + + public Histogram getMessagesizeHistogram() { + return messagesizeHistogram; + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 71eab38be..0333d9b3e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; @@ -24,6 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction keyFunc; private final LongFunction payloadFunc; private final Counter bytesCounter; + private final Histogram messagesizeHistogram; public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, @@ -31,13 +33,15 @@ public class PulsarProducerMapper extends PulsarOpMapper { LongFunction asyncApiFunc, LongFunction keyFunc, LongFunction payloadFunc, - Counter bytesCounter) { + Counter bytesCounter, + Histogram messagesizeHistogram) { super(cmdTpl, clientSpace); this.producerFunc = producerFunc; this.asyncApiFunc = asyncApiFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } @Override @@ -53,6 +57,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { asyncApi, msgKey, msgPayload, - bytesCounter); + bytesCounter, + messagesizeHistogram); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 890fdf00a..537e04833 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -23,19 +24,22 @@ public class PulsarProducerOp implements PulsarOp { private final String msgPayload; private final boolean asyncPulsarOp; private final Counter bytesCounter; + private final Histogram messagesizeHistogram; public PulsarProducerOp(Producer producer, Schema schema, boolean asyncPulsarOp, String key, String payload, - Counter bytesCounter) { + Counter bytesCounter, + Histogram messagesizeHistogram) { this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; this.msgPayload = payload; this.asyncPulsarOp = asyncPulsarOp; this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } @Override @@ -48,7 +52,7 @@ public class PulsarProducerOp implements PulsarOp { if ((msgKey != null) && (!msgKey.isEmpty())) { typedMessageBuilder = typedMessageBuilder.key(msgKey); } - + int messagesize; SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( @@ -58,12 +62,14 @@ public class PulsarProducerOp implements PulsarOp { ); typedMessageBuilder = typedMessageBuilder.value(payload); // TODO: add a way to calculate the message size for AVRO messages - bytesCounter.inc(msgPayload.length()); + messagesize = msgPayload.length(); } else { byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8); typedMessageBuilder = typedMessageBuilder.value(array); - bytesCounter.inc(array.length); + messagesize = array.length; } + messagesizeHistogram.update(messagesize); + bytesCounter.inc(messagesize); //TODO: add error handling with failed message production if (!asyncPulsarOp) { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 38b72cd08..8623ce307 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -183,7 +183,8 @@ public class ReadyPulsarOp implements OpDispenser { async_api_func, keyFunc, valueFunc, - pulsarActivity.getBytesCounter()); + pulsarActivity.getBytesCounter(), + pulsarActivity.getMessagesizeHistogram()); } private LongFunction resolveMsgConsume(