diff --git a/adapter-kafka/pom.xml b/adapter-kafka/pom.xml
new file mode 100644
index 000000000..95983b02b
--- /dev/null
+++ b/adapter-kafka/pom.xml
@@ -0,0 +1,81 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Copyright (c) 2022 nosqlbench
+
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>adapter-kafka</artifactId>
+    <packaging>jar</packaging>
+
+    <parent>
+        <artifactId>mvn-defaults</artifactId>
+        <groupId>io.nosqlbench</groupId>
+        <version>4.17.32-SNAPSHOT</version>
+        <relativePath>../mvn-defaults</relativePath>
+    </parent>
+
+    <name>${project.artifactId}</name>
+    <description>
+        A Kafka driver for nosqlbench. This provides the ability to inject synthetic data
+        into a Kafka or a Kafka-compatible (e.g. Pulsar with S4K) system.
+    </description>
+
+    <properties>
+        <kafka.version>3.3.1</kafka.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>io.nosqlbench</groupId>
+            <artifactId>engine-api</artifactId>
+            <version>4.17.32-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.nosqlbench</groupId>
+            <artifactId>adapters-api</artifactId>
+            <version>4.17.32-SNAPSHOT</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.12.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-beanutils</groupId>
+            <artifactId>commons-beanutils</artifactId>
+            <version>1.9.4</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-configuration2</artifactId>
+            <version>2.8.0</version>
+        </dependency>
+
+    </dependencies>
+</project>
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaDriverAdapter.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaDriverAdapter.java
new file mode 100644
index 000000000..606b21885
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaDriverAdapter.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.kafka;
+
+import io.nosqlbench.adapter.kafka.ops.KafkaOp;
+import io.nosqlbench.api.config.standard.NBConfigModel;
+import io.nosqlbench.api.config.standard.NBConfiguration;
+import io.nosqlbench.engine.api.activityimpl.OpMapper;
+import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
+import io.nosqlbench.nb.annotations.Service;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.function.Function;
+
+@Service(value = DriverAdapter.class, selector = "kafka")
+public class KafkaDriverAdapter extends BaseDriverAdapter<KafkaOp, KafkaSpace> {
+ private final static Logger logger = LogManager.getLogger(KafkaDriverAdapter.class);
+
+ @Override
+ public OpMapper<KafkaOp> getOpMapper() {
+ DriverSpaceCache<? extends KafkaSpace> spaceCache = getSpaceCache();
+ NBConfiguration adapterConfig = getConfiguration();
+ return new KafkaOpMapper(this, adapterConfig, spaceCache);
+ }
+
+ @Override
+ public Function<String, ? extends KafkaSpace> getSpaceInitializer(NBConfiguration cfg) {
+ return (s) -> new KafkaSpace(s, cfg);
+ }
+
+ @Override
+ public NBConfigModel getConfigModel() {
+ return super.getConfigModel().add(KafkaSpace.getConfigModel());
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpMapper.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpMapper.java
new file mode 100644
index 000000000..750a46460
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpMapper.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.kafka;
+
+import io.nosqlbench.adapter.kafka.dispensers.MessageConsumerOpDispenser;
+import io.nosqlbench.adapter.kafka.dispensers.MessageProducerOpDispenser;
+import io.nosqlbench.adapter.kafka.ops.KafkaOp;
+import io.nosqlbench.api.config.standard.NBConfiguration;
+import io.nosqlbench.engine.api.activityimpl.OpDispenser;
+import io.nosqlbench.engine.api.activityimpl.OpMapper;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import io.nosqlbench.engine.api.templating.TypeAndTarget;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class KafkaOpMapper implements OpMapper<KafkaOp> {
+
+ private final static Logger logger = LogManager.getLogger(KafkaOpMapper.class);
+
+ private final NBConfiguration cfg;
+ private final DriverSpaceCache<? extends KafkaSpace> spaceCache;
+ private final DriverAdapter adapter;
+
+ public KafkaOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends KafkaSpace> spaceCache) {
+ this.cfg = cfg;
+ this.spaceCache = spaceCache;
+ this.adapter = adapter;
+ }
+
+ @Override
+ public OpDispenser<? extends KafkaOp> apply(ParsedOp op) {
+ String spaceName = op.getStaticConfigOr("space", "default");
+ KafkaSpace kafkaSpace = spaceCache.get(spaceName);
+
+ /*
+ * If the user provides a body element, then they want to provide the JSON or
+ * a data structure that can be converted into JSON, bypassing any further
+ * specialized type-checking or op-type specific features
+ */
+ if (op.isDefined("body")) {
+ throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
+ }
+ else {
+ TypeAndTarget<KafkaOpType, String> opType = op.getTypeAndTarget(KafkaOpType.class, String.class);
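+ // Illustrative op template (assumed YAML form, shown for orientation only): the
+ // enum name selects the dispenser and the value binds the target topic, e.g.
+ //   ops:
+ //     op1:
+ //       MessageProduce: "my-kafka-topic"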
+
+ return switch (opType.enumId) {
+ case MessageProduce ->
+ new MessageProducerOpDispenser(adapter, op, opType.targetFunction, kafkaSpace);
+ case MessageConsume ->
+ new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, kafkaSpace);
+ };
+ }
+ }
+
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpType.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpType.java
new file mode 100644
index 000000000..2f2b48396
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpType.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.kafka;
+
+public enum KafkaOpType {
+ // Kafka producer
+ MessageProduce,
+ // Kafka consumer
+ MessageConsume
+}
+
+
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java
new file mode 100644
index 000000000..c78a535f7
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.kafka;
+
+import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
+import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.adapter.kafka.util.KafkaClientConf;
+import io.nosqlbench.api.config.standard.ConfigModel;
+import io.nosqlbench.api.config.standard.NBConfigModel;
+import io.nosqlbench.api.config.standard.NBConfiguration;
+import io.nosqlbench.api.config.standard.Param;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class KafkaSpace implements AutoCloseable {
+
+ private final static Logger logger = LogManager.getLogger(KafkaSpace.class);
+
+ private final String spaceName;
+ private final NBConfiguration cfg;
+
+ // TODO: currently this NB Kafka driver only supports String type for message key and value
+ // add schema support in the future
+ private final ConcurrentHashMap<String, OpTimeTrackKafkaClient> opTimeTrackKafkaClients = new ConcurrentHashMap<>();
+
+ private final String bootstrapSvr;
+ private final String kafkaClientConfFileName;
+ private final KafkaClientConf kafkaClientConf;
+
+ // Whether to do strict error handling while sending/receiving messages
+ // - Yes: any error returned from the Kafka cluster while doing message receiving/sending will trigger NB execution stop
+ // - No: pause the current thread that received the error message for 1 second and then continue processing
+ private final boolean strictMsgErrorHandling;
+
+ // Maximum time length to execute Kafka operations (e.g. message send or consume)
+ // - when NB execution passes this threshold, the op simply becomes a NoOp
+ // - 0 means no maximum time constraint. The Kafka op is always executed until the NB execution cycle finishes
+ private final long maxOpTimeInSec;
+ private final long activityStartTimeMills;
+
+ // Maximum number of Kafka clients
+ // - For Producer workload, this represents how many total producers to publish messages
+ // it must be the same value as the NB "threads" parameter
+ // - For Consumer workload, this represents how many total consumers per consumer group to subscribe messages
+ //
+ private final int clntNum;
+
+ // Maximum number of Kafka consumer groups
+ // - This is only relevant for Consumer workload
+ // - (clntNum * consumerGrpNum) is the total consumer thread number and must be the same
+ // as the NB "threads" parameter
+ private final int consumerGrpNum;
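+
+ // Example (illustrative): with num_clnt=2 and num_cons_grp=3, a consumer workload
+ // expects threads=6 (2 consumers in each of 3 groups), while a producer workload
+ // with num_clnt=2 expects threads=2.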
+
+ private long totalCycleNum;
+
+ public KafkaSpace(String spaceName, NBConfiguration cfg) {
+ this.spaceName = spaceName;
+ this.cfg = cfg;
+
+ this.bootstrapSvr = cfg.get("bootstrap_server");
+ this.clntNum =
+ NumberUtils.toInt(cfg.getOptional("num_clnt").orElse("1"));
+ this.consumerGrpNum =
+ NumberUtils.toInt(cfg.getOptional("num_cons_grp").orElse("1"));
+ this.maxOpTimeInSec =
+ NumberUtils.toLong(cfg.getOptional("max_op_time").orElse("0"));
+ this.strictMsgErrorHandling =
+ BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false"));
+ this.kafkaClientConfFileName = cfg.get("config");
+ this.kafkaClientConf = new KafkaClientConf(kafkaClientConfFileName);
+ this.activityStartTimeMills = System.currentTimeMillis();
+ }
+
+ @Override
+ public void close() {
+ shutdownSpace();
+ }
+
+ public static NBConfigModel getConfigModel() {
+ return ConfigModel.of(KafkaSpace.class)
+ .add(Param.defaultTo("bootstrap_server", "pulsar://localhost:9020")
+ .setDescription("Kafka bootstrap server URL."))
+ .add(Param.defaultTo("config", "config.properties")
+ .setDescription("Kafka client connection configuration property file."))
+ .add(Param.defaultTo("num_clnt", 1)
+ .setDescription("Number of Kafka clients. For consumer, this is the number of consumers per consumer group"))
+ .add(Param.defaultTo("num_cons_grp", 1)
+ .setDescription("Number of consumer groups (only relevant for Kafka consumer workload). "))
+ .add(Param.defaultTo("max_op_time", 0)
+ .setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
+ .add(Param.defaultTo("strict_msg_error_handling", false)
+ .setDescription("Whether to do strict error handling which is to stop NB Kafka execution."))
+ .asReadOnly();
+ }
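+
+ // Illustrative invocation (assumed nb5 CLI form; file names are hypothetical, the
+ // parameter names come from the config model above):
+ //   nb5 run driver=kafka workload=kafka_producer.yaml bootstrap_server=localhost:9092 \
+ //       config=kafka_config.properties num_clnt=4 threads=4 cycles=10000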
+
+ public OpTimeTrackKafkaClient getOpTimeTrackKafkaClient(String cacheKey) {
+ return opTimeTrackKafkaClients.get(cacheKey);
+ }
+ public void addOpTimeTrackKafkaClient(String cacheKey, OpTimeTrackKafkaClient client) {
+ opTimeTrackKafkaClients.put(cacheKey, client);
+ }
+
+ public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
+ public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; }
+ public String getBootstrapSvr() { return this.bootstrapSvr; }
+ public KafkaClientConf getKafkaClientConf() { return kafkaClientConf; }
+
+ public int getClntNum() { return this.clntNum; }
+ public int getConsumerGrpNum() { return this.consumerGrpNum; }
+
+ public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
+
+ public long getTotalCycleNum() { return totalCycleNum; }
+ public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
+
+ public void shutdownSpace() {
+ try {
+ // Pause 5 seconds before closing producers/consumers
+ KafkaAdapterUtil.pauseCurThreadExec(5);
+
+ for (OpTimeTrackKafkaClient client : opTimeTrackKafkaClients.values()) {
+ client.close();
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ throw new KafkaAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
+ }
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java
new file mode 100644
index 000000000..bc1e75acb
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java
@@ -0,0 +1,170 @@
+package io.nosqlbench.adapter.kafka.dispensers;
+
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
+import io.nosqlbench.adapter.kafka.ops.KafkaOp;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.*;
+import java.util.function.LongFunction;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
+
+ private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
+
+ protected final ParsedOp parsedOp;
+ protected final KafkaAdapterMetrics kafkaAdapterMetrics;
+ protected final KafkaSpace kafkaSpace;
+
+ protected final int kafkaClntCnt;
+ protected final int consumerGrpCnt;
+
+ // Doc-level parameter: async_api (default: true)
+ // - For a Producer workload, this determines whether to wait for message send acknowledgements synchronously or asynchronously
+ // - For a Consumer workload, this determines whether manual message commits are made synchronously or asynchronously
+ //   (only relevant when auto commit is disabled)
+ protected final boolean asyncAPI;
+
+ protected final LongFunction<String> topicNameStrFunc;
+ protected final Map<String, String> topicConfMap = new HashMap<>();
+
+ protected final int totalThreadNum;
+ protected final long totalCycleNum;
+
+ public KafkaBaseOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction<String> topicNameStrFunc,
+ KafkaSpace kafkaSpace) {
+
+ super(adapter, op);
+
+ this.parsedOp = op;
+ this.kafkaSpace = kafkaSpace;
+
+ String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
+ this.kafkaAdapterMetrics = new KafkaAdapterMetrics(defaultMetricsPrefix);
+ kafkaAdapterMetrics.initS4JAdapterInstrumentation();
+
+ this.asyncAPI =
+ parsedOp.getStaticConfigOr(KafkaAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, Boolean.TRUE);
+
+ this.topicNameStrFunc = topicNameStrFunc;
+ this.topicConfMap.putAll(kafkaSpace.getKafkaClientConf().getTopicConfMap());
+
+ this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));
+ kafkaSpace.setTotalCycleNum(totalCycleNum);
+
+ this.kafkaClntCnt = kafkaSpace.getClntNum();
+ this.consumerGrpCnt = kafkaSpace.getConsumerGrpNum();
+ this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
+
+ assert (kafkaClntCnt > 0);
+ assert (consumerGrpCnt > 0);
+
+ boolean validThreadNum =
+ ( ((this instanceof MessageProducerOpDispenser) && (totalThreadNum == kafkaClntCnt)) ||
+ ((this instanceof MessageConsumerOpDispenser) && (totalThreadNum == kafkaClntCnt*consumerGrpCnt)) );
+ if (!validThreadNum) {
+ throw new KafkaAdapterInvalidParamException(
+ "Incorrect settings of 'threads', 'num_clnt', or 'num_cons_grp' -- " +
+ totalThreadNum + ", " + kafkaClntCnt + ", " + consumerGrpCnt);
+ }
+ }
+
+ public KafkaSpace getKafkaSpace() { return kafkaSpace; }
+ public KafkaAdapterMetrics getKafkaAdapterMetrics() { return kafkaAdapterMetrics; }
+
+ protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) {
+ LongFunction<Boolean> booleanLongFunction;
+ booleanLongFunction = (l) -> parsedOp.getOptionalStaticConfig(paramName, String.class)
+ .filter(Predicate.not(String::isEmpty))
+ .map(value -> BooleanUtils.toBoolean(value))
+ .orElse(defaultValue);
+ logger.info("{}: {}", paramName, booleanLongFunction.apply(0));
+ return booleanLongFunction;
+ }
+
+ protected LongFunction<Set<String>> lookupStaticStrSetOpValueFunc(String paramName) {
+ LongFunction<Set<String>> setStringLongFunction;
+ setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
+ .filter(Predicate.not(String::isEmpty))
+ .map(value -> {
+ Set<String> set = new HashSet<>();
+
+ if (StringUtils.contains(value,',')) {
+ set = Arrays.stream(value.split(","))
+ .map(String::trim)
+ .filter(Predicate.not(String::isEmpty))
+ .collect(Collectors.toCollection(LinkedHashSet::new));
+ }
+
+ return set;
+ }).orElse(Collections.emptySet());
+ logger.info("{}: {}", paramName, setStringLongFunction.apply(0));
+ return setStringLongFunction;
+ }
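+
+ // Note: per the logic above, only comma-separated values yield a non-empty set,
+ // e.g. "tp1, tp2,tp3" -> {tp1, tp2, tp3}; a lone value without a comma yields an
+ // empty set.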
+
+ // If the corresponding Op parameter is not provided, use the specified default value
+ protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
+ LongFunction<Integer> integerLongFunction;
+ integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
+ .filter(Predicate.not(String::isEmpty))
+ .map(value -> NumberUtils.toInt(value))
+ .map(value -> {
+ if (value < 0) return 0;
+ else return value;
+ }).orElse(defaultValue);
+ logger.info("{}: {}", paramName, integerLongFunction.apply(0));
+ return integerLongFunction;
+ }
+
+ // If the corresponding Op parameter is not provided, use the specified default value
+ protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
+ LongFunction<String> stringLongFunction;
+ stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class)
+ .orElse((l) -> defaultValue);
+ logger.info("{}: {}", paramName, stringLongFunction.apply(0));
+
+ return stringLongFunction;
+ }
+ protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName) {
+ return lookupOptionalStrOpValueFunc(paramName, "");
+ }
+
+ // Mandatory Op parameter. Throw an error if it is not specified or has an empty value
+ protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
+ LongFunction<String> stringLongFunction;
+ stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class);
+ logger.info("{}: {}", paramName, stringLongFunction.apply(0));
+
+ return stringLongFunction;
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
new file mode 100644
index 000000000..8133130b4
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.kafka.dispensers;
+
+import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
+import io.nosqlbench.adapter.kafka.ops.KafkaOp;
+import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
+import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.LongFunction;
+
+public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
+
+ private final static Logger logger = LogManager.getLogger("MessageConsumerOpDispenser");
+
+ private final Map<String, String> consumerClientConfMap = new HashMap<>();
+
+ // The timeout value used as the message poll interval (in seconds)
+ protected final int msgPollIntervalInSec;
+
+ // Manual commit frequency
+ // - the number of received messages after which a manual commit is issued
+ // - This is only relevant when the effective setting (global level and statement level)
+ // of "enable.auto.commit" is false
+ protected final int maxMsgCntPerCommit;
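+
+ // Example (illustrative): manual_commit_batch_num=100 disables auto-commit and
+ // triggers a manual commit after every 100 received messages; a value of 0 leaves
+ // the effective "enable.auto.commit" client setting in charge.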
+
+ protected boolean autoCommitEnabled;
+
+ public MessageConsumerOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction<String> tgtNameFunc,
+ KafkaSpace kafkaSpace) {
+ super(adapter, op, tgtNameFunc, kafkaSpace);
+
+ this.consumerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getConsumerConfMap());
+ consumerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
+
+ this.msgPollIntervalInSec =
+ NumberUtils.toInt(parsedOp.getStaticConfigOr("msg_poll_interval", "0"));
+
+ this.maxMsgCntPerCommit =
+ NumberUtils.toInt(parsedOp.getStaticConfig("manual_commit_batch_num", String.class));
+
+ this.autoCommitEnabled = true;
+ if (maxMsgCntPerCommit > 0) {
+ this.autoCommitEnabled = false;
+ consumerClientConfMap.put("enable.auto.commit", "false");
+ } else {
+ if (consumerClientConfMap.containsKey("enable.auto.commit")) {
+ this.autoCommitEnabled = BooleanUtils.toBoolean(consumerClientConfMap.get("enable.auto.commit"));
+ }
+ }
+ }
+
+ private String getEffectiveGroupId(long cycle) {
+ int grpIdx = (int) (cycle % consumerGrpCnt);
+ String defaultGrpNamePrefix = "nb-grp";
+ if (consumerClientConfMap.containsKey("group.id")) {
+ defaultGrpNamePrefix = consumerClientConfMap.get("group.id");
+ }
+
+ return defaultGrpNamePrefix + "-" + grpIdx;
+ }
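+
+ // Example (illustrative): with consumerGrpCnt=2 and group.id=myGrp, successive
+ // cycles resolve to group ids myGrp-0, myGrp-1, myGrp-0, ... spreading consumers
+ // evenly across the configured consumer groups.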
+
+ private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(
+ String cacheKey,
+ String groupId,
+ String topicName)
+ {
+ OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
+ if (opTimeTrackKafkaClient == null) {
+ Properties consumerConfProps = new Properties();
+ consumerConfProps.putAll(consumerClientConfMap);
+ consumerConfProps.put("group.id", groupId);
+
+ KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
+ synchronized (this) {
+ consumer.subscribe(Arrays.asList(topicName));
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Kafka consumer created: {} -- {}", cacheKey, consumer);
+ }
+
+ opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
+ kafkaSpace, asyncAPI, msgPollIntervalInSec, autoCommitEnabled, maxMsgCntPerCommit, consumer);
+ kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
+ }
+
+ return opTimeTrackKafkaClient;
+ }
+
+ @Override
+ public KafkaOp apply(long cycle) {
+ String topicName = topicNameStrFunc.apply(cycle);
+ String groupId = getEffectiveGroupId(cycle);
+ String cacheKey = KafkaAdapterUtil.buildCacheKey(
+ "consumer", topicName, groupId, String.valueOf(cycle % kafkaClntCnt));
+
+ if (StringUtils.isBlank(groupId)) {
+ throw new KafkaAdapterInvalidParamException("An effective \"group.id\" is needed for a consumer!");
+ }
+
+ OpTimeTrackKafkaClient opTimeTrackKafkaConsumer =
+ getOrCreateOpTimeTrackKafkaConsumer(cacheKey, groupId, topicName);
+
+ return new KafkaOp(
+ kafkaAdapterMetrics,
+ kafkaSpace,
+ opTimeTrackKafkaConsumer,
+ null);
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
new file mode 100644
index 000000000..00f9f8ee5
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.adapter.kafka.dispensers;
+
+import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
+import io.nosqlbench.adapter.kafka.ops.KafkaOp;
+import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
+import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
+import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.templating.ParsedOp;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.LongFunction;
+
+public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
+
+ private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
+
+ public static final String MSG_HEADER_OP_PARAM = "msg_header";
+ public static final String MSG_KEY_OP_PARAM = "msg_key";
+ public static final String MSG_BODY_OP_PARAM = "msg_body";
+
+ private final Map<String, String> producerClientConfMap = new HashMap<>();
+
+ protected final int txnBatchNum;
+ private final LongFunction<String> msgHeaderJsonStrFunc;
+ private final LongFunction<String> msgKeyStrFunc;
+ private final LongFunction<String> msgValueStrFunc;
+
+ public MessageProducerOpDispenser(DriverAdapter adapter,
+ ParsedOp op,
+ LongFunction<String> tgtNameFunc,
+ KafkaSpace kafkaSpace) {
+ super(adapter, op, tgtNameFunc, kafkaSpace);
+
+ this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
+ producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
+
+ this.txnBatchNum =
+ parsedOp.getStaticConfigOr(KafkaAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
+
+ this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
+ this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
+ this.msgValueStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
+ }
+
+ private String getEffectiveClientId(long cycle) {
+ if (producerClientConfMap.containsKey("client.id")) {
+ String defaultClientIdPrefix = producerClientConfMap.get("client.id");
+ int clntIdx = (int) (cycle % kafkaClntCnt);
+
+ return defaultClientIdPrefix + "-" + clntIdx;
+ }
+ else {
+ return "";
+ }
+ }
+
+ private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(
+ String cacheKey, String clientId)
+ {
+ OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
+ if (opTimeTrackKafkaClient == null) {
+ Properties producerConfProps = new Properties();
+ producerConfProps.putAll(producerClientConfMap);
+ producerConfProps.put("client.id", clientId);
+
+ // When transaction batch number is less than 2, it is treated effectively as no-transaction
+ if (txnBatchNum < 2)
+ producerConfProps.remove("transactional.id");
+
+ String baseTransactId = "";
+ if (producerConfProps.containsKey("transactional.id")) {
+ baseTransactId = producerConfProps.get("transactional.id").toString();
+ producerConfProps.put("transactional.id", baseTransactId + "-" + cacheKey);
+ }
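+
+ // Example (illustrative): with transactional.id=nbTxn in the producer config, each
+ // cached producer gets a unique id of the form "nbTxn-<cacheKey>", so concurrently
+ // cached producers do not fence one another.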
+
+ KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
+ if (producerConfProps.containsKey("transactional.id")) {
+ producer.initTransactions();
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Producer created: {} -- {}", cacheKey, producer);
+ }
+
+ opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(
+ kafkaSpace,
+ asyncAPI,
+ // use the effective setting, since "transactional.id" may have been removed above
+ producerConfProps.containsKey("transactional.id"),
+ txnBatchNum,
+ producer);
+ kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
+ }
+
+ return opTimeTrackKafkaClient;
+ }
+
+ private ProducerRecord<String, String> createKafkaMessage(
+ long curCycle,
+ String topicName,
+ String msgHeaderRawJsonStr,
+ String msgKey,
+ String msgValue
+ ) {
+ if (StringUtils.isAllBlank(msgKey, msgValue)) {
+ throw new KafkaAdapterInvalidParamException("Message key and value can't both be empty!");
+ }
+
+ int messageSize = KafkaAdapterUtil.getStrObjSize(msgKey) + KafkaAdapterUtil.getStrObjSize(msgValue);
+
+ ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msgKey, msgValue);
+
+ // Check if msgHeaderRawJsonStr is a valid JSON string with a collection of key/value pairs
+ // - if Yes, convert it to a map
+ // - otherwise, log an error message and ignore message headers without throwing a runtime exception
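+ // Illustrative msg_header value (assumed to be a flat JSON object of string
+ // key/value pairs): {"header-1": "value-1", "header-2": "value-2"}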
+ Map<String, String> msgHeaderProperties = new HashMap<>();
+ if (!StringUtils.isBlank(msgHeaderRawJsonStr)) {
+ try {
+ msgHeaderProperties = KafkaAdapterUtil.convertJsonToMap(msgHeaderRawJsonStr);
+ } catch (Exception e) {
+ logger.warn(
+ "Error parsing message header JSON string {}; ignoring message headers!",
+ msgHeaderRawJsonStr);
+ }
+ }
+
+ for (Map.Entry<String, String> entry : msgHeaderProperties.entrySet()) {
+ String headerKey = entry.getKey();
+ String headerValue = entry.getValue();
+
+ messageSize += KafkaAdapterUtil.getStrObjSize(headerKey) + KafkaAdapterUtil.getStrObjSize(headerValue);
+
+ if (! StringUtils.isAnyBlank(headerKey, headerValue)) {
+ record.headers().add(headerKey, headerValue.getBytes());
+ }
+
+ }
+
+ // NB-specific headers
+ messageSize += KafkaAdapterUtil.getStrObjSize(KafkaAdapterUtil.NB_MSG_SEQ_PROP);
+ messageSize += 8;
+ messageSize += KafkaAdapterUtil.getStrObjSize(KafkaAdapterUtil.NB_MSG_SIZE_PROP);
+ messageSize += 6;
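+ // (The fixed 8- and 6-byte additions above are, presumably, a rough allowance for
+ // the serialized sequence-number and message-size header values; treat this size
+ // accounting as an approximation.)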
+
+ record.headers().add(KafkaAdapterUtil.NB_MSG_SEQ_PROP, String.valueOf(curCycle).getBytes());
+ record.headers().add(KafkaAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize).getBytes());
+
+ return record;
+ }
+
+ @Override
+ public KafkaOp apply(long cycle) {
+ String topicName = topicNameStrFunc.apply(cycle);
+ String clientId = getEffectiveClientId(cycle);
+ String cacheKey = KafkaAdapterUtil.buildCacheKey(
+ "producer", topicName, String.valueOf(cycle % kafkaClntCnt));
+
+ OpTimeTrackKafkaClient opTimeTrackKafkaProducer =
+ getOrCreateOpTimeTrackKafkaProducer(cacheKey, clientId);
+
+ ProducerRecord<String, String> message = createKafkaMessage(
+ cycle,
+ topicName,
+ msgHeaderJsonStrFunc.apply(cycle),
+ msgKeyStrFunc.apply(cycle),
+ msgValueStrFunc.apply(cycle)
+ );
+
+ return new KafkaOp(
+ kafkaAdapterMetrics,
+ kafkaSpace,
+ opTimeTrackKafkaProducer,
+ message);
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterInvalidParamException.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterInvalidParamException.java
new file mode 100644
index 000000000..0a7fd2bc7
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterInvalidParamException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.kafka.exception;
+
+public class KafkaAdapterInvalidParamException extends RuntimeException {
+
+ public KafkaAdapterInvalidParamException(String paramName, String errDesc) {
+ super("Invalid setting for parameter (" + paramName + "): " + errDesc);
+ }
+
+ public KafkaAdapterInvalidParamException(String fullErrDesc) {
+ super(fullErrDesc);
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnexpectedException.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnexpectedException.java
new file mode 100644
index 000000000..c35cf0576
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnexpectedException.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.kafka.exception;
+
+public class KafkaAdapterUnexpectedException extends RuntimeException {
+
+ public KafkaAdapterUnexpectedException(String message) {
+ super(message);
+ printStackTrace();
+ }
+ public KafkaAdapterUnexpectedException(Exception e) {
+ super(e);
+ printStackTrace();
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnsupportedOpException.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnsupportedOpException.java
new file mode 100644
index 000000000..4386ddcd6
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnsupportedOpException.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.kafka.exception;
+
+public class KafkaAdapterUnsupportedOpException extends RuntimeException {
+
+ public KafkaAdapterUnsupportedOpException(String kafkaOpType) {
+ super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + "\"");
+ }
+}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/KafkaOp.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/KafkaOp.java
new file mode 100644
index 000000000..d6e3a0573
--- /dev/null
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/KafkaOp.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (c) 2022 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.nosqlbench.adapter.kafka.ops;
+
+import com.codahale.metrics.Histogram;
+import io.nosqlbench.adapter.kafka.KafkaSpace;
+import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
+import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
+
+public class KafkaOp implements CycleOp<Object> {