Rename adapter-s4r to adapter-amqp since this is a generic AMQP-0-9-1 implementation, which includes Pulsar+S4R plugin.

This commit is contained in:
yabinmeng 2023-06-30 10:03:57 -05:00
parent cd0a62f339
commit 1880dfa9a0
26 changed files with 183 additions and 183 deletions

View File

@ -17,7 +17,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>adapter-s4r</artifactId>
<artifactId>adapter-amqp</artifactId>
<packaging>jar</packaging>
<parent>

View File

@ -14,9 +14,9 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r;
package io.nosqlbench.adapter.amqp;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
@ -29,24 +29,24 @@ import org.apache.logging.log4j.Logger;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "s4r")
public class S4RDriverAdapter extends BaseDriverAdapter<S4RTimeTrackOp, S4RSpace> {
private final static Logger logger = LogManager.getLogger(S4RDriverAdapter.class);
@Service(value = DriverAdapter.class, selector = "amqp")
public class AmqpDriverAdapter extends BaseDriverAdapter<AmqpTimeTrackOp, AmqpSpace> {
private final static Logger logger = LogManager.getLogger(AmqpDriverAdapter.class);
@Override
public OpMapper<S4RTimeTrackOp> getOpMapper() {
DriverSpaceCache<? extends S4RSpace> spaceCache = getSpaceCache();
public OpMapper<AmqpTimeTrackOp> getOpMapper() {
DriverSpaceCache<? extends AmqpSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new S4ROpMapper(this, adapterConfig, spaceCache);
return new AmqpOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends S4RSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new S4RSpace(s, cfg);
public Function<String, ? extends AmqpSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new AmqpSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(S4RSpace.getConfigModel());
return super.getConfigModel().add(AmqpSpace.getConfigModel());
}
}

View File

@ -14,11 +14,11 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r;
package io.nosqlbench.adapter.amqp;
import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgRecvOpDispenser;
import io.nosqlbench.adapter.s4r.dispensers.AmqpMsgSendOpDispenser;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
import io.nosqlbench.adapter.amqp.dispensers.AmqpMsgRecvOpDispenser;
import io.nosqlbench.adapter.amqp.dispensers.AmqpMsgSendOpDispenser;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
@ -29,24 +29,24 @@ import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4ROpMapper implements OpMapper<S4RTimeTrackOp> {
public class AmqpOpMapper implements OpMapper<AmqpTimeTrackOp> {
private final static Logger logger = LogManager.getLogger(S4ROpMapper.class);
private final static Logger logger = LogManager.getLogger(AmqpOpMapper.class);
private final NBConfiguration cfg;
private final DriverSpaceCache<? extends S4RSpace> spaceCache;
private final DriverSpaceCache<? extends AmqpSpace> spaceCache;
private final DriverAdapter adapter;
public S4ROpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends S4RSpace> spaceCache) {
public AmqpOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends AmqpSpace> spaceCache) {
this.cfg = cfg;
this.spaceCache = spaceCache;
this.adapter = adapter;
}
@Override
public OpDispenser<? extends S4RTimeTrackOp> apply(ParsedOp op) {
public OpDispenser<? extends AmqpTimeTrackOp> apply(ParsedOp op) {
String spaceName = op.getStaticConfigOr("space", "default");
S4RSpace s4RSpace = spaceCache.get(spaceName);
AmqpSpace s4RSpace = spaceCache.get(spaceName);
/*
* If the user provides a body element, then they want to provide the JSON or
@ -57,7 +57,7 @@ public class S4ROpMapper implements OpMapper<S4RTimeTrackOp> {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
}
else {
TypeAndTarget<S4ROpType, String> opType = op.getTypeAndTarget(S4ROpType.class, String.class);
TypeAndTarget<AmqpOpType, String> opType = op.getTypeAndTarget(AmqpOpType.class, String.class);
return switch (opType.enumId) {
case AmqpMsgSender ->

View File

@ -14,9 +14,9 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r;
package io.nosqlbench.adapter.amqp;
public enum S4ROpType {
public enum AmqpOpType {
AmqpMsgSender,
AmqpMsgReceiver
}

View File

@ -14,15 +14,15 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r;
package io.nosqlbench.adapter.amqp;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil;
import io.nosqlbench.adapter.s4r.util.S4RClientConf;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil;
import io.nosqlbench.adapter.amqp.util.AmqpClientConf;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
@ -44,14 +44,14 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
public class S4RSpace implements AutoCloseable {
public class AmqpSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(S4RSpace.class);
private final static Logger logger = LogManager.getLogger(AmqpSpace.class);
private final String spaceName;
private final NBConfiguration cfg;
private final S4RClientConf s4rClientConf;
private final AmqpClientConf s4rClientConf;
///////////////////////////////////////////////////////////////////
// NOTE: in this driver, we assume:
@ -94,7 +94,7 @@ public class S4RSpace implements AutoCloseable {
private ConnectionFactory s4rConnFactory;
// Default to "direct" type
private String amqpExchangeType = S4RAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label;
private String amqpExchangeType = AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.DIRECT.label;
private final ConcurrentHashMap<Long, Connection> amqpConnections = new ConcurrentHashMap<>();
@ -110,19 +110,19 @@ public class S4RSpace implements AutoCloseable {
// Maximum time length to execute S4R operations (e.g. message send or consume)
// - when NB execution passes this threshold, it is simply NoOp
// - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes
// - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes
private final long maxOpTimeInSec;
private final long activityStartTimeMills;
private long totalCycleNum;
private long totalThreadNum;
public S4RSpace(String spaceName, NBConfiguration cfg) {
public AmqpSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.cfg = cfg;
String s4rClientConfFileName = cfg.get("config");
this.s4rClientConf = new S4RClientConf(s4rClientConfFileName);
this.s4rClientConf = new AmqpClientConf(s4rClientConfFileName);
this.amqpConnNum =
NumberUtils.toInt(cfg.getOptional("num_conn").orElse("1"));
this.amqpConnChannelNum =
@ -148,7 +148,7 @@ public class S4RSpace implements AutoCloseable {
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(S4RSpace.class)
return ConfigModel.of(AmqpSpace.class)
.add(Param.defaultTo("config", "config.properties")
.setDescription("S4R client connection configuration property file."))
.add(Param.defaultTo("num_conn", 1)
@ -178,7 +178,7 @@ public class S4RSpace implements AutoCloseable {
public long getActivityStartTimeMills() { return this.activityStartTimeMills; }
public long getMaxOpTimeInSec() { return this.maxOpTimeInSec; }
public S4RClientConf getS4rClientConf() { return s4rClientConf; }
public AmqpClientConf getS4rClientConf() { return s4rClientConf; }
public String getAmqpExchangeType() { return amqpExchangeType; }
public int getAmqpConnNum() { return this.amqpConnNum; }
@ -195,24 +195,24 @@ public class S4RSpace implements AutoCloseable {
public long getTotalThreadNum() { return totalThreadNum; }
public void setTotalThreadNum(long threadNum) { totalThreadNum = threadNum; }
public void initializeSpace(S4RClientConf s4rClientConnInfo) {
public void initializeSpace(AmqpClientConf s4rClientConnInfo) {
Map<String, String> cfgMap = s4rClientConnInfo.getS4rConfMap();
if (amqpConnNum < 1) {
String errMsg = "AMQP connection number (\"num_conn\") must be a positive number!";
throw new S4RAdapterInvalidParamException(errMsg);
throw new AmqpAdapterInvalidParamException(errMsg);
}
if (amqpConnChannelNum < 1) {
String errMsg = "AMQP channel number per connection (\"num_channel\") must be a positive number!";
throw new S4RAdapterInvalidParamException(errMsg);
throw new AmqpAdapterInvalidParamException(errMsg);
}
amqpExchangeType = cfgMap.get("exchangeType");
if (!S4RAdapterUtil.AMQP_EXCHANGE_TYPES.isValidLabel(amqpExchangeType)) {
if (!AmqpAdapterUtil.AMQP_EXCHANGE_TYPES.isValidLabel(amqpExchangeType)) {
String errMsg = "Invalid AMQP exchange type: \"" + amqpExchangeType + "\". " +
"Valid values are: \"" + S4RAdapterUtil.getValidAmqpExchangeTypeList() + "\"";
throw new S4RAdapterInvalidParamException(errMsg);
"Valid values are: \"" + AmqpAdapterUtil.getValidAmqpExchangeTypeList() + "\"";
throw new AmqpAdapterInvalidParamException(errMsg);
}
if (s4rConnFactory == null) {
@ -222,21 +222,21 @@ public class S4RSpace implements AutoCloseable {
String amqpServerHost = cfgMap.get("amqpSrvHost");
if (StringUtils.isBlank(amqpServerHost)) {
String errMsg = "AMQP server host (\"amqpSrvHost\") must be specified!";
throw new S4RAdapterInvalidParamException(errMsg);
throw new AmqpAdapterInvalidParamException(errMsg);
}
s4rConnFactory.setHost(amqpServerHost);
String amqpSrvPortCfg = cfgMap.get("amqpSrvPort");
if (StringUtils.isBlank(amqpSrvPortCfg)) {
String errMsg = "AMQP server port (\"amqpSrvPort\") must be specified!";
throw new S4RAdapterInvalidParamException(errMsg);
throw new AmqpAdapterInvalidParamException(errMsg);
}
s4rConnFactory.setPort(Integer.parseInt(amqpSrvPortCfg));
String amqpVirtualHost = cfgMap.get("virtualHost");
if (StringUtils.isBlank(amqpVirtualHost)) {
String errMsg = "AMQP virtual host (\"virtualHost\") must be specified!";
throw new S4RAdapterInvalidParamException(errMsg);
throw new AmqpAdapterInvalidParamException(errMsg);
}
s4rConnFactory.setVirtualHost(amqpVirtualHost);
@ -278,7 +278,7 @@ public class S4RSpace implements AutoCloseable {
} catch (IOException|TimeoutException|NoSuchAlgorithmException|KeyManagementException ex) {
logger.error("Unable to establish AMQP connections with the following configuration parameters: {}",
s4rClientConnInfo.toString());
throw new S4RAdapterUnexpectedException(ex);
throw new AmqpAdapterUnexpectedException(ex);
}
}
}
@ -296,7 +296,7 @@ public class S4RSpace implements AutoCloseable {
}
// Pause 5 seconds before closing producers/consumers
S4RAdapterUtil.pauseCurThreadExec(5);
AmqpAdapterUtil.pauseCurThreadExec(5);
}
catch (Exception ex) {
String exp = "Unexpected error when shutting down the S4R adaptor space";

View File

@ -14,13 +14,13 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.dispensers;
package io.nosqlbench.adapter.amqp.dispensers;
import com.rabbitmq.client.Channel;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
@ -33,26 +33,26 @@ import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackOp, S4RSpace> {
public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<AmqpTimeTrackOp, AmqpSpace> {
private static final Logger logger = LogManager.getLogger("AmqpBaseOpDispenser");
private static final Logger logger = LogManager.getLogger(AmqpBaseOpDispenser.class);
protected final ParsedOp parsedOp;
protected final S4RAdapterMetrics s4rAdapterMetrics;
protected final S4RSpace s4rSpace;
protected final AmqpAdapterMetrics s4rAdapterMetrics;
protected final AmqpSpace s4rSpace;
protected final Map<String, String> s4rConfMap = new HashMap<>();
protected final String exchangeType;
protected AmqpBaseOpDispenser(final DriverAdapter adapter,
final ParsedOp op,
final S4RSpace s4RSpace) {
final AmqpSpace s4RSpace) {
super(adapter, op);
parsedOp = op;
this.s4rSpace = s4RSpace;
s4rAdapterMetrics = new S4RAdapterMetrics(this, this);
s4rAdapterMetrics = new AmqpAdapterMetrics(this, this);
s4rAdapterMetrics.initS4JAdapterInstrumentation();
s4rConfMap.putAll(s4RSpace.getS4rClientConf().getS4rConfMap());
@ -91,7 +91,7 @@ public abstract class AmqpBaseOpDispenser extends BaseOpDispenser<S4RTimeTrackO
} catch (IOException e) {
String errMsg = String.format("Failed to declare the AMQP exchange \"%s\" on channel \"%s\"!",
exchangeName, channel);
throw new S4RAdapterUnexpectedException(errMsg);
throw new AmqpAdapterUnexpectedException(errMsg);
}
}

View File

@ -14,14 +14,14 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.dispensers;
package io.nosqlbench.adapter.amqp.dispensers;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgRecvOp;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.ops.OpTimeTrackAmqpMsgRecvOp;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
@ -32,12 +32,12 @@ import java.util.function.LongFunction;
public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("AmqpMsgRecvOpDispenser");
private final static Logger logger = LogManager.getLogger(AmqpMsgRecvOpDispenser.class);
private final LongFunction<String> bindingKeyFunc;
public AmqpMsgRecvOpDispenser(DriverAdapter adapter,
ParsedOp op,
S4RSpace s4rSpace) {
AmqpSpace s4rSpace) {
super(adapter, op, s4rSpace);
bindingKeyFunc = lookupOptionalStrOpValueFunc("binding_key", null);
}
@ -95,7 +95,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
long channelSeqNum = getConnChannelSeqNum(cycle);
Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
S4RSpace.AmqpChannelKey amqpConnChannelKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
AmqpSpace.AmqpChannelKey amqpConnChannelKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
return s4rSpace.getAmqpChannels(amqpConnChannelKey, () -> {
Channel channel = null;
@ -120,10 +120,10 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
@Override
public S4RTimeTrackOp apply(long cycle) {
public AmqpTimeTrackOp apply(long cycle) {
Channel channel = getAmqpChannelForReceiver(cycle);
if (channel == null) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
String.format(
"Failed to get AMQP channel for receiver %s [%d]!",
getEffectiveReceiverName(cycle),
@ -150,7 +150,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
}
}
catch (IOException ex) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
String.format(
"Unable to declare the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" for a receiver!",
queueName, durable, exclusive, autoDelete, exchangeName)
@ -171,7 +171,7 @@ public class AmqpMsgRecvOpDispenser extends AmqpBaseOpDispenser {
}
}
catch (IOException ex) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
String.format(
"Unable to bind the AMQP queue - \"%s (%b/%b/%b)\" on exchange \"%s\" with binding key \"%s\"!",
queueName, durable, exclusive, autoDelete, exchangeName, bindingKey)

View File

@ -14,16 +14,16 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.dispensers;
package io.nosqlbench.adapter.amqp.dispensers;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterInvalidParamException;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.ops.OpTimeTrackAmqpMsgSendOp;
import io.nosqlbench.adapter.s4r.ops.S4RTimeTrackOp;
import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil;
import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterInvalidParamException;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.ops.OpTimeTrackAmqpMsgSendOp;
import io.nosqlbench.adapter.amqp.ops.AmqpTimeTrackOp;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.engine.api.templating.ParsedOp;
import org.apache.commons.lang3.BooleanUtils;
@ -38,7 +38,7 @@ import java.util.function.Predicate;
public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
private final static Logger logger = LogManager.getLogger("AmqpMsgSendOpDispenser");
private final static Logger logger = LogManager.getLogger(AmqpMsgSendOpDispenser.class);
private final boolean publisherConfirm ;
// Only relevant when 'publisherConfirm' is true
@ -54,7 +54,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
public AmqpMsgSendOpDispenser(DriverAdapter adapter,
ParsedOp op,
S4RSpace s4rSpace) {
AmqpSpace s4rSpace) {
super(adapter, op, s4rSpace);
publisherConfirm = parsedOp
@ -65,20 +65,20 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
confirmMode = parsedOp
.getOptionalStaticValue("confirm_mode", String.class)
.orElse(S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
if (! S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
throw new S4RAdapterInvalidParamException("confirm_mode",
.orElse(AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label);
if (! AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.isValidLabel(confirmMode)) {
throw new AmqpAdapterInvalidParamException("confirm_mode",
"The provided value \"" + confirmMode + "\" is not one of following valid values: '" +
S4RAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
AmqpAdapterUtil.getValidAmqpPublisherConfirmModeList() + "'");
}
confirmBatchNum = parsedOp
.getOptionalStaticConfig("confirm_batch_num", String.class)
.filter(Predicate.not(String::isEmpty))
.map(NumberUtils::toInt)
.orElse(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM);
if (confirmBatchNum < S4RAdapterUtil.AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN) {
confirmBatchNum = S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM;
.orElse(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM);
if (confirmBatchNum < AmqpAdapterUtil.AMQP_PUBLISH_CONFIRM_BATCH_NUM_MIN) {
confirmBatchNum = AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_BATCH_NUM;
}
routingKeyFunc = lookupOptionalStrOpValueFunc("routing_key", null);
@ -117,7 +117,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
long channelSeqNum = getConnChannelSeqNum(cycle);
Connection amqpConnection = s4rSpace.getAmqpConnection(connSeqNum);
S4RSpace.AmqpChannelKey senderKey = new S4RSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
AmqpSpace.AmqpChannelKey senderKey = new AmqpSpace.AmqpChannelKey(connSeqNum, channelSeqNum);
return s4rSpace.getAmqpChannels(senderKey, () -> {
Channel channel = null;
@ -140,7 +140,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
if ((channel != null) && publisherConfirm) {
channel.confirmSelect();
if (StringUtils.equalsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
if (StringUtils.equalsIgnoreCase(confirmMode, AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.ASYNC.label)) {
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
if (logger.isTraceEnabled()) {
@ -165,7 +165,7 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
}
} catch (IOException ex) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
"Failed to enable publisher acknowledgement on the AMQP channel (" +
channel + ")!");
}
@ -175,15 +175,15 @@ public class AmqpMsgSendOpDispenser extends AmqpBaseOpDispenser {
}
@Override
public S4RTimeTrackOp apply(long cycle) {
public AmqpTimeTrackOp apply(long cycle) {
String msgPayload = msgPayloadFunc.apply(cycle);
if (StringUtils.isBlank(msgPayload)) {
throw new S4RAdapterInvalidParamException("Message payload must be specified and can't be empty!");
throw new AmqpAdapterInvalidParamException("Message payload must be specified and can't be empty!");
}
Channel channel = getAmqpChannelForSender(cycle);
if (channel == null) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
String.format(
"Failed to get AMQP channel for sender %s [%d]!",
getEffectiveSenderNameByCycle(cycle),

View File

@ -15,15 +15,15 @@
* under the License.
*/
package io.nosqlbench.adapter.s4r.exception;
package io.nosqlbench.adapter.amqp.exception;
public class S4RAdapterInvalidParamException extends RuntimeException {
public class AmqpAdapterInvalidParamException extends RuntimeException {
public S4RAdapterInvalidParamException(String paramName, String errDesc) {
public AmqpAdapterInvalidParamException(String paramName, String errDesc) {
super("Invalid setting for parameter (" + paramName + "): " + errDesc);
}
public S4RAdapterInvalidParamException(String fullErrDesc) {
public AmqpAdapterInvalidParamException(String fullErrDesc) {
super(fullErrDesc);
}
}

View File

@ -15,15 +15,15 @@
* under the License.
*/
package io.nosqlbench.adapter.s4r.exception;
package io.nosqlbench.adapter.amqp.exception;
public class S4RAdapterUnexpectedException extends RuntimeException {
public class AmqpAdapterUnexpectedException extends RuntimeException {
public S4RAdapterUnexpectedException(String message) {
public AmqpAdapterUnexpectedException(String message) {
super(message);
printStackTrace();
}
public S4RAdapterUnexpectedException(Exception e) {
public AmqpAdapterUnexpectedException(Exception e) {
super(e);
printStackTrace();
}

View File

@ -14,11 +14,11 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.exception;
package io.nosqlbench.adapter.amqp.exception;
public class S4RAdapterUnsupportedOpException extends RuntimeException {
public class AmqpAdapterUnsupportedOpException extends RuntimeException {
public S4RAdapterUnsupportedOpException(final String kafkaOpType) {
public AmqpAdapterUnsupportedOpException(final String kafkaOpType) {
super("Unsupported Kafka adapter operation type: \"" + kafkaOpType + '"');
}
}

View File

@ -15,32 +15,32 @@
* under the License.
*/
package io.nosqlbench.adapter.s4r.ops;
package io.nosqlbench.adapter.amqp.ops;
import com.rabbitmq.client.Channel;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.CycleOp;
public abstract class S4RTimeTrackOp implements CycleOp<Object> {
private final S4RAdapterMetrics s4rAdapterMetrics;
protected final S4RSpace s4RSpace;
public abstract class AmqpTimeTrackOp implements CycleOp<Object> {
private final AmqpAdapterMetrics s4rAdapterMetrics;
protected final AmqpSpace s4RSpace;
protected final Channel channel;
protected final String exchangeName;
// Maximum time length to execute Kafka operations (e.g. message send or consume)
// - when NB execution passes this threshold, it is simply NoOp
// - 0 means no maximum time constraint. S4RTimeTrackOp is always executed until NB execution cycle finishes
// - 0 means no maximum time constraint. AmqpTimeTrackOp is always executed until NB execution cycle finishes
protected final long maxOpTimeInSec;
protected final long activityStartTime;
protected Object cycleObj;
public S4RTimeTrackOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4RSpace,
Channel channel,
String exchangeName)
public AmqpTimeTrackOp(AmqpAdapterMetrics s4rAdapterMetrics,
AmqpSpace s4RSpace,
Channel channel,
String exchangeName)
{
this.s4rAdapterMetrics = s4rAdapterMetrics;
this.s4RSpace = s4RSpace;

View File

@ -14,13 +14,13 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.ops;
package io.nosqlbench.adapter.amqp.ops;
import com.rabbitmq.client.*;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -28,14 +28,14 @@ import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
public class OpTimeTrackAmqpMsgRecvOp extends AmqpTimeTrackOp {
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgRecvOp");
private final static Logger logger = LogManager.getLogger(OpTimeTrackAmqpMsgRecvOp.class);
private final String queueName;
public OpTimeTrackAmqpMsgRecvOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4rSpace,
public OpTimeTrackAmqpMsgRecvOp(AmqpAdapterMetrics s4rAdapterMetrics,
AmqpSpace s4rSpace,
Channel channel,
String exchangeName,
String queueName) {
@ -70,7 +70,7 @@ public class OpTimeTrackAmqpMsgRecvOp extends S4RTimeTrackOp {
channel.basicConsume(queueName, true, receiver);
}
catch (IOException e) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
"Failed to receive message via the current channel: " + channel);
}
}

View File

@ -14,14 +14,14 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.ops;
package io.nosqlbench.adapter.amqp.ops;
import com.rabbitmq.client.Channel;
import io.nosqlbench.adapter.s4r.S4RSpace;
import io.nosqlbench.adapter.s4r.exception.S4RAdapterUnexpectedException;
import io.nosqlbench.adapter.s4r.util.S4RAdapterMetrics;
import io.nosqlbench.adapter.s4r.util.S4RAdapterUtil;
import io.nosqlbench.adapter.amqp.AmqpSpace;
import io.nosqlbench.adapter.amqp.exception.AmqpAdapterUnexpectedException;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterMetrics;
import io.nosqlbench.adapter.amqp.util.AmqpAdapterUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -32,9 +32,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
public class OpTimeTrackAmqpMsgSendOp extends AmqpTimeTrackOp {
private final static Logger logger = LogManager.getLogger("OpTimeTrackAmqpMsgSendOp");
private final static Logger logger = LogManager.getLogger(OpTimeTrackAmqpMsgSendOp.class);
private final String routingKey;
private final boolean publishConfirm;
@ -44,8 +44,8 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
private static final ConcurrentHashMap<Channel, Integer>
channelPublishConfirmBathTracking = new ConcurrentHashMap<>();
public OpTimeTrackAmqpMsgSendOp(S4RAdapterMetrics s4rAdapterMetrics,
S4RSpace s4rSpace,
public OpTimeTrackAmqpMsgSendOp(AmqpAdapterMetrics s4rAdapterMetrics,
AmqpSpace s4rSpace,
Channel channel,
String exchangeName,
String message,
@ -81,21 +81,21 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
if (publishConfirm) {
// Individual publish confirm
if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
if (StringUtils.containsIgnoreCase(confirmMode, AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.INDIVIDUAL.label)) {
channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
if (logger.isTraceEnabled()) {
logger.debug("Sync ack received for an individual published message: {}", cycle);
}
}
// Batch publish confirm
else if (StringUtils.containsIgnoreCase(confirmMode, S4RAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
else if (StringUtils.containsIgnoreCase(confirmMode, AmqpAdapterUtil.AMQP_PUB_CONFIRM_MODE.BATCH.label)) {
int publishConfirmTrackingCnt =
channelPublishConfirmBathTracking.getOrDefault(channel, 0);
if ( (publishConfirmTrackingCnt > 0) &&
( (publishConfirmTrackingCnt % (confirmBatchNum - 1) == 0) ||
(publishConfirmTrackingCnt == (s4RSpace.getTotalCycleNum() - 1)) ) ) {
channel.waitForConfirms(S4RAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
channel.waitForConfirms(AmqpAdapterUtil.DFT_AMQP_PUBLISH_CONFIRM_TIMEOUT_MS);
if (logger.isTraceEnabled()) {
logger.debug("Sync ack received for a batch of published message: {}, {}",
cycle, publishConfirmTrackingCnt);
@ -110,16 +110,16 @@ public class OpTimeTrackAmqpMsgSendOp extends S4RTimeTrackOp {
}
}
catch (IllegalStateException ex) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
"Wait for confirm on a wrong non-confirm channel: " + channel);
}
catch (InterruptedException | TimeoutException ex) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
"Failed to wait for the ack of the published message (" + msgPayload
+ ") via the current channel: " + channel);
}
catch (IOException ex) {
throw new S4RAdapterUnexpectedException(
throw new AmqpAdapterUnexpectedException(
"Failed to publish message (" + msgPayload
+ ") via the current channel: " + channel);
}

View File

@ -14,21 +14,21 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.util;
package io.nosqlbench.adapter.amqp.util;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.adapter.s4r.dispensers.AmqpBaseOpDispenser;
import io.nosqlbench.adapter.amqp.dispensers.AmqpBaseOpDispenser;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class S4RAdapterMetrics {
public class AmqpAdapterMetrics {
private static final Logger logger = LogManager.getLogger("S4JAdapterMetrics");
private static final Logger logger = LogManager.getLogger(AmqpAdapterMetrics.class);
private final NBLabels labels;
private Histogram messageSizeHistogram;
@ -49,9 +49,9 @@ public class S4RAdapterMetrics {
private Histogram e2eMsgProcLatencyHistogram;
private final AmqpBaseOpDispenser s4rBaseOpDispenser;
public S4RAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) {
public AmqpAdapterMetrics(final AmqpBaseOpDispenser s4rBaseOpDispenser, final NBLabeledElement labeledParent) {
this.s4rBaseOpDispenser = s4rBaseOpDispenser;
labels=labeledParent.getLabels().and("name", S4RAdapterMetrics.class.getSimpleName());
labels=labeledParent.getLabels().and("name", AmqpAdapterMetrics.class.getSimpleName());
}
public void initS4JAdapterInstrumentation() {

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.util;
package io.nosqlbench.adapter.amqp.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
@ -24,8 +24,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class S4RAdapterUtil {
private static final Logger logger = LogManager.getLogger(S4RAdapterUtil.class);
public class AmqpAdapterUtil {
private static final Logger logger = LogManager.getLogger(AmqpAdapterUtil.class);
public enum AMQP_EXCHANGE_TYPES {
DIRECT("direct"),

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.nosqlbench.adapter.s4r.util;
package io.nosqlbench.adapter.amqp.util;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.configuration2.FileBasedConfiguration;
@ -33,14 +33,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
public class S4RClientConf {
private static final Logger logger = LogManager.getLogger(S4RClientConf.class);
public class AmqpClientConf {
private static final Logger logger = LogManager.getLogger(AmqpClientConf.class);
// https://docs.datastax.com/en/streaming/starlight-for-rabbitmq/2.10.1.x/configuration/configuration.html
private final Map<String, String> s4rConfMap = new HashMap<>();
public S4RClientConf(final String clientConfFileName) {
public AmqpClientConf(final String clientConfFileName) {
//////////////////
// Read related S4R client configuration settings from a file
@ -71,10 +71,10 @@ public class S4RClientConf {
this.s4rConfMap.put(confKey, confVal);
}
} catch (final IOException ioe) {
S4RClientConf.logger.error("Can't read the specified config properties file: {}", fileName);
AmqpClientConf.logger.error("Can't read the specified config properties file: {}", fileName);
ioe.printStackTrace();
} catch (final ConfigurationException cex) {
S4RClientConf.logger.error("Error loading configuration items from the specified config properties file: {}:{}", fileName, cex.getMessage());
AmqpClientConf.logger.error("Error loading configuration items from the specified config properties file: {}:{}", fileName, cex.getMessage());
cex.printStackTrace();
}
}

View File

@ -1,9 +1,9 @@
---
weight: 0
title: S4R
title: AMQP
---
- [1. Overview](#1-overview)
- [2. NB S4R Usage](#2-nb-s4r-usage)
- [2. NB AMQP Usage](#2-nb-amqp-usage)
- [2.1. Workload Definition](#21-workload-definition)
- [2.1.1. Named Scenarios](#211-named-scenarios)
- [2.2. CLI parameters](#22-cli-parameters)
@ -15,7 +15,7 @@ title: S4R
# 1. Overview
The NB S4R adapter allows sending messages to or receiving messages from
The NB AMQP adapter allows sending messages to or receiving messages from
* an AMQP 0-9-1 based server (e.g. RabbitMQ), or
* a Pulsar cluster with [S4R](https://github.com/datastax/starlight-for-rabbitmq) AMQP (0-9-1) Protocol handler for Pulsar.
@ -30,47 +30,47 @@ At high level, this adapter supports the following AMQP 0-9-1 functionalities
* Supports message-receive based on binding keys
* Receiving messages from AMQP queues with async. consumer acks
# 2. NB S4R Usage
# 2. NB AMQP Usage
## 2.1. Workload Definition
There are two main types of workloads supported by the S4R adapter:
* Message sender workload (see [s4r_msg_sender.yaml](scenarios/s4r_msg_sender.yaml))
* Message receiver workload (see [s4r_msg_receiver.yaml](scenarios/s4r_msg_receiver.yaml))
There are two main types of workloads supported by this adapter:
* Message sender workload (see [amqp_msg_sender.yaml](scenarios/amqp_msg_sender.yaml))
* Message receiver workload (see [amqp_msg_receiver.yaml](scenarios/amqp_msg_receiver.yaml))
Below are examples of running the message sender and receiver workloads separately.
```bash
$ <nb_cmd> run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \
$ <nb_cmd> run driver=amqp -vv cycles=200 strict_msg_error_handling=0 \
threads=8 num_conn=1 num_channel=2 num_exchange=2 num_msg_clnt=2 \
workload=/path/to/s4r_msg_sender.yaml \
config=/path/to/s4r_config.properties
workload=/path/to/amqp_msg_sender.yaml \
config=/path/to/amqp_config.properties
```
```bash
$ <nb_cmd> run driver=s4r -vv cycles=200 strict_msg_error_handling=0 \
$ <nb_cmd> run driver=amqp -vv cycles=200 strict_msg_error_handling=0 \
threads=8 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 \
workload=/path/to/s4r_msg_receiver.yaml \
config=/path/to/s4r_config.properties
workload=/path/to/amqp_msg_receiver.yaml \
config=/path/to/amqp_config.properties
```
### 2.1.1. Named Scenarios
For workload execution convenience, NB engine has the concept of **named scenario** ([doc](https://docs.nosqlbench.io/workloads-101/11-named-scenarios/)).
For NB S4R adapter, the following yaml file is used to define the named scenarios: [nbs4r_msg_proc_named.yaml](scenarios/nbs4r_msg_proc_named.yaml)
For NB AMQP adapter, the following yaml file is used to define the named scenarios: [nbamqp_msg_proc_named.yaml](scenarios/nbamqp_msg_proc_named.yaml)
The CLI command to execute the S4R named scenarios is as simple as below:
The CLI command to execute the named scenarios is as simple as below:
```bash
# for message sender workload
$ <nb_cmd> nbs4r_msg_proc_named msg_send
$ <nb_cmd> nbamqp_msg_proc_named msg_send
# for message receiver workload
$ <nb_cmd> nbs4r_msg_proc_named msg_recv
$ <nb_cmd> nbamqp_msg_proc_named msg_recv
```
## 2.2. CLI parameters
The following CLI parameters are unique to the S4R adapter:
The following CLI parameters are unique to this adapter:
* `num_conn`: the number of AMQP connections to create
* `num_channel`: the number of AMQP channels to create for each connection
@ -84,7 +84,7 @@ The following CLI parameters are unique to the S4R adapter:
### 2.3.1. Global Properties File
A global S4R properties file can be specified via the `config` CLI parameter. It includes the following required properties:
A global AMQP properties file can be specified via the `config` CLI parameter. It includes the following required properties:
* `amqpSrvHost`: AMQP server host (e.g. An Astra Streaming cluster with S4R enabled)
* `amqpSrvPort`: AMQP server port (for S4R enabled Astra Streaming, it is 5671)
* `virtualHost`: AMQP server virtual host (for S4R enabled Astra Streaming, it is "<tenant>/rabbitmq")
@ -93,7 +93,7 @@ A global S4R properties file can be specified via the `config` CLI parameter. It
* `useTls`: whether to use TLS (for S4R enabled Astra Streaming, it is true)
* `exchangeType`: AMQP exchange type (e.g. `direct`, `fanout`, `topic`, or `headers`)
An example of this file can be found from: [s4r_config.properties](conf/s4r_config.properties)
An example of this file can be found from: [amqp_config.properties](conf/amqp_config.properties)
### 2.3.2. Scenario Document Level Properties

View File

@ -1,6 +1,6 @@
scenarios:
msg_send: run driver=s4r cycles=200 threads=16 num_conn=2 num_channel=2 num_exchange=2 num_msg_clnt=2 config=../conf/s4r_config.properties
msg_recv: run driver=s4r cycles=200 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 config=../conf/s4r_config.properties
msg_send: run driver=amqp cycles=200 threads=16 num_conn=2 num_channel=2 num_exchange=2 num_msg_clnt=2 config=../conf/amqp_config.properties
msg_recv: run driver=amqp cycles=200 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 config=../conf/amqp_config.properties
bindings:
mytext_val: AlphaNumericString(100)

View File

@ -115,7 +115,7 @@
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-s4r</artifactId>
<artifactId>adapter-amqp</artifactId>
<version>${revision}</version>
</dependency>

View File

@ -65,7 +65,7 @@
<module.adapter-pulsar>adapter-pulsar</module.adapter-pulsar>
<module.adapter-s4j>adapter-s4j</module.adapter-s4j>
<module.adapter-kafka>adapter-kafka</module.adapter-kafka>
<module.adapter-kafka>adapter-s4r</module.adapter-kafka>
<module.adapter-kafka>adapter-amqp</module.adapter-kafka>
<module.adapter-jdbc>adapter-jdbc</module.adapter-jdbc>
<!-- VIRTDATA MODULES -->
@ -111,7 +111,7 @@
<module>adapter-pulsar</module>
<module>adapter-s4j</module>
<module>adapter-kafka</module>
<module>adapter-s4r</module>
<module>adapter-amqp</module>
<module>adapter-jdbc</module>
<module>adapter-pinecone</module>