diff --git a/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh b/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh
index fe225a567..45e4f0bac 100755
--- a/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh
+++ b/adapter-s4j/src/main/resources/sanity-validation/e2e-nbs4j-sanity.sh
@@ -1,5 +1,21 @@
#! /usr/local/bin/bash
+##
+# Copyright (c) 2022-2023 nosqlbench
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "${CUR_SCRIPT_FOLDER}/utilities.sh"
@@ -86,12 +102,12 @@ sanityS4jMsgReceiverYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-receiver-queue.yam
read -r -d '' nbs4jMsgSendCmd << EOM
java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \
- cycles=1000 threads=4 num_conn=2 num_session=2 \
+ cycles=200 threads=4 num_conn=2 num_session=2 \
session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \
service_url=${brokerSvcUrl} \
web_url=${webSvcUrl} \
config=${sanityS4jCfgPropFile} \
- yaml=${sanityS4jMsgSenderYamlFile}
+ workload=${sanityS4jMsgSenderYamlFile}
EOM
debugMsg "nbs4jMsgSendCmd=${nbs4jMsgSendCmd}" "${sanityTestMainLogFile}"
@@ -111,12 +127,12 @@ sleep 5
read -r -d '' nbs4jMsgRecvCmd << EOM
java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \
- cycles=1000 threads=4 num_conn=2 num_session=2 \
+ cycles=200 threads=4 num_conn=2 num_session=2 \
session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \
service_url=${brokerSvcUrl} \
web_url=${webSvcUrl} \
config=${sanityS4jCfgPropFile} \
- yaml=${sanityS4jMsgReceiverYamlFile}
+ workload=${sanityS4jMsgReceiverYamlFile}
EOM
debugMsg "nbs4jMsgRecvCmd=${nbs4jMsgRecvCmd}" "${sanityTestMainLogFile}"
diff --git a/adapter-s4j/src/main/resources/sanity-validation/utilities.sh b/adapter-s4j/src/main/resources/sanity-validation/utilities.sh
index 1469e2985..a6e31ace1 100755
--- a/adapter-s4j/src/main/resources/sanity-validation/utilities.sh
+++ b/adapter-s4j/src/main/resources/sanity-validation/utilities.sh
@@ -1,6 +1,22 @@
#! /usr/local/bin/bash
-DEBUG=false
+##
+# Copyright (c) 2022-2023 nosqlbench
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+DEBUG=true
##
# Show debug message
@@ -89,7 +105,7 @@ replaceStringInFile() {
fi
else
if ! [[ -z "${lineIdentifier// }" ]]; then
- sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${funcCfgJsonFileTgt}
+ sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
else
sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
fi
diff --git a/adapter-s4r/pom.xml b/adapter-s4r/pom.xml
index 014aed4ea..0e1385970 100644
--- a/adapter-s4r/pom.xml
+++ b/adapter-s4r/pom.xml
@@ -50,7 +50,6 @@
${amqp.version}
-
commons-beanutils
commons-beanutils
@@ -63,6 +62,12 @@
commons-configuration2
2.9.0
+
+
+ commons-io
+ commons-io
+ 2.13.0
+
diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java
index 645a94aeb..e66c4cd22 100644
--- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java
+++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java
@@ -27,14 +27,18 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.File;
import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,13 +75,16 @@ public class S4RSpace implements AutoCloseable {
// Maximum number of AMQP channels per connection
private final int amqpConnChannelNum;
+ // Maximum number of AMQP exchanges per channel
+ private final int amqpChannelExchangeNum;
+
// Max number of queues (per exchange)
// - only relevant with message receivers
private final int amqpExchangeQueueNum;
// Max number of message clients (senders or receivers)
// - for senders, this is the number of message senders per exchange
- // - for recievers, this is the number of message receivers per queue
+ // - for receivers, this is the number of message receivers per queue
// (there could be multiple queues per exchange)
private final int amqpMsgClntNum;
@@ -91,18 +98,9 @@ public class S4RSpace implements AutoCloseable {
private final ConcurrentHashMap amqpConnections = new ConcurrentHashMap<>();
- ///////////////////////////////////
- // NOTE: Do NOT mix sender and receiver workload in one NB workload
- ///////////////////////////////////
-
- // Amqp Channels for senders
- public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { }
- private final ConcurrentHashMap amqpSenderChannels = new ConcurrentHashMap<>();
-
- // Amqp Channels for receivers
- public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { }
- private final ConcurrentHashMap amqpReceiverChannels = new ConcurrentHashMap<>();
- private final ConcurrentHashMap> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>();
+ // Amqp connection/chanel/exchange combination for a sender
+ public record AmqpChannelKey(Long connId, Long channelId) { }
+ private final ConcurrentHashMap amqpChannels = new ConcurrentHashMap<>();
// Whether to do strict error handling while sending/receiving messages
// - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing
@@ -129,6 +127,8 @@ public class S4RSpace implements AutoCloseable {
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
this.amqpConnChannelNum =
NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1"));
+ this.amqpChannelExchangeNum =
+ NumberUtils.toInt(cfg.getOptional("num_exchange").orElse("1"));
this.amqpExchangeQueueNum =
NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1"));
this.amqpMsgClntNum =
@@ -155,6 +155,12 @@ public class S4RSpace implements AutoCloseable {
.setDescription("Maximum number of AMQP connections."))
.add(Param.defaultTo("num_channel", 1)
.setDescription("Maximum number of AMQP channels per connection"))
+ .add(Param.defaultTo("num_exchange", 1)
+ .setDescription("Maximum number of AMQP exchanges per channel."))
+ .add(Param.defaultTo("num_queue", 1)
+ .setDescription("Max number of queues per exchange (only relevant for receivers)."))
+ .add(Param.defaultTo("num_msg_clnt", 1)
+ .setDescription("Max number of message clients per exchange (sender) or per queue (receiver)."))
.add(Param.defaultTo("max_op_time", 0)
.setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
.add(Param.defaultTo("strict_msg_error_handling", false)
@@ -164,16 +170,10 @@ public class S4RSpace implements AutoCloseable {
public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); }
- public Channel getAmqpSenderChannel(
- AmqpSenderChannelKey key,
+ public Channel getAmqpChannels(
+ AmqpChannelKey key,
Supplier channelSupplier) {
- return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get());
- }
-
- public Channel getAmqpReceiverChannel(
- AmqpReceiverChannelKey key,
- Supplier channelSupplier) {
- return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get());
+ return amqpChannels.computeIfAbsent(key, __ -> channelSupplier.get());
}
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
@@ -183,7 +183,8 @@ public class S4RSpace implements AutoCloseable {
public String getAmqpExchangeType() { return amqpExchangeType; }
public int getAmqpConnNum() { return this.amqpConnNum; }
public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; }
- public int getAmqpExchangeQueueNum() { return this.amqpConnNum; }
+ public int getAmqpChannelExchangeNum() { return this.amqpChannelExchangeNum; }
+ public int getAmqpExchangeQueueNum() { return this.amqpExchangeQueueNum; }
public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; }
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
@@ -218,19 +219,50 @@ public class S4RSpace implements AutoCloseable {
try {
s4rConnFactory = new ConnectionFactory();
- String passWord = cfg.get("jwtToken");
- s4rConnFactory.setPassword(cfgMap.get(""));
- s4rConnFactory.setPassword(passWord);
-
- String amqpServerHost = cfg.get("amqpSrvHost");
+ String amqpServerHost = cfgMap.get("amqpSrvHost");
+ if (StringUtils.isBlank(amqpServerHost)) {
+ String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!";
+ throw new S4RAdapterInvalidParamException(errMsg);
+ }
s4rConnFactory.setHost(amqpServerHost);
- int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort"));
- s4rConnFactory.setPort(amqpServerPort);
+ String amqpSrvPortCfg = cfgMap.get("amqpSrvPort");
+ if (StringUtils.isBlank(amqpSrvPortCfg)) {
+ String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!";
+ throw new S4RAdapterInvalidParamException(errMsg);
+ }
+ s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg));
- String amqpVirtualHost = cfg.get("virtualHost");
+ String amqpVirtualHost = cfgMap.get("virtualHost");
+ if (StringUtils.isBlank(amqpVirtualHost)) {
+ String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!";
+ throw new S4RAdapterInvalidParamException(errMsg);
+ }
s4rConnFactory.setVirtualHost(amqpVirtualHost);
+ String userNameCfg = cfgMap.get("amqpUser");
+
+ String passWordCfg = cfgMap.get("amqpPassword");
+ if (StringUtils.isNotBlank(passWordCfg)) {
+ String passWord = passWordCfg;
+ if (StringUtils.startsWith(passWordCfg, "file://")
+ && StringUtils.length(passWordCfg) > 7) {
+ String jwtTokenFile = StringUtils.substring(passWordCfg, 7);
+ passWord = FileUtils.readFileToString(new File(jwtTokenFile), "UTF-8");
+ }
+
+ if (StringUtils.isNotBlank(passWord)) {
+ if (StringUtils.isBlank(userNameCfg)) {
+ s4rConnFactory.setUsername("");
+ }
+ s4rConnFactory.setPassword(passWord);
+ }
+ }
+
+ String useTlsCfg = cfgMap.get("useTls");
+ if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) {
+ s4rConnFactory.useSslProtocol();
+ }
for (int i = 0; i < getAmqpConnNum(); i++) {
Connection connection = s4rConnFactory.newConnection();
@@ -243,7 +275,7 @@ public class S4RSpace implements AutoCloseable {
connection);
}
}
- } catch (IOException|TimeoutException ex) {
+ } catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) {
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
s4rClientConnInfo.toString());
throw new S4RAdapterUnexpectedException(ex);
@@ -255,11 +287,7 @@ public class S4RSpace implements AutoCloseable {
try {
beingShutdown.set(true);
- for (Channel channel : amqpSenderChannels.values()) {
- channel.close();
- }
-
- for (Channel channel : amqpReceiverChannels.values()) {
+ for (Channel channel : amqpChannels.values()) {
channel.close();
}
diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java
index 109b37898..4f99b5db9 100644
--- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java
+++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java
@@ -16,9 +16,7 @@
package io.nosqlbench.adapter.s4r.dispensers;
-import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
@@ -26,7 +24,6 @@ import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,8 +43,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser s4rConfMap = new HashMap<>();
protected final String exchangeType;
- protected final LongFunction exchangeNameFunc;
-
protected AmqpBaseOpDispenser(final DriverAdapter adapter,
final ParsedOp op,
final S4RSpace s4RSpace) {
@@ -63,7 +58,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser bindingKeyFunc;
- private final LongFunction queueNameFunc;
public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
ParsedOp op,
S4RSpace s4rSpace) {
super(adapter, op, s4rSpace);
-
- queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null);
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
}
private long getExchangeQueueSeqNum(long cycle) {
- return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
- % s4rSpace.getAmqpExchangeQueueNum();
+ return (cycle / ((long) s4rSpace.getAmqpConnNum() *
+ s4rSpace.getAmqpConnChannelNum() *
+ s4rSpace.getAmqpChannelExchangeNum())
+ ) % s4rSpace.getAmqpExchangeQueueNum();
}
private long getQueueReceiverSeqNum(long cycle) {
- return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum()))
- % s4rSpace.getAmqpMsgClntNum();
+ return (cycle / ((long) s4rSpace.getAmqpConnNum() *
+ s4rSpace.getAmqpConnChannelNum() *
+ s4rSpace.getAmqpChannelExchangeNum() *
+ s4rSpace.getAmqpExchangeQueueNum())
+ ) % s4rSpace.getAmqpMsgClntNum();
}
- private String getEffectiveQueueName(long cycle) {
- String queueNameInput = queueNameFunc.apply(cycle);
- return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput);
+ private String getEffectiveQueueNameByCycle(long cycle) {
+ return getEffectiveQueueName(
+ getConnSeqNum(cycle),
+ getConnChannelSeqNum(cycle),
+ getChannelExchangeSeqNum(cycle),
+ getExchangeQueueSeqNum(cycle));
+ }
+ private String getEffectiveQueueName(long connSeqNum, long channelSeqNum, long exchangeSeqNum, long queueSeqNum) {
+ return "queue-" + connSeqNum + "-" + channelSeqNum + "-" + exchangeSeqNum + "-" + queueSeqNum;
}
- private Channel getAmqpChannelQueueForReceiver(long cycle,
- String exchangeName,
- String queueName) {
+ private String getEffectiveReceiverName(long cycle) {
+ return getEffectiveReceiverName(
+ getConnSeqNum(cycle),
+ getConnChannelSeqNum(cycle),
+ getChannelExchangeSeqNum(cycle),
+ getExchangeQueueSeqNum(cycle),
+ getQueueReceiverSeqNum(cycle));
+ }
+ private String getEffectiveReceiverName(long connSeqNum,
+ long channelSeqNum,
+ long exchangeSeqNum,
+ long queueSeqNum,
+ long receiverSeqNum) {
+ return String.format(
+ "receiver-%d-%d-%d-%d-%d",
+ connSeqNum,
+ channelSeqNum,
+ exchangeSeqNum,
+ queueSeqNum,
+ receiverSeqNum);
+ }
+
+ private Channel getAmqpChannelForReceiver(long cycle) {
long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle);
- long queueSeqNum = getExchangeQueueSeqNum(cycle);
- long receiverSeqNum = getQueueReceiverSeqNum(cycle);
- Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
+ Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
+ S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
- S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey =
- new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum);
-
- return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> {
+ return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> {
Channel channel = null;
try {
- channel = getChannelWithExchange(
- amqpConnection,
- connSeqNum,
- channelSeqNum,
- exchangeName);
-
- AMQP.Queue.DeclareOk declareOk =
- channel.queueDeclare(queueName, true, true, true, null);
+ channel = amqpConnection.createChannel();
if (logger.isDebugEnabled()) {
- logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}",
- exchangeName,
- queueName,
- declareOk);
+ logger.debug("Created channel for amqp connection: {}, channel: {}",
+ amqpConnection, channel);
+ }
+ }
+ catch (IOException ex) {
+ // Do not throw exception here, just log it and return null
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex);
}
- } catch (IOException ex) {
- throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
}
return channel;
});
}
+
@Override
public S4RTimeTrackOp apply(long cycle) {
- Channel channel = null;
+ Channel channel = getAmqpChannelForReceiver(cycle);
+ if (channel == null) {
+ throw new S4RAdapterUnexpectedException(
+ String.format(
+ "Failed to get AMQP channel for receiver %s [%d]!",
+ getEffectiveReceiverName(cycle),
+ cycle));
+ }
- String exchangeName = getEffectiveExchangeName(cycle);
- String queueName = getEffectiveQueueName(cycle);
+ String exchangeName = getEffectiveExchangeNameByCycle(cycle);
+ declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType());
+
+ boolean durable = true;
+ boolean exclusive = true;
+ boolean autoDelete = false;
+ String queueName = getEffectiveQueueNameByCycle(cycle);
+ String bindingKey = bindingKeyFunc.apply(cycle);
+ try {
+ channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
+ if (logger.isTraceEnabled()) {
+ logger.debug("AMQP queue is declared - \"{} ({}/{}/{})\" on exchange \"{}\" for a receiver!",
+ queueName,
+ durable,
+ exclusive,
+ autoDelete,
+ exchangeName);
+ }
+ }
+ catch (IOException ex) {
+ throw new S4RAdapterUnexpectedException(
+ String.format(
+ "Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!",
+ queueName, durable, exclusive, autoDelete, exchangeName)
+ );
+ }
try {
- channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName);
+ // Binding the same queue multiple times on one exchange is considered as a no-op
+ channel.queueBind(queueName, exchangeName, bindingKey);
+ if (logger.isTraceEnabled()) {
+ logger.debug("AMQP queue is bound - \"{} ({}/{}/{})\" on exchange \"{}\" with binding key \"{}\"!",
+ queueName,
+ durable,
+ exclusive,
+ autoDelete,
+ exchangeName,
+ bindingKey);
+ }
}
- catch (Exception ex) {
- throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!");
+ catch (IOException ex) {
+ throw new S4RAdapterUnexpectedException(
+ String.format(
+ "Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!",
+ queueName, durable, exclusive, autoDelete, exchangeName, bindingKey)
+ );
}
return new OpTimeTrackAmqpMsgRecvOp(
@@ -121,7 +183,6 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
s4rSpace,
channel,
exchangeName,
- queueName,
- bindingKeyFunc.apply(cycle));
+ queueName);
}
}
diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java
index 803da6fb5..e08a43ca8 100644
--- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java
+++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java
@@ -68,7 +68,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
.orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
throw new S4RAdapterInvalidParamException("confirm_mode",
- "Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
+ "The provided value \"" + confirmMode + "\" is not one of following valid values: '" +
+ S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
}
confirmBatchNum = parsedOp
@@ -86,62 +87,87 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
}
private long getExchangeSenderSeqNum(long cycle) {
- return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
- % s4rSpace.getAmqpMsgClntNum();
+ return (cycle / ((long) s4rSpace.getAmqpConnNum() *
+ s4rSpace.getAmqpConnChannelNum() *
+ s4rSpace.getAmqpChannelExchangeNum())
+ ) % s4rSpace.getAmqpMsgClntNum();
}
- private Channel getAmqpChannelForSender(long cycle,
- String exchangeName) {
+ private String getEffectiveSenderNameByCycle(long cycle) {
+ return getEffectiveSenderNameByCycle(
+ getConnSeqNum(cycle),
+ getConnChannelSeqNum(cycle),
+ getChannelExchangeSeqNum(cycle),
+ getExchangeSenderSeqNum(cycle));
+ }
+ private String getEffectiveSenderNameByCycle(long connSeqNum,
+ long channelSeqNum,
+ long exchangeSeqNum,
+ long senderSeqNum) {
+ return String.format(
+ "sender-%d-%d-%d-%d",
+ connSeqNum,
+ channelSeqNum,
+ exchangeSeqNum,
+ senderSeqNum);
+ }
+
+ private Channel getAmqpChannelForSender(long cycle) {
long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle);
- long senderSeqNum = getExchangeSenderSeqNum(cycle);
- Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
+ Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
+ S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
- S4RSpace.AmqpSenderChannelKey amqpConnChannelKey =
- new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum);
-
- return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> {
- Channel channel;
+ return s4rSpace.getAmqpChannels(senderKey, () -> {
+ Channel channel = null;
try {
- channel = getChannelWithExchange(
- amqpConnection,
- connSeqNum,
- channelSeqNum,
- exchangeName);
+ channel = amqpConnection.createChannel();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Created channel for amqp connection: {}, channel: {}",
+ amqpConnection, channel);
+ }
+ }
+ catch (IOException ex) {
+ // Do not throw exception here, just log it and return null
+ if (logger.isDebugEnabled()) {
+ logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex);
+ }
+ }
- if (publisherConfirm) {
+ try {
+ if ((channel != null) && publisherConfirm) {
channel.confirmSelect();
- boolean asyncConfirm = false;
if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
- asyncConfirm = true;
-
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
if (logger.isTraceEnabled()) {
- logger.debug("Async ack of message publish received: {}, {}",
+ logger.debug("Async ack received for a published message: {}, {}",
sequenceNumber, multiple);
}
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
if (logger.isTraceEnabled()) {
- logger.debug("Async n-ack of message publish received: {}, {}",
+ logger.debug("Async n-ack received of a published message: {}, {}",
sequenceNumber, multiple);
}
});
}
- if (logger.isDebugEnabled()) {
- logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}",
- !asyncConfirm,
+ if (logger.isTraceEnabled()) {
+ logger.debug("Publisher Confirms is enabled on AMQP channel: {}({}), {}",
+ confirmMode,
+ confirmBatchNum,
channel);
}
}
} catch (IOException ex) {
- throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
+ throw new S4RAdapterUnexpectedException(
+ "Failed to enable publisher acknowledgement on the AMQP channel (" +
+ channel + ")!");
}
return channel;
@@ -155,15 +181,17 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
- Channel channel;
- String exchangeName = getEffectiveExchangeName(cycle);
+ Channel channel = getAmqpChannelForSender(cycle);
+ if (channel == null) {
+ throw new S4RAdapterUnexpectedException(
+ String.format(
+ "Failed to get AMQP channel for sender %s [%d]!",
+ getEffectiveSenderNameByCycle(cycle),
+ cycle));
+ }
- try {
- channel = getAmqpChannelForSender(cycle, exchangeName);
- }
- catch (Exception ex) {
- throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!");
- }
+ String exchangeName = getEffectiveExchangeNameByCycle(cycle);
+ declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType());
return new OpTimeTrackAmqpMsgSendOp(
s4rAdapterMetrics,
diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java
index 2ad551ead..c780e3012 100644
--- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java
+++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java
@@ -31,27 +31,16 @@ import java.nio.charset.StandardCharsets;
public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp");
-
private final String queueName;
- private final String bindingKey;
+
public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4rSpace,
Channel channel,
String exchangeName,
- String queueName,
- String bindingKey) {
+ String queueName) {
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName);
this.queueName = queueName;
- this.bindingKey = bindingKey;
-
- try {
- channel.queueBind(queueName, exchangeName, bindingKey);
- }
- catch (IOException ex) {
- throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " +
- "exchange (\"" + exchangeName + "\")!");
- }
}
@Override
@@ -59,20 +48,26 @@ public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
try {
Consumer receiver = new DefaultConsumer(channel) {
@Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
+ public void handleDelivery(
+ String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
+ {
+ String routingKey = envelope.getRoutingKey();
+ String contentType = properties.getContentType();
+ String msgPayload = new String(body, StandardCharsets.UTF_8);
+
if (logger.isTraceEnabled()) {
- String msgPayload = new String(body, StandardCharsets.UTF_8);
- logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}",
+ logger.trace(
+ "Successfully received message ({}) via consumer ({}/{}/{}) in the current channel: {}",
msgPayload,
consumerTag,
+ routingKey,
+ contentType,
channel);
}
}
};
- channel.basicConsume(queueName, receiver);
-
+ channel.basicConsume(queueName, true, receiver);
}
catch (IOException e) {
throw new S4RAdapterUnexpectedException(
diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java
index 9c9795fcc..b19a035a7 100644
--- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java
+++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java
@@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
@@ -40,8 +41,8 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
private final String confirmMode;
private final int confirmBatchNum;
- private static final ThreadLocal
- publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
+ private static final ConcurrentHashMap
+ channelPublishConfirmBathTracking = new ConcurrentHashMap<>();
public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4rSpace,
@@ -73,34 +74,40 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
routingKey,
null,
msgPayload.getBytes(StandardCharsets.UTF_8));
+ if (logger.isTraceEnabled()) {
+ logger.trace("Successfully published message (({}) {}) via the current channel: {}",
+ cycle, msgPayload, channel);
+ }
if (publishConfirm) {
// Individual publish confirm
if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
+ if (logger.isTraceEnabled()) {
+ logger.debug("Sync ack received for an individual published message: {}", cycle);
+ }
}
// Batch publish confirm
else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
- int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get();
+ int publishConfirmTrackingCnt =
+ channelPublishConfirmBathTracking.getOrDefault(channel, 0);
+
if ( (publishConfirmTrackingCnt > 0) &&
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) {
- synchronized (this) {
- channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
+ channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
+ if (logger.isTraceEnabled()) {
+ logger.debug("Sync ack received for a batch of published message: {}, {}",
+ cycle, publishConfirmTrackingCnt);
}
}
else {
- publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1);
+ channelPublishConfirmBathTracking.put(channel, publishConfirmTrackingCnt+1);
}
}
// Async publish confirm
// - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser'
}
-
- if (logger.isTraceEnabled()) {
- logger.trace("Successfully published message ({}) via the current channel: {}",
- msgPayload, channel);
- }
}
catch (IllegalStateException ex) {
throw new S4RAdapterUnexpectedException(
diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java
index 9009d26cc..4659e44f9 100644
--- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java
+++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java
@@ -30,18 +30,6 @@ import java.util.stream.Stream;
public class S4RAdapterUtil {
private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class);
- ///////
- // Valid document level parameters for JMS NB yaml file
- public enum DOC_LEVEL_PARAMS {
- // Blocking message producing or consuming
- ASYNC_API("async_api");
- public final String label;
-
- DOC_LEVEL_PARAMS(final String label) {
- this.label = label;
- }
- }
-
public enum AMQP_EXCHANGE_TYPES {
DIRECT("direct"),
FANOUT("fanout"),
@@ -83,19 +71,14 @@ public class S4RAdapterUtil {
}
}
public static String getValidAmqpPublisherConfirmModeList() {
- return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", ");
+ return StringUtils.join(AMQP_PUB_CONFIRM_MODE.LABELS, ", ");
}
// At least 20 messages in a publishing batch
- public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20;
+ public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 10;
public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100;
public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000;
- public static Map convertJsonToMap(final String jsonStr) throws Exception {
- final ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(jsonStr, new TypeReference