diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java index 396982bbf..39ac5b42d 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/JmsActivity.java @@ -116,13 +116,16 @@ public class JmsActivity extends SimpleActivity { ); } + private static String buildCacheKey(String... keyParts) { + return String.join("::", keyParts); + } + /** * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it */ public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) { - String encodedTopicStr = - JmsUtil.encode(jmsDestinationType, destName); - Destination destination = jmsDestinations.get(encodedTopicStr); + String destinationCacheKey = buildCacheKey(jmsDestinationType, destName); + Destination destination = jmsDestinations.get(destinationCacheKey); if ( destination == null ) { // TODO: should we match Persistent/Non-peristent JMS Delivery mode with @@ -133,7 +136,7 @@ public class JmsActivity extends SimpleActivity { destination = jmsContext.createTopic(destName); } - jmsDestinations.put(encodedTopicStr, destination); + jmsDestinations.put(destinationCacheKey, destination); } return destination; diff --git a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java index 456939417..2b5e0ba14 100644 --- a/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java +++ b/driver-jms/src/main/java/io/nosqlbench/driver/jms/util/JmsUtil.java @@ -1,11 +1,9 @@ package io.nosqlbench.driver.jms.util; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Arrays; -import java.util.Base64; public class JmsUtil { @@ -102,18 +100,5 @@ public class JmsUtil { public static boolean isValidJmsDestinationType(String type) { return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type)); } - - public static String encode(String... strings) { - StringBuilder stringBuilder = new StringBuilder(); - for (String str : strings) { - if (!StringUtils.isBlank(str)) - stringBuilder.append(str).append("::"); - } - - String concatenatedStr = - StringUtils.substringBeforeLast(stringBuilder.toString(), "::"); - - return Base64.getEncoder().encodeToString(concatenatedStr.getBytes()); - } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index 8f0bbab09..e99b99beb 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -233,6 +233,9 @@ public class PulsarSpace { }; } + private static String buildCacheKey(String... keyParts) { + return String.join("::", keyParts); + } public Producer getProducer(String cycleTopicName, String cycleProducerName) { String topicName = getEffectiveProducerTopicName(cycleTopicName); @@ -242,8 +245,8 @@ public class PulsarSpace { throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level"); } - String encodedStr = PulsarActivityUtil.encode(producerName, topicName); - Producer producer = producers.get(encodedStr); + String producerCacheKey = buildCacheKey(producerName, topicName); + Producer producer = producers.get(producerCacheKey); if (producer == null) { PulsarClient pulsarClient = getPulsarClient(); @@ -273,7 +276,7 @@ public class PulsarSpace { ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema); producerBuilder.loadConf(producerConf); producer = producerBuilder.create(); - producers.put(encodedStr, producer); + producers.put(producerCacheKey, producer); ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent()))); ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent()))); @@ -449,29 +452,29 @@ public class PulsarSpace { throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!"); } - String encodedStr; + String consumerCacheKey; // precedence sequence: // topic_names (consumer statement param) > // topics_pattern (consumer statement param) > // topic_uri (document level param) if (!topicNames.isEmpty()) { - encodedStr = PulsarActivityUtil.encode( + consumerCacheKey = buildCacheKey( consumerName, subscriptionName, - StringUtils.join(topicNames, "|")); + String.join("|", topicNames)); } else if (topicsPattern != null) { - encodedStr = PulsarActivityUtil.encode( + consumerCacheKey = buildCacheKey( consumerName, subscriptionName, topicsPatternStr); } else { - encodedStr = PulsarActivityUtil.encode( + consumerCacheKey = buildCacheKey( consumerName, subscriptionName, cycleTopicUri); } - Consumer consumer = consumers.get(encodedStr); + Consumer consumer = consumers.get(consumerCacheKey); if (consumer == null) { PulsarClient pulsarClient = getPulsarClient(); @@ -508,7 +511,7 @@ public class PulsarSpace { throw new RuntimeException("Unable to create a Pulsar consumer!"); } - consumers.put(encodedStr, consumer); + consumers.put(consumerCacheKey, consumer); } return consumer; @@ -576,8 +579,8 @@ public class PulsarSpace { throw new RuntimeException("Reader:: Invalid value for Reader start message position!"); } - String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr); - Reader reader = readers.get(encodedStr); + String readerCacheKey = buildCacheKey(topicName, readerName, startMsgPosStr); + Reader reader = readers.get(readerCacheKey); if (reader == null) { PulsarClient pulsarClient = getPulsarClient(); @@ -614,7 +617,7 @@ public class PulsarSpace { throw new RuntimeException("Unable to create a Pulsar reader!"); } - readers.put(encodedStr, reader); + readers.put(readerCacheKey, reader); } return reader; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index c12fb2ff2..5f82d4f76 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -15,7 +15,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; -import java.util.Base64; import java.util.HashMap; import java.util.stream.Collectors; @@ -403,18 +402,5 @@ public class PulsarActivityUtil { return schema; } - - public static String encode(String... strings) { - StringBuilder stringBuilder = new StringBuilder(); - for (String str : strings) { - if (!StringUtils.isBlank(str)) - stringBuilder.append(str).append("::"); - } - - String concatenatedStr = - StringUtils.substringBeforeLast(stringBuilder.toString(), "::"); - - return Base64.getEncoder().encodeToString(concatenatedStr.getBytes()); - } }