Merge pull request #390 from lhotari/lh-e2e-latency-event-time

[Pulsar] Add way to calculate end-to-end operation latency from event time or message property
This commit is contained in:
Jonathan Shook
2021-12-09 01:03:41 -06:00
committed by GitHub
6 changed files with 138 additions and 129 deletions

View File

@@ -0,0 +1,8 @@
package io.nosqlbench.driver.pulsar.ops;
/**
 * Selects where the starting timestamp for the end-to-end latency
 * calculation is taken from.
 */
public enum EndToEndStartingTimeSource {

    /** No end-to-end latency calculation. */
    NONE,

    /** Use the message publish timestamp. */
    MESSAGE_PUBLISH_TIME,

    /** Use the message event timestamp. */
    MESSAGE_EVENT_TIME,

    /** Use the message property called "e2e_starting_time" as the timestamp. */
    MESSAGE_PROPERTY_E2E_STARTING_TIME
}

View File

@@ -28,7 +28,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
private final LongFunction<Consumer<?>> consumerFunc;
private final boolean e2eMsProc;
private final EndToEndStartingTimeSource endToEndStartingTimeSource;
private final LongFunction<String> payloadRttFieldFunc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
@@ -39,11 +39,11 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Consumer<?>> consumerFunc,
boolean e2eMsgProc,
EndToEndStartingTimeSource endToEndStartingTimeSource,
LongFunction<String> payloadRttFieldFunc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.consumerFunc = consumerFunc;
this.e2eMsProc = e2eMsgProc;
this.endToEndStartingTimeSource = endToEndStartingTimeSource;
this.payloadRttFieldFunc = payloadRttFieldFunc;
}
@@ -65,7 +65,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
consumer,
clientSpace.getPulsarSchema(),
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
e2eMsProc,
endToEndStartingTimeSource,
this::getReceivedMessageSequenceTracker,
payloadRttFieldFunc);
}

View File

@@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@@ -19,6 +20,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.SchemaType;
@@ -37,7 +39,7 @@ public class PulsarConsumerOp implements PulsarOp {
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final int timeoutSeconds;
private final boolean e2eMsgProc;
private final EndToEndStartingTimeSource endToEndStartingTimeSource;
private final Counter bytesCounter;
private final Histogram messageSizeHistogram;
@@ -59,7 +61,7 @@ public class PulsarConsumerOp implements PulsarOp {
Consumer<?> consumer,
Schema<?> schema,
int timeoutSeconds,
boolean e2eMsgProc,
EndToEndStartingTimeSource endToEndStartingTimeSource,
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
String payloadRttTrackingField)
{
@@ -73,7 +75,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.consumer = consumer;
this.pulsarSchema = schema;
this.timeoutSeconds = timeoutSeconds;
this.e2eMsgProc = e2eMsgProc;
this.endToEndStartingTimeSource = endToEndStartingTimeSource;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
@@ -85,7 +87,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.payloadRttTrackingField = payloadRttTrackingField;
}
private void checkAndUpdateMessageErrorCounter(Message message) {
private void checkAndUpdateMessageErrorCounter(Message<?> message) {
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER);
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
@@ -108,9 +110,9 @@ public class PulsarConsumerOp implements PulsarOp {
}
if (!asyncPulsarOp) {
Message<?> message;
try {
Message<?> message;
if (timeoutSeconds <= 0) {
// wait forever
message = consumer.receive();
@@ -123,77 +125,11 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
}
if (!payloadRttTrackingField.isEmpty()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
long delta = System.currentTimeMillis() - extractedSendTime;
payloadRttHistogram.update(delta);
}
}
// keep track end-to-end message processing latency
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (!useTransaction) {
consumer.acknowledge(message.getMessageId());
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
handleMessage(transaction, message);
}
catch (Exception e) {
logger.error(
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
e.printStackTrace();
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds, e);
throw new PulsarDriverUnexpectedException("" +
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
}
@@ -213,52 +149,16 @@ public class PulsarConsumerOp implements PulsarOp {
);
}
msgRecvFuture.whenComplete((message, error) -> {
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
msgRecvFuture.thenAccept(message -> {
try {
handleMessage(transaction, message);
} catch (PulsarClientException e) {
pulsarActivity.asyncOperationFailed(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
pulsarActivity.asyncOperationFailed(e.getCause());
}
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
if (!useTransaction) {
consumer.acknowledgeAsync(message);
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction);
}
timeTracker.run();
}).exceptionally(ex -> {
pulsarActivity.asyncOperationFailed(ex);
return null;
@@ -270,4 +170,88 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
/**
 * Processes one received message: optionally logs it, updates the payload
 * round-trip and end-to-end latency histograms, runs sequence tracking,
 * records the message size, and finally acknowledges the message (committing
 * the transaction when transactions are in use).
 *
 * @param transaction the transaction used for the acknowledgement and commit
 *                    when {@code useTransaction} is set; not touched otherwise
 * @param message     the message that was received from the consumer
 * @throws PulsarClientException if the non-transactional acknowledgement fails
 * @throws InterruptedException  if interrupted while waiting for the
 *                               transactional acknowledgement or the commit
 * @throws ExecutionException    if the transactional acknowledgement or the
 *                               commit completes exceptionally
 */
private void handleMessage(Transaction transaction, Message<?> message)
    throws PulsarClientException, InterruptedException, ExecutionException {

    if (logger.isDebugEnabled()) {
        SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
        String payloadText;
        if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
            payloadText = decodeAvroPayloadOf(message).toString();
        }
        else {
            // Explicit charset: the no-arg String constructor depends on the platform default.
            payloadText = new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8);
        }
        logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
            consumer.getConsumerName(),
            message.getKey(),
            message.getProperties(),
            payloadText);
    }

    if (!payloadRttTrackingField.isEmpty()) {
        org.apache.avro.generic.GenericRecord avroGenericRecord = decodeAvroPayloadOf(message);
        if (avroGenericRecord.hasField(payloadRttTrackingField)) {
            Object sendTimeValue = avroGenericRecord.get(payloadRttTrackingField);
            // Only numeric values can be a timestamp; a null or non-numeric field
            // is skipped instead of failing the whole consume operation on a cast.
            if (sendTimeValue instanceof Number) {
                long delta = System.currentTimeMillis() - ((Number) sendTimeValue).longValue();
                payloadRttHistogram.update(delta);
            }
        }
    }

    // keep track end-to-end message processing latency
    if (endToEndStartingTimeSource != EndToEndStartingTimeSource.NONE) {
        long startTimeStamp = resolveEndToEndStartTimestamp(message);
        // 0 means "no starting time available"; no latency sample is recorded then
        if (startTimeStamp != 0L) {
            long e2eMsgLatency = System.currentTimeMillis() - startTimeStamp;
            e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
        }
    }

    // keep track of message errors and update error counters
    if (seqTracking) checkAndUpdateMessageErrorCounter(message);

    int messageSize = message.getData().length;
    bytesCounter.inc(messageSize);
    messageSizeHistogram.update(messageSize);

    if (!useTransaction) {
        consumer.acknowledge(message.getMessageId());
    }
    else {
        consumer.acknowledgeAsync(message.getMessageId(), transaction).get();

        // little problem: here we are counting the "commit" time
        // inside the overall time spent for the execution of the consume operation
        // we should refactor this operation as for PulsarProducerOp, and use the passed callback
        // to track with precision the time spent for the operation and for the commit
        try (Timer.Context ignored = transactionCommitTimer.time()) {
            transaction.commit().get();
        }
    }
}

/**
 * Decodes the message payload into an Avro generic record using the schema
 * definition string of the configured Pulsar schema.
 */
private org.apache.avro.generic.GenericRecord decodeAvroPayloadOf(Message<?> message) {
    String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
    org.apache.avro.Schema avroSchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr);
    return AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
}

/**
 * Returns the starting timestamp selected by {@code endToEndStartingTimeSource},
 * or 0 when none can be determined (source is NONE, or the
 * "e2e_starting_time" property is missing, blank, or not a valid long).
 */
private long resolveEndToEndStartTimestamp(Message<?> message) {
    switch (endToEndStartingTimeSource) {
        case MESSAGE_PUBLISH_TIME:
            return message.getPublishTime();
        case MESSAGE_EVENT_TIME:
            return message.getEventTime();
        case MESSAGE_PROPERTY_E2E_STARTING_TIME:
            String startingTimeProperty = message.getProperty("e2e_starting_time");
            if (StringUtils.isBlank(startingTimeProperty)) {
                return 0L;
            }
            try {
                return Long.parseLong(startingTimeProperty.trim());
            }
            catch (NumberFormatException e) {
                // A malformed property must not fail the consume operation;
                // skip the latency sample and make the problem visible in the log.
                logger.warn("({}) Ignoring invalid 'e2e_starting_time' message property value: {}",
                    consumer.getConsumerName(), startingTimeProperty);
                return 0L;
            }
        case NONE:
        default:
            return 0L;
    }
}
}

View File

@@ -116,7 +116,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
false,
parseEndToEndStartingTimeSourceParameter(EndToEndStartingTimeSource.NONE),
payloadRttFieldFunc);
}
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
@@ -157,7 +157,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
true,
parseEndToEndStartingTimeSourceParameter(
EndToEndStartingTimeSource.MESSAGE_PUBLISH_TIME),
payloadRttFieldFunc);
}
// Invalid operation type
@@ -166,6 +167,14 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
}
/**
 * Resolves the end-to-end starting time source from the statically bound
 * {@code e2e_starting_time_source} parameter of the command template.
 *
 * @param defaultValue value returned when the parameter is not statically bound
 * @return the enum constant whose name matches the upper-cased parameter value,
 *         or {@code defaultValue} when the parameter is not statically bound
 * @throws IllegalArgumentException if the parameter value does not match any
 *         {@link EndToEndStartingTimeSource} constant
 */
private EndToEndStartingTimeSource parseEndToEndStartingTimeSourceParameter(EndToEndStartingTimeSource defaultValue) {
    String paramName = PulsarActivityUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label;
    if (!cmdTpl.isStatic(paramName)) {
        return defaultValue;
    }
    // Locale.ROOT keeps the upper-casing locale-independent: under e.g. a Turkish
    // default locale "i" maps to a dotted capital I and valueOf() would fail.
    String configuredValue = cmdTpl.getStatic(paramName).toUpperCase(java.util.Locale.ROOT);
    return EndToEndStartingTimeSource.valueOf(configuredValue);
}
// Admin API: create tenant
private LongFunction<PulsarOp> resolveAdminTenant(
PulsarSpace clientSpace,
@@ -304,7 +313,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
boolean e2eMsgProc,
EndToEndStartingTimeSource endToEndStartingTimeSource,
LongFunction<String> rttTrackingFieldFunc
) {
LongFunction<String> subscription_name_func = lookupParameterFunc("subscription_name");
@@ -333,7 +342,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
consumerFunc,
e2eMsgProc,
endToEndStartingTimeSource,
rttTrackingFieldFunc);
}
@@ -379,7 +388,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
mtConsumerFunc,
false,
parseEndToEndStartingTimeSourceParameter(EndToEndStartingTimeSource.NONE),
payloadRttFieldFunc);
}

View File

@@ -42,6 +42,8 @@ public class PulsarActivityUtil {
this.label = label;
}
}
/**
 * Checks whether the given string equals the label of one of the
 * {@code OP_TYPES} constants.
 *
 * @param type the candidate type label
 * @return {@code true} if some {@code OP_TYPES} label equals {@code type}
 */
public static boolean isValidClientType(String type) {
    for (OP_TYPES opType : OP_TYPES.values()) {
        if (opType.label.equals(type)) {
            return true;
        }
    }
    return false;
}
@@ -56,7 +58,8 @@ public class PulsarActivityUtil {
USE_TRANSACTION("use_transaction"),
ADMIN_DELOP("admin_delop"),
SEQ_TRACKING("seq_tracking"),
MSG_DEDUP_BROKER("msg_dedup_broker");
MSG_DEDUP_BROKER("msg_dedup_broker"),
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
public final String label;

View File

@@ -139,6 +139,11 @@ Currently, the following configuration parameters are available at this level:
* **use_transaction**: Whether to simulate Pulsar transactions. This can only be statically bound.
* **admin_delop**: For Admin tasks, whether to execute delete operation instead of the default create operation. This can only be statically bound.
* **seq_tracking**: Whether to do message sequence tracking. This is used for abnormal message processing error detection such as message loss, message duplication, or message out-of-order. This can only be statically bound.
* **e2e_starting_time_source**: The source of the starting timestamp used for the end-to-end latency calculation. When specified, the `e2e_msg_latency` histogram is updated with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from the configured starting time source, and its unit is milliseconds since epoch. The possible values for `e2e_starting_time_source` are:
* `message_publish_time` - uses the message publishing timestamp as the starting time
* `message_event_time` - uses the message event timestamp as the starting time
* `message_property_e2e_starting_time` - uses a message property `e2e_starting_time` as the starting time.
## 3.3. Statement Level Parameters
**Statement Level** parameters are set within the NB yaml file under different statement blocks. Each workload type/statement block has its own set of statement level configuration parameters. We'll cover these parameters in section 5.