Completed NB4 Pulsar driver code migration to NB5. But there are still a few lingering issues due to the difference of how NB4 vs NB5 driver/adapter interacts with the NB engine.

This commit is contained in:
yabinmeng 2022-11-16 15:37:26 -06:00
parent ca6f2b052b
commit 0c71696b15
28 changed files with 1435 additions and 118 deletions

View File

@ -83,7 +83,11 @@ public class PulsarOpMapper implements OpMapper<PulsarOp> {
new MessageProducerOpDispenser(adapter, op, opType.targetFunction, pulsarSpace);
case MessageConsume ->
new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, pulsarSpace);
case MessageRead ->
//////////////////////////
// NOTE: not sure how useful to have Pulsar message reader API in the NB performance testing
// currently, the reader API in NB Pulsar driver is no-op (see TDOD in MessageReaderOp)
//////////////////////////
case MessageRead ->
new MessageReaderOpDispenser(adapter, op, opType.targetFunction, pulsarSpace);
};
}

View File

@ -26,6 +26,7 @@ public enum PulsarOpType {
AdminNamespace("admin-namespace"),
AdminTopic("admin-topic"),
MessageProduce("msg-send"),
// This also supports multi-topic message consumption
MessageConsume("msg-consume"),
MessageRead("msg-read");

View File

@ -34,10 +34,7 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
public class PulsarSpace {

View File

@ -20,12 +20,16 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.AdminNamespaceOp;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import java.util.function.LongFunction;
public class AdminNamespaceOpDispenser extends PulsarAdminOpDispenser {
private final static Logger logger = LogManager.getLogger("AdminNamespaceOpDispenser");
public AdminNamespaceOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
@ -36,6 +40,7 @@ public class AdminNamespaceOpDispenser extends PulsarAdminOpDispenser {
@Override
public AdminNamespaceOp apply(long cycle) {
return new AdminNamespaceOp(
pulsarAdapterMetrics,
pulsarAdmin,
asyncApiFunc.apply(cycle),
adminDelOpFunc.apply(cycle),

View File

@ -20,6 +20,8 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.AdminTenantOp;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import java.util.*;
@ -27,6 +29,8 @@ import java.util.function.LongFunction;
public class AdminTenantOpDispenser extends PulsarAdminOpDispenser {
private final static Logger logger = LogManager.getLogger("AdminTenantOpDispenser");
private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc;
public AdminTenantOpDispenser(DriverAdapter adapter,
@ -42,6 +46,7 @@ public class AdminTenantOpDispenser extends PulsarAdminOpDispenser {
@Override
public AdminTenantOp apply(long cycle) {
return new AdminTenantOp(
pulsarAdapterMetrics,
pulsarAdmin,
asyncApiFunc.apply(cycle),
adminDelOpFunc.apply(cycle),

View File

@ -20,12 +20,16 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.AdminTopicOp;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import java.util.function.LongFunction;
public class AdminTopicOpDispenser extends PulsarAdminOpDispenser {
private final static Logger logger = LogManager.getLogger("AdminTopicOpDispenser");
private final LongFunction<Boolean> enablePartFunc;
private final LongFunction<Integer> partNumFunc;
@ -44,6 +48,7 @@ public class AdminTopicOpDispenser extends PulsarAdminOpDispenser {
public AdminTopicOp apply(long cycle) {
return new AdminTopicOp(
pulsarAdapterMetrics,
pulsarAdmin,
asyncApiFunc.apply(cycle),
adminDelOpFunc.apply(cycle),

View File

@ -18,24 +18,88 @@ package io.nosqlbench.adapter.pulsar.dispensers;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.MessageConsumerOp;
import io.nosqlbench.adapter.pulsar.util.EndToEndStartingTimeSource;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.ReceivedMessageSequenceTracker;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
public class MessageConsumerOpDispenser extends PulsarClientOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageConsumerOpDispenser");
public static final String TOPIC_PATTERN_OP_PARAM = "topic_pattern";
public static final String SUBSCRIPTION_NAME_OP_PARAM = "subscription_name";
public static final String SUBSCRIPTION_TYPE_OP_PARAM = "subscription_type";
public static final String CONSUMER_NAME_OP_PARAM = "consumer_name";
public static final String RANGES_OP_PARAM = "ranges";
private final LongFunction<String> topicPatternFunc;
private final LongFunction<String> subscriptionNameFunc;
private final LongFunction<String> subscriptionTypeFunc;
private final LongFunction<String> cycleConsumerNameFunc;
private final LongFunction<String> rangesFunc;
private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
private final LongFunction<Consumer> consumerFunction;
private final ThreadLocal<Map<String, ReceivedMessageSequenceTracker>> receivedMessageSequenceTrackersForTopicThreadLocal =
ThreadLocal.withInitial(HashMap::new);
public MessageConsumerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.topicPatternFunc = lookupOptionalStrOpValueFunc(TOPIC_PATTERN_OP_PARAM);
this.subscriptionNameFunc = lookupMandtoryStrOpValueFunc(SUBSCRIPTION_NAME_OP_PARAM);
this.subscriptionTypeFunc = lookupOptionalStrOpValueFunc(SUBSCRIPTION_TYPE_OP_PARAM);
this.cycleConsumerNameFunc = lookupOptionalStrOpValueFunc(CONSUMER_NAME_OP_PARAM);
this.rangesFunc = lookupOptionalStrOpValueFunc(RANGES_OP_PARAM);
this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none");
this.consumerFunction = (l) -> getConsumer(
tgtNameFunc.apply(l),
topicPatternFunc.apply(l),
subscriptionNameFunc.apply(l),
subscriptionTypeFunc.apply(l),
cycleConsumerNameFunc.apply(l),
rangesFunc.apply(l));
}
@Override
public MessageConsumerOp apply(long cycle) {
return new MessageConsumerOp(pulsarClient, pulsarSchema);
return new MessageConsumerOp(
pulsarAdapterMetrics,
pulsarClient,
pulsarSchema,
asyncApiFunc.apply(cycle),
useTransactFunc.apply(cycle),
seqTrackingFunc.apply(cycle),
transactSupplierFunc.apply(cycle),
payloadRttFieldFunc.apply(cycle),
EndToEndStartingTimeSource.valueOf(e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
this::getReceivedMessageSequenceTracker,
consumerFunction.apply(cycle),
pulsarSpace.getPulsarNBClientConf().getConsumerTimeoutSeconds()
);
}
private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(String topicName) {
return receivedMessageSequenceTrackersForTopicThreadLocal.get()
.computeIfAbsent(topicName, k -> createReceivedMessageSequenceTracker());
}
private ReceivedMessageSequenceTracker createReceivedMessageSequenceTracker() {
return new ReceivedMessageSequenceTracker(pulsarAdapterMetrics.getMsgErrOutOfSeqCounter(),
pulsarAdapterMetrics.getMsgErrDuplicateCounter(),
pulsarAdapterMetrics.getMsgErrLossCounter());
}
}

View File

@ -20,22 +20,56 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.MessageProducerOp;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import java.util.Optional;
import java.util.function.LongFunction;
public class MessageProducerOpDispenser extends PulsarClientOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
public static final String PRODUCER_NAME_OP_PARAM = "producer_name";
public static final String MSG_KEY_OP_PARAM = "msg_key";
public static final String MSG_PROP_OP_PARAM = "msg_prop";
public static final String MSG_VALUE_OP_PARAM = "msg_value";
private final LongFunction<String> cycleProducerNameFunc;
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<String> msgKeyFunc;
private final LongFunction<String> msgPropFunc;
private final LongFunction<String> msgValueFunc;
public MessageProducerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.cycleProducerNameFunc = lookupOptionalStrOpValueFunc(PRODUCER_NAME_OP_PARAM);
this.producerFunc = (l) -> getProducer(tgtNameFunc.apply(l), cycleProducerNameFunc.apply(l));
this.msgKeyFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
this.msgPropFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgValueFunc = lookupMandtoryStrOpValueFunc(MSG_VALUE_OP_PARAM);
}
@Override
public MessageProducerOp apply(long cycle) {
return new MessageProducerOp(pulsarClient, pulsarSchema);
return new MessageProducerOp(
pulsarAdapterMetrics,
pulsarClient,
pulsarSchema,
asyncApiFunc.apply(cycle),
useTransactFunc.apply(cycle),
seqTrackingFunc.apply(cycle),
transactSupplierFunc.apply(cycle),
errSimuTypeSetFunc.apply(cycle),
producerFunc.apply(cycle),
msgKeyFunc.apply(cycle),
msgPropFunc.apply(cycle),
msgValueFunc.apply(cycle)
);
}
}

View File

@ -18,24 +18,48 @@ package io.nosqlbench.adapter.pulsar.dispensers;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.MessageReaderOp;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import java.util.function.LongFunction;
public class MessageReaderOpDispenser extends PulsarClientOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageReaderOpDispenser");
private final LongFunction<String> cycleReaderNameFunc;
private final LongFunction<String> msgStartPosStrFunc;
private final LongFunction<Reader> readerFunc;
public MessageReaderOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.cycleReaderNameFunc = lookupMandtoryStrOpValueFunc("reader_name");
this.msgStartPosStrFunc = lookupOptionalStrOpValueFunc(
"start_msg_position", PulsarAdapterUtil.READER_MSG_POSITION_TYPE.earliest.label);
this.readerFunc = (l) -> getReader(
tgtNameFunc.apply(l),
cycleReaderNameFunc.apply(l),
msgStartPosStrFunc.apply(l));
}
@Override
public MessageReaderOp apply(long cycle) {
return new MessageReaderOp(pulsarClient, pulsarSchema);
return new MessageReaderOp(
pulsarAdapterMetrics,
pulsarClient,
pulsarSchema,
asyncApiFunc.apply(cycle),
readerFunc.apply(cycle));
}
}

View File

@ -20,12 +20,17 @@ import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import java.util.function.LongFunction;
public abstract class PulsarAdminOpDispenser extends PulsarBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("PulsarAdminOpDispenser");
protected final PulsarAdmin pulsarAdmin;
protected final LongFunction<Boolean> adminDelOpFunc;
@ -34,6 +39,7 @@ public abstract class PulsarAdminOpDispenser extends PulsarBaseOpDispenser {
LongFunction<String> tgtNameFunc,
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.pulsarAdmin = pulsarSpace.getPulsarAdmin();
// Doc-level parameter: admin_delop

View File

@ -22,6 +22,10 @@ import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
@ -36,21 +40,29 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, PulsarSpace> {
public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, PulsarSpace> implements NBNamedElement {
private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
protected final ParsedOp parsedOp;
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<String> tgtNameFunc;
protected final PulsarSpace pulsarSpace;
protected final PulsarAdapterMetrics pulsarAdapterMetrics;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
protected final PulsarAdapterMetrics pulsarAdapterMetrics;
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<String> tgtNameFunc;
protected final int totalThreadNum;
protected final long totalCycleNum;
protected RateLimiter per_thread_cyclelimiter;
public PulsarBaseOpDispenser(DriverAdapter adapter,
ParsedOp op,
@ -67,12 +79,28 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
this.asyncApiFunc = lookupStaticBoolConfigValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, true);
this.pulsarAdapterMetrics = new PulsarAdapterMetrics(pulsarSpace, getDefaultMetricsPrefix(this.parsedOp));
String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
this.pulsarAdapterMetrics = new PulsarAdapterMetrics(this, defaultMetricsPrefix);
if (instrument) {
pulsarAdapterMetrics.initPulsarAdapterInstrumentation();
}
totalThreadNum = NumberUtils.toInt(parsedOp.getStaticValue("threads"));
totalCycleNum = NumberUtils.toLong(parsedOp.getStaticValue("cycles"));
this.parsedOp.getOptionalStaticConfig("per_thread_cyclerate", String.class)
.map(RateSpec::new)
.ifPresent(spec -> per_thread_cyclelimiter =
RateLimiters.createOrUpdate(this, "cycles", per_thread_cyclelimiter, spec));
}
@Override
public String getName() {
return "PulsarBaseOpDispenser";
}
public PulsarSpace getPulsarSpace() { return pulsarSpace; }
protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) {
LongFunction<Boolean> booleanLongFunction;
booleanLongFunction = (l) -> parsedOp.getOptionalStaticConfig(paramName, String.class)
@ -83,19 +111,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return booleanLongFunction;
}
protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
LongFunction<Integer> integerLongFunction;
integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> NumberUtils.toInt(value))
.map(value -> {
if (value < 0) return 0;
else return value;
}).orElse(defaultValue);
logger.info("{}: {}", paramName, integerLongFunction.apply(0));
return integerLongFunction;
}
protected LongFunction<Set<String>> lookupStaticStrSetOpValueFunc(String paramName) {
LongFunction<Set<String>> setStringLongFunction;
setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
@ -116,6 +131,42 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return setStringLongFunction;
}
// If the corresponding Op parameter is not provided, use the specified default value
protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
LongFunction<Integer> integerLongFunction;
integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> NumberUtils.toInt(value))
.map(value -> {
if (value < 0) return 0;
else return value;
}).orElse(defaultValue);
logger.info("{}: {}", paramName, integerLongFunction.apply(0));
return integerLongFunction;
}
// If the corresponding Op parameter is not provided, use the specified default value
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class)
.orElse((l) -> defaultValue);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName) {
return lookupOptionalStrOpValueFunc(paramName, "");
}
// Mandatory Op parameter. Throw an error if not specified or having empty value
protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
/**
* Get a proper Pulsar API metrics prefix depending on the API type
*
@ -154,6 +205,8 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
.replace("persistent://", "")
// persistent://tenant/namespace/topicname -> tenant_namespace_topicname
.replace("/", "_");
apiMetricsPrefix += "--";
}
return apiMetricsPrefix;
@ -257,6 +310,61 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
//////////////////////////////////////
//
private String getEffectiveConsumerTopicNameListStr(String cycleTopicNameListStr) {
if (!StringUtils.isBlank(cycleTopicNameListStr)) {
return cycleTopicNameListStr;
}
String globalTopicNames = pulsarSpace.getPulsarNBClientConf().getConsumerTopicNames();
if (!StringUtils.isBlank(globalTopicNames)) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveConsumerTopicNameList(String cycleTopicNameListStr) {
String effectiveTopicNamesStr = getEffectiveConsumerTopicNameListStr(cycleTopicNameListStr);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if (!StringUtils.isBlank(name))
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveConsumerTopicPatternStr(String cycleTopicPatternStr) {
if (!StringUtils.isBlank(cycleTopicPatternStr)) {
return cycleTopicPatternStr;
}
String globalTopicsPattern = pulsarSpace.getPulsarNBClientConf().getConsumerTopicPattern();
if (!StringUtils.isBlank(globalTopicsPattern)) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveConsumerTopicPattern(String cycleTopicPatternStr) {
String effectiveTopicPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicPatternStr);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicPatternStr))
topicsPattern = Pattern.compile(effectiveTopicPatternStr);
else
topicsPattern = null;
} catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
// Subscription name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
@ -322,22 +430,47 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return "";
}
public Consumer<?> getConsumer(String cycleTopicName,
public Consumer<?> getConsumer(String cycleTopicNameListStr,
String cycleTopicPatternStr,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName,
String cycleKeySharedSubscriptionRanges) {
List<String> topicNameList = getEffectiveConsumerTopicNameList(cycleTopicNameListStr);
String topicPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicPatternStr);
Pattern topicPattern = getEffectiveConsumerTopicPattern(cycleTopicPatternStr);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if (StringUtils.isAnyBlank(cycleTopicName, subscriptionName)) {
if ( subscriptionType.equals(SubscriptionType.Exclusive) && (totalThreadNum > 1) ) {
throw new PulsarAdapterInvalidParamException(
"Must specify a topic name and a subscription name when creating a consumer!");
MessageConsumerOpDispenser.SUBSCRIPTION_TYPE_OP_PARAM,
"creating multiple consumers of \"Exclusive\" subscription type under the same subscription name");
}
String consumerCacheKey = PulsarAdapterUtil.buildCacheKey(consumerName, subscriptionName, cycleTopicName);
if ( (topicNameList.isEmpty() && (topicPattern == null)) ||
(!topicNameList.isEmpty() && (topicPattern != null)) ) {
throw new PulsarAdapterInvalidParamException(
"Invalid combination of topic name(s) and topic patterns; only specify one parameter!");
}
boolean multiTopicConsumer = (topicNameList.size() > 1 || (topicPattern != null));
String consumerTopicListString;
if (!topicNameList.isEmpty()) {
consumerTopicListString = String.join("|", topicNameList);
} else {
consumerTopicListString = topicPatternStr;
}
String consumerCacheKey = PulsarAdapterUtil.buildCacheKey(
consumerName,
subscriptionName,
consumerTopicListString);
Consumer<?> consumer = consumers.get(consumerCacheKey);
if (consumer == null) {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
@ -355,13 +488,32 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
try {
ConsumerBuilder<?> consumerBuilder = pulsarClient.
newConsumer(pulsarSpace.getPulsarSchema()).
ConsumerBuilder<?> consumerBuilder;
if (!multiTopicConsumer) {
assert (topicNameList.size() == 1);
consumerBuilder = pulsarClient.newConsumer(pulsarSpace.getPulsarSchema());
consumerBuilder.topic(topicNameList.get(0));
}
else {
consumerBuilder = pulsarClient.newConsumer();
if (!topicNameList.isEmpty()) {
assert (topicNameList.size() > 1);
consumerBuilder.topics(topicNameList);
}
else {
consumerBuilder.topicsPattern(topicPattern);
}
}
consumerBuilder.
loadConf(consumerConf).
topic(cycleTopicName).
subscriptionName(subscriptionName).
subscriptionType(subscriptionType);
if (!StringUtils.isBlank(consumerName))
consumerBuilder.consumerName(consumerName);
if (subscriptionType == SubscriptionType.Key_Shared) {
KeySharedPolicy keySharedPolicy = KeySharedPolicy.autoSplitHashRange();
if (cycleKeySharedSubscriptionRanges != null && !cycleKeySharedSubscriptionRanges.isEmpty()) {
@ -372,10 +524,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumerBuilder.keySharedPolicy(keySharedPolicy);
}
if (!StringUtils.isBlank(consumerName)) {
consumerBuilder = consumerBuilder.consumerName(consumerName);
}
consumer = consumerBuilder.subscribe();
consumers.put(consumerCacheKey, consumer);
@ -385,7 +533,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
getPulsarAPIMetricsPrefix(
PulsarAdapterUtil.PULSAR_API_TYPE.CONSUMER.label,
consumerName,
cycleTopicName));
consumerTopicListString));
}
}

View File

@ -16,29 +16,47 @@
package io.nosqlbench.adapter.pulsar.dispensers;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("PulsarClientOpDispenser");
protected final PulsarClient pulsarClient;
protected final Schema<?> pulsarSchema;
protected final LongFunction<Boolean> useTransactFunc;
protected final LongFunction<Integer> transactBatchNumFunc;
// TODO: add support for "operation number per transaction"
// protected final LongFunction<Integer> transactBatchNumFunc;
protected final LongFunction<Boolean> seqTrackingFunc;
protected final LongFunction<String> payloadRttFieldFunc;
protected final LongFunction<Supplier<Transaction>> transactSupplierFunc;
protected final LongFunction<Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE>> errSimuTypeSetFunc;
public PulsarClientOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.pulsarClient = pulsarSpace.getPulsarClient();
this.pulsarSchema = pulsarSpace.getPulsarSchema();
@ -46,12 +64,61 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
this.useTransactFunc = lookupStaticBoolConfigValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label, false);
// TODO: add support for "operation number per transaction"
// Doc-level parameter: transact_batch_num
this.transactBatchNumFunc = lookupStaticIntOpValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.TRANSACT_BATCH_NUM.label, 1);
// this.transactBatchNumFunc = lookupStaticIntOpValueFunc(
// PulsarAdapterUtil.DOC_LEVEL_PARAMS.TRANSACT_BATCH_NUM.label, 1);
// Doc-level parameter: seq_tracking
this.seqTrackingFunc = lookupStaticBoolConfigValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false);
// Doc-level parameter: payload-tracking-field
this.payloadRttFieldFunc = (l) -> parsedOp.getStaticConfigOr(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.RTT_TRACKING_FIELD.label, "");
this.transactSupplierFunc = (l) -> getTransactionSupplier();
this.errSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
}
protected Supplier<Transaction> getTransactionSupplier() {
return () -> {
try (Timer.Context time = pulsarAdapterMetrics.getCommitTransactionTimer().time() ){
return pulsarClient
.newTransaction()
.build()
.get();
} catch (ExecutionException | InterruptedException err) {
if (logger.isWarnEnabled()) {
logger.warn("Error while starting a new transaction", err);
}
throw new RuntimeException(err);
} catch (PulsarClientException err) {
throw new RuntimeException("Transactions are not enabled on Pulsar Client, " +
"please set client.enableTransaction=true in your Pulsar Client configuration");
}
};
}
protected LongFunction<Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
LongFunction<Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue("seqerr_simu", String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
if (StringUtils.contains(value,',')) {
set = Arrays.stream(value.split(","))
.map(PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toCollection(LinkedHashSet::new));
}
return set;
}).orElse(Collections.emptySet());
logger.info("seqerr_simu: {}", setStringLongFunction.apply(0));
return setStringLongFunction;
}
}

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -33,11 +34,12 @@ public class AdminNamespaceOp extends PulsarAdminOp {
// in format: <tenant>/<namespace>
private final String nsName;
public AdminNamespaceOp(PulsarAdmin pulsarAdmin,
public AdminNamespaceOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarAdmin pulsarAdmin,
boolean asyncApi,
boolean adminDelOp,
String nsName) {
super(pulsarAdmin, asyncApi, adminDelOp);
super(pulsarAdapterMetrics, pulsarAdmin, asyncApi, adminDelOp);
this.nsName = nsName;
}

View File

@ -18,6 +18,7 @@ package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.adapter.pulsar.PulsarDriverAdapter;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -37,13 +38,15 @@ public class AdminTenantOp extends PulsarAdminOp {
private final Set<String> allowedClusters;
private final String tntName;
public AdminTenantOp(PulsarAdmin pulsarAdmin,
public AdminTenantOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarAdmin pulsarAdmin,
boolean asyncApi,
boolean adminDelOp,
String tntName,
Set<String> adminRoles,
Set<String> allowedClusters) {
super(pulsarAdmin, asyncApi, adminDelOp);
super(pulsarAdapterMetrics, pulsarAdmin, asyncApi, adminDelOp);
this.tntName = tntName;
this.adminRoles = adminRoles;
this.allowedClusters = allowedClusters;

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -35,13 +36,15 @@ public class AdminTopicOp extends PulsarAdminOp {
private final boolean enablePart;
private final int partNum;
public AdminTopicOp(PulsarAdmin pulsarAdmin,
public AdminTopicOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarAdmin pulsarAdmin,
boolean asyncApi,
boolean adminDelOp,
String topicName,
boolean enablePart,
int partNum) {
super(pulsarAdmin, asyncApi, adminDelOp);
super(pulsarAdapterMetrics, pulsarAdmin, asyncApi, adminDelOp);
this.topicName = topicName;
this.enablePart = enablePart;
this.partNum = partNum;

View File

@ -16,16 +16,285 @@
package io.nosqlbench.adapter.pulsar.ops;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.shade.org.apache.avro.AvroRuntimeException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
public class MessageConsumerOp extends PulsarClientOp {
public MessageConsumerOp(PulsarClient pulsarClient, Schema<?> pulsarSchema) {
super(pulsarClient, pulsarSchema);
private final static Logger logger = LogManager.getLogger(MessageConsumerOp.class);
private final boolean useTransact;
private final boolean seqTracking;
private final Supplier<Transaction> transactSupplier;
private final String payloadRttField;
private final EndToEndStartingTimeSource e2eStartingTimeSrc;
private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
private final Consumer<?> consumer;
private final int consumerTimeoutInSec;
public MessageConsumerOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarClient pulsarClient,
Schema<?> pulsarSchema,
boolean asyncApi,
boolean useTransact,
boolean seqTracking,
Supplier<Transaction> transactSupplier,
String payloadRttField,
EndToEndStartingTimeSource e2eStartingTimeSrc,
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic,
Consumer<?> consumer,
int consumerTimeoutInSec) {
super(pulsarAdapterMetrics, pulsarClient, pulsarSchema, asyncApi);
this.useTransact = useTransact;
this.seqTracking = seqTracking;
this.transactSupplier = transactSupplier;
this.payloadRttField = payloadRttField;
this.e2eStartingTimeSrc = e2eStartingTimeSrc;
this.receivedMessageSequenceTrackerForTopic = receivedMessageSequenceTrackerForTopic;
this.consumer = consumer;
this.consumerTimeoutInSec = consumerTimeoutInSec;
}
@Override
public Object apply(long value) {
final Transaction transaction;
if (useTransact) {
// if you are in a transaction you cannot set the schema per-message
transaction = transactSupplier.get();
}
else {
transaction = null;
}
if (!asyncApi) {
try {
Message<?> message;
if (consumerTimeoutInSec <= 0) {
// wait forever
message = consumer.receive();
}
else {
message = consumer.receive(consumerTimeoutInSec, TimeUnit.SECONDS);
if (message == null) {
if ( logger.isDebugEnabled() ) {
logger.debug("Failed to sync-receive a message before time out ({} seconds)", consumerTimeoutInSec);
}
}
}
handleMessage(transaction, message);
}
catch (Exception e) {
throw new PulsarAdapterUnexpectedException("" +
"Sync message receiving failed - timeout value: " + consumerTimeoutInSec + " seconds ");
}
}
else {
try {
CompletableFuture<? extends Message<?>> msgRecvFuture = consumer.receiveAsync();
if (useTransact) {
// add commit step
msgRecvFuture = msgRecvFuture.thenCompose(msg -> {
Timer.Context ctx = transactionCommitTimer.time();
return transaction
.commit()
.whenComplete((m,e) -> ctx.close())
.thenApply(v-> msg);
}
);
}
msgRecvFuture.thenAccept(message -> {
try {
handleMessage(transaction, message);
} catch (PulsarClientException | TimeoutException e) {
pulsarActivity.asyncOperationFailed(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
pulsarActivity.asyncOperationFailed(e.getCause());
}
}).exceptionally(ex -> {
pulsarActivity.asyncOperationFailed(ex);
return null;
});
}
catch (Exception e) {
throw new PulsarAdapterUnexpectedException(e);
}
}
return null;
}
private void handleMessage(Transaction transaction, Message<?> message)
throws PulsarClientException, InterruptedException, ExecutionException, TimeoutException {
// acknowledge the message as soon as possible
if (!useTransact) {
consumer.acknowledgeAsync(message.getMessageId())
.get(consumerTimeoutInSec, TimeUnit.SECONDS);
} else {
consumer.acknowledgeAsync(message.getMessageId(), transaction)
.get(consumerTimeoutInSec, TimeUnit.SECONDS);
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
if (logger.isDebugEnabled()) {
Object decodedPayload = message.getValue();
if (decodedPayload instanceof GenericObject) {
// GenericObject is a wrapper for Primitives, for AVRO/JSON structs and for KeyValu
// we fall here with a configured AVRO schema or with AUTO_CONSUME
GenericObject object = (GenericObject) decodedPayload;
logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
object.getNativeObject() + "");
}
else {
logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
}
if (!payloadRttField.isEmpty()) {
boolean done = false;
Object decodedPayload = message.getValue();
Long extractedSendTime = null;
// if Pulsar is able to decode this it is better to let it do the work
// because Pulsar caches the Schema, handles Schema evolution
// as much efficiently as possible
if (decodedPayload instanceof GenericRecord) { // AVRO and AUTO_CONSUME
final GenericRecord pulsarGenericRecord = (GenericRecord) decodedPayload;
Object field = null;
// KeyValue is a special wrapper in Pulsar to represent a pair of values
// a Key and a Value
Object nativeObject = pulsarGenericRecord.getNativeObject();
if (nativeObject instanceof KeyValue) {
KeyValue keyValue = (KeyValue) nativeObject;
// look into the Key
if (keyValue.getKey() instanceof GenericRecord) {
GenericRecord keyPart = (GenericRecord) keyValue.getKey();
try {
field = keyPart.getField(payloadRttField);
} catch (AvroRuntimeException err) {
// field is not in the key
logger.error("Cannot find {} in key {}: {}", payloadRttField, keyPart, err + "");
}
}
// look into the Value
if (keyValue.getValue() instanceof GenericRecord && field == null) {
GenericRecord valuePart = (GenericRecord) keyValue.getValue();
try {
field = valuePart.getField(payloadRttField);
} catch (AvroRuntimeException err) {
// field is not in the value
logger.error("Cannot find {} in value {}: {}", payloadRttField, valuePart, err + "");
}
}
if (field == null) {
throw new RuntimeException("Cannot find field {}" + payloadRttField + " in " + keyValue.getKey() + " and " + keyValue.getValue());
}
} else {
field = pulsarGenericRecord.getField(payloadRttField);
}
if (field != null) {
if (field instanceof Number) {
extractedSendTime = ((Number) field).longValue();
} else {
extractedSendTime = Long.valueOf(field.toString());
}
} else {
logger.error("Cannot find {} in value {}", payloadRttField, pulsarGenericRecord);
}
done = true;
}
if (!done) {
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
org.apache.avro.generic.GenericRecord avroGenericRecord =
PulsarAvroSchemaUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttField)) {
extractedSendTime = (Long) avroGenericRecord.get(payloadRttField);
}
}
if (extractedSendTime != null) {
// fallout expects latencies in "ns" and not in "ms"
long delta = TimeUnit.MILLISECONDS
.toNanos(System.currentTimeMillis() - extractedSendTime);
payloadRttHistogram.update(delta);
}
}
// keep track end-to-end message processing latency
if (e2eStartingTimeSrc != EndToEndStartingTimeSource.NONE) {
long startTimeStamp = 0L;
switch (e2eStartingTimeSrc) {
case MESSAGE_PUBLISH_TIME:
startTimeStamp = message.getPublishTime();
break;
case MESSAGE_EVENT_TIME:
startTimeStamp = message.getEventTime();
break;
case MESSAGE_PROPERTY_E2E_STARTING_TIME:
String startingTimeProperty = message.getProperty("e2e_starting_time");
startTimeStamp = startingTimeProperty != null ? Long.parseLong(startingTimeProperty) : 0L;
break;
}
if (startTimeStamp != 0L) {
long e2eMsgLatency = System.currentTimeMillis() - startTimeStamp;
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
int messageSize = message.getData().length;
messageSizeHistogram.update(messageSize);
}
private void checkAndUpdateMessageErrorCounter(Message<?> message) {
String msgSeqIdStr = message.getProperty(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER);
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
long sequenceNumber = Long.parseLong(msgSeqIdStr);
ReceivedMessageSequenceTracker receivedMessageSequenceTracker =
receivedMessageSequenceTrackerForTopic.apply(message.getTopicName());
receivedMessageSequenceTracker.sequenceNumberReceived(sequenceNumber);
}
}
}

View File

@ -16,17 +16,279 @@
package io.nosqlbench.adapter.pulsar.ops;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.MessageSequenceNumberSendingHandler;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.PulsarAvroSchemaUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
public class MessageProducerOp extends PulsarClientOp {
public MessageProducerOp(PulsarClient pulsarClient, Schema<?> pulsarSchema) {
super(pulsarClient, pulsarSchema);
private final static Logger logger = LogManager.getLogger("MessageProducerOp");
private final boolean useTransact;
private final boolean seqTracking;
private final Supplier<Transaction> transactSupplier;
private final Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
private final Producer<?> producer;
private final String msgKey;
private final String msgPropRawJsonStr;
private final String msgValue;
private final Map<String, String> msgProperties = new HashMap<>();
private final ThreadLocal<Map<String, MessageSequenceNumberSendingHandler>> MessageSequenceNumberSendingHandlersThreadLocal =
ThreadLocal.withInitial(HashMap::new);
public MessageProducerOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarClient pulsarClient,
Schema<?> pulsarSchema,
boolean asyncApi,
boolean useTransact,
boolean seqTracking,
Supplier<Transaction> transactSupplier,
Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
Producer<?> producer,
String msgKey,
String msgProp,
String msgValue) {
super(pulsarAdapterMetrics, pulsarClient, pulsarSchema, asyncApi);
this.useTransact = useTransact;
this.seqTracking = seqTracking;
this.transactSupplier = transactSupplier;
this.errSimuTypeSet = errSimuTypeSet;
this.producer = producer;
this.msgKey = msgKey;
this.msgPropRawJsonStr = msgProp;
this.msgValue = msgValue;
getMsgPropMapFromRawJsonStr();
getMsgPropMapFromRawJsonStr();
}
private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) {
return MessageSequenceNumberSendingHandlersThreadLocal.get()
.computeIfAbsent(topicName, k -> new MessageSequenceNumberSendingHandler());
}
// Check if msgPropJonStr is valid JSON string with a collection of key/value pairs
// - if Yes, convert it to a map
// - otherwise, log an error message and ignore message properties without throwing a runtime exception
private void getMsgPropMapFromRawJsonStr() {
if (!StringUtils.isBlank(msgPropRawJsonStr)) {
try {
msgProperties.putAll(PulsarAdapterUtil.convertJsonToMap(msgPropRawJsonStr));
}
catch (Exception e) {
logger.error(
"Error parsing message property JSON string {}, ignore message properties!",
msgPropRawJsonStr);
}
}
if (seqTracking) {
long nextSequenceNumber = getMessageSequenceNumberSendingHandler(producer.getTopic())
.getNextSequenceNumber(errSimuTypeSet);
msgProperties.put(PulsarAdapterUtil.MSG_SEQUENCE_NUMBER, String.valueOf(nextSequenceNumber));
}
}
@Override
public Object apply(long value) {
TypedMessageBuilder typedMessageBuilder;
final Transaction transaction;
if (useTransact) {
// if you are in a transaction you cannot set the schema per-message
transaction = transactSupplier.get();
typedMessageBuilder = producer.newMessage(transaction);
}
else {
transaction = null;
typedMessageBuilder = producer.newMessage(pulsarSchema);
}
// set message key
if (!StringUtils.isBlank(msgKey)) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
// set message properties
if ( !msgPropRawJsonStr.isEmpty() ) {
typedMessageBuilder = typedMessageBuilder.properties(msgProperties);
}
// set message payload
int messageSize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (pulsarSchema instanceof KeyValueSchema) {
// {KEY IN JSON}||{VALUE IN JSON}
int separator = msgValue.indexOf("}||{");
if (separator < 0) {
throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)");
}
String keyInput = msgValue.substring(0, separator + 1);
String valueInput = msgValue.substring(separator + 3);
KeyValueSchema keyValueSchema = (KeyValueSchema) pulsarSchema;
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
GenericRecord payload = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) keyValueSchema.getValueSchema(),
avroSchema,
valueInput
);
org.apache.avro.Schema avroSchemaForKey = getKeyAvroSchemaFromConfiguration();
GenericRecord key = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) keyValueSchema.getKeySchema(),
avroSchemaForKey,
keyInput
);
typedMessageBuilder = typedMessageBuilder.value(new KeyValue(key, payload));
// TODO: add a way to calculate the message size for KEY_VALUE messages
messageSize = msgValue.length();
}
else if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema,
pulsarSchema.getSchemaInfo().getSchemaDefinition(),
msgValue
);
typedMessageBuilder = typedMessageBuilder.value(payload);
// TODO: add a way to calculate the message size for AVRO messages
messageSize = msgValue.length();
} else {
byte[] array = msgValue.getBytes(StandardCharsets.UTF_8);
typedMessageBuilder = typedMessageBuilder.value(array);
messageSize = array.length;
}
messageSizeHistogram.update(messageSize);
//TODO: add error handling with failed message production
if (!asyncApi) {
try {
logger.trace("Sending message");
typedMessageBuilder.send();
if (useTransact) {
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
if (logger.isDebugEnabled()) {
if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType.name())) {
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
org.apache.avro.generic.GenericRecord avroGenericRecord =
PulsarAvroSchemaUtil.GetGenericRecord_ApacheAvro(avroSchema, msgValue);
logger.debug("({}) Sync message sent: msg-key={}; msg-properties={}; msg-payload={})",
producer.getProducerName(),
msgKey,
msgPropRawJsonStr,
avroGenericRecord.toString());
}
else {
logger.debug("({}) Sync message sent; msg-key={}; msg-properties={}; msg-payload={}",
producer.getProducerName(),
msgKey,
msgPropRawJsonStr,
msgValue);
}
}
}
catch (PulsarClientException | ExecutionException | InterruptedException pce) {
String errMsg =
"Sync message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgPropRawJsonStr + "; " +
"payload - " + msgValue;
logger.trace(errMsg);
throw new PulsarAdapterUnexpectedException(errMsg);
}
timeTracker.run();
}
else {
try {
// we rely on blockIfQueueIsFull in order to throttle the request in this case
CompletableFuture<?> future = typedMessageBuilder.sendAsync();
if (useTransact) {
// add commit step
future = future.thenCompose(msg -> {
Timer.Context ctx = transactionCommitTimer.time();
return transaction
.commit()
.whenComplete((m,e) -> ctx.close())
.thenApply(v-> msg);
}
);
}
future.whenComplete((messageId, error) -> {
if (logger.isDebugEnabled()) {
if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType.name())) {
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
org.apache.avro.generic.GenericRecord avroGenericRecord =
PulsarAvroSchemaUtil.GetGenericRecord_ApacheAvro(avroSchema, msgValue);
logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})",
producer.getProducerName(),
msgKey,
msgPropRawJsonStr,
avroGenericRecord.toString());
}
else {
logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}",
producer.getProducerName(),
msgKey,
msgPropRawJsonStr,
msgValue);
}
}
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Async message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgPropRawJsonStr + "; " +
"payload - " + msgValue);
pulsarActivity.asyncOperationFailed(ex);
return null;
});
}
catch (Exception e) {
throw new PulsarAdapterUnexpectedException(e);
}
}
return null;
}
}

View File

@ -16,17 +16,33 @@
package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
public class MessageReaderOp extends PulsarClientOp {
public MessageReaderOp(PulsarClient pulsarClient, Schema<?> pulsarSchema) {
super(pulsarClient, pulsarSchema);
private final static Logger logger = LogManager.getLogger(MessageReaderOp.class);
private final Reader<?> reader;
public MessageReaderOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarClient pulsarClient,
Schema<?> pulsarSchema,
boolean asyncApi,
Reader<?> reader) {
super(pulsarAdapterMetrics, pulsarClient, pulsarSchema, asyncApi);
this.reader = reader;
}
@Override
public Object apply(long value) {
// TODO: implement the Pulsar reader logic when needed
// at the moment, the reader API support from the NB Pulsar driver is disabled
return null;
}
}

View File

@ -16,17 +16,21 @@
package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import org.apache.pulsar.client.admin.PulsarAdmin;
public abstract class PulsarAdminOp extends PulsarOp {
protected PulsarAdmin pulsarAdmin;
protected boolean asyncApi;
protected boolean adminDelOp;
public PulsarAdminOp(PulsarAdmin pulsarAdmin, boolean asyncApi, boolean adminDelOp) {
public PulsarAdminOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarAdmin pulsarAdmin,
boolean asyncApi,
boolean adminDelOp) {
super(pulsarAdapterMetrics, asyncApi);
this.pulsarAdmin = pulsarAdmin;
this.asyncApi = asyncApi;
this.adminDelOp = adminDelOp;
}
}

View File

@ -16,16 +16,74 @@
package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import io.nosqlbench.adapter.pulsar.util.PulsarAvroSchemaUtil;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.SchemaType;
public abstract class PulsarClientOp extends PulsarOp {
protected PulsarClient pulsarClient;
protected Schema<?> pulsarScheam;
protected final PulsarClient pulsarClient;
protected final Schema<?> pulsarSchema;
// Pulsar KeyValue schema
private org.apache.avro.Schema avroSchema;
private org.apache.avro.Schema avroKeySchema;
protected final Histogram messageSizeHistogram;
protected final Histogram payloadRttHistogram;
protected final Histogram e2eMsgProcLatencyHistogram;
protected final Timer transactionCommitTimer;
public PulsarClientOp(PulsarAdapterMetrics pulsarAdapterMetrics,
PulsarClient pulsarClient,
Schema<?> pulsarScheam,
boolean asyncApi) {
super (pulsarAdapterMetrics, asyncApi);
public PulsarClientOp(PulsarClient pulsarClient, Schema<?> pulsarScheam) {
this.pulsarClient = pulsarClient;
this.pulsarScheam = pulsarScheam;
this.pulsarSchema = pulsarScheam;
this.messageSizeHistogram = pulsarAdapterMetrics.getMessageSizeHistogram();
this.payloadRttHistogram = pulsarAdapterMetrics.getPayloadRttHistogram();
this.e2eMsgProcLatencyHistogram = pulsarAdapterMetrics.getE2eMsgProcLatencyHistogram();
this.transactionCommitTimer = pulsarAdapterMetrics.getCommitTransactionTimer();
}
protected org.apache.avro.Schema getAvroSchemaFromConfiguration() {
// no need for synchronization, this is only a cache
// in case of the race we will parse the string twice, not a big
if (avroSchema == null) {
if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
Schema valueSchema = kvSchema.getValueSchema();
String avroDefStr = valueSchema.getSchemaInfo().getSchemaDefinition();
avroSchema = PulsarAvroSchemaUtil.GetSchema_ApacheAvro(avroDefStr);
} else {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
avroSchema = PulsarAvroSchemaUtil.GetSchema_ApacheAvro(avroDefStr);
}
}
return avroSchema;
}
protected org.apache.avro.Schema getKeyAvroSchemaFromConfiguration() {
// no need for synchronization, this is only a cache
// in case of the race we will parse the string twice, not a big
if (avroKeySchema == null) {
if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema;
Schema keySchema = kvSchema.getKeySchema();
String avroDefStr = keySchema.getSchemaInfo().getSchemaDefinition();
avroKeySchema = PulsarAvroSchemaUtil.GetSchema_ApacheAvro(avroDefStr);
} else {
throw new RuntimeException("We are not using KEY_VALUE schema, so no Schema for the Key!");
}
}
return avroKeySchema;
}
}

View File

@ -17,7 +17,15 @@
package io.nosqlbench.adapter.pulsar.ops;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public abstract class PulsarOp implements CycleOp<Object> {
protected final boolean asyncApi;
protected final PulsarAdapterMetrics pulsarAdapterMetrics;
public PulsarOp(PulsarAdapterMetrics pulsarAdapterMetrics, boolean asyncApi) {
this.pulsarAdapterMetrics = pulsarAdapterMetrics;
this.asyncApi = asyncApi;
}
}

View File

@ -0,0 +1,26 @@
package io.nosqlbench.adapter.pulsar.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
public enum EndToEndStartingTimeSource {
NONE, // no end-to-end latency calculation
MESSAGE_PUBLISH_TIME, // use message publish timestamp
MESSAGE_EVENT_TIME, // use message event timestamp
MESSAGE_PROPERTY_E2E_STARTING_TIME // use message property called "e2e_starting_time" as the timestamp
}

View File

@ -0,0 +1,109 @@
package io.nosqlbench.adapter.pulsar.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import org.apache.commons.lang3.RandomUtils;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
import java.util.Set;
/**
* Handles adding a monotonic sequence number to message properties of sent messages
*/
public class MessageSequenceNumberSendingHandler {
static final int SIMULATED_ERROR_PROBABILITY_PERCENTAGE = 10;
long number = 1;
Queue<Long> outOfOrderNumbers;
public long getNextSequenceNumber(Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes) {
return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE);
}
long getNextSequenceNumber(Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
simulateError(simulatedErrorTypes, errorProbabilityPercentage);
return nextNumber();
}
private void simulateError(Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) {
int selectIndex = 0;
int numberOfErrorTypes = simulatedErrorTypes.size();
if (numberOfErrorTypes > 1) {
// pick one of the simulated error type randomly
selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes);
}
PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream()
.skip(selectIndex)
.findFirst()
.get();
switch (errorType) {
case OutOfOrder:
// simulate message out of order
injectMessagesOutOfOrder();
break;
case MsgDup:
// simulate message duplication
injectMessageDuplication();
break;
case MsgLoss:
// simulate message loss
injectMessageLoss();
break;
}
}
}
private boolean shouldSimulateError(int errorProbabilityPercentage) {
// Simulate error with the specified probability
return RandomUtils.nextInt(0, 100) < errorProbabilityPercentage;
}
long nextNumber() {
if (outOfOrderNumbers != null) {
long nextNumber = outOfOrderNumbers.poll();
if (outOfOrderNumbers.isEmpty()) {
outOfOrderNumbers = null;
}
return nextNumber;
}
return number++;
}
void injectMessagesOutOfOrder() {
if (outOfOrderNumbers == null) {
outOfOrderNumbers = new ArrayDeque<>(Arrays.asList(number + 2, number, number + 1));
number += 3;
}
}
void injectMessageDuplication() {
if (outOfOrderNumbers == null) {
number--;
}
}
void injectMessageLoss() {
if (outOfOrderNumbers == null) {
number++;
}
}
}

View File

@ -21,6 +21,8 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.dispensers.PulsarBaseOpDispenser;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -36,69 +38,104 @@ public class PulsarAdapterMetrics {
private final static Logger logger = LogManager.getLogger("PulsarAdapterMetrics");
private final PulsarSpace pulsarSpace;
private final PulsarBaseOpDispenser pulsarBaseOpDispenser;
private final String defaultAdapterMetricsPrefix;
/**
* Pulsar adapter specific metrics
*/
protected Counter bytesCounter;
// - message out of sequence error counter
protected Counter msgErrOutOfSeqCounter;
private Counter msgErrOutOfSeqCounter;
// - message loss counter
protected Counter msgErrLossCounter;
private Counter msgErrLossCounter;
// - message duplicate (when dedup is enabled) error counter
protected Counter msgErrDuplicateCounter;
private Counter msgErrDuplicateCounter;
protected Histogram messageSizeHistogram;
private Histogram messageSizeHistogram;
// end-to-end latency
protected Histogram e2eMsgProcLatencyHistogram;
private Histogram e2eMsgProcLatencyHistogram;
// A histogram that tracks payload round-trip-time, based on a user-defined field in some sender
// system which can be interpreted as millisecond epoch time in the system's local time zone.
// This is paired with a field name of the same type to be extracted and reported in a metric
// named 'payload-rtt'.
protected Histogram payloadRttHistogram;
private Histogram payloadRttHistogram;
protected Timer bindTimer;
protected Timer executeTimer;
protected Timer createTransactionTimer;
protected Timer commitTransactionTimer;
private Timer bindTimer;
private Timer executeTimer;
private Timer createTransactionTimer;
private Timer commitTransactionTimer;
public PulsarAdapterMetrics(PulsarSpace pulsarSpace, String defaultMetricsPrefix) {
this.pulsarSpace = pulsarSpace;
public PulsarAdapterMetrics(PulsarBaseOpDispenser pulsarBaseOpDispenser, String defaultMetricsPrefix) {
this.pulsarBaseOpDispenser = pulsarBaseOpDispenser;
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
public void initPulsarAdapterInstrumentation() {
// Counter metrics
this.bytesCounter =
ActivityMetrics.counter(this.defaultAdapterMetricsPrefix + "bytes");
this.msgErrOutOfSeqCounter =
ActivityMetrics.counter(this.defaultAdapterMetricsPrefix + "err_msg_oos");
ActivityMetrics.counter(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "err_msg_oos");
this.msgErrLossCounter =
ActivityMetrics.counter(this.defaultAdapterMetricsPrefix + "err_msg_loss");
ActivityMetrics.counter(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "err_msg_loss");
this.msgErrDuplicateCounter =
ActivityMetrics.counter(this.defaultAdapterMetricsPrefix + "err_msg_dup");
ActivityMetrics.counter(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "err_msg_dup");
// Histogram metrics
this.messageSizeHistogram =
ActivityMetrics.histogram(this.defaultAdapterMetricsPrefix + "message_size");
ActivityMetrics.histogram(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "message_size",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.e2eMsgProcLatencyHistogram =
ActivityMetrics.histogram(this.defaultAdapterMetricsPrefix + "e2e_msg_latency");
ActivityMetrics.histogram(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "e2e_msg_latency",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.payloadRttHistogram =
ActivityMetrics.histogram(this.defaultAdapterMetricsPrefix + "payload_rtt");
ActivityMetrics.histogram(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "payload_rtt",
ActivityMetrics.DEFAULT_HDRDIGITS);
// Timer metrics
this.bindTimer =
ActivityMetrics.timer(this.defaultAdapterMetricsPrefix + "bind");
ActivityMetrics.timer(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "bind",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.executeTimer =
ActivityMetrics.timer(this.defaultAdapterMetricsPrefix + "execute");
ActivityMetrics.timer(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.createTransactionTimer =
ActivityMetrics.timer(this.defaultAdapterMetricsPrefix + "create_transaction");
ActivityMetrics.timer(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "create_transaction",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.commitTransactionTimer =
ActivityMetrics.timer(this.defaultAdapterMetricsPrefix + "commit_transaction");
ActivityMetrics.timer(
pulsarBaseOpDispenser,
defaultAdapterMetricsPrefix + "commit_transaction",
ActivityMetrics.DEFAULT_HDRDIGITS);
}
public Counter getMsgErrOutOfSeqCounter() { return this.msgErrOutOfSeqCounter; }
public Counter getMsgErrLossCounter() { return this.msgErrLossCounter; }
public Counter getMsgErrDuplicateCounter() { return this.msgErrDuplicateCounter; }
public Histogram getMessageSizeHistogram() { return this.messageSizeHistogram; }
public Histogram getE2eMsgProcLatencyHistogram() { return this.e2eMsgProcLatencyHistogram; }
public Histogram getPayloadRttHistogram() { return payloadRttHistogram; }
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return executeTimer; }
public Timer getCreateTransactionTimer() { return createTransactionTimer; }
public Timer getCommitTransactionTimer() { return commitTransactionTimer; }
//////////////////////////////////////
// Pulsar client producer API metrics
@ -132,17 +169,17 @@ public class PulsarAdapterMetrics {
metricsPrefix = pulsarApiMetricsPrefix;
}
ActivityMetrics.gauge(metricsPrefix + "total_bytes_sent",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_bytes_sent",
producerSafeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(metricsPrefix + "total_msg_sent",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_msg_sent",
producerSafeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
ActivityMetrics.gauge(metricsPrefix + "total_send_failed",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_send_failed",
producerSafeExtractMetric(producer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
ActivityMetrics.gauge(metricsPrefix + "total_ack_received",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_ack_received",
producerSafeExtractMetric(producer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
ActivityMetrics.gauge(metricsPrefix + "send_bytes_rate",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "send_bytes_rate",
producerSafeExtractMetric(producer, ProducerStats::getSendBytesRate));
ActivityMetrics.gauge(metricsPrefix + "send_msg_rate",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "send_msg_rate",
producerSafeExtractMetric(producer, ProducerStats::getSendMsgsRate));
}
@ -180,17 +217,17 @@ public class PulsarAdapterMetrics {
metricsPrefix = pulsarApiMetricsPrefix;
}
ActivityMetrics.gauge(metricsPrefix + "total_bytes_recv",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_bytes_recv",
consumerSafeExtractMetric(consumer, (s -> s.getTotalBytesReceived() + s.getNumBytesReceived())));
ActivityMetrics.gauge(metricsPrefix + "total_msg_recv",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_msg_recv",
consumerSafeExtractMetric(consumer, (s -> s.getTotalMsgsReceived() + s.getNumMsgsReceived())));
ActivityMetrics.gauge(metricsPrefix + "total_recv_failed",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_recv_failed",
consumerSafeExtractMetric(consumer, (s -> s.getTotalReceivedFailed() + s.getNumReceiveFailed())));
ActivityMetrics.gauge(metricsPrefix + "total_acks_sent",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "total_acks_sent",
consumerSafeExtractMetric(consumer,(s -> s.getTotalAcksSent() + s.getNumAcksSent())));
ActivityMetrics.gauge(metricsPrefix + "recv_bytes_rate",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "recv_bytes_rate",
consumerSafeExtractMetric(consumer, ConsumerStats::getRateBytesReceived));
ActivityMetrics.gauge(metricsPrefix + "recv_msg_rate",
ActivityMetrics.gauge(pulsarBaseOpDispenser, metricsPrefix + "recv_msg_rate",
consumerSafeExtractMetric(consumer, ConsumerStats::getRateMsgsReceived));
}
}

View File

@ -470,7 +470,7 @@ public class PulsarAdapterUtil {
}
}
schema = AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
schema = PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
} else {
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
}

View File

@ -31,7 +31,7 @@ import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
public class AvroUtil {
public class PulsarAvroSchemaUtil {
////////////////////////
// Get an OSS Apache Avro schema from a string definition
public static org.apache.avro.Schema GetSchema_ApacheAvro(String avroSchemDef) {

View File

@ -0,0 +1,169 @@
package io.nosqlbench.adapter.pulsar.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.codahale.metrics.Counter;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Detects message loss, message duplication and out-of-order message delivery
* based on a monotonic sequence number that each received message contains.
* <p>
* Out-of-order messages are detected with a maximum look behind of 1000 sequence number entries.
* This is currently defined as a constant, {@link ReceivedMessageSequenceTracker#DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS}.
*/
public class ReceivedMessageSequenceTracker implements AutoCloseable {
private static final int DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS = 1000;
private static final int DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS = 1000;
// message out-of-sequence error counter
private final Counter msgErrOutOfSeqCounter;
// duplicate message error counter
private final Counter msgErrDuplicateCounter;
// message loss error counter
private final Counter msgErrLossCounter;
private final SortedSet<Long> pendingOutOfSeqNumbers;
private final int maxTrackOutOfOrderSequenceNumbers;
private final SortedSet<Long> skippedSeqNumbers;
private final int maxTrackSkippedSequenceNumbers;
private long expectedNumber = -1;
public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter) {
this(msgErrOutOfSeqCounter, msgErrDuplicateCounter, msgErrLossCounter,
DEFAULT_MAX_TRACK_OUT_OF_ORDER_SEQUENCE_NUMBERS, DEFAULT_MAX_TRACK_SKIPPED_SEQUENCE_NUMBERS);
}
public ReceivedMessageSequenceTracker(Counter msgErrOutOfSeqCounter, Counter msgErrDuplicateCounter, Counter msgErrLossCounter,
int maxTrackOutOfOrderSequenceNumbers, int maxTrackSkippedSequenceNumbers) {
this.msgErrOutOfSeqCounter = msgErrOutOfSeqCounter;
this.msgErrDuplicateCounter = msgErrDuplicateCounter;
this.msgErrLossCounter = msgErrLossCounter;
this.maxTrackOutOfOrderSequenceNumbers = maxTrackOutOfOrderSequenceNumbers;
this.maxTrackSkippedSequenceNumbers = maxTrackSkippedSequenceNumbers;
this.pendingOutOfSeqNumbers = new TreeSet<>();
this.skippedSeqNumbers = new TreeSet<>();
}
/**
* Notifies the tracker about a received sequence number
*
* @param sequenceNumber the sequence number of the received message
*/
public void sequenceNumberReceived(long sequenceNumber) {
if (expectedNumber == -1) {
expectedNumber = sequenceNumber + 1;
return;
}
if (sequenceNumber < expectedNumber) {
if (skippedSeqNumbers.remove(sequenceNumber)) {
// late out-of-order delivery was detected
// decrease the loss counter
msgErrLossCounter.dec();
// increment the out-of-order counter
msgErrOutOfSeqCounter.inc();
} else {
msgErrDuplicateCounter.inc();
}
return;
}
boolean messagesSkipped = false;
if (sequenceNumber > expectedNumber) {
if (pendingOutOfSeqNumbers.size() == maxTrackOutOfOrderSequenceNumbers) {
messagesSkipped = processLowestPendingOutOfSequenceNumber();
}
if (!pendingOutOfSeqNumbers.add(sequenceNumber)) {
msgErrDuplicateCounter.inc();
}
} else {
// sequenceNumber == expectedNumber
expectedNumber++;
}
processPendingOutOfSequenceNumbers(messagesSkipped);
cleanUpTooFarBehindOutOfSequenceNumbers();
}
private boolean processLowestPendingOutOfSequenceNumber() {
// remove the lowest pending out of sequence number
Long lowestOutOfSeqNumber = pendingOutOfSeqNumbers.first();
pendingOutOfSeqNumbers.remove(lowestOutOfSeqNumber);
if (lowestOutOfSeqNumber > expectedNumber) {
// skip the expected number ahead to the number after the lowest sequence number
// increment the counter with the amount of sequence numbers that got skipped
// keep track of the skipped sequence numbers to detect late out-of-order message delivery
for (long l = expectedNumber; l < lowestOutOfSeqNumber; l++) {
msgErrLossCounter.inc();
skippedSeqNumbers.add(l);
if (skippedSeqNumbers.size() > maxTrackSkippedSequenceNumbers) {
skippedSeqNumbers.remove(skippedSeqNumbers.first());
}
}
expectedNumber = lowestOutOfSeqNumber + 1;
return true;
} else {
msgErrLossCounter.inc();
}
return false;
}
private void processPendingOutOfSequenceNumbers(boolean messagesSkipped) {
// check if there are previously received out-of-order sequence number that have been received
while (pendingOutOfSeqNumbers.remove(expectedNumber)) {
expectedNumber++;
if (!messagesSkipped) {
msgErrOutOfSeqCounter.inc();
}
}
}
private void cleanUpTooFarBehindOutOfSequenceNumbers() {
// remove sequence numbers that are too far behind
for (Iterator<Long> iterator = pendingOutOfSeqNumbers.iterator(); iterator.hasNext(); ) {
Long number = iterator.next();
if (number < expectedNumber - maxTrackOutOfOrderSequenceNumbers) {
msgErrLossCounter.inc();
iterator.remove();
} else {
break;
}
}
}
/**
* Handles the possible pending out of sequence numbers. Mainly needed in unit tests to assert the
* counter values.
*/
@Override
public void close() {
while (!pendingOutOfSeqNumbers.isEmpty()) {
processPendingOutOfSequenceNumbers(processLowestPendingOutOfSequenceNumber());
}
}
public int getMaxTrackOutOfOrderSequenceNumbers() {
return maxTrackOutOfOrderSequenceNumbers;
}
public int getMaxTrackSkippedSequenceNumbers() {
return maxTrackSkippedSequenceNumbers;
}
}

View File

@ -194,11 +194,6 @@ public class ActivityMetrics {
return (Counter) register(named, name, Counter::new);
}
public static Counter counter(String fullName) {
Counter counter = get().register(fullName, new Counter());
return counter;
}
/**
* <p>Create a meter associated with an activity.</p>
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
@ -229,10 +224,6 @@ public class ActivityMetrics {
return (Gauge<T>) register(named, name, () -> gauge);
}
public static <T> Gauge<T> gauge(String fullMetricsName, Gauge<T> gauge) {
return (Gauge<T>) register(fullMetricsName, () -> gauge);
}
@SuppressWarnings("unchecked")
public static <T> Gauge<T> gauge(ScriptContext scriptContext, String name, Gauge<T> gauge) {
return (Gauge<T>) register(scriptContext, name, () -> gauge);