Merge branch 'main' into all-contributors/add-eolivelli

This commit is contained in:
Jonathan Shook 2023-02-07 12:56:42 -06:00 committed by GitHub
commit bd1134b2b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 259 deletions

View File

@ -78,7 +78,18 @@
"avatar_url": "https://avatars.githubusercontent.com/u/9469110?v=4",
"profile": "http://eolivelli.blogspot.it/",
"contributions": [
"test"
"test", "code", "review"
]
},
{
"login": "lhotari",
"name": "Lari Hotari",
"avatar_url": "https://avatars.githubusercontent.com/u/66864?v=4",
"profile": "https://github.com/lhotari",
"contributions": [
"bug",
"code",
"review"
]
}
]

View File

@ -135,6 +135,7 @@ For recognizing contributions, please follow [this documentation](https://allcon
<td align="center" valign="top" width="14.28%"><a href="http://jjbanks.com"><img src="https://avatars.githubusercontent.com/u/4078933?v=4?s=50" width="50px;" alt="Jeff Banks"/><br /><sub><b>Jeff Banks</b></sub></a><br /><a href="https://github.com/nosqlbench/nosqlbench/commits?author=jeffbanks" title="Code">💻</a> <a href="#mentoring-jeffbanks" title="Mentoring">🧑‍🏫</a> <a href="https://github.com/nosqlbench/nosqlbench/commits?author=jeffbanks" title="Tests">⚠️</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/hemidactylus"><img src="https://avatars.githubusercontent.com/u/14221764?v=4?s=50" width="50px;" alt="Stefano Lottini"/><br /><sub><b>Stefano Lottini</b></sub></a><br /><a href="https://github.com/nosqlbench/nosqlbench/issues?q=author%3Ahemidactylus" title="Bug reports">🐛</a></td>
<td align="center" valign="top" width="14.28%"><a href="http://eolivelli.blogspot.it/"><img src="https://avatars.githubusercontent.com/u/9469110?v=4?s=50" width="50px;" alt="Enrico Olivelli"/><br /><sub><b>Enrico Olivelli</b></sub></a><br /><a href="https://github.com/nosqlbench/nosqlbench/commits?author=eolivelli" title="Tests">⚠️</a></td>
<td align="center" valign="top" width="14.28%"><a href="https://github.com/lhotari"><img src="https://avatars.githubusercontent.com/u/66864?v=4?s=50" width="50px;" alt="Lari Hotari"/><br /><sub><b>Lari Hotari</b></sub></a><br /><a href="https://github.com/nosqlbench/nosqlbench/issues?q=author%3Alhotari" title="Bug reports">🐛</a> <a href="https://github.com/nosqlbench/nosqlbench/commits?author=lhotari" title="Code">💻</a> <a href="https://github.com/nosqlbench/nosqlbench/pulls?q=is%3Apr+reviewed-by%3Alhotari" title="Reviewed Pull Requests">👀</a></td>
</tr>
</tbody>
</table>

View File

@ -32,8 +32,10 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
public class PulsarSpace implements AutoCloseable {
@ -50,9 +52,18 @@ public class PulsarSpace implements AutoCloseable {
private PulsarAdmin pulsarAdmin;
private Schema<?> pulsarSchema;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
public record ProducerCacheKey(String producerName, String topicName) {
}
private final ConcurrentHashMap<ProducerCacheKey, Producer<?>> producers = new ConcurrentHashMap<>();
public record ConsumerCacheKey(String consumerName, String subscriptionName, List<String> topicNameList, String topicPattern) {
}
private final ConcurrentHashMap<ConsumerCacheKey, Consumer<?>> consumers = new ConcurrentHashMap<>();
public record ReaderCacheKey(String readerName, String topicName, String startMsgPosStr) {
}
private final ConcurrentHashMap<ReaderCacheKey, Reader<?>> readers = new ConcurrentHashMap<>();
public PulsarSpace(String spaceName, NBConfiguration cfg) {
@ -89,13 +100,11 @@ public class PulsarSpace implements AutoCloseable {
public int getProducerSetCnt() { return producers.size(); }
public int getConsumerSetCnt() { return consumers.size(); }
public int getReaderSetCnt() { return readers.size(); }
public Producer<?> getProducer(String name) { return producers.get(name); }
public void setProducer(String name, Producer<?> producer) { producers.put(name, producer); }
public Consumer<?> getConsumer(String name) { return consumers.get(name); }
public void setConsumer(String name, Consumer<?> consumer) { consumers.put(name, consumer); }
public Producer<?> getProducer(ProducerCacheKey key, Supplier<Producer<?>> producerSupplier) { return producers.computeIfAbsent(key, __ -> producerSupplier.get()); }
public Reader<?> getReader(String name) { return readers.get(name); }
public void setReader(String name, Reader<?> reader) { readers.put(name, reader); }
public Consumer<?> getConsumer(ConsumerCacheKey key, Supplier<Consumer<?>> consumerSupplier) { return consumers.computeIfAbsent(key, __ -> consumerSupplier.get()); }
public Reader<?> getReader(ReaderCacheKey key, Supplier<Reader<?>> readerSupplier) { return readers.computeIfAbsent(key, __ -> readerSupplier.get()); }
/**

View File

@ -37,7 +37,6 @@ import java.util.*;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, PulsarSpace> implements NBNamedElement {
@ -239,10 +238,8 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.producerName.label,
cycleProducerName);
String producerCacheKey = PulsarAdapterUtil.buildCacheKey(producerName, topicName);
Producer<?> producer = pulsarSpace.getProducer(producerCacheKey);
if (producer == null) {
PulsarSpace.ProducerCacheKey producerCacheKey = new PulsarSpace.ProducerCacheKey(producerName, topicName);
return pulsarSpace.getProducer(producerCacheKey, () -> {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
// Get other possible producer settings that are set at global level
@ -262,21 +259,17 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
producerBuilder = producerBuilder.producerName(producerName);
}
producer = producerBuilder.create();
pulsarSpace.setProducer(producerCacheKey, producer);
Producer<?> producer = producerBuilder.create();
pulsarAdapterMetrics.registerProducerApiMetrics(producer,
getPulsarAPIMetricsPrefix(
PulsarAdapterUtil.PULSAR_API_TYPE.PRODUCER.label,
producerName,
topicName));
}
catch (PulsarClientException ple) {
return producer;
} catch (PulsarClientException ple) {
throw new PulsarAdapterUnexpectedException("Failed to create a Pulsar producer.");
}
}
return producer;
});
}
private List<String> getEffectiveConsumerTopicNameList(String cycleTopicNameListStr) {
@ -296,24 +289,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
return effectiveTopicNameList;
}
private Pattern getEffectiveConsumerTopicPattern(String cycleTopicPatternStr) {
String effectiveTopicPatternStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
cycleTopicPatternStr);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicPatternStr))
topicsPattern = Pattern.compile(effectiveTopicPatternStr);
else
topicsPattern = null;
} catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String subscriptionTypeStr = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
@ -344,11 +319,10 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
List<String> topicNameList = getEffectiveConsumerTopicNameList(cycleTopicNameListStr);
String topicPatternStr = getEffectiveConValue(
String topicPatternStr = StringUtils.trimToNull(getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
cycleTopicPatternStr);
Pattern topicPattern = getEffectiveConsumerTopicPattern(cycleTopicPatternStr);
cycleTopicPatternStr));
String subscriptionName = getEffectiveConValue(
PulsarAdapterUtil.CONF_GATEGORY.Consumer.label,
@ -368,28 +342,14 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
"creating multiple consumers of \"Exclusive\" subscription type under the same subscription name");
}
if ( (topicNameList.isEmpty() && (topicPattern == null)) ||
(!topicNameList.isEmpty() && (topicPattern != null)) ) {
if ( (topicNameList.isEmpty() && (topicPatternStr == null)) ||
(!topicNameList.isEmpty() && (topicPatternStr != null)) ) {
throw new PulsarAdapterInvalidParamException(
"Invalid combination of topic name(s) and topic patterns; only specify one parameter!");
}
boolean multiTopicConsumer = (topicNameList.size() > 1 || (topicPattern != null));
String consumerTopicListString;
if (!topicNameList.isEmpty()) {
consumerTopicListString = String.join("|", topicNameList);
} else {
consumerTopicListString = topicPatternStr;
}
String consumerCacheKey = PulsarAdapterUtil.buildCacheKey(
consumerName,
subscriptionName,
consumerTopicListString);
Consumer<?> consumer = pulsarSpace.getConsumer(consumerCacheKey);
if (consumer == null) {
return pulsarSpace.getConsumer(
new PulsarSpace.ConsumerCacheKey(consumerName, subscriptionName, topicNameList, topicPatternStr), () -> {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
// Get other possible consumer settings that are set at global level
@ -417,6 +377,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.negativeAckRedeliveryBackoff.label);
consumerConfToLoad.remove(PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.ackTimeoutRedeliveryBackoff.label);
boolean multiTopicConsumer = (topicNameList.size() > 1 || (topicPatternStr != null));
if (!multiTopicConsumer) {
assert (topicNameList.size() == 1);
consumerBuilder = pulsarClient.newConsumer(pulsarSpace.getPulsarSchema());
@ -429,6 +390,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumerBuilder.topics(topicNameList);
}
else {
Pattern topicPattern = Pattern.compile(topicPatternStr);
consumerBuilder.topicsPattern(topicPattern);
}
}
@ -465,22 +427,22 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
consumerBuilder.keySharedPolicy(keySharedPolicy);
}
consumer = consumerBuilder.subscribe();
pulsarSpace.setConsumer(consumerCacheKey, consumer);
Consumer<?> consumer = consumerBuilder.subscribe();
String consumerTopicListString = (!topicNameList.isEmpty()) ? String.join("|", topicNameList) : topicPatternStr;
pulsarAdapterMetrics.registerConsumerApiMetrics(
consumer,
getPulsarAPIMetricsPrefix(
PulsarAdapterUtil.PULSAR_API_TYPE.CONSUMER.label,
consumerName,
consumerTopicListString));
return consumer;
}
catch (PulsarClientException ple) {
throw new PulsarAdapterUnexpectedException("Failed to create a Pulsar consumer!");
}
}
return consumer;
});
}
private static Range[] parseRanges(String ranges) {
@ -528,10 +490,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
throw new RuntimeException("Reader:: Invalid value for reader start message position!");
}
String readerCacheKey = PulsarAdapterUtil.buildCacheKey(topicName, readerName, startMsgPosStr);
Reader<?> reader = pulsarSpace.getReader(readerCacheKey);
if (reader == null) {
return pulsarSpace.getReader(new PulsarSpace.ReaderCacheKey(readerName, topicName, startMsgPosStr), () -> {
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();;
Map<String, Object> readerConf = pulsarSpace.getPulsarNBClientConf().getReaderConfMapTgt();
@ -558,17 +517,12 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
// startMsgId = MessageId.latest;
//}
reader = readerBuilder.startMessageId(startMsgId).create();
return readerBuilder.startMessageId(startMsgId).create();
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");
}
pulsarSpace.setReader(readerCacheKey, reader);
}
return reader;
});
}
//
//////////////////////////////////////

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.pulsar.util;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterInvalidParamException;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
@ -23,6 +24,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.net.URI;
@ -30,7 +33,11 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -60,9 +67,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
///////
// Message processing sequence error simulation types
@ -77,29 +81,21 @@ public class PulsarAdapterUtil {
this.label = label;
}
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
static {
for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) {
MAPPING.put(simuType.label, simuType);
MAPPING.put(simuType.label.toLowerCase(), simuType);
MAPPING.put(simuType.label.toUpperCase(), simuType);
MAPPING.put(simuType.name(), simuType);
MAPPING.put(simuType.name().toLowerCase(), simuType);
MAPPING.put(simuType.name().toUpperCase(), simuType);
}
}
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = Stream.of(values())
.flatMap(simuType ->
Stream.of(simuType.label,
simuType.label.toLowerCase(),
simuType.label.toUpperCase(),
simuType.name(),
simuType.name().toLowerCase(),
simuType.name().toUpperCase())
.distinct().map(key -> Map.entry(key, simuType)))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
}
}
public static boolean isValidSeqErrSimuType(String item) {
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSeqErrSimuTypeList() {
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid Pulsar API type
@ -113,12 +109,15 @@ public class PulsarAdapterUtil {
PULSAR_API_TYPE(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValidPulsarApiType(String param) {
return Arrays.stream(PULSAR_API_TYPE.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidPulsarApiTypeList() {
return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
return PULSAR_API_TYPE.isValidLabel(param);
}
@ -136,14 +135,16 @@ public class PulsarAdapterUtil {
CONF_GATEGORY(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValidConfCategory(String item) {
return Arrays.stream(CONF_GATEGORY.values()).anyMatch(t -> t.label.equals(item));
return CONF_GATEGORY.isValidLabel(item);
}
public static String getValidConfCategoryList() {
return Arrays.stream(CONF_GATEGORY.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid persistence type
public enum PERSISTENT_TYPES {
@ -156,9 +157,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isValidPersistenceType(String type) {
return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
///////
// Valid Pulsar client configuration (activity-level settings)
@ -194,9 +192,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isValidClientConfItem(String item) {
return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Standard producer configuration (activity-level settings)
@ -222,9 +217,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isStandardProducerConfItem(String item) {
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
// compressionType
public enum COMPRESSION_TYPE {
@ -239,12 +231,12 @@ public class PulsarAdapterUtil {
COMPRESSION_TYPE(String label) {
this.label = label;
}
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static boolean isValidCompressionType(String item) {
return Arrays.stream(COMPRESSION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidCompressionTypeList() {
return Arrays.stream(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
return COMPRESSION_TYPE.TYPE_LIST;
}
///////
@ -284,9 +276,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isStandardConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Custom consumer configuration (activity-level settings)
@ -301,9 +290,16 @@ public class PulsarAdapterUtil {
CONSUMER_CONF_CUSTOM_KEY(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isCustomConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
return CONSUMER_CONF_CUSTOM_KEY.isValidLabel(item);
}
// subscriptionTyp
@ -318,12 +314,21 @@ public class PulsarAdapterUtil {
SUBSCRIPTION_TYPE(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item));
return SUBSCRIPTION_TYPE.isValidLabel(item);
}
public static String getValidSubscriptionTypeList() {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
return SUBSCRIPTION_TYPE.TYPE_LIST;
}
// subscriptionInitialPosition
@ -336,12 +341,12 @@ public class PulsarAdapterUtil {
SUBSCRIPTION_INITIAL_POSITION(String label) {
this.label = label;
}
}
public static boolean isValidSubscriptionInitialPosition(String item) {
return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).anyMatch(t -> t.label.equals(item));
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static String getValidSubscriptionInitialPositionList() {
return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).map(t -> t.label).collect(Collectors.joining(", "));
return SUBSCRIPTION_INITIAL_POSITION.TYPE_LIST;
}
// regexSubscriptionMode
@ -355,12 +360,12 @@ public class PulsarAdapterUtil {
REGEX_SUBSCRIPTION_MODE(String label) {
this.label = label;
}
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static boolean isValidRegexSubscriptionMode(String item) {
return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidRegexSubscriptionModeList() {
return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).map(t -> t.label).collect(Collectors.joining(", "));
return REGEX_SUBSCRIPTION_MODE.TYPE_LIST;
}
///////
@ -383,9 +388,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isStandardReaderConfItem(String item) {
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Custom reader configuration (activity-level settings)
@ -400,9 +402,6 @@ public class PulsarAdapterUtil {
this.label = label;
}
}
public static boolean isCustomReaderConfItem(String item) {
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Valid read positions for a Pulsar reader
@ -415,156 +414,84 @@ public class PulsarAdapterUtil {
READER_MSG_POSITION_TYPE(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValideReaderStartPosition(String item) {
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item));
return READER_MSG_POSITION_TYPE.isValidLabel(item);
}
private static final Map<String, Schema<?>> PRIMITIVE_SCHEMA_TYPE_MAPPING = Stream.of(SchemaType.values())
.filter(SchemaType::isPrimitive)
.collect(Collectors.toUnmodifiableMap(schemaType -> schemaType.name().toUpperCase(),
schemaType -> Schema.getSchema(SchemaInfo.builder().type(schemaType).build())));
///////
// Primitive Schema type
public static boolean isPrimitiveSchemaTypeStr(String typeStr) {
boolean isPrimitive = false;
// Use "BYTES" as the default type if the type string is not explicitly specified
if (StringUtils.isBlank(typeStr)) {
typeStr = "BYTES";
}
if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") ||
typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") ||
typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") ||
typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") ||
typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") ||
typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") ||
typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") ||
typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) {
isPrimitive = true;
}
return isPrimitive;
return StringUtils.isBlank(typeStr) || PRIMITIVE_SCHEMA_TYPE_MAPPING.containsKey(typeStr.toUpperCase());
}
public static Schema<?> getPrimitiveTypeSchema(String typeStr) {
Schema<?> schema;
if (StringUtils.isBlank(typeStr)) {
typeStr = "BYTES";
String lookupKey = StringUtils.isBlank(typeStr) ? "BYTES" : typeStr.toUpperCase();
Schema<?> schema = PRIMITIVE_SCHEMA_TYPE_MAPPING.get(lookupKey);
if (schema == null) {
throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr);
}
switch (typeStr.toUpperCase()) {
case "BOOLEAN":
schema = Schema.BOOL;
break;
case "INT8":
schema = Schema.INT8;
break;
case "INT16":
schema = Schema.INT16;
break;
case "INT32":
schema = Schema.INT32;
break;
case "INT64":
schema = Schema.INT64;
break;
case "FLOAT":
schema = Schema.FLOAT;
break;
case "DOUBLE":
schema = Schema.DOUBLE;
break;
case "DATE":
schema = Schema.DATE;
break;
case "TIME":
schema = Schema.TIME;
break;
case "TIMESTAMP":
schema = Schema.TIMESTAMP;
break;
case "INSTANT":
schema = Schema.INSTANT;
break;
case "LOCAL_DATE":
schema = Schema.LOCAL_DATE;
break;
case "LOCAL_TIME":
schema = Schema.LOCAL_TIME;
break;
case "LOCAL_DATE_TIME":
schema = Schema.LOCAL_DATE_TIME;
break;
case "BYTES":
schema = Schema.BYTES;
break;
// Report an error if non-valid, non-empty schema type string is provided
default:
throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr);
}
return schema;
}
///////
// Complex strut type: Avro or Json
public static boolean isAvroSchemaTypeStr(String typeStr) {
return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AVRO"));
return "AVRO".equalsIgnoreCase(typeStr);
}
// automatic decode the type from the Registry
public static boolean isAutoConsumeSchemaTypeStr(String typeStr) {
return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AUTO_CONSUME"));
return "AUTO_CONSUME".equalsIgnoreCase(typeStr);
}
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr;
String filePrefix = "file://";
Schema<?> schema;
private static final Map<String, Schema<?>> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>();
public static Schema<?> getAvroSchema(String typeStr, final String definitionStr) {
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
if (isAvroSchemaTypeStr(typeStr)) {
if (StringUtils.isBlank(schemaDefinitionStr)) {
throw new PulsarAdapterInvalidParamException(
"Schema definition must be provided for \"Avro\" schema type!");
if (StringUtils.isBlank(definitionStr)) {
throw new PulsarAdapterInvalidParamException("Schema definition must be provided for \"Avro\" schema type!");
}
else if (schemaDefinitionStr.startsWith(filePrefix)) {
try {
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> {
String schemaDefinitionStr = definitionStr;
if (schemaDefinitionStr.startsWith("file://")) {
try {
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.UTF_8);
} catch (IOException ioe) {
throw new PulsarAdapterUnexpectedException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
}
}
catch (IOException ioe) {
throw new PulsarAdapterUnexpectedException(
"Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
}
}
schema = PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
return PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
});
} else {
throw new PulsarAdapterInvalidParamException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
}
else {
throw new PulsarAdapterInvalidParamException(
"Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
}
return schema;
}
///////
// Generate effective key string
public static String buildCacheKey(String... keyParts) {
// Ignore blank keyPart
String joinedKeyStr =
Stream.of(keyParts)
.filter(s -> !StringUtils.isBlank(s))
.collect(Collectors.joining(","));
return Base64.getEncoder().encodeToString(joinedKeyStr.getBytes());
}
///////
// Convert JSON string to a key/value map
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonStr, Map.class);
private static final ObjectMapper JACKSON_OBJECT_MAPPER = new ObjectMapper();
private static final TypeReference<Map<String, String>> MAP_TYPE_REF = new TypeReference<>() {};
public static Map<String, String> convertJsonToMap(String jsonStr) throws IOException {
return JACKSON_OBJECT_MAPPER.readValue(jsonStr, MAP_TYPE_REF);
}
///////
// Get full namespace name (<tenant>/<namespace>) from a Pulsar topic URI
public static String getFullNamespaceName(String topicUri) {