Rename adapter-s4r to adapter-amqp Part 2: rename variables and string literals from s4r to amqp

This commit is contained in:
yabinmeng 2023-06-30 10:49:05 -05:00
parent 9382147a66
commit e3a8640192
10 changed files with 97 additions and 97 deletions

View File

@ -46,7 +46,7 @@ public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp> {
@Override @Override
public OpDispenser<? extends AmqpTimeTrackOp> apply(ParsedOp op) { public OpDispenser<? extends AmqpTimeTrackOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default"); String spaceName = op.getStaticConfigOr("space", "default");
AmqpSpace s4RSpace = spaceCache.get(spaceName); AmqpSpace amqpSpace = spaceCache.get(spaceName);
/* /*
* If the user provides a body element, then they want to provide the JSON or * If the user provides a body element, then they want to provide the JSON or
@ -61,9 +61,9 @@ public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp> {
return switch (opType.enumId) { return switch (opType.enumId) {
case AmqpMsgSender -> case AmqpMsgSender ->
new AmqpMsgSendOpDispenser(adapter, op, s4RSpace); new AmqpMsgSendOpDispenser(adapter, op, amqpSpace);
case AmqpMsgReceiver -> case AmqpMsgReceiver ->
new AmqpMsgRecvOpDispenser(adapter, op, s4RSpace); new AmqpMsgRecvOpDispenser(adapter, op, amqpSpace);
}; };
} }
} }

View File

@ -51,7 +51,7 @@ public class AmqpSpace implements AutoCloseable {
private final String spaceName; private final String spaceName;
private final NBConfiguration cfg; private final NBConfiguration cfg;
private final AmqpClientConf s4rClientConf; private final AmqpClientConf amqpClientConf;
/////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////
// NOTE: in this driver, we assume: // NOTE: in this driver, we assume:
@ -91,7 +91,7 @@ public class AmqpSpace implements AutoCloseable {
private final AtomicBoolean beingShutdown = new AtomicBoolean(false); private final AtomicBoolean beingShutdown = new AtomicBoolean(false);
private ConnectionFactory s4rConnFactory; private ConnectionFactory amqpConnFactory;
// Default to "direct" type // Default to "direct" type
private String amqpExchangeType = AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label; private String amqpExchangeType = AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label;
@ -108,7 +108,7 @@ public class AmqpSpace implements AutoCloseable {
// - No: pause the current thread that received the error message for 1 second and then continue processing // - No: pause the current thread that received the error message for 1 second and then continue processing
private final boolean strictMsgErrorHandling; private final boolean strictMsgErrorHandling;
// Maximum time length to execute S4R operations (e.g. message send or consume) // Maximum time length to execute AMQP operations (e.g. message send or consume)
// - when NB execution passes this threshold, it is simply NoOp // - when NB execution passes this threshold, it is simply NoOp
// - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes // - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes
private final long maxOpTimeInSec; private final long maxOpTimeInSec;
@ -121,8 +121,8 @@ public class AmqpSpace implements AutoCloseable {
this.spaceName = spaceName; this.spaceName = spaceName;
this.cfg = cfg; this.cfg = cfg;
String s4rClientConfFileName = cfg.get("config"); String amqpClientConfFileName = cfg.get("config");
this.s4rClientConf = new AmqpClientConf(s4rClientConfFileName); this.amqpClientConf = new AmqpClientConf(amqpClientConfFileName);
this.amqpConnNum = this.amqpConnNum =
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1")); NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
this.amqpConnChannelNum = this.amqpConnChannelNum =
@ -139,7 +139,7 @@ public class AmqpSpace implements AutoCloseable {
BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false")); BooleanUtils.toBoolean(cfg.getOptional("strict_msg_error_handling").orElse("false"));
this.activityStartTimeMills = System.currentTimeMillis(); this.activityStartTimeMills = System.currentTimeMillis();
this.initializeSpace(s4rClientConf); this.initializeSpace(amqpClientConf);
} }
@Override @Override
@ -150,7 +150,7 @@ public class AmqpSpace implements AutoCloseable {
public static NBConfigModel getConfigModel() { public static NBConfigModel getConfigModel() {
return ConfigModel.of(AmqpSpace.class) return ConfigModel.of(AmqpSpace.class)
.add(Param.defaultTo("config", "config.properties") .add(Param.defaultTo("config", "config.properties")
.setDescription("S4R client connection configuration property file.")) .setDescription("AMQP client connection configuration property file."))
.add(Param.defaultTo("num_conn", 1) .add(Param.defaultTo("num_conn", 1)
.setDescription("Maximum number of AMQP connections.")) .setDescription("Maximum number of AMQP connections."))
.add(Param.defaultTo("num_channel", 1) .add(Param.defaultTo("num_channel", 1)
@ -178,7 +178,7 @@ public class AmqpSpace implements AutoCloseable {
public long getActivityStartTimeMills() { return this.activityStartTimeMills; } public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; } public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; }
public AmqpClientConf getS4rClientConf() { return s4rClientConf; } public AmqpClientConf getAmqpClientConf() { return amqpClientConf; }
public String getAmqpExchangeType() { return amqpExchangeType; } public String getAmqpExchangeType() { return amqpExchangeType; }
public int getAmqpConnNum() { return this.amqpConnNum; } public int getAmqpConnNum() { return this.amqpConnNum; }
@ -195,8 +195,8 @@ public class AmqpSpace implements AutoCloseable {
public long getTotalThreadNum() { return totalThreadNum; } public long getTotalThreadNum() { return totalThreadNum; }
public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; } public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; }
public void initializeSpace(AmqpClientConf s4rClientConnInfo) { public void initializeSpace(AmqpClientConf amqpClientConf) {
Map<String, String> cfgMap = s4rClientConnInfo.getS4rConfMap(); Map<String, String> cfgMap = amqpClientConf.getConfigMap();
if (amqpConnNum < 1) { if (amqpConnNum < 1) {
String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!"; String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!";
@ -215,30 +215,30 @@ public class AmqpSpace implements AutoCloseable {
throw new AmqpAdapterInvalidParamException(errMsg); throw new AmqpAdapterInvalidParamException(errMsg);
} }
if (s4rConnFactory == null) { if (amqpConnFactory == null) {
try { try {
s4rConnFactory = new ConnectionFactory(); amqpConnFactory = new ConnectionFactory();
String amqpServerHost = cfgMap.get("amqpSrvHost"); String amqpServerHost = cfgMap.get("amqpSrvHost");
if (StringUtils.isBlank(amqpServerHost)) { if (StringUtils.isBlank(amqpServerHost)) {
String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!"; String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!";
throw new AmqpAdapterInvalidParamException(errMsg); throw new AmqpAdapterInvalidParamException(errMsg);
} }
s4rConnFactory.setHost(amqpServerHost); amqpConnFactory.setHost(amqpServerHost);
String amqpSrvPortCfg = cfgMap.get("amqpSrvPort"); String amqpSrvPortCfg = cfgMap.get("amqpSrvPort");
if (StringUtils.isBlank(amqpSrvPortCfg)) { if (StringUtils.isBlank(amqpSrvPortCfg)) {
String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!"; String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!";
throw new AmqpAdapterInvalidParamException(errMsg); throw new AmqpAdapterInvalidParamException(errMsg);
} }
s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg)); amqpConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg));
String amqpVirtualHost = cfgMap.get("virtualHost"); String amqpVirtualHost = cfgMap.get("virtualHost");
if (StringUtils.isBlank(amqpVirtualHost)) { if (StringUtils.isBlank(amqpVirtualHost)) {
String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!"; String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!";
throw new AmqpAdapterInvalidParamException(errMsg); throw new AmqpAdapterInvalidParamException(errMsg);
} }
s4rConnFactory.setVirtualHost(amqpVirtualHost); amqpConnFactory.setVirtualHost(amqpVirtualHost);
String userNameCfg = cfgMap.get("amqpUser"); String userNameCfg = cfgMap.get("amqpUser");
@ -253,19 +253,19 @@ public class AmqpSpace implements AutoCloseable {
if (StringUtils.isNotBlank(passWord)) { if (StringUtils.isNotBlank(passWord)) {
if (StringUtils.isBlank(userNameCfg)) { if (StringUtils.isBlank(userNameCfg)) {
s4rConnFactory.setUsername(""); amqpConnFactory.setUsername("");
} }
s4rConnFactory.setPassword(passWord); amqpConnFactory.setPassword(passWord);
} }
} }
String useTlsCfg = cfgMap.get("useTls"); String useTlsCfg = cfgMap.get("useTls");
if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) { if (StringUtils.isNotBlank(useTlsCfg) && Boolean.parseBoolean(useTlsCfg)) {
s4rConnFactory.useSslProtocol(); amqpConnFactory.useSslProtocol();
} }
for (int i = 0; i < getAmqpConnNum(); i++) { for (int i = 0; i < getAmqpConnNum(); i++) {
Connection connection = s4rConnFactory.newConnection(); Connection connection = amqpConnFactory.newConnection();
amqpConnections.put((long) i, connection); amqpConnections.put((long) i, connection);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
@ -277,7 +277,7 @@ public class AmqpSpace implements AutoCloseable {
} }
} catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) { } catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) {
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}", logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
s4rClientConnInfo.toString()); amqpClientConf.toString());
throw new AmqpAdapterUnexpectedException(ex); throw new AmqpAdapterUnexpectedException(ex);
} }
} }
@ -299,7 +299,7 @@ public class AmqpSpace implements AutoCloseable {
AmqpAdapterUtil.pauseCurThreadExec(5); AmqpAdapterUtil.pauseCurThreadExec(5);
} }
catch (Exception ex) { catch (Exception ex) {
String exp = "Unexpected error when shutting down the S4R adaptor space"; String exp = "Unexpected error when shutting down the AMQP adaptor space";
logger.error(exp, ex); logger.error(exp, ex);
} }
} }

View File

@ -38,29 +38,29 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrack
private static final Logger logger = LogManager.getLogger(AmqpBaseOpDispenser.class); private static final Logger logger = LogManager.getLogger(AmqpBaseOpDispenser.class);
protected final ParsedOp parsedOp; protected final ParsedOp parsedOp;
protected final AmqpAdapterMetrics s4rAdapterMetrics; protected final AmqpAdapterMetrics amqpAdapterMetrics;
protected final AmqpSpace s4rSpace; protected final AmqpSpace amqpSpace;
protected final Map<String, String> s4rConfMap = new HashMap<>(); protected final Map<String, String> amqpConfMap = new HashMap<>();
protected final String exchangeType; protected final String exchangeType;
protected AmqpBaseOpDispenser(final DriverAdapter adapter, protected AmqpBaseOpDispenser(final DriverAdapter adapter,
final ParsedOp op, final ParsedOp op,
final AmqpSpace s4RSpace) { final AmqpSpace amqpSpace) {
super(adapter, op); super(adapter, op);
parsedOp = op; parsedOp = op;
this.s4rSpace = s4RSpace; this.amqpSpace = amqpSpace;
s4rAdapterMetrics = new AmqpAdapterMetrics(this, this); amqpAdapterMetrics = new AmqpAdapterMetrics(this, this);
s4rAdapterMetrics.initS4JAdapterInstrumentation(); amqpAdapterMetrics.initS4JAdapterInstrumentation();
s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap()); amqpConfMap.putAll(amqpSpace.getAmqpClientConf().getConfigMap());
this.exchangeType = s4RSpace.getAmqpExchangeType(); this.exchangeType = amqpSpace.getAmqpExchangeType();
s4rSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class))); this.amqpSpace.setTotalCycleNum(NumberUtils.toLong(this.parsedOp.getStaticConfig("cycles", String.class)));
s4rSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class))); this.amqpSpace.setTotalThreadNum(NumberUtils.toInt(this.parsedOp.getStaticConfig("threads", String.class)));
} }
protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) { protected LongFunction<String> lookupMandtoryStrOpValueFunc(String paramName) {
@ -96,17 +96,17 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrack
} }
protected long getConnSeqNum(long cycle) { protected long getConnSeqNum(long cycle) {
return cycle % s4rSpace.getAmqpConnNum(); return cycle % amqpSpace.getAmqpConnNum();
} }
protected long getConnChannelSeqNum(long cycle) { protected long getConnChannelSeqNum(long cycle) {
return (cycle / s4rSpace.getAmqpConnNum()) % s4rSpace.getAmqpConnChannelNum(); return (cycle / amqpSpace.getAmqpConnNum()) % amqpSpace.getAmqpConnChannelNum();
} }
protected long getChannelExchangeSeqNum(long cycle) { protected long getChannelExchangeSeqNum(long cycle) {
return (cycle / ((long) s4rSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum()) amqpSpace.getAmqpConnChannelNum())
) % s4rSpace.getAmqpChannelExchangeNum(); ) % amqpSpace.getAmqpChannelExchangeNum();
} }
protected String getEffectiveExchangeNameByCycle(long cycle) { protected String getEffectiveExchangeNameByCycle(long cycle) {

View File

@ -37,24 +37,24 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
private final LongFunction<String> bindingKeyFunc; private final LongFunction<String> bindingKeyFunc;
public AmqpMsgRecvOpDispenser(DriverAdapter adapter, public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
ParsedOp op, ParsedOp op,
AmqpSpace s4rSpace) { AmqpSpace amqpSpace) {
super(adapter, op, s4rSpace); super(adapter, op, amqpSpace);
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null); bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
} }
private long getExchangeQueueSeqNum(long cycle) { private long getExchangeQueueSeqNum(long cycle) {
return (cycle / ((long) s4rSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum() * amqpSpace.getAmqpConnChannelNum() *
s4rSpace.getAmqpChannelExchangeNum()) amqpSpace.getAmqpChannelExchangeNum())
) % s4rSpace.getAmqpExchangeQueueNum(); ) % amqpSpace.getAmqpExchangeQueueNum();
} }
private long getQueueReceiverSeqNum(long cycle) { private long getQueueReceiverSeqNum(long cycle) {
return (cycle / ((long) s4rSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum() * amqpSpace.getAmqpConnChannelNum() *
s4rSpace.getAmqpChannelExchangeNum() * amqpSpace.getAmqpChannelExchangeNum() *
s4rSpace.getAmqpExchangeQueueNum()) amqpSpace.getAmqpExchangeQueueNum())
) % s4rSpace.getAmqpMsgClntNum(); ) % amqpSpace.getAmqpMsgClntNum();
} }
private String getEffectiveQueueNameByCycle(long cycle) { private String getEffectiveQueueNameByCycle(long cycle) {
@ -94,10 +94,10 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
long connSeqNum = getConnSeqNum(cycle); long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle);
Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); Connection amqpConnection = amqpSpace.getAmqpConnection(connSeqNum);
AmqpSpace.AmqpChannelKey amqpConnChannelKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum); AmqpSpace.AmqpChannelKey amqpConnChannelKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> { return amqpSpace.getAmqpChannels(amqpConnChannelKey, () -> {
Channel channel = null; Channel channel = null;
try { try {
@ -131,7 +131,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
} }
String exchangeName = getEffectiveExchangeNameByCycle(cycle); String exchangeName = getEffectiveExchangeNameByCycle(cycle);
declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); declareExchange(channel, exchangeName, amqpSpace.getAmqpExchangeType());
boolean durable = true; boolean durable = true;
boolean exclusive = true; boolean exclusive = true;
@ -179,8 +179,8 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
} }
return new OpTimeTrackAmqpMsgRecvOp( return new OpTimeTrackAmqpMsgRecvOp(
s4rAdapterMetrics, amqpAdapterMetrics,
s4rSpace, amqpSpace,
channel, channel,
exchangeName, exchangeName,
queueName); queueName);

View File

@ -54,8 +54,8 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
public AmqpMsgSendOpDispenser(DriverAdapter adapter, public AmqpMsgSendOpDispenser(DriverAdapter adapter,
ParsedOp op, ParsedOp op,
AmqpSpace s4rSpace) { AmqpSpace amqpSpace) {
super(adapter, op, s4rSpace); super(adapter, op, amqpSpace);
publisherConfirm = parsedOp publisherConfirm = parsedOp
.getOptionalStaticConfig("publisher_confirm", String.class) .getOptionalStaticConfig("publisher_confirm", String.class)
@ -87,10 +87,10 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
} }
private long getExchangeSenderSeqNum(long cycle) { private long getExchangeSenderSeqNum(long cycle) {
return (cycle / ((long) s4rSpace.getAmqpConnNum() * return (cycle / ((long) amqpSpace.getAmqpConnNum() *
s4rSpace.getAmqpConnChannelNum() * amqpSpace.getAmqpConnChannelNum() *
s4rSpace.getAmqpChannelExchangeNum()) amqpSpace.getAmqpChannelExchangeNum())
) % s4rSpace.getAmqpMsgClntNum(); ) % amqpSpace.getAmqpMsgClntNum();
} }
private String getEffectiveSenderNameByCycle(long cycle) { private String getEffectiveSenderNameByCycle(long cycle) {
@ -116,10 +116,10 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
long connSeqNum = getConnSeqNum(cycle); long connSeqNum = getConnSeqNum(cycle);
long channelSeqNum = getConnChannelSeqNum(cycle); long channelSeqNum = getConnChannelSeqNum(cycle);
Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum); Connection amqpConnection = amqpSpace.getAmqpConnection(connSeqNum);
AmqpSpace.AmqpChannelKey senderKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum); AmqpSpace.AmqpChannelKey senderKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
return s4rSpace.getAmqpChannels(senderKey, () -> { return amqpSpace.getAmqpChannels(senderKey, () -> {
Channel channel = null; Channel channel = null;
try { try {
@ -191,11 +191,11 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
} }
String exchangeName = getEffectiveExchangeNameByCycle(cycle); String exchangeName = getEffectiveExchangeNameByCycle(cycle);
declareExchange(channel, exchangeName, s4rSpace.getAmqpExchangeType()); declareExchange(channel, exchangeName, amqpSpace.getAmqpExchangeType());
return new OpTimeTrackAmqpMsgSendOp( return new OpTimeTrackAmqpMsgSendOp(
s4rAdapterMetrics, amqpAdapterMetrics,
s4rSpace, amqpSpace,
channel, channel,
exchangeName, exchangeName,
msgPayload, msgPayload,

View File

@ -23,8 +23,8 @@ import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp; import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public abstract class AmqpTimeTrackOp implements CycleOp<Object> { public abstract class AmqpTimeTrackOp implements CycleOp<Object> {
private final AmqpAdapterMetrics s4rAdapterMetrics; private final AmqpAdapterMetrics amqpAdapterMetrics;
protected final AmqpSpace s4RSpace; protected final AmqpSpace amqpSpace;
protected final Channel channel; protected final Channel channel;
protected final String exchangeName; protected final String exchangeName;
@ -37,17 +37,17 @@ public abstract class AmqpTimeTrackOp implements CycleOp<Object> {
protected Object cycleObj; protected Object cycleObj;
public AmqpTimeTrackOp(AmqpAdapterMetrics s4rAdapterMetrics, public AmqpTimeTrackOp(AmqpAdapterMetrics amqpAdapterMetrics,
AmqpSpace s4RSpace, AmqpSpace amqpSpace,
Channel channel, Channel channel,
String exchangeName) String exchangeName)
{ {
this.s4rAdapterMetrics = s4rAdapterMetrics; this.amqpAdapterMetrics = amqpAdapterMetrics;
this.s4RSpace = s4RSpace; this.amqpSpace = amqpSpace;
this.channel = channel; this.channel = channel;
this.exchangeName = exchangeName; this.exchangeName = exchangeName;
this.activityStartTime = s4RSpace.getActivityStartTimeMills(); this.activityStartTime = amqpSpace.getActivityStartTimeMills();
this.maxOpTimeInSec = s4RSpace.getMaxOpTimeInSec(); this.maxOpTimeInSec = amqpSpace.getMaxOpTimeInSec();
} }
@Override @Override

View File

@ -34,12 +34,12 @@ public class OpTimeTrackAmqpMsgRecvOp extends AmqpTimeTrackOp {
private final String queueName; private final String queueName;
public OpTimeTrackAmqpMsgRecvOp(AmqpAdapterMetrics s4rAdapterMetrics, public OpTimeTrackAmqpMsgRecvOp(AmqpAdapterMetrics amqpAdapterMetrics,
AmqpSpace s4rSpace, AmqpSpace amqpSpace,
Channel channel, Channel channel,
String exchangeName, String exchangeName,
String queueName) { String queueName) {
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); super(amqpAdapterMetrics, amqpSpace, channel, exchangeName);
this.queueName = queueName; this.queueName = queueName;
} }

View File

@ -44,8 +44,8 @@ public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp {
private static final ConcurrentHashMap<Channel, Integer> private static final ConcurrentHashMap<Channel, Integer>
channelPublishConfirmBathTracking = new ConcurrentHashMap<>(); channelPublishConfirmBathTracking = new ConcurrentHashMap<>();
public OpTimeTrackAmqpMsgSendOp(AmqpAdapterMetrics s4rAdapterMetrics, public OpTimeTrackAmqpMsgSendOp(AmqpAdapterMetrics amqpAdapterMetrics,
AmqpSpace s4rSpace, AmqpSpace amqpSpace,
Channel channel, Channel channel,
String exchangeName, String exchangeName,
String message, String message,
@ -53,7 +53,7 @@ public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp {
boolean publishConfirm, boolean publishConfirm,
String confirmMode, String confirmMode,
int confirmBatchNum) { int confirmBatchNum) {
super(s4rAdapterMetrics, s4rSpace, channel, exchangeName); super(amqpAdapterMetrics, amqpSpace, channel, exchangeName);
this.cycleObj = message; this.cycleObj = message;
this.routingKey = routingKey; this.routingKey = routingKey;
this.publishConfirm = publishConfirm; this.publishConfirm = publishConfirm;
@ -94,7 +94,7 @@ public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp {
if ( (publishConfirmTrackingCnt > 0) && if ( (publishConfirmTrackingCnt > 0) &&
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) || ( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) { (publishConfirmTrackingCnt == (amqpSpace.getTotalCycleNum() - 1)) ) ) {
channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS); channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.debug("Sync ack received for a batch of published message: {}, {}", logger.debug("Sync ack received for a batch of published message: {}, {}",

View File

@ -47,38 +47,38 @@ public class AmqpAdapterMetrics {
// end-to-end latency // end-to-end latency
private Histogram e2eMsgProcLatencyHistogram; private Histogram e2eMsgProcLatencyHistogram;
private final AmqpBaseOpDispenser s4rBaseOpDispenser; private final AmqpBaseOpDispenser amqpBaseOpDispenser;
public AmqpAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) { public AmqpAdapterMetrics(final AmqpBaseOpDispenser amqpBaseOpDispenser, final NBLabeledElement labeledParent) {
this.s4rBaseOpDispenser = s4rBaseOpDispenser; this.amqpBaseOpDispenser = amqpBaseOpDispenser;
labels=labeledParent.getLabels().and("name", AmqpAdapterMetrics.class.getSimpleName()); labels=labeledParent.getLabels().and("name", AmqpAdapterMetrics.class.getSimpleName());
} }
public void initS4JAdapterInstrumentation() { public void initS4JAdapterInstrumentation() {
// Histogram metrics // Histogram metrics
messageSizeHistogram = messageSizeHistogram =
ActivityMetrics.histogram(this.s4rBaseOpDispenser, ActivityMetrics.histogram(this.amqpBaseOpDispenser,
"message_size", ActivityMetrics.DEFAULT_HDRDIGITS); "message_size", ActivityMetrics.DEFAULT_HDRDIGITS);
// Timer metrics // Timer metrics
bindTimer = bindTimer =
ActivityMetrics.timer(this.s4rBaseOpDispenser, ActivityMetrics.timer(this.amqpBaseOpDispenser,
"bind", ActivityMetrics.DEFAULT_HDRDIGITS); "bind", ActivityMetrics.DEFAULT_HDRDIGITS);
executeTimer = executeTimer =
ActivityMetrics.timer(this.s4rBaseOpDispenser, ActivityMetrics.timer(this.amqpBaseOpDispenser,
"execute", ActivityMetrics.DEFAULT_HDRDIGITS); "execute", ActivityMetrics.DEFAULT_HDRDIGITS);
// End-to-end metrics // End-to-end metrics
// Latency // Latency
e2eMsgProcLatencyHistogram = e2eMsgProcLatencyHistogram =
ActivityMetrics.histogram(this.s4rBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS); ActivityMetrics.histogram(this.amqpBaseOpDispenser, "e2e_msg_latency", ActivityMetrics.DEFAULT_HDRDIGITS);
// Error metrics // Error metrics
msgErrOutOfSeqCounter = msgErrOutOfSeqCounter =
ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_oos"); ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_oos");
msgErrLossCounter = msgErrLossCounter =
ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_loss"); ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_loss");
msgErrDuplicateCounter = msgErrDuplicateCounter =
ActivityMetrics.counter(this.s4rBaseOpDispenser, "err_msg_dup"); ActivityMetrics.counter(this.amqpBaseOpDispenser, "err_msg_dup");
} }
public Timer getBindTimer() { return bindTimer; } public Timer getBindTimer() { return bindTimer; }

View File

@ -37,13 +37,13 @@ public class AmqpClientConf {
private static final Logger logger = LogManager.getLogger(AmqpClientConf.class); private static final Logger logger = LogManager.getLogger(AmqpClientConf.class);
// https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html // https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html
private final Map<String, String> s4rConfMap = new HashMap<>(); private final Map<String, String> configMap = new HashMap<>();
public AmqpClientConf(final String clientConfFileName) { public AmqpClientConf(final String clientConfFileName) {
////////////////// //////////////////
// Read related S4R client configuration settings from a file // Read related AMQP client configuration settings from a file
this.readRawConfFromFile(clientConfFileName); this.readRawConfFromFile(clientConfFileName);
} }
@ -68,7 +68,7 @@ public class AmqpClientConf {
// Get client connection specific configuration settings, removing "topic." prefix // Get client connection specific configuration settings, removing "topic." prefix
if (!StringUtils.isBlank(confVal)) if (!StringUtils.isBlank(confVal))
this.s4rConfMap.put(confKey, confVal); this.configMap.put(confKey, confVal);
} }
} catch (final IOException ioe) { } catch (final IOException ioe) {
AmqpClientConf.logger.error("Can't read the specified config properties file: {}", fileName); AmqpClientConf.logger.error("Can't read the specified config properties file: {}", fileName);
@ -79,11 +79,11 @@ public class AmqpClientConf {
} }
} }
public Map<String, String> getS4rConfMap() { return this.s4rConfMap; } public Map<String, String> getConfigMap() { return this.configMap; }
public String toString() { public String toString() {
return new ToStringBuilder(this). return new ToStringBuilder(this).
append("s4rConfMap", this.s4rConfMap). append("configMap", this.configMap).
toString(); toString();
} }
} }