mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-11-29 03:54:01 -06:00
Merge pull request #303 from eolivelli/pulsar/transactions
Pulsar: first prototype of transaction support
This commit is contained in:
commit
003162b4ac
@ -31,6 +31,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
public Timer executeTimer;
|
||||
public Counter bytesCounter;
|
||||
public Histogram messagesizeHistogram;
|
||||
public Timer createTransactionTimer;
|
||||
public Timer commitTransactionTimer;
|
||||
|
||||
private PulsarSpaceCache pulsarCache;
|
||||
private PulsarAdmin pulsarAdmin;
|
||||
@ -110,6 +112,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
createTransactionTimer = ActivityMetrics.timer(activityDef, "createtransaction");
|
||||
commitTransactionTimer = ActivityMetrics.timer(activityDef, "committransaction");
|
||||
|
||||
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
|
||||
|
||||
@ -120,7 +125,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
pulsarSvcUrl =
|
||||
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
|
||||
webSvcUrl =
|
||||
activityDef.getParams().getOptionalString("web_url").orElse("pulsar://localhost:8080");
|
||||
activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080");
|
||||
|
||||
initPulsarAdmin();
|
||||
|
||||
@ -173,6 +178,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
return bytesCounter;
|
||||
}
|
||||
|
||||
public Timer getCreateTransactionTimer() {
|
||||
return createTransactionTimer;
|
||||
}
|
||||
|
||||
public Timer getCommitTransactionTimer() {
|
||||
return commitTransactionTimer;
|
||||
}
|
||||
|
||||
public Histogram getMessagesizeHistogram() {
|
||||
return messagesizeHistogram;
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
@ -14,14 +15,18 @@ import org.apache.pulsar.client.admin.Clusters;
|
||||
import org.apache.pulsar.client.admin.PulsarAdmin;
|
||||
import org.apache.pulsar.client.admin.PulsarAdminException;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
|
||||
@ -43,6 +48,7 @@ public class PulsarSpace {
|
||||
private final String pulsarSvcUrl;
|
||||
private final String webSvcUrl;
|
||||
private final PulsarAdmin pulsarAdmin;
|
||||
private final Timer createTransactionTimer;
|
||||
|
||||
private final Set<String> pulsarClusterMetadata = new HashSet<>();
|
||||
|
||||
@ -55,13 +61,15 @@ public class PulsarSpace {
|
||||
String pulsarSvcUrl,
|
||||
String webSvcUrl,
|
||||
PulsarAdmin pulsarAdmin,
|
||||
ActivityDef activityDef) {
|
||||
ActivityDef activityDef,
|
||||
Timer createTransactionTimer) {
|
||||
this.spaceName = name;
|
||||
this.pulsarNBClientConf = pulsarClientConf;
|
||||
this.pulsarSvcUrl = pulsarSvcUrl;
|
||||
this.webSvcUrl = webSvcUrl;
|
||||
this.pulsarAdmin = pulsarAdmin;
|
||||
this.activityDef = activityDef;
|
||||
this.createTransactionTimer = createTransactionTimer;
|
||||
|
||||
createPulsarClientFromConf();
|
||||
createPulsarSchemaFromConf();
|
||||
@ -204,6 +212,28 @@ public class PulsarSpace {
|
||||
return "";
|
||||
}
|
||||
|
||||
|
||||
public Supplier<Transaction> getTransactionSupplier() {
|
||||
PulsarClient pulsarClient = getPulsarClient();
|
||||
return () -> {
|
||||
try (Timer.Context time = createTransactionTimer.time(); ){
|
||||
return pulsarClient
|
||||
.newTransaction()
|
||||
.build()
|
||||
.get();
|
||||
} catch (ExecutionException | InterruptedException err) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("Error while starting a new transaction", err);
|
||||
}
|
||||
throw new RuntimeException(err);
|
||||
} catch (NullPointerException err) { // Unfortunately Pulsar 2.7.1 client does not report a better error
|
||||
throw new RuntimeException("Transactions are not enabled on Pulsar Client, " +
|
||||
"please set client.enableTransaction=true in your Pulsar Client configuration");
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
|
||||
String topicName = getEffectiveProducerTopicName(cycleTopicName);
|
||||
String producerName = getEffectiveProducerName(cycleProducerName);
|
||||
|
@ -28,7 +28,8 @@ public class PulsarSpaceCache {
|
||||
activity.getPulsarSvcUrl(),
|
||||
activity.getWebSvcUrl(),
|
||||
activity.getPulsarAdmin(),
|
||||
activity.getActivityDef()
|
||||
activity.getActivityDef(),
|
||||
activity.getCreateTransactionTimer()
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -2,12 +2,15 @@ package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
|
||||
@ -23,23 +26,34 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
|
||||
private final LongFunction<Consumer<?>> consumerFunc;
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
private final LongFunction<Boolean> useTransactionFunc;
|
||||
private final LongFunction<Supplier<Transaction>> transactionSupplierFunc;
|
||||
private final Timer transactionCommitTimer;
|
||||
|
||||
public PulsarConsumerMapper(CommandTemplate cmdTpl,
|
||||
PulsarSpace clientSpace,
|
||||
LongFunction<Boolean> asyncApiFunc,
|
||||
LongFunction<Consumer<?>> consumerFunc,
|
||||
Counter bytesCounter,
|
||||
Histogram messagesizeHistogram) {
|
||||
Histogram messagesizeHistogram,
|
||||
Timer transactionCommitTimer,
|
||||
LongFunction<Boolean> useTransactionFunc,
|
||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc) {
|
||||
super(cmdTpl, clientSpace, asyncApiFunc);
|
||||
this.consumerFunc = consumerFunc;
|
||||
this.bytesCounter = bytesCounter;
|
||||
this.messagesizeHistogram = messagesizeHistogram;
|
||||
this.transactionCommitTimer = transactionCommitTimer;
|
||||
this.useTransactionFunc = useTransactionFunc;
|
||||
this.transactionSupplierFunc = transactionSupplierFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PulsarOp apply(long value) {
|
||||
Consumer<?> consumer = consumerFunc.apply(value);
|
||||
boolean asyncApi = asyncApiFunc.apply(value);
|
||||
boolean useTransaction = useTransactionFunc.apply(value);
|
||||
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
|
||||
|
||||
return new PulsarConsumerOp(
|
||||
consumer,
|
||||
@ -47,7 +61,10 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
|
||||
asyncApi,
|
||||
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
|
||||
bytesCounter,
|
||||
messagesizeHistogram
|
||||
messagesizeHistogram,
|
||||
useTransaction,
|
||||
transactionSupplier,
|
||||
transactionCommitTimer
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -2,14 +2,18 @@ package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class PulsarConsumerOp extends SyncPulsarOp {
|
||||
|
||||
@ -21,16 +25,25 @@ public class PulsarConsumerOp extends SyncPulsarOp {
|
||||
private final int timeoutSeconds;
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
private final boolean useTransaction;
|
||||
private final Supplier<Transaction> transactionSupplier;
|
||||
private final Timer transactionCommitTimer;
|
||||
|
||||
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds,
|
||||
Counter bytesCounter,
|
||||
Histogram messagesizeHistogram) {
|
||||
Histogram messagesizeHistogram,
|
||||
boolean useTransaction,
|
||||
Supplier<Transaction> transactionSupplier,
|
||||
Timer transactionCommitTimer) {
|
||||
this.consumer = consumer;
|
||||
this.pulsarSchema = schema;
|
||||
this.asyncPulsarOp = asyncPulsarOp;
|
||||
this.timeoutSeconds = timeoutSeconds;
|
||||
this.bytesCounter = bytesCounter;
|
||||
this.messagesizeHistogram = messagesizeHistogram;
|
||||
this.useTransaction = useTransaction;
|
||||
this.transactionSupplier = transactionSupplier;
|
||||
this.transactionCommitTimer = transactionCommitTimer;
|
||||
}
|
||||
|
||||
public void syncConsume() {
|
||||
@ -64,7 +77,22 @@ public class PulsarConsumerOp extends SyncPulsarOp {
|
||||
int messagesize = message.getData().length;
|
||||
bytesCounter.inc(messagesize);
|
||||
messagesizeHistogram.update(messagesize);
|
||||
consumer.acknowledge(message.getMessageId());
|
||||
|
||||
|
||||
if (useTransaction) {
|
||||
Transaction transaction = transactionSupplier.get();
|
||||
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
|
||||
|
||||
// little problem: here we are counting the "commit" time
|
||||
// inside the overall time spent for the execution of the consume operation
|
||||
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
|
||||
// to track with precision the time spent for the operation and for the commit
|
||||
try (Timer.Context ctx = transactionCommitTimer.time()) {
|
||||
transaction.commit().get();
|
||||
}
|
||||
} else{
|
||||
consumer.acknowledge(message.getMessageId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -7,8 +7,10 @@ import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
|
||||
@ -25,6 +27,8 @@ public class PulsarProducerMapper extends PulsarOpMapper {
|
||||
private final LongFunction<String> keyFunc;
|
||||
private final LongFunction<String> payloadFunc;
|
||||
private final PulsarActivity pulsarActivity;
|
||||
private final LongFunction<Boolean> useTransactionFunc;
|
||||
private final LongFunction<Supplier<Transaction>> transactionSupplierFunc;
|
||||
|
||||
public PulsarProducerMapper(CommandTemplate cmdTpl,
|
||||
PulsarSpace clientSpace,
|
||||
@ -32,12 +36,16 @@ public class PulsarProducerMapper extends PulsarOpMapper {
|
||||
LongFunction<Producer<?>> producerFunc,
|
||||
LongFunction<String> keyFunc,
|
||||
LongFunction<String> payloadFunc,
|
||||
LongFunction<Boolean> useTransactionFunc,
|
||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
|
||||
PulsarActivity pulsarActivity) {
|
||||
super(cmdTpl, clientSpace, asyncApiFunc);
|
||||
this.producerFunc = producerFunc;
|
||||
this.keyFunc = keyFunc;
|
||||
this.payloadFunc = payloadFunc;
|
||||
this.pulsarActivity = pulsarActivity;
|
||||
this.useTransactionFunc = useTransactionFunc;
|
||||
this.transactionSupplierFunc = transactionSupplierFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -46,11 +54,14 @@ public class PulsarProducerMapper extends PulsarOpMapper {
|
||||
boolean asyncApi = asyncApiFunc.apply(value);
|
||||
String msgKey = keyFunc.apply(value);
|
||||
String msgPayload = payloadFunc.apply(value);
|
||||
|
||||
boolean useTransaction = useTransactionFunc.apply(value);
|
||||
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
|
||||
return new PulsarProducerOp(
|
||||
producer,
|
||||
clientSpace.getPulsarSchema(),
|
||||
asyncApi,
|
||||
useTransaction,
|
||||
transactionSupplier,
|
||||
msgKey,
|
||||
msgPayload,
|
||||
pulsarActivity
|
||||
|
@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.driver.pulsar.PulsarActivity;
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
@ -9,11 +10,14 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.apache.pulsar.client.api.schema.GenericRecord;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class PulsarProducerOp implements PulsarOp {
|
||||
|
||||
@ -27,10 +31,14 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
private final Counter bytesCounter;
|
||||
private final Histogram messagesizeHistogram;
|
||||
private final PulsarActivity pulsarActivity;
|
||||
private final boolean useTransaction;
|
||||
private final Supplier<Transaction> transactionSupplier;
|
||||
|
||||
public PulsarProducerOp(Producer<?> producer,
|
||||
Schema<?> schema,
|
||||
boolean asyncPulsarOp,
|
||||
boolean useTransaction,
|
||||
Supplier<Transaction> transactionSupplier,
|
||||
String key,
|
||||
String payload,
|
||||
PulsarActivity pulsarActivity) {
|
||||
@ -42,6 +50,8 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
this.pulsarActivity = pulsarActivity;
|
||||
this.bytesCounter = pulsarActivity.getBytesCounter();
|
||||
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
|
||||
this.useTransaction = useTransaction;
|
||||
this.transactionSupplier = transactionSupplier;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -49,8 +59,16 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
if ((msgPayload == null) || msgPayload.isEmpty()) {
|
||||
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
|
||||
}
|
||||
|
||||
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
|
||||
TypedMessageBuilder typedMessageBuilder;
|
||||
final Transaction transaction;
|
||||
if (useTransaction) {
|
||||
// if you are in a transaction you cannot set the schema per-message
|
||||
transaction = transactionSupplier.get();
|
||||
typedMessageBuilder = producer.newMessage(transaction);
|
||||
} else {
|
||||
transaction = null;
|
||||
typedMessageBuilder = producer.newMessage(pulsarSchema);
|
||||
}
|
||||
if ((msgKey != null) && (!msgKey.isEmpty())) {
|
||||
typedMessageBuilder = typedMessageBuilder.key(msgKey);
|
||||
}
|
||||
@ -79,7 +97,12 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
try {
|
||||
logger.trace("sending message");
|
||||
typedMessageBuilder.send();
|
||||
} catch (PulsarClientException pce) {
|
||||
if (useTransaction) {
|
||||
try (Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();) {
|
||||
transaction.commit().get();
|
||||
}
|
||||
}
|
||||
} catch (PulsarClientException | ExecutionException | InterruptedException pce) {
|
||||
logger.trace("failed sending message");
|
||||
throw new RuntimeException(pce);
|
||||
}
|
||||
@ -87,8 +110,23 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
} else {
|
||||
try {
|
||||
// we rely on blockIfQueueIsFull in order to throttle the request in this case
|
||||
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
|
||||
future.whenComplete((messageId, error) -> timeTracker.run()).exceptionally(ex -> {
|
||||
CompletableFuture<?> future = typedMessageBuilder.sendAsync();
|
||||
if (useTransaction) {
|
||||
// add commit step
|
||||
future = future.thenCompose(msg -> {
|
||||
Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();;
|
||||
return transaction
|
||||
.commit()
|
||||
.whenComplete((m,e) -> {
|
||||
ctx.close();
|
||||
})
|
||||
.thenApply(v-> msg);
|
||||
}
|
||||
);
|
||||
}
|
||||
future.whenComplete((messageId, error) -> {
|
||||
timeTracker.run();
|
||||
}).exceptionally(ex -> {
|
||||
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
|
||||
pulsarActivity.asyncOperationFailed(ex);
|
||||
return null;
|
||||
|
@ -7,17 +7,21 @@ import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.commons.lang3.BooleanUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Reader;
|
||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(ReadyPulsarOp.class);
|
||||
private final OpTemplate opTpl;
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final PulsarSpace clientSpace;
|
||||
@ -80,11 +84,25 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
// Global parameter: async_api
|
||||
LongFunction<Boolean> asyncApiFunc = (l) -> false;
|
||||
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
|
||||
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label))
|
||||
asyncApiFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
|
||||
else
|
||||
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
|
||||
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
|
||||
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
|
||||
asyncApiFunc = (l) -> value;
|
||||
} else {
|
||||
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
|
||||
}
|
||||
}
|
||||
logger.info("async_api: {}", asyncApiFunc.apply(0));
|
||||
|
||||
LongFunction<Boolean> useTransactionFunc = (l) -> false;
|
||||
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) {
|
||||
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) {
|
||||
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label));
|
||||
useTransactionFunc = (l) -> value;
|
||||
} else {
|
||||
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!");
|
||||
}
|
||||
}
|
||||
logger.info("use_transaction: {}", useTransactionFunc.apply(0));
|
||||
|
||||
// Global parameter: admin_delop
|
||||
LongFunction<Boolean> adminDelOpFunc = (l) -> false;
|
||||
@ -103,9 +121,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) {
|
||||
return resolveAdminTopic(clientSpace, topicUriFunc, asyncApiFunc, adminDelOpFunc);
|
||||
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
|
||||
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
|
||||
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc);
|
||||
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
|
||||
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc);
|
||||
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc);
|
||||
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
|
||||
return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc);
|
||||
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) {
|
||||
@ -230,7 +248,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
private LongFunction<PulsarOp> resolveMsgSend(
|
||||
PulsarSpace clientSpace,
|
||||
LongFunction<String> topic_uri_func,
|
||||
LongFunction<Boolean> async_api_func
|
||||
LongFunction<Boolean> async_api_func,
|
||||
LongFunction<Boolean> useTransactionFunc
|
||||
) {
|
||||
LongFunction<String> cycle_producer_name_func;
|
||||
if (cmdTpl.isStatic("producer_name")) {
|
||||
@ -244,6 +263,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
LongFunction<Producer<?>> producerFunc =
|
||||
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l));
|
||||
|
||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
|
||||
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
|
||||
|
||||
LongFunction<String> keyFunc;
|
||||
if (cmdTpl.isStatic("msg_key")) {
|
||||
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
|
||||
@ -273,13 +295,16 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
producerFunc,
|
||||
keyFunc,
|
||||
valueFunc,
|
||||
useTransactionFunc,
|
||||
transactionSupplierFunc,
|
||||
pulsarActivity);
|
||||
}
|
||||
|
||||
private LongFunction<PulsarOp> resolveMsgConsume(
|
||||
PulsarSpace clientSpace,
|
||||
LongFunction<String> topic_uri_func,
|
||||
LongFunction<Boolean> async_api_func
|
||||
LongFunction<Boolean> async_api_func,
|
||||
LongFunction<Boolean> useTransactionFunc
|
||||
) {
|
||||
// Topic list (multi-topic)
|
||||
LongFunction<String> topic_names_func;
|
||||
@ -328,6 +353,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
consumer_name_func = (l) -> null;
|
||||
}
|
||||
|
||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
|
||||
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
|
||||
|
||||
LongFunction<Consumer<?>> consumerFunc = (l) ->
|
||||
clientSpace.getConsumer(
|
||||
topic_uri_func.apply(l),
|
||||
@ -339,7 +367,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
||||
);
|
||||
|
||||
return new PulsarConsumerMapper(cmdTpl, clientSpace, async_api_func, consumerFunc,
|
||||
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram());
|
||||
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram(), pulsarActivity.getCommitTransactionTimer(),
|
||||
useTransactionFunc, transactionSupplierFunc);
|
||||
}
|
||||
|
||||
private LongFunction<PulsarOp> resolveMsgRead(
|
||||
|
@ -49,6 +49,7 @@ public class PulsarActivityUtil {
|
||||
public enum DOC_LEVEL_PARAMS {
|
||||
TOPIC_URI("topic_uri"),
|
||||
ASYNC_API("async_api"),
|
||||
USE_TRANSACTION("use_transaction"),
|
||||
ADMIN_DELOP("admin_delop");
|
||||
|
||||
public final String label;
|
||||
|
Loading…
Reference in New Issue
Block a user