diff --git a/driver-pulsar-jms/pom.xml b/driver-pulsar-jms/pom.xml deleted file mode 100644 index a7622e1bf..000000000 --- a/driver-pulsar-jms/pom.xml +++ /dev/null @@ -1,104 +0,0 @@ - - 4.0.0 - - - mvn-defaults - io.nosqlbench - 4.15.45-SNAPSHOT - ../mvn-defaults - - - driver-pulsar-jms - jar - ${project.artifactId} - - - A PulsarJMS driver for nosqlbench. This provides the ability to inject synthetic data - into a pulsar system via JMS 2.0 compatibile APIs - - - - - datastax-releases-local - DataStax Local Releases - https://repo.sjc.dsinternal.org/artifactory/datastax-snapshots-local/ - - false - - - true - - - - - - 2.7.1 - - - - - - - - org.apache.pulsar - pulsar-client - ${pulsar.version} - - - - org.apache.pulsar - pulsar-client-admin - ${pulsar.version} - - - - io.nosqlbench - engine-api - 4.15.45-SNAPSHOT - - - - io.nosqlbench - driver-stdout - 4.15.45-SNAPSHOT - - - - - commons-beanutils - commons-beanutils - 1.9.4 - - - - - org.apache.commons - commons-configuration2 - 2.7 - - - - - org.apache.avro - avro - 1.10.1 - - - - - org.apache.commons - commons-lang3 - 3.12.0 - - - - - pulsar-jms - com.datastax.oss - 1.0.0-SNAPSHOT - - - - - - diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsAction.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsAction.java deleted file mode 100644 index 64bad75bb..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsAction.java +++ /dev/null @@ -1,73 +0,0 @@ -package io.nosqlbench.driver.pularjms; - -import com.codahale.metrics.Timer; -import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp; -import io.nosqlbench.engine.api.activityapi.core.SyncAction; -import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; -import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.engine.api.activityimpl.ActivityDef; -import io.nosqlbench.engine.api.activityimpl.OpDispenser; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.function.LongFunction; - -public class PulsarJmsAction implements SyncAction { - - private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class); - - private final PulsarJmsActivity activity; - private final int slot; - - int maxTries; - - public PulsarJmsAction(PulsarJmsActivity activity, int slot) { - this.activity = activity; - this.slot = slot; - this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10); - } - - @Override - public void init() { - - } - - @Override - public int runCycle(long cycle) { - // let's fail the action if some async operation failed - activity.failOnAsyncOperationFailure(); - - long start = System.nanoTime(); - - PulsarJmsOp pulsarJmsOp; - try (Timer.Context ctx = activity.getBindTimer().time()) { - LongFunction readyPulsarJmsOp = activity.getSequencer().get(cycle); - pulsarJmsOp = readyPulsarJmsOp.apply(cycle); - } catch (Exception bindException) { - // if diagnostic mode ... - activity.getErrorhandler().handleError(bindException, cycle, 0); - throw new RuntimeException( - "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException - ); - } - - for (int i = 0; i < maxTries; i++) { - Timer.Context ctx = activity.getExecuteTimer().time(); - try { - // it is up to the pulsarOp to call Context#close when the activity is executed - // this allows us to track time for async operations - pulsarJmsOp.run(ctx::close); - break; - } catch (RuntimeException err) { - ErrorDetail errorDetail = activity - .getErrorhandler() - .handleError(err, cycle, System.nanoTime() - start); - if (!errorDetail.isRetryable()) { - break; - } - } - } - - return 0; - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsActivity.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsActivity.java deleted file mode 100644 index 5f62245c5..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsActivity.java +++ /dev/null @@ -1,136 +0,0 @@ -package io.nosqlbench.driver.pularjms; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.Timer; -import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; -import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp; -import io.nosqlbench.driver.pularjms.util.PulsarJmsActivityUtil; -import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; -import io.nosqlbench.engine.api.activityapi.planning.OpSequence; -import io.nosqlbench.engine.api.activityimpl.ActivityDef; -import io.nosqlbench.engine.api.activityimpl.OpDispenser; -import io.nosqlbench.engine.api.activityimpl.SimpleActivity; -import io.nosqlbench.engine.api.metrics.ActivityMetrics; - -import javax.jms.Destination; -import javax.jms.JMSContext; -import javax.jms.JMSException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - - -public class PulsarJmsActivity extends SimpleActivity { - - private final ConcurrentHashMap jmsDestinations = new ConcurrentHashMap<>(); - - // e.g. pulsar://localhost:6650 - private String pulsarSvcUrl; - // e.g. http://localhost:8080 - private String webSvcUrl; - - private JMSContext jmsContext; - - private OpSequence> sequence; - private volatile Throwable asyncOperationFailure; - private NBErrorHandler errorhandler; - - private Timer bindTimer; - private Timer executeTimer; - private Counter bytesCounter; - private Histogram messagesizeHistogram; - - public PulsarJmsActivity(ActivityDef activityDef) { - super(activityDef); - } - - @Override - public void initActivity() { - super.initActivity(); - - webSvcUrl = - activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080"); - pulsarSvcUrl = - activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); - - Map configuration = new HashMap<>(); - configuration.put("webServiceUrl", webSvcUrl); - configuration.put("brokerServiceUrl", pulsarSvcUrl); - - PulsarConnectionFactory factory; - try { - factory = new PulsarConnectionFactory(configuration); - this.jmsContext = factory.createContext(); - } catch (JMSException e) { - throw new RuntimeException("PulsarJMS message send:: Unable to initialize Pulsar connection factory!"); - } - - bindTimer = ActivityMetrics.timer(activityDef, "bind"); - executeTimer = ActivityMetrics.timer(activityDef, "execute"); - bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); - messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); - - this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this)); - setDefaultsFromOpSequence(sequence); - onActivityDefUpdate(activityDef); - - this.errorhandler = new NBErrorHandler( - () -> activityDef.getParams().getOptionalString("errors").orElse("stop"), - this::getExceptionMetrics - ); - } - - /** - * If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it - * - * @param pulsarTopic - */ - public Destination getOrCreateJmsDestination(String pulsarTopic) { - String encodedTopicStr = PulsarJmsActivityUtil.encode(pulsarTopic); - Destination destination = jmsDestinations.get(encodedTopicStr); - - if ( destination == null ) { - destination = jmsContext.createQueue(pulsarTopic); - jmsDestinations.put(encodedTopicStr, destination); - } - - return destination; - } - - @Override - public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); } - public OpSequence> getSequencer() { return sequence; } - - public String getPulsarSvcUrl() { - return pulsarSvcUrl; - } - public String getWebSvcUrl() { return webSvcUrl; } - public JMSContext getJmsContext() { return jmsContext; } - - public Timer getBindTimer() { - return bindTimer; - } - public Timer getExecuteTimer() { - return this.executeTimer; - } - public Counter getBytesCounter() { - return bytesCounter; - } - public Histogram getMessagesizeHistogram() { - return messagesizeHistogram; - } - - public NBErrorHandler getErrorhandler() { - return errorhandler; - } - - public void failOnAsyncOperationFailure() { - if (asyncOperationFailure != null) { - throw new RuntimeException(asyncOperationFailure); - } - } - public void asyncOperationFailed(Throwable ex) { - this.asyncOperationFailure = asyncOperationFailure; - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsActivityType.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsActivityType.java deleted file mode 100644 index 293921e86..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/PulsarJmsActivityType.java +++ /dev/null @@ -1,32 +0,0 @@ -package io.nosqlbench.driver.pularjms; - -import io.nosqlbench.engine.api.activityapi.core.Action; -import io.nosqlbench.engine.api.activityapi.core.ActionDispenser; -import io.nosqlbench.engine.api.activityapi.core.ActivityType; -import io.nosqlbench.engine.api.activityimpl.ActivityDef; -import io.nosqlbench.nb.annotations.Service; - -@Service(value = ActivityType.class, selector = "pulsar_jms") -public class PulsarJmsActivityType implements ActivityType { - @Override - public ActionDispenser getActionDispenser(PulsarJmsActivity activity) { - return new PulsarJmsActionDispenser(activity); - } - - @Override - public PulsarJmsActivity getActivity(ActivityDef activityDef) { - return new PulsarJmsActivity(activityDef); - } - - private static class PulsarJmsActionDispenser implements ActionDispenser { - private final PulsarJmsActivity activity; - public PulsarJmsActionDispenser(PulsarJmsActivity activity) { - this.activity = activity; - } - - @Override - public Action getAction(int slot) { - return new PulsarJmsAction(activity, slot); - } - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ReadyPulsarJmsOp.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ReadyPulsarJmsOp.java deleted file mode 100644 index 8048411c7..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ReadyPulsarJmsOp.java +++ /dev/null @@ -1,106 +0,0 @@ -package io.nosqlbench.driver.pularjms; - -import com.datastax.oss.pulsar.jms.PulsarConnectionFactory; -import io.nosqlbench.driver.pularjms.ops.PulsarJmsMsgSendMapper; -import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp; -import io.nosqlbench.driver.pularjms.util.PulsarJmsActivityUtil; -import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; -import io.nosqlbench.engine.api.activityimpl.OpDispenser; -import io.nosqlbench.engine.api.templating.CommandTemplate; -import org.apache.commons.lang3.BooleanUtils; -import org.apache.commons.lang3.StringUtils; - -import javax.jms.Destination; -import javax.jms.JMSRuntimeException; -import java.util.function.LongFunction; - -public class ReadyPulsarJmsOp implements OpDispenser { - - private final OpTemplate opTpl; - private final CommandTemplate cmdTpl; - private final LongFunction opFunc; - private final PulsarJmsActivity pulsarJmsActivity; - - public ReadyPulsarJmsOp(OpTemplate opTemplate, PulsarJmsActivity pulsarJmsActivity) { - this.opTpl = opTemplate; - this.cmdTpl = new CommandTemplate(opTpl); - this.pulsarJmsActivity = pulsarJmsActivity; - - this.opFunc = resolve(); - } - - public PulsarJmsOp apply(long value) { - return opFunc.apply(value); - } - - public LongFunction resolve() { - if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) { - throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!"); - } - String stmtOpType = cmdTpl.getStatic("optype"); - - // Global/Doc-level parameter: topic_uri - LongFunction topicUriFunc = (l) -> null; - if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) { - if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) { - topicUriFunc = (l) -> cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label); - } else { - topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l); - } - } - - // Global/Doc-level parameter: async_api - LongFunction asyncApiFunc = (l) -> false; - if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { - if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { - boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)); - asyncApiFunc = (l) -> value; - } else { - throw new RuntimeException("\"" + PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!"); - } - } - - // Global: JMS destinaion - LongFunction jmsDestinationFunc = (l) -> null; - try { - LongFunction finalTopicUriFunc = topicUriFunc; - jmsDestinationFunc = (l) -> pulsarJmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l)); - } - catch (JMSRuntimeException ex) { - throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!"); - } - - if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_SEND.label)) { - return resolveMsgSend(asyncApiFunc, jmsDestinationFunc); - } /*else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_READ.label)) { - return resolveMsgConsume(topicUriFunc, asyncApiFunc); - } */ - else { - throw new RuntimeException("Unsupported Pulsar operation type"); - } - } - - private LongFunction resolveMsgSend( - LongFunction async_api_func, - LongFunction jmsDestinationFunc - ) { - LongFunction msgBodyFunc; - if (cmdTpl.containsKey("msg_body")) { - if (cmdTpl.isStatic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body"); - } else if (cmdTpl.isDynamic("msg_body")) { - msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l); - } else { - msgBodyFunc = (l) -> null; - } - } else { - throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!"); - } - - return new PulsarJmsMsgSendMapper( - pulsarJmsActivity, - async_api_func, - jmsDestinationFunc, - msgBodyFunc); - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsMsgSendMapper.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsMsgSendMapper.java deleted file mode 100644 index 4b11e31c4..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsMsgSendMapper.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.nosqlbench.driver.pularjms.ops; - -import io.nosqlbench.driver.pularjms.PulsarJmsActivity; -import io.nosqlbench.engine.api.templating.CommandTemplate; - -import javax.jms.Destination; -import javax.jms.JMSContext; -import java.util.function.LongFunction; - -/** - * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains - * enough state to define a pulsar operation such that it can be executed, measured, and possibly - * retried if needed. - * - * This function doesn't act *as* the operation. It merely maps the construction logic into - * a simple functional type, given the component functions. - * - * For additional parameterization, the command template is also provided. - */ -public class PulsarJmsMsgSendMapper extends PulsarJmsOpMapper { - private final LongFunction msgBodyFunc; - - public PulsarJmsMsgSendMapper(PulsarJmsActivity pulsarJmsActivity, - LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc, - LongFunction msgBodyFunc) { - super(pulsarJmsActivity, asyncApiFunc, jmsDestinationFunc); - this.msgBodyFunc = msgBodyFunc; - } - - @Override - public PulsarJmsOp apply(long value) { - Destination jmsDestination = jmsDestinationFunc.apply(value); - boolean asyncApi = asyncApiFunc.apply(value); - String msgBody = msgBodyFunc.apply(value); - - return new PulsarJmsMsgSendOp( - pulsarJmsActivity, - asyncApi, - jmsDestination, - msgBody - ); - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsMsgSendOp.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsMsgSendOp.java deleted file mode 100644 index 3ae017d29..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsMsgSendOp.java +++ /dev/null @@ -1,60 +0,0 @@ -package io.nosqlbench.driver.pularjms.ops; - -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; -import io.nosqlbench.driver.pularjms.PulsarJmsActivity; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import javax.jms.Destination; -import javax.jms.JMSContext; -import javax.jms.JMSProducer; -import java.nio.charset.StandardCharsets; - -public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp { - - private final static Logger logger = LogManager.getLogger(PulsarJmsMsgSendOp.class); - - private final PulsarJmsActivity pulsarActivity; - private final boolean asyncPulsarOp; - private final Destination jmsDestination; - private final JMSContext jmsContext; - private final JMSProducer jmsProducer; - private final String msgBody; - - private final Counter bytesCounter; - private final Histogram messagesizeHistogram; - - public PulsarJmsMsgSendOp(PulsarJmsActivity pulsarActivity, - boolean asyncPulsarOp, - Destination jmsDestination, - String msgBody) { - this.pulsarActivity = pulsarActivity; - this.asyncPulsarOp = asyncPulsarOp; - this.jmsDestination = jmsDestination; - this.jmsContext = pulsarActivity.getJmsContext(); - this.jmsProducer = jmsContext.createProducer(); - this.msgBody = msgBody; - this.bytesCounter = pulsarActivity.getBytesCounter(); - this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram(); - } - - @Override - public void run() { - if ((msgBody == null) || msgBody.isEmpty()) { - throw new RuntimeException("JMS message body can't be empty!"); - } - - int messageSize; - try { - byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8); - messageSize = msgBytes.length; - jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8)); - messagesizeHistogram.update(messageSize); - bytesCounter.inc(messageSize); - } - catch (Exception ex) { - logger.error("Failed to send JMS message - " + msgBody); - } - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsOp.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsOp.java deleted file mode 100644 index 43d596dee..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsOp.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.nosqlbench.driver.pularjms.ops; - -/** - * Base type of all Pulsar Operations including Producers and Consumers. - */ -public interface PulsarJmsOp { - - /** - * Execute the operation, invoke the timeTracker when the operation ended. - * The timeTracker can be invoked in a separate thread, it is only used for metrics. - * @param timeTracker - */ - void run(Runnable timeTracker); -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsOpMapper.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsOpMapper.java deleted file mode 100644 index 137deae61..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsOpMapper.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.nosqlbench.driver.pularjms.ops; - -import io.nosqlbench.driver.pularjms.PulsarJmsActivity; - -import javax.jms.Destination; -import java.util.function.LongFunction; - -public abstract class PulsarJmsOpMapper implements LongFunction { - protected final PulsarJmsActivity pulsarJmsActivity; - protected final LongFunction asyncApiFunc; - protected final LongFunction jmsDestinationFunc; - - public PulsarJmsOpMapper(PulsarJmsActivity pulsarJmsActivity, - LongFunction asyncApiFunc, - LongFunction jmsDestinationFunc) - { - this.pulsarJmsActivity = pulsarJmsActivity; - this.asyncApiFunc = asyncApiFunc; - this.jmsDestinationFunc = jmsDestinationFunc; - } -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsTimeTrackOp.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsTimeTrackOp.java deleted file mode 100644 index caa211c1f..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/ops/PulsarJmsTimeTrackOp.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.nosqlbench.driver.pularjms.ops; - -/** - * Base type of all Sync Pulsar Operations including Producers and Consumers. - */ -public abstract class PulsarJmsTimeTrackOp implements PulsarJmsOp { - - public void run(Runnable timeTracker) { - try { - this.run(); - } finally { - timeTracker.run(); - } - } - - public abstract void run(); -} diff --git a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/util/PulsarJmsActivityUtil.java b/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/util/PulsarJmsActivityUtil.java deleted file mode 100644 index 5e9e4102a..000000000 --- a/driver-pulsar-jms/src/main/java/io/nosqlbench/driver/pularjms/util/PulsarJmsActivityUtil.java +++ /dev/null @@ -1,68 +0,0 @@ -package io.nosqlbench.driver.pularjms.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; -import org.apache.pulsar.common.schema.SchemaInfo; -import org.apache.pulsar.common.schema.SchemaType; - -import java.io.IOException; -import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Arrays; -import java.util.Base64; -import java.util.HashMap; -import java.util.stream.Collectors; - -public class PulsarJmsActivityUtil { - - private final static Logger logger = LogManager.getLogger(PulsarJmsActivityUtil.class); - - // Supported message operation types - public enum OP_TYPES { - MSG_SEND("msg_send"), - MSG_READ("msg_read"); - - public final String label; - - OP_TYPES(String label) { - this.label = label; - } - } - public static boolean isValidClientType(String type) { - return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type)); - } - - public enum DOC_LEVEL_PARAMS { - TOPIC_URI("topic_uri"), - ASYNC_API("async_api"); - - public final String label; - - DOC_LEVEL_PARAMS(String label) { - this.label = label; - } - } - public static boolean isValidDocLevelParam(String param) { - return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param)); - } - - public static String encode(String... strings) { - StringBuilder stringBuilder = new StringBuilder(); - for (String str : strings) { - if (!StringUtils.isBlank(str)) - stringBuilder.append(str).append("::"); - } - - String concatenatedStr = - StringUtils.substringBeforeLast(stringBuilder.toString(), "::"); - - return Base64.getEncoder().encodeToString(concatenatedStr.getBytes()); - } -} - diff --git a/driver-pulsar-jms/src/main/resources/pulsar_jms_bytes.yaml b/driver-pulsar-jms/src/main/resources/pulsar_jms_bytes.yaml deleted file mode 100644 index 138585d29..000000000 --- a/driver-pulsar-jms/src/main/resources/pulsar_jms_bytes.yaml +++ /dev/null @@ -1,21 +0,0 @@ -bindings: - payload: NumberNameToString() #AlphaNumericString(20) - tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") - namespace: Mod(10); Div(5L); ToString(); Prefix("ns") - core_topic_name: Mod(5); ToString(); Prefix("t") - -# document level parameters that apply to all Pulsar client types: -params: -# topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" - topic_uri: "persistent://public/default/t0" - async_api: "false" - -blocks: - - name: producer-block - tags: - phase: jms_producer - admin_task: false - statements: - - name: s1 - optype: msg_send - msg_body: "{payload}" diff --git a/pom.xml b/pom.xml index 77257faaa..486c4c90e 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,6 @@ driver-jdbc driver-cockroachdb driver-pulsar - driver-pulsar-jms