Migrate NBS4J from NB4 to NB5

This commit is contained in:
yabinmeng 2022-12-06 17:24:52 -06:00
parent 2ca4320c24
commit b3d3f59d40
26 changed files with 3212 additions and 8 deletions

View File

@ -62,6 +62,13 @@
<version>${pulsar.version}</version> <version>${pulsar.version}</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils --> <!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency> <dependency>
<groupId>commons-beanutils</groupId> <groupId>commons-beanutils</groupId>
@ -82,13 +89,6 @@
<artifactId>avro</artifactId> <artifactId>avro</artifactId>
<version>1.11.1</version> <version>1.11.1</version>
</dependency> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -73,7 +73,7 @@ public class PulsarClientConf {
// Convert the raw configuration map (<String,String>) to the required map (<String,Object>) // Convert the raw configuration map (<String,String>) to the required map (<String,Object>)
producerConfMapTgt.putAll(PulsarConfConverter.convertStdRawProducerConf(producerConfMapRaw)); producerConfMapTgt.putAll(PulsarConfConverter.convertStdRawProducerConf(producerConfMapRaw));
consumerConfMapTgt.putAll(PulsarConfConverter.convertStdRawConsumerConf(consumerConfMapRaw)); consumerConfMapTgt.putAll(PulsarConfConverter.convertStdRawConsumerConf(consumerConfMapRaw));
// TODO: Reader API is not disabled at the moment. Revisit when needed // TODO: Reader API is not enabled at the moment. Revisit when needed
} }

89
adapter-s4j/pom.xml Normal file
View File

@ -0,0 +1,89 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>adapter-s4j</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.31-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<description>
A Starlight for JMS driver for nosqlbench. This provides the ability to inject synthetic data
into a pulsar system via JMS 2.0 compatible APIs.
NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster
as the potential JMS Destination
</description>
<properties>
<s4j.version>3.2.0</s4j.version>
</properties>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
</build>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.31-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.31-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.oss/pulsar-jms -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>pulsar-jms-all</artifactId>
<version>${s4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.8.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.conscrypt/conscrypt-openjdk -->
<dependency>
<groupId>org.conscrypt</groupId>
<artifactId>conscrypt-openjdk</artifactId>
<version>2.5.2</version>
<classifier>${os.detected.classifier}</classifier>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "s4j")
public class S4JDriverAdapter extends BaseDriverAdapter<S4JOp, S4JSpace> {
private final static Logger logger = LogManager.getLogger(S4JDriverAdapter.class);
@Override
public OpMapper<S4JOp> getOpMapper() {
DriverSpaceCache<? extends S4JSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new S4JOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends S4JSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new S4JSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(S4JSpace.getConfigModel());
}
}

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j;
import io.nosqlbench.adapter.s4j.dispensers.MessageConsumerOpDispenser;
import io.nosqlbench.adapter.s4j.dispensers.MessageProducerOpDispenser;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4JOpMapper implements OpMapper<S4JOp> {
private final static Logger logger = LogManager.getLogger(S4JOpMapper.class);
private final NBConfiguration cfg;
private final DriverSpaceCache<? extends S4JSpace> spaceCache;
private final DriverAdapter adapter;
public S4JOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends S4JSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends S4JOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
S4JSpace s4jSpace = spaceCache.get(spaceName);
/*
* If the user provides a body element, then they want to provide the JSON or
* a data structure that can be converted into JSON, bypassing any further
* specialized type-checking or op-type specific features
*/
if (op.isDefined("body")) {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
}
else {
TypeAndTarget<S4JOpType, String> opType = op.getTypeAndTarget(S4JOpType.class, String.class);
return switch (opType.enumId) {
case MessageProduce ->
new MessageProducerOpDispenser(adapter, op, opType.targetFunction, s4jSpace);
case MessageConsume ->
new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, s4jSpace);
};
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j;
public enum S4JOpType {
// publishing/sending messages to a JMS queue or a topic
MessageProduce,
// consuming/receiving messages from a JMS queue or a topic
// for a topic, it can be:
// - non-durable, non-shared
// - durable, non-shared
// - non-durable, shared
// - durable, shared
MessageConsume;
}

View File

@ -0,0 +1,364 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.*;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class S4JSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(S4JSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
// - Each S4J space currently represents a number of JMS connections (\"num_conn\" NB CLI parameter);
// - JMS connection can have a number of JMS sessions (\"num_session\" NB CLI parameter).
// - Each JMS session has its own sets of JMS destinations, producers, consumers, etc.
private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
private final String pulsarSvcUrl;
private final String webSvcUrl;
private final String s4jClientConfFileName;
private S4JClientConf s4JClientConf;
private final int sessionMode;
// Whether to do strict error handling while sending/receiving messages
// - Yes: any error returned from the Pulsar server while doing message receiving/sending will trigger NB execution stop
// - No: pause the current thread that received the error message for 1 second and then continue processing
private boolean strictMsgErrorHandling;
// Maximum time length to execute S4J operations (e.g. message send or consume)
// - when NB execution passes this threshold, it is simply NoOp
// - 0 means no maximum time constraint. S4JOp is always executed until NB execution cycle finishes
private long maxS4JOpTimeInSec;
private long s4JActivityStartTimeMills;
// Whether to keep track of the received message count, which includes
// - total received message count
// - received null message count (only relevant when non-blocking message receiving is used)
// By default, this setting is disabled
private boolean trackingMsgRecvCnt;
// How many JMS connections per NB S4J execution
private int maxNumConn;
// How many sessions per JMS connection
private int maxNumSessionPerConn;
// Total number of acknowledgement received
// - this can apply to both message production and consumption
// - for message consumption, this only applies to non-null messages received (which is for async API)
private final AtomicLong totalOpResponseCnt = new AtomicLong(0);
// Total number of null messages received
// - only applicable to message consumption
private final AtomicLong nullMsgRecvCnt = new AtomicLong(0);
// Keep track the transaction count per thread
private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
// Represents the JMS connection
private PulsarConnectionFactory s4jConnFactory;
private long totalCycleNum;
public S4JSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
this.pulsarSvcUrl = cfg.get("service_url");
this.webSvcUrl = cfg.get("web_url");
this.maxNumConn= cfg.getOrDefault("num_conn", Integer.valueOf(1));
this.maxNumSessionPerConn = cfg.getOrDefault("num_session", Integer.valueOf(1));
this.maxS4JOpTimeInSec= cfg.getOrDefault("max_s4jop_time", Long.valueOf(0));
this.trackingMsgRecvCnt=cfg.getOrDefault("track_msg_cnt", Boolean.FALSE);
this.strictMsgErrorHandling = cfg.getOrDefault("strict_msg_error_handling", Boolean.FALSE);
this.s4jClientConfFileName = cfg.get("config");
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(cfg.get("session_mode"));
this.s4JClientConf = new S4JClientConf(pulsarSvcUrl, webSvcUrl, s4jClientConfFileName);
this.initializeSpace(s4JClientConf);
}
@Override
public void close() {
shutdownSpace();
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(S4JSpace.class)
.add(Param.defaultTo("service_url", "pulsar://localhost:6650")
.setDescription("Pulsar broker service URL."))
.add(Param.defaultTo("web_url", "http://localhost:8080")
.setDescription("Pulsar web service URL."))
.add(Param.defaultTo("config", "config.properties")
.setDescription("Pulsar client connection configuration property file."))
.add(Param.defaultTo("num_conn", 1)
.setDescription("Number of JMS connections"))
.add(Param.defaultTo("num_session", 1)
.setDescription("Number of JMS sessions per JMS connection"))
.add(Param.defaultTo("max_s4jop_time", 0)
.setDescription("Maximum time (in seconds) to run NB S4J testing scenario."))
.add(Param.defaultTo("track_msg_cnt", false)
.setDescription("Whether to keep track of message count(s)"))
.add(Param.defaultTo("session_mode", "")
.setDescription("JMS session mode"))
.add(Param.defaultTo("strict_msg_error_handling", false)
.setDescription("Whether to do strict error handling which is to stop NB S4J execution."))
.asReadOnly();
}
public ConcurrentHashMap<String, JMSContext> getConnLvlJmsContexts() {
return connLvlJmsContexts;
}
public ConcurrentHashMap<String, S4JJMSContextWrapper> getSessionLvlJmsContexts() {
return sessionLvlJmsContexts;
}
public long getS4JActivityStartTimeMills() { return this.s4JActivityStartTimeMills; }
public void setS4JActivityStartTimeMills(long startTime) { this.s4JActivityStartTimeMills = startTime; }
public long getMaxS4JOpTimeInSec() { return this.maxS4JOpTimeInSec; }
public int getSessionMode() { return sessionMode; }
public String getS4jClientConfFileName() { return s4jClientConfFileName; }
public S4JClientConf getS4JClientConf() { return s4JClientConf; }
public boolean isTrackingMsgRecvCnt() { return trackingMsgRecvCnt; }
public int getMaxNumSessionPerConn() { return this.maxNumSessionPerConn; }
public int getMaxNumConn() { return this.maxNumConn; }
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
public int getTxnBatchTrackingCnt() { return txnBatchTrackingCnt.get(); }
public void incTxnBatchTrackingCnt() {
int curVal = getTxnBatchTrackingCnt();
txnBatchTrackingCnt.set(curVal + 1);
}
public long getTotalOpResponseCnt() { return totalOpResponseCnt.get();}
public long incTotalOpResponseCnt() { return totalOpResponseCnt.incrementAndGet();}
public void resetTotalOpResponseCnt() { totalOpResponseCnt.set(0); }
public long getTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.get();}
public void resetTotalNullMsgRecvdCnt() { nullMsgRecvCnt.set(0); }
public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }
public PulsarConnectionFactory getS4jConnFactory() { return s4jConnFactory; }
public long getTotalCycleNum() { return totalCycleNum; }
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
public void initializeSpace(S4JClientConf s4JClientConnInfo) {
if (s4jConnFactory == null) {
Map<String, Object> cfgMap;
try {
cfgMap = s4JClientConnInfo.getS4jConfObjMap();
s4jConnFactory = new PulsarConnectionFactory(cfgMap);
for (int i=0; i<getMaxNumConn(); i++) {
// Establish a JMS connection
String connLvlJmsConnContextIdStr = getConnLvlJmsContextIdentifier(i);
String clientIdStr = Base64.getEncoder().encodeToString(connLvlJmsConnContextIdStr.getBytes());
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
jmsConnContext.setClientID(clientIdStr);
jmsConnContext.setExceptionListener(e -> {
if (logger.isDebugEnabled()) {
logger.error("onException::Unexpected JMS error happened:" + e);
}
});
connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext);
if (logger.isDebugEnabled()) {
logger.debug("[Connection level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsConnContext );
}
}
}
catch (JMSRuntimeException e) {
if (logger.isDebugEnabled()) {
logger.debug("[ERROR] Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
}
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
}
catch (Exception e) {
e.printStackTrace();
}
}
}
public void shutdownSpace() {
long shutdownStartTimeMills = System.currentTimeMillis();
try {
waitUntilAllOpFinished(shutdownStartTimeMills);
this.txnBatchTrackingCnt.remove();
for (S4JJMSContextWrapper s4JJMSContextWrapper : sessionLvlJmsContexts.values()) {
if (s4JJMSContextWrapper != null) {
if (s4JJMSContextWrapper.isTransactedMode()) {
s4JJMSContextWrapper.getJmsContext().rollback();
}
s4JJMSContextWrapper.close();
}
}
for (JMSContext jmsContext : connLvlJmsContexts.values()) {
if (jmsContext != null) jmsContext.close();
}
s4jConnFactory.close();
}
catch (Exception e) {
e.printStackTrace();
throw new S4JAdapterUnexpectedException("Unexpected error when shutting down NB S4J space.");
}
}
// When completing NB execution, don't shut down right away because otherwise, async operation processing may fail.
// Instead, shut down when either one of the following condition is satisfied
// 1) the total number of the received operation response is the same as the total number of operations being executed;
// 2) time has passed for 10 seconds
private void waitUntilAllOpFinished(long shutdownStartTimeMills) {
long totalCycleNum = getTotalCycleNum();
long totalResponseCnt = 0;
long totalNullMsgCnt = 0;
long timeElapsedMills;
boolean trackingMsgCnt = isTrackingMsgRecvCnt();
boolean continueChk;
do {
S4JAdapterUtil.pauseCurThreadExec(1);
long curTimeMills = System.currentTimeMillis();
timeElapsedMills = curTimeMills - shutdownStartTimeMills;
continueChk = (timeElapsedMills <= 10000);
if (trackingMsgCnt) {
totalResponseCnt = this.getTotalOpResponseCnt();
totalNullMsgCnt = this.getTotalNullMsgRecvdCnt();
continueChk = continueChk && (totalResponseCnt < totalCycleNum);
}
if (logger.isTraceEnabled()) {
logger.trace(
buildExecSummaryString(trackingMsgCnt, timeElapsedMills, totalResponseCnt, totalNullMsgCnt));
}
} while (continueChk);
logger.info(
buildExecSummaryString(trackingMsgCnt, timeElapsedMills, totalResponseCnt, totalNullMsgCnt));
}
private String buildExecSummaryString(
boolean trackingMsgCnt,
long timeElapsedMills,
long totalResponseCnt,
long totalNullMsgCnt)
{
StringBuilder stringBuilder = new StringBuilder();
stringBuilder
.append("shutdownSpace::waitUntilAllOpFinished -- ")
.append("shutdown time elapsed: ").append(timeElapsedMills).append("ms; ");
if (trackingMsgCnt) {
stringBuilder.append("response received: ").append(totalResponseCnt).append("; ");
stringBuilder.append("null msg received: ").append(totalNullMsgCnt).append("; ");
}
return stringBuilder.toString();
}
public void processMsgAck(JMSContext jmsContext, Message message, float msgAckRatio, int slowAckInSec) throws JMSException {
int jmsSessionMode = jmsContext.getSessionMode();
if ((jmsSessionMode != Session.AUTO_ACKNOWLEDGE) &&
(jmsSessionMode != Session.SESSION_TRANSACTED)) {
float rndVal = RandomUtils.nextFloat(0, 1);
if (rndVal < msgAckRatio) {
S4JAdapterUtil.pauseCurThreadExec(slowAckInSec);
message.acknowledge();
}
}
}
public String getConnLvlJmsContextIdentifier(int jmsConnSeqNum) {
return S4JAdapterUtil.buildCacheKey(
this.spaceName,
StringUtils.join("conn-", jmsConnSeqNum));
}
public String getSessionLvlJmsContextIdentifier(int jmsConnSeqNum, int jmsSessionSeqNum) {
return S4JAdapterUtil.buildCacheKey(
this.spaceName,
StringUtils.join("conn-", jmsConnSeqNum),
StringUtils.join("session-", jmsSessionSeqNum));
}
// Create JMSContext that represents a new JMS connection
public JMSContext getOrCreateConnLvlJMSContext(
PulsarConnectionFactory s4jConnFactory,
S4JClientConf s4JClientConf,
int sessionMode)
{
boolean useCredentialsEnable = S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf);
JMSContext jmsConnContext;
if (!useCredentialsEnable)
jmsConnContext = s4jConnFactory.createContext(sessionMode);
else {
String userName = S4JAdapterUtil.getCredentialUserName(s4JClientConf);
String passWord = S4JAdapterUtil.getCredentialPassword(s4JClientConf);
// Password must be in "token:<token vale>" format
if (! StringUtils.startsWith(passWord, "token:")) {
throw new S4JAdapterInvalidParamException(
"When 'jms.useCredentialsFromCreateConnection' is enabled, " +
"the provided password must be in format 'token:<token_value_...> ");
}
jmsConnContext = s4jConnFactory.createContext(userName, passWord, sessionMode);
}
return jmsConnContext;
}
}

View File

@ -0,0 +1,162 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j.dispensers;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.MessageConsumerOp;
import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil;
import io.nosqlbench.adapter.s4j.util.S4JJMSContextWrapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageConsumerOpDispenser");
// Doc-level parameter: blocking_msg_recv (default: false)
protected final boolean blockingMsgRecv;
// Doc-level parameter: shared_topic (default: false)
// - only applicable to Topic as the destination type
protected final boolean sharedTopic;
// Doc-level parameter: durable_topic (default: false)
// - only applicable to Topic as the destination type
protected final boolean durableTopic;
// default value: false
private final boolean noLocal;
// default value: 0
// value <= 0 : no timeout
private final int readTimeout;
// default value: false
private final boolean recvNoWait;
// default value: 1.0 (all received messages are acknowledged)
// value must be between 0 and 1 (inclusive)
private final float msgAckRatio;
// default value: 0
// value <= 0 : no slow message ack
private final int slowAckInSec;
private final LongFunction<String> subNameStrFunc;
private final LongFunction<String> localMsgSelectorFunc;
// Generally the consumer related configurations can be set in the global "config.properties" file,
// which can be applied to many testing scenarios.
// Setting them here will allow scenario-specific customer configurations. At the moment, only the
// DLT related settings are supported
private final Map<String, Object> combinedConsumerConfigObjMap = new HashMap<>();
public MessageConsumerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
S4JSpace s4jSpace) {
super(adapter, op, tgtNameFunc, s4jSpace);
this.blockingMsgRecv =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.BLOCKING_MSG_RECV.label, Boolean.FALSE);
this.sharedTopic =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.SHARED_TOPIC.label, Boolean.FALSE);
this.durableTopic =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.DURABLE_TOPIC.label, Boolean.FALSE);
this.noLocal =
parsedOp.getStaticConfigOr("no_local", Boolean.FALSE);
this.readTimeout =
parsedOp.getStaticConfigOr("read_timeout", Integer.valueOf(0));
this.recvNoWait =
parsedOp.getStaticConfigOr("no_wait", Boolean.FALSE);
this.msgAckRatio =
parsedOp.getStaticConfigOr("msg_ack_ratio", Float.valueOf(1.0f));
this.slowAckInSec =
parsedOp.getStaticConfigOr("slow_ack_in_sec", Integer.valueOf(0));
this.subNameStrFunc =
lookupMandtoryStrOpValueFunc("subscription_name");
this.localMsgSelectorFunc =
lookupOptionalStrOpValueFunc("msg_selector");
String[] stmtLvlConsumerConfKeyNameList = {
"consumer.ackTimeoutMillis",
"consumer.deadLetterPolicy",
"consumer.negativeAckRedeliveryBackoff",
"consumer.ackTimeoutRedeliveryBackoff"};
HashMap<String, String> stmtLvlConsumerConfRawMap = new HashMap<>();
for (String confKey : stmtLvlConsumerConfKeyNameList ) {
String confVal = parsedOp.getStaticConfigOr(confKey, "");
stmtLvlConsumerConfRawMap.put(
StringUtils.substringAfter(confKey, "consumer."),
confVal);
}
this.combinedConsumerConfigObjMap.putAll(
s4jSpace.getS4JClientConf().mergeExtraConsumerConfig(stmtLvlConsumerConfRawMap));
}
@Override
public MessageConsumerOp apply(long cycle) {
S4JJMSContextWrapper s4JJMSContextWrapper =
getOrCreateS4jJmsContextWrapper(cycle, this.combinedConsumerConfigObjMap);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransact = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
Destination destination;
try {
destination = getOrCreateJmsDestination(
s4JJMSContextWrapper, temporaryDest, destType, destNameStrFunc.apply(cycle));
}
catch (JMSRuntimeException jmsRuntimeException) {
throw new RuntimeException("Unable to create the JMS destination!");
}
JMSConsumer jmsConsumer;
try {
jmsConsumer = getOrCreateJmsConsumer(
s4JJMSContextWrapper,
destination,
destType,
subNameStrFunc.apply(cycle),
localMsgSelectorFunc.apply(cycle),
msgAckRatio,
noLocal,
durableTopic,
sharedTopic,
asyncAPI,
slowAckInSec);
}
catch (JMSException jmsException) {
throw new RuntimeException("Unable to create the JMS consumer!");
}
return new MessageConsumerOp(
s4jAdapterMetrics,
s4jSpace,
jmsContext,
destination,
asyncAPI,
commitTransact,
jmsConsumer,
blockingMsgRecv,
msgAckRatio,
readTimeout,
recvNoWait,
slowAckInSec);
}
}

View File

@ -0,0 +1,358 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j.dispensers;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.ops.MessageProducerOp;
import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil;
import io.nosqlbench.adapter.s4j.util.S4JJMSContextWrapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;
public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
public static final String MSG_HEADER_OP_PARAM = "msg_header";
public static final String MSG_PROP_OP_PARAM = "msg_property";
public static final String MSG_BODY_OP_PARAM = "msg_body";
public static final String MSG_TYPE_OP_PARAM = "msg_type";
private final LongFunction<String> msgHeaderRawJsonStrFunc;
private final LongFunction<String> msgPropRawJsonStrFunc;
private final LongFunction<String> msgBodyRawJsonStrFunc;
private final LongFunction<String> msgTypeFunc;
public MessageProducerOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> tgtNameFunc,
S4JSpace s4jSpace) {
super(adapter, op, tgtNameFunc, s4jSpace);
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgTypeFunc = lookupMandtoryStrOpValueFunc(MSG_TYPE_OP_PARAM);
}
private Message createAndSetMessagePayload(
S4JJMSContextWrapper s4JJMSContextWrapper,
String msgType, String msgBodyRawJsonStr) throws JMSException
{
Message message;
int messageSize = 0;
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label)) {
message = jmsContext.createTextMessage();
((TextMessage) message).setText(msgBodyRawJsonStr);
messageSize = msgBodyRawJsonStr.length();
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.MAP.label)) {
message = jmsContext.createMapMessage();
// The message body json string must be in the format of a collection of key/value pairs
// Otherwise, it is an error
Map<String, String> jmsMsgBodyMap;
try {
jmsMsgBodyMap = S4JAdapterUtil.convertJsonToMap(msgBodyRawJsonStr);
} catch (Exception e) {
throw new RuntimeException("The specified message payload can't be converted to a map when requiring a 'Map' message type!");
}
for (String key : jmsMsgBodyMap.keySet()) {
String value = jmsMsgBodyMap.get(key);
((MapMessage)message).setString(key, value);
messageSize += key.length();
messageSize += value.length();
}
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.STREAM.label)) {
message = jmsContext.createStreamMessage();
// The message body json string must be in the format of a list of objects
// Otherwise, it is an error
List<Object> jmsMsgBodyObjList;
try {
jmsMsgBodyObjList = S4JAdapterUtil.convertJsonToObjList(msgBodyRawJsonStr);
} catch (Exception e) {
throw new RuntimeException("The specified message payload can't be converted to a list of Objects when requiring a 'Stream' message type!");
}
for (Object obj : jmsMsgBodyObjList) {
((StreamMessage)message).writeObject(obj);
messageSize += ((String)obj).length();
}
} else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.OBJECT.label)) {
message = jmsContext.createObjectMessage();
((ObjectMessage) message).setObject(msgBodyRawJsonStr);
messageSize += msgBodyRawJsonStr.getBytes().length;
}
// default: BYTE message type
else {
message = jmsContext.createBytesMessage();
byte[] msgBytePayload = msgBodyRawJsonStr.getBytes();
((BytesMessage)message).writeBytes(msgBytePayload);
messageSize += msgBytePayload.length;
}
message.setStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize));
return message;
}
private Message updateMessageHeaders(S4JJMSContextWrapper s4JJMSContextWrapper, Message message, String msgType, String msgHeaderRawJsonStr) throws JMSException {
int messageSize = Integer.parseInt(message.getStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP));
// Check if msgHeaderRawJsonStr is a valid JSON string with a collection of key/value pairs
// - if Yes, convert it to a map
// - otherwise, log an error message and ignore message headers without throwing a runtime exception
Map<String, String> jmsMsgHeaders = new HashMap<>();
if (!StringUtils.isBlank(msgHeaderRawJsonStr)) {
try {
jmsMsgHeaders = S4JAdapterUtil.convertJsonToMap(msgHeaderRawJsonStr);
} catch (Exception e) {
logger.warn(
"Error parsing message header JSON string {}, ignore message headers!",
msgHeaderRawJsonStr);
}
}
// make sure the actual message type is used
jmsMsgHeaders.put(S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSType.label, msgType);
Message outMessage = message;
for (String msgHeaderKey:jmsMsgHeaders.keySet()) {
// Ignore non-standard message headers
if (S4JAdapterUtil.isValidStdJmsMsgHeader(msgHeaderKey)) {
String value = jmsMsgHeaders.get(msgHeaderKey);
messageSize += msgHeaderKey.length();
if (value != null) {
messageSize += value.length();
}
try {
if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSType.label)) {
outMessage.setJMSType(msgType);
} else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSPriority.label)) {
if (value != null) outMessage.setJMSPriority(Integer.parseInt(value));
} else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSDeliveryMode.label)) {
if (value != null) outMessage.setJMSDeliveryMode(Integer.parseInt(value));
} else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSExpiration.label)) {
// TODO: convert from a Date/Time string to the required long value
if (value != null) outMessage.setJMSExpiration(Long.parseLong(value));
} else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSCorrelationID.label)) {
if (value != null) outMessage.setJMSCorrelationID(value);
} else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSReplyTo.label)) {
// 'JMSReplyTo' value format: "[topic|queue]:<destination_name>"
if (value != null) {
String destType = StringUtils.substringBefore(value, ':');
String destName = StringUtils.substringAfter(value, ':');
outMessage.setJMSReplyTo(getOrCreateJmsDestination(s4JJMSContextWrapper,false, destType, destName));
}
}
// Ignore these headers - handled by S4J API automatically
/* else if (StringUtils.equalsAnyIgnoreCase(msgHeaderKey,
S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSDestination.label,
S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSMessageID.label,
S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSTimestamp.label,
S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSRedelivered.label
)) {
}*/
} catch (NumberFormatException nfe) {
logger.warn("Incorrect value format ('{}') for the message header field ('{}')!",
value, msgHeaderKey);
}
}
}
outMessage.setStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize));
return outMessage;
}
private Message updateMessageProperties(Message message, String msgPropertyRawJsonStr) throws JMSException {
int messageSize = Integer.parseInt(message.getStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP));
// Check if jmsMsgPropertyRawJsonStr is a valid JSON string with a collection of key/value pairs
// - if Yes, convert it to a map
// - otherwise, log an error message and ignore message headers without throwing a runtime exception
Map<String, String> jmsMsgProperties = new HashMap<>();
if (!StringUtils.isBlank(msgPropertyRawJsonStr)) {
try {
jmsMsgProperties = S4JAdapterUtil.convertJsonToMap(msgPropertyRawJsonStr);
} catch (Exception e) {
logger.warn(
"Error parsing message property JSON string {}, ignore message properties!",
msgPropertyRawJsonStr);
}
}
// Each key in the property json file may include value type information, such as:
// - key(string): value
// The above format specifies a message property that has "key" as the property key
// and "value" as the property value; and the type of the property value is "string"
//
// If the value type is not specified, use "string" as the default value type.
for (Map.Entry<String, String> entry : jmsMsgProperties.entrySet()) {
String rawKeyStr = entry.getKey();
String value = entry.getValue();
if (! StringUtils.isAnyBlank(rawKeyStr, value)) {
String key = rawKeyStr;
String valueType = S4JAdapterUtil.JMS_MSG_PROP_TYPES.STRING.label;
if (StringUtils.contains(rawKeyStr, '(')) {
key = StringUtils.substringBefore(rawKeyStr, "(").trim();
valueType = StringUtils.substringAfter(rawKeyStr, "(");
valueType = StringUtils.substringBefore(valueType, ")").trim();
}
if (StringUtils.isBlank(valueType)) {
message.setStringProperty(entry.getKey(), value);
}
else {
if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.SHORT.label))
message.setShortProperty(key, NumberUtils.toShort(value));
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.INT.label))
message.setIntProperty(key, NumberUtils.toInt(value));
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.LONG.label))
message.setLongProperty(key, NumberUtils.toLong(value));
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.FLOAT.label))
message.setFloatProperty(key, NumberUtils.toFloat(value));
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.DOUBLE.label))
message.setDoubleProperty(key, NumberUtils.toDouble(value));
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.BOOLEAN.label))
message.setBooleanProperty(key, BooleanUtils.toBoolean(value));
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.STRING.label))
message.setStringProperty(key, value);
else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.BYTE.label))
message.setByteProperty(key, NumberUtils.toByte(value));
else
throw new S4JAdapterInvalidParamException(
"Unsupported JMS message property value type (\"" + valueType + "\"). " +
"Value types are: \"" + S4JAdapterUtil.getValidJmsMsgPropTypeList() + "\"");
}
messageSize += key.length();
messageSize += value.length();
}
}
message.setStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize));
return message;
}
@Override
public MessageProducerOp apply(long cycle) {
String destName = destNameStrFunc.apply(cycle);
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);
if (StringUtils.isBlank(jmsMsgBodyRawJsonStr)) {
throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
S4JJMSContextWrapper s4JJMSContextWrapper = getOrCreateS4jJmsContextWrapper(cycle);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransaction = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);
Destination destination;
try {
destination = getOrCreateJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName);
}
catch (JMSRuntimeException jmsRuntimeException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS destination!");
}
JMSProducer producer;
try {
producer = getOrCreateJmsProducer(s4JJMSContextWrapper, asyncAPI);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");
}
// Get the right JMS message type
String jmsMsgType = msgTypeFunc.apply(cycle);
if (! S4JAdapterUtil.isValidJmsMessageType(jmsMsgType) ) {
logger.warn(
"The specified JMS message type {} is not valid, use the default TextMessage type!",
jmsMsgType);
jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label;
}
/////////////
// Set proper message payload based on the message type and the specified input
// -----------------------
//
Message message;
try {
message = createAndSetMessagePayload(s4JJMSContextWrapper, jmsMsgType, jmsMsgBodyRawJsonStr);
}
catch (JMSException jmsException) {
throw new RuntimeException("Failed to set create a JMS message and set its payload!");
}
/////////////
// Set standard message headers
// -----------------------
//
try {
message = updateMessageHeaders(s4JJMSContextWrapper, message, jmsMsgType, jmsMsgHeaderRawJsonStr);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Failed to set create a JMS message and set its payload!");
}
/////////////
// Set defined JMS message properties and other custom properties
// -----------------------
//
try {
message = updateMessageProperties(message, jmsMsgPropertyRawJsonStr);
// for testing purpose
message.setLongProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP, cycle);
}
catch (JMSException jmsException) {
throw new S4JAdapterUnexpectedException("Failed to set JMS message properties!");
}
return new MessageProducerOp(
s4jAdapterMetrics,
s4jSpace,
jmsContext,
destination,
asyncAPI,
commitTransaction,
producer,
message);
}
}

View File

@ -0,0 +1,395 @@
package io.nosqlbench.adapter.s4j.dispensers;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.ops.S4JOp;
import io.nosqlbench.adapter.s4j.util.*;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpace> {
private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser");
protected final ParsedOp parsedOp;
protected final S4JSpace s4jSpace;
protected final S4JAdapterMetrics s4jAdapterMetrics;
private final ConcurrentHashMap<String, JMSContext> connLvlJmsContexts = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, S4JJMSContextWrapper> sessionLvlJmsContexts = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, JMSProducer> jmsProducers = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, JMSConsumer> jmsConsumers = new ConcurrentHashMap<>();
// Doc-level parameter: temporary_dest (default: false)
protected final boolean temporaryDest;
// Doc-level parameter: dest_type (default: Topic)
protected final String destType;
// Doc-level parameter: async_api (default: true)
protected final boolean asyncAPI;
// Doc-level parameter: txn_batch_num (default: 0)
// - value <=0 : no transaction
protected final int txnBatchNum;
protected final LongFunction<String> destNameStrFunc;
protected final int totalThreadNum;
protected final long totalCycleNum;
public S4JBaseOpDispenser(DriverAdapter adapter,
ParsedOp op,
LongFunction<String> destNameStrFunc,
S4JSpace s4jSpace) {
super(adapter, op);
this.parsedOp = op;
this.s4jSpace = s4jSpace;
this.connLvlJmsContexts.putAll(s4jSpace.getConnLvlJmsContexts());
this.sessionLvlJmsContexts.putAll(s4jSpace.getSessionLvlJmsContexts());
String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp);
this.s4jAdapterMetrics = new S4JAdapterMetrics(defaultMetricsPrefix);
s4jAdapterMetrics.initS4JAdapterInstrumentation();
this.destNameStrFunc = destNameStrFunc;
this.temporaryDest =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TEMP_DEST.label, Boolean.FALSE);
this.destType =
parsedOp.getStaticConfig(S4JAdapterUtil.DOC_LEVEL_PARAMS.DEST_TYPE.label, String.class);
this.asyncAPI =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, Boolean.TRUE);
this.txnBatchNum =
parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0));
this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticValue("threads"));
this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticValue("cycles"));
s4jSpace.setTotalCycleNum(totalCycleNum);
}
public S4JSpace getS4jSpace() { return s4jSpace; }
public S4JAdapterMetrics getS4jAdapterMetrics() { return s4jAdapterMetrics; }
protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) {
LongFunction<Boolean> booleanLongFunction;
booleanLongFunction = (l) -> parsedOp.getOptionalStaticConfig(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> BooleanUtils.toBoolean(value))
.orElse(defaultValue);
logger.info("{}: {}", paramName, booleanLongFunction.apply(0));
return booleanLongFunction;
}
protected LongFunction<Set<String>> lookupStaticStrSetOpValueFunc(String paramName) {
LongFunction<Set<String>> setStringLongFunction;
setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> {
Set<String > set = new HashSet<>();
if (StringUtils.contains(value,',')) {
set = Arrays.stream(value.split(","))
.map(String::trim)
.filter(Predicate.not(String::isEmpty))
.collect(Collectors.toCollection(LinkedHashSet::new));
}
return set;
}).orElse(Collections.emptySet());
logger.info("{}: {}", paramName, setStringLongFunction.apply(0));
return setStringLongFunction;
}
// If the corresponding Op parameter is not provided, use the specified default value
protected LongFunction<Integer> lookupStaticIntOpValueFunc(String paramName, int defaultValue) {
LongFunction<Integer> integerLongFunction;
integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class)
.filter(Predicate.not(String::isEmpty))
.map(value -> NumberUtils.toInt(value))
.map(value -> {
if (value < 0) return 0;
else return value;
}).orElse(defaultValue);
logger.info("{}: {}", paramName, integerLongFunction.apply(0));
return integerLongFunction;
}
// If the corresponding Op parameter is not provided, use the specified default value
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName, String defaultValue) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class)
.orElse((l) -> defaultValue);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
protected LongFunction<String> lookupOptionalStrOpValueFunc(String paramName) {
return lookupOptionalStrOpValueFunc(paramName, "");
}
// Mandatory Op parameter. Throw an error if not specified or having empty value
protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
LongFunction<String> stringLongFunction;
stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class);
logger.info("{}: {}", paramName, stringLongFunction.apply(0));
return stringLongFunction;
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = s4jSpace.getMaxNumConn();
int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
String jmsConnContextIdStr = s4jSpace.getConnLvlJmsContextIdentifier(connSeqNum);
JMSContext connLvlJmsContext = connLvlJmsContexts.get(jmsConnContextIdStr);
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = s4jSpace.getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JJMSContextWrapper jmsContextWrapper = sessionLvlJmsContexts.get(jmsSessionContextIdStr);
if (jmsContextWrapper == null) {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
jmsContextWrapper = new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
sessionLvlJmsContexts.put(jmsSessionContextIdStr, jmsContextWrapper);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsContextWrapper);
}
}
return jmsContextWrapper;
}
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long curCycle) {
return getOrCreateS4jJmsContextWrapper(curCycle, null);
}
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/
public Destination getOrCreateJmsDestination(
S4JJMSContextWrapper s4JJMSContextWrapper,
boolean tempDest,
String destType,
String destName) throws JMSRuntimeException
{
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer();
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
// Regular, non-temporary destination
if (!tempDest) {
String destinationCacheKey = S4JAdapterUtil.buildCacheKey(jmsContextIdStr, destType, destName);
Destination destination = jmsDestinations.get(destinationCacheKey);
if (destination == null) {
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
destination = jmsContext.createQueue(destName);
} else {
destination = jmsContext.createTopic(destName);
}
jmsDestinations.put(destinationCacheKey, destination);
}
return destination;
}
// Temporary destination
else {
if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) {
return jmsContext.createTemporaryQueue();
} else {
return jmsContext.createTemporaryTopic();
}
}
}
// Get simplified NB thread name
private String getSimplifiedNBThreadName(String fullThreadName) {
assert (StringUtils.isNotBlank(fullThreadName));
if (StringUtils.contains(fullThreadName, '/'))
return StringUtils.substringAfterLast(fullThreadName, "/");
else
return fullThreadName;
}
/**
* If the JMS producer that corresponds to a destination exists, reuse it; Otherwise, create it
*/
public JMSProducer getOrCreateJmsProducer(
S4JJMSContextWrapper s4JJMSContextWrapper,
boolean asyncApi) throws JMSException
{
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
String producerCacheKey = S4JAdapterUtil.buildCacheKey(
getSimplifiedNBThreadName(Thread.currentThread().getName()), "producer");
JMSProducer jmsProducer = jmsProducers.get(producerCacheKey);
if (jmsProducer == null) {
jmsProducer = jmsContext.createProducer();
if (asyncApi) {
jmsProducer.setAsync(new S4JCompletionListener(s4jSpace, this));
}
if (logger.isDebugEnabled()) {
logger.debug("Producer created: {} -- {} -- {}",
producerCacheKey, jmsProducer, s4JJMSContextWrapper);
}
jmsProducers.put(producerCacheKey, jmsProducer);
}
return jmsProducer;
}
/**
* If the JMS consumer that corresponds to a destination(, subscription, message selector) exists, reuse it; Otherwise, create it
*/
public JMSConsumer getOrCreateJmsConsumer(
S4JJMSContextWrapper s4JJMSContextWrapper,
Destination destination,
String destType,
String subName,
String msgSelector,
float msgAckRatio,
boolean nonLocal,
boolean durable,
boolean shared,
boolean asyncApi,
int slowAckInSec) throws JMSException
{
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean isTopic = StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.TOPIC.label);
String consumerCacheKey = S4JAdapterUtil.buildCacheKey(
getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer");
JMSConsumer jmsConsumer = jmsConsumers.get(consumerCacheKey);
if (jmsConsumer == null) {
if (isTopic) {
if (!durable && !shared)
jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal);
else {
if (StringUtils.isBlank(subName)) {
throw new RuntimeException("Subscription name is required for receiving messages from a durable or shared topic!");
}
if (durable && !shared)
jmsConsumer = jmsContext.createDurableConsumer((Topic) destination, subName, msgSelector, nonLocal);
else if (!durable)
jmsConsumer = jmsContext.createSharedConsumer((Topic) destination, subName, msgSelector);
else
jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) destination, subName, msgSelector);
}
}
else {
jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal);
}
if (asyncApi) {
jmsConsumer.setMessageListener(
new S4JMessageListener(jmsContext, s4jSpace, this, msgAckRatio, slowAckInSec));
}
if (logger.isDebugEnabled()) {
logger.debug("Consumer created: {} -- {} -- {}",
consumerCacheKey, jmsConsumer, s4JJMSContextWrapper);
}
jmsConsumers.put(consumerCacheKey, jmsConsumer);
}
return jmsConsumer;
}
protected boolean commitTransaction(int txnBatchNum, int jmsSessionMode, long curCycleNum) {
// Whether to commit the transaction which happens when:
// - session mode is equal to "SESSION_TRANSACTED"
// - "txn_batch_num" has been reached since last reset
boolean commitTransaction = ( (Session.SESSION_TRANSACTED == jmsSessionMode) && (txnBatchNum > 0) );
if (commitTransaction) {
int txnBatchTackingCnt = s4jSpace.getTxnBatchTrackingCnt();
if ( ( (txnBatchTackingCnt > 0) && ((txnBatchTackingCnt % txnBatchNum) == 0) ) ||
( curCycleNum >= (totalCycleNum - 1) ) ) {
if (logger.isDebugEnabled()) {
logger.debug("Commit transaction ({}, {}, {})",
txnBatchTackingCnt,
s4jSpace.getTotalOpResponseCnt(), curCycleNum);
}
}
else {
commitTransaction = false;
}
s4jSpace.incTxnBatchTrackingCnt();
}
return !commitTransaction;
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.s4j.exception;
public class S4JAdapterAsyncOperationFailedException extends RuntimeException {
public S4JAdapterAsyncOperationFailedException(Throwable t) {
super(t);
printStackTrace();
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.s4j.exception;
public class S4JAdapterInvalidParamException extends RuntimeException {
public S4JAdapterInvalidParamException(String paramName, String errDesc) {
super("Invalid setting for parameter (" + paramName + "): " + errDesc);
}
public S4JAdapterInvalidParamException(String fullErrDesc) {
super(fullErrDesc);
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.s4j.exception;
public class S4JAdapterUnexpectedException extends RuntimeException {
public S4JAdapterUnexpectedException(String message) {
super(message);
printStackTrace();
}
public S4JAdapterUnexpectedException(Exception e) {
super(e);
printStackTrace();
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.s4j.exception;
public class S4JAdapterUnsupportedOpException extends RuntimeException {
public S4JAdapterUnsupportedOpException(String pulsarOpType) {
super("Unsupported Pulsar adapter operation type: \"" + pulsarOpType + "\"");
}
}

View File

@ -0,0 +1,135 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j.ops;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterAsyncOperationFailedException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.S4JAdapterMetrics;
import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.shade.org.apache.avro.AvroRuntimeException;
import javax.jms.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class MessageConsumerOp extends S4JOp {
private final static Logger logger = LogManager.getLogger(MessageConsumerOp.class);
private final JMSConsumer jmsConsumer;
private final boolean blockingMsgRecv;
private final float msgAckRatio;
private final long msgReadTimeout;
private final boolean recvNoWait;
private final int slowInSec;
public MessageConsumerOp(S4JAdapterMetrics s4jAdapterMetrics,
S4JSpace s4jSpace,
JMSContext jmsContext,
Destination destination,
boolean asyncApi,
boolean commitTransact,
JMSConsumer jmsConsumer,
boolean blockingMsgRecv,
float msgAckRatio,
long readTimeout,
boolean recvNoWait,
int slowInSec)
{
super(s4jAdapterMetrics, s4jSpace, jmsContext, destination, asyncApi, commitTransact);
this.jmsConsumer = jmsConsumer;
this.blockingMsgRecv = blockingMsgRecv;
this.msgAckRatio = msgAckRatio;
this.msgReadTimeout = readTimeout;
this.recvNoWait = recvNoWait;
this.slowInSec = slowInSec;
}
@Override
public Object apply(long value) {
long timeElapsedMills = System.currentTimeMillis() - s4jOpStartTimeMills;
// If maximum S4J operation duration is specified, only receive messages
// before the maximum duration threshold is reached. Otherwise, this is
// just no-op.
if ( (maxS4jOpDurationInSec == 0) || (timeElapsedMills <= (maxS4jOpDurationInSec*1000)) ) {
// Please see S4JSpace::getOrCreateJmsConsumer() for async processing
if (!asyncApi) {
Message recvdMsg;
try {
// blocking message receiving only applies to synchronous API
if (blockingMsgRecv) {
recvdMsg = jmsConsumer.receive();
} else if (recvNoWait) {
recvdMsg = jmsConsumer.receiveNoWait();
} else {
// timeout value 0 means to wait forever
recvdMsg = jmsConsumer.receive(msgReadTimeout);
}
if (this.commitTransact) jmsContext.commit();
if (recvdMsg != null) {
s4jSpace.processMsgAck(jmsContext, recvdMsg, msgAckRatio, slowInSec);
byte[] recvdMsgBody = recvdMsg.getBody(byte[].class);
int messageSize = recvdMsgBody.length;
messageSizeHistogram.update(messageSize);
if (logger.isDebugEnabled()) {
// for testing purpose
String myMsgSeq = recvdMsg.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP);
logger.debug("Sync message receive successful - message ID {} ({}) "
, recvdMsg.getJMSMessageID(), myMsgSeq);
}
if (s4jSpace.isTrackingMsgRecvCnt()) {
s4jSpace.incTotalOpResponseCnt();
}
} else {
if (s4jSpace.isTrackingMsgRecvCnt()) {
s4jSpace.incTotalNullMsgRecvdCnt();
}
}
} catch (JMSException | JMSRuntimeException e) {
S4JAdapterUtil.processMsgErrorHandling(
e,
s4jSpace.isStrictMsgErrorHandling(),
"Unexpected errors when sync receiving a JMS message.");
}
}
}
else {
if (logger.isTraceEnabled()) {
logger.trace("NB cycle number {} is no-op (maxS4jOpDurationInSec: {}, timeElapsedMills: {})",
value, maxS4jOpDurationInSec, timeElapsedMills);
}
}
return null;
}
}

View File

@ -0,0 +1,101 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j.ops;
import com.codahale.metrics.Histogram;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.util.S4JAdapterMetrics;
import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.HashMap;
import java.util.Map;
public class MessageProducerOp extends S4JOp {
private final static Logger logger = LogManager.getLogger("MessageProducerOp");
private final JMSProducer jmsProducer;
private final Message message;
public MessageProducerOp(S4JAdapterMetrics s4jAdapterMetrics,
S4JSpace s4jSpace,
JMSContext jmsContext,
Destination destination,
boolean asyncApi,
boolean commitTransact,
JMSProducer jmsProducer,
Message message) {
super(s4jAdapterMetrics, s4jSpace, jmsContext, destination, asyncApi, commitTransact);
this.jmsProducer = jmsProducer;
this.message = message;
}
@Override
public Object apply(long value) {
long timeElapsedMills = System.currentTimeMillis() - s4jOpStartTimeMills;
// If maximum S4J operation duration is specified, only publish messages
// before the maximum duration threshold is reached. Otherwise, this is
// just no-op.
if ( (maxS4jOpDurationInSec == 0) || (timeElapsedMills <= (maxS4jOpDurationInSec*1000)) ) {
try {
jmsProducer.send(destination, message);
if (this.commitTransact) {
jmsContext.commit();
}
int msgSize = message.getIntProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP);
messageSizeHistogram.update(msgSize);
// Please see s4JSpace::getOrCreateJmsProducer() for async processing
if (!asyncApi) {
if (logger.isDebugEnabled()) {
// for testing purpose
String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP);
logger.debug("Sync message sending is successful - message ID {} ({}) "
, message.getJMSMessageID(), myMsgSeq);
}
if (s4jSpace.isTrackingMsgRecvCnt()) {
s4jSpace.incTotalOpResponseCnt();
}
}
} catch (JMSException | JMSRuntimeException e) {
S4JAdapterUtil.processMsgErrorHandling(
e,
s4jSpace.isStrictMsgErrorHandling(),
"Unexpected errors when sync sending a JMS message.");
}
}
else {
if (logger.isTraceEnabled()) {
logger.trace("NB cycle number {} is no-op (maxS4jOpDurationInSec: {}, timeElapsedMills: {})",
value, maxS4jOpDurationInSec, timeElapsedMills);
}
}
return null;
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.s4j.ops;
import com.codahale.metrics.Histogram;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.util.S4JAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
import javax.jms.Destination;
import javax.jms.JMSContext;
public abstract class S4JOp implements CycleOp<Object> {
protected S4JAdapterMetrics s4jAdapterMetrics;
protected final S4JSpace s4jSpace;
protected final JMSContext jmsContext;
protected final Destination destination;
protected final boolean asyncApi;
protected final boolean commitTransact;
protected final long s4jOpStartTimeMills;
protected final long maxS4jOpDurationInSec;
protected final Histogram messageSizeHistogram;
public S4JOp(
S4JAdapterMetrics s4jAdapterMetrics,
S4JSpace s4jSpace,
JMSContext jmsContext,
Destination destination,
boolean asyncApi,
boolean commitTransact)
{
this.s4jAdapterMetrics = s4jAdapterMetrics;
this.s4jSpace = s4jSpace;
this.jmsContext = jmsContext;
this.destination = destination;
this.asyncApi = asyncApi;
this.commitTransact = commitTransact;
this.s4jOpStartTimeMills = s4jSpace.getS4JActivityStartTimeMills();
this.maxS4jOpDurationInSec = s4jSpace.getMaxS4JOpTimeInSec();
this.messageSizeHistogram = s4jAdapterMetrics.getMessagesizeHistogram();
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j.util;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4JAdapterMetrics implements NBNamedElement {
private final static Logger logger = LogManager.getLogger("S4JAdapterMetrics");
private final String defaultAdapterMetricsPrefix;
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
public S4JAdapterMetrics(String defaultMetricsPrefix) {
this.defaultAdapterMetricsPrefix = defaultMetricsPrefix;
}
@Override
public String getName() {
return "S4JAdapterMetrics";
}
public void initS4JAdapterInstrumentation() {
// Histogram metrics
this.messageSizeHistogram =
ActivityMetrics.histogram(
this,
defaultAdapterMetricsPrefix + "message_size",
ActivityMetrics.DEFAULT_HDRDIGITS);
// Timer metrics
this.bindTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "bind",
ActivityMetrics.DEFAULT_HDRDIGITS);
this.executeTimer =
ActivityMetrics.timer(
this,
defaultAdapterMetricsPrefix + "execute",
ActivityMetrics.DEFAULT_HDRDIGITS);
}
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return executeTimer; }
public Histogram getMessagesizeHistogram() { return messageSizeHistogram; }
}

View File

@ -0,0 +1,326 @@
package io.nosqlbench.adapter.s4j.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.datastax.oss.pulsar.jms.PulsarJMSConstants;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.adapter.s4j.S4JOpType;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.*;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class S4JAdapterUtil {
private final static Logger logger = LogManager.getLogger(S4JAdapterUtil.class);
///////
// Valid document level parameters for JMS NB yaml file
public final static String JMS_SPEC_VER_12 = "1.2";
public final static String JMS_SPEC_VER_20 = "2.0";
public enum DOC_LEVEL_PARAMS {
// Temporary destination
TEMP_DEST("temporary_dest"),
// JMS destination type - topic or queue
// String value
// - valid values: see JMS_DEST_TYPES
DEST_TYPE("dest_type"),
// JMS destination name
// String value
DEST_NAME("dest_name"),
// Asynchronous message processing
ASYNC_API("async_api"),
// Transaction batch size
// - Only relevant when session mode is SESSION_TRANSACTED
TXN_BATCH_NUM("txn_batch_num"),
// Whether to use blocking message receiving as the default behavior
BLOCKING_MSG_RECV("blocking_msg_recv"),
// Whether the destination is a shared topic
SHARED_TOPIC("shared_topic"),
// Whether the destination is a durable topic
DURABLE_TOPIC("durable_topic");
public final String label;
DOC_LEVEL_PARAMS(String label) {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidDocLevelParamList() {
return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// JMS Destination Types
public enum JMS_DEST_TYPES {
QUEUE("queue"),
TOPIC("topic");
public final String label;
JMS_DEST_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidJmsDestType(String type) {
return Arrays.stream(JMS_DEST_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsDestTypeList() {
return Arrays.stream(JMS_DEST_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// Standard JMS message headers (by JMS specification)
public enum JMS_MSG_HEADER_STD {
JMSDestination("JMSDestination"),
JMSDeliveryMode("JMSDeliveryMode"),
JMSMessageID("JMSMessageID"),
JMSTimestamp("JMSTimestamp"),
JMSRedelivered("JMSRedelivered"),
JMSExpiration("JMSExpiration"),
JMSCorrelationID("JMSCorrelationID"),
JMSType("JMSType"),
JMSReplyTo("JMSReplyTo"),
JMSPriority("JMSPriority");
public final String label;
JMS_MSG_HEADER_STD(String label) {
this.label = label;
}
}
public static boolean isValidStdJmsMsgHeader(String header) {
return Arrays.stream(JMS_MSG_HEADER_STD.values()).anyMatch(t -> t.label.equals(header));
}
public static String getValidStdJmsMsgHeaderList() {
return Arrays.stream(JMS_MSG_HEADER_STD.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// JMS defined message properties (by JMS specification)
public enum JMS_DEFINED_MSG_PROPERTY {
JMSDestination("JMSDestination"),
JMSDeliveryMode("JMSDeliveryMode"),
JMSMessageID("JMSMessageID"),
JMSTimestamp("JMSTimestamp"),
JMSRedelivered("JMSRedelivered"),
JMSExpiration("JMSExpiration"),
JMSCorrelationID("JMSCorrelationID"),
JMSType("JMSType"),
JMSReplyTo("JMSReplyTo"),
JMSPriority("JMSPriority");
public final String label;
JMS_DEFINED_MSG_PROPERTY(String label) {
this.label = label;
}
}
public static boolean isValidJmsDfndMsgProp(String property) {
return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).anyMatch(t -> t.label.equals(property));
}
public static String getValidJmsDfndMsgPropList() {
return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public final static String NB_MSG_SEQ_PROP = "NBMsgSeqProp";
public final static String NB_MSG_SIZE_PROP = "NBMsgSize";
// JMS Destination Types
public enum JMS_SESSION_MODES {
AUTO_ACK("auto_ack"),
CLIENT_ACK("client_ack"),
DUPS_OK_ACK("dups_ok_ack"),
INDIVIDUAL_ACK("individual_ack"),
TRANSACT("transact_ack");
public final String label;
JMS_SESSION_MODES(String label) {
this.label = label;
}
}
public static boolean isValidJmsSessionMode(String mode) {
return Arrays.stream(JMS_SESSION_MODES.values()).anyMatch(t -> t.label.equals(mode));
}
public static String getValidJmsSessionModeList() {
return Arrays.stream(JMS_SESSION_MODES.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// JMS Message Types
public enum JMS_MESSAGE_TYPES {
TEXT("text"),
BYTE("byte"),
MAP("map"),
STREAM("stream"),
OBJECT("object");
public final String label;
JMS_MESSAGE_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidJmsMessageType(String type) {
return Arrays.stream(JMS_MESSAGE_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsMessageTypeList() {
return Arrays.stream(JMS_MESSAGE_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
// JMS Message Types
public enum JMS_MSG_PROP_TYPES {
SHORT("short"),
INT("int"),
LONG("long"),
FLOAT("float"),
DOUBLE("double"),
STRING("string"),
BOOLEAN("boolean"),
BYTE("byte");
public final String label;
JMS_MSG_PROP_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidJmsMsgPropType(String type) {
return Arrays.stream(JMS_MSG_PROP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static String getValidJmsMsgPropTypeList() {
return Arrays.stream(JMS_MSG_PROP_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Convert JSON string to a key/value map
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonStr, new TypeReference<Map<String, String>>(){});
}
///////
// Convert JSON string to a list of objects
public static List<Object> convertJsonToObjList(String jsonStr) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return Arrays.asList(mapper.readValue(jsonStr, Object[].class));
}
///////
// Get the destination name from the Destination object
public static String getDestinationName(Destination destination, String destType) throws JMSException {
String destName;
boolean isTopic = StringUtils.equalsIgnoreCase(destType, JMS_DEST_TYPES.TOPIC.label);
if (isTopic)
destName = ((Topic) destination).getTopicName();
else
destName = ((Queue) destination).getQueueName();
return destName;
}
///////
public static int getSessionModeFromStr(String sessionModeStr) {
// default ack mode: auto_ack
int sessionMode = -1;
if (StringUtils.isBlank(sessionModeStr))
sessionMode = JMSContext.AUTO_ACKNOWLEDGE;
else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.AUTO_ACK.label))
sessionMode = JMSContext.AUTO_ACKNOWLEDGE;
else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.CLIENT_ACK.label))
sessionMode = JMSContext.CLIENT_ACKNOWLEDGE;
else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.DUPS_OK_ACK.label))
sessionMode = JMSContext.DUPS_OK_ACKNOWLEDGE;
else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.TRANSACT.label))
sessionMode = JMSContext.SESSION_TRANSACTED;
else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.INDIVIDUAL_ACK.label))
sessionMode = PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE;
else {
if (logger.isDebugEnabled()) {
logger.debug("Invalid session mode string \"{}\". Valid values are: {}. Use the default \"auto_ack\" mode!"
,sessionModeStr, getValidJmsSessionModeList());
sessionMode = JMSContext.AUTO_ACKNOWLEDGE;
}
}
return sessionMode;
}
public static boolean isUseCredentialsEnabled(S4JClientConf s4JClientConf) {
assert (s4JClientConf != null);
boolean enabled = false;
Map<String, Object> s4jConfMap = s4JClientConf.getS4jConfObjMap();
if (s4jConfMap.containsKey("jms.useCredentialsFromCreateConnection")) {
enabled = BooleanUtils.toBoolean(s4jConfMap.get("jms.useCredentialsFromCreateConnection").toString());
}
return enabled;
}
public static String getCredentialUserName(S4JClientConf s4JClientConf) {
return "dummy";
}
public static String getCredentialPassword(S4JClientConf s4JClientConf) {
Map<String, Object> s4jConfMap = s4JClientConf.getS4jConfObjMap();
if (s4jConfMap.containsKey("authParams"))
return s4jConfMap.get("authParams").toString();
else
return "";
}
///////
// Calculate a unique cache key from a series of input parameters
public static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
}
///////
// Pause the execution of the current thread
public static void pauseCurThreadExec(int pauseInSec) {
if (pauseInSec > 0) {
try {
Thread.sleep(pauseInSec * 1000);
}
catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
///////
// Error handling for message processing
public static void processMsgErrorHandling(Exception exception, boolean strictErrorHandling, String errorMsg) {
exception.printStackTrace();
if (strictErrorHandling) {
throw new RuntimeException(errorMsg + " [ " + exception.getMessage() + " ]");
}
else {
S4JAdapterUtil.pauseCurThreadExec(1);
}
}
}

View File

@ -0,0 +1,213 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.s4j.util;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
import org.apache.commons.configuration2.builder.fluent.Parameters;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class S4JClientConf {
private final static Logger logger = LogManager.getLogger(S4JClientConf.class);
public static final String CLIENT_CONF_PREFIX = "client";
public static final String PRODUCER_CONF_PREFIX = "producer";
public static final String CONSUMER_CONF_PREFIX = "consumer";
public static final String JMS_CONF_PREFIX = "jms";
// "Raw" map is what is read from the config properties file
// "Tgt" map is what is really needed in the Pulsar producer/consumer API
private Map<String, String> clientConfMapRaw = new HashMap<>();
private Map<String, String> producerConfMapRaw = new HashMap<>();
private Map<String, String> consumerConfMapRaw = new HashMap<>();
private Map<String, String> jmsConfMapRaw = new HashMap<>();
private Map<String, String> miscConfMapRaw = new HashMap<>();
private final Map<String, Object> s4jConfMapTgt = new HashMap<>();
private Map<String, Object> clientConfMapTgt = new HashMap<>();
private Map<String, Object> producerConfMapTgt = new HashMap<>();
private Map<String, Object> consumerConfMapTgt = new HashMap<>();
private Map<String, Object> jmsConfMapTgt = new HashMap<>();
private Map<String, Object> miscConfMapTgt = new HashMap<>();
public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) {
//////////////////
// Read related Pulsar client configuration settings from a file
readRawConfFromFile(s4jConfFileName);
//////////////////
// Ignores the following Pulsar client/producer/consumer configurations since
// they're either not supported in the S4J API or the property must be specified
// as the NB CLI parameter or the NB yaml file parameter.
// <<< https://pulsar.apache.org/docs/reference-configuration/#client >>>
// pulsar client config
// * webServiceUrl
// * brokerServiceUrl
clientConfMapRaw.put("brokerServiceUrl", pulsarSvcUrl);
clientConfMapRaw.put("webServiceUrl", webSvcUrl);
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>>
// producer config
// * topicName
producerConfMapRaw.remove("topicName");
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer >>>
// consumer config
// * topicNames
// * topicsPattern
// * subscriptionName
// * subscriptionType
consumerConfMapRaw.remove("topicNames");
consumerConfMapRaw.remove("topicPattern");
consumerConfMapRaw.remove("subscriptionName");
consumerConfMapRaw.remove("subscriptionType");
consumerConfMapRaw.remove("subscriptionInitialPosition");
consumerConfMapRaw.remove("regexSubscriptionMode");
//////////////////
// Convert the raw configuration map (<String,String>) to the required map (<String,Object>)
clientConfMapTgt.putAll(S4JClientConfConverter.convertRawClientConf(clientConfMapRaw));
producerConfMapTgt.putAll(S4JClientConfConverter.convertRawProducerConf(producerConfMapRaw));
consumerConfMapTgt.putAll(S4JClientConfConverter.convertRawConsumerConf(consumerConfMapRaw));
jmsConfMapTgt.putAll(S4JClientConfConverter.convertRawJmsConf(jmsConfMapRaw));
miscConfMapTgt.putAll(S4JClientConfConverter.convertRawMiscConf(miscConfMapRaw));
s4jConfMapTgt.putAll(clientConfMapTgt);
s4jConfMapTgt.put("producerConfig", producerConfMapTgt);
s4jConfMapTgt.put("consumerConfig", consumerConfMapTgt);
s4jConfMapTgt.putAll(jmsConfMapTgt);
s4jConfMapTgt.putAll(miscConfMapTgt);
}
public void readRawConfFromFile(String fileName) {
File file = new File(fileName);
try {
String canonicalFilePath = file.getCanonicalPath();
Parameters params = new Parameters();
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
.configure(params.properties()
.setFileName(fileName));
Configuration config = builder.getConfiguration();
for (Iterator<String> it = config.getKeys(); it.hasNext(); ) {
String confKey = it.next();
String confVal = config.getProperty(confKey).toString();
if (!StringUtils.isBlank(confVal)) {
// Get client connection specific configuration settings, removing "client." prefix
if (StringUtils.startsWith(confKey, CLIENT_CONF_PREFIX)) {
clientConfMapRaw.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), confVal);
}
// Get producer specific configuration settings, removing "producer." prefix
else if (StringUtils.startsWith(confKey, PRODUCER_CONF_PREFIX)) {
producerConfMapRaw.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), confVal);
}
// Get consumer specific configuration settings, removing "consumer." prefix
else if (StringUtils.startsWith(confKey, CONSUMER_CONF_PREFIX)) {
consumerConfMapRaw.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), confVal);
}
// Get JMS specific configuration settings, keeping "jms." prefix
else if (StringUtils.startsWith(confKey, JMS_CONF_PREFIX)) {
jmsConfMapRaw.put(confKey, confVal);
}
// For all other configuration settings (not having any of the above prefixes), keep as is
else {
miscConfMapRaw.put(confKey, confVal);
}
}
}
} catch (IOException ioe) {
logger.error("Can't read the specified config properties file: " + fileName);
ioe.printStackTrace();
} catch (ConfigurationException cex) {
logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage());
cex.printStackTrace();
}
}
public Map<String, Object> getS4jConfObjMap() { return this.s4jConfMapTgt; }
public Map<String, Object> getS4jConfMapObj_client() { return this.clientConfMapTgt; }
public Map<String, Object> getS4jConfMapObj_producer() { return this.producerConfMapTgt; }
public Map<String, Object> getS4jConfMapObj_consumer() { return this.consumerConfMapTgt; }
public Map<String, Object> getS4jConfMapObj_jms() { return this.jmsConfMapTgt; }
public Map<String, Object> getS4jConfMapObj_misc() { return this.miscConfMapTgt; }
private Map<String, Object> mergeConfigObjMaps(
Map<String, Object> origConfigObjMap,
Map<String, Object> extraConfigObjMap )
{
Map<String, Object> newConfigObjMap = new HashMap<>();
// If there are the same settings in both "orig" and "extra" maps,
// the one in the "extra" map will take over
newConfigObjMap.putAll(origConfigObjMap);
newConfigObjMap.putAll(extraConfigObjMap);
return newConfigObjMap;
}
public Map<String, Object> mergeExtraConsumerConfig(
Map<String, String> extraConsumerConfigRaw)
{
if ( (extraConsumerConfigRaw == null) || (extraConsumerConfigRaw.isEmpty()) ) {
return getS4jConfObjMap();
}
else {
Map<String, Object> origConsumerConfigObjMap = getS4jConfMapObj_consumer();
Map<String, Object> extraConsumerConfigObjMap =
S4JClientConfConverter.convertRawConsumerConf(extraConsumerConfigRaw);
Map<String, Object> mergedConsumerConfigObjMap =
mergeConfigObjMaps(origConsumerConfigObjMap, extraConsumerConfigObjMap);
Map<String, Object> mergedS4JConfObjMap = getS4jConfObjMap();
mergedS4JConfObjMap.put("consumerConfig", mergedConsumerConfigObjMap);
return mergedS4JConfObjMap;
}
}
public String toString() {
return new ToStringBuilder(this).
append("effectiveS4jConfMap", s4jConfMapTgt).
toString();
}
}

View File

@ -0,0 +1,429 @@
package io.nosqlbench.adapter.s4j.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.CompressionType;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* This class is used to convert the configuration items in its raw
* format (as provided in the property file) to the format needed by
* the S4J driver
*/
public class S4JClientConfConverter {
public static Map<String, Object> convertRawClientConf(Map<String, String> pulsarClientConfMapRaw) {
Map<String, Object> s4jClientConfObjMap = new HashMap<>();
s4jClientConfObjMap.putAll(pulsarClientConfMapRaw);
/**
* No special handling for non-primitive types
*/
return s4jClientConfObjMap;
}
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>>
private final static Map<String, String> validStdProducerConfKeyTypeMap = Map.ofEntries(
Map.entry("topicName", "String"),
Map.entry("producerName","String"),
Map.entry("sendTimeoutMs","long"),
Map.entry("blockIfQueueFull","boolean"),
Map.entry("maxPendingMessages","int"),
Map.entry("maxPendingMessagesAcrossPartitions","int"),
Map.entry("messageRoutingMode","MessageRoutingMode"),
Map.entry("hashingScheme","HashingScheme"),
Map.entry("cryptoFailureAction","ProducerCryptoFailureAction"),
Map.entry("batchingMaxPublishDelayMicros","long"),
Map.entry("batchingMaxMessages","int"),
Map.entry("batchingEnabled","boolean"),
Map.entry("chunkingEnabled","boolean"),
Map.entry("compressionType","CompressionType"),
Map.entry("initialSubscriptionName","string")
);
public static Map<String, Object> convertRawProducerConf(Map<String, String> pulsarProducerConfMapRaw) {
Map<String, Object> s4jProducerConfObjMap = new HashMap<>();
setConfObjMapForPrimitives(s4jProducerConfObjMap, pulsarProducerConfMapRaw, validStdProducerConfKeyTypeMap);
/**
* Non-primitive type processing for Pulsar producer configuration items
*/
// "compressionType" has value type "CompressionType"
// - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY'
String confKeyName = "compressionType";
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)";
if (StringUtils.isNotBlank(confVal)) {
if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) {
CompressionType compressionType = CompressionType.NONE;
switch (StringUtils.upperCase(confVal)) {
case "LZ4":
compressionType = CompressionType.LZ4;
case "ZLIB":
compressionType = CompressionType.ZLIB;
case "ZSTD":
compressionType = CompressionType.ZSTD;
case "SNAPPY":
compressionType = CompressionType.SNAPPY;
}
s4jProducerConfObjMap.put(confKeyName, compressionType);
} else {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal));
}
}
// TODO: Skip the following Pulsar configuration items for now because they're not really
// needed in the NB S4J testing at the moment. Add support for them when needed.
// * messageRoutingMode
// * hashingScheme
// * cryptoFailureAction
return s4jProducerConfObjMap;
}
// https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer
private final static Map<String, String> validStdConsumerConfKeyTypeMap = Map.ofEntries(
Map.entry("topicNames", "Set<String>"),
Map.entry("topicsPattern","Pattern"),
Map.entry("subscriptionName","String"),
Map.entry("subscriptionType","SubscriptionType"),
Map.entry("receiverQueueSize","int"),
Map.entry("acknowledgementsGroupTimeMicros","long"),
Map.entry("negativeAckRedeliveryDelayMicros","long"),
Map.entry("maxTotalReceiverQueueSizeAcrossPartitions","int"),
Map.entry("consumerName","String"),
Map.entry("ackTimeoutMillis","long"),
Map.entry("tickDurationMillis","long"),
Map.entry("priorityLevel","int"),
Map.entry("cryptoFailureAction","ConsumerCryptoFailureAction"),
Map.entry("properties","SortedMap<String, String>"),
Map.entry("readCompacted","boolean"),
Map.entry("subscriptionInitialPosition", "SubscriptionInitialPosition"),
Map.entry("patternAutoDiscoveryPeriod", "int"),
Map.entry("regexSubscriptionMode", "RegexSubscriptionMode"),
Map.entry("deadLetterPolicy", "DeadLetterPolicy"),
Map.entry("autoUpdatePartitions", "boolean"),
Map.entry("replicateSubscriptionState", "boolean"),
Map.entry("negativeAckRedeliveryBackoff", "RedeliveryBackoff"),
Map.entry("ackTimeoutRedeliveryBackoff", "RedeliveryBackoff"),
Map.entry("autoAckOldestChunkedMessageOnQueueFull", "boolean"),
Map.entry("maxPendingChunkedMessage", "int"),
Map.entry("expireTimeOfIncompleteChunkedMessageMillis", "long")
);
public static Map<String, Object> convertRawConsumerConf(Map<String, String> pulsarConsumerConfMapRaw) {
Map<String, Object> s4jConsumerConfObjMap = new HashMap<>();
setConfObjMapForPrimitives(s4jConsumerConfObjMap, pulsarConsumerConfMapRaw, validStdConsumerConfKeyTypeMap);
/**
* Non-primitive type processing for Pulsar consumer configuration items
*/
// The following non-primitive type configuration items are already excluded
// and don't need to be processed.
// * topicNames
// * topicPattern
// * subscriptionName
// * subscriptionType
// * subscriptionInitialPosition
// * regexSubscriptionMode
// "properties" has value type "SortedMap<String, String>"
// - expecting the value string has the format: a JSON string that includes a set of key/value pairs
String confKeyName = "properties";
String confVal = pulsarConsumerConfMapRaw.get(confKeyName);
String expectedVal = "{\"property1\":\"value1\", \"property2\":\"value2\"}, ...";
ObjectMapper mapper = new ObjectMapper();
if (StringUtils.isNotBlank(confVal)) {
try {
Map<String, String> consumerProperties = mapper.readValue(confVal, Map.class);
// Empty map value is considered as no value
if (!consumerProperties.isEmpty()) {
s4jConsumerConfObjMap.put(confKeyName, consumerProperties);
}
} catch (Exception e) {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
}
}
// "deadLetterPolicy"
// - expecting the value is a JSON string has the format:
// {"maxRedeliverCount":"<int_value>","deadLetterTopic":"<topic_name>","initialSubscriptionName":"<sub_name>"}
confKeyName = "deadLetterPolicy";
confVal = pulsarConsumerConfMapRaw.get(confKeyName);
expectedVal = "{" +
"\"maxRedeliverCount\":\"<int_value>\"," +
"\"deadLetterTopic\":\"<topic_name>\"," +
"\"initialSubscriptionName\":\"<sub_name>\"}";
if (StringUtils.isNotBlank(confVal)) {
try {
Map<String, String> dlqPolicyMap = mapper.readValue(confVal, Map.class);
// Empty map value is considered as no value
if (!dlqPolicyMap.isEmpty()) {
boolean valid = true;
// The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName"
for (String key : dlqPolicyMap.keySet()) {
if (!StringUtils.equalsAnyIgnoreCase(key,
"maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName")) {
valid = false;
break;
}
}
// DLQ.maxRedeliverCount is mandatory
if (valid && !dlqPolicyMap.containsKey("maxRedeliverCount")) {
valid = false;
}
String maxRedeliverCountStr = dlqPolicyMap.get("maxRedeliverCount");
if (!NumberUtils.isCreatable(maxRedeliverCountStr)) {
valid = false;
}
if (valid) {
// In S4J driver, DLQ setting is done via a Map
// <<< https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-implementation.html#dead-letter-policy >>>
s4jConsumerConfObjMap.put(confKeyName, dlqPolicyMap);
} else {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
}
}
} catch (Exception e) {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
}
}
// "negativeAckRedeliveryBackoff" or "ackTimeoutRedeliveryBackoff"
// - expecting the value is a JSON string has the format:
// {"minDelayMs":"<int_value>", "maxDelayMs":"<int_value>", "multiplier":"<double_value>"}
String[] redeliveryBackoffConfigSet = {"negativeAckRedeliveryBackoff", "ackTimeoutRedeliveryBackoff"};
expectedVal = "{" +
"\"minDelayMs\":\"<int_value>\"," +
"\"maxDelayMs\":\"<int_value>\"," +
"\"multiplier\":\"<double_value>\"}";
for (String confKey : redeliveryBackoffConfigSet) {
confVal = pulsarConsumerConfMapRaw.get(confKey);
if (StringUtils.isNotBlank(confVal)) {
try {
Map<String, String> redliveryBackoffMap = mapper.readValue(confVal, Map.class);
// Empty map value is considered as no value
if (! redliveryBackoffMap.isEmpty()) {
boolean valid = true;
// The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName"
for (String key : redliveryBackoffMap.keySet()) {
if (!StringUtils.equalsAnyIgnoreCase(key,
"minDelayMs", "maxDelayMs", "multiplier")) {
valid = false;
break;
}
}
String minDelayMsStr = redliveryBackoffMap.get("minDelayMs");
String maxDelayMsStr = redliveryBackoffMap.get("maxDelayMs");
String multiplierStr = redliveryBackoffMap.get("multiplier");
if ((StringUtils.isNotBlank(minDelayMsStr) && !NumberUtils.isCreatable(minDelayMsStr)) ||
(StringUtils.isNotBlank(maxDelayMsStr) && !NumberUtils.isCreatable(maxDelayMsStr)) ||
(StringUtils.isNotBlank(multiplierStr) && !NumberUtils.isCreatable(multiplierStr))) {
valid = false;
}
if (valid) {
// In S4J driver, AckTimeOut and Negative TimeOut is done via a Map
// <<< https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-implementation.html#ack-timeout >>>
// <<< https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-implementation.html#negative-ack >>>
s4jConsumerConfObjMap.put(confKey, redliveryBackoffMap);
} else {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKey, confVal, "consumer", expectedVal));
}
}
} catch (Exception e) {
throw new S4JAdapterInvalidParamException(
getInvalidConfValStr(confKey, confVal, "consumer", expectedVal));
}
}
}
// TODO: Skip the following Pulsar configuration items for now because they're not really
// needed in the NB S4J testing right now. Add the support for them when needed.
// * cryptoFailureAction
return s4jConsumerConfObjMap;
}
// https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options
private final static Map<String, String> validS4jJmsConfKeyTypeMap = Map.ofEntries(
Map.entry("jms.acknowledgeRejectedMessages", "boolean"),
Map.entry("jms.clientId","String"),
Map.entry("jms.emulateTransactions","boolean"),
Map.entry("jms.enableClientSideEmulation","boolean"),
Map.entry("jms.forceDeleteTemporaryDestinations","boolean"),
Map.entry("jms.precreateQueueSubscription","boolean"),
Map.entry("jms.queueSubscriptionName","String"),
Map.entry("jms.systemNamespace","String"),
Map.entry("jms.topicSharedSubscriptionType","String"),
Map.entry("jms.useCredentialsFromCreateConnection","boolean"),
Map.entry("jms.useExclusiveSubscriptionsForSimpleConsumers","long"),
Map.entry("jms.usePulsarAdmin","boolean"),
Map.entry("jms.useServerSideFiltering","boolean"),
Map.entry("jms.waitForServerStartupTimeout","int"),
Map.entry("jms.transactionsStickyPartitions", "boolean")
);
public static Map<String, Object> convertRawJmsConf(Map<String, String> s4jJmsConfMapRaw) {
Map<String, Object> s4jJmsConfObjMap = new HashMap<>();
setConfObjMapForPrimitives(s4jJmsConfObjMap, s4jJmsConfMapRaw, validS4jJmsConfKeyTypeMap);
/**
* Non-primitive type processing for Pulsar client configuration items
*/
// None
return s4jJmsConfObjMap;
}
// https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options
private final static Map<String, String> validS4jMiscConfKeyTypeMap = Map.ofEntries(
Map.entry("brokerServiceUrl","String"),
Map.entry("webServiceUrl","String"),
Map.entry("ackTimeout", "long"),
Map.entry("ackTimeoutMillis","long"),
Map.entry("enableTransaction","boolean"),
Map.entry("consumerConfig","Map<String,Object>"),
Map.entry("producerConfig","Map<String,Object>")
);
public static Map<String, Object> convertRawMiscConf(Map<String, String> s4jMiscConfMapRaw) {
Map<String, Object> s4jMiscConfObjMap = new HashMap<>();
setConfObjMapForPrimitives(s4jMiscConfObjMap, s4jMiscConfMapRaw, validS4jMiscConfKeyTypeMap);
/**
* Non-primitive type processing for Pulsar client configuration items
*/
// Only the following 2 non-primitive type settings will be set explicitly
// * producerConfig
// * consumerConfig
return s4jMiscConfObjMap;
}
// Utility function
// - get configuration key names by the value type
private static List<String> getConfKeyNameByValueType(Map<String, String> confKeyTypeMap, String tgtValType) {
ArrayList<String> confKeyNames = new ArrayList<>();
for (Map.Entry entry: confKeyTypeMap.entrySet()) {
if (StringUtils.equalsIgnoreCase(entry.getValue().toString(), tgtValType)) {
confKeyNames.add(entry.getKey().toString());
}
}
return confKeyNames;
}
// Conversion from Map<String, String> to Map<String, Object> for configuration items with primitive
// value types
private static void setConfObjMapForPrimitives(
Map<String, Object> tgtConfObjMap,
Map<String, String> srcConfMapRaw,
Map<String, String> validConfKeyTypeMap)
{
List<String> confKeyList = new ArrayList<>();
// All configuration items with "String" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "String");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
if (StringUtils.isNotBlank(confVal)) {
tgtConfObjMap.put(confKey, confVal);
}
}
}
// All configuration items with "long" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "long");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
if (StringUtils.isNotBlank(confVal)) {
tgtConfObjMap.put(confKey, Long.valueOf(confVal));
}
}
}
// All configuration items with "int" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "int");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
if (StringUtils.isNotBlank(confVal)) {
tgtConfObjMap.put(confKey, Integer.valueOf(confVal));
}
}
}
// All configuration items with "boolean" as the value type
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "boolean");
for (String confKey : confKeyList) {
if (srcConfMapRaw.containsKey(confKey)) {
String confVal = srcConfMapRaw.get(confKey);
if (StringUtils.isNotBlank(confVal)) {
tgtConfObjMap.put(confKey, Boolean.valueOf(confVal));
}
}
}
// TODO: So far the above primitive types should be good enough.
// Add support for other types when needed
}
private static String getInvalidConfValStr(String confKey, String confVal, String configCategory, String expectedVal) {
return "Incorrect value \"" + confVal + "\" for Pulsar " + configCategory +
" configuration item of \"" + confKey + "\". Expecting the following value (format): " + expectedVal;
}
}

View File

@ -0,0 +1,86 @@
package io.nosqlbench.adapter.s4j.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.dispensers.S4JBaseOpDispenser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.CompletionListener;
import javax.jms.JMSException;
import javax.jms.Message;
/**
* Used for async message production
*/
public class S4JCompletionListener implements CompletionListener {
private final static Logger logger = LogManager.getLogger(S4JCompletionListener.class);
private final S4JSpace s4JSpace;
private final S4JBaseOpDispenser s4jBaseOpDispenser;
public S4JCompletionListener(S4JSpace s4JSpace, S4JBaseOpDispenser s4jBaseOpDispenser) {
assert (s4JSpace != null);
assert (s4jBaseOpDispenser != null);
this.s4JSpace = s4JSpace;
this.s4jBaseOpDispenser = s4jBaseOpDispenser;
}
@Override
public void onCompletion(Message message) {
try {
if (logger.isTraceEnabled()) {
// for testing purpose
String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP);
logger.trace("onCompletion::Async message send successful - message ID {} ({}) "
, message.getJMSMessageID(), myMsgSeq);
}
if (s4JSpace.isTrackingMsgRecvCnt() ) {
long totalResponseCnt = s4JSpace.incTotalOpResponseCnt();
if (logger.isTraceEnabled()) {
logger.trace("... async op response received so far: {}", totalResponseCnt);
}
}
}
catch (JMSException e) {
S4JAdapterUtil.processMsgErrorHandling(
e,
s4JSpace.isStrictMsgErrorHandling(),
"Unexpected errors when async sending a JMS message.");
}
}
@Override
public void onException(Message message, Exception e) {
try {
if (logger.isDebugEnabled()) {
// for testing purpose
String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP);
logger.debug("onException::Async message send failed - message ID {} ({}) "
, message.getJMSMessageID(), myMsgSeq);
}
}
catch (JMSException jmsException) {
logger.warn("onException::Unexpected error: " + jmsException.getMessage());
}
}
}

View File

@ -0,0 +1,52 @@
package io.nosqlbench.adapter.s4j.util;
import org.apache.commons.lang3.builder.ToStringBuilder;
import javax.jms.JMSContext;
import javax.jms.Session;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
public class S4JJMSContextWrapper {
private final String jmsContextIdentifer;
private final JMSContext jmsContext;
private final int jmsSessionMode;
public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) {
this.jmsContextIdentifer = identifer;
this.jmsContext = jmsContext;
this.jmsSessionMode = jmsContext.getSessionMode();
}
public int getJmsSessionMode() { return jmsSessionMode; }
public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); }
public String getJmsContextIdentifer() { return jmsContextIdentifer; }
public JMSContext getJmsContext() { return jmsContext; }
public void close() {
if (jmsContext != null) {
jmsContext.close();
}
}
public String toString() {
return new ToStringBuilder(this).
append("jmsContextIdentifer", jmsContextIdentifer).
append("jmsContext", jmsContext.toString()).
toString();
}
}

View File

@ -0,0 +1,97 @@
package io.nosqlbench.adapter.s4j.util;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.codahale.metrics.Histogram;
import io.nosqlbench.adapter.s4j.S4JSpace;
import io.nosqlbench.adapter.s4j.dispensers.S4JBaseOpDispenser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
/**
* Used for async message consumption
*/
public class S4JMessageListener implements MessageListener {
private final static Logger logger = LogManager.getLogger(S4JMessageListener.class);
private final float msgAckRatio;
private final int slowAckInSec;
private final JMSContext jmsContext;
private final S4JSpace s4jSpace;
private final S4JBaseOpDispenser s4jBaseOpDispenser;
public S4JMessageListener(
JMSContext jmsContext,
S4JSpace s4jSpace,
S4JBaseOpDispenser s4jBaseOpDispenser,
float msgAckRatio,
int slowAckInSec)
{
assert (jmsContext != null);
assert (s4jSpace != null);
assert (s4jBaseOpDispenser != null);
this.jmsContext = jmsContext;
this.s4jSpace = s4jSpace;
this.s4jBaseOpDispenser = s4jBaseOpDispenser;
this.msgAckRatio = msgAckRatio;
this.slowAckInSec = slowAckInSec;
}
@Override
public void onMessage(Message message) {
try {
if (message != null) {
s4jSpace.processMsgAck(jmsContext, message, msgAckRatio, slowAckInSec);
int msgSize = message.getIntProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP);
S4JAdapterMetrics s4JAdapterMetrics = s4jBaseOpDispenser.getS4jAdapterMetrics();
Histogram messageSizeHistogram = s4JAdapterMetrics.getMessagesizeHistogram();
messageSizeHistogram.update(msgSize);
if (logger.isTraceEnabled()) {
// for testing purpose
String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP);
logger.trace("onMessage::Async message receive successful - message ID {} ({}) "
, message.getJMSMessageID(), myMsgSeq);
}
if (s4jSpace.isTrackingMsgRecvCnt()) {
s4jSpace.incTotalOpResponseCnt();
}
}
else {
if (s4jSpace.isTrackingMsgRecvCnt()) {
s4jSpace.incTotalNullMsgRecvdCnt();
}
}
}
catch (JMSException e) {
S4JAdapterUtil.processMsgErrorHandling(
e,
s4jSpace.isStrictMsgErrorHandling(),
"Unexpected errors when async receiving a JMS message.");
}
}
}

View File

@ -94,6 +94,12 @@
<version>4.17.31-SNAPSHOT</version> <version>4.17.31-SNAPSHOT</version>
</dependency> </dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-s4j</artifactId>
<version>4.17.31-SNAPSHOT</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@ -62,6 +62,7 @@
<module>adapter-dynamodb</module> <module>adapter-dynamodb</module>
<module>adapter-mongodb</module> <module>adapter-mongodb</module>
<module>adapter-pulsar</module> <module>adapter-pulsar</module>
<module>adapter-s4j</module>
<!-- VIRTDATA MODULES --> <!-- VIRTDATA MODULES -->