1) NB Kafka and NB S4J adaptor code efficiency improvement (following PR 10115 by Lari on Pulsar adaptor)

2) Downgrade NB Pulsar version from 2.11.1 to 2.10.4 to avoid NB S4J execution failure (runtime version conflict)
3) Fix S4J Adaptor metrics labeling conflict
4) Change the space shutdown logic for NB Pulsar, Kafka, and S4J such that to simply log an error message instead of throwing out a run-time exception (NB is shutting down anyway).
5) Minor update of NB Kafka and NB S4J properties file and add settings for connecting to Astra Streaming
This commit is contained in:
yabinmeng
2023-05-16 19:59:39 -05:00
parent 69927c5350
commit 7a5831b4ff
19 changed files with 446 additions and 389 deletions

View File

@@ -16,8 +16,8 @@
package io.nosqlbench.adapter.kafka;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterUnexpectedException;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaClient;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaConsumer;
import io.nosqlbench.adapter.kafka.ops.OpTimeTrackKafkaProducer;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
@@ -29,8 +29,10 @@ import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class KafkaSpace implements AutoCloseable {
@@ -41,7 +43,6 @@ public class KafkaSpace implements AutoCloseable {
// TODO: currently this NB Kafka driver only supports String type for message key and value
// add schema support in the future
private final ConcurrentHashMap<String, OpTimeTrackKafkaClient> opTimeTrackKafkaClients = new ConcurrentHashMap<>();
private final String bootstrapSvr;
private final String kafkaClientConfFileName;
@@ -75,6 +76,18 @@ public class KafkaSpace implements AutoCloseable {
private AtomicBoolean beingShutdown = new AtomicBoolean(false);
public record ProducerCacheKey(String producerName, String topicName, String clientId) {
}
private final ConcurrentHashMap<ProducerCacheKey, OpTimeTrackKafkaProducer> producers =
new ConcurrentHashMap<>();
public record ConsumerCacheKey(String consumerName, List<String> topicList, String clientId) {
}
private final ConcurrentHashMap<ConsumerCacheKey, OpTimeTrackKafkaConsumer> consumers =
new ConcurrentHashMap<>();
public KafkaSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
@@ -115,11 +128,16 @@ public class KafkaSpace implements AutoCloseable {
.asReadOnly();
}
public OpTimeTrackKafkaClient getOpTimeTrackKafkaClient(String cacheKey) {
return opTimeTrackKafkaClients.get(cacheKey);
public OpTimeTrackKafkaProducer getOpTimeTrackKafkaProducer(
ProducerCacheKey key,
Supplier<OpTimeTrackKafkaProducer> producerSupplier) {
return producers.computeIfAbsent(key, __ -> producerSupplier.get());
}
public void addOpTimeTrackKafkaClient(String cacheKey, OpTimeTrackKafkaClient client) {
opTimeTrackKafkaClients.put(cacheKey, client);
public OpTimeTrackKafkaConsumer getOpTimeTrackKafkaConsumer(
ConsumerCacheKey key,
Supplier<OpTimeTrackKafkaConsumer> consumerSupplier) {
return consumers.computeIfAbsent(key, __ -> consumerSupplier.get());
}
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
@@ -135,23 +153,27 @@ public class KafkaSpace implements AutoCloseable {
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
public boolean isShuttigDown() {
public boolean isShuttingDown() {
return beingShutdown.get();
}
public void shutdownSpace() {
try {
beingShutdown.set(true);
for (OpTimeTrackKafkaClient client : opTimeTrackKafkaClients.values()) {
client.close();
for (OpTimeTrackKafkaProducer producer : producers.values()) {
producer.close();
}
for (OpTimeTrackKafkaConsumer consumer : consumers.values()) {
consumer.close();
}
// Pause 5 seconds before closing producers/consumers
KafkaAdapterUtil.pauseCurThreadExec(5);
}
catch (Exception e) {
e.printStackTrace();
throw new KafkaAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
catch (Exception ex) {
String exp = "Unexpected error when shutting down the Kafka adaptor space";
logger.error(exp, ex);
}
}
}

View File

@@ -20,7 +20,6 @@ import io.nosqlbench.adapter.kafka.KafkaSpace;
import io.nosqlbench.adapter.kafka.exception.KafkaAdapterInvalidParamException;
import io.nosqlbench.adapter.kafka.ops.KafkaOp;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterMetrics;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil;
import io.nosqlbench.adapter.kafka.util.KafkaAdapterUtil.DOC_LEVEL_PARAMS;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
@@ -133,5 +132,4 @@ public abstract class KafkaBaseOpDispenser extends BaseOpDispenser<KafkaOp, Kaf
public String getName() {
return "KafkaBaseOpDispenser";
}
}

View File

@@ -36,7 +36,6 @@ import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.LongFunction;
import java.util.stream.Collectors;
public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
@@ -97,52 +96,6 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
return defaultGrpNamePrefix + '-' + grpIdx;
}
private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaConsumer(
final long cycle,
final List<String> topicNameList,
final String groupId)
{
final String topicNameListStr = topicNameList.stream()
.collect(Collectors.joining("::"));
final String cacheKey = KafkaAdapterUtil.buildCacheKey(
"consumer-" + cycle % this.kafkaClntCnt, topicNameListStr, groupId );
OpTimeTrackKafkaClient opTimeTrackKafkaClient = this.kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
if (null == opTimeTrackKafkaClient) {
final Properties consumerConfProps = new Properties();
consumerConfProps.putAll(this.consumerClientConfMap);
consumerConfProps.put("group.id", groupId);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
synchronized (this) {
consumer.subscribe(topicNameList);
}
if (MessageConsumerOpDispenser.logger.isDebugEnabled())
MessageConsumerOpDispenser.logger.debug("Kafka consumer created: {}/{} -- {}, {}, {}",
cacheKey,
consumer,
topicNameList,
this.autoCommitEnabled,
this.maxMsgCntPerCommit);
opTimeTrackKafkaClient = new OpTimeTrackKafkaConsumer(
this.kafkaSpace,
this.asyncAPI,
this.msgPollIntervalInSec,
this.autoCommitEnabled,
this.maxMsgCntPerCommit,
consumer,
this.kafkaAdapterMetrics,
EndToEndStartingTimeSource.valueOf(this.e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
this::getReceivedMessageSequenceTracker,
this.seqTrackingFunc.apply(cycle));
this.kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
private ReceivedMessageSequenceTracker getReceivedMessageSequenceTracker(final String topicName) {
return this.receivedMessageSequenceTrackersForTopicThreadLocal.get()
.computeIfAbsent(topicName, k -> this.createReceivedMessageSequenceTracker());
@@ -159,10 +112,48 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
assert StringUtils.isNotBlank(explicitTopicListStr);
return Arrays.stream(StringUtils.split(explicitTopicListStr, ','))
.filter(s -> StringUtils.isNotBlank(s))
.filter(StringUtils::isNotBlank)
.toList();
}
private OpTimeTrackKafkaConsumer getTimeTrackKafkaConsumer(final long cycle,
final List<String> topicNameList,
final String groupId)
{
final String consumerName = "consumer-" + cycle % this.kafkaClntCnt;
KafkaSpace.ConsumerCacheKey consumerCacheKey =
new KafkaSpace.ConsumerCacheKey(consumerName, topicNameList, groupId);
return kafkaSpace.getOpTimeTrackKafkaConsumer(consumerCacheKey, () -> {
final Properties consumerConfProps = new Properties();
consumerConfProps.putAll(this.consumerClientConfMap);
consumerConfProps.put("group.id", groupId);
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfProps);
synchronized (this) {
consumer.subscribe(topicNameList);
}
if (MessageConsumerOpDispenser.logger.isDebugEnabled())
MessageConsumerOpDispenser.logger.debug(
"Kafka consumer created: {} -- autoCommitEnabled: {}, maxMsgCntPerCommit: {}",
consumer,
this.autoCommitEnabled,
this.maxMsgCntPerCommit);
return new OpTimeTrackKafkaConsumer(
this.kafkaSpace,
this.asyncAPI,
this.msgPollIntervalInSec,
this.autoCommitEnabled,
this.maxMsgCntPerCommit,
consumer,
this.kafkaAdapterMetrics,
EndToEndStartingTimeSource.valueOf(this.e2eStartTimeSrcParamStrFunc.apply(cycle).toUpperCase()),
this::getReceivedMessageSequenceTracker,
this.seqTrackingFunc.apply(cycle));
});
}
@Override
public KafkaOp apply(final long cycle) {
final List<String> topicNameList = this.getEffectiveTopicNameList(cycle);
@@ -171,7 +162,7 @@ public class MessageConsumerOpDispenser extends KafkaBaseOpDispenser {
"Effective consumer group name and/or topic names are needed for creating a consumer!");
final OpTimeTrackKafkaClient opTimeTrackKafkaConsumer =
this.getOrCreateOpTimeTrackKafkaConsumer(cycle, topicNameList, groupId);
this.getTimeTrackKafkaConsumer(cycle, topicNameList, groupId);
return new KafkaOp(
this.kafkaAdapterMetrics,

View File

@@ -72,7 +72,7 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
producerClientConfMap.putAll(kafkaSpace.getKafkaClientConf().getProducerConfMap());
this.producerClientConfMap.put("bootstrap.servers", kafkaSpace.getBootstrapSvr());
txnBatchNum = this.parsedOp.getStaticConfigOr("txn_batch_num", Integer.valueOf(0));
txnBatchNum = this.parsedOp.getStaticConfigOr("txn_batch_num", 0);
msgHeaderJsonStrFunc = this.lookupOptionalStrOpValueFunc(MessageProducerOpDispenser.MSG_HEADER_OP_PARAM);
msgKeyStrFunc = this.lookupOptionalStrOpValueFunc(MessageProducerOpDispenser.MSG_KEY_OP_PARAM);
@@ -94,62 +94,6 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
return "";
}
private OpTimeTrackKafkaClient getOrCreateOpTimeTrackKafkaProducer(final long cycle,
final String topicName,
final String clientId)
{
final String cacheKey = KafkaAdapterUtil.buildCacheKey(
"producer-" + cycle % this.kafkaClntCnt, topicName);
OpTimeTrackKafkaClient opTimeTrackKafkaClient = this.kafkaSpace.getOpTimeTrackKafkaClient(cacheKey);
if (null == opTimeTrackKafkaClient) {
final Properties producerConfProps = new Properties();
producerConfProps.putAll(this.producerClientConfMap);
if (StringUtils.isNotBlank(clientId)) {
producerConfProps.put("client.id", clientId);
} else {
producerConfProps.remove("client.id");
}
// When transaction batch number is less than 2, it is treated effectively as no-transaction
if (2 > txnBatchNum) {
producerConfProps.remove("transactional.id");
}
String baseTransactId = "";
boolean transactionEnabled = false;
if (producerConfProps.containsKey("transactional.id")) {
baseTransactId = producerConfProps.getProperty("transactional.id").toString();
producerConfProps.put("transactional.id", baseTransactId + '-' + cacheKey);
transactionEnabled = StringUtils.isNotBlank(producerConfProps.getProperty("transactional.id").toString());
}
final KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
if (transactionEnabled) producer.initTransactions();
if (MessageProducerOpDispenser.logger.isDebugEnabled())
MessageProducerOpDispenser.logger.debug("Producer created: {}/{} -- ({}, {}, {})",
cacheKey,
producer,
topicName,
transactionEnabled,
clientId);
opTimeTrackKafkaClient = new OpTimeTrackKafkaProducer(
this.kafkaSpace,
this.asyncAPI,
transactionEnabled,
this.txnBatchNum,
this.seqTrackingFunc.apply(cycle),
this.msgSeqErrSimuTypeSetFunc.apply(cycle),
producer);
this.kafkaSpace.addOpTimeTrackKafkaClient(cacheKey, opTimeTrackKafkaClient);
}
return opTimeTrackKafkaClient;
}
private ProducerRecord<String, String> createKafkaMessage(
final long curCycle,
final String topicName,
@@ -199,13 +143,69 @@ public class MessageProducerOpDispenser extends KafkaBaseOpDispenser {
return record;
}
public OpTimeTrackKafkaProducer getOpTimeTrackKafkaProducer(long cycle,
final String topicName,
final String clientId)
{
String producerName = "producer-" + cycle % this.kafkaClntCnt;
KafkaSpace.ProducerCacheKey producerCacheKey =
new KafkaSpace.ProducerCacheKey(producerName, topicName, clientId);
return kafkaSpace.getOpTimeTrackKafkaProducer(producerCacheKey, () -> {
final Properties producerConfProps = new Properties();
producerConfProps.putAll(this.producerClientConfMap);
if (StringUtils.isNotBlank(clientId)) {
producerConfProps.put("client.id", clientId);
} else {
producerConfProps.remove("client.id");
}
// When transaction batch number is less than 2, it is treated effectively as no-transaction
if (2 > txnBatchNum) {
producerConfProps.remove("transactional.id");
}
String baseTransactId = "";
boolean transactionEnabled = false;
if (producerConfProps.containsKey("transactional.id")) {
baseTransactId = producerConfProps.getProperty("transactional.id");
if (StringUtils.isNotBlank(baseTransactId)) {
producerConfProps.put(
"transactional.id",
baseTransactId + '-' + (cycle % this.kafkaClntCnt));
transactionEnabled = StringUtils.isNotBlank(producerConfProps.getProperty("transactional.id"));
}
}
final KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfProps);
if (transactionEnabled) producer.initTransactions();
if (MessageProducerOpDispenser.logger.isDebugEnabled())
MessageProducerOpDispenser.logger.debug(
"Producer created: {} -- transactionEnabled: {}, clientId: {})",
producer,
transactionEnabled,
clientId);
return new OpTimeTrackKafkaProducer(
this.kafkaSpace,
this.asyncAPI,
transactionEnabled,
this.txnBatchNum,
this.seqTrackingFunc.apply(cycle),
this.msgSeqErrSimuTypeSetFunc.apply(cycle),
producer);
});
}
@Override
public KafkaOp apply(final long cycle) {
final String topicName = this.topicNameStrFunc.apply(cycle);
final String clientId = this.getEffectiveClientId(cycle);
final OpTimeTrackKafkaClient opTimeTrackKafkaProducer =
this.getOrCreateOpTimeTrackKafkaProducer(cycle, topicName, clientId);
this.getOpTimeTrackKafkaProducer(cycle, topicName, clientId);
final ProducerRecord<String, String> message = this.createKafkaMessage(
cycle,

View File

@@ -29,9 +29,9 @@ abstract public class OpTimeTrackKafkaClient {
protected final long activityStartTime;
// Maximum time length to execute S4J operations (e.g. message send or consume)
// Maximum time length to execute Kafka operations (e.g. message send or consume)
// - when NB execution passes this threshold, it is simply NoOp
// - 0 means no maximum time constraint. S4JOp is always executed until NB execution cycle finishes
// - 0 means no maximum time constraint. KafkaOp is always executed until NB execution cycle finishes
protected final long maxOpTimeInSec;
public OpTimeTrackKafkaClient(KafkaSpace kafkaSpace) {

View File

@@ -120,7 +120,7 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
@Override
void cycleMsgProcess(final long cycle, final Object cycleObj) {
if (this.kafkaSpace.isShuttigDown()) return;
if (this.kafkaSpace.isShuttingDown()) return;
synchronized (this) {
final ConsumerRecords<String, String> records = this.consumer.poll(this.msgPoolIntervalInMs);
@@ -225,8 +225,9 @@ public class OpTimeTrackKafkaConsumer extends OpTimeTrackKafkaClient {
catch (final IllegalStateException ise) {
// If a consumer is already closed, that's fine.
}
catch (final Exception e) {
e.printStackTrace();
catch (final Exception ex) {
logger.error(ex);
ex.printStackTrace();
}
}
}

View File

@@ -160,7 +160,7 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
// For producer, cycleObj represents a "message" (ProducerRecord)
assert null != cycleObj;
if (this.kafkaSpace.isShuttigDown()) {
if (this.kafkaSpace.isShuttingDown()) {
if (this.transactionEnabled) try {
this.producer.abortTransaction();
if (OpTimeTrackKafkaProducer.logger.isDebugEnabled())
@@ -221,17 +221,12 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
}
catch ( final ProducerFencedException | OutOfOrderSequenceException |
UnsupportedOperationException | AuthorizationException e) {
UnsupportedOperationException | AuthorizationException | IllegalStateException e) {
if (OpTimeTrackKafkaProducer.logger.isDebugEnabled())
OpTimeTrackKafkaProducer.logger.debug("Fatal error when sending a message ({}) - {}, {}",
cycle, this.producer, message);
throw new KafkaAdapterUnexpectedException(e);
}
catch (final IllegalStateException | KafkaException e) {
if (this.transactionEnabled) {
}
}
catch (final Exception e) {
throw new KafkaAdapterUnexpectedException(e);
}
@@ -252,8 +247,9 @@ public class OpTimeTrackKafkaProducer extends OpTimeTrackKafkaClient {
catch (final IllegalStateException ise) {
// If a producer is already closed, that's fine.
}
catch (final Exception e) {
e.printStackTrace();
catch (final Exception ex) {
logger.error(ex);
ex.printStackTrace();
}
}

View File

@@ -16,7 +16,6 @@
package io.nosqlbench.adapter.kafka.util;
import com.amazonaws.util.Base64;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
@@ -27,10 +26,8 @@ import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public enum KafkaAdapterUtil {
;
public class KafkaAdapterUtil {
public static final String MSG_SEQUENCE_NUMBER = "sequence_number";
private static final Logger logger = LogManager.getLogger(KafkaAdapterUtil.class);
@@ -51,12 +48,6 @@ public enum KafkaAdapterUtil {
this.label = label;
}
}
public static boolean isValidDocLevelParam(final String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidDocLevelParamList() {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static final String NB_MSG_SEQ_PROP = "NBMsgSeqProp";
public static final String NB_MSG_SIZE_PROP = "NBMsgSize";
@@ -82,13 +73,6 @@ public enum KafkaAdapterUtil {
return Arrays.asList(mapper.readValue(jsonStr, Object[].class));
}
public static String buildCacheKey(final String... keyParts) {
final String combinedStr = Arrays.stream(keyParts)
.filter(StringUtils::isNotBlank)
.collect(Collectors.joining("::"));
return Base64.encodeAsString(combinedStr.getBytes(StandardCharsets.UTF_8));
}
public static void pauseCurThreadExec(final int pauseInSec) {
if (0 < pauseInSec) try {
Thread.sleep(pauseInSec * 1000L);

View File

@@ -25,18 +25,26 @@ topic.flush.messages=2
topic.log.message.timestamp.type=CreateTime
#####
# Producer related configurations (global) - topic.***
# Producer related configurations (global) - producer.***
# - Valid settings: https://kafka.apache.org/documentation/#producerconfigs
#
#--------------------------------------
producer.key.serializer=org.apache.kafka.common.serialization.StringSerializer
producer.value.serializer=org.apache.kafka.common.serialization.StringSerializer
#producer.client.id=nbDftClient
producer.transactional.id=nbDftTxn
#producer.transactional.id=nbDftTxn
##
# NOTE: When connecting to Astra Streaming (with S4K enabled), enable the following settings and with
# the corresponding AS tenant and token information
##
#producer.security.protocol=SASL_SSL
#producer.sasl.mechanism=PLAIN
#producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<AS_tenant>" password="token:<AS_token>";
#####
# Consumer related configurations (global) - topic.***
# Consumer related configurations (global) - consumer.***
# - Valid settings: https://kafka.apache.org/documentation/#consumerconfigs
#
#--------------------------------------
@@ -45,3 +53,11 @@ consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserial
consumer.group.id=nbDftGrp
#consumer.isolation.level=read_uncommitted
#consumer.enable.auto.commit=true
##
# NOTE: When connecting to Astra Streaming (with S4K enabled), enable the following settings and with
# the corresponding AS tenant and token information
##
#consumer.security.protocol=SASL_SSL
#consumer.sasl.mechanism=PLAIN
#consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<AS_tenant>" password="token:<AS_token>";

View File

@@ -34,7 +34,10 @@
</description>
<properties>
<!-- Pulsar version 2.11.x causes NB S4J failure due to version conflict (S4J right now is on version 2.10.3)
<pulsar.version>2.11.1</pulsar.version>
-->
<pulsar.version>2.10.4</pulsar.version>
</properties>
<dependencies>

View File

@@ -16,7 +16,6 @@
package io.nosqlbench.adapter.pulsar;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.util.PulsarClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
@@ -54,10 +53,12 @@ public class PulsarSpace implements AutoCloseable {
public record ProducerCacheKey(String producerName, String topicName) {
}
private final ConcurrentHashMap<ProducerCacheKey, Producer<?>> producers = new ConcurrentHashMap<>();
public record ConsumerCacheKey(String consumerName, String subscriptionName, List<String> topicNameList, String topicPattern) {
public record ConsumerCacheKey(String consumerName,
String subscriptionName,
List<String> topicNameList,
String topicPattern) {
}
private final ConcurrentHashMap<ConsumerCacheKey, Consumer<?>> consumers = new ConcurrentHashMap<>();
@@ -100,11 +101,18 @@ public class PulsarSpace implements AutoCloseable {
public int getProducerSetCnt() { return producers.size(); }
public int getConsumerSetCnt() { return consumers.size(); }
public int getReaderSetCnt() { return readers.size(); }
public Producer<?> getProducer(ProducerCacheKey key, Supplier<Producer<?>> producerSupplier) { return producers.computeIfAbsent(key, __ -> producerSupplier.get()); }
public Consumer<?> getConsumer(ConsumerCacheKey key, Supplier<Consumer<?>> consumerSupplier) { return consumers.computeIfAbsent(key, __ -> consumerSupplier.get()); }
public Producer<?> getProducer(ProducerCacheKey key, Supplier<Producer<?>> producerSupplier) {
return producers.computeIfAbsent(key, __ -> producerSupplier.get());
}
public Reader<?> getReader(ReaderCacheKey key, Supplier<Reader<?>> readerSupplier) { return readers.computeIfAbsent(key, __ -> readerSupplier.get()); }
public Consumer<?> getConsumer(ConsumerCacheKey key, Supplier<Consumer<?>> consumerSupplier) {
return consumers.computeIfAbsent(key, __ -> consumerSupplier.get());
}
public Reader<?> getReader(ReaderCacheKey key, Supplier<Reader<?>> readerSupplier) {
return readers.computeIfAbsent(key, __ -> readerSupplier.get());
}
/**
@@ -185,9 +193,9 @@ public class PulsarSpace implements AutoCloseable {
if (pulsarAdmin != null) pulsarAdmin.close();
if (pulsarClient != null) pulsarClient.close();
}
catch (Exception e) {
throw new PulsarAdapterUnexpectedException(
"Unexpected error when shutting down the Pulsar space \"" + spaceName + "\"!");
catch (Exception ex) {
String exp = "Unexpected error when shutting down the Pulsar adaptor space";
logger.error(exp, ex);
}
}

View File

@@ -33,7 +33,6 @@ import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.PULSAR_API_TYPE;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.READER_CONF_CUSTOM_KEY;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.READER_CONF_STD_KEY;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.READER_MSG_POSITION_TYPE;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
@@ -252,6 +251,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, Pu
cycleProducerName);
final ProducerCacheKey producerCacheKey = new ProducerCacheKey(producerName, topicName);
return this.pulsarSpace.getProducer(producerCacheKey, () -> {
final PulsarClient pulsarClient = this.pulsarSpace.getPulsarClient();

View File

@@ -17,7 +17,6 @@
package io.nosqlbench.adapter.s4j;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.*;
@@ -33,10 +32,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class S4JSpace implements AutoCloseable {
@@ -49,7 +48,17 @@ public class S4JSpace implements AutoCloseable {
// - JMS connection can have a number of JMS sessions (\"num_session\" NB CLI parameter).
// - Each JMS session has its own sets of JMS destinations, producers, consumers, etc.
private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
public record JMSGenObjCacheKey(String identifierStr) { }
private final ConcurrentHashMap<JMSGenObjCacheKey, S4JJMSContextWrapper> sessionLvlJmsContextWrappers =
new ConcurrentHashMap<>();
public record JMSDestinationCacheKey(String contextIdentifier,
String destinationType,
String destinationName) { }
protected final ConcurrentHashMap<JMSDestinationCacheKey, Destination> jmsDestinations = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<JMSGenObjCacheKey, JMSProducer> jmsProducers = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<JMSGenObjCacheKey, JMSConsumer> jmsConsumers = new ConcurrentHashMap<>();
private final String pulsarSvcUrl;
private final String webSvcUrl;
@@ -95,6 +104,7 @@ public class S4JSpace implements AutoCloseable {
private long totalCycleNum;
public S4JSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
@@ -114,7 +124,7 @@ public class S4JSpace implements AutoCloseable {
this.s4jClientConfFileName = cfg.get("config");
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
cfg.getOptional("session_mode").orElse(""));
this.s4JClientConf = new S4JClientConf(pulsarSvcUrl, webSvcUrl, s4jClientConfFileName);
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
@@ -149,12 +159,32 @@ public class S4JSpace implements AutoCloseable {
.asReadOnly();
}
public ConcurrentHashMap<String, JMSContext> getConnLvlJmsContexts() {
return connLvlJmsContexts;
public JMSContext getConnLvlJMSContext(String jmsContextIdentifier) {
return connLvlJmsContexts.get(jmsContextIdentifier);
}
public ConcurrentHashMap<String, S4JJMSContextWrapper> getSessionLvlJmsContexts() {
return sessionLvlJmsContexts;
public S4JJMSContextWrapper getS4JJMSContextWrapper(
JMSGenObjCacheKey key,
Supplier<S4JJMSContextWrapper> s4JJMSContextWrapperSupplier) {
return sessionLvlJmsContextWrappers.computeIfAbsent(key, __ -> s4JJMSContextWrapperSupplier.get());
}
public Destination getJmsDestination(
JMSDestinationCacheKey key,
Supplier<Destination> jmsDestinationSupplier) {
return jmsDestinations.computeIfAbsent(key, __ -> jmsDestinationSupplier.get());
}
public JMSProducer getJmsProducer(
JMSGenObjCacheKey key,
Supplier<JMSProducer> jmsProducerSupplier) {
return jmsProducers.computeIfAbsent(key, __ -> jmsProducerSupplier.get());
}
public JMSConsumer getJmsConsumer(
JMSGenObjCacheKey key,
Supplier<JMSConsumer> jmsConsumerSupplier) {
return jmsConsumers.computeIfAbsent(key, __ -> jmsConsumerSupplier.get());
}
public long getS4JActivityStartTimeMills() { return this.s4JActivityStartTimeMills; }
@@ -203,11 +233,10 @@ public class S4JSpace implements AutoCloseable {
for (int i=0; i<getMaxNumConn(); i++) {
// Establish a JMS connection
String connLvlJmsConnContextIdStr = getConnLvlJmsContextIdentifier(i);
String clientIdStr = Base64.getEncoder().encodeToString(connLvlJmsConnContextIdStr.getBytes());
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(clientIdStr);
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.error("onException::Unexpected JMS error happened:" + e);
@@ -215,7 +244,6 @@ public class S4JSpace implements AutoCloseable {
});
connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
@@ -243,7 +271,7 @@ public class S4JSpace implements AutoCloseable {
this.txnBatchTrackingCnt.remove();
for (S4JJMSContextWrapper s4JJMSContextWrapper : sessionLvlJmsContexts.values()) {
for (S4JJMSContextWrapper s4JJMSContextWrapper : sessionLvlJmsContextWrappers.values()) {
if (s4JJMSContextWrapper != null) {
if (s4JJMSContextWrapper.isTransactedMode()) {
s4JJMSContextWrapper.getJmsContext().rollback();
@@ -252,15 +280,19 @@ public class S4JSpace implements AutoCloseable {
}
}
for (JMSConsumer jmsConsumer : jmsConsumers.values()) {
if (jmsConsumer != null) jmsConsumer.close();
}
for (JMSContext jmsContext : connLvlJmsContexts.values()) {
if (jmsContext != null) jmsContext.close();
}
s4jConnFactory.close();
}
catch (Exception e) {
e.printStackTrace();
throw new S4JAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
catch (Exception ex) {
String exp = "Unexpected error when shutting down the S4J adaptor space";
logger.error(exp, ex);
}
}
@@ -332,13 +364,15 @@ public class S4JSpace implements AutoCloseable {
}
public String getConnLvlJmsContextIdentifier(int jmsConnSeqNum) {
return S4JAdapterUtil.buildCacheKey(
return String.join(
"::",
this.spaceName,
StringUtils.join("conn-", jmsConnSeqNum));
}
public String getSessionLvlJmsContextIdentifier(int jmsConnSeqNum, int jmsSessionSeqNum) {
return S4JAdapterUtil.buildCacheKey(
return String.join(
"::",
this.spaceName,
StringUtils.join("conn-", jmsConnSeqNum),
StringUtils.join("session-", jmsSessionSeqNum));
@@ -381,65 +415,4 @@ public class S4JSpace implements AutoCloseable {
return jmsConnContext;
}
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long curCycle) {
return getOrCreateS4jJmsContextWrapper(curCycle, null);
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = getMaxNumConn();
int totalSessionPerConnNum = getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
String jmsConnContextIdStr = getConnLvlJmsContextIdentifier(connSeqNum);
JMSContext connLvlJmsContext = connLvlJmsContexts.get(jmsConnContextIdStr);
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JJMSContextWrapper jmsContextWrapper = sessionLvlJmsContexts.get(jmsSessionContextIdStr);
if (jmsContextWrapper == null) {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
jmsContextWrapper = new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
sessionLvlJmsContexts.put(jmsSessionContextIdStr, jmsContextWrapper);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsContextWrapper);
}
}
return jmsContextWrapper;
}
}

View File

@@ -81,13 +81,13 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
this.noLocal =
parsedOp.getStaticConfigOr("no_local", Boolean.FALSE);
this.readTimeout =
parsedOp.getStaticConfigOr("read_timeout", Integer.valueOf(0));
parsedOp.getStaticConfigOr("read_timeout", 0);
this.recvNoWait =
parsedOp.getStaticConfigOr("no_wait", Boolean.FALSE);
this.msgAckRatio =
parsedOp.getStaticConfigOr("msg_ack_ratio", Float.valueOf(1.0f));
parsedOp.getStaticConfigOr("msg_ack_ratio", 1.0f);
this.slowAckInSec =
parsedOp.getStaticConfigOr("slow_ack_in_sec", Integer.valueOf(0));
parsedOp.getStaticConfigOr("slow_ack_in_sec", 0);
this.localMsgSelectorFunc =
lookupOptionalStrOpValueFunc("msg_selector");
@@ -123,14 +123,13 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
@Override
public MessageConsumerOp apply(long cycle) {
S4JJMSContextWrapper s4JJMSContextWrapper =
s4jSpace.getOrCreateS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransact = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
boolean commitTransact = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
Destination destination;
try {
destination = getOrCreateJmsDestination(
destination = getJmsDestination(
s4JJMSContextWrapper, temporaryDest, destType, destNameStrFunc.apply(cycle));
}
catch (JMSRuntimeException jmsRuntimeException) {
@@ -139,7 +138,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
JMSConsumer jmsConsumer;
try {
jmsConsumer = getOrCreateJmsConsumer(
jmsConsumer = getJmsConsumer(
s4JJMSContextWrapper,
destination,
destType,

View File

@@ -172,7 +172,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
if (value != null) {
String destType = StringUtils.substringBefore(value, ':');
String destName = StringUtils.substringAfter(value, ':');
outMessage.setJMSReplyTo(getOrCreateJmsDestination(s4JJMSContextWrapper,false, destType, destName));
outMessage.setJMSReplyTo(getJmsDestination(s4JJMSContextWrapper,false, destType, destName));
}
}
// Ignore these headers - handled by S4J API automatically
@@ -279,13 +279,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
S4JJMSContextWrapper s4JJMSContextWrapper = s4jSpace.getOrCreateS4jJmsContextWrapper(cycle);
S4JJMSContextWrapper s4JJMSContextWrapper = getS4jJmsContextWrapper(cycle);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransaction = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
boolean commitTransaction = super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
Destination destination;
try {
destination = getOrCreateJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName);
destination = getJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName);
}
catch (JMSRuntimeException jmsRuntimeException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS destination!");
@@ -293,7 +293,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
JMSProducer producer;
try {
producer = getOrCreateJmsProducer(s4JJMSContextWrapper, asyncAPI);
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.adapter.s4j.dispensers;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapter.s4j.util.*;
@@ -30,7 +31,6 @@ import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -43,12 +43,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
protected final S4JSpace s4jSpace;
protected final S4JAdapterMetrics s4jAdapterMetrics;
private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, JMSProducer> jmsProducers = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, JMSConsumer> jmsConsumers = new ConcurrentHashMap<>();
// Doc-level parameter: temporary_dest (default: false)
protected final boolean temporaryDest;
// Doc-level parameter: dest_type (default: Topic)
@@ -73,11 +67,9 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
this.parsedOp = op;
this.s4jSpace = s4jSpace;
this.connLvlJmsContexts.putAll(s4jSpace.getConnLvlJmsContexts());
this.sessionLvlJmsContexts.putAll(s4jSpace.getSessionLvlJmsContexts());
String defaultMetricsPrefix = parsedOp.getLabels().linearize("activity");
this.s4jAdapterMetrics = new S4JAdapterMetrics(defaultMetricsPrefix);
this.s4jAdapterMetrics = new S4JAdapterMetrics(this);
s4jAdapterMetrics.initS4JAdapterInstrumentation();
this.destNameStrFunc = destNameStrFunc;
@@ -102,7 +94,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
LongFunction<Boolean> booleanLongFunction;
booleanLongFunction = l -> parsedOp.getOptionalStaticConfig(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> BooleanUtils.toBoolean(value))
.map(BooleanUtils::toBoolean)
.orElse(defaultValue);
logger.info("{}: {}", paramName, booleanLongFunction.apply(0));
return booleanLongFunction;
@@ -133,7 +125,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
LongFunction<Integer> integerLongFunction;
integerLongFunction = l -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> NumberUtils.toInt(value))
.map(NumberUtils::toInt)
.map(value -> {
if (0 > value) return 0;
return value;
@@ -164,10 +156,71 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
return stringLongFunction;
}
public S4JJMSContextWrapper getS4jJmsContextWrapper(long curCycle) {
return getS4jJmsContextWrapper(curCycle, null);
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = s4jSpace.getMaxNumConn();
int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
JMSContext connLvlJmsContext = s4jSpace.getConnLvlJMSContext(s4jSpace.getConnLvlJmsContextIdentifier(connSeqNum));
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = s4jSpace.getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JSpace.JMSGenObjCacheKey jmsContextWrapperCacheKey =
new S4JSpace.JMSGenObjCacheKey(jmsSessionContextIdStr);
return s4jSpace.getS4JJMSContextWrapper(jmsContextWrapperCacheKey, () -> {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
S4JJMSContextWrapper s4JJMSContextWrapper =
new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
s4JJMSContextWrapper);
}
return s4JJMSContextWrapper;
});
}
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/
public Destination getOrCreateJmsDestination(
public Destination getJmsDestination(
S4JJMSContextWrapper s4JJMSContextWrapper,
boolean tempDest,
String destType,
@@ -176,54 +229,58 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer();
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
// Regular, non-temporary destination
if (!tempDest) {
String destinationCacheKey = S4JAdapterUtil.buildCacheKey(jmsContextIdStr, destType, destName);
Destination destination = jmsDestinations.get(destinationCacheKey);
S4JSpace.JMSDestinationCacheKey destinationCacheKey =
new S4JSpace.JMSDestinationCacheKey(jmsContextIdStr, destType, destName);
if (null == destination) {
return s4jSpace.getJmsDestination(destinationCacheKey, () -> {
Destination destination;
// Regular, non-temporary destination
if (!tempDest) {
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
destination = jmsContext.createQueue(destName);
} else {
destination = jmsContext.createTopic(destName);
}
jmsDestinations.put(destinationCacheKey, destination);
}
// Temporary destination
else {
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
destination = jmsContext.createTemporaryQueue();
}
else {
destination = jmsContext.createTemporaryTopic();
}
}
return destination;
}
// Temporary destination
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
return jmsContext.createTemporaryQueue();
}
return jmsContext.createTemporaryTopic();
});
}
// Get simplified NB thread name
private String getSimplifiedNBThreadName(String fullThreadName) {
assert StringUtils.isNotBlank(fullThreadName);
if (StringUtils.contains(fullThreadName, '/')) return StringUtils.substringAfterLast(fullThreadName, "/");
return fullThreadName;
if (StringUtils.contains(fullThreadName, '/'))
return StringUtils.substringAfterLast(fullThreadName, "/");
else
return fullThreadName;
}
/**
* If the JMS producer that corresponds to a destination exists, reuse it; Otherwise, create it
*/
public JMSProducer getOrCreateJmsProducer(
public JMSProducer getJmsProducer(
S4JJMSContextWrapper s4JJMSContextWrapper,
boolean asyncApi) throws JMSException
{
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
String producerCacheKey = S4JAdapterUtil.buildCacheKey(
getSimplifiedNBThreadName(Thread.currentThread().getName()), "producer");
JMSProducer jmsProducer = jmsProducers.get(producerCacheKey);
S4JSpace.JMSGenObjCacheKey producerCacheKey =
new S4JSpace.JMSGenObjCacheKey(
String.join("::",
getSimplifiedNBThreadName(Thread.currentThread().getName()), "producer"));
if (null == jmsProducer) {
jmsProducer = jmsContext.createProducer();
return s4jSpace.getJmsProducer(producerCacheKey, () -> {
JMSProducer jmsProducer = jmsContext.createProducer();
if (asyncApi) {
jmsProducer.setAsync(new S4JCompletionListener(s4jSpace, this));
@@ -234,16 +291,14 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
producerCacheKey, jmsProducer, s4JJMSContextWrapper);
}
jmsProducers.put(producerCacheKey, jmsProducer);
}
return jmsProducer;
return jmsProducer;
});
}
/**
* If the JMS consumer that corresponds to a destination(, subscription, message selector) exists, reuse it; Otherwise, create it
*/
public JMSConsumer getOrCreateJmsConsumer(
public JMSConsumer getJmsConsumer(
S4JJMSContextWrapper s4JJMSContextWrapper,
Destination destination,
String destType,
@@ -258,11 +313,15 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
{
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean isTopic = StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.TOPIC.label);
String consumerCacheKey = S4JAdapterUtil.buildCacheKey(
getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer");
JMSConsumer jmsConsumer = jmsConsumers.get(consumerCacheKey);
if (null == jmsConsumer) {
S4JSpace.JMSGenObjCacheKey consumerCacheKey =
new S4JSpace.JMSGenObjCacheKey(
String.join("::",
getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer"));
return s4jSpace.getJmsConsumer(consumerCacheKey, () -> {
JMSConsumer jmsConsumer;
if (isTopic) {
if (!durable && !shared)
jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal);
@@ -271,13 +330,15 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
throw new RuntimeException("Subscription name is required for receiving messages from a durable or shared topic!");
}
if (durable && !shared) jmsConsumer = jmsContext.createDurableConsumer(
(Topic) destination, subName, msgSelector, nonLocal);
if (durable && !shared)
jmsConsumer = jmsContext.createDurableConsumer((Topic) destination, subName, msgSelector, nonLocal);
else if (!durable)
jmsConsumer = jmsContext.createSharedConsumer((Topic) destination, subName, msgSelector);
else jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) destination, subName, msgSelector);
else
jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) destination, subName, msgSelector);
}
} else {
}
else {
jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal);
}
@@ -291,10 +352,8 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
consumerCacheKey, jmsConsumer, s4JJMSContextWrapper);
}
jmsConsumers.put(consumerCacheKey, jmsConsumer);
}
return jmsConsumer;
return jmsConsumer;
});
}
protected boolean commitTransaction(int txnBatchNum, int jmsSessionMode, long curCycleNum) {
@@ -320,6 +379,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
s4jSpace.incTxnBatchTrackingCnt();
}
return !commitTransaction;
return commitTransaction;
}
}

View File

@@ -18,57 +18,41 @@ package io.nosqlbench.adapter.s4j.util;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.adapter.s4j.dispensers.S4JBaseOpDispenser;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4JAdapterMetrics implements NBLabeledElement {
public class S4JAdapterMetrics {
private static final Logger logger = LogManager.getLogger("S4JAdapterMetrics");
private final String defaultAdapterMetricsPrefix;
private final S4JBaseOpDispenser s4jBaseOpDispenser;
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
public S4JAdapterMetrics(String defaultMetricsPrefix) {
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
public String getName() {
return "S4JAdapterMetrics";
public S4JAdapterMetrics(final S4JBaseOpDispenser s4jBaseOpDispenser) {
this.s4jBaseOpDispenser = s4jBaseOpDispenser;
}
public void initS4JAdapterInstrumentation() {
// Histogram metrics
this.messageSizeHistogram =
ActivityMetrics.histogram(
this,
defaultAdapterMetricsPrefix + "message_size",
ActivityMetrics.DEFAULT_HDRDIGITS);
ActivityMetrics.histogram(this.s4jBaseOpDispenser,
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
// Timer metrics
this.bindTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "bind",
ActivityMetrics.DEFAULT_HDRDIGITS);
ActivityMetrics.timer(this.s4jBaseOpDispenser,
"bind", ActivityMetrics.DEFAULT_HDRDIGITS);
this.executeTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
ActivityMetrics.timer(this.s4jBaseOpDispenser,
"execute", ActivityMetrics.DEFAULT_HDRDIGITS);
}
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return executeTimer; }
public Histogram getMessagesizeHistogram() { return messageSizeHistogram; }
@Override
public NBLabels getLabels() {
return NBLabels.forKV("name", getName());
}
}

View File

@@ -20,7 +20,6 @@ package io.nosqlbench.adapter.s4j.util;
import com.datastax.oss.pulsar.jms.PulsarJMSConstants;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.adapter.s4j.S4JOpType;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@@ -30,7 +29,9 @@ import javax.jms.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class S4JAdapterUtil {
@@ -66,12 +67,6 @@ public class S4JAdapterUtil {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidDocLevelParamList() {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// JMS Destination Types
public enum JMS_DEST_TYPES {
@@ -83,12 +78,6 @@ public class S4JAdapterUtil {
this.label = label;
}
}
public static boolean isValidJmsDestType(String type) {
return Arrays.stream(JMS_DEST_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsDestTypeList() {
return Arrays.stream(JMS_DEST_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// Standard JMS message headers (by JMS specification)
public enum JMS_MSG_HEADER_STD {
@@ -107,12 +96,16 @@ public class S4JAdapterUtil {
JMS_MSG_HEADER_STD(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValidStdJmsMsgHeader(String header) {
return Arrays.stream(JMS_MSG_HEADER_STD.values()).anyMatch(t -> t.label.equals(header));
}
public static String getValidStdJmsMsgHeaderList() {
return Arrays.stream(JMS_MSG_HEADER_STD.values()).map(t -> t.label).collect(Collectors.joining(", "));
return JMS_MSG_HEADER_STD.isValidLabel(header);
}
// JMS defined message properties (by JMS specification)
@@ -133,12 +126,6 @@ public class S4JAdapterUtil {
this.label = label;
}
}
public static boolean isValidJmsDfndMsgProp(String property) {
return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).anyMatch(t -> t.label.equals(property));
}
public static String getValidJmsDfndMsgPropList() {
return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public final static String NB_MSG_SEQ_PROP = "NBMsgSeqProp";
public final static String NB_MSG_SIZE_PROP = "NBMsgSize";
@@ -155,12 +142,16 @@ public class S4JAdapterUtil {
JMS_SESSION_MODES(String label) {
this.label = label;
}
}
public static boolean isValidJmsSessionMode(String mode) {
return Arrays.stream(JMS_SESSION_MODES.values()).anyMatch(t -> t.label.equals(mode));
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static String getValidJmsSessionModeList() {
return Arrays.stream(JMS_SESSION_MODES.values()).map(t -> t.label).collect(Collectors.joining(", "));
return StringUtils.join(JMS_SESSION_MODES.LABELS, ", ");
}
// JMS Message Types
@@ -175,12 +166,16 @@ public class S4JAdapterUtil {
JMS_MESSAGE_TYPES(String label) {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static boolean isValidJmsMessageType(String type) {
return Arrays.stream(JMS_MESSAGE_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsMessageTypeList() {
return Arrays.stream(JMS_MESSAGE_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
return JMS_MESSAGE_TYPES.isValidLabel(type);
}
// JMS Message Types
@@ -198,12 +193,16 @@ public class S4JAdapterUtil {
JMS_MSG_PROP_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidJmsMsgPropType(String type) {
return Arrays.stream(JMS_MSG_PROP_TYPES.values()).anyMatch(t -> t.label.equals(type));
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(String label) {
return LABELS.contains(label);
}
}
public static String getValidJmsMsgPropTypeList() {
return Arrays.stream(JMS_MSG_PROP_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", ");
}
///////
@@ -306,19 +305,12 @@ public class S4JAdapterUtil {
return "";
}
///////
// Calculate a unique cache key from a series of input parameters
public static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
///////
// Pause the execution of the current thread
public static void pauseCurThreadExec(int pauseInSec) {
if (pauseInSec > 0) {
try {
Thread.sleep(pauseInSec * 1000);
Thread.sleep(pauseInSec * 1000L);
}
catch (InterruptedException ie) {
ie.printStackTrace();

View File

@@ -1,7 +1,15 @@
###########
# Overview: Starlight for JMS (S4J) API configuration items are listed at:
# https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options
enableTransaction=true
##
# NOTE 1: this requires first enabling corresponding server side configurations as listed in the following doc
# https://pulsar.apache.org/docs/3.0.x/txn-use/#steps
#
# NOTE 2: Astra streaming doesn't have transaction enabled (yet). Need to set this value to 'false'
# in order to successfully connect to AS.
##
enableTransaction=false
####
# S4J API specific configurations (non Pulsar specific) - jms.***
@@ -9,9 +17,23 @@ enableTransaction=true
jms.usePulsarAdmin=false
jms.precreateQueueSubscription=false
jms.enableClientSideEmulation=false
jms.useServerSideFiltering=true
# NOTE 1: this requires first enabling corresponding server side configurations as listed in the following doc
# https://docs.datastax.com/en/streaming/starlight-for-jms/3.2/examples/pulsar-jms-server-side-filters.html#enable-server-side-filtering
#
# NOTE 2: Astra streaming doesn't have server side filtering enabled (yet). Need to set this value to 'false'
# in order to successfully connect to AS.
jms.useServerSideFiltering=false
jms.useCredentialsFromCreateConnection=false
jms.transactionsStickyPartitions=true
##
# NOTE: When connecting to AS, the default "public" tenant is not available, you have to use a specific AS tenant.
# Otherwise, you'll get authorization error when trying to create topi under "public/default"
##
#jms.systemNamespace=<AS_tenant>/default
# for JMS priority
jms.enableJMSPriority=true
jms.priorityMapping=non-linear
@@ -26,8 +48,13 @@ jms.priorityMapping=non-linear
# directly used as S4J configuration settings, on 1-to-1 basis.
#--------------------------------------
client.connectionTimeoutMs=5000
##
# NOTE: when connecting to AS, make sure you enable the following settings and
# put here the corresponding token information
##
#client.authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
#client.authParams=
#client.authParams=token:<token_value>
#...
@@ -55,7 +82,11 @@ producer.blockIfQueueFull=true
consumer.receiverQueueSize=2000
consumer.acknowledgementsGroupTimeMicros=0
consumer.ackTimeoutMillis=2000
consumer.deadLetterPolicy={ "maxRedeliverCount":"5", "deadLetterTopic":"", "initialSubscriptionName":"" }
consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"50", "maxDelayMs":"100", "multiplier":"2.0"}
consumer.negativeAckRedeliveryBackoff={}
##
# The following settings are only needed for DLQ testing
##
#consumer.deadLetterPolicy={ "maxRedeliverCount":"5", "deadLetterTopic":"", "initialSubscriptionName":"" }
#consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"50", "maxDelayMs":"100", "multiplier":"2.0"}
#consumer.negativeAckRedeliveryBackoff={}
#...