Merge pull request #329 from XN137/simplify-pulsar-jms-cache-keys

simplify cache key creation in pulsar + jms driver
This commit is contained in:
Jonathan Shook 2021-06-10 14:38:03 -05:00 committed by GitHub
commit 38204917a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 46 deletions

View File

@ -116,13 +116,16 @@ public class JmsActivity extends SimpleActivity {
); );
} }
private static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
/** /**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/ */
public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) { public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
String encodedTopicStr = String destinationCacheKey = buildCacheKey(jmsDestinationType, destName);
JmsUtil.encode(jmsDestinationType, destName); Destination destination = jmsDestinations.get(destinationCacheKey);
Destination destination = jmsDestinations.get(encodedTopicStr);
if ( destination == null ) { if ( destination == null ) {
// TODO: should we match Persistent/Non-peristent JMS Delivery mode with // TODO: should we match Persistent/Non-peristent JMS Delivery mode with
@ -133,7 +136,7 @@ public class JmsActivity extends SimpleActivity {
destination = jmsContext.createTopic(destName); destination = jmsContext.createTopic(destName);
} }
jmsDestinations.put(encodedTopicStr, destination); jmsDestinations.put(destinationCacheKey, destination);
} }
return destination; return destination;

View File

@ -1,11 +1,9 @@
package io.nosqlbench.driver.jms.util; package io.nosqlbench.driver.jms.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64;
public class JmsUtil { public class JmsUtil {
@ -102,18 +100,5 @@ public class JmsUtil {
public static boolean isValidJmsDestinationType(String type) { public static boolean isValidJmsDestinationType(String type) {
return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type)); return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type));
} }
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {
if (!StringUtils.isBlank(str))
stringBuilder.append(str).append("::");
}
String concatenatedStr =
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}
} }

View File

@ -233,6 +233,9 @@ public class PulsarSpace {
}; };
} }
private static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) { public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
String topicName = getEffectiveProducerTopicName(cycleTopicName); String topicName = getEffectiveProducerTopicName(cycleTopicName);
@ -242,8 +245,8 @@ public class PulsarSpace {
throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level"); throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level");
} }
String encodedStr = PulsarActivityUtil.encode(producerName, topicName); String producerCacheKey = buildCacheKey(producerName, topicName);
Producer<?> producer = producers.get(encodedStr); Producer<?> producer = producers.get(producerCacheKey);
if (producer == null) { if (producer == null) {
PulsarClient pulsarClient = getPulsarClient(); PulsarClient pulsarClient = getPulsarClient();
@ -273,7 +276,7 @@ public class PulsarSpace {
ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema); ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema);
producerBuilder.loadConf(producerConf); producerBuilder.loadConf(producerConf);
producer = producerBuilder.create(); producer = producerBuilder.create();
producers.put(encodedStr, producer); producers.put(producerCacheKey, producer);
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent()))); ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent()))); ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
@ -449,29 +452,29 @@ public class PulsarSpace {
throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!"); throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
} }
String encodedStr; String consumerCacheKey;
// precedence sequence: // precedence sequence:
// topic_names (consumer statement param) > // topic_names (consumer statement param) >
// topics_pattern (consumer statement param) > // topics_pattern (consumer statement param) >
// topic_uri (document level param) // topic_uri (document level param)
if (!topicNames.isEmpty()) { if (!topicNames.isEmpty()) {
encodedStr = PulsarActivityUtil.encode( consumerCacheKey = buildCacheKey(
consumerName, consumerName,
subscriptionName, subscriptionName,
StringUtils.join(topicNames, "|")); String.join("|", topicNames));
} else if (topicsPattern != null) { } else if (topicsPattern != null) {
encodedStr = PulsarActivityUtil.encode( consumerCacheKey = buildCacheKey(
consumerName, consumerName,
subscriptionName, subscriptionName,
topicsPatternStr); topicsPatternStr);
} else { } else {
encodedStr = PulsarActivityUtil.encode( consumerCacheKey = buildCacheKey(
consumerName, consumerName,
subscriptionName, subscriptionName,
cycleTopicUri); cycleTopicUri);
} }
Consumer<?> consumer = consumers.get(encodedStr); Consumer<?> consumer = consumers.get(consumerCacheKey);
if (consumer == null) { if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient(); PulsarClient pulsarClient = getPulsarClient();
@ -508,7 +511,7 @@ public class PulsarSpace {
throw new RuntimeException("Unable to create a Pulsar consumer!"); throw new RuntimeException("Unable to create a Pulsar consumer!");
} }
consumers.put(encodedStr, consumer); consumers.put(consumerCacheKey, consumer);
} }
return consumer; return consumer;
@ -576,8 +579,8 @@ public class PulsarSpace {
throw new RuntimeException("Reader:: Invalid value for Reader start message position!"); throw new RuntimeException("Reader:: Invalid value for Reader start message position!");
} }
String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr); String readerCacheKey = buildCacheKey(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(encodedStr); Reader<?> reader = readers.get(readerCacheKey);
if (reader == null) { if (reader == null) {
PulsarClient pulsarClient = getPulsarClient(); PulsarClient pulsarClient = getPulsarClient();
@ -614,7 +617,7 @@ public class PulsarSpace {
throw new RuntimeException("Unable to create a Pulsar reader!"); throw new RuntimeException("Unable to create a Pulsar reader!");
} }
readers.put(encodedStr, reader); readers.put(readerCacheKey, reader);
} }
return reader; return reader;

View File

@ -15,7 +15,6 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Arrays; import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap; import java.util.HashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -403,18 +402,5 @@ public class PulsarActivityUtil {
return schema; return schema;
} }
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {
if (!StringUtils.isBlank(str))
stringBuilder.append(str).append("::");
}
String concatenatedStr =
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}
} }