mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Decouple e2e error metrics from adapter modules
Add classes implementing e2e error metrics to adapters-api module in `import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker`
This commit is contained in:
parent
c6e7829a2a
commit
b333898249
@ -76,13 +76,6 @@
|
|||||||
<artifactId>commons-configuration2</artifactId>
|
<artifactId>commons-configuration2</artifactId>
|
||||||
<version>2.8.0</version>
|
<version>2.8.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- This is needed in order to re-use the utilities tracking the end-to-end error metrics of the Pulsar adapter -->
|
|
||||||
<dependency>
|
|
||||||
<groupId>${project.groupId}</groupId>
|
|
||||||
<artifactId>adapter-pulsar</artifactId>
|
|
||||||
<version>${project.version}</version>
|
|
||||||
<scope>compile</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
@ -23,7 +23,7 @@ import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
|
|||||||
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
|
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
|
||||||
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
|
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
|
||||||
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
||||||
import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
|
import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.commons.lang3.BooleanUtils;
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
@ -22,8 +22,8 @@ import io.nosqlbench.adapter.kafka.ops.KafkaOp;
|
|||||||
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
|
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
|
||||||
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
|
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
|
||||||
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
@ -59,7 +59,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
|
|||||||
private final LongFunction<String> msgKeyStrFunc;
|
private final LongFunction<String> msgKeyStrFunc;
|
||||||
private final LongFunction<String> msgValueStrFunc;
|
private final LongFunction<String> msgValueStrFunc;
|
||||||
protected final LongFunction<Boolean> seqTrackingFunc;
|
protected final LongFunction<Boolean> seqTrackingFunc;
|
||||||
protected final LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
|
protected final LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
|
||||||
|
|
||||||
public MessageProducerOpDispenser(DriverAdapter adapter,
|
public MessageProducerOpDispenser(DriverAdapter adapter,
|
||||||
ParsedOp op,
|
ParsedOp op,
|
||||||
@ -78,7 +78,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
|
|||||||
this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
|
this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
|
||||||
// Doc-level parameter: seq_tracking
|
// Doc-level parameter: seq_tracking
|
||||||
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
|
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
|
||||||
PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
|
KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getEffectiveClientId(long cycle) {
|
private String getEffectiveClientId(long cycle) {
|
||||||
@ -226,17 +226,17 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
|
|||||||
message);
|
message);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
|
protected LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
|
||||||
LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
|
LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
|
||||||
setStringLongFunction = (l) ->
|
setStringLongFunction = (l) ->
|
||||||
parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
|
parsedOp.getOptionalStaticValue(KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
|
||||||
.filter(Predicate.not(String::isEmpty))
|
.filter(Predicate.not(String::isEmpty))
|
||||||
.map(value -> {
|
.map(value -> {
|
||||||
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
|
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
|
||||||
|
|
||||||
if (StringUtils.contains(value,',')) {
|
if (StringUtils.contains(value,',')) {
|
||||||
set = Arrays.stream(value.split(","))
|
set = Arrays.stream(value.split(","))
|
||||||
.map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
|
.map(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
|
||||||
.filter(Optional::isPresent)
|
.filter(Optional::isPresent)
|
||||||
.map(Optional::get)
|
.map(Optional::get)
|
||||||
.collect(Collectors.toCollection(LinkedHashSet::new));
|
.collect(Collectors.toCollection(LinkedHashSet::new));
|
||||||
@ -245,7 +245,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
|
|||||||
return set;
|
return set;
|
||||||
}).orElse(Collections.emptySet());
|
}).orElse(Collections.emptySet());
|
||||||
logger.info(
|
logger.info(
|
||||||
PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
|
KafkaAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label + ": {}",
|
||||||
setStringLongFunction.apply(0));
|
setStringLongFunction.apply(0));
|
||||||
return setStringLongFunction;
|
return setStringLongFunction;
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,7 @@ import io.nosqlbench.adapter.kafka.KafkaSpace;
|
|||||||
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
|
import io.nosqlbench.adapter.kafka.util.EndToEndStartingTimeSource;
|
||||||
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
|
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
|
||||||
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
|
||||||
import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.kafka.clients.consumer.*;
|
import org.apache.kafka.clients.consumer.*;
|
||||||
import org.apache.kafka.common.TopicPartition;
|
import org.apache.kafka.common.TopicPartition;
|
||||||
@ -133,7 +132,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
|
|||||||
for (ConsumerRecord<String, String> record : records) {
|
for (ConsumerRecord<String, String> record : records) {
|
||||||
if (record != null) {
|
if (record != null) {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
Header msg_seq_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
|
Header msg_seq_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER);
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})",
|
"Receiving message is successful: [{}] - offset({}), cycle ({}), e2e_latency_ms({}), e2e_seq_number({})",
|
||||||
printRecvedMsg(record),
|
printRecvedMsg(record),
|
||||||
@ -210,7 +209,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
|
private void checkAndUpdateMessageErrorCounter(ConsumerRecord<String, String> record) {
|
||||||
Header msg_seq_number_header = record.headers().lastHeader(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
|
Header msg_seq_number_header = record.headers().lastHeader(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER);
|
||||||
String msgSeqIdStr = msg_seq_number_header != null ? new String(msg_seq_number_header.value()) : StringUtils.EMPTY;
|
String msgSeqIdStr = msg_seq_number_header != null ? new String(msg_seq_number_header.value()) : StringUtils.EMPTY;
|
||||||
if (!StringUtils.isBlank(msgSeqIdStr)) {
|
if (!StringUtils.isBlank(msgSeqIdStr)) {
|
||||||
long sequenceNumber = Long.parseLong(msgSeqIdStr);
|
long sequenceNumber = Long.parseLong(msgSeqIdStr);
|
||||||
|
@ -20,8 +20,8 @@ package io.nosqlbench.adapter.kafka.ops;
|
|||||||
import io.nosqlbench.adapter.kafka.KafkaSpace;
|
import io.nosqlbench.adapter.kafka.KafkaSpace;
|
||||||
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
|
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
|
||||||
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
|
||||||
import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler;
|
import io.nosqlbench.engine.api.metrics.MessageSequenceNumberSendingHandler;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
|
||||||
import org.apache.kafka.clients.producer.Callback;
|
import org.apache.kafka.clients.producer.Callback;
|
||||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||||
@ -53,7 +53,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
|
|||||||
private ThreadLocal<Map<String, MessageSequenceNumberSendingHandler>> MessageSequenceNumberSendingHandlersThreadLocal =
|
private ThreadLocal<Map<String, MessageSequenceNumberSendingHandler>> MessageSequenceNumberSendingHandlersThreadLocal =
|
||||||
ThreadLocal.withInitial(HashMap::new);
|
ThreadLocal.withInitial(HashMap::new);
|
||||||
private final boolean seqTracking;
|
private final boolean seqTracking;
|
||||||
private final Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
|
private final Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
|
||||||
|
|
||||||
enum TxnProcResult {
|
enum TxnProcResult {
|
||||||
SUCCESS,
|
SUCCESS,
|
||||||
@ -77,7 +77,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
|
|||||||
boolean transactEnabledConfig,
|
boolean transactEnabledConfig,
|
||||||
int txnBatchNum,
|
int txnBatchNum,
|
||||||
boolean seqTracking,
|
boolean seqTracking,
|
||||||
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
|
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
|
||||||
KafkaProducer<String, String> producer) {
|
KafkaProducer<String, String> producer) {
|
||||||
super(kafkaSpace);
|
super(kafkaSpace);
|
||||||
this.asyncMsgAck = asyncMsgAck;
|
this.asyncMsgAck = asyncMsgAck;
|
||||||
@ -209,7 +209,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
|
|||||||
if (seqTracking) {
|
if (seqTracking) {
|
||||||
long nextSequenceNumber = getMessageSequenceNumberSendingHandler(message.topic())
|
long nextSequenceNumber = getMessageSequenceNumberSendingHandler(message.topic())
|
||||||
.getNextSequenceNumber(errSimuTypeSet);
|
.getNextSequenceNumber(errSimuTypeSet);
|
||||||
message.headers().add(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes());
|
message.headers().add(KafkaAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber).getBytes());
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (result == TxnProcResult.SUCCESS) {
|
if (result == TxnProcResult.SUCCESS) {
|
||||||
|
@ -31,7 +31,7 @@ import java.util.Map;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class KafkaAdapterUtil {
|
public class KafkaAdapterUtil {
|
||||||
|
public static final String MSG_SEQUENCE_NUMBER = "sequence_number";
|
||||||
private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
|
private final static Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
|
||||||
|
|
||||||
public static String DFT_CONSUMER_GROUP_NAME_PREFIX = "nbKafkaGrp";
|
public static String DFT_CONSUMER_GROUP_NAME_PREFIX = "nbKafkaGrp";
|
||||||
@ -42,6 +42,7 @@ public class KafkaAdapterUtil {
|
|||||||
public enum DOC_LEVEL_PARAMS {
|
public enum DOC_LEVEL_PARAMS {
|
||||||
// Blocking message producing or consuming
|
// Blocking message producing or consuming
|
||||||
ASYNC_API("async_api"),
|
ASYNC_API("async_api"),
|
||||||
|
SEQERR_SIMU("seqerr_simu"),
|
||||||
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"),
|
E2E_STARTING_TIME_SOURCE("e2e_starting_time_source"),
|
||||||
SEQ_TRACKING("seq_tracking");
|
SEQ_TRACKING("seq_tracking");
|
||||||
public final String label;
|
public final String label;
|
||||||
|
@ -20,7 +20,7 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace;
|
|||||||
import io.nosqlbench.adapter.pulsar.ops.MessageConsumerOp;
|
import io.nosqlbench.adapter.pulsar.ops.MessageConsumerOp;
|
||||||
import io.nosqlbench.adapter.pulsar.util.EndToEndStartingTimeSource;
|
import io.nosqlbench.adapter.pulsar.util.EndToEndStartingTimeSource;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
||||||
import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
|
import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
@ -20,6 +20,7 @@ import com.codahale.metrics.Timer;
|
|||||||
import io.nosqlbench.adapter.pulsar.PulsarSpace;
|
import io.nosqlbench.adapter.pulsar.PulsarSpace;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
|
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
@ -49,7 +50,7 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
|
|||||||
protected final LongFunction<Boolean> seqTrackingFunc;
|
protected final LongFunction<Boolean> seqTrackingFunc;
|
||||||
protected final LongFunction<String> payloadRttFieldFunc;
|
protected final LongFunction<String> payloadRttFieldFunc;
|
||||||
protected final LongFunction<Supplier<Transaction>> transactSupplierFunc;
|
protected final LongFunction<Supplier<Transaction>> transactSupplierFunc;
|
||||||
protected final LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
|
protected final LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
|
||||||
|
|
||||||
public PulsarClientOpDispenser(DriverAdapter adapter,
|
public PulsarClientOpDispenser(DriverAdapter adapter,
|
||||||
ParsedOp op,
|
ParsedOp op,
|
||||||
@ -101,17 +102,17 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
protected LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
|
protected LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
|
||||||
LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
|
LongFunction<Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
|
||||||
setStringLongFunction = (l) ->
|
setStringLongFunction = (l) ->
|
||||||
parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
|
parsedOp.getOptionalStaticValue(PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQERR_SIMU.label, String.class)
|
||||||
.filter(Predicate.not(String::isEmpty))
|
.filter(Predicate.not(String::isEmpty))
|
||||||
.map(value -> {
|
.map(value -> {
|
||||||
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
|
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
|
||||||
|
|
||||||
if (StringUtils.contains(value,',')) {
|
if (StringUtils.contains(value,',')) {
|
||||||
set = Arrays.stream(value.split(","))
|
set = Arrays.stream(value.split(","))
|
||||||
.map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
|
.map(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
|
||||||
.filter(Optional::isPresent)
|
.filter(Optional::isPresent)
|
||||||
.map(Optional::get)
|
.map(Optional::get)
|
||||||
.collect(Collectors.toCollection(LinkedHashSet::new));
|
.collect(Collectors.toCollection(LinkedHashSet::new));
|
||||||
|
@ -20,6 +20,7 @@ import com.codahale.metrics.Timer;
|
|||||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
|
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
|
||||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||||
import io.nosqlbench.adapter.pulsar.util.*;
|
import io.nosqlbench.adapter.pulsar.util.*;
|
||||||
|
import io.nosqlbench.engine.api.metrics.ReceivedMessageSequenceTracker;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
@ -19,10 +19,11 @@ package io.nosqlbench.adapter.pulsar.ops;
|
|||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
|
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
|
||||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||||
import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler;
|
import io.nosqlbench.engine.api.metrics.MessageSequenceNumberSendingHandler;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
|
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAvroSchemaUtil;
|
import io.nosqlbench.adapter.pulsar.util.PulsarAvroSchemaUtil;
|
||||||
|
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
@ -49,7 +50,7 @@ public class MessageProducerOp extends PulsarClientOp {
|
|||||||
private final boolean useTransact;
|
private final boolean useTransact;
|
||||||
private final boolean seqTracking;
|
private final boolean seqTracking;
|
||||||
private final Supplier<Transaction> transactSupplier;
|
private final Supplier<Transaction> transactSupplier;
|
||||||
private final Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
|
private final Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
|
||||||
private final Producer<?> producer;
|
private final Producer<?> producer;
|
||||||
private final String msgKey;
|
private final String msgKey;
|
||||||
private final String msgPropRawJsonStr;
|
private final String msgPropRawJsonStr;
|
||||||
@ -66,7 +67,7 @@ public class MessageProducerOp extends PulsarClientOp {
|
|||||||
boolean useTransact,
|
boolean useTransact,
|
||||||
boolean seqTracking,
|
boolean seqTracking,
|
||||||
Supplier<Transaction> transactSupplier,
|
Supplier<Transaction> transactSupplier,
|
||||||
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
|
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
|
||||||
Producer<?> producer,
|
Producer<?> producer,
|
||||||
String msgKey,
|
String msgKey,
|
||||||
String msgProp,
|
String msgProp,
|
||||||
|
@ -33,9 +33,7 @@ import java.nio.charset.StandardCharsets;
|
|||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
import java.util.Base64;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -68,35 +66,6 @@ public class PulsarAdapterUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
///////
|
|
||||||
// Message processing sequence error simulation types
|
|
||||||
public enum MSG_SEQ_ERROR_SIMU_TYPE {
|
|
||||||
OutOfOrder("out_of_order"),
|
|
||||||
MsgLoss("msg_loss"),
|
|
||||||
MsgDup("msg_dup");
|
|
||||||
|
|
||||||
public final String label;
|
|
||||||
|
|
||||||
MSG_SEQ_ERROR_SIMU_TYPE(String label) {
|
|
||||||
this.label = label;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = Stream.of(values())
|
|
||||||
.flatMap(simuType ->
|
|
||||||
Stream.of(simuType.label,
|
|
||||||
simuType.label.toLowerCase(),
|
|
||||||
simuType.label.toUpperCase(),
|
|
||||||
simuType.name(),
|
|
||||||
simuType.name().toLowerCase(),
|
|
||||||
simuType.name().toUpperCase())
|
|
||||||
.distinct().map(key -> Map.entry(key, simuType)))
|
|
||||||
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
||||||
|
|
||||||
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
|
|
||||||
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
///////
|
///////
|
||||||
// Valid Pulsar API type
|
// Valid Pulsar API type
|
||||||
public enum PULSAR_API_TYPE {
|
public enum PULSAR_API_TYPE {
|
||||||
|
@ -0,0 +1,37 @@
|
|||||||
|
package io.nosqlbench.engine.api.metrics;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class EndToEndMetricsAdapterUtil {
|
||||||
|
///////
|
||||||
|
// Message processing sequence error simulation types
|
||||||
|
public enum MSG_SEQ_ERROR_SIMU_TYPE {
|
||||||
|
OutOfOrder("out_of_order"),
|
||||||
|
MsgLoss("msg_loss"),
|
||||||
|
MsgDup("msg_dup");
|
||||||
|
|
||||||
|
public final String label;
|
||||||
|
|
||||||
|
MSG_SEQ_ERROR_SIMU_TYPE(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = Stream.of(values())
|
||||||
|
.flatMap(simuType ->
|
||||||
|
Stream.of(simuType.label,
|
||||||
|
simuType.label.toLowerCase(),
|
||||||
|
simuType.label.toUpperCase(),
|
||||||
|
simuType.name(),
|
||||||
|
simuType.name().toLowerCase(),
|
||||||
|
simuType.name().toUpperCase())
|
||||||
|
.distinct().map(key -> Map.entry(key, simuType)))
|
||||||
|
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||||
|
|
||||||
|
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
|
||||||
|
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package io.nosqlbench.adapter.pulsar.util;
|
package io.nosqlbench.engine.api.metrics;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2022 nosqlbench
|
* Copyright (c) 2022 nosqlbench
|
||||||
@ -33,16 +33,16 @@ public class MessageSequenceNumberSendingHandler {
|
|||||||
long number = 1;
|
long number = 1;
|
||||||
Queue<Long> outOfOrderNumbers;
|
Queue<Long> outOfOrderNumbers;
|
||||||
|
|
||||||
public long getNextSequenceNumber(Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes) {
|
public long getNextSequenceNumber(Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes) {
|
||||||
return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE);
|
return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE);
|
||||||
}
|
}
|
||||||
|
|
||||||
long getNextSequenceNumber(Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
|
long getNextSequenceNumber(Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
|
||||||
simulateError(simulatedErrorTypes, errorProbabilityPercentage);
|
simulateError(simulatedErrorTypes, errorProbabilityPercentage);
|
||||||
return nextNumber();
|
return nextNumber();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void simulateError(Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
|
private void simulateError(Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
|
||||||
if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) {
|
if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) {
|
||||||
int selectIndex = 0;
|
int selectIndex = 0;
|
||||||
int numberOfErrorTypes = simulatedErrorTypes.size();
|
int numberOfErrorTypes = simulatedErrorTypes.size();
|
||||||
@ -50,7 +50,7 @@ public class MessageSequenceNumberSendingHandler {
|
|||||||
// pick one of the simulated error type randomly
|
// pick one of the simulated error type randomly
|
||||||
selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes);
|
selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes);
|
||||||
}
|
}
|
||||||
PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream()
|
EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream()
|
||||||
.skip(selectIndex)
|
.skip(selectIndex)
|
||||||
.findFirst()
|
.findFirst()
|
||||||
.get();
|
.get();
|
@ -1,4 +1,4 @@
|
|||||||
package io.nosqlbench.adapter.pulsar.util;
|
package io.nosqlbench.engine.api.metrics;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2022 nosqlbench
|
* Copyright (c) 2022 nosqlbench
|
@ -14,7 +14,7 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io.nosqlbench.adapter.pulsar.util;
|
package io.nosqlbench.engine.api.metrics;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
@ -39,19 +39,19 @@ class MessageSequenceNumberSendingHandlerTest {
|
|||||||
@Test
|
@Test
|
||||||
void shouldInjectMessageLoss() {
|
void shouldInjectMessageLoss() {
|
||||||
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgLoss), 100));
|
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgLoss), 100));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldInjectMessageDuplication() {
|
void shouldInjectMessageDuplication() {
|
||||||
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgDup), 100));
|
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.MsgDup), 100));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldInjectMessageOutOfOrder() {
|
void shouldInjectMessageOutOfOrder() {
|
||||||
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100));
|
assertEquals(4L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.singleton(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.OutOfOrder), 100));
|
||||||
assertEquals(2L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(2L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(3L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
assertEquals(5L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(5L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
@ -60,7 +60,7 @@ class MessageSequenceNumberSendingHandlerTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
void shouldInjectOneOfTheSimulatedErrorsRandomly() {
|
void shouldInjectOneOfTheSimulatedErrorsRandomly() {
|
||||||
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> allErrorTypes = new HashSet<>(Arrays.asList(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.values()));
|
Set<EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> allErrorTypes = new HashSet<>(Arrays.asList(EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE.values()));
|
||||||
|
|
||||||
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
assertEquals(1L, sequenceNumberSendingHandler.getNextSequenceNumber(Collections.emptySet()));
|
||||||
long previousSequenceNumber = 1L;
|
long previousSequenceNumber = 1L;
|
@ -15,7 +15,7 @@
|
|||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package io.nosqlbench.adapter.pulsar.util;
|
package io.nosqlbench.engine.api.metrics;
|
||||||
|
|
||||||
import com.codahale.metrics.Counter;
|
import com.codahale.metrics.Counter;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
Loading…
Reference in New Issue
Block a user