Update NBS4J version to 4.0.1 (which is based on Pulsar client version 3.0.0) and NB Pulsar version to 3.0.0

This commit is contained in:
yabinmeng 2023-06-23 21:44:30 -05:00
parent 561f3424fe
commit 046c0fc1c6
11 changed files with 309 additions and 22 deletions

View File

@ -16,25 +16,21 @@
package io.nosqlbench.adapter.pulsar.dispensers;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import io.nosqlbench.adapter.pulsar.PulsarSpace;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.DOC_LEVEL_PARAMS;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.LongFunction;
import java.util.function.Predicate;
import java.util.function.Supplier;
@ -93,13 +89,10 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
.newTransaction()
.build()
.get();
} catch (final ExecutionException | InterruptedException err) {
} catch (Exception err) {
if (PulsarClientOpDispenser.logger.isWarnEnabled())
PulsarClientOpDispenser.logger.warn("Error while starting a new transaction", err);
throw new RuntimeException(err);
} catch (final PulsarClientException err) {
throw new RuntimeException("Transactions are not enabled on Pulsar Client, " +
"please set client.enableTransaction=true in your Pulsar Client configuration");
throw new PulsarAdapterUnexpectedException(err);
}
};
}

View File

@ -58,7 +58,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
this.msgTypeFunc = lookupMandtoryStrOpValueFunc(MSG_TYPE_OP_PARAM);
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
}
private Message createAndSetMessagePayload(
@ -305,7 +305,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
logger.warn(
"The specified JMS message type {} is not valid, use the default TextMessage type!",
jmsMsgType);
jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label;
jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.BYTE.label;
}

View File

@ -16,6 +16,7 @@
package io.nosqlbench.adapter.s4j.util;
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.FileBasedConfiguration;
import org.apache.commons.configuration2.PropertiesConfiguration;
@ -60,7 +61,8 @@ public class S4JClientConf {
public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) {
public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName)
throws S4JAdapterUnexpectedException {
//////////////////
// Read related Pulsar client configuration settings from a file
@ -156,12 +158,9 @@ public class S4JClientConf {
}
}
}
} catch (IOException ioe) {
logger.error("Can't read the specified config properties file: " + fileName);
ioe.printStackTrace();
} catch (ConfigurationException cex) {
logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage());
cex.printStackTrace();
} catch (IOException | ConfigurationException ex) {
ex.printStackTrace();
throw new S4JAdapterUnexpectedException("Can't read the specified config properties file: " + fileName);
}
}

View File

@ -16,7 +16,7 @@ blocks:
msg-produce-block:
ops:
op1:
## The value represents the destination (queue or topic) name)
## The value represents the destination (queue or topic) name
MessageProduce: "mys4jtest_t"
## (Optional) JMS headers (in JSON format).

View File

@ -0,0 +1,37 @@
# Overview
The `e2e-nbs4j-sanity.sh` is used to run an end-to-end testing flow using NB S4J adapter. It includes a various
of message sender and receiver combinations that cover main S4J functionalities. This test fills gap where the
unit tests can't cover. It is suggested to always run this script before committing the code changes to the
official NB repo for the following NB modules:
* `adapter-pulasr` (Native Pulsar)
* `adapter-kafka` (Starlight for Kafka)
* `adapter-s4r` (Starlight for RabbitMQ)
* `adapter-s4j` (Starlight for JMS)
The reason is all these modules depend upon Apache Pulsar client library, either directly or indirectly. Sometimes
conflict, including runtime ones, could happen. When the conflict happens, it may impact the testing capability
of using one of the these modules. The end to end sanity validation script is used to prevent this from happening
and thus makes sure unwanted code changes don't end up in the official repo.
# Pulsar Cluster
Running this script requires a live Pulsar cluster. It is recommended to use an Astra Streaming tenant for this
purpose. The corresponding Pulsar cluster connection information should be put in a `client.conf` file.
Please **NOTE** that the target Pulsar cluster should have the following tenant/namespace/topic created before
running the script:
* nbtest/default/s4j-sanity
# Usage
Simply run the script from a command line to kick off the sanity validation.
```bash
$ e2e-nbs4j-sanity.sh -c </path/to/cient.conf>
```
The execution result will be recorded in a log file with the following naming pattern:
```bash
$ e2e-nbs4j-sanity-YYYYMMDDhhmmss.log
```

View File

@ -0,0 +1,131 @@
#! /usr/local/bin/bash
CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "${CUR_SCRIPT_FOLDER}/utilities.sh"
echo
##
# Show usage info
#
usage() {
echo
echo "Usage: e2e-nbs4j-sanity.sh [-h]"
echo " [-c </path/to/client.conf>]"
echo " -h : Show usage info"
echo " -c : (Optional) Pulsar cluster connection file. Default to 'client.conf' in the same directory!"
echo
}
if [[ $# -gt 3 ]]; then
usage
errExit 10 "Incorrect input parameter count!"
fi
pulsarClientConf="./client.conf"
while [[ "$#" -gt 0 ]]; do
case $1 in
-h) usage; exit 0 ;;
-c) pulsarClientConf=$2; shift ;;
*) errExit 20 "Unknown input parameter passed: $1"; ;;
esac
shift
done
mkdir -p "./logs/nb5-exec"
mainLogDir="${CUR_SCRIPT_FOLDER}/logs"
nbExecLogDir="${mainLogDir}/nb5-exec"
# 2022-08-19 11:40:23
startTime=$(date +'%Y-%m-%d %T')
# 20220819114023
startTime2=${startTime//[: -]/}
sanityTestMainLogFile="${mainLogDir}/e2e-nbs4j-sanity-${startTime2}.log"
echo > "${sanityTestMainLogFile}"
debugMsg "pulsarClientConf=${pulsarClientConf}" "${sanityTestMainLogFile}"
if ! [[ -f "${pulsarClientConf}" ]]; then
errExit 30 \
"Can't find the Pulsar cluster client.conf file at the specified location: \"" + pulsarClientConf + "\"!" \
"${sanityTestMainLogFile}"
fi
brokerSvcUrl=$(getPropVal ${pulsarClientConf} "brokerServiceUrl")
webSvcUrl=$(getPropVal ${pulsarClientConf} "webServiceUrl")
authPlugin=$(getPropVal ${pulsarClientConf} "authPlugin")
authParams=$(getPropVal ${pulsarClientConf} "authParams")
debugMsg "brokerSvcUrl=${brokerSvcUrl}" "${sanityTestMainLogFile}"
debugMsg "webSvcUrl=${webSvcUrl}" "${sanityTestMainLogFile}"
debugMsg "authPlugin=${authPlugin}" "${sanityTestMainLogFile}"
debugMsg "authParams=xxxxxx" "${sanityTestMainLogFile}"
sanityS4jCfgPropFileName="e2e-sanity-config.properties"
sanityS4jCfgPropFile="${CUR_SCRIPT_FOLDER}/${sanityS4jCfgPropFileName}"
cp -rf "${CUR_SCRIPT_FOLDER}/${sanityS4jCfgPropFileName}.tmpl" "${sanityS4jCfgPropFile}"
if [[ -n "${authPlugin// }" || -n "${authParams// }" ]]; then
replaceStringInFile "<authPlugin_tmpl>" "${authPlugin}" "${sanityS4jCfgPropFileName}"
replaceStringInFile "<authParams_tmpl>" "${authParams}" "${sanityS4jCfgPropFileName}"
else
replaceStringInFile "<authPlugin_tmpl>" "" "${sanityS4jCfgPropFileName}"
replaceStringInFile "<authParams_tmpl>" "" "${sanityS4jCfgPropFileName}"
fi
NB5JAR="${CUR_SCRIPT_FOLDER}/../../../../../nb5/target/nb5.jar"
sanityS4jMsgSenderYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-sender-queue.yaml"
sanityS4jMsgReceiverYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-receiver-queue.yaml"
{
echo;
echo "======================================================================================================";
echo "Starting the sanity test for the NoSQLBench S4J adapter at ${startTime} ...";
echo;
echo " >>> Kick off an S4J message sending workload ..."
echo;
} >> "${sanityTestMainLogFile}"
read -r -d '' nbs4jMsgSendCmd << EOM
java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \
cycles=1000 threads=4 num_conn=2 num_session=2 \
session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \
service_url=${brokerSvcUrl} \
web_url=${webSvcUrl} \
config=${sanityS4jCfgPropFile} \
yaml=${sanityS4jMsgSenderYamlFile}
EOM
debugMsg "nbs4jMsgSendCmd=${nbs4jMsgSendCmd}" "${sanityTestMainLogFile}"
eval '${nbs4jMsgSendCmd}'
if [[ $? -ne 0 ]]; then
errExit 40 "Failed to kick off the S4J message sending workload!" "${sanityTestMainLogFile}"
fi
# pause 5 seconds before kicking off the message sending workload
sleep 5
{
echo;
echo " >>> Kick off an S4J message receiving workload after 30 seconds..."
echo;
} >> "${sanityTestMainLogFile}"
read -r -d '' nbs4jMsgRecvCmd << EOM
java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \
cycles=1000 threads=4 num_conn=2 num_session=2 \
session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \
service_url=${brokerSvcUrl} \
web_url=${webSvcUrl} \
config=${sanityS4jCfgPropFile} \
yaml=${sanityS4jMsgReceiverYamlFile}
EOM
debugMsg "nbs4jMsgRecvCmd=${nbs4jMsgRecvCmd}" "${sanityTestMainLogFile}"
eval '${nbs4jMsgRecvCmd}'
if [[ $? -ne 0 ]]; then
errExit 40 "Failed to kick off the S4J message receiving workload!" "${sanityTestMainLogFile}"
fi
echo "NB S4J workload sanity check passed!" >> "${sanityTestMainLogFile}"
echo

View File

@ -0,0 +1,4 @@
# org.apache.pulsar.client.impl.auth.AuthenticationToken
client.authPlugin=<authPlugin_tmpl>
client.authParams=<authParams_tmpl>
producer.blockIfQueueFull=true

View File

@ -0,0 +1,11 @@
# document level parameters that apply to all Pulsar client types:
params:
temporary_dest: "false"
dest_type: "queue"
async_api: "false"
blocks:
msg-consume-block:
ops:
op1:
MessageConsume: "persistent://nbtest/default/s4j-sanity"

View File

@ -0,0 +1,15 @@
bindings:
mytext_val: AlphaNumericString(30)
# document level parameters that apply to all Pulsar client types:
params:
dest_type: "queue"
async_api: "false"
blocks:
msg-produce-block:
ops:
op1:
MessageProduce: "persistent://nbtest/default/s4j-sanity"
msg_body: "{mytext_val}"
msg_type: "text"

View File

@ -0,0 +1,97 @@
#! /usr/local/bin/bash
DEBUG=false
##
# Show debug message
# - $1 : the message to show
# - $2 : (Optional) the file to log the message to
debugMsg() {
if [[ "${DEBUG}" == "true" ]]; then
local msg=${1}
local file=${2}
if [[ -z "${file}" ]]; then
echo "[Debug] ${msg}"
else
echo "[Debug] ${msg}" >> "${file}"
fi
fi
}
##
# - $1 : the exit code
# - $2 : the message to show
# - $3 : (Optional) the file to log the message to
errExit() {
local exitCode=${1}
local msg=${2}
local file=${3}
if [[ -z "${file}" ]]; then
echo "[Error] ${msg}"
else
echo "[Error] ${msg}" >> "${file}"
fi
exit ${exitCode}
}
##
# Read the properties file and returns the value based on the key
# 2 input parameters:
# - 1st parameter: the property file to scan
# - 2nd parameter: the key to search for
getPropVal() {
local propFile=$1
local searchKey=$2
local value=$(grep "${searchKey}" ${propFile} | grep -Ev "^#|^$" | cut -d'=' -f2)
echo $value
}
##
# Check if the sed being used is GNU sed
isGnuSed() {
local gnu_sed=$(sed --version 2>&1 | grep -v 'illegal\|usage\|^\s' | grep "GNU sed" | wc -l)
echo ${gnu_sed}
}
##
# Replace the occurrence of a string place holder with a specific value in a file
# Four input parameters:
# - 1st parameter: the place holder string to be replaced
# - 2nd parameter: the value string to replace the place holder
# - 3rd parameter: the file
# - 4th parameter: (Optional) a particular line identifier to replace.
# if specified, only replace the place holder in the matching line
# otherwise, replace all occurrence in the file
#
# TBD: use this function to hide GNU difference (Mac vs Linux, GNU or not)
#
replaceStringInFile() {
local placeHolderStr=${1}
local valueStr=${2}
local fileToScan=${3}
local lineIdentifier=${4}
# in case '/' is part of the string
placeHolderStr=$(echo ${placeHolderStr} | sed 's/\//\\\//g')
valueStr=$(echo ${valueStr} | sed 's/\//\\\//g')
gnuSed=$(isGnuSed)
if [[ "$OSTYPE" == "darwin"* && ${gnuSed} -eq 0 ]]; then
if ! [[ -z "${lineIdentifier// }" ]]; then
sed -i '' "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
else
sed -i '' "s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
fi
else
if ! [[ -z "${lineIdentifier// }" ]]; then
sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${funcCfgJsonFileTgt}
else
sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
fi
fi
}

View File

@ -20,7 +20,7 @@
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>5.17.1-SNAPSHOT</version>
<version>${revision}</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
@ -51,7 +51,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-cli</artifactId>
<version>5.17.1-SNAPSHOT</version>
<version>${revision}</version>
</dependency>
</dependencies>