From e3a8640192ce44c30c85ed5fac6e1c62e33854d4 Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Fri, 30 Jun 2023 10:49:05 -0500 Subject: [PATCH] Rename `adapter-s4r` to `adapter-amqp` Part 2: rename variables and string literals from s4r to amqp --- .../nosqlbench/adapter/amqp/AmqpOpMapper.java | 6 +-- .../io/nosqlbench/adapter/amqp/AmqpSpace.java | 42 +++++++++---------- .../amqp/dispensers/AmqpBaseOpDispenser.java | 32 +++++++------- .../dispensers/AmqpMsgRecvOpDispenser.java | 32 +++++++------- .../dispensers/AmqpMsgSendOpDispenser.java | 22 +++++----- .../adapter/amqp/ops/AmqpTimeTrackOp.java | 16 +++---- .../amqp/ops/OpTimeTrackAmqpMsgRecvOp.java | 6 +-- .../amqp/ops/OpTimeTrackAmqpMsgSendOp.java | 8 ++-- .../adapter/amqp/util/AmqpAdapterMetrics.java | 20 ++++----- .../adapter/amqp/util/AmqpClientConf.java | 10 ++--- 10 files changed, 97 insertions(+), 97 deletions(-) diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java index 2b4d0994c..f6b3b4a71 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpOpMapper.java @@ -46,7 +46,7 @@ public class AmqpOpMapper implements OpMapper { @Override public OpDispenser apply(ParsedOp op) { String spaceName = op.getStaticConfigOr("space", "default"); - AmqpSpace s4RSpace = spaceCache.get(spaceName); + AmqpSpace amqpSpace = spaceCache.get(spaceName); /* * If the user provides a body element, then they want to provide the JSON or @@ -61,9 +61,9 @@ public class AmqpOpMapper implements OpMapper { return switch (opType.enumId) { case AmqpMsgSender -> - new AmqpMsgSendOpDispenser(adapter, op, s4RSpace); + new AmqpMsgSendOpDispenser(adapter, op, amqpSpace); case AmqpMsgReceiver -> - new AmqpMsgRecvOpDispenser(adapter, op, s4RSpace); + new AmqpMsgRecvOpDispenser(adapter, op, amqpSpace); }; } } diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java index 3e5ca537c..275e3a285 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/AmqpSpace.java @@ -51,7 +51,7 @@ public class AmqpSpace implements AutoCloseable { private final String spaceName; private final NBConfiguration cfg; - private final AmqpClientConf s4rClientConf; + private final AmqpClientConf amqpClientConf; /////////////////////////////////////////////////////////////////// // NOTE: in this driver, we assume: @@ -91,7 +91,7 @@ public class AmqpSpace implements AutoCloseable { private final AtomicBoolean beingShutdown = new AtomicBoolean(false); - private ConnectionFactory s4rConnFactory; + private ConnectionFactory amqpConnFactory; // Default to "direct" type private String amqpExchangeType = AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label; @@ -108,7 +108,7 @@ public class AmqpSpace implements AutoCloseable { // - No: pause the current thread that received the error message for 1 second and then continue processing private final boolean strictMsgErrorHandling; - // Maximum time length to execute S4R operations (e.g. message send or consume) + // Maximum time length to execute AMQP operations (e.g. message send or consume) // - when NB execution passes this threshold, it is simply NoOp // - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes private final long maxOpTimeInSec; @@ -121,8 +121,8 @@ public class AmqpSpace implements AutoCloseable { this.spaceName = spaceName; this.cfg = cfg; - String s4rClientConfFileName = cfg.get("config"); - this.s4rClientConf = new AmqpClientConf(s4rClientConfFileName); + String amqpClientConfFileName = cfg.get("config"); + this.amqpClientConf = new AmqpClientConf(amqpClientConfFileName); this.amqpConnNum = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1")); this.amqpConnChannelNum = @@ -139,7 +139,7 @@ public class AmqpSpace implements AutoCloseable { BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false")); this.activityStartTimeMills = System.currentTimeMillis(); - this.initializeSpace(s4rClientConf); + this.initializeSpace(amqpClientConf); } @Override @@ -150,7 +150,7 @@ public class AmqpSpace implements AutoCloseable { public static NBConfigModel getConfigModel() { return ConfigModel.of(AmqpSpace.class) .add(Param.defaultTo("config", "config.properties") - .setDescription("S4R client connection configuration property file.")) + .setDescription("AMQP client connection configuration property file.")) .add(Param.defaultTo("num_conn", 1) .setDescription("Maximum number of AMQP connections.")) .add(Param.defaultTo("num_channel", 1) @@ -178,7 +178,7 @@ public class AmqpSpace implements AutoCloseable { public long getActivityStartTimeMills() { return this.activityStartTimeMills; } public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; } - public AmqpClientConf getS4rClientConf() { return s4rClientConf; } + public AmqpClientConf getAmqpClientConf() { return amqpClientConf; } public String getAmqpExchangeType() { return amqpExchangeType; } public int getAmqpConnNum() { return this.amqpConnNum; } @@ -195,8 +195,8 @@ public class AmqpSpace implements AutoCloseable { public long getTotalThreadNum() { return totalThreadNum; } public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; } - public void initializeSpace(AmqpClientConf s4rClientConnInfo) { - Map cfgMap = s4rClientConnInfo.getS4rConfMap(); + public void initializeSpace(AmqpClientConf amqpClientConf) { + Map cfgMap = amqpClientConf.getConfigMap(); if (amqpConnNum < 1) { String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!"; @@ -215,30 +215,30 @@ public class AmqpSpace implements AutoCloseable { throw new AmqpAdapterInvalidParamException(errMsg); } - if (s4rConnFactory == null) { + if (amqpConnFactory == null) { try { - s4rConnFactory = new ConnectionFactory(); + amqpConnFactory = new ConnectionFactory(); String amqpServerHost = cfgMap.get("amqpSrvHost"); if (StringUtils.isBlank(amqpServerHost)) { String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!"; throw new AmqpAdapterInvalidParamException(errMsg); } - s4rConnFactory.setHost(amqpServerHost); + amqpConnFactory.setHost(amqpServerHost); String amqpSrvPortCfg = cfgMap.get("amqpSrvPort"); if (StringUtils.isBlank(amqpSrvPortCfg)) { String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!"; throw new AmqpAdapterInvalidParamException(errMsg); } - s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg)); + amqpConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg)); String amqpVirtualHost = cfgMap.get("virtualHost"); if (StringUtils.isBlank(amqpVirtualHost)) { String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!"; throw new AmqpAdapterInvalidParamException(errMsg); } - s4rConnFactory.setVirtualHost(amqpVirtualHost); + amqpConnFactory.setVirtualHost(amqpVirtualHost); String userNameCfg = cfgMap.get("amqpUser"); @@ -253,19 +253,19 @@ public class AmqpSpace implements AutoCloseable { if (StringUtils.isNotBlank(passWord)) { if (StringUtils.isBlank(userNameCfg)) { - s4rConnFactory.setUsername(""); + amqpConnFactory.setUsername(""); } - s4rConnFactory.setPassword(passWord); + amqpConnFactory.setPassword(passWord); } } String useTlsCfg = cfgMap.get("useTls"); if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) { - s4rConnFactory.useSslProtocol(); + amqpConnFactory.useSslProtocol(); } for (int i = 0; i < getAmqpConnNum(); i++) { - Connection connection = s4rConnFactory.newConnection(); + Connection connection = amqpConnFactory.newConnection(); amqpConnections.put((long) i, connection); if (logger.isDebugEnabled()) { @@ -277,7 +277,7 @@ public class AmqpSpace implements AutoCloseable { } } catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) { logger.error("Unable to establish AMQP connections with the following configuration parameters: {}", - s4rClientConnInfo.toString()); + amqpClientConf.toString()); throw new AmqpAdapterUnexpectedException(ex); } } @@ -299,7 +299,7 @@ public class AmqpSpace implements AutoCloseable { AmqpAdapterUtil.pauseCurThreadExec(5); } catch (Exception ex) { - String exp = "Unexpected error when shutting down the S4R adaptor space"; + String exp = "Unexpected error when shutting down the AMQP adaptor space"; logger.error(exp, ex); } } diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java index 5534d4f3e..4edfbc18b 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpBaseOpDispenser.java @@ -38,29 +38,29 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser s4rConfMap = new HashMap<>(); + protected final Map amqpConfMap = new HashMap<>(); protected final String exchangeType; protected AmqpBaseOpDispenser(final DriverAdapter adapter, final ParsedOp op, - final AmqpSpace s4RSpace) { + final AmqpSpace amqpSpace) { super(adapter, op); parsedOp = op; - this.s4rSpace = s4RSpace; + this.amqpSpace = amqpSpace; - s4rAdapterMetrics = new AmqpAdapterMetrics(this, this); - s4rAdapterMetrics.initS4JAdapterInstrumentation(); + amqpAdapterMetrics = new AmqpAdapterMetrics(this, this); + amqpAdapterMetrics.initS4JAdapterInstrumentation(); - s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap()); + amqpConfMap.putAll(amqpSpace.getAmqpClientConf().getConfigMap()); - this.exchangeType = s4RSpace.getAmqpExchangeType(); + this.exchangeType = amqpSpace.getAmqpExchangeType(); - s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class))); - s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class))); + this.amqpSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class))); + this.amqpSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class))); } protected LongFunction lookupMandtoryStrOpValueFunc(String paramName) { @@ -96,17 +96,17 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser bindingKeyFunc; public AmqpMsgRecvOpDispenser(DriverAdapter adapter, ParsedOp op, - AmqpSpace s4rSpace) { - super(adapter, op, s4rSpace); + AmqpSpace amqpSpace) { + super(adapter, op, amqpSpace); bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); } private long getExchangeQueueSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * - s4rSpace.getAmqpConnChannelNum() * - s4rSpace.getAmqpChannelExchangeNum()) - ) % s4rSpace.getAmqpExchangeQueueNum(); + return (cycle / ((long) amqpSpace.getAmqpConnNum() * + amqpSpace.getAmqpConnChannelNum() * + amqpSpace.getAmqpChannelExchangeNum()) + ) % amqpSpace.getAmqpExchangeQueueNum(); } private long getQueueReceiverSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * - s4rSpace.getAmqpConnChannelNum() * - s4rSpace.getAmqpChannelExchangeNum() * - s4rSpace.getAmqpExchangeQueueNum()) - ) % s4rSpace.getAmqpMsgClntNum(); + return (cycle / ((long) amqpSpace.getAmqpConnNum() * + amqpSpace.getAmqpConnChannelNum() * + amqpSpace.getAmqpChannelExchangeNum() * + amqpSpace.getAmqpExchangeQueueNum()) + ) % amqpSpace.getAmqpMsgClntNum(); } private String getEffectiveQueueNameByCycle(long cycle) { @@ -94,10 +94,10 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { long connSeqNum = getConnSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle); - Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); + Connection amqpConnection = amqpSpace.getAmqpConnection(connSeqNum); AmqpSpace.AmqpChannelKey amqpConnChannelKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum); - return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> { + return amqpSpace.getAmqpChannels(amqpConnChannelKey, () -> { Channel channel = null; try { @@ -131,7 +131,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { } String exchangeName = getEffectiveExchangeNameByCycle(cycle); - declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); + declareExchange(channel, exchangeName, amqpSpace.getAmqpExchangeType()); boolean durable = true; boolean exclusive = true; @@ -179,8 +179,8 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser { } return new OpTimeTrackAmqpMsgRecvOp( - s4rAdapterMetrics, - s4rSpace, + amqpAdapterMetrics, + amqpSpace, channel, exchangeName, queueName); diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java index 00a0e79dc..8508ea570 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/dispensers/AmqpMsgSendOpDispenser.java @@ -54,8 +54,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { public AmqpMsgSendOpDispenser(DriverAdapter adapter, ParsedOp op, - AmqpSpace s4rSpace) { - super(adapter, op, s4rSpace); + AmqpSpace amqpSpace) { + super(adapter, op, amqpSpace); publisherConfirm = parsedOp .getOptionalStaticConfig("publisher_confirm", String.class) @@ -87,10 +87,10 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { } private long getExchangeSenderSeqNum(long cycle) { - return (cycle / ((long) s4rSpace.getAmqpConnNum() * - s4rSpace.getAmqpConnChannelNum() * - s4rSpace.getAmqpChannelExchangeNum()) - ) % s4rSpace.getAmqpMsgClntNum(); + return (cycle / ((long) amqpSpace.getAmqpConnNum() * + amqpSpace.getAmqpConnChannelNum() * + amqpSpace.getAmqpChannelExchangeNum()) + ) % amqpSpace.getAmqpMsgClntNum(); } private String getEffectiveSenderNameByCycle(long cycle) { @@ -116,10 +116,10 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { long connSeqNum = getConnSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle); - Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); + Connection amqpConnection = amqpSpace.getAmqpConnection(connSeqNum); AmqpSpace.AmqpChannelKey senderKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum); - return s4rSpace.getAmqpChannels(senderKey, () -> { + return amqpSpace.getAmqpChannels(senderKey, () -> { Channel channel = null; try { @@ -191,11 +191,11 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser { } String exchangeName = getEffectiveExchangeNameByCycle(cycle); - declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); + declareExchange(channel, exchangeName, amqpSpace.getAmqpExchangeType()); return new OpTimeTrackAmqpMsgSendOp( - s4rAdapterMetrics, - s4rSpace, + amqpAdapterMetrics, + amqpSpace, channel, exchangeName, msgPayload, diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java index 207828f1d..a35ef777e 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/AmqpTimeTrackOp.java @@ -23,8 +23,8 @@ import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp; public abstract class AmqpTimeTrackOp implements CycleOp { - private final AmqpAdapterMetrics s4rAdapterMetrics; - protected final AmqpSpace s4RSpace; + private final AmqpAdapterMetrics amqpAdapterMetrics; + protected final AmqpSpace amqpSpace; protected final Channel channel; protected final String exchangeName; @@ -37,17 +37,17 @@ public abstract class AmqpTimeTrackOp implements CycleOp { protected Object cycleObj; - public AmqpTimeTrackOp(AmqpAdapterMetrics s4rAdapterMetrics, - AmqpSpace s4RSpace, + public AmqpTimeTrackOp(AmqpAdapterMetrics amqpAdapterMetrics, + AmqpSpace amqpSpace, Channel channel, String exchangeName) { - this.s4rAdapterMetrics = s4rAdapterMetrics; - this.s4RSpace = s4RSpace; + this.amqpAdapterMetrics = amqpAdapterMetrics; + this.amqpSpace = amqpSpace; this.channel = channel; this.exchangeName = exchangeName; - this.activityStartTime = s4RSpace.getActivityStartTimeMills(); - this.maxOpTimeInSec = s4RSpace.getMaxOpTimeInSec(); + this.activityStartTime = amqpSpace.getActivityStartTimeMills(); + this.maxOpTimeInSec = amqpSpace.getMaxOpTimeInSec(); } @Override diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java index 24a123e37..27571749e 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgRecvOp.java @@ -34,12 +34,12 @@ public class OpTimeTrackAmqpMsgRecvOp extends AmqpTimeTrackOp { private final String queueName; - public OpTimeTrackAmqpMsgRecvOp(AmqpAdapterMetrics s4rAdapterMetrics, - AmqpSpace s4rSpace, + public OpTimeTrackAmqpMsgRecvOp(AmqpAdapterMetrics amqpAdapterMetrics, + AmqpSpace amqpSpace, Channel channel, String exchangeName, String queueName) { - super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); + super(amqpAdapterMetrics, amqpSpace, channel, exchangeName); this.queueName = queueName; } diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java index e8bec3453..c428b71bb 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/ops/OpTimeTrackAmqpMsgSendOp.java @@ -44,8 +44,8 @@ public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp { private static final ConcurrentHashMap channelPublishConfirmBathTracking = new ConcurrentHashMap<>(); - public OpTimeTrackAmqpMsgSendOp(AmqpAdapterMetrics s4rAdapterMetrics, - AmqpSpace s4rSpace, + public OpTimeTrackAmqpMsgSendOp(AmqpAdapterMetrics amqpAdapterMetrics, + AmqpSpace amqpSpace, Channel channel, String exchangeName, String message, @@ -53,7 +53,7 @@ public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp { boolean publishConfirm, String confirmMode, int confirmBatchNum) { - super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); + super(amqpAdapterMetrics, amqpSpace, channel, exchangeName); this.cycleObj = message; this.routingKey = routingKey; this.publishConfirm = publishConfirm; @@ -94,7 +94,7 @@ public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp { if ( (publishConfirmTrackingCnt > 0) && ( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) || - (publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) { + (publishConfirmTrackingCnt == (amqpSpace.getTotalCycleNum() - 1)) ) ) { channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); if (logger.isTraceEnabled()) { logger.debug("Sync ack received for a batch of published message: {}, {}", diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java index 54ea29f8c..8c553baff 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpAdapterMetrics.java @@ -47,38 +47,38 @@ public class AmqpAdapterMetrics { // end-to-end latency private Histogram e2eMsgProcLatencyHistogram; - private final AmqpBaseOpDispenser s4rBaseOpDispenser; + private final AmqpBaseOpDispenser amqpBaseOpDispenser; - public AmqpAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) { - this.s4rBaseOpDispenser = s4rBaseOpDispenser; + public AmqpAdapterMetrics(final AmqpBaseOpDispenser amqpBaseOpDispenser, final NBLabeledElement labeledParent) { + this.amqpBaseOpDispenser = amqpBaseOpDispenser; labels=labeledParent.getLabels().and("name", AmqpAdapterMetrics.class.getSimpleName()); } public void initS4JAdapterInstrumentation() { // Histogram metrics messageSizeHistogram = - ActivityMetrics.histogram(this.s4rBaseOpDispenser, + ActivityMetrics.histogram(this.amqpBaseOpDispenser, "message_size", ActivityMetrics.DEFAULT_HDRDIGITS); // Timer metrics bindTimer = - ActivityMetrics.timer(this.s4rBaseOpDispenser, + ActivityMetrics.timer(this.amqpBaseOpDispenser, "bind", ActivityMetrics.DEFAULT_HDRDIGITS); executeTimer = - ActivityMetrics.timer(this.s4rBaseOpDispenser, + ActivityMetrics.timer(this.amqpBaseOpDispenser, "execute", ActivityMetrics.DEFAULT_HDRDIGITS); // End-to-end metrics // Latency e2eMsgProcLatencyHistogram = - ActivityMetrics.histogram(this.s4rBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS); + ActivityMetrics.histogram(this.amqpBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS); // Error metrics msgErrOutOfSeqCounter = - ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_oos"); + ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_oos"); msgErrLossCounter = - ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_loss"); + ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_loss"); msgErrDuplicateCounter = - ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_dup"); + ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_dup"); } public Timer getBindTimer() { return bindTimer; } diff --git a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java index 03fae529c..798f597b6 100644 --- a/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java +++ b/adapter-amqp/src/main/java/io/nosqlbench/adapter/amqp/util/AmqpClientConf.java @@ -37,13 +37,13 @@ public class AmqpClientConf { private static final Logger logger = LogManager.getLogger(AmqpClientConf.class); // https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html - private final Map s4rConfMap = new HashMap<>(); + private final Map configMap = new HashMap<>(); public AmqpClientConf(final String clientConfFileName) { ////////////////// - // Read related S4R client configuration settings from a file + // Read related AMQP client configuration settings from a file this.readRawConfFromFile(clientConfFileName); } @@ -68,7 +68,7 @@ public class AmqpClientConf { // Get client connection specific configuration settings, removing "topic." prefix if (!StringUtils.isBlank(confVal)) - this.s4rConfMap.put(confKey, confVal); + this.configMap.put(confKey, confVal); } } catch (final IOException ioe) { AmqpClientConf.logger.error("Can't read the specified config properties file: {}", fileName); @@ -79,11 +79,11 @@ public class AmqpClientConf { } } - public Map getS4rConfMap() { return this.s4rConfMap; } + public Map getConfigMap() { return this.configMap; } public String toString() { return new ToStringBuilder(this). - append("s4rConfMap", this.s4rConfMap). + append("configMap", this.configMap). toString(); } }