mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge pull request #1353 from yabinmeng/main
Update NBS4J version to 4.0.1
This commit is contained in:
commit
3bfc9b10f2
@ -34,10 +34,7 @@
|
|||||||
</description>
|
</description>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<!-- Pulsar version 2.11.x causes NB S4J failure due to version conflict (S4J right now is on version 2.10.3)
|
<pulsar.version>3.0.0</pulsar.version>
|
||||||
<pulsar.version>2.11.1</pulsar.version>
|
|
||||||
-->
|
|
||||||
<pulsar.version>2.10.4</pulsar.version>
|
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -16,25 +16,21 @@
|
|||||||
|
|
||||||
package io.nosqlbench.adapter.pulsar.dispensers;
|
package io.nosqlbench.adapter.pulsar.dispensers;
|
||||||
|
|
||||||
import com.codahale.metrics.Timer;
|
|
||||||
import com.codahale.metrics.Timer.Context;
|
import com.codahale.metrics.Timer.Context;
|
||||||
import io.nosqlbench.adapter.pulsar.PulsarSpace;
|
import io.nosqlbench.adapter.pulsar.PulsarSpace;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.DOC_LEVEL_PARAMS;
|
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.DOC_LEVEL_PARAMS;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil;
|
|
||||||
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE;
|
import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.pulsar.client.api.PulsarClient;
|
import org.apache.pulsar.client.api.PulsarClient;
|
||||||
import org.apache.pulsar.client.api.PulsarClientException;
|
|
||||||
import org.apache.pulsar.client.api.Schema;
|
import org.apache.pulsar.client.api.Schema;
|
||||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
import java.util.function.Predicate;
|
import java.util.function.Predicate;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
@ -93,13 +89,10 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser {
|
|||||||
.newTransaction()
|
.newTransaction()
|
||||||
.build()
|
.build()
|
||||||
.get();
|
.get();
|
||||||
} catch (final ExecutionException | InterruptedException err) {
|
} catch (Exception err) {
|
||||||
if (PulsarClientOpDispenser.logger.isWarnEnabled())
|
if (PulsarClientOpDispenser.logger.isWarnEnabled())
|
||||||
PulsarClientOpDispenser.logger.warn("Error while starting a new transaction", err);
|
PulsarClientOpDispenser.logger.warn("Error while starting a new transaction", err);
|
||||||
throw new RuntimeException(err);
|
throw new PulsarAdapterUnexpectedException(err);
|
||||||
} catch (final PulsarClientException err) {
|
|
||||||
throw new RuntimeException("Transactions are not enabled on Pulsar Client, " +
|
|
||||||
"please set client.enableTransaction=true in your Pulsar Client configuration");
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -37,7 +37,7 @@
|
|||||||
</description>
|
</description>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<s4j.version>3.2.1</s4j.version>
|
<s4j.version>4.0.1</s4j.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -58,7 +58,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
|||||||
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
|
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
|
||||||
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
|
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
|
||||||
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
|
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
|
||||||
this.msgTypeFunc = lookupMandtoryStrOpValueFunc(MSG_TYPE_OP_PARAM);
|
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Message createAndSetMessagePayload(
|
private Message createAndSetMessagePayload(
|
||||||
@ -305,7 +305,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
|||||||
logger.warn(
|
logger.warn(
|
||||||
"The specified JMS message type {} is not valid, use the default TextMessage type!",
|
"The specified JMS message type {} is not valid, use the default TextMessage type!",
|
||||||
jmsMsgType);
|
jmsMsgType);
|
||||||
jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label;
|
jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.BYTE.label;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
package io.nosqlbench.adapter.s4j.util;
|
package io.nosqlbench.adapter.s4j.util;
|
||||||
|
|
||||||
|
import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException;
|
||||||
import org.apache.commons.configuration2.Configuration;
|
import org.apache.commons.configuration2.Configuration;
|
||||||
import org.apache.commons.configuration2.FileBasedConfiguration;
|
import org.apache.commons.configuration2.FileBasedConfiguration;
|
||||||
import org.apache.commons.configuration2.PropertiesConfiguration;
|
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||||
@ -60,7 +61,8 @@ public class S4JClientConf {
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) {
|
public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName)
|
||||||
|
throws S4JAdapterUnexpectedException {
|
||||||
|
|
||||||
//////////////////
|
//////////////////
|
||||||
// Read related Pulsar client configuration settings from a file
|
// Read related Pulsar client configuration settings from a file
|
||||||
@ -156,12 +158,9 @@ public class S4JClientConf {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException | ConfigurationException ex) {
|
||||||
logger.error("Can't read the specified config properties file: " + fileName);
|
ex.printStackTrace();
|
||||||
ioe.printStackTrace();
|
throw new S4JAdapterUnexpectedException("Can't read the specified config properties file: " + fileName);
|
||||||
} catch (ConfigurationException cex) {
|
|
||||||
logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage());
|
|
||||||
cex.printStackTrace();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ blocks:
|
|||||||
msg-produce-block:
|
msg-produce-block:
|
||||||
ops:
|
ops:
|
||||||
op1:
|
op1:
|
||||||
## The value represents the destination (queue or topic) name)
|
## The value represents the destination (queue or topic) name
|
||||||
MessageProduce: "mys4jtest_t"
|
MessageProduce: "mys4jtest_t"
|
||||||
|
|
||||||
## (Optional) JMS headers (in JSON format).
|
## (Optional) JMS headers (in JSON format).
|
||||||
|
@ -50,7 +50,6 @@
|
|||||||
<version>${amqp.version}</version>
|
<version>${amqp.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>commons-beanutils</groupId>
|
<groupId>commons-beanutils</groupId>
|
||||||
<artifactId>commons-beanutils</artifactId>
|
<artifactId>commons-beanutils</artifactId>
|
||||||
@ -63,6 +62,12 @@
|
|||||||
<artifactId>commons-configuration2</artifactId>
|
<artifactId>commons-configuration2</artifactId>
|
||||||
<version>2.9.0</version>
|
<version>2.9.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>commons-io</groupId>
|
||||||
|
<artifactId>commons-io</artifactId>
|
||||||
|
<version>2.13.0</version>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
</project>
|
</project>
|
||||||
|
@ -27,14 +27,18 @@ import io.nosqlbench.api.config.standard.ConfigModel;
|
|||||||
import io.nosqlbench.api.config.standard.NBConfigModel;
|
import io.nosqlbench.api.config.standard.NBConfigModel;
|
||||||
import io.nosqlbench.api.config.standard.NBConfiguration;
|
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||||
import io.nosqlbench.api.config.standard.Param;
|
import io.nosqlbench.api.config.standard.Param;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.lang3.BooleanUtils;
|
import org.apache.commons.lang3.BooleanUtils;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.math.NumberUtils;
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.security.KeyManagementException;
|
||||||
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
@ -71,13 +75,16 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
// Maximum number of AMQP channels per connection
|
// Maximum number of AMQP channels per connection
|
||||||
private final int amqpConnChannelNum;
|
private final int amqpConnChannelNum;
|
||||||
|
|
||||||
|
// Maximum number of AMQP exchanges per channel
|
||||||
|
private final int amqpChannelExchangeNum;
|
||||||
|
|
||||||
// Max number of queues (per exchange)
|
// Max number of queues (per exchange)
|
||||||
// - only relevant with message receivers
|
// - only relevant with message receivers
|
||||||
private final int amqpExchangeQueueNum;
|
private final int amqpExchangeQueueNum;
|
||||||
|
|
||||||
// Max number of message clients (senders or receivers)
|
// Max number of message clients (senders or receivers)
|
||||||
// - for senders, this is the number of message senders per exchange
|
// - for senders, this is the number of message senders per exchange
|
||||||
// - for recievers, this is the number of message receivers per queue
|
// - for receivers, this is the number of message receivers per queue
|
||||||
// (there could be multiple queues per exchange)
|
// (there could be multiple queues per exchange)
|
||||||
private final int amqpMsgClntNum;
|
private final int amqpMsgClntNum;
|
||||||
|
|
||||||
@ -91,18 +98,9 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
|
|
||||||
private final ConcurrentHashMap<Long, Connection> amqpConnections = new ConcurrentHashMap<>();
|
private final ConcurrentHashMap<Long, Connection> amqpConnections = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
///////////////////////////////////
|
// Amqp connection/chanel/exchange combination for a sender
|
||||||
// NOTE: Do NOT mix sender and receiver workload in one NB workload
|
public record AmqpChannelKey(Long connId, Long channelId) { }
|
||||||
///////////////////////////////////
|
private final ConcurrentHashMap<AmqpChannelKey, Channel> amqpChannels = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
// Amqp Channels for senders
|
|
||||||
public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { }
|
|
||||||
private final ConcurrentHashMap<AmqpSenderChannelKey, Channel> amqpSenderChannels = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
// Amqp Channels for receivers
|
|
||||||
public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { }
|
|
||||||
private final ConcurrentHashMap<AmqpReceiverChannelKey, Channel> amqpReceiverChannels = new ConcurrentHashMap<>();
|
|
||||||
private final ConcurrentHashMap<AmqpReceiverChannelKey, Set<String>> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>();
|
|
||||||
|
|
||||||
// Whether to do strict error handling while sending/receiving messages
|
// Whether to do strict error handling while sending/receiving messages
|
||||||
// - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing
|
// - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing
|
||||||
@ -129,6 +127,8 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
|
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
|
||||||
this.amqpConnChannelNum =
|
this.amqpConnChannelNum =
|
||||||
NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1"));
|
NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1"));
|
||||||
|
this.amqpChannelExchangeNum =
|
||||||
|
NumberUtils.toInt(cfg.getOptional("num_exchange").orElse("1"));
|
||||||
this.amqpExchangeQueueNum =
|
this.amqpExchangeQueueNum =
|
||||||
NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1"));
|
NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1"));
|
||||||
this.amqpMsgClntNum =
|
this.amqpMsgClntNum =
|
||||||
@ -155,6 +155,12 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
.setDescription("Maximum number of AMQP connections."))
|
.setDescription("Maximum number of AMQP connections."))
|
||||||
.add(Param.defaultTo("num_channel", 1)
|
.add(Param.defaultTo("num_channel", 1)
|
||||||
.setDescription("Maximum number of AMQP channels per connection"))
|
.setDescription("Maximum number of AMQP channels per connection"))
|
||||||
|
.add(Param.defaultTo("num_exchange", 1)
|
||||||
|
.setDescription("Maximum number of AMQP exchanges per channel."))
|
||||||
|
.add(Param.defaultTo("num_queue", 1)
|
||||||
|
.setDescription("Max number of queues per exchange (only relevant for receivers)."))
|
||||||
|
.add(Param.defaultTo("num_msg_clnt", 1)
|
||||||
|
.setDescription("Max number of message clients per exchange (sender) or per queue (receiver)."))
|
||||||
.add(Param.defaultTo("max_op_time", 0)
|
.add(Param.defaultTo("max_op_time", 0)
|
||||||
.setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
|
.setDescription("Maximum time (in seconds) to run NB Kafka testing scenario."))
|
||||||
.add(Param.defaultTo("strict_msg_error_handling", false)
|
.add(Param.defaultTo("strict_msg_error_handling", false)
|
||||||
@ -164,16 +170,10 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
|
|
||||||
public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); }
|
public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); }
|
||||||
|
|
||||||
public Channel getAmqpSenderChannel(
|
public Channel getAmqpChannels(
|
||||||
AmqpSenderChannelKey key,
|
AmqpChannelKey key,
|
||||||
Supplier<Channel> channelSupplier) {
|
Supplier<Channel> channelSupplier) {
|
||||||
return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get());
|
return amqpChannels.computeIfAbsent(key, __ -> channelSupplier.get());
|
||||||
}
|
|
||||||
|
|
||||||
public Channel getAmqpReceiverChannel(
|
|
||||||
AmqpReceiverChannelKey key,
|
|
||||||
Supplier<Channel> channelSupplier) {
|
|
||||||
return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
|
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
|
||||||
@ -183,7 +183,8 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
public String getAmqpExchangeType() { return amqpExchangeType; }
|
public String getAmqpExchangeType() { return amqpExchangeType; }
|
||||||
public int getAmqpConnNum() { return this.amqpConnNum; }
|
public int getAmqpConnNum() { return this.amqpConnNum; }
|
||||||
public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; }
|
public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; }
|
||||||
public int getAmqpExchangeQueueNum() { return this.amqpConnNum; }
|
public int getAmqpChannelExchangeNum() { return this.amqpChannelExchangeNum; }
|
||||||
|
public int getAmqpExchangeQueueNum() { return this.amqpExchangeQueueNum; }
|
||||||
public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; }
|
public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; }
|
||||||
|
|
||||||
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
|
public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; }
|
||||||
@ -218,19 +219,50 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
try {
|
try {
|
||||||
s4rConnFactory = new ConnectionFactory();
|
s4rConnFactory = new ConnectionFactory();
|
||||||
|
|
||||||
String passWord = cfg.get("jwtToken");
|
String amqpServerHost = cfgMap.get("amqpSrvHost");
|
||||||
s4rConnFactory.setPassword(cfgMap.get(""));
|
if (StringUtils.isBlank(amqpServerHost)) {
|
||||||
s4rConnFactory.setPassword(passWord);
|
String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!";
|
||||||
|
throw new S4RAdapterInvalidParamException(errMsg);
|
||||||
String amqpServerHost = cfg.get("amqpSrvHost");
|
}
|
||||||
s4rConnFactory.setHost(amqpServerHost);
|
s4rConnFactory.setHost(amqpServerHost);
|
||||||
|
|
||||||
int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort"));
|
String amqpSrvPortCfg = cfgMap.get("amqpSrvPort");
|
||||||
s4rConnFactory.setPort(amqpServerPort);
|
if (StringUtils.isBlank(amqpSrvPortCfg)) {
|
||||||
|
String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!";
|
||||||
|
throw new S4RAdapterInvalidParamException(errMsg);
|
||||||
|
}
|
||||||
|
s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg));
|
||||||
|
|
||||||
String amqpVirtualHost = cfg.get("virtualHost");
|
String amqpVirtualHost = cfgMap.get("virtualHost");
|
||||||
|
if (StringUtils.isBlank(amqpVirtualHost)) {
|
||||||
|
String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!";
|
||||||
|
throw new S4RAdapterInvalidParamException(errMsg);
|
||||||
|
}
|
||||||
s4rConnFactory.setVirtualHost(amqpVirtualHost);
|
s4rConnFactory.setVirtualHost(amqpVirtualHost);
|
||||||
|
|
||||||
|
String userNameCfg = cfgMap.get("amqpUser");
|
||||||
|
|
||||||
|
String passWordCfg = cfgMap.get("amqpPassword");
|
||||||
|
if (StringUtils.isNotBlank(passWordCfg)) {
|
||||||
|
String passWord = passWordCfg;
|
||||||
|
if (StringUtils.startsWith(passWordCfg, "file://")
|
||||||
|
&& StringUtils.length(passWordCfg) > 7) {
|
||||||
|
String jwtTokenFile = StringUtils.substring(passWordCfg, 7);
|
||||||
|
passWord = FileUtils.readFileToString(new File(jwtTokenFile), "UTF-8");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (StringUtils.isNotBlank(passWord)) {
|
||||||
|
if (StringUtils.isBlank(userNameCfg)) {
|
||||||
|
s4rConnFactory.setUsername("");
|
||||||
|
}
|
||||||
|
s4rConnFactory.setPassword(passWord);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String useTlsCfg = cfgMap.get("useTls");
|
||||||
|
if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) {
|
||||||
|
s4rConnFactory.useSslProtocol();
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < getAmqpConnNum(); i++) {
|
for (int i = 0; i < getAmqpConnNum(); i++) {
|
||||||
Connection connection = s4rConnFactory.newConnection();
|
Connection connection = s4rConnFactory.newConnection();
|
||||||
@ -243,7 +275,7 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
connection);
|
connection);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (IOException|TimeoutException ex) {
|
} catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) {
|
||||||
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
|
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
|
||||||
s4rClientConnInfo.toString());
|
s4rClientConnInfo.toString());
|
||||||
throw new S4RAdapterUnexpectedException(ex);
|
throw new S4RAdapterUnexpectedException(ex);
|
||||||
@ -255,11 +287,7 @@ public class S4RSpace implements AutoCloseable {
|
|||||||
try {
|
try {
|
||||||
beingShutdown.set(true);
|
beingShutdown.set(true);
|
||||||
|
|
||||||
for (Channel channel : amqpSenderChannels.values()) {
|
for (Channel channel : amqpChannels.values()) {
|
||||||
channel.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (Channel channel : amqpReceiverChannels.values()) {
|
|
||||||
channel.close();
|
channel.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,9 +16,7 @@
|
|||||||
|
|
||||||
package io.nosqlbench.adapter.s4r.dispensers;
|
package io.nosqlbench.adapter.s4r.dispensers;
|
||||||
|
|
||||||
import com.rabbitmq.client.AMQP;
|
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import com.rabbitmq.client.Connection;
|
|
||||||
import io.nosqlbench.adapter.s4r.S4RSpace;
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
|
||||||
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
@ -26,7 +24,6 @@ import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
|
|||||||
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.commons.lang3.math.NumberUtils;
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
@ -46,8 +43,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
|
|||||||
|
|
||||||
protected final Map<String, String> s4rConfMap = new HashMap<>();
|
protected final Map<String, String> s4rConfMap = new HashMap<>();
|
||||||
protected final String exchangeType;
|
protected final String exchangeType;
|
||||||
protected final LongFunction<String> exchangeNameFunc;
|
|
||||||
|
|
||||||
protected AmqpBaseOpDispenser(final DriverAdapter adapter,
|
protected AmqpBaseOpDispenser(final DriverAdapter adapter,
|
||||||
final ParsedOp op,
|
final ParsedOp op,
|
||||||
final S4RSpace s4RSpace) {
|
final S4RSpace s4RSpace) {
|
||||||
@ -63,7 +58,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
|
|||||||
s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap());
|
s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap());
|
||||||
|
|
||||||
this.exchangeType = s4RSpace.getAmqpExchangeType();
|
this.exchangeType = s4RSpace.getAmqpExchangeType();
|
||||||
this.exchangeNameFunc = lookupMandtoryStrOpValueFunc("exchange_name");
|
|
||||||
|
|
||||||
s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
|
s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
|
||||||
s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
|
s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
|
||||||
@ -86,32 +80,19 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
|
|||||||
return stringLongFunction;
|
return stringLongFunction;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Channel getChannelWithExchange(Connection amqpConnection,
|
protected void declareExchange(Channel channel, String exchangeName, String exchangeType) {
|
||||||
long connSeqNum,
|
try {
|
||||||
long channelSeqNum,
|
// Declaring the same exchange multiple times on one channel is considered as a no-op
|
||||||
String exchangeName)
|
channel.exchangeDeclare(exchangeName, exchangeType);
|
||||||
throws IOException {
|
if (logger.isTraceEnabled()) {
|
||||||
Channel channel = amqpConnection.createChannel();
|
logger.debug("Declared the AMQP exchange \"{}\" on channel \"{}\".",
|
||||||
if (channel == null) {
|
exchangeName, channel);
|
||||||
throw new S4RAdapterUnexpectedException("No AMQP channel is available!");
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
String errMsg = String.format("Failed to declare the AMQP exchange \"%s\" on channel \"%s\"!",
|
||||||
|
exchangeName, channel);
|
||||||
|
throw new S4RAdapterUnexpectedException(errMsg);
|
||||||
}
|
}
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("AMQP channel created -- {} [{},{}] ",
|
|
||||||
channel,
|
|
||||||
connSeqNum,
|
|
||||||
channelSeqNum);
|
|
||||||
}
|
|
||||||
|
|
||||||
AMQP.Exchange.DeclareOk declareOk =
|
|
||||||
channel.exchangeDeclare(exchangeName, s4rSpace.getAmqpExchangeType());
|
|
||||||
if (logger.isDebugEnabled()) {
|
|
||||||
logger.debug("AMQP exchange declared -- [name: {}, type: {}] {}",
|
|
||||||
exchangeName,
|
|
||||||
exchangeType,
|
|
||||||
declareOk);
|
|
||||||
}
|
|
||||||
|
|
||||||
return channel;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected long getConnSeqNum(long cycle) {
|
protected long getConnSeqNum(long cycle) {
|
||||||
@ -122,9 +103,24 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
|
|||||||
return (cycle / s4rSpace.getAmqpConnNum()) % s4rSpace.getAmqpConnChannelNum();
|
return (cycle / s4rSpace.getAmqpConnNum()) % s4rSpace.getAmqpConnChannelNum();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getEffectiveExchangeName(long cycle) {
|
protected long getChannelExchangeSeqNum(long cycle) {
|
||||||
String exchangeNameInput = exchangeNameFunc.apply(cycle);
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
|
||||||
return (StringUtils.isBlank(exchangeNameInput) ? "exchange-" + getConnChannelSeqNum(cycle) : exchangeNameInput);
|
s4rSpace.getAmqpConnChannelNum())
|
||||||
|
) % s4rSpace.getAmqpChannelExchangeNum();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getEffectiveExchangeNameByCycle(long cycle) {
|
||||||
|
return getEffectiveExchangeName(
|
||||||
|
getConnSeqNum(cycle),
|
||||||
|
getConnChannelSeqNum(cycle),
|
||||||
|
getChannelExchangeSeqNum(cycle));
|
||||||
|
}
|
||||||
|
protected String getEffectiveExchangeName(long connSeqNum, long channelSeqNum, long exchangeSeqNum) {
|
||||||
|
return String.format(
|
||||||
|
"exchange-%d-%d-%d",
|
||||||
|
connSeqNum,
|
||||||
|
channelSeqNum,
|
||||||
|
exchangeSeqNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getName() {
|
public String getName() {
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
|
|
||||||
package io.nosqlbench.adapter.s4r.dispensers;
|
package io.nosqlbench.adapter.s4r.dispensers;
|
||||||
|
|
||||||
import com.rabbitmq.client.AMQP;
|
|
||||||
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Channel;
|
||||||
import com.rabbitmq.client.Connection;
|
import com.rabbitmq.client.Connection;
|
||||||
import io.nosqlbench.adapter.s4r.S4RSpace;
|
import io.nosqlbench.adapter.s4r.S4RSpace;
|
||||||
@ -25,12 +24,10 @@ import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgRecvOp;
|
|||||||
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
|
||||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
|
public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
|
||||||
@ -38,82 +35,147 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
|
|||||||
private final static Logger logger = LogManager.getLogger("AmqpMsgRecvOpDispenser");
|
private final static Logger logger = LogManager.getLogger("AmqpMsgRecvOpDispenser");
|
||||||
|
|
||||||
private final LongFunction<String> bindingKeyFunc;
|
private final LongFunction<String> bindingKeyFunc;
|
||||||
private final LongFunction<String> queueNameFunc;
|
|
||||||
public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
|
public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
|
||||||
ParsedOp op,
|
ParsedOp op,
|
||||||
S4RSpace s4rSpace) {
|
S4RSpace s4rSpace) {
|
||||||
super(adapter, op, s4rSpace);
|
super(adapter, op, s4rSpace);
|
||||||
|
|
||||||
queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null);
|
|
||||||
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
|
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getExchangeQueueSeqNum(long cycle) {
|
private long getExchangeQueueSeqNum(long cycle) {
|
||||||
return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
|
||||||
% s4rSpace.getAmqpExchangeQueueNum();
|
s4rSpace.getAmqpConnChannelNum() *
|
||||||
|
s4rSpace.getAmqpChannelExchangeNum())
|
||||||
|
) % s4rSpace.getAmqpExchangeQueueNum();
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getQueueReceiverSeqNum(long cycle) {
|
private long getQueueReceiverSeqNum(long cycle) {
|
||||||
return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum()))
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
|
||||||
% s4rSpace.getAmqpMsgClntNum();
|
s4rSpace.getAmqpConnChannelNum() *
|
||||||
|
s4rSpace.getAmqpChannelExchangeNum() *
|
||||||
|
s4rSpace.getAmqpExchangeQueueNum())
|
||||||
|
) % s4rSpace.getAmqpMsgClntNum();
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getEffectiveQueueName(long cycle) {
|
private String getEffectiveQueueNameByCycle(long cycle) {
|
||||||
String queueNameInput = queueNameFunc.apply(cycle);
|
return getEffectiveQueueName(
|
||||||
return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput);
|
getConnSeqNum(cycle),
|
||||||
|
getConnChannelSeqNum(cycle),
|
||||||
|
getChannelExchangeSeqNum(cycle),
|
||||||
|
getExchangeQueueSeqNum(cycle));
|
||||||
|
}
|
||||||
|
private String getEffectiveQueueName(long connSeqNum, long channelSeqNum, long exchangeSeqNum, long queueSeqNum) {
|
||||||
|
return "queue-" + connSeqNum + "-" + channelSeqNum + "-" + exchangeSeqNum + "-" + queueSeqNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
private Channel getAmqpChannelQueueForReceiver(long cycle,
|
private String getEffectiveReceiverName(long cycle) {
|
||||||
String exchangeName,
|
return getEffectiveReceiverName(
|
||||||
String queueName) {
|
getConnSeqNum(cycle),
|
||||||
|
getConnChannelSeqNum(cycle),
|
||||||
|
getChannelExchangeSeqNum(cycle),
|
||||||
|
getExchangeQueueSeqNum(cycle),
|
||||||
|
getQueueReceiverSeqNum(cycle));
|
||||||
|
}
|
||||||
|
private String getEffectiveReceiverName(long connSeqNum,
|
||||||
|
long channelSeqNum,
|
||||||
|
long exchangeSeqNum,
|
||||||
|
long queueSeqNum,
|
||||||
|
long receiverSeqNum) {
|
||||||
|
return String.format(
|
||||||
|
"receiver-%d-%d-%d-%d-%d",
|
||||||
|
connSeqNum,
|
||||||
|
channelSeqNum,
|
||||||
|
exchangeSeqNum,
|
||||||
|
queueSeqNum,
|
||||||
|
receiverSeqNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Channel getAmqpChannelForReceiver(long cycle) {
|
||||||
long connSeqNum = getConnSeqNum(cycle);
|
long connSeqNum = getConnSeqNum(cycle);
|
||||||
long channelSeqNum = getConnChannelSeqNum(cycle);
|
long channelSeqNum = getConnChannelSeqNum(cycle);
|
||||||
long queueSeqNum = getExchangeQueueSeqNum(cycle);
|
|
||||||
long receiverSeqNum = getQueueReceiverSeqNum(cycle);
|
|
||||||
|
|
||||||
Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
|
Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
|
||||||
|
S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
|
||||||
|
|
||||||
S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey =
|
return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> {
|
||||||
new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum);
|
|
||||||
|
|
||||||
return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> {
|
|
||||||
Channel channel = null;
|
Channel channel = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
channel = getChannelWithExchange(
|
channel = amqpConnection.createChannel();
|
||||||
amqpConnection,
|
|
||||||
connSeqNum,
|
|
||||||
channelSeqNum,
|
|
||||||
exchangeName);
|
|
||||||
|
|
||||||
AMQP.Queue.DeclareOk declareOk =
|
|
||||||
channel.queueDeclare(queueName, true, true, true, null);
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}",
|
logger.debug("Created channel for amqp connection: {}, channel: {}",
|
||||||
exchangeName,
|
amqpConnection, channel);
|
||||||
queueName,
|
}
|
||||||
declareOk);
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
// Do not throw exception here, just log it and return null
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex);
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
|
||||||
throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return channel;
|
return channel;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public S4RTimeTrackOp apply(long cycle) {
|
public S4RTimeTrackOp apply(long cycle) {
|
||||||
Channel channel = null;
|
Channel channel = getAmqpChannelForReceiver(cycle);
|
||||||
|
if (channel == null) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
String.format(
|
||||||
|
"Failed to get AMQP channel for receiver %s [%d]!",
|
||||||
|
getEffectiveReceiverName(cycle),
|
||||||
|
cycle));
|
||||||
|
}
|
||||||
|
|
||||||
String exchangeName = getEffectiveExchangeName(cycle);
|
String exchangeName = getEffectiveExchangeNameByCycle(cycle);
|
||||||
String queueName = getEffectiveQueueName(cycle);
|
declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType());
|
||||||
|
|
||||||
|
boolean durable = true;
|
||||||
|
boolean exclusive = true;
|
||||||
|
boolean autoDelete = false;
|
||||||
|
String queueName = getEffectiveQueueNameByCycle(cycle);
|
||||||
|
String bindingKey = bindingKeyFunc.apply(cycle);
|
||||||
|
try {
|
||||||
|
channel.queueDeclare(queueName, durable, exclusive, autoDelete, null);
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.debug("AMQP queue is declared - \"{} ({}/{}/{})\" on exchange \"{}\" for a receiver!",
|
||||||
|
queueName,
|
||||||
|
durable,
|
||||||
|
exclusive,
|
||||||
|
autoDelete,
|
||||||
|
exchangeName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
String.format(
|
||||||
|
"Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!",
|
||||||
|
queueName, durable, exclusive, autoDelete, exchangeName)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName);
|
// Binding the same queue multiple times on one exchange is considered as a no-op
|
||||||
|
channel.queueBind(queueName, exchangeName, bindingKey);
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.debug("AMQP queue is bound - \"{} ({}/{}/{})\" on exchange \"{}\" with binding key \"{}\"!",
|
||||||
|
queueName,
|
||||||
|
durable,
|
||||||
|
exclusive,
|
||||||
|
autoDelete,
|
||||||
|
exchangeName,
|
||||||
|
bindingKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (IOException ex) {
|
||||||
throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!");
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
String.format(
|
||||||
|
"Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!",
|
||||||
|
queueName, durable, exclusive, autoDelete, exchangeName, bindingKey)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
return new OpTimeTrackAmqpMsgRecvOp(
|
return new OpTimeTrackAmqpMsgRecvOp(
|
||||||
@ -121,7 +183,6 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
|
|||||||
s4rSpace,
|
s4rSpace,
|
||||||
channel,
|
channel,
|
||||||
exchangeName,
|
exchangeName,
|
||||||
queueName,
|
queueName);
|
||||||
bindingKeyFunc.apply(cycle));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,7 +68,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
|
|||||||
.orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
|
.orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
|
||||||
if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
|
if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
|
||||||
throw new S4RAdapterInvalidParamException("confirm_mode",
|
throw new S4RAdapterInvalidParamException("confirm_mode",
|
||||||
"Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
|
"The provided value \"" + confirmMode + "\" is not one of following valid values: '" +
|
||||||
|
S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
|
||||||
}
|
}
|
||||||
|
|
||||||
confirmBatchNum = parsedOp
|
confirmBatchNum = parsedOp
|
||||||
@ -86,62 +87,87 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private long getExchangeSenderSeqNum(long cycle) {
|
private long getExchangeSenderSeqNum(long cycle) {
|
||||||
return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum()))
|
return (cycle / ((long) s4rSpace.getAmqpConnNum() *
|
||||||
% s4rSpace.getAmqpMsgClntNum();
|
s4rSpace.getAmqpConnChannelNum() *
|
||||||
|
s4rSpace.getAmqpChannelExchangeNum())
|
||||||
|
) % s4rSpace.getAmqpMsgClntNum();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Channel getAmqpChannelForSender(long cycle,
|
private String getEffectiveSenderNameByCycle(long cycle) {
|
||||||
String exchangeName) {
|
return getEffectiveSenderNameByCycle(
|
||||||
|
getConnSeqNum(cycle),
|
||||||
|
getConnChannelSeqNum(cycle),
|
||||||
|
getChannelExchangeSeqNum(cycle),
|
||||||
|
getExchangeSenderSeqNum(cycle));
|
||||||
|
}
|
||||||
|
private String getEffectiveSenderNameByCycle(long connSeqNum,
|
||||||
|
long channelSeqNum,
|
||||||
|
long exchangeSeqNum,
|
||||||
|
long senderSeqNum) {
|
||||||
|
return String.format(
|
||||||
|
"sender-%d-%d-%d-%d",
|
||||||
|
connSeqNum,
|
||||||
|
channelSeqNum,
|
||||||
|
exchangeSeqNum,
|
||||||
|
senderSeqNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Channel getAmqpChannelForSender(long cycle) {
|
||||||
long connSeqNum = getConnSeqNum(cycle);
|
long connSeqNum = getConnSeqNum(cycle);
|
||||||
long channelSeqNum = getConnChannelSeqNum(cycle);
|
long channelSeqNum = getConnChannelSeqNum(cycle);
|
||||||
long senderSeqNum = getExchangeSenderSeqNum(cycle);
|
|
||||||
|
|
||||||
Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum);
|
Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
|
||||||
|
S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
|
||||||
|
|
||||||
S4RSpace.AmqpSenderChannelKey amqpConnChannelKey =
|
return s4rSpace.getAmqpChannels(senderKey, () -> {
|
||||||
new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum);
|
Channel channel = null;
|
||||||
|
|
||||||
return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> {
|
|
||||||
Channel channel;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
channel = getChannelWithExchange(
|
channel = amqpConnection.createChannel();
|
||||||
amqpConnection,
|
if (logger.isDebugEnabled()) {
|
||||||
connSeqNum,
|
logger.debug("Created channel for amqp connection: {}, channel: {}",
|
||||||
channelSeqNum,
|
amqpConnection, channel);
|
||||||
exchangeName);
|
}
|
||||||
|
}
|
||||||
|
catch (IOException ex) {
|
||||||
|
// Do not throw exception here, just log it and return null
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (publisherConfirm) {
|
try {
|
||||||
|
if ((channel != null) && publisherConfirm) {
|
||||||
channel.confirmSelect();
|
channel.confirmSelect();
|
||||||
|
|
||||||
boolean asyncConfirm = false;
|
|
||||||
if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
|
if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
|
||||||
asyncConfirm = true;
|
|
||||||
|
|
||||||
channel.addConfirmListener((sequenceNumber, multiple) -> {
|
channel.addConfirmListener((sequenceNumber, multiple) -> {
|
||||||
// code when message is confirmed
|
// code when message is confirmed
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.debug("Async ack of message publish received: {}, {}",
|
logger.debug("Async ack received for a published message: {}, {}",
|
||||||
sequenceNumber, multiple);
|
sequenceNumber, multiple);
|
||||||
}
|
}
|
||||||
}, (sequenceNumber, multiple) -> {
|
}, (sequenceNumber, multiple) -> {
|
||||||
// code when message is nack-ed
|
// code when message is nack-ed
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.debug("Async n-ack of message publish received: {}, {}",
|
logger.debug("Async n-ack received of a published message: {}, {}",
|
||||||
sequenceNumber, multiple);
|
sequenceNumber, multiple);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}",
|
logger.debug("Publisher Confirms is enabled on AMQP channel: {}({}), {}",
|
||||||
!asyncConfirm,
|
confirmMode,
|
||||||
|
confirmBatchNum,
|
||||||
channel);
|
channel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!");
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
"Failed to enable publisher acknowledgement on the AMQP channel (" +
|
||||||
|
channel + ")!");
|
||||||
}
|
}
|
||||||
|
|
||||||
return channel;
|
return channel;
|
||||||
@ -155,15 +181,17 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
|
|||||||
throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!");
|
throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!");
|
||||||
}
|
}
|
||||||
|
|
||||||
Channel channel;
|
Channel channel = getAmqpChannelForSender(cycle);
|
||||||
String exchangeName = getEffectiveExchangeName(cycle);
|
if (channel == null) {
|
||||||
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
String.format(
|
||||||
|
"Failed to get AMQP channel for sender %s [%d]!",
|
||||||
|
getEffectiveSenderNameByCycle(cycle),
|
||||||
|
cycle));
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
String exchangeName = getEffectiveExchangeNameByCycle(cycle);
|
||||||
channel = getAmqpChannelForSender(cycle, exchangeName);
|
declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType());
|
||||||
}
|
|
||||||
catch (Exception ex) {
|
|
||||||
throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!");
|
|
||||||
}
|
|
||||||
|
|
||||||
return new OpTimeTrackAmqpMsgSendOp(
|
return new OpTimeTrackAmqpMsgSendOp(
|
||||||
s4rAdapterMetrics,
|
s4rAdapterMetrics,
|
||||||
|
@ -31,27 +31,16 @@ import java.nio.charset.StandardCharsets;
|
|||||||
public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
|
public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
|
||||||
|
|
||||||
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp");
|
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp");
|
||||||
|
|
||||||
private final String queueName;
|
private final String queueName;
|
||||||
private final String bindingKey;
|
|
||||||
|
|
||||||
public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics,
|
public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics,
|
||||||
S4RSpace s4rSpace,
|
S4RSpace s4rSpace,
|
||||||
Channel channel,
|
Channel channel,
|
||||||
String exchangeName,
|
String exchangeName,
|
||||||
String queueName,
|
String queueName) {
|
||||||
String bindingKey) {
|
|
||||||
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName);
|
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName);
|
||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
this.bindingKey = bindingKey;
|
|
||||||
|
|
||||||
try {
|
|
||||||
channel.queueBind(queueName, exchangeName, bindingKey);
|
|
||||||
}
|
|
||||||
catch (IOException ex) {
|
|
||||||
throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " +
|
|
||||||
"exchange (\"" + exchangeName + "\")!");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -59,20 +48,26 @@ public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
|
|||||||
try {
|
try {
|
||||||
Consumer receiver = new DefaultConsumer(channel) {
|
Consumer receiver = new DefaultConsumer(channel) {
|
||||||
@Override
|
@Override
|
||||||
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
|
public void handleDelivery(
|
||||||
byte[] body) throws IOException {
|
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
|
||||||
|
{
|
||||||
|
String routingKey = envelope.getRoutingKey();
|
||||||
|
String contentType = properties.getContentType();
|
||||||
|
String msgPayload = new String(body, StandardCharsets.UTF_8);
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
String msgPayload = new String(body, StandardCharsets.UTF_8);
|
logger.trace(
|
||||||
logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}",
|
"Successfully received message ({}) via consumer ({}/{}/{}) in the current channel: {}",
|
||||||
msgPayload,
|
msgPayload,
|
||||||
consumerTag,
|
consumerTag,
|
||||||
|
routingKey,
|
||||||
|
contentType,
|
||||||
channel);
|
channel);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
channel.basicConsume(queueName, receiver);
|
channel.basicConsume(queueName, true, receiver);
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
throw new S4RAdapterUnexpectedException(
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
|
||||||
@ -40,8 +41,8 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
|
|||||||
private final String confirmMode;
|
private final String confirmMode;
|
||||||
private final int confirmBatchNum;
|
private final int confirmBatchNum;
|
||||||
|
|
||||||
private static final ThreadLocal<Integer>
|
private static final ConcurrentHashMap<Channel, Integer>
|
||||||
publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
|
channelPublishConfirmBathTracking = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics,
|
public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics,
|
||||||
S4RSpace s4rSpace,
|
S4RSpace s4rSpace,
|
||||||
@ -73,34 +74,40 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
|
|||||||
routingKey,
|
routingKey,
|
||||||
null,
|
null,
|
||||||
msgPayload.getBytes(StandardCharsets.UTF_8));
|
msgPayload.getBytes(StandardCharsets.UTF_8));
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.trace("Successfully published message (({}) {}) via the current channel: {}",
|
||||||
|
cycle, msgPayload, channel);
|
||||||
|
}
|
||||||
|
|
||||||
if (publishConfirm) {
|
if (publishConfirm) {
|
||||||
// Individual publish confirm
|
// Individual publish confirm
|
||||||
if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
|
if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
|
||||||
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
|
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
|
||||||
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.debug("Sync ack received for an individual published message: {}", cycle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Batch publish confirm
|
// Batch publish confirm
|
||||||
else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
|
else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
|
||||||
int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get();
|
int publishConfirmTrackingCnt =
|
||||||
|
channelPublishConfirmBathTracking.getOrDefault(channel, 0);
|
||||||
|
|
||||||
if ( (publishConfirmTrackingCnt > 0) &&
|
if ( (publishConfirmTrackingCnt > 0) &&
|
||||||
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
|
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
|
||||||
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) {
|
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) {
|
||||||
synchronized (this) {
|
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
|
||||||
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
|
if (logger.isTraceEnabled()) {
|
||||||
|
logger.debug("Sync ack received for a batch of published message: {}, {}",
|
||||||
|
cycle, publishConfirmTrackingCnt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1);
|
channelPublishConfirmBathTracking.put(channel, publishConfirmTrackingCnt+1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Async publish confirm
|
// Async publish confirm
|
||||||
// - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser'
|
// - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser'
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logger.isTraceEnabled()) {
|
|
||||||
logger.trace("Successfully published message ({}) via the current channel: {}",
|
|
||||||
msgPayload, channel);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
catch (IllegalStateException ex) {
|
catch (IllegalStateException ex) {
|
||||||
throw new S4RAdapterUnexpectedException(
|
throw new S4RAdapterUnexpectedException(
|
||||||
|
@ -16,13 +16,10 @@
|
|||||||
|
|
||||||
package io.nosqlbench.adapter.s4r.util;
|
package io.nosqlbench.adapter.s4r.util;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
@ -30,18 +27,6 @@ import java.util.stream.Stream;
|
|||||||
public class S4RAdapterUtil {
|
public class S4RAdapterUtil {
|
||||||
private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class);
|
private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class);
|
||||||
|
|
||||||
///////
|
|
||||||
// Valid document level parameters for JMS NB yaml file
|
|
||||||
public enum DOC_LEVEL_PARAMS {
|
|
||||||
// Blocking message producing or consuming
|
|
||||||
ASYNC_API("async_api");
|
|
||||||
public final String label;
|
|
||||||
|
|
||||||
DOC_LEVEL_PARAMS(final String label) {
|
|
||||||
this.label = label;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public enum AMQP_EXCHANGE_TYPES {
|
public enum AMQP_EXCHANGE_TYPES {
|
||||||
DIRECT("direct"),
|
DIRECT("direct"),
|
||||||
FANOUT("fanout"),
|
FANOUT("fanout"),
|
||||||
@ -83,19 +68,14 @@ public class S4RAdapterUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
public static String getValidAmqpPublisherConfirmModeList() {
|
public static String getValidAmqpPublisherConfirmModeList() {
|
||||||
return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", ");
|
return StringUtils.join(AMQP_PUB_CONFIRM_MODE.LABELS, ", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
// At least 20 messages in a publishing batch
|
// At least 20 messages in a publishing batch
|
||||||
public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20;
|
public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 10;
|
||||||
public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100;
|
public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100;
|
||||||
public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000;
|
public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000;
|
||||||
|
|
||||||
public static Map<String, String> convertJsonToMap(final String jsonStr) throws Exception {
|
|
||||||
final ObjectMapper mapper = new ObjectMapper();
|
|
||||||
return mapper.readValue(jsonStr, new TypeReference<Map<String, String>>(){});
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void pauseCurThreadExec(final int pauseInSec) {
|
public static void pauseCurThreadExec(final int pauseInSec) {
|
||||||
if (0 < pauseInSec) try {
|
if (0 < pauseInSec) try {
|
||||||
Thread.sleep(pauseInSec * 1000L);
|
Thread.sleep(pauseInSec * 1000L);
|
||||||
|
@ -1,62 +1,92 @@
|
|||||||
# Overview
|
- [1. Overview](#1-overview)
|
||||||
|
- [2. NB S4R Usage](#2-nb-s4r-usage)
|
||||||
|
- [2.1. CLI Examples](#21-cli-examples)
|
||||||
|
- [2.2. CLI parameters](#22-cli-parameters)
|
||||||
|
- [2.3. Workload Definition](#23-workload-definition)
|
||||||
|
- [2.4. Configuration Properties](#24-configuration-properties)
|
||||||
|
- [2.4.1. Global Properties File](#241-global-properties-file)
|
||||||
|
- [2.4.2. Scenario Document Level Properties](#242-scenario-document-level-properties)
|
||||||
|
|
||||||
This NB Kafka adapter allows publishing messages to or consuming messages from
|
---
|
||||||
* a Kafka cluster, or
|
|
||||||
* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar.
|
|
||||||
|
|
||||||
At high level, this adapter supports the following Kafka functionalities
|
# 1. Overview
|
||||||
* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers)
|
|
||||||
* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits)
|
|
||||||
* auto message commit
|
|
||||||
* manual message commit with a configurable number of message commits in one batch
|
|
||||||
* Kafka Transaction support
|
|
||||||
|
|
||||||
## Example NB Yaml
|
This NB S4R adapter allows sending messages to or receiving messages from
|
||||||
* [kafka_producer.yaml](./s4r_producer.yaml)
|
* an AMQP 0-9-1 based server (e.g. RabbitMQ), or
|
||||||
*
|
* a Pulsar cluster with [S4R](https://github.com/datastax/starlight-for-rabbitmq) AMQP (0-9-1) Protocol handler for Pulsar.
|
||||||
* [kafka_consumer.yaml](./s4r_consumer.yaml)
|
|
||||||
|
|
||||||
# Usage
|
At high level, this adapter supports the following AMQP 0-9-1 functionalities
|
||||||
|
* Creating AMQP connections and channels
|
||||||
|
* Declaring AMQP exchanges
|
||||||
|
* The following exchange types are supported: `direct`, `fanout`, `topic`, and `headers`
|
||||||
|
* Sending messages to AMQP exchanges with sync. or async. publisher confirms
|
||||||
|
* For sync confirms, it supports both single and batch confirms
|
||||||
|
* Supports message-send based on routing keys
|
||||||
|
* Declaring and binding AMQP queues
|
||||||
|
* Supports message-receive based on binding keys
|
||||||
|
* Receiving messages from AMQP queues with async. consumer acks
|
||||||
|
|
||||||
|
# 2. NB S4R Usage
|
||||||
|
|
||||||
|
## 2.1. CLI Examples
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
## Kafka Producer
|
## AMQP Message Sender
|
||||||
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092
|
$ <nb_cmd> run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \
|
||||||
|
threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \
|
||||||
|
workload=./s4r_msg_sender.yaml \
|
||||||
|
config=./s4r_config.properties
|
||||||
|
|
||||||
## Kafka Consumer
|
## AMQP Message Receiver
|
||||||
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092
|
$ <nb_cmd> run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \
|
||||||
|
threads=8 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \
|
||||||
|
workload=./s4r_msg_receiver.yaml \
|
||||||
|
config=./s4r_config.properties
|
||||||
```
|
```
|
||||||
|
|
||||||
## NB Kafka adapter specific CLI parameters
|
## 2.2. CLI parameters
|
||||||
|
|
||||||
* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from
|
The following CLI parameters are unique to the S4R adapter:
|
||||||
* For producer workload, this is the number of the producer threads to publish messages to the same topic
|
|
||||||
* Can have multiple producer threads for one topic/partition (`KafkaProducer` is thread-safe)
|
|
||||||
* `threads` and `num_clnt` values MUST be the same.
|
|
||||||
* For consumer workload, this is the partition number of a topic
|
|
||||||
* Consumer workload supports to subscribe from multiple topics. If so, it requires all topics having the same partition number.
|
|
||||||
* Only one consumer thread for one topic/partition (`KafkaConsumer` is NOT thread-safe)
|
|
||||||
* `threads` MUST be equal to `num_clnt`*`num_cons_grp`
|
|
||||||
|
|
||||||
* `num_cons_grp`: the number of consumer groups
|
* `num_conn`: the number of AMQP connections to create
|
||||||
* Only relevant for consumer workload
|
* `num_channel`: the number of AMQP channels to create for each connection
|
||||||
|
* `num_exchange`: the number of AMQP exchanges to create for each channel
|
||||||
|
* `num_queue`: the number of AMQP queues to create for each channel (only relevant for message receiver workload)
|
||||||
|
* `num_msg_client`: the number of message clients to create for each channel
|
||||||
|
* for message sender workload, it is the number of message publishers for each exchange
|
||||||
|
* for message receiver workload, it is the number of message consumers for each queue
|
||||||
|
|
||||||
|
## 2.3. Workload Definition
|
||||||
|
|
||||||
|
The example workload YAML files can be found from:
|
||||||
|
|
||||||
For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported:
|
* [s4r_msg_sender.yaml](s4r_msg_sender.yaml)
|
||||||
|
* [s4r_msg_receiver.yaml](s4r_msg_receiver.yaml)
|
||||||
|
|
||||||
* `async_api` (boolean):
|
## 2.4. Configuration Properties
|
||||||
* When true, use async Kafka client API.
|
|
||||||
* `seq_tracking` (boolean):
|
### 2.4.1. Global Properties File
|
||||||
* When true, a sequence number is created as part of each message's properties
|
|
||||||
* This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully.
|
A global S4R properties file can be specified via the `config` CLI parameter. It includes the following required properties:
|
||||||
* `seqerr_simu`:
|
* `amqpSrvHost`: AMQP server host (e.g. An Astra Streaming cluster with S4R enabled)
|
||||||
* A list of error simulation types separated by comma (,)
|
* `amqpSrvPort`: AMQP server port (for S4R enabled Astra Streaming, it is 5671)
|
||||||
* Valid error simulation types
|
* `virtualHost`: AMQP server virtual host (for S4R enabled Astra Streaming, it is "<tenant>/rabbitmq")
|
||||||
* `out_of_order`: simulate message out of sequence
|
* `amqpUser`: AMQP user (for S4R enabled Astra Streaming, it is an empty string)
|
||||||
* `msg_loss`: simulate message loss
|
* `amqpPassword`: AMQP password (for S4R enabled Astra Streaming, it is the JWT token file path)
|
||||||
* `msg_dup`: simulate message duplication
|
* `useTls`: whether to use TLS (for S4R enabled Astra Streaming, it is true)
|
||||||
* This value should be used only for testing purposes. It is not recommended to use this parameter in actual testing environments.
|
* `exchangeType`: AMQP exchange type (e.g. `direct`, `fanout`, `topic`, or `headers`)
|
||||||
* `e2e_starting_time_source`:
|
|
||||||
* Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch.
|
An example of this file can be found from: [s4r_config.properties](./s4r_config.properties)
|
||||||
* The possible values for `e2e_starting_time_source`:
|
|
||||||
* `message_publish_time` : uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). This is the case, as [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type).
|
### 2.4.2. Scenario Document Level Properties
|
||||||
|
|
||||||
|
For message sender workload, the following Document level configuration parameters are supported in the YAML file:
|
||||||
|
* `publisher_confirm`: whether to use publisher confirms
|
||||||
|
* `confirm_mode`: When `publisher_confirm` is true, the following 3 confirm modes are supported:
|
||||||
|
* `individual`: wait for confirm individually
|
||||||
|
* `batch`: wait for confirm in batch
|
||||||
|
* `async`: [default] no wait for confirm
|
||||||
|
* `confirm_batch_num`: batch size for waiting for **sync** publisher confirms
|
||||||
|
* Only relevant when `publisher_confirm` is true and `confirm_mode` is "batch"
|
||||||
|
* `dft_confirm_timeout_ms`: batch size for waiting for publisher confirms
|
||||||
|
* Only relevant when `publisher_confirm` is true and `confirm_mode` is **NOT** "async"
|
||||||
|
@ -1,24 +0,0 @@
|
|||||||
#!/usr/local/bin/bash
|
|
||||||
#
|
|
||||||
# Copyright (c) 2023 nosqlbench
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
#
|
|
||||||
|
|
||||||
: "${SKIP_TESTS:=1}"
|
|
||||||
(
|
|
||||||
cd "$(git rev-parse --show-toplevel)" && \
|
|
||||||
mvn clean install "-DskipTests" -pl adapters-api,adapter-s4r,nb5 && \
|
|
||||||
[[ ${SKIP_TESTS} -ne 1 ]] && \
|
|
||||||
mvn test -pl adapters-api,adapter-s4r
|
|
||||||
)
|
|
@ -1,4 +0,0 @@
|
|||||||
name
|
|
||||||
usa
|
|
||||||
canada
|
|
||||||
german
|
|
|
@ -1,3 +0,0 @@
|
|||||||
queue1
|
|
||||||
queue2
|
|
||||||
queue3
|
|
|
@ -18,7 +18,13 @@
|
|||||||
# Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled
|
# Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled
|
||||||
amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com
|
amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com
|
||||||
amqpSrvPort=5671
|
amqpSrvPort=5671
|
||||||
virtualHost=<as_tenant_name>/rabbitmq
|
virtualHost=<as_tenant>/rabbitmq
|
||||||
jwtToken=<jwt_token_value>
|
# For Astra Streaming with S4R, the user an empty string
|
||||||
|
amqpUser=
|
||||||
|
# For Astra Streaming with S4R, the password is the JWT token in the format of
|
||||||
|
# file:///path/to/astra_streaming_jwt_token_file
|
||||||
|
amqpPassword=file://</path/to/as_nbtest_token_file>
|
||||||
|
# when using Astra Streaming with S4R, this needs to be set to true
|
||||||
|
useTls=true
|
||||||
# valid values: direct, fanout, topic, headers
|
# valid values: direct, fanout, topic, headers
|
||||||
exchangeType=direct
|
exchangeType=direct
|
||||||
|
@ -1,18 +0,0 @@
|
|||||||
bindings:
|
|
||||||
myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name')
|
|
||||||
myqueue: CSVFrequencySampler('csv/queue_names.csv', 'name')
|
|
||||||
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
|
|
||||||
|
|
||||||
# Doc-level parameters (must be static)
|
|
||||||
params:
|
|
||||||
|
|
||||||
blocks:
|
|
||||||
msg-recv-block:
|
|
||||||
ops:
|
|
||||||
AmqpMsgReceiver:
|
|
||||||
#exchange_names: "{myexname}"
|
|
||||||
exchange_name: "alpha"
|
|
||||||
|
|
||||||
queue_name: "{myqueue}"
|
|
||||||
|
|
||||||
binding_key: "{myroutingkey}"
|
|
12
adapter-s4r/src/main/resources/s4r_msg_receiver.yaml
Normal file
12
adapter-s4r/src/main/resources/s4r_msg_receiver.yaml
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
bindings:
|
||||||
|
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
|
||||||
|
|
||||||
|
# Doc-level parameters (must be static)
|
||||||
|
params:
|
||||||
|
|
||||||
|
blocks:
|
||||||
|
msg-recv-block:
|
||||||
|
ops:
|
||||||
|
op1:
|
||||||
|
AmqpMsgReceiver: ""
|
||||||
|
binding_key: "{myroutingkey}"
|
@ -1,6 +1,5 @@
|
|||||||
bindings:
|
bindings:
|
||||||
mytext_val: AlphaNumericString(100)
|
mytext_val: AlphaNumericString(100)
|
||||||
myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name')
|
|
||||||
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
|
myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name')
|
||||||
|
|
||||||
|
|
||||||
@ -8,18 +7,18 @@ bindings:
|
|||||||
params:
|
params:
|
||||||
# whether to do publisher confirm (for reliable publishing)
|
# whether to do publisher confirm (for reliable publishing)
|
||||||
# - default: false
|
# - default: false
|
||||||
publisher_confirm: "false"
|
publisher_confirm: true
|
||||||
#publisher_confirm: "true"
|
#publisher_confirm: true
|
||||||
# If 'publisher_confirm' is true, use one of the following 3 confirm modes:
|
# If 'publisher_confirm' is true, use one of the following 3 confirm modes:
|
||||||
# - individual (wait_for_confirm individually)
|
# - individual (wait_for_confirm individually)
|
||||||
# - batch (wait_for_confirm in batch)
|
# - batch (wait_for_confirm in batch)
|
||||||
# - async [default]
|
# - async [default]
|
||||||
confirm_mode: "aysnc"
|
#confirm_mode: async
|
||||||
#confirm_mode: "individual"
|
#confirm_mode: individual
|
||||||
#confirm_mode: "batch"
|
confirm_mode: batch
|
||||||
|
|
||||||
# Only relevant when 'publisher_confirm' is true and 'confirm_mode' is "batch"
|
# Only relevant when 'publisher_confirm' is true and 'confirm_mode' is "batch"
|
||||||
confirm_batch_num: 100
|
confirm_batch_num: 10
|
||||||
# default timeout value (in milliseconds)
|
# default timeout value (in milliseconds)
|
||||||
# - only relevant when publisher_confirm' is true and 'confirm_mode' is NOT "async"
|
# - only relevant when publisher_confirm' is true and 'confirm_mode' is NOT "async"
|
||||||
dft_confirm_timeout_ms: 1000
|
dft_confirm_timeout_ms: 1000
|
||||||
@ -28,12 +27,7 @@ params:
|
|||||||
blocks:
|
blocks:
|
||||||
msg-send-block:
|
msg-send-block:
|
||||||
ops:
|
ops:
|
||||||
AmqpMsgSender:
|
op1:
|
||||||
#exchange_names: "{myexname}"
|
AmqpMsgSender: ""
|
||||||
exchange_names: "alpha"
|
|
||||||
|
|
||||||
routing_key: "{myroutingkey}"
|
routing_key: "{myroutingkey}"
|
||||||
|
|
||||||
## (Optional) Kafka message value.
|
|
||||||
# - message key and value can't be both empty at the same time
|
|
||||||
message: "{mytext_val}"
|
message: "{mytext_val}"
|
@ -1,36 +0,0 @@
|
|||||||
#!/usr/local/bin/bash
|
|
||||||
#
|
|
||||||
# Copyright (c) 2023 nosqlbench
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
#
|
|
||||||
|
|
||||||
: "${REBUILD:=1}"
|
|
||||||
: "${CYCLES:=1000000000}"
|
|
||||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
|
|
||||||
if [[ ${REBUILD} -eq 1 ]]; then
|
|
||||||
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
|
|
||||||
fi
|
|
||||||
java -jar nb5/target/nb5.jar \
|
|
||||||
run \
|
|
||||||
driver=s4r \
|
|
||||||
-vv \
|
|
||||||
--report-interval 5 \
|
|
||||||
--docker-metrics \
|
|
||||||
cycles=${CYCLES} \
|
|
||||||
threads=1 \
|
|
||||||
num_clnt=1 \
|
|
||||||
num_cons_grp=1 \
|
|
||||||
yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \
|
|
||||||
config="${SCRIPT_DIR}/kafka_config.properties" \
|
|
||||||
bootstrap_server=PLAINTEXT://localhost:9092
|
|
@ -1,38 +0,0 @@
|
|||||||
#!/usr/local/bin/bash
|
|
||||||
#
|
|
||||||
# Copyright (c) 2023 nosqlbench
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
#
|
|
||||||
|
|
||||||
: "${REBUILD:=1}"
|
|
||||||
: "${CYCLES:=1000000000}"
|
|
||||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)"
|
|
||||||
if [[ ${REBUILD} -eq 1 ]]; then
|
|
||||||
"${SCRIPT_DIR}/build-nb-kafka-driver.sh"
|
|
||||||
fi
|
|
||||||
while [[ 1 -eq 1 ]]; do
|
|
||||||
java -jar nb5/target/nb5.jar \
|
|
||||||
run \
|
|
||||||
driver=s4r \
|
|
||||||
-vv \
|
|
||||||
--report-interval 5 \
|
|
||||||
--docker-metrics \
|
|
||||||
cycles="${CYCLES}" \
|
|
||||||
threads=1 \
|
|
||||||
num_clnt=1 \
|
|
||||||
yaml="${SCRIPT_DIR}/kafka_producer.yaml" \
|
|
||||||
config="${SCRIPT_DIR}/kafka_config.properties" \
|
|
||||||
bootstrap_server=PLAINTEXT://localhost:9092
|
|
||||||
sleep 10
|
|
||||||
done
|
|
@ -20,7 +20,7 @@
|
|||||||
<parent>
|
<parent>
|
||||||
<artifactId>mvn-defaults</artifactId>
|
<artifactId>mvn-defaults</artifactId>
|
||||||
<groupId>io.nosqlbench</groupId>
|
<groupId>io.nosqlbench</groupId>
|
||||||
<version>5.17.1-SNAPSHOT</version>
|
<version>${revision}</version>
|
||||||
<relativePath>../mvn-defaults</relativePath>
|
<relativePath>../mvn-defaults</relativePath>
|
||||||
</parent>
|
</parent>
|
||||||
|
|
||||||
@ -51,7 +51,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.nosqlbench</groupId>
|
<groupId>io.nosqlbench</groupId>
|
||||||
<artifactId>engine-cli</artifactId>
|
<artifactId>engine-cli</artifactId>
|
||||||
<version>5.17.1-SNAPSHOT</version>
|
<version>${revision}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
Loading…
Reference in New Issue
Block a user