From d7dfafaae1fa164b86c40b7de372d0fba23531e1 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 23 Mar 2021 16:07:38 +0100 Subject: [PATCH] Add bytes counter to consumer --- .../driver/pulsar/ops/PulsarConsumerMapper.java | 14 ++++++++++++-- .../driver/pulsar/ops/PulsarConsumerOp.java | 14 ++++++++++++-- .../driver/pulsar/ops/ReadyPulsarOp.java | 3 ++- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index ffccca6b5..3e10059b0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -1,5 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Consumer; @@ -20,14 +22,20 @@ import java.util.function.LongFunction; public class PulsarConsumerMapper extends PulsarOpMapper { private final LongFunction> consumerFunc; private final LongFunction asyncApiFunc; + private final Counter bytesCounter; + private final Histogram messagesizeHistogram; public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, LongFunction> consumerFunc, - LongFunction asyncApiFunc) { + LongFunction asyncApiFunc, + Counter bytesCounter, + Histogram messagesizeHistogram) { super(cmdTpl, clientSpace); this.consumerFunc = consumerFunc; this.asyncApiFunc = asyncApiFunc; + this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } @Override @@ -39,7 +47,9 @@ public class PulsarConsumerMapper extends PulsarOpMapper { consumer, clientSpace.getPulsarSchema(), asyncApi, - clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds() + clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), + bytesCounter, + messagesizeHistogram ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 85c906944..c221787c3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -1,5 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -18,12 +20,18 @@ public class PulsarConsumerOp implements PulsarOp { private final Schema pulsarSchema; private final boolean asyncPulsarOp; private final int timeoutSeconds; + private final Counter bytesCounter; + private final Histogram messagesizeHistogram; - public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds) { + public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds, + Counter bytesCounter, + Histogram messagesizeHistogram) { this.consumer = consumer; this.pulsarSchema = schema; this.asyncPulsarOp = asyncPulsarOp; this.timeoutSeconds = timeoutSeconds; + this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } public void syncConsume() { @@ -54,7 +62,9 @@ public class PulsarConsumerOp implements PulsarOp { logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData())); } } - + int messagesize = message.getData().length; + bytesCounter.inc(messagesize); + messagesizeHistogram.update(messagesize); consumer.acknowledge(message.getMessageId()); } catch (Exception e) { throw new RuntimeException(e); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8623ce307..6cf9ed9af 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -249,7 +249,8 @@ public class ReadyPulsarOp implements OpDispenser { consumer_name_func.apply(l) ); - return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func); + return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func, + pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram()); } private LongFunction resolveMsgRead(