Add caching to Avro schema instantiation

This commit is contained in:
Lari Hotari 2023-02-06 09:02:20 +02:00
parent 4948a4101f
commit a5b0b3cb2d

View File

@ -37,6 +37,7 @@ import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -364,30 +365,31 @@ public class PulsarActivityUtil {
public static boolean isAutoConsumeSchemaTypeStr(String typeStr) {
return typeStr.equalsIgnoreCase("AUTO_CONSUME");
}
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr;
String filePrefix = "file://";
Schema<?> schema;
private static final Map<String, Schema<?>> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>();
public static Schema<?> getAvroSchema(String typeStr, final String definitionStr) {
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
if (isAvroSchemaTypeStr(typeStr)) {
if (StringUtils.isBlank(schemaDefinitionStr)) {
if (StringUtils.isBlank(definitionStr)) {
throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!");
} else if (schemaDefinitionStr.startsWith(filePrefix)) {
try {
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
} catch (IOException ioe) {
throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
}
}
schema = AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> {
String schemaDefinitionStr = definitionStr;
if (schemaDefinitionStr.startsWith("file://")) {
try {
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
} catch (IOException ioe) {
throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
}
}
return AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
});
} else {
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
}
return schema;
}
///////