mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-11-25 18:20:33 -06:00
Add support for message duplication check (if message de-duplication is enabled, either at broker-level, namespace-level, or topic-level)
This commit is contained in:
parent
fd7a6176dd
commit
e9e7e95113
@ -4,11 +4,9 @@ import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.admin.Namespaces;
|
||||
import org.apache.pulsar.client.admin.PulsarAdmin;
|
||||
import org.apache.pulsar.client.admin.PulsarAdminException;
|
||||
import org.apache.pulsar.client.admin.Tenants;
|
||||
import org.apache.pulsar.client.admin.*;
|
||||
import org.apache.pulsar.common.policies.data.TenantInfo;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
|
@ -1,6 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -33,13 +34,7 @@ public class PulsarAdminTopicOp extends PulsarAdminOp {
|
||||
this.topicUri = topicUri;
|
||||
this.partitionTopic = partitionTopic;
|
||||
this.partitionNum = partitionNum;
|
||||
|
||||
// Get tenant/namespace string
|
||||
// - topicUri : persistent://<tenant>/<namespace>/<topic>
|
||||
// - tmpStr : <tenant>/<namespace>/<topic>
|
||||
// - fullNsName : <tenant>/<namespace>
|
||||
String tmpStr = StringUtils.substringAfter(this.topicUri,"://");
|
||||
this.fullNsName = StringUtils.substringBeforeLast(tmpStr, "/");
|
||||
this.fullNsName = PulsarActivityUtil.getFullNamespaceName(this.topicUri);
|
||||
}
|
||||
|
||||
// Check whether the specified topic already exists
|
||||
|
@ -30,6 +30,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
||||
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
|
||||
|
||||
private final LongFunction<Consumer<?>> consumerFunc;
|
||||
private final LongFunction<Boolean> topicMsgDedupFunc;
|
||||
private final boolean e2eMsProc;
|
||||
|
||||
public PulsarConsumerMapper(CommandTemplate cmdTpl,
|
||||
@ -39,10 +40,12 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
||||
LongFunction<Boolean> useTransactionFunc,
|
||||
LongFunction<Boolean> seqTrackingFunc,
|
||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
|
||||
LongFunction<Boolean> topicMsgDedupFunc,
|
||||
LongFunction<Consumer<?>> consumerFunc,
|
||||
boolean e2eMsgProc) {
|
||||
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
|
||||
this.consumerFunc = consumerFunc;
|
||||
this.topicMsgDedupFunc = topicMsgDedupFunc;
|
||||
this.e2eMsProc = e2eMsgProc;
|
||||
}
|
||||
|
||||
@ -53,6 +56,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
||||
boolean useTransaction = useTransactionFunc.apply(value);
|
||||
boolean seqTracking = seqTrackingFunc.apply(value);
|
||||
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
|
||||
boolean topicMsgDedup = topicMsgDedupFunc.apply(value);
|
||||
|
||||
return new PulsarConsumerOp(
|
||||
pulsarActivity,
|
||||
@ -60,6 +64,7 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
||||
useTransaction,
|
||||
seqTracking,
|
||||
transactionSupplier,
|
||||
topicMsgDedup,
|
||||
consumer,
|
||||
clientSpace.getPulsarSchema(),
|
||||
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
|
||||
|
@ -6,6 +6,7 @@ import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.PulsarActivity;
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
@ -27,6 +28,7 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
private final boolean seqTracking;
|
||||
private final Supplier<Transaction> transactionSupplier;
|
||||
|
||||
private final boolean topicMsgDedup;
|
||||
private final Consumer<?> consumer;
|
||||
private final Schema<?> pulsarSchema;
|
||||
private final int timeoutSeconds;
|
||||
@ -34,7 +36,7 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
private final long curCycleNum;
|
||||
|
||||
private long curMsgSeqId;
|
||||
private long prevMsgSeqid;
|
||||
private long prevMsgSeqId;
|
||||
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messageSizeHistogram;
|
||||
@ -49,6 +51,7 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
boolean useTransaction,
|
||||
boolean seqTracking,
|
||||
Supplier<Transaction> transactionSupplier,
|
||||
boolean topicMsgDedup,
|
||||
Consumer<?> consumer,
|
||||
Schema<?> schema,
|
||||
int timeoutSeconds,
|
||||
@ -62,6 +65,7 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
this.seqTracking = seqTracking;
|
||||
this.transactionSupplier = transactionSupplier;
|
||||
|
||||
this.topicMsgDedup = topicMsgDedup;
|
||||
this.consumer = consumer;
|
||||
this.pulsarSchema = schema;
|
||||
this.timeoutSeconds = timeoutSeconds;
|
||||
@ -69,7 +73,7 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
this.e2eMsgProc = e2eMsgProc;
|
||||
|
||||
this.curMsgSeqId = 0;
|
||||
this.prevMsgSeqid = 0;
|
||||
this.prevMsgSeqId = (curCycleNum - 1);
|
||||
|
||||
this.bytesCounter = pulsarActivity.getBytesCounter();
|
||||
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
|
||||
@ -136,22 +140,32 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
}
|
||||
|
||||
// keep track of message ordering and message loss
|
||||
if (seqTracking) {
|
||||
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
|
||||
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
|
||||
if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) {
|
||||
curMsgSeqId = Long.parseLong(msgSeqIdStr);
|
||||
|
||||
// normal case: message sequence id is monotonically increasing by 1
|
||||
if ((curMsgSeqId - prevMsgSeqid) == 1) {
|
||||
prevMsgSeqid = curMsgSeqId;
|
||||
}
|
||||
else {
|
||||
// abnormal case: out of ordering
|
||||
if (curMsgSeqId < prevMsgSeqid) {
|
||||
throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!");
|
||||
}
|
||||
// abnormal case: message loss
|
||||
else if ( (curMsgSeqId - prevMsgSeqid) > 1 ) {
|
||||
throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!");
|
||||
if ( prevMsgSeqId > -1) {
|
||||
// normal case: message sequence id is monotonically increasing by 1
|
||||
if ((curMsgSeqId - prevMsgSeqId) != 1) {
|
||||
// abnormal case: out of ordering
|
||||
if (curMsgSeqId < prevMsgSeqId) {
|
||||
throw new RuntimeException("" +
|
||||
"[SyncAPI] Detected message ordering is not guaranteed (curCycleNum=" + curCycleNum +
|
||||
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
|
||||
"Older messages are received earlier!");
|
||||
}
|
||||
// abnormal case: message loss
|
||||
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
|
||||
throw new RuntimeException("" +
|
||||
"[SyncAPI] Detected message sequence id gap (curCycleNum=" + curCycleNum +
|
||||
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
|
||||
"Some published messages are not received!");
|
||||
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
|
||||
throw new RuntimeException("" +
|
||||
"[SyncAPI] Detected duplicate message when message deduplication is enabled " +
|
||||
"(curCycleNum=" + curCycleNum + ", curMsgSeqId=" + curMsgSeqId +
|
||||
", prevMsgSeqId=" + prevMsgSeqId + ")!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -230,22 +244,33 @@ public class PulsarConsumerOp implements PulsarOp {
|
||||
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
|
||||
}
|
||||
|
||||
// keep track of message ordering and message loss
|
||||
if (seqTracking) {
|
||||
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
|
||||
// keep track of message ordering, message loss, and message duplication
|
||||
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
|
||||
if ( (seqTracking) && !StringUtils.isBlank(msgSeqIdStr) ) {
|
||||
curMsgSeqId = Long.parseLong(msgSeqIdStr);
|
||||
|
||||
// normal case: message sequence id is monotonically increasing by 1
|
||||
if ((curMsgSeqId - prevMsgSeqid) == 1) {
|
||||
prevMsgSeqid = curMsgSeqId;
|
||||
} else {
|
||||
// abnormal case: out of ordering
|
||||
if (curMsgSeqId < prevMsgSeqid) {
|
||||
throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!");
|
||||
}
|
||||
// abnormal case: message loss
|
||||
else if ((curMsgSeqId - prevMsgSeqid) > 1) {
|
||||
throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!");
|
||||
if (prevMsgSeqId > -1) {
|
||||
// normal case: message sequence id is monotonically increasing by 1
|
||||
if ((curMsgSeqId - prevMsgSeqId) != 1) {
|
||||
// abnormal case: out of ordering
|
||||
if (curMsgSeqId < prevMsgSeqId) {
|
||||
throw new RuntimeException("" +
|
||||
"[AsyncAPI] Detected message ordering is not guaranteed (curCycleNum=" + curCycleNum +
|
||||
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
|
||||
"Older messages are received earlier!");
|
||||
}
|
||||
// abnormal case: message loss
|
||||
else if ((curMsgSeqId - prevMsgSeqId) > 1) {
|
||||
throw new RuntimeException("" +
|
||||
"[AsyncAPI] Detected message sequence id gap (curCycleNum=" + curCycleNum +
|
||||
", curMsgSeqId=" + curMsgSeqId + ", prevMsgSeqId=" + prevMsgSeqId + "). " +
|
||||
"Some published messages are not received!");
|
||||
} else if (topicMsgDedup && (curMsgSeqId == prevMsgSeqId)) {
|
||||
throw new RuntimeException("" +
|
||||
"[AsyncAPI] Detected duplicate message when message deduplication is enabled " +
|
||||
"(curCycleNum=" + curCycleNum + ", curMsgSeqId=" + curMsgSeqId +
|
||||
", prevMsgSeqId=" + prevMsgSeqId + ")!");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,5 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.nosqlbench.driver.pulsar.PulsarActivity;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
@ -12,7 +11,6 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
@ -70,14 +68,17 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
|
||||
|
||||
// Simulate error 10% of the time
|
||||
float rndVal = RandomUtils.nextFloat(0, 1.0f);
|
||||
boolean simulationError = (rndVal > 0) && (rndVal < 0.1f);
|
||||
boolean simulationError = (rndVal >= 0) && (rndVal < 0.1f);
|
||||
String seqErrSimuType = seqErrSimuTypeFunc.apply(value);
|
||||
boolean simulateMsgOutofOrder = simulationError &&
|
||||
!StringUtils.isBlank(seqErrSimuType) &&
|
||||
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label);
|
||||
boolean simulateMsgLoss = simulationError &&
|
||||
!StringUtils.isBlank(seqErrSimuType) &&
|
||||
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.DataLoss.label);
|
||||
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgLoss.label);
|
||||
boolean simulateMsgDup = simulationError &&
|
||||
!StringUtils.isBlank(seqErrSimuType) &&
|
||||
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.MsgDup.label);
|
||||
|
||||
String msgKey = keyFunc.apply(value);
|
||||
String msgPayload = payloadFunc.apply(value);
|
||||
@ -100,14 +101,22 @@ public class PulsarProducerMapper extends PulsarTransactOpMapper {
|
||||
|
||||
// Set message sequence tracking property
|
||||
if (seqTracking) {
|
||||
if (!simulateMsgOutofOrder) {
|
||||
// normal case
|
||||
if (!simulateMsgOutofOrder && !simulateMsgDup) {
|
||||
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value));
|
||||
}
|
||||
else {
|
||||
// simulate message out of order
|
||||
else if ( simulateMsgOutofOrder ) {
|
||||
int rndmOffset = 2;
|
||||
if (value > rndmOffset)
|
||||
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-rndmOffset));
|
||||
}
|
||||
// simulate message duplication
|
||||
else {
|
||||
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-1));
|
||||
}
|
||||
// message loss simulation is not done by message property
|
||||
// we simply skip sending message in the current NB cycle
|
||||
}
|
||||
|
||||
return new PulsarProducerOp(
|
||||
|
@ -9,6 +9,8 @@ import org.apache.commons.lang3.BooleanUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.admin.PulsarAdmin;
|
||||
import org.apache.pulsar.client.admin.PulsarAdminException;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Reader;
|
||||
@ -128,6 +130,16 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
}
|
||||
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
|
||||
|
||||
// Doc-level parameter: msg_dedup_broker
|
||||
LongFunction<Boolean> brokerMsgDedupFunc = (l) -> false;
|
||||
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label)) {
|
||||
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label))
|
||||
brokerMsgDedupFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label));
|
||||
else
|
||||
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label + "\" parameter cannot be dynamic!");
|
||||
}
|
||||
logger.info("msg_dedup_broker: {}", seqTrackingFunc.apply(0));
|
||||
|
||||
|
||||
// TODO: Complete implementation for websocket-producer and managed-ledger
|
||||
// Admin operation: create/delete tenant
|
||||
@ -148,11 +160,24 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
}
|
||||
// Regular/non-admin operation: single message consuming from a single topic (consumer)
|
||||
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
|
||||
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, false);
|
||||
return resolveMsgConsume(
|
||||
clientSpace,
|
||||
topicUriFunc,
|
||||
asyncApiFunc,
|
||||
useTransactionFunc,
|
||||
seqTrackingFunc,
|
||||
brokerMsgDedupFunc,
|
||||
false);
|
||||
}
|
||||
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
|
||||
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_MULTI_CONSUME.label)) {
|
||||
return resolveMultiTopicMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc);
|
||||
return resolveMultiTopicMsgConsume(
|
||||
clientSpace,
|
||||
topicUriFunc,
|
||||
asyncApiFunc,
|
||||
useTransactionFunc,
|
||||
seqTrackingFunc,
|
||||
brokerMsgDedupFunc);
|
||||
}
|
||||
// Regular/non-admin operation: single message consuming a single topic (reader)
|
||||
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
|
||||
@ -176,7 +201,14 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
}
|
||||
// Regular/non-admin operation: end-to-end message processing - consuming message
|
||||
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_CONSUME.label)) {
|
||||
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, true);
|
||||
return resolveMsgConsume(
|
||||
clientSpace,
|
||||
topicUriFunc,
|
||||
asyncApiFunc,
|
||||
useTransactionFunc,
|
||||
seqTrackingFunc,
|
||||
brokerMsgDedupFunc,
|
||||
true);
|
||||
}
|
||||
// Invalid operation type
|
||||
else {
|
||||
@ -383,6 +415,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
LongFunction<Boolean> async_api_func,
|
||||
LongFunction<Boolean> useTransactionFunc,
|
||||
LongFunction<Boolean> seqTrackingFunc,
|
||||
LongFunction<Boolean> brokerMsgDupFunc,
|
||||
boolean e2eMsgProc
|
||||
) {
|
||||
LongFunction<String> subscription_name_func;
|
||||
@ -415,6 +448,35 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
|
||||
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
|
||||
|
||||
LongFunction<Boolean> topicMsgDedupFunc = (l) -> {
|
||||
String topic = topic_uri_func.apply(l);
|
||||
String namespace = PulsarActivityUtil.getFullNamespaceName(topic);
|
||||
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin();
|
||||
|
||||
// Check namespace-level deduplication setting
|
||||
// - default to broker level deduplication setting
|
||||
boolean nsMsgDedup = brokerMsgDupFunc.apply(l);
|
||||
try {
|
||||
nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace);
|
||||
}
|
||||
catch (PulsarAdminException pae) {
|
||||
// it is fine if we're unable to check namespace level setting; use default
|
||||
}
|
||||
|
||||
// Check topic-level deduplication setting
|
||||
// - default to namespace level deduplication setting
|
||||
boolean topicMsgDedup = nsMsgDedup;
|
||||
try {
|
||||
topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic);
|
||||
}
|
||||
catch (PulsarAdminException pae) {
|
||||
// it is fine if we're unable to check topic level setting; use default
|
||||
}
|
||||
|
||||
return topicMsgDedup;
|
||||
};
|
||||
|
||||
|
||||
LongFunction<Consumer<?>> consumerFunc = (l) ->
|
||||
clientSpace.getConsumer(
|
||||
topic_uri_func.apply(l),
|
||||
@ -431,6 +493,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
useTransactionFunc,
|
||||
seqTrackingFunc,
|
||||
transactionSupplierFunc,
|
||||
topicMsgDedupFunc,
|
||||
consumerFunc,
|
||||
e2eMsgProc);
|
||||
}
|
||||
@ -440,7 +503,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
LongFunction<String> topic_uri_func,
|
||||
LongFunction<Boolean> async_api_func,
|
||||
LongFunction<Boolean> useTransactionFunc,
|
||||
LongFunction<Boolean> seqTrackingFunc
|
||||
LongFunction<Boolean> seqTrackingFunc,
|
||||
LongFunction<Boolean> brokerMsgDupFunc
|
||||
) {
|
||||
// Topic list (multi-topic)
|
||||
LongFunction<String> topic_names_func;
|
||||
@ -510,6 +574,15 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
useTransactionFunc,
|
||||
seqTrackingFunc,
|
||||
transactionSupplierFunc,
|
||||
// For multi-topic subscription message consumption,
|
||||
// - Only consider broker-level message deduplication setting
|
||||
// - Ignore namespace- and topic-level message deduplication setting
|
||||
//
|
||||
// This is because Pulsar is able to specify a list of topics from
|
||||
// different namespaces. In theory, we can get topic deduplication
|
||||
// status from each message, but this will be too much overhead.
|
||||
// e.g. pulsarAdmin.getPulsarAdmin().topics().getDeduplicationStatus(message.getTopicName())
|
||||
brokerMsgDupFunc,
|
||||
mtConsumerFunc,
|
||||
false);
|
||||
}
|
||||
|
@ -58,7 +58,8 @@ public class PulsarActivityUtil {
|
||||
ASYNC_API("async_api"),
|
||||
USE_TRANSACTION("use_transaction"),
|
||||
ADMIN_DELOP("admin_delop"),
|
||||
SEQ_TRACKING("seq_tracking");
|
||||
SEQ_TRACKING("seq_tracking"),
|
||||
MSG_DEDUP_BROKER("msg_dedup_broker");
|
||||
|
||||
public final String label;
|
||||
|
||||
@ -305,7 +306,8 @@ public class PulsarActivityUtil {
|
||||
// Pulsar subscription type
|
||||
public enum SEQ_ERROR_SIMU_TYPE {
|
||||
OutOfOrder("out_of_order"),
|
||||
DataLoss("data_loss");
|
||||
MsgLoss("msg_loss"),
|
||||
MsgDup("msg_dup");
|
||||
|
||||
public final String label;
|
||||
|
||||
@ -476,5 +478,17 @@ public class PulsarActivityUtil {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.readValue(jsonStr, Map.class);
|
||||
}
|
||||
|
||||
///////
|
||||
// Get full namespace name (<tenant>/<namespace>) from a Pulsar topic URI
|
||||
public static String getFullNamespaceName(String topicUri) {
|
||||
// Get tenant/namespace string
|
||||
// - topicUri : persistent://<tenant>/<namespace>/<topic>
|
||||
// - tmpStr : <tenant>/<namespace>/<topic>
|
||||
// - fullNsName : <tenant>/<namespace>
|
||||
|
||||
String tmpStr = StringUtils.substringAfter(topicUri,"://");
|
||||
return StringUtils.substringBeforeLast(tmpStr, "/");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -8,7 +8,9 @@ params:
|
||||
topic_uri: "persistent://public/default/sanity_seqloss2"
|
||||
# Only applicable to producer and consumer
|
||||
# - used for message ordering and message loss check
|
||||
seq_tracking: "false"
|
||||
async_api: "true"
|
||||
seq_tracking: "true"
|
||||
msg_dedup_broker: "true"
|
||||
|
||||
blocks:
|
||||
- name: producer-block
|
||||
@ -19,7 +21,8 @@ blocks:
|
||||
- name: s1
|
||||
optype: msg-send
|
||||
#seqerr_simu: "out_of_order"
|
||||
seqerr_simu: "data_loass"
|
||||
#seqerr_simu: "msg_loss"
|
||||
#seqerr_simu: "msg_dup"
|
||||
msg_key:
|
||||
msg_property:
|
||||
msg_value: "{myvalue}"
|
||||
|
@ -127,6 +127,8 @@ At high level, Pulsar driver yaml file has the following structure:
|
||||
* **seq_tracking**: Whether to do message sequence tracking. This is
|
||||
used for message out-of-order and message loss detection (more on
|
||||
this later).
|
||||
* **msg_dedup_broker**: Whether or not broker level message deduplication
|
||||
is enabled.
|
||||
* **blocks**: includes a series of command blocks. Each command block
|
||||
defines one major Pulsar operation such as *producer*, *consumer*, etc.
|
||||
Right now, the following command blocks are already supported or will be
|
||||
@ -154,7 +156,10 @@ bindings:
|
||||
params:
|
||||
topic_uri: "<pulsar_topic_name>"
|
||||
async_api: "false"
|
||||
use_transaction: "false"
|
||||
admin_delop: "false"
|
||||
seq_transaction: "false"
|
||||
msg_dedup_broker: "false"
|
||||
|
||||
blocks:
|
||||
- name: <command_block_1>
|
||||
@ -684,7 +689,7 @@ NTP protocol.
|
||||
2) If there is some time lag of starting the consumer, we need to count that
|
||||
into consideration when interpreting the end-to-end message processing latency.
|
||||
|
||||
## 1.8. Detect Message Out-of-order Error and Message Loss
|
||||
## 1.8. Detect Message Out-of-order, Message Loss, and Message Duplication
|
||||
|
||||
In order to detect errors like message out-of-order and message loss through
|
||||
the NB Pulsar driver, we need to set the following document level parameter
|
||||
@ -696,6 +701,24 @@ params:
|
||||
seq_tracking: "true"
|
||||
```
|
||||
|
||||
For message duplication detection, if broker level message dedup configuration
|
||||
is enabled ("brokerDeduplicationEnabled=true" in broker.conf), we also need to
|
||||
enable this document level parameter:
|
||||
```
|
||||
params:
|
||||
msg_dedup_broker: "true"
|
||||
```
|
||||
|
||||
However, since message dedup. can be also enabled or disabled at namespace level
|
||||
or topic level, the NB Pulsar driver will also check the settings at these layers
|
||||
through API. Basically, the final message dedup setting for a topic is determined
|
||||
by the following rules:
|
||||
* if topic level message dedup is not set, check namespace level setting
|
||||
* if namespace level message dedup is not set, check broker level setting which
|
||||
in turn is determined by the document level NB parameter **msg_dedup_broker**
|
||||
* if message dedup is enabled at multiple levels, the priority sequence follows:
|
||||
* topic level > namespace level > broker level
|
||||
|
||||
The logic of how this works is based on the fact that NB execution cycle number
|
||||
is monotonically increasing by 1 for every cycle moving forward. When publishing
|
||||
a series of messages, we use the current NB cycle number as one message property
|
||||
@ -703,17 +726,14 @@ which is also monotonically increasing by 1.
|
||||
|
||||
When receiving the messages, if the message sequence number stored in the message
|
||||
property is not monotonically increasing or if there is a gap larger than 1, then
|
||||
it means the messages are either delivered out of the order or there are some message
|
||||
loss. Either way, the consumer NB execution will throw runtime exceptions, with the
|
||||
following messages respectively:
|
||||
it must be one of the following errors:
|
||||
* if the current message sequence ID is less than the previous message sequence ID, then
|
||||
it is message out-of-order error
|
||||
* if the current message sequence ID is more than 1 bigger than the previous message sequence
|
||||
ID, then it is message loss error
|
||||
* if message dedup is enabled and the current message sequence ID is equal to the previous message sequence ID, then it is message duplication error
|
||||
|
||||
```text
|
||||
"Detected message ordering is not guaranteed. Older messages are received earlier!"
|
||||
```
|
||||
|
||||
```text
|
||||
"Detected message sequence id gap. Some published messages are not received!"
|
||||
```
|
||||
In either case, a runtime error will be thrown out with corresponding error messages.
|
||||
|
||||
## 1.9. NB Activity Execution Parameters
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user