diff --git a/adapter-kafka/pom.xml b/adapter-kafka/pom.xml
index 68124f4b6..22f77558e 100644
--- a/adapter-kafka/pom.xml
+++ b/adapter-kafka/pom.xml
@@ -76,13 +76,6 @@
commons-configuration2
2.8.0
-
-
- ${project.groupId}
- adapter-pulsar
- ${project.version}
- compile
-
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
index 23051cae8..3cc62595f 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageConsumerOpDispenser.java
@@ -23,7 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
-import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
+import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
index 13079125a..0a8170717 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/dispensers/MessageProducerOpDispenser.java
@@ -22,8 +22,8 @@ import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
-import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
+import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -59,7 +59,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
private final LongFunction msgKeyStrFunc;
private final LongFunction msgValueStrFunc;
protected final LongFunction seqTrackingFunc;
- protected final LongFunction> msgSeqErrSimuTypeSetFunc;
+ protected final LongFunction> msgSeqErrSimuTypeSetFunc;
public MessageProducerOpDispenser(DriverAdapter adapter,
ParsedOp op,
@@ -78,7 +78,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
// Doc-level parameter: seq_tracking
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
- PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
+ KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
}
private String getEffectiveClientId(long cycle) {
@@ -226,17 +226,17 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
message);
}
- protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() {
- LongFunction> setStringLongFunction;
+ protected LongFunction> getStaticErrSimuTypeSetOpValueFunc() {
+ LongFunction> setStringLongFunction;
setStringLongFunction = (l) ->
- parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
+ parsedOp.getOptionalStaticValue(KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
- Set set = new HashSet<>();
+ Set set = new HashSet<>();
if (StringUtils.contains(value,',')) {
set = Arrays.stream(value.split(","))
- .map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
+ .map(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toCollection(LinkedHashSet::new));
@@ -245,7 +245,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
return set;
}).orElse(Collections.emptySet());
logger.info(
- PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
+ KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
setStringLongFunction.apply(0));
return setStringLongFunction;
}
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java
index 9492ff331..91bd54f40 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaConsumer.java
@@ -22,8 +22,7 @@ import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
-import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
-import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
+import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
@@ -133,7 +132,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
for (ConsumerRecord record : records) {
if (record != null) {
if (logger.isDebugEnabled()) {
- Header msg_seq_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
+ Header msg_seq_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER);
logger.debug(
"Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})",
printRecvedMsg(record),
@@ -210,7 +209,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
}
private void checkAndUpdateMessageErrorCounter(ConsumerRecord record) {
- Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
+ Header msg_seq_number_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER);
String msgSeqIdStr = msg_seq_number_header != null ? new String(msg_seq_number_header.value()) : StringUtils.EMPTY;
if (!StringUtils.isBlank(msgSeqIdStr)) {
long sequenceNumber = Long.parseLong(msgSeqIdStr);
diff --git a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java
index b3be470fb..c5e032427 100644
--- a/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java
+++ b/adapter-kafka/src/main/java/io/nosqlbench/adapter/kafka/ops/OpTimeTrackKafkaProducer.java
@@ -20,8 +20,8 @@ package io.nosqlbench.adapter.kafka.ops;
import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
-import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler;
-import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
+import io.nosqlbench.engine.api.metrics.MessageSequenceNumberSendingHandler;
+import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -53,7 +53,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
private ThreadLocal