From 1880dfa9a089381b49bdecbacdee08101fcb6ae1 Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Fri, 30 Jun 2023 10:03:57 -0500 Subject: [PATCH] Rename `adapter-s4r` to `adapter-amqp` since this is a generic AMQP-0-9-1 implementation, which includes Pulsar+S4R plugin. --- {adapter-s4r => adapter-amqp}/pom.xml | 2 +- .../adapter/amqp/AmqpDriverAdapter.java | 22 ++++---- .../nosqlbench/adapter/amqp/AmqpOpMapper.java | 22 ++++---- .../nosqlbench/adapter/amqp/AmqpOpType.java | 4 +- .../io/nosqlbench/adapter/amqp/AmqpSpace.java | 50 +++++++++---------- .../amqp}/dispensers/AmqpBaseOpDispenser.java | 24 ++++----- .../dispensers/AmqpMsgRecvOpDispenser.java | 24 ++++----- .../dispensers/AmqpMsgSendOpDispenser.java | 44 ++++++++-------- .../AmqpAdapterInvalidParamException.java | 8 +-- .../AmqpAdapterUnexpectedException.java | 8 +-- .../AmqpAdapterUnsupportedOpException.java | 6 +-- .../adapter/amqp/ops/AmqpTimeTrackOp.java | 22 ++++---- .../amqp}/ops/OpTimeTrackAmqpMsgRecvOp.java | 18 +++---- .../amqp}/ops/OpTimeTrackAmqpMsgSendOp.java | 32 ++++++------ .../adapter/amqp/util/AmqpAdapterMetrics.java | 12 ++--- .../adapter/amqp/util/AmqpAdapterUtil.java | 6 +-- .../adapter/amqp/util/AmqpClientConf.java | 12 ++--- .../src/main/resources/amqp.md | 40 +++++++-------- .../resources/conf/amqp_config.properties | 0 .../scenarios/amqp_msg_receiver.yaml | 0 .../resources/scenarios/amqp_msg_sender.yaml | 0 .../resources/scenarios/csv/binding_keys.csv | 0 .../resources/scenarios/csv/routing_keys.csv | 0 .../scenarios/nbamqp_msg_proc_named.yaml | 4 +- nb5/pom.xml | 2 +- pom.xml | 4 +- 26 files changed, 183 insertions(+), 183 deletions(-) rename {adapter-s4r => adapter-amqp}/pom.xml (98%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpDriverAdapter.java (64%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java (72%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpType.java (91%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java (87%) rename {adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp}/dispensers/AmqpBaseOpDispenser.java (86%) rename {adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp}/dispensers/AmqpMsgRecvOpDispenser.java (89%) rename {adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp}/dispensers/AmqpMsgSendOpDispenser.java (81%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterInvalidParamException.java (73%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnexpectedException.java (75%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnsupportedOpException.java (78%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java (75%) rename {adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp}/ops/OpTimeTrackAmqpMsgRecvOp.java (80%) rename {adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp}/ops/OpTimeTrackAmqpMsgSendOp.java (81%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java (89%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterUtil.java (94%) rename adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java => adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java (86%) rename adapter-s4r/src/main/resources/s4r.md => adapter-amqp/src/main/resources/amqp.md (75%) rename adapter-s4r/src/main/resources/conf/s4r_config.properties => adapter-amqp/src/main/resources/conf/amqp_config.properties (100%) rename adapter-s4r/src/main/resources/scenarios/s4r_msg_receiver.yaml => adapter-amqp/src/main/resources/scenarios/amqp_msg_receiver.yaml (100%) rename adapter-s4r/src/main/resources/scenarios/s4r_msg_sender.yaml => adapter-amqp/src/main/resources/scenarios/amqp_msg_sender.yaml (100%) rename {adapter-s4r => adapter-amqp}/src/main/resources/scenarios/csv/binding_keys.csv (100%) rename {adapter-s4r => adapter-amqp}/src/main/resources/scenarios/csv/routing_keys.csv (100%) rename adapter-s4r/src/main/resources/scenarios/nbs4r_msg_proc_named.yaml => adapter-amqp/src/main/resources/scenarios/nbamqp_msg_proc_named.yaml (82%) diff --git a/adapter-s4r/pom.xml b/adapter-amqp/pom.xml similarity index 98% rename from adapter-s4r/pom.xml rename to adapter-amqp/pom.xml index 0e1385970..2ca4ee39a 100644 --- a/adapter-s4r/pom.xml +++ b/adapter-amqp/pom.xml @@ -17,7 +17,7 @@ 4.0.0 - adapter-s4r + adapter-amqp jar diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpDriverAdapter.java similarity index 64% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpDriverAdapter.java index 4ee136eef..cb2c837f3 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RDriverAdapter.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpDriverAdapter.java @@ -14,9 +14,9 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r; +package io.nosqlbench.adapter.amqp; -import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.engine.api.activityimpl.OpMapper; @@ -29,24 +29,24 @@ import org.apache.logging.log4j.Logger; import java.util.function.Function; -@Service(value = DriverAdapter.class, selector = "s4r") -public class S4RDriverAdapter extends BaseDriverAdapter { - private final static Logger logger = LogManager.getLogger(S4RDriverAdapter.class); +@Service(value = DriverAdapter.class, selector = "amqp") +public class AmqpDriverAdapter extends BaseDriverAdapter { + private final static Logger logger = LogManager.getLogger(AmqpDriverAdapter.class); @Override - public OpMapper getOpMapper() { - DriverSpaceCache spaceCache = getSpaceCache(); + public OpMapper getOpMapper() { + DriverSpaceCache spaceCache = getSpaceCache(); NBConfiguration adapterConfig = getConfiguration(); - return new S4ROpMapper(this, adapterConfig, spaceCache); + return new AmqpOpMapper(this, adapterConfig, spaceCache); } @Override - public Function getSpaceInitializer(NBConfiguration cfg) { - return (s) -> new S4RSpace(s, cfg); + public Function getSpaceInitializer(NBConfiguration cfg) { + return (s) -> new AmqpSpace(s, cfg); } @Override public NBConfigModel getConfigModel() { - return super.getConfigModel().add(S4RSpace.getConfigModel()); + return super.getConfigModel().add(AmqpSpace.getConfigModel()); } } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java similarity index 72% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java index 5004ae9f2..2b4d0994c 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpMapper.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java @@ -14,11 +14,11 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r; +package io.nosqlbench.adapter.amqp; -import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgRecvOpDispenser; -import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgSendOpDispenser; -import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; +import io.nosqlbench.adapter.amqp.dispensers.AmqpMsgRecvOpDispenser; +import io.nosqlbench.adapter.amqp.dispensers.AmqpMsgSendOpDispenser; +import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp; import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.OpMapper; @@ -29,24 +29,24 @@ import io.nosqlbench.engine.api.templating.TypeAndTarget; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class S4ROpMapper implements OpMapper { +public class AmqpOpMapper implements OpMapper { - private final static Logger logger = LogManager.getLogger(S4ROpMapper.class); + private final static Logger logger = LogManager.getLogger(AmqpOpMapper.class); private final NBConfiguration cfg; - private final DriverSpaceCache spaceCache; + private final DriverSpaceCache spaceCache; private final DriverAdapter adapter; - public S4ROpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache spaceCache) { + public AmqpOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache spaceCache) { this.cfg = cfg; this.spaceCache = spaceCache; this.adapter = adapter; } @Override - public OpDispenser apply(ParsedOp op) { + public OpDispenser apply(ParsedOp op) { String spaceName = op.getStaticConfigOr("space", "default"); - S4RSpace s4RSpace = spaceCache.get(spaceName); + AmqpSpace s4RSpace = spaceCache.get(spaceName); /* * If the user provides a body element, then they want to provide the JSON or @@ -57,7 +57,7 @@ public class S4ROpMapper implements OpMapper { throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field."); } else { - TypeAndTarget opType = op.getTypeAndTarget(S4ROpType.class, String.class); + TypeAndTarget opType = op.getTypeAndTarget(AmqpOpType.class, String.class); return switch (opType.enumId) { case AmqpMsgSender -> diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpType.java similarity index 91% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpType.java index 11baa7ac7..dadd2a4cf 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4ROpType.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpType.java @@ -14,9 +14,9 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r; +package io.nosqlbench.adapter.amqp; -public enum S4ROpType { +public enum AmqpOpType { AmqpMsgSender, AmqpMsgReceiver } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java similarity index 87% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java index e66c4cd22..3e5ca537c 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/S4RSpace.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java @@ -14,15 +14,15 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r; +package io.nosqlbench.adapter.amqp; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; -import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil; -import io.nosqlbench.adapter.s4r.util.S4RClientConf; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil; +import io.nosqlbench.adapter.amqp.util.AmqpClientConf; import io.nosqlbench.api.config.standard.ConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; @@ -44,14 +44,14 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; -public class S4RSpace implements AutoCloseable { +public class AmqpSpace implements AutoCloseable { - private final static Logger logger = LogManager.getLogger(S4RSpace.class); + private final static Logger logger = LogManager.getLogger(AmqpSpace.class); private final String spaceName; private final NBConfiguration cfg; - private final S4RClientConf s4rClientConf; + private final AmqpClientConf s4rClientConf; /////////////////////////////////////////////////////////////////// // NOTE: in this driver, we assume: @@ -94,7 +94,7 @@ public class S4RSpace implements AutoCloseable { private ConnectionFactory s4rConnFactory; // Default to "direct" type - private String amqpExchangeType = S4RAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label; + private String amqpExchangeType = AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label; private final ConcurrentHashMap amqpConnections = new ConcurrentHashMap<>(); @@ -110,19 +110,19 @@ public class S4RSpace implements AutoCloseable { // Maximum time length to execute S4R operations (e.g. message send or consume) // - when NB execution passes this threshold, it is simply NoOp - // - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes + // - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes private final long maxOpTimeInSec; private final long activityStartTimeMills; private long totalCycleNum; private long totalThreadNum; - public S4RSpace(String spaceName, NBConfiguration cfg) { + public AmqpSpace(String spaceName, NBConfiguration cfg) { this.spaceName = spaceName; this.cfg = cfg; String s4rClientConfFileName = cfg.get("config"); - this.s4rClientConf = new S4RClientConf(s4rClientConfFileName); + this.s4rClientConf = new AmqpClientConf(s4rClientConfFileName); this.amqpConnNum = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1")); this.amqpConnChannelNum = @@ -148,7 +148,7 @@ public class S4RSpace implements AutoCloseable { } public static NBConfigModel getConfigModel() { - return ConfigModel.of(S4RSpace.class) + return ConfigModel.of(AmqpSpace.class) .add(Param.defaultTo("config", "config.properties") .setDescription("S4R client connection configuration property file.")) .add(Param.defaultTo("num_conn", 1) @@ -178,7 +178,7 @@ public class S4RSpace implements AutoCloseable { public long getActivityStartTimeMills() { return this.activityStartTimeMills; } public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; } - public S4RClientConf getS4rClientConf() { return s4rClientConf; } + public AmqpClientConf getS4rClientConf() { return s4rClientConf; } public String getAmqpExchangeType() { return amqpExchangeType; } public int getAmqpConnNum() { return this.amqpConnNum; } @@ -195,24 +195,24 @@ public class S4RSpace implements AutoCloseable { public long getTotalThreadNum() { return totalThreadNum; } public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; } - public void initializeSpace(S4RClientConf s4rClientConnInfo) { + public void initializeSpace(AmqpClientConf s4rClientConnInfo) { Map cfgMap = s4rClientConnInfo.getS4rConfMap(); if (amqpConnNum < 1) { String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!"; - throw new S4RAdapterInvalidParamException(errMsg); + throw new AmqpAdapterInvalidParamException(errMsg); } if (amqpConnChannelNum < 1) { String errMsg = "AMQP channel number per connection (\"num_channel\") must be a positive number!"; - throw new S4RAdapterInvalidParamException(errMsg); + throw new AmqpAdapterInvalidParamException(errMsg); } amqpExchangeType = cfgMap.get("exchangeType"); - if (!S4RAdapterUtil.AMQP_EXCHANGE_TYPES.isValidLabel(amqpExchangeType)) { + if (!AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.isValidLabel(amqpExchangeType)) { String errMsg = "Invalid AMQP exchange type: \"" + amqpExchangeType + "\". " + - "Valid values are: \"" + S4RAdapterUtil.getValidAmqpExchangeTypeList() + "\""; - throw new S4RAdapterInvalidParamException(errMsg); + "Valid values are: \"" + AmqpAdapterUtil.getValidAmqpExchangeTypeList() + "\""; + throw new AmqpAdapterInvalidParamException(errMsg); } if (s4rConnFactory == null) { @@ -222,21 +222,21 @@ public class S4RSpace implements AutoCloseable { String amqpServerHost = cfgMap.get("amqpSrvHost"); if (StringUtils.isBlank(amqpServerHost)) { String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!"; - throw new S4RAdapterInvalidParamException(errMsg); + throw new AmqpAdapterInvalidParamException(errMsg); } s4rConnFactory.setHost(amqpServerHost); String amqpSrvPortCfg = cfgMap.get("amqpSrvPort"); if (StringUtils.isBlank(amqpSrvPortCfg)) { String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!"; - throw new S4RAdapterInvalidParamException(errMsg); + throw new AmqpAdapterInvalidParamException(errMsg); } s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg)); String amqpVirtualHost = cfgMap.get("virtualHost"); if (StringUtils.isBlank(amqpVirtualHost)) { String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!"; - throw new S4RAdapterInvalidParamException(errMsg); + throw new AmqpAdapterInvalidParamException(errMsg); } s4rConnFactory.setVirtualHost(amqpVirtualHost); @@ -278,7 +278,7 @@ public class S4RSpace implements AutoCloseable { } catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) { logger.error("Unable to establish AMQP connections with the following configuration parameters: {}", s4rClientConnInfo.toString()); - throw new S4RAdapterUnexpectedException(ex); + throw new AmqpAdapterUnexpectedException(ex); } } } @@ -296,7 +296,7 @@ public class S4RSpace implements AutoCloseable { } // Pause 5 seconds before closing producers/consumers - S4RAdapterUtil.pauseCurThreadExec(5); + AmqpAdapterUtil.pauseCurThreadExec(5); } catch (Exception ex) { String exp = "Unexpected error when shutting down the S4R adaptor space"; diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java similarity index 86% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java index 4f99b5db9..5534d4f3e 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpBaseOpDispenser.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java @@ -14,13 +14,13 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.dispensers; +package io.nosqlbench.adapter.amqp.dispensers; import com.rabbitmq.client.Channel; -import io.nosqlbench.adapter.s4r.S4RSpace; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; -import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; -import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import io.nosqlbench.adapter.amqp.AmqpSpace; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; +import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics; import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; @@ -33,26 +33,26 @@ import java.util.HashMap; import java.util.Map; import java.util.function.LongFunction; -public abstract class AmqpBaseOpDispenser extends BaseOpDispenser { +public abstract class AmqpBaseOpDispenser extends BaseOpDispenser { - private static final Logger logger = LogManager.getLogger("AmqpBaseOpDispenser"); + private static final Logger logger = LogManager.getLogger(AmqpBaseOpDispenser.class); protected final ParsedOp parsedOp; - protected final S4RAdapterMetrics s4rAdapterMetrics; - protected final S4RSpace s4rSpace; + protected final AmqpAdapterMetrics s4rAdapterMetrics; + protected final AmqpSpace s4rSpace; protected final Map s4rConfMap = new HashMap<>(); protected final String exchangeType; protected AmqpBaseOpDispenser(final DriverAdapter adapter, final ParsedOp op, - final S4RSpace s4RSpace) { + final AmqpSpace s4RSpace) { super(adapter, op); parsedOp = op; this.s4rSpace = s4RSpace; - s4rAdapterMetrics = new S4RAdapterMetrics(this, this); + s4rAdapterMetrics = new AmqpAdapterMetrics(this, this); s4rAdapterMetrics.initS4JAdapterInstrumentation(); s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap()); @@ -91,7 +91,7 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser bindingKeyFunc; public AmqpMsgRecvOpDispenser(DriverAdapter adapter, ParsedOp op, - S4RSpace s4rSpace) { + AmqpSpace s4rSpace) { super(adapter, op, s4rSpace); bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); } @@ -95,7 +95,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { long channelSeqNum = getConnChannelSeqNum(cycle); Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); - S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum); + AmqpSpace.AmqpChannelKey amqpConnChannelKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum); return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> { Channel channel = null; @@ -120,10 +120,10 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { @Override - public S4RTimeTrackOp apply(long cycle) { + public AmqpTimeTrackOp apply(long cycle) { Channel channel = getAmqpChannelForReceiver(cycle); if (channel == null) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( String.format( "Failed to get AMQP channel for receiver %s [%d]!", getEffectiveReceiverName(cycle), @@ -150,7 +150,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { } } catch (IOException ex) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( String.format( "Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!", queueName, durable, exclusive, autoDelete, exchangeName) @@ -171,7 +171,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { } } catch (IOException ex) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( String.format( "Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!", queueName, durable, exclusive, autoDelete, exchangeName, bindingKey) diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java similarity index 81% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java index e08a43ca8..00a0e79dc 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/dispensers/AmqpMsgSendOpDispenser.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java @@ -14,16 +14,16 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.dispensers; +package io.nosqlbench.adapter.amqp.dispensers; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; -import io.nosqlbench.adapter.s4r.S4RSpace; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; -import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgSendOp; -import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp; -import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil; +import io.nosqlbench.adapter.amqp.AmqpSpace; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; +import io.nosqlbench.adapter.amqp.ops.OpTimeTrackAmqpMsgSendOp; +import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil; import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.engine.api.templating.ParsedOp; import org.apache.commons.lang3.BooleanUtils; @@ -38,7 +38,7 @@ import java.util.function.Predicate; public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { - private final static Logger logger = LogManager.getLogger("AmqpMsgSendOpDispenser"); + private final static Logger logger = LogManager.getLogger(AmqpMsgSendOpDispenser.class); private final boolean publisherConfirm ; // Only relevant when 'publisherConfirm' is true @@ -54,7 +54,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { public AmqpMsgSendOpDispenser(DriverAdapter adapter, ParsedOp op, - S4RSpace s4rSpace) { + AmqpSpace s4rSpace) { super(adapter, op, s4rSpace); publisherConfirm = parsedOp @@ -65,20 +65,20 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { confirmMode = parsedOp .getOptionalStaticValue("confirm_mode", String.class) - .orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label); - if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) { - throw new S4RAdapterInvalidParamException("confirm_mode", + .orElse(AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label); + if (! AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) { + throw new AmqpAdapterInvalidParamException("confirm_mode", "The provided value \"" + confirmMode + "\" is not one of following valid values: '" + - S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); + AmqpAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'"); } confirmBatchNum = parsedOp .getOptionalStaticConfig("confirm_batch_num", String.class) .filter(Predicate.not(String::isEmpty)) .map(NumberUtils::toInt) - .orElse(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM); - if (confirmBatchNum < S4RAdapterUtil.AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN) { - confirmBatchNum = S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM; + .orElse(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM); + if (confirmBatchNum < AmqpAdapterUtil.AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN) { + confirmBatchNum = AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM; } routingKeyFunc = lookupOptionalStrOpValueFunc("routing_key", null); @@ -117,7 +117,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { long channelSeqNum = getConnChannelSeqNum(cycle); Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); - S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum); + AmqpSpace.AmqpChannelKey senderKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum); return s4rSpace.getAmqpChannels(senderKey, () -> { Channel channel = null; @@ -140,7 +140,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { if ((channel != null) && publisherConfirm) { channel.confirmSelect(); - if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) { + if (StringUtils.equalsIgnoreCase(confirmMode, AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) { channel.addConfirmListener((sequenceNumber, multiple) -> { // code when message is confirmed if (logger.isTraceEnabled()) { @@ -165,7 +165,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { } } catch (IOException ex) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( "Failed to enable publisher acknowledgement on the AMQP channel (" + channel + ")!"); } @@ -175,15 +175,15 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { } @Override - public S4RTimeTrackOp apply(long cycle) { + public AmqpTimeTrackOp apply(long cycle) { String msgPayload = msgPayloadFunc.apply(cycle); if (StringUtils.isBlank(msgPayload)) { - throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!"); + throw new AmqpAdapterInvalidParamException("Message payload must be specified and can't be empty!"); } Channel channel = getAmqpChannelForSender(cycle); if (channel == null) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( String.format( "Failed to get AMQP channel for sender %s [%d]!", getEffectiveSenderNameByCycle(cycle), diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterInvalidParamException.java similarity index 73% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterInvalidParamException.java index c5b56fa04..b3687a0b0 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterInvalidParamException.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterInvalidParamException.java @@ -15,15 +15,15 @@ * under the License. */ -package io.nosqlbench.adapter.s4r.exception; +package io.nosqlbench.adapter.amqp.exception; -public class S4RAdapterInvalidParamException extends RuntimeException { +public class AmqpAdapterInvalidParamException extends RuntimeException { - public S4RAdapterInvalidParamException(String paramName, String errDesc) { + public AmqpAdapterInvalidParamException(String paramName, String errDesc) { super("Invalid setting for parameter (" + paramName + "): " + errDesc); } - public S4RAdapterInvalidParamException(String fullErrDesc) { + public AmqpAdapterInvalidParamException(String fullErrDesc) { super(fullErrDesc); } } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnexpectedException.java similarity index 75% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnexpectedException.java index a0597fb17..236b90635 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnexpectedException.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnexpectedException.java @@ -15,15 +15,15 @@ * under the License. */ -package io.nosqlbench.adapter.s4r.exception; +package io.nosqlbench.adapter.amqp.exception; -public class S4RAdapterUnexpectedException extends RuntimeException { +public class AmqpAdapterUnexpectedException extends RuntimeException { - public S4RAdapterUnexpectedException(String message) { + public AmqpAdapterUnexpectedException(String message) { super(message); printStackTrace(); } - public S4RAdapterUnexpectedException(Exception e) { + public AmqpAdapterUnexpectedException(Exception e) { super(e); printStackTrace(); } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnsupportedOpException.java similarity index 78% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnsupportedOpException.java index 3e48af652..a9604d1c5 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/exception/S4RAdapterUnsupportedOpException.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/exception/AmqpAdapterUnsupportedOpException.java @@ -14,11 +14,11 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.exception; +package io.nosqlbench.adapter.amqp.exception; -public class S4RAdapterUnsupportedOpException extends RuntimeException { +public class AmqpAdapterUnsupportedOpException extends RuntimeException { - public S4RAdapterUnsupportedOpException(final String kafkaOpType) { + public AmqpAdapterUnsupportedOpException(final String kafkaOpType) { super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + '"'); } } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java similarity index 75% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java index ac822f48a..207828f1d 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/S4RTimeTrackOp.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java @@ -15,32 +15,32 @@ * under the License. */ -package io.nosqlbench.adapter.s4r.ops; +package io.nosqlbench.adapter.amqp.ops; import com.rabbitmq.client.Channel; -import io.nosqlbench.adapter.s4r.S4RSpace; -import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import io.nosqlbench.adapter.amqp.AmqpSpace; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp; -public abstract class S4RTimeTrackOp implements CycleOp { - private final S4RAdapterMetrics s4rAdapterMetrics; - protected final S4RSpace s4RSpace; +public abstract class AmqpTimeTrackOp implements CycleOp { + private final AmqpAdapterMetrics s4rAdapterMetrics; + protected final AmqpSpace s4RSpace; protected final Channel channel; protected final String exchangeName; // Maximum time length to execute Kafka operations (e.g. message send or consume) // - when NB execution passes this threshold, it is simply NoOp - // - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes + // - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes protected final long maxOpTimeInSec; protected final long activityStartTime; protected Object cycleObj; - public S4RTimeTrackOp(S4RAdapterMetrics s4rAdapterMetrics, - S4RSpace s4RSpace, - Channel channel, - String exchangeName) + public AmqpTimeTrackOp(AmqpAdapterMetrics s4rAdapterMetrics, + AmqpSpace s4RSpace, + Channel channel, + String exchangeName) { this.s4rAdapterMetrics = s4rAdapterMetrics; this.s4RSpace = s4RSpace; diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java similarity index 80% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java index c780e3012..24a123e37 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgRecvOp.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java @@ -14,13 +14,13 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.ops; +package io.nosqlbench.adapter.amqp.ops; import com.rabbitmq.client.*; -import io.nosqlbench.adapter.s4r.S4RSpace; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; -import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; +import io.nosqlbench.adapter.amqp.AmqpSpace; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,14 +28,14 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { +public class OpTimeTrackAmqpMsgRecvOp extends AmqpTimeTrackOp { - private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp"); + private final static Logger logger = LogManager.getLogger(OpTimeTrackAmqpMsgRecvOp.class); private final String queueName; - public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics, - S4RSpace s4rSpace, + public OpTimeTrackAmqpMsgRecvOp(AmqpAdapterMetrics s4rAdapterMetrics, + AmqpSpace s4rSpace, Channel channel, String exchangeName, String queueName) { @@ -70,7 +70,7 @@ public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp { channel.basicConsume(queueName, true, receiver); } catch (IOException e) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( "Failed to receive message via the current channel: " + channel); } } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java similarity index 81% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java index b19a035a7..e8bec3453 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/ops/OpTimeTrackAmqpMsgSendOp.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java @@ -14,14 +14,14 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.ops; +package io.nosqlbench.adapter.amqp.ops; import com.rabbitmq.client.Channel; -import io.nosqlbench.adapter.s4r.S4RSpace; -import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException; -import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics; -import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil; +import io.nosqlbench.adapter.amqp.AmqpSpace; +import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics; +import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -32,9 +32,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; -public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { +public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp { - private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgSendOp"); + private final static Logger logger = LogManager.getLogger(OpTimeTrackAmqpMsgSendOp.class); private final String routingKey; private final boolean publishConfirm; @@ -44,8 +44,8 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { private static final ConcurrentHashMap channelPublishConfirmBathTracking = new ConcurrentHashMap<>(); - public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics, - S4RSpace s4rSpace, + public OpTimeTrackAmqpMsgSendOp(AmqpAdapterMetrics s4rAdapterMetrics, + AmqpSpace s4rSpace, Channel channel, String exchangeName, String message, @@ -81,21 +81,21 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { if (publishConfirm) { // Individual publish confirm - if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) { - channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + if (StringUtils.containsIgnoreCase(confirmMode, AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) { + channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); if (logger.isTraceEnabled()) { logger.debug("Sync ack received for an individual published message: {}", cycle); } } // Batch publish confirm - else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) { + else if (StringUtils.containsIgnoreCase(confirmMode, AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) { int publishConfirmTrackingCnt = channelPublishConfirmBathTracking.getOrDefault(channel, 0); if ( (publishConfirmTrackingCnt > 0) && ( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) || (publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) { - channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); + channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); if (logger.isTraceEnabled()) { logger.debug("Sync ack received for a batch of published message: {}, {}", cycle, publishConfirmTrackingCnt); @@ -110,16 +110,16 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp { } } catch (IllegalStateException ex) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( "Wait for confirm on a wrong non-confirm channel: " + channel); } catch (InterruptedException | TimeoutException ex) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( "Failed to wait for the ack of the published message (" + msgPayload + ") via the current channel: " + channel); } catch (IOException ex) { - throw new S4RAdapterUnexpectedException( + throw new AmqpAdapterUnexpectedException( "Failed to publish message (" + msgPayload + ") via the current channel: " + channel); } diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java similarity index 89% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java index 09eef0159..54ea29f8c 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterMetrics.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java @@ -14,21 +14,21 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.util; +package io.nosqlbench.adapter.amqp.util; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; -import io.nosqlbench.adapter.s4r.dispensers.AmqpBaseOpDispenser; +import io.nosqlbench.adapter.amqp.dispensers.AmqpBaseOpDispenser; import io.nosqlbench.api.config.NBLabeledElement; import io.nosqlbench.api.config.NBLabels; import io.nosqlbench.api.engine.metrics.ActivityMetrics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class S4RAdapterMetrics { +public class AmqpAdapterMetrics { - private static final Logger logger = LogManager.getLogger("S4JAdapterMetrics"); + private static final Logger logger = LogManager.getLogger(AmqpAdapterMetrics.class); private final NBLabels labels; private Histogram messageSizeHistogram; @@ -49,9 +49,9 @@ public class S4RAdapterMetrics { private Histogram e2eMsgProcLatencyHistogram; private final AmqpBaseOpDispenser s4rBaseOpDispenser; - public S4RAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) { + public AmqpAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) { this.s4rBaseOpDispenser = s4rBaseOpDispenser; - labels=labeledParent.getLabels().and("name", S4RAdapterMetrics.class.getSimpleName()); + labels=labeledParent.getLabels().and("name", AmqpAdapterMetrics.class.getSimpleName()); } public void initS4JAdapterInstrumentation() { diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterUtil.java similarity index 94% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterUtil.java index 8a0a81e4b..5be754b06 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RAdapterUtil.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterUtil.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.util; +package io.nosqlbench.adapter.amqp.util; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -24,8 +24,8 @@ import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; -public class S4RAdapterUtil { - private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class); +public class AmqpAdapterUtil { + private static final Logger logger = LogManager.getLogger(AmqpAdapterUtil.class); public enum AMQP_EXCHANGE_TYPES { DIRECT("direct"), diff --git a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java similarity index 86% rename from adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java rename to adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java index dfc9b0bac..03fae529c 100644 --- a/adapter-s4r/src/main/java/io/nosqlbench/adapter/s4r/util/S4RClientConf.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.nosqlbench.adapter.s4r.util; +package io.nosqlbench.adapter.amqp.util; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.FileBasedConfiguration; @@ -33,14 +33,14 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -public class S4RClientConf { - private static final Logger logger = LogManager.getLogger(S4RClientConf.class); +public class AmqpClientConf { + private static final Logger logger = LogManager.getLogger(AmqpClientConf.class); // https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html private final Map s4rConfMap = new HashMap<>(); - public S4RClientConf(final String clientConfFileName) { + public AmqpClientConf(final String clientConfFileName) { ////////////////// // Read related S4R client configuration settings from a file @@ -71,10 +71,10 @@ public class S4RClientConf { this.s4rConfMap.put(confKey, confVal); } } catch (final IOException ioe) { - S4RClientConf.logger.error("Can't read the specified config properties file: {}", fileName); + AmqpClientConf.logger.error("Can't read the specified config properties file: {}", fileName); ioe.printStackTrace(); } catch (final ConfigurationException cex) { - S4RClientConf.logger.error("Error loading configuration items from the specified config properties file: {}:{}", fileName, cex.getMessage()); + AmqpClientConf.logger.error("Error loading configuration items from the specified config properties file: {}:{}", fileName, cex.getMessage()); cex.printStackTrace(); } } diff --git a/adapter-s4r/src/main/resources/s4r.md b/adapter-amqp/src/main/resources/amqp.md similarity index 75% rename from adapter-s4r/src/main/resources/s4r.md rename to adapter-amqp/src/main/resources/amqp.md index 43b406ba8..d15f2410c 100644 --- a/adapter-s4r/src/main/resources/s4r.md +++ b/adapter-amqp/src/main/resources/amqp.md @@ -1,9 +1,9 @@ --- weight: 0 -title: S4R +title: AMQP --- - [1. Overview](#1-overview) -- [2. NB S4R Usage](#2-nb-s4r-usage) +- [2. NB AMQP Usage](#2-nb-amqp-usage) - [2.1. Workload Definition](#21-workload-definition) - [2.1.1. Named Scenarios](#211-named-scenarios) - [2.2. CLI parameters](#22-cli-parameters) @@ -15,7 +15,7 @@ title: S4R # 1. Overview -The NB S4R adapter allows sending messages to or receiving messages from +The NB AMQP adapter allows sending messages to or receiving messages from * an AMQP 0-9-1 based server (e.g. RabbitMQ), or * a Pulsar cluster with [S4R](https://github.com/datastax/starlight-for-rabbitmq) AMQP (0-9-1) Protocol handler for Pulsar. @@ -30,47 +30,47 @@ At high level, this adapter supports the following AMQP 0-9-1 functionalities * Supports message-receive based on binding keys * Receiving messages from AMQP queues with async. consumer acks -# 2. NB S4R Usage +# 2. NB AMQP Usage ## 2.1. Workload Definition -There are two main types of workloads supported by the S4R adapter: -* Message sender workload (see [s4r_msg_sender.yaml](scenarios/s4r_msg_sender.yaml)) -* Message receiver workload (see [s4r_msg_receiver.yaml](scenarios/s4r_msg_receiver.yaml)) +There are two main types of workloads supported by this adapter: +* Message sender workload (see [amqp_msg_sender.yaml](scenarios/amqp_msg_sender.yaml)) +* Message receiver workload (see [amqp_msg_receiver.yaml](scenarios/amqp_msg_receiver.yaml)) Below are examples of running the message sender and receiver workloads separately. ```bash -$ run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \ +$ run driver=amqp -vv cycles=200 strict_msg_error_handling=0 \ threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \ - workload=/path/to/s4r_msg_sender.yaml \ - config=/path/to/s4r_config.properties + workload=/path/to/amqp_msg_sender.yaml \ + config=/path/to/amqp_config.properties ``` ```bash -$ run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \ +$ run driver=amqp -vv cycles=200 strict_msg_error_handling=0 \ threads=8 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \ - workload=/path/to/s4r_msg_receiver.yaml \ - config=/path/to/s4r_config.properties + workload=/path/to/amqp_msg_receiver.yaml \ + config=/path/to/amqp_config.properties ``` ### 2.1.1. Named Scenarios For workload execution convenience, NB engine has the concept of **named scenario** ([doc](https://docs.nosqlbench.io/workloads-101/11-named-scenarios/)). -For NB S4R adapter, the following yaml file is used to define the named scenarios: [nbs4r_msg_proc_named.yaml](scenarios/nbs4r_msg_proc_named.yaml) +For NB AMQP adapter, the following yaml file is used to define the named scenarios: [nbamqp_msg_proc_named.yaml](scenarios/nbamqp_msg_proc_named.yaml) -The CLI command to execute the S4R named scenarios is as simple as below: +The CLI command to execute the named scenarios is as simple as below: ```bash # for message sender workload -$ nbs4r_msg_proc_named msg_send +$ nbamqp_msg_proc_named msg_send # for message receiver workload -$ nbs4r_msg_proc_named msg_recv +$ nbamqp_msg_proc_named msg_recv ``` ## 2.2. CLI parameters -The following CLI parameters are unique to the S4R adapter: +The following CLI parameters are unique to this adapter: * `num_conn`: the number of AMQP connections to create * `num_channel`: the number of AMQP channels to create for each connection @@ -84,7 +84,7 @@ The following CLI parameters are unique to the S4R adapter: ### 2.3.1. Global Properties File -A global S4R properties file can be specified via the `config` CLI parameter. It includes the following required properties: +A global AMQP properties file can be specified via the `config` CLI parameter. It includes the following required properties: * `amqpSrvHost`: AMQP server host (e.g. An Astra Streaming cluster with S4R enabled) * `amqpSrvPort`: AMQP server port (for S4R enabled Astra Streaming, it is 5671) * `virtualHost`: AMQP server virtual host (for S4R enabled Astra Streaming, it is "/rabbitmq") @@ -93,7 +93,7 @@ A global S4R properties file can be specified via the `config` CLI parameter. It * `useTls`: whether to use TLS (for S4R enabled Astra Streaming, it is true) * `exchangeType`: AMQP exchange type (e.g. `direct`, `fanout`, `topic`, or `headers`) -An example of this file can be found from: [s4r_config.properties](conf/s4r_config.properties) +An example of this file can be found from: [amqp_config.properties](conf/amqp_config.properties) ### 2.3.2. Scenario Document Level Properties diff --git a/adapter-s4r/src/main/resources/conf/s4r_config.properties b/adapter-amqp/src/main/resources/conf/amqp_config.properties similarity index 100% rename from adapter-s4r/src/main/resources/conf/s4r_config.properties rename to adapter-amqp/src/main/resources/conf/amqp_config.properties diff --git a/adapter-s4r/src/main/resources/scenarios/s4r_msg_receiver.yaml b/adapter-amqp/src/main/resources/scenarios/amqp_msg_receiver.yaml similarity index 100% rename from adapter-s4r/src/main/resources/scenarios/s4r_msg_receiver.yaml rename to adapter-amqp/src/main/resources/scenarios/amqp_msg_receiver.yaml diff --git a/adapter-s4r/src/main/resources/scenarios/s4r_msg_sender.yaml b/adapter-amqp/src/main/resources/scenarios/amqp_msg_sender.yaml similarity index 100% rename from adapter-s4r/src/main/resources/scenarios/s4r_msg_sender.yaml rename to adapter-amqp/src/main/resources/scenarios/amqp_msg_sender.yaml diff --git a/adapter-s4r/src/main/resources/scenarios/csv/binding_keys.csv b/adapter-amqp/src/main/resources/scenarios/csv/binding_keys.csv similarity index 100% rename from adapter-s4r/src/main/resources/scenarios/csv/binding_keys.csv rename to adapter-amqp/src/main/resources/scenarios/csv/binding_keys.csv diff --git a/adapter-s4r/src/main/resources/scenarios/csv/routing_keys.csv b/adapter-amqp/src/main/resources/scenarios/csv/routing_keys.csv similarity index 100% rename from adapter-s4r/src/main/resources/scenarios/csv/routing_keys.csv rename to adapter-amqp/src/main/resources/scenarios/csv/routing_keys.csv diff --git a/adapter-s4r/src/main/resources/scenarios/nbs4r_msg_proc_named.yaml b/adapter-amqp/src/main/resources/scenarios/nbamqp_msg_proc_named.yaml similarity index 82% rename from adapter-s4r/src/main/resources/scenarios/nbs4r_msg_proc_named.yaml rename to adapter-amqp/src/main/resources/scenarios/nbamqp_msg_proc_named.yaml index bb88b0fec..d20daddcf 100644 --- a/adapter-s4r/src/main/resources/scenarios/nbs4r_msg_proc_named.yaml +++ b/adapter-amqp/src/main/resources/scenarios/nbamqp_msg_proc_named.yaml @@ -1,6 +1,6 @@ scenarios: - msg_send: run driver=s4r cycles=200 threads=16 num_conn=2 num_channel=2 num_exchange=2 num_msg_clnt=2 config=../conf/s4r_config.properties - msg_recv: run driver=s4r cycles=200 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 config=../conf/s4r_config.properties + msg_send: run driver=amqp cycles=200 threads=16 num_conn=2 num_channel=2 num_exchange=2 num_msg_clnt=2 config=../conf/amqp_config.properties + msg_recv: run driver=amqp cycles=200 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 config=../conf/amqp_config.properties bindings: mytext_val: AlphaNumericString(100) diff --git a/nb5/pom.xml b/nb5/pom.xml index 558f7acc9..abbe49a01 100644 --- a/nb5/pom.xml +++ b/nb5/pom.xml @@ -115,7 +115,7 @@ io.nosqlbench - adapter-s4r + adapter-amqp ${revision} diff --git a/pom.xml b/pom.xml index 8148e3104..a79b86a73 100644 --- a/pom.xml +++ b/pom.xml @@ -65,7 +65,7 @@ adapter-pulsar adapter-s4j adapter-kafka - adapter-s4r + adapter-amqp adapter-jdbc @@ -111,7 +111,7 @@ adapter-pulsar adapter-s4j adapter-kafka - adapter-s4r + adapter-amqp adapter-jdbc adapter-pinecone