Merge pull request #1143 from MMirelli/mm-ls797-continue-add-e2e-error-metrics-to-kafka-adapter

[kafka-adapter] Add e2e error metrics -- to be continued
This commit is contained in:
Jonathan Shook
2023-04-25 09:10:32 -05:00
committed by GitHub
31 changed files with 401 additions and 76 deletions

View File

@@ -23,6 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
@@ -50,10 +51,15 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
// - This is only relevant when the effective setting (global level and statement level)
// of "enable.auto.commit" is false
protected final int maxMsgCntPerCommit;
protected boolean autoCommitEnabled;
private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
private final ThreadLocal<Map<String, ReceivedMessageSequenceTracker>>
receivedMessageSequenceTrackersForTopicThreadLocal = ThreadLocal.withInitial(HashMap::new);
protected final LongFunction<Boolean> seqTrackingFunc;
public MessageConsumerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
@@ -80,6 +86,9 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
}
this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc(
KafkaAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none");
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
}
private String getEffectiveGroupId(long cycle) {
@@ -129,15 +138,26 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
autoCommitEnabled,
maxMsgCntPerCommit,
consumer,
kafkaAdapterMetrics,
EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
this::getReceivedMessageSequenceTracker,
seqTrackingFunc.apply(cycle));
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
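// Sequence trackers are kept per topic and per thread (via the ThreadLocal map above),
// so consumer threads can update the error counters without synchronization.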
private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(String topicName) {
return receivedMessageSequenceTrackersForTopicThreadLocal.get()
.computeIfAbsent(topicName, k -> createReceivedMessageSequenceTracker());
}
private ReceivedMessageSequenceTracker createReceivedMessageSequenceTracker() {
return new ReceivedMessageSequenceTracker(kafkaAdapterMetrics.getMsgErrOutOfSeqCounter(),
kafkaAdapterMetrics.getMsgErrDuplicateCounter(),
kafkaAdapterMetrics.getMsgErrLossCounter());
}
protected List<String> getEffectiveTopicNameList(long cycle) {
String explicitTopicListStr = topicNameStrFunc.apply(cycle);

View File

@@ -23,6 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,6 +35,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.Set;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Optional;
import java.util.Collections;
import java.util.LinkedHashSet;
public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
@@ -49,13 +58,14 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
private final LongFunction<String> msgHeaderJsonStrFunc;
private final LongFunction<String> msgKeyStrFunc;
private final LongFunction<String> msgValueStrFunc;
protected final LongFunction<Boolean> seqTrackingFunc;
protected final LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
public MessageProducerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
KafkaSpace kafkaSpace) {
super(adapter, op, tgtNameFunc, kafkaSpace);
this.producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
@@ -64,6 +74,11 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
this.msgHeaderJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgKeyStrFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
this.msgValueStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
// Doc-level parameter: seq_tracking
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
}
private String getEffectiveClientId(long cycle) {
@@ -126,6 +141,8 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
asyncAPI,
transactionEnabled,
txnBatchNum,
seqTrackingFunc.apply(cycle),
msgSeqErrSimuTypeSetFunc.apply(cycle),
producer);
kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
@@ -208,4 +225,28 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
opTimeTrackKafkaProducer,
message);
}
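// The doc-level "seqerr_simu" parameter is statically bound, so it is parsed once into
// a set of error-simulation types and the same set is reused for every cycle.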
protected LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
setStringLongFunction = (l) ->
parsedOp.getOptionalStaticValue(KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
// Parse one or more comma-separated simulation types; a single value
// (no comma) is handled by the same path instead of being silently dropped.
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set =
Arrays.stream(value.split(","))
.map(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toCollection(LinkedHashSet::new));
return set;
}).orElse(Collections.emptySet());
logger.info(
KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
setStringLongFunction.apply(0));
return setStringLongFunction;
}
}

View File

@@ -19,7 +19,7 @@ package io.nosqlbench.adapter.kafka.exception;
public class KafkaAdapterUnsupportedOpException extends RuntimeException {
public KafkaAdapterUnsupportedOpException(String kafkaOpType) {
super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + "\"");
}
}

View File

@@ -22,6 +22,8 @@ import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
@@ -30,6 +32,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.function.Function;
public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final static Logger logger = LogManager.getLogger("OpTimeTrackKafkaConsumer");
@@ -44,6 +47,8 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
private final KafkaConsumer<String, String> consumer;
private Histogram e2eMsgProcLatencyHistogram;
private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
private final boolean seqTracking;
public OpTimeTrackKafkaConsumer(KafkaSpace kafkaSpace,
boolean asyncMsgCommit,
@@ -51,8 +56,10 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
boolean autoCommitEnabled,
int maxMsgCntPerCommit,
KafkaConsumer<String, String> consumer,
KafkaAdapterMetrics kafkaAdapterMetrics,
EndToEndStartingTimeSource e2eStartingTimeSrc,
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
boolean seqTracking) {
super(kafkaSpace);
this.msgPoolIntervalInMs = msgPoolIntervalInMs;
this.asyncMsgCommit = asyncMsgCommit;
@@ -61,6 +68,8 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
this.consumer = consumer;
this.e2eStartingTimeSrc = e2eStartingTimeSrc;
this.e2eMsgProcLatencyHistogram = kafkaAdapterMetrics.getE2eMsgProcLatencyHistogram();
this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic;
this.seqTracking = seqTracking;
}
public int getManualCommitTrackingCnt() { return manualCommitTrackingCnt.get(); }
@@ -123,12 +132,14 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
for (ConsumerRecord<String, String> record : records) {
if (record != null) {
if (logger.isDebugEnabled()) {
Header msg_seq_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER);
logger.debug(
"Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})",
printRecvedMsg(record),
record.offset(),
cycle,
System.currentTimeMillis() - record.timestamp(),
(msg_seq_header != null ? new String(msg_seq_header.value()) : "null"));
}
if (!autoCommitEnabled) {
@@ -136,7 +147,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
if (bCommitMsg) {
if (!asyncMsgCommit) {
consumer.commitSync();
checkAndUpdateMessageE2EMetrics(record);
if (logger.isDebugEnabled()) {
logger.debug(
"Sync message commit is successful: cycle ({}), maxMsgCntPerCommit ({})",
@@ -153,7 +164,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
"Async message commit succeeded: cycle({}), maxMsgCntPerCommit ({})",
cycle,
maxMsgCntPerCommit);
checkAndUpdateMessageE2EMetrics(record);
} else {
logger.debug(
"Async message commit failed: cycle ({}), maxMsgCntPerCommit ({}), error ({})",
@@ -168,16 +179,22 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
resetManualCommitTrackingCnt();
} else {
checkAndUpdateMessageE2EMetrics(record);
incManualCommitTrackingCnt();
}
}
checkAndUpdateMessageE2EMetrics(record);
}
}
}
}
private void checkAndUpdateMessageE2EMetrics(ConsumerRecord<String, String> record) {
// keep track of message errors and update error counters
if(seqTracking) checkAndUpdateMessageErrorCounter(record);
updateE2ELatencyMetric(record);
}
private void updateE2ELatencyMetric(ConsumerRecord<String, String> record) {
long startTimeStamp = 0L;
switch (e2eStartingTimeSrc) {
@@ -191,6 +208,19 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
}
}
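// Reads the producer-assigned sequence number from the message header and feeds it to
// the per-topic tracker, which updates the out-of-order/duplicate/loss counters.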
private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
Header msg_seq_number_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER);
String msgSeqIdStr = msg_seq_number_header != null ? new String(msg_seq_number_header.value()) : StringUtils.EMPTY;
if (!StringUtils.isBlank(msgSeqIdStr)) {
long sequenceNumber = Long.parseLong(msgSeqIdStr);
ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
receivedMessageSequenceTrackerForTopic.apply(record.topic());
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
} else {
logger.warn("Message sequence number header is null, skipping e2e message error metrics generation.");
}
}
@Override
public void close() {
try {

View File

@@ -20,6 +20,8 @@ package io.nosqlbench.adapter.kafka.ops;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.engine.api.metrics.MessageSequenceNumberSendingHandler;
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -31,6 +33,9 @@ import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.common.errors.TimeoutException;
@@ -45,6 +50,10 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
private final boolean asyncMsgAck;
private final boolean transactEnabledConfig;
private final int txnBatchNum;
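// Per-thread, per-topic handlers that assign monotonically increasing sequence numbers
// to outgoing messages (optionally mutated by the configured error-simulation types).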
private final ThreadLocal<Map<String, MessageSequenceNumberSendingHandler>> messageSequenceNumberSendingHandlersThreadLocal =
ThreadLocal.withInitial(HashMap::new);
private final boolean seqTracking;
private final Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
enum TxnProcResult {
SUCCESS,
@@ -67,11 +76,15 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
boolean asyncMsgAck,
boolean transactEnabledConfig,
int txnBatchNum,
boolean seqTracking,
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
KafkaProducer<String, String> producer) {
super(kafkaSpace);
this.asyncMsgAck = asyncMsgAck;
this.transactEnabledConfig = transactEnabledConfig;
this.txnBatchNum = txnBatchNum;
this.seqTracking = seqTracking;
this.errSimuTypeSet = errSimuTypeSet;
this.transactionEnabled = transactEnabledConfig && (txnBatchNum > 2);
this.producer = producer;
}
@@ -193,6 +206,11 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
}
ProducerRecord<String, String> message = (ProducerRecord<String, String>) cycleObj;
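// When sequence tracking is enabled, stamp the outgoing record with a sequence-number
// header so the consumer side can detect loss, duplication, and reordering.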
if (seqTracking) {
long nextSequenceNumber = getMessageSequenceNumberSendingHandler(message.topic())
.getNextSequenceNumber(errSimuTypeSet);
message.headers().add(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes());
}
try {
if (result == TxnProcResult.SUCCESS) {
Future<RecordMetadata> responseFuture = producer.send(message, new Callback() {
@@ -261,4 +279,9 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
e.printStackTrace();
}
}
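// Lazily creates one sequence-number sending handler per topic for the current thread.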
private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) {
return messageSequenceNumberSendingHandlersThreadLocal.get()
.computeIfAbsent(topicName, k -> new MessageSequenceNumberSendingHandler());
}
}

View File

@@ -103,4 +103,28 @@ public class KafkaAdapterMetrics implements NBNamedElement {
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return executeTimer; }
public Histogram getMessagesizeHistogram() { return messageSizeHistogram; }
public Counter getMsgErrOutOfSeqCounter() {
return msgErrOutOfSeqCounter;
}
public void setMsgErrOutOfSeqCounter(Counter msgErrOutOfSeqCounter) {
this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
}
public Counter getMsgErrLossCounter() {
return msgErrLossCounter;
}
public void setMsgErrLossCounter(Counter msgErrLossCounter) {
this.msgErrLossCounter = msgErrLossCounter;
}
public Counter getMsgErrDuplicateCounter() {
return msgErrDuplicateCounter;
}
public void setMsgErrDuplicateCounter(Counter msgErrDuplicateCounter) {
this.msgErrDuplicateCounter = msgErrDuplicateCounter;
}
}

View File

@@ -31,7 +31,7 @@ import java.util.Map;
import java.util.stream.Collectors;
public class KafkaAdapterUtil {
public static final String MSG_SEQUENCE_NUMBER = "sequence_number";
private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
public static String DFT_CONSUMER_GROUP_NAME_PREFIX = "nbKafkaGrp";
@@ -42,7 +42,9 @@ public class KafkaAdapterUtil {
public enum DOC_LEVEL_PARAMS {
// Blocking message producing or consuming
ASYNC_API("async_api"),
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source");
SEQERR_SIMU("seqerr_simu"),
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"),
SEQ_TRACKING("seq_tracking");
public final String label;
DOC_LEVEL_PARAMS(String label) {

View File

@@ -49,7 +49,7 @@ public class KafkaClientConf {
public KafkaClientConf(String clientConfFileName) {
//////////////////
// Read related Kafka client configuration settings from a file
readRawConfFromFile(clientConfFileName);

View File

@@ -1,10 +1,10 @@
# Overview
This NB Kafka adapter allows publishing messages to or consuming messages from
* a Kafka cluster, or
* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar.
At a high level, this adapter supports the following Kafka functionalities:
* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers)
* Subscribing to messages from one or multiple Kafka topics with sync. or async. message-recv acknowledgements (to brokers) (aka, message commits)
* auto message commit
@@ -26,7 +26,7 @@ $ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_produ
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092
```
## NB Kafka adapter specific CLI parameters
* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from
* For producer workloads, this is the number of producer threads publishing messages to the same topic
@@ -39,3 +39,24 @@ $ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 y
* `num_cons_grp`: the number of consumer groups
* Only relevant for consumer workload
For the Kafka NB adapter, document-level parameters can only be statically bound. Currently, the following document-level configuration parameters are supported (see the example after this list):
* `async_api` (boolean):
  * When true, use the async Kafka client API.
* `seq_tracking` (boolean):
  * When true, a sequence number is added to each message's properties.
  * Used together with `seqerr_simu` to simulate abnormal message-processing errors and to verify that such errors are detected.
* `seqerr_simu`:
  * A comma-separated list of error simulation types.
  * Valid error simulation types:
    * `out_of_order`: simulate out-of-sequence message delivery
    * `msg_loss`: simulate message loss
    * `msg_dup`: simulate message duplication
  * This parameter is intended only for testing the error-metrics pipeline itself; do not enable it in real benchmarking runs.
* `e2e_starting_time_source`:
  * The starting timestamp for end-to-end latency measurement. When specified, the `e2e_msg_latency` histogram is updated with the end-to-end latency, computed by subtracting the starting time (milliseconds since epoch, obtained from the configured source) from the current time.
  * Possible values for `e2e_starting_time_source`:
    * `message_publish_time`: uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html), since [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type).
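
As an illustration, a workload's `params` block that combines these document-level parameters might look like the following sketch (values are hypothetical; parameter names are the ones documented above):

```yaml
# Illustrative sketch only: doc-level parameters are statically bound in the params block.
params:
  # use the async Kafka client API
  async_api: "true"
  # stamp each outgoing message with a sequence number (enable on the consumer side too)
  seq_tracking: "true"
  # inject simulated sequence errors; only for testing the error-metrics pipeline
  seqerr_simu: "out_of_order,msg_dup"
  # compute e2e_msg_latency from the record's CreateTime timestamp
  e2e_starting_time_source: "message_publish_time"
```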

View File

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
: "${SKIP_TESTS:=1}"
(
cd "$(git rev-parse --show-toplevel)" || exit 1
mvn clean install "-DskipTests" -pl adapters-api,adapter-kafka,nb5 || exit 1
if [[ ${SKIP_TESTS} -ne 1 ]]; then
mvn test -pl adapters-api,adapter-pulsar
fi
)

View File

@@ -5,7 +5,8 @@
#--------------------------------------
topic.compression.type=uncompressed
topic.flush.messages=2
# this is likely unused as this file doesn't seem to be loaded
topic.log.message.timestamp.type=CreateTime
#####
# Producer related configurations (global) - topic.***

View File

@@ -1,10 +1,16 @@
# document level parameters that apply to all Pulsar client types:
# document level parameters that apply to all Kafka client types:
params:
# Whether to commit message asynchronously
# - default: true
# - only relevant for manual commit
# async_api: "true"
# activates e2e latency metrics
# - default: "none" (i.e. disabled)
e2e_starting_time_source: "message_publish_time"
# activates e2e error metrics (message duplication, message loss and out-of-order detection)
# it needs to be enabled both on the producer and the consumer
# - default: false
seq_tracking: "true"
blocks:
msg-consume-block:
@@ -12,7 +18,7 @@ blocks:
op1:
## The value represents the topic names
# - for consumer, a list of topics (separated by comma) are supported
MessageConsume: "nbktest1,nbktest2"
MessageConsume: "nbktest"
# The timeout value to poll messages (unit: milli-seconds)
# - default: 0

View File

@@ -9,6 +9,12 @@ params:
# whether to confirm message send ack. asynchronously
# - default: true
async_api: "true"
# activates e2e error metrics (message duplication, message loss and out-of-order detection)
# it needs to be enabled both on the producer and the consumer
# - default: false
seq_tracking: "true"
# test error injection, remove in production
seqerr_simu: 'out_of_order,msg_loss,msg_dup'
blocks:
msg-produce-block:
@@ -22,7 +28,7 @@ blocks:
# - default: 0
# - value 0 or 1 means no transaction
# - it also requires "transactional.id" parameter is set
txn_batch_num: 8
txn_batch_num: 1
## (Optional) Kafka message headers (in JSON format).
msg_header: |

View File

@@ -0,0 +1,20 @@
#!/usr/bin/env bash
: "${REBUILD:=1}"
: "${CYCLES:=1000000000}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
if [[ ${REBUILD} -eq 1 ]]; then
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
fi
java -jar nb5/target/nb5.jar \
run \
driver=kafka \
-vv \
--report-interval 5 \
--docker-metrics \
cycles="${CYCLES}" \
threads=1 \
num_clnt=1 \
num_cons_grp=1 \
yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \
config="${SCRIPT_DIR}/kafka_config.properties" \
bootstrap_server=PLAINTEXT://localhost:9092

View File

@@ -0,0 +1,22 @@
#!/usr/bin/env bash
: "${REBUILD:=1}"
: "${CYCLES:=1000000000}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
if [[ ${REBUILD} -eq 1 ]]; then
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
fi
while true; do
java -jar nb5/target/nb5.jar \
run \
driver=kafka \
-vv \
--report-interval 5 \
--docker-metrics \
cycles="${CYCLES}" \
threads=1 \
num_clnt=1 \
yaml="${SCRIPT_DIR}/kafka_producer.yaml" \
config="${SCRIPT_DIR}/kafka_config.properties" \
bootstrap_server=PLAINTEXT://localhost:9092
sleep 10
done