Merge pull request #847 from yabinmeng/main

Add NB5 Kafka driver
Jonathan Shook 2022-12-12 22:49:01 -06:00 committed by GitHub
commit 17e4cb3230
26 changed files with 1871 additions and 7 deletions

adapter-kafka/pom.xml

@@ -0,0 +1,81 @@
<!--
~ Copyright (c) 2022 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>adapter-kafka</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.32-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<description>
A Kafka driver for nosqlbench. This provides the ability to inject synthetic data
into a Kafka or Kafka-compatible system (e.g. Pulsar with S4K).
</description>
<properties>
<kafka.version>3.3.1</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.8.0</version>
</dependency>
</dependencies>
</project>

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaDriverAdapter.java

@@ -0,0 +1,52 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
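// Entry point of the NB Kafka driver: registers under the "kafka" selector and wires
// the op mapper and the per-space client state (KafkaSpace) into the adapter framework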
@Service(value = DriverAdapter.class, selector = "kafka")
public class KafkaDriverAdapter extends BaseDriverAdapter<KafkaOp, KafkaSpace> {
private final static Logger logger = LogManager.getLogger(KafkaDriverAdapter.class);
@Override
public OpMapper<KafkaOp> getOpMapper() {
DriverSpaceCache<? extends KafkaSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new KafkaOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends KafkaSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new KafkaSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(KafkaSpace.getConfigModel());
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpMapper.java

@@ -0,0 +1,71 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.dispensers.MessageConsumerOpDispenser;
import io.nosqlbench.adapter.kafka.dispensers.MessageProducerOpDispenser;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class KafkaOpMapper implements OpMapper<KafkaOp> {
private final static Logger logger = LogManager.getLogger(KafkaOpMapper.class);
private final NBConfiguration cfg;
private final DriverSpaceCache<? extends KafkaSpace> spaceCache;
private final DriverAdapter adapter;
public KafkaOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends KafkaSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends KafkaOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
KafkaSpace kafkaSpace = spaceCache.get(spaceName);
/*
* If the user provides a body element, then they want to provide the JSON or
* a data structure that can be converted into JSON, bypassing any further
* specialized type-checking or op-type specific features
*/
if (op.isDefined("body")) {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
}
else {
TypeAndTarget<KafkaOpType, String> opType = op.getTypeAndTarget(KafkaOpType.class, String.class);
return switch (opType.enumId) {
case MessageProduce ->
new MessageProducerOpDispenser(adapter, op, opType.targetFunction, kafkaSpace);
case MessageConsume ->
new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, kafkaSpace);
};
}
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaOpType.java

@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka;
public enum KafkaOpType {
// Kafka producer
MessageProduce,
// Kafka consumer
MessageConsume
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/KafkaSpace.java

@@ -0,0 +1,149 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.ConcurrentHashMap;
public class KafkaSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(KafkaSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
// TODO: the NB Kafka driver currently supports only the String type for message keys and values;
//       add schema support in the future
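// Cache of time-tracking Kafka client wrappers (producers/consumers), keyed by the
// cache key that the dispensers build from the topic, group, and client index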
private final ConcurrentHashMap<String, OpTimeTrackKafkaClient> opTimeTrackKafkaClients = new ConcurrentHashMap<>();
private final String bootstrapSvr;
private final String kafkaClientConfFileName;
private final KafkaClientConf kafkaClientConf;
// Whether to do strict error handling while sending/receiving messages
// - Yes: any error returned from the Kafka cluster while receiving/sending messages will trigger NB execution stop
// - No: pause the current thread that received the error message for 1 second and then continue processing
private final boolean strictMsgErrorHandling;
// Maximum time length to execute Kafka operations (e.g. message send or consume)
// - when NB execution passes this threshold, the operation becomes a no-op
// - 0 means no maximum time constraint; the Kafka op is always executed until the NB execution cycle finishes
private final long maxOpTimeInSec;
private final long activityStartTimeMills;
// Maximum number of Kafka clients
// - For Producer workload, this represents how many total producers to publish messages
// it must be the same value as the NB "threads" parameter
// - For Consumer workload, this represents how many total consumers per consumer group to subscribe messages
//
private final int clntNum;
// Maximum number of Kafka consumer groups
// - This is only relevant for Consumer workload
// - (clntNum * consumerGrpNum) is the total consumer thread number and must be the same
// as the NB "threads" parameter
private final int consumerGrpNum;
private long totalCycleNum;
public KafkaSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
this.bootstrapSvr = cfg.get("bootstrap_server");
this.clntNum =
NumberUtils.toInt(cfg.getOptional("num_clnt").orElse("1"));
this.consumerGrpNum =
NumberUtils.toInt(cfg.getOptional("num_cons_grp").orElse("1"));
this.maxOpTimeInSec =
NumberUtils.toLong(cfg.getOptional("max_op_time").orElse("0"));
this.strictMsgErrorHandling =
BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false"));
this.kafkaClientConfFileName = cfg.get("config");
this.kafkaClientConf = new KafkaClientConf(kafkaClientConfFileName);
this.activityStartTimeMills = System.currentTimeMillis();
}
@Override
public void close() {
shutdownSpace();
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(KafkaSpace.class)
.add(Param.defaultTo("bootstrap_server", "pulsar://localhost:9020")
.setDescription("Kafka bootstrap server URL."))
.add(Param.defaultTo("config", "config.properties")
.setDescription("Kafka client connection configuration property file."))
.add(Param.defaultTo("num_clnt", 1)
.setDescription("Number of Kafka clients. For consumer, this is the number of consumers per consumer group"))
.add(Param.defaultTo("num_cons_grp", 1)
.setDescription("Number of consumer groups (only relevant for Kafka consumer workload). "))
.add(Param.defaultTo("max_op_time", 0)
.setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
.add(Param.defaultTo("strict_msg_error_handling", false)
.setDescription("Whether to do strict error handling which is to stop NB Kafka execution."))
.asReadOnly();
}
public OpTimeTrackKafkaClient getOpTimeTrackKafkaClient(String cacheKey) {
return opTimeTrackKafkaClients.get(cacheKey);
}
public void addOpTimeTrackKafkaClient(String cacheKey, OpTimeTrackKafkaClient client) {
opTimeTrackKafkaClients.put(cacheKey, client);
}
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; }
public String getBootstrapSvr() { return this.bootstrapSvr; }
public KafkaClientConf getKafkaClientConf() { return kafkaClientConf; }
public int getClntNum() { return this.clntNum; }
public int getConsumerGrpNum() { return this.consumerGrpNum; }
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
public void shutdownSpace() {
try {
// Pause 5 seconds before closing producers/consumers
KafkaAdapterUtil.pauseCurThreadExec(5);
for (OpTimeTrackKafkaClient client : opTimeTrackKafkaClients.values()) {
client.close();
}
}
catch (Exception e) {
e.printStackTrace();
throw new KafkaAdapterUnexpectedException("Unexpected error when shutting down the NB Kafka space.");
}
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/KafkaBaseOpDispenser.java

@@ -0,0 +1,170 @@
package io.nosqlbench.adapter.kafka.dispensers;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, KafkaSpace> {
private final static Logger logger = LogManager.getLogger("KafkaBaseOpDispenser");
protected final ParsedOp parsedOp;
protected final KafkaAdapterMetrics kafkaAdapterMetrics;
protected final KafkaSpace kafkaSpace;
protected final int kafkaClntCnt;
protected final int consumerGrpCnt;
// Doc-level parameter: async_api (default: true)
// - For Producer workload, this means waiting for the message send ack synchronously or asynchronously
// - For Consumer workload, this means doing the manual message commit synchronously or asynchronously
//   (only relevant when enable.auto.commit is disabled)
protected final boolean asyncAPI;
protected final LongFunction<String> topicNameStrFunc;
protected final Map<String, String> topicConfMap = new HashMap<>();
protected final int totalThreadNum;
protected final long totalCycleNum;
public KafkaBaseOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> topicNameStrFunc,
KafkaSpace kafkaSpace) {
super(adapter, op);
this.parsedOp = op;
this.kafkaSpace = kafkaSpace;
String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
this.kafkaAdapterMetrics = new KafkaAdapterMetrics(defaultMetricsPrefix);
kafkaAdapterMetrics.initKafkaAdapterInstrumentation();
this.asyncAPI =
parsedOp.getStaticConfigOr(KafkaAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, Boolean.TRUE);
this.topicNameStrFunc = topicNameStrFunc;
this.topicConfMap.putAll(kafkaSpace.getKafkaClientConf().getTopicConfMap());
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));
kafkaSpace.setTotalCycleNum(totalCycleNum);
this.kafkaClntCnt = kafkaSpace.getClntNum();
this.consumerGrpCnt = kafkaSpace.getConsumerGrpNum();
this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
assert (kafkaClntCnt > 0);
assert (consumerGrpCnt > 0);
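// e.g. a consumer workload with num_clnt=2 and num_cons_grp=2 requires threads=4,
// while a producer workload with num_clnt=2 requires threads=2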
boolean validThreadNum =
( ((this instanceof MessageProducerOpDispenser) && (totalThreadNum == kafkaClntCnt)) ||
((this instanceof MessageConsumerOpDispenser) && (totalThreadNum == kafkaClntCnt*consumerGrpCnt)) );
if (!validThreadNum) {
throw new KafkaAdapterInvalidParamException(
"Incorrect settings of 'threads', 'num_clnt', or 'num_cons_grp' -- " +
totalThreadNum + ", " + kafkaClntCnt + ", " + consumerGrpCnt);
}
}
public KafkaSpace getKafkaSpace() { return kafkaSpace; }
public KafkaAdapterMetrics getKafkaAdapterMetrics() { return kafkaAdapterMetrics; }
protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) {
LongFunction<Boolean> booleanLongFunction;
booleanLongFunction = (l) -> parsedOp.getOptionalStaticConfig(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> BooleanUtils.toBoolean(value))
.orElse(defaultValue);
logger.info("{}: {}", paramName, booleanLongFunction.apply(0));
return booleanLongFunction;
}
protected LongFunction<Set<String>> lookupStaticStrSetOpValueFunc(String paramName) {
LongFunction<Set<String>> setStringLongFunction;
setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
Set<String > set = new HashSet<>();
if (StringUtils.contains(value,',')) {
set = Arrays.stream(value.split(","))
.map(String::trim)
.filter(Predicate.not(String::isEmpty))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
return set;
}).orElse(Collections.emptySet());
logger.info("{}: {}", paramName, setStringLongFunction.apply(0));
return setStringLongFunction;
}
// If the corresponding Op parameter is not provided, use the specified default value
protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
LongFunction<Integer> integerLongFunction;
integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> NumberUtils.toInt(value))
.map(value -> {
if (value < 0) return 0;
else return value;
}).orElse(defaultValue);
logger.info("{}: {}", paramName, integerLongFunction.apply(0));
return integerLongFunction;
}
// If the corresponding Op parameter is not provided, use the specified default value
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class)
.orElse((l) -> defaultValue);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName) {
return lookupOptionalStrOpValueFunc(paramName, "");
}
// Mandatory Op parameter. Throw an error if it is not specified or has an empty value
protected LongFunction<String> lookupMandatoryStrOpValueFunc(String paramName) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java

@@ -0,0 +1,140 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka.dispensers;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.LongFunction;
public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageConsumerOpDispenser");
private final Map<String, String> consumerClientConfMap = new HashMap<>();
// The timeout value used as the message poll interval (in milliseconds)
protected final int msgPollIntervalInMs;
// Manual commit frequency
// - commit once per this number of received messages
// - This is only relevant when the effective setting (global level and statement level)
//   of "enable.auto.commit" is false
protected final int maxMsgCntPerCommit;
protected boolean autoCommitEnabled;
public MessageConsumerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
KafkaSpace kafkaSpace) {
super(adapter, op, tgtNameFunc, kafkaSpace);
this.consumerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getConsumerConfMap());
consumerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
this.msgPollIntervalInMs =
NumberUtils.toInt(parsedOp.getStaticConfigOr("msg_poll_interval", "0"));
this.maxMsgCntPerCommit =
NumberUtils.toInt(parsedOp.getStaticConfig("manual_commit_batch_num", String.class));
this.autoCommitEnabled = true;
if (maxMsgCntPerCommit > 0) {
this.autoCommitEnabled = false;
consumerClientConfMap.put("enable.auto.commit", "false");
} else {
if (consumerClientConfMap.containsKey("enable.auto.commit")) {
this.autoCommitEnabled = BooleanUtils.toBoolean(consumerClientConfMap.get("enable.auto.commit"));
}
}
}
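// Spread cycles round-robin across 'num_cons_grp' consumer groups, using the
// "group.id" from the config file (or "nb-grp") as the group name prefix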
private String getEffectiveGroupId(long cycle) {
int grpIdx = (int) (cycle % consumerGrpCnt);
String defaultGrpNamePrefix = "nb-grp";
if (consumerClientConfMap.containsKey("group.id")) {
defaultGrpNamePrefix = consumerClientConfMap.get("group.id");
}
return defaultGrpNamePrefix + "-" + grpIdx;
}
private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(
String cacheKey,
String groupId,
String topicName)
{
OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
if (opTimeTrackKafkaClient == null) {
Properties consumerConfProps = new Properties();
consumerConfProps.putAll(consumerClientConfMap);
consumerConfProps.put("group.id", groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
synchronized (this) {
consumer.subscribe(Arrays.asList(topicName));
}
if (logger.isDebugEnabled()) {
logger.debug("Kafka consumer created: {} -- {}", cacheKey, consumer);
}
opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
kafkaSpace, asyncAPI, msgPollIntervalInMs, autoCommitEnabled, maxMsgCntPerCommit, consumer);
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
@Override
public KafkaOp apply(long cycle) {
String topicName = topicNameStrFunc.apply(cycle);
String groupId = getEffectiveGroupId(cycle);
String cacheKey = KafkaAdapterUtil.buildCacheKey(
"consumer", topicName, groupId, String.valueOf(cycle % kafkaClntCnt));
if (StringUtils.isBlank(groupId)) {
throw new KafkaAdapterInvalidParamException("An effective \"group.id\" is needed for a consumer!");
}
OpTimeTrackKafkaClient opTimeTrackKafkaConsumer =
getOrCreateOpTimeTrackKafkaConsumer(cacheKey, groupId, topicName);
return new KafkaOp(
kafkaAdapterMetrics,
kafkaSpace,
opTimeTrackKafkaConsumer,
null);
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java

@@ -0,0 +1,199 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka.dispensers;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.LongFunction;
public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
public static final String MSG_HEADER_OP_PARAM = "msg_header";
public static final String MSG_KEY_OP_PARAM = "msg_key";
public static final String MSG_BODY_OP_PARAM = "msg_body";
private final Map<String, String> producerClientConfMap = new HashMap<>();
protected final int txnBatchNum;
private final LongFunction<String> msgHeaderJsonStrFunc;
private final LongFunction<String> msgKeyStrFunc;
private final LongFunction<String> msgValueStrFunc;
public MessageProducerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
KafkaSpace kafkaSpace) {
super(adapter, op, tgtNameFunc, kafkaSpace);
this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
this.txnBatchNum =
parsedOp.getStaticConfigOr(KafkaAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
this.msgValueStrFunc = lookupMandatoryStrOpValueFunc(MSG_BODY_OP_PARAM);
}
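// Derive a per-cycle client id by appending the client index (cycle % num_clnt) to the
// "client.id" prefix from the config file; empty when no "client.id" is configured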
private String getEffectiveClientId(long cycle) {
if (producerClientConfMap.containsKey("client.id")) {
String defaultClientIdPrefix = producerClientConfMap.get("client.id");
int clntIdx = (int) (cycle % kafkaClntCnt);
return defaultClientIdPrefix + "-" + clntIdx;
}
else {
return "";
}
}
private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(
String cacheKey, String clientId)
{
OpTimeTrackKafkaClient opTimeTrackKafkaClient = kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
if (opTimeTrackKafkaClient == null) {
Properties producerConfProps = new Properties();
producerConfProps.putAll(producerClientConfMap);
producerConfProps.put("client.id", clientId);
// When transaction batch number is less than 2, it is treated effectively as no-transaction
if (txnBatchNum < 2)
producerConfProps.remove("transactional.id");
String baseTransactId = "";
if (producerConfProps.containsKey("transactional.id")) {
baseTransactId = producerConfProps.get("transactional.id").toString();
producerConfProps.put("transactional.id", baseTransactId + "-" + cacheKey);
}
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
if (producerConfProps.containsKey("transactional.id")) {
producer.initTransactions();
}
if (logger.isDebugEnabled()) {
logger.debug("Producer created: {} -- {}", cacheKey, producer);
}
opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(
kafkaSpace,
asyncAPI,
StringUtils.isNotBlank(producerClientConfMap.get("transactional.id")),
txnBatchNum,
producer);
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
private ProducerRecord<String, String> createKafkaMessage(
long curCycle,
String topicName,
String msgHeaderRawJsonStr,
String msgKey,
String msgValue
) {
if (StringUtils.isAllBlank(msgKey, msgValue)) {
throw new KafkaAdapterInvalidParamException("Message key and value can't both be empty!");
}
int messageSize = KafkaAdapterUtil.getStrObjSize(msgKey) + KafkaAdapterUtil.getStrObjSize(msgValue);
ProducerRecord<String, String> record = new ProducerRecord<>(topicName, msgKey, msgValue);
// Check if msgHeaderRawJsonStr is a valid JSON string with a collection of key/value pairs
// - if Yes, convert it to a map
// - otherwise, log an error message and ignore message headers without throwing a runtime exception
Map<String, String> msgHeaderProperties = new HashMap<>();
if (!StringUtils.isBlank(msgHeaderRawJsonStr)) {
try {
msgHeaderProperties = KafkaAdapterUtil.convertJsonToMap(msgHeaderRawJsonStr);
} catch (Exception e) {
logger.warn(
"Error parsing message header JSON string {}; ignoring message headers!",
msgHeaderRawJsonStr);
}
}
for (Map.Entry<String, String> entry : msgHeaderProperties.entrySet()) {
String headerKey = entry.getKey();
String headerValue = entry.getValue();
messageSize += KafkaAdapterUtil.getStrObjSize(headerKey) + KafkaAdapterUtil.getStrObjSize(headerValue);
if (! StringUtils.isAnyBlank(headerKey, headerValue)) {
record.headers().add(headerKey, headerValue.getBytes());
}
}
// NB-specific headers
messageSize += KafkaAdapterUtil.getStrObjSize(KafkaAdapterUtil.NB_MSG_SEQ_PROP);
messageSize += 8;
messageSize += KafkaAdapterUtil.getStrObjSize(KafkaAdapterUtil.NB_MSG_SIZE_PROP);
messageSize += 6;
record.headers().add(KafkaAdapterUtil.NB_MSG_SEQ_PROP, String.valueOf(curCycle).getBytes());
record.headers().add(KafkaAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize).getBytes());
return record;
}
@Override
public KafkaOp apply(long cycle) {
String topicName = topicNameStrFunc.apply(cycle);
String clientId = getEffectiveClientId(cycle);
String cacheKey = KafkaAdapterUtil.buildCacheKey(
"producer", topicName, String.valueOf(cycle % kafkaClntCnt));
OpTimeTrackKafkaClient opTimeTrackKafkaProducer =
getOrCreateOpTimeTrackKafkaProducer(cacheKey, clientId);
ProducerRecord<String, String> message = createKafkaMessage(
cycle,
topicName,
msgHeaderJsonStrFunc.apply(cycle),
msgKeyStrFunc.apply(cycle),
msgValueStrFunc.apply(cycle)
);
return new KafkaOp(
kafkaAdapterMetrics,
kafkaSpace,
opTimeTrackKafkaProducer,
message);
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterInvalidParamException.java

@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.exception;
public class KafkaAdapterInvalidParamException extends RuntimeException {
public KafkaAdapterInvalidParamException(String paramName, String errDesc) {
super("Invalid setting for parameter (" + paramName + "): " + errDesc);
}
public KafkaAdapterInvalidParamException(String fullErrDesc) {
super(fullErrDesc);
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnexpectedException.java

@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.exception;
public class KafkaAdapterUnexpectedException extends RuntimeException {
public KafkaAdapterUnexpectedException(String message) {
super(message);
printStackTrace();
}
public KafkaAdapterUnexpectedException(Exception e) {
super(e);
printStackTrace();
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/exception/KafkaAdapterUnsupportedOpException.java

@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.exception;
public class KafkaAdapterUnsupportedOpException extends RuntimeException {
public KafkaAdapterUnsupportedOpException(String kafkaOpType) {
super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + "\"");
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/KafkaOp.java

@@ -0,0 +1,50 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.ops;
import com.codahale.metrics.Histogram;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public class KafkaOp implements CycleOp<Object> {
private final KafkaAdapterMetrics kafkaAdapterMetrics;
protected final KafkaSpace kafkaSpace;
private final OpTimeTrackKafkaClient opTimeTrackKafkaClient;
private final Object cycleObj;
protected final Histogram messageSizeHistogram;
public KafkaOp(KafkaAdapterMetrics kafkaAdapterMetrics,
KafkaSpace kafkaSpace,
OpTimeTrackKafkaClient opTimeTrackKafkaClient,
Object cycleObj)
{
this.kafkaAdapterMetrics = kafkaAdapterMetrics;
this.kafkaSpace = kafkaSpace;
this.opTimeTrackKafkaClient = opTimeTrackKafkaClient;
this.cycleObj = cycleObj;
this.messageSizeHistogram = kafkaAdapterMetrics.getMessageSizeHistogram();
}
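// Delegate the cycle to the tracked client: producer ops carry a ProducerRecord in
// cycleObj, while consumer ops pass null and poll inside the client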
@Override
public Object apply(long value) {
opTimeTrackKafkaClient.process(value, cycleObj);
return null;
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaClient.java

@@ -0,0 +1,58 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.ops;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
abstract public class OpTimeTrackKafkaClient {
private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaClient");
protected final KafkaSpace kafkaSpace;
protected final long activityStartTime;
// Maximum time length to execute Kafka operations (e.g. message send or consume)
// - when NB execution passes this threshold, the operation becomes a no-op
// - 0 means no maximum time constraint; the Kafka op is always executed until the NB execution cycle finishes
protected final long maxOpTimeInSec;
public OpTimeTrackKafkaClient(KafkaSpace kafkaSpace) {
this.kafkaSpace = kafkaSpace;
this.activityStartTime = kafkaSpace.getActivityStartTimeMills();
this.maxOpTimeInSec = kafkaSpace.getMaxOpTimeInSec();
}
public void process(long cycle, Object cycleObj) {
long timeElapsedMills = System.currentTimeMillis() - activityStartTime;
// If maximum operation duration is specified, only process messages
// before the maximum duration threshold is reached. Otherwise, this is
// just no-op.
if ( (maxOpTimeInSec == 0) || (timeElapsedMills <= (maxOpTimeInSec*1000)) ) {
cycleMsgProcess(cycle, cycleObj);
}
}
abstract void cycleMsgProcess(long cycle, Object cycleObj);
abstract public void close();
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java

@@ -0,0 +1,186 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.ops;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.Map;
public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer");
private final int msgPollIntervalInMs;
private final boolean asyncMsgCommit;
private final boolean autoCommitEnabled;
private final int maxMsgCntPerCommit;
// Keep track of the manual commit count per thread
private final ThreadLocal<Integer> manualCommitTrackingCnt = ThreadLocal.withInitial(() -> 0);
private final KafkaConsumer<String, String> consumer;
public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace,
boolean asyncMsgCommit,
int msgPollIntervalInMs,
boolean autoCommitEnabled,
int maxMsgCntPerCommit,
KafkaConsumer<String, String> consumer) {
super(kafkaSpace);
this.msgPollIntervalInMs = msgPollIntervalInMs;
this.asyncMsgCommit = asyncMsgCommit;
this.autoCommitEnabled = autoCommitEnabled;
this.maxMsgCntPerCommit = maxMsgCntPerCommit;
this.consumer = consumer;
}
public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); }
public void incManualCommitTrackingCnt() {
int curVal = getManualCommitTrackingCnt();
manualCommitTrackingCnt.set(curVal + 1);
}
public void resetManualCommitTrackingCnt() {
manualCommitTrackingCnt.set(0);
}
private boolean msgCommitNeeded(long cycle) {
// Whether to manually commit the received messages, which happens when:
// - "manual_commit_batch_num" messages have been received since the last commit, or
// - the last cycle of the run is reached
boolean commitNeeded = !autoCommitEnabled;
if (commitNeeded) {
int msgCommitTrackingCnt = manualCommitTrackingCnt.get();
if ( ( (msgCommitTrackingCnt > 0) && ((msgCommitTrackingCnt % maxMsgCntPerCommit) == 0) ) ||
( cycle >= (kafkaSpace.getTotalCycleNum() - 1) ) ) {
commitNeeded = true;
if (logger.isDebugEnabled()) {
logger.debug("Manually commit message ({}, {}, {})",
manualCommitTrackingCnt, msgCommitTackingCnt, cycle);
}
}
else {
commitNeeded = false;
}
}
return commitNeeded;
}
private String printRecvedMsg(ConsumerRecord<String, String> record) {
Headers headers = record.headers();
Header nbMsgSeqHeader = headers.lastHeader(KafkaAdapterUtil.NB_MSG_SEQ_PROP);
StringBuilder sb = new StringBuilder();
if (nbMsgSeqHeader != null) {
sb.append("Header (MsgSeq): " + new String(nbMsgSeqHeader.value()) + "; ");
}
sb.append("Key: " + record.key() + "; ");
sb.append("Value: " + record.value() + "; ");
return sb.toString();
}
@Override
void cycleMsgProcess(long cycle, Object cycleObj) {
synchronized (this) {
ConsumerRecords<String, String> records = consumer.poll(msgPoolIntervalInMs);
for (ConsumerRecord<String, String> record : records) {
if (record != null) {
if (logger.isDebugEnabled()) {
logger.debug(
"Receiving message is successful: [{}] - offset({}), cycle ({})",
printRecvedMsg(record),
record.offset(),
cycle);
}
if (!autoCommitEnabled) {
boolean bCommitMsg = msgCommitNeeded(cycle);
if (bCommitMsg) {
if (!asyncMsgCommit) {
consumer.commitSync();
if (logger.isDebugEnabled()) {
logger.debug(
"Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})",
cycle,
maxMsgCntPerCommit);
}
} else {
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (logger.isDebugEnabled()) {
if (e == null) {
logger.debug(
"Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})",
cycle,
maxMsgCntPerCommit);
} else {
logger.debug(
"Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})",
cycle,
maxMsgCntPerCommit,
e.getMessage());
}
}
}
});
}
resetManualCommitTrackingCnt();
} else {
incManualCommitTrackingCnt();
}
}
}
}
}
}
@Override
public void close() {
try {
if (consumer != null) {
if (!asyncMsgCommit)
consumer.commitSync();
else
consumer.commitAsync();
consumer.close();
}
this.manualCommitTrackingCnt.remove();
}
catch (IllegalStateException ise) {
// If a consumer is already closed, that's fine.
}
catch (Exception e) {
e.printStackTrace();
}
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java

@@ -0,0 +1,176 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.kafka.ops;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaProducer");
private final boolean transactionEnabled;
private final boolean asyncMsgAck;
private final boolean transactEnabledConfig;
private final int txnBatchNum;
// Keep track of the transaction count per thread
private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
private final KafkaProducer<String, String> producer;
public OpTimeTrackKafkaProducer(KafkaSpace kafkaSpace,
boolean asyncMsgAck,
boolean transactEnabledConfig,
int txnBatchNum,
KafkaProducer<String, String> producer) {
super(kafkaSpace);
this.asyncMsgAck = asyncMsgAck;
this.transactEnabledConfig = transactEnabledConfig;
this.txnBatchNum = txnBatchNum;
this.transactionEnabled = transactEnabledConfig && (txnBatchNum >= 2);
this.producer = producer;
}
public int getTxnBatchTrackingCnt() { return txnBatchTrackingCnt.get(); }
public void incTxnBatchTrackingCnt() {
int curVal = getTxnBatchTrackingCnt();
txnBatchTrackingCnt.set(curVal + 1);
}
private boolean commitCurTransactionNeeded(long cycle) {
// Whether to commit the transaction which happens when:
// - "txn_batch_num" has been reached since last reset
boolean commitCurTrans = transactionEnabled;
if (commitCurTrans) {
int txnBatchTrackingCnt = getTxnBatchTrackingCnt();
if ( ( (txnBatchTrackingCnt > 0) && ((txnBatchTrackingCnt % txnBatchNum) == 0) ) ||
( cycle >= (kafkaSpace.getTotalCycleNum() - 1) ) ) {
if (logger.isDebugEnabled()) {
logger.debug("Commit transaction ({}, {})",
txnBatchTrackingCnt, cycle);
}
}
else {
commitCurTrans = false;
}
}
return commitCurTrans;
}
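// A new transaction is begun only in the middle of the run (neither the first nor the
// last cycle) and only right after the previous transaction batch has been committed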
private boolean startNewTransactionNeeded(long cycle) {
boolean startNewTransact = transactionEnabled;
if (startNewTransact) {
if ( (cycle > 0) && (cycle < (kafkaSpace.getTotalCycleNum() - 1)) ) {
startNewTransact = commitCurTransactionNeeded(cycle);
} else {
startNewTransact = false;
}
}
return startNewTransact;
}
@Override
void cycleMsgProcess(long cycle, Object cycleObj) {
// For producer, cycleObj represents a "message" (ProducerRecord)
assert (cycleObj != null);
try {
ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
boolean startNewTransNeeded = startNewTransactionNeeded(cycle);
boolean commitCurTransNeeded = commitCurTransactionNeeded(cycle);
if (commitCurTransNeeded) {
producer.commitTransaction();
if (logger.isDebugEnabled()) {
logger.debug("Transaction committed ( {}, {}, {}, {} )",
cycle, transactEnabledConfig, txnBatchNum, getTxnBatchTrackingCnt());
}
incTxnBatchTrackingCnt();
}
if (startNewTransNeeded) {
producer.beginTransaction();
}
Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (asyncMsgAck) {
if (logger.isDebugEnabled()) {
logger.debug("Message sending with async ack. is successful ( {} ) - {}",
cycle, recordMetadata);
}
}
}
});
if (!asyncMsgAck) {
try {
RecordMetadata recordMetadata = responseFuture.get();
if (logger.isDebugEnabled()) {
logger.debug("Message sending with sync ack. is successful ( {} ) - {}",
cycle, recordMetadata);
}
} catch (InterruptedException | ExecutionException e) {
KafkaAdapterUtil.messageErrorHandling(
e,
kafkaSpace.isStrictMsgErrorHandling(),
"Unexpected error when waiting to receive message-send ack from the Kafka cluster." +
"\n-----\n" + e);
}
}
}
catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void close() {
try {
if (producer != null) {
if (transactionEnabled) producer.commitTransaction();
producer.close();
}
this.txnBatchTrackingCnt.remove();
}
catch (IllegalStateException ise) {
// If a producer is already closed, that's fine.
}
catch (Exception e) {
e.printStackTrace();
}
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterMetrics.java

@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka.util;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class KafkaAdapterMetrics implements NBNamedElement {
private final static Logger logger = LogManager.getLogger("KafkaAdapterMetrics");
private final String defaultAdapterMetricsPrefix;
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
public KafkaAdapterMetrics(String defaultMetricsPrefix) {
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
@Override
public String getName() {
return "S4JAdapterMetrics";
}
public void initKafkaAdapterInstrumentation() {
// Histogram metrics
this.messageSizeHistogram =
ActivityMetrics.histogram(
this,
defaultAdapterMetricsPrefix + "message_size",
ActivityMetrics.DEFAULT_HDRDIGITS);
// Timer metrics
this.bindTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "bind",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.executeTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
}
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return executeTimer; }
public Histogram getMessageSizeHistogram() { return messageSizeHistogram; }
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaAdapterUtil.java

@@ -0,0 +1,115 @@
package io.nosqlbench.adapter.kafka.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Base64;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class KafkaAdapterUtil {
private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
///////
// Valid document level parameters for Kafka NB yaml file
public enum DOC_LEVEL_PARAMS {
// Blocking message producing or consuming
ASYNC_API("async_api"),
// Transaction batch size
TXN_BATCH_NUM("txn_batch_num");
public final String label;
DOC_LEVEL_PARAMS(String label) {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidDocLevelParamList() {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public final static String NB_MSG_SEQ_PROP = "NBMsgSeqProp";
public final static String NB_MSG_SIZE_PROP = "NBMsgSize";
// Get simplified NB thread name
public static String getSimplifiedNBThreadName(String fullThreadName) {
assert (StringUtils.isNotBlank(fullThreadName));
if (StringUtils.contains(fullThreadName, '/'))
return StringUtils.substringAfterLast(fullThreadName, "/");
else
return fullThreadName;
}
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonStr, new TypeReference<Map<String, String>>(){});
}
public static List<Object> convertJsonToObjList(String jsonStr) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return Arrays.asList(mapper.readValue(jsonStr, Object[].class));
}
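// Join the key parts with "::" and Base64-encode the result to form a stable
// cache key for the client cache in KafkaSpace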
public static String buildCacheKey(String... keyParts) {
String combinedStr = String.join("::", keyParts);
return Base64.getEncoder().encodeToString(combinedStr.getBytes(StandardCharsets.UTF_8));
}
public static void pauseCurThreadExec(int pauseInSec) {
if (pauseInSec > 0) {
try {
Thread.sleep(pauseInSec * 1000L);
}
catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
public static int getStrObjSize(String strObj) {
// << https://docs.oracle.com/javase/6/docs/api/java/lang/String.html >>
// A String represents a string in the UTF-16 format ...
return strObj.getBytes(StandardCharsets.UTF_16).length;
}
public static void messageErrorHandling(Exception exception, boolean strictErrorHandling, String errorMsg) {
exception.printStackTrace();
if (strictErrorHandling) {
throw new RuntimeException(errorMsg + " [ " + exception.getMessage() + " ]");
}
else {
KafkaAdapterUtil.pauseCurThreadExec(1);
}
}
}

adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/util/KafkaClientConf.java

@@ -0,0 +1,127 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.kafka.util;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class KafkaClientConf {
private final static Logger logger = LogManager.getLogger(KafkaClientConf.class);
public static final String TOPIC_CONF_PREFIX = "topic";
public static final String PRODUCER_CONF_PREFIX = "producer";
public static final String CONSUMER_CONF_PREFIX = "consumer";
// https://kafka.apache.org/documentation/#topicconfigs
private Map<String, String> topicConfMap = new HashMap<>();
private Map<String, String> producerConfMap = new HashMap<>();
private Map<String, String> consumerConfMap = new HashMap<>();
public KafkaClientConf(String clientConfFileName) {
//////////////////
// Read related Kafka client configuration settings from a file
readRawConfFromFile(clientConfFileName);
//////////////////
// Ignores the following Kafka producer/consumer configurations since
// they're either not supported by the Kafka API or they must be specified
// as NB CLI parameters or NB yaml file parameters.
// <<< https://kafka.apache.org/documentation/#producerconfigs >>>
// producer config
// * bootstrap.servers
producerConfMap.remove("bootstrap.servers");
// <<< https://kafka.apache.org/documentation/#consumerconfigs >>>
// consumer config
// * bootstrap.servers
consumerConfMap.remove("bootstrap.servers");
}
public void readRawConfFromFile(String fileName) {
File file = new File(fileName);
try {
String canonicalFilePath = file.getCanonicalPath();
Parameters params = new Parameters();
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(params.properties()
.setFileName(fileName));
Configuration config = builder.getConfiguration();
for (Iterator<String> it = config.getKeys(); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if (!StringUtils.isBlank(confVal)) {
// Get client connection specific configuration settings, removing "topic." prefix
if (StringUtils.startsWith(confKey, TOPIC_CONF_PREFIX)) {
topicConfMap.put(confKey.substring(TOPIC_CONF_PREFIX.length() + 1), confVal);
}
// Get producer specific configuration settings, removing "producer." prefix
else if (StringUtils.startsWith(confKey, PRODUCER_CONF_PREFIX)) {
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), confVal);
}
// Get consumer specific configuration settings, removing "consumer." prefix
else if (StringUtils.startsWith(confKey, CONSUMER_CONF_PREFIX)) {
consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), confVal);
}
}
}
} catch (IOException ioe) {
logger.error("Can't read the specified config properties file: " + fileName);
ioe.printStackTrace();
} catch (ConfigurationException cex) {
logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage());
cex.printStackTrace();
}
}
public Map<String, String> getTopicConfMap() { return topicConfMap; }
public Map<String, String> getProducerConfMap() { return producerConfMap; }
public Map<String, String> getConsumerConfMap() { return consumerConfMap; }
public String toString() {
return new ToStringBuilder(this).
append("topicConfMap", topicConfMap).
append("producerConfMap", producerConfMap).
append("consumerConfMap", consumerConfMap).
toString();
}
}
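As a hypothetical usage sketch (not part of this commit), assuming the `kafka_config.properties` file shown further below is on the local path, the prefix stripping above yields maps keyed without their `topic.`/`producer.`/`consumer.` prefixes:

```java
public class ClientConfSketch {
    public static void main(String[] args) {
        KafkaClientConf conf = new KafkaClientConf("kafka_config.properties");
        // Prefixes are stripped when the maps are populated, e.g.
        // "topic.compression.type=uncompressed" -> topicConfMap key "compression.type"
        System.out.println(conf.getTopicConfMap());    // e.g. {compression.type=uncompressed, flush.messages=2}
        System.out.println(conf.getProducerConfMap()); // e.g. {key.serializer=..., value.serializer=...}
        System.out.println(conf.getConsumerConfMap()); // e.g. {key.deserializer=..., group.id=nbDftGrp, ...}
    }
}
```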

View File

@ -0,0 +1,15 @@
# Usage
```bash
## Kafka Producer
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://10.166.90.94:9092
## Kafka Consumer
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_consumer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://10.166.90.94:9092
```
# Example NB Yaml
[kafka_producer.yaml](./kafka_producer.yaml)
[kafka_consumer.yaml](./kafka_consumer.yaml)

View File

@ -0,0 +1,30 @@
#####
# Topic related configurations (global) - topic.***
# - Valid settings: https://kafka.apache.org/documentation/#topicconfigs
#
#--------------------------------------
topic.compression.type=uncompressed
topic.flush.messages=2
#####
# Producer related configurations (global) - producer.***
# - Valid settings: https://kafka.apache.org/documentation/#producerconfigs
#
#--------------------------------------
producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
#producer.client.id=nbDftClient
#producer.transactional.id=nbDftTxn
#####
# Consumer related configurations (global) - consumer.***
# - Valid settings: https://kafka.apache.org/documentation/#consumerconfigs
#
#--------------------------------------
consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.group.id=nbDftGrp
#consumer.isolation.level=read_uncommitted
#consumer.enable.auto.commit=true

View File

@ -0,0 +1,23 @@
# document level parameters that apply to all Kafka client types:
params:
  # Whether to commit messages asynchronously
  # - default: true
  # - only relevant for manual commit
  async_api: "true"

blocks:
  msg-consume-block:
    ops:
      op1:
        ## TODO: can make this a list of topics
        ## The value represents the topic name
        MessageConsume: "nbktest1"

        # The timeout value for polling messages (unit: milliseconds)
        # - default: 0
        msg_poll_interval: "10"

        # The number of messages to receive before doing a manual commit
        # - default: 0
        # - when set to 0, auto commit applies (determined by the "enable.auto.commit" setting)
        manual_commit_batch_num: "0"
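To make the `msg_poll_interval` and `manual_commit_batch_num` semantics above concrete, here is a minimal, hypothetical kafka-clients sketch (not the adapter's actual consumer loop); the broker address, group id, topic, and batch size are placeholder assumptions:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("group.id", "nbDftGrp");
        props.put("enable.auto.commit", "false");          // manual commit mode
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        int manualCommitBatchNum = 10; // analogous to manual_commit_batch_num above
        int uncommitted = 0;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("nbktest1"));
            while (true) {
                // poll timeout analogous to msg_poll_interval above
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
                for (ConsumerRecord<String, String> record : records) {
                    uncommitted++;
                    if (uncommitted >= manualCommitBatchNum) {
                        consumer.commitSync();  // commit once per batch of received messages
                        uncommitted = 0;
                    }
                }
            }
        }
    }
}
```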

View File

@ -0,0 +1,39 @@
bindings:
  mykey: Mod(5); ToString(); Prefix("key-")
  mytext_val: AlphaNumericString(30)
  random_text_val1: AlphaNumericString(10)
  random_text_val2: AlphaNumericString(20)

# document level parameters that apply to all Kafka client types:
params:
  # whether to confirm message send acknowledgements asynchronously
  # - default: true
  async_api: "true"

  # The number of messages to put in one transaction
  # - default: 0
  # - a value of 0 or 1 means no transaction
  # - it also requires the "transactional.id" parameter to be set
  txn_batch_num: 5

blocks:
  msg-produce-block:
    ops:
      op1:
        ## The value represents a topic name
        MessageProduce: "nbktest1"

        ## (Optional) Kafka message headers (in JSON format).
        msg_header: |
          {
            "header-1": "{random_text_val1}",
            "header-2": "{random_text_val2}"
          }

        ## (Optional) Kafka message key.
        # - the message key and value can't both be empty at the same time
        msg_key: "{mykey}"

        ## (Optional) Kafka message value.
        # - the message key and value can't both be empty at the same time
        msg_body: "{mytext_val}"
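Likewise, a rough standalone illustration of what `txn_batch_num` implies at the kafka-clients level (a hypothetical sketch under assumed settings, not the adapter's producer path); the broker address, transactional id, and topic are placeholders:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TxnBatchSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("transactional.id", "nbDftTxn");         // required for transactional sends
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        int txnBatchNum = 5; // analogous to txn_batch_num above

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            for (int i = 0; i < txnBatchNum; i++) {
                producer.send(new ProducerRecord<>("nbktest1", "key-" + (i % 5), "value-" + i));
            }
            // all sends in the batch become visible atomically
            producer.commitTransaction();
        }
    }
}
```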

View File

@ -54,7 +54,7 @@ public class S4JSpace implements AutoCloseable {
private final String pulsarSvcUrl;
private final String webSvcUrl;
private final String s4jClientConfFileName;
private S4JClientConf s4JClientConf;
private final S4JClientConf s4JClientConf;
private final int sessionMode;
// Whether to do strict error handling while sending/receiving messages
@ -116,6 +116,8 @@ public class S4JSpace implements AutoCloseable {
cfg.getOptional("session_mode").orElse(""));
this.s4JClientConf = new S4JClientConf(pulsarSvcUrl, webSvcUrl, s4jClientConfFileName);
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
this.initializeSpace(s4JClientConf);
}

View File

@ -17,8 +17,6 @@ package io.nosqlbench.adapter.s4j.dispensers;
*/
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapter.s4j.util.*;
@ -93,8 +91,8 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
this.txnBatchNum =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticValue("threads"));
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticValue("cycles"));
this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticConfig("threads", String.class));
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticConfig("cycles", String.class));
s4jSpace.setTotalCycleNum(totalCycleNum);
}

View File

@ -100,6 +100,12 @@
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-kafka</artifactId>
<version>4.17.32-SNAPSHOT</version>
</dependency>
</dependencies>
<build>

View File

@ -63,6 +63,7 @@
<module>adapter-mongodb</module>
<module>adapter-pulsar</module>
<module>adapter-s4j</module>
<module>adapter-kafka</module>
<!-- VIRTDATA MODULES -->
@ -77,7 +78,6 @@
<!-- Documentation -->
<module>docsys</module>
</modules>
<profiles>
@ -89,7 +89,6 @@
<modules>
<module>nb</module>
<module>driver-tcp</module>
<module>driver-kafka</module>
<module>driver-jmx</module>
<module>driver-jdbc</module>
<module>driver-cockroachdb</module>