Merge pull request #360 from yabinmeng/main

Add support for message duplication check
This commit is contained in:
Jonathan Shook 2021-09-29 15:55:46 -05:00 committed by GitHub
commit 16a502d4d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 286 additions and 88 deletions

View File

@ -0,0 +1,8 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarDriverParamException extends RuntimeException {
public PulsarDriverParamException(String message) {
super(message);
}
}

View File

@ -0,0 +1,9 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarDriverUnexpectedException extends RuntimeException {
public PulsarDriverUnexpectedException(String message) {
super(message);
}
public PulsarDriverUnexpectedException(Exception e) { super(e); }
}

View File

@ -0,0 +1,7 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarDriverUnsupportedOpException extends RuntimeException {
public PulsarDriverUnsupportedOpException() { super("Unsupported Pulsar driver operation type"); }
}

View File

@ -0,0 +1,10 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgDuplicateException extends RuntimeException {
public PulsarMsgDuplicateException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") +
" Detected duplicate message when message deduplication is enabled (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + ").");
}
}

View File

@ -0,0 +1,11 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgLossException extends RuntimeException {
public PulsarMsgLossException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]") +
" Detected message sequence id gap (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Some published messages are not received!");
}
}

View File

@ -0,0 +1,11 @@
package io.nosqlbench.driver.pulsar.exception;
public class PulsarMsgOutOfOrderException extends RuntimeException {
public PulsarMsgOutOfOrderException(boolean asyncPulsarOp, long nbCycleNum, long curMsgSeqId, long prevMsgSeqId) {
super("" + (asyncPulsarOp ? "[AsyncAPI]" : "[SyncAPI]" ) +
" Detected message ordering is not guaranteed (curCycleNum=" + nbCycleNum +
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
"Older messages are received earlier!");
}
}

View File

@ -4,11 +4,9 @@ import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.admin.*;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -33,13 +34,7 @@ public class PulsarAdminTopicOp extends PulsarAdminOp {
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
// Get tenant/namespace string
// - topicUri : persistent://<tenant>/<namespace>/<topic>
// - tmpStr : <tenant>/<namespace>/<topic>
// - fullNsName : <tenant>/<namespace>
String tmpStr = StringUtils.substringAfter(this.topicUri,"://");
this.fullNsName = StringUtils.substringBeforeLast(tmpStr, "/");
this.fullNsName = PulsarActivityUtil.getFullNamespaceName(this.topicUri);
}
// Check whether the specified topic already exists

View File

@ -30,6 +30,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> topicMsgDedupFunc;
private final boolean e2eMsProc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
@ -39,10 +40,12 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Boolean> topicMsgDedupFunc,
LongFunction<Consumer<?>> consumerFunc,
boolean e2eMsgProc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.consumerFunc = consumerFunc;
this.topicMsgDedupFunc = topicMsgDedupFunc;
this.e2eMsProc = e2eMsgProc;
}
@ -53,6 +56,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
boolean useTransaction = useTransactionFunc.apply(value);
boolean seqTracking = seqTrackingFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
boolean topicMsgDedup = topicMsgDedupFunc.apply(value);
return new PulsarConsumerOp(
pulsarActivity,
@ -60,6 +64,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
useTransaction,
seqTracking,
transactionSupplier,
topicMsgDedup,
consumer,
clientSpace.getPulsarSchema(),
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),

View File

@ -4,8 +4,10 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.*;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
@ -27,6 +29,7 @@ public class PulsarConsumerOp implements PulsarOp {
private final boolean seqTracking;
private final Supplier<Transaction> transactionSupplier;
private final boolean topicMsgDedup;
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final int timeoutSeconds;
@ -34,7 +37,7 @@ public class PulsarConsumerOp implements PulsarOp {
private final long curCycleNum;
private long curMsgSeqId;
private long prevMsgSeqid;
private long prevMsgSeqId;
private final Counter bytesCounter;
private final Histogram messageSizeHistogram;
@ -49,6 +52,7 @@ public class PulsarConsumerOp implements PulsarOp {
boolean useTransaction,
boolean seqTracking,
Supplier<Transaction> transactionSupplier,
boolean topicMsgDedup,
Consumer<?> consumer,
Schema<?> schema,
int timeoutSeconds,
@ -62,6 +66,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.seqTracking = seqTracking;
this.transactionSupplier = transactionSupplier;
this.topicMsgDedup = topicMsgDedup;
this.consumer = consumer;
this.pulsarSchema = schema;
this.timeoutSeconds = timeoutSeconds;
@ -69,7 +74,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.e2eMsgProc = e2eMsgProc;
this.curMsgSeqId = 0;
this.prevMsgSeqid = 0;
this.prevMsgSeqId = (curCycleNum - 1);
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
@ -136,22 +141,26 @@ public class PulsarConsumerOp implements PulsarOp {
}
// keep track of message ordering and message loss
if (seqTracking) {
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) {
curMsgSeqId = Long.parseLong(msgSeqIdStr);
if ( prevMsgSeqId > -1) {
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqid) == 1) {
prevMsgSeqid = curMsgSeqId;
}
else {
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
if (curMsgSeqId < prevMsgSeqid) {
throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!");
if (curMsgSeqId < prevMsgSeqId) {
throw new PulsarMsgOutOfOrderException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// abnormal case: message loss
else if ( (curMsgSeqId - prevMsgSeqid) > 1 ) {
throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!");
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
false, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
}
@ -179,7 +188,8 @@ public class PulsarConsumerOp implements PulsarOp {
catch (Exception e) {
logger.error(
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
throw new RuntimeException(e);
throw new PulsarDriverUnexpectedException("" +
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
}
}
else {
@ -230,22 +240,27 @@ public class PulsarConsumerOp implements PulsarOp {
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message ordering and message loss
if (seqTracking) {
// keep track of message ordering, message loss, and message duplication
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) {
curMsgSeqId = Long.parseLong(msgSeqIdStr);
if (prevMsgSeqId > -1) {
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqid) == 1) {
prevMsgSeqid = curMsgSeqId;
} else {
if ((curMsgSeqId - prevMsgSeqId) != 1) {
// abnormal case: out of ordering
if (curMsgSeqId < prevMsgSeqid) {
throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!");
if (curMsgSeqId < prevMsgSeqId) {
throw new PulsarMsgOutOfOrderException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
// abnormal case: message loss
else if ((curMsgSeqId - prevMsgSeqid) > 1) {
throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!");
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
throw new PulsarMsgLossException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
throw new PulsarMsgDuplicateException(
true, curCycleNum, curMsgSeqId, prevMsgSeqId);
}
}
}
}
@ -264,7 +279,7 @@ public class PulsarConsumerOp implements PulsarOp {
});
}
catch (Exception e) {
throw new RuntimeException(e);
throw new PulsarDriverUnexpectedException("Async message receiving failed");
}
}
}

View File

@ -1,6 +1,5 @@
package io.nosqlbench.driver.pulsar.ops;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
@ -12,7 +11,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
@ -70,14 +68,17 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
// Simulate error 10% of the time
float rndVal = RandomUtils.nextFloat(0, 1.0f);
boolean simulationError = (rndVal > 0) && (rndVal < 0.1f);
boolean simulationError = (rndVal >= 0) && (rndVal < 0.1f);
String seqErrSimuType = seqErrSimuTypeFunc.apply(value);
boolean simulateMsgOutofOrder = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label);
boolean simulateMsgLoss = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.DataLoss.label);
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label);
boolean simulateMsgDup = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label);
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
@ -100,14 +101,22 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
// Set message sequence tracking property
if (seqTracking) {
if (!simulateMsgOutofOrder) {
// normal case
if (!simulateMsgOutofOrder && !simulateMsgDup) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value));
}
else {
// simulate message out of order
else if ( simulateMsgOutofOrder ) {
int rndmOffset = 2;
if (value > rndmOffset)
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-rndmOffset));
}
// simulate message duplication
else {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-1));
}
// message loss simulation is not done by message property
// we simply skip sending message in the current NB cycle
}
return new PulsarProducerOp(

View File

@ -4,6 +4,8 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
@ -77,8 +79,8 @@ public class PulsarProducerOp implements PulsarOp {
return;
}
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
if ( StringUtils.isBlank(msgPayload)) {
throw new PulsarDriverParamException("Message payload (\"msg-value\") can't be empty!");
}
TypedMessageBuilder typedMessageBuilder;
@ -158,12 +160,15 @@ public class PulsarProducerOp implements PulsarOp {
}
}
catch (PulsarClientException | ExecutionException | InterruptedException pce) {
logger.trace(
String errMsg =
"Sync message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgProperties + "; " +
"payload - " + msgPayload);
throw new RuntimeException(pce);
"payload - " + msgPayload;
logger.trace(errMsg);
throw new PulsarDriverUnexpectedException(errMsg);
}
timeTracker.run();
@ -219,7 +224,7 @@ public class PulsarProducerOp implements PulsarOp {
});
}
catch (Exception e) {
throw new RuntimeException(e);
throw new PulsarDriverUnexpectedException(e);
}
}
}

View File

@ -1,6 +1,8 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.*;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverParamException;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnsupportedOpException;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
@ -9,6 +11,8 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
@ -39,13 +43,13 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
this.cmdTpl = new CommandTemplate(opTemplate);
if (cmdTpl.isDynamic("op_scope")) {
throw new RuntimeException("op_scope must be static");
throw new PulsarDriverParamException("\"op_scope\" parameter must be static");
}
// TODO: At the moment, only supports static "client"
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isDynamic("client")) {
throw new RuntimeException("\"client\" can't be made dynamic!");
throw new PulsarDriverParamException("\"client\" parameter can't be made dynamic!");
} else {
String client_name = cmdTpl.getStatic("client");
this.clientSpace = pcache.getPulsarSpace(client_name);
@ -65,12 +69,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private LongFunction<PulsarOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
throw new PulsarDriverParamException("[resolve()] \"optype\" parameter must be static and have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
throw new PulsarDriverParamException("[resolve()] \"topic_url\" parameter is not valid. Perhaps you mean \"topic_uri\"?");
}
// Doc-level parameter: topic_uri
@ -91,7 +95,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
asyncApiFunc = (l) -> value;
} else {
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
}
}
logger.info("async_api: {}", asyncApiFunc.apply(0));
@ -103,7 +107,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label));
useTransactionFunc = (l) -> value;
} else {
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!");
}
}
logger.info("use_transaction: {}", useTransactionFunc.apply(0));
@ -114,7 +118,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label))
adminDelOpFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
}
logger.info("admin_delop: {}", adminDelOpFunc.apply(0));
@ -124,10 +128,20 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label))
seqTrackingFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!");
}
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
// Doc-level parameter: msg_dedup_broker
LongFunction<Boolean> brokerMsgDedupFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label))
brokerMsgDedupFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label));
else
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label + "\" parameter cannot be dynamic!");
}
logger.info("msg_dedup_broker: {}", seqTrackingFunc.apply(0));
// TODO: Complete implementation for websocket-producer and managed-ledger
// Admin operation: create/delete tenant
@ -148,11 +162,24 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
// Regular/non-admin operation: single message consuming from a single topic (consumer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, false);
return resolveMsgConsume(
clientSpace,
topicUriFunc,
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
brokerMsgDedupFunc,
false);
}
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_MULTI_CONSUME.label)) {
return resolveMultiTopicMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc);
return resolveMultiTopicMsgConsume(
clientSpace,
topicUriFunc,
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
brokerMsgDedupFunc);
}
// Regular/non-admin operation: single message consuming a single topic (reader)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
@ -176,11 +203,18 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
// Regular/non-admin operation: end-to-end message processing - consuming message
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, true);
return resolveMsgConsume(
clientSpace,
topicUriFunc,
asyncApiFunc,
useTransactionFunc,
seqTrackingFunc,
brokerMsgDedupFunc,
true);
}
// Invalid operation type
else {
throw new RuntimeException("Unsupported Pulsar operation type");
throw new PulsarDriverUnsupportedOpException();
}
}
@ -192,7 +226,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
{
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
throw new PulsarDriverParamException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
}
LongFunction<Set<String>> adminRolesFunc;
@ -325,7 +359,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
if (cmdTpl.isStatic("seqerr_simu")) {
seqErrSimuTypeFunc = (l) -> cmdTpl.getStatic("seqerr_simu");
} else {
throw new RuntimeException("\"seqerr_simu\" parameter cannot be dynamic!");
throw new PulsarDriverParamException("[resolveMsgSend()] \"seqerr_simu\" parameter cannot be dynamic!");
}
}
@ -359,7 +393,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("Producer:: \"msg_value\" field must be specified!");
throw new PulsarDriverParamException("[resolveMsgSend()] \"msg_value\" field must be specified!");
}
return new PulsarProducerMapper(
@ -383,6 +417,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Boolean> brokerMsgDupFunc,
boolean e2eMsgProc
) {
LongFunction<String> subscription_name_func;
@ -415,6 +450,35 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Boolean> topicMsgDedupFunc = (l) -> {
String topic = topic_uri_func.apply(l);
String namespace = PulsarActivityUtil.getFullNamespaceName(topic);
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin();
// Check namespace-level deduplication setting
// - default to broker level deduplication setting
boolean nsMsgDedup = brokerMsgDupFunc.apply(l);
try {
nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace);
}
catch (PulsarAdminException pae) {
// it is fine if we're unable to check namespace level setting; use default
}
// Check topic-level deduplication setting
// - default to namespace level deduplication setting
boolean topicMsgDedup = nsMsgDedup;
try {
topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic);
}
catch (PulsarAdminException pae) {
// it is fine if we're unable to check topic level setting; use default
}
return topicMsgDedup;
};
LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer(
topic_uri_func.apply(l),
@ -431,6 +495,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
useTransactionFunc,
seqTrackingFunc,
transactionSupplierFunc,
topicMsgDedupFunc,
consumerFunc,
e2eMsgProc);
}
@ -440,7 +505,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Boolean> brokerMsgDupFunc
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
@ -510,6 +576,15 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
useTransactionFunc,
seqTrackingFunc,
transactionSupplierFunc,
// For multi-topic subscription message consumption,
// - Only consider broker-level message deduplication setting
// - Ignore namespace- and topic-level message deduplication setting
//
// This is because Pulsar is able to specify a list of topics from
// different namespaces. In theory, we can get topic deduplication
// status from each message, but this will be too much overhead.
// e.g. pulsarAdmin.getPulsarAdmin().topics().getDeduplicationStatus(message.getTopicName())
brokerMsgDupFunc,
mtConsumerFunc,
false);
}
@ -609,7 +684,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
valueFunc = (l) -> null;
}
} else {
throw new RuntimeException("Batch Producer:: \"msg_value\" field must be specified!");
throw new PulsarDriverParamException("[resolveMsgBatchSend()] \"msg_value\" field must be specified!");
}
return new PulsarBatchProducerMapper(

View File

@ -58,7 +58,8 @@ public class PulsarActivityUtil {
ASYNC_API("async_api"),
USE_TRANSACTION("use_transaction"),
ADMIN_DELOP("admin_delop"),
SEQ_TRACKING("seq_tracking");
SEQ_TRACKING("seq_tracking"),
MSG_DEDUP_BROKER("msg_dedup_broker");
public final String label;
@ -305,7 +306,8 @@ public class PulsarActivityUtil {
// Pulsar subscription type
public enum SEQ_ERROR_SIMU_TYPE {
OutOfOrder("out_of_order"),
DataLoss("data_loss");
MsgLoss("msg_loss"),
MsgDup("msg_dup");
public final String label;
@ -476,5 +478,17 @@ public class PulsarActivityUtil {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonStr, Map.class);
}
///////
// Get full namespace name (<tenant>/<namespace>) from a Pulsar topic URI
public static String getFullNamespaceName(String topicUri) {
// Get tenant/namespace string
// - topicUri : persistent://<tenant>/<namespace>/<topic>
// - tmpStr : <tenant>/<namespace>/<topic>
// - fullNsName : <tenant>/<namespace>
String tmpStr = StringUtils.substringAfter(topicUri,"://");
return StringUtils.substringBeforeLast(tmpStr, "/");
}
}

View File

@ -8,7 +8,9 @@ params:
topic_uri: "persistent://public/default/sanity_seqloss2"
# Only applicable to producer and consumer
# - used for message ordering and message loss check
seq_tracking: "false"
async_api: "true"
seq_tracking: "true"
msg_dedup_broker: "true"
blocks:
- name: producer-block
@ -19,7 +21,8 @@ blocks:
- name: s1
optype: msg-send
#seqerr_simu: "out_of_order"
seqerr_simu: "data_loass"
#seqerr_simu: "msg_loss"
#seqerr_simu: "msg_dup"
msg_key:
msg_property:
msg_value: "{myvalue}"

View File

@ -16,7 +16,7 @@
- [1.5. Message Properties](#15-message-properties)
- [1.6. Schema Support](#16-schema-support)
- [1.7. Measure End-to-end Message Processing Latency](#17-measure-end-to-end-message-processing-latency)
- [1.8. Detect Message Out-of-order Error and Message Loss](#18-detect-message-out-of-order-error-and-message-loss)
- [1.8. Detect Message Out-of-order, Message Loss, and Message Duplication](#18-detect-message-out-of-order-message-loss-and-message-duplication)
- [1.9. NB Activity Execution Parameters](#19-nb-activity-execution-parameters)
- [1.10. NB Pulsar Driver Execution Example](#110-nb-pulsar-driver-execution-example)
- [1.11. Appendix A. Template Global Setting File (config.properties)](#111-appendix-a-template-global-setting-file-configproperties)
@ -127,6 +127,8 @@ At high level, Pulsar driver yaml file has the following structure:
* **seq_tracking**: Whether to do message sequence tracking. This is
used for message out-of-order and message loss detection (more on
this later).
* **msg_dedup_broker**: Whether or not broker level message deduplication
is enabled.
* **blocks**: includes a series of command blocks. Each command block
defines one major Pulsar operation such as *producer*, *consumer*, etc.
Right now, the following command blocks are already supported or will be
@ -154,7 +156,10 @@ bindings:
params:
topic_uri: "<pulsar_topic_name>"
async_api: "false"
use_transaction: "false"
admin_delop: "false"
seq_transaction: "false"
msg_dedup_broker: "false"
blocks:
- name: <command_block_1>
@ -684,7 +689,7 @@ NTP protocol.
2) If there is some time lag of starting the consumer, we need to count that
into consideration when interpreting the end-to-end message processing latency.
## 1.8. Detect Message Out-of-order Error and Message Loss
## 1.8. Detect Message Out-of-order, Message Loss, and Message Duplication
In order to detect errors like message out-of-order and message loss through
the NB Pulsar driver, we need to set the following document level parameter
@ -696,6 +701,24 @@ params:
seq_tracking: "true"
```
For message duplication detection, if broker level message dedup configuration
is enabled ("brokerDeduplicationEnabled=true" in broker.conf), we also need to
enable this document level parameter:
```
params:
msg_dedup_broker: "true"
```
However, since message dedup. can be also enabled or disabled at namespace level
or topic level, the NB Pulsar driver will also check the settings at these layers
through API. Basically, the final message dedup setting for a topic is determined
by the following rules:
* if topic level message dedup is not set, check namespace level setting
* if namespace level message dedup is not set, check broker level setting which
in turn is determined by the document level NB parameter **msg_dedup_broker**
* if message dedup is enabled at multiple levels, the priority sequence follows:
* topic level > namespace level > broker level
The logic of how this works is based on the fact that NB execution cycle number
is monotonically increasing by 1 for every cycle moving forward. When publishing
a series of messages, we use the current NB cycle number as one message property
@ -703,17 +726,17 @@ which is also monotonically increasing by 1.
When receiving the messages, if the message sequence number stored in the message
property is not monotonically increasing or if there is a gap larger than 1, then
it means the messages are either delivered out of the order or there are some message
loss. Either way, the consumer NB execution will throw runtime exceptions, with the
following messages respectively:
it must be one of the following errors:
* If the current message sequence ID is less than the previous message sequence ID,
then it is message out-of-order error. Exception **PulsarMsgOutOfOrderException**
will be thrown out.
* if the current message sequence ID is more than 1 bigger than the previous message
sequence ID, then it is message loss error. Exception **PulsarMsgLossException**
will be thrown out.
* if message dedup is enabled and the current message sequence ID is equal to the
previous message sequence ID, then it is message duplication error. Exception **PulsarMsgDuplicateException** will be thrown out.
```text
"Detected message ordering is not guaranteed. Older messages are received earlier!"
```
```text
"Detected message sequence id gap. Some published messages are not received!"
```
In either case, a runtime error will be thrown out with corresponding error messages.
## 1.9. NB Activity Execution Parameters