mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-12-22 15:13:41 -06:00
Merge pull request #1005 from lhotari/lh-optimize-pulsar-driver-schema-type-mapping
Simplify and optimize Pulsar driver schema type mapping
This commit is contained in:
commit
7234d2a1f7
@ -24,6 +24,8 @@ import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.common.schema.SchemaInfo;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -35,6 +37,7 @@ import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -332,85 +335,23 @@ public class PulsarActivityUtil {
|
||||
}
|
||||
}
|
||||
|
||||
private static final Map<String, Schema<?>> PRIMITIVE_SCHEMA_TYPE_MAPPING = Stream.of(SchemaType.values())
|
||||
.filter(SchemaType::isPrimitive)
|
||||
.collect(Collectors.toUnmodifiableMap(schemaType -> schemaType.name().toUpperCase(),
|
||||
schemaType -> Schema.getSchema(SchemaInfo.builder().type(schemaType).build())));
|
||||
|
||||
///////
|
||||
// Primitive Schema type
|
||||
public static boolean isPrimitiveSchemaTypeStr(String typeStr) {
|
||||
boolean isPrimitive = false;
|
||||
|
||||
// Use "BYTES" as the default type if the type string is not explicitly specified
|
||||
if (StringUtils.isBlank(typeStr)) {
|
||||
typeStr = "BYTES";
|
||||
}
|
||||
|
||||
if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") ||
|
||||
typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") ||
|
||||
typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") ||
|
||||
typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") ||
|
||||
typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") ||
|
||||
typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") ||
|
||||
typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") ||
|
||||
typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) {
|
||||
isPrimitive = true;
|
||||
}
|
||||
|
||||
return isPrimitive;
|
||||
return StringUtils.isBlank(typeStr) || PRIMITIVE_SCHEMA_TYPE_MAPPING.containsKey(typeStr.toUpperCase());
|
||||
}
|
||||
|
||||
public static Schema<?> getPrimitiveTypeSchema(String typeStr) {
|
||||
Schema<?> schema;
|
||||
|
||||
switch (typeStr.toUpperCase()) {
|
||||
case "BOOLEAN":
|
||||
schema = Schema.BOOL;
|
||||
break;
|
||||
case "INT8":
|
||||
schema = Schema.INT8;
|
||||
break;
|
||||
case "INT16":
|
||||
schema = Schema.INT16;
|
||||
break;
|
||||
case "INT32":
|
||||
schema = Schema.INT32;
|
||||
break;
|
||||
case "INT64":
|
||||
schema = Schema.INT64;
|
||||
break;
|
||||
case "FLOAT":
|
||||
schema = Schema.FLOAT;
|
||||
break;
|
||||
case "DOUBLE":
|
||||
schema = Schema.DOUBLE;
|
||||
break;
|
||||
case "DATE":
|
||||
schema = Schema.DATE;
|
||||
break;
|
||||
case "TIME":
|
||||
schema = Schema.TIME;
|
||||
break;
|
||||
case "TIMESTAMP":
|
||||
schema = Schema.TIMESTAMP;
|
||||
break;
|
||||
case "INSTANT":
|
||||
schema = Schema.INSTANT;
|
||||
break;
|
||||
case "LOCAL_DATE":
|
||||
schema = Schema.LOCAL_DATE;
|
||||
break;
|
||||
case "LOCAL_TIME":
|
||||
schema = Schema.LOCAL_TIME;
|
||||
break;
|
||||
case "LOCAL_DATE_TIME":
|
||||
schema = Schema.LOCAL_DATE_TIME;
|
||||
break;
|
||||
// Use BYTES as the default schema type if the type string is not specified
|
||||
case "":
|
||||
case "BYTES":
|
||||
schema = Schema.BYTES;
|
||||
break;
|
||||
// Report an error if non-valid, non-empty schema type string is provided
|
||||
default:
|
||||
throw new RuntimeException("Invalid Pulsar primitive schema type string : " + typeStr);
|
||||
String lookupKey = StringUtils.isBlank(typeStr) ? "BYTES" : typeStr.toUpperCase();
|
||||
Schema<?> schema = PRIMITIVE_SCHEMA_TYPE_MAPPING.get(lookupKey);
|
||||
if (schema == null) {
|
||||
throw new RuntimeException("Invalid Pulsar primitive schema type string : " + typeStr);
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
@ -424,30 +365,31 @@ public class PulsarActivityUtil {
|
||||
public static boolean isAutoConsumeSchemaTypeStr(String typeStr) {
|
||||
return typeStr.equalsIgnoreCase("AUTO_CONSUME");
|
||||
}
|
||||
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
|
||||
String schemaDefinitionStr = definitionStr;
|
||||
String filePrefix = "file://";
|
||||
Schema<?> schema;
|
||||
|
||||
|
||||
private static final Map<String, Schema<?>> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>();
|
||||
|
||||
public static Schema<?> getAvroSchema(String typeStr, final String definitionStr) {
|
||||
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
|
||||
if (isAvroSchemaTypeStr(typeStr)) {
|
||||
if (StringUtils.isBlank(schemaDefinitionStr)) {
|
||||
if (StringUtils.isBlank(definitionStr)) {
|
||||
throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!");
|
||||
} else if (schemaDefinitionStr.startsWith(filePrefix)) {
|
||||
try {
|
||||
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
|
||||
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
schema = AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
|
||||
return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> {
|
||||
String schemaDefinitionStr = definitionStr;
|
||||
if (schemaDefinitionStr.startsWith("file://")) {
|
||||
try {
|
||||
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
|
||||
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.UTF_8);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
|
||||
}
|
||||
}
|
||||
return AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
|
||||
});
|
||||
} else {
|
||||
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
///////
|
||||
|
Loading…
Reference in New Issue
Block a user