Merge pull request #1271

Multiple enhancements and minor fixes for NB Pulsar, NB Kafka (S4K), and NB S4J adapters
This commit is contained in:
Jonathan Shook
2023-05-17 10:27:19 -05:00
committed by GitHub
19 changed files with 446 additions and 389 deletions

View File

@@ -16,8 +16,8 @@
package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
@@ -29,8 +29,10 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class KafkaSpace implements AutoCloseable {
@@ -41,7 +43,6 @@ public class KafkaSpace implements AutoCloseable {
// TODO: currently this NB Kafka driver only supports String type for message key and value
// add schema support in the future
private final ConcurrentHashMap<String, OpTimeTrackKafkaClient> opTimeTrackKafkaClients = new ConcurrentHashMap<>();
private final String bootstrapSvr;
private final String kafkaClientConfFileName;
@@ -75,6 +76,18 @@ public class KafkaSpace implements AutoCloseable {
private AtomicBoolean beingShutdown = new AtomicBoolean(false);
public record ProducerCacheKey(String producerName, String topicName, String clientId) {
}
private final ConcurrentHashMap<ProducerCacheKey, OpTimeTrackKafkaProducer> producers =
new ConcurrentHashMap<>();
public record ConsumerCacheKey(String consumerName, List<String> topicList, String clientId) {
}
private final ConcurrentHashMap<ConsumerCacheKey, OpTimeTrackKafkaConsumer> consumers =
new ConcurrentHashMap<>();
public KafkaSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
@@ -115,11 +128,16 @@ public class KafkaSpace implements AutoCloseable {
.asReadOnly();
}
public OpTimeTrackKafkaClient getOpTimeTrackKafkaClient(String cacheKey) {
return opTimeTrackKafkaClients.get(cacheKey);
public OpTimeTrackKafkaProducer getOpTimeTrackKafkaProducer(
ProducerCacheKey key,
Supplier<OpTimeTrackKafkaProducer> producerSupplier) {
return producers.computeIfAbsent(key, __ -> producerSupplier.get());
}
public void addOpTimeTrackKafkaClient(String cacheKey, OpTimeTrackKafkaClient client) {
opTimeTrackKafkaClients.put(cacheKey, client);
public OpTimeTrackKafkaConsumer getOpTimeTrackKafkaConsumer(
ConsumerCacheKey key,
Supplier<OpTimeTrackKafkaConsumer> consumerSupplier) {
return consumers.computeIfAbsent(key, __ -> consumerSupplier.get());
}
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
@@ -135,23 +153,27 @@ public class KafkaSpace implements AutoCloseable {
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
public boolean isShuttigDown() {
public boolean isShuttingDown() {
return beingShutdown.get();
}
public void shutdownSpace() {
try {
beingShutdown.set(true);
for (OpTimeTrackKafkaClient client : opTimeTrackKafkaClients.values()) {
client.close();
for (OpTimeTrackKafkaProducer producer : producers.values()) {
producer.close();
}
for (OpTimeTrackKafkaConsumer consumer : consumers.values()) {
consumer.close();
}
// Pause 5 seconds before closing producers/consumers
KafkaAdapterUtil.pauseCurThreadExec(5);
}
catch (Exception e) {
e.printStackTrace();
throw new KafkaAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
catch (Exception ex) {
String exp = "Unexpected error when shutting down the Kafka adaptor space";
logger.error(exp, ex);
}
}
}

View File

@@ -20,7 +20,6 @@ import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil.DOC_LEVEL_PARAMS;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
@@ -133,5 +132,4 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, Kaf
public String getName() {
return "KafkaBaseOpDispenser";
}
}

View File

@@ -36,7 +36,6 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
@@ -97,52 +96,6 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
return defaultGrpNamePrefix + '-' + grpIdx;
}
private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(
final long cycle,
final List<String> topicNameList,
final String groupId)
{
final String topicNameListStr = topicNameList.stream()
.collect(Collectors.joining("::"));
final String cacheKey = KafkaAdapterUtil.buildCacheKey(
"consumer-" + cycle % this.kafkaClntCnt, topicNameListStr, groupId );
OpTimeTrackKafkaClient opTimeTrackKafkaClient = this.kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
if (null == opTimeTrackKafkaClient) {
final Properties consumerConfProps = new Properties();
consumerConfProps.putAll(this.consumerClientConfMap);
consumerConfProps.put("group.id", groupId);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
synchronized (this) {
consumer.subscribe(topicNameList);
}
if (MessageConsumerOpDispenser.logger.isDebugEnabled())
MessageConsumerOpDispenser.logger.debug("Kafka consumer created: {}/{} -- {}, {}, {}",
cacheKey,
consumer,
topicNameList,
this.autoCommitEnabled,
this.maxMsgCntPerCommit);
opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
this.kafkaSpace,
this.asyncAPI,
this.msgPollIntervalInSec,
this.autoCommitEnabled,
this.maxMsgCntPerCommit,
consumer,
this.kafkaAdapterMetrics,
EndToEndStartingTimeSource.valueOf(this.e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
this::getReceivedMessageSequenceTracker,
this.seqTrackingFunc.apply(cycle));
this.kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(final String topicName) {
return this.receivedMessageSequenceTrackersForTopicThreadLocal.get()
.computeIfAbsent(topicName, k -> this.createReceivedMessageSequenceTracker());
@@ -159,10 +112,48 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
assert StringUtils.isNotBlank(explicitTopicListStr);
return Arrays.stream(StringUtils.split(explicitTopicListStr, ','))
.filter(s -> StringUtils.isNotBlank(s))
.filter(StringUtils::isNotBlank)
.toList();
}
private OpTimeTrackKafkaConsumer getTimeTrackKafkaConsumer(final long cycle,
final List<String> topicNameList,
final String groupId)
{
final String consumerName = "consumer-" + cycle % this.kafkaClntCnt;
KafkaSpace.ConsumerCacheKey consumerCacheKey =
new KafkaSpace.ConsumerCacheKey(consumerName, topicNameList, groupId);
return kafkaSpace.getOpTimeTrackKafkaConsumer(consumerCacheKey, () -> {
final Properties consumerConfProps = new Properties();
consumerConfProps.putAll(this.consumerClientConfMap);
consumerConfProps.put("group.id", groupId);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
synchronized (this) {
consumer.subscribe(topicNameList);
}
if (MessageConsumerOpDispenser.logger.isDebugEnabled())
MessageConsumerOpDispenser.logger.debug(
"Kafka consumer created: {} -- autoCommitEnabled: {}, maxMsgCntPerCommit: {}",
consumer,
this.autoCommitEnabled,
this.maxMsgCntPerCommit);
return new OpTimeTrackKafkaConsumer(
this.kafkaSpace,
this.asyncAPI,
this.msgPollIntervalInSec,
this.autoCommitEnabled,
this.maxMsgCntPerCommit,
consumer,
this.kafkaAdapterMetrics,
EndToEndStartingTimeSource.valueOf(this.e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
this::getReceivedMessageSequenceTracker,
this.seqTrackingFunc.apply(cycle));
});
}
@Override
public KafkaOp apply(final long cycle) {
final List<String> topicNameList = this.getEffectiveTopicNameList(cycle);
@@ -171,7 +162,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
"Effective consumer group name and/or topic names are needed for creating a consumer!");
final OpTimeTrackKafkaClient opTimeTrackKafkaConsumer =
this.getOrCreateOpTimeTrackKafkaConsumer(cycle, topicNameList, groupId);
this.getTimeTrackKafkaConsumer(cycle, topicNameList, groupId);
return new KafkaOp(
this.kafkaAdapterMetrics,

View File

@@ -72,7 +72,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
this.producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
txnBatchNum = this.parsedOp.getStaticConfigOr("txn_batch_num", Integer.valueOf(0));
txnBatchNum = this.parsedOp.getStaticConfigOr("txn_batch_num", 0);
msgHeaderJsonStrFunc = this.lookupOptionalStrOpValueFunc(MessageProducerOpDispenser.MSG_HEADER_OP_PARAM);
msgKeyStrFunc = this.lookupOptionalStrOpValueFunc(MessageProducerOpDispenser.MSG_KEY_OP_PARAM);
@@ -94,62 +94,6 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
return "";
}
private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(final long cycle,
final String topicName,
final String clientId)
{
final String cacheKey = KafkaAdapterUtil.buildCacheKey(
"producer-" + cycle % this.kafkaClntCnt, topicName);
OpTimeTrackKafkaClient opTimeTrackKafkaClient = this.kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
if (null == opTimeTrackKafkaClient) {
final Properties producerConfProps = new Properties();
producerConfProps.putAll(this.producerClientConfMap);
if (StringUtils.isNotBlank(clientId)) {
producerConfProps.put("client.id", clientId);
} else {
producerConfProps.remove("client.id");
}
// When transaction batch number is less than 2, it is treated effectively as no-transaction
if (2 > txnBatchNum) {
producerConfProps.remove("transactional.id");
}
String baseTransactId = "";
boolean transactionEnabled = false;
if (producerConfProps.containsKey("transactional.id")) {
baseTransactId = producerConfProps.getProperty("transactional.id").toString();
producerConfProps.put("transactional.id", baseTransactId + '-' + cacheKey);
transactionEnabled = StringUtils.isNotBlank(producerConfProps.getProperty("transactional.id").toString());
}
final KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
if (transactionEnabled) producer.initTransactions();
if (MessageProducerOpDispenser.logger.isDebugEnabled())
MessageProducerOpDispenser.logger.debug("Producer created: {}/{} -- ({}, {}, {})",
cacheKey,
producer,
topicName,
transactionEnabled,
clientId);
opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(
this.kafkaSpace,
this.asyncAPI,
transactionEnabled,
this.txnBatchNum,
this.seqTrackingFunc.apply(cycle),
this.msgSeqErrSimuTypeSetFunc.apply(cycle),
producer);
this.kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
private ProducerRecord<String, String> createKafkaMessage(
final long curCycle,
final String topicName,
@@ -199,13 +143,69 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
return record;
}
public OpTimeTrackKafkaProducer getOpTimeTrackKafkaProducer(long cycle,
final String topicName,
final String clientId)
{
String producerName = "producer-" + cycle % this.kafkaClntCnt;
KafkaSpace.ProducerCacheKey producerCacheKey =
new KafkaSpace.ProducerCacheKey(producerName, topicName, clientId);
return kafkaSpace.getOpTimeTrackKafkaProducer(producerCacheKey, () -> {
final Properties producerConfProps = new Properties();
producerConfProps.putAll(this.producerClientConfMap);
if (StringUtils.isNotBlank(clientId)) {
producerConfProps.put("client.id", clientId);
} else {
producerConfProps.remove("client.id");
}
// When transaction batch number is less than 2, it is treated effectively as no-transaction
if (2 > txnBatchNum) {
producerConfProps.remove("transactional.id");
}
String baseTransactId = "";
boolean transactionEnabled = false;
if (producerConfProps.containsKey("transactional.id")) {
baseTransactId = producerConfProps.getProperty("transactional.id");
if (StringUtils.isNotBlank(baseTransactId)) {
producerConfProps.put(
"transactional.id",
baseTransactId + '-' + (cycle % this.kafkaClntCnt));
transactionEnabled = StringUtils.isNotBlank(producerConfProps.getProperty("transactional.id"));
}
}
final KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
if (transactionEnabled) producer.initTransactions();
if (MessageProducerOpDispenser.logger.isDebugEnabled())
MessageProducerOpDispenser.logger.debug(
"Producer created: {} -- transactionEnabled: {}, clientId: {})",
producer,
transactionEnabled,
clientId);
return new OpTimeTrackKafkaProducer(
this.kafkaSpace,
this.asyncAPI,
transactionEnabled,
this.txnBatchNum,
this.seqTrackingFunc.apply(cycle),
this.msgSeqErrSimuTypeSetFunc.apply(cycle),
producer);
});
}
@Override
public KafkaOp apply(final long cycle) {
final String topicName = this.topicNameStrFunc.apply(cycle);
final String clientId = this.getEffectiveClientId(cycle);
final OpTimeTrackKafkaClient opTimeTrackKafkaProducer =
this.getOrCreateOpTimeTrackKafkaProducer(cycle, topicName, clientId);
this.getOpTimeTrackKafkaProducer(cycle, topicName, clientId);
final ProducerRecord<String, String> message = this.createKafkaMessage(
cycle,

View File

@@ -29,9 +29,9 @@ abstract public class OpTimeTrackKafkaClient {
protected final long activityStartTime;
// Maximum time length to execute S4J operations (e.g. message send or consume)
// Maximum time length to execute Kafka operations (e.g. message send or consume)
// - when NB execution passes this threshold, it is simply NoOp
// - 0 means no maximum time constraint. S4JOp is always executed until NB execution cycle finishes
// - 0 means no maximum time constraint. KafkaOp is always executed until NB execution cycle finishes
protected final long maxOpTimeInSec;
public OpTimeTrackKafkaClient(KafkaSpace kafkaSpace) {

View File

@@ -120,7 +120,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
@Override
void cycleMsgProcess(final long cycle, final Object cycleObj) {
if (this.kafkaSpace.isShuttigDown()) return;
if (this.kafkaSpace.isShuttingDown()) return;
synchronized (this) {
final ConsumerRecords<String, String> records = this.consumer.poll(this.msgPoolIntervalInMs);
@@ -225,8 +225,9 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
catch (final IllegalStateException ise) {
// If a consumer is already closed, that's fine.
}
catch (final Exception e) {
e.printStackTrace();
catch (final Exception ex) {
logger.error(ex);
ex.printStackTrace();
}
}
}

View File

@@ -160,7 +160,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
// For producer, cycleObj represents a "message" (ProducerRecord)
assert null != cycleObj;
if (this.kafkaSpace.isShuttigDown()) {
if (this.kafkaSpace.isShuttingDown()) {
if (this.transactionEnabled) try {
this.producer.abortTransaction();
if (OpTimeTrackKafkaProducer.logger.isDebugEnabled())
@@ -221,17 +221,12 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
}
catch ( final ProducerFencedException | OutOfOrderSequenceException |
UnsupportedOperationException | AuthorizationException e) {
UnsupportedOperationException | AuthorizationException | IllegalStateException e) {
if (OpTimeTrackKafkaProducer.logger.isDebugEnabled())
OpTimeTrackKafkaProducer.logger.debug("Fatal error when sending a message ({}) - {}, {}",
cycle, this.producer, message);
throw new KafkaAdapterUnexpectedException(e);
}
catch (final IllegalStateException | KafkaException e) {
if (this.transactionEnabled) {
}
}
catch (final Exception e) {
throw new KafkaAdapterUnexpectedException(e);
}
@@ -252,8 +247,9 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
catch (final IllegalStateException ise) {
// If a producer is already closed, that's fine.
}
catch (final Exception e) {
e.printStackTrace();
catch (final Exception ex) {
logger.error(ex);
ex.printStackTrace();
}
}

View File

@@ -16,7 +16,6 @@
package io.nosqlbench.adapter.kafka.util;
import com.amazonaws.util.Base64;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
@@ -27,10 +26,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public enum KafkaAdapterUtil {
;
public class KafkaAdapterUtil {
public static final String MSG_SEQUENCE_NUMBER = "sequence_number";
private static final Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
@@ -51,12 +48,6 @@ public enum KafkaAdapterUtil {
this.label = label;
}
}
public static boolean isValidDocLevelParam(final String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidDocLevelParamList() {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static final String NB_MSG_SEQ_PROP = "NBMsgSeqProp";
public static final String NB_MSG_SIZE_PROP = "NBMsgSize";
@@ -82,13 +73,6 @@ public enum KafkaAdapterUtil {
return Arrays.asList(mapper.readValue(jsonStr, Object[].class));
}
public static String buildCacheKey(final String... keyParts) {
final String combinedStr = Arrays.stream(keyParts)
.filter(StringUtils::isNotBlank)
.collect(Collectors.joining("::"));
return Base64.encodeAsString(combinedStr.getBytes(StandardCharsets.UTF_8));
}
public static void pauseCurThreadExec(final int pauseInSec) {
if (0 < pauseInSec) try {
Thread.sleep(pauseInSec * 1000L);

View File

@@ -25,18 +25,26 @@ topic.flush.messages=2
topic.log.message.timestamp.type=CreateTime
#####
# Producer related configurations (global) - topic.***
# Producer related configurations (global) - producer.***
# - Valid settings: https://kafka.apache.org/documentation/#producerconfigs
#
#--------------------------------------
producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
#producer.client.id=nbDftClient
producer.transactional.id=nbDftTxn
#producer.transactional.id=nbDftTxn
##
# NOTE: When connecting to Astra Streaming (with S4K enabled), enable the following settings and with
# the corresponding AS tenant and token information
##
#producer.security.protocol=SASL_SSL
#producer.sasl.mechanism=PLAIN
#producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<AS_tenant>" password="token:<AS_token>";
#####
# Consumer related configurations (global) - topic.***
# Consumer related configurations (global) - consumer.***
# - Valid settings: https://kafka.apache.org/documentation/#consumerconfigs
#
#--------------------------------------
@@ -45,3 +53,11 @@ consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserial
consumer.group.id=nbDftGrp
#consumer.isolation.level=read_uncommitted
#consumer.enable.auto.commit=true
##
# NOTE: When connecting to Astra Streaming (with S4K enabled), enable the following settings and with
# the corresponding AS tenant and token information
##
#consumer.security.protocol=SASL_SSL
#consumer.sasl.mechanism=PLAIN
#consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<AS_tenant>" password="token:<AS_token>";

View File

@@ -34,7 +34,10 @@
</description>
<properties>
<!-- Pulsar version 2.11.x causes NB S4J failure due to version conflict (S4J right now is on version 2.10.3)
<pulsar.version>2.11.1</pulsar.version>
-->
<pulsar.version>2.10.4</pulsar.version>
</properties>
<dependencies>

View File

@@ -16,7 +16,6 @@
package io.nosqlbench.adapter.pulsar;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.PulsarClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
@@ -54,10 +53,12 @@ public class PulsarSpace implements AutoCloseable {
public record ProducerCacheKey(String producerName, String topicName) {
}
private final ConcurrentHashMap<ProducerCacheKey, Producer<?>> producers = new ConcurrentHashMap<>();
public record ConsumerCacheKey(String consumerName, String subscriptionName, List<String> topicNameList, String topicPattern) {
public record ConsumerCacheKey(String consumerName,
String subscriptionName,
List<String> topicNameList,
String topicPattern) {
}
private final ConcurrentHashMap<ConsumerCacheKey, Consumer<?>> consumers = new ConcurrentHashMap<>();
@@ -100,11 +101,18 @@ public class PulsarSpace implements AutoCloseable {
public int getProducerSetCnt() { return producers.size(); }
public int getConsumerSetCnt() { return consumers.size(); }
public int getReaderSetCnt() { return readers.size(); }
public Producer<?> getProducer(ProducerCacheKey key, Supplier<Producer<?>> producerSupplier) { return producers.computeIfAbsent(key, __ -> producerSupplier.get()); }
public Consumer<?> getConsumer(ConsumerCacheKey key, Supplier<Consumer<?>> consumerSupplier) { return consumers.computeIfAbsent(key, __ -> consumerSupplier.get()); }
public Producer<?> getProducer(ProducerCacheKey key, Supplier<Producer<?>> producerSupplier) {
return producers.computeIfAbsent(key, __ -> producerSupplier.get());
}
public Reader<?> getReader(ReaderCacheKey key, Supplier<Reader<?>> readerSupplier) { return readers.computeIfAbsent(key, __ -> readerSupplier.get()); }
public Consumer<?> getConsumer(ConsumerCacheKey key, Supplier<Consumer<?>> consumerSupplier) {
return consumers.computeIfAbsent(key, __ -> consumerSupplier.get());
}
public Reader<?> getReader(ReaderCacheKey key, Supplier<Reader<?>> readerSupplier) {
return readers.computeIfAbsent(key, __ -> readerSupplier.get());
}
/**
@@ -185,9 +193,9 @@ public class PulsarSpace implements AutoCloseable {
if (pulsarAdmin != null) pulsarAdmin.close();
if (pulsarClient != null) pulsarClient.close();
}
catch (Exception e) {
throw new PulsarAdapterUnexpectedException(
"Unexpected error when shutting down the Pulsar space \"" + spaceName + "\"!");
catch (Exception ex) {
String exp = "Unexpected error when shutting down the Pulsar adaptor space";
logger.error(exp, ex);
}
}

View File

@@ -33,7 +33,6 @@ import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.PULSAR_API_TYPE;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.READER_CONF_CUSTOM_KEY;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.READER_CONF_STD_KEY;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.READER_MSG_POSITION_TYPE;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
@@ -252,6 +251,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, Pu
cycleProducerName);
final ProducerCacheKey producerCacheKey = new ProducerCacheKey(producerName, topicName);
return this.pulsarSpace.getProducer(producerCacheKey, () -> {
final PulsarClient pulsarClient = this.pulsarSpace.getPulsarClient();

View File

@@ -17,7 +17,6 @@
package io.nosqlbench.adapter.s4j;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.*;
@@ -33,10 +32,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class S4JSpace implements AutoCloseable {
@@ -49,7 +48,17 @@ public class S4JSpace implements AutoCloseable {
// - JMS connection can have a number of JMS sessions (\"num_session\" NB CLI parameter).
// - Each JMS session has its own sets of JMS destinations, producers, consumers, etc.
private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
public record JMSGenObjCacheKey(String identifierStr) { }
private final ConcurrentHashMap<JMSGenObjCacheKey, S4JJMSContextWrapper> sessionLvlJmsContextWrappers =
new ConcurrentHashMap<>();
public record JMSDestinationCacheKey(String contextIdentifier,
String destinationType,
String destinationName) { }
protected final ConcurrentHashMap<JMSDestinationCacheKey, Destination> jmsDestinations = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<JMSGenObjCacheKey, JMSProducer> jmsProducers = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<JMSGenObjCacheKey, JMSConsumer> jmsConsumers = new ConcurrentHashMap<>();
private final String pulsarSvcUrl;
private final String webSvcUrl;
@@ -95,6 +104,7 @@ public class S4JSpace implements AutoCloseable {
private long totalCycleNum;
public S4JSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
@@ -114,7 +124,7 @@ public class S4JSpace implements AutoCloseable {
this.s4jClientConfFileName = cfg.get("config");
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
cfg.getOptional("session_mode").orElse(""));
this.s4JClientConf = new S4JClientConf(pulsarSvcUrl, webSvcUrl, s4jClientConfFileName);
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
@@ -149,12 +159,32 @@ public class S4JSpace implements AutoCloseable {
.asReadOnly();
}
public ConcurrentHashMap<String, JMSContext> getConnLvlJmsContexts() {
return connLvlJmsContexts;
public JMSContext getConnLvlJMSContext(String jmsContextIdentifier) {
return connLvlJmsContexts.get(jmsContextIdentifier);
}
public ConcurrentHashMap<String, S4JJMSContextWrapper> getSessionLvlJmsContexts() {
return sessionLvlJmsContexts;
public S4JJMSContextWrapper getS4JJMSContextWrapper(
JMSGenObjCacheKey key,
Supplier<S4JJMSContextWrapper> s4JJMSContextWrapperSupplier) {
return sessionLvlJmsContextWrappers.computeIfAbsent(key, __ -> s4JJMSContextWrapperSupplier.get());
}
public Destination getJmsDestination(
JMSDestinationCacheKey key,
Supplier<Destination> jmsDestinationSupplier) {
return jmsDestinations.computeIfAbsent(key, __ -> jmsDestinationSupplier.get());
}
public JMSProducer getJmsProducer(
JMSGenObjCacheKey key,
Supplier<JMSProducer> jmsProducerSupplier) {
return jmsProducers.computeIfAbsent(key, __ -> jmsProducerSupplier.get());
}
public JMSConsumer getJmsConsumer(
JMSGenObjCacheKey key,
Supplier<JMSConsumer> jmsConsumerSupplier) {
return jmsConsumers.computeIfAbsent(key, __ -> jmsConsumerSupplier.get());
}
public long getS4JActivityStartTimeMills() { return this.s4JActivityStartTimeMills; }
@@ -203,11 +233,10 @@ public class S4JSpace implements AutoCloseable {
for (int i=0; i<getMaxNumConn(); i++) {
// Establish a JMS connection
String connLvlJmsConnContextIdStr = getConnLvlJmsContextIdentifier(i);
String clientIdStr = Base64.getEncoder().encodeToString(connLvlJmsConnContextIdStr.getBytes());
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(clientIdStr);
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.error("onException::Unexpected JMS error happened:" + e);
@@ -215,7 +244,6 @@ public class S4JSpace implements AutoCloseable {
});
connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
@@ -243,7 +271,7 @@ public class S4JSpace implements AutoCloseable {
this.txnBatchTrackingCnt.remove();
for (S4JJMSContextWrapper s4JJMSContextWrapper : sessionLvlJmsContexts.values()) {
for (S4JJMSContextWrapper s4JJMSContextWrapper : sessionLvlJmsContextWrappers.values()) {
if (s4JJMSContextWrapper != null) {
if (s4JJMSContextWrapper.isTransactedMode()) {
s4JJMSContextWrapper.getJmsContext().rollback();
@@ -252,15 +280,19 @@ public class S4JSpace implements AutoCloseable {
}
}
for (JMSConsumer jmsConsumer : jmsConsumers.values()) {
if (jmsConsumer != null) jmsConsumer.close();
}
for (JMSContext jmsContext : connLvlJmsContexts.values()) {
if (jmsContext != null) jmsContext.close();
}
s4jConnFactory.close();
}
catch (Exception e) {
e.printStackTrace();
throw new S4JAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
catch (Exception ex) {
String exp = "Unexpected error when shutting down the S4J adaptor space";
logger.error(exp, ex);
}
}
@@ -332,13 +364,15 @@ public class S4JSpace implements AutoCloseable {
}
public String getConnLvlJmsContextIdentifier(int jmsConnSeqNum) {
return S4JAdapterUtil.buildCacheKey(
return String.join(
"::",
this.spaceName,
StringUtils.join("conn-", jmsConnSeqNum));
}
public String getSessionLvlJmsContextIdentifier(int jmsConnSeqNum, int jmsSessionSeqNum) {
return S4JAdapterUtil.buildCacheKey(
return String.join(
"::",
this.spaceName,
StringUtils.join("conn-", jmsConnSeqNum),
StringUtils.join("session-", jmsSessionSeqNum));
@@ -381,65 +415,4 @@ public class S4JSpace implements AutoCloseable {
return jmsConnContext;
}
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long curCycle) {
return getOrCreateS4jJmsContextWrapper(curCycle, null);
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = getMaxNumConn();
int totalSessionPerConnNum = getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
String jmsConnContextIdStr = getConnLvlJmsContextIdentifier(connSeqNum);
JMSContext connLvlJmsContext = connLvlJmsContexts.get(jmsConnContextIdStr);
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JJMSContextWrapper jmsContextWrapper = sessionLvlJmsContexts.get(jmsSessionContextIdStr);
if (jmsContextWrapper == null) {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
jmsContextWrapper = new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
sessionLvlJmsContexts.put(jmsSessionContextIdStr, jmsContextWrapper);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsContextWrapper);
}
}
return jmsContextWrapper;
}
}

View File

@@ -81,13 +81,13 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
this.noLocal =
parsedOp.getStaticConfigOr("no_local", Boolean.FALSE);
this.readTimeout =
parsedOp.getStaticConfigOr("read_timeout", Integer.valueOf(0));
parsedOp.getStaticConfigOr("read_timeout", 0);
this.recvNoWait =
parsedOp.getStaticConfigOr("no_wait", Boolean.FALSE);
this.msgAckRatio =
parsedOp.getStaticConfigOr("msg_ack_ratio", Float.valueOf(1.0f));
parsedOp.getStaticConfigOr("msg_ack_ratio", 1.0f);
this.slowAckInSec =
parsedOp.getStaticConfigOr("slow_ack_in_sec", Integer.valueOf(0));
parsedOp.getStaticConfigOr("slow_ack_in_sec", 0);
this.localMsgSelectorFunc =
lookupOptionalStrOpValueFunc("msg_selector");
@@ -123,14 +123,13 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
@Override
public MessageConsumerOp apply(long cycle) {
S4JJMSContextWrapper s4JJMSContextWrapper =
s4jSpace.getOrCreateS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransact = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
boolean commitTransact = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
Destination destination;
try {
destination = getOrCreateJmsDestination(
destination = getJmsDestination(
s4JJMSContextWrapper, temporaryDest, destType, destNameStrFunc.apply(cycle));
}
catch (JMSRuntimeException jmsRuntimeException) {
@@ -139,7 +138,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
JMSConsumer jmsConsumer;
try {
jmsConsumer = getOrCreateJmsConsumer(
jmsConsumer = getJmsConsumer(
s4JJMSContextWrapper,
destination,
destType,

View File

@@ -172,7 +172,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
if (value != null) {
String destType = StringUtils.substringBefore(value, ':');
String destName = StringUtils.substringAfter(value, ':');
outMessage.setJMSReplyTo(getOrCreateJmsDestination(s4JJMSContextWrapper,false, destType, destName));
outMessage.setJMSReplyTo(getJmsDestination(s4JJMSContextWrapper,false, destType, destName));
}
}
// Ignore these headers - handled by S4J API automatically
@@ -279,13 +279,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
S4JJMSContextWrapper s4JJMSContextWrapper = s4jSpace.getOrCreateS4jJmsContextWrapper(cycle);
S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransaction = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
boolean commitTransaction = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
Destination destination;
try {
destination = getOrCreateJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName);
destination = getJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName);
}
catch (JMSRuntimeException jmsRuntimeException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS destination!");
@@ -293,7 +293,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
JMSProducer producer;
try {
producer = getOrCreateJmsProducer(s4JJMSContextWrapper, asyncAPI);
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.adapter.s4j.dispensers;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapter.s4j.util.*;
@@ -30,7 +31,6 @@ import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -43,12 +43,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
protected final S4JSpace s4jSpace;
protected final S4JAdapterMetrics s4jAdapterMetrics;
private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, JMSProducer> jmsProducers = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, JMSConsumer> jmsConsumers = new ConcurrentHashMap<>();
// Doc-level parameter: temporary_dest (default: false)
protected final boolean temporaryDest;
// Doc-level parameter: dest_type (default: Topic)
@@ -73,11 +67,9 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
this.parsedOp = op;
this.s4jSpace = s4jSpace;
this.connLvlJmsContexts.putAll(s4jSpace.getConnLvlJmsContexts());
this.sessionLvlJmsContexts.putAll(s4jSpace.getSessionLvlJmsContexts());
String defaultMetricsPrefix = parsedOp.getLabels().linearize("activity");
this.s4jAdapterMetrics = new S4JAdapterMetrics(defaultMetricsPrefix);
this.s4jAdapterMetrics = new S4JAdapterMetrics(this);
s4jAdapterMetrics.initS4JAdapterInstrumentation();
this.destNameStrFunc = destNameStrFunc;
@@ -102,7 +94,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
LongFunction<Boolean> booleanLongFunction;
booleanLongFunction = l -> parsedOp.getOptionalStaticConfig(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> BooleanUtils.toBoolean(value))
.map(BooleanUtils::toBoolean)
.orElse(defaultValue);
logger.info("{}: {}", paramName, booleanLongFunction.apply(0));
return booleanLongFunction;
@@ -133,7 +125,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
LongFunction<Integer> integerLongFunction;
integerLongFunction = l -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> NumberUtils.toInt(value))
.map(NumberUtils::toInt)
.map(value -> {
if (0 > value) return 0;
return value;
@@ -164,10 +156,71 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
return stringLongFunction;
}
public S4JJMSContextWrapper getS4jJmsContextWrapper(long curCycle) {
return getS4jJmsContextWrapper(curCycle, null);
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = s4jSpace.getMaxNumConn();
int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
JMSContext connLvlJmsContext = s4jSpace.getConnLvlJMSContext(s4jSpace.getConnLvlJmsContextIdentifier(connSeqNum));
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = s4jSpace.getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JSpace.JMSGenObjCacheKey jmsContextWrapperCacheKey =
new S4JSpace.JMSGenObjCacheKey(jmsSessionContextIdStr);
return s4jSpace.getS4JJMSContextWrapper(jmsContextWrapperCacheKey, () -> {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
S4JJMSContextWrapper s4JJMSContextWrapper =
new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
s4JJMSContextWrapper);
}
return s4JJMSContextWrapper;
});
}
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/
public Destination getOrCreateJmsDestination(
public Destination getJmsDestination(
S4JJMSContextWrapper s4JJMSContextWrapper,
boolean tempDest,
String destType,
@@ -176,54 +229,58 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer();
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
// Regular, non-temporary destination
if (!tempDest) {
String destinationCacheKey = S4JAdapterUtil.buildCacheKey(jmsContextIdStr, destType, destName);
Destination destination = jmsDestinations.get(destinationCacheKey);
S4JSpace.JMSDestinationCacheKey destinationCacheKey =
new S4JSpace.JMSDestinationCacheKey(jmsContextIdStr, destType, destName);
if (null == destination) {
return s4jSpace.getJmsDestination(destinationCacheKey, () -> {
Destination destination;
// Regular, non-temporary destination
if (!tempDest) {
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
destination = jmsContext.createQueue(destName);
} else {
destination = jmsContext.createTopic(destName);
}
jmsDestinations.put(destinationCacheKey, destination);
}
// Temporary destination
else {
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
destination = jmsContext.createTemporaryQueue();
}
else {
destination = jmsContext.createTemporaryTopic();
}
}
return destination;
}
// Temporary destination
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
return jmsContext.createTemporaryQueue();
}
return jmsContext.createTemporaryTopic();
});
}
// Get simplified NB thread name
private String getSimplifiedNBThreadName(String fullThreadName) {
assert StringUtils.isNotBlank(fullThreadName);
if (StringUtils.contains(fullThreadName, '/')) return StringUtils.substringAfterLast(fullThreadName, "/");
return fullThreadName;
if (StringUtils.contains(fullThreadName, '/'))
return StringUtils.substringAfterLast(fullThreadName, "/");
else
return fullThreadName;
}
/**
* If the JMS producer that corresponds to a destination exists, reuse it; Otherwise, create it
*/
public JMSProducer getOrCreateJmsProducer(
public JMSProducer getJmsProducer(
S4JJMSContextWrapper s4JJMSContextWrapper,
boolean asyncApi) throws JMSException
{
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
String producerCacheKey = S4JAdapterUtil.buildCacheKey(
getSimplifiedNBThreadName(Thread.currentThread().getName()), "producer");
JMSProducer jmsProducer = jmsProducers.get(producerCacheKey);
S4JSpace.JMSGenObjCacheKey producerCacheKey =
new S4JSpace.JMSGenObjCacheKey(
String.join("::",
getSimplifiedNBThreadName(Thread.currentThread().getName()), "producer"));
if (null == jmsProducer) {
jmsProducer = jmsContext.createProducer();
return s4jSpace.getJmsProducer(producerCacheKey, () -> {
JMSProducer jmsProducer = jmsContext.createProducer();
if (asyncApi) {
jmsProducer.setAsync(new S4JCompletionListener(s4jSpace, this));
@@ -234,16 +291,14 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
producerCacheKey, jmsProducer, s4JJMSContextWrapper);
}
jmsProducers.put(producerCacheKey, jmsProducer);
}
return jmsProducer;
return jmsProducer;
});
}
/**
* If the JMS consumer that corresponds to a destination(, subscription, message selector) exists, reuse it; Otherwise, create it
*/
public JMSConsumer getOrCreateJmsConsumer(
public JMSConsumer getJmsConsumer(
S4JJMSContextWrapper s4JJMSContextWrapper,
Destination destination,
String destType,
@@ -258,11 +313,15 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
{
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean isTopic = StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.TOPIC.label);
String consumerCacheKey = S4JAdapterUtil.buildCacheKey(
getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer");
JMSConsumer jmsConsumer = jmsConsumers.get(consumerCacheKey);
if (null == jmsConsumer) {
S4JSpace.JMSGenObjCacheKey consumerCacheKey =
new S4JSpace.JMSGenObjCacheKey(
String.join("::",
getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer"));
return s4jSpace.getJmsConsumer(consumerCacheKey, () -> {
JMSConsumer jmsConsumer;
if (isTopic) {
if (!durable && !shared)
jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal);
@@ -271,13 +330,15 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
throw new RuntimeException("Subscription name is required for receiving messages from a durable or shared topic!");
}
if (durable && !shared) jmsConsumer = jmsContext.createDurableConsumer(
(Topic) destination, subName, msgSelector, nonLocal);
if (durable && !shared)
jmsConsumer = jmsContext.createDurableConsumer((Topic) destination, subName, msgSelector, nonLocal);
else if (!durable)
jmsConsumer = jmsContext.createSharedConsumer((Topic) destination, subName, msgSelector);
else jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) destination, subName, msgSelector);
else
jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) destination, subName, msgSelector);
}
} else {
}
else {
jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal);
}
@@ -291,10 +352,8 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
consumerCacheKey, jmsConsumer, s4JJMSContextWrapper);
}
jmsConsumers.put(consumerCacheKey, jmsConsumer);
}
return jmsConsumer;
return jmsConsumer;
});
}
protected boolean commitTransaction(int txnBatchNum, int jmsSessionMode, long curCycleNum) {
@@ -320,6 +379,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
s4jSpace.incTxnBatchTrackingCnt();
}
return !commitTransaction;
return commitTransaction;
}
}

View File

@@ -18,57 +18,41 @@ package io.nosqlbench.adapter.s4j.util;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.adapter.s4j.dispensers.S4JBaseOpDispenser;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4JAdapterMetrics implements NBLabeledElement {
public class S4JAdapterMetrics {
private static final Logger logger = LogManager.getLogger("S4JAdapterMetrics");
private final String defaultAdapterMetricsPrefix;
private final S4JBaseOpDispenser s4jBaseOpDispenser;
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
public S4JAdapterMetrics(String defaultMetricsPrefix) {
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
public String getName() {
return "S4JAdapterMetrics";
public S4JAdapterMetrics(final S4JBaseOpDispenser s4jBaseOpDispenser) {
this.s4jBaseOpDispenser = s4jBaseOpDispenser;
}
public void initS4JAdapterInstrumentation() {
// Histogram metrics
this.messageSizeHistogram =
ActivityMetrics.histogram(
this,
defaultAdapterMetricsPrefix + "message_size",
ActivityMetrics.DEFAULT_HDRDIGITS);
ActivityMetrics.histogram(this.s4jBaseOpDispenser,
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
// Timer metrics
this.bindTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "bind",
ActivityMetrics.DEFAULT_HDRDIGITS);
ActivityMetrics.timer(this.s4jBaseOpDispenser,
"bind", ActivityMetrics.DEFAULT_HDRDIGITS);
this.executeTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
ActivityMetrics.timer(this.s4jBaseOpDispenser,
"execute", ActivityMetrics.DEFAULT_HDRDIGITS);
}
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return executeTimer; }
public Histogram getMessagesizeHistogram() { return messageSizeHistogram; }
@Override
public NBLabels getLabels() {
return NBLabels.forKV("name", getName());
}
}

View File

@@ -20,7 +20,6 @@ package io.nosqlbench.adapter.s4j.util;
import com.datastax.oss.pulsar.jms.PulsarJMSConstants;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.adapter.s4j.S4JOpType;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@@ -30,7 +29,9 @@ import javax.jms.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class S4JAdapterUtil {
@@ -66,12 +67,6 @@ public class S4JAdapterUtil {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidDocLevelParamList() {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// JMS Destination Types
public enum JMS_DEST_TYPES {
@@ -83,12 +78,6 @@ public class S4JAdapterUtil {
this.label = label;
}
}
public static boolean isValidJmsDestType(String type) {
return Arrays.stream(JMS_DEST_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsDestTypeList() {
return Arrays.stream(JMS_DEST_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// Standard JMS message headers (by JMS specification)
public enum JMS_MSG_HEADER_STD {
@@ -107,12 +96,16 @@ public class S4JAdapterUtil {
JMS_MSG_HEADER_STD(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValidStdJmsMsgHeader(String header) {
return Arrays.stream(JMS_MSG_HEADER_STD.values()).anyMatch(t -> t.label.equals(header));
}
public static String getValidStdJmsMsgHeaderList() {
return Arrays.stream(JMS_MSG_HEADER_STD.values()).map(t -> t.label).collect(Collectors.joining(", "));
return JMS_MSG_HEADER_STD.isValidLabel(header);
}
// JMS defined message properties (by JMS specification)
@@ -133,12 +126,6 @@ public class S4JAdapterUtil {
this.label = label;
}
}
public static boolean isValidJmsDfndMsgProp(String property) {
return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).anyMatch(t -> t.label.equals(property));
}
public static String getValidJmsDfndMsgPropList() {
return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public final static String NB_MSG_SEQ_PROP = "NBMsgSeqProp";
public final static String NB_MSG_SIZE_PROP = "NBMsgSize";
@@ -155,12 +142,16 @@ public class S4JAdapterUtil {
JMS_SESSION_MODES(String label) {
this.label = label;
}
}
public static boolean isValidJmsSessionMode(String mode) {
return Arrays.stream(JMS_SESSION_MODES.values()).anyMatch(t -> t.label.equals(mode));
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static String getValidJmsSessionModeList() {
return Arrays.stream(JMS_SESSION_MODES.values()).map(t -> t.label).collect(Collectors.joining(", "));
return StringUtils.join(JMS_SESSION_MODES.LABELS, ", ");
}
// JMS Message Types
@@ -175,12 +166,16 @@ public class S4JAdapterUtil {
JMS_MESSAGE_TYPES(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValidJmsMessageType(String type) {
return Arrays.stream(JMS_MESSAGE_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsMessageTypeList() {
return Arrays.stream(JMS_MESSAGE_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
return JMS_MESSAGE_TYPES.isValidLabel(type);
}
// JMS Message Types
@@ -198,12 +193,16 @@ public class S4JAdapterUtil {
JMS_MSG_PROP_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidJmsMsgPropType(String type) {
return Arrays.stream(JMS_MSG_PROP_TYPES.values()).anyMatch(t -> t.label.equals(type));
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static String getValidJmsMsgPropTypeList() {
return Arrays.stream(JMS_MSG_PROP_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", ");
}
///////
@@ -306,19 +305,12 @@ public class S4JAdapterUtil {
return "";
}
///////
// Calculate a unique cache key from a series of input parameters
public static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
///////
// Pause the execution of the current thread
public static void pauseCurThreadExec(int pauseInSec) {
if (pauseInSec > 0) {
try {
Thread.sleep(pauseInSec * 1000);
Thread.sleep(pauseInSec * 1000L);
}
catch (InterruptedException ie) {
ie.printStackTrace();

View File

@@ -1,7 +1,15 @@
###########
# Overview: Starlight for JMS (S4J) API configuration items are listed at:
# https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options
enableTransaction=true
##
# NOTE 1: this requires first enabling corresponding server side configurations as listed in the following doc
# https://pulsar.apache.org/docs/3.0.x/txn-use/#steps
#
# NOTE 2: Astra streaming doesn't have transaction enabled (yet). Need to set this value to 'false'
# in order to successfully connect to AS.
##
enableTransaction=false
####
# S4J API specific configurations (non Pulsar specific) - jms.***
@@ -9,9 +17,23 @@ enableTransaction=true
jms.usePulsarAdmin=false
jms.precreateQueueSubscription=false
jms.enableClientSideEmulation=false
jms.useServerSideFiltering=true
# NOTE 1: this requires first enabling corresponding server side configurations as listed in the following doc
# https://docs.datastax.com/en/streaming/starlight-for-jms/3.2/examples/pulsar-jms-server-side-filters.html#enable-server-side-filtering
#
# NOTE 2: Astra streaming doesn't have server side filtering enabled (yet). Need to set this value to 'false'
# in order to successfully connect to AS.
jms.useServerSideFiltering=false
jms.useCredentialsFromCreateConnection=false
jms.transactionsStickyPartitions=true
##
# NOTE: When connecting to AS, the default "public" tenant is not available, you have to use a specific AS tenant.
# Otherwise, you'll get authorization error when trying to create topi under "public/default"
##
#jms.systemNamespace=<AS_tenant>/default
# for JMS priority
jms.enableJMSPriority=true
jms.priorityMapping=non-linear
@@ -26,8 +48,13 @@ jms.priorityMapping=non-linear
# directly used as S4J configuration settings, on 1-to-1 basis.
#--------------------------------------
client.connectionTimeoutMs=5000
##
# NOTE: when connecting to AS, make sure you enable the following settings and
# put here the corresponding token information
##
#client.authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
#client.authParams=
#client.authParams=token:<token_value>
#...
@@ -55,7 +82,11 @@ producer.blockIfQueueFull=true
consumer.receiverQueueSize=2000
consumer.acknowledgementsGroupTimeMicros=0
consumer.ackTimeoutMillis=2000
consumer.deadLetterPolicy={ "maxRedeliverCount":"5", "deadLetterTopic":"", "initialSubscriptionName":"" }
consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"50", "maxDelayMs":"100", "multiplier":"2.0"}
consumer.negativeAckRedeliveryBackoff={}
##
# The following settings are only needed for DLQ testing
##
#consumer.deadLetterPolicy={ "maxRedeliverCount":"5", "deadLetterTopic":"", "initialSubscriptionName":"" }
#consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"50", "maxDelayMs":"100", "multiplier":"2.0"}
#consumer.negativeAckRedeliveryBackoff={}
#...