Pulsar Reader API with Avro schema support

This commit is contained in:
Yabin Meng 2021-02-28 17:42:16 -06:00
parent b0da4a149b
commit b3084fdd4f
9 changed files with 312 additions and 97 deletions

View File

@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@ -19,12 +20,12 @@ public class PulsarConsumerSpace extends PulsarSpace {
public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) { super(name, pulsarClientConf); }
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
if ((cycleTopicNames != null) && (!cycleTopicNames.isEmpty())) {
if ( !StringUtils.isBlank(cycleTopicNames) ) {
return cycleTopicNames;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if ((globalTopicNames != null) && (!globalTopicNames.isEmpty())) {
if ( !StringUtils.isBlank(globalTopicNames) ) {
return globalTopicNames;
}
@ -37,7 +38,7 @@ public class PulsarConsumerSpace extends PulsarSpace {
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if ( !name.isEmpty() )
if ( !StringUtils.isBlank(name) )
effectiveTopicNameList.add(name.trim());
}
@ -46,22 +47,25 @@ public class PulsarConsumerSpace extends PulsarSpace {
}
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
if ((cycleTopicsPattern != null) && (!cycleTopicsPattern.isEmpty())) {
if ( !StringUtils.isBlank(cycleTopicsPattern) ) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if ((globalTopicsPattern != null) && (!globalTopicsPattern.isEmpty())) {
if ( !StringUtils.isBlank(globalTopicsPattern) ) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
String effecitveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
topicsPattern = Pattern.compile(effecitveTopicsPatternStr);
if ( !StringUtils.isBlank(effectiveTopicsPatternStr) )
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
}
catch (PatternSyntaxException pse) {
topicsPattern = null;
@ -70,12 +74,12 @@ public class PulsarConsumerSpace extends PulsarSpace {
}
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
if ((cycleSubscriptionName != null) && (!cycleSubscriptionName.isEmpty())) {
if ( !StringUtils.isBlank(cycleSubscriptionName) ) {
return cycleSubscriptionName;
}
String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName();
if ((globalSubscriptionName != null) && (!globalSubscriptionName.isEmpty())) {
if ( !StringUtils.isBlank(globalSubscriptionName) ) {
return globalSubscriptionName;
}
@ -83,12 +87,12 @@ public class PulsarConsumerSpace extends PulsarSpace {
}
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
if ((cycleSubscriptionType != null) && (!cycleSubscriptionType.isEmpty())) {
if ( !StringUtils.isBlank(cycleSubscriptionType) ) {
return cycleSubscriptionType;
}
String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType();
if ((globalSubscriptionType != null) && (!globalSubscriptionType.isEmpty())) {
if ( !StringUtils.isBlank(globalSubscriptionType) ) {
return globalSubscriptionType;
}
@ -109,12 +113,12 @@ public class PulsarConsumerSpace extends PulsarSpace {
}
private String getEffectiveConsumerName(String cycleConsumerName) {
if ((cycleConsumerName != null) && (!cycleConsumerName.isEmpty())) {
if ( !StringUtils.isBlank(cycleConsumerName) ) {
return cycleConsumerName;
}
String globalConsumerName = pulsarNBClientConf.getConsumerName();
if ((globalConsumerName != null) && (!globalConsumerName.isEmpty())) {
if ( !StringUtils.isBlank(globalConsumerName) ) {
return globalConsumerName;
}
@ -130,12 +134,28 @@ public class PulsarConsumerSpace extends PulsarSpace {
String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
String encodedStr = PulsarActivityUtil.encode(
consumerName, subscriptionName, topicNamesStr, topicsPatternStr);
if ( topicNames.isEmpty() && (topicsPattern == null) ) {
throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!");
}
String encodedStr;
if ( !topicNames.isEmpty() ) {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
StringUtils.join(topicNames, "|") );
}
else {
encodedStr = PulsarActivityUtil.encode(
consumerName,
subscriptionName,
topicsPatternStr );
}
Consumer<?> consumer = consumers.get(encodedStr);
if (consumer == null) {
@ -146,23 +166,19 @@ public class PulsarConsumerSpace extends PulsarSpace {
// Explicit topic names will take precedence over topics pattern
if ( !topicNames.isEmpty() ) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_KEY.topicsPattern.toString());
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.topicNames.toString(), topicNames);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames);
}
else {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_KEY.topicNames.toString());
if ( !topicsPatternStr.isEmpty() )
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_KEY.topicsPattern.toString(),
getEffectiveTopicPattern(cycleTopicsPattern));
else {
throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!");
}
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
getEffectiveTopicPattern(cycleTopicsPattern));
}
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.subscriptionName.toString(), subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.subscriptionType.toString(), subscriptionType);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.consumerName.toString(), consumerName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
try {
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();

View File

@ -2,11 +2,11 @@ package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -22,12 +22,12 @@ public class PulsarProducerSpace extends PulsarSpace{
// - It can be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerName(String cycleProducerName) {
if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) {
if ( !StringUtils.isBlank(cycleProducerName) ) {
return cycleProducerName;
}
String globalProducerName = pulsarNBClientConf.getProducerName();
if ((globalProducerName != null) && (!globalProducerName.isEmpty())) {
if ( !StringUtils.isBlank(globalProducerName) ) {
return globalProducerName;
}
@ -39,12 +39,12 @@ public class PulsarProducerSpace extends PulsarSpace{
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveTopicName(String cycleTopicName) {
if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) {
if ( !StringUtils.isBlank(cycleTopicName) ) {
return cycleTopicName;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if ( (globalTopicName == null) || (globalTopicName.isEmpty()) ) {
if ( !StringUtils.isBlank(globalTopicName) ) {
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
}
@ -63,8 +63,8 @@ public class PulsarProducerSpace extends PulsarSpace{
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_KEY.topicName.toString(), topicName);
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_KEY.producerName.toString(), producerName);
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
try {
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();

View File

@ -1,8 +1,11 @@
package io.nosqlbench.driver.pulsar;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import org.apache.pulsar.client.api.Reader;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarReaderSpace extends PulsarSpace {
@ -12,4 +15,96 @@ public class PulsarReaderSpace extends PulsarSpace {
public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) {
super(name, pulsarClientConf);
}
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
if ( !StringUtils.isBlank(cycleReaderTopicName) ) {
return cycleReaderTopicName;
}
String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName();
if ( !StringUtils.isBlank(globalReaderTopicName) ) {
return globalReaderTopicName;
}
return "";
}
private String getEffectiveReaderName(String cycleReaderName) {
if ( !StringUtils.isBlank(cycleReaderName) ) {
return cycleReaderName;
}
String globalReaderName = pulsarNBClientConf.getConsumerName();
if ( !StringUtils.isBlank(globalReaderName) ) {
return globalReaderName;
}
return "default-read";
}
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
if ( !StringUtils.isBlank(cycleStartMsgPosStr) ) {
return cycleStartMsgPosStr;
}
String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr();
if ( !StringUtils.isBlank(globalStartMsgPosStr) ) {
return globalStartMsgPosStr;
}
return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
}
public Reader<?> getReader(String cycleTopicName,
String cycleReaderName,
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
if ( StringUtils.isBlank(topicName) ) {
throw new RuntimeException("Must specify a \"topicName\" for a reader!");
}
String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos);
Reader<?> reader = readers.get(encodedStr);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
try {
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
MessageId startMsgId = MessageId.latest;
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
startMsgId = MessageId.earliest;
}
//TODO: custom start message position is NOT supported yet
//else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) {
// startMsgId = MessageId.latest;
//}
if (startMsgId != null) {
readerBuilder = readerBuilder.startMessageId(startMsgId);
}
reader = readerBuilder.create();
}
catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(encodedStr, reader);
}
return reader;
}
}

View File

@ -1,7 +1,12 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp implements PulsarOp {
private final Reader<?> reader;
@ -14,6 +19,27 @@ public class PulsarReaderOp implements PulsarOp {
@Override
public void run() {
//TODO: to be added
try {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
// TODO: how many messages to read per NB cycle?
Message<?> message;
while (reader.hasMessageAvailable()) {
message = reader.readNext();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
}
else {
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));
}
}
}
catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -58,13 +58,13 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
// TODO: Complete implementation for reader, websocket-producer and managed-ledger
if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) {
assert clientSpace instanceof PulsarProducerSpace;
return resolveProducer((PulsarProducerSpace) clientSpace, cmdTpl);
return resolveProducer((PulsarProducerSpace) clientSpace);
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString()) ) {
assert clientSpace instanceof PulsarConsumerSpace;
return resolveConsumer((PulsarConsumerSpace)clientSpace, cmdTpl); /*
return resolveConsumer((PulsarConsumerSpace)clientSpace);
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString()) ) {
assert clientSpace instanceof PulsarReaderSpace;
return resolveReader((PulsarReaderSpace)clientSpace, cmdTpl);
return resolveReader((PulsarReaderSpace)clientSpace); /*
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.WSOKT_PRODUCER.toString()) ) {
} else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.MANAGED_LEDGER.toString()) ) {
*/
@ -74,8 +74,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
}
private LongFunction<PulsarOp> resolveProducer(
PulsarProducerSpace clientSpace,
CommandTemplate cmdTpl
PulsarProducerSpace clientSpace
) {
if (cmdTpl.containsKey("topic_url")) {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
@ -152,8 +151,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
}
private LongFunction<PulsarOp> resolveConsumer(
PulsarConsumerSpace clientSpace,
CommandTemplate cmdTpl
PulsarConsumerSpace clientSpace
) {
LongFunction<String> topic_names_func;
if (cmdTpl.isStatic("topic-names")) {
@ -213,11 +211,43 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
}
private LongFunction<PulsarOp> resolveReader(
PulsarReaderSpace pulsarSpace,
CommandTemplate cmdTpl
PulsarReaderSpace clientSpace
) {
//TODO: to be completed
return null;
LongFunction<String> topic_name_func;
if (cmdTpl.isStatic("topic-name")) {
topic_name_func = (l) -> cmdTpl.getStatic("topic-name");
} else if (cmdTpl.isDynamic("topic-name")) {
topic_name_func = (l) -> cmdTpl.getDynamic("topic-name", l);
} else {
topic_name_func = (l) -> null;
}
LongFunction<String> reader_name_func;
if (cmdTpl.isStatic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader-name");
} else if (cmdTpl.isDynamic("reader-name")) {
reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l);
} else {
reader_name_func = (l) -> null;
}
LongFunction<String> start_msg_pos_str_func;
if (cmdTpl.isStatic("start-msg-position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position");
} else if (cmdTpl.isDynamic("start-msg-position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l);
} else {
start_msg_pos_str_func = (l) -> null;
}
LongFunction<Reader<?>> readerFunc = (l) ->
clientSpace.getReader(
topic_name_func.apply(l),
reader_name_func.apply(l),
start_msg_pos_str_func.apply(l)
);
return new PulsarReaderMapper(cmdTpl, pulsarSchema, readerFunc);
}
@Override

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Schema;
@ -11,7 +12,6 @@ import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
@ -94,9 +94,9 @@ public class PulsarActivityUtil {
}
///////
// Valid producer configuration (activity-level settings)
// Standard producer configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
public enum PRODUCER_CONF_KEY {
public enum PRODUCER_CONF_STD_KEY {
topicName("topicName"),
producerName("producerName"),
sendTimeoutMs("sendTimeoutMs"),
@ -113,18 +113,18 @@ public class PulsarActivityUtil {
;
public final String label;
PRODUCER_CONF_KEY(String label) {
PRODUCER_CONF_STD_KEY(String label) {
this.label = label;
}
}
public static boolean isValidProducerConfItem(String item) {
return Arrays.stream(PRODUCER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
public static boolean isStandardProducerConfItem(String item) {
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Valid consumer configuration (activity-level settings)
// Standard consumer configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#consumer
public enum CONSUMER_CONF_KEY {
public enum CONSUMER_CONF_STD_KEY {
topicNames("topicNames"),
topicsPattern("topicsPattern"),
subscriptionName("subscriptionName"),
@ -149,18 +149,18 @@ public class PulsarActivityUtil {
;
public final String label;
CONSUMER_CONF_KEY(String label) {
CONSUMER_CONF_STD_KEY(String label) {
this.label = label;
}
}
public static boolean isValidConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
public static boolean isStandardConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
///////
// Valid reader configuration (activity-level settings)
// Standard reader configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#reader
public enum READER_CONF_KEY {
public enum READER_CONF_STD_KEY {
topicName("topicName"),
receiverQueueSize("receiverQueueSize"),
readerListener("readerListener"),
@ -173,12 +173,34 @@ public class PulsarActivityUtil {
;
public final String label;
READER_CONF_KEY(String label) {
READER_CONF_STD_KEY(String label) {
this.label = label;
}
}
public static boolean isValidReaderConfItem(String item) {
return Arrays.stream(READER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
public static boolean isStandardReaderConfItem(String item) {
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
public enum READER_CONF_CUSTOM_KEY {
startMessagePos("startMessagePos")
;
public final String label;
READER_CONF_CUSTOM_KEY(String label) {
this.label = label;
}
}
public static boolean isCustomReaderConfItem(String item) {
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
}
public enum READER_MSG_POSITION_TYPE {
earliest("earliest"),
latest("latest"),
custom("custom");
public final String label;
READER_MSG_POSITION_TYPE(String label) { this.label = label; }
}
///////
@ -211,7 +233,7 @@ public class PulsarActivityUtil {
boolean isPrimitive = false;
// Use "BYTES" as the default type if the type string is not explicitly specified
if ((typeStr == null) || typeStr.isEmpty()) {
if (StringUtils.isBlank(typeStr)) {
typeStr = "BYTES";
}
@ -303,7 +325,7 @@ public class PulsarActivityUtil {
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
if (isAvroSchemaTypeStr(typeStr)) {
if ( (schemaDefinitionStr == null) || schemaDefinitionStr.isEmpty()) {
if ( StringUtils.isBlank(schemaDefinitionStr) ) {
throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!");
} else if (schemaDefinitionStr.startsWith(filePrefix)) {
try {
@ -333,13 +355,15 @@ public class PulsarActivityUtil {
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {
if ((str != null) && !str.isEmpty())
if ( !StringUtils.isBlank(str) )
stringBuilder.append(str).append("::");
}
return Base64.getEncoder().encodeToString(stringBuilder.toString().getBytes());
String concatenatedStr =
StringUtils.substringAfterLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}
}

View File

@ -7,6 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -22,12 +23,12 @@ public class PulsarNBClientConf {
private String canonicalFilePath = "";
private static final String DRIVER_CONF_PREFIX = "driver";
private static final String SCHEMA_CONF_PREFIX = "schema";
private static final String CLIENT_CONF_PREFIX = "client";
private static final String PRODUCER_CONF_PREFIX = "producer";
private static final String CONSUMER_CONF_PREFIX = "consumer";
private static final String READER_CONF_PREFIX = "reader";
public static final String DRIVER_CONF_PREFIX = "driver";
public static final String SCHEMA_CONF_PREFIX = "schema";
public static final String CLIENT_CONF_PREFIX = "client";
public static final String PRODUCER_CONF_PREFIX = "producer";
public static final String CONSUMER_CONF_PREFIX = "consumer";
public static final String READER_CONF_PREFIX = "reader";
private HashMap<String, Object> driverConfMap = new HashMap<>();
private HashMap<String, Object> schemaConfMap = new HashMap<>();
private HashMap<String, Object> clientConfMap = new HashMap<>();
@ -56,7 +57,7 @@ public class PulsarNBClientConf {
for (Iterator<String> it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( (confVal != null) && !confVal.isEmpty() )
if ( !StringUtils.isBlank(confVal) )
driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
@ -64,7 +65,7 @@ public class PulsarNBClientConf {
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( (confVal != null) && !confVal.isEmpty() )
if ( !StringUtils.isBlank(confVal) )
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
@ -72,7 +73,7 @@ public class PulsarNBClientConf {
for (Iterator<String> it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( (confVal != null) && !confVal.isEmpty() )
if ( !StringUtils.isBlank(confVal) )
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
@ -80,23 +81,23 @@ public class PulsarNBClientConf {
for (Iterator<String> it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( (confVal != null) && !confVal.isEmpty() )
if ( !StringUtils.isBlank(confVal) )
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
// Get producer specific configuration settings
// Get consumer specific configuration settings
for (Iterator<String> it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( (confVal != null) && !confVal.isEmpty() )
if ( !StringUtils.isBlank(confVal) )
consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
// Get producer specific configuration settings
// Get reader specific configuration settings
for (Iterator<String> it = config.getKeys(READER_CONF_PREFIX); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if ( (confVal != null) && !confVal.isEmpty() )
if ( !StringUtils.isBlank(confVal) )
readerConfMap.put(confKey.substring(READER_CONF_PREFIX.length()+1), config.getProperty(confKey));
}
}
@ -321,4 +322,25 @@ public class PulsarNBClientConf {
readerConfMap.put(key, value);
}
// Other consumer helper functions ...
public String getReaderTopicName() {
Object confValue = getReaderConfValue("reader.topicName");
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getReaderName() {
Object confValue = getReaderConfValue("reader.readerName");
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getStartMsgPosStr() {
Object confValue = getReaderConfValue("reader.startMessagePos");
if (confValue == null)
return "";
else
return confValue.toString();
}
}

View File

@ -1,5 +1,5 @@
### NB Pulsar driver related configuration - driver.xxx
driver.client-type = consumer
driver.client-type = producer
driver.num-workers = 1
# TODO: functionalities to be completed
driver.sync-mode = sync
@ -16,8 +16,8 @@ driver.msg-recv-ouput = console
# TODO: as a starting point, only supports the following types
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# 2) Avro for messages with schema
schema.type =
schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc
schema.type = avro
schema.definition = file://<path>/<to>/<avro-definition-file>
### Pulsar client related configurations - client.xxx
@ -43,6 +43,8 @@ consumer.receiverQueueSize =
### Reader related configurations (global) - reader.xxx
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
reader.topicName =
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>
reader.topicName = persistent://public/default/nbpulsar
reader.receiverQueueSize =
reader.readerName =
#reader.startMessagePos = earliest

View File

@ -46,17 +46,17 @@ blocks:
statements:
- consumer-stuff:
topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest"
# topics-pattern: "public/default/.*"
# subscription-name:
# subscription-type:
# consumer-name:
topics-pattern: "public/default/.*"
subscription-name:
subscription-type:
consumer-name:
- reader:
tags:
op-type: reader
statements:
- reader-stuff:
# - reader:
# tags:
# type: reader
# statements:
# - reader-stuff:
#
# - websocket-producer:
# tags:
# type: websocket-produer