Merge pull request #374 from nosqlbench/pulsar-payload-rtt

add instrumentation scaffold for payloadRtt
This commit is contained in:
Jonathan Shook 2021-11-10 10:29:18 -06:00 committed by GitHub
commit d9ce624355
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 27 deletions

View File

@ -40,6 +40,15 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
// Metrics for NB Pulsar driver milestone: https://github.com/nosqlbench/nosqlbench/milestone/11
// - end-to-end latency
private Histogram e2eMsgProcLatencyHistogram;
/**
* A histogram that tracks payload round-trip-time, based on a user-defined field in some sender
* system which can be interpreted as millisecond epoch time in the system's local time zone.
* This is paired with a field name of the same type to be extracted and reported in a meteric
* named 'payload-rtt'.
*/
private Histogram payloadRttHistogram;
// - message out of sequence error counter
private Counter msgErrOutOfSeqCounter;
// - message loss counter
@ -69,6 +78,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public void shutdownActivity() {
super.shutdownActivity();
if (pulsarCache == null) {
return;
}
for (PulsarSpace pulsarSpace : pulsarCache.getAssociatedPulsarSpace()) {
pulsarSpace.shutdownPulsarSpace();
}
@ -87,6 +100,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction");
e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency");
payloadRttHistogram = ActivityMetrics.histogram(activityDef, "payload_rtt");
msgErrOutOfSeqCounter = ActivityMetrics.counter(activityDef, "err_msg_oos");
msgErrLossCounter = ActivityMetrics.counter(activityDef, "err_msg_loss");
msgErrDuplicateCounter = ActivityMetrics.counter(activityDef, "err_msg_dup");
@ -258,6 +273,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Timer getCreateTransactionTimer() { return createTransactionTimer; }
public Timer getCommitTransactionTimer() { return commitTransactionTimer; }
public Histogram getPayloadRttHistogram() {return payloadRttHistogram;}
public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; }
public Counter getMsgErrOutOfSeqCounter() { return msgErrOutOfSeqCounter; }
public Counter getMsgErrLossCounter() { return msgErrLossCounter; }

View File

@ -3,13 +3,13 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@ -29,6 +29,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final LongFunction<Consumer<?>> consumerFunc;
private final boolean e2eMsProc;
private final LongFunction<String> payloadRttFieldFunc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
@ -38,10 +39,12 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Consumer<?>> consumerFunc,
boolean e2eMsgProc) {
boolean e2eMsgProc,
LongFunction<String> payloadRttFieldFunc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.consumerFunc = consumerFunc;
this.e2eMsProc = e2eMsgProc;
this.payloadRttFieldFunc = payloadRttFieldFunc;
}
@Override
@ -51,6 +54,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
boolean asyncApi = asyncApiFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
String payloadRttFieldFunc = this.payloadRttFieldFunc.apply(value);
return new PulsarConsumerOp(
pulsarActivity,
@ -62,7 +66,8 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
clientSpace.getPulsarSchema(),
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
e2eMsProc,
this::getReceivedMessageSequenceTracker);
this::getReceivedMessageSequenceTracker,
payloadRttFieldFunc);
}

View File

@ -1,24 +1,27 @@
package io.nosqlbench.driver.pulsar.ops;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.*;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class PulsarConsumerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
@ -41,7 +44,10 @@ public class PulsarConsumerOp implements PulsarOp {
// keep track of end-to-end message latency
private final Histogram e2eMsgProcLatencyHistogram;
private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
private final Histogram payloadRttHistogram;
private final String payloadRttTrackingField;
public PulsarConsumerOp(
PulsarActivity pulsarActivity,
@ -53,7 +59,8 @@ public class PulsarConsumerOp implements PulsarOp {
Schema<?> schema,
int timeoutSeconds,
boolean e2eMsgProc,
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic)
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
String payloadRttTrackingField)
{
this.pulsarActivity = pulsarActivity;
@ -72,7 +79,9 @@ public class PulsarConsumerOp implements PulsarOp {
this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram();
this.payloadRttHistogram = pulsarActivity.getPayloadRttHistogram();
this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic;
this.payloadRttTrackingField = payloadRttTrackingField;
}
private void checkAndUpdateMessageErrorCounter(Message message) {
@ -138,9 +147,22 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
if (!payloadRttTrackingField.isEmpty()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
long delta = System.currentTimeMillis() - extractedSendTime;
payloadRttHistogram.update(delta);
}
}
// keep track end-to-end message processing latency
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
@ -220,8 +242,8 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}

View File

@ -1,28 +1,32 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.*;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.PulsarSpaceCache;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnsupportedOpException;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
// TODO: Add this to the pulsar driver docs
public static final String RTT_TRACKING_FIELD = "payload-tracking-field";
private final static Logger logger = LogManager.getLogger(ReadyPulsarOp.class);
private final OpTemplate opTpl;
@ -129,6 +133,17 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
// TODO: Collapse this pattern into a simple version and flatten out all call sites
LongFunction<String> payloadRttFieldFunc = (l) -> "";
if (cmdTpl.isStatic(RTT_TRACKING_FIELD)) {
payloadRttFieldFunc = l -> cmdTpl.getStatic(RTT_TRACKING_FIELD);
logger.info("payload_rtt_field: {}", cmdTpl.getStatic(RTT_TRACKING_FIELD));
} else if (cmdTpl.isDynamic(RTT_TRACKING_FIELD)) {
payloadRttFieldFunc = l -> cmdTpl.getDynamic(RTT_TRACKING_FIELD,l);
logger.info("payload_rtt_field: {}", cmdTpl.getFieldDescription(RTT_TRACKING_FIELD));
}
logger.info("payload_rtt_field_func: {}", payloadRttFieldFunc.toString());
// TODO: Complete implementation for websocket-producer and managed-ledger
// Admin operation: create/delete tenant
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) {
@ -154,7 +169,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
false);
false,
payloadRttFieldFunc);
}
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_MULTI_CONSUME.label)) {
@ -163,7 +179,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
topicUriFunc,
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc);
seqTrackingFunc,
payloadRttFieldFunc);
}
// Regular/non-admin operation: single message consuming a single topic (reader)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
@ -193,7 +210,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
true);
true,
payloadRttFieldFunc);
}
// Invalid operation type
else {
@ -411,7 +429,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
boolean e2eMsgProc
boolean e2eMsgProc,
LongFunction<String> rttTrackingFieldFunc
) {
LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription_name")) {
@ -460,7 +479,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
consumerFunc,
e2eMsgProc);
e2eMsgProc,
rttTrackingFieldFunc);
}
private LongFunction<PulsarOp> resolveMultiTopicMsgConsume(
@ -468,7 +488,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc
LongFunction<Boolean> seqTrackingFunc,
LongFunction<String> payloadRttFieldFunc
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
@ -539,7 +560,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
mtConsumerFunc,
false);
false,
payloadRttFieldFunc);
}
private LongFunction<PulsarOp> resolveMsgRead(

View File

@ -105,7 +105,7 @@ public class CommandTemplate {
for (Function<String, Map<String, String>> parser : parserlist) {
Map<String, String> parsed = parser.apply(oneline);
if (parsed != null) {
logger.debug("parsed request: " + parsed.toString());
logger.debug("parsed request: " + parsed);
cmd.putAll(parsed);
didParse = true;
break;
@ -291,4 +291,19 @@ public class CommandTemplate {
}
return true;
}
/**
* This should only be used to provide a view of a field definition, never for actual use in a payload.
* @param varname The field name which you want to explain
* @return A string representation of the field name
*/
public String getFieldDescription(String varname) {
if (this.isDynamic(varname)) {
return "dynamic: " + this.dynamics.get(varname).toString();
} else if (this.isStatic(varname)) {
return "static: " + this.getStatic(varname);
} else {
return "UNDEFINED";
}
}
}