Add sequence number to produced kafka records

This commit is contained in:
Massimiliano Mirelli
2023-03-01 12:53:57 +02:00
parent a85ef30a76
commit 375af41131
2 changed files with 64 additions and 0 deletions

View File

@@ -22,6 +22,7 @@ import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer; import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp; import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@@ -34,6 +35,14 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.function.LongFunction; import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Optional;
import java.util.Collections;
import java.util.LinkedHashSet;
public class MessageProducerOpDispenser extends KafkaBaseOpDispenser { public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
@@ -49,12 +58,18 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
private final LongFunction<String> msgHeaderJsonStrFunc; private final LongFunction<String> msgHeaderJsonStrFunc;
private final LongFunction<String> msgKeyStrFunc; private final LongFunction<String> msgKeyStrFunc;
private final LongFunction<String> msgValueStrFunc; private final LongFunction<String> msgValueStrFunc;
protected final LongFunction<Boolean> seqTrackingFunc;
protected final LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
public MessageProducerOpDispenser(DriverAdapter adapter, public MessageProducerOpDispenser(DriverAdapter adapter,
ParsedOp op, ParsedOp op,
LongFunction<String> tgtNameFunc, LongFunction<String> tgtNameFunc,
KafkaSpace kafkaSpace) { KafkaSpace kafkaSpace) {
super(adapter, op, tgtNameFunc, kafkaSpace); super(adapter, op, tgtNameFunc, kafkaSpace);
// Doc-level parameter: seq_tracking
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap()); this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr()); producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
@@ -126,6 +141,8 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
asyncAPI, asyncAPI,
transactionEnabled, transactionEnabled,
txnBatchNum, txnBatchNum,
seqTrackingFunc.apply(cycle),
msgSeqErrSimuTypeSetFunc.apply(cycle),
producer); producer);
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient); kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
} }
@@ -208,4 +225,28 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
opTimeTrackKafkaProducer, opTimeTrackKafkaProducer,
message); message);
} }
protected LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
setStringLongFunction = (l) ->
parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
if (StringUtils.contains(value,',')) {
set = Arrays.stream(value.split(","))
.map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
return set;
}).orElse(Collections.emptySet());
logger.info(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
setStringLongFunction.apply(0));
return setStringLongFunction;
}
} }

View File

@@ -20,6 +20,8 @@ package io.nosqlbench.adapter.kafka.ops;
import io.nosqlbench.adapter.kafka.KafkaSpace; import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException; import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil; import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.ProducerRecord;
@@ -31,6 +33,9 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TimeoutException;
@@ -45,6 +50,10 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
private final boolean asyncMsgAck; private final boolean asyncMsgAck;
private final boolean transactEnabledConfig; private final boolean transactEnabledConfig;
private final int txnBatchNum; private final int txnBatchNum;
private ThreadLocal<Map<String, MessageSequenceNumberSendingHandler>> MessageSequenceNumberSendingHandlersThreadLocal =
ThreadLocal.withInitial(HashMap::new);
private final boolean seqTracking;
private final Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
enum TxnProcResult { enum TxnProcResult {
SUCCESS, SUCCESS,
@@ -67,11 +76,15 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
boolean asyncMsgAck, boolean asyncMsgAck,
boolean transactEnabledConfig, boolean transactEnabledConfig,
int txnBatchNum, int txnBatchNum,
boolean seqTracking,
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
KafkaProducer<String, String> producer) { KafkaProducer<String, String> producer) {
super(kafkaSpace); super(kafkaSpace);
this.asyncMsgAck = asyncMsgAck; this.asyncMsgAck = asyncMsgAck;
this.transactEnabledConfig = transactEnabledConfig; this.transactEnabledConfig = transactEnabledConfig;
this.txnBatchNum = txnBatchNum; this.txnBatchNum = txnBatchNum;
this.seqTracking = seqTracking;
this.errSimuTypeSet = errSimuTypeSet;
this.transactionEnabled = transactEnabledConfig && (txnBatchNum > 2); this.transactionEnabled = transactEnabledConfig && (txnBatchNum > 2);
this.producer = producer; this.producer = producer;
} }
@@ -193,6 +206,11 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
} }
ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj; ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
if (seqTracking) {
long nextSequenceNumber = getMessageSequenceNumberSendingHandler(message.topic())
.getNextSequenceNumber(errSimuTypeSet);
message.headers().add(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes());
}
try { try {
if (result == TxnProcResult.SUCCESS) { if (result == TxnProcResult.SUCCESS) {
Future<RecordMetadata> responseFuture = producer.send(message, new Callback() { Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
@@ -261,4 +279,9 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
e.printStackTrace(); e.printStackTrace();
} }
} }
private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) {
return MessageSequenceNumberSendingHandlersThreadLocal.get()
.computeIfAbsent(topicName, k -> new MessageSequenceNumberSendingHandler());
}
} }