diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index aeae33af4..64e170b7e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -24,6 +24,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import java.io.IOException; import java.net.URI; @@ -35,6 +37,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -332,85 +335,23 @@ public class PulsarActivityUtil { } } + private static final Map> PRIMITIVE_SCHEMA_TYPE_MAPPING = Stream.of(SchemaType.values()) + .filter(SchemaType::isPrimitive) + .collect(Collectors.toUnmodifiableMap(schemaType -> schemaType.name().toUpperCase(), + schemaType -> Schema.getSchema(SchemaInfo.builder().type(schemaType).build()))); + /////// // Primitive Schema type public static boolean isPrimitiveSchemaTypeStr(String typeStr) { - boolean isPrimitive = false; - - // Use "BYTES" as the default type if the type string is not explicitly specified - if (StringUtils.isBlank(typeStr)) { - typeStr = "BYTES"; - } - - if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") || - typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") || - typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") || - typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") || - typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") || - typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") || - typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") || - typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) { - isPrimitive = true; - } - - return isPrimitive; + return StringUtils.isBlank(typeStr) || PRIMITIVE_SCHEMA_TYPE_MAPPING.containsKey(typeStr.toUpperCase()); } + public static Schema getPrimitiveTypeSchema(String typeStr) { - Schema schema; - - switch (typeStr.toUpperCase()) { - case "BOOLEAN": - schema = Schema.BOOL; - break; - case "INT8": - schema = Schema.INT8; - break; - case "INT16": - schema = Schema.INT16; - break; - case "INT32": - schema = Schema.INT32; - break; - case "INT64": - schema = Schema.INT64; - break; - case "FLOAT": - schema = Schema.FLOAT; - break; - case "DOUBLE": - schema = Schema.DOUBLE; - break; - case "DATE": - schema = Schema.DATE; - break; - case "TIME": - schema = Schema.TIME; - break; - case "TIMESTAMP": - schema = Schema.TIMESTAMP; - break; - case "INSTANT": - schema = Schema.INSTANT; - break; - case "LOCAL_DATE": - schema = Schema.LOCAL_DATE; - break; - case "LOCAL_TIME": - schema = Schema.LOCAL_TIME; - break; - case "LOCAL_DATE_TIME": - schema = Schema.LOCAL_DATE_TIME; - break; - // Use BYTES as the default schema type if the type string is not specified - case "": - case "BYTES": - schema = Schema.BYTES; - break; - // Report an error if non-valid, non-empty schema type string is provided - default: - throw new RuntimeException("Invalid Pulsar primitive schema type string : " + typeStr); + String lookupKey = StringUtils.isBlank(typeStr) ? "BYTES" : typeStr.toUpperCase(); + Schema schema = PRIMITIVE_SCHEMA_TYPE_MAPPING.get(lookupKey); + if (schema == null) { + throw new RuntimeException("Invalid Pulsar primitive schema type string : " + typeStr); } - return schema; } @@ -424,30 +365,31 @@ public class PulsarActivityUtil { public static boolean isAutoConsumeSchemaTypeStr(String typeStr) { return typeStr.equalsIgnoreCase("AUTO_CONSUME"); } - public static Schema getAvroSchema(String typeStr, String definitionStr) { - String schemaDefinitionStr = definitionStr; - String filePrefix = "file://"; - Schema schema; + + private static final Map> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>(); + + public static Schema getAvroSchema(String typeStr, final String definitionStr) { // Check if payloadStr points to a file (e.g. "file:///path/to/a/file") if (isAvroSchemaTypeStr(typeStr)) { - if (StringUtils.isBlank(schemaDefinitionStr)) { + if (StringUtils.isBlank(definitionStr)) { throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!"); - } else if (schemaDefinitionStr.startsWith(filePrefix)) { - try { - Path filePath = Paths.get(URI.create(schemaDefinitionStr)); - schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII); - } catch (IOException ioe) { - throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage()); - } } - - schema = AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr); + return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> { + String schemaDefinitionStr = definitionStr; + if (schemaDefinitionStr.startsWith("file://")) { + try { + Path filePath = Paths.get(URI.create(schemaDefinitionStr)); + schemaDefinitionStr = Files.readString(filePath, StandardCharsets.UTF_8); + } catch (IOException ioe) { + throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage()); + } + } + return AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr); + }); } else { throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr); } - - return schema; } ///////