From 37f889359ea6ddeb5435a7a349e467a6663bda6c Mon Sep 17 00:00:00 2001
From: yabinmeng
Date: Tue, 13 Dec 2022 21:57:05 -0600
Subject: [PATCH] Fix Kafka producer transaction issue and add support for
 subscribing from multiple topics. NB yaml files and README update

---
 .../nosqlbench/adapter/kafka/KafkaSpace.java   |  24 +-
 .../dispensers/KafkaBaseOpDispenser.java       |  41 +---
 .../dispensers/MessageConsumerOpDispenser.java |  51 ++--
 .../dispensers/MessageProducerOpDispenser.java |  34 ++-
 .../kafka/ops/OpTimeTrackKafkaConsumer.java    |   4 +
 .../kafka/ops/OpTimeTrackKafkaProducer.java    | 232 ++++++++++++------
 .../adapter/kafka/util/KafkaAdapterUtil.java   |  12 +-
 adapter-kafka/src/main/resources/README.md     |  36 ++-
 .../main/resources/kafka_config.properties     |   2 +-
 .../src/main/resources/kafka_consumer.yaml     |   9 +-
 .../src/main/resources/kafka_producer.yaml     |  15 +-
 11 files changed, 292 insertions(+), 168 deletions(-)

diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java
index c78a535f7..55fcd39b0 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java
@@ -30,6 +30,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class KafkaSpace implements AutoCloseable {
 
@@ -61,23 +62,25 @@ public class KafkaSpace implements AutoCloseable {
     // - For Producer workload, this represents how many total producers publish messages;
     //   it must be the same value as the NB "threads" parameter
     // - For Consumer workload, this represents how many total consumers per consumer group subscribe to messages
-    //
-    private final int clntNum;
+    private final int kafkaClntNum;
 
     // Maximum number of Kafka consumer groups
-    // - This is only relevant for Consumer workload
-    // - (clntNum * consumerGrpNum) is the total consumer thread number and must be the same
+    // - Only relevant for Consumer workload
+    // - (topic partition count * consumerGrpNum) is the total consumer thread number and must be the same
     //   as the NB "threads" parameter
+    // - For multi-topic testing, this means one consumer thread may read from multiple topics.
     private final int consumerGrpNum;
 
     private long totalCycleNum;
 
+    private final AtomicBoolean beingShutdown = new AtomicBoolean(false);
+
     public KafkaSpace(String spaceName, NBConfiguration cfg) {
         this.spaceName = spaceName;
         this.cfg = cfg;
 
         this.bootstrapSvr = cfg.get("bootstrap_server");
-        this.clntNum =
+        this.kafkaClntNum =
             NumberUtils.toInt(cfg.getOptional("num_clnt").orElse("1"));
         this.consumerGrpNum =
             NumberUtils.toInt(cfg.getOptional("num_cons_grp").orElse("1"));
@@ -124,7 +127,7 @@ public class KafkaSpace implements AutoCloseable {
     public String getBootstrapSvr() { return this.bootstrapSvr; }
     public KafkaClientConf getKafkaClientConf() { return kafkaClientConf; }
 
-    public int getClntNum() { return this.clntNum; }
+    public int getKafkaClntNum() { return this.kafkaClntNum; }
     public int getConsumerGrpNum() { return this.consumerGrpNum; }
 
     public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
@@ -132,14 +135,19 @@ public class KafkaSpace implements AutoCloseable {
     public long getTotalCycleNum() { return totalCycleNum; }
     public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
 
+    public boolean isShuttingDown() {
+        return beingShutdown.get();
+    }
+
     public void shutdownSpace() {
         try {
-            // Pause 5 seconds before closing producers/consumers
-            KafkaAdapterUtil.pauseCurThreadExec(5);
-
+            beingShutdown.set(true);
             for (OpTimeTrackKafkaClient client : opTimeTrackKafkaClients.values()) {
                 client.close();
             }
+
+            // Pause 5 seconds after closing producers/consumers
+            KafkaAdapterUtil.pauseCurThreadExec(5);
         }
         catch (Exception e) {
             e.printStackTrace();
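Note: reordering shutdownSpace() (set the flag first, then close clients) only pays off because the per-op hot paths now consult the shutdown flag before touching a client. A minimal sketch of the cooperative-shutdown pattern involved (hypothetical class names, not part of the patch):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch: a shared flag set by shutdownSpace() lets in-flight ops
// bail out before their producer/consumer is closed underneath them.
class ShutdownAwareClient {
    private final AtomicBoolean beingShutdown; // shared with the space

    ShutdownAwareClient(AtomicBoolean beingShutdown) {
        this.beingShutdown = beingShutdown;
    }

    void cycleMsgProcess() {
        if (beingShutdown.get()) {
            return; // skip work once shutdown has started
        }
        // ... normal produce/consume work ...
    }
}
```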
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java
index bc1e75acb..0198b9efb 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java
@@ -34,7 +34,6 @@ import org.apache.logging.log4j.Logger;
 import java.util.*;
 import java.util.function.LongFunction;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp> {
 
@@ -82,7 +81,7 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp> {
-    protected LongFunction<Set<String>> lookupStaticStrSetOpValueFunc(String paramName) {
-        LongFunction<Set<String>> setStringLongFunction;
-        setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
-            .filter(Predicate.not(String::isEmpty))
-            .map(value -> {
-                Set<String> set = new HashSet<>();
-
-                if (StringUtils.contains(value, ',')) {
-                    set = Arrays.stream(value.split(","))
-                        .map(String::trim)
-                        .filter(Predicate.not(String::isEmpty))
-                        .collect(Collectors.toCollection(LinkedHashSet::new));
-                }
-
-                return set;
-            }).orElse(Collections.emptySet());
-        logger.info("{}: {}", paramName, setStringLongFunction.apply(0));
-        return setStringLongFunction;
-    }
-
-    // If the corresponding Op parameter is not provided, use the specified default value
-    protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
-        LongFunction<Integer> integerLongFunction;
-        integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
-            .filter(Predicate.not(String::isEmpty))
-            .map(value -> NumberUtils.toInt(value))
-            .map(value -> {
-                if (value < 0) return 0;
-                else return value;
-            }).orElse(defaultValue);
-        logger.info("{}: {}", paramName, integerLongFunction.apply(0));
-        return integerLongFunction;
-    }
-
     // If the corresponding Op parameter is not provided, use the specified default value
     protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
         LongFunction<String> stringLongFunction;
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
index 8133130b4..bc3074a75 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
@@ -31,11 +31,9 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import java.util.function.LongFunction;
+import java.util.stream.Collectors;
 
 public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
 
@@ -82,7 +80,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
     private String getEffectiveGroupId(long cycle) {
         int grpIdx = (int) (cycle % consumerGrpCnt);
-        String defaultGrpNamePrefix = "nb-grp";
+        String defaultGrpNamePrefix = KafkaAdapterUtil.DFT_CONSUMER_GROUP_NAME_PREFIX;
         if (consumerClientConfMap.containsKey("group.id")) {
             defaultGrpNamePrefix = consumerClientConfMap.get("group.id");
         }
@@ -91,10 +89,16 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
     }
 
     private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(
-        String cacheKey,
-        String groupId,
-        String topicName)
+        long cycle,
+        List<String> topicNameList,
+        String groupId)
     {
+        String topicNameListStr = topicNameList.stream()
+            .collect(Collectors.joining("::"));
+
+        String cacheKey = KafkaAdapterUtil.buildCacheKey(
+            "consumer-" + String.valueOf(cycle % kafkaClntCnt), topicNameListStr, groupId);
+
         OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
         if (opTimeTrackKafkaClient == null) {
             Properties consumerConfProps = new Properties();
@@ -103,10 +107,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
 
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
             synchronized (this) {
-                consumer.subscribe(Arrays.asList(topicName));
+                consumer.subscribe(topicNameList);
             }
             if (logger.isDebugEnabled()) {
-                logger.debug("Kafka consumer created: {} -- {}", cacheKey, consumer);
+                logger.debug("Kafka consumer created: {}/{} -- {}, {}, {}",
+                    cacheKey,
+                    consumer,
+                    topicNameList,
+                    autoCommitEnabled,
+                    maxMsgCntPerCommit);
             }
 
             opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
@@ -117,19 +126,27 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
         return opTimeTrackKafkaClient;
     }
 
+    protected List<String> getEffectiveTopicNameList(long cycle) {
+        String explicitTopicListStr = topicNameStrFunc.apply(cycle);
+        assert (StringUtils.isNotBlank(explicitTopicListStr));
+
+        return Arrays.stream(StringUtils.split(explicitTopicListStr, ','))
+            .filter(StringUtils::isNotBlank)
+            .toList();
+    }
+
     @Override
     public KafkaOp apply(long cycle) {
-        String topicName = topicNameStrFunc.apply(cycle);
+        List<String> topicNameList = getEffectiveTopicNameList(cycle);
         String groupId = getEffectiveGroupId(cycle);
-        String cacheKey = KafkaAdapterUtil.buildCacheKey(
-            "consumer", topicName, groupId, String.valueOf(cycle % kafkaClntCnt));
 
-        if (StringUtils.isBlank(groupId)) {
-            throw new KafkaAdapterInvalidParamException("An effective \"group.id\" is needed for a consumer!");
+        if (topicNameList.isEmpty() || StringUtils.isBlank(groupId)) {
+            throw new KafkaAdapterInvalidParamException(
+                "Both an effective consumer group name and at least one topic name are needed to create a consumer!");
         }
 
         OpTimeTrackKafkaClient opTimeTrackKafkaConsumer =
-            getOrCreateOpTimeTrackKafkaConsumer(cacheKey, groupId, topicName);
+            getOrCreateOpTimeTrackKafkaConsumer(cycle, topicNameList, groupId);
 
         return new KafkaOp(
             kafkaAdapterMetrics,
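To see how the new consumer caching behaves, here is a hypothetical walk-through (group-id formatting and parameter values are assumed, not taken from the patch) of how a cycle number selects both the consumer slot and the consumer group that make up the pre-encoded cache key:

```java
// Hypothetical walk-through of the cycle-to-consumer mapping:
// group id <- cycle % num_cons_grp, consumer slot <- cycle % num_clnt.
public class ConsumerKeySketch {
    public static void main(String[] args) {
        int kafkaClntCnt = 2;   // num_clnt
        int consumerGrpCnt = 2; // num_cons_grp
        String topics = "nbktest1::nbktest2";
        for (long cycle = 0; cycle < 4; cycle++) {
            String groupId = "nbKafkaGrp-" + (cycle % consumerGrpCnt); // assumed formatting
            String cacheKeyRaw = "consumer-" + (cycle % kafkaClntCnt)
                + "::" + topics + "::" + groupId;
            System.out.println(cycle + " -> " + cacheKeyRaw);
            // 0 -> consumer-0::nbktest1::nbktest2::nbKafkaGrp-0
            // 1 -> consumer-1::nbktest1::nbktest2::nbKafkaGrp-1
            // 2 and 3 recompute the same keys, so they re-hit the cached consumers
        }
    }
}
```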
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
index 00f9f8ee5..157926bb2 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
@@ -59,8 +59,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
         this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
         producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
 
-        this.txnBatchNum =
-            parsedOp.getStaticConfigOr(KafkaAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
+        this.txnBatchNum = parsedOp.getStaticConfigOr("txn_batch_num", Integer.valueOf(0));
 
         this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
         this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
@@ -79,38 +78,53 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
         }
     }
 
-    private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(
-        String cacheKey, String clientId)
+    private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(long cycle,
+                                                                       String topicName,
+                                                                       String clientId)
     {
+        String cacheKey = KafkaAdapterUtil.buildCacheKey(
+            "producer-" + String.valueOf(cycle % kafkaClntCnt), topicName);
+
         OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
         if (opTimeTrackKafkaClient == null) {
             Properties producerConfProps = new Properties();
             producerConfProps.putAll(producerClientConfMap);
-            producerConfProps.put("client.id", clientId);
+
+            if (StringUtils.isNotBlank(clientId))
+                producerConfProps.put("client.id", clientId);
+            else
+                producerConfProps.remove("client.id");
 
             // When the transaction batch number is less than 2, it is effectively treated as "no transaction"
             if (txnBatchNum < 2)
                 producerConfProps.remove("transactional.id");
 
             String baseTransactId = "";
+            boolean transactionEnabled = false;
             if (producerConfProps.containsKey("transactional.id")) {
                 baseTransactId = producerConfProps.get("transactional.id").toString();
                 producerConfProps.put("transactional.id", baseTransactId + "-" + cacheKey);
+                transactionEnabled = StringUtils.isNotBlank(producerConfProps.get("transactional.id").toString());
             }
 
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
-            if (producerConfProps.containsKey("transactional.id")) {
+            if (transactionEnabled) {
                 producer.initTransactions();
             }
 
             if (logger.isDebugEnabled()) {
-                logger.debug("Producer created: {} -- {}", cacheKey, producer);
+                logger.debug("Producer created: {}/{} -- ({}, {}, {})",
+                    cacheKey,
+                    producer,
+                    topicName,
+                    transactionEnabled,
+                    clientId);
             }
 
             opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(
                 kafkaSpace,
                 asyncAPI,
-                StringUtils.isNotBlank(producerClientConfMap.get("transactional.id")),
+                transactionEnabled,
                 txnBatchNum,
                 producer);
             kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
@@ -176,11 +190,9 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
     public KafkaOp apply(long cycle) {
         String topicName = topicNameStrFunc.apply(cycle);
         String clientId = getEffectiveClientId(cycle);
-        String cacheKey = KafkaAdapterUtil.buildCacheKey(
-            "producer", topicName, String.valueOf(cycle % kafkaClntCnt));
 
         OpTimeTrackKafkaClient opTimeTrackKafkaProducer =
-            getOrCreateOpTimeTrackKafkaProducer(cacheKey, clientId);
+            getOrCreateOpTimeTrackKafkaProducer(cycle, topicName, clientId);
 
         ProducerRecord<String, String> message = createKafkaMessage(
             cycle,
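Note: appending the cache key to the configured base transactional.id matters because the broker fences any two live producers that share an id. A minimal sketch of the resulting per-producer setup against the plain Kafka client API (bootstrap address and id suffix are illustrative only):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical sketch: each cached producer gets a distinct transactional.id;
// two live producers sharing one id would fence each other
// (the broker raises ProducerFencedException for the older instance).
public class TransactIdSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // base id from kafka_config.properties + per-client cache-key suffix
        props.put("transactional.id", "nbDftTxn-producer-0");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // must run once before any begin/commit
        }
    }
}
```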
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java
index 3c7c8a151..f68b5b040 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java
@@ -106,6 +106,10 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
 
     @Override
     void cycleMsgProcess(long cycle, Object cycleObj) {
+        if (kafkaSpace.isShuttingDown()) {
+            return;
+        }
+
         synchronized (this) {
             ConsumerRecords<String, String> records = consumer.poll(msgPoolIntervalInMs);
             for (ConsumerRecord<String, String> record : records) {
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java
index bed565467..d44265891 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java
@@ -18,16 +18,23 @@ package io.nosqlbench.adapter.kafka.ops;
 
 import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
 import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
 
@@ -39,8 +46,20 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
     private final boolean transactEnabledConfig;
     private final int txnBatchNum;
 
+    enum TxnProcResult {
+        SUCCESS,
+        RECOVERABLE_ERROR,
+        FATAL_ERROR,
+        UNKNOWN_ERROR
+    }
+
     // Keep track of the transaction count per thread
-    private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
+    private static final ThreadLocal<Integer>
+        txnBatchTrackingCntTL = ThreadLocal.withInitial(() -> 0);
+
+    private static final ThreadLocal<TxnProcResult>
+        txnProcResultTL = ThreadLocal.withInitial(() -> TxnProcResult.SUCCESS);
 
     private final KafkaProducer<String, String> producer;
 
@@ -57,47 +76,83 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
         this.producer = producer;
     }
 
-    public int getTxnBatchTrackingCnt() { return txnBatchTrackingCnt.get(); }
-
-    public void incTxnBatchTrackingCnt() {
-        int curVal = getTxnBatchTrackingCnt();
-        txnBatchTrackingCnt.set(curVal + 1);
+    public static int getTxnBatchTrackingCntTL() {
+        return txnBatchTrackingCntTL.get();
+    }
+    public static void incTxnBatchTrackingCnt() {
+        txnBatchTrackingCntTL.set(getTxnBatchTrackingCntTL() + 1);
+    }
+    public static void resetTxnBatchTrackingCnt() {
+        txnBatchTrackingCntTL.set(0);
     }
 
-    private boolean commitCurTransactionNeeded(long cycle) {
-        // Whether to commit the transaction, which happens when:
-        // - "txn_batch_num" has been reached since the last reset
-        boolean commitCurTrans = transactionEnabled;
+    public static TxnProcResult getTxnProcResultTL() {
+        return txnProcResultTL.get();
+    }
+    public static void setTxnProcResultTL(TxnProcResult result) {
+        txnProcResultTL.set(result);
+    }
+    public static void resetTxnProcResultTL() {
+        txnProcResultTL.set(TxnProcResult.SUCCESS);
+    }
 
-        if (commitCurTrans) {
-            int txnBatchTackingCnt = getTxnBatchTrackingCnt();
+    private void processMsgTransaction(long cycle, KafkaProducer<String, String> producer) {
+        TxnProcResult result = TxnProcResult.SUCCESS;
 
-            if ( ( (txnBatchTackingCnt > 0) && ((txnBatchTackingCnt % txnBatchNum) == 0) ) ||
-                 ( cycle >= (kafkaSpace.getTotalCycleNum() - 1) ) ) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Commit transaction ({}, {})",
-                        txnBatchTackingCnt, cycle);
+        if (transactionEnabled) {
+            int txnBatchTrackingCnt = getTxnBatchTrackingCntTL();
+
+            try {
+                if (txnBatchTrackingCnt == 0) {
+                    // Start a new transaction when the processing first starts
+                    producer.beginTransaction();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("New transaction started ( {}, {}, {}, {}, {} )",
+                            cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                    }
+                } else if ( ((txnBatchTrackingCnt % txnBatchNum) == 0) ||
+                            (cycle == (kafkaSpace.getTotalCycleNum() - 1)) ) {
+
+                    synchronized (this) {
+                        // Commit once txnBatchNum messages have been sent in the current transaction
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Start committing transaction ... ( {}, {}, {}, {}, {} )",
+                                cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                        }
+                        producer.commitTransaction();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Transaction committed ( {}, {}, {}, {}, {} )",
+                                cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                        }
+
+                        // Start a new transaction
+                        producer.beginTransaction();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("New transaction started ( {}, {}, {}, {}, {} )",
+                                cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                        }
+                    }
+                }
             }
-            else {
-                commitCurTrans = false;
+            catch (Exception e) {
+                e.printStackTrace();
+                if ( (e instanceof IllegalStateException) ||
+                     (e instanceof ProducerFencedException) ||
+                     (e instanceof UnsupportedOperationException) ||
+                     (e instanceof AuthorizationException) ) {
+                    result = TxnProcResult.FATAL_ERROR;
+                }
+                else if ( (e instanceof TimeoutException) ||
+                          (e instanceof InterruptException) ) {
+                    result = TxnProcResult.RECOVERABLE_ERROR;
+                }
+                else {
+                    result = TxnProcResult.UNKNOWN_ERROR;
+                }
             }
         }
 
-        return commitCurTrans;
-    }
-
-    private boolean startNewTransactionNeeded(long cycle) {
-        boolean startNewTransact = transactionEnabled;
-
-        if (startNewTransact) {
-            if ( (cycle > 0) && (cycle < (kafkaSpace.getTotalCycleNum() - 1)) ) {
-                startNewTransact = commitCurTransactionNeeded(cycle);
-            } else {
-                startNewTransact = false;
-            }
-        }
-
-        return startNewTransact;
+        setTxnProcResultTL(result);
     }
 
     @Override
@@ -105,55 +160,88 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
         // For producer, cycleObj represents a "message" (ProducerRecord)
         assert (cycleObj != null);
 
-        try {
-            ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
-
-            boolean startNewTransNeeded = startNewTransactionNeeded(cycle);
-            boolean commitCurTransNeeded = commitCurTransactionNeeded(cycle);
-
-            if (commitCurTransNeeded) {
-                producer.commitTransaction();
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Transaction committed ( {}, {}, {}, {} )",
-                        cycle, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCnt());
+        if (kafkaSpace.isShuttingDown()) {
+            if (transactionEnabled) {
+                try {
+                    producer.abortTransaction();
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("Abort open transaction while shutting down ( {}, {}, {}, {}, {} )",
+                            cycle, producer, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCntTL());
+                    }
+                }
+                catch (Exception e) {
+                    e.printStackTrace();
                 }
             }
+            return;
+        }
 
-            if (startNewTransNeeded) {
-                producer.beginTransaction();
-            }
+        processMsgTransaction(cycle, producer);
+        TxnProcResult result = getTxnProcResultTL();
 
-            Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
-                @Override
-                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-                    if (asyncMsgAck) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Message sending with async ack. is successful ( {} ) - {}",
-                                cycle, recordMetadata);
-                        }
-                    }
-                }
-            });
+        if (result == TxnProcResult.RECOVERABLE_ERROR) {
+            try {
+                producer.abortTransaction();
+            }
+            catch (Exception e) {
+                throw new KafkaAdapterUnexpectedException("Aborting transaction failed!");
+            }
+        } else if (result == TxnProcResult.FATAL_ERROR) {
+            throw new KafkaAdapterUnexpectedException("Fatal error when initializing or committing transactions!");
+        } else if (result == TxnProcResult.UNKNOWN_ERROR) {
+            logger.debug("Unexpected error when initializing or committing transactions!");
+        }
 
-            if (!asyncMsgAck) {
-                try {
-                    RecordMetadata recordMetadata = responseFuture.get();
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Message sending with sync ack. is successful ( {} ) - {}",
-                            cycle, recordMetadata);
-                    }
-                } catch (InterruptedException | ExecutionException e) {
-                    KafkaAdapterUtil.messageErrorHandling(
-                        e,
-                        kafkaSpace.isStrictMsgErrorHandling(),
-                        "Unexpected error when waiting to receive message-send ack from the Kafka cluster." +
-                            "\n-----\n" + e);
-                }
-            }
+        ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
+
+        try {
+            if (result == TxnProcResult.SUCCESS) {
+                Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
+                    @Override
+                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+                        if (asyncMsgAck) {
+                            if (logger.isDebugEnabled()) {
+                                logger.debug("Message sending with async ack. is successful ({}) - {}, {}",
+                                    cycle, producer, recordMetadata);
+                            }
+                        }
+                    }
+                });
+
+                if (!asyncMsgAck) {
+                    try {
+                        RecordMetadata recordMetadata = responseFuture.get();
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Message sending with sync ack. is successful ({}) - {}, {}",
+                                cycle, producer, recordMetadata);
+                        }
+                    } catch (InterruptedException | ExecutionException e) {
+                        KafkaAdapterUtil.messageErrorHandling(
+                            e,
+                            kafkaSpace.isStrictMsgErrorHandling(),
+                            "Unexpected error when waiting to receive message-send ack from the Kafka cluster."
+                                + "\n-----\n" + e);
+                    }
+                }
+
+                incTxnBatchTrackingCnt();
+            }
+        }
+        catch ( ProducerFencedException | OutOfOrderSequenceException |
+                UnsupportedOperationException | AuthorizationException e) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Fatal error when sending a message ({}) - {}, {}",
+                    cycle, producer, message);
+            }
+            throw new KafkaAdapterUnexpectedException(e);
+        }
+        catch (IllegalStateException | KafkaException e) {
+            if (transactionEnabled) {
+
+            }
         }
         catch (Exception e) {
-            e.printStackTrace();
+            throw new KafkaAdapterUnexpectedException(e);
         }
     }
 
@@ -164,7 +252,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
             producer.close();
         }
 
-        this.txnBatchTrackingCnt.remove();
+        txnBatchTrackingCntTL.remove();
     }
     catch (IllegalStateException ise) {
         // If a producer is already closed, that's fine.
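With this rewrite, each producer thread begins a transaction on its first send, then commits-and-begins again after every txn_batch_num sends (with the final cycle forcing a commit). A minimal, self-contained sketch of that cadence against the plain Kafka client API — class name, topic, and bootstrap address are illustrative only:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical standalone sketch of the per-thread transaction cadence:
// begin on the first message, then commit-and-begin every txnBatchNum sends.
public class TxnBatchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("transactional.id", "nbDftTxn-sketch");

        int txnBatchNum = 3;
        int sentCnt = 0;

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();                  // the "count == 0" case
            for (long cycle = 0; cycle < 9; cycle++) {
                if (sentCnt > 0 && sentCnt % txnBatchNum == 0) {
                    producer.commitTransaction();         // commits the previous 3 sends
                    producer.beginTransaction();
                }
                producer.send(new ProducerRecord<>("nbktest", "msg-" + cycle));
                sentCnt++;
            }
            producer.commitTransaction();                 // flush the tail batch
        }
    }
}
```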
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java
index dfa9bb19a..b95309b3d 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java
@@ -34,14 +34,14 @@ public class KafkaAdapterUtil {
 
     private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
 
+    public static String DFT_CONSUMER_GROUP_NAME_PREFIX = "nbKafkaGrp";
+    public static String DFT_TOPIC_NAME_PREFIX = "nbKafkaTopic";
+
     ///////
     // Valid document level parameters for Kafka NB yaml file
     public enum DOC_LEVEL_PARAMS {
         // Blocking message producing or consuming
-        ASYNC_API("async_api"),
-        // Transaction batch size
-        TXN_BATCH_NUM("txn_batch_num");
+        ASYNC_API("async_api");
 
         public final String label;
 
         DOC_LEVEL_PARAMS(String label) {
@@ -80,7 +80,9 @@ public class KafkaAdapterUtil {
     }
 
     public static String buildCacheKey(String... keyParts) {
-        String combinedStr = String.join("::", keyParts);
+        String combinedStr = Arrays.stream(keyParts)
+            .filter(StringUtils::isNotBlank)
+            .collect(Collectors.joining("::"));
         return Base64.encodeAsString(combinedStr.getBytes());
     }
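Note on buildCacheKey(): filtering blank parts means a missing group id or client id no longer leaves a dangling "::" in the pre-encoded key, so producer and consumer cache keys stay stable. A quick sketch of the behavior, using the JDK Base64 encoder in place of the adapter's Base64.encodeAsString helper (assumed equivalent for this purpose) and commons-lang3 StringUtils:

```java
import java.util.Arrays;
import java.util.Base64;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;

// Sketch of the new cache-key behavior: blank parts are dropped,
// so "producer-0::nbktest" is produced whether or not clientId is blank.
public class CacheKeySketch {
    static String buildCacheKey(String... keyParts) {
        String combinedStr = Arrays.stream(keyParts)
            .filter(StringUtils::isNotBlank)
            .collect(Collectors.joining("::"));
        return Base64.getEncoder().encodeToString(combinedStr.getBytes());
    }

    public static void main(String[] args) {
        // Both calls print the same key:
        System.out.println(buildCacheKey("producer-0", "nbktest", ""));
        System.out.println(buildCacheKey("producer-0", "nbktest"));
    }
}
```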
diff --git a/adapter-kafka/src/main/resources/README.md b/adapter-kafka/src/main/resources/README.md
index 8472b31dd..e01f35fd0 100644
--- a/adapter-kafka/src/main/resources/README.md
+++ b/adapter-kafka/src/main/resources/README.md
@@ -1,15 +1,41 @@
+# Overview
+
+This NB Kafka driver allows publishing messages to, or consuming messages from,
+* a Kafka cluster, or
+* a Pulsar cluster with the [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka protocol handler for Pulsar.
+
+At a high level, this driver supports the following Kafka functionalities:
+* Publishing messages to one Kafka topic, with sync. or async. message-send acknowledgements (from brokers)
+* Subscribing to messages from one or multiple Kafka topics, with sync. or async. message-recv acknowledgements (to brokers) (aka message commits)
+  * auto message commit
+  * manual message commit, with a configurable number of messages per commit batch
+* Kafka transaction support
+
+## Example NB Yaml
+
+* [kafka_producer.yaml](./kafka_producer.yaml)
+* [kafka_consumer.yaml](./kafka_consumer.yaml)
+
 # Usage
 
 ```bash
 ## Kafka Producer
-$ run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://10.166.90.94:9092
+$ run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092
 
 ## Kafka Consumer
-$ run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://10.166.90.94:9092
+$ run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_consumer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092
 ```
 
-# Example NB Yaml
+## NB Kafka driver specific CLI parameters
 
-[kafka_producer.yaml](./kafka_producer.yaml)
+* `num_clnt`: the number of Kafka clients used to publish messages to, or receive messages from, the cluster
+  * For the producer workload, this is the number of producer threads that publish messages to the same topic
+    * Multiple producer threads may write to one topic/partition (`KafkaProducer` is thread-safe)
+    * The `threads` and `num_clnt` values MUST be the same.
+  * For the consumer workload, this is the number of partitions of a topic
+    * The consumer workload supports subscribing from multiple topics; in that case, all topics must have the same number of partitions.
+    * Only one consumer thread reads from a given topic/partition (`KafkaConsumer` is NOT thread-safe)
+    * `threads` MUST be equal to `num_clnt` * `num_cons_grp`
 
-[kafka_consumer.yaml](./kafka_consumer.yaml)
+* `num_cons_grp`: the number of consumer groups
+  * Only relevant for the consumer workload
diff --git a/adapter-kafka/src/main/resources/kafka_config.properties b/adapter-kafka/src/main/resources/kafka_config.properties
index 4e8c49c19..75742baa6 100644
--- a/adapter-kafka/src/main/resources/kafka_config.properties
+++ b/adapter-kafka/src/main/resources/kafka_config.properties
@@ -15,7 +15,7 @@ topic.flush.messages=2
 producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
 producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
 #producer.client.id=nbDftClient
-#producer.transactional.id=nbDftTxn
+producer.transactional.id=nbDftTxn
 
 
 #####
diff --git a/adapter-kafka/src/main/resources/kafka_consumer.yaml b/adapter-kafka/src/main/resources/kafka_consumer.yaml
index 5cb1910f0..2d88def3e 100644
--- a/adapter-kafka/src/main/resources/kafka_consumer.yaml
+++ b/adapter-kafka/src/main/resources/kafka_consumer.yaml
@@ -9,9 +9,9 @@ blocks:
   msg-consume-block:
     ops:
       op1:
-        ## TODO: can make this as a list of topics
-        ## The value represents the topic name
-        MessageConsume: "nbktest1"
+        ## The value represents the topic name(s)
+        # - for consumer, a comma-separated list of topics is supported
+        MessageConsume: "nbktest1,nbktest2"
 
         # The timeout value to poll messages (unit: milliseconds)
         # - default: 0
@@ -19,5 +19,6 @@ blocks:
 
         # The number of messages to receive before doing a manual commit
         # - default: 0
-        # When setting to 0, it could mean doing auto commit (determined by "enable.auto.commit" parameter)
+        # - If 0, whether or not auto commit happens is determined by the
+        #   "enable.auto.commit" consumer config value (a commit-batch sketch follows at the end of this patch)
         manual_commit_batch_num: "0"
diff --git a/adapter-kafka/src/main/resources/kafka_producer.yaml b/adapter-kafka/src/main/resources/kafka_producer.yaml
index b6a374c35..7760bd19a 100644
--- a/adapter-kafka/src/main/resources/kafka_producer.yaml
+++ b/adapter-kafka/src/main/resources/kafka_producer.yaml
@@ -10,18 +10,19 @@ params:
   # - default: true
   async_api: "true"
 
-  # The number of messages to put in one transaction
-  # - default: 0
-  # - value 0 or 1 means no transaction
-  # - it also requires "transactional.id" parameter is set
-  txn_batch_num: 5
-
 blocks:
   msg-produce-block:
     ops:
       op1:
         ## The value represents a topic name
-        MessageProduce: "nbktest1"
+        # - for producer, only ONE topic is supported
+        MessageProduce: "nbktest"
+
+        # The number of messages to put in one transaction
+        # - default: 0
+        # - a value of 0 or 1 means no transaction
+        # - it also requires the "transactional.id" config to be set
+        txn_batch_num: 8
 
         ## (Optional) Kafka message headers (in JSON format).
         msg_header: |
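Finally, for the consumer side of the YAML above: when enable.auto.commit is false, manual_commit_batch_num amounts to committing offsets synchronously after every N records. A minimal standalone sketch of that behavior (hypothetical names and addresses, not part of the patch):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical sketch of manual_commit_batch_num = 5 with auto-commit off.
public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "nbKafkaGrp-0");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        int manualCommitBatchNum = 5;
        int uncommitted = 0;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("nbktest1", "nbktest2")); // multi-topic subscribe
            for (int i = 0; i < 100; i++) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // ... process the record ...
                    if (++uncommitted >= manualCommitBatchNum) {
                        consumer.commitSync(); // commit a batch of offsets at once
                        uncommitted = 0;
                    }
                }
            }
        }
    }
}
```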