From 8aff79cba41e4277425f3e35871a8be008c0ce16 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 15:24:51 +0100 Subject: [PATCH 1/7] Handle correctly Pulsar Consumer configuration --- .../main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index ee58f6848..8dea929bc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -11,6 +11,7 @@ import org.apache.pulsar.client.impl.DefaultBatcherBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -323,7 +324,8 @@ public class PulsarSpace { PulsarClient pulsarClient = getPulsarClient(); // Get other possible producer settings that are set at global level - Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); + Map consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap()); + consumerConf.remove("timeout"); // Explicit topic names will take precedence over topics pattern if (!topicNames.isEmpty()) { From abbea4df7175b62e6155cecb1412b34380e8537a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 17:49:32 +0100 Subject: [PATCH 2/7] Add ErrorHandler --- .../driver/pulsar/PulsarAction.java | 20 +++++++++++++++---- .../driver/pulsar/PulsarActivity.java | 4 ++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index e7da412fa..853a70ad7 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.engine.api.activityapi.core.SyncAction; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,6 +13,7 @@ public class PulsarAction implements SyncAction { private final static Logger logger = LogManager.getLogger(PulsarAction.class); + private static final int MAX_TRIALS = 100; private final int slot; private final PulsarActivity activity; @@ -26,6 +28,7 @@ public class PulsarAction implements SyncAction { @Override public int runCycle(long cycle) { + long start = System.nanoTime(); PulsarOp pulsarOp; try (Timer.Context ctx = activity.getBindTimer().time()) { @@ -33,17 +36,26 @@ public class PulsarAction implements SyncAction { pulsarOp = readyPulsarOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... + activity.getErrorhandler().handleError(bindException, cycle, 0); throw new RuntimeException( "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException ); } - try (Timer.Context ctx = activity.getExecuteTimer().time()) { - pulsarOp.run(); + for (int i = 0; i < MAX_TRIALS; i++) { + try (Timer.Context ctx = activity.getExecuteTimer().time()) { + pulsarOp.run(); + break; + } catch (RuntimeException err) { + ErrorDetail errorDetail = activity + .getErrorhandler() + .handleError(err, cycle, System.nanoTime() - start); + if (!errorDetail.isRetryable()) { + break; + } + } } - // TODO: add retries and use standard error handler - return 0; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index fda1f16cd..c0ff8fe7e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -59,6 +59,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve ); } + public NBErrorHandler getErrorhandler() { + return errorhandler; + } + @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); From 84a9a4bf834cefcf0b0bc407050788f5944c37b4 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 18:01:30 +0100 Subject: [PATCH 3/7] make tries configurable as in HttpAction --- .../main/java/io/nosqlbench/driver/pulsar/PulsarAction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index 853a70ad7..f5f64609a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -13,13 +13,14 @@ public class PulsarAction implements SyncAction { private final static Logger logger = LogManager.getLogger(PulsarAction.class); - private static final int MAX_TRIALS = 100; private final int slot; private final PulsarActivity activity; + int maxTries = 1; public PulsarAction(PulsarActivity activity, int slot) { this.activity = activity; this.slot = slot; + this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10); } @Override @@ -42,7 +43,7 @@ public class PulsarAction implements SyncAction { ); } - for (int i = 0; i < MAX_TRIALS; i++) { + for (int i = 0; i < maxTries; i++) { try (Timer.Context ctx = activity.getExecuteTimer().time()) { pulsarOp.run(); break; From 0ecf83f863edd343ababbcfbbe36e3b247bd1635 Mon Sep 17 00:00:00 2001 From: Jeremy Hanna Date: Thu, 18 Mar 2021 10:13:52 +1100 Subject: [PATCH 4/7] Making the hosts into a weighted string to allow for round robin and weighted host rotation with Stargate hosts. Also parameterizing the protocol (http|https) and the path prefix so that this baseline can be run against Stargate directly or against Astra. --- .../resources/activities/baselines/http-iot.yaml | 12 +++++++++--- .../activities/baselines/http-keyvalue.yaml | 12 +++++++++--- .../resources/activities/baselines/http-tabular.yaml | 12 +++++++++--- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/driver-http/src/main/resources/activities/baselines/http-iot.yaml b/driver-http/src/main/resources/activities/baselines/http-iot.yaml index f625a647b..0ebfb34d1 100644 --- a/driver-http/src/main/resources/activities/baselines/http-iot.yaml +++ b/driver-http/src/main/resources/activities/baselines/http-iot.yaml @@ -15,6 +15,12 @@ scenarios: - run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto - run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto bindings: + # To enable an optional weighted set of hosts in place of a load balancer + # Examples + # single host: stargate_host=host1 + # multiple hosts: stargate_host=host1,host2,host3 + # multiple weighted hosts: stargate_host=host1:3,host2:7 + weighted_hosts: WeightedStrings('<>') # http request id request_id: ToHashedUUID(); ToString(); @@ -62,7 +68,7 @@ blocks: tags: phase: rampup statements: - - rampup-insert: POST http://<>:<>/v2/keyspaces/<>/<> + - rampup-insert: POST <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" @@ -85,7 +91,7 @@ blocks: params: ratio: <> statements: - - main-select: GET http://<>:<>/v2/keyspaces/<>/<>?where=E[[{"machine_id":{"$eq":"{machine_id}"},"sensor_name":{"$eq":"{sensor_name}"}}]]&page-size=<> + - main-select: GET <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<>?where=E[[{"machine_id":{"$eq":"{machine_id}"},"sensor_name":{"$eq":"{sensor_name}"}}]]&page-size=<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" @@ -99,7 +105,7 @@ blocks: params: ratio: <> statements: - - main-write: POST http://<>:<>/v2/keyspaces/<>/<> + - main-write: POST <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" diff --git a/driver-http/src/main/resources/activities/baselines/http-keyvalue.yaml b/driver-http/src/main/resources/activities/baselines/http-keyvalue.yaml index 0e75488a6..333e24519 100644 --- a/driver-http/src/main/resources/activities/baselines/http-keyvalue.yaml +++ b/driver-http/src/main/resources/activities/baselines/http-keyvalue.yaml @@ -11,6 +11,12 @@ scenarios: - run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto - run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto bindings: + # To enable an optional weighted set of hosts in place of a load balancer + # Examples + # single host: stargate_host=host1 + # multiple hosts: stargate_host=host1,host2,host3 + # multiple weighted hosts: stargate_host=host1:3,host2:7 + weighted_hosts: WeightedStrings('<>') # http request id request_id: ToHashedUUID(); ToString(); @@ -44,7 +50,7 @@ blocks: tags: phase: rampup statements: - - rampup-insert: POST http://<>:<>/v2/keyspaces/<>/<> + - rampup-insert: POST <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" @@ -63,7 +69,7 @@ blocks: params: ratio: 5 statements: - - main-select: GET http://<>:<>/v2/keyspaces/<>/<>/{rw_key} + - main-select: GET <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<>/{rw_key} Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" @@ -77,7 +83,7 @@ blocks: params: ratio: 5 statements: - - main-write: POST http://<>:<>/v2/keyspaces/<>/<> + - main-write: POST <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" diff --git a/driver-http/src/main/resources/activities/baselines/http-tabular.yaml b/driver-http/src/main/resources/activities/baselines/http-tabular.yaml index 6e8d8dda4..6840fef99 100644 --- a/driver-http/src/main/resources/activities/baselines/http-tabular.yaml +++ b/driver-http/src/main/resources/activities/baselines/http-tabular.yaml @@ -12,6 +12,12 @@ scenarios: - run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto - run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto bindings: + # To enable an optional weighted set of hosts in place of a load balancer + # Examples + # single host: stargate_host=host1 + # multiple hosts: stargate_host=host1,host2,host3 + # multiple weighted hosts: stargate_host=host1:3,host2:7 + weighted_hosts: WeightedStrings('<>') # http request id request_id: ToHashedUUID(); ToString(); # for ramp-up and verify @@ -53,7 +59,7 @@ blocks: tags: phase: rampup statements: - - rampup-insert: POST http://<>:<>/v2/keyspaces/<>/<> + - rampup-insert: POST <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" @@ -73,7 +79,7 @@ blocks: params: ratio: 5 statements: - - main-select: GET http://<>:<>/v2/keyspaces/<>/<>/{part_read}&page-size={limit} + - main-select: GET <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<>/{part_read}&page-size={limit} Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" @@ -87,7 +93,7 @@ blocks: params: ratio: 5 statements: - - main-write: POST http://<>:<>/v2/keyspaces/<>/<> + - main-write: POST <>://{weighted_hosts}:<><>/v2/keyspaces/<>/<> Accept: "application/json" X-Cassandra-Request-Id: "{request_id}" X-Cassandra-Token: "<>" From 0f53ea63a20815c7ea3bc3deff94225ea84ba776 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Mon, 22 Mar 2021 17:49:07 +0100 Subject: [PATCH 5/7] Pulsar: add bytes counter --- driver-pulsar/pom.xml | 4 ++-- .../io/nosqlbench/driver/pulsar/PulsarActivity.java | 10 +++++++++- .../driver/pulsar/ops/PulsarProducerMapper.java | 9 +++++++-- .../driver/pulsar/ops/PulsarProducerOp.java | 13 ++++++++++--- .../nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java | 7 +++++-- 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/driver-pulsar/pom.xml b/driver-pulsar/pom.xml index 2d778d803..13078519f 100644 --- a/driver-pulsar/pom.xml +++ b/driver-pulsar/pom.xml @@ -27,7 +27,7 @@ org.apache.pulsar - pulsar-client + pulsar-client-original ${pulsar.version} @@ -75,7 +75,7 @@ org.apache.avro avro - 1.10.1 + 1.9.1 diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index c0ff8fe7e..f7624a98e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -1,5 +1,7 @@ package io.nosqlbench.driver.pulsar; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; @@ -20,6 +22,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer bindTimer; public Timer executeTimer; + public Counter bytesCounter; private PulsarSpaceCache pulsarCache; private PulsarNBClientConf clientConf; @@ -41,6 +44,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve bindTimer = ActivityMetrics.timer(activityDef, "bind"); executeTimer = ActivityMetrics.timer(activityDef, "execute"); + bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); clientConf = new PulsarNBClientConf(pulsarClntConfFile); @@ -49,7 +53,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve pulsarCache = new PulsarSpaceCache(this); - this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache)); + this.sequencer = createOpSequence((ot) -> new ReadyPulsarOp(ot, pulsarCache, this)); setDefaultsFromOpSequence(sequencer); onActivityDefUpdate(activityDef); @@ -87,4 +91,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer getExecuteTimer() { return this.executeTimer; } + + public Counter getBytesCounter() { + return bytesCounter; + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 7212356e0..71eab38be 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import com.codahale.metrics.Counter; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; @@ -22,18 +23,21 @@ public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction asyncApiFunc; private final LongFunction keyFunc; private final LongFunction payloadFunc; + private final Counter bytesCounter; public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, LongFunction> producerFunc, LongFunction asyncApiFunc, LongFunction keyFunc, - LongFunction payloadFunc) { + LongFunction payloadFunc, + Counter bytesCounter) { super(cmdTpl, clientSpace); this.producerFunc = producerFunc; this.asyncApiFunc = asyncApiFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; + this.bytesCounter = bytesCounter; } @Override @@ -48,6 +52,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { clientSpace.getPulsarSchema(), asyncApi, msgKey, - msgPayload); + msgPayload, + bytesCounter); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 07827ceb2..890fdf00a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -1,6 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; -import io.nosqlbench.driver.pulsar.PulsarAction; +import com.codahale.metrics.Counter; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -22,17 +22,20 @@ public class PulsarProducerOp implements PulsarOp { private final String msgKey; private final String msgPayload; private final boolean asyncPulsarOp; + private final Counter bytesCounter; public PulsarProducerOp(Producer producer, Schema schema, boolean asyncPulsarOp, String key, - String payload) { + String payload, + Counter bytesCounter) { this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; this.msgPayload = payload; this.asyncPulsarOp = asyncPulsarOp; + this.bytesCounter = bytesCounter; } @Override @@ -54,8 +57,12 @@ public class PulsarProducerOp implements PulsarOp { msgPayload ); typedMessageBuilder = typedMessageBuilder.value(payload); + // TODO: add a way to calculate the message size for AVRO messages + bytesCounter.inc(msgPayload.length()); } else { - typedMessageBuilder = typedMessageBuilder.value(msgPayload.getBytes(StandardCharsets.UTF_8)); + byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8); + typedMessageBuilder = typedMessageBuilder.value(array); + bytesCounter.inc(array.length); } //TODO: add error handling with failed message production diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index e659ee377..38b72cd08 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -20,11 +20,13 @@ public class ReadyPulsarOp implements OpDispenser { private final CommandTemplate cmdTpl; private final PulsarSpace clientSpace; private final LongFunction opFunc; + private final PulsarActivity pulsarActivity; // TODO: Add docs for the command template with respect to the OpTemplate - public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) { + public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache, PulsarActivity pulsarActivity) { // TODO: Consider parsing map structures into equivalent binding representation + this.pulsarActivity = pulsarActivity; this.opTpl = opTemplate; this.cmdTpl = new CommandTemplate(opTemplate); @@ -180,7 +182,8 @@ public class ReadyPulsarOp implements OpDispenser { producerFunc, async_api_func, keyFunc, - valueFunc); + valueFunc, + pulsarActivity.getBytesCounter()); } private LongFunction resolveMsgConsume( From c166b479ca0bc05e1c8f71fea5f1b5b2b49a5719 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 23 Mar 2021 12:17:46 +0100 Subject: [PATCH 6/7] Add messagesize histogram --- driver-pulsar/pom.xml | 4 ++-- .../nosqlbench/driver/pulsar/PulsarActivity.java | 9 +++++++-- .../driver/pulsar/ops/PulsarProducerMapper.java | 9 +++++++-- .../driver/pulsar/ops/PulsarProducerOp.java | 14 ++++++++++---- .../driver/pulsar/ops/ReadyPulsarOp.java | 3 ++- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/driver-pulsar/pom.xml b/driver-pulsar/pom.xml index 13078519f..2d778d803 100644 --- a/driver-pulsar/pom.xml +++ b/driver-pulsar/pom.xml @@ -27,7 +27,7 @@ org.apache.pulsar - pulsar-client-original + pulsar-client ${pulsar.version} @@ -75,7 +75,7 @@ org.apache.avro avro - 1.9.1 + 1.10.1 diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index f7624a98e..fe01bad6c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -1,7 +1,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Counter; -import com.codahale.metrics.Meter; +import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.driver.pulsar.ops.ReadyPulsarOp; @@ -23,6 +23,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer bindTimer; public Timer executeTimer; public Counter bytesCounter; + public Histogram messagesizeHistogram; private PulsarSpaceCache pulsarCache; private PulsarNBClientConf clientConf; @@ -45,7 +46,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve bindTimer = ActivityMetrics.timer(activityDef, "bind"); executeTimer = ActivityMetrics.timer(activityDef, "execute"); bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); - + messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); clientConf = new PulsarNBClientConf(pulsarClntConfFile); @@ -95,4 +96,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Counter getBytesCounter() { return bytesCounter; } + + public Histogram getMessagesizeHistogram() { + return messagesizeHistogram; + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 71eab38be..0333d9b3e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; @@ -24,6 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction keyFunc; private final LongFunction payloadFunc; private final Counter bytesCounter; + private final Histogram messagesizeHistogram; public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, @@ -31,13 +33,15 @@ public class PulsarProducerMapper extends PulsarOpMapper { LongFunction asyncApiFunc, LongFunction keyFunc, LongFunction payloadFunc, - Counter bytesCounter) { + Counter bytesCounter, + Histogram messagesizeHistogram) { super(cmdTpl, clientSpace); this.producerFunc = producerFunc; this.asyncApiFunc = asyncApiFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } @Override @@ -53,6 +57,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { asyncApi, msgKey, msgPayload, - bytesCounter); + bytesCounter, + messagesizeHistogram); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 890fdf00a..537e04833 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -23,19 +24,22 @@ public class PulsarProducerOp implements PulsarOp { private final String msgPayload; private final boolean asyncPulsarOp; private final Counter bytesCounter; + private final Histogram messagesizeHistogram; public PulsarProducerOp(Producer producer, Schema schema, boolean asyncPulsarOp, String key, String payload, - Counter bytesCounter) { + Counter bytesCounter, + Histogram messagesizeHistogram) { this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; this.msgPayload = payload; this.asyncPulsarOp = asyncPulsarOp; this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } @Override @@ -48,7 +52,7 @@ public class PulsarProducerOp implements PulsarOp { if ((msgKey != null) && (!msgKey.isEmpty())) { typedMessageBuilder = typedMessageBuilder.key(msgKey); } - + int messagesize; SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( @@ -58,12 +62,14 @@ public class PulsarProducerOp implements PulsarOp { ); typedMessageBuilder = typedMessageBuilder.value(payload); // TODO: add a way to calculate the message size for AVRO messages - bytesCounter.inc(msgPayload.length()); + messagesize = msgPayload.length(); } else { byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8); typedMessageBuilder = typedMessageBuilder.value(array); - bytesCounter.inc(array.length); + messagesize = array.length; } + messagesizeHistogram.update(messagesize); + bytesCounter.inc(messagesize); //TODO: add error handling with failed message production if (!asyncPulsarOp) { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 38b72cd08..8623ce307 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -183,7 +183,8 @@ public class ReadyPulsarOp implements OpDispenser { async_api_func, keyFunc, valueFunc, - pulsarActivity.getBytesCounter()); + pulsarActivity.getBytesCounter(), + pulsarActivity.getMessagesizeHistogram()); } private LongFunction resolveMsgConsume( From d7dfafaae1fa164b86c40b7de372d0fba23531e1 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Tue, 23 Mar 2021 16:07:38 +0100 Subject: [PATCH 7/7] Add bytes counter to consumer --- .../driver/pulsar/ops/PulsarConsumerMapper.java | 14 ++++++++++++-- .../driver/pulsar/ops/PulsarConsumerOp.java | 14 ++++++++++++-- .../driver/pulsar/ops/ReadyPulsarOp.java | 3 ++- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index ffccca6b5..3e10059b0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -1,5 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Consumer; @@ -20,14 +22,20 @@ import java.util.function.LongFunction; public class PulsarConsumerMapper extends PulsarOpMapper { private final LongFunction> consumerFunc; private final LongFunction asyncApiFunc; + private final Counter bytesCounter; + private final Histogram messagesizeHistogram; public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, LongFunction> consumerFunc, - LongFunction asyncApiFunc) { + LongFunction asyncApiFunc, + Counter bytesCounter, + Histogram messagesizeHistogram) { super(cmdTpl, clientSpace); this.consumerFunc = consumerFunc; this.asyncApiFunc = asyncApiFunc; + this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } @Override @@ -39,7 +47,9 @@ public class PulsarConsumerMapper extends PulsarOpMapper { consumer, clientSpace.getPulsarSchema(), asyncApi, - clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds() + clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), + bytesCounter, + messagesizeHistogram ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 85c906944..c221787c3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -1,5 +1,7 @@ package io.nosqlbench.driver.pulsar.ops; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Histogram; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -18,12 +20,18 @@ public class PulsarConsumerOp implements PulsarOp { private final Schema pulsarSchema; private final boolean asyncPulsarOp; private final int timeoutSeconds; + private final Counter bytesCounter; + private final Histogram messagesizeHistogram; - public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds) { + public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds, + Counter bytesCounter, + Histogram messagesizeHistogram) { this.consumer = consumer; this.pulsarSchema = schema; this.asyncPulsarOp = asyncPulsarOp; this.timeoutSeconds = timeoutSeconds; + this.bytesCounter = bytesCounter; + this.messagesizeHistogram = messagesizeHistogram; } public void syncConsume() { @@ -54,7 +62,9 @@ public class PulsarConsumerOp implements PulsarOp { logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData())); } } - + int messagesize = message.getData().length; + bytesCounter.inc(messagesize); + messagesizeHistogram.update(messagesize); consumer.acknowledge(message.getMessageId()); } catch (Exception e) { throw new RuntimeException(e); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8623ce307..6cf9ed9af 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -249,7 +249,8 @@ public class ReadyPulsarOp implements OpDispenser { consumer_name_func.apply(l) ); - return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func); + return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func, + pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram()); } private LongFunction resolveMsgRead(