diff --git a/adapter-pulsar/pom.xml b/adapter-pulsar/pom.xml index 2428c54fe..5ed541f12 100644 --- a/adapter-pulsar/pom.xml +++ b/adapter-pulsar/pom.xml @@ -62,6 +62,13 @@ ${pulsar.version} + + + org.apache.commons + commons-lang3 + 3.12.0 + + commons-beanutils @@ -82,13 +89,6 @@ avro 1.11.1 - - - - org.apache.commons - commons-lang3 - 3.12.0 - diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarClientConf.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarClientConf.java index 4af681509..fc6966f4a 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarClientConf.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/util/PulsarClientConf.java @@ -73,7 +73,7 @@ public class PulsarClientConf { // Convert the raw configuration map () to the required map () producerConfMapTgt.putAll(PulsarConfConverter.convertStdRawProducerConf(producerConfMapRaw)); consumerConfMapTgt.putAll(PulsarConfConverter.convertStdRawConsumerConf(consumerConfMapRaw)); - // TODO: Reader API is not disabled at the moment. Revisit when needed + // TODO: Reader API is not enabled at the moment. Revisit when needed } diff --git a/adapter-s4j/pom.xml b/adapter-s4j/pom.xml new file mode 100644 index 000000000..41bb02db6 --- /dev/null +++ b/adapter-s4j/pom.xml @@ -0,0 +1,89 @@ + + 4.0.0 + + adapter-s4j + jar + + + mvn-defaults + io.nosqlbench + 4.17.31-SNAPSHOT + ../mvn-defaults + + + ${project.artifactId} + + A Starlight for JMS driver for nosqlbench. This provides the ability to inject synthetic data + into a pulsar system via JMS 2.0 compatible APIs. + + NOTE: this is JMS compatible driver from DataStax that allows using a Pulsar cluster + as the potential JMS Destination + + + + 3.2.0 + + + + + + kr.motd.maven + os-maven-plugin + 1.7.1 + + + + + + + + io.nosqlbench + engine-api + 4.17.31-SNAPSHOT + + + + io.nosqlbench + adapters-api + 4.17.31-SNAPSHOT + + + + + com.datastax.oss + pulsar-jms-all + ${s4j.version} + + + + + org.apache.commons + commons-lang3 + 3.12.0 + + + + + commons-beanutils + commons-beanutils + 1.9.4 + + + + + org.apache.commons + commons-configuration2 + 2.8.0 + + + + + org.conscrypt + conscrypt-openjdk + 2.5.2 + ${os.detected.classifier} + + + + + diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JDriverAdapter.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JDriverAdapter.java new file mode 100644 index 000000000..49527f9c0 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JDriverAdapter.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j; + +import io.nosqlbench.adapter.s4j.ops.S4JOp; +import io.nosqlbench.api.config.standard.NBConfigModel; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.activityimpl.uniform.BaseDriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.nb.annotations.Service; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.Function; + +@Service(value = DriverAdapter.class, selector = "s4j") +public class S4JDriverAdapter extends BaseDriverAdapter { + private final static Logger logger = LogManager.getLogger(S4JDriverAdapter.class); + + @Override + public OpMapper getOpMapper() { + DriverSpaceCache spaceCache = getSpaceCache(); + NBConfiguration adapterConfig = getConfiguration(); + return new S4JOpMapper(this, adapterConfig, spaceCache); + } + + @Override + public Function getSpaceInitializer(NBConfiguration cfg) { + return (s) -> new S4JSpace(s, cfg); + } + + @Override + public NBConfigModel getConfigModel() { + return super.getConfigModel().add(S4JSpace.getConfigModel()); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JOpMapper.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JOpMapper.java new file mode 100644 index 000000000..95b39ec57 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JOpMapper.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j; + +import io.nosqlbench.adapter.s4j.dispensers.MessageConsumerOpDispenser; +import io.nosqlbench.adapter.s4j.dispensers.MessageProducerOpDispenser; +import io.nosqlbench.adapter.s4j.ops.S4JOp; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.engine.api.activityimpl.OpDispenser; +import io.nosqlbench.engine.api.activityimpl.OpMapper; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.engine.api.templating.ParsedOp; +import io.nosqlbench.engine.api.templating.TypeAndTarget; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class S4JOpMapper implements OpMapper { + + private final static Logger logger = LogManager.getLogger(S4JOpMapper.class); + + private final NBConfiguration cfg; + private final DriverSpaceCache spaceCache; + private final DriverAdapter adapter; + + public S4JOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache spaceCache) { + this.cfg = cfg; + this.spaceCache = spaceCache; + this.adapter = adapter; + } + + @Override + public OpDispenser apply(ParsedOp op) { + String spaceName = op.getStaticConfigOr("space", "default"); + S4JSpace s4jSpace = spaceCache.get(spaceName); + + /* + * If the user provides a body element, then they want to provide the JSON or + * a data structure that can be converted into JSON, bypassing any further + * specialized type-checking or op-type specific features + */ + if (op.isDefined("body")) { + throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field."); + } + else { + TypeAndTarget opType = op.getTypeAndTarget(S4JOpType.class, String.class); + + return switch (opType.enumId) { + case MessageProduce -> + new MessageProducerOpDispenser(adapter, op, opType.targetFunction, s4jSpace); + case MessageConsume -> + new MessageConsumerOpDispenser(adapter, op, opType.targetFunction, s4jSpace); + }; + } + } + +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JOpType.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JOpType.java new file mode 100644 index 000000000..2e4f1144b --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JOpType.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j; + +public enum S4JOpType { + // publishing/sending messages to a JMS queue or a topic + MessageProduce, + // consuming/receiving messages from a JMS queue or a topic + // for a topic, it can be: + // - non-durable, non-shared + // - durable, non-shared + // - non-durable, shared + // - durable, shared + MessageConsume; +} + + diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java new file mode 100644 index 000000000..8c552912f --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/S4JSpace.java @@ -0,0 +1,364 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j; + +import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException; +import io.nosqlbench.adapter.s4j.util.*; +import io.nosqlbench.api.config.standard.ConfigModel; +import io.nosqlbench.api.config.standard.NBConfigModel; +import io.nosqlbench.api.config.standard.NBConfiguration; +import io.nosqlbench.api.config.standard.Param; +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +public class S4JSpace implements AutoCloseable { + + private final static Logger logger = LogManager.getLogger(S4JSpace.class); + + private final String spaceName; + private final NBConfiguration cfg; + + // - Each S4J space currently represents a number of JMS connections (\"num_conn\" NB CLI parameter); + // - JMS connection can have a number of JMS sessions (\"num_session\" NB CLI parameter). + // - Each JMS session has its own sets of JMS destinations, producers, consumers, etc. + private final ConcurrentHashMap connLvlJmsContexts = new ConcurrentHashMap<>(); + private final ConcurrentHashMap sessionLvlJmsContexts = new ConcurrentHashMap<>(); + + private final String pulsarSvcUrl; + private final String webSvcUrl; + private final String s4jClientConfFileName; + private S4JClientConf s4JClientConf; + private final int sessionMode; + + // Whether to do strict error handling while sending/receiving messages + // - Yes: any error returned from the Pulsar server while doing message receiving/sending will trigger NB execution stop + // - No: pause the current thread that received the error message for 1 second and then continue processing + private boolean strictMsgErrorHandling; + + // Maximum time length to execute S4J operations (e.g. message send or consume) + // - when NB execution passes this threshold, it is simply NoOp + // - 0 means no maximum time constraint. S4JOp is always executed until NB execution cycle finishes + private long maxS4JOpTimeInSec; + private long s4JActivityStartTimeMills; + + // Whether to keep track of the received message count, which includes + // - total received message count + // - received null message count (only relevant when non-blocking message receiving is used) + // By default, this setting is disabled + private boolean trackingMsgRecvCnt; + + // How many JMS connections per NB S4J execution + private int maxNumConn; + // How many sessions per JMS connection + private int maxNumSessionPerConn; + + // Total number of acknowledgement received + // - this can apply to both message production and consumption + // - for message consumption, this only applies to non-null messages received (which is for async API) + private final AtomicLong totalOpResponseCnt = new AtomicLong(0); + // Total number of null messages received + // - only applicable to message consumption + private final AtomicLong nullMsgRecvCnt = new AtomicLong(0); + + // Keep track the transaction count per thread + private final ThreadLocal txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0); + + // Represents the JMS connection + private PulsarConnectionFactory s4jConnFactory; + + private long totalCycleNum; + + public S4JSpace(String spaceName, NBConfiguration cfg) { + this.spaceName = spaceName; + this.cfg = cfg; + + this.pulsarSvcUrl = cfg.get("service_url"); + this.webSvcUrl = cfg.get("web_url"); + this.maxNumConn= cfg.getOrDefault("num_conn", Integer.valueOf(1)); + this.maxNumSessionPerConn = cfg.getOrDefault("num_session", Integer.valueOf(1)); + this.maxS4JOpTimeInSec= cfg.getOrDefault("max_s4jop_time", Long.valueOf(0)); + this.trackingMsgRecvCnt=cfg.getOrDefault("track_msg_cnt", Boolean.FALSE); + this.strictMsgErrorHandling = cfg.getOrDefault("strict_msg_error_handling", Boolean.FALSE); + this.s4jClientConfFileName = cfg.get("config"); + this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(cfg.get("session_mode")); + this.s4JClientConf = new S4JClientConf(pulsarSvcUrl, webSvcUrl, s4jClientConfFileName); + + this.initializeSpace(s4JClientConf); + } + + @Override + public void close() { + shutdownSpace(); + } + + public static NBConfigModel getConfigModel() { + return ConfigModel.of(S4JSpace.class) + .add(Param.defaultTo("service_url", "pulsar://localhost:6650") + .setDescription("Pulsar broker service URL.")) + .add(Param.defaultTo("web_url", "http://localhost:8080") + .setDescription("Pulsar web service URL.")) + .add(Param.defaultTo("config", "config.properties") + .setDescription("Pulsar client connection configuration property file.")) + .add(Param.defaultTo("num_conn", 1) + .setDescription("Number of JMS connections")) + .add(Param.defaultTo("num_session", 1) + .setDescription("Number of JMS sessions per JMS connection")) + .add(Param.defaultTo("max_s4jop_time", 0) + .setDescription("Maximum time (in seconds) to run NB S4J testing scenario.")) + .add(Param.defaultTo("track_msg_cnt", false) + .setDescription("Whether to keep track of message count(s)")) + .add(Param.defaultTo("session_mode", "") + .setDescription("JMS session mode")) + .add(Param.defaultTo("strict_msg_error_handling", false) + .setDescription("Whether to do strict error handling which is to stop NB S4J execution.")) + .asReadOnly(); + } + + public ConcurrentHashMap getConnLvlJmsContexts() { + return connLvlJmsContexts; + } + + public ConcurrentHashMap getSessionLvlJmsContexts() { + return sessionLvlJmsContexts; + } + + public long getS4JActivityStartTimeMills() { return this.s4JActivityStartTimeMills; } + public void setS4JActivityStartTimeMills(long startTime) { this.s4JActivityStartTimeMills = startTime; } + + public long getMaxS4JOpTimeInSec() { return this.maxS4JOpTimeInSec; } + + public int getSessionMode() { return sessionMode; } + + public String getS4jClientConfFileName() { return s4jClientConfFileName; } + public S4JClientConf getS4JClientConf() { return s4JClientConf; } + + public boolean isTrackingMsgRecvCnt() { return trackingMsgRecvCnt; } + + public int getMaxNumSessionPerConn() { return this.maxNumSessionPerConn; } + public int getMaxNumConn() { return this.maxNumConn; } + + public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; } + + public int getTxnBatchTrackingCnt() { return txnBatchTrackingCnt.get(); } + public void incTxnBatchTrackingCnt() { + int curVal = getTxnBatchTrackingCnt(); + txnBatchTrackingCnt.set(curVal + 1); + } + + public long getTotalOpResponseCnt() { return totalOpResponseCnt.get();} + public long incTotalOpResponseCnt() { return totalOpResponseCnt.incrementAndGet();} + public void resetTotalOpResponseCnt() { totalOpResponseCnt.set(0); } + + public long getTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.get();} + public void resetTotalNullMsgRecvdCnt() { nullMsgRecvCnt.set(0); } + + public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); } + + public PulsarConnectionFactory getS4jConnFactory() { return s4jConnFactory; } + + public long getTotalCycleNum() { return totalCycleNum; } + public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; } + + public void initializeSpace(S4JClientConf s4JClientConnInfo) { + if (s4jConnFactory == null) { + Map cfgMap; + try { + cfgMap = s4JClientConnInfo.getS4jConfObjMap(); + s4jConnFactory = new PulsarConnectionFactory(cfgMap); + + for (int i=0; i { + if (logger.isDebugEnabled()) { + logger.error("onException::Unexpected JMS error happened:" + e); + } + }); + + connLvlJmsContexts.put(connLvlJmsConnContextIdStr, jmsConnContext); + + if (logger.isDebugEnabled()) { + logger.debug("[Connection level JMSContext] {} -- {}", + Thread.currentThread().getName(), + jmsConnContext ); + } + } + } + catch (JMSRuntimeException e) { + if (logger.isDebugEnabled()) { + logger.debug("[ERROR] Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString()); + } + throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause()); + } + catch (Exception e) { + e.printStackTrace(); + } + } + } + + public void shutdownSpace() { + long shutdownStartTimeMills = System.currentTimeMillis(); + + try { + waitUntilAllOpFinished(shutdownStartTimeMills); + + this.txnBatchTrackingCnt.remove(); + + for (S4JJMSContextWrapper s4JJMSContextWrapper : sessionLvlJmsContexts.values()) { + if (s4JJMSContextWrapper != null) { + if (s4JJMSContextWrapper.isTransactedMode()) { + s4JJMSContextWrapper.getJmsContext().rollback(); + } + s4JJMSContextWrapper.close(); + } + } + + for (JMSContext jmsContext : connLvlJmsContexts.values()) { + if (jmsContext != null) jmsContext.close(); + } + + s4jConnFactory.close(); + } + catch (Exception e) { + e.printStackTrace(); + throw new S4JAdapterUnexpectedException("Unexpected error when shutting down NB S4J space."); + } + } + + // When completing NB execution, don't shut down right away because otherwise, async operation processing may fail. + // Instead, shut down when either one of the following condition is satisfied + // 1) the total number of the received operation response is the same as the total number of operations being executed; + // 2) time has passed for 10 seconds + private void waitUntilAllOpFinished(long shutdownStartTimeMills) { + long totalCycleNum = getTotalCycleNum(); + long totalResponseCnt = 0; + long totalNullMsgCnt = 0; + long timeElapsedMills; + + boolean trackingMsgCnt = isTrackingMsgRecvCnt(); + boolean continueChk; + + do { + S4JAdapterUtil.pauseCurThreadExec(1); + + long curTimeMills = System.currentTimeMillis(); + timeElapsedMills = curTimeMills - shutdownStartTimeMills; + continueChk = (timeElapsedMills <= 10000); + + if (trackingMsgCnt) { + totalResponseCnt = this.getTotalOpResponseCnt(); + totalNullMsgCnt = this.getTotalNullMsgRecvdCnt(); + continueChk = continueChk && (totalResponseCnt < totalCycleNum); + } + + if (logger.isTraceEnabled()) { + logger.trace( + buildExecSummaryString(trackingMsgCnt, timeElapsedMills, totalResponseCnt, totalNullMsgCnt)); + } + } while (continueChk); + + logger.info( + buildExecSummaryString(trackingMsgCnt, timeElapsedMills, totalResponseCnt, totalNullMsgCnt)); + } + + private String buildExecSummaryString( + boolean trackingMsgCnt, + long timeElapsedMills, + long totalResponseCnt, + long totalNullMsgCnt) + { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder + .append("shutdownSpace::waitUntilAllOpFinished -- ") + .append("shutdown time elapsed: ").append(timeElapsedMills).append("ms; "); + + if (trackingMsgCnt) { + stringBuilder.append("response received: ").append(totalResponseCnt).append("; "); + stringBuilder.append("null msg received: ").append(totalNullMsgCnt).append("; "); + } + + return stringBuilder.toString(); + } + + public void processMsgAck(JMSContext jmsContext, Message message, float msgAckRatio, int slowAckInSec) throws JMSException { + int jmsSessionMode = jmsContext.getSessionMode(); + + if ((jmsSessionMode != Session.AUTO_ACKNOWLEDGE) && + (jmsSessionMode != Session.SESSION_TRANSACTED)) { + float rndVal = RandomUtils.nextFloat(0, 1); + if (rndVal < msgAckRatio) { + S4JAdapterUtil.pauseCurThreadExec(slowAckInSec); + message.acknowledge(); + } + } + } + + public String getConnLvlJmsContextIdentifier(int jmsConnSeqNum) { + return S4JAdapterUtil.buildCacheKey( + this.spaceName, + StringUtils.join("conn-", jmsConnSeqNum)); + } + + public String getSessionLvlJmsContextIdentifier(int jmsConnSeqNum, int jmsSessionSeqNum) { + return S4JAdapterUtil.buildCacheKey( + this.spaceName, + StringUtils.join("conn-", jmsConnSeqNum), + StringUtils.join("session-", jmsSessionSeqNum)); + } + + // Create JMSContext that represents a new JMS connection + public JMSContext getOrCreateConnLvlJMSContext( + PulsarConnectionFactory s4jConnFactory, + S4JClientConf s4JClientConf, + int sessionMode) + { + boolean useCredentialsEnable = S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf); + JMSContext jmsConnContext; + + if (!useCredentialsEnable) + jmsConnContext = s4jConnFactory.createContext(sessionMode); + else { + String userName = S4JAdapterUtil.getCredentialUserName(s4JClientConf); + String passWord = S4JAdapterUtil.getCredentialPassword(s4JClientConf); + + // Password must be in "token:" format + if (! StringUtils.startsWith(passWord, "token:")) { + throw new S4JAdapterInvalidParamException( + "When 'jms.useCredentialsFromCreateConnection' is enabled, " + + "the provided password must be in format 'token: "); + } + + jmsConnContext = s4jConnFactory.createContext(userName, passWord, sessionMode); + } + + return jmsConnContext; + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageConsumerOpDispenser.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageConsumerOpDispenser.java new file mode 100644 index 000000000..b71e9ad4a --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageConsumerOpDispenser.java @@ -0,0 +1,162 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j.dispensers; + +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.ops.MessageConsumerOp; +import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil; +import io.nosqlbench.adapter.s4j.util.S4JJMSContextWrapper; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; +import java.util.HashMap; +import java.util.Map; +import java.util.function.LongFunction; + +public class MessageConsumerOpDispenser extends S4JBaseOpDispenser { + + private final static Logger logger = LogManager.getLogger("MessageConsumerOpDispenser"); + + // Doc-level parameter: blocking_msg_recv (default: false) + protected final boolean blockingMsgRecv; + // Doc-level parameter: shared_topic (default: false) + // - only applicable to Topic as the destination type + protected final boolean sharedTopic; + // Doc-level parameter: durable_topic (default: false) + // - only applicable to Topic as the destination type + protected final boolean durableTopic; + // default value: false + private final boolean noLocal; + // default value: 0 + // value <= 0 : no timeout + private final int readTimeout; + // default value: false + private final boolean recvNoWait; + // default value: 1.0 (all received messages are acknowledged) + // value must be between 0 and 1 (inclusive) + private final float msgAckRatio; + // default value: 0 + // value <= 0 : no slow message ack + private final int slowAckInSec; + private final LongFunction subNameStrFunc; + private final LongFunction localMsgSelectorFunc; + + // Generally the consumer related configurations can be set in the global "config.properties" file, + // which can be applied to many testing scenarios. + // Setting them here will allow scenario-specific customer configurations. At the moment, only the + // DLT related settings are supported + private final Map combinedConsumerConfigObjMap = new HashMap<>(); + + + public MessageConsumerOpDispenser(DriverAdapter adapter, + ParsedOp op, + LongFunction tgtNameFunc, + S4JSpace s4jSpace) { + super(adapter, op, tgtNameFunc, s4jSpace); + + this.blockingMsgRecv = + parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.BLOCKING_MSG_RECV.label, Boolean.FALSE); + this.sharedTopic = + parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.SHARED_TOPIC.label, Boolean.FALSE); + this.durableTopic = + parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.DURABLE_TOPIC.label, Boolean.FALSE); + this.noLocal = + parsedOp.getStaticConfigOr("no_local", Boolean.FALSE); + this.readTimeout = + parsedOp.getStaticConfigOr("read_timeout", Integer.valueOf(0)); + this.recvNoWait = + parsedOp.getStaticConfigOr("no_wait", Boolean.FALSE); + this.msgAckRatio = + parsedOp.getStaticConfigOr("msg_ack_ratio", Float.valueOf(1.0f)); + this.slowAckInSec = + parsedOp.getStaticConfigOr("slow_ack_in_sec", Integer.valueOf(0)); + this.subNameStrFunc = + lookupMandtoryStrOpValueFunc("subscription_name"); + this.localMsgSelectorFunc = + lookupOptionalStrOpValueFunc("msg_selector"); + + String[] stmtLvlConsumerConfKeyNameList = { + "consumer.ackTimeoutMillis", + "consumer.deadLetterPolicy", + "consumer.negativeAckRedeliveryBackoff", + "consumer.ackTimeoutRedeliveryBackoff"}; + HashMap stmtLvlConsumerConfRawMap = new HashMap<>(); + for (String confKey : stmtLvlConsumerConfKeyNameList ) { + String confVal = parsedOp.getStaticConfigOr(confKey, ""); + stmtLvlConsumerConfRawMap.put( + StringUtils.substringAfter(confKey, "consumer."), + confVal); + } + + this.combinedConsumerConfigObjMap.putAll( + s4jSpace.getS4JClientConf().mergeExtraConsumerConfig(stmtLvlConsumerConfRawMap)); + } + + @Override + public MessageConsumerOp apply(long cycle) { + S4JJMSContextWrapper s4JJMSContextWrapper = + getOrCreateS4jJmsContextWrapper(cycle, this.combinedConsumerConfigObjMap); + JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); + boolean commitTransact = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle); + + Destination destination; + try { + destination = getOrCreateJmsDestination( + s4JJMSContextWrapper, temporaryDest, destType, destNameStrFunc.apply(cycle)); + } + catch (JMSRuntimeException jmsRuntimeException) { + throw new RuntimeException("Unable to create the JMS destination!"); + } + + JMSConsumer jmsConsumer; + try { + jmsConsumer = getOrCreateJmsConsumer( + s4JJMSContextWrapper, + destination, + destType, + subNameStrFunc.apply(cycle), + localMsgSelectorFunc.apply(cycle), + msgAckRatio, + noLocal, + durableTopic, + sharedTopic, + asyncAPI, + slowAckInSec); + } + catch (JMSException jmsException) { + throw new RuntimeException("Unable to create the JMS consumer!"); + } + + return new MessageConsumerOp( + s4jAdapterMetrics, + s4jSpace, + jmsContext, + destination, + asyncAPI, + commitTransact, + jmsConsumer, + blockingMsgRecv, + msgAckRatio, + readTimeout, + recvNoWait, + slowAckInSec); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java new file mode 100644 index 000000000..8c2e5e1b7 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java @@ -0,0 +1,358 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j.dispensers; + +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException; +import io.nosqlbench.adapter.s4j.ops.MessageProducerOp; +import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil; +import io.nosqlbench.adapter.s4j.util.S4JJMSContextWrapper; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.LongFunction; +public class MessageProducerOpDispenser extends S4JBaseOpDispenser { + + private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser"); + + public static final String MSG_HEADER_OP_PARAM = "msg_header"; + public static final String MSG_PROP_OP_PARAM = "msg_property"; + public static final String MSG_BODY_OP_PARAM = "msg_body"; + public static final String MSG_TYPE_OP_PARAM = "msg_type"; + + private final LongFunction msgHeaderRawJsonStrFunc; + private final LongFunction msgPropRawJsonStrFunc; + private final LongFunction msgBodyRawJsonStrFunc; + private final LongFunction msgTypeFunc; + + public MessageProducerOpDispenser(DriverAdapter adapter, + ParsedOp op, + LongFunction tgtNameFunc, + S4JSpace s4jSpace) { + super(adapter, op, tgtNameFunc, s4jSpace); + + this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM); + this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM); + this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM); + this.msgTypeFunc = lookupMandtoryStrOpValueFunc(MSG_TYPE_OP_PARAM); + } + + private Message createAndSetMessagePayload( + S4JJMSContextWrapper s4JJMSContextWrapper, + String msgType, String msgBodyRawJsonStr) throws JMSException + { + Message message; + int messageSize = 0; + + JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); + + if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label)) { + message = jmsContext.createTextMessage(); + ((TextMessage) message).setText(msgBodyRawJsonStr); + messageSize = msgBodyRawJsonStr.length(); + } else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.MAP.label)) { + message = jmsContext.createMapMessage(); + + // The message body json string must be in the format of a collection of key/value pairs + // Otherwise, it is an error + Map jmsMsgBodyMap; + try { + jmsMsgBodyMap = S4JAdapterUtil.convertJsonToMap(msgBodyRawJsonStr); + } catch (Exception e) { + throw new RuntimeException("The specified message payload can't be converted to a map when requiring a 'Map' message type!"); + } + + for (String key : jmsMsgBodyMap.keySet()) { + String value = jmsMsgBodyMap.get(key); + ((MapMessage)message).setString(key, value); + messageSize += key.length(); + messageSize += value.length(); + } + } else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.STREAM.label)) { + message = jmsContext.createStreamMessage(); + + // The message body json string must be in the format of a list of objects + // Otherwise, it is an error + List jmsMsgBodyObjList; + try { + jmsMsgBodyObjList = S4JAdapterUtil.convertJsonToObjList(msgBodyRawJsonStr); + } catch (Exception e) { + throw new RuntimeException("The specified message payload can't be converted to a list of Objects when requiring a 'Stream' message type!"); + } + + for (Object obj : jmsMsgBodyObjList) { + ((StreamMessage)message).writeObject(obj); + messageSize += ((String)obj).length(); + } + } else if (StringUtils.equalsIgnoreCase(msgType, S4JAdapterUtil.JMS_MESSAGE_TYPES.OBJECT.label)) { + message = jmsContext.createObjectMessage(); + ((ObjectMessage) message).setObject(msgBodyRawJsonStr); + messageSize += msgBodyRawJsonStr.getBytes().length; + } + // default: BYTE message type + else { + message = jmsContext.createBytesMessage(); + byte[] msgBytePayload = msgBodyRawJsonStr.getBytes(); + ((BytesMessage)message).writeBytes(msgBytePayload); + messageSize += msgBytePayload.length; + } + + message.setStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize)); + + return message; + } + + private Message updateMessageHeaders(S4JJMSContextWrapper s4JJMSContextWrapper, Message message, String msgType, String msgHeaderRawJsonStr) throws JMSException { + int messageSize = Integer.parseInt(message.getStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP)); + + // Check if msgHeaderRawJsonStr is a valid JSON string with a collection of key/value pairs + // - if Yes, convert it to a map + // - otherwise, log an error message and ignore message headers without throwing a runtime exception + Map jmsMsgHeaders = new HashMap<>(); + if (!StringUtils.isBlank(msgHeaderRawJsonStr)) { + try { + jmsMsgHeaders = S4JAdapterUtil.convertJsonToMap(msgHeaderRawJsonStr); + } catch (Exception e) { + logger.warn( + "Error parsing message header JSON string {}, ignore message headers!", + msgHeaderRawJsonStr); + } + } + // make sure the actual message type is used + jmsMsgHeaders.put(S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSType.label, msgType); + + Message outMessage = message; + for (String msgHeaderKey:jmsMsgHeaders.keySet()) { + // Ignore non-standard message headers + if (S4JAdapterUtil.isValidStdJmsMsgHeader(msgHeaderKey)) { + String value = jmsMsgHeaders.get(msgHeaderKey); + messageSize += msgHeaderKey.length(); + if (value != null) { + messageSize += value.length(); + } + + try { + if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSType.label)) { + outMessage.setJMSType(msgType); + } else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSPriority.label)) { + if (value != null) outMessage.setJMSPriority(Integer.parseInt(value)); + } else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSDeliveryMode.label)) { + if (value != null) outMessage.setJMSDeliveryMode(Integer.parseInt(value)); + } else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSExpiration.label)) { + // TODO: convert from a Date/Time string to the required long value + if (value != null) outMessage.setJMSExpiration(Long.parseLong(value)); + } else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSCorrelationID.label)) { + if (value != null) outMessage.setJMSCorrelationID(value); + } else if (StringUtils.equalsIgnoreCase(msgHeaderKey, S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSReplyTo.label)) { + // 'JMSReplyTo' value format: "[topic|queue]:" + if (value != null) { + String destType = StringUtils.substringBefore(value, ':'); + String destName = StringUtils.substringAfter(value, ':'); + outMessage.setJMSReplyTo(getOrCreateJmsDestination(s4JJMSContextWrapper,false, destType, destName)); + } + } + // Ignore these headers - handled by S4J API automatically + /* else if (StringUtils.equalsAnyIgnoreCase(msgHeaderKey, + S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSDestination.label, + S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSMessageID.label, + S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSTimestamp.label, + S4JAdapterUtil.JMS_MSG_HEADER_STD.JMSRedelivered.label + )) { + }*/ + } catch (NumberFormatException nfe) { + logger.warn("Incorrect value format ('{}') for the message header field ('{}')!", + value, msgHeaderKey); + } + } + } + + outMessage.setStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize)); + + return outMessage; + } + + private Message updateMessageProperties(Message message, String msgPropertyRawJsonStr) throws JMSException { + int messageSize = Integer.parseInt(message.getStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP)); + + // Check if jmsMsgPropertyRawJsonStr is a valid JSON string with a collection of key/value pairs + // - if Yes, convert it to a map + // - otherwise, log an error message and ignore message headers without throwing a runtime exception + Map jmsMsgProperties = new HashMap<>(); + if (!StringUtils.isBlank(msgPropertyRawJsonStr)) { + try { + jmsMsgProperties = S4JAdapterUtil.convertJsonToMap(msgPropertyRawJsonStr); + } catch (Exception e) { + logger.warn( + "Error parsing message property JSON string {}, ignore message properties!", + msgPropertyRawJsonStr); + } + } + + // Each key in the property json file may include value type information, such as: + // - key(string): value + // The above format specifies a message property that has "key" as the property key + // and "value" as the property value; and the type of the property value is "string" + // + // If the value type is not specified, use "string" as the default value type. + for (Map.Entry entry : jmsMsgProperties.entrySet()) { + String rawKeyStr = entry.getKey(); + String value = entry.getValue(); + + if (! StringUtils.isAnyBlank(rawKeyStr, value)) { + String key = rawKeyStr; + String valueType = S4JAdapterUtil.JMS_MSG_PROP_TYPES.STRING.label; + + if (StringUtils.contains(rawKeyStr, '(')) { + key = StringUtils.substringBefore(rawKeyStr, "(").trim(); + valueType = StringUtils.substringAfter(rawKeyStr, "("); + valueType = StringUtils.substringBefore(valueType, ")").trim(); + } + + if (StringUtils.isBlank(valueType)) { + message.setStringProperty(entry.getKey(), value); + } + else { + if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.SHORT.label)) + message.setShortProperty(key, NumberUtils.toShort(value)); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.INT.label)) + message.setIntProperty(key, NumberUtils.toInt(value)); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.LONG.label)) + message.setLongProperty(key, NumberUtils.toLong(value)); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.FLOAT.label)) + message.setFloatProperty(key, NumberUtils.toFloat(value)); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.DOUBLE.label)) + message.setDoubleProperty(key, NumberUtils.toDouble(value)); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.BOOLEAN.label)) + message.setBooleanProperty(key, BooleanUtils.toBoolean(value)); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.STRING.label)) + message.setStringProperty(key, value); + else if (StringUtils.equalsIgnoreCase(valueType, S4JAdapterUtil.JMS_MSG_PROP_TYPES.BYTE.label)) + message.setByteProperty(key, NumberUtils.toByte(value)); + else + throw new S4JAdapterInvalidParamException( + "Unsupported JMS message property value type (\"" + valueType + "\"). " + + "Value types are: \"" + S4JAdapterUtil.getValidJmsMsgPropTypeList() + "\""); + } + + messageSize += key.length(); + messageSize += value.length(); + } + } + + message.setStringProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP, String.valueOf(messageSize)); + + return message; + } + + @Override + public MessageProducerOp apply(long cycle) { + String destName = destNameStrFunc.apply(cycle); + String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle); + String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle); + String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle); + + if (StringUtils.isBlank(jmsMsgBodyRawJsonStr)) { + throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!"); + } + + S4JJMSContextWrapper s4JJMSContextWrapper = getOrCreateS4jJmsContextWrapper(cycle); + JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); + boolean commitTransaction = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle); + + Destination destination; + try { + destination = getOrCreateJmsDestination(s4JJMSContextWrapper, temporaryDest, destType, destName); + } + catch (JMSRuntimeException jmsRuntimeException) { + throw new S4JAdapterUnexpectedException("Unable to create the JMS destination!"); + } + + JMSProducer producer; + try { + producer = getOrCreateJmsProducer(s4JJMSContextWrapper, asyncAPI); + } + catch (JMSException jmsException) { + throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!"); + } + + // Get the right JMS message type + String jmsMsgType = msgTypeFunc.apply(cycle); + if (! S4JAdapterUtil.isValidJmsMessageType(jmsMsgType) ) { + logger.warn( + "The specified JMS message type {} is not valid, use the default TextMessage type!", + jmsMsgType); + jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label; + } + + + ///////////// + // Set proper message payload based on the message type and the specified input + // ----------------------- + // + Message message; + try { + message = createAndSetMessagePayload(s4JJMSContextWrapper, jmsMsgType, jmsMsgBodyRawJsonStr); + } + catch (JMSException jmsException) { + throw new RuntimeException("Failed to set create a JMS message and set its payload!"); + } + + ///////////// + // Set standard message headers + // ----------------------- + // + try { + message = updateMessageHeaders(s4JJMSContextWrapper, message, jmsMsgType, jmsMsgHeaderRawJsonStr); + } + catch (JMSException jmsException) { + throw new S4JAdapterUnexpectedException("Failed to set create a JMS message and set its payload!"); + } + + ///////////// + // Set defined JMS message properties and other custom properties + // ----------------------- + // + try { + message = updateMessageProperties(message, jmsMsgPropertyRawJsonStr); + // for testing purpose + message.setLongProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP, cycle); + } + catch (JMSException jmsException) { + throw new S4JAdapterUnexpectedException("Failed to set JMS message properties!"); + } + + return new MessageProducerOp( + s4jAdapterMetrics, + s4jSpace, + jmsContext, + destination, + asyncAPI, + commitTransaction, + producer, + message); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java new file mode 100644 index 000000000..f49fda8aa --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/S4JBaseOpDispenser.java @@ -0,0 +1,395 @@ +package io.nosqlbench.adapter.s4j.dispensers; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; +import com.datastax.oss.pulsar.jms.PulsarJMSContext; +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.ops.S4JOp; +import io.nosqlbench.adapter.s4j.util.*; +import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; +import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.engine.api.templating.ParsedOp; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.LongFunction; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +public abstract class S4JBaseOpDispenser extends BaseOpDispenser { + + private final static Logger logger = LogManager.getLogger("PulsarBaseOpDispenser"); + + protected final ParsedOp parsedOp; + protected final S4JSpace s4jSpace; + protected final S4JAdapterMetrics s4jAdapterMetrics; + + private final ConcurrentHashMap connLvlJmsContexts = new ConcurrentHashMap<>(); + private final ConcurrentHashMap sessionLvlJmsContexts = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap jmsProducers = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap jmsConsumers = new ConcurrentHashMap<>(); + + // Doc-level parameter: temporary_dest (default: false) + protected final boolean temporaryDest; + // Doc-level parameter: dest_type (default: Topic) + protected final String destType; + // Doc-level parameter: async_api (default: true) + protected final boolean asyncAPI; + // Doc-level parameter: txn_batch_num (default: 0) + // - value <=0 : no transaction + protected final int txnBatchNum; + + protected final LongFunction destNameStrFunc; + + protected final int totalThreadNum; + protected final long totalCycleNum; + + public S4JBaseOpDispenser(DriverAdapter adapter, + ParsedOp op, + LongFunction destNameStrFunc, + S4JSpace s4jSpace) { + + super(adapter, op); + + this.parsedOp = op; + this.s4jSpace = s4jSpace; + this.connLvlJmsContexts.putAll(s4jSpace.getConnLvlJmsContexts()); + this.sessionLvlJmsContexts.putAll(s4jSpace.getSessionLvlJmsContexts()); + + String defaultMetricsPrefix = getDefaultMetricsPrefix(this.parsedOp); + this.s4jAdapterMetrics = new S4JAdapterMetrics(defaultMetricsPrefix); + s4jAdapterMetrics.initS4JAdapterInstrumentation(); + + this.destNameStrFunc = destNameStrFunc; + this.temporaryDest = + parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TEMP_DEST.label, Boolean.FALSE); + this.destType = + parsedOp.getStaticConfig(S4JAdapterUtil.DOC_LEVEL_PARAMS.DEST_TYPE.label, String.class); + this.asyncAPI = + parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, Boolean.TRUE); + this.txnBatchNum = + parsedOp.getStaticConfigOr(S4JAdapterUtil.DOC_LEVEL_PARAMS.TXN_BATCH_NUM.label, Integer.valueOf(0)); + + this.totalThreadNum = NumberUtils.toInt(parsedOp.getStaticValue("threads")); + this.totalCycleNum = NumberUtils.toLong(parsedOp.getStaticValue("cycles")); + s4jSpace.setTotalCycleNum(totalCycleNum); + } + + public S4JSpace getS4jSpace() { return s4jSpace; } + public S4JAdapterMetrics getS4jAdapterMetrics() { return s4jAdapterMetrics; } + + protected LongFunction lookupStaticBoolConfigValueFunc(String paramName, boolean defaultValue) { + LongFunction booleanLongFunction; + booleanLongFunction = (l) -> parsedOp.getOptionalStaticConfig(paramName, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> BooleanUtils.toBoolean(value)) + .orElse(defaultValue); + logger.info("{}: {}", paramName, booleanLongFunction.apply(0)); + return booleanLongFunction; + } + + protected LongFunction> lookupStaticStrSetOpValueFunc(String paramName) { + LongFunction> setStringLongFunction; + setStringLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> { + Set set = new HashSet<>(); + + if (StringUtils.contains(value,',')) { + set = Arrays.stream(value.split(",")) + .map(String::trim) + .filter(Predicate.not(String::isEmpty)) + .collect(Collectors.toCollection(LinkedHashSet::new)); + } + + return set; + }).orElse(Collections.emptySet()); + logger.info("{}: {}", paramName, setStringLongFunction.apply(0)); + return setStringLongFunction; + } + + // If the corresponding Op parameter is not provided, use the specified default value + protected LongFunction lookupStaticIntOpValueFunc(String paramName, int defaultValue) { + LongFunction integerLongFunction; + integerLongFunction = (l) -> parsedOp.getOptionalStaticValue(paramName, String.class) + .filter(Predicate.not(String::isEmpty)) + .map(value -> NumberUtils.toInt(value)) + .map(value -> { + if (value < 0) return 0; + else return value; + }).orElse(defaultValue); + logger.info("{}: {}", paramName, integerLongFunction.apply(0)); + return integerLongFunction; + } + + // If the corresponding Op parameter is not provided, use the specified default value + protected LongFunction lookupOptionalStrOpValueFunc(String paramName, String defaultValue) { + LongFunction stringLongFunction; + stringLongFunction = parsedOp.getAsOptionalFunction(paramName, String.class) + .orElse((l) -> defaultValue); + logger.info("{}: {}", paramName, stringLongFunction.apply(0)); + + return stringLongFunction; + } + protected LongFunction lookupOptionalStrOpValueFunc(String paramName) { + return lookupOptionalStrOpValueFunc(paramName, ""); + } + + // Mandatory Op parameter. Throw an error if not specified or having empty value + protected LongFunction lookupMandtoryStrOpValueFunc(String paramName) { + LongFunction stringLongFunction; + stringLongFunction = parsedOp.getAsRequiredFunction(paramName, String.class); + logger.info("{}: {}", paramName, stringLongFunction.apply(0)); + + return stringLongFunction; + } + + // Get the next JMSContext Wrapper in the following approach + // - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection): + // c0s0, c0s1, c1s0, c1s1, c2s0, c2s1 + // - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session + // When reaching the end of connection, move back to the first connection, but get the next session. + // e.g. first: c0s0 (0) + // next: c1s0 (1) + // next: c2s0 (2) + // next: c0s1 (3) + // next: c1s1 (4) + // next: c2s1 (5) + // next: c0s0 (6) <-- repeat the pattern + // next: c1s0 (7) + // next: c2s0 (8) + // next: c0s1 (9) + // ... ... + public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper( + long curCycle, + Map overrideS4jConfMap) + { + int totalConnNum = s4jSpace.getMaxNumConn(); + int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn(); + + int connSeqNum = (int) curCycle % totalConnNum; + int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum; + + String jmsConnContextIdStr = s4jSpace.getConnLvlJmsContextIdentifier(connSeqNum); + JMSContext connLvlJmsContext = connLvlJmsContexts.get(jmsConnContextIdStr); + // Connection level JMSContext objects should be already created during the initialization phase + assert (connLvlJmsContext != null); + + String jmsSessionContextIdStr = s4jSpace.getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum); + S4JJMSContextWrapper jmsContextWrapper = sessionLvlJmsContexts.get(jmsSessionContextIdStr); + + if (jmsContextWrapper == null) { + JMSContext jmsContext = null; + + if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) { + jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode()); + } else { + jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext( + connLvlJmsContext.getSessionMode(), overrideS4jConfMap); + } + + jmsContextWrapper = new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext); + sessionLvlJmsContexts.put(jmsSessionContextIdStr, jmsContextWrapper); + + if (logger.isDebugEnabled()) { + logger.debug("[Session level JMSContext] {} -- {}", + Thread.currentThread().getName(), + jmsContextWrapper); + } + + } + return jmsContextWrapper; + } + + public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long curCycle) { + return getOrCreateS4jJmsContextWrapper(curCycle, null); + } + + /** + * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it + */ + public Destination getOrCreateJmsDestination( + S4JJMSContextWrapper s4JJMSContextWrapper, + boolean tempDest, + String destType, + String destName) throws JMSRuntimeException + { + String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer(); + JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); + + // Regular, non-temporary destination + if (!tempDest) { + String destinationCacheKey = S4JAdapterUtil.buildCacheKey(jmsContextIdStr, destType, destName); + Destination destination = jmsDestinations.get(destinationCacheKey); + + if (destination == null) { + if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) { + destination = jmsContext.createQueue(destName); + } else { + destination = jmsContext.createTopic(destName); + } + + jmsDestinations.put(destinationCacheKey, destination); + } + + return destination; + } + // Temporary destination + else { + if (StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label)) { + return jmsContext.createTemporaryQueue(); + } else { + return jmsContext.createTemporaryTopic(); + } + } + } + + // Get simplified NB thread name + private String getSimplifiedNBThreadName(String fullThreadName) { + assert (StringUtils.isNotBlank(fullThreadName)); + + if (StringUtils.contains(fullThreadName, '/')) + return StringUtils.substringAfterLast(fullThreadName, "/"); + else + return fullThreadName; + } + + + /** + * If the JMS producer that corresponds to a destination exists, reuse it; Otherwise, create it + */ + public JMSProducer getOrCreateJmsProducer( + S4JJMSContextWrapper s4JJMSContextWrapper, + boolean asyncApi) throws JMSException + { + JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); + String producerCacheKey = S4JAdapterUtil.buildCacheKey( + getSimplifiedNBThreadName(Thread.currentThread().getName()), "producer"); + JMSProducer jmsProducer = jmsProducers.get(producerCacheKey); + + if (jmsProducer == null) { + jmsProducer = jmsContext.createProducer(); + + if (asyncApi) { + jmsProducer.setAsync(new S4JCompletionListener(s4jSpace, this)); + } + + if (logger.isDebugEnabled()) { + logger.debug("Producer created: {} -- {} -- {}", + producerCacheKey, jmsProducer, s4JJMSContextWrapper); + } + + jmsProducers.put(producerCacheKey, jmsProducer); + } + + return jmsProducer; + } + + /** + * If the JMS consumer that corresponds to a destination(, subscription, message selector) exists, reuse it; Otherwise, create it + */ + public JMSConsumer getOrCreateJmsConsumer( + S4JJMSContextWrapper s4JJMSContextWrapper, + Destination destination, + String destType, + String subName, + String msgSelector, + float msgAckRatio, + boolean nonLocal, + boolean durable, + boolean shared, + boolean asyncApi, + int slowAckInSec) throws JMSException + { + JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext(); + boolean isTopic = StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.TOPIC.label); + String consumerCacheKey = S4JAdapterUtil.buildCacheKey( + getSimplifiedNBThreadName(Thread.currentThread().getName()), "consumer"); + + JMSConsumer jmsConsumer = jmsConsumers.get(consumerCacheKey); + if (jmsConsumer == null) { + if (isTopic) { + if (!durable && !shared) + jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal); + else { + if (StringUtils.isBlank(subName)) { + throw new RuntimeException("Subscription name is required for receiving messages from a durable or shared topic!"); + } + + if (durable && !shared) + jmsConsumer = jmsContext.createDurableConsumer((Topic) destination, subName, msgSelector, nonLocal); + else if (!durable) + jmsConsumer = jmsContext.createSharedConsumer((Topic) destination, subName, msgSelector); + else + jmsConsumer = jmsContext.createSharedDurableConsumer((Topic) destination, subName, msgSelector); + } + } + else { + jmsConsumer = jmsContext.createConsumer(destination, msgSelector, nonLocal); + } + + if (asyncApi) { + jmsConsumer.setMessageListener( + new S4JMessageListener(jmsContext, s4jSpace, this, msgAckRatio, slowAckInSec)); + } + + if (logger.isDebugEnabled()) { + logger.debug("Consumer created: {} -- {} -- {}", + consumerCacheKey, jmsConsumer, s4JJMSContextWrapper); + } + + jmsConsumers.put(consumerCacheKey, jmsConsumer); + } + + return jmsConsumer; + } + + protected boolean commitTransaction(int txnBatchNum, int jmsSessionMode, long curCycleNum) { + // Whether to commit the transaction which happens when: + // - session mode is equal to "SESSION_TRANSACTED" + // - "txn_batch_num" has been reached since last reset + boolean commitTransaction = ( (Session.SESSION_TRANSACTED == jmsSessionMode) && (txnBatchNum > 0) ); + if (commitTransaction) { + int txnBatchTackingCnt = s4jSpace.getTxnBatchTrackingCnt(); + + if ( ( (txnBatchTackingCnt > 0) && ((txnBatchTackingCnt % txnBatchNum) == 0) ) || + ( curCycleNum >= (totalCycleNum - 1) ) ) { + if (logger.isDebugEnabled()) { + logger.debug("Commit transaction ({}, {}, {})", + txnBatchTackingCnt, + s4jSpace.getTotalOpResponseCnt(), curCycleNum); + } + } + else { + commitTransaction = false; + } + + s4jSpace.incTxnBatchTrackingCnt(); + } + + return !commitTransaction; + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterAsyncOperationFailedException.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterAsyncOperationFailedException.java new file mode 100644 index 000000000..70d2f7dec --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterAsyncOperationFailedException.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4j.exception; + +public class S4JAdapterAsyncOperationFailedException extends RuntimeException { + + public S4JAdapterAsyncOperationFailedException(Throwable t) { + super(t); + printStackTrace(); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterInvalidParamException.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterInvalidParamException.java new file mode 100644 index 000000000..0b4a35bc8 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterInvalidParamException.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4j.exception; + +public class S4JAdapterInvalidParamException extends RuntimeException { + + public S4JAdapterInvalidParamException(String paramName, String errDesc) { + super("Invalid setting for parameter (" + paramName + "): " + errDesc); + } + + public S4JAdapterInvalidParamException(String fullErrDesc) { + super(fullErrDesc); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterUnexpectedException.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterUnexpectedException.java new file mode 100644 index 000000000..f96ce2edb --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterUnexpectedException.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4j.exception; + +public class S4JAdapterUnexpectedException extends RuntimeException { + + public S4JAdapterUnexpectedException(String message) { + super(message); + printStackTrace(); + } + public S4JAdapterUnexpectedException(Exception e) { + super(e); + printStackTrace(); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterUnsupportedOpException.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterUnsupportedOpException.java new file mode 100644 index 000000000..7a86d66f2 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/exception/S4JAdapterUnsupportedOpException.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4j.exception; + +public class S4JAdapterUnsupportedOpException extends RuntimeException { + + public S4JAdapterUnsupportedOpException(String pulsarOpType) { + super("Unsupported Pulsar adapter operation type: \"" + pulsarOpType + "\""); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/MessageConsumerOp.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/MessageConsumerOp.java new file mode 100644 index 000000000..cdc911aa0 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/MessageConsumerOp.java @@ -0,0 +1,135 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j.ops; + +import com.codahale.metrics.Timer; +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterAsyncOperationFailedException; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException; +import io.nosqlbench.adapter.s4j.util.S4JAdapterMetrics; +import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.pulsar.shade.org.apache.avro.AvroRuntimeException; + +import javax.jms.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class MessageConsumerOp extends S4JOp { + + private final static Logger logger = LogManager.getLogger(MessageConsumerOp.class); + + private final JMSConsumer jmsConsumer; + private final boolean blockingMsgRecv; + private final float msgAckRatio; + private final long msgReadTimeout; + private final boolean recvNoWait; + private final int slowInSec; + + public MessageConsumerOp(S4JAdapterMetrics s4jAdapterMetrics, + S4JSpace s4jSpace, + JMSContext jmsContext, + Destination destination, + boolean asyncApi, + boolean commitTransact, + JMSConsumer jmsConsumer, + boolean blockingMsgRecv, + float msgAckRatio, + long readTimeout, + boolean recvNoWait, + int slowInSec) + { + super(s4jAdapterMetrics, s4jSpace, jmsContext, destination, asyncApi, commitTransact); + + this.jmsConsumer = jmsConsumer; + this.blockingMsgRecv = blockingMsgRecv; + this.msgAckRatio = msgAckRatio; + this.msgReadTimeout = readTimeout; + this.recvNoWait = recvNoWait; + this.slowInSec = slowInSec; + } + + @Override + public Object apply(long value) { + long timeElapsedMills = System.currentTimeMillis() - s4jOpStartTimeMills; + + // If maximum S4J operation duration is specified, only receive messages + // before the maximum duration threshold is reached. Otherwise, this is + // just no-op. + if ( (maxS4jOpDurationInSec == 0) || (timeElapsedMills <= (maxS4jOpDurationInSec*1000)) ) { + + // Please see S4JSpace::getOrCreateJmsConsumer() for async processing + if (!asyncApi) { + Message recvdMsg; + + try { + // blocking message receiving only applies to synchronous API + if (blockingMsgRecv) { + recvdMsg = jmsConsumer.receive(); + } else if (recvNoWait) { + recvdMsg = jmsConsumer.receiveNoWait(); + } else { + // timeout value 0 means to wait forever + recvdMsg = jmsConsumer.receive(msgReadTimeout); + } + if (this.commitTransact) jmsContext.commit(); + + if (recvdMsg != null) { + s4jSpace.processMsgAck(jmsContext, recvdMsg, msgAckRatio, slowInSec); + + byte[] recvdMsgBody = recvdMsg.getBody(byte[].class); + int messageSize = recvdMsgBody.length; + + messageSizeHistogram.update(messageSize); + + if (logger.isDebugEnabled()) { + // for testing purpose + String myMsgSeq = recvdMsg.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP); + logger.debug("Sync message receive successful - message ID {} ({}) " + , recvdMsg.getJMSMessageID(), myMsgSeq); + } + + if (s4jSpace.isTrackingMsgRecvCnt()) { + s4jSpace.incTotalOpResponseCnt(); + } + } else { + if (s4jSpace.isTrackingMsgRecvCnt()) { + s4jSpace.incTotalNullMsgRecvdCnt(); + } + } + } catch (JMSException | JMSRuntimeException e) { + S4JAdapterUtil.processMsgErrorHandling( + e, + s4jSpace.isStrictMsgErrorHandling(), + "Unexpected errors when sync receiving a JMS message."); + } + } + } + else { + if (logger.isTraceEnabled()) { + logger.trace("NB cycle number {} is no-op (maxS4jOpDurationInSec: {}, timeElapsedMills: {})", + value, maxS4jOpDurationInSec, timeElapsedMills); + } + } + + + return null; + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/MessageProducerOp.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/MessageProducerOp.java new file mode 100644 index 000000000..2624c36cf --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/MessageProducerOp.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j.ops; + + +import com.codahale.metrics.Histogram; +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.util.S4JAdapterMetrics; +import io.nosqlbench.adapter.s4j.util.S4JAdapterUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +import javax.jms.*; +import java.util.HashMap; +import java.util.Map; + + +public class MessageProducerOp extends S4JOp { + + private final static Logger logger = LogManager.getLogger("MessageProducerOp"); + + private final JMSProducer jmsProducer; + private final Message message; + + public MessageProducerOp(S4JAdapterMetrics s4jAdapterMetrics, + S4JSpace s4jSpace, + JMSContext jmsContext, + Destination destination, + boolean asyncApi, + boolean commitTransact, + JMSProducer jmsProducer, + Message message) { + super(s4jAdapterMetrics, s4jSpace, jmsContext, destination, asyncApi, commitTransact); + + this.jmsProducer = jmsProducer; + this.message = message; + } + + @Override + public Object apply(long value) { + + long timeElapsedMills = System.currentTimeMillis() - s4jOpStartTimeMills; + + // If maximum S4J operation duration is specified, only publish messages + // before the maximum duration threshold is reached. Otherwise, this is + // just no-op. + if ( (maxS4jOpDurationInSec == 0) || (timeElapsedMills <= (maxS4jOpDurationInSec*1000)) ) { + try { + jmsProducer.send(destination, message); + if (this.commitTransact) { + jmsContext.commit(); + } + + int msgSize = message.getIntProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP); + messageSizeHistogram.update(msgSize); + + // Please see s4JSpace::getOrCreateJmsProducer() for async processing + if (!asyncApi) { + if (logger.isDebugEnabled()) { + // for testing purpose + String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP); + logger.debug("Sync message sending is successful - message ID {} ({}) " + , message.getJMSMessageID(), myMsgSeq); + } + + if (s4jSpace.isTrackingMsgRecvCnt()) { + s4jSpace.incTotalOpResponseCnt(); + } + } + } catch (JMSException | JMSRuntimeException e) { + S4JAdapterUtil.processMsgErrorHandling( + e, + s4jSpace.isStrictMsgErrorHandling(), + "Unexpected errors when sync sending a JMS message."); + } + } + else { + if (logger.isTraceEnabled()) { + logger.trace("NB cycle number {} is no-op (maxS4jOpDurationInSec: {}, timeElapsedMills: {})", + value, maxS4jOpDurationInSec, timeElapsedMills); + } + } + + return null; + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/S4JOp.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/S4JOp.java new file mode 100644 index 000000000..4fee61dab --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/ops/S4JOp.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.nosqlbench.adapter.s4j.ops; + +import com.codahale.metrics.Histogram; +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.util.S4JAdapterMetrics; +import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp; + +import javax.jms.Destination; +import javax.jms.JMSContext; + +public abstract class S4JOp implements CycleOp { + protected S4JAdapterMetrics s4jAdapterMetrics; + protected final S4JSpace s4jSpace; + protected final JMSContext jmsContext; + protected final Destination destination; + protected final boolean asyncApi; + protected final boolean commitTransact; + protected final long s4jOpStartTimeMills; + protected final long maxS4jOpDurationInSec; + protected final Histogram messageSizeHistogram; + + + public S4JOp( + S4JAdapterMetrics s4jAdapterMetrics, + S4JSpace s4jSpace, + JMSContext jmsContext, + Destination destination, + boolean asyncApi, + boolean commitTransact) + { + this.s4jAdapterMetrics = s4jAdapterMetrics; + this.s4jSpace = s4jSpace; + this.jmsContext = jmsContext; + this.destination = destination; + this.asyncApi = asyncApi; + this.commitTransact = commitTransact; + this.s4jOpStartTimeMills = s4jSpace.getS4JActivityStartTimeMills(); + this.maxS4jOpDurationInSec = s4jSpace.getMaxS4JOpTimeInSec(); + this.messageSizeHistogram = s4jAdapterMetrics.getMessagesizeHistogram(); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterMetrics.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterMetrics.java new file mode 100644 index 000000000..2903353a3 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterMetrics.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j.util; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; +import io.nosqlbench.api.config.NBNamedElement; +import io.nosqlbench.api.engine.metrics.ActivityMetrics; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class S4JAdapterMetrics implements NBNamedElement { + + private final static Logger logger = LogManager.getLogger("S4JAdapterMetrics"); + + private final String defaultAdapterMetricsPrefix; + + private Histogram messageSizeHistogram; + private Timer bindTimer; + private Timer executeTimer; + + public S4JAdapterMetrics(String defaultMetricsPrefix) { + this.defaultAdapterMetricsPrefix = defaultMetricsPrefix; + } + + @Override + public String getName() { + return "S4JAdapterMetrics"; + } + + public void initS4JAdapterInstrumentation() { + // Histogram metrics + this.messageSizeHistogram = + ActivityMetrics.histogram( + this, + defaultAdapterMetricsPrefix + "message_size", + ActivityMetrics.DEFAULT_HDRDIGITS); + + // Timer metrics + this.bindTimer = + ActivityMetrics.timer( + this, + defaultAdapterMetricsPrefix + "bind", + ActivityMetrics.DEFAULT_HDRDIGITS); + this.executeTimer = + ActivityMetrics.timer( + this, + defaultAdapterMetricsPrefix + "execute", + ActivityMetrics.DEFAULT_HDRDIGITS); + } + + public Timer getBindTimer() { return bindTimer; } + public Timer getExecuteTimer() { return executeTimer; } + public Histogram getMessagesizeHistogram() { return messageSizeHistogram; } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java new file mode 100644 index 000000000..8906b2565 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JAdapterUtil.java @@ -0,0 +1,326 @@ +package io.nosqlbench.adapter.s4j.util; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.datastax.oss.pulsar.jms.PulsarJMSConstants; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.nosqlbench.adapter.s4j.S4JOpType; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.*; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class S4JAdapterUtil { + + private final static Logger logger = LogManager.getLogger(S4JAdapterUtil.class); + + /////// + // Valid document level parameters for JMS NB yaml file + public final static String JMS_SPEC_VER_12 = "1.2"; + public final static String JMS_SPEC_VER_20 = "2.0"; + public enum DOC_LEVEL_PARAMS { + + // Temporary destination + TEMP_DEST("temporary_dest"), + // JMS destination type - topic or queue + // String value + // - valid values: see JMS_DEST_TYPES + DEST_TYPE("dest_type"), + // JMS destination name + // String value + DEST_NAME("dest_name"), + // Asynchronous message processing + ASYNC_API("async_api"), + // Transaction batch size + // - Only relevant when session mode is SESSION_TRANSACTED + TXN_BATCH_NUM("txn_batch_num"), + // Whether to use blocking message receiving as the default behavior + BLOCKING_MSG_RECV("blocking_msg_recv"), + // Whether the destination is a shared topic + SHARED_TOPIC("shared_topic"), + // Whether the destination is a durable topic + DURABLE_TOPIC("durable_topic"); + + public final String label; + + DOC_LEVEL_PARAMS(String label) { + this.label = label; + } + } + public static boolean isValidDocLevelParam(String param) { + return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param)); + } + public static String getValidDocLevelParamList() { + return Arrays.stream(DOC_LEVEL_PARAMS.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + // JMS Destination Types + public enum JMS_DEST_TYPES { + QUEUE("queue"), + TOPIC("topic"); + + public final String label; + JMS_DEST_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidJmsDestType(String type) { + return Arrays.stream(JMS_DEST_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + public static String getValidJmsDestTypeList() { + return Arrays.stream(JMS_DEST_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + // Standard JMS message headers (by JMS specification) + public enum JMS_MSG_HEADER_STD { + JMSDestination("JMSDestination"), + JMSDeliveryMode("JMSDeliveryMode"), + JMSMessageID("JMSMessageID"), + JMSTimestamp("JMSTimestamp"), + JMSRedelivered("JMSRedelivered"), + JMSExpiration("JMSExpiration"), + JMSCorrelationID("JMSCorrelationID"), + JMSType("JMSType"), + JMSReplyTo("JMSReplyTo"), + JMSPriority("JMSPriority"); + + public final String label; + JMS_MSG_HEADER_STD(String label) { + this.label = label; + } + } + public static boolean isValidStdJmsMsgHeader(String header) { + return Arrays.stream(JMS_MSG_HEADER_STD.values()).anyMatch(t -> t.label.equals(header)); + } + public static String getValidStdJmsMsgHeaderList() { + return Arrays.stream(JMS_MSG_HEADER_STD.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + // JMS defined message properties (by JMS specification) + public enum JMS_DEFINED_MSG_PROPERTY { + JMSDestination("JMSDestination"), + JMSDeliveryMode("JMSDeliveryMode"), + JMSMessageID("JMSMessageID"), + JMSTimestamp("JMSTimestamp"), + JMSRedelivered("JMSRedelivered"), + JMSExpiration("JMSExpiration"), + JMSCorrelationID("JMSCorrelationID"), + JMSType("JMSType"), + JMSReplyTo("JMSReplyTo"), + JMSPriority("JMSPriority"); + + public final String label; + JMS_DEFINED_MSG_PROPERTY(String label) { + this.label = label; + } + } + public static boolean isValidJmsDfndMsgProp(String property) { + return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).anyMatch(t -> t.label.equals(property)); + } + public static String getValidJmsDfndMsgPropList() { + return Arrays.stream(JMS_DEFINED_MSG_PROPERTY.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + public final static String NB_MSG_SEQ_PROP = "NBMsgSeqProp"; + public final static String NB_MSG_SIZE_PROP = "NBMsgSize"; + + // JMS Destination Types + public enum JMS_SESSION_MODES { + AUTO_ACK("auto_ack"), + CLIENT_ACK("client_ack"), + DUPS_OK_ACK("dups_ok_ack"), + INDIVIDUAL_ACK("individual_ack"), + TRANSACT("transact_ack"); + + public final String label; + JMS_SESSION_MODES(String label) { + this.label = label; + } + } + public static boolean isValidJmsSessionMode(String mode) { + return Arrays.stream(JMS_SESSION_MODES.values()).anyMatch(t -> t.label.equals(mode)); + } + public static String getValidJmsSessionModeList() { + return Arrays.stream(JMS_SESSION_MODES.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + // JMS Message Types + public enum JMS_MESSAGE_TYPES { + TEXT("text"), + BYTE("byte"), + MAP("map"), + STREAM("stream"), + OBJECT("object"); + + public final String label; + JMS_MESSAGE_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidJmsMessageType(String type) { + return Arrays.stream(JMS_MESSAGE_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + public static String getValidJmsMessageTypeList() { + return Arrays.stream(JMS_MESSAGE_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + // JMS Message Types + public enum JMS_MSG_PROP_TYPES { + SHORT("short"), + INT("int"), + LONG("long"), + FLOAT("float"), + DOUBLE("double"), + STRING("string"), + BOOLEAN("boolean"), + BYTE("byte"); + + public final String label; + JMS_MSG_PROP_TYPES(String label) { + this.label = label; + } + } + public static boolean isValidJmsMsgPropType(String type) { + return Arrays.stream(JMS_MSG_PROP_TYPES.values()).anyMatch(t -> t.label.equals(type)); + } + public static String getValidJmsMsgPropTypeList() { + return Arrays.stream(JMS_MSG_PROP_TYPES.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + + /////// + // Convert JSON string to a key/value map + public static Map convertJsonToMap(String jsonStr) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonStr, new TypeReference>(){}); + } + + /////// + // Convert JSON string to a list of objects + public static List convertJsonToObjList(String jsonStr) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + return Arrays.asList(mapper.readValue(jsonStr, Object[].class)); + } + + /////// + // Get the destination name from the Destination object + public static String getDestinationName(Destination destination, String destType) throws JMSException { + String destName; + + boolean isTopic = StringUtils.equalsIgnoreCase(destType, JMS_DEST_TYPES.TOPIC.label); + + if (isTopic) + destName = ((Topic) destination).getTopicName(); + else + destName = ((Queue) destination).getQueueName(); + + return destName; + } + + /////// + public static int getSessionModeFromStr(String sessionModeStr) { + // default ack mode: auto_ack + int sessionMode = -1; + + if (StringUtils.isBlank(sessionModeStr)) + sessionMode = JMSContext.AUTO_ACKNOWLEDGE; + else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.AUTO_ACK.label)) + sessionMode = JMSContext.AUTO_ACKNOWLEDGE; + else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.CLIENT_ACK.label)) + sessionMode = JMSContext.CLIENT_ACKNOWLEDGE; + else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.DUPS_OK_ACK.label)) + sessionMode = JMSContext.DUPS_OK_ACKNOWLEDGE; + else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.TRANSACT.label)) + sessionMode = JMSContext.SESSION_TRANSACTED; + else if (StringUtils.equalsIgnoreCase(sessionModeStr, JMS_SESSION_MODES.INDIVIDUAL_ACK.label)) + sessionMode = PulsarJMSConstants.INDIVIDUAL_ACKNOWLEDGE; + else { + if (logger.isDebugEnabled()) { + logger.debug("Invalid session mode string \"{}\". Valid values are: {}. Use the default \"auto_ack\" mode!" + ,sessionModeStr, getValidJmsSessionModeList()); + sessionMode = JMSContext.AUTO_ACKNOWLEDGE; + } + } + + return sessionMode; + } + + public static boolean isUseCredentialsEnabled(S4JClientConf s4JClientConf) { + assert (s4JClientConf != null); + + boolean enabled = false; + Map s4jConfMap = s4JClientConf.getS4jConfObjMap(); + + if (s4jConfMap.containsKey("jms.useCredentialsFromCreateConnection")) { + enabled = BooleanUtils.toBoolean(s4jConfMap.get("jms.useCredentialsFromCreateConnection").toString()); + } + return enabled; + } + + public static String getCredentialUserName(S4JClientConf s4JClientConf) { + return "dummy"; + } + + public static String getCredentialPassword(S4JClientConf s4JClientConf) { + Map s4jConfMap = s4JClientConf.getS4jConfObjMap(); + if (s4jConfMap.containsKey("authParams")) + return s4jConfMap.get("authParams").toString(); + else + return ""; + } + + /////// + // Calculate a unique cache key from a series of input parameters + public static String buildCacheKey(String... keyParts) { + return String.join("::", keyParts); + } + + + /////// + // Pause the execution of the current thread + public static void pauseCurThreadExec(int pauseInSec) { + if (pauseInSec > 0) { + try { + Thread.sleep(pauseInSec * 1000); + } + catch (InterruptedException ie) { + ie.printStackTrace(); + } + } + } + + /////// + // Error handling for message processing + public static void processMsgErrorHandling(Exception exception, boolean strictErrorHandling, String errorMsg) { + exception.printStackTrace(); + + if (strictErrorHandling) { + throw new RuntimeException(errorMsg + " [ " + exception.getMessage() + " ]"); + } + else { + S4JAdapterUtil.pauseCurThreadExec(1); + } + } +} + diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java new file mode 100644 index 000000000..e2a62c5a1 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java @@ -0,0 +1,213 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.s4j.util; + +import org.apache.commons.configuration2.Configuration; +import org.apache.commons.configuration2.FileBasedConfiguration; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; +import org.apache.commons.configuration2.builder.fluent.Parameters; +import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +public class S4JClientConf { + private final static Logger logger = LogManager.getLogger(S4JClientConf.class); + + public static final String CLIENT_CONF_PREFIX = "client"; + public static final String PRODUCER_CONF_PREFIX = "producer"; + public static final String CONSUMER_CONF_PREFIX = "consumer"; + public static final String JMS_CONF_PREFIX = "jms"; + + + // "Raw" map is what is read from the config properties file + // "Tgt" map is what is really needed in the Pulsar producer/consumer API + private Map clientConfMapRaw = new HashMap<>(); + private Map producerConfMapRaw = new HashMap<>(); + private Map consumerConfMapRaw = new HashMap<>(); + private Map jmsConfMapRaw = new HashMap<>(); + private Map miscConfMapRaw = new HashMap<>(); + + private final Map s4jConfMapTgt = new HashMap<>(); + private Map clientConfMapTgt = new HashMap<>(); + private Map producerConfMapTgt = new HashMap<>(); + + private Map consumerConfMapTgt = new HashMap<>(); + private Map jmsConfMapTgt = new HashMap<>(); + private Map miscConfMapTgt = new HashMap<>(); + + + + public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) { + + ////////////////// + // Read related Pulsar client configuration settings from a file + readRawConfFromFile(s4jConfFileName); + + + ////////////////// + // Ignores the following Pulsar client/producer/consumer configurations since + // they're either not supported in the S4J API or the property must be specified + // as the NB CLI parameter or the NB yaml file parameter. + + // <<< https://pulsar.apache.org/docs/reference-configuration/#client >>> + // pulsar client config + // * webServiceUrl + // * brokerServiceUrl + clientConfMapRaw.put("brokerServiceUrl", pulsarSvcUrl); + clientConfMapRaw.put("webServiceUrl", webSvcUrl); + + + // <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>> + // producer config + // * topicName + producerConfMapRaw.remove("topicName"); + + // <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer >>> + // consumer config + // * topicNames + // * topicsPattern + // * subscriptionName + // * subscriptionType + consumerConfMapRaw.remove("topicNames"); + consumerConfMapRaw.remove("topicPattern"); + consumerConfMapRaw.remove("subscriptionName"); + consumerConfMapRaw.remove("subscriptionType"); + consumerConfMapRaw.remove("subscriptionInitialPosition"); + consumerConfMapRaw.remove("regexSubscriptionMode"); + + + ////////////////// + // Convert the raw configuration map () to the required map () + clientConfMapTgt.putAll(S4JClientConfConverter.convertRawClientConf(clientConfMapRaw)); + producerConfMapTgt.putAll(S4JClientConfConverter.convertRawProducerConf(producerConfMapRaw)); + consumerConfMapTgt.putAll(S4JClientConfConverter.convertRawConsumerConf(consumerConfMapRaw)); + jmsConfMapTgt.putAll(S4JClientConfConverter.convertRawJmsConf(jmsConfMapRaw)); + miscConfMapTgt.putAll(S4JClientConfConverter.convertRawMiscConf(miscConfMapRaw)); + + s4jConfMapTgt.putAll(clientConfMapTgt); + s4jConfMapTgt.put("producerConfig", producerConfMapTgt); + s4jConfMapTgt.put("consumerConfig", consumerConfMapTgt); + s4jConfMapTgt.putAll(jmsConfMapTgt); + s4jConfMapTgt.putAll(miscConfMapTgt); + } + + public void readRawConfFromFile(String fileName) { + File file = new File(fileName); + + try { + String canonicalFilePath = file.getCanonicalPath(); + + Parameters params = new Parameters(); + + FileBasedConfigurationBuilder builder = + new FileBasedConfigurationBuilder(PropertiesConfiguration.class) + .configure(params.properties() + .setFileName(fileName)); + + Configuration config = builder.getConfiguration(); + + for (Iterator it = config.getKeys(); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + + if (!StringUtils.isBlank(confVal)) { + // Get client connection specific configuration settings, removing "client." prefix + if (StringUtils.startsWith(confKey, CLIENT_CONF_PREFIX)) { + clientConfMapRaw.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), confVal); + } + // Get producer specific configuration settings, removing "producer." prefix + else if (StringUtils.startsWith(confKey, PRODUCER_CONF_PREFIX)) { + producerConfMapRaw.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), confVal); + } + // Get consumer specific configuration settings, removing "consumer." prefix + else if (StringUtils.startsWith(confKey, CONSUMER_CONF_PREFIX)) { + consumerConfMapRaw.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), confVal); + } + // Get JMS specific configuration settings, keeping "jms." prefix + else if (StringUtils.startsWith(confKey, JMS_CONF_PREFIX)) { + jmsConfMapRaw.put(confKey, confVal); + } + // For all other configuration settings (not having any of the above prefixes), keep as is + else { + miscConfMapRaw.put(confKey, confVal); + } + } + } + } catch (IOException ioe) { + logger.error("Can't read the specified config properties file: " + fileName); + ioe.printStackTrace(); + } catch (ConfigurationException cex) { + logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage()); + cex.printStackTrace(); + } + } + + public Map getS4jConfObjMap() { return this.s4jConfMapTgt; } + public Map getS4jConfMapObj_client() { return this.clientConfMapTgt; } + public Map getS4jConfMapObj_producer() { return this.producerConfMapTgt; } + public Map getS4jConfMapObj_consumer() { return this.consumerConfMapTgt; } + public Map getS4jConfMapObj_jms() { return this.jmsConfMapTgt; } + public Map getS4jConfMapObj_misc() { return this.miscConfMapTgt; } + + private Map mergeConfigObjMaps( + Map origConfigObjMap, + Map extraConfigObjMap ) + { + Map newConfigObjMap = new HashMap<>(); + + // If there are the same settings in both "orig" and "extra" maps, + // the one in the "extra" map will take over + newConfigObjMap.putAll(origConfigObjMap); + newConfigObjMap.putAll(extraConfigObjMap); + + return newConfigObjMap; + } + public Map mergeExtraConsumerConfig( + Map extraConsumerConfigRaw) + { + if ( (extraConsumerConfigRaw == null) || (extraConsumerConfigRaw.isEmpty()) ) { + return getS4jConfObjMap(); + } + else { + Map origConsumerConfigObjMap = getS4jConfMapObj_consumer(); + Map extraConsumerConfigObjMap = + S4JClientConfConverter.convertRawConsumerConf(extraConsumerConfigRaw); + Map mergedConsumerConfigObjMap = + mergeConfigObjMaps(origConsumerConfigObjMap, extraConsumerConfigObjMap); + + Map mergedS4JConfObjMap = getS4jConfObjMap(); + mergedS4JConfObjMap.put("consumerConfig", mergedConsumerConfigObjMap); + + return mergedS4JConfObjMap; + } + } + + public String toString() { + return new ToStringBuilder(this). + append("effectiveS4jConfMap", s4jConfMapTgt). + toString(); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java new file mode 100644 index 000000000..9ba86bfb3 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConfConverter.java @@ -0,0 +1,429 @@ +package io.nosqlbench.adapter.s4j.util; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.datastax.oss.pulsar.jms.shaded.org.apache.pulsar.client.api.CompressionType; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.math.NumberUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class is used to convert the configuration items in its raw + * format (as provided in the property file) to the format needed by + * the S4J driver + */ +public class S4JClientConfConverter { + + public static Map convertRawClientConf(Map pulsarClientConfMapRaw) { + Map s4jClientConfObjMap = new HashMap<>(); + s4jClientConfObjMap.putAll(pulsarClientConfMapRaw); + + /** + * No special handling for non-primitive types + */ + + return s4jClientConfObjMap; + } + + // <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>> + private final static Map validStdProducerConfKeyTypeMap = Map.ofEntries( + Map.entry("topicName", "String"), + Map.entry("producerName","String"), + Map.entry("sendTimeoutMs","long"), + Map.entry("blockIfQueueFull","boolean"), + Map.entry("maxPendingMessages","int"), + Map.entry("maxPendingMessagesAcrossPartitions","int"), + Map.entry("messageRoutingMode","MessageRoutingMode"), + Map.entry("hashingScheme","HashingScheme"), + Map.entry("cryptoFailureAction","ProducerCryptoFailureAction"), + Map.entry("batchingMaxPublishDelayMicros","long"), + Map.entry("batchingMaxMessages","int"), + Map.entry("batchingEnabled","boolean"), + Map.entry("chunkingEnabled","boolean"), + Map.entry("compressionType","CompressionType"), + Map.entry("initialSubscriptionName","string") + ); + public static Map convertRawProducerConf(Map pulsarProducerConfMapRaw) { + Map s4jProducerConfObjMap = new HashMap<>(); + setConfObjMapForPrimitives(s4jProducerConfObjMap, pulsarProducerConfMapRaw, validStdProducerConfKeyTypeMap); + + /** + * Non-primitive type processing for Pulsar producer configuration items + */ + // "compressionType" has value type "CompressionType" + // - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY' + String confKeyName = "compressionType"; + String confVal = pulsarProducerConfMapRaw.get(confKeyName); + String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)"; + + if (StringUtils.isNotBlank(confVal)) { + if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) { + CompressionType compressionType = CompressionType.NONE; + + switch (StringUtils.upperCase(confVal)) { + case "LZ4": + compressionType = CompressionType.LZ4; + case "ZLIB": + compressionType = CompressionType.ZLIB; + case "ZSTD": + compressionType = CompressionType.ZSTD; + case "SNAPPY": + compressionType = CompressionType.SNAPPY; + } + + s4jProducerConfObjMap.put(confKeyName, compressionType); + } else { + throw new S4JAdapterInvalidParamException( + getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal)); + } + } + + // TODO: Skip the following Pulsar configuration items for now because they're not really + // needed in the NB S4J testing at the moment. Add support for them when needed. + // * messageRoutingMode + // * hashingScheme + // * cryptoFailureAction + + return s4jProducerConfObjMap; + } + + // https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer + private final static Map validStdConsumerConfKeyTypeMap = Map.ofEntries( + Map.entry("topicNames", "Set"), + Map.entry("topicsPattern","Pattern"), + Map.entry("subscriptionName","String"), + Map.entry("subscriptionType","SubscriptionType"), + Map.entry("receiverQueueSize","int"), + Map.entry("acknowledgementsGroupTimeMicros","long"), + Map.entry("negativeAckRedeliveryDelayMicros","long"), + Map.entry("maxTotalReceiverQueueSizeAcrossPartitions","int"), + Map.entry("consumerName","String"), + Map.entry("ackTimeoutMillis","long"), + Map.entry("tickDurationMillis","long"), + Map.entry("priorityLevel","int"), + Map.entry("cryptoFailureAction","ConsumerCryptoFailureAction"), + Map.entry("properties","SortedMap"), + Map.entry("readCompacted","boolean"), + Map.entry("subscriptionInitialPosition", "SubscriptionInitialPosition"), + Map.entry("patternAutoDiscoveryPeriod", "int"), + Map.entry("regexSubscriptionMode", "RegexSubscriptionMode"), + Map.entry("deadLetterPolicy", "DeadLetterPolicy"), + Map.entry("autoUpdatePartitions", "boolean"), + Map.entry("replicateSubscriptionState", "boolean"), + Map.entry("negativeAckRedeliveryBackoff", "RedeliveryBackoff"), + Map.entry("ackTimeoutRedeliveryBackoff", "RedeliveryBackoff"), + Map.entry("autoAckOldestChunkedMessageOnQueueFull", "boolean"), + Map.entry("maxPendingChunkedMessage", "int"), + Map.entry("expireTimeOfIncompleteChunkedMessageMillis", "long") + ); + public static Map convertRawConsumerConf(Map pulsarConsumerConfMapRaw) { + Map s4jConsumerConfObjMap = new HashMap<>(); + setConfObjMapForPrimitives(s4jConsumerConfObjMap, pulsarConsumerConfMapRaw, validStdConsumerConfKeyTypeMap); + + /** + * Non-primitive type processing for Pulsar consumer configuration items + */ + // The following non-primitive type configuration items are already excluded + // and don't need to be processed. + // * topicNames + // * topicPattern + // * subscriptionName + // * subscriptionType + // * subscriptionInitialPosition + // * regexSubscriptionMode + + // "properties" has value type "SortedMap" + // - expecting the value string has the format: a JSON string that includes a set of key/value pairs + String confKeyName = "properties"; + String confVal = pulsarConsumerConfMapRaw.get(confKeyName); + String expectedVal = "{\"property1\":\"value1\", \"property2\":\"value2\"}, ..."; + + ObjectMapper mapper = new ObjectMapper(); + + if (StringUtils.isNotBlank(confVal)) { + try { + Map consumerProperties = mapper.readValue(confVal, Map.class); + + // Empty map value is considered as no value + if (!consumerProperties.isEmpty()) { + s4jConsumerConfObjMap.put(confKeyName, consumerProperties); + } + + } catch (Exception e) { + throw new S4JAdapterInvalidParamException( + getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal)); + } + } + + // "deadLetterPolicy" + // - expecting the value is a JSON string has the format: + // {"maxRedeliverCount":"","deadLetterTopic":"","initialSubscriptionName":""} + confKeyName = "deadLetterPolicy"; + confVal = pulsarConsumerConfMapRaw.get(confKeyName); + expectedVal = "{" + + "\"maxRedeliverCount\":\"\"," + + "\"deadLetterTopic\":\"\"," + + "\"initialSubscriptionName\":\"\"}"; + + if (StringUtils.isNotBlank(confVal)) { + try { + Map dlqPolicyMap = mapper.readValue(confVal, Map.class); + + // Empty map value is considered as no value + if (!dlqPolicyMap.isEmpty()) { + boolean valid = true; + + // The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName" + for (String key : dlqPolicyMap.keySet()) { + if (!StringUtils.equalsAnyIgnoreCase(key, + "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName")) { + valid = false; + break; + } + } + + // DLQ.maxRedeliverCount is mandatory + if (valid && !dlqPolicyMap.containsKey("maxRedeliverCount")) { + valid = false; + } + + String maxRedeliverCountStr = dlqPolicyMap.get("maxRedeliverCount"); + if (!NumberUtils.isCreatable(maxRedeliverCountStr)) { + valid = false; + } + + if (valid) { + // In S4J driver, DLQ setting is done via a Map + // <<< https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-implementation.html#dead-letter-policy >>> + + s4jConsumerConfObjMap.put(confKeyName, dlqPolicyMap); + } else { + throw new S4JAdapterInvalidParamException( + getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal)); + } + } + } catch (Exception e) { + throw new S4JAdapterInvalidParamException( + getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal)); + } + } + + // "negativeAckRedeliveryBackoff" or "ackTimeoutRedeliveryBackoff" + // - expecting the value is a JSON string has the format: + // {"minDelayMs":"", "maxDelayMs":"", "multiplier":""} + String[] redeliveryBackoffConfigSet = {"negativeAckRedeliveryBackoff", "ackTimeoutRedeliveryBackoff"}; + expectedVal = "{" + + "\"minDelayMs\":\"\"," + + "\"maxDelayMs\":\"\"," + + "\"multiplier\":\"\"}"; + + for (String confKey : redeliveryBackoffConfigSet) { + confVal = pulsarConsumerConfMapRaw.get(confKey); + + if (StringUtils.isNotBlank(confVal)) { + try { + Map redliveryBackoffMap = mapper.readValue(confVal, Map.class); + + // Empty map value is considered as no value + if (! redliveryBackoffMap.isEmpty()) { + boolean valid = true; + + // The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName" + for (String key : redliveryBackoffMap.keySet()) { + if (!StringUtils.equalsAnyIgnoreCase(key, + "minDelayMs", "maxDelayMs", "multiplier")) { + valid = false; + break; + } + } + + String minDelayMsStr = redliveryBackoffMap.get("minDelayMs"); + String maxDelayMsStr = redliveryBackoffMap.get("maxDelayMs"); + String multiplierStr = redliveryBackoffMap.get("multiplier"); + + if ((StringUtils.isNotBlank(minDelayMsStr) && !NumberUtils.isCreatable(minDelayMsStr)) || + (StringUtils.isNotBlank(maxDelayMsStr) && !NumberUtils.isCreatable(maxDelayMsStr)) || + (StringUtils.isNotBlank(multiplierStr) && !NumberUtils.isCreatable(multiplierStr))) { + valid = false; + } + + if (valid) { + // In S4J driver, AckTimeOut and Negative TimeOut is done via a Map + // <<< https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-implementation.html#ack-timeout >>> + // <<< https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-implementation.html#negative-ack >>> + + s4jConsumerConfObjMap.put(confKey, redliveryBackoffMap); + + } else { + throw new S4JAdapterInvalidParamException( + getInvalidConfValStr(confKey, confVal, "consumer", expectedVal)); + } + } + + } catch (Exception e) { + throw new S4JAdapterInvalidParamException( + getInvalidConfValStr(confKey, confVal, "consumer", expectedVal)); + } + } + } + + + // TODO: Skip the following Pulsar configuration items for now because they're not really + // needed in the NB S4J testing right now. Add the support for them when needed. + // * cryptoFailureAction + + return s4jConsumerConfObjMap; + } + + // https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options + private final static Map validS4jJmsConfKeyTypeMap = Map.ofEntries( + Map.entry("jms.acknowledgeRejectedMessages", "boolean"), + Map.entry("jms.clientId","String"), + Map.entry("jms.emulateTransactions","boolean"), + Map.entry("jms.enableClientSideEmulation","boolean"), + Map.entry("jms.forceDeleteTemporaryDestinations","boolean"), + Map.entry("jms.precreateQueueSubscription","boolean"), + Map.entry("jms.queueSubscriptionName","String"), + Map.entry("jms.systemNamespace","String"), + Map.entry("jms.topicSharedSubscriptionType","String"), + Map.entry("jms.useCredentialsFromCreateConnection","boolean"), + Map.entry("jms.useExclusiveSubscriptionsForSimpleConsumers","long"), + Map.entry("jms.usePulsarAdmin","boolean"), + Map.entry("jms.useServerSideFiltering","boolean"), + Map.entry("jms.waitForServerStartupTimeout","int"), + Map.entry("jms.transactionsStickyPartitions", "boolean") + ); + public static Map convertRawJmsConf(Map s4jJmsConfMapRaw) { + Map s4jJmsConfObjMap = new HashMap<>(); + setConfObjMapForPrimitives(s4jJmsConfObjMap, s4jJmsConfMapRaw, validS4jJmsConfKeyTypeMap); + + /** + * Non-primitive type processing for Pulsar client configuration items + */ + // None + + return s4jJmsConfObjMap; + } + + // https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options + private final static Map validS4jMiscConfKeyTypeMap = Map.ofEntries( + Map.entry("brokerServiceUrl","String"), + Map.entry("webServiceUrl","String"), + Map.entry("ackTimeout", "long"), + Map.entry("ackTimeoutMillis","long"), + Map.entry("enableTransaction","boolean"), + Map.entry("consumerConfig","Map"), + Map.entry("producerConfig","Map") + ); + public static Map convertRawMiscConf(Map s4jMiscConfMapRaw) { + Map s4jMiscConfObjMap = new HashMap<>(); + setConfObjMapForPrimitives(s4jMiscConfObjMap, s4jMiscConfMapRaw, validS4jMiscConfKeyTypeMap); + + /** + * Non-primitive type processing for Pulsar client configuration items + */ + // Only the following 2 non-primitive type settings will be set explicitly + // * producerConfig + // * consumerConfig + + return s4jMiscConfObjMap; + } + + + // Utility function + // - get configuration key names by the value type + private static List getConfKeyNameByValueType(Map confKeyTypeMap, String tgtValType) { + ArrayList confKeyNames = new ArrayList<>(); + + for (Map.Entry entry: confKeyTypeMap.entrySet()) { + if (StringUtils.equalsIgnoreCase(entry.getValue().toString(), tgtValType)) { + confKeyNames.add(entry.getKey().toString()); + } + } + + return confKeyNames; + } + + // Conversion from Map to Map for configuration items with primitive + // value types + private static void setConfObjMapForPrimitives( + Map tgtConfObjMap, + Map srcConfMapRaw, + Map validConfKeyTypeMap) + { + List confKeyList = new ArrayList<>(); + + // All configuration items with "String" as the value type + confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "String"); + for (String confKey : confKeyList) { + if (srcConfMapRaw.containsKey(confKey)) { + String confVal = srcConfMapRaw.get(confKey); + if (StringUtils.isNotBlank(confVal)) { + tgtConfObjMap.put(confKey, confVal); + } + } + } + + // All configuration items with "long" as the value type + confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "long"); + for (String confKey : confKeyList) { + if (srcConfMapRaw.containsKey(confKey)) { + String confVal = srcConfMapRaw.get(confKey); + if (StringUtils.isNotBlank(confVal)) { + tgtConfObjMap.put(confKey, Long.valueOf(confVal)); + } + } + } + + // All configuration items with "int" as the value type + confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "int"); + for (String confKey : confKeyList) { + if (srcConfMapRaw.containsKey(confKey)) { + String confVal = srcConfMapRaw.get(confKey); + if (StringUtils.isNotBlank(confVal)) { + tgtConfObjMap.put(confKey, Integer.valueOf(confVal)); + } + } + } + + // All configuration items with "boolean" as the value type + confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "boolean"); + for (String confKey : confKeyList) { + if (srcConfMapRaw.containsKey(confKey)) { + String confVal = srcConfMapRaw.get(confKey); + if (StringUtils.isNotBlank(confVal)) { + tgtConfObjMap.put(confKey, Boolean.valueOf(confVal)); + } + } + } + + // TODO: So far the above primitive types should be good enough. + // Add support for other types when needed + } + + private static String getInvalidConfValStr(String confKey, String confVal, String configCategory, String expectedVal) { + return "Incorrect value \"" + confVal + "\" for Pulsar " + configCategory + + " configuration item of \"" + confKey + "\". Expecting the following value (format): " + expectedVal; + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JCompletionListener.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JCompletionListener.java new file mode 100644 index 000000000..fcc185869 --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JCompletionListener.java @@ -0,0 +1,86 @@ +package io.nosqlbench.adapter.s4j.util; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.dispensers.S4JBaseOpDispenser; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.CompletionListener; +import javax.jms.JMSException; +import javax.jms.Message; + +/** + * Used for async message production + */ +public class S4JCompletionListener implements CompletionListener { + + private final static Logger logger = LogManager.getLogger(S4JCompletionListener.class); + + private final S4JSpace s4JSpace; + private final S4JBaseOpDispenser s4jBaseOpDispenser; + + public S4JCompletionListener(S4JSpace s4JSpace, S4JBaseOpDispenser s4jBaseOpDispenser) { + assert (s4JSpace != null); + assert (s4jBaseOpDispenser != null); + this.s4JSpace = s4JSpace; + this.s4jBaseOpDispenser = s4jBaseOpDispenser; + } + + @Override + public void onCompletion(Message message) { + try { + if (logger.isTraceEnabled()) { + // for testing purpose + String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP); + logger.trace("onCompletion::Async message send successful - message ID {} ({}) " + , message.getJMSMessageID(), myMsgSeq); + } + + if (s4JSpace.isTrackingMsgRecvCnt() ) { + long totalResponseCnt = s4JSpace.incTotalOpResponseCnt(); + if (logger.isTraceEnabled()) { + logger.trace("... async op response received so far: {}", totalResponseCnt); + } + } + } + catch (JMSException e) { + S4JAdapterUtil.processMsgErrorHandling( + e, + s4JSpace.isStrictMsgErrorHandling(), + "Unexpected errors when async sending a JMS message."); + } + } + + @Override + public void onException(Message message, Exception e) { + try { + if (logger.isDebugEnabled()) { + // for testing purpose + String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP); + + logger.debug("onException::Async message send failed - message ID {} ({}) " + , message.getJMSMessageID(), myMsgSeq); + } + } + catch (JMSException jmsException) { + logger.warn("onException::Unexpected error: " + jmsException.getMessage()); + } + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java new file mode 100644 index 000000000..f22b6d78b --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JJMSContextWrapper.java @@ -0,0 +1,52 @@ +package io.nosqlbench.adapter.s4j.util; + +import org.apache.commons.lang3.builder.ToStringBuilder; + +import javax.jms.JMSContext; +import javax.jms.Session; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +public class S4JJMSContextWrapper { + private final String jmsContextIdentifer; + private final JMSContext jmsContext; + private final int jmsSessionMode; + + public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) { + this.jmsContextIdentifer = identifer; + this.jmsContext = jmsContext; + this.jmsSessionMode = jmsContext.getSessionMode(); + } + + public int getJmsSessionMode() { return jmsSessionMode; } + public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); } + public String getJmsContextIdentifer() { return jmsContextIdentifer; } + public JMSContext getJmsContext() { return jmsContext; } + + public void close() { + if (jmsContext != null) { + jmsContext.close(); + } + } + + public String toString() { + return new ToStringBuilder(this). + append("jmsContextIdentifer", jmsContextIdentifer). + append("jmsContext", jmsContext.toString()). + toString(); + } +} diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JMessageListener.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JMessageListener.java new file mode 100644 index 000000000..e7e9c0a5e --- /dev/null +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JMessageListener.java @@ -0,0 +1,97 @@ +package io.nosqlbench.adapter.s4j.util; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.codahale.metrics.Histogram; +import io.nosqlbench.adapter.s4j.S4JSpace; +import io.nosqlbench.adapter.s4j.dispensers.S4JBaseOpDispenser; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; + +/** + * Used for async message consumption + */ +public class S4JMessageListener implements MessageListener { + + private final static Logger logger = LogManager.getLogger(S4JMessageListener.class); + + private final float msgAckRatio; + private final int slowAckInSec; + private final JMSContext jmsContext; + private final S4JSpace s4jSpace; + private final S4JBaseOpDispenser s4jBaseOpDispenser; + + public S4JMessageListener( + JMSContext jmsContext, + S4JSpace s4jSpace, + S4JBaseOpDispenser s4jBaseOpDispenser, + float msgAckRatio, + int slowAckInSec) + { + assert (jmsContext != null); + assert (s4jSpace != null); + assert (s4jBaseOpDispenser != null); + + this.jmsContext = jmsContext; + this.s4jSpace = s4jSpace; + this.s4jBaseOpDispenser = s4jBaseOpDispenser; + this.msgAckRatio = msgAckRatio; + this.slowAckInSec = slowAckInSec; + } + + @Override + public void onMessage(Message message) { + try { + if (message != null) { + s4jSpace.processMsgAck(jmsContext, message, msgAckRatio, slowAckInSec); + + int msgSize = message.getIntProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP); + S4JAdapterMetrics s4JAdapterMetrics = s4jBaseOpDispenser.getS4jAdapterMetrics(); + Histogram messageSizeHistogram = s4JAdapterMetrics.getMessagesizeHistogram(); + messageSizeHistogram.update(msgSize); + + if (logger.isTraceEnabled()) { + // for testing purpose + String myMsgSeq = message.getStringProperty(S4JAdapterUtil.NB_MSG_SEQ_PROP); + logger.trace("onMessage::Async message receive successful - message ID {} ({}) " + , message.getJMSMessageID(), myMsgSeq); + } + + if (s4jSpace.isTrackingMsgRecvCnt()) { + s4jSpace.incTotalOpResponseCnt(); + } + } + else { + if (s4jSpace.isTrackingMsgRecvCnt()) { + s4jSpace.incTotalNullMsgRecvdCnt(); + } + } + } + catch (JMSException e) { + S4JAdapterUtil.processMsgErrorHandling( + e, + s4jSpace.isStrictMsgErrorHandling(), + "Unexpected errors when async receiving a JMS message."); + } + } +} diff --git a/nb5/pom.xml b/nb5/pom.xml index f801d40cd..340360d07 100644 --- a/nb5/pom.xml +++ b/nb5/pom.xml @@ -94,6 +94,12 @@ 4.17.31-SNAPSHOT + + io.nosqlbench + adapter-s4j + 4.17.31-SNAPSHOT + + diff --git a/pom.xml b/pom.xml index 7eac71c2c..098185117 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ adapter-dynamodb adapter-mongodb adapter-pulsar + adapter-s4j