diff --git a/adapter-pulsar/pom.xml b/adapter-pulsar/pom.xml index e1726bce9..8dc32c655 100644 --- a/adapter-pulsar/pom.xml +++ b/adapter-pulsar/pom.xml @@ -34,10 +34,7 @@ - - 2.10.4 + 3.0.0 diff --git a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java index b9dabba8b..3e92da254 100644 --- a/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java +++ b/adapter-pulsar/src/main/java/io/nosqlbench/adapter/pulsar/dispensers/PulsarClientOpDispenser.java @@ -16,25 +16,21 @@ package io.nosqlbench.adapter.pulsar.dispensers; -import com.codahale.metrics.Timer; import com.codahale.metrics.Timer.Context; import io.nosqlbench.adapter.pulsar.PulsarSpace; -import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil; +import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException; import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil.DOC_LEVEL_PARAMS; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; -import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil; import io.nosqlbench.engine.api.metrics.EndToEndMetricsAdapterUtil.MSG_SEQ_ERROR_SIMU_TYPE; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.function.LongFunction; import java.util.function.Predicate; import java.util.function.Supplier; @@ -93,13 +89,10 @@ public abstract class PulsarClientOpDispenser extends PulsarBaseOpDispenser { .newTransaction() .build() .get(); - } catch (final ExecutionException | InterruptedException err) { + } catch (Exception err) { if (PulsarClientOpDispenser.logger.isWarnEnabled()) PulsarClientOpDispenser.logger.warn("Error while starting a new transaction", err); - throw new RuntimeException(err); - } catch (final PulsarClientException err) { - throw new RuntimeException("Transactions are not enabled on Pulsar Client, " + - "please set client.enableTransaction=true in your Pulsar Client configuration"); + throw new PulsarAdapterUnexpectedException(err); } }; } diff --git a/adapter-s4j/pom.xml b/adapter-s4j/pom.xml index 634fc9bbb..dafca2e06 100644 --- a/adapter-s4j/pom.xml +++ b/adapter-s4j/pom.xml @@ -37,7 +37,7 @@ - 3.2.1 + 4.0.1 diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java index 5f3764180..c0cccc5de 100644 --- a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/dispensers/MessageProducerOpDispenser.java @@ -58,7 +58,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser { this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM); this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM); this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM); - this.msgTypeFunc = lookupMandtoryStrOpValueFunc(MSG_TYPE_OP_PARAM); + this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM); } private Message createAndSetMessagePayload( @@ -305,7 +305,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser { logger.warn( "The specified JMS message type {} is not valid, use the default TextMessage type!", jmsMsgType); - jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.TEXT.label; + jmsMsgType = S4JAdapterUtil.JMS_MESSAGE_TYPES.BYTE.label; } diff --git a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java index e2a62c5a1..74706ab94 100644 --- a/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java +++ b/adapter-s4j/src/main/java/io/nosqlbench/adapter/s4j/util/S4JClientConf.java @@ -16,6 +16,7 @@ package io.nosqlbench.adapter.s4j.util; +import io.nosqlbench.adapter.s4j.exception.S4JAdapterUnexpectedException; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.FileBasedConfiguration; import org.apache.commons.configuration2.PropertiesConfiguration; @@ -60,7 +61,8 @@ public class S4JClientConf { - public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) { + public S4JClientConf(String webSvcUrl, String pulsarSvcUrl, String s4jConfFileName) + throws S4JAdapterUnexpectedException { ////////////////// // Read related Pulsar client configuration settings from a file @@ -156,12 +158,9 @@ public class S4JClientConf { } } } - } catch (IOException ioe) { - logger.error("Can't read the specified config properties file: " + fileName); - ioe.printStackTrace(); - } catch (ConfigurationException cex) { - logger.error("Error loading configuration items from the specified config properties file: " + fileName + ":" + cex.getMessage()); - cex.printStackTrace(); + } catch (IOException | ConfigurationException ex) { + ex.printStackTrace(); + throw new S4JAdapterUnexpectedException("Can't read the specified config properties file: " + fileName); } } diff --git a/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml b/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml index 6a0528574..3fb641a81 100644 --- a/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml +++ b/adapter-s4j/src/main/resources/pulsar_s4j_producer.yaml @@ -16,7 +16,7 @@ blocks: msg-produce-block: ops: op1: - ## The value represents the destination (queue or topic) name) + ## The value represents the destination (queue or topic) name MessageProduce: "mys4jtest_t" ## (Optional) JMS headers (in JSON format). diff --git a/adapter-s4r/pom.xml b/adapter-s4r/pom.xml index 014aed4ea..0e1385970 100644 --- a/adapter-s4r/pom.xml +++ b/adapter-s4r/pom.xml @@ -50,7 +50,6 @@ ${amqp.version} - commons-beanutils commons-beanutils @@ -63,6 +62,12 @@ commons-configuration2 2.9.0 + + + commons-io + commons-io + 2.13.0 + diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java index 645a94aeb..e66c4cd22 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java @@ -27,14 +27,18 @@ import io.nosqlbench.api.config.standard.ConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.Param; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.File; import java.io.IOException; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -71,13 +75,16 @@ public class S4RSpace implements AutoCloseable { // Maximum number of AMQP channels per connection private final int amqpConnChannelNum; + // Maximum number of AMQP exchanges per channel + private final int amqpChannelExchangeNum; + // Max number of queues (per exchange) // - only relevant with message receivers private final int amqpExchangeQueueNum; // Max number of message clients (senders or receivers) // - for senders, this is the number of message senders per exchange - // - for recievers, this is the number of message receivers per queue + // - for receivers, this is the number of message receivers per queue // (there could be multiple queues per exchange) private final int amqpMsgClntNum; @@ -91,18 +98,9 @@ public class S4RSpace implements AutoCloseable { private final ConcurrentHashMap amqpConnections = new ConcurrentHashMap<>(); - /////////////////////////////////// - // NOTE: Do NOT mix sender and receiver workload in one NB workload - /////////////////////////////////// - - // Amqp Channels for senders - public record AmqpSenderChannelKey(Long connId, Long channelId, Long senderId) { } - private final ConcurrentHashMap amqpSenderChannels = new ConcurrentHashMap<>(); - - // Amqp Channels for receivers - public record AmqpReceiverChannelKey(Long connId, Long channelId, Long queueId, Long consumerId) { } - private final ConcurrentHashMap amqpReceiverChannels = new ConcurrentHashMap<>(); - private final ConcurrentHashMap> amqpRecvChannelQueueSetMap = new ConcurrentHashMap<>(); + // Amqp connection/chanel/exchange combination for a sender + public record AmqpChannelKey(Long connId, Long channelId) { } + private final ConcurrentHashMap amqpChannels = new ConcurrentHashMap<>(); // Whether to do strict error handling while sending/receiving messages // - Yes: any error returned from the AMQP server (or AMQP compatible sever like Pulsar) while doing @@ -129,6 +127,8 @@ public class S4RSpace implements AutoCloseable { NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1")); this.amqpConnChannelNum = NumberUtils.toInt(cfg.getOptional("num_channel").orElse("1")); + this.amqpChannelExchangeNum = + NumberUtils.toInt(cfg.getOptional("num_exchange").orElse("1")); this.amqpExchangeQueueNum = NumberUtils.toInt(cfg.getOptional("num_queue").orElse("1")); this.amqpMsgClntNum = @@ -155,6 +155,12 @@ public class S4RSpace implements AutoCloseable { .setDescription("Maximum number of AMQP connections.")) .add(Param.defaultTo("num_channel", 1) .setDescription("Maximum number of AMQP channels per connection")) + .add(Param.defaultTo("num_exchange", 1) + .setDescription("Maximum number of AMQP exchanges per channel.")) + .add(Param.defaultTo("num_queue", 1) + .setDescription("Max number of queues per exchange (only relevant for receivers).")) + .add(Param.defaultTo("num_msg_clnt", 1) + .setDescription("Max number of message clients per exchange (sender) or per queue (receiver).")) .add(Param.defaultTo("max_op_time", 0) .setDescription("Maximum time (in seconds) to run NB Kafka testing scenario.")) .add(Param.defaultTo("strict_msg_error_handling", false) @@ -164,16 +170,10 @@ public class S4RSpace implements AutoCloseable { public Connection getAmqpConnection(Long id) { return amqpConnections.get(id); } - public Channel getAmqpSenderChannel( - AmqpSenderChannelKey key, + public Channel getAmqpChannels( + AmqpChannelKey key, Supplier channelSupplier) { - return amqpSenderChannels.computeIfAbsent(key, __ -> channelSupplier.get()); - } - - public Channel getAmqpReceiverChannel( - AmqpReceiverChannelKey key, - Supplier channelSupplier) { - return amqpReceiverChannels.computeIfAbsent(key, __ -> channelSupplier.get()); + return amqpChannels.computeIfAbsent(key, __ -> channelSupplier.get()); } public long getActivityStartTimeMills() { return this.activityStartTimeMills; } @@ -183,7 +183,8 @@ public class S4RSpace implements AutoCloseable { public String getAmqpExchangeType() { return amqpExchangeType; } public int getAmqpConnNum() { return this.amqpConnNum; } public int getAmqpConnChannelNum() { return this.amqpConnChannelNum; } - public int getAmqpExchangeQueueNum() { return this.amqpConnNum; } + public int getAmqpChannelExchangeNum() { return this.amqpChannelExchangeNum; } + public int getAmqpExchangeQueueNum() { return this.amqpExchangeQueueNum; } public int getAmqpMsgClntNum() { return this.amqpMsgClntNum; } public boolean isStrictMsgErrorHandling() { return this.strictMsgErrorHandling; } @@ -218,19 +219,50 @@ public class S4RSpace implements AutoCloseable { try { s4rConnFactory = new ConnectionFactory(); - String passWord = cfg.get("jwtToken"); - s4rConnFactory.setPassword(cfgMap.get("")); - s4rConnFactory.setPassword(passWord); - - String amqpServerHost = cfg.get("amqpSrvHost"); + String amqpServerHost = cfgMap.get("amqpSrvHost"); + if (StringUtils.isBlank(amqpServerHost)) { + String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!"; + throw new S4RAdapterInvalidParamException(errMsg); + } s4rConnFactory.setHost(amqpServerHost); - int amqpServerPort = Integer.parseInt(cfg.get("amqpSrvPort")); - s4rConnFactory.setPort(amqpServerPort); + String amqpSrvPortCfg = cfgMap.get("amqpSrvPort"); + if (StringUtils.isBlank(amqpSrvPortCfg)) { + String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!"; + throw new S4RAdapterInvalidParamException(errMsg); + } + s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg)); - String amqpVirtualHost = cfg.get("virtualHost"); + String amqpVirtualHost = cfgMap.get("virtualHost"); + if (StringUtils.isBlank(amqpVirtualHost)) { + String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!"; + throw new S4RAdapterInvalidParamException(errMsg); + } s4rConnFactory.setVirtualHost(amqpVirtualHost); + String userNameCfg = cfgMap.get("amqpUser"); + + String passWordCfg = cfgMap.get("amqpPassword"); + if (StringUtils.isNotBlank(passWordCfg)) { + String passWord = passWordCfg; + if (StringUtils.startsWith(passWordCfg, "file://") + && StringUtils.length(passWordCfg) > 7) { + String jwtTokenFile = StringUtils.substring(passWordCfg, 7); + passWord = FileUtils.readFileToString(new File(jwtTokenFile), "UTF-8"); + } + + if (StringUtils.isNotBlank(passWord)) { + if (StringUtils.isBlank(userNameCfg)) { + s4rConnFactory.setUsername(""); + } + s4rConnFactory.setPassword(passWord); + } + } + + String useTlsCfg = cfgMap.get("useTls"); + if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) { + s4rConnFactory.useSslProtocol(); + } for (int i = 0; i < getAmqpConnNum(); i++) { Connection connection = s4rConnFactory.newConnection(); @@ -243,7 +275,7 @@ public class S4RSpace implements AutoCloseable { connection); } } - } catch (IOException|TimeoutException ex) { + } catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) { logger.error("Unable to establish AMQP connections with the following configuration parameters: {}", s4rClientConnInfo.toString()); throw new S4RAdapterUnexpectedException(ex); @@ -255,11 +287,7 @@ public class S4RSpace implements AutoCloseable { try { beingShutdown.set(true); - for (Channel channel : amqpSenderChannels.values()) { - channel.close(); - } - - for (Channel channel : amqpReceiverChannels.values()) { + for (Channel channel : amqpChannels.values()) { channel.close(); } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java index 109b37898..4f99b5db9 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java @@ -16,9 +16,7 @@ package io.nosqlbench.adapter.s4r.dispensers; -import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; import io.nosqlbench.adapter.s4r.S4RSpace; import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; @@ -26,7 +24,6 @@ import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -46,8 +43,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser s4rConfMap = new HashMap<>(); protected final String exchangeType; - protected final LongFunction exchangeNameFunc; - protected AmqpBaseOpDispenser(final DriverAdapter adapter, final ParsedOp op, final S4RSpace s4RSpace) { @@ -63,7 +58,6 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser bindingKeyFunc; - private final LongFunction queueNameFunc; public AmqpMsgRecvOpDispenser(DriverAdapter adapter, ParsedOp op, S4RSpace s4rSpace) { super(adapter, op, s4rSpace); - - queueNameFunc = lookupOptionalStrOpValueFunc("queue_name", null); bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); } private long getExchangeQueueSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum())) - % s4rSpace.getAmqpExchangeQueueNum(); + return (cycle / ((long) s4rSpace.getAmqpConnNum() * + s4rSpace.getAmqpConnChannelNum() * + s4rSpace.getAmqpChannelExchangeNum()) + ) % s4rSpace.getAmqpExchangeQueueNum(); } private long getQueueReceiverSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum() * s4rSpace.getAmqpExchangeQueueNum())) - % s4rSpace.getAmqpMsgClntNum(); + return (cycle / ((long) s4rSpace.getAmqpConnNum() * + s4rSpace.getAmqpConnChannelNum() * + s4rSpace.getAmqpChannelExchangeNum() * + s4rSpace.getAmqpExchangeQueueNum()) + ) % s4rSpace.getAmqpMsgClntNum(); } - private String getEffectiveQueueName(long cycle) { - String queueNameInput = queueNameFunc.apply(cycle); - return (StringUtils.isBlank(queueNameInput) ? "queue-" + getExchangeQueueSeqNum(cycle) : queueNameInput); + private String getEffectiveQueueNameByCycle(long cycle) { + return getEffectiveQueueName( + getConnSeqNum(cycle), + getConnChannelSeqNum(cycle), + getChannelExchangeSeqNum(cycle), + getExchangeQueueSeqNum(cycle)); + } + private String getEffectiveQueueName(long connSeqNum, long channelSeqNum, long exchangeSeqNum, long queueSeqNum) { + return "queue-" + connSeqNum + "-" + channelSeqNum + "-" + exchangeSeqNum + "-" + queueSeqNum; } - private Channel getAmqpChannelQueueForReceiver(long cycle, - String exchangeName, - String queueName) { + private String getEffectiveReceiverName(long cycle) { + return getEffectiveReceiverName( + getConnSeqNum(cycle), + getConnChannelSeqNum(cycle), + getChannelExchangeSeqNum(cycle), + getExchangeQueueSeqNum(cycle), + getQueueReceiverSeqNum(cycle)); + } + private String getEffectiveReceiverName(long connSeqNum, + long channelSeqNum, + long exchangeSeqNum, + long queueSeqNum, + long receiverSeqNum) { + return String.format( + "receiver-%d-%d-%d-%d-%d", + connSeqNum, + channelSeqNum, + exchangeSeqNum, + queueSeqNum, + receiverSeqNum); + } + + private Channel getAmqpChannelForReceiver(long cycle) { long connSeqNum = getConnSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle); - long queueSeqNum = getExchangeQueueSeqNum(cycle); - long receiverSeqNum = getQueueReceiverSeqNum(cycle); - Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum); + Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); + S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum); - S4RSpace.AmqpReceiverChannelKey amqpConnChannelKey = - new S4RSpace.AmqpReceiverChannelKey(connSeqNum, channelSeqNum, queueSeqNum, receiverSeqNum); - - return s4rSpace.getAmqpReceiverChannel(amqpConnChannelKey, () -> { + return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> { Channel channel = null; try { - channel = getChannelWithExchange( - amqpConnection, - connSeqNum, - channelSeqNum, - exchangeName); - - AMQP.Queue.DeclareOk declareOk = - channel.queueDeclare(queueName, true, true, true, null); + channel = amqpConnection.createChannel(); if (logger.isDebugEnabled()) { - logger.debug("AMQP queue declared -- [exchange name: {}, queue name: {}] {}", - exchangeName, - queueName, - declareOk); + logger.debug("Created channel for amqp connection: {}, channel: {}", + amqpConnection, channel); + } + } + catch (IOException ex) { + // Do not throw exception here, just log it and return null + if (logger.isDebugEnabled()) { + logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex); } - } catch (IOException ex) { - throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!"); } return channel; }); } + @Override public S4RTimeTrackOp apply(long cycle) { - Channel channel = null; + Channel channel = getAmqpChannelForReceiver(cycle); + if (channel == null) { + throw new S4RAdapterUnexpectedException( + String.format( + "Failed to get AMQP channel for receiver %s [%d]!", + getEffectiveReceiverName(cycle), + cycle)); + } - String exchangeName = getEffectiveExchangeName(cycle); - String queueName = getEffectiveQueueName(cycle); + String exchangeName = getEffectiveExchangeNameByCycle(cycle); + declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); + + boolean durable = true; + boolean exclusive = true; + boolean autoDelete = false; + String queueName = getEffectiveQueueNameByCycle(cycle); + String bindingKey = bindingKeyFunc.apply(cycle); + try { + channel.queueDeclare(queueName, durable, exclusive, autoDelete, null); + if (logger.isTraceEnabled()) { + logger.debug("AMQP queue is declared - \"{} ({}/{}/{})\" on exchange \"{}\" for a receiver!", + queueName, + durable, + exclusive, + autoDelete, + exchangeName); + } + } + catch (IOException ex) { + throw new S4RAdapterUnexpectedException( + String.format( + "Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!", + queueName, durable, exclusive, autoDelete, exchangeName) + ); + } try { - channel = getAmqpChannelQueueForReceiver(cycle, exchangeName, queueName); + // Binding the same queue multiple times on one exchange is considered as a no-op + channel.queueBind(queueName, exchangeName, bindingKey); + if (logger.isTraceEnabled()) { + logger.debug("AMQP queue is bound - \"{} ({}/{}/{})\" on exchange \"{}\" with binding key \"{}\"!", + queueName, + durable, + exclusive, + autoDelete, + exchangeName, + bindingKey); + } } - catch (Exception ex) { - throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel!"); + catch (IOException ex) { + throw new S4RAdapterUnexpectedException( + String.format( + "Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!", + queueName, durable, exclusive, autoDelete, exchangeName, bindingKey) + ); } return new OpTimeTrackAmqpMsgRecvOp( @@ -121,7 +183,6 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { s4rSpace, channel, exchangeName, - queueName, - bindingKeyFunc.apply(cycle)); + queueName); } } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java index 803da6fb5..e08a43ca8 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java @@ -68,7 +68,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { .orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label); if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) { throw new S4RAdapterInvalidParamException("confirm_mode", - "Must be one following valid values: '" + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); + "The provided value \"" + confirmMode + "\" is not one of following valid values: '" + + S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); } confirmBatchNum = parsedOp @@ -86,62 +87,87 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { } private long getExchangeSenderSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * s4rSpace.getAmqpConnChannelNum())) - % s4rSpace.getAmqpMsgClntNum(); + return (cycle / ((long) s4rSpace.getAmqpConnNum() * + s4rSpace.getAmqpConnChannelNum() * + s4rSpace.getAmqpChannelExchangeNum()) + ) % s4rSpace.getAmqpMsgClntNum(); } - private Channel getAmqpChannelForSender(long cycle, - String exchangeName) { + private String getEffectiveSenderNameByCycle(long cycle) { + return getEffectiveSenderNameByCycle( + getConnSeqNum(cycle), + getConnChannelSeqNum(cycle), + getChannelExchangeSeqNum(cycle), + getExchangeSenderSeqNum(cycle)); + } + private String getEffectiveSenderNameByCycle(long connSeqNum, + long channelSeqNum, + long exchangeSeqNum, + long senderSeqNum) { + return String.format( + "sender-%d-%d-%d-%d", + connSeqNum, + channelSeqNum, + exchangeSeqNum, + senderSeqNum); + } + + private Channel getAmqpChannelForSender(long cycle) { long connSeqNum = getConnSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle); - long senderSeqNum = getExchangeSenderSeqNum(cycle); - Connection amqpConnection = s4rSpace.getAmqpConnection(cycle % connSeqNum); + Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); + S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum); - S4RSpace.AmqpSenderChannelKey amqpConnChannelKey = - new S4RSpace.AmqpSenderChannelKey(connSeqNum, channelSeqNum, senderSeqNum); - - return s4rSpace.getAmqpSenderChannel(amqpConnChannelKey, () -> { - Channel channel; + return s4rSpace.getAmqpChannels(senderKey, () -> { + Channel channel = null; try { - channel = getChannelWithExchange( - amqpConnection, - connSeqNum, - channelSeqNum, - exchangeName); + channel = amqpConnection.createChannel(); + if (logger.isDebugEnabled()) { + logger.debug("Created channel for amqp connection: {}, channel: {}", + amqpConnection, channel); + } + } + catch (IOException ex) { + // Do not throw exception here, just log it and return null + if (logger.isDebugEnabled()) { + logger.debug("Failed to create channel for amqp connection: " + amqpConnection, ex); + } + } - if (publisherConfirm) { + try { + if ((channel != null) && publisherConfirm) { channel.confirmSelect(); - boolean asyncConfirm = false; if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) { - asyncConfirm = true; - channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed if (logger.isTraceEnabled()) { - logger.debug("Async ack of message publish received: {}, {}", + logger.debug("Async ack received for a published message: {}, {}", sequenceNumber, multiple); } }, (sequenceNumber, multiple) -> { // code when message is nack-ed if (logger.isTraceEnabled()) { - logger.debug("Async n-ack of message publish received: {}, {}", + logger.debug("Async n-ack received of a published message: {}, {}", sequenceNumber, multiple); } }); } - if (logger.isDebugEnabled()) { - logger.debug("Publisher Confirms enabled on AMQP channel (sync: {}) -- {}", - !asyncConfirm, + if (logger.isTraceEnabled()) { + logger.debug("Publisher Confirms is enabled on AMQP channel: {}({}), {}", + confirmMode, + confirmBatchNum, channel); } } } catch (IOException ex) { - throw new S4RAdapterUnexpectedException("Unexpected error when creating the AMQP channel!"); + throw new S4RAdapterUnexpectedException( + "Failed to enable publisher acknowledgement on the AMQP channel (" + + channel + ")!"); } return channel; @@ -155,15 +181,17 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!"); } - Channel channel; - String exchangeName = getEffectiveExchangeName(cycle); + Channel channel = getAmqpChannelForSender(cycle); + if (channel == null) { + throw new S4RAdapterUnexpectedException( + String.format( + "Failed to get AMQP channel for sender %s [%d]!", + getEffectiveSenderNameByCycle(cycle), + cycle)); + } - try { - channel = getAmqpChannelForSender(cycle, exchangeName); - } - catch (Exception ex) { - throw new S4RAdapterUnexpectedException("Unable to create the AMQP channel for sending messages!"); - } + String exchangeName = getEffectiveExchangeNameByCycle(cycle); + declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); return new OpTimeTrackAmqpMsgSendOp( s4rAdapterMetrics, diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java index 2ad551ead..c780e3012 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java @@ -31,27 +31,16 @@ import java.nio.charset.StandardCharsets; public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp"); - private final String queueName; - private final String bindingKey; + public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics, S4RSpace s4rSpace, Channel channel, String exchangeName, - String queueName, - String bindingKey) { + String queueName) { super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); this.queueName = queueName; - this.bindingKey = bindingKey; - - try { - channel.queueBind(queueName, exchangeName, bindingKey); - } - catch (IOException ex) { - throw new S4RAdapterUnexpectedException("Unable to bind queue (\"" + queueName + "\") to " + - "exchange (\"" + exchangeName + "\")!"); - } } @Override @@ -59,20 +48,26 @@ public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { try { Consumer receiver = new DefaultConsumer(channel) { @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { + public void handleDelivery( + String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) + { + String routingKey = envelope.getRoutingKey(); + String contentType = properties.getContentType(); + String msgPayload = new String(body, StandardCharsets.UTF_8); + if (logger.isTraceEnabled()) { - String msgPayload = new String(body, StandardCharsets.UTF_8); - logger.trace("Successfully received message ({}) via consumer ({}) in the current channel: {}", + logger.trace( + "Successfully received message ({}) via consumer ({}/{}/{}) in the current channel: {}", msgPayload, consumerTag, + routingKey, + contentType, channel); } } }; - channel.basicConsume(queueName, receiver); - + channel.basicConsume(queueName, true, receiver); } catch (IOException e) { throw new S4RAdapterUnexpectedException( diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java index 9c9795fcc..b19a035a7 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java @@ -28,6 +28,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; @@ -40,8 +41,8 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { private final String confirmMode; private final int confirmBatchNum; - private static final ThreadLocal - publishConfirmBatchTrackingCnt = ThreadLocal.withInitial(() -> 0); + private static final ConcurrentHashMap + channelPublishConfirmBathTracking = new ConcurrentHashMap<>(); public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics, S4RSpace s4rSpace, @@ -73,34 +74,40 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { routingKey, null, msgPayload.getBytes(StandardCharsets.UTF_8)); + if (logger.isTraceEnabled()) { + logger.trace("Successfully published message (({}) {}) via the current channel: {}", + cycle, msgPayload, channel); + } if (publishConfirm) { // Individual publish confirm if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) { channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + if (logger.isTraceEnabled()) { + logger.debug("Sync ack received for an individual published message: {}", cycle); + } } // Batch publish confirm else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) { - int publishConfirmTrackingCnt = publishConfirmBatchTrackingCnt.get(); + int publishConfirmTrackingCnt = + channelPublishConfirmBathTracking.getOrDefault(channel, 0); + if ( (publishConfirmTrackingCnt > 0) && ( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) || (publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) { - synchronized (this) { - channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + if (logger.isTraceEnabled()) { + logger.debug("Sync ack received for a batch of published message: {}, {}", + cycle, publishConfirmTrackingCnt); } } else { - publishConfirmBatchTrackingCnt.set(publishConfirmTrackingCnt+1); + channelPublishConfirmBathTracking.put(channel, publishConfirmTrackingCnt+1); } } // Async publish confirm // - Do nothing here. See "channel.addConfirmListener" code in 'AmqpMsgSendOpDispenser' } - - if (logger.isTraceEnabled()) { - logger.trace("Successfully published message ({}) via the current channel: {}", - msgPayload, channel); - } } catch (IllegalStateException ex) { throw new S4RAdapterUnexpectedException( diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java index 9009d26cc..8a0a81e4b 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java +++ b/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java @@ -16,13 +16,10 @@ package io.nosqlbench.adapter.s4r.util; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -30,18 +27,6 @@ import java.util.stream.Stream; public class S4RAdapterUtil { private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class); - /////// - // Valid document level parameters for JMS NB yaml file - public enum DOC_LEVEL_PARAMS { - // Blocking message producing or consuming - ASYNC_API("async_api"); - public final String label; - - DOC_LEVEL_PARAMS(final String label) { - this.label = label; - } - } - public enum AMQP_EXCHANGE_TYPES { DIRECT("direct"), FANOUT("fanout"), @@ -83,19 +68,14 @@ public class S4RAdapterUtil { } } public static String getValidAmqpPublisherConfirmModeList() { - return StringUtils.join(AMQP_EXCHANGE_TYPES.LABELS, ", "); + return StringUtils.join(AMQP_PUB_CONFIRM_MODE.LABELS, ", "); } // At least 20 messages in a publishing batch - public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 20; + public static int AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN = 10; public static int DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM = 100; public static int DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS = 1000; - public static Map convertJsonToMap(final String jsonStr) throws Exception { - final ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonStr, new TypeReference>(){}); - } - public static void pauseCurThreadExec(final int pauseInSec) { if (0 < pauseInSec) try { Thread.sleep(pauseInSec * 1000L); diff --git a/adapter-s4r/src/main/resources/README.md b/adapter-s4r/src/main/resources/README.md index 3e66eb7cc..3a915e5c7 100644 --- a/adapter-s4r/src/main/resources/README.md +++ b/adapter-s4r/src/main/resources/README.md @@ -1,62 +1,92 @@ -# Overview +- [1. Overview](#1-overview) +- [2. NB S4R Usage](#2-nb-s4r-usage) + - [2.1. CLI Examples](#21-cli-examples) + - [2.2. CLI parameters](#22-cli-parameters) + - [2.3. Workload Definition](#23-workload-definition) + - [2.4. Configuration Properties](#24-configuration-properties) + - [2.4.1. Global Properties File](#241-global-properties-file) + - [2.4.2. Scenario Document Level Properties](#242-scenario-document-level-properties) -This NB Kafka adapter allows publishing messages to or consuming messages from -* a Kafka cluster, or -* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar. +--- -At high level, this adapter supports the following Kafka functionalities -* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers) -* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits) - * auto message commit - * manual message commit with a configurable number of message commits in one batch -* Kafka Transaction support +# 1. Overview -## Example NB Yaml -* [kafka_producer.yaml](./s4r_producer.yaml) -* -* [kafka_consumer.yaml](./s4r_consumer.yaml) +This NB S4R adapter allows sending messages to or receiving messages from +* an AMQP 0-9-1 based server (e.g. RabbitMQ), or +* a Pulsar cluster with [S4R](https://github.com/datastax/starlight-for-rabbitmq) AMQP (0-9-1) Protocol handler for Pulsar. -# Usage +At high level, this adapter supports the following AMQP 0-9-1 functionalities +* Creating AMQP connections and channels +* Declaring AMQP exchanges + * The following exchange types are supported: `direct`, `fanout`, `topic`, and `headers` +* Sending messages to AMQP exchanges with sync. or async. publisher confirms + * For sync confirms, it supports both single and batch confirms + * Supports message-send based on routing keys +* Declaring and binding AMQP queues + * Supports message-receive based on binding keys +* Receiving messages from AMQP queues with async. consumer acks + +# 2. NB S4R Usage + +## 2.1. CLI Examples ```bash -## Kafka Producer -$ run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092 +## AMQP Message Sender +$ run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \ + threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \ + workload=./s4r_msg_sender.yaml \ + config=./s4r_config.properties -## Kafka Consumer -$ run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=s4r_producer.yaml config=s4r_config.properties bootstrap_server=PLAINTEXT://localhost:9092 +## AMQP Message Receiver +$ run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \ + threads=8 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \ + workload=./s4r_msg_receiver.yaml \ + config=./s4r_config.properties ``` -## NB Kafka adapter specific CLI parameters +## 2.2. CLI parameters -* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from - * For producer workload, this is the number of the producer threads to publish messages to the same topic - * Can have multiple producer threads for one topic/partition (`KafkaProducer` is thread-safe) - * `threads` and `num_clnt` values MUST be the same. - * For consumer workload, this is the partition number of a topic - * Consumer workload supports to subscribe from multiple topics. If so, it requires all topics having the same partition number. - * Only one consumer thread for one topic/partition (`KafkaConsumer` is NOT thread-safe) - * `threads` MUST be equal to `num_clnt`*`num_cons_grp` +The following CLI parameters are unique to the S4R adapter: -* `num_cons_grp`: the number of consumer groups - * Only relevant for consumer workload +* `num_conn`: the number of AMQP connections to create +* `num_channel`: the number of AMQP channels to create for each connection +* `num_exchange`: the number of AMQP exchanges to create for each channel +* `num_queue`: the number of AMQP queues to create for each channel (only relevant for message receiver workload) +* `num_msg_client`: the number of message clients to create for each channel + * for message sender workload, it is the number of message publishers for each exchange + * for message receiver workload, it is the number of message consumers for each queue +## 2.3. Workload Definition +The example workload YAML files can be found from: -For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported: +* [s4r_msg_sender.yaml](s4r_msg_sender.yaml) +* [s4r_msg_receiver.yaml](s4r_msg_receiver.yaml) -* `async_api` (boolean): - * When true, use async Kafka client API. -* `seq_tracking` (boolean): - * When true, a sequence number is created as part of each message's properties - * This parameter is used in conjunction with the next one in order to simulate abnormal message processing errors and then be able to detect such errors successfully. -* `seqerr_simu`: - * A list of error simulation types separated by comma (,) - * Valid error simulation types - * `out_of_order`: simulate message out of sequence - * `msg_loss`: simulate message loss - * `msg_dup`: simulate message duplication - * This value should be used only for testing purposes. It is not recommended to use this parameter in actual testing environments. -* `e2e_starting_time_source`: - * Starting timestamp for end-to-end operation. When specified, will update the `e2e_msg_latency` histogram with the calculated end-to-end latency. The latency is calculated by subtracting the starting time from the current time. The starting time is determined from a configured starting time source. The unit of the starting time is milliseconds since epoch. - * The possible values for `e2e_starting_time_source`: - * `message_publish_time` : uses the message publishing timestamp as the starting time. The message publishing time, in this case, [is computed by the Kafka client on record generation](https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html). This is the case, as [`CreateTime` is the default](https://docs.confluent.io/platform/current/installation/configuration/topic-configs.html#message-timestamp-type). +## 2.4. Configuration Properties + +### 2.4.1. Global Properties File + +A global S4R properties file can be specified via the `config` CLI parameter. It includes the following required properties: +* `amqpSrvHost`: AMQP server host (e.g. An Astra Streaming cluster with S4R enabled) +* `amqpSrvPort`: AMQP server port (for S4R enabled Astra Streaming, it is 5671) +* `virtualHost`: AMQP server virtual host (for S4R enabled Astra Streaming, it is "/rabbitmq") +* `amqpUser`: AMQP user (for S4R enabled Astra Streaming, it is an empty string) +* `amqpPassword`: AMQP password (for S4R enabled Astra Streaming, it is the JWT token file path) +* `useTls`: whether to use TLS (for S4R enabled Astra Streaming, it is true) +* `exchangeType`: AMQP exchange type (e.g. `direct`, `fanout`, `topic`, or `headers`) + +An example of this file can be found from: [s4r_config.properties](./s4r_config.properties) + +### 2.4.2. Scenario Document Level Properties + +For message sender workload, the following Document level configuration parameters are supported in the YAML file: +* `publisher_confirm`: whether to use publisher confirms +* `confirm_mode`: When `publisher_confirm` is true, the following 3 confirm modes are supported: + * `individual`: wait for confirm individually + * `batch`: wait for confirm in batch + * `async`: [default] no wait for confirm +* `confirm_batch_num`: batch size for waiting for **sync** publisher confirms + * Only relevant when `publisher_confirm` is true and `confirm_mode` is "batch" +* `dft_confirm_timeout_ms`: batch size for waiting for publisher confirms + * Only relevant when `publisher_confirm` is true and `confirm_mode` is **NOT** "async" diff --git a/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh b/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh deleted file mode 100755 index bdb55507f..000000000 --- a/adapter-s4r/src/main/resources/build-nb-s4r-driver.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/local/bin/bash -# -# Copyright (c) 2023 nosqlbench -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -: "${SKIP_TESTS:=1}" -( - cd "$(git rev-parse --show-toplevel)" && \ - mvn clean install "-DskipTests" -pl adapters-api,adapter-s4r,nb5 && \ - [[ ${SKIP_TESTS} -ne 1 ]] && \ - mvn test -pl adapters-api,adapter-s4r -) diff --git a/adapter-s4r/src/main/resources/csv/exchange_names.csv b/adapter-s4r/src/main/resources/csv/exchange_names.csv deleted file mode 100644 index 3feca9f3f..000000000 --- a/adapter-s4r/src/main/resources/csv/exchange_names.csv +++ /dev/null @@ -1,4 +0,0 @@ -name -usa -canada -german diff --git a/adapter-s4r/src/main/resources/csv/queue_names.csv b/adapter-s4r/src/main/resources/csv/queue_names.csv deleted file mode 100644 index 7c75221b9..000000000 --- a/adapter-s4r/src/main/resources/csv/queue_names.csv +++ /dev/null @@ -1,3 +0,0 @@ -queue1 -queue2 -queue3 diff --git a/adapter-s4r/src/main/resources/s4r_config.properties b/adapter-s4r/src/main/resources/s4r_config.properties index 6c75f2ac8..bfbfc74cc 100644 --- a/adapter-s4r/src/main/resources/s4r_config.properties +++ b/adapter-s4r/src/main/resources/s4r_config.properties @@ -18,7 +18,13 @@ # Below is an example to connect to Astra Streaming with RabbitMQ/S4R enabled amqpSrvHost=rabbitmq-gcp-uscentral1.streaming.datastax.com amqpSrvPort=5671 -virtualHost=/rabbitmq -jwtToken= +virtualHost=/rabbitmq +# For Astra Streaming with S4R, the user an empty string +amqpUser= +# For Astra Streaming with S4R, the password is the JWT token in the format of +# file:///path/to/astra_streaming_jwt_token_file +amqpPassword=file:// +# when using Astra Streaming with S4R, this needs to be set to true +useTls=true # valid values: direct, fanout, topic, headers exchangeType=direct diff --git a/adapter-s4r/src/main/resources/s4r_consumer.yaml b/adapter-s4r/src/main/resources/s4r_consumer.yaml deleted file mode 100644 index d6b96d129..000000000 --- a/adapter-s4r/src/main/resources/s4r_consumer.yaml +++ /dev/null @@ -1,18 +0,0 @@ -bindings: - myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name') - myqueue: CSVFrequencySampler('csv/queue_names.csv', 'name') - myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') - -# Doc-level parameters (must be static) -params: - -blocks: - msg-recv-block: - ops: - AmqpMsgReceiver: - #exchange_names: "{myexname}" - exchange_name: "alpha" - - queue_name: "{myqueue}" - - binding_key: "{myroutingkey}" diff --git a/adapter-s4r/src/main/resources/s4r_msg_receiver.yaml b/adapter-s4r/src/main/resources/s4r_msg_receiver.yaml new file mode 100644 index 000000000..c70d4a119 --- /dev/null +++ b/adapter-s4r/src/main/resources/s4r_msg_receiver.yaml @@ -0,0 +1,12 @@ +bindings: + myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') + +# Doc-level parameters (must be static) +params: + +blocks: + msg-recv-block: + ops: + op1: + AmqpMsgReceiver: "" + binding_key: "{myroutingkey}" diff --git a/adapter-s4r/src/main/resources/s4r_producer.yaml b/adapter-s4r/src/main/resources/s4r_msg_sender.yaml similarity index 64% rename from adapter-s4r/src/main/resources/s4r_producer.yaml rename to adapter-s4r/src/main/resources/s4r_msg_sender.yaml index 2e95eab69..730b7f529 100644 --- a/adapter-s4r/src/main/resources/s4r_producer.yaml +++ b/adapter-s4r/src/main/resources/s4r_msg_sender.yaml @@ -1,6 +1,5 @@ bindings: mytext_val: AlphaNumericString(100) - myexname: CSVFrequencySampler('csv/exchange_names.csv', 'name') myroutingkey: CSVFrequencySampler('csv/routing_keys.csv', 'name') @@ -8,18 +7,18 @@ bindings: params: # whether to do publisher confirm (for reliable publishing) # - default: false - publisher_confirm: "false" - #publisher_confirm: "true" + publisher_confirm: true + #publisher_confirm: true # If 'publisher_confirm' is true, use one of the following 3 confirm modes: # - individual (wait_for_confirm individually) # - batch (wait_for_confirm in batch) # - async [default] - confirm_mode: "aysnc" - #confirm_mode: "individual" - #confirm_mode: "batch" + #confirm_mode: async + #confirm_mode: individual + confirm_mode: batch # Only relevant when 'publisher_confirm' is true and 'confirm_mode' is "batch" - confirm_batch_num: 100 + confirm_batch_num: 10 # default timeout value (in milliseconds) # - only relevant when publisher_confirm' is true and 'confirm_mode' is NOT "async" dft_confirm_timeout_ms: 1000 @@ -28,12 +27,7 @@ params: blocks: msg-send-block: ops: - AmqpMsgSender: - #exchange_names: "{myexname}" - exchange_names: "alpha" - + op1: + AmqpMsgSender: "" routing_key: "{myroutingkey}" - - ## (Optional) Kafka message value. - # - message key and value can't be both empty at the same time message: "{mytext_val}" diff --git a/adapter-s4r/src/main/resources/start_s4r_consumer.sh b/adapter-s4r/src/main/resources/start_s4r_consumer.sh deleted file mode 100755 index 1be463e4c..000000000 --- a/adapter-s4r/src/main/resources/start_s4r_consumer.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/usr/local/bin/bash -# -# Copyright (c) 2023 nosqlbench -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -: "${REBUILD:=1}" -: "${CYCLES:=1000000000}" -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" -if [[ ${REBUILD} -eq 1 ]]; then - "${SCRIPT_DIR}/build-nb-kafka-driver.sh" -fi -java -jar nb5/target/nb5.jar \ - run \ - driver=s4r \ - -vv \ - --report-interval 5 \ - --docker-metrics \ - cycles=${CYCLES} \ - threads=1 \ - num_clnt=1 \ - num_cons_grp=1 \ - yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \ - config="${SCRIPT_DIR}/kafka_config.properties" \ - bootstrap_server=PLAINTEXT://localhost:9092 diff --git a/adapter-s4r/src/main/resources/start_s4r_producer.sh b/adapter-s4r/src/main/resources/start_s4r_producer.sh deleted file mode 100755 index 0999c5fc0..000000000 --- a/adapter-s4r/src/main/resources/start_s4r_producer.sh +++ /dev/null @@ -1,38 +0,0 @@ -#!/usr/local/bin/bash -# -# Copyright (c) 2023 nosqlbench -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -: "${REBUILD:=1}" -: "${CYCLES:=1000000000}" -SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" -if [[ ${REBUILD} -eq 1 ]]; then - "${SCRIPT_DIR}/build-nb-kafka-driver.sh" -fi -while [[ 1 -eq 1 ]]; do - java -jar nb5/target/nb5.jar \ - run \ - driver=s4r \ - -vv \ - --report-interval 5 \ - --docker-metrics \ - cycles="${CYCLES}" \ - threads=1 \ - num_clnt=1 \ - yaml="${SCRIPT_DIR}/kafka_producer.yaml" \ - config="${SCRIPT_DIR}/kafka_config.properties" \ - bootstrap_server=PLAINTEXT://localhost:9092 - sleep 10 -done diff --git a/engine-rest/pom.xml b/engine-rest/pom.xml index df8992445..42a047220 100644 --- a/engine-rest/pom.xml +++ b/engine-rest/pom.xml @@ -20,7 +20,7 @@ mvn-defaults io.nosqlbench - 5.17.1-SNAPSHOT + ${revision} ../mvn-defaults @@ -51,7 +51,7 @@ io.nosqlbench engine-cli - 5.17.1-SNAPSHOT + ${revision}