From 046c0fc1c6458b1ae143ae6006b72f139adcaa3c Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Fri, 23 Jun 2023 21:44:30 -0500 Subject: [PATCH] Update NBS4J version to 4.0.1 (which is based on Pulsar client version 3.0.0) and NB Pulsar version to 3.0.0 --- .../dispensers/PulsarClientOpDispenser.java | 13 +- .../MessageProducerOpDispenser.java | 4 +- .../adapter/s4j/util/S4JClientConf.java | 13 +- .../main/resources/pulsar_s4j_producer.yaml | 2 +- .../resources/sanity-validation/README.md | 37 +++++ .../sanity-validation/e2e-nbs4j-sanity.sh | 131 ++++++++++++++++++ .../e2e-sanity-config.properties.tmpl | 4 + .../sanity-msg-receiver-queue.yaml | 11 ++ .../sanity-msg-sender-queue.yaml | 15 ++ .../resources/sanity-validation/utilities.sh | 97 +++++++++++++ engine-rest/pom.xml | 4 +- 11 files changed, 309 insertions(+), 22 deletions(-) create mode 100644 adapter-s4j/src/main/resources/sanity-validation/README.md create mode 100755 adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh create mode 100644 adapter-s4j/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl create mode 100644 adapter-s4j/src/main/resources/sanity-validation/sanity-msg-receiver-queue.yaml create mode 100644 adapter-s4j/src/main/resources/sanity-validation/sanity-msg-sender-queue.yaml create mode 100755 adapter-s4j/src/main/resources/sanity-validation/utilities.sh diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java index b9dabba8b..3e92da254 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java @@ -16,25 +16,21 @@ package io.nosqlbench.adapter.pulsar.dispensers; -import com.codahale.metrics.Timer; import com.codahale.metrics.Timer.Context; import io.nosqlbench.adapter.pulsar.PulsarSpace; -import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; +import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException; import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.DOC_LEVEL_PARAMS; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; -import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil; import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.function.LongFunction; import java.util.function.Predicate; import java.util.function.Supplier; @@ -93,13 +89,10 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser { .newTransaction() .build() .get(); - } catch (final ExecutionException | InterruptedException err) { + } catch (Exception err) { if (PulsarClientOpDispenser.logger.isWarnEnabled()) PulsarClientOpDispenser.logger.warn("Error while starting a new transaction", err); - throw new RuntimeException(err); - } catch (final PulsarClientException err) { - throw new RuntimeException("Transactions are not enabled on Pulsar Client, " + - "please set client.enableTransaction=true in your Pulsar Client configuration"); + throw new PulsarAdapterUnexpectedException(err); } }; } diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java index 5f3764180..c0cccc5de 100644 --- a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java @@ -58,7 +58,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser { this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM); this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM); this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM); - this.msgTypeFunc = lookupMandtoryStrOpValueFunc(MSG_TYPE_OP_PARAM); + this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM); } private Message createAndSetMessagePayload( @@ -305,7 +305,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser { logger.warn( "The specified JMS message type {} is not valid, use the default TextMessage type!", jmsMsgType); - jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label; + jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.BYTE.label; } diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java index e2a62c5a1..74706ab94 100644 --- a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java @@ -16,6 +16,7 @@ package io.nosqlbench.adapter.s4j.util; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.FileBasedConfiguration; import org.apache.commons.configuration2.PropertiesConfiguration; @@ -60,7 +61,8 @@ public class S4JClientConf { - public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) { + public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) + throws S4JAdapterUnexpectedException { ////////////////// // Read related Pulsar client configuration settings from a file @@ -156,12 +158,9 @@ public class S4JClientConf { } } } - } catch (IOException ioe) { - logger.error("Can't read the specified config properties file: " + fileName); - ioe.printStackTrace(); - } catch (ConfigurationException cex) { - logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage()); - cex.printStackTrace(); + } catch (IOException | ConfigurationException ex) { + ex.printStackTrace(); + throw new S4JAdapterUnexpectedException("Can't read the specified config properties file: " + fileName); } } diff --git a/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml b/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml index 6a0528574..3fb641a81 100644 --- a/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml +++ b/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml @@ -16,7 +16,7 @@ blocks: msg-produce-block: ops: op1: - ## The value represents the destination (queue or topic) name) + ## The value represents the destination (queue or topic) name MessageProduce: "mys4jtest_t" ## (Optional) JMS headers (in JSON format). diff --git a/adapter-s4j/src/main/resources/sanity-validation/README.md b/adapter-s4j/src/main/resources/sanity-validation/README.md new file mode 100644 index 000000000..e5753af69 --- /dev/null +++ b/adapter-s4j/src/main/resources/sanity-validation/README.md @@ -0,0 +1,37 @@ +# Overview + +The `e2e-nbs4j-sanity.sh` is used to run an end-to-end testing flow using NB S4J adapter. It includes a various +of message sender and receiver combinations that cover main S4J functionalities. This test fills gap where the +unit tests can't cover. It is suggested to always run this script before committing the code changes to the +official NB repo for the following NB modules: +* `adapter-pulasr` (Native Pulsar) +* `adapter-kafka` (Starlight for Kafka) +* `adapter-s4r` (Starlight for RabbitMQ) +* `adapter-s4j` (Starlight for JMS) + +The reason is all these modules depend upon Apache Pulsar client library, either directly or indirectly. Sometimes +conflict, including runtime ones, could happen. When the conflict happens, it may impact the testing capability +of using one of the these modules. The end to end sanity validation script is used to prevent this from happening +and thus makes sure unwanted code changes don't end up in the official repo. + +# Pulsar Cluster + +Running this script requires a live Pulsar cluster. It is recommended to use an Astra Streaming tenant for this +purpose. The corresponding Pulsar cluster connection information should be put in a `client.conf` file. + +Please **NOTE** that the target Pulsar cluster should have the following tenant/namespace/topic created before +running the script: +* nbtest/default/s4j-sanity + + +# Usage + +Simply run the script from a command line to kick off the sanity validation. +```bash +$ e2e-nbs4j-sanity.sh -c +``` + +The execution result will be recorded in a log file with the following naming pattern: +```bash +$ e2e-nbs4j-sanity-YYYYMMDDhhmmss.log +``` diff --git a/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh b/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh new file mode 100755 index 000000000..fe225a567 --- /dev/null +++ b/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh @@ -0,0 +1,131 @@ +#! /usr/local/bin/bash + +CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "${CUR_SCRIPT_FOLDER}/utilities.sh" + +echo + +## +# Show usage info +# +usage() { + echo + echo "Usage: e2e-nbs4j-sanity.sh [-h]" + echo " [-c ]" + echo " -h : Show usage info" + echo " -c : (Optional) Pulsar cluster connection file. Default to 'client.conf' in the same directory!" + echo +} + +if [[ $# -gt 3 ]]; then + usage + errExit 10 "Incorrect input parameter count!" +fi + +pulsarClientConf="./client.conf" +while [[ "$#" -gt 0 ]]; do + case $1 in + -h) usage; exit 0 ;; + -c) pulsarClientConf=$2; shift ;; + *) errExit 20 "Unknown input parameter passed: $1"; ;; + esac + shift +done + +mkdir -p "./logs/nb5-exec" +mainLogDir="${CUR_SCRIPT_FOLDER}/logs" +nbExecLogDir="${mainLogDir}/nb5-exec" + +# 2022-08-19 11:40:23 +startTime=$(date +'%Y-%m-%d %T') +# 20220819114023 +startTime2=${startTime//[: -]/} +sanityTestMainLogFile="${mainLogDir}/e2e-nbs4j-sanity-${startTime2}.log" +echo > "${sanityTestMainLogFile}" + +debugMsg "pulsarClientConf=${pulsarClientConf}" "${sanityTestMainLogFile}" +if ! [[ -f "${pulsarClientConf}" ]]; then + errExit 30 \ + "Can't find the Pulsar cluster client.conf file at the specified location: \"" + pulsarClientConf + "\"!" \ + "${sanityTestMainLogFile}" +fi + +brokerSvcUrl=$(getPropVal ${pulsarClientConf} "brokerServiceUrl") +webSvcUrl=$(getPropVal ${pulsarClientConf} "webServiceUrl") +authPlugin=$(getPropVal ${pulsarClientConf} "authPlugin") +authParams=$(getPropVal ${pulsarClientConf} "authParams") +debugMsg "brokerSvcUrl=${brokerSvcUrl}" "${sanityTestMainLogFile}" +debugMsg "webSvcUrl=${webSvcUrl}" "${sanityTestMainLogFile}" +debugMsg "authPlugin=${authPlugin}" "${sanityTestMainLogFile}" +debugMsg "authParams=xxxxxx" "${sanityTestMainLogFile}" + +sanityS4jCfgPropFileName="e2e-sanity-config.properties" +sanityS4jCfgPropFile="${CUR_SCRIPT_FOLDER}/${sanityS4jCfgPropFileName}" +cp -rf "${CUR_SCRIPT_FOLDER}/${sanityS4jCfgPropFileName}.tmpl" "${sanityS4jCfgPropFile}" +if [[ -n "${authPlugin// }" || -n "${authParams// }" ]]; then + replaceStringInFile "" "${authPlugin}" "${sanityS4jCfgPropFileName}" + replaceStringInFile "" "${authParams}" "${sanityS4jCfgPropFileName}" +else + replaceStringInFile "" "" "${sanityS4jCfgPropFileName}" + replaceStringInFile "" "" "${sanityS4jCfgPropFileName}" +fi + +NB5JAR="${CUR_SCRIPT_FOLDER}/../../../../../nb5/target/nb5.jar" +sanityS4jMsgSenderYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-sender-queue.yaml" +sanityS4jMsgReceiverYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-receiver-queue.yaml" + + +{ + echo; + echo "======================================================================================================"; + echo "Starting the sanity test for the NoSQLBench S4J adapter at ${startTime} ..."; + echo; + echo " >>> Kick off an S4J message sending workload ..." + echo; +} >> "${sanityTestMainLogFile}" + +read -r -d '' nbs4jMsgSendCmd << EOM +java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \ + cycles=1000 threads=4 num_conn=2 num_session=2 \ + session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \ + service_url=${brokerSvcUrl} \ + web_url=${webSvcUrl} \ + config=${sanityS4jCfgPropFile} \ + yaml=${sanityS4jMsgSenderYamlFile} +EOM +debugMsg "nbs4jMsgSendCmd=${nbs4jMsgSendCmd}" "${sanityTestMainLogFile}" + +eval '${nbs4jMsgSendCmd}' +if [[ $? -ne 0 ]]; then + errExit 40 "Failed to kick off the S4J message sending workload!" "${sanityTestMainLogFile}" +fi + +# pause 5 seconds before kicking off the message sending workload +sleep 5 + +{ + echo; + echo " >>> Kick off an S4J message receiving workload after 30 seconds..." + echo; +} >> "${sanityTestMainLogFile}" + +read -r -d '' nbs4jMsgRecvCmd << EOM +java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \ + cycles=1000 threads=4 num_conn=2 num_session=2 \ + session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \ + service_url=${brokerSvcUrl} \ + web_url=${webSvcUrl} \ + config=${sanityS4jCfgPropFile} \ + yaml=${sanityS4jMsgReceiverYamlFile} +EOM +debugMsg "nbs4jMsgRecvCmd=${nbs4jMsgRecvCmd}" "${sanityTestMainLogFile}" + +eval '${nbs4jMsgRecvCmd}' +if [[ $? -ne 0 ]]; then + errExit 40 "Failed to kick off the S4J message receiving workload!" "${sanityTestMainLogFile}" +fi + +echo "NB S4J workload sanity check passed!" >> "${sanityTestMainLogFile}" + + +echo diff --git a/adapter-s4j/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl b/adapter-s4j/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl new file mode 100644 index 000000000..9d8ce6dc4 --- /dev/null +++ b/adapter-s4j/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl @@ -0,0 +1,4 @@ +# org.apache.pulsar.client.impl.auth.AuthenticationToken +client.authPlugin= +client.authParams= +producer.blockIfQueueFull=true diff --git a/adapter-s4j/src/main/resources/sanity-validation/sanity-msg-receiver-queue.yaml b/adapter-s4j/src/main/resources/sanity-validation/sanity-msg-receiver-queue.yaml new file mode 100644 index 000000000..1a7adaae0 --- /dev/null +++ b/adapter-s4j/src/main/resources/sanity-validation/sanity-msg-receiver-queue.yaml @@ -0,0 +1,11 @@ +# document level parameters that apply to all Pulsar client types: +params: + temporary_dest: "false" + dest_type: "queue" + async_api: "false" + +blocks: + msg-consume-block: + ops: + op1: + MessageConsume: "persistent://nbtest/default/s4j-sanity" diff --git a/adapter-s4j/src/main/resources/sanity-validation/sanity-msg-sender-queue.yaml b/adapter-s4j/src/main/resources/sanity-validation/sanity-msg-sender-queue.yaml new file mode 100644 index 000000000..dd4cb03bf --- /dev/null +++ b/adapter-s4j/src/main/resources/sanity-validation/sanity-msg-sender-queue.yaml @@ -0,0 +1,15 @@ +bindings: + mytext_val: AlphaNumericString(30) + +# document level parameters that apply to all Pulsar client types: +params: + dest_type: "queue" + async_api: "false" + +blocks: + msg-produce-block: + ops: + op1: + MessageProduce: "persistent://nbtest/default/s4j-sanity" + msg_body: "{mytext_val}" + msg_type: "text" diff --git a/adapter-s4j/src/main/resources/sanity-validation/utilities.sh b/adapter-s4j/src/main/resources/sanity-validation/utilities.sh new file mode 100755 index 000000000..1469e2985 --- /dev/null +++ b/adapter-s4j/src/main/resources/sanity-validation/utilities.sh @@ -0,0 +1,97 @@ +#! /usr/local/bin/bash + +DEBUG=false + +## +# Show debug message +# - $1 : the message to show +# - $2 : (Optional) the file to log the message to +debugMsg() { + if [[ "${DEBUG}" == "true" ]]; then + local msg=${1} + local file=${2} + + if [[ -z "${file}" ]]; then + echo "[Debug] ${msg}" + else + echo "[Debug] ${msg}" >> "${file}" + fi + + fi +} + +## +# - $1 : the exit code +# - $2 : the message to show +# - $3 : (Optional) the file to log the message to +errExit() { + local exitCode=${1} + local msg=${2} + local file=${3} + + if [[ -z "${file}" ]]; then + echo "[Error] ${msg}" + else + echo "[Error] ${msg}" >> "${file}" + fi + + exit ${exitCode} +} + +## +# Read the properties file and returns the value based on the key +# 2 input parameters: +# - 1st parameter: the property file to scan +# - 2nd parameter: the key to search for +getPropVal() { + local propFile=$1 + local searchKey=$2 + local value=$(grep "${searchKey}" ${propFile} | grep -Ev "^#|^$" | cut -d'=' -f2) + echo $value +} + +## +# Check if the sed being used is GNU sed +isGnuSed() { + local gnu_sed=$(sed --version 2>&1 | grep -v 'illegal\|usage\|^\s' | grep "GNU sed" | wc -l) + echo ${gnu_sed} +} + + +## +# Replace the occurrence of a string place holder with a specific value in a file +# Four input parameters: +# - 1st parameter: the place holder string to be replaced +# - 2nd parameter: the value string to replace the place holder +# - 3rd parameter: the file +# - 4th parameter: (Optional) a particular line identifier to replace. +# if specified, only replace the place holder in the matching line +# otherwise, replace all occurrence in the file +# +# TBD: use this function to hide GNU difference (Mac vs Linux, GNU or not) +# +replaceStringInFile() { + local placeHolderStr=${1} + local valueStr=${2} + local fileToScan=${3} + local lineIdentifier=${4} + + # in case '/' is part of the string + placeHolderStr=$(echo ${placeHolderStr} | sed 's/\//\\\//g') + valueStr=$(echo ${valueStr} | sed 's/\//\\\//g') + + gnuSed=$(isGnuSed) + if [[ "$OSTYPE" == "darwin"* && ${gnuSed} -eq 0 ]]; then + if ! [[ -z "${lineIdentifier// }" ]]; then + sed -i '' "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + else + sed -i '' "s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + fi + else + if ! [[ -z "${lineIdentifier// }" ]]; then + sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${funcCfgJsonFileTgt} + else + sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + fi + fi +} diff --git a/engine-rest/pom.xml b/engine-rest/pom.xml index df8992445..42a047220 100644 --- a/engine-rest/pom.xml +++ b/engine-rest/pom.xml @@ -20,7 +20,7 @@ mvn-defaults io.nosqlbench - 5.17.1-SNAPSHOT + ${revision} ../mvn-defaults @@ -51,7 +51,7 @@ io.nosqlbench engine-cli - 5.17.1-SNAPSHOT + ${revision}