Merge pull request #315 from yabinmeng/revert-311-main

Revert "Add NB support for Pulsar JMS"
This commit is contained in:
Jonathan Shook 2021-05-03 11:53:44 -05:00 committed by GitHub
commit ee848a95fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 0 additions and 697 deletions

View File

@ -1,104 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>4.15.45-SNAPSHOT</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>driver-pulsar-jms</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A PulsarJMS driver for nosqlbench. This provides the ability to inject synthetic data
into a pulsar system via JMS 2.0 compatibile APIs
</description>
<repositories>
<repository>
<id>datastax-releases-local</id>
<name>DataStax Local Releases</name>
<url>https://repo.sjc.dsinternal.org/artifactory/datastax-snapshots-local/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<properties>
<pulsar.version>2.7.1</pulsar.version>
</properties>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>4.15.45-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>driver-stdout</artifactId>
<version>4.15.45-SNAPSHOT</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>2.7</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<artifactId>pulsar-jms</artifactId>
<groupId>com.datastax.oss</groupId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>

View File

@ -1,73 +0,0 @@
package io.nosqlbench.driver.pularjms;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.LongFunction;
public class PulsarJmsAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class);
private final PulsarJmsActivity activity;
private final int slot;
int maxTries;
public PulsarJmsAction(PulsarJmsActivity activity, int slot) {
this.activity = activity;
this.slot = slot;
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
}
@Override
public void init() {
}
@Override
public int runCycle(long cycle) {
// let's fail the action if some async operation failed
activity.failOnAsyncOperationFailure();
long start = System.nanoTime();
PulsarJmsOp pulsarJmsOp;
try (Timer.Context ctx = activity.getBindTimer().time()) {
LongFunction<PulsarJmsOp> readyPulsarJmsOp = activity.getSequencer().get(cycle);
pulsarJmsOp = readyPulsarJmsOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...
activity.getErrorhandler().handleError(bindException, cycle, 0);
throw new RuntimeException(
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
);
}
for (int i = 0; i < maxTries; i++) {
Timer.Context ctx = activity.getExecuteTimer().time();
try {
// it is up to the pulsarOp to call Context#close when the activity is executed
// this allows us to track time for async operations
pulsarJmsOp.run(ctx::close);
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity
.getErrorhandler()
.handleError(err, cycle, System.nanoTime() - start);
if (!errorDetail.isRetryable()) {
break;
}
}
}
return 0;
}
}

View File

@ -1,136 +0,0 @@
package io.nosqlbench.driver.pularjms;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp;
import io.nosqlbench.driver.pularjms.util.PulsarJmsActivityUtil;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class PulsarJmsActivity extends SimpleActivity {
private final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
// e.g. pulsar://localhost:6650
private String pulsarSvcUrl;
// e.g. http://localhost:8080
private String webSvcUrl;
private JMSContext jmsContext;
private OpSequence<OpDispenser<PulsarJmsOp>> sequence;
private volatile Throwable asyncOperationFailure;
private NBErrorHandler errorhandler;
private Timer bindTimer;
private Timer executeTimer;
private Counter bytesCounter;
private Histogram messagesizeHistogram;
public PulsarJmsActivity(ActivityDef activityDef) {
super(activityDef);
}
@Override
public void initActivity() {
super.initActivity();
webSvcUrl =
activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080");
pulsarSvcUrl =
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
Map<String, Object> configuration = new HashMap<>();
configuration.put("webServiceUrl", webSvcUrl);
configuration.put("brokerServiceUrl", pulsarSvcUrl);
PulsarConnectionFactory factory;
try {
factory = new PulsarConnectionFactory(configuration);
this.jmsContext = factory.createContext();
} catch (JMSException e) {
throw new RuntimeException("PulsarJMS message send:: Unable to initialize Pulsar connection factory!");
}
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this));
setDefaultsFromOpSequence(sequence);
onActivityDefUpdate(activityDef);
this.errorhandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics
);
}
/**
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
*
* @param pulsarTopic
*/
public Destination getOrCreateJmsDestination(String pulsarTopic) {
String encodedTopicStr = PulsarJmsActivityUtil.encode(pulsarTopic);
Destination destination = jmsDestinations.get(encodedTopicStr);
if ( destination == null ) {
destination = jmsContext.createQueue(pulsarTopic);
jmsDestinations.put(encodedTopicStr, destination);
}
return destination;
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
public OpSequence<OpDispenser<PulsarJmsOp>> getSequencer() { return sequence; }
public String getPulsarSvcUrl() {
return pulsarSvcUrl;
}
public String getWebSvcUrl() { return webSvcUrl; }
public JMSContext getJmsContext() { return jmsContext; }
public Timer getBindTimer() {
return bindTimer;
}
public Timer getExecuteTimer() {
return this.executeTimer;
}
public Counter getBytesCounter() {
return bytesCounter;
}
public Histogram getMessagesizeHistogram() {
return messagesizeHistogram;
}
public NBErrorHandler getErrorhandler() {
return errorhandler;
}
public void failOnAsyncOperationFailure() {
if (asyncOperationFailure != null) {
throw new RuntimeException(asyncOperationFailure);
}
}
public void asyncOperationFailed(Throwable ex) {
this.asyncOperationFailure = asyncOperationFailure;
}
}

View File

@ -1,32 +0,0 @@
package io.nosqlbench.driver.pularjms;
import io.nosqlbench.engine.api.activityapi.core.Action;
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.nb.annotations.Service;
@Service(value = ActivityType.class, selector = "pulsar_jms")
public class PulsarJmsActivityType implements ActivityType<PulsarJmsActivity> {
@Override
public ActionDispenser getActionDispenser(PulsarJmsActivity activity) {
return new PulsarJmsActionDispenser(activity);
}
@Override
public PulsarJmsActivity getActivity(ActivityDef activityDef) {
return new PulsarJmsActivity(activityDef);
}
private static class PulsarJmsActionDispenser implements ActionDispenser {
private final PulsarJmsActivity activity;
public PulsarJmsActionDispenser(PulsarJmsActivity activity) {
this.activity = activity;
}
@Override
public Action getAction(int slot) {
return new PulsarJmsAction(activity, slot);
}
}
}

View File

@ -1,106 +0,0 @@
package io.nosqlbench.driver.pularjms;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import io.nosqlbench.driver.pularjms.ops.PulsarJmsMsgSendMapper;
import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp;
import io.nosqlbench.driver.pularjms.util.PulsarJmsActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import javax.jms.Destination;
import javax.jms.JMSRuntimeException;
import java.util.function.LongFunction;
public class ReadyPulsarJmsOp implements OpDispenser<PulsarJmsOp> {
private final OpTemplate opTpl;
private final CommandTemplate cmdTpl;
private final LongFunction<PulsarJmsOp> opFunc;
private final PulsarJmsActivity pulsarJmsActivity;
public ReadyPulsarJmsOp(OpTemplate opTemplate, PulsarJmsActivity pulsarJmsActivity) {
this.opTpl = opTemplate;
this.cmdTpl = new CommandTemplate(opTpl);
this.pulsarJmsActivity = pulsarJmsActivity;
this.opFunc = resolve();
}
public PulsarJmsOp apply(long value) {
return opFunc.apply(value);
}
public LongFunction<PulsarJmsOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
// Global/Doc-level parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null;
if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
topicUriFunc = (l) -> cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label);
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l);
}
}
// Global/Doc-level parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
asyncApiFunc = (l) -> value;
} else {
throw new RuntimeException("\"" + PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
}
}
// Global: JMS destinaion
LongFunction<Destination> jmsDestinationFunc = (l) -> null;
try {
LongFunction<String> finalTopicUriFunc = topicUriFunc;
jmsDestinationFunc = (l) -> pulsarJmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l));
}
catch (JMSRuntimeException ex) {
throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!");
}
if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
} /*else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgConsume(topicUriFunc, asyncApiFunc);
} */
else {
throw new RuntimeException("Unsupported Pulsar operation type");
}
}
private LongFunction<PulsarJmsOp> resolveMsgSend(
LongFunction<Boolean> async_api_func,
LongFunction<Destination> jmsDestinationFunc
) {
LongFunction<String> msgBodyFunc;
if (cmdTpl.containsKey("msg_body")) {
if (cmdTpl.isStatic("msg_body")) {
msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body");
} else if (cmdTpl.isDynamic("msg_body")) {
msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l);
} else {
msgBodyFunc = (l) -> null;
}
} else {
throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!");
}
return new PulsarJmsMsgSendMapper(
pulsarJmsActivity,
async_api_func,
jmsDestinationFunc,
msgBodyFunc);
}
}

View File

@ -1,44 +0,0 @@
package io.nosqlbench.driver.pularjms.ops;
import io.nosqlbench.driver.pularjms.PulsarJmsActivity;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import javax.jms.Destination;
import javax.jms.JMSContext;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarJmsMsgSendMapper extends PulsarJmsOpMapper {
private final LongFunction<String> msgBodyFunc;
public PulsarJmsMsgSendMapper(PulsarJmsActivity pulsarJmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc,
LongFunction<String> msgBodyFunc) {
super(pulsarJmsActivity, asyncApiFunc, jmsDestinationFunc);
this.msgBodyFunc = msgBodyFunc;
}
@Override
public PulsarJmsOp apply(long value) {
Destination jmsDestination = jmsDestinationFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
String msgBody = msgBodyFunc.apply(value);
return new PulsarJmsMsgSendOp(
pulsarJmsActivity,
asyncApi,
jmsDestination,
msgBody
);
}
}

View File

@ -1,60 +0,0 @@
package io.nosqlbench.driver.pularjms.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pularjms.PulsarJmsActivity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import javax.jms.Destination;
import javax.jms.JMSContext;
import javax.jms.JMSProducer;
import java.nio.charset.StandardCharsets;
public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp {
private final static Logger logger = LogManager.getLogger(PulsarJmsMsgSendOp.class);
private final PulsarJmsActivity pulsarActivity;
private final boolean asyncPulsarOp;
private final Destination jmsDestination;
private final JMSContext jmsContext;
private final JMSProducer jmsProducer;
private final String msgBody;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarJmsMsgSendOp(PulsarJmsActivity pulsarActivity,
boolean asyncPulsarOp,
Destination jmsDestination,
String msgBody) {
this.pulsarActivity = pulsarActivity;
this.asyncPulsarOp = asyncPulsarOp;
this.jmsDestination = jmsDestination;
this.jmsContext = pulsarActivity.getJmsContext();
this.jmsProducer = jmsContext.createProducer();
this.msgBody = msgBody;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
}
@Override
public void run() {
if ((msgBody == null) || msgBody.isEmpty()) {
throw new RuntimeException("JMS message body can't be empty!");
}
int messageSize;
try {
byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8);
messageSize = msgBytes.length;
jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8));
messagesizeHistogram.update(messageSize);
bytesCounter.inc(messageSize);
}
catch (Exception ex) {
logger.error("Failed to send JMS message - " + msgBody);
}
}
}

View File

@ -1,14 +0,0 @@
package io.nosqlbench.driver.pularjms.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public interface PulsarJmsOp {
/**
* Execute the operation, invoke the timeTracker when the operation ended.
* The timeTracker can be invoked in a separate thread, it is only used for metrics.
* @param timeTracker
*/
void run(Runnable timeTracker);
}

View File

@ -1,21 +0,0 @@
package io.nosqlbench.driver.pularjms.ops;
import io.nosqlbench.driver.pularjms.PulsarJmsActivity;
import javax.jms.Destination;
import java.util.function.LongFunction;
public abstract class PulsarJmsOpMapper implements LongFunction<PulsarJmsOp> {
protected final PulsarJmsActivity pulsarJmsActivity;
protected final LongFunction<Boolean> asyncApiFunc;
protected final LongFunction<Destination> jmsDestinationFunc;
public PulsarJmsOpMapper(PulsarJmsActivity pulsarJmsActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Destination> jmsDestinationFunc)
{
this.pulsarJmsActivity = pulsarJmsActivity;
this.asyncApiFunc = asyncApiFunc;
this.jmsDestinationFunc = jmsDestinationFunc;
}
}

View File

@ -1,17 +0,0 @@
package io.nosqlbench.driver.pularjms.ops;
/**
* Base type of all Sync Pulsar Operations including Producers and Consumers.
*/
public abstract class PulsarJmsTimeTrackOp implements PulsarJmsOp {
public void run(Runnable timeTracker) {
try {
this.run();
} finally {
timeTracker.run();
}
}
public abstract void run();
}

View File

@ -1,68 +0,0 @@
package io.nosqlbench.driver.pularjms.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.stream.Collectors;
public class PulsarJmsActivityUtil {
private final static Logger logger = LogManager.getLogger(PulsarJmsActivityUtil.class);
// Supported message operation types
public enum OP_TYPES {
MSG_SEND("msg_send"),
MSG_READ("msg_read");
public final String label;
OP_TYPES(String label) {
this.label = label;
}
}
public static boolean isValidClientType(String type) {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public enum DOC_LEVEL_PARAMS {
TOPIC_URI("topic_uri"),
ASYNC_API("async_api");
public final String label;
DOC_LEVEL_PARAMS(String label) {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param));
}
public static String encode(String... strings) {
StringBuilder stringBuilder = new StringBuilder();
for (String str : strings) {
if (!StringUtils.isBlank(str))
stringBuilder.append(str).append("::");
}
String concatenatedStr =
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}
}

View File

@ -1,21 +0,0 @@
bindings:
payload: NumberNameToString() #AlphaNumericString(20)
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
# topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
topic_uri: "persistent://public/default/t0"
async_api: "false"
blocks:
- name: producer-block
tags:
phase: jms_producer
admin_task: false
statements:
- name: s1
optype: msg_send
msg_body: "{payload}"

View File

@ -50,7 +50,6 @@
<module>driver-jdbc</module>
<module>driver-cockroachdb</module>
<module>driver-pulsar</module>
<module>driver-pulsar-jms</module>
<!-- VIRTDATA MODULES -->