1) Add NB5 S4J example yaml files

2) Update NB5 S4J readme file
3) Minor code adjustments
This commit is contained in:
yabinmeng 2022-12-07 22:14:03 -06:00
parent 9a00af0806
commit c08f8b6b3d
13 changed files with 569 additions and 90 deletions

View File

@ -7,7 +7,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.17.31-SNAPSHOT</version>
<version>4.17.32-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -39,13 +39,13 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.17.31-SNAPSHOT</version>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>4.17.31-SNAPSHOT</version>
<version>4.17.32-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.datastax.oss/pulsar-jms -->

View File

@ -17,6 +17,7 @@
package io.nosqlbench.adapter.s4j;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.datastax.oss.pulsar.jms.PulsarJMSContext;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import io.nosqlbench.adapter.s4j.util.*;
@ -24,8 +25,10 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -98,13 +101,19 @@ public class S4JSpace implements AutoCloseable {
this.pulsarSvcUrl = cfg.get("service_url");
this.webSvcUrl = cfg.get("web_url");
this.maxNumConn= cfg.getOrDefault("num_conn", Integer.valueOf(1));
this.maxNumSessionPerConn = cfg.getOrDefault("num_session", Integer.valueOf(1));
this.maxS4JOpTimeInSec= cfg.getOrDefault("max_s4jop_time", Long.valueOf(0));
this.trackingMsgRecvCnt=cfg.getOrDefault("track_msg_cnt", Boolean.FALSE);
this.strictMsgErrorHandling = cfg.getOrDefault("strict_msg_error_handling", Boolean.FALSE);
this.maxNumConn=
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
this.maxNumSessionPerConn =
NumberUtils.toInt(cfg.getOptional("num_session").orElse("1"));
this.maxS4JOpTimeInSec =
NumberUtils.toLong(cfg.getOptional("max_s4jop_time").orElse("0L"));
this.trackingMsgRecvCnt =
BooleanUtils.toBoolean(cfg.getOptional("track_msg_cnt").orElse("false"));
this.strictMsgErrorHandling =
BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false"));
this.s4jClientConfFileName = cfg.get("config");
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(cfg.get("session_mode"));
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
cfg.getOptional("session_mode").orElse(""));
this.s4JClientConf = new S4JClientConf(pulsarSvcUrl, webSvcUrl, s4jClientConfFileName);
this.initializeSpace(s4JClientConf);
@ -340,7 +349,17 @@ public class S4JSpace implements AutoCloseable {
S4JClientConf s4JClientConf,
int sessionMode)
{
boolean useCredentialsEnable = S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf);
if ( !S4JAdapterUtil.isAuthNRequired(s4JClientConf) &&
S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf) ) {
throw new S4JAdapterInvalidParamException(
"'jms.useCredentialsFromCreateConnection' can't set be true " +
"when Pulsar client authN parameters are not set. "
);
}
boolean useCredentialsEnable =
S4JAdapterUtil.isAuthNRequired(s4JClientConf) &&
S4JAdapterUtil.isUseCredentialsEnabled(s4JClientConf);
JMSContext jmsConnContext;
if (!useCredentialsEnable)
@ -361,4 +380,65 @@ public class S4JSpace implements AutoCloseable {
return jmsConnContext;
}
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long curCycle) {
return getOrCreateS4jJmsContextWrapper(curCycle, null);
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = getMaxNumConn();
int totalSessionPerConnNum = getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
String jmsConnContextIdStr = getConnLvlJmsContextIdentifier(connSeqNum);
JMSContext connLvlJmsContext = connLvlJmsContexts.get(jmsConnContextIdStr);
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JJMSContextWrapper jmsContextWrapper = sessionLvlJmsContexts.get(jmsSessionContextIdStr);
if (jmsContextWrapper == null) {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
jmsContextWrapper = new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
sessionLvlJmsContexts.put(jmsSessionContextIdStr, jmsContextWrapper);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsContextWrapper);
}
}
return jmsContextWrapper;
}
}

View File

@ -63,7 +63,7 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
// which can be applied to many testing scenarios.
// Setting them here will allow scenario-specific customer configurations. At the moment, only the
// DLT related settings are supported
private final Map<String, Object> combinedConsumerConfigObjMap = new HashMap<>();
private final Map<String, Object> combinedS4jConfigObjMap = new HashMap<>();
public MessageConsumerOpDispenser(DriverAdapter adapter,
@ -88,11 +88,22 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
parsedOp.getStaticConfigOr("msg_ack_ratio", Float.valueOf(1.0f));
this.slowAckInSec =
parsedOp.getStaticConfigOr("slow_ack_in_sec", Integer.valueOf(0));
this.subNameStrFunc =
lookupMandtoryStrOpValueFunc("subscription_name");
this.localMsgSelectorFunc =
lookupOptionalStrOpValueFunc("msg_selector");
// Subscription name is OPTIONAL for queue and non-shared, non-durable topic;
// but mandatory for shared or shared topic
if ( StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.QUEUE.label) ||
( StringUtils.equalsIgnoreCase(destType, S4JAdapterUtil.JMS_DEST_TYPES.TOPIC.label) &&
!durableTopic && !sharedTopic) ) {
this.subNameStrFunc =
lookupOptionalStrOpValueFunc("subscription_name");
}
else {
this.subNameStrFunc =
lookupMandtoryStrOpValueFunc("subscription_name");
}
String[] stmtLvlConsumerConfKeyNameList = {
"consumer.ackTimeoutMillis",
"consumer.deadLetterPolicy",
@ -106,14 +117,14 @@ public class MessageConsumerOpDispenser extends S4JBaseOpDispenser {
confVal);
}
this.combinedConsumerConfigObjMap.putAll(
this.combinedS4jConfigObjMap.putAll(
s4jSpace.getS4JClientConf().mergeExtraConsumerConfig(stmtLvlConsumerConfRawMap));
}
@Override
public MessageConsumerOp apply(long cycle) {
S4JJMSContextWrapper s4JJMSContextWrapper =
getOrCreateS4jJmsContextWrapper(cycle, this.combinedConsumerConfigObjMap);
s4jSpace.getOrCreateS4jJmsContextWrapper(cycle, this.combinedS4jConfigObjMap);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransact = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);

View File

@ -279,7 +279,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
throw new S4JAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
S4JJMSContextWrapper s4JJMSContextWrapper = getOrCreateS4jJmsContextWrapper(cycle);
S4JJMSContextWrapper s4JJMSContextWrapper = s4jSpace.getOrCreateS4jJmsContextWrapper(cycle);
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
boolean commitTransaction = !super.commitTransaction(txnBatchNum, jmsContext.getSessionMode(), cycle);

View File

@ -167,67 +167,6 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
return stringLongFunction;
}
// Get the next JMSContext Wrapper in the following approach
// - The JMSContext wrapper pool has the following sequence (assuming 3 [c]onnections and 2 [s]essions per connection):
// c0s0, c0s1, c1s0, c1s1, c2s0, c2s1
// - When getting the next JMSContext wrapper, always get from the next connection, starting from the first session
// When reaching the end of connection, move back to the first connection, but get the next session.
// e.g. first: c0s0 (0)
// next: c1s0 (1)
// next: c2s0 (2)
// next: c0s1 (3)
// next: c1s1 (4)
// next: c2s1 (5)
// next: c0s0 (6) <-- repeat the pattern
// next: c1s0 (7)
// next: c2s0 (8)
// next: c0s1 (9)
// ... ...
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(
long curCycle,
Map<String, Object> overrideS4jConfMap)
{
int totalConnNum = s4jSpace.getMaxNumConn();
int totalSessionPerConnNum = s4jSpace.getMaxNumSessionPerConn();
int connSeqNum = (int) curCycle % totalConnNum;
int sessionSeqNum = ( (int)(curCycle / totalConnNum) ) % totalSessionPerConnNum;
String jmsConnContextIdStr = s4jSpace.getConnLvlJmsContextIdentifier(connSeqNum);
JMSContext connLvlJmsContext = connLvlJmsContexts.get(jmsConnContextIdStr);
// Connection level JMSContext objects should be already created during the initialization phase
assert (connLvlJmsContext != null);
String jmsSessionContextIdStr = s4jSpace.getSessionLvlJmsContextIdentifier(connSeqNum, sessionSeqNum);
S4JJMSContextWrapper jmsContextWrapper = sessionLvlJmsContexts.get(jmsSessionContextIdStr);
if (jmsContextWrapper == null) {
JMSContext jmsContext = null;
if (overrideS4jConfMap == null || overrideS4jConfMap.isEmpty()) {
jmsContext = connLvlJmsContext.createContext(connLvlJmsContext.getSessionMode());
} else {
jmsContext = ((PulsarJMSContext) connLvlJmsContext).createContext(
connLvlJmsContext.getSessionMode(), overrideS4jConfMap);
}
jmsContextWrapper = new S4JJMSContextWrapper(jmsSessionContextIdStr, jmsContext);
sessionLvlJmsContexts.put(jmsSessionContextIdStr, jmsContextWrapper);
if (logger.isDebugEnabled()) {
logger.debug("[Session level JMSContext] {} -- {}",
Thread.currentThread().getName(),
jmsContextWrapper);
}
}
return jmsContextWrapper;
}
public S4JJMSContextWrapper getOrCreateS4jJmsContextWrapper(long curCycle) {
return getOrCreateS4jJmsContextWrapper(curCycle, null);
}
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*/
@ -340,7 +279,8 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
}
if (durable && !shared)
jmsConsumer = jmsContext.createDurableConsumer((Topic) destination, subName, msgSelector, nonLocal);
jmsConsumer = jmsContext.createDurableConsumer(
(Topic) destination, subName, msgSelector, nonLocal);
else if (!durable)
jmsConsumer = jmsContext.createSharedConsumer((Topic) destination, subName, msgSelector);
else

View File

@ -75,7 +75,7 @@ public class MessageConsumerOp extends S4JOp {
// just no-op.
if ( (maxS4jOpDurationInSec == 0) || (timeElapsedMills <= (maxS4jOpDurationInSec*1000)) ) {
// Please see S4JSpace::getOrCreateJmsConsumer() for async processing
// Please see S4JBaseOpDispenser::getOrCreateJmsConsumer() for async processing
if (!asyncApi) {
Message recvdMsg;

View File

@ -69,7 +69,7 @@ public class MessageProducerOp extends S4JOp {
int msgSize = message.getIntProperty(S4JAdapterUtil.NB_MSG_SIZE_PROP);
messageSizeHistogram.update(msgSize);
// Please see s4JSpace::getOrCreateJmsProducer() for async processing
// Please see S4JBaseOpDispenser::getOrCreateJmsProducer() for async processing
if (!asyncApi) {
if (logger.isDebugEnabled()) {
// for testing purpose

View File

@ -48,9 +48,6 @@ public class S4JAdapterUtil {
// String value
// - valid values: see JMS_DEST_TYPES
DEST_TYPE("dest_type"),
// JMS destination name
// String value
DEST_NAME("dest_name"),
// Asynchronous message processing
ASYNC_API("async_api"),
// Transaction batch size
@ -266,11 +263,30 @@ public class S4JAdapterUtil {
return sessionMode;
}
public static boolean isUseCredentialsEnabled(S4JClientConf s4JClientConf) {
assert (s4JClientConf != null);
public static boolean isAuthNRequired(S4JClientConf s4jClientConf) {
assert (s4jClientConf != null);
boolean required = false;
Map<String, Object> s4jClientConfObjMap = s4jClientConf.getS4jConfMapObj_client();
if (s4jClientConfObjMap.containsKey("authPlugin") && s4jClientConfObjMap.containsKey("authParams")) {
Object authPluginObj = s4jClientConfObjMap.get("authPlugin");
Object authParamsObj = s4jClientConfObjMap.get("authParams");
if ( (authPluginObj != null) && StringUtils.isNotBlank(authPluginObj.toString()) &&
(authParamsObj != null) && StringUtils.isNotBlank(authParamsObj.toString()) ) {
required = true;
}
}
return required;
}
public static boolean isUseCredentialsEnabled(S4JClientConf s4jClientConf) {
assert (s4jClientConf != null);
boolean enabled = false;
Map<String, Object> s4jConfMap = s4JClientConf.getS4jConfObjMap();
Map<String, Object> s4jConfMap = s4jClientConf.getS4jConfObjMap();
if (s4jConfMap.containsKey("jms.useCredentialsFromCreateConnection")) {
enabled = BooleanUtils.toBoolean(s4jConfMap.get("jms.useCredentialsFromCreateConnection").toString());
@ -278,12 +294,12 @@ public class S4JAdapterUtil {
return enabled;
}
public static String getCredentialUserName(S4JClientConf s4JClientConf) {
public static String getCredentialUserName(S4JClientConf s4jClientConf) {
return "dummy";
}
public static String getCredentialPassword(S4JClientConf s4JClientConf) {
Map<String, Object> s4jConfMap = s4JClientConf.getS4jConfObjMap();
public static String getCredentialPassword(S4JClientConf s4jClientConf) {
Map<String, Object> s4jConfMap = s4jClientConf.getS4jConfObjMap();
if (s4jConfMap.containsKey("authParams"))
return s4jConfMap.get("authParams").toString();
else

View File

@ -0,0 +1,73 @@
# document level parameters that apply to all Pulsar client types:
params:
temporary_dest: "false"
dest_type: "topic"
# default: true
async_api: "false"
# whether to wait indefinitely (as the default behavior)
# - only applies when "async_api" is false (synchronous API)
# - only applies to message receiving
# - default: false
blocking_msg_recv: "true"
## (Optional) If shared topic or not (only relevant when the destination is a topic)
share_topic: "true"
## (Optional) If durable topic or not (only relevant when the destination is a topic)
durable_topic: "false"
blocks:
msg-consume-block:
ops:
op1:
## The value represents the destination (queue or topic) name)
MessageConsume: "mys4jtest_t"
## Subscription name
## - optional for queue and non-shared, non-durable topic
## - mandatory for shared and/or durable topic
subscription_name: "nbs4j-sub"
## (Optional) client side message selector
msg_selector: ""
## (Optional) No Local
no_local: "true"
## (Optional) Read Timeout
read_timeout: "10"
## (Optional) Receive message without wait
no_wait: "true"
## (Optional) Message acknowledgement ratio
msg_ack_ratio: "0.5"
## (Optional) Simulate slow consumer acknowledgement
# must be non-negative numbers. negative numbers will be treated as 0
# 0 - means no simulation
# positive value - the number of seconds to pause before acknowledgement
slow_ack_in_sec: "0"
#####
## (Optional) Statement level settings for Consumer
#
## AckTimeout value (at least 1 second)
consumer.ackTimeoutMillis: 1000
## DLQ policy
consumer.deadLetterPolicy: '{ "maxRedeliverCount": "2" }'
## NegativeAck Redelivery policy
consumer.negativeAckRedeliveryBackoff: |
{
}
## AckTimeout Redelivery policy
consumer.ackTimeoutRedeliveryBackoff: |
{
"minDelayMs":"10",
"maxDelayMs":"20",
"multiplier":"1.2"
}

View File

@ -0,0 +1,54 @@
bindings:
cur_cycle: ToString()
mykey: Mod(5); ToString(); Prefix("key-")
mytext_val: AlphaNumericString(30)
mymap_val1: AlphaNumericString(10)
mymap_val2: AlphaNumericString(20)
mystream_val1: AlphaNumericString(50)
# document level parameters that apply to all Pulsar client types:
params:
temporary_dest: "false"
dest_type: "queue"
async_api: "true"
blocks:
msg-produce-block:
ops:
op1:
## The value represents the destination (queue or topic) name)
MessageProduce: "mys4jtest_t"
## (Optional) JMS headers (in JSON format).
msg_header: |
{
"JMSPriority": "9"
}
## (Optional) JMS properties, predefined or customized (in JSON format).
msg_property: |
{
"JMSXGroupID": "{mykey}"
}
## (Optional) JMS message types, default to be BYTES.
msg_type: "text"
## (Mandatory) JMS message body. Value depends on msg_type.
msg_body: "{mytext_val}"
# # example of having "map" as the message type
# msg_type: "map"
# msg_body: |
# {
# "prop-key-1": "{mymap_val1}",
# "prop-key-2": "{mymap_val2}"
# }
# # example of having "stream" as the message type
# msg_type: "stream"
# msg_body: |
# [
# "100",
# "{mystream_val1}",
# "abcdef"
# ]

View File

@ -0,0 +1,244 @@
- [1. Overview](#1-overview)
- [2. Execute NB S4J Workload](#2-execute-nb-s4j-workload)
- [3. NB S4J Driver Configuration Parameter File](#3-nb-s4j-driver-configuration-parameter-file)
- [4. NB S4J Scenario Definition File](#4-nb-s4j-scenario-definition-file)
- [4.1. Document Level Parameters](#41-document-level-parameters)
- [4.2. NB S4J Workload Types](#42-nb-s4j-workload-types)
- [4.2.1. Publish Messages to a JMS Destination, Queue or Topic](#421-publish-messages-to-a-jms-destination-queue-or-topic)
- [4.2.2. Receiving Messages from a JMS Destination, Queue or Topic](#422-receiving-messages-from-a-jms-destination-queue-or-topic)
# 1. Overview
This driver is similar to [NB Pulsar driver](../../../../driver-pulsar/src/main/resources/pulsar.md) that allows NB based workload generation and performance testing against a Pulsar cluster. It also follows a similar pattern to configure and connect to the Pulsar cluster for workload execution.
However, the major difference is instead of simulating native Pulsar client workloads, the NB S4J driver allows simulating JMS oriented workloads (that follows JMS spec 2.0 and 1.1) to be executed on the Pulsar cluster. Under the hood, this is achieved through DataStax's [Starlight for JMS API] (https://github.com/datastax/pulsar-jms).
# 2. Execute NB S4J Workload
The following is an example of executing a NB S4J workload (defined as *pulsar_s4j.yaml*)
```
$ <nb_cmd> run driver=s4j cycles=10000 threads=4 num_conn=2 num_session=2 session_mode="client_ack" strict_msg_error_handling="false" web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=/path/to/nb_s4j_config.properties yaml=/path/to/pulsar_s4j.yaml -vv --logs-dir=s4j_log
```
In the above NB CLI command, the S4J driver specific parameters are listed as below:
* num_conn: the number of JMS connections to be created
* num_session: the number of JMS sessions per JMS connection
* Note that multiple JMS sessions can be created from one JMS connection, and they share the same connection characteristics.
* session_mode: the session mode used when creating a JMS session
* web_url: the URL of the Pulsar web service
* service_url: the URL of the Pulsar native protocol service
* (optional) strict_msg_error_handling: whether to do strict error handling
* when true, Pulsar client error will not stop NB S4J execution
* otherwise, any Pulsar client error will stop NB S4J execution
* (optional) max_s4jop_time: maximum time (in seconds) to execute the actual S4J operations (e.g. message sending or receiving). If NB execution time is beyond this limit, each NB cycle is just a no-op. Please NOTE:
* this is useful when controlled NB execution is needed with NB CLI scripting.
* if this parameter is not specified or the value is 0, it means no time limitation. Every single NB cycle will trigger an actual S4J operation.
* (optional) track_msg_cnt: When set to true (with default as false), the S4J driver will keep track of the confirmed response count for message sending and receiving.
Other NB engine parameters are straight forward:
* driver: must be **s4j**
* threads: depending on the workload type, the NB thread number determines how many producers or consumers will be created. All producers or consumers will share the available JMS connections and sessions
* yamL: the NB S4J scenario definition yaml file
* config: specify the file that contains the connection parameters used by the S4J API
# 3. NB S4J Driver Configuration Parameter File
The S4J API has a list of configuration options that can be found here: https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options.
The NB S4J driver supports these configuration options via a config property file, an example of which is listed below. The configuration parameters in this file are grouped into several groups. The comments below explain how the grouping works.
```
###########
# Overview: Starlight for JMS (S4J) API configuration items are listed at:
# https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options
enableTransaction=true
####
# S4J API specific configurations (non Pulsar specific) - jms.***
jms.enableClientSideEmulation=true
jms.usePulsarAdmin=false
#...
#####
# Pulsar client related configurations - client.***
# - Valid settings: http://pulsar.apache.org/docs/en/client-libraries-java/#client
#
# - These Pulsar client settings (without the "client." prefix) will be
# directly used as S4J configuration settings, on a 1-to-1 basis.
#--------------------------------------
# only relevant when authentication is enabled
client.authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
client.authParams=file:///path/to/authentication/jwt/file
# only relevant when in-transit encryption is enabled
client.tlsTrustCertsFilePath=/path/to/certificate/file
#...
#####
# Producer related configurations (global) - producer.***
# - Valid settings: http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
#
# - These Pulsar producer settings (without "producer." prefix) will be collectively (as a map)
# mapped to S4J connection setting of "producerConfig"
#--------------------------------------
producer.blockIfQueueFull=true
# disable producer batching
#producer.batchingEnabled=false
#...
#####
# Consumer related configurations (global) - consumer.***
# - Valid settings: http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
#
# - These Pulsar producer settings (without "consumer." portion) will be collectively (as a map)
# mapped to S4J connection setting of "consumerConfig"
#--------------------------------------
#...
```
# 4. NB S4J Scenario Definition File
Like any NB scenario yaml file, the NB S4J yaml file is composed of 3 major components:
* bindings: define NB bindings
* params: define document level parameters
* blocks: define various statement blocks. Each statement block represents one JMS workload type
```
bindings:
... ...
params:
... ...
blocks:
... ...
```
## 4.1. Document Level Parameters
The parameters defined in this section will be applicable to all statement blocks. An example of some common parameters that can be set at the document level is listed below:
* temporary_dest: whether JMS workload is dealing with a temporary destination
* dest_type: JMS destination type - queue or topic
```
params:
temporary_dest: "false"
dest_type: "<jms_destination_type>"
async_api: "true"
txn_batch_num: <number_of_message_ops_in_one_transaction>
blocking_msg_recv: <whehter_to_block_when_receiving_messages>
shared_topic: <if_shared_topic_or_not> // only relevant when the destination type is a topic
durable_topic: <if_durable_topic_or_not> // only relevant when the destination type is a topic
```
Please **NOTE** that the above parameters won't necessarily be specified at the document level. If they're specified at the statement level, they will only impact the statement within which they're specified.
## 4.2. NB S4J Workload Types
The NB S4J driver supports 2 types of JMS operations:
* One for message producing/sending/publishing
* this is identified by NB Op identifier ***MessageProduce***
* One for message consuming/receiving/subscribing
* this is identified by NB Op identifier ***MessageConsume***
### 4.2.1. Publish Messages to a JMS Destination, Queue or Topic
The NB S4J statement block for publishing messages to a JMS destination (either a Queue or a topic) has the following format.
* Optionally, you can specify the JMS headers (**msg_header**) and properties (**msg_property**) via valid JSON strings in key: value format.
* The default message type (**msg_type**) is "byte". But optionally, you can specify other message types such as "text", "map", etc.
* The message payload (**msg_body**) is the only mandatory field.
```
blocks:
msg-produce-block:
ops:
op1:
## The value represents the destination (queue or topic) name)
MessageProduce: "mys4jtest_t"
## (Optional) JMS headers (in JSON format).
msg_header: |
{
"<header_key>": "<header_value>"
}
## (Optional) JMS properties, predefined or customized (in JSON format).
msg_property: |
{
"<property1_key>": "<property_value1>",
"<property2_key>": "<property_value2>"
}
## (Optional) JMS message types, default to be BYTES.
msg_type: "text"
## (Mandatory) JMS message body. Value depends on msg_type.
msg_body: "{mytext_val}"
```
### 4.2.2. Receiving Messages from a JMS Destination, Queue or Topic
The generic NB S4J statement block for receiving messages to a JMS destination (either a Queue or a topic) has the following format. All the statement specific parameters are listed as below.
* **msg_selector**: Message selector string
* **no_local**: Only applicable to a Topic as the destination. This allows a subscriber to inhibit the delivery of messages published by its own connection.
* **read_timeout**: The timeout value for receiving a message from a destination
* This setting only works if **no_wait** is false
* If the **read_timeout** value is 0, it behaves the same as **no_wait** is true
* **no_wait**: Whether to receive the next message immediately if one is available
* **msg_ack_ratio**: the ratio of the received messages being acknowledged
* **slow_ack_in_sec**: whether to simulate a slow consumer (pause before acknowledging after receiving a message)
* value 0 means no simulation (consumer acknowledges right away)
* negative ack/ack timeout/deadletter topic related settings
* The settings here (as the scenario specific settings) will be merged with the
* global settings in *s4j_config.properties* file
```
blocks:
msg-produce-block:
ops:
op1:
## The value represents the destination (queue or topic) name)
MessageProduce: "mys4jtest_t"
## (Optional) client side message selector
msg_selector: ""
## (Optional) No Local
no_local: "true"
## (Optional) Read Timeout
read_timeout: "10"
## (Optional) Receive message without wait
no_wait: "true"
## (Optional) Message acknowledgement ratio
msg_ack_ratio: "0.5"
## (Optional) Simulate slow consumer acknowledgement
# must be non-negative numbers. negative numbers will be treated as 0
# 0 - means no simulation
# positive value - the number of seconds to pause before acknowledgement
slow_ack_in_sec: "0"
#####
## (Optional) Statement level settings for Consumer
#
## AckTimeout value (at least 1 second)
consumer.ackTimeoutMillis: 1000
## DLQ policy
consumer.deadLetterPolicy: '{ "maxRedeliverCount": "2" }'
## NegativeAck Redelivery policy
consumer.negativeAckRedeliveryBackoff: |
{
}
## AckTimeout Redelivery policy
consumer.ackTimeoutRedeliveryBackoff: |
{
"minDelayMs":"10",
"maxDelayMs":"20",
"multiplier":"1.2"
}
```

View File

@ -0,0 +1,61 @@
###########
# Overview: Starlight for JMS (S4J) API configuration items are listed at:
# https://docs.datastax.com/en/fast-pulsar-jms/docs/1.1/pulsar-jms-reference.html#_configuration_options
enableTransaction=true
####
# S4J API specific configurations (non Pulsar specific) - jms.***
#--------------------------------------
jms.usePulsarAdmin=false
jms.precreateQueueSubscription=false
jms.enableClientSideEmulation=false
jms.useServerSideFiltering=true
jms.useCredentialsFromCreateConnection=false
jms.transactionsStickyPartitions=true
# for JMS priority
jms.enableJMSPriority=true
jms.priorityMapping=non-linear
#...
#####
# Pulsar client related configurations - client.***
# - Valid settings: http://pulsar.apache.org/docs/en/client-libraries-java/#client
#
# - These Pulsar client settings (without the "client." prefix) will be
# directly used as S4J configuration settings, on 1-to-1 basis.
#--------------------------------------
client.connectionTimeoutMs=5000
#client.authPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
#client.authParams=
#...
#####
# Producer related configurations (global) - producer.***
# - Valid settings: http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
#
# - These Pulsar producer settings (without "producer." prefix) will be collectively (as a map)
# mapped to S4J connection setting of "producerConfig"
#--------------------------------------
#producer.sendTimeoutMs=
producer.blockIfQueueFull=true
#producer.maxPendingMessages=10000
#producer.batchingMaxMessages=10000
#...
#####
# Consumer related configurations (global) - consumer.***
# - Valid settings: http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
#
# - These Pulsar producer settings (without "consumer." portion) will be collectively (as a map)
# mapped to S4J connection setting of "consumerConfig"
#--------------------------------------
consumer.receiverQueueSize=2000
consumer.acknowledgementsGroupTimeMicros=0
consumer.ackTimeoutMillis=2000
consumer.deadLetterPolicy={ "maxRedeliverCount":"5", "deadLetterTopic":"", "initialSubscriptionName":"" }
consumer.ackTimeoutRedeliveryBackoff={"minDelayMs":"50", "maxDelayMs":"100", "multiplier":"2.0"}
consumer.negativeAckRedeliveryBackoff={}
#...

View File

@ -97,7 +97,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-s4j</artifactId>
<version>4.17.31-SNAPSHOT</version>
<version>4.17.32-SNAPSHOT</version>
</dependency>
</dependencies>