mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Revert "Add NB support for Pulsar JMS"
This commit is contained in:
parent
63b0a68118
commit
09615afb0b
@ -1,104 +0,0 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>mvn-defaults</artifactId>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<version>4.15.45-SNAPSHOT</version>
|
||||
<relativePath>../mvn-defaults</relativePath>
|
||||
</parent>
|
||||
|
||||
<artifactId>driver-pulsar-jms</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>${project.artifactId}</name>
|
||||
|
||||
<description>
|
||||
A PulsarJMS driver for nosqlbench. This provides the ability to inject synthetic data
|
||||
into a pulsar system via JMS 2.0 compatibile APIs
|
||||
</description>
|
||||
|
||||
<repositories>
|
||||
<repository>
|
||||
<id>datastax-releases-local</id>
|
||||
<name>DataStax Local Releases</name>
|
||||
<url>https://repo.sjc.dsinternal.org/artifactory/datastax-snapshots-local/</url>
|
||||
<releases>
|
||||
<enabled>false</enabled>
|
||||
</releases>
|
||||
<snapshots>
|
||||
<enabled>true</enabled>
|
||||
</snapshots>
|
||||
</repository>
|
||||
</repositories>
|
||||
|
||||
<properties>
|
||||
<pulsar.version>2.7.1</pulsar.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
|
||||
<!-- core dependencies -->
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.pulsar</groupId>
|
||||
<artifactId>pulsar-client</artifactId>
|
||||
<version>${pulsar.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.pulsar</groupId>
|
||||
<artifactId>pulsar-client-admin</artifactId>
|
||||
<version>${pulsar.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>engine-api</artifactId>
|
||||
<version>4.15.45-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.nosqlbench</groupId>
|
||||
<artifactId>driver-stdout</artifactId>
|
||||
<version>4.15.45-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
<version>1.9.4</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-configuration2</artifactId>
|
||||
<version>2.7</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
|
||||
<dependency>
|
||||
<groupId>org.apache.avro</groupId>
|
||||
<artifactId>avro</artifactId>
|
||||
<version>1.10.1</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>3.12.0</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<artifactId>pulsar-jms</artifactId>
|
||||
<groupId>com.datastax.oss</groupId>
|
||||
<version>1.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
@ -1,73 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class PulsarJmsAction implements SyncAction {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(PulsarJmsAction.class);
|
||||
|
||||
private final PulsarJmsActivity activity;
|
||||
private final int slot;
|
||||
|
||||
int maxTries;
|
||||
|
||||
public PulsarJmsAction(PulsarJmsActivity activity, int slot) {
|
||||
this.activity = activity;
|
||||
this.slot = slot;
|
||||
this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int runCycle(long cycle) {
|
||||
// let's fail the action if some async operation failed
|
||||
activity.failOnAsyncOperationFailure();
|
||||
|
||||
long start = System.nanoTime();
|
||||
|
||||
PulsarJmsOp pulsarJmsOp;
|
||||
try (Timer.Context ctx = activity.getBindTimer().time()) {
|
||||
LongFunction<PulsarJmsOp> readyPulsarJmsOp = activity.getSequencer().get(cycle);
|
||||
pulsarJmsOp = readyPulsarJmsOp.apply(cycle);
|
||||
} catch (Exception bindException) {
|
||||
// if diagnostic mode ...
|
||||
activity.getErrorhandler().handleError(bindException, cycle, 0);
|
||||
throw new RuntimeException(
|
||||
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
|
||||
);
|
||||
}
|
||||
|
||||
for (int i = 0; i < maxTries; i++) {
|
||||
Timer.Context ctx = activity.getExecuteTimer().time();
|
||||
try {
|
||||
// it is up to the pulsarOp to call Context#close when the activity is executed
|
||||
// this allows us to track time for async operations
|
||||
pulsarJmsOp.run(ctx::close);
|
||||
break;
|
||||
} catch (RuntimeException err) {
|
||||
ErrorDetail errorDetail = activity
|
||||
.getErrorhandler()
|
||||
.handleError(err, cycle, System.nanoTime() - start);
|
||||
if (!errorDetail.isRetryable()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
@ -1,136 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
|
||||
import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp;
|
||||
import io.nosqlbench.driver.pularjms.util.PulsarJmsActivityUtil;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
public class PulsarJmsActivity extends SimpleActivity {
|
||||
|
||||
private final ConcurrentHashMap<String, Destination> jmsDestinations = new ConcurrentHashMap<>();
|
||||
|
||||
// e.g. pulsar://localhost:6650
|
||||
private String pulsarSvcUrl;
|
||||
// e.g. http://localhost:8080
|
||||
private String webSvcUrl;
|
||||
|
||||
private JMSContext jmsContext;
|
||||
|
||||
private OpSequence<OpDispenser<PulsarJmsOp>> sequence;
|
||||
private volatile Throwable asyncOperationFailure;
|
||||
private NBErrorHandler errorhandler;
|
||||
|
||||
private Timer bindTimer;
|
||||
private Timer executeTimer;
|
||||
private Counter bytesCounter;
|
||||
private Histogram messagesizeHistogram;
|
||||
|
||||
public PulsarJmsActivity(ActivityDef activityDef) {
|
||||
super(activityDef);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initActivity() {
|
||||
super.initActivity();
|
||||
|
||||
webSvcUrl =
|
||||
activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080");
|
||||
pulsarSvcUrl =
|
||||
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
|
||||
|
||||
Map<String, Object> configuration = new HashMap<>();
|
||||
configuration.put("webServiceUrl", webSvcUrl);
|
||||
configuration.put("brokerServiceUrl", pulsarSvcUrl);
|
||||
|
||||
PulsarConnectionFactory factory;
|
||||
try {
|
||||
factory = new PulsarConnectionFactory(configuration);
|
||||
this.jmsContext = factory.createContext();
|
||||
} catch (JMSException e) {
|
||||
throw new RuntimeException("PulsarJMS message send:: Unable to initialize Pulsar connection factory!");
|
||||
}
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
|
||||
|
||||
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this));
|
||||
setDefaultsFromOpSequence(sequence);
|
||||
onActivityDefUpdate(activityDef);
|
||||
|
||||
this.errorhandler = new NBErrorHandler(
|
||||
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
|
||||
this::getExceptionMetrics
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* If the JMS destination that corresponds to a topic exists, reuse it; Otherwise, create it
|
||||
*
|
||||
* @param pulsarTopic
|
||||
*/
|
||||
public Destination getOrCreateJmsDestination(String pulsarTopic) {
|
||||
String encodedTopicStr = PulsarJmsActivityUtil.encode(pulsarTopic);
|
||||
Destination destination = jmsDestinations.get(encodedTopicStr);
|
||||
|
||||
if ( destination == null ) {
|
||||
destination = jmsContext.createQueue(pulsarTopic);
|
||||
jmsDestinations.put(encodedTopicStr, destination);
|
||||
}
|
||||
|
||||
return destination;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); }
|
||||
public OpSequence<OpDispenser<PulsarJmsOp>> getSequencer() { return sequence; }
|
||||
|
||||
public String getPulsarSvcUrl() {
|
||||
return pulsarSvcUrl;
|
||||
}
|
||||
public String getWebSvcUrl() { return webSvcUrl; }
|
||||
public JMSContext getJmsContext() { return jmsContext; }
|
||||
|
||||
public Timer getBindTimer() {
|
||||
return bindTimer;
|
||||
}
|
||||
public Timer getExecuteTimer() {
|
||||
return this.executeTimer;
|
||||
}
|
||||
public Counter getBytesCounter() {
|
||||
return bytesCounter;
|
||||
}
|
||||
public Histogram getMessagesizeHistogram() {
|
||||
return messagesizeHistogram;
|
||||
}
|
||||
|
||||
public NBErrorHandler getErrorhandler() {
|
||||
return errorhandler;
|
||||
}
|
||||
|
||||
public void failOnAsyncOperationFailure() {
|
||||
if (asyncOperationFailure != null) {
|
||||
throw new RuntimeException(asyncOperationFailure);
|
||||
}
|
||||
}
|
||||
public void asyncOperationFailed(Throwable ex) {
|
||||
this.asyncOperationFailure = asyncOperationFailure;
|
||||
}
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.Action;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActivityType;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
|
||||
@Service(value = ActivityType.class, selector = "pulsar_jms")
|
||||
public class PulsarJmsActivityType implements ActivityType<PulsarJmsActivity> {
|
||||
@Override
|
||||
public ActionDispenser getActionDispenser(PulsarJmsActivity activity) {
|
||||
return new PulsarJmsActionDispenser(activity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PulsarJmsActivity getActivity(ActivityDef activityDef) {
|
||||
return new PulsarJmsActivity(activityDef);
|
||||
}
|
||||
|
||||
private static class PulsarJmsActionDispenser implements ActionDispenser {
|
||||
private final PulsarJmsActivity activity;
|
||||
public PulsarJmsActionDispenser(PulsarJmsActivity activity) {
|
||||
this.activity = activity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Action getAction(int slot) {
|
||||
return new PulsarJmsAction(activity, slot);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,106 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms;
|
||||
|
||||
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
|
||||
import io.nosqlbench.driver.pularjms.ops.PulsarJmsMsgSendMapper;
|
||||
import io.nosqlbench.driver.pularjms.ops.PulsarJmsOp;
|
||||
import io.nosqlbench.driver.pularjms.util.PulsarJmsActivityUtil;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.commons.lang3.BooleanUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSRuntimeException;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class ReadyPulsarJmsOp implements OpDispenser<PulsarJmsOp> {
|
||||
|
||||
private final OpTemplate opTpl;
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final LongFunction<PulsarJmsOp> opFunc;
|
||||
private final PulsarJmsActivity pulsarJmsActivity;
|
||||
|
||||
public ReadyPulsarJmsOp(OpTemplate opTemplate, PulsarJmsActivity pulsarJmsActivity) {
|
||||
this.opTpl = opTemplate;
|
||||
this.cmdTpl = new CommandTemplate(opTpl);
|
||||
this.pulsarJmsActivity = pulsarJmsActivity;
|
||||
|
||||
this.opFunc = resolve();
|
||||
}
|
||||
|
||||
public PulsarJmsOp apply(long value) {
|
||||
return opFunc.apply(value);
|
||||
}
|
||||
|
||||
public LongFunction<PulsarJmsOp> resolve() {
|
||||
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
|
||||
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
|
||||
}
|
||||
String stmtOpType = cmdTpl.getStatic("optype");
|
||||
|
||||
// Global/Doc-level parameter: topic_uri
|
||||
LongFunction<String> topicUriFunc = (l) -> null;
|
||||
if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
|
||||
if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
|
||||
topicUriFunc = (l) -> cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label);
|
||||
} else {
|
||||
topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l);
|
||||
}
|
||||
}
|
||||
|
||||
// Global/Doc-level parameter: async_api
|
||||
LongFunction<Boolean> asyncApiFunc = (l) -> false;
|
||||
if (cmdTpl.containsKey(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
|
||||
if (cmdTpl.isStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
|
||||
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
|
||||
asyncApiFunc = (l) -> value;
|
||||
} else {
|
||||
throw new RuntimeException("\"" + PulsarJmsActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
|
||||
}
|
||||
}
|
||||
|
||||
// Global: JMS destinaion
|
||||
LongFunction<Destination> jmsDestinationFunc = (l) -> null;
|
||||
try {
|
||||
LongFunction<String> finalTopicUriFunc = topicUriFunc;
|
||||
jmsDestinationFunc = (l) -> pulsarJmsActivity.getOrCreateJmsDestination(finalTopicUriFunc.apply(l));
|
||||
}
|
||||
catch (JMSRuntimeException ex) {
|
||||
throw new RuntimeException("PulsarJMS message send:: unable to create JMS desit!");
|
||||
}
|
||||
|
||||
if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_SEND.label)) {
|
||||
return resolveMsgSend(asyncApiFunc, jmsDestinationFunc);
|
||||
} /*else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarJmsActivityUtil.OP_TYPES.MSG_READ.label)) {
|
||||
return resolveMsgConsume(topicUriFunc, asyncApiFunc);
|
||||
} */
|
||||
else {
|
||||
throw new RuntimeException("Unsupported Pulsar operation type");
|
||||
}
|
||||
}
|
||||
|
||||
private LongFunction<PulsarJmsOp> resolveMsgSend(
|
||||
LongFunction<Boolean> async_api_func,
|
||||
LongFunction<Destination> jmsDestinationFunc
|
||||
) {
|
||||
LongFunction<String> msgBodyFunc;
|
||||
if (cmdTpl.containsKey("msg_body")) {
|
||||
if (cmdTpl.isStatic("msg_body")) {
|
||||
msgBodyFunc = (l) -> cmdTpl.getStatic("msg_body");
|
||||
} else if (cmdTpl.isDynamic("msg_body")) {
|
||||
msgBodyFunc = (l) -> cmdTpl.getDynamic("msg_body", l);
|
||||
} else {
|
||||
msgBodyFunc = (l) -> null;
|
||||
}
|
||||
} else {
|
||||
throw new RuntimeException("PulsarJMS message send:: \"msg_body\" field must be specified!");
|
||||
}
|
||||
|
||||
return new PulsarJmsMsgSendMapper(
|
||||
pulsarJmsActivity,
|
||||
async_api_func,
|
||||
jmsDestinationFunc,
|
||||
msgBodyFunc);
|
||||
}
|
||||
}
|
@ -1,44 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms.ops;
|
||||
|
||||
import io.nosqlbench.driver.pularjms.PulsarJmsActivity;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSContext;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
/**
|
||||
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
|
||||
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
|
||||
* retried if needed.
|
||||
*
|
||||
* This function doesn't act *as* the operation. It merely maps the construction logic into
|
||||
* a simple functional type, given the component functions.
|
||||
*
|
||||
* For additional parameterization, the command template is also provided.
|
||||
*/
|
||||
public class PulsarJmsMsgSendMapper extends PulsarJmsOpMapper {
|
||||
private final LongFunction<String> msgBodyFunc;
|
||||
|
||||
public PulsarJmsMsgSendMapper(PulsarJmsActivity pulsarJmsActivity,
|
||||
LongFunction<Boolean> asyncApiFunc,
|
||||
LongFunction<Destination> jmsDestinationFunc,
|
||||
LongFunction<String> msgBodyFunc) {
|
||||
super(pulsarJmsActivity, asyncApiFunc, jmsDestinationFunc);
|
||||
this.msgBodyFunc = msgBodyFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PulsarJmsOp apply(long value) {
|
||||
Destination jmsDestination = jmsDestinationFunc.apply(value);
|
||||
boolean asyncApi = asyncApiFunc.apply(value);
|
||||
String msgBody = msgBodyFunc.apply(value);
|
||||
|
||||
return new PulsarJmsMsgSendOp(
|
||||
pulsarJmsActivity,
|
||||
asyncApi,
|
||||
jmsDestination,
|
||||
msgBody
|
||||
);
|
||||
}
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.driver.pularjms.PulsarJmsActivity;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSProducer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class PulsarJmsMsgSendOp extends PulsarJmsTimeTrackOp {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(PulsarJmsMsgSendOp.class);
|
||||
|
||||
private final PulsarJmsActivity pulsarActivity;
|
||||
private final boolean asyncPulsarOp;
|
||||
private final Destination jmsDestination;
|
||||
private final JMSContext jmsContext;
|
||||
private final JMSProducer jmsProducer;
|
||||
private final String msgBody;
|
||||
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
|
||||
public PulsarJmsMsgSendOp(PulsarJmsActivity pulsarActivity,
|
||||
boolean asyncPulsarOp,
|
||||
Destination jmsDestination,
|
||||
String msgBody) {
|
||||
this.pulsarActivity = pulsarActivity;
|
||||
this.asyncPulsarOp = asyncPulsarOp;
|
||||
this.jmsDestination = jmsDestination;
|
||||
this.jmsContext = pulsarActivity.getJmsContext();
|
||||
this.jmsProducer = jmsContext.createProducer();
|
||||
this.msgBody = msgBody;
|
||||
this.bytesCounter = pulsarActivity.getBytesCounter();
|
||||
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if ((msgBody == null) || msgBody.isEmpty()) {
|
||||
throw new RuntimeException("JMS message body can't be empty!");
|
||||
}
|
||||
|
||||
int messageSize;
|
||||
try {
|
||||
byte[] msgBytes = msgBody.getBytes(StandardCharsets.UTF_8);
|
||||
messageSize = msgBytes.length;
|
||||
jmsProducer.send(jmsDestination, msgBody.getBytes(StandardCharsets.UTF_8));
|
||||
messagesizeHistogram.update(messageSize);
|
||||
bytesCounter.inc(messageSize);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.error("Failed to send JMS message - " + msgBody);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,14 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms.ops;
|
||||
|
||||
/**
|
||||
* Base type of all Pulsar Operations including Producers and Consumers.
|
||||
*/
|
||||
public interface PulsarJmsOp {
|
||||
|
||||
/**
|
||||
* Execute the operation, invoke the timeTracker when the operation ended.
|
||||
* The timeTracker can be invoked in a separate thread, it is only used for metrics.
|
||||
* @param timeTracker
|
||||
*/
|
||||
void run(Runnable timeTracker);
|
||||
}
|
@ -1,21 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms.ops;
|
||||
|
||||
import io.nosqlbench.driver.pularjms.PulsarJmsActivity;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public abstract class PulsarJmsOpMapper implements LongFunction<PulsarJmsOp> {
|
||||
protected final PulsarJmsActivity pulsarJmsActivity;
|
||||
protected final LongFunction<Boolean> asyncApiFunc;
|
||||
protected final LongFunction<Destination> jmsDestinationFunc;
|
||||
|
||||
public PulsarJmsOpMapper(PulsarJmsActivity pulsarJmsActivity,
|
||||
LongFunction<Boolean> asyncApiFunc,
|
||||
LongFunction<Destination> jmsDestinationFunc)
|
||||
{
|
||||
this.pulsarJmsActivity = pulsarJmsActivity;
|
||||
this.asyncApiFunc = asyncApiFunc;
|
||||
this.jmsDestinationFunc = jmsDestinationFunc;
|
||||
}
|
||||
}
|
@ -1,17 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms.ops;
|
||||
|
||||
/**
|
||||
* Base type of all Sync Pulsar Operations including Producers and Consumers.
|
||||
*/
|
||||
public abstract class PulsarJmsTimeTrackOp implements PulsarJmsOp {
|
||||
|
||||
public void run(Runnable timeTracker) {
|
||||
try {
|
||||
this.run();
|
||||
} finally {
|
||||
timeTracker.run();
|
||||
}
|
||||
}
|
||||
|
||||
public abstract void run();
|
||||
}
|
@ -1,68 +0,0 @@
|
||||
package io.nosqlbench.driver.pularjms.util;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
|
||||
import org.apache.pulsar.common.schema.SchemaInfo;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class PulsarJmsActivityUtil {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(PulsarJmsActivityUtil.class);
|
||||
|
||||
// Supported message operation types
|
||||
public enum OP_TYPES {
|
||||
MSG_SEND("msg_send"),
|
||||
MSG_READ("msg_read");
|
||||
|
||||
public final String label;
|
||||
|
||||
OP_TYPES(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidClientType(String type) {
|
||||
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||
}
|
||||
|
||||
public enum DOC_LEVEL_PARAMS {
|
||||
TOPIC_URI("topic_uri"),
|
||||
ASYNC_API("async_api");
|
||||
|
||||
public final String label;
|
||||
|
||||
DOC_LEVEL_PARAMS(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidDocLevelParam(String param) {
|
||||
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param));
|
||||
}
|
||||
|
||||
public static String encode(String... strings) {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
for (String str : strings) {
|
||||
if (!StringUtils.isBlank(str))
|
||||
stringBuilder.append(str).append("::");
|
||||
}
|
||||
|
||||
String concatenatedStr =
|
||||
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
|
||||
|
||||
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
|
||||
}
|
||||
}
|
||||
|
@ -1,21 +0,0 @@
|
||||
bindings:
|
||||
payload: NumberNameToString() #AlphaNumericString(20)
|
||||
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
|
||||
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
|
||||
core_topic_name: Mod(5); ToString(); Prefix("t")
|
||||
|
||||
# document level parameters that apply to all Pulsar client types:
|
||||
params:
|
||||
# topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
|
||||
topic_uri: "persistent://public/default/t0"
|
||||
async_api: "false"
|
||||
|
||||
blocks:
|
||||
- name: producer-block
|
||||
tags:
|
||||
phase: jms_producer
|
||||
admin_task: false
|
||||
statements:
|
||||
- name: s1
|
||||
optype: msg_send
|
||||
msg_body: "{payload}"
|
Loading…
Reference in New Issue
Block a user