From 2fbf0b4ad3ba3ea5e1eface013651e8fe7845b44 Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Tue, 2 Nov 2021 12:15:09 -0500 Subject: [PATCH 1/3] add instrumentation scaffold for payloadRtt --- .../driver/pulsar/PulsarActivity.java | 12 +++++ .../pulsar/ops/PulsarConsumerMapper.java | 15 ++++--- .../driver/pulsar/ops/PulsarConsumerOp.java | 27 +++++++++--- .../driver/pulsar/ops/ReadyPulsarOp.java | 44 ++++++++++++++----- .../api/templating/CommandTemplate.java | 17 ++++++- 5 files changed, 92 insertions(+), 23 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index af7f64744..630810b43 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -40,6 +40,15 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve // Metrics for NB Pulsar driver milestone: https://github.com/nosqlbench/nosqlbench/milestone/11 // - end-to-end latency private Histogram e2eMsgProcLatencyHistogram; + + /** + * A histogram that tracks payload round-trip-time, based on a user-defined field in some sender + * system which can be interpreted as millisecond epoch time in the system's local time zone. + * This is paired with a field name of the same type to be extracted and reported in a meteric + * named 'payload-rtt'. + */ + private Histogram payloadRttHistogram; + // - message out of sequence error counter private Counter msgErrOutOfSeqCounter; // - message loss counter @@ -85,6 +94,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction"); e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency"); + payloadRttHistogram = ActivityMetrics.histogram(activityDef, "payload_rtt"); + msgErrOutOfSeqCounter = ActivityMetrics.counter(activityDef, "err_msg_oos"); msgErrLossCounter = ActivityMetrics.counter(activityDef, "err_msg_loss"); msgErrDuplicateCounter = ActivityMetrics.counter(activityDef, "err_msg_dup"); @@ -257,6 +268,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer getCreateTransactionTimer() { return createTransactionTimer; } public Timer getCommitTransactionTimer() { return commitTransactionTimer; } + public Histogram getPayloadRttHistogram() {return payloadRttHistogram;} public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; } public Counter getMsgErrOutOfSeqCounter() { return msgErrOutOfSeqCounter; } public Counter getMsgErrLossCounter() { return msgErrLossCounter; } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 669bc3c74..dfa070897 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -3,13 +3,13 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; -import java.util.HashMap; -import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.transaction.Transaction; +import java.util.HashMap; +import java.util.Map; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -29,6 +29,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { private final LongFunction> consumerFunc; private final boolean e2eMsProc; + private final LongFunction payloadRttFieldFunc; public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, @@ -38,10 +39,12 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { LongFunction seqTrackingFunc, LongFunction> transactionSupplierFunc, LongFunction> consumerFunc, - boolean e2eMsgProc) { + boolean e2eMsgProc, + LongFunction payloadRttFieldFunc) { super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc); this.consumerFunc = consumerFunc; this.e2eMsProc = e2eMsgProc; + this.payloadRttFieldFunc = payloadRttFieldFunc; } @Override @@ -51,6 +54,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { boolean asyncApi = asyncApiFunc.apply(value); boolean useTransaction = useTransactionFunc.apply(value); Supplier transactionSupplier = transactionSupplierFunc.apply(value); + String payloadRttFieldFunc = this.payloadRttFieldFunc.apply(value); return new PulsarConsumerOp( pulsarActivity, @@ -62,7 +66,8 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { clientSpace.getPulsarSchema(), clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), e2eMsProc, - this::getReceivedMessageSequenceTracker); + this::getReceivedMessageSequenceTracker, + payloadRttFieldFunc); } @@ -77,7 +82,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper { pulsarActivity.getMsgErrLossCounter()); } - private ThreadLocal> receivedMessageSequenceTrackersForTopicThreadLocal = + private final ThreadLocal> receivedMessageSequenceTrackersForTopicThreadLocal = ThreadLocal.withInitial(HashMap::new); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 67eb472c5..d73dd02c9 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -4,19 +4,21 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.PulsarActivity; -import io.nosqlbench.driver.pulsar.exception.*; +import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.schema.SchemaType; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; public class PulsarConsumerOp implements PulsarOp { @@ -41,7 +43,10 @@ public class PulsarConsumerOp implements PulsarOp { // keep track of end-to-end message latency private final Histogram e2eMsgProcLatencyHistogram; + private final Function receivedMessageSequenceTrackerForTopic; + private final Histogram payloadRttHistogram; + private final String payloadRttTrackingField; public PulsarConsumerOp( PulsarActivity pulsarActivity, @@ -53,7 +58,8 @@ public class PulsarConsumerOp implements PulsarOp { Schema schema, int timeoutSeconds, boolean e2eMsgProc, - Function receivedMessageSequenceTrackerForTopic) + Function receivedMessageSequenceTrackerForTopic, + String payloadRttTrackingField) { this.pulsarActivity = pulsarActivity; @@ -72,7 +78,9 @@ public class PulsarConsumerOp implements PulsarOp { this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer(); this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram(); + this.payloadRttHistogram = pulsarActivity.getPayloadRttHistogram(); this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic; + this.payloadRttTrackingField = payloadRttTrackingField; } private void checkAndUpdateMessageErrorCounter(Message message) { @@ -138,9 +146,16 @@ public class PulsarConsumerOp implements PulsarOp { } } + if (!payloadRttTrackingField.isEmpty()) { + // TODO: Extract the extractedSendTime from the payload and convert it to a long. + long extractedSendTime = 0L; + long delta = System.currentTimeMillis()-extractedSendTime; + payloadRttHistogram.update(delta); + } + // keep track end-to-end message processing latency - long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime(); if (e2eMsgProc) { + long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime(); e2eMsgProcLatencyHistogram.update(e2eMsgLatency); } @@ -220,8 +235,8 @@ public class PulsarConsumerOp implements PulsarOp { } } - long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime(); if (e2eMsgProc) { + long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime(); e2eMsgProcLatencyHistogram.update(e2eMsgLatency); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 6842a8af7..bf540ad36 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -1,28 +1,32 @@ package io.nosqlbench.driver.pulsar.ops; -import io.nosqlbench.driver.pulsar.*; +import io.nosqlbench.driver.pulsar.PulsarActivity; +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.driver.pulsar.PulsarSpaceCache; import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException; import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnsupportedOpException; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.templating.CommandTemplate; -import java.util.*; -import java.util.stream.Collectors; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.transaction.Transaction; +import java.util.*; import java.util.function.LongFunction; import java.util.function.Supplier; +import java.util.stream.Collectors; public class ReadyPulsarOp implements OpDispenser { + // TODO: Add this to the pulsar driver docs + public static final String RTT_TRACKING_FIELD = "payload-tracking-field"; private final static Logger logger = LogManager.getLogger(ReadyPulsarOp.class); private final OpTemplate opTpl; @@ -129,6 +133,17 @@ public class ReadyPulsarOp implements OpDispenser { } logger.info("seq_tracking: {}", seqTrackingFunc.apply(0)); + // TODO: Collapse this pattern into a simple version and flatten out all call sites + LongFunction payloadRttFieldFunc = (l) -> ""; + if (cmdTpl.isStatic(RTT_TRACKING_FIELD)) { + payloadRttFieldFunc = l -> cmdTpl.getStatic(RTT_TRACKING_FIELD); + logger.info("payload_rtt_field: {}", cmdTpl.getStatic(RTT_TRACKING_FIELD)); + } else if (cmdTpl.isDynamic(RTT_TRACKING_FIELD)) { + payloadRttFieldFunc = l -> cmdTpl.getDynamic(RTT_TRACKING_FIELD,l); + logger.info("payload_rtt_field: {}", cmdTpl.getFieldDescription(RTT_TRACKING_FIELD)); + } + logger.info("payload_rtt_field_func: {}", payloadRttFieldFunc.toString()); + // TODO: Complete implementation for websocket-producer and managed-ledger // Admin operation: create/delete tenant if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) { @@ -154,7 +169,8 @@ public class ReadyPulsarOp implements OpDispenser { asyncApiFunc, useTransactionFunc, seqTrackingFunc, - false); + false, + payloadRttFieldFunc); } // Regular/non-admin operation: single message consuming from multiple-topics (consumer) else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_MULTI_CONSUME.label)) { @@ -163,7 +179,8 @@ public class ReadyPulsarOp implements OpDispenser { topicUriFunc, asyncApiFunc, useTransactionFunc, - seqTrackingFunc); + seqTrackingFunc, + payloadRttFieldFunc); } // Regular/non-admin operation: single message consuming a single topic (reader) else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { @@ -193,7 +210,8 @@ public class ReadyPulsarOp implements OpDispenser { asyncApiFunc, useTransactionFunc, seqTrackingFunc, - true); + true, + payloadRttFieldFunc); } // Invalid operation type else { @@ -411,7 +429,8 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction async_api_func, LongFunction useTransactionFunc, LongFunction seqTrackingFunc, - boolean e2eMsgProc + boolean e2eMsgProc, + LongFunction rttTrackingFieldFunc ) { LongFunction subscription_name_func; if (cmdTpl.isStatic("subscription_name")) { @@ -460,7 +479,8 @@ public class ReadyPulsarOp implements OpDispenser { seqTrackingFunc, transactionSupplierFunc, consumerFunc, - e2eMsgProc); + e2eMsgProc, + rttTrackingFieldFunc); } private LongFunction resolveMultiTopicMsgConsume( @@ -468,7 +488,8 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction topic_uri_func, LongFunction async_api_func, LongFunction useTransactionFunc, - LongFunction seqTrackingFunc + LongFunction seqTrackingFunc, + LongFunction payloadRttFieldFunc ) { // Topic list (multi-topic) LongFunction topic_names_func; @@ -539,7 +560,8 @@ public class ReadyPulsarOp implements OpDispenser { seqTrackingFunc, transactionSupplierFunc, mtConsumerFunc, - false); + false, + payloadRttFieldFunc); } private LongFunction resolveMsgRead( diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/templating/CommandTemplate.java b/engine-api/src/main/java/io/nosqlbench/engine/api/templating/CommandTemplate.java index e739a2586..8f9ce1f1b 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/templating/CommandTemplate.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/templating/CommandTemplate.java @@ -105,7 +105,7 @@ public class CommandTemplate { for (Function> parser : parserlist) { Map parsed = parser.apply(oneline); if (parsed != null) { - logger.debug("parsed request: " + parsed.toString()); + logger.debug("parsed request: " + parsed); cmd.putAll(parsed); didParse = true; break; @@ -291,4 +291,19 @@ public class CommandTemplate { } return true; } + + /** + * This should only be used to provide a view of a field definition, never for actual use in a payload. + * @param varname The field name which you want to explain + * @return A string representation of the field name + */ + public String getFieldDescription(String varname) { + if (this.isDynamic(varname)) { + return "dynamic: " + this.dynamics.get(varname).toString(); + } else if (this.isStatic(varname)) { + return "static: " + this.getStatic(varname); + } else { + return "UNDEFINED"; + } + } } From c15bd97c9780e58a2d23319f4a9334a43d65cf81 Mon Sep 17 00:00:00 2001 From: Matt Fleming Date: Mon, 8 Nov 2021 17:10:18 +0000 Subject: [PATCH 2/3] Avoid NullPointerException if pulsarCache hasn't been initialized. --- .../main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index 630810b43..2039e60a0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -78,6 +78,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public void shutdownActivity() { super.shutdownActivity(); + if (pulsarCache == null) { + return; + } + for (PulsarSpace pulsarSpace : pulsarCache.getAssociatedPulsarSpace()) { pulsarSpace.shutdownPulsarSpace(); } From 8353a9d8425766b224a19fdaa058b7c29b511c0d Mon Sep 17 00:00:00 2001 From: Matt Fleming Date: Tue, 9 Nov 2021 20:44:00 +0000 Subject: [PATCH 3/3] Extract payload rtt field from Avro record if it exists --- .../driver/pulsar/ops/PulsarConsumerOp.java | 27 ++++++++++++------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index d73dd02c9..896631293 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -1,5 +1,12 @@ package io.nosqlbench.driver.pulsar.ops; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.commons.lang3.StringUtils; + import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; @@ -7,7 +14,6 @@ import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Consumer; @@ -16,11 +22,6 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.schema.SchemaType; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.function.Supplier; - public class PulsarConsumerOp implements PulsarOp { private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class); @@ -147,10 +148,16 @@ public class PulsarConsumerOp implements PulsarOp { } if (!payloadRttTrackingField.isEmpty()) { - // TODO: Extract the extractedSendTime from the payload and convert it to a long. - long extractedSendTime = 0L; - long delta = System.currentTimeMillis()-extractedSendTime; - payloadRttHistogram.update(delta); + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.Schema avroSchema = + AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); + if (avroGenericRecord.hasField(payloadRttTrackingField)) { + long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField); + long delta = System.currentTimeMillis() - extractedSendTime; + payloadRttHistogram.update(delta); + } } // keep track end-to-end message processing latency