Update NB S4R, add README doc and testing scripts

This commit is contained in:
yabinmeng 2023-06-26 12:05:02 -05:00
parent b672c057bc
commit ccb1e15d34
23 changed files with 714 additions and 380 deletions

View File

@@ -1,5 +1,21 @@
#! /usr/local/bin/bash
##
# Copyright (c) 2022-2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "${CUR_SCRIPT_FOLDER}/utilities.sh"
@@ -86,12 +102,12 @@ sanityS4jMsgReceiverYamlFile="${CUR_SCRIPT_FOLDER}/sanity-msg-receiver-queue.yam
read -r -d '' nbs4jMsgSendCmd << EOM
java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \
-cycles=1000 threads=4 num_conn=2 num_session=2 \
cycles=200 threads=4 num_conn=2 num_session=2 \
session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \
service_url=${brokerSvcUrl} \
web_url=${webSvcUrl} \
config=${sanityS4jCfgPropFile} \
-yaml=${sanityS4jMsgSenderYamlFile}
workload=${sanityS4jMsgSenderYamlFile}
EOM
debugMsg "nbs4jMsgSendCmd=${nbs4jMsgSendCmd}" "${sanityTestMainLogFile}"
@@ -111,12 +127,12 @@ sleep 5
read -r -d '' nbs4jMsgRecvCmd << EOM
java -jar ${NB5JAR} run driver=s4j -vv --logs-dir=${nbExecLogDir} \
-cycles=1000 threads=4 num_conn=2 num_session=2 \
cycles=200 threads=4 num_conn=2 num_session=2 \
session_mode=\"client_ack\" strict_msg_error_handling=\"false\" \
service_url=${brokerSvcUrl} \
web_url=${webSvcUrl} \
config=${sanityS4jCfgPropFile} \
-yaml=${sanityS4jMsgReceiverYamlFile}
workload=${sanityS4jMsgReceiverYamlFile}
EOM
debugMsg "nbs4jMsgRecvCmd=${nbs4jMsgRecvCmd}" "${sanityTestMainLogFile}"

View File

@@ -1,6 +1,22 @@
#! /usr/local/bin/bash
-DEBUG=false
##
# Copyright (c) 2022-2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
DEBUG=true
##
# Show debug message
@@ -89,7 +105,7 @@ replaceStringInFile() {
fi
else
if ! [[ -z "${lineIdentifier// }" ]]; then
-sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${funcCfgJsonFileTgt}
sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
else
sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
fi
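With this fix the line-scoped branch edits the same target file as the global branch. Elsewhere in this commit the helper is invoked with the placeholder, the replacement value, and the file to edit as positional arguments; a minimal usage sketch (the token path here is illustrative):

```bash
# Substitute the JWT token placeholder in a generated sanity-test config file
# (arguments: placeholder string, replacement value, file to edit)
replaceStringInFile "<jwt_token_file_tmpl>" "file:///path/to/jwt.token" "./e2e-sanity-config.properties"
```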

View File

@@ -50,7 +50,6 @@
<version>${amqp.version}</version>
</dependency>
-<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
@@ -63,6 +62,12 @@
<artifactId>commons-configuration2</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.13.0</version>
</dependency>
</dependencies>
</project>

View File

@@ -27,14 +27,18 @@ import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -71,13 +75,16 @@ public class S4RSpace implements AutoCloseable {
// Maximum number of AMQP channels per connection
private final int amqpConnChannelNum;
// Maximum number of AMQP exchanges per channel
private final int amqpChannelExchangeNum;
// Max number of queues (per exchange)
// - only relevant with message receivers
private final int amqpExchangeQueueNum;
// Max number of message clients (senders or receivers)
// - for senders, this is the number of message senders per exchange
-// - for recievers, this is the number of message receivers per queue
// - for receivers, this is the number of message receivers per queue
// (there could be multiple queues per exchange)
private final int amqpMsgClntNum;
@@ -91,18 +98,9 @@ public class S4RSpace implements AutoCloseable {
private final ConcurrentHashMap<Long, Connection> amqpConnections = new ConcurrentHashMap<>();

-///////////////////////////////////
-// NOTE: Do NOT mix sender and receiver workload in one NB workload
-///////////////////////////////////
-// Amqp Channels for senders
-public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { }
-private final ConcurrentHashMap<AmqpSenderChannelKey, Channel> amqpSenderChannels = new ConcurrentHashMap<>();
-// Amqp Channels for receivers
-public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { }
-private final ConcurrentHashMap<AmqpReceiverChannelKey, Channel> amqpReceiverChannels = new ConcurrentHashMap<>();
-private final ConcurrentHashMap<AmqpReceiverChannelKey, Set<String>> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>();

// AMQP connection/channel combination used as the lookup key for a cached channel
public record AmqpChannelKey(Long connId, Long channelId) { }
private final ConcurrentHashMap<AmqpChannelKey, Channel> amqpChannels = new ConcurrentHashMap<>();

// Whether to do strict error handling while sending/receiving messages
// - Yes: any error returned from the AMQP server (or AMQP compatible server like Pulsar) while doing
@@ -129,6 +127,8 @@ public class S4RSpace implements AutoCloseable {
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
this.amqpConnChannelNum =
NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1"));
this.amqpChannelExchangeNum =
NumberUtils.toInt(cfg.getOptional("num_exchange").orElse("1"));
this.amqpExchangeQueueNum =
NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1"));
this.amqpMsgClntNum =
@@ -155,6 +155,12 @@ public class S4RSpace implements AutoCloseable {
.setDescription("Maximum number of AMQP connections."))
.add(Param.defaultTo("num_channel", 1)
.setDescription("Maximum number of AMQP channels per connection"))
.add(Param.defaultTo("num_exchange", 1)
.setDescription("Maximum number of AMQP exchanges per channel."))
.add(Param.defaultTo("num_queue", 1)
.setDescription("Max number of queues per exchange (only relevant for receivers)."))
.add(Param.defaultTo("num_msg_clnt", 1)
.setDescription("Max number of message clients per exchange (sender) or per queue (receiver)."))
.add(Param.defaultTo("max_op_time", 0)
.setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
.add(Param.defaultTo("strict_msg_error_handling", false)
@@ -164,16 +170,10 @@
public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); }

-public Channel getAmqpSenderChannel(
-AmqpSenderChannelKey key,
-Supplier<Channel> channelSupplier) {
-return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get());
-}
-public Channel getAmqpReceiverChannel(
-AmqpReceiverChannelKey key,
-Supplier<Channel> channelSupplier) {
-return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get());
public Channel getAmqpChannels(
AmqpChannelKey key,
Supplier<Channel> channelSupplier) {
return amqpChannels.computeIfAbsent(key, __ -> channelSupplier.get());
}

public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
@@ -183,7 +183,8 @@
public String getAmqpExchangeType() { return amqpExchangeType; }
public int getAmqpConnNum() { return this.amqpConnNum; }
public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; }
-public int getAmqpExchangeQueueNum() { return this.amqpConnNum; }
public int getAmqpChannelExchangeNum() { return this.amqpChannelExchangeNum; }
public int getAmqpExchangeQueueNum() { return this.amqpExchangeQueueNum; }
public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; }
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
@@ -218,19 +219,50 @@ public class S4RSpace implements AutoCloseable {
try {
s4rConnFactory = new ConnectionFactory();

-String passWord = cfg.get("jwtToken");
-s4rConnFactory.setPassword(cfgMap.get(""));
-s4rConnFactory.setPassword(passWord);
-String amqpServerHost = cfg.get("amqpSrvHost");
-int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort"));
-s4rConnFactory.setPort(amqpServerPort);
-String amqpVirtualHost = cfg.get("virtualHost");

String amqpServerHost = cfgMap.get("amqpSrvHost");
if (StringUtils.isBlank(amqpServerHost)) {
String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!";
throw new S4RAdapterInvalidParamException(errMsg);
}
s4rConnFactory.setHost(amqpServerHost);

String amqpSrvPortCfg = cfgMap.get("amqpSrvPort");
if (StringUtils.isBlank(amqpSrvPortCfg)) {
String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!";
throw new S4RAdapterInvalidParamException(errMsg);
}
s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg));

String amqpVirtualHost = cfgMap.get("virtualHost");
if (StringUtils.isBlank(amqpVirtualHost)) {
String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!";
throw new S4RAdapterInvalidParamException(errMsg);
}
s4rConnFactory.setVirtualHost(amqpVirtualHost);

String userNameCfg = cfgMap.get("amqpUser");
String passWordCfg = cfgMap.get("amqpPassword");
if (StringUtils.isNotBlank(passWordCfg)) {
String passWord = passWordCfg;
if (StringUtils.startsWith(passWordCfg, "file://")
&& StringUtils.length(passWordCfg) > 7) {
String jwtTokenFile = StringUtils.substring(passWordCfg, 7);
passWord = FileUtils.readFileToString(new File(jwtTokenFile), "UTF-8");
}
if (StringUtils.isNotBlank(passWord)) {
if (StringUtils.isBlank(userNameCfg)) {
s4rConnFactory.setUsername("");
}
s4rConnFactory.setPassword(passWord);
}
}

String useTlsCfg = cfgMap.get("useTls");
if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) {
s4rConnFactory.useSslProtocol();
}

for (int i = 0; i < getAmqpConnNum(); i++) {
Connection connection = s4rConnFactory.newConnection();
@@ -243,7 +275,7 @@
connection);
}
}
-} catch (IOException|TimeoutException ex) {
} catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) {
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
s4rClientConnInfo.toString());
throw new S4RAdapterUnexpectedException(ex);
@@ -255,11 +287,7 @@
try {
beingShutdown.set(true);

-for (Channel channel : amqpSenderChannels.values()) {
-channel.close();
-}
-for (Channel channel : amqpReceiverChannels.values()) {
for (Channel channel : amqpChannels.values()) {
channel.close();
}
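For context, the connection parameters validated above are read from the global config properties file passed via `config`. A minimal sketch with placeholder values, mirroring the `s4r_config.properties` example later in this commit:

```properties
amqpSrvHost=<amqp_server_host>
amqpSrvPort=5671
virtualHost=<tenant>/rabbitmq
# user may be left empty (e.g. for S4R-enabled Astra Streaming)
amqpUser=
# either a literal password, or a JWT token file referenced with a file:// prefix
amqpPassword=file:///path/to/jwt.token
useTls=true
exchangeType=direct
```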

View File

@@ -16,9 +16,7 @@
package io.nosqlbench.adapter.s4r.dispensers;

-import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
@@ -26,7 +24,6 @@ import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,8 +43,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
protected final Map<String, String> s4rConfMap = new HashMap<>();
protected final String exchangeType;
-protected final LongFunction<String> exchangeNameFunc;

protected AmqpBaseOpDispenser(final DriverAdapter adapter,
final ParsedOp op,
final S4RSpace s4RSpace) {

@@ -63,7 +58,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap());
this.exchangeType = s4RSpace.getAmqpExchangeType();
-this.exchangeNameFunc = lookupMandtoryStrOpValueFunc("exchange_name");

s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
@@ -86,32 +80,19 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
return stringLongFunction;
}

-protected Channel getChannelWithExchange(Connection amqpConnection,
-long connSeqNum,
-long channelSeqNum,
-String exchangeName)
-throws IOException {
-Channel channel = amqpConnection.createChannel();
-if (channel == null) {
-throw new S4RAdapterUnexpectedException("No AMQP channel is available!");
-}
-if (logger.isDebugEnabled()) {
-logger.debug("AMQP channel created -- {} [{},{}] ",
-channel,
-connSeqNum,
-channelSeqNum);
-}
-AMQP.Exchange.DeclareOk declareOk =
-channel.exchangeDeclare(exchangeName, s4rSpace.getAmqpExchangeType());
-if (logger.isDebugEnabled()) {
-logger.debug("AMQP exchange declared -- [name: {}, type: {}] {}",
-exchangeName,
-exchangeType,
-declareOk);
-}
-return channel;
protected void declareExchange(Channel channel, String exchangeName, String exchangeType) {
try {
// Declaring the same exchange multiple times on one channel is considered as a no-op
channel.exchangeDeclare(exchangeName, exchangeType);
if (logger.isTraceEnabled()) {
logger.debug("Declared the AMQP exchange \"{}\" on channel \"{}\".",
exchangeName, channel);
}
} catch (IOException e) {
String errMsg = String.format("Failed to declare the AMQP exchange \"%s\" on channel \"%s\"!",
exchangeName, channel);
throw new S4RAdapterUnexpectedException(errMsg);
}
}
protected long getConnSeqNum(long cycle) {

@@ -122,9 +103,24 @@
return (cycle / s4rSpace.getAmqpConnNum()) % s4rSpace.getAmqpConnChannelNum();
}

-protected String getEffectiveExchangeName(long cycle) {
-String exchangeNameInput = exchangeNameFunc.apply(cycle);
-return (StringUtils.isBlank(exchangeNameInput) ? "exchange-" + getConnChannelSeqNum(cycle) : exchangeNameInput);
-}
protected long getChannelExchangeSeqNum(long cycle) {
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum())
) % s4rSpace.getAmqpChannelExchangeNum();
}
protected String getEffectiveExchangeNameByCycle(long cycle) {
return getEffectiveExchangeName(
getConnSeqNum(cycle),
getConnChannelSeqNum(cycle),
getChannelExchangeSeqNum(cycle));
}
protected String getEffectiveExchangeName(long connSeqNum, long channelSeqNum, long exchangeSeqNum) {
return String.format(
"exchange-%d-%d-%d",
connSeqNum,
channelSeqNum,
exchangeSeqNum);
}

public String getName() {

View File

@@ -16,7 +16,6 @@
package io.nosqlbench.adapter.s4r.dispensers;

-import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.s4r.S4RSpace;
@@ -25,12 +24,10 @@ import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgRecvOp;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
-import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
-import java.util.Set;
import java.util.function.LongFunction;
public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {

@@ -38,82 +35,147 @@
private final static Logger logger = LogManager.getLogger("AmqpMsgRecvOpDispenser");

private final LongFunction<String> bindingKeyFunc;
-private final LongFunction<String> queueNameFunc;

public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
ParsedOp op,
S4RSpace s4rSpace) {
super(adapter, op, s4rSpace);
-queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null);
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
}

private long getExchangeQueueSeqNum(long cycle) {
-return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
-% s4rSpace.getAmqpExchangeQueueNum();
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum() *
s4rSpace.getAmqpChannelExchangeNum())
) % s4rSpace.getAmqpExchangeQueueNum();
}

private long getQueueReceiverSeqNum(long cycle) {
-return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum()))
-% s4rSpace.getAmqpMsgClntNum();
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum() *
s4rSpace.getAmqpChannelExchangeNum() *
s4rSpace.getAmqpExchangeQueueNum())
) % s4rSpace.getAmqpMsgClntNum();
}

-private String getEffectiveQueueName(long cycle) {
-String queueNameInput = queueNameFunc.apply(cycle);
-return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput);
-}
private String getEffectiveQueueNameByCycle(long cycle) {
return getEffectiveQueueName(
getConnSeqNum(cycle),
getConnChannelSeqNum(cycle),
getChannelExchangeSeqNum(cycle),
getExchangeQueueSeqNum(cycle));
}
private String getEffectiveQueueName(long connSeqNum, long channelSeqNum, long exchangeSeqNum, long queueSeqNum) {
return "queue-" + connSeqNum + "-" + channelSeqNum + "-" + exchangeSeqNum + "-" + queueSeqNum;
} }
-private Channel getAmqpChannelQueueForReceiver(long cycle,
-String exchangeName,
-String queueName) {
private String getEffectiveReceiverName(long cycle) {
return getEffectiveReceiverName(
getConnSeqNum(cycle),
getConnChannelSeqNum(cycle),
getChannelExchangeSeqNum(cycle),
getExchangeQueueSeqNum(cycle),
getQueueReceiverSeqNum(cycle));
}
private String getEffectiveReceiverName(long connSeqNum,
long channelSeqNum,
long exchangeSeqNum,
long queueSeqNum,
long receiverSeqNum) {
return String.format(
"receiver-%d-%d-%d-%d-%d",
connSeqNum,
channelSeqNum,
exchangeSeqNum,
queueSeqNum,
receiverSeqNum);
}
private Channel getAmqpChannelForReceiver(long cycle) {
long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle);

-long queueSeqNum = getExchangeQueueSeqNum(cycle);
-long receiverSeqNum = getQueueReceiverSeqNum(cycle);
-Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
-S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey =
-new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum);
-return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> {
-channel = getChannelWithExchange(
-amqpConnection,
-connSeqNum,
-channelSeqNum,
-exchangeName);
-AMQP.Queue.DeclareOk declareOk =
-channel.queueDeclare(queueName, true, true, true, null);
-logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}",
-exchangeName,
-queueName,
-declareOk);
-} catch (IOException ex) {
-throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
-}

Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> {
Channel channel = null;
try {
channel = amqpConnection.createChannel();
if (logger.isDebugEnabled()) {
logger.debug("Created channel for amqp connection: {}, channel: {}",
amqpConnection, channel);
}
}
catch (IOException ex) {
// Do not throw exception here, just log it and return null
if (logger.isDebugEnabled()) {
logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex);
}
}
return channel;
});
} }
@Override
public S4RTimeTrackOp apply(long cycle) {
-Channel channel = null;
Channel channel = getAmqpChannelForReceiver(cycle);
if (channel == null) {
throw new S4RAdapterUnexpectedException(
String.format(
"Failed to get AMQP channel for receiver %s [%d]!",
getEffectiveReceiverName(cycle),
cycle));
}
-String exchangeName = getEffectiveExchangeName(cycle);
-String queueName = getEffectiveQueueName(cycle);
String exchangeName = getEffectiveExchangeNameByCycle(cycle);
declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType());
boolean durable = true;
boolean exclusive = true;
boolean autoDelete = false;
String queueName = getEffectiveQueueNameByCycle(cycle);
String bindingKey = bindingKeyFunc.apply(cycle);
try {
channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
if (logger.isTraceEnabled()) {
logger.debug("AMQP queue is declared - \"{} ({}/{}/{})\" on exchange \"{}\" for a receiver!",
queueName,
durable,
exclusive,
autoDelete,
exchangeName);
}
}
catch (IOException ex) {
throw new S4RAdapterUnexpectedException(
String.format(
"Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!",
queueName, durable, exclusive, autoDelete, exchangeName)
);
}
try {
-channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName);
// Binding the same queue multiple times on one exchange is considered as a no-op
channel.queueBind(queueName, exchangeName, bindingKey);
if (logger.isTraceEnabled()) {
logger.debug("AMQP queue is bound - \"{} ({}/{}/{})\" on exchange \"{}\" with binding key \"{}\"!",
queueName,
durable,
exclusive,
autoDelete,
exchangeName,
bindingKey);
}
}
-catch (Exception ex) {
-throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!");
catch (IOException ex) {
throw new S4RAdapterUnexpectedException(
String.format(
"Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!",
queueName, durable, exclusive, autoDelete, exchangeName, bindingKey)
);
}

return new OpTimeTrackAmqpMsgRecvOp(

@@ -121,7 +183,6 @@
s4rSpace,
channel,
exchangeName,
-queueName,
-bindingKeyFunc.apply(cycle));
queueName);
}
}

View File

@@ -68,7 +68,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
.orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
throw new S4RAdapterInvalidParamException("confirm_mode",
-"Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
"The provided value \"" + confirmMode + "\" is not one of following valid values: '" +
S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
}

confirmBatchNum = parsedOp
@@ -86,62 +87,87 @@
}

private long getExchangeSenderSeqNum(long cycle) {
-return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
-% s4rSpace.getAmqpMsgClntNum();
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum() *
s4rSpace.getAmqpChannelExchangeNum())
) % s4rSpace.getAmqpMsgClntNum();
}

-private Channel getAmqpChannelForSender(long cycle,
-String exchangeName) {
private String getEffectiveSenderNameByCycle(long cycle) {
return getEffectiveSenderNameByCycle(
getConnSeqNum(cycle),
getConnChannelSeqNum(cycle),
getChannelExchangeSeqNum(cycle),
getExchangeSenderSeqNum(cycle));
}
private String getEffectiveSenderNameByCycle(long connSeqNum,
long channelSeqNum,
long exchangeSeqNum,
long senderSeqNum) {
return String.format(
"sender-%d-%d-%d-%d",
connSeqNum,
channelSeqNum,
exchangeSeqNum,
senderSeqNum);
}
private Channel getAmqpChannelForSender(long cycle) {
long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle);

-long senderSeqNum = getExchangeSenderSeqNum(cycle);
-Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
-S4RSpace.AmqpSenderChannelKey amqpConnChannelKey =
-new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum);
-return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> {
-Channel channel;
-channel = getChannelWithExchange(
-amqpConnection,
-connSeqNum,
-channelSeqNum,
-exchangeName);

Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
return s4rSpace.getAmqpChannels(senderKey, () -> {
Channel channel = null;
try {
channel = amqpConnection.createChannel();
if (logger.isDebugEnabled()) {
logger.debug("Created channel for amqp connection: {}, channel: {}",
amqpConnection, channel);
}
}
catch (IOException ex) {
// Do not throw exception here, just log it and return null
if (logger.isDebugEnabled()) {
logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex);
}
}
-if (publisherConfirm) {
try {
if ((channel != null) && publisherConfirm) {
channel.confirmSelect();
-boolean asyncConfirm = false;
if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
-asyncConfirm = true;
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
if (logger.isTraceEnabled()) {
-logger.debug("Async ack of message publish received: {}, {}",
logger.debug("Async ack received for a published message: {}, {}",
sequenceNumber, multiple);
}
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
if (logger.isTraceEnabled()) {
-logger.debug("Async n-ack of message publish received: {}, {}",
logger.debug("Async n-ack received of a published message: {}, {}",
sequenceNumber, multiple);
}
});
}
-if (logger.isDebugEnabled()) {
-logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}",
-!asyncConfirm,
if (logger.isTraceEnabled()) {
logger.debug("Publisher Confirms is enabled on AMQP channel: {}({}), {}",
confirmMode,
confirmBatchNum,
channel);
}
}
} catch (IOException ex) {
-throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
throw new S4RAdapterUnexpectedException(
"Failed to enable publisher acknowledgement on the AMQP channel (" +
channel + ")!");
}

return channel;
@@ -155,15 +181,17 @@
throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}

-Channel channel;
-String exchangeName = getEffectiveExchangeName(cycle);
Channel channel = getAmqpChannelForSender(cycle);
if (channel == null) {
throw new S4RAdapterUnexpectedException(
String.format(
"Failed to get AMQP channel for sender %s [%d]!",
getEffectiveSenderNameByCycle(cycle),
cycle));
}
-try {
-channel = getAmqpChannelForSender(cycle, exchangeName);
-}
-catch (Exception ex) {
-throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!");
-}
String exchangeName = getEffectiveExchangeNameByCycle(cycle);
declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType());

return new OpTimeTrackAmqpMsgSendOp(
s4rAdapterMetrics,

View File

@@ -31,27 +31,16 @@ import java.nio.charset.StandardCharsets;
public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp");

private final String queueName;
-private final String bindingKey;

public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4rSpace,
Channel channel,
String exchangeName,
-String queueName,
-String bindingKey) {
String queueName) {
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName);
this.queueName = queueName;
-this.bindingKey = bindingKey;
-try {
-channel.queueBind(queueName, exchangeName, bindingKey);
-}
-catch (IOException ex) {
-throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " +
-"exchange (\"" + exchangeName + "\")!");
-}
}
@Override

@@ -59,20 +48,26 @@
try {
Consumer receiver = new DefaultConsumer(channel) {
@Override
-public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
-byte[] body) throws IOException {
-String msgPayload = new String(body, StandardCharsets.UTF_8);
-logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}",
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
String msgPayload = new String(body, StandardCharsets.UTF_8);
if (logger.isTraceEnabled()) {
logger.trace(
"Successfully received message ({}) via consumer ({}/{}/{}) in the current channel: {}",
msgPayload,
consumerTag,
routingKey,
contentType,
channel);
}
}
};
-channel.basicConsume(queueName, receiver);
channel.basicConsume(queueName, true, receiver);
}
catch (IOException e) {
throw new S4RAdapterUnexpectedException(

View File

@@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
@@ -40,8 +41,8 @@
private final String confirmMode;
private final int confirmBatchNum;

-private static final ThreadLocal<Integer>
-publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
private static final ConcurrentHashMap<Channel, Integer>
channelPublishConfirmBathTracking = new ConcurrentHashMap<>();

public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4rSpace,
@@ -73,34 +74,40 @@
routingKey,
null,
msgPayload.getBytes(StandardCharsets.UTF_8));

-if (logger.isTraceEnabled()) {
-logger.trace("Successfully published message (({}) {}) via the current channel: {}",
-cycle, msgPayload, channel);
-}

if (publishConfirm) {
// Individual publish confirm
if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
if (logger.isTraceEnabled()) {
logger.debug("Sync ack received for an individual published message: {}", cycle);
}
}
// Batch publish confirm
else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
-int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get();
int publishConfirmTrackingCnt =
channelPublishConfirmBathTracking.getOrDefault(channel, 0);
if ( (publishConfirmTrackingCnt > 0) &&
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) {
-synchronized (this) {
-channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
-}
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
if (logger.isTraceEnabled()) {
logger.debug("Sync ack received for a batch of published message: {}, {}",
cycle, publishConfirmTrackingCnt);
}
}
else {
-publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1);
channelPublishConfirmBathTracking.put(channel, publishConfirmTrackingCnt+1);
}
}
// Async publish confirm
// - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser'
}
if (logger.isTraceEnabled()) {
logger.trace("Successfully published message ({}) via the current channel: {}",
msgPayload, channel);
}
}
catch (IllegalStateException ex) {
throw new S4RAdapterUnexpectedException(

View File

@@ -30,18 +30,6 @@ import java.util.stream.Stream;
public class S4RAdapterUtil {
private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class);

-///////
-// Valid document level parameters for JMS NB yaml file
-public enum DOC_LEVEL_PARAMS {
-// Blocking message producing or consuming
-ASYNC_API("async_api");
-public final String label;
-DOC_LEVEL_PARAMS(final String label) {
-this.label = label;
-}
-}

public enum AMQP_EXCHANGE_TYPES {
DIRECT("direct"),
FANOUT("fanout"),
@@ -83,19 +71,14 @@
}
}

public static String getValidAmqpPublisherConfirmModeList() {
-return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", ");
return StringUtils.join(AMQP_PUB_CONFIRM_MODE.LABELS, ", ");
}

// At least 20 messages in a publishing batch
-public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20;
public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 10;
public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100;
public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000;
-public static Map<String, String> convertJsonToMap(final String jsonStr) throws Exception {
-final ObjectMapper mapper = new ObjectMapper();
-return mapper.readValue(jsonStr, new TypeReference<Map<String, String>>(){});
-}

public static void pauseCurThreadExec(final int pauseInSec) {
if (0 < pauseInSec) try {
Thread.sleep(pauseInSec * 1000L);

View File

@@ -1,62 +1,92 @@
-# Overview
-
-This NB Kafka adapter allows publishing messages to or consuming messages from
-* a Kafka cluster, or
-* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar.
-
-At high level, this adapter supports the following Kafka functionalities
-* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers)
-* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits)
-* auto message commit
-* manual message commit with a configurable number of message commits in one batch
-* Kafka Transaction support
-
-## Example NB Yaml
-* [kafka_producer.yaml](./s4r_producer.yaml)
-*
-* [kafka_consumer.yaml](./s4r_consumer.yaml)
-
-# Usage

- [1. Overview](#1-overview)
- [2. NB S4R Usage](#2-nb-s4r-usage)
  - [2.1. CLI Examples](#21-cli-examples)
  - [2.2. CLI parameters](#22-cli-parameters)
  - [2.3. Workload Definition](#23-workload-definition)
  - [2.4. Configuration Properties](#24-configuration-properties)
    - [2.4.1. Global Properties File](#241-global-properties-file)
    - [2.4.2. Scenario Document Level Properties](#242-scenario-document-level-properties)

---

# 1. Overview

This NB S4R adapter allows sending messages to or receiving messages from
* an AMQP 0-9-1 based server (e.g. RabbitMQ), or
* a Pulsar cluster with [S4R](https://github.com/datastax/starlight-for-rabbitmq) AMQP (0-9-1) Protocol handler for Pulsar.

At high level, this adapter supports the following AMQP 0-9-1 functionalities
* Creating AMQP connections and channels
* Declaring AMQP exchanges
  * The following exchange types are supported: `direct`, `fanout`, `topic`, and `headers`
* Sending messages to AMQP exchanges with sync. or async. publisher confirms
  * For sync confirms, it supports both single and batch confirms
  * Supports message-send based on routing keys
* Declaring and binding AMQP queues
  * Supports message-receive based on binding keys
* Receiving messages from AMQP queues with async. consumer acks

# 2. NB S4R Usage

## 2.1. CLI Examples
```bash
-## Kafka Producer
-$ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092
-
-## Kafka Consumer
-$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092
## AMQP Message Sender
$ <nb_cmd> run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \
    threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \
    workload=./s4r_msg_sender.yaml \
    config=./s4r_config.properties

## AMQP Message Receiver
$ <nb_cmd> run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \
    threads=8 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \
    workload=./s4r_msg_receiver.yaml \
    config=./s4r_config.properties
```
-## NB Kafka adapter specific CLI parameters
-* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from
-* For producer workload, this is the number of the producer threads to publish messages to the same topic
-* Can have multiple producer threads for one topic/partition (`KafkaProducer` is thread-safe)
-* `threads` and `num_clnt` values MUST be the same.
-* For consumer workload, this is the partition number of a topic
-* Consumer workload supports to subscribe from multiple topics. If so, it requires all topics having the same partition number.
-* Only one consumer thread for one topic/partition (`KafkaConsumer` is NOT thread-safe)
-* `threads` MUST be equal to `num_clnt`*`num_cons_grp`
-* `num_cons_grp`: the number of consumer groups
-* Only relevant for consumer workload

## 2.2. CLI parameters

The following CLI parameters are unique to the S4R adapter (a worked example follows the list):

* `num_conn`: the number of AMQP connections to create
* `num_channel`: the number of AMQP channels to create for each connection
* `num_exchange`: the number of AMQP exchanges to create for each channel
* `num_queue`: the number of AMQP queues to create for each channel (only relevant for message receiver workload)
* `num_msg_clnt`: the number of message clients to create for each channel
  * for message sender workload, it is the number of message publishers for each exchange
  * for message receiver workload, it is the number of message consumers for each queue
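For example, based on the sequence-number arithmetic in `AmqpBaseOpDispenser` and `AmqpMsgRecvOpDispenser` changed in this commit, the receiver invocation shown in section 2.1 (`num_conn=1`, `num_channel=2`, `num_exchange=2`, `num_queue=2`, `num_msg_clnt=2`) cycles through 1 connection, 1 x 2 = 2 channels, 2 x 2 = 4 exchanges, 4 x 2 = 8 queues, and 8 x 2 = 16 logical receivers, with exchange and queue names derived from those sequence numbers (e.g. `exchange-<conn>-<channel>-<exchange>`, `queue-<conn>-<channel>-<exchange>-<queue>`).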
## 2.3. Workload Definition
The example workload YAML files can be found from:
-For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported:
* [s4r_msg_sender.yaml](s4r_msg_sender.yaml)
* [s4r_msg_receiver.yaml](s4r_msg_receiver.yaml)
-* `async_api` (boolean):
-  * When true, use async Kafka client API.
-* `seq_tracking` (boolean):
-  * When true, a sequence number is created as part of each message's properties
-  * This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully.
-* `seqerr_simu`:
-  * A list of error simulation types separated by comma (,)
-  * Valid error simulation types
-    * `out_of_order`: simulate message out of sequence
-    * `msg_loss`: simulate message loss
-    * `msg_dup`: simulate message duplication
-  * This value should be used only for testing purposes. It is not recommended to use this parameter in actual testing environments.
-* `e2e_starting_time_source`:
-  * Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch.
-  * The possible values for `e2e_starting_time_source`:
-    * `message_publish_time` : uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). This is the case, as [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type).

## 2.4. Configuration Properties

### 2.4.1. Global Properties File

A global S4R properties file can be specified via the `config` CLI parameter. It includes the following required properties:
* `amqpSrvHost`: AMQP server host (e.g. an Astra Streaming cluster with S4R enabled)
* `amqpSrvPort`: AMQP server port (for S4R enabled Astra Streaming, it is 5671)
* `virtualHost`: AMQP server virtual host (for S4R enabled Astra Streaming, it is "<tenant>/rabbitmq")
* `amqpUser`: AMQP user (for S4R enabled Astra Streaming, it is an empty string)
* `amqpPassword`: AMQP password (for S4R enabled Astra Streaming, it is the JWT token file path)
* `useTls`: whether to use TLS (for S4R enabled Astra Streaming, it is true)
* `exchangeType`: AMQP exchange type (e.g. `direct`, `fanout`, `topic`, or `headers`)

An example of this file can be found from: [s4r_config.properties](./s4r_config.properties)

### 2.4.2. Scenario Document Level Properties

For message sender workload, the following Document level configuration parameters are supported in the YAML file (a minimal workload sketch follows the list):
For message sender workload, the following Document level configuration parameters are supported in the YAML file:
* `publisher_confirm`: whether to use publisher confirms
* `confirm_mode`: When `publisher_confirm` is true, the following 3 confirm modes are supported:
* `individual`: wait for confirm individually
* `batch`: wait for confirm in batch
* `async`: [default] no wait for confirm
* `confirm_batch_num`: batch size for waiting for **sync** publisher confirms
* Only relevant when `publisher_confirm` is true and `confirm_mode` is "batch"
* `dft_confirm_timeout_ms`: timeout (in milliseconds) to wait for publisher confirms
* Only relevant when `publisher_confirm` is true and `confirm_mode` is **NOT** "async"
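As an illustration, a minimal sender workload that sets these document level parameters might look like the sketch below. The op fields (`AmqpMsgSender`, `routing_key`, `message`) mirror the bundled [s4r_msg_sender.yaml](s4r_msg_sender.yaml) example, while the confirm settings are illustrative values rather than recommended defaults:

```yaml
bindings:
  mytext_val: AlphaNumericString(100)

# Document level parameters (must be static)
params:
  publisher_confirm: true
  confirm_mode: "batch"
  confirm_batch_num: 100

blocks:
  msg-send-block:
    ops:
      op1:
        AmqpMsgSender: ""
        routing_key: "routing-test"
        message: "{mytext_val}"
```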

View File

@ -1,24 +0,0 @@
#!/usr/local/bin/bash
#
# Copyright (c) 2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
: "${SKIP_TESTS:=1}"
(
cd "$(git rev-parse --show-toplevel)" && \
mvn clean install "-DskipTests" -pl adapters-api,adapter-s4r,nb5 && \
[[ ${SKIP_TESTS} -ne 1 ]] && \
mvn test -pl adapters-api,adapter-s4r
)

View File

@ -1,4 +0,0 @@
name
usa
canada
german

View File

@ -1,3 +0,0 @@
queue1
queue2
queue3

View File

@ -18,7 +18,13 @@
# Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled
amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com
amqpSrvPort=5671
-virtualHost=<as_tenant_name>/rabbitmq
-jwtToken=<jwt_token_value>
virtualHost=<as_tenant>/rabbitmq
# For Astra Streaming with S4R, the user is an empty string
amqpUser=
# For Astra Streaming with S4R, the password is the JWT token in the format of
# file:///path/to/astra_streaming_jwt_token_file
amqpPassword=file://</path/to/as_nbtest_token_file>
# when using Astra Streaming with S4R, this needs to be set to true
useTls=true
# valid values: direct, fanout, topic, headers # valid values: direct, fanout, topic, headers
exchangeType=direct exchangeType=direct

View File

@ -1,18 +0,0 @@
bindings:
myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name')
myqueue: CSVFrequencySampler('csv/queue_names.csv', 'name')
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
# Doc-level parameters (must be static)
params:
blocks:
msg-recv-block:
ops:
AmqpMsgReceiver:
#exchange_names: "{myexname}"
exchange_name: "alpha"
queue_name: "{myqueue}"
binding_key: "{myroutingkey}"

View File

@ -0,0 +1,12 @@
bindings:
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
# Doc-level parameters (must be static)
params:
blocks:
msg-recv-block:
ops:
op1:
AmqpMsgReceiver: ""
binding_key: "{myroutingkey}"

View File

@ -0,0 +1,137 @@
#! /usr/local/bin/bash
##
# Copyright (c) 2022-2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
CUR_SCRIPT_FOLDER=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "${CUR_SCRIPT_FOLDER}/utilities.sh"
echo
##
# Show usage info
#
usage() {
echo
echo "Usage: e2e-nbs4r-sanity.sh [-h]"
echo " -j </full/path/to/jwt/token/file>]"
echo " -h : Show usage info"
echo " -j : JWT token file full path. Default to 'jwt.token' in the same directory!"
echo
}
if [[ $# -gt 3 ]]; then
usage
errExit 10 "Incorrect input parameter count!"
fi
jwtTokenFile="./jwt.token"
while [[ "$#" -gt 0 ]]; do
case $1 in
-h) usage; exit 0 ;;
-j) jwtTokenFile=$2; shift ;;
*) errExit 20 "Unknown input parameter passed: $1"; ;;
esac
shift
done
mkdir -p "./logs/nb5-exec"
mainLogDir="${CUR_SCRIPT_FOLDER}/logs"
nbExecLogDir="${mainLogDir}/nb5-exec"
# 2022-08-19 11:40:23
startTime=$(date +'%Y-%m-%d %T')
# 20220819114023
startTime2=${startTime//[: -]/}
sanityTestMainLogFile="${mainLogDir}/e2e-nbs4r-sanity-${startTime2}.log"
echo > "${sanityTestMainLogFile}"
debugMsg "jwtTokenFile=${jwtTokenFile}" "${sanityTestMainLogFile}"
if ! [[ -f "${jwtTokenFile}" ]]; then
errExit 30 \
    "Can't find the required JWT token file (for Pulsar cluster connection) at the specified location: \"${jwtTokenFile}\"!" \
    "${sanityTestMainLogFile}"
fi
sanityS4rCfgPropFileName="e2e-sanity-config.properties"
sanityS4rCfgPropFile="${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}"
if [[ ! -f "${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}.tmpl" ]]; then
errExit 30 \
    "Can't find the required sanity test config file template at the specified location: \"${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}.tmpl\"!" \
    "${sanityTestMainLogFile}"
fi
cp -rf "${CUR_SCRIPT_FOLDER}/${sanityS4rCfgPropFileName}.tmpl" "${sanityS4rCfgPropFile}"
if [[ -n "${jwtTokenFile// }" ]]; then
replaceStringInFile "<jwt_token_file_tmpl>" "file://${jwtTokenFile}" "${sanityS4rCfgPropFile}"
else
replaceStringInFile "<jwt_token_file_tmpl>" "" "${sanityS4rCfgPropFile}"
fi
NB5JAR="${CUR_SCRIPT_FOLDER}/../../../../../nb5/target/nb5.jar"
sanityS4rSenderYamlFile="${CUR_SCRIPT_FOLDER}/e2e-sanity-sender.yaml"
sanityS4rReceiverYamlFile="${CUR_SCRIPT_FOLDER}/e2e-sanity-receiver.yaml"
{
echo;
echo "======================================================================================================";
echo "Starting the sanity test for the NoSQLBench S4J adapter at ${startTime} ...";
echo;
echo " >>> Kick off an S4R message sending workload ..."
echo;
} >> "${sanityTestMainLogFile}"
read -r -d '' nbs4rMsgSendCmd << EOM
java -jar ${NB5JAR} run driver=s4r -vv --logs-dir=${nbExecLogDir} strict_msg_error_handling=\"false\" \
cycles=1000 threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \
config=${sanityS4rCfgPropFile} \
workload=${sanityS4rSenderYamlFile}
EOM
debugMsg "nbs4rMsgSendCmd=${nbs4rMsgSendCmd}" "${sanityTestMainLogFile}"
eval '${nbs4rMsgSendCmd}'
if [[ $? -ne 0 ]]; then
errExit 40 "Failed to kick off the S4R message sending workload!" "${sanityTestMainLogFile}"
fi
# pause 5 seconds before kicking off the message receiving workload
sleep 5
{
echo;
echo " >>> Kick off an S4J message receiving workload after 30 seconds..."
echo;
} >> "${sanityTestMainLogFile}"
read -r -d '' nbs4rMsgRecvCmd << EOM
java -jar ${NB5JAR} run driver=s4r -vv --logs-dir=${nbExecLogDir} \
cycles=1000 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \
config=${sanityS4rCfgPropFile} \
workload=${sanityS4rReceiverYamlFile}
EOM
debugMsg "nbs4rMsgRecvCmd=${nbs4rMsgRecvCmd}" "${sanityTestMainLogFile}"
eval '${nbs4rMsgRecvCmd}'
if [[ $? -ne 0 ]]; then
errExit 40 "Failed to kick off the S4R message receiving workload!" "${sanityTestMainLogFile}"
fi
echo "NB S4J workload sanity check passed!" >> "${sanityTestMainLogFile}"
echo

View File

@ -0,0 +1,11 @@
##
# Below is an example of connecting to Astra Streaming with RabbitMQ/S4R enabled
amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com
amqpSrvPort=5671
virtualHost=nbtest/rabbitmq
amqpUser=
amqpPassword=<jwt_token_file_tmpl>
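# NOTE: the "<jwt_token_file_tmpl>" placeholder above is substituted by the sanity test
#       script (e2e-nbs4r-sanity.sh) with the JWT token file URI, e.g. "file:///path/to/jwt.token"
#       (the example path is illustrative only)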
# when using Astra Streaming with S4R, this needs to be set to true
useTls=true
# valid values: direct, fanout, topic, headers
exchangeType=direct

View File

@ -0,0 +1,13 @@
bindings:
  mytext_val: AlphaNumericString(100)

params:
  publisher_confirm: false

blocks:
  msg-send-block:
    ops:
      op1:
        AmqpMsgSender: ""
        routing_key: "routing-test"
        message: "{mytext_val}"

View File

@ -0,0 +1,113 @@
#! /usr/local/bin/bash
##
# Copyright (c) 2022-2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
DEBUG=true
##
# Show debug message
# - $1 : the message to show
# - $2 : (Optional) the file to log the message to
debugMsg() {
if [[ "${DEBUG}" == "true" ]]; then
local msg=${1}
local file=${2}
if [[ -z "${file}" ]]; then
echo "[Debug] ${msg}"
else
echo "[Debug] ${msg}" >> "${file}"
fi
fi
}
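#
# Example usage (illustrative; the variables belong to the calling script):
#   debugMsg "jwtTokenFile=${jwtTokenFile}" "${sanityTestMainLogFile}"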
##
# Show an error message and exit with the specified code
# - $1 : the exit code
# - $2 : the message to show
# - $3 : (Optional) the file to log the message to
errExit() {
local exitCode=${1}
local msg=${2}
local file=${3}
if [[ -z "${file}" ]]; then
echo "[Error] ${msg}"
else
echo "[Error] ${msg}" >> "${file}"
fi
exit ${exitCode}
}
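#
# Example usage (illustrative; taken from the sanity test script above):
#   errExit 20 "Unknown input parameter passed: $1" "${sanityTestMainLogFile}"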
##
# Read a properties file and return the value for the specified key
# 2 input parameters:
# - 1st parameter: the property file to scan
# - 2nd parameter: the key to search for
getPropVal() {
local propFile=$1
local searchKey=$2
local value=$(grep "${searchKey}" ${propFile} | grep -Ev "^#|^$" | cut -d'=' -f2)
echo $value
}
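#
# Example usage (illustrative; assumes an 'amqpSrvHost' entry exists in the properties file):
#   amqpSrvHost=$(getPropVal "${sanityS4rCfgPropFile}" "amqpSrvHost")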
##
# Check if the sed being used is GNU sed
isGnuSed() {
local gnu_sed=$(sed --version 2>&1 | grep -v 'illegal\|usage\|^\s' | grep "GNU sed" | wc -l)
echo ${gnu_sed}
}
##
# Replace occurrences of a string placeholder with a specific value in a file
# Four input parameters:
# - 1st parameter: the placeholder string to be replaced
# - 2nd parameter: the value string to replace the placeholder
# - 3rd parameter: the file
# - 4th parameter: (Optional) a particular line identifier;
#                  if specified, only replace the placeholder in the matching line,
#                  otherwise replace all occurrences in the file
#
# TBD: use this function to hide the GNU sed difference (macOS vs Linux, GNU sed or not)
#
replaceStringInFile() {
local placeHolderStr=${1}
local valueStr=${2}
local fileToScan=${3}
local lineIdentifier=${4}
# in case '/' is part of the string
placeHolderStr=$(echo ${placeHolderStr} | sed 's/\//\\\//g')
valueStr=$(echo ${valueStr} | sed 's/\//\\\//g')
gnuSed=$(isGnuSed)
if [[ "$OSTYPE" == "darwin"* && ${gnuSed} -eq 0 ]]; then
if ! [[ -z "${lineIdentifier// }" ]]; then
sed -i '' "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
else
sed -i '' "s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
fi
else
if ! [[ -z "${lineIdentifier// }" ]]; then
sed -i "${lineIdentifier}s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
else
sed -i "s/${placeHolderStr}/${valueStr}/g" ${fileToScan}
fi
fi
}
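#
# Example usage (as in e2e-nbs4r-sanity.sh): substitute the JWT token file placeholder
# in the generated sanity test config file:
#   replaceStringInFile "<jwt_token_file_tmpl>" "file://${jwtTokenFile}" "${sanityS4rCfgPropFile}"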

View File

@ -1,36 +0,0 @@
#!/usr/local/bin/bash
#
# Copyright (c) 2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
: "${REBUILD:=1}"
: "${CYCLES:=1000000000}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
if [[ ${REBUILD} -eq 1 ]]; then
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
fi
java -jar nb5/target/nb5.jar \
run \
driver=s4r \
-vv \
--report-interval 5 \
--docker-metrics \
cycles=${CYCLES} \
threads=1 \
num_clnt=1 \
num_cons_grp=1 \
yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \
config="${SCRIPT_DIR}/kafka_config.properties" \
bootstrap_server=PLAINTEXT://localhost:9092

View File

@ -1,38 +0,0 @@
#!/usr/local/bin/bash
#
# Copyright (c) 2023 nosqlbench
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
: "${REBUILD:=1}"
: "${CYCLES:=1000000000}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
if [[ ${REBUILD} -eq 1 ]]; then
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
fi
while [[ 1 -eq 1 ]]; do
java -jar nb5/target/nb5.jar \
run \
driver=s4r \
-vv \
--report-interval 5 \
--docker-metrics \
cycles="${CYCLES}" \
threads=1 \
num_clnt=1 \
yaml="${SCRIPT_DIR}/kafka_producer.yaml" \
config="${SCRIPT_DIR}/kafka_config.properties" \
bootstrap_server=PLAINTEXT://localhost:9092
sleep 10
done