Add way to calculate end-to-end latency from event time

This commit is contained in:
Lari Hotari 2021-12-08 11:22:36 +02:00
parent fb30deb1ff
commit caceaed638
5 changed files with 51 additions and 16 deletions

View File

@ -0,0 +1,8 @@
package io.nosqlbench.driver.pulsar.ops;
public enum EndToEndStartingTimeSource {
NONE, // no end-to-end latency calculation
MESSAGE_PUBLISH_TIME, // use message publish timestamp
MESSAGE_EVENT_TIME, // use message event timestamp
MESSAGE_PROPERTY_E2E_STARTING_TIME // use message property called "e2e_starting_time" as the timestamp
}

View File

@ -28,7 +28,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
private final LongFunction<Consumer<?>> consumerFunc;
private final boolean e2eMsProc;
private final EndToEndStartingTimeSource endToEndStartingTimeSource;
private final LongFunction<String> payloadRttFieldFunc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
@ -39,11 +39,11 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Consumer<?>> consumerFunc,
boolean e2eMsgProc,
EndToEndStartingTimeSource endToEndStartingTimeSource,
LongFunction<String> payloadRttFieldFunc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.consumerFunc = consumerFunc;
this.e2eMsProc = e2eMsgProc;
this.endToEndStartingTimeSource = endToEndStartingTimeSource;
this.payloadRttFieldFunc = payloadRttFieldFunc;
}
@ -65,7 +65,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
consumer,
clientSpace.getPulsarSchema(),
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
e2eMsProc,
endToEndStartingTimeSource,
this::getReceivedMessageSequenceTracker,
payloadRttFieldFunc);
}

View File

@ -39,7 +39,7 @@ public class PulsarConsumerOp implements PulsarOp {
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final int timeoutSeconds;
private final boolean e2eMsgProc;
private final EndToEndStartingTimeSource endToEndStartingTimeSource;
private final Counter bytesCounter;
private final Histogram messageSizeHistogram;
@ -61,7 +61,7 @@ public class PulsarConsumerOp implements PulsarOp {
Consumer<?> consumer,
Schema<?> schema,
int timeoutSeconds,
boolean e2eMsgProc,
EndToEndStartingTimeSource endToEndStartingTimeSource,
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
String payloadRttTrackingField)
{
@ -75,7 +75,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.consumer = consumer;
this.pulsarSchema = schema;
this.timeoutSeconds = timeoutSeconds;
this.e2eMsgProc = e2eMsgProc;
this.endToEndStartingTimeSource = endToEndStartingTimeSource;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
@ -211,9 +211,24 @@ public class PulsarConsumerOp implements PulsarOp {
}
// keep track end-to-end message processing latency
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
if (endToEndStartingTimeSource != EndToEndStartingTimeSource.NONE) {
long startTimeStamp = 0L;
switch (endToEndStartingTimeSource) {
case MESSAGE_PUBLISH_TIME:
startTimeStamp = message.getPublishTime();
break;
case MESSAGE_EVENT_TIME:
startTimeStamp = message.getEventTime();
break;
case MESSAGE_PROPERTY_E2E_STARTING_TIME:
String startingTimeProperty = message.getProperty("e2e_starting_time");
startTimeStamp = startingTimeProperty != null ? Long.parseLong(startingTimeProperty) : 0L;
break;
}
if (startTimeStamp != 0L) {
long e2eMsgLatency = System.currentTimeMillis() - startTimeStamp;
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
}
// keep track of message errors and update error counters

View File

@ -116,7 +116,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
false,
parseEndToEndStartingTimeSourceParameter(EndToEndStartingTimeSource.NONE),
payloadRttFieldFunc);
}
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
@ -157,7 +157,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
true,
parseEndToEndStartingTimeSourceParameter(
EndToEndStartingTimeSource.MESSAGE_PUBLISH_TIME),
payloadRttFieldFunc);
}
// Invalid operation type
@ -166,6 +167,14 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
}
private EndToEndStartingTimeSource parseEndToEndStartingTimeSourceParameter(EndToEndStartingTimeSource defaultValue) {
EndToEndStartingTimeSource endToEndStartingTimeSource = defaultValue;
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label)) {
endToEndStartingTimeSource = EndToEndStartingTimeSource.valueOf(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label).toUpperCase());
}
return endToEndStartingTimeSource;
}
// Admin API: create tenant
private LongFunction<PulsarOp> resolveAdminTenant(
PulsarSpace clientSpace,
@ -304,7 +313,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
boolean e2eMsgProc,
EndToEndStartingTimeSource endToEndStartingTimeSource,
LongFunction<String> rttTrackingFieldFunc
) {
LongFunction<String> subscription_name_func = lookupParameterFunc("subscription_name");
@ -333,7 +342,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
consumerFunc,
e2eMsgProc,
endToEndStartingTimeSource,
rttTrackingFieldFunc);
}
@ -379,7 +388,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
seqTrackingFunc,
transactionSupplierFunc,
mtConsumerFunc,
false,
parseEndToEndStartingTimeSourceParameter(EndToEndStartingTimeSource.NONE),
payloadRttFieldFunc);
}

View File

@ -42,6 +42,8 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isValidClientType(String type) {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
@ -56,7 +58,8 @@ public class PulsarActivityUtil {
USE_TRANSACTION("use_transaction"),
ADMIN_DELOP("admin_delop"),
SEQ_TRACKING("seq_tracking"),
MSG_DEDUP_BROKER("msg_dedup_broker");
MSG_DEDUP_BROKER("msg_dedup_broker"),
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
public final String label;