diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java index 4aba161f1..9cc5e44c6 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/PulsarSpace.java @@ -32,8 +32,10 @@ import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; public class PulsarSpace implements AutoCloseable { @@ -50,9 +52,18 @@ public class PulsarSpace implements AutoCloseable { private PulsarAdmin pulsarAdmin; private Schema pulsarSchema; - private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); + public record ProducerCacheKey(String producerName, String topicName) { + } + + private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); + + public record ConsumerCacheKey(String consumerName, String subscriptionName, List topicNameList, String topicPattern) { + } + private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); + + public record ReaderCacheKey(String readerName, String topicName, String startMsgPosStr) { + } + private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); public PulsarSpace(String spaceName, NBConfiguration cfg) { @@ -89,13 +100,11 @@ public class PulsarSpace implements AutoCloseable { public int getProducerSetCnt() { return producers.size(); } public int getConsumerSetCnt() { return consumers.size(); } public int getReaderSetCnt() { return readers.size(); } - public Producer getProducer(String name) { return producers.get(name); } - public void setProducer(String name, Producer producer) { producers.put(name, producer); } - public Consumer getConsumer(String name) { return consumers.get(name); } - public void setConsumer(String name, Consumer consumer) { consumers.put(name, consumer); } + public Producer getProducer(ProducerCacheKey key, Supplier> producerSupplier) { return producers.computeIfAbsent(key, __ -> producerSupplier.get()); } - public Reader getReader(String name) { return readers.get(name); } - public void setReader(String name, Reader reader) { readers.put(name, reader); } + public Consumer getConsumer(ConsumerCacheKey key, Supplier> consumerSupplier) { return consumers.computeIfAbsent(key, __ -> consumerSupplier.get()); } + + public Reader getReader(ReaderCacheKey key, Supplier> readerSupplier) { return readers.computeIfAbsent(key, __ -> readerSupplier.get()); } /** diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java index 1de99a097..2e1b40230 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarBaseOpDispenser.java @@ -37,7 +37,6 @@ import java.util.*; import java.util.function.LongFunction; import java.util.function.Predicate; import java.util.regex.Pattern; -import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; public abstract class PulsarBaseOpDispenser extends BaseOpDispenser implements NBNamedElement { @@ -239,10 +238,8 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser producer = pulsarSpace.getProducer(producerCacheKey); - - if (producer == null) { + PulsarSpace.ProducerCacheKey producerCacheKey = new PulsarSpace.ProducerCacheKey(producerName, topicName); + return pulsarSpace.getProducer(producerCacheKey, () -> { PulsarClient pulsarClient = pulsarSpace.getPulsarClient(); // Get other possible producer settings that are set at global level @@ -262,21 +259,17 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser producer = producerBuilder.create(); pulsarAdapterMetrics.registerProducerApiMetrics(producer, getPulsarAPIMetricsPrefix( PulsarAdapterUtil.PULSAR_API_TYPE.PRODUCER.label, producerName, topicName)); - } - catch (PulsarClientException ple) { + return producer; + } catch (PulsarClientException ple) { throw new PulsarAdapterUnexpectedException("Failed to create a Pulsar producer."); } - } - - return producer; + }); } private List getEffectiveConsumerTopicNameList(String cycleTopicNameListStr) { @@ -296,24 +289,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser topicNameList = getEffectiveConsumerTopicNameList(cycleTopicNameListStr); - String topicPatternStr = getEffectiveConValue( + String topicPatternStr = StringUtils.trimToNull(getEffectiveConValue( PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, - cycleTopicPatternStr); - Pattern topicPattern = getEffectiveConsumerTopicPattern(cycleTopicPatternStr); + cycleTopicPatternStr)); String subscriptionName = getEffectiveConValue( PulsarAdapterUtil.CONF_GATEGORY.Consumer.label, @@ -368,28 +342,14 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser 1 || (topicPattern != null)); - - String consumerTopicListString; - if (!topicNameList.isEmpty()) { - consumerTopicListString = String.join("|", topicNameList); - } else { - consumerTopicListString = topicPatternStr; - } - - String consumerCacheKey = PulsarAdapterUtil.buildCacheKey( - consumerName, - subscriptionName, - consumerTopicListString); - Consumer consumer = pulsarSpace.getConsumer(consumerCacheKey); - - if (consumer == null) { + return pulsarSpace.getConsumer( + new PulsarSpace.ConsumerCacheKey(consumerName, subscriptionName, topicNameList, topicPatternStr), () -> { PulsarClient pulsarClient = pulsarSpace.getPulsarClient(); // Get other possible consumer settings that are set at global level @@ -417,6 +377,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser 1 || (topicPatternStr != null)); if (!multiTopicConsumer) { assert (topicNameList.size() == 1); consumerBuilder = pulsarClient.newConsumer(pulsarSpace.getPulsarSchema()); @@ -429,6 +390,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser consumer = consumerBuilder.subscribe(); + String consumerTopicListString = (!topicNameList.isEmpty()) ? String.join("|", topicNameList) : topicPatternStr; pulsarAdapterMetrics.registerConsumerApiMetrics( consumer, getPulsarAPIMetricsPrefix( PulsarAdapterUtil.PULSAR_API_TYPE.CONSUMER.label, consumerName, consumerTopicListString)); + + return consumer; } catch (PulsarClientException ple) { throw new PulsarAdapterUnexpectedException("Failed to create a Pulsar consumer!"); } - } - - return consumer; + }); } private static Range[] parseRanges(String ranges) { @@ -528,10 +490,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser reader = pulsarSpace.getReader(readerCacheKey); - - if (reader == null) { + return pulsarSpace.getReader(new PulsarSpace.ReaderCacheKey(readerName, topicName, startMsgPosStr), () -> { PulsarClient pulsarClient = pulsarSpace.getPulsarClient();; Map readerConf = pulsarSpace.getPulsarNBClientConf().getReaderConfMapTgt(); @@ -558,17 +517,12 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser t.label.equals(param)); - } /////// // Message processing sequence error simulation types @@ -77,29 +81,21 @@ public class PulsarAdapterUtil { this.label = label; } - private static final Map MAPPING = new HashMap<>(); - - static { - for (MSG_SEQ_ERROR_SIMU_TYPE simuType : values()) { - MAPPING.put(simuType.label, simuType); - MAPPING.put(simuType.label.toLowerCase(), simuType); - MAPPING.put(simuType.label.toUpperCase(), simuType); - MAPPING.put(simuType.name(), simuType); - MAPPING.put(simuType.name().toLowerCase(), simuType); - MAPPING.put(simuType.name().toUpperCase(), simuType); - } - } + private static final Map MAPPING = Stream.of(values()) + .flatMap(simuType -> + Stream.of(simuType.label, + simuType.label.toLowerCase(), + simuType.label.toUpperCase(), + simuType.name(), + simuType.name().toLowerCase(), + simuType.name().toUpperCase()) + .distinct().map(key -> Map.entry(key, simuType))) + .collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue)); public static Optional parseSimuType(String simuTypeString) { return Optional.ofNullable(MAPPING.get(simuTypeString.trim())); } } - public static boolean isValidSeqErrSimuType(String item) { - return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item)); - } - public static String getValidSeqErrSimuTypeList() { - return Arrays.stream(MSG_SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); - } /////// // Valid Pulsar API type @@ -113,12 +109,15 @@ public class PulsarAdapterUtil { PULSAR_API_TYPE(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } } public static boolean isValidPulsarApiType(String param) { - return Arrays.stream(PULSAR_API_TYPE.values()).anyMatch(t -> t.label.equals(param)); - } - public static String getValidPulsarApiTypeList() { - return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return PULSAR_API_TYPE.isValidLabel(param); } @@ -136,14 +135,16 @@ public class PulsarAdapterUtil { CONF_GATEGORY(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } } public static boolean isValidConfCategory(String item) { - return Arrays.stream(CONF_GATEGORY.values()).anyMatch(t -> t.label.equals(item)); + return CONF_GATEGORY.isValidLabel(item); } - public static String getValidConfCategoryList() { - return Arrays.stream(CONF_GATEGORY.values()).map(t -> t.label).collect(Collectors.joining(", ")); - } - /////// // Valid persistence type public enum PERSISTENT_TYPES { @@ -156,9 +157,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isValidPersistenceType(String type) { - return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type)); - } /////// // Valid Pulsar client configuration (activity-level settings) @@ -194,9 +192,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isValidClientConfItem(String item) { - return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Standard producer configuration (activity-level settings) @@ -222,9 +217,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isStandardProducerConfItem(String item) { - return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); - } // compressionType public enum COMPRESSION_TYPE { @@ -239,12 +231,12 @@ public class PulsarAdapterUtil { COMPRESSION_TYPE(String label) { this.label = label; } + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); } - public static boolean isValidCompressionType(String item) { - return Arrays.stream(COMPRESSION_TYPE.values()).anyMatch(t -> t.label.equals(item)); - } + public static String getValidCompressionTypeList() { - return Arrays.stream(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return COMPRESSION_TYPE.TYPE_LIST; } /////// @@ -284,9 +276,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isStandardConsumerConfItem(String item) { - return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Custom consumer configuration (activity-level settings) @@ -301,9 +290,16 @@ public class PulsarAdapterUtil { CONSUMER_CONF_CUSTOM_KEY(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label).collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } + } public static boolean isCustomConsumerConfItem(String item) { - return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item)); + return CONSUMER_CONF_CUSTOM_KEY.isValidLabel(item); } // subscriptionTyp @@ -318,12 +314,21 @@ public class PulsarAdapterUtil { SUBSCRIPTION_TYPE(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); } public static boolean isValidSubscriptionType(String item) { - return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item)); + return SUBSCRIPTION_TYPE.isValidLabel(item); } public static String getValidSubscriptionTypeList() { - return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return SUBSCRIPTION_TYPE.TYPE_LIST; } // subscriptionInitialPosition @@ -336,12 +341,12 @@ public class PulsarAdapterUtil { SUBSCRIPTION_INITIAL_POSITION(String label) { this.label = label; } - } - public static boolean isValidSubscriptionInitialPosition(String item) { - return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).anyMatch(t -> t.label.equals(item)); + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } public static String getValidSubscriptionInitialPositionList() { - return Arrays.stream(SUBSCRIPTION_INITIAL_POSITION.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return SUBSCRIPTION_INITIAL_POSITION.TYPE_LIST; } // regexSubscriptionMode @@ -355,12 +360,12 @@ public class PulsarAdapterUtil { REGEX_SUBSCRIPTION_MODE(String label) { this.label = label; } + + private final static String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); } - public static boolean isValidRegexSubscriptionMode(String item) { - return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).anyMatch(t -> t.label.equals(item)); - } + public static String getValidRegexSubscriptionModeList() { - return Arrays.stream(REGEX_SUBSCRIPTION_MODE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + return REGEX_SUBSCRIPTION_MODE.TYPE_LIST; } /////// @@ -383,9 +388,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isStandardReaderConfItem(String item) { - return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Custom reader configuration (activity-level settings) @@ -400,9 +402,6 @@ public class PulsarAdapterUtil { this.label = label; } } - public static boolean isCustomReaderConfItem(String item) { - return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item)); - } /////// // Valid read positions for a Pulsar reader @@ -415,156 +414,84 @@ public class PulsarAdapterUtil { READER_MSG_POSITION_TYPE(String label) { this.label = label; } + + private static final Set LABELS = Stream.of(values()).map(v -> v.label) + .collect(Collectors.toUnmodifiableSet()); + + public static boolean isValidLabel(String label) { + return LABELS.contains(label); + } } public static boolean isValideReaderStartPosition(String item) { - return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item)); + return READER_MSG_POSITION_TYPE.isValidLabel(item); } + private static final Map> PRIMITIVE_SCHEMA_TYPE_MAPPING = Stream.of(SchemaType.values()) + .filter(SchemaType::isPrimitive) + .collect(Collectors.toUnmodifiableMap(schemaType -> schemaType.name().toUpperCase(), + schemaType -> Schema.getSchema(SchemaInfo.builder().type(schemaType).build()))); + /////// // Primitive Schema type public static boolean isPrimitiveSchemaTypeStr(String typeStr) { - boolean isPrimitive = false; - - // Use "BYTES" as the default type if the type string is not explicitly specified - if (StringUtils.isBlank(typeStr)) { - typeStr = "BYTES"; - } - - if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") || - typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") || - typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") || - typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") || - typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") || - typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") || - typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") || - typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) { - isPrimitive = true; - } - - return isPrimitive; + return StringUtils.isBlank(typeStr) || PRIMITIVE_SCHEMA_TYPE_MAPPING.containsKey(typeStr.toUpperCase()); } + public static Schema getPrimitiveTypeSchema(String typeStr) { - Schema schema; - - if (StringUtils.isBlank(typeStr)) { - typeStr = "BYTES"; + String lookupKey = StringUtils.isBlank(typeStr) ? "BYTES" : typeStr.toUpperCase(); + Schema schema = PRIMITIVE_SCHEMA_TYPE_MAPPING.get(lookupKey); + if (schema == null) { + throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr); } - - switch (typeStr.toUpperCase()) { - case "BOOLEAN": - schema = Schema.BOOL; - break; - case "INT8": - schema = Schema.INT8; - break; - case "INT16": - schema = Schema.INT16; - break; - case "INT32": - schema = Schema.INT32; - break; - case "INT64": - schema = Schema.INT64; - break; - case "FLOAT": - schema = Schema.FLOAT; - break; - case "DOUBLE": - schema = Schema.DOUBLE; - break; - case "DATE": - schema = Schema.DATE; - break; - case "TIME": - schema = Schema.TIME; - break; - case "TIMESTAMP": - schema = Schema.TIMESTAMP; - break; - case "INSTANT": - schema = Schema.INSTANT; - break; - case "LOCAL_DATE": - schema = Schema.LOCAL_DATE; - break; - case "LOCAL_TIME": - schema = Schema.LOCAL_TIME; - break; - case "LOCAL_DATE_TIME": - schema = Schema.LOCAL_DATE_TIME; - break; - case "BYTES": - schema = Schema.BYTES; - break; - // Report an error if non-valid, non-empty schema type string is provided - default: - throw new PulsarAdapterInvalidParamException("Invalid Pulsar primitive schema type string : " + typeStr); - } - return schema; } /////// // Complex strut type: Avro or Json public static boolean isAvroSchemaTypeStr(String typeStr) { - return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AVRO")); + return "AVRO".equalsIgnoreCase(typeStr); } // automatic decode the type from the Registry public static boolean isAutoConsumeSchemaTypeStr(String typeStr) { - return (StringUtils.isNotBlank(typeStr) && typeStr.equalsIgnoreCase("AUTO_CONSUME")); + return "AUTO_CONSUME".equalsIgnoreCase(typeStr); } - public static Schema getAvroSchema(String typeStr, String definitionStr) { - String schemaDefinitionStr = definitionStr; - String filePrefix = "file://"; - Schema schema; + private static final Map> AVRO_SCHEMA_CACHE = new ConcurrentHashMap<>(); + + public static Schema getAvroSchema(String typeStr, final String definitionStr) { // Check if payloadStr points to a file (e.g. "file:///path/to/a/file") if (isAvroSchemaTypeStr(typeStr)) { - if (StringUtils.isBlank(schemaDefinitionStr)) { - throw new PulsarAdapterInvalidParamException( - "Schema definition must be provided for \"Avro\" schema type!"); + if (StringUtils.isBlank(definitionStr)) { + throw new PulsarAdapterInvalidParamException("Schema definition must be provided for \"Avro\" schema type!"); } - else if (schemaDefinitionStr.startsWith(filePrefix)) { - try { - Path filePath = Paths.get(URI.create(schemaDefinitionStr)); - schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII); + return AVRO_SCHEMA_CACHE.computeIfAbsent(definitionStr, __ -> { + String schemaDefinitionStr = definitionStr; + if (schemaDefinitionStr.startsWith("file://")) { + try { + Path filePath = Paths.get(URI.create(schemaDefinitionStr)); + schemaDefinitionStr = Files.readString(filePath, StandardCharsets.UTF_8); + } catch (IOException ioe) { + throw new PulsarAdapterUnexpectedException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage()); + } } - catch (IOException ioe) { - throw new PulsarAdapterUnexpectedException( - "Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage()); - } - } - - schema = PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr); + return PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr); + }); + } else { + throw new PulsarAdapterInvalidParamException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr); } - else { - throw new PulsarAdapterInvalidParamException( - "Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr); - } - - return schema; - } - - /////// - // Generate effective key string - public static String buildCacheKey(String... keyParts) { - // Ignore blank keyPart - String joinedKeyStr = - Stream.of(keyParts) - .filter(s -> !StringUtils.isBlank(s)) - .collect(Collectors.joining(",")); - - return Base64.getEncoder().encodeToString(joinedKeyStr.getBytes()); } /////// // Convert JSON string to a key/value map - public static Map convertJsonToMap(String jsonStr) throws Exception { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonStr, Map.class); + private static final ObjectMapper JACKSON_OBJECT_MAPPER = new ObjectMapper(); + private static final TypeReference> MAP_TYPE_REF = new TypeReference<>() {}; + + public static Map convertJsonToMap(String jsonStr) throws IOException { + return JACKSON_OBJECT_MAPPER.readValue(jsonStr, MAP_TYPE_REF); } + /////// // Get full namespace name (/) from a Pulsar topic URI public static String getFullNamespaceName(String topicUri) {