From 10da1699edd0e048bfb5f2ca42ff440a6c271ecb Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 7 Feb 2023 07:35:00 +0200 Subject: [PATCH] Optimize enum mappings and cache Avro schemas in PulsarAdapterUtil --- .../pulsar/util/PulsarAdapterUtil.java | 277 +++++++----------- 1 file changed, 108 insertions(+), 169 deletions(-) diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java index e61f76eea..1429167d1 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarAdapterUtil.java @@ -16,6 +16,7 @@ package io.nosqlbench.adapter.pulsar.util; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterInvalidParamException; import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException; @@ -23,6 +24,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import java.io.IOException; import java.net.URI; @@ -30,7 +33,11 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.*; +import java.util.Base64; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -60,9 +67,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isValidDocLevelParam(String param) { - return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param)); - } /////// // Message processing sequence error simulation types @@ -77,29 +81,21 @@ public class PulsarAdapterUtil { this.label = label; } - private static final Map MAPPING = new HashMap<>(); - - static { - for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) { - MAPPING.put(simuType.label, simuType); - MAPPING.put(simuType.label.toLowerCase(), simuType); - MAPPING.put(simuType.label.toUpperCase(), simuType); - MAPPING.put(simuType.name(), simuType); - MAPPING.put(simuType.name().toLowerCase(), simuType); - MAPPING.put(simuType.name().toUpperCase(), simuType); - } - } + private static final Map MAPPING = Stream.of(values()) + .flatMap(simuType -> + Stream.of(simuType.label, + simuType.label.toLowerCase(), + simuType.label.toUpperCase(), + simuType.name(), + simuType.name().toLowerCase(), + simuType.name().toUpperCase()) + .distinct().map(key -> Map.entry(key, simuType))) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); public static Optional parseSimuType(String simuTypeString) { return Optional.ofNullable(MAPPING.get(simuTypeString.trim())); } } - public static boolean isValidSeqErrSimuType(String item) { - return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item)); - } - public static String getValidSeqErrSimuTypeList() { - return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); - } /////// // Valid Pulsar API type @@ -113,12 +109,15 @@ public class PulsarAdapterUtil { PULSAR_API_TYPE(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } } public static boolean isValidPulsarApiType(String param) { - return Arrays.stream(PULSAR_API_TYPE.values()).anyMatch(t -> t.label.equals(param)); - } - public static String getValidPulsarApiTypeList() { - return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return PULSAR_API_TYPE.isValidLabel(param); } @@ -136,14 +135,16 @@ public class PulsarAdapterUtil { CONF_GATEGORY(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } } public static boolean isValidConfCategory(String item) { - return Arrays.stream(CONF_GATEGORY.values()).anyMatch(t -> t.label.equals(item)); + return CONF_GATEGORY.isValidLabel(item); } - public static String getValidConfCategoryList() { - return Arrays.stream(CONF_GATEGORY.values()).map(t -> t.label).collect(Collectors.joining(", ")); - } - /////// // Valid persistence type public enum PERSISTENT_TYPES { @@ -156,9 +157,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isValidPersistenceType(String type) { - return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type)); - } /////// // Valid Pulsar client configuration (activity-level settings) @@ -194,9 +192,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isValidClientConfItem(String item) { - return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Standard producer configuration (activity-level settings) @@ -222,9 +217,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isStandardProducerConfItem(String item) { - return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); - } // compressionType public enum COMPRESSION_TYPE { @@ -239,12 +231,12 @@ public class PulsarAdapterUtil { COMPRESSION_TYPE(String label) { this.label = label; } + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); } - public static boolean isValidCompressionType(String item) { - return Arrays.stream(COMPRESSION_TYPE.values()).anyMatch(t -> t.label.equals(item)); - } + public static String getValidCompressionTypeList() { - return Arrays.stream(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return COMPRESSION_TYPE.TYPE_LIST; } /////// @@ -284,9 +276,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isStandardConsumerConfItem(String item) { - return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Custom consumer configuration (activity-level settings) @@ -301,9 +290,16 @@ public class PulsarAdapterUtil { CONSUMER_CONF_CUSTOM_KEY(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } + } public static boolean isCustomConsumerConfItem(String item) { - return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item)); + return CONSUMER_CONF_CUSTOM_KEY.isValidLabel(item); } // subscriptionTyp @@ -318,12 +314,21 @@ public class PulsarAdapterUtil { SUBSCRIPTION_TYPE(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); } public static boolean isValidSubscriptionType(String item) { - return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item)); + return SUBSCRIPTION_TYPE.isValidLabel(item); } public static String getValidSubscriptionTypeList() { - return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return SUBSCRIPTION_TYPE.TYPE_LIST; } // subscriptionInitialPosition @@ -336,12 +341,12 @@ public class PulsarAdapterUtil { SUBSCRIPTION_INITIAL_POSITION(String label) { this.label = label; } - } - public static boolean isValidSubscriptionInitialPosition(String item) { - return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).anyMatch(t -> t.label.equals(item)); + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } public static String getValidSubscriptionInitialPositionList() { - return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return SUBSCRIPTION_INITIAL_POSITION.TYPE_LIST; } // regexSubscriptionMode @@ -355,12 +360,12 @@ public class PulsarAdapterUtil { REGEX_SUBSCRIPTION_MODE(String label) { this.label = label; } + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); } - public static boolean isValidRegexSubscriptionMode(String item) { - return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).anyMatch(t -> t.label.equals(item)); - } + public static String getValidRegexSubscriptionModeList() { - return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return REGEX_SUBSCRIPTION_MODE.TYPE_LIST; } /////// @@ -383,9 +388,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isStandardReaderConfItem(String item) { - return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Custom reader configuration (activity-level settings) @@ -400,9 +402,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isCustomReaderConfItem(String item) { - return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Valid read positions for a Pulsar reader @@ -415,135 +414,72 @@ public class PulsarAdapterUtil { READER_MSG_POSITION_TYPE(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } } public static boolean isValideReaderStartPosition(String item) { - return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item)); + return READER_MSG_POSITION_TYPE.isValidLabel(item); } + private static final Map> PRIMITIVE_SCHEMA_TYPE_MAPPING = Stream.of(SchemaType.values()) + .filter(SchemaType::isPrimitive) + .collect(Collectors.toUnmodifiableMap(schemaType -> schemaType.name().toUpperCase(), + schemaType -> Schema.getSchema(SchemaInfo.builder().type(schemaType).build()))); + /////// // Primitive Schema type public static boolean isPrimitiveSchemaTypeStr(String typeStr) { - boolean isPrimitive = false; - - // Use "BYTES" as the default type if the type string is not explicitly specified - if (StringUtils.isBlank(typeStr)) { - typeStr = "BYTES"; - } - - if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") || - typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") || - typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") || - typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") || - typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") || - typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") || - typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") || - typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) { - isPrimitive = true; - } - - return isPrimitive; + return StringUtils.isBlank(typeStr) || PRIMITIVE_SCHEMA_TYPE_MAPPING.containsKey(typeStr.toUpperCase()); } + public static Schema getPrimitiveTypeSchema(String typeStr) { - Schema schema; - - if (StringUtils.isBlank(typeStr)) { - typeStr = "BYTES"; + String lookupKey = StringUtils.isBlank(typeStr) ? "BYTES" : typeStr.toUpperCase(); + Schema schema = PRIMITIVE_SCHEMA_TYPE_MAPPING.get(lookupKey); + if (schema == null) { + throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr); } - - switch (typeStr.toUpperCase()) { - case "BOOLEAN": - schema = Schema.BOOL; - break; - case "INT8": - schema = Schema.INT8; - break; - case "INT16": - schema = Schema.INT16; - break; - case "INT32": - schema = Schema.INT32; - break; - case "INT64": - schema = Schema.INT64; - break; - case "FLOAT": - schema = Schema.FLOAT; - break; - case "DOUBLE": - schema = Schema.DOUBLE; - break; - case "DATE": - schema = Schema.DATE; - break; - case "TIME": - schema = Schema.TIME; - break; - case "TIMESTAMP": - schema = Schema.TIMESTAMP; - break; - case "INSTANT": - schema = Schema.INSTANT; - break; - case "LOCAL_DATE": - schema = Schema.LOCAL_DATE; - break; - case "LOCAL_TIME": - schema = Schema.LOCAL_TIME; - break; - case "LOCAL_DATE_TIME": - schema = Schema.LOCAL_DATE_TIME; - break; - case "BYTES": - schema = Schema.BYTES; - break; - // Report an error if non-valid, non-empty schema type string is provided - default: - throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr); - } - return schema; } /////// // Complex strut type: Avro or Json public static boolean isAvroSchemaTypeStr(String typeStr) { - return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AVRO")); + return "AVRO".equalsIgnoreCase(typeStr); } // automatic decode the type from the Registry public static boolean isAutoConsumeSchemaTypeStr(String typeStr) { - return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AUTO_CONSUME")); + return "AUTO_CONSUME".equalsIgnoreCase(typeStr); } - public static Schema getAvroSchema(String typeStr, String definitionStr) { - String schemaDefinitionStr = definitionStr; - String filePrefix = "file://"; - Schema schema; + private static final Map> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>(); + + public static Schema getAvroSchema(String typeStr, final String definitionStr) { // Check if payloadStr points to a file (e.g. "file:///path/to/a/file") if (isAvroSchemaTypeStr(typeStr)) { - if (StringUtils.isBlank(schemaDefinitionStr)) { - throw new PulsarAdapterInvalidParamException( - "Schema definition must be provided for \"Avro\" schema type!"); + if (StringUtils.isBlank(definitionStr)) { + throw new PulsarAdapterInvalidParamException("Schema definition must be provided for \"Avro\" schema type!"); } - else if (schemaDefinitionStr.startsWith(filePrefix)) { - try { - Path filePath = Paths.get(URI.create(schemaDefinitionStr)); - schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII); + return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> { + String schemaDefinitionStr = definitionStr; + if (schemaDefinitionStr.startsWith("file://")) { + try { + Path filePath = Paths.get(URI.create(schemaDefinitionStr)); + schemaDefinitionStr = Files.readString(filePath, StandardCharsets.UTF_8); + } catch (IOException ioe) { + throw new PulsarAdapterUnexpectedException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage()); + } } - catch (IOException ioe) { - throw new PulsarAdapterUnexpectedException( - "Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage()); - } - } - - schema = PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr); + return PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr); + }); + } else { + throw new PulsarAdapterInvalidParamException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr); } - else { - throw new PulsarAdapterInvalidParamException( - "Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr); - } - - return schema; } /////// @@ -560,11 +496,14 @@ public class PulsarAdapterUtil { /////// // Convert JSON string to a key/value map - public static Map convertJsonToMap(String jsonStr) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonStr, Map.class); + private static final ObjectMapper JACKSON_OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE_REF = new TypeReference<>() {}; + + public static Map convertJsonToMap(String jsonStr) throws IOException { + return JACKSON_OBJECT_MAPPER.readValue(jsonStr, MAP_TYPE_REF); } + /////// // Get full namespace name (/) from a Pulsar topic URI public static String getFullNamespaceName(String topicUri) {