simplify cache key creation in pulsar + jms driver

This commit is contained in:
Christopher Lambert 2021-06-10 12:19:54 +02:00
parent 18e9bb91ff
commit 94f93fe4af
4 changed files with 23 additions and 46 deletions

View File

@ -116,13 +116,16 @@ public class JmsActivity extends SimpleActivity {
);
}
private static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/
public Destination getOrCreateJmsDestination(String jmsDestinationType, String destName) {
String encodedTopicStr =
JmsUtil.encode(jmsDestinationType, destName);
Destination destination = jmsDestinations.get(encodedTopicStr);
String destinationCacheKey = buildCacheKey(jmsDestinationType, destName);
Destination destination = jmsDestinations.get(destinationCacheKey);
if ( destination == null ) {
// TODO: should we match Persistent/Non-peristent JMS Delivery mode with
@ -133,7 +136,7 @@ public class JmsActivity extends SimpleActivity {
destination = jmsContext.createTopic(destName);
}
jmsDestinations.put(encodedTopicStr, destination);
jmsDestinations.put(destinationCacheKey, destination);
}
return destination;

View File

@ -1,11 +1,9 @@
package io.nosqlbench.driver.jms.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.Base64;
public class JmsUtil {
@ -102,18 +100,5 @@ public class JmsUtil {
public static boolean isValidJmsDestinationType(String type) {
return Arrays.stream(JMS_DESTINATION_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {
if (!StringUtils.isBlank(str))
stringBuilder.append(str).append("::");
}
String concatenatedStr =
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}
}

View File

@ -233,6 +233,9 @@ public class PulsarSpace {
};
}
private static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
String topicName = getEffectiveProducerTopicName(cycleTopicName);
@ -242,8 +245,8 @@ public class PulsarSpace {
throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level");
}
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
Producer<?> producer = producers.get(encodedStr);
String producerCacheKey = buildCacheKey(producerName, topicName);
Producer<?> producer = producers.get(producerCacheKey);
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
@ -273,7 +276,7 @@ public class PulsarSpace {
ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema);
producerBuilder.loadConf(producerConf);
producer = producerBuilder.create();
producers.put(encodedStr, producer);
producers.put(producerCacheKey, producer);
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
@ -449,29 +452,29 @@ public class PulsarSpace {
throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
}
String encodedStr;
String consumerCacheKey;
// precedence sequence:
// topic_names (consumer statement param) >
// topics_pattern (consumer statement param) >
// topic_uri (document level param)
if (!topicNames.isEmpty()) {
encodedStr = PulsarActivityUtil.encode(
consumerCacheKey = buildCacheKey(
consumerName,
subscriptionName,
StringUtils.join(topicNames, "|"));
String.join("|", topicNames));
} else if (topicsPattern != null) {
encodedStr = PulsarActivityUtil.encode(
consumerCacheKey = buildCacheKey(
consumerName,
subscriptionName,
topicsPatternStr);
} else {
encodedStr = PulsarActivityUtil.encode(
consumerCacheKey = buildCacheKey(
consumerName,
subscriptionName,
cycleTopicUri);
}
Consumer<?> consumer = consumers.get(encodedStr);
Consumer<?> consumer = consumers.get(consumerCacheKey);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
@ -508,7 +511,7 @@ public class PulsarSpace {
throw new RuntimeException("Unable to create a Pulsar consumer!");
}
consumers.put(encodedStr, consumer);
consumers.put(consumerCacheKey, consumer);
}
return consumer;
@ -576,8 +579,8 @@ public class PulsarSpace {
throw new RuntimeException("Reader:: Invalid value for Reader start message position!");
}
String encodedStr = PulsarActivityUtil.encode(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(encodedStr);
String readerCacheKey = buildCacheKey(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(readerCacheKey);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
@ -614,7 +617,7 @@ public class PulsarSpace {
throw new RuntimeException("Unable to create a Pulsar reader!");
}
readers.put(encodedStr, reader);
readers.put(readerCacheKey, reader);
}
return reader;

View File

@ -15,7 +15,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.stream.Collectors;
@ -403,18 +402,5 @@ public class PulsarActivityUtil {
return schema;
}
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {
if (!StringUtils.isBlank(str))
stringBuilder.append(str).append("::");
}
String concatenatedStr =
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}
}