From ccb1e15d3451a1a547ef9ee5430b22ead8fa22ea Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Mon, 26 Jun 2023 12:05:02 -0500 Subject: [PATCH] Update NB S4R, add README doc and testing scripts --- .../sanity-validation/e2e-nbs4j-sanity.sh | 24 ++- .../resources/sanity-validation/utilities.sh | 20 ++- adapter-s4r/pom.xml | 7 +- .../io/nosqlbench/adapter/s4r/S4RSpace.java | 104 +++++++----- .../s4r/dispensers/AmqpBaseOpDispenser.java | 64 ++++---- .../dispensers/AmqpMsgRecvOpDispenser.java | 151 ++++++++++++------ .../dispensers/AmqpMsgSendOpDispenser.java | 98 ++++++++---- .../s4r/ops/OpTimeTrackAmqpMsgRecvOp.java | 33 ++-- .../s4r/ops/OpTimeTrackAmqpMsgSendOp.java | 29 ++-- .../adapter/s4r/util/S4RAdapterUtil.java | 21 +-- adapter-s4r/src/main/resources/README.md | 124 ++++++++------ .../src/main/resources/build-nb-s4r-driver.sh | 24 --- .../src/main/resources/csv/exchange_names.csv | 4 - .../src/main/resources/csv/queue_names.csv | 3 - .../src/main/resources/s4r_config.properties | 10 +- .../src/main/resources/s4r_consumer.yaml | 18 --- .../src/main/resources/s4r_msg_receiver.yaml | 12 ++ .../sanity-validation/e2e-nbs4r-sanity.sh | 137 ++++++++++++++++ .../e2e-sanity-config.properties.tmpl | 11 ++ .../sanity-validation/e2e-sanity-sender.yaml | 13 ++ .../resources/sanity-validation/utilities.sh | 113 +++++++++++++ .../src/main/resources/start_s4r_consumer.sh | 36 ----- .../src/main/resources/start_s4r_producer.sh | 38 ----- 23 files changed, 714 insertions(+), 380 deletions(-) delete mode 100755 adapter-s4r/src/main/resources/build-nb-s4r-driver.sh delete mode 100644 adapter-s4r/src/main/resources/csv/exchange_names.csv delete mode 100644 adapter-s4r/src/main/resources/csv/queue_names.csv delete mode 100644 adapter-s4r/src/main/resources/s4r_consumer.yaml create mode 100644 adapter-s4r/src/main/resources/s4r_msg_receiver.yaml create mode 100755 adapter-s4r/src/main/resources/sanity-validation/e2e-nbs4r-sanity.sh create mode 100644 adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl create mode 100644 adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-sender.yaml create mode 100755 adapter-s4r/src/main/resources/sanity-validation/utilities.sh delete mode 100755 adapter-s4r/src/main/resources/start_s4r_consumer.sh delete mode 100755 adapter-s4r/src/main/resources/start_s4r_producer.sh diff --git a/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh b/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh index fe225a567..45e4f0bac 100755 --- a/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh +++ b/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh @@ -1,5 +1,21 @@ #! /usr/local/bin/bash +## +# Copyright (c) 2022-2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) source "${CUR_SCRIPT_FOLDER}/utilities.sh" @@ -86,12 +102,12 @@ sanityS4jMsgReceiverYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-receiver-queue.yam read -r -d '' nbs4jMsgSendCmd << EOM java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \ - cycles=1000 threads=4 num_conn=2 num_session=2 \ + cycles=200 threads=4 num_conn=2 num_session=2 \ session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \ service_url=${brokerSvcUrl} \ web_url=${webSvcUrl} \ config=${sanityS4jCfgPropFile} \ - yaml=${sanityS4jMsgSenderYamlFile} + workload=${sanityS4jMsgSenderYamlFile} EOM debugMsg "nbs4jMsgSendCmd=${nbs4jMsgSendCmd}" "${sanityTestMainLogFile}" @@ -111,12 +127,12 @@ sleep 5 read -r -d '' nbs4jMsgRecvCmd << EOM java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \ - cycles=1000 threads=4 num_conn=2 num_session=2 \ + cycles=200 threads=4 num_conn=2 num_session=2 \ session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \ service_url=${brokerSvcUrl} \ web_url=${webSvcUrl} \ config=${sanityS4jCfgPropFile} \ - yaml=${sanityS4jMsgReceiverYamlFile} + workload=${sanityS4jMsgReceiverYamlFile} EOM debugMsg "nbs4jMsgRecvCmd=${nbs4jMsgRecvCmd}" "${sanityTestMainLogFile}" diff --git a/adapter-s4j/src/main/resources/sanity-validation/utilities.sh b/adapter-s4j/src/main/resources/sanity-validation/utilities.sh index 1469e2985..a6e31ace1 100755 --- a/adapter-s4j/src/main/resources/sanity-validation/utilities.sh +++ b/adapter-s4j/src/main/resources/sanity-validation/utilities.sh @@ -1,6 +1,22 @@ #! /usr/local/bin/bash -DEBUG=false +## +# Copyright (c) 2022-2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +DEBUG=true ## # Show debug message @@ -89,7 +105,7 @@ replaceStringInFile() { fi else if ! [[ -z "${lineIdentifier// }" ]]; then - sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${funcCfgJsonFileTgt} + sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan} else sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan} fi diff --git a/adapter-s4r/pom.xml b/adapter-s4r/pom.xml index 014aed4ea..0e1385970 100644 --- a/adapter-s4r/pom.xml +++ b/adapter-s4r/pom.xml @@ -50,7 +50,6 @@ ${amqp.version} - commons-beanutils commons-beanutils @@ -63,6 +62,12 @@ commons-configuration2 2.9.0 + + + commons-io + commons-io + 2.13.0 + diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java index 645a94aeb..e66c4cd22 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java @@ -27,14 +27,18 @@ import io.nosqlbench.api.config.standard.ConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.Param; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.File; import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,13 +75,16 @@ public class S4RSpace implements AutoCloseable { // Maximum number of AMQP channels per connection private final int amqpConnChannelNum; + // Maximum number of AMQP exchanges per channel + private final int amqpChannelExchangeNum; + // Max number of queues (per exchange) // - only relevant with message receivers private final int amqpExchangeQueueNum; // Max number of message clients (senders or receivers) // - for senders, this is the number of message senders per exchange - // - for recievers, this is the number of message receivers per queue + // - for receivers, this is the number of message receivers per queue // (there could be multiple queues per exchange) private final int amqpMsgClntNum; @@ -91,18 +98,9 @@ public class S4RSpace implements AutoCloseable { private final ConcurrentHashMap amqpConnections = new ConcurrentHashMap<>(); - /////////////////////////////////// - // NOTE: Do NOT mix sender and receiver workload in one NB workload - /////////////////////////////////// - - // Amqp Channels for senders - public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { } - private final ConcurrentHashMap amqpSenderChannels = new ConcurrentHashMap<>(); - - // Amqp Channels for receivers - public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { } - private final ConcurrentHashMap amqpReceiverChannels = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>(); + // Amqp connection/chanel/exchange combination for a sender + public record AmqpChannelKey(Long connId, Long channelId) { } + private final ConcurrentHashMap amqpChannels = new ConcurrentHashMap<>(); // Whether to do strict error handling while sending/receiving messages // - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing @@ -129,6 +127,8 @@ public class S4RSpace implements AutoCloseable { NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1")); this.amqpConnChannelNum = NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1")); + this.amqpChannelExchangeNum = + NumberUtils.toInt(cfg.getOptional("num_exchange").orElse("1")); this.amqpExchangeQueueNum = NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1")); this.amqpMsgClntNum = @@ -155,6 +155,12 @@ public class S4RSpace implements AutoCloseable { .setDescription("Maximum number of AMQP connections.")) .add(Param.defaultTo("num_channel", 1) .setDescription("Maximum number of AMQP channels per connection")) + .add(Param.defaultTo("num_exchange", 1) + .setDescription("Maximum number of AMQP exchanges per channel.")) + .add(Param.defaultTo("num_queue", 1) + .setDescription("Max number of queues per exchange (only relevant for receivers).")) + .add(Param.defaultTo("num_msg_clnt", 1) + .setDescription("Max number of message clients per exchange (sender) or per queue (receiver).")) .add(Param.defaultTo("max_op_time", 0) .setDescription("Maximum time (in seconds) to run NB Kafka testing scenario.")) .add(Param.defaultTo("strict_msg_error_handling", false) @@ -164,16 +170,10 @@ public class S4RSpace implements AutoCloseable { public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); } - public Channel getAmqpSenderChannel( - AmqpSenderChannelKey key, + public Channel getAmqpChannels( + AmqpChannelKey key, Supplier channelSupplier) { - return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get()); - } - - public Channel getAmqpReceiverChannel( - AmqpReceiverChannelKey key, - Supplier channelSupplier) { - return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get()); + return amqpChannels.computeIfAbsent(key, __ -> channelSupplier.get()); } public long getActivityStartTimeMills() { return this.activityStartTimeMills; } @@ -183,7 +183,8 @@ public class S4RSpace implements AutoCloseable { public String getAmqpExchangeType() { return amqpExchangeType; } public int getAmqpConnNum() { return this.amqpConnNum; } public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; } - public int getAmqpExchangeQueueNum() { return this.amqpConnNum; } + public int getAmqpChannelExchangeNum() { return this.amqpChannelExchangeNum; } + public int getAmqpExchangeQueueNum() { return this.amqpExchangeQueueNum; } public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; } public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; } @@ -218,19 +219,50 @@ public class S4RSpace implements AutoCloseable { try { s4rConnFactory = new ConnectionFactory(); - String passWord = cfg.get("jwtToken"); - s4rConnFactory.setPassword(cfgMap.get("")); - s4rConnFactory.setPassword(passWord); - - String amqpServerHost = cfg.get("amqpSrvHost"); + String amqpServerHost = cfgMap.get("amqpSrvHost"); + if (StringUtils.isBlank(amqpServerHost)) { + String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!"; + throw new S4RAdapterInvalidParamException(errMsg); + } s4rConnFactory.setHost(amqpServerHost); - int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort")); - s4rConnFactory.setPort(amqpServerPort); + String amqpSrvPortCfg = cfgMap.get("amqpSrvPort"); + if (StringUtils.isBlank(amqpSrvPortCfg)) { + String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!"; + throw new S4RAdapterInvalidParamException(errMsg); + } + s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg)); - String amqpVirtualHost = cfg.get("virtualHost"); + String amqpVirtualHost = cfgMap.get("virtualHost"); + if (StringUtils.isBlank(amqpVirtualHost)) { + String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!"; + throw new S4RAdapterInvalidParamException(errMsg); + } s4rConnFactory.setVirtualHost(amqpVirtualHost); + String userNameCfg = cfgMap.get("amqpUser"); + + String passWordCfg = cfgMap.get("amqpPassword"); + if (StringUtils.isNotBlank(passWordCfg)) { + String passWord = passWordCfg; + if (StringUtils.startsWith(passWordCfg, "file://") + && StringUtils.length(passWordCfg) > 7) { + String jwtTokenFile = StringUtils.substring(passWordCfg, 7); + passWord = FileUtils.readFileToString(new File(jwtTokenFile), "UTF-8"); + } + + if (StringUtils.isNotBlank(passWord)) { + if (StringUtils.isBlank(userNameCfg)) { + s4rConnFactory.setUsername(""); + } + s4rConnFactory.setPassword(passWord); + } + } + + String useTlsCfg = cfgMap.get("useTls"); + if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) { + s4rConnFactory.useSslProtocol(); + } for (int i = 0; i < getAmqpConnNum(); i++) { Connection connection = s4rConnFactory.newConnection(); @@ -243,7 +275,7 @@ public class S4RSpace implements AutoCloseable { connection); } } - } catch (IOException|TimeoutException ex) { + } catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) { logger.error("Unable to establish AMQP connections with the following configuration parameters: {}", s4rClientConnInfo.toString()); throw new S4RAdapterUnexpectedException(ex); @@ -255,11 +287,7 @@ public class S4RSpace implements AutoCloseable { try { beingShutdown.set(true); - for (Channel channel : amqpSenderChannels.values()) { - channel.close(); - } - - for (Channel channel : amqpReceiverChannels.values()) { + for (Channel channel : amqpChannels.values()) { channel.close(); } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java index 109b37898..4f99b5db9 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java @@ -16,9 +16,7 @@ package io.nosqlbench.adapter.s4r.dispensers; -import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; import io.nosqlbench.adapter.s4r.S4RSpace; import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; @@ -26,7 +24,6 @@ import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,8 +43,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser s4rConfMap = new HashMap<>(); protected final String exchangeType; - protected final LongFunction exchangeNameFunc; - protected AmqpBaseOpDispenser(final DriverAdapter adapter, final ParsedOp op, final S4RSpace s4RSpace) { @@ -63,7 +58,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser bindingKeyFunc; - private final LongFunction queueNameFunc; public AmqpMsgRecvOpDispenser(DriverAdapter adapter, ParsedOp op, S4RSpace s4rSpace) { super(adapter, op, s4rSpace); - - queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null); bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); } private long getExchangeQueueSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum())) - % s4rSpace.getAmqpExchangeQueueNum(); + return (cycle / ((long) s4rSpace.getAmqpConnNum() * + s4rSpace.getAmqpConnChannelNum() * + s4rSpace.getAmqpChannelExchangeNum()) + ) % s4rSpace.getAmqpExchangeQueueNum(); } private long getQueueReceiverSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum())) - % s4rSpace.getAmqpMsgClntNum(); + return (cycle / ((long) s4rSpace.getAmqpConnNum() * + s4rSpace.getAmqpConnChannelNum() * + s4rSpace.getAmqpChannelExchangeNum() * + s4rSpace.getAmqpExchangeQueueNum()) + ) % s4rSpace.getAmqpMsgClntNum(); } - private String getEffectiveQueueName(long cycle) { - String queueNameInput = queueNameFunc.apply(cycle); - return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput); + private String getEffectiveQueueNameByCycle(long cycle) { + return getEffectiveQueueName( + getConnSeqNum(cycle), + getConnChannelSeqNum(cycle), + getChannelExchangeSeqNum(cycle), + getExchangeQueueSeqNum(cycle)); + } + private String getEffectiveQueueName(long connSeqNum, long channelSeqNum, long exchangeSeqNum, long queueSeqNum) { + return "queue-" + connSeqNum + "-" + channelSeqNum + "-" + exchangeSeqNum + "-" + queueSeqNum; } - private Channel getAmqpChannelQueueForReceiver(long cycle, - String exchangeName, - String queueName) { + private String getEffectiveReceiverName(long cycle) { + return getEffectiveReceiverName( + getConnSeqNum(cycle), + getConnChannelSeqNum(cycle), + getChannelExchangeSeqNum(cycle), + getExchangeQueueSeqNum(cycle), + getQueueReceiverSeqNum(cycle)); + } + private String getEffectiveReceiverName(long connSeqNum, + long channelSeqNum, + long exchangeSeqNum, + long queueSeqNum, + long receiverSeqNum) { + return String.format( + "receiver-%d-%d-%d-%d-%d", + connSeqNum, + channelSeqNum, + exchangeSeqNum, + queueSeqNum, + receiverSeqNum); + } + + private Channel getAmqpChannelForReceiver(long cycle) { long connSeqNum = getConnSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle); - long queueSeqNum = getExchangeQueueSeqNum(cycle); - long receiverSeqNum = getQueueReceiverSeqNum(cycle); - Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum); + Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); + S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum); - S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey = - new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum); - - return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> { + return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> { Channel channel = null; try { - channel = getChannelWithExchange( - amqpConnection, - connSeqNum, - channelSeqNum, - exchangeName); - - AMQP.Queue.DeclareOk declareOk = - channel.queueDeclare(queueName, true, true, true, null); + channel = amqpConnection.createChannel(); if (logger.isDebugEnabled()) { - logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}", - exchangeName, - queueName, - declareOk); + logger.debug("Created channel for amqp connection: {}, channel: {}", + amqpConnection, channel); + } + } + catch (IOException ex) { + // Do not throw exception here, just log it and return null + if (logger.isDebugEnabled()) { + logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex); } - } catch (IOException ex) { - throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!"); } return channel; }); } + @Override public S4RTimeTrackOp apply(long cycle) { - Channel channel = null; + Channel channel = getAmqpChannelForReceiver(cycle); + if (channel == null) { + throw new S4RAdapterUnexpectedException( + String.format( + "Failed to get AMQP channel for receiver %s [%d]!", + getEffectiveReceiverName(cycle), + cycle)); + } - String exchangeName = getEffectiveExchangeName(cycle); - String queueName = getEffectiveQueueName(cycle); + String exchangeName = getEffectiveExchangeNameByCycle(cycle); + declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); + + boolean durable = true; + boolean exclusive = true; + boolean autoDelete = false; + String queueName = getEffectiveQueueNameByCycle(cycle); + String bindingKey = bindingKeyFunc.apply(cycle); + try { + channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); + if (logger.isTraceEnabled()) { + logger.debug("AMQP queue is declared - \"{} ({}/{}/{})\" on exchange \"{}\" for a receiver!", + queueName, + durable, + exclusive, + autoDelete, + exchangeName); + } + } + catch (IOException ex) { + throw new S4RAdapterUnexpectedException( + String.format( + "Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!", + queueName, durable, exclusive, autoDelete, exchangeName) + ); + } try { - channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName); + // Binding the same queue multiple times on one exchange is considered as a no-op + channel.queueBind(queueName, exchangeName, bindingKey); + if (logger.isTraceEnabled()) { + logger.debug("AMQP queue is bound - \"{} ({}/{}/{})\" on exchange \"{}\" with binding key \"{}\"!", + queueName, + durable, + exclusive, + autoDelete, + exchangeName, + bindingKey); + } } - catch (Exception ex) { - throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!"); + catch (IOException ex) { + throw new S4RAdapterUnexpectedException( + String.format( + "Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!", + queueName, durable, exclusive, autoDelete, exchangeName, bindingKey) + ); } return new OpTimeTrackAmqpMsgRecvOp( @@ -121,7 +183,6 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { s4rSpace, channel, exchangeName, - queueName, - bindingKeyFunc.apply(cycle)); + queueName); } } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java index 803da6fb5..e08a43ca8 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java @@ -68,7 +68,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { .orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label); if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) { throw new S4RAdapterInvalidParamException("confirm_mode", - "Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); + "The provided value \"" + confirmMode + "\" is not one of following valid values: '" + + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); } confirmBatchNum = parsedOp @@ -86,62 +87,87 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { } private long getExchangeSenderSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum())) - % s4rSpace.getAmqpMsgClntNum(); + return (cycle / ((long) s4rSpace.getAmqpConnNum() * + s4rSpace.getAmqpConnChannelNum() * + s4rSpace.getAmqpChannelExchangeNum()) + ) % s4rSpace.getAmqpMsgClntNum(); } - private Channel getAmqpChannelForSender(long cycle, - String exchangeName) { + private String getEffectiveSenderNameByCycle(long cycle) { + return getEffectiveSenderNameByCycle( + getConnSeqNum(cycle), + getConnChannelSeqNum(cycle), + getChannelExchangeSeqNum(cycle), + getExchangeSenderSeqNum(cycle)); + } + private String getEffectiveSenderNameByCycle(long connSeqNum, + long channelSeqNum, + long exchangeSeqNum, + long senderSeqNum) { + return String.format( + "sender-%d-%d-%d-%d", + connSeqNum, + channelSeqNum, + exchangeSeqNum, + senderSeqNum); + } + + private Channel getAmqpChannelForSender(long cycle) { long connSeqNum = getConnSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle); - long senderSeqNum = getExchangeSenderSeqNum(cycle); - Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum); + Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); + S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum); - S4RSpace.AmqpSenderChannelKey amqpConnChannelKey = - new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum); - - return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> { - Channel channel; + return s4rSpace.getAmqpChannels(senderKey, () -> { + Channel channel = null; try { - channel = getChannelWithExchange( - amqpConnection, - connSeqNum, - channelSeqNum, - exchangeName); + channel = amqpConnection.createChannel(); + if (logger.isDebugEnabled()) { + logger.debug("Created channel for amqp connection: {}, channel: {}", + amqpConnection, channel); + } + } + catch (IOException ex) { + // Do not throw exception here, just log it and return null + if (logger.isDebugEnabled()) { + logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex); + } + } - if (publisherConfirm) { + try { + if ((channel != null) && publisherConfirm) { channel.confirmSelect(); - boolean asyncConfirm = false; if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) { - asyncConfirm = true; - channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed if (logger.isTraceEnabled()) { - logger.debug("Async ack of message publish received: {}, {}", + logger.debug("Async ack received for a published message: {}, {}", sequenceNumber, multiple); } }, (sequenceNumber, multiple) -> { // code when message is nack-ed if (logger.isTraceEnabled()) { - logger.debug("Async n-ack of message publish received: {}, {}", + logger.debug("Async n-ack received of a published message: {}, {}", sequenceNumber, multiple); } }); } - if (logger.isDebugEnabled()) { - logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}", - !asyncConfirm, + if (logger.isTraceEnabled()) { + logger.debug("Publisher Confirms is enabled on AMQP channel: {}({}), {}", + confirmMode, + confirmBatchNum, channel); } } } catch (IOException ex) { - throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!"); + throw new S4RAdapterUnexpectedException( + "Failed to enable publisher acknowledgement on the AMQP channel (" + + channel + ")!"); } return channel; @@ -155,15 +181,17 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!"); } - Channel channel; - String exchangeName = getEffectiveExchangeName(cycle); + Channel channel = getAmqpChannelForSender(cycle); + if (channel == null) { + throw new S4RAdapterUnexpectedException( + String.format( + "Failed to get AMQP channel for sender %s [%d]!", + getEffectiveSenderNameByCycle(cycle), + cycle)); + } - try { - channel = getAmqpChannelForSender(cycle, exchangeName); - } - catch (Exception ex) { - throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!"); - } + String exchangeName = getEffectiveExchangeNameByCycle(cycle); + declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); return new OpTimeTrackAmqpMsgSendOp( s4rAdapterMetrics, diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java index 2ad551ead..c780e3012 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java @@ -31,27 +31,16 @@ import java.nio.charset.StandardCharsets; public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp"); - private final String queueName; - private final String bindingKey; + public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics, S4RSpace s4rSpace, Channel channel, String exchangeName, - String queueName, - String bindingKey) { + String queueName) { super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); this.queueName = queueName; - this.bindingKey = bindingKey; - - try { - channel.queueBind(queueName, exchangeName, bindingKey); - } - catch (IOException ex) { - throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " + - "exchange (\"" + exchangeName + "\")!"); - } } @Override @@ -59,20 +48,26 @@ public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { try { Consumer receiver = new DefaultConsumer(channel) { @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { + public void handleDelivery( + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + { + String routingKey = envelope.getRoutingKey(); + String contentType = properties.getContentType(); + String msgPayload = new String(body, StandardCharsets.UTF_8); + if (logger.isTraceEnabled()) { - String msgPayload = new String(body, StandardCharsets.UTF_8); - logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}", + logger.trace( + "Successfully received message ({}) via consumer ({}/{}/{}) in the current channel: {}", msgPayload, consumerTag, + routingKey, + contentType, channel); } } }; - channel.basicConsume(queueName, receiver); - + channel.basicConsume(queueName, true, receiver); } catch (IOException e) { throw new S4RAdapterUnexpectedException( diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java index 9c9795fcc..b19a035a7 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java @@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; @@ -40,8 +41,8 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { private final String confirmMode; private final int confirmBatchNum; - private static final ThreadLocal - publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0); + private static final ConcurrentHashMap + channelPublishConfirmBathTracking = new ConcurrentHashMap<>(); public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics, S4RSpace s4rSpace, @@ -73,34 +74,40 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { routingKey, null, msgPayload.getBytes(StandardCharsets.UTF_8)); + if (logger.isTraceEnabled()) { + logger.trace("Successfully published message (({}) {}) via the current channel: {}", + cycle, msgPayload, channel); + } if (publishConfirm) { // Individual publish confirm if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) { channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + if (logger.isTraceEnabled()) { + logger.debug("Sync ack received for an individual published message: {}", cycle); + } } // Batch publish confirm else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) { - int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get(); + int publishConfirmTrackingCnt = + channelPublishConfirmBathTracking.getOrDefault(channel, 0); + if ( (publishConfirmTrackingCnt > 0) && ( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) || (publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) { - synchronized (this) { - channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + if (logger.isTraceEnabled()) { + logger.debug("Sync ack received for a batch of published message: {}, {}", + cycle, publishConfirmTrackingCnt); } } else { - publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1); + channelPublishConfirmBathTracking.put(channel, publishConfirmTrackingCnt+1); } } // Async publish confirm // - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser' } - - if (logger.isTraceEnabled()) { - logger.trace("Successfully published message ({}) via the current channel: {}", - msgPayload, channel); - } } catch (IllegalStateException ex) { throw new S4RAdapterUnexpectedException( diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java index 9009d26cc..4659e44f9 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java @@ -30,18 +30,6 @@ import java.util.stream.Stream; public class S4RAdapterUtil { private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class); - /////// - // Valid document level parameters for JMS NB yaml file - public enum DOC_LEVEL_PARAMS { - // Blocking message producing or consuming - ASYNC_API("async_api"); - public final String label; - - DOC_LEVEL_PARAMS(final String label) { - this.label = label; - } - } - public enum AMQP_EXCHANGE_TYPES { DIRECT("direct"), FANOUT("fanout"), @@ -83,19 +71,14 @@ public class S4RAdapterUtil { } } public static String getValidAmqpPublisherConfirmModeList() { - return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", "); + return StringUtils.join(AMQP_PUB_CONFIRM_MODE.LABELS, ", "); } // At least 20 messages in a publishing batch - public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20; + public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 10; public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100; public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000; - public static Map convertJsonToMap(final String jsonStr) throws Exception { - final ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonStr, new TypeReference>(){}); - } - public static void pauseCurThreadExec(final int pauseInSec) { if (0 < pauseInSec) try { Thread.sleep(pauseInSec * 1000L); diff --git a/adapter-s4r/src/main/resources/README.md b/adapter-s4r/src/main/resources/README.md index 3e66eb7cc..3a915e5c7 100644 --- a/adapter-s4r/src/main/resources/README.md +++ b/adapter-s4r/src/main/resources/README.md @@ -1,62 +1,92 @@ -# Overview +- [1. Overview](#1-overview) +- [2. NB S4R Usage](#2-nb-s4r-usage) + - [2.1. CLI Examples](#21-cli-examples) + - [2.2. CLI parameters](#22-cli-parameters) + - [2.3. Workload Definition](#23-workload-definition) + - [2.4. Configuration Properties](#24-configuration-properties) + - [2.4.1. Global Properties File](#241-global-properties-file) + - [2.4.2. Scenario Document Level Properties](#242-scenario-document-level-properties) -This NB Kafka adapter allows publishing messages to or consuming messages from -* a Kafka cluster, or -* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar. +--- -At high level, this adapter supports the following Kafka functionalities -* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers) -* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits) - * auto message commit - * manual message commit with a configurable number of message commits in one batch -* Kafka Transaction support +# 1. Overview -## Example NB Yaml -* [kafka_producer.yaml](./s4r_producer.yaml) -* -* [kafka_consumer.yaml](./s4r_consumer.yaml) +This NB S4R adapter allows sending messages to or receiving messages from +* an AMQP 0-9-1 based server (e.g. RabbitMQ), or +* a Pulsar cluster with [S4R](https://github.com/datastax/starlight-for-rabbitmq) AMQP (0-9-1) Protocol handler for Pulsar. -# Usage +At high level, this adapter supports the following AMQP 0-9-1 functionalities +* Creating AMQP connections and channels +* Declaring AMQP exchanges + * The following exchange types are supported: `direct`, `fanout`, `topic`, and `headers` +* Sending messages to AMQP exchanges with sync. or async. publisher confirms + * For sync confirms, it supports both single and batch confirms + * Supports message-send based on routing keys +* Declaring and binding AMQP queues + * Supports message-receive based on binding keys +* Receiving messages from AMQP queues with async. consumer acks + +# 2. NB S4R Usage + +## 2.1. CLI Examples ```bash -## Kafka Producer -$ run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092 +## AMQP Message Sender +$ run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \ + threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \ + workload=./s4r_msg_sender.yaml \ + config=./s4r_config.properties -## Kafka Consumer -$ run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092 +## AMQP Message Receiver +$ run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \ + threads=8 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \ + workload=./s4r_msg_receiver.yaml \ + config=./s4r_config.properties ``` -## NB Kafka adapter specific CLI parameters +## 2.2. CLI parameters -* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from - * For producer workload, this is the number of the producer threads to publish messages to the same topic - * Can have multiple producer threads for one topic/partition (`KafkaProducer` is thread-safe) - * `threads` and `num_clnt` values MUST be the same. - * For consumer workload, this is the partition number of a topic - * Consumer workload supports to subscribe from multiple topics. If so, it requires all topics having the same partition number. - * Only one consumer thread for one topic/partition (`KafkaConsumer` is NOT thread-safe) - * `threads` MUST be equal to `num_clnt`*`num_cons_grp` +The following CLI parameters are unique to the S4R adapter: -* `num_cons_grp`: the number of consumer groups - * Only relevant for consumer workload +* `num_conn`: the number of AMQP connections to create +* `num_channel`: the number of AMQP channels to create for each connection +* `num_exchange`: the number of AMQP exchanges to create for each channel +* `num_queue`: the number of AMQP queues to create for each channel (only relevant for message receiver workload) +* `num_msg_client`: the number of message clients to create for each channel + * for message sender workload, it is the number of message publishers for each exchange + * for message receiver workload, it is the number of message consumers for each queue +## 2.3. Workload Definition +The example workload YAML files can be found from: -For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported: +* [s4r_msg_sender.yaml](s4r_msg_sender.yaml) +* [s4r_msg_receiver.yaml](s4r_msg_receiver.yaml) -* `async_api` (boolean): - * When true, use async Kafka client API. -* `seq_tracking` (boolean): - * When true, a sequence number is created as part of each message's properties - * This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully. -* `seqerr_simu`: - * A list of error simulation types separated by comma (,) - * Valid error simulation types - * `out_of_order`: simulate message out of sequence - * `msg_loss`: simulate message loss - * `msg_dup`: simulate message duplication - * This value should be used only for testing purposes. It is not recommended to use this parameter in actual testing environments. -* `e2e_starting_time_source`: - * Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch. - * The possible values for `e2e_starting_time_source`: - * `message_publish_time` : uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). This is the case, as [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type). +## 2.4. Configuration Properties + +### 2.4.1. Global Properties File + +A global S4R properties file can be specified via the `config` CLI parameter. It includes the following required properties: +* `amqpSrvHost`: AMQP server host (e.g. An Astra Streaming cluster with S4R enabled) +* `amqpSrvPort`: AMQP server port (for S4R enabled Astra Streaming, it is 5671) +* `virtualHost`: AMQP server virtual host (for S4R enabled Astra Streaming, it is "/rabbitmq") +* `amqpUser`: AMQP user (for S4R enabled Astra Streaming, it is an empty string) +* `amqpPassword`: AMQP password (for S4R enabled Astra Streaming, it is the JWT token file path) +* `useTls`: whether to use TLS (for S4R enabled Astra Streaming, it is true) +* `exchangeType`: AMQP exchange type (e.g. `direct`, `fanout`, `topic`, or `headers`) + +An example of this file can be found from: [s4r_config.properties](./s4r_config.properties) + +### 2.4.2. Scenario Document Level Properties + +For message sender workload, the following Document level configuration parameters are supported in the YAML file: +* `publisher_confirm`: whether to use publisher confirms +* `confirm_mode`: When `publisher_confirm` is true, the following 3 confirm modes are supported: + * `individual`: wait for confirm individually + * `batch`: wait for confirm in batch + * `async`: [default] no wait for confirm +* `confirm_batch_num`: batch size for waiting for **sync** publisher confirms + * Only relevant when `publisher_confirm` is true and `confirm_mode` is "batch" +* `dft_confirm_timeout_ms`: batch size for waiting for publisher confirms + * Only relevant when `publisher_confirm` is true and `confirm_mode` is **NOT** "async" diff --git a/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh b/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh deleted file mode 100755 index bdb55507f..000000000 --- a/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/local/bin/bash -# -# Copyright (c) 2023 nosqlbench -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -: "${SKIP_TESTS:=1}" -( - cd "$(git rev-parse --show-toplevel)" && \ - mvn clean install "-DskipTests" -pl adapters-api,adapter-s4r,nb5 && \ - [[ ${SKIP_TESTS} -ne 1 ]] && \ - mvn test -pl adapters-api,adapter-s4r -) diff --git a/adapter-s4r/src/main/resources/csv/exchange_names.csv b/adapter-s4r/src/main/resources/csv/exchange_names.csv deleted file mode 100644 index 3feca9f3f..000000000 --- a/adapter-s4r/src/main/resources/csv/exchange_names.csv +++ /dev/null @@ -1,4 +0,0 @@ -name -usa -canada -german diff --git a/adapter-s4r/src/main/resources/csv/queue_names.csv b/adapter-s4r/src/main/resources/csv/queue_names.csv deleted file mode 100644 index 7c75221b9..000000000 --- a/adapter-s4r/src/main/resources/csv/queue_names.csv +++ /dev/null @@ -1,3 +0,0 @@ -queue1 -queue2 -queue3 diff --git a/adapter-s4r/src/main/resources/s4r_config.properties b/adapter-s4r/src/main/resources/s4r_config.properties index 6c75f2ac8..bfbfc74cc 100644 --- a/adapter-s4r/src/main/resources/s4r_config.properties +++ b/adapter-s4r/src/main/resources/s4r_config.properties @@ -18,7 +18,13 @@ # Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com amqpSrvPort=5671 -virtualHost=/rabbitmq -jwtToken= +virtualHost=/rabbitmq +# For Astra Streaming with S4R, the user an empty string +amqpUser= +# For Astra Streaming with S4R, the password is the JWT token in the format of +# file:///path/to/astra_streaming_jwt_token_file +amqpPassword=file:// +# when using Astra Streaming with S4R, this needs to be set to true +useTls=true # valid values: direct, fanout, topic, headers exchangeType=direct diff --git a/adapter-s4r/src/main/resources/s4r_consumer.yaml b/adapter-s4r/src/main/resources/s4r_consumer.yaml deleted file mode 100644 index d6b96d129..000000000 --- a/adapter-s4r/src/main/resources/s4r_consumer.yaml +++ /dev/null @@ -1,18 +0,0 @@ -bindings: - myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name') - myqueue: CSVFrequencySampler('csv/queue_names.csv', 'name') - myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') - -# Doc-level parameters (must be static) -params: - -blocks: - msg-recv-block: - ops: - AmqpMsgReceiver: - #exchange_names: "{myexname}" - exchange_name: "alpha" - - queue_name: "{myqueue}" - - binding_key: "{myroutingkey}" diff --git a/adapter-s4r/src/main/resources/s4r_msg_receiver.yaml b/adapter-s4r/src/main/resources/s4r_msg_receiver.yaml new file mode 100644 index 000000000..c70d4a119 --- /dev/null +++ b/adapter-s4r/src/main/resources/s4r_msg_receiver.yaml @@ -0,0 +1,12 @@ +bindings: + myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') + +# Doc-level parameters (must be static) +params: + +blocks: + msg-recv-block: + ops: + op1: + AmqpMsgReceiver: "" + binding_key: "{myroutingkey}" diff --git a/adapter-s4r/src/main/resources/sanity-validation/e2e-nbs4r-sanity.sh b/adapter-s4r/src/main/resources/sanity-validation/e2e-nbs4r-sanity.sh new file mode 100755 index 000000000..072365ba2 --- /dev/null +++ b/adapter-s4r/src/main/resources/sanity-validation/e2e-nbs4r-sanity.sh @@ -0,0 +1,137 @@ +#! /usr/local/bin/bash + +## +# Copyright (c) 2022-2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source "${CUR_SCRIPT_FOLDER}/utilities.sh" + +echo + +## +# Show usage info +# +usage() { + echo + echo "Usage: e2e-nbs4r-sanity.sh [-h]" + echo " -j ]" + echo " -h : Show usage info" + echo " -j : JWT token file full path. Default to 'jwt.token' in the same directory!" + echo +} + +if [[ $# -gt 3 ]]; then + usage + errExit 10 "Incorrect input parameter count!" +fi + +jwtTokenFile="./jwt.token" +while [[ "$#" -gt 0 ]]; do + case $1 in + -h) usage; exit 0 ;; + -j) jwtTokenFile=$2; shift ;; + *) errExit 20 "Unknown input parameter passed: $1"; ;; + esac + shift +done + +mkdir -p "./logs/nb5-exec" +mainLogDir="${CUR_SCRIPT_FOLDER}/logs" +nbExecLogDir="${mainLogDir}/nb5-exec" + +# 2022-08-19 11:40:23 +startTime=$(date +'%Y-%m-%d %T') +# 20220819114023 +startTime2=${startTime//[: -]/} +sanityTestMainLogFile="${mainLogDir}/e2e-nbs4r-sanity-${startTime2}.log" +echo > "${sanityTestMainLogFile}" + +debugMsg "jwtTokenFile=${jwtTokenFile}" "${sanityTestMainLogFile}" +if ! [[ -f "${jwtTokenFile}" ]]; then + errExit 30 \ + "Can't find the required JWT token file (for Pulsar cluster connection) at the specified location: \"" + + jwtTokenFile + "\"!" \ + "${sanityTestMainLogFile}" +fi + +sanityS4rCfgPropFileName="e2e-sanity-config.properties" +sanityS4rCfgPropFile="${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}" +if [[ ! -f "${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}.tmpl" ]]; then + errExit 30 \ + "Can't find the required sanity test config file template at the specified location: \"" + + "${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}.tmpl" + "\"!" \ + "${sanityTestMainLogFile}" +fi + +cp -rf "${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}.tmpl" "${sanityS4rCfgPropFile}" +if [[ -n "${jwtTokenFile// }" ]]; then + replaceStringInFile "" "file://${jwtTokenFile}" "${sanityS4rCfgPropFile}" +else + replaceStringInFile "" "" "${sanityS4rCfgPropFile}" +fi + +NB5JAR="${CUR_SCRIPT_FOLDER}/../../../../../nb5/target/nb5.jar" +sanityS4rSenderYamlFile="${CUR_SCRIPT_FOLDER}/e2e-sanity-sender.yaml" +sanityS4rReceiverYamlFile="${CUR_SCRIPT_FOLDER}/e2e-sanity-receiver.yaml" + +{ + echo; + echo "======================================================================================================"; + echo "Starting the sanity test for the NoSQLBench S4J adapter at ${startTime} ..."; + echo; + echo " >>> Kick off an S4R message sending workload ..." + echo; +} >> "${sanityTestMainLogFile}" + +read -r -d '' nbs4rMsgSendCmd << EOM +java -jar ${NB5JAR} run driver=s4r -vv --logs-dir=${nbExecLogDir} strict_msg_error_handling=\"false\" \ + cycles=1000 threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \ + config=${sanityS4rCfgPropFile} \ + workload=${sanityS4rSenderYamlFile} +EOM +debugMsg "nbs4rMsgSendCmd=${nbs4rMsgSendCmd}" "${sanityTestMainLogFile}" + +eval '${nbs4rMsgSendCmd}' +if [[ $? -ne 0 ]]; then + errExit 40 "Failed to kick off the S4R message sending workload!" "${sanityTestMainLogFile}" +fi + +# pause 5 seconds before kicking off the message sending workload +sleep 5 + +{ + echo; + echo " >>> Kick off an S4J message receiving workload after 30 seconds..." + echo; +} >> "${sanityTestMainLogFile}" + +read -r -d '' nbs4rMsgRecvCmd << EOM +java -jar ${NB5JAR} run driver=s4r -vv --logs-dir=${nbExecLogDir} \ + cycles=1000 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2\ + config=${sanityS4rCfgPropFile} \ + workload=${sanityS4rReceiverYamlFile} +EOM +debugMsg "nbs4rMsgRecvCmd=${nbs4rMsgRecvCmd}" "${sanityTestMainLogFile}" + +eval '${nbs4rMsgRecvCmd}' +if [[ $? -ne 0 ]]; then + errExit 40 "Failed to kick off the S4R message receiving workload!" "${sanityTestMainLogFile}" +fi + +echo "NB S4J workload sanity check passed!" >> "${sanityTestMainLogFile}" + + +echo diff --git a/adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl b/adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl new file mode 100644 index 000000000..bbaadbae2 --- /dev/null +++ b/adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-config.properties.tmpl @@ -0,0 +1,11 @@ +## +# Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled +amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com +amqpSrvPort=5671 +virtualHost=nbtest/rabbitmq +amqpUser= +amqpPassword= +# when using Astra Streaming with S4R, this needs to be set to true +useTls=true +# valid values: direct, fanout, topic, headers +exchangeType=direct diff --git a/adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-sender.yaml b/adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-sender.yaml new file mode 100644 index 000000000..ba74f7c9f --- /dev/null +++ b/adapter-s4r/src/main/resources/sanity-validation/e2e-sanity-sender.yaml @@ -0,0 +1,13 @@ +bindings: + mytext_val: AlphaNumericString(100) + +params: + publisher_confirm: false + +blocks: + msg-send-block: + ops: + op1: + AmqpMsgSender: "" + routing_key: "routing-test" + message: "{mytext_val}" diff --git a/adapter-s4r/src/main/resources/sanity-validation/utilities.sh b/adapter-s4r/src/main/resources/sanity-validation/utilities.sh new file mode 100755 index 000000000..a6e31ace1 --- /dev/null +++ b/adapter-s4r/src/main/resources/sanity-validation/utilities.sh @@ -0,0 +1,113 @@ +#! /usr/local/bin/bash + +## +# Copyright (c) 2022-2023 nosqlbench +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +DEBUG=true + +## +# Show debug message +# - $1 : the message to show +# - $2 : (Optional) the file to log the message to +debugMsg() { + if [[ "${DEBUG}" == "true" ]]; then + local msg=${1} + local file=${2} + + if [[ -z "${file}" ]]; then + echo "[Debug] ${msg}" + else + echo "[Debug] ${msg}" >> "${file}" + fi + + fi +} + +## +# - $1 : the exit code +# - $2 : the message to show +# - $3 : (Optional) the file to log the message to +errExit() { + local exitCode=${1} + local msg=${2} + local file=${3} + + if [[ -z "${file}" ]]; then + echo "[Error] ${msg}" + else + echo "[Error] ${msg}" >> "${file}" + fi + + exit ${exitCode} +} + +## +# Read the properties file and returns the value based on the key +# 2 input parameters: +# - 1st parameter: the property file to scan +# - 2nd parameter: the key to search for +getPropVal() { + local propFile=$1 + local searchKey=$2 + local value=$(grep "${searchKey}" ${propFile} | grep -Ev "^#|^$" | cut -d'=' -f2) + echo $value +} + +## +# Check if the sed being used is GNU sed +isGnuSed() { + local gnu_sed=$(sed --version 2>&1 | grep -v 'illegal\|usage\|^\s' | grep "GNU sed" | wc -l) + echo ${gnu_sed} +} + + +## +# Replace the occurrence of a string place holder with a specific value in a file +# Four input parameters: +# - 1st parameter: the place holder string to be replaced +# - 2nd parameter: the value string to replace the place holder +# - 3rd parameter: the file +# - 4th parameter: (Optional) a particular line identifier to replace. +# if specified, only replace the place holder in the matching line +# otherwise, replace all occurrence in the file +# +# TBD: use this function to hide GNU difference (Mac vs Linux, GNU or not) +# +replaceStringInFile() { + local placeHolderStr=${1} + local valueStr=${2} + local fileToScan=${3} + local lineIdentifier=${4} + + # in case '/' is part of the string + placeHolderStr=$(echo ${placeHolderStr} | sed 's/\//\\\//g') + valueStr=$(echo ${valueStr} | sed 's/\//\\\//g') + + gnuSed=$(isGnuSed) + if [[ "$OSTYPE" == "darwin"* && ${gnuSed} -eq 0 ]]; then + if ! [[ -z "${lineIdentifier// }" ]]; then + sed -i '' "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + else + sed -i '' "s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + fi + else + if ! [[ -z "${lineIdentifier// }" ]]; then + sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + else + sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan} + fi + fi +} diff --git a/adapter-s4r/src/main/resources/start_s4r_consumer.sh b/adapter-s4r/src/main/resources/start_s4r_consumer.sh deleted file mode 100755 index 1be463e4c..000000000 --- a/adapter-s4r/src/main/resources/start_s4r_consumer.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/local/bin/bash -# -# Copyright (c) 2023 nosqlbench -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -: "${REBUILD:=1}" -: "${CYCLES:=1000000000}" -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" -if [[ ${REBUILD} -eq 1 ]]; then - "${SCRIPT_DIR}/build-nb-kafka-driver.sh" -fi -java -jar nb5/target/nb5.jar \ - run \ - driver=s4r \ - -vv \ - --report-interval 5 \ - --docker-metrics \ - cycles=${CYCLES} \ - threads=1 \ - num_clnt=1 \ - num_cons_grp=1 \ - yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \ - config="${SCRIPT_DIR}/kafka_config.properties" \ - bootstrap_server=PLAINTEXT://localhost:9092 diff --git a/adapter-s4r/src/main/resources/start_s4r_producer.sh b/adapter-s4r/src/main/resources/start_s4r_producer.sh deleted file mode 100755 index 0999c5fc0..000000000 --- a/adapter-s4r/src/main/resources/start_s4r_producer.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/local/bin/bash -# -# Copyright (c) 2023 nosqlbench -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -: "${REBUILD:=1}" -: "${CYCLES:=1000000000}" -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" -if [[ ${REBUILD} -eq 1 ]]; then - "${SCRIPT_DIR}/build-nb-kafka-driver.sh" -fi -while [[ 1 -eq 1 ]]; do - java -jar nb5/target/nb5.jar \ - run \ - driver=s4r \ - -vv \ - --report-interval 5 \ - --docker-metrics \ - cycles="${CYCLES}" \ - threads=1 \ - num_clnt=1 \ - yaml="${SCRIPT_DIR}/kafka_producer.yaml" \ - config="${SCRIPT_DIR}/kafka_config.properties" \ - bootstrap_server=PLAINTEXT://localhost:9092 - sleep 10 -done