mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Optimize enum mappings and cache Avro schemas in PulsarAdapterUtil
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.pulsar.util;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterInvalidParamException;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||
@@ -23,6 +24,8 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.common.schema.SchemaInfo;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@@ -30,7 +33,11 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
import java.util.Base64;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -60,9 +67,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidDocLevelParam(String param) {
|
||||
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
|
||||
}
|
||||
|
||||
///////
|
||||
// Message processing sequence error simulation types
|
||||
@@ -77,29 +81,21 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = new HashMap<>();
|
||||
|
||||
static {
|
||||
for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) {
|
||||
MAPPING.put(simuType.label, simuType);
|
||||
MAPPING.put(simuType.label.toLowerCase(), simuType);
|
||||
MAPPING.put(simuType.label.toUpperCase(), simuType);
|
||||
MAPPING.put(simuType.name(), simuType);
|
||||
MAPPING.put(simuType.name().toLowerCase(), simuType);
|
||||
MAPPING.put(simuType.name().toUpperCase(), simuType);
|
||||
}
|
||||
}
|
||||
private static final Map<String, MSG_SEQ_ERROR_SIMU_TYPE> MAPPING = Stream.of(values())
|
||||
.flatMap(simuType ->
|
||||
Stream.of(simuType.label,
|
||||
simuType.label.toLowerCase(),
|
||||
simuType.label.toUpperCase(),
|
||||
simuType.name(),
|
||||
simuType.name().toLowerCase(),
|
||||
simuType.name().toUpperCase())
|
||||
.distinct().map(key -> Map.entry(key, simuType)))
|
||||
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
|
||||
public static Optional<MSG_SEQ_ERROR_SIMU_TYPE> parseSimuType(String simuTypeString) {
|
||||
return Optional.ofNullable(MAPPING.get(simuTypeString.trim()));
|
||||
}
|
||||
}
|
||||
public static boolean isValidSeqErrSimuType(String item) {
|
||||
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
public static String getValidSeqErrSimuTypeList() {
|
||||
return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid Pulsar API type
|
||||
@@ -113,12 +109,15 @@ public class PulsarAdapterUtil {
|
||||
PULSAR_API_TYPE(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
public static boolean isValidLabel(String label) {
|
||||
return LABELS.contains(label);
|
||||
}
|
||||
}
|
||||
public static boolean isValidPulsarApiType(String param) {
|
||||
return Arrays.stream(PULSAR_API_TYPE.values()).anyMatch(t -> t.label.equals(param));
|
||||
}
|
||||
public static String getValidPulsarApiTypeList() {
|
||||
return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
return PULSAR_API_TYPE.isValidLabel(param);
|
||||
}
|
||||
|
||||
|
||||
@@ -136,14 +135,16 @@ public class PulsarAdapterUtil {
|
||||
CONF_GATEGORY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
public static boolean isValidLabel(String label) {
|
||||
return LABELS.contains(label);
|
||||
}
|
||||
}
|
||||
public static boolean isValidConfCategory(String item) {
|
||||
return Arrays.stream(CONF_GATEGORY.values()).anyMatch(t -> t.label.equals(item));
|
||||
return CONF_GATEGORY.isValidLabel(item);
|
||||
}
|
||||
public static String getValidConfCategoryList() {
|
||||
return Arrays.stream(CONF_GATEGORY.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid persistence type
|
||||
public enum PERSISTENT_TYPES {
|
||||
@@ -156,9 +157,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidPersistenceType(String type) {
|
||||
return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid Pulsar client configuration (activity-level settings)
|
||||
@@ -194,9 +192,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidClientConfItem(String item) {
|
||||
return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
///////
|
||||
// Standard producer configuration (activity-level settings)
|
||||
@@ -222,9 +217,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isStandardProducerConfItem(String item) {
|
||||
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
// compressionType
|
||||
public enum COMPRESSION_TYPE {
|
||||
@@ -239,12 +231,12 @@ public class PulsarAdapterUtil {
|
||||
COMPRESSION_TYPE(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
}
|
||||
public static boolean isValidCompressionType(String item) {
|
||||
return Arrays.stream(COMPRESSION_TYPE.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
public static String getValidCompressionTypeList() {
|
||||
return Arrays.stream(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
return COMPRESSION_TYPE.TYPE_LIST;
|
||||
}
|
||||
|
||||
///////
|
||||
@@ -284,9 +276,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isStandardConsumerConfItem(String item) {
|
||||
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
///////
|
||||
// Custom consumer configuration (activity-level settings)
|
||||
@@ -301,9 +290,16 @@ public class PulsarAdapterUtil {
|
||||
CONSUMER_CONF_CUSTOM_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
public static boolean isValidLabel(String label) {
|
||||
return LABELS.contains(label);
|
||||
}
|
||||
|
||||
}
|
||||
public static boolean isCustomConsumerConfItem(String item) {
|
||||
return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
|
||||
return CONSUMER_CONF_CUSTOM_KEY.isValidLabel(item);
|
||||
}
|
||||
|
||||
// subscriptionTyp
|
||||
@@ -318,12 +314,21 @@ public class PulsarAdapterUtil {
|
||||
SUBSCRIPTION_TYPE(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
|
||||
.collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
public static boolean isValidLabel(String label) {
|
||||
return LABELS.contains(label);
|
||||
}
|
||||
|
||||
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
}
|
||||
public static boolean isValidSubscriptionType(String item) {
|
||||
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item));
|
||||
return SUBSCRIPTION_TYPE.isValidLabel(item);
|
||||
}
|
||||
public static String getValidSubscriptionTypeList() {
|
||||
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
return SUBSCRIPTION_TYPE.TYPE_LIST;
|
||||
}
|
||||
|
||||
// subscriptionInitialPosition
|
||||
@@ -336,12 +341,12 @@ public class PulsarAdapterUtil {
|
||||
SUBSCRIPTION_INITIAL_POSITION(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidSubscriptionInitialPosition(String item) {
|
||||
return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).anyMatch(t -> t.label.equals(item));
|
||||
|
||||
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
|
||||
}
|
||||
public static String getValidSubscriptionInitialPositionList() {
|
||||
return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
return SUBSCRIPTION_INITIAL_POSITION.TYPE_LIST;
|
||||
}
|
||||
|
||||
// regexSubscriptionMode
|
||||
@@ -355,12 +360,12 @@ public class PulsarAdapterUtil {
|
||||
REGEX_SUBSCRIPTION_MODE(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
}
|
||||
public static boolean isValidRegexSubscriptionMode(String item) {
|
||||
return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
public static String getValidRegexSubscriptionModeList() {
|
||||
return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).map(t -> t.label).collect(Collectors.joining(", "));
|
||||
return REGEX_SUBSCRIPTION_MODE.TYPE_LIST;
|
||||
}
|
||||
|
||||
///////
|
||||
@@ -383,9 +388,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isStandardReaderConfItem(String item) {
|
||||
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
///////
|
||||
// Custom reader configuration (activity-level settings)
|
||||
@@ -400,9 +402,6 @@ public class PulsarAdapterUtil {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isCustomReaderConfItem(String item) {
|
||||
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid read positions for a Pulsar reader
|
||||
@@ -415,135 +414,72 @@ public class PulsarAdapterUtil {
|
||||
READER_MSG_POSITION_TYPE(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
|
||||
.collect(Collectors.toUnmodifiableSet());
|
||||
|
||||
public static boolean isValidLabel(String label) {
|
||||
return LABELS.contains(label);
|
||||
}
|
||||
}
|
||||
public static boolean isValideReaderStartPosition(String item) {
|
||||
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item));
|
||||
return READER_MSG_POSITION_TYPE.isValidLabel(item);
|
||||
}
|
||||
|
||||
private static final Map<String, Schema<?>> PRIMITIVE_SCHEMA_TYPE_MAPPING = Stream.of(SchemaType.values())
|
||||
.filter(SchemaType::isPrimitive)
|
||||
.collect(Collectors.toUnmodifiableMap(schemaType -> schemaType.name().toUpperCase(),
|
||||
schemaType -> Schema.getSchema(SchemaInfo.builder().type(schemaType).build())));
|
||||
|
||||
///////
|
||||
// Primitive Schema type
|
||||
public static boolean isPrimitiveSchemaTypeStr(String typeStr) {
|
||||
boolean isPrimitive = false;
|
||||
|
||||
// Use "BYTES" as the default type if the type string is not explicitly specified
|
||||
if (StringUtils.isBlank(typeStr)) {
|
||||
typeStr = "BYTES";
|
||||
}
|
||||
|
||||
if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") ||
|
||||
typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") ||
|
||||
typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") ||
|
||||
typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") ||
|
||||
typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") ||
|
||||
typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") ||
|
||||
typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") ||
|
||||
typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) {
|
||||
isPrimitive = true;
|
||||
}
|
||||
|
||||
return isPrimitive;
|
||||
return StringUtils.isBlank(typeStr) || PRIMITIVE_SCHEMA_TYPE_MAPPING.containsKey(typeStr.toUpperCase());
|
||||
}
|
||||
|
||||
public static Schema<?> getPrimitiveTypeSchema(String typeStr) {
|
||||
Schema<?> schema;
|
||||
|
||||
if (StringUtils.isBlank(typeStr)) {
|
||||
typeStr = "BYTES";
|
||||
String lookupKey = StringUtils.isBlank(typeStr) ? "BYTES" : typeStr.toUpperCase();
|
||||
Schema<?> schema = PRIMITIVE_SCHEMA_TYPE_MAPPING.get(lookupKey);
|
||||
if (schema == null) {
|
||||
throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr);
|
||||
}
|
||||
|
||||
switch (typeStr.toUpperCase()) {
|
||||
case "BOOLEAN":
|
||||
schema = Schema.BOOL;
|
||||
break;
|
||||
case "INT8":
|
||||
schema = Schema.INT8;
|
||||
break;
|
||||
case "INT16":
|
||||
schema = Schema.INT16;
|
||||
break;
|
||||
case "INT32":
|
||||
schema = Schema.INT32;
|
||||
break;
|
||||
case "INT64":
|
||||
schema = Schema.INT64;
|
||||
break;
|
||||
case "FLOAT":
|
||||
schema = Schema.FLOAT;
|
||||
break;
|
||||
case "DOUBLE":
|
||||
schema = Schema.DOUBLE;
|
||||
break;
|
||||
case "DATE":
|
||||
schema = Schema.DATE;
|
||||
break;
|
||||
case "TIME":
|
||||
schema = Schema.TIME;
|
||||
break;
|
||||
case "TIMESTAMP":
|
||||
schema = Schema.TIMESTAMP;
|
||||
break;
|
||||
case "INSTANT":
|
||||
schema = Schema.INSTANT;
|
||||
break;
|
||||
case "LOCAL_DATE":
|
||||
schema = Schema.LOCAL_DATE;
|
||||
break;
|
||||
case "LOCAL_TIME":
|
||||
schema = Schema.LOCAL_TIME;
|
||||
break;
|
||||
case "LOCAL_DATE_TIME":
|
||||
schema = Schema.LOCAL_DATE_TIME;
|
||||
break;
|
||||
case "BYTES":
|
||||
schema = Schema.BYTES;
|
||||
break;
|
||||
// Report an error if non-valid, non-empty schema type string is provided
|
||||
default:
|
||||
throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr);
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
///////
|
||||
// Complex strut type: Avro or Json
|
||||
public static boolean isAvroSchemaTypeStr(String typeStr) {
|
||||
return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AVRO"));
|
||||
return "AVRO".equalsIgnoreCase(typeStr);
|
||||
}
|
||||
|
||||
// automatic decode the type from the Registry
|
||||
public static boolean isAutoConsumeSchemaTypeStr(String typeStr) {
|
||||
return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AUTO_CONSUME"));
|
||||
return "AUTO_CONSUME".equalsIgnoreCase(typeStr);
|
||||
}
|
||||
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
|
||||
String schemaDefinitionStr = definitionStr;
|
||||
String filePrefix = "file://";
|
||||
Schema<?> schema;
|
||||
|
||||
private static final Map<String, Schema<?>> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
public static Schema<?> getAvroSchema(String typeStr, final String definitionStr) {
|
||||
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
|
||||
if (isAvroSchemaTypeStr(typeStr)) {
|
||||
if (StringUtils.isBlank(schemaDefinitionStr)) {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
"Schema definition must be provided for \"Avro\" schema type!");
|
||||
if (StringUtils.isBlank(definitionStr)) {
|
||||
throw new PulsarAdapterInvalidParamException("Schema definition must be provided for \"Avro\" schema type!");
|
||||
}
|
||||
else if (schemaDefinitionStr.startsWith(filePrefix)) {
|
||||
try {
|
||||
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
|
||||
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
|
||||
return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> {
|
||||
String schemaDefinitionStr = definitionStr;
|
||||
if (schemaDefinitionStr.startsWith("file://")) {
|
||||
try {
|
||||
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
|
||||
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.UTF_8);
|
||||
} catch (IOException ioe) {
|
||||
throw new PulsarAdapterUnexpectedException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
|
||||
}
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
throw new PulsarAdapterUnexpectedException(
|
||||
"Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
schema = PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
|
||||
return PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
|
||||
});
|
||||
} else {
|
||||
throw new PulsarAdapterInvalidParamException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
|
||||
}
|
||||
else {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
"Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
///////
|
||||
@@ -560,11 +496,14 @@ public class PulsarAdapterUtil {
|
||||
|
||||
///////
|
||||
// Convert JSON string to a key/value map
|
||||
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
return mapper.readValue(jsonStr, Map.class);
|
||||
private static final ObjectMapper JACKSON_OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final TypeReference<Map<String, String>> MAP_TYPE_REF = new TypeReference<>() {};
|
||||
|
||||
public static Map<String, String> convertJsonToMap(String jsonStr) throws IOException {
|
||||
return JACKSON_OBJECT_MAPPER.readValue(jsonStr, MAP_TYPE_REF);
|
||||
}
|
||||
|
||||
|
||||
///////
|
||||
// Get full namespace name (<tenant>/<namespace>) from a Pulsar topic URI
|
||||
public static String getFullNamespaceName(String topicUri) {
|
||||
|
||||
Reference in New Issue
Block a user