Merge pull request #792 from yabinmeng/main

New consumer subscription feature; fix consumer DLT loading issue; code cleanup and reorg
This commit is contained in:
Jonathan Shook 2022-11-18 23:27:23 -06:00 committed by GitHub
commit 3e8153dd87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 471 additions and 524 deletions

View File

@ -119,9 +119,9 @@ public class PulsarSpace implements AutoCloseable {
// Pulsar Authentication
String authPluginClassName =
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authPulginClassName.label);
pulsarClientConf.getClientConfValueRaw(PulsarAdapterUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authParams.label);
pulsarClientConf.getClientConfValueRaw(PulsarAdapterUtil.CLNT_CONF_KEY.authParams.label);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder.authentication(authPluginClassName, authParams);
@ -131,7 +131,7 @@ public class PulsarSpace implements AutoCloseable {
boolean useTls = StringUtils.contains(pulsarSvcUrl, "pulsar+ssl");
if ( useTls ) {
String tlsHostnameVerificationEnableStr =
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
pulsarClientConf.getClientConfValueRaw(PulsarAdapterUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
adminBuilder
@ -140,14 +140,14 @@ public class PulsarSpace implements AutoCloseable {
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
String tlsTrustCertsFilePath =
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
pulsarClientConf.getClientConfValueRaw(PulsarAdapterUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
if (!StringUtils.isBlank(tlsTrustCertsFilePath)) {
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
String tlsAllowInsecureConnectionStr =
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
pulsarClientConf.getClientConfValueRaw(PulsarAdapterUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
@ -188,8 +188,8 @@ public class PulsarSpace implements AutoCloseable {
private Schema<?> buildSchemaFromDefinition(String schemaTypeConfEntry,
String schemaDefinitionConfEntry) {
String schemaType = pulsarClientConf.getSchemaConfValue(schemaTypeConfEntry);
String schemaDef = pulsarClientConf.getSchemaConfValue(schemaDefinitionConfEntry);
String schemaType = pulsarClientConf.getSchemaConfValueRaw(schemaTypeConfEntry);
String schemaDef = pulsarClientConf.getSchemaConfValueRaw(schemaDefinitionConfEntry);
Schema<?> result;
if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType)) {
@ -210,11 +210,13 @@ public class PulsarSpace implements AutoCloseable {
// this is to allow KEY_VALUE schema
if (pulsarClientConf.hasSchemaConfKey("schema.key.type")) {
Schema<?> pulsarKeySchema = buildSchemaFromDefinition("schema.key.type", "schema.key.definition");
String encodingType = pulsarClientConf.getSchemaConfValue("schema.keyvalue.encodingtype");
KeyValueEncodingType keyValueEncodingType = KeyValueEncodingType.SEPARATED;
if (encodingType != null) {
String encodingType = pulsarClientConf.getSchemaConfValueRaw("schema.keyvalue.encodingtype");
if (StringUtils.isNotBlank(encodingType)) {
keyValueEncodingType = KeyValueEncodingType.valueOf(encodingType);
}
pulsarSchema = Schema.KeyValue(pulsarKeySchema, pulsarSchema, keyValueEncodingType);
}
}

View File

@ -35,12 +35,6 @@ public class MessageConsumerOpDispenser extends PulsarClientOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageConsumerOpDispenser");
public static final String TOPIC_PATTERN_OP_PARAM = "topic_pattern";
public static final String SUBSCRIPTION_NAME_OP_PARAM = "subscription_name";
public static final String SUBSCRIPTION_TYPE_OP_PARAM = "subscription_type";
public static final String CONSUMER_NAME_OP_PARAM = "consumer_name";
public static final String RANGES_OP_PARAM = "ranges";
private final LongFunction<String> topicPatternFunc;
private final LongFunction<String> subscriptionNameFunc;
private final LongFunction<String> subscriptionTypeFunc;
@ -49,8 +43,8 @@ public class MessageConsumerOpDispenser extends PulsarClientOpDispenser {
private final LongFunction<String> e2eStartTimeSrcParamStrFunc;
private final LongFunction<Consumer> consumerFunction;
private final ThreadLocal<Map<String, ReceivedMessageSequenceTracker>> receivedMessageSequenceTrackersForTopicThreadLocal =
ThreadLocal.withInitial(HashMap::new);
private final ThreadLocal<Map<String, ReceivedMessageSequenceTracker>>
receivedMessageSequenceTrackersForTopicThreadLocal = ThreadLocal.withInitial(HashMap::new);
public MessageConsumerOpDispenser(DriverAdapter adapter,
ParsedOp op,
@ -58,11 +52,16 @@ public class MessageConsumerOpDispenser extends PulsarClientOpDispenser {
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.topicPatternFunc = lookupOptionalStrOpValueFunc(TOPIC_PATTERN_OP_PARAM);
this.subscriptionNameFunc = lookupMandtoryStrOpValueFunc(SUBSCRIPTION_NAME_OP_PARAM);
this.subscriptionTypeFunc = lookupOptionalStrOpValueFunc(SUBSCRIPTION_TYPE_OP_PARAM);
this.cycleConsumerNameFunc = lookupOptionalStrOpValueFunc(CONSUMER_NAME_OP_PARAM);
this.rangesFunc = lookupOptionalStrOpValueFunc(RANGES_OP_PARAM);
this.topicPatternFunc =
lookupOptionalStrOpValueFunc(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
this.subscriptionNameFunc =
lookupMandtoryStrOpValueFunc(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
this.subscriptionTypeFunc =
lookupOptionalStrOpValueFunc(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
this.cycleConsumerNameFunc =
lookupOptionalStrOpValueFunc(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
this.rangesFunc =
lookupOptionalStrOpValueFunc(PulsarAdapterUtil.CONSUMER_CONF_CUSTOM_KEY.ranges.label);
this.e2eStartTimeSrcParamStrFunc = lookupOptionalStrOpValueFunc(
PulsarAdapterUtil.DOC_LEVEL_PARAMS.E2E_STARTING_TIME_SOURCE.label, "none");
this.consumerFunction = (l) -> getConsumer(

View File

@ -18,20 +18,19 @@ package io.nosqlbench.adapter.pulsar.dispensers;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.ops.MessageProducerOp;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import java.util.Optional;
import java.util.function.LongFunction;
public class MessageProducerOpDispenser extends PulsarClientOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
public static final String PRODUCER_NAME_OP_PARAM = "producer_name";
public static final String MSG_KEY_OP_PARAM = "msg_key";
public static final String MSG_PROP_OP_PARAM = "msg_prop";
public static final String MSG_VALUE_OP_PARAM = "msg_value";
@ -48,7 +47,8 @@ public class MessageProducerOpDispenser extends PulsarClientOpDispenser {
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.cycleProducerNameFunc = lookupOptionalStrOpValueFunc(PRODUCER_NAME_OP_PARAM);
this.cycleProducerNameFunc =
lookupOptionalStrOpValueFunc(PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.producerName.label);
this.producerFunc = (l) -> getProducer(tgtNameFunc.apply(l), cycleProducerNameFunc.apply(l));
this.msgKeyFunc = lookupOptionalStrOpValueFunc(MSG_KEY_OP_PARAM);
this.msgPropFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
@ -65,7 +65,7 @@ public class MessageProducerOpDispenser extends PulsarClientOpDispenser {
useTransactFunc.apply(cycle),
seqTrackingFunc.apply(cycle),
transactSupplierFunc.apply(cycle),
errSimuTypeSetFunc.apply(cycle),
msgSeqErrSimuTypeSetFunc.apply(cycle),
producerFunc.apply(cycle),
msgKeyFunc.apply(cycle),
msgPropFunc.apply(cycle),

View File

@ -43,7 +43,8 @@ public class MessageReaderOpDispenser extends PulsarClientOpDispenser {
PulsarSpace pulsarSpace) {
super(adapter, op, tgtNameFunc, pulsarSpace);
this.cycleReaderNameFunc = lookupMandtoryStrOpValueFunc("reader_name");
this.cycleReaderNameFunc =
lookupMandtoryStrOpValueFunc(PulsarAdapterUtil.READER_CONF_STD_KEY.readerName.label);
this.msgStartPosStrFunc = lookupOptionalStrOpValueFunc(
"start_msg_position", PulsarAdapterUtil.READER_MSG_POSITION_TYPE.earliest.label);
this.readerFunc = (l) -> getReader(

View File

@ -195,49 +195,49 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return apiMetricsPrefix;
}
//////////////////////////////////////
// Producer Processing --> start
//////////////////////////////////////
//
// Topic name IS mandatory for a producer
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerTopicName(String cycleTopicName) {
if (!StringUtils.isBlank(cycleTopicName)) {
return cycleTopicName;
// A configuration parameter can be set either at the global level (config.properties file),
// or at the cycle level (<nb_scenario>.yaml file).
// If set at both levels, cycle level setting takes precedence
private String getEffectiveConValue(String confCategory, String confParamName, String cycleConfValue) {
if (!StringUtils.isBlank(cycleConfValue)) {
return cycleConfValue;
}
String globalTopicName = pulsarSpace.getPulsarNBClientConf().getProducerTopicName();
if (!StringUtils.isBlank(globalTopicName)) {
return globalTopicName;
}
if (PulsarAdapterUtil.isValidConfCategory(confCategory)) {
Map<String, String> catConfMap = new HashMap<>();
throw new PulsarAdapterInvalidParamException(
"Effective topic name for a producer can't NOT be empty, " +
"it must be set either as a corresponding adapter Op parameter value or " +
"set in the global Pulsar conf file.");
}
if (StringUtils.equalsIgnoreCase(confCategory, PulsarAdapterUtil.CONF_GATEGORY.Schema.label))
catConfMap = pulsarSpace.getPulsarNBClientConf().getSchemaConfMapRaw();
else if (StringUtils.equalsIgnoreCase(confCategory, PulsarAdapterUtil.CONF_GATEGORY.Client.label))
catConfMap = pulsarSpace.getPulsarNBClientConf().getClientConfMapRaw();
else if (StringUtils.equalsIgnoreCase(confCategory, PulsarAdapterUtil.CONF_GATEGORY.Producer.label))
catConfMap = pulsarSpace.getPulsarNBClientConf().getProducerConfMapRaw();
else if (StringUtils.equalsIgnoreCase(confCategory, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label))
catConfMap = pulsarSpace.getPulsarNBClientConf().getConsumerConfMapRaw();
else if (StringUtils.equalsIgnoreCase(confCategory, PulsarAdapterUtil.CONF_GATEGORY.Reader.label))
catConfMap = pulsarSpace.getPulsarNBClientConf().getReaderConfMapRaw();
// Producer name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
if (!StringUtils.isBlank(cycleProducerName)) {
return cycleProducerName;
}
String globalProducerName = pulsarSpace.getPulsarNBClientConf().getProducerName();
if (!StringUtils.isBlank(globalProducerName)) {
return globalProducerName;
String globalConfValue = catConfMap.get(confParamName);
if (!StringUtils.isBlank(globalConfValue)) {
return globalConfValue;
}
}
return "";
}
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
String topicName = getEffectiveProducerTopicName(cycleTopicName);
String producerName = getEffectiveProducerName(cycleProducerName);
String topicName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Producer.label,
PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.topicName.label,
cycleTopicName);
String producerName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Producer.label,
PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.producerName.label,
cycleProducerName);
String producerCacheKey = PulsarAdapterUtil.buildCacheKey(producerName, topicName);
Producer<?> producer = pulsarSpace.getProducer(producerCacheKey);
@ -248,7 +248,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarSpace.getPulsarNBClientConf().getProducerConfMapTgt();
// Remove global level settings: "topicName" and "producerName"
// Remove global level settings
producerConf.remove(PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.topicName.label);
producerConf.remove(PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.producerName.label);
@ -279,33 +279,11 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return producer;
}
//
//////////////////////////////////////
// Producer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Consumer Processing --> start
//////////////////////////////////////
//
private String getEffectiveConsumerTopicNameListStr(String cycleTopicNameListStr) {
if (!StringUtils.isBlank(cycleTopicNameListStr)) {
return cycleTopicNameListStr;
}
String globalTopicNames = pulsarSpace.getPulsarNBClientConf().getConsumerTopicNames();
if (!StringUtils.isBlank(globalTopicNames)) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveConsumerTopicNameList(String cycleTopicNameListStr) {
String effectiveTopicNamesStr = getEffectiveConsumerTopicNameListStr(cycleTopicNameListStr);
String effectiveTopicNamesStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicNames.label,
cycleTopicNameListStr);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
@ -318,21 +296,12 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return effectiveTopicNameList;
}
private String getEffectiveConsumerTopicPatternStr(String cycleTopicPatternStr) {
if (!StringUtils.isBlank(cycleTopicPatternStr)) {
return cycleTopicPatternStr;
}
String globalTopicsPattern = pulsarSpace.getPulsarNBClientConf().getConsumerTopicPattern();
if (!StringUtils.isBlank(globalTopicsPattern)) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveConsumerTopicPattern(String cycleTopicPatternStr) {
String effectiveTopicPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicPatternStr);
String effectiveTopicPatternStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
cycleTopicPatternStr);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicPatternStr))
@ -345,72 +314,27 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return topicsPattern;
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String subscriptionTypeStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label,
cycleSubscriptionType);
// Subscription name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
if (!StringUtils.isBlank(cycleSubscriptionName)) {
return cycleSubscriptionName;
}
String globalSubscriptionName = pulsarSpace.getPulsarNBClientConf().getConsumerSubscriptionName();
if (!StringUtils.isBlank(globalSubscriptionName)) {
return globalSubscriptionName;
}
throw new PulsarAdapterInvalidParamException(
"Effective subscription name for a consumer can't NOT be empty, " +
"it must be set either as a corresponding adapter Op parameter value or " +
"set in the global Pulsar conf file.");
}
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
String subscriptionTypeStr = "";
if (!StringUtils.isBlank(cycleSubscriptionType)) {
subscriptionTypeStr = cycleSubscriptionType;
}
else {
String globalSubscriptionType = pulsarSpace.getPulsarNBClientConf().getConsumerSubscriptionType();
if (!StringUtils.isBlank(globalSubscriptionType)) {
subscriptionTypeStr = globalSubscriptionType;
SubscriptionType subscriptionType = SubscriptionType.Exclusive; // default subscription type
if (!StringUtils.isBlank(subscriptionTypeStr)) {
try {
subscriptionType = SubscriptionType.valueOf(subscriptionTypeStr);
}
catch (Exception e) {
throw new PulsarAdapterInvalidParamException(
"Invalid effective subscription type for a consumer (\"" + subscriptionTypeStr + "\"). " +
"It must be one of the following values: " + PulsarAdapterUtil.getValidSubscriptionTypeList());
}
}
if (StringUtils.isNotBlank(subscriptionTypeStr) &&
!PulsarAdapterUtil.isValidSubscriptionType(subscriptionTypeStr)) {
throw new PulsarAdapterInvalidParamException(
"Invalid effective subscription type for a consumer (\"" + subscriptionTypeStr + "\"). " +
"It must be one of the following values: " + PulsarAdapterUtil.getValidSubscriptionTypeList());
}
return subscriptionTypeStr;
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
SubscriptionType subscriptionType = SubscriptionType.Exclusive; // default subscription type
if (!StringUtils.isBlank(effectiveSubscriptionStr)) {
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
}
return subscriptionType;
}
private String getEffectiveConsumerName(String cycleConsumerName) {
if (!StringUtils.isBlank(cycleConsumerName)) {
return cycleConsumerName;
}
String globalConsumerName = pulsarSpace.getPulsarNBClientConf().getConsumerName();
if (!StringUtils.isBlank(globalConsumerName)) {
return globalConsumerName;
}
return "";
}
public Consumer<?> getConsumer(String cycleTopicNameListStr,
String cycleTopicPatternStr,
String cycleSubscriptionName,
@ -419,15 +343,28 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
String cycleKeySharedSubscriptionRanges) {
List<String> topicNameList = getEffectiveConsumerTopicNameList(cycleTopicNameListStr);
String topicPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicPatternStr);
String topicPatternStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
cycleTopicPatternStr);
Pattern topicPattern = getEffectiveConsumerTopicPattern(cycleTopicPatternStr);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
String subscriptionName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label,
cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
String consumerName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.consumerName.label,
cycleConsumerName);
if ( subscriptionType.equals(SubscriptionType.Exclusive) && (totalThreadNum > 1) ) {
throw new PulsarAdapterInvalidParamException(
MessageConsumerOpDispenser.SUBSCRIPTION_TYPE_OP_PARAM,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label,
"creating multiple consumers of \"Exclusive\" subscription type under the same subscription name");
}
@ -456,21 +393,30 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
// Get other possible consumer settings that are set at global level
Map<String, Object> consumerConf = new HashMap<>(pulsarSpace.getPulsarNBClientConf().getConsumerConfMapTgt());
// Remove global level settings:
// - "topicNames", "topicsPattern", "subscriptionName", "subscriptionType", "consumerName"
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
// Remove non-standard consumer configuration properties
consumerConf.remove(PulsarAdapterUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
Map<String, Object> consumerConf =
new HashMap<>(pulsarSpace.getPulsarNBClientConf().getConsumerConfMapTgt());
Map<String, Object> consumerConfToLoad = new HashMap<>();
consumerConfToLoad.putAll(consumerConf);
try {
ConsumerBuilder<?> consumerBuilder;
// Remove settings that will be handled outside "loadConf()"
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
// TODO: It looks like loadConf() method can't handle the following settings properly.
// Do these settings manually for now
// - deadLetterPolicy
// - negativeAckRedeliveryBackoff
// - ackTimeoutRedeliveryBackoff
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.deadLetterPolicy.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.negativeAckRedeliveryBackoff.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.ackTimeoutRedeliveryBackoff.label);
if (!multiTopicConsumer) {
assert (topicNameList.size() == 1);
consumerBuilder = pulsarClient.newConsumer(pulsarSpace.getPulsarSchema());
@ -487,10 +433,24 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
}
}
consumerBuilder.
loadConf(consumerConf).
subscriptionName(subscriptionName).
subscriptionType(subscriptionType);
consumerBuilder.loadConf(consumerConfToLoad);
if (consumerConf.containsKey(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.deadLetterPolicy.label)) {
consumerBuilder.deadLetterPolicy((DeadLetterPolicy)
consumerConf.get(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.deadLetterPolicy.label));
}
if (consumerConf.containsKey(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.negativeAckRedeliveryBackoff.label)) {
consumerBuilder.negativeAckRedeliveryBackoff((RedeliveryBackoff)
consumerConf.get(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.negativeAckRedeliveryBackoff.label));
}
if (consumerConf.containsKey(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.ackTimeoutRedeliveryBackoff.label)) {
consumerBuilder.ackTimeoutRedeliveryBackoff((RedeliveryBackoff)
consumerConf.get(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.ackTimeoutRedeliveryBackoff.label));
}
consumerBuilder
.subscriptionName(subscriptionName)
.subscriptionType(subscriptionType);
if (!StringUtils.isBlank(consumerName))
consumerBuilder.consumerName(consumerName);
@ -508,15 +468,12 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumer = consumerBuilder.subscribe();
pulsarSpace.setConsumer(consumerCacheKey, consumer);
if (instrument) {
pulsarAdapterMetrics.registerConsumerApiMetrics(
consumer,
getPulsarAPIMetricsPrefix(
PulsarAdapterUtil.PULSAR_API_TYPE.CONSUMER.label,
consumerName,
consumerTopicListString));
}
pulsarAdapterMetrics.registerConsumerApiMetrics(
consumer,
getPulsarAPIMetricsPrefix(
PulsarAdapterUtil.PULSAR_API_TYPE.CONSUMER.label,
consumerName,
consumerTopicListString));
}
catch (PulsarClientException ple) {
throw new PulsarAdapterUnexpectedException("Failed to create a Pulsar consumer!");
@ -549,73 +506,24 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return result;
}
//
//////////////////////////////////////
// Consumer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Reader Processing --> Start
//////////////////////////////////////
//
// Topic name IS mandatory for a reader
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
if (!StringUtils.isBlank(cycleReaderTopicName)) {
return cycleReaderTopicName;
}
String globalReaderTopicName = pulsarSpace.getPulsarNBClientConf().getReaderTopicName();
if (!StringUtils.isBlank(globalReaderTopicName)) {
return globalReaderTopicName;
}
throw new PulsarAdapterInvalidParamException(
"Effective topic name for a reader can't NOT be empty, " +
"it must be set either as a corresponding adapter Op parameter value or " +
"set in the global Pulsar conf file.");
}
// Reader name is NOT mandatory
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveReaderName(String cycleReaderName) {
if (!StringUtils.isBlank(cycleReaderName)) {
return cycleReaderName;
}
String globalReaderName = pulsarSpace.getPulsarNBClientConf().getReaderName();
if (!StringUtils.isBlank(globalReaderName)) {
return globalReaderName;
}
return "";
}
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
if (!StringUtils.isBlank(cycleStartMsgPosStr)) {
return cycleStartMsgPosStr;
}
String globalStartMsgPosStr = pulsarSpace.getPulsarNBClientConf().getStartMsgPosStr();
if (!StringUtils.isBlank(globalStartMsgPosStr)) {
return globalStartMsgPosStr;
}
return PulsarAdapterUtil.READER_MSG_POSITION_TYPE.latest.label;
}
public Reader<?> getReader(String cycleTopicName,
String cycleReaderName,
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
String topicName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Reader.label,
PulsarAdapterUtil.READER_CONF_STD_KEY.topicName.label,
cycleTopicName);
String readerName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Reader.label,
PulsarAdapterUtil.READER_CONF_STD_KEY.readerName.label,
cycleReaderName);
String startMsgPosStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Reader.label,
PulsarAdapterUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label,
cycleStartMsgPos);
if (!PulsarAdapterUtil.isValideReaderStartPosition(startMsgPosStr)) {
throw new RuntimeException("Reader:: Invalid value for reader start message position!");
}

View File

@ -49,7 +49,7 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
protected final LongFunction<Boolean> seqTrackingFunc;
protected final LongFunction<String> payloadRttFieldFunc;
protected final LongFunction<Supplier<Transaction>> transactSupplierFunc;
protected final LongFunction<Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE>> errSimuTypeSetFunc;
protected final LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> msgSeqErrSimuTypeSetFunc;
public PulsarClientOpDispenser(DriverAdapter adapter,
ParsedOp op,
@ -79,7 +79,7 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
this.transactSupplierFunc = (l) -> getTransactionSupplier();
this.errSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
this.msgSeqErrSimuTypeSetFunc = getStaticErrSimuTypeSetOpValueFunc();
}
protected Supplier<Transaction> getTransactionSupplier() {
@ -101,16 +101,16 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
};
}
protected LongFunction<Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
LongFunction<Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
protected LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> getStaticErrSimuTypeSetOpValueFunc() {
LongFunction<Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE>> setStringLongFunction;
setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue("seqerr_simu", String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> set = new HashSet<>();
if (StringUtils.contains(value,',')) {
set = Arrays.stream(value.split(","))
.map(PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE::parseSimuType)
.map(PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toCollection(LinkedHashSet::new));

View File

@ -49,7 +49,7 @@ public class MessageProducerOp extends PulsarClientOp {
private final boolean useTransact;
private final boolean seqTracking;
private final Supplier<Transaction> transactSupplier;
private final Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
private final Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet;
private final Producer<?> producer;
private final String msgKey;
private final String msgPropRawJsonStr;
@ -66,7 +66,7 @@ public class MessageProducerOp extends PulsarClientOp {
boolean useTransact,
boolean seqTracking,
Supplier<Transaction> transactSupplier,
Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> errSimuTypeSet,
Producer<?> producer,
String msgKey,
String msgProp,
@ -142,15 +142,6 @@ public class MessageProducerOp extends PulsarClientOp {
int messageSize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (pulsarSchema instanceof KeyValueSchema) {
// // {KEY IN JSON}||{VALUE IN JSON}
// int separator = msgValue.indexOf("}||{");
// if (separator < 0) {
// throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)");
// }
// String keyInput = msgValue.substring(0, separator + 1);
// String valueInput = msgValue.substring(separator + 3);
KeyValueSchema keyValueSchema = (KeyValueSchema) pulsarSchema;
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
GenericRecord payload = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(

View File

@ -18,7 +18,6 @@ package io.nosqlbench.adapter.pulsar.util;
*/
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import org.apache.commons.lang3.RandomUtils;
import java.util.ArrayDeque;
@ -34,16 +33,16 @@ public class MessageSequenceNumberSendingHandler {
long number = 1;
Queue<Long> outOfOrderNumbers;
public long getNextSequenceNumber(Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes) {
public long getNextSequenceNumber(Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes) {
return getNextSequenceNumber(simulatedErrorTypes, SIMULATED_ERROR_PROBABILITY_PERCENTAGE);
}
long getNextSequenceNumber(Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
long getNextSequenceNumber(Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
simulateError(simulatedErrorTypes, errorProbabilityPercentage);
return nextNumber();
}
private void simulateError(Set<PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
private void simulateError(Set<PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE> simulatedErrorTypes, int errorProbabilityPercentage) {
if (!simulatedErrorTypes.isEmpty() && shouldSimulateError(errorProbabilityPercentage)) {
int selectIndex = 0;
int numberOfErrorTypes = simulatedErrorTypes.size();
@ -51,7 +50,7 @@ public class MessageSequenceNumberSendingHandler {
// pick one of the simulated error type randomly
selectIndex = RandomUtils.nextInt(0, numberOfErrorTypes);
}
PulsarAdapterUtil.SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream()
PulsarAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE errorType = simulatedErrorTypes.stream()
.skip(selectIndex)
.findFirst()
.get();

View File

@ -83,6 +83,29 @@ public class PulsarAdapterUtil {
return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid configuration categories
public enum CONF_GATEGORY {
Schema("schema"),
Client("client"),
Producer("producer"),
Consumer("consumer"),
Reader("reader");
public final String label;
CONF_GATEGORY(String label) {
this.label = label;
}
}
public static boolean isValidConfCategory(String item) {
return Arrays.stream(CONF_GATEGORY.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidConfCategoryList() {
return Arrays.stream(CONF_GATEGORY.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid persistence type
public enum PERSISTENT_TYPES {
@ -165,6 +188,27 @@ public class PulsarAdapterUtil {
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
// compressionType
public enum COMPRESSION_TYPE {
NONE("NONE"),
LZ4("LZ4"),
ZLIB("ZLIB"),
ZSTD("ZSTD"),
SNAPPY("SNAPPY");
public final String label;
COMPRESSION_TYPE(String label) {
this.label = label;
}
}
public static boolean isValidCompressionType(String item) {
return Arrays.stream(COMPRESSION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidCompressionTypeList() {
return Arrays.stream(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Standard consumer configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#consumer
@ -189,7 +233,12 @@ public class PulsarAdapterUtil {
regexSubscriptionMode("regexSubscriptionMode"),
deadLetterPolicy("deadLetterPolicy"),
autoUpdatePartitions("autoUpdatePartitions"),
replicateSubscriptionState("replicateSubscriptionState");
replicateSubscriptionState("replicateSubscriptionState"),
negativeAckRedeliveryBackoff("negativeAckRedeliveryBackoff"),
ackTimeoutRedeliveryBackoff("ackTimeoutRedeliveryBackoff"),
autoAckOldestChunkedMessageOnQueueFull("autoAckOldestChunkedMessageOnQueueFull"),
maxPendingChunkedMessage("maxPendingChunkedMessage"),
expireTimeOfIncompleteChunkedMessageMillis("expireTimeOfIncompleteChunkedMessageMillis");
public final String label;
@ -206,7 +255,8 @@ public class PulsarAdapterUtil {
// - NOT part of https://pulsar.apache.org/docs/en/client-libraries-java/#consumer
// - NB Pulsar driver consumer operation specific
public enum CONSUMER_CONF_CUSTOM_KEY {
timeout("timeout");
timeout("timeout"),
ranges("ranges");
public final String label;
@ -218,8 +268,7 @@ public class PulsarAdapterUtil {
return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Pulsar subscription type
// subscriptionTyp
public enum SUBSCRIPTION_TYPE {
Exclusive("Exclusive"),
Failover("Failover"),
@ -239,6 +288,43 @@ public class PulsarAdapterUtil {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// subscriptionInitialPosition
public enum SUBSCRIPTION_INITIAL_POSITION {
Earliest("Earliest"),
Latest("Latest");
public final String label;
SUBSCRIPTION_INITIAL_POSITION(String label) {
this.label = label;
}
}
public static boolean isValidSubscriptionInitialPosition(String item) {
return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSubscriptionInitialPositionList() {
return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// regexSubscriptionMode
public enum REGEX_SUBSCRIPTION_MODE {
Persistent("PersistentOnly"),
NonPersistent("NonPersistentOnly"),
All("AllTopics");
public final String label;
REGEX_SUBSCRIPTION_MODE(String label) {
this.label = label;
}
}
public static boolean isValidRegexSubscriptionMode(String item) {
return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidRegexSubscriptionModeList() {
return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Standard reader configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#reader
@ -284,8 +370,7 @@ public class PulsarAdapterUtil {
// Valid read positions for a Pulsar reader
public enum READER_MSG_POSITION_TYPE {
earliest("earliest"),
latest("latest"),
custom("custom");
latest("latest");
public final String label;
@ -298,22 +383,22 @@ public class PulsarAdapterUtil {
}
///////
// Pulsar subscription type
public enum SEQ_ERROR_SIMU_TYPE {
// Message processing sequence error simulation types
public enum MSG_SEQ_ERROR_SIMU_TYPE {
OutOfOrder("out_of_order"),
MsgLoss("msg_loss"),
MsgDup("msg_dup");
public final String label;
SEQ_ERROR_SIMU_TYPE(String label) {
MSG_SEQ_ERROR_SIMU_TYPE(String label) {
this.label = label;
}
private static final Map<String, SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
static {
for (SEQ_ERROR_SIMU_TYPE simuType : values()) {
for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) {
MAPPING.put(simuType.label, simuType);
MAPPING.put(simuType.label.toLowerCase(), simuType);
MAPPING.put(simuType.label.toUpperCase(), simuType);
@ -323,40 +408,15 @@ public class PulsarAdapterUtil {
}
}
public static Optional<SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
}
}
public static boolean isValidSeqErrSimuType(String item) {
return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSeqErrSimuTypeList() {
return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid websocket-producer configuration (activity-level settings)
// TODO: to be added
public enum WEBSKT_PRODUCER_CONF_KEY {
;
public final String label;
WEBSKT_PRODUCER_CONF_KEY(String label) {
this.label = label;
}
}
///////
// Valid managed-ledger configuration (activity-level settings)
// TODO: to be added
public enum MANAGED_LEDGER_CONF_KEY {
;
public final String label;
MANAGED_LEDGER_CONF_KEY(String label) {
this.label = label;
}
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
@ -385,6 +445,10 @@ public class PulsarAdapterUtil {
public static Schema<?> getPrimitiveTypeSchema(String typeStr) {
Schema<?> schema;
if (StringUtils.isBlank(typeStr)) {
typeStr = "BYTES";
}
switch (typeStr.toUpperCase()) {
case "BOOLEAN":
schema = Schema.BOOL;
@ -428,14 +492,12 @@ public class PulsarAdapterUtil {
case "LOCAL_DATE_TIME":
schema = Schema.LOCAL_DATE_TIME;
break;
// Use BYTES as the default schema type if the type string is not specified
case "":
case "BYTES":
schema = Schema.BYTES;
break;
// Report an error if non-valid, non-empty schema type string is provided
default:
throw new RuntimeException("Invalid Pulsar primitive schema type string : " + typeStr);
throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr);
}
return schema;
@ -444,15 +506,12 @@ public class PulsarAdapterUtil {
///////
// Complex strut type: Avro or Json
public static boolean isAvroSchemaTypeStr(String typeStr) {
return typeStr.equalsIgnoreCase("AVRO");
}
public static boolean isKeyValueTypeStr(String typeStr) {
return typeStr.equalsIgnoreCase("KEY_VALUE");
return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AVRO"));
}
// automatic decode the type from the Registry
public static boolean isAutoConsumeSchemaTypeStr(String typeStr) {
return typeStr.equalsIgnoreCase("AUTO_CONSUME");
return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AUTO_CONSUME"));
}
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr;

View File

@ -24,6 +24,7 @@ import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -39,11 +40,6 @@ public class PulsarClientConf {
private String canonicalFilePath = "";
public static final String SCHEMA_CONF_PREFIX = "schema";
public static final String CLIENT_CONF_PREFIX = "client";
public static final String PRODUCER_CONF_PREFIX = "producer";
public static final String CONSUMER_CONF_PREFIX = "consumer";
public static final String READER_CONF_PREFIX = "reader";
private final Map<String, String> schemaConfMapRaw = new HashMap<>();
private final Map<String, String> clientConfMapRaw = new HashMap<>();
@ -75,8 +71,8 @@ public class PulsarClientConf {
//////////////////
// Convert the raw configuration map (<String,String>) to the required map (<String,Object>)
producerConfMapTgt.putAll(PulsarConfConverter.convertRawProducerConf(producerConfMapRaw));
consumerConfMapTgt.putAll(PulsarConfConverter.convertRawConsumerConf(consumerConfMapRaw));
producerConfMapTgt.putAll(PulsarConfConverter.convertStdRawProducerConf(producerConfMapRaw));
consumerConfMapTgt.putAll(PulsarConfConverter.convertStdRawConsumerConf(consumerConfMapRaw));
// TODO: Reader API is not disabled at the moment. Revisit when needed
}
@ -103,28 +99,28 @@ public class PulsarClientConf {
if (!StringUtils.isBlank(confVal)) {
// Get schema specific configuration settings, removing "schema." prefix
if (StringUtils.startsWith(confKey, SCHEMA_CONF_PREFIX)) {
schemaConfMapRaw.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), confVal);
if (StringUtils.startsWith(confKey, PulsarAdapterUtil.CONF_GATEGORY.Schema.label)) {
schemaConfMapRaw.put(confKey.substring(PulsarAdapterUtil.CONF_GATEGORY.Schema.label.length() + 1), confVal);
}
// Get client connection specific configuration settings, removing "client." prefix
// <<< https://pulsar.apache.org/docs/reference-configuration/#client >>>
else if (StringUtils.startsWith(confKey, CLIENT_CONF_PREFIX)) {
clientConfMapRaw.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), confVal);
else if (StringUtils.startsWith(confKey, PulsarAdapterUtil.CONF_GATEGORY.Client.label)) {
clientConfMapRaw.put(confKey.substring(PulsarAdapterUtil.CONF_GATEGORY.Client.label.length() + 1), confVal);
}
// Get producer specific configuration settings, removing "producer." prefix
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>>
else if (StringUtils.startsWith(confKey, PRODUCER_CONF_PREFIX)) {
producerConfMapRaw.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), confVal);
else if (StringUtils.startsWith(confKey, PulsarAdapterUtil.CONF_GATEGORY.Producer.label)) {
producerConfMapRaw.put(confKey.substring(PulsarAdapterUtil.CONF_GATEGORY.Producer.label.length() + 1), confVal);
}
// Get consumer specific configuration settings, removing "consumer." prefix
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer >>>
else if (StringUtils.startsWith(confKey, CONSUMER_CONF_PREFIX)) {
consumerConfMapRaw.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), confVal);
else if (StringUtils.startsWith(confKey, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label)) {
consumerConfMapRaw.put(confKey.substring(PulsarAdapterUtil.CONF_GATEGORY.Consumer.label.length() + 1), confVal);
}
// Get reader specific configuration settings, removing "reader." prefix
// <<< https://pulsar.apache.org/docs/2.10.x/client-libraries-java/#configure-reader >>>
else if (StringUtils.startsWith(confKey, READER_CONF_PREFIX)) {
readerConfMapRaw.put(confKey.substring(READER_CONF_PREFIX.length() + 1), confVal);
else if (StringUtils.startsWith(confKey, PulsarAdapterUtil.CONF_GATEGORY.Reader.label)) {
readerConfMapRaw.put(confKey.substring(PulsarAdapterUtil.CONF_GATEGORY.Reader.label.length() + 1), confVal);
}
}
}
@ -161,163 +157,111 @@ public class PulsarClientConf {
//////////////////
// Get Schema related config
public boolean hasSchemaConfKey(String key) {
if (key.contains(SCHEMA_CONF_PREFIX))
return schemaConfMapRaw.containsKey(key.substring(SCHEMA_CONF_PREFIX.length() + 1));
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Schema.label))
return schemaConfMapRaw.containsKey(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Schema.label.length() + 1));
else
return schemaConfMapRaw.containsKey(key);
}
public String getSchemaConfValue(String key) {
if (key.contains(SCHEMA_CONF_PREFIX))
return schemaConfMapRaw.get(key.substring(SCHEMA_CONF_PREFIX.length()+1));
else
return schemaConfMapRaw.get(key);
public String getSchemaConfValueRaw(String key) {
if (hasSchemaConfKey(key)) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Schema.label))
return schemaConfMapRaw.get(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Schema.label.length() + 1));
else
return schemaConfMapRaw.get(key);
}
else {
return "";
}
}
//////////////////
// Get Pulsar client related config
public String getClientConfValue(String key) {
if (key.contains(CLIENT_CONF_PREFIX))
return clientConfMapRaw.get(key.substring(CLIENT_CONF_PREFIX.length()+1));
public boolean hasClientConfKey(String key) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Client.label))
return clientConfMapRaw.containsKey(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Client.label.length() + 1));
else
return clientConfMapRaw.get(key);
return clientConfMapRaw.containsKey(key);
}
public String getClientConfValueRaw(String key) {
if (hasClientConfKey(key)) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Client.label))
return clientConfMapRaw.get(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Client.label.length() + 1));
else
return clientConfMapRaw.get(key);
}
else {
return "";
}
}
//////////////////
// Get Pulsar producer related config
public Object getProducerConfValue(String key) {
if (key.contains(PRODUCER_CONF_PREFIX))
return producerConfMapTgt.get(key.substring(PRODUCER_CONF_PREFIX.length()+1));
public boolean hasProducerConfKey(String key) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Producer.label))
return producerConfMapRaw.containsKey(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Producer.label.length() + 1));
else
return producerConfMapTgt.get(key);
return producerConfMapRaw.containsKey(key);
}
// other producer helper functions ...
public String getProducerName() {
Object confValue = getProducerConfValue(
"producer." + PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.producerName.label);
if (confValue == null)
public String getProducerConfValueRaw(String key) {
if (hasProducerConfKey(key)) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Producer.label))
return producerConfMapRaw.get(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Producer.label.length()+1));
else
return producerConfMapRaw.get(key);
}
else {
return "";
else
return confValue.toString();
}
public String getProducerTopicName() {
Object confValue = getProducerConfValue(
"producer." + PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.topicName);
if (confValue == null)
return "";
else
return confValue.toString();
}
}
//////////////////
// Get Pulsar consumer related config
public String getConsumerConfValue(String key) {
if (key.contains(CONSUMER_CONF_PREFIX))
return consumerConfMapRaw.get(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
public boolean hasConsumerConfKey(String key) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Consumer.label))
return consumerConfMapRaw.containsKey(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Consumer.label.length() + 1));
else
return consumerConfMapRaw.get(key);
return consumerConfMapRaw.containsKey(key);
}
// Other consumer helper functions ...
public String getConsumerTopicNames() {
String confValue = getConsumerConfValue(
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
if (confValue == null)
public String getConsumerConfValueRaw(String key) {
if (hasConsumerConfKey(key)) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Consumer.label))
return consumerConfMapRaw.get(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Consumer.label.length() + 1));
else
return consumerConfMapRaw.get(key);
}
else {
return "";
else
return confValue.toString();
}
public String getConsumerTopicPattern() {
Object confValue = getConsumerConfValue(
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getConsumerSubscriptionName() {
Object confValue = getConsumerConfValue(
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getConsumerSubscriptionType() {
Object confValue = getConsumerConfValue(
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getConsumerName() {
Object confValue = getConsumerConfValue(
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
}
// NOTE: Below are not a standard Pulsar consumer configuration parameter as
// listed in "https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer"
// They're custom-made configuration properties for NB pulsar driver consumer.
public int getConsumerTimeoutSeconds() {
Object confValue = getConsumerConfValue(
String confValue = getConsumerConfValueRaw(
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
if (confValue == null)
return -1; // infinite
else
return Integer.parseInt(confValue.toString());
return NumberUtils.toInt(confValue, -1);
}
//////////////////
// Get Pulsar reader related config
public boolean hasReaderConfKey(String key) {
if (key.contains(READER_CONF_PREFIX))
return readerConfMapRaw.containsKey(key.substring(READER_CONF_PREFIX.length() + 1));
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Reader.label))
return readerConfMapRaw.containsKey(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Reader.label.length() + 1));
else
return readerConfMapRaw.containsKey(key);
}
public Object getReaderConfValue(String key) {
if (key.contains(READER_CONF_PREFIX))
return readerConfMapRaw.get(key.substring(READER_CONF_PREFIX.length() + 1));
else
return readerConfMapRaw.get(key);
}
public void setReaderConfValue(String key, String value) {
if (key.contains(READER_CONF_PREFIX))
readerConfMapRaw.put(key.substring(READER_CONF_PREFIX.length() + 1), value);
else
readerConfMapRaw.put(key, value);
}
// Other reader helper functions ...
public String getReaderTopicName() {
Object confValue = getReaderConfValue(
"reader." + PulsarAdapterUtil.READER_CONF_STD_KEY.topicName.label);
if (confValue == null)
public String getReaderConfValueRaw(String key) {
if (hasReaderConfKey(key)) {
if (key.contains(PulsarAdapterUtil.CONF_GATEGORY.Reader.label))
return readerConfMapRaw.get(key.substring(PulsarAdapterUtil.CONF_GATEGORY.Reader.label.length() + 1));
else
return readerConfMapRaw.get(key);
}
else {
return "";
else
return confValue.toString();
}
public String getReaderName() {
Object confValue = getReaderConfValue(
"reader." + PulsarAdapterUtil.READER_CONF_STD_KEY.readerName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
// NOTE: Below are not a standard Pulsar reader configuration parameter as
// listed in "https://pulsar.apache.org/docs/en/client-libraries-java/#reader"
// They're custom-made configuration properties for NB pulsar driver reader.
public String getStartMsgPosStr() {
Object confValue = getReaderConfValue(
"reader." + PulsarAdapterUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
}
}

View File

@ -20,9 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterInvalidParamException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
import java.util.ArrayList;
@ -33,7 +31,7 @@ import java.util.Map;
public class PulsarConfConverter {
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>>
private final static Map<String, String> validPulsarProducerConfKeyTypeMap = Map.ofEntries(
private final static Map<String, String> validStdProducerConfKeyTypeMap = Map.ofEntries(
Map.entry("topicName", "String"),
Map.entry("producerName","String"),
Map.entry("sendTimeoutMs","long"),
@ -50,9 +48,9 @@ public class PulsarConfConverter {
Map.entry("compressionType","CompressionType"),
Map.entry("initialSubscriptionName","string")
);
public static Map<String, Object> convertRawProducerConf(Map<String, String> pulsarProducerConfMapRaw) {
public static Map<String, Object> convertStdRawProducerConf(Map<String, String> pulsarProducerConfMapRaw) {
Map<String, Object> producerConfObjMap = new HashMap<>();
setConfObjMapForPrimitives(producerConfObjMap, pulsarProducerConfMapRaw, validPulsarProducerConfKeyTypeMap);
setConfObjMapForPrimitives(producerConfObjMap, pulsarProducerConfMapRaw, validStdProducerConfKeyTypeMap);
/**
* Non-primitive type processing for Pulsar producer configuration items
@ -63,31 +61,35 @@ public class PulsarConfConverter {
// * hashingScheme
// * cryptoFailureAction
// "compressionType" has value type "CompressionType"
// "compressionType"
// - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY'
String confKeyName = "compressionType";
String confKeyName = PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.compressionType.label;
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)";
String expectedVal = PulsarAdapterUtil.getValidCompressionTypeList();
if (StringUtils.isNotBlank(confVal)) {
if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) {
if ( StringUtils.isNotBlank(confVal) ) {
if (StringUtils.containsIgnoreCase(expectedVal, confVal)) {
CompressionType compressionType = CompressionType.NONE;
switch (StringUtils.upperCase(confVal)) {
case "LZ4":
compressionType = CompressionType.LZ4;
break;
case "ZLIB":
compressionType = CompressionType.ZLIB;
break;
case "ZSTD":
compressionType = CompressionType.ZSTD;
break;
case "SNAPPY":
compressionType = CompressionType.SNAPPY;
break;
}
producerConfObjMap.put(confKeyName, compressionType);
} else {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal));
getInvalidConfValStr(confKeyName, confVal, PulsarAdapterUtil.CONF_GATEGORY.Producer.label, expectedVal));
}
}
@ -96,7 +98,7 @@ public class PulsarConfConverter {
// https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer
private final static Map<String, String> validPulsarConsumerConfKeyTypeMap = Map.ofEntries(
private final static Map<String, String> validStdConsumerConfKeyTypeMap = Map.ofEntries(
Map.entry("topicNames", "Set<String>"),
Map.entry("topicsPattern","Pattern"),
Map.entry("subscriptionName","String"),
@ -124,30 +126,27 @@ public class PulsarConfConverter {
Map.entry("maxPendingChunkedMessage", "int"),
Map.entry("expireTimeOfIncompleteChunkedMessageMillis", "long")
);
public static Map<String, Object> convertRawConsumerConf(Map<String, String> pulsarConsumerConfMapRaw) {
public static Map<String, Object> convertStdRawConsumerConf(Map<String, String> pulsarConsumerConfMapRaw) {
Map<String, Object> consumerConfObjMap = new HashMap<>();
setConfObjMapForPrimitives(consumerConfObjMap, pulsarConsumerConfMapRaw, validPulsarConsumerConfKeyTypeMap);
setConfObjMapForPrimitives(consumerConfObjMap, pulsarConsumerConfMapRaw, validStdConsumerConfKeyTypeMap);
/**
* Non-primitive type processing for Pulsar consumer configuration items
*/
// NOTE: The following non-primitive type configuration items are excluded since
// they'll be handled in PulsarBasedOpDispenser.getConsumer() method directly
// * topicNames
// * topicPattern
// * subscriptionType
// they'll be handled in PulsarBasedOpDispenser.getConsumer() method directly
// * topicNames
// * topicPattern
// * subscriptionType
// TODO: Skip the following Pulsar configuration items for now because they're not really
// needed in the NB S4J testing right now. Add the support for them when needed.
// * subscriptionInitialPosition
// * regexSubscriptionMode
// * cryptoFailureAction
// "properties" has value type "SortedMap<String, String>"
// - expecting the value string has the format: a JSON string that includes a set of key/value pairs
String confKeyName = "properties";
String confKeyName = PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.properties.label;
String confVal = pulsarConsumerConfMapRaw.get(confKeyName);
String expectedVal = "{\"property1\":\"value1\", \"property2\":\"value2\"}, ...";
@ -164,17 +163,58 @@ public class PulsarConfConverter {
} catch (Exception e) {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
getInvalidConfValStr(confKeyName, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
// "subscriptionInitialPosition"
// - expecting the following values: 'Latest' (default),
confKeyName = PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.subscriptionInitialPosition.label;
confVal = pulsarConsumerConfMapRaw.get(confKeyName);
expectedVal = PulsarAdapterUtil.getValidSubscriptionInitialPositionList();
if (StringUtils.isNotBlank(confVal)) {
try {
SubscriptionInitialPosition subInitPos = SubscriptionInitialPosition.Latest;
if (!StringUtils.isBlank(confVal)) {
subInitPos = SubscriptionInitialPosition.valueOf(confVal);
}
consumerConfObjMap.put(confKeyName, subInitPos);
} catch (Exception e) {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
// "regexSubscriptionMode"
// - expecting the following values: 'PersistentOnly' (default), 'NonPersistentOnly', and 'AllTopics'
confKeyName = PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.regexSubscriptionMode.label;
confVal = pulsarConsumerConfMapRaw.get(confKeyName);
expectedVal = PulsarAdapterUtil.getValidRegexSubscriptionModeList();
if (StringUtils.isNotBlank(confVal)) {
try {
RegexSubscriptionMode regexSubscriptionMode = RegexSubscriptionMode.PersistentOnly;
if (!StringUtils.isBlank(confVal)) {
regexSubscriptionMode = RegexSubscriptionMode.valueOf(confVal);
}
consumerConfObjMap.put(confKeyName, regexSubscriptionMode);
} catch (Exception e) {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
// "deadLetterPolicy"
// - expecting the value is a JSON string has the format:
// {"maxRedeliverCount":"<int_value>","deadLetterTopic":"<topic_name>","initialSubscriptionName":"<sub_name>"}
confKeyName = "deadLetterPolicy";
confKeyName = PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.deadLetterPolicy.label;
confVal = pulsarConsumerConfMapRaw.get(confKeyName);
expectedVal = "{" +
"\"maxRedeliverCount\":\"<int_value>\"," +
"\"maxRedeliverCount\":\"<int_value>\" (mandatory)," +
"\"retryLetterTopic\":\"<topic_name>\"," +
"\"deadLetterTopic\":\"<topic_name>\"," +
"\"initialSubscriptionName\":\"<sub_name>\"}";
@ -188,15 +228,15 @@ public class PulsarConfConverter {
// The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName"
for (String key : dlqPolicyMap.keySet()) {
if (!StringUtils.equalsAnyIgnoreCase(key,
"maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName")) {
if (!StringUtils.equalsAnyIgnoreCase(key, "maxRedeliverCount",
"retryLetterTopic", "deadLetterTopic", "initialSubscriptionName")) {
valid = false;
break;
}
}
// DLQ.maxRedeliverCount is mandatory
if (valid && !dlqPolicyMap.containsKey("maxRedeliverCount")) {
if ( valid && !dlqPolicyMap.containsKey("maxRedeliverCount")) {
valid = false;
}
@ -206,28 +246,42 @@ public class PulsarConfConverter {
}
if (valid) {
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(NumberUtils.toInt(maxRedeliverCountStr))
.deadLetterTopic(dlqPolicyMap.get("deadLetterTopic"))
.initialSubscriptionName(dlqPolicyMap.get("initialSubscriptionName"))
.build();
DeadLetterPolicy.DeadLetterPolicyBuilder builder = DeadLetterPolicy.builder()
.maxRedeliverCount(NumberUtils.toInt(maxRedeliverCountStr));
String retryTopicName = dlqPolicyMap.get("retryLetterTopic");
String dlqTopicName = dlqPolicyMap.get("deadLetterTopic");
String initialSubName = dlqPolicyMap.get("initialSubscriptionName");
if (StringUtils.isNotBlank(retryTopicName))
builder.retryLetterTopic(retryTopicName);
if (StringUtils.isNotBlank(dlqTopicName))
builder.deadLetterTopic(dlqTopicName);
if (StringUtils.isNotBlank(initialSubName))
builder.initialSubscriptionName(initialSubName);
DeadLetterPolicy deadLetterPolicy = builder.build();
consumerConfObjMap.put(confKeyName, deadLetterPolicy);
} else {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
getInvalidConfValStr(confKeyName, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
} catch (Exception e) {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
getInvalidConfValStr(confKeyName, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
// "negativeAckRedeliveryBackoff" or "ackTimeoutRedeliveryBackoff"
// - expecting the value is a JSON string has the format:
// {"minDelayMs":"<int_value>", "maxDelayMs":"<int_value>", "multiplier":"<double_value>"}
String[] redeliveryBackoffConfigSet = {"negativeAckRedeliveryBackoff", "ackTimeoutRedeliveryBackoff"};
String[] redeliveryBackoffConfigSet = {
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.negativeAckRedeliveryBackoff.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.ackTimeoutRedeliveryBackoff.label
};
expectedVal = "{" +
"\"minDelayMs\":\"<int_value>\"," +
"\"maxDelayMs\":\"<int_value>\"," +
@ -274,24 +328,31 @@ public class PulsarConfConverter {
} else {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKey, confVal, "consumer", expectedVal));
getInvalidConfValStr(confKey, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
} catch (Exception e) {
throw new PulsarAdapterInvalidParamException(
getInvalidConfValStr(confKey, confVal, "consumer", expectedVal));
getInvalidConfValStr(confKey, confVal, PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, expectedVal));
}
}
}
// Remove non-standard consumer configuration items
for (String confKey : consumerConfObjMap.keySet()) {
if (PulsarAdapterUtil.isCustomConsumerConfItem(confKey)) {
consumerConfObjMap.remove(confKey);
}
}
return consumerConfObjMap;
}
// Utility function
// - get configuration key names by the value type
private static List<String> getConfKeyNameByValueType(Map<String, String> confKeyTypeMap, String tgtValType) {
private static List<String> getStdConfKeyNameByValueType(Map<String, String> confKeyTypeMap, String tgtValType) {
ArrayList<String> confKeyNames = new ArrayList<>();
for (Map.Entry entry: confKeyTypeMap.entrySet()) {
@ -310,10 +371,10 @@ public class PulsarConfConverter {
Map<String, String> srcConfMapRaw,
Map<String, String> validConfKeyTypeMap)
{
List<String> confKeyList = new ArrayList<>();
List<String> confKeyList;
// All configuration items with "String" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "String");
confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "String");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
@ -324,7 +385,7 @@ public class PulsarConfConverter {
}
// All configuration items with "long" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "long");
confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "long");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
@ -335,7 +396,7 @@ public class PulsarConfConverter {
}
// All configuration items with "int" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "int");
confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "int");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
@ -346,7 +407,7 @@ public class PulsarConfConverter {
}
// All configuration items with "boolean" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "boolean");
confKeyList = getStdConfKeyNameByValueType(validConfKeyTypeMap, "boolean");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);

View File

@ -22,7 +22,6 @@ client.connectionTimeoutMs=5000
client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
# Cluster admin
client.authParams=
client.tlsAllowInsecureConnection=true
### Producer related configurations (global) - producer.xxx
@ -35,18 +34,12 @@ producer.blockIfQueueFull=true
### Consumer related configurations (global) - consumer.xxx
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
consumer.topicNames=
consumer.topicsPattern=
consumer.subscriptionName=
consumer.subscriptionType=
consumer.consumerName=
consumer.receiverQueueSize=
consumer.subscriptionInitialPosition=Earliest
consumer.ackTimeoutMillis=10000
consumer.regexSubscriptionMode=AllTopics
consumer.deadLetterPolicy={"maxRedeliverCount":"5","retryLetterTopic":"public/default/retry","deadLetterTopic":"public/default/dlq","initialSubscriptionName":"dlq-sub"}
consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"10","maxDelayMs":"20","multiplier":"1.2"}
### Reader related configurations (global) - reader.xxx
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>
reader.topicName=
reader.receiverQueueSize=
reader.readerName=
reader.startMessagePos=earliest

View File

@ -30,5 +30,5 @@ blocks:
ops:
op1:
MessageConsume: "tnt0/ns0/tp0"
subscription_name: "mynbsub"
# subscription_type: "shared"
subscriptionName: "mynbsub"
subscriptionType: "Shared"

View File

@ -39,7 +39,7 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
private final String name;
protected final DriverAdapter<T, S> adapter;
protected boolean instrument;
private boolean instrument;
private Histogram resultSizeHistogram;
private Timer successTimer;
private Timer errorTimer;

View File

@ -83,15 +83,6 @@ public class ActivityMetrics {
return metric;
}
private static Metric register(String fullMetricName, MetricProvider metricProvider) {
Metric metric = get().getMetrics().get(fullMetricName);
if (metric == null) {
metric = metricProvider.getMetric();
return get().register(fullMetricName, metric);
}
return metric;
}
private static Metric register(ScriptContext context, String name, MetricProvider metricProvider) {
Metric metric = get().getMetrics().get(name);
if (metric == null) {

View File

@ -92,7 +92,6 @@
<module>driver-jmx</module>
<module>driver-jdbc</module>
<module>driver-cockroachdb</module>
<!--<module>driver-pulsar</module>-->
</modules>
</profile>