diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java index 157926bb2..93f4027d3 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java @@ -22,6 +22,7 @@ import io.nosqlbench.adapter.kafka.ops.KafkaOp; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; +import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.StringUtils; @@ -34,6 +35,14 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; import java.util.function.LongFunction; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.Set; +import java.util.HashSet; +import java.util.Arrays; +import java.util.Optional; +import java.util.Collections; +import java.util.LinkedHashSet; public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { @@ -49,12 +58,18 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { private final LongFunction msgHeaderJsonStrFunc; private final LongFunction msgKeyStrFunc; private final LongFunction msgValueStrFunc; + protected final LongFunction seqTrackingFunc; + protected final LongFunction> msgSeqErrSimuTypeSetFunc; public MessageProducerOpDispenser(DriverAdapter adapter, ParsedOp op, LongFunction tgtNameFunc, KafkaSpace kafkaSpace) { super(adapter, op, tgtNameFunc, kafkaSpace); + // Doc-level parameter: seq_tracking + this.seqTrackingFunc = lookupStaticBoolConfigValueFunc( + PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false); + this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc(); this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap()); producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr()); @@ -126,6 +141,8 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { asyncAPI, transactionEnabled, txnBatchNum, + seqTrackingFunc.apply(cycle), + msgSeqErrSimuTypeSetFunc.apply(cycle), producer); kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient); } @@ -208,4 +225,28 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { opTimeTrackKafkaProducer, message); } + + protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() { + LongFunction> setStringLongFunction; + setStringLongFunction = (l) -> + parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> { + Set set = new HashSet<>(); + + if (StringUtils.contains(value,',')) { + set = Arrays.stream(value.split(",")) + .map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + return set; + }).orElse(Collections.emptySet()); + logger.info( + PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}", + setStringLongFunction.apply(0)); + return setStringLongFunction; + } } diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java index d44265891..b3be470fb 100644 --- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java +++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java @@ -20,6 +20,8 @@ package io.nosqlbench.adapter.kafka.ops; import io.nosqlbench.adapter.kafka.KafkaSpace; import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; +import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler; +import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -31,6 +33,9 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.apache.kafka.common.errors.TimeoutException; @@ -45,6 +50,10 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { private final boolean asyncMsgAck; private final boolean transactEnabledConfig; private final int txnBatchNum; + private ThreadLocal> MessageSequenceNumberSendingHandlersThreadLocal = + ThreadLocal.withInitial(HashMap::new); + private final boolean seqTracking; + private final Set errSimuTypeSet; enum TxnProcResult { SUCCESS, @@ -67,11 +76,15 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { boolean asyncMsgAck, boolean transactEnabledConfig, int txnBatchNum, + boolean seqTracking, + Set errSimuTypeSet, KafkaProducer producer) { super(kafkaSpace); this.asyncMsgAck = asyncMsgAck; this.transactEnabledConfig = transactEnabledConfig; this.txnBatchNum = txnBatchNum; + this.seqTracking = seqTracking; + this.errSimuTypeSet = errSimuTypeSet; this.transactionEnabled = transactEnabledConfig && (txnBatchNum > 2); this.producer = producer; } @@ -193,6 +206,11 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { } ProducerRecord message = (ProducerRecord) cycleObj; + if (seqTracking) { + long nextSequenceNumber = getMessageSequenceNumberSendingHandler(message.topic()) + .getNextSequenceNumber(errSimuTypeSet); + message.headers().add(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes()); + } try { if (result == TxnProcResult.SUCCESS) { Future responseFuture = producer.send(message, new Callback() { @@ -261,4 +279,9 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient { e.printStackTrace(); } } + + private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) { + return MessageSequenceNumberSendingHandlersThreadLocal.get() + .computeIfAbsent(topicName, k -> new MessageSequenceNumberSendingHandler()); + } }