Fix Kafka producer transaction issue and add support for subscribing from multiple topics.

NB yaml files and README update
yabinmeng 2022-12-13 21:57:05 -06:00
parent 3943a29453
commit 37f889359e
11 changed files with 292 additions and 168 deletions

View File

@@ -30,6 +30,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;

 public class KafkaSpace implements AutoCloseable {
@@ -61,23 +62,25 @@ public class KafkaSpace implements AutoCloseable {
     // - For Producer workload, this represents how many total producers to publish messages
     //   it must be the same value as the NB "threads" parameter
     // - For Consumer workload, this represents how many total consumers per consumer group to subscribe messages
-    //
-    private final int clntNum;
+    private final int kafkaClntNum;

     // Maximum number of Kafka consumer groups
-    // - This is only relevant for Consumer workload
-    // - (clntNum * consumerGrpNum) is the total consumer thread number and must be the same
+    // - Only relevant for Consumer workload
+    // - (topicPartNum * consumerGrpNum) is the total consumer thread number and must be the same
     //   as the NB "threads" parameter
+    // - For multi-topic testing, this means one consumer thread may read from multiple topics.
     private final int consumerGrpNum;

     private long totalCycleNum;
+    private AtomicBoolean beingShutdown = new AtomicBoolean(false);

     public KafkaSpace(String spaceName, NBConfiguration cfg) {
         this.spaceName = spaceName;
         this.cfg = cfg;
         this.bootstrapSvr = cfg.get("bootstrap_server");
-        this.clntNum =
+        this.kafkaClntNum =
             NumberUtils.toInt(cfg.getOptional("num_clnt").orElse("1"));
         this.consumerGrpNum =
             NumberUtils.toInt(cfg.getOptional("num_cons_grp").orElse("1"));
@@ -124,7 +127,7 @@ public class KafkaSpace implements AutoCloseable {
     public String getBootstrapSvr() { return this.bootstrapSvr; }
     public KafkaClientConf getKafkaClientConf() { return kafkaClientConf; }
-    public int getClntNum() { return this.clntNum; }
+    public int getKafkaClntNum() { return this.kafkaClntNum; }
     public int getConsumerGrpNum() { return this.consumerGrpNum; }
     public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
@@ -132,14 +135,19 @@ public class KafkaSpace implements AutoCloseable {
     public long getTotalCycleNum() { return totalCycleNum; }
     public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }

+    public boolean isShuttingDown() {
+        return beingShutdown.get();
+    }
+
     public void shutdownSpace() {
         try {
-            // Pause 5 seconds before closing producers/consumers
-            KafkaAdapterUtil.pauseCurThreadExec(5);
+            beingShutdown.set(true);
             for (OpTimeTrackKafkaClient client : opTimeTrackKafkaClients.values()) {
                 client.close();
             }
+            // Pause 5 seconds after closing producers/consumers
+            KafkaAdapterUtil.pauseCurThreadExec(5);
         }
         catch (Exception e) {
             e.printStackTrace();
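Editor's note: the hunk above replaces the old blind 5-second pause with an explicit shutdown flag that is set before any client is closed. A minimal, hypothetical sketch of the same pattern (class and field names invented for illustration):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class ClientSpaceSketch implements AutoCloseable {
    private final Map<String, AutoCloseable> clients = new ConcurrentHashMap<>();
    private final AtomicBoolean beingShutdown = new AtomicBoolean(false);

    // Worker threads check this before starting any more per-cycle work.
    boolean isShuttingDown() { return beingShutdown.get(); }

    @Override
    public void close() {
        beingShutdown.set(true);               // 1. signal workers to stop first
        for (AutoCloseable c : clients.values()) {
            try { c.close(); }                 // 2. then close each cached client
            catch (Exception e) { e.printStackTrace(); }
        }
    }
}
```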

View File

@@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.function.LongFunction;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;

 public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
@@ -82,7 +81,7 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
     this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));
     kafkaSpace.setTotalCycleNum(totalCycleNum);
-    this.kafkaClntCnt = kafkaSpace.getClntNum();
+    this.kafkaClntCnt = kafkaSpace.getKafkaClntNum();
     this.consumerGrpCnt = kafkaSpace.getConsumerGrpNum();
     this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
@@ -112,40 +111,6 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
         return booleanLongFunction;
     }

-    protected LongFunction<Set<String>> lookupStaticStrSetOpValueFunc(String paramName) {
-        LongFunction<Set<String>> setStringLongFunction;
-        setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
-            .filter(Predicate.not(String::isEmpty))
-            .map(value -> {
-                Set<String> set = new HashSet<>();
-                if (StringUtils.contains(value, ',')) {
-                    set = Arrays.stream(value.split(","))
-                        .map(String::trim)
-                        .filter(Predicate.not(String::isEmpty))
-                        .collect(Collectors.toCollection(LinkedHashSet::new));
-                }
-                return set;
-            }).orElse(Collections.emptySet());
-        logger.info("{}: {}", paramName, setStringLongFunction.apply(0));
-        return setStringLongFunction;
-    }
-
-    // If the corresponding Op parameter is not provided, use the specified default value
-    protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
-        LongFunction<Integer> integerLongFunction;
-        integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
-            .filter(Predicate.not(String::isEmpty))
-            .map(value -> NumberUtils.toInt(value))
-            .map(value -> {
-                if (value < 0) return 0;
-                else return value;
-            }).orElse(defaultValue);
-        logger.info("{}: {}", paramName, integerLongFunction.apply(0));
-        return integerLongFunction;
-    }
-
     // If the corresponding Op parameter is not provided, use the specified default value
     protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
         LongFunction<String> stringLongFunction;

View File

@@ -31,11 +31,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.function.LongFunction;
+import java.util.stream.Collectors;

 public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
@@ -82,7 +80,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
     private String getEffectiveGroupId(long cycle) {
         int grpIdx = (int) (cycle % consumerGrpCnt);
-        String defaultGrpNamePrefix = "nb-grp";
+        String defaultGrpNamePrefix = KafkaAdapterUtil.DFT_CONSUMER_GROUP_NAME_PREFIX;
         if (consumerClientConfMap.containsKey("group.id")) {
             defaultGrpNamePrefix = consumerClientConfMap.get("group.id");
         }
@@ -91,10 +89,16 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
     }

     private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(
-        String cacheKey,
-        String groupId,
-        String topicName)
+        long cycle,
+        List<String> topicNameList,
+        String groupId)
     {
+        String topicNameListStr = topicNameList.stream()
+            .collect(Collectors.joining("::"));
+        String cacheKey = KafkaAdapterUtil.buildCacheKey(
+            "consumer-" + String.valueOf(cycle % kafkaClntCnt), topicNameListStr, groupId);
         OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
         if (opTimeTrackKafkaClient == null) {
             Properties consumerConfProps = new Properties();
@@ -103,10 +107,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
             synchronized (this) {
-                consumer.subscribe(Arrays.asList(topicName));
+                consumer.subscribe(topicNameList);
             }
             if (logger.isDebugEnabled()) {
-                logger.debug("Kafka consumer created: {} -- {}", cacheKey, consumer);
+                logger.debug("Kafka consumer created: {}/{} -- {}, {}, {}",
+                    cacheKey,
+                    consumer,
+                    topicNameList,
+                    autoCommitEnabled,
+                    maxMsgCntPerCommit);
             }

             opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
@@ -117,19 +126,27 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
         return opTimeTrackKafkaClient;
     }

+    protected List<String> getEffectiveTopicNameList(long cycle) {
+        String explicitTopicListStr = topicNameStrFunc.apply(cycle);
+        assert (StringUtils.isNotBlank(explicitTopicListStr));
+
+        return Arrays.stream(StringUtils.split(explicitTopicListStr, ','))
+            .filter(s -> StringUtils.isNotBlank(s))
+            .toList();
+    }
+
     @Override
     public KafkaOp apply(long cycle) {
-        String topicName = topicNameStrFunc.apply(cycle);
+        List<String> topicNameList = getEffectiveTopicNameList(cycle);
         String groupId = getEffectiveGroupId(cycle);
-        String cacheKey = KafkaAdapterUtil.buildCacheKey(
-            "consumer", topicName, groupId, String.valueOf(cycle % kafkaClntCnt));
-
-        if (StringUtils.isBlank(groupId)) {
-            throw new KafkaAdapterInvalidParamException("An effective \"group.id\" is needed for a consumer!");
+        if (topicNameList.size() == 0 || StringUtils.isBlank(groupId)) {
+            throw new KafkaAdapterInvalidParamException(
+                "Effective consumer group name and/or topic names are needed for creating a consumer!");
         }

         OpTimeTrackKafkaClient opTimeTrackKafkaConsumer =
-            getOrCreateOpTimeTrackKafkaConsumer(cacheKey, groupId, topicName);
+            getOrCreateOpTimeTrackKafkaConsumer(cycle, topicNameList, groupId);
         return new KafkaOp(
             kafkaAdapterMetrics,
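Editor's note: the new getEffectiveTopicNameList() above is what enables multi-topic subscriptions. A standalone sketch of the same parsing, assuming commons-lang3 on the classpath (the input string is invented):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;

public class TopicListParseDemo {
    public static void main(String[] args) {
        String raw = "nbktest1,nbktest2,, ";
        // StringUtils.split() drops empty segments; the blank filter drops
        // whitespace-only ones. Note the entries are not trimmed, so
        // "a, b" would keep the leading space in " b".
        List<String> topics = Arrays.stream(StringUtils.split(raw, ','))
            .filter(StringUtils::isNotBlank)
            .toList();
        System.out.println(topics); // [nbktest1, nbktest2]
    }
}
```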

View File

@@ -59,8 +59,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
     this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
     producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());

-    this.txnBatchNum =
-        parsedOp.getStaticConfigOr(KafkaAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
+    this.txnBatchNum = parsedOp.getStaticConfigOr("txn_batch_num", Integer.valueOf(0));

     this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
     this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
@@ -79,38 +78,53 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
         }
     }

-    private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(
-        String cacheKey, String clientId)
+    private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(long cycle,
+        String topicName,
+        String clientId)
     {
+        String cacheKey = KafkaAdapterUtil.buildCacheKey(
+            "producer-" + String.valueOf(cycle % kafkaClntCnt), topicName);
         OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
         if (opTimeTrackKafkaClient == null) {
             Properties producerConfProps = new Properties();
             producerConfProps.putAll(producerClientConfMap);

-            producerConfProps.put("client.id", clientId);
+            if (StringUtils.isNotBlank(clientId))
+                producerConfProps.put("client.id", clientId);
+            else
+                producerConfProps.remove("client.id");

             // When transaction batch number is less than 2, it is treated effectively as no-transaction
             if (txnBatchNum < 2)
                 producerConfProps.remove("transactional.id");

             String baseTransactId = "";
+            boolean transactionEnabled = false;
             if (producerConfProps.containsKey("transactional.id")) {
                 baseTransactId = producerConfProps.get("transactional.id").toString();
                 producerConfProps.put("transactional.id", baseTransactId + "-" + cacheKey);
+                transactionEnabled = StringUtils.isNotBlank(producerConfProps.get("transactional.id").toString());
             }

             KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
-            if (producerConfProps.containsKey("transactional.id")) {
+            if (transactionEnabled) {
                 producer.initTransactions();
             }

             if (logger.isDebugEnabled()) {
-                logger.debug("Producer created: {} -- {}", cacheKey, producer);
+                logger.debug("Producer created: {}/{} -- ({}, {}, {})",
+                    cacheKey,
+                    producer,
+                    topicName,
+                    transactionEnabled,
+                    clientId);
             }

             opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(
                 kafkaSpace,
                 asyncAPI,
-                StringUtils.isNotBlank(producerClientConfMap.get("transactional.id")),
+                transactionEnabled,
                 txnBatchNum,
                 producer);
             kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
@@ -176,11 +190,9 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
     public KafkaOp apply(long cycle) {
         String topicName = topicNameStrFunc.apply(cycle);
         String clientId = getEffectiveClientId(cycle);
-        String cacheKey = KafkaAdapterUtil.buildCacheKey(
-            "producer", topicName, String.valueOf(cycle % kafkaClntCnt));

         OpTimeTrackKafkaClient opTimeTrackKafkaProducer =
-            getOrCreateOpTimeTrackKafkaProducer(cacheKey, clientId);
+            getOrCreateOpTimeTrackKafkaProducer(cycle, topicName, clientId);
         ProducerRecord<String, String> message = createKafkaMessage(
             cycle,
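Editor's note: both dispensers now derive the cache key from `cycle % kafkaClntCnt`. A tiny illustration (values invented) of how that folds an unbounded cycle stream onto a fixed pool of `num_clnt` cached clients:

```java
public class ClientPoolingDemo {
    public static void main(String[] args) {
        int kafkaClntCnt = 2; // the num_clnt NB parameter
        for (long cycle = 0; cycle < 6; cycle++) {
            // Same key -> same cached producer, so only two producers are ever created.
            String cacheKey = "producer-" + (cycle % kafkaClntCnt);
            System.out.println("cycle " + cycle + " -> " + cacheKey);
        }
    }
}
```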

View File

@@ -106,6 +106,10 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
     @Override
     void cycleMsgProcess(long cycle, Object cycleObj) {
+        if (kafkaSpace.isShuttingDown()) {
+            return;
+        }
+
         synchronized (this) {
             ConsumerRecords<String, String> records = consumer.poll(msgPoolIntervalInMs);
             for (ConsumerRecord<String, String> record : records) {

View File

@@ -18,16 +18,23 @@
 package io.nosqlbench.adapter.kafka.ops;

 import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
 import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.InterruptException;

 public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
@@ -39,8 +46,20 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
     private final boolean transactEnabledConfig;
     private final int txnBatchNum;

+    enum TxnProcResult {
+        SUCCESS,
+        RECOVERABLE_ERROR,
+        FATAL_ERROR,
+        UNKNOWN_ERROR
+    }
+
     // Keep track of the transaction count per thread
-    private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
+    private static ThreadLocal<Integer>
+        txnBatchTrackingCntTL = ThreadLocal.withInitial(() -> 0);
+    private static ThreadLocal<TxnProcResult>
+        txnProcResultTL = ThreadLocal.withInitial(() -> TxnProcResult.SUCCESS);

     private final KafkaProducer<String, String> producer;
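Editor's note: a tiny standalone demo (not from this commit) of the static-ThreadLocal pattern introduced above — each NB worker thread gets its own independent batch counter even though the field itself is shared:

```java
public class ThreadLocalCounterDemo {
    private static final ThreadLocal<Integer> cnt = ThreadLocal.withInitial(() -> 0);

    public static void main(String[] args) throws InterruptedException {
        Runnable work = () -> {
            for (int i = 0; i < 3; i++) cnt.set(cnt.get() + 1);
            // Each thread prints 3: the counters never interfere with each other.
            System.out.println(Thread.currentThread().getName() + " -> " + cnt.get());
        };
        Thread t1 = new Thread(work), t2 = new Thread(work);
        t1.start(); t2.start();
        t1.join(); t2.join();
    }
}
```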
@@ -57,47 +76,83 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
         this.producer = producer;
     }

-    public int getTxnBatchTrackingCnt() { return txnBatchTrackingCnt.get(); }
-    public void incTxnBatchTrackingCnt() {
-        int curVal = getTxnBatchTrackingCnt();
-        txnBatchTrackingCnt.set(curVal + 1);
-    }
-
-    private boolean commitCurTransactionNeeded(long cycle) {
-        // Whether to commit the transaction which happens when:
-        // - "txn_batch_num" has been reached since last reset
-        boolean commitCurTrans = transactionEnabled;
-        if (commitCurTrans) {
-            int txnBatchTackingCnt = getTxnBatchTrackingCnt();
-            if ( ( (txnBatchTackingCnt > 0) && ((txnBatchTackingCnt % txnBatchNum) == 0) ) ||
-                 ( cycle >= (kafkaSpace.getTotalCycleNum() - 1) ) ) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Commit transaction ({}, {})",
-                        txnBatchTackingCnt, cycle);
-                }
-            }
-            else {
-                commitCurTrans = false;
-            }
-        }
-        return commitCurTrans;
-    }
-
-    private boolean startNewTransactionNeeded(long cycle) {
-        boolean startNewTransact = transactionEnabled;
-        if (startNewTransact) {
-            if ( (cycle > 0) && (cycle < (kafkaSpace.getTotalCycleNum() - 1)) ) {
-                startNewTransact = commitCurTransactionNeeded(cycle);
-            } else {
-                startNewTransact = false;
-            }
-        }
-        return startNewTransact;
-    }
+    public static int getTxnBatchTrackingCntTL() {
+        return txnBatchTrackingCntTL.get();
+    }
+    public static void incTxnBatchTrackingCnt() {
+        txnBatchTrackingCntTL.set(getTxnBatchTrackingCntTL() + 1);
+    }
+    public static void resetTxnBatchTrackingCnt() {
+        txnBatchTrackingCntTL.set(0);
+    }
+    public static TxnProcResult getTxnProcResultTL() {
+        return txnProcResultTL.get();
+    }
+    public static void setTxnProcResultTL(TxnProcResult result) {
+        txnProcResultTL.set(result);
+    }
+    public static void resetTxnProcResultTL(TxnProcResult result) {
+        txnProcResultTL.set(TxnProcResult.SUCCESS);
+    }
+
+    private void processMsgTransaction(long cycle, KafkaProducer<String, String> producer) {
+        TxnProcResult result = TxnProcResult.SUCCESS;
+
+        if (transactionEnabled) {
+            int txnBatchTackingCnt = getTxnBatchTrackingCntTL();
+            try {
+                if (txnBatchTackingCnt == 0) {
+                    // Start a new transaction when first starting the processing
+                    producer.beginTransaction();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("New transaction started ( {}, {}, {}, {}, {} )",
+                            cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                    }
+                } else if ( (txnBatchTackingCnt % (txnBatchNum - 1) == 0) ||
+                            (cycle == (kafkaSpace.getTotalCycleNum() - 1)) ) {
+                    synchronized (this) {
+                        // Commit the current transaction
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Start committing transaction ... ( {}, {}, {}, {}, {} )",
+                                cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                        }
+                        producer.commitTransaction();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Transaction committed ( {}, {}, {}, {}, {} )",
+                                cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                        }
+                        // Start a new transaction
+                        producer.beginTransaction();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("New transaction started ( {}, {}, {}, {}, {} )",
+                                cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                        }
+                    }
+                }
+            }
+            catch (Exception e) {
+                e.printStackTrace();
+                if ( (e instanceof IllegalStateException) ||
+                     (e instanceof ProducerFencedException) ||
+                     (e instanceof UnsupportedOperationException) ||
+                     (e instanceof AuthorizationException) ) {
+                    result = TxnProcResult.FATAL_ERROR;
+                }
+                else if ( (e instanceof TimeoutException) ||
+                          (e instanceof InterruptException) ) {
+                    result = TxnProcResult.RECOVERABLE_ERROR;
+                }
+                else {
+                    result = TxnProcResult.UNKNOWN_ERROR;
+                }
+            }
+        }
+
+        setTxnProcResultTL(result);
+    }
@@ -105,32 +160,48 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
     @Override
     void cycleMsgProcess(long cycle, Object cycleObj) {
         // For producer, cycleObj represents a "message" (ProducerRecord)
         assert (cycleObj != null);

+        if (kafkaSpace.isShuttingDown()) {
+            if (transactionEnabled) {
+                try {
+                    producer.abortTransaction();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Abort open transaction while shutting down ( {}, {}, {}, {}, {} )",
+                            cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                    }
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            return;
+        }
+
+        processMsgTransaction(cycle, producer);
+        TxnProcResult result = getTxnProcResultTL();
+
+        if (result == TxnProcResult.RECOVERABLE_ERROR) {
+            try {
+                producer.abortTransaction();
+            }
+            catch (Exception e) {
+                throw new KafkaAdapterUnexpectedException("Aborting transaction failed!");
+            }
+        } else if (result == TxnProcResult.FATAL_ERROR) {
+            throw new KafkaAdapterUnexpectedException("Fatal error when initializing or committing transactions!");
+        } else if (result == TxnProcResult.UNKNOWN_ERROR) {
+            logger.debug("Unexpected error when initializing or committing transactions!");
+        }
+
+        ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
         try {
-            ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
-            boolean startNewTransNeeded = startNewTransactionNeeded(cycle);
-            boolean commitCurTransNeeded = commitCurTransactionNeeded(cycle);
-
-            if (commitCurTransNeeded) {
-                producer.commitTransaction();
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Transaction committed ( {}, {}, {}, {} )",
-                        cycle, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCnt());
-                }
-                incTxnBatchTrackingCnt();
-            }
-
-            if (startNewTransNeeded) {
-                producer.beginTransaction();
-            }
-
-            Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
-                @Override
-                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                    if (asyncMsgAck) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Message sending with async ack. is successful ( {} ) - {}",
-                                cycle, recordMetadata);
-                        }
-                    }
-                }
+            if (result == TxnProcResult.SUCCESS) {
+                Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+                        if (asyncMsgAck) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Message sending with async ack. is successful ({}) - {}, {}",
+                                    cycle, producer, recordMetadata);
+                            }
+                        }
+                    }
@@ -140,8 +211,8 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
                 try {
                     RecordMetadata recordMetadata = responseFuture.get();
                     if (logger.isDebugEnabled()) {
-                        logger.debug("Message sending with sync ack. is successful ( {} ) - {}",
-                            cycle, recordMetadata);
+                        logger.debug("Message sending with sync ack. is successful ({}) - {}, {}",
+                            cycle, producer, recordMetadata);
                     }
                 } catch (InterruptedException | ExecutionException e) {
                     KafkaAdapterUtil.messageErrorHandling(
@@ -151,9 +222,26 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
                         "\n-----\n" + e);
                 }
             }
+
+                incTxnBatchTrackingCnt();
+            }
+        }
+        catch ( ProducerFencedException | OutOfOrderSequenceException |
+                UnsupportedOperationException | AuthorizationException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Fatal error when sending a message ({}) - {}, {}",
+                    cycle, producer, message);
+            }
+            throw new KafkaAdapterUnexpectedException(e);
+        }
+        catch (IllegalStateException | KafkaException e) {
+            if (transactionEnabled) {
+            }
+        }
         catch (Exception e) {
-            e.printStackTrace();
+            throw new KafkaAdapterUnexpectedException(e);
         }
     }
@@ -164,7 +252,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
             producer.close();
         }

-        this.txnBatchTrackingCnt.remove();
+        this.txnBatchTrackingCntTL.remove();
     }
     catch (IllegalStateException ise) {
         // If a producer is already closed, that's fine.
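Editor's note: the commit restructures the producer around the standard Kafka transaction lifecycle (initTransactions once, then beginTransaction/send.../commitTransaction, with abortTransaction on recoverable errors). A minimal sketch of that lifecycle outside NB, assuming a broker at localhost:9092, a pre-created topic `demo`, and the invented transactional id `demo-txn-0`:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "demo-txn-0"); // must be unique per producer instance
        int txnBatchNum = 8; // mirrors the txn_batch_num op parameter

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                for (int i = 0; i < 100; i++) {
                    producer.send(new ProducerRecord<>("demo", "k" + i, "v" + i));
                    // Commit every txnBatchNum messages, then open the next transaction.
                    if ((i + 1) % txnBatchNum == 0) {
                        producer.commitTransaction();
                        producer.beginTransaction();
                    }
                }
                producer.commitTransaction(); // commit the trailing partial batch
            } catch (Exception e) {
                producer.abortTransaction();  // note: aborting may itself fail if fenced
                throw e;
            }
        }
    }
}
```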

View File

@@ -34,14 +34,14 @@ public class KafkaAdapterUtil {
     private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);

+    public static String DFT_CONSUMER_GROUP_NAME_PREFIX = "nbKafkaGrp";
+    public static String DFT_TOPIC_NAME_PREFIX = "nbKafkaTopic";
+
     ///////
     // Valid document level parameters for JMS NB yaml file
     public enum DOC_LEVEL_PARAMS {
         // Blocking message producing or consuming
-        ASYNC_API("async_api"),
-        // Transaction batch size
-        TXN_BATCH_NUM("txn_batch_num");
+        ASYNC_API("async_api");

         public final String label;
         DOC_LEVEL_PARAMS(String label) {
@@ -80,7 +80,9 @@ public class KafkaAdapterUtil {
     }

     public static String buildCacheKey(String... keyParts) {
-        String combinedStr = String.join("::", keyParts);
+        String combinedStr = Arrays.stream(keyParts)
+            .filter(StringUtils::isNotBlank)
+            .collect(Collectors.joining("::"));
         return Base64.encodeAsString(combinedStr.getBytes());
     }
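Editor's note: a quick sketch of the reworked buildCacheKey() behavior, using java.util.Base64 in place of the adapter's Base64 helper — blank parts are now dropped before joining, so keys stay stable when an optional part (e.g. a client id) is absent:

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

public class CacheKeyDemo {
    static String buildCacheKey(String... keyParts) {
        String combined = Arrays.stream(keyParts)
            .filter(StringUtils::isNotBlank)   // drop null/empty/whitespace parts
            .collect(Collectors.joining("::"));
        return Base64.getEncoder().encodeToString(combined.getBytes());
    }

    public static void main(String[] args) {
        // Both calls encode "producer-0::nbktest", so they hit the same cache entry.
        System.out.println(buildCacheKey("producer-0", "nbktest", ""));
        System.out.println(buildCacheKey("producer-0", "nbktest"));
    }
}
```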

View File

@@ -1,15 +1,41 @@
+# Overview
+This NB Kafka driver allows publishing messages to or consuming messages from
+* a Kafka cluster, or
+* a Pulsar cluster with the [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka protocol handler for Pulsar.
+
+At a high level, this driver supports the following Kafka functionality:
+* Publishing messages to one Kafka topic, with sync. or async. message-send acknowledgements (from brokers)
+* Subscribing to messages from one or multiple Kafka topics, with sync. or async. message-recv acknowledgements (to brokers) (aka message commits)
+   * auto message commit
+   * manual message commit with a configurable number of message commits in one batch
+* Kafka transaction support
+
+## Example NB Yaml
+* [kafka_producer.yaml](./kafka_producer.yaml)
+* [kafka_consumer.yaml](./kafka_consumer.yaml)
+
 # Usage
 ```bash
 ## Kafka Producer
-$ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://10.166.90.94:9092
+$ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092

 ## Kafka Consumer
-$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://10.166.90.94:9092
+$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_consumer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092
 ```

-# Example NB Yaml
-[kafka_producer.yaml](./kafka_producer.yaml)
-
-[kafka_consumer.yaml](./kafka_consumer.yaml)
+## NB Kafka driver-specific CLI parameters
+* `num_clnt`: the number of Kafka clients used to publish or receive messages
+   * For the producer workload, this is the number of producer threads publishing to the same topic
+      * Multiple producer threads may share one topic/partition (`KafkaProducer` is thread-safe)
+      * `threads` and `num_clnt` values MUST be the same
+   * For the consumer workload, this is the number of partitions per topic
+      * The consumer workload supports subscribing to multiple topics; if so, all topics must have the same number of partitions
+      * Only one consumer thread per topic/partition (`KafkaConsumer` is NOT thread-safe)
+      * `threads` MUST be equal to `num_clnt` * `num_cons_grp`
+* `num_cons_grp`: the number of consumer groups
+   * Only relevant for the consumer workload
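Editor's note: a hypothetical worked example of the consumer thread math described above — for topics with 4 partitions and 2 consumer groups, `num_clnt=4` and `num_cons_grp=2`, so `threads` must be 4 * 2 = 8:

```bash
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=8 num_clnt=4 num_cons_grp=2 \
    yaml=kafka_consumer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092
```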

View File

@@ -15,7 +15,7 @@ topic.flush.messages=2
 producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
 producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
 #producer.client.id=nbDftClient
-#producer.transactional.id=nbDftTxn
+producer.transactional.id=nbDftTxn

 #####

View File

@@ -9,9 +9,9 @@ blocks:
   msg-consume-block:
     ops:
       op1:
-        ## TODO: can make this as a list of topics
-        ## The value represents the topic name
-        MessageConsume: "nbktest1"
+        ## The value represents the topic names
+        # - for the consumer, a comma-separated list of topics is supported
+        MessageConsume: "nbktest1,nbktest2"

         # The timeout value to poll messages (unit: milli-seconds)
         # - default: 0
@@ -19,5 +19,6 @@ blocks:
         # The number of messages to receive before doing a manual commit
         # - default: 0
-        # When setting to 0, it could mean doing auto commit (determined by "enable.auto.commit" parameter)
+        # - If 0, whether auto commit happens is determined by the
+        #   "enable.auto.commit" consumer config value
         manual_commit_batch_num: "0"
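Editor's note: a minimal sketch of what `manual_commit_batch_num` implies on the consumer side, assuming `enable.auto.commit=false`, a local broker, and the invented group id below — offsets are committed once per batch of N records rather than per record:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "nbKafkaGrp-0");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        int manualCommitBatchNum = 10; // mirrors the manual_commit_batch_num op parameter
        int uncommitted = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("nbktest1", "nbktest2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // ... process the record ...
                    if (++uncommitted >= manualCommitBatchNum) {
                        consumer.commitSync(); // commit one batch of offsets
                        uncommitted = 0;
                    }
                }
            }
        }
    }
}
```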

View File

@@ -10,18 +10,19 @@ params:
   # - default: true
   async_api: "true"

-  # The number of messages to put in one transaction
-  # - default: 0
-  # - value 0 or 1 means no transaction
-  # - it also requires "transactional.id" parameter is set
-  txn_batch_num: 5

 blocks:
   msg-produce-block:
     ops:
       op1:
         ## The value represents a topic name
-        MessageProduce: "nbktest1"
+        # - for the producer, only ONE topic is supported
+        MessageProduce: "nbktest"
+
+        # The number of messages to put in one transaction
+        # - default: 0
+        # - a value of 0 or 1 means no transaction
+        # - it also requires the "transactional.id" parameter to be set
+        txn_batch_num: 8

         ## (Optional) Kafka message headers (in JSON format).
         msg_header: |