From 30e4942341072a2a54cbe865e26996842a698510 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 21 Apr 2021 12:48:46 +0200 Subject: [PATCH] Pulsar: first prototype of transaction support --- .../driver/pulsar/PulsarActivity.java | 15 +++++- .../nosqlbench/driver/pulsar/PulsarSpace.java | 34 ++++++++++++- .../driver/pulsar/PulsarSpaceCache.java | 3 +- .../pulsar/ops/PulsarConsumerMapper.java | 21 +++++++- .../driver/pulsar/ops/PulsarConsumerOp.java | 32 +++++++++++- .../pulsar/ops/PulsarProducerMapper.java | 13 ++++- .../driver/pulsar/ops/PulsarProducerOp.java | 48 ++++++++++++++++-- .../driver/pulsar/ops/ReadyPulsarOp.java | 49 +++++++++++++++---- .../pulsar/util/PulsarActivityUtil.java | 1 + 9 files changed, 192 insertions(+), 24 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index 7e6c614c5..76ea6a7f5 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -31,6 +31,8 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Timer executeTimer; public Counter bytesCounter; public Histogram messagesizeHistogram; + public Timer createTransactionTimer; + public Timer commitTransactionTimer; private PulsarSpaceCache pulsarCache; private PulsarAdmin pulsarAdmin; @@ -110,6 +112,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve bindTimer = ActivityMetrics.timer(activityDef, "bind"); executeTimer = ActivityMetrics.timer(activityDef, "execute"); + createTransactionTimer = ActivityMetrics.timer(activityDef, "createtransaction"); + commitTransactionTimer = ActivityMetrics.timer(activityDef, "committransaction"); + bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); @@ -120,7 +125,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve pulsarSvcUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); webSvcUrl = - activityDef.getParams().getOptionalString("web_url").orElse("pulsar://localhost:8080"); + activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080"); initPulsarAdmin(); @@ -173,6 +178,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve return bytesCounter; } + public Timer getCreateTransactionTimer() { + return createTransactionTimer; + } + + public Timer getCommitTransactionTimer() { + return commitTransactionTimer; + } + public Histogram getMessagesizeHistogram() { return messagesizeHistogram; } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index e6bd7bcb9..8f0bbab09 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -1,6 +1,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Gauge; +import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.engine.api.activityimpl.ActivityDef; @@ -14,14 +15,18 @@ import org.apache.pulsar.client.admin.Clusters; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.transaction.Transaction; -import java.util.*; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.function.Function; +import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -43,6 +48,7 @@ public class PulsarSpace { private final String pulsarSvcUrl; private final String webSvcUrl; private final PulsarAdmin pulsarAdmin; + private final Timer createTransactionTimer; private final Set pulsarClusterMetadata = new HashSet<>(); @@ -55,13 +61,15 @@ public class PulsarSpace { String pulsarSvcUrl, String webSvcUrl, PulsarAdmin pulsarAdmin, - ActivityDef activityDef) { + ActivityDef activityDef, + Timer createTransactionTimer) { this.spaceName = name; this.pulsarNBClientConf = pulsarClientConf; this.pulsarSvcUrl = pulsarSvcUrl; this.webSvcUrl = webSvcUrl; this.pulsarAdmin = pulsarAdmin; this.activityDef = activityDef; + this.createTransactionTimer = createTransactionTimer; createPulsarClientFromConf(); createPulsarSchemaFromConf(); @@ -204,6 +212,28 @@ public class PulsarSpace { return ""; } + + public Supplier getTransactionSupplier() { + PulsarClient pulsarClient = getPulsarClient(); + return () -> { + try (Timer.Context time = createTransactionTimer.time(); ){ + return pulsarClient + .newTransaction() + .build() + .get(); + } catch (ExecutionException | InterruptedException err) { + if (logger.isWarnEnabled()) { + logger.warn("Error while starting a new transaction", err); + } + throw new RuntimeException(err); + } catch (NullPointerException err) { // Unfortunately Pulsar 2.7.1 client does not report a better error + throw new RuntimeException("Transactions are not enabled on Pulsar Client, " + + "please set client.enableTransaction=true in your Pulsar Client configuration"); + } + }; + } + + public Producer getProducer(String cycleTopicName, String cycleProducerName) { String topicName = getEffectiveProducerTopicName(cycleTopicName); String producerName = getEffectiveProducerName(cycleProducerName); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java index e434296a4..262384032 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java @@ -28,7 +28,8 @@ public class PulsarSpaceCache { activity.getPulsarSvcUrl(), activity.getWebSvcUrl(), activity.getPulsarAdmin(), - activity.getActivityDef() + activity.getActivityDef(), + activity.getCreateTransactionTimer() )); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 605a0ae3b..46cc6e2f2 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -2,12 +2,15 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; import java.util.function.LongFunction; +import java.util.function.Supplier; /** * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains @@ -23,23 +26,34 @@ public class PulsarConsumerMapper extends PulsarOpMapper { private final LongFunction> consumerFunc; private final Counter bytesCounter; private final Histogram messagesizeHistogram; + private final LongFunction useTransactionFunc; + private final LongFunction> transactionSupplierFunc; + private final Timer transactionCommitTimer; public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, LongFunction asyncApiFunc, LongFunction> consumerFunc, Counter bytesCounter, - Histogram messagesizeHistogram) { + Histogram messagesizeHistogram, + Timer transactionCommitTimer, + LongFunction useTransactionFunc, + LongFunction> transactionSupplierFunc) { super(cmdTpl, clientSpace, asyncApiFunc); this.consumerFunc = consumerFunc; this.bytesCounter = bytesCounter; this.messagesizeHistogram = messagesizeHistogram; + this.transactionCommitTimer = transactionCommitTimer; + this.useTransactionFunc = useTransactionFunc; + this.transactionSupplierFunc = transactionSupplierFunc; } @Override public PulsarOp apply(long value) { Consumer consumer = consumerFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); + boolean useTransaction = useTransactionFunc.apply(value); + Supplier transactionSupplier = transactionSupplierFunc.apply(value); return new PulsarConsumerOp( consumer, @@ -47,7 +61,10 @@ public class PulsarConsumerMapper extends PulsarOpMapper { asyncApi, clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), bytesCounter, - messagesizeHistogram + messagesizeHistogram, + useTransaction, + transactionSupplier, + transactionCommitTimer ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 28b9da842..55c95c9f8 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -2,14 +2,18 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.schema.SchemaType; import java.util.concurrent.TimeUnit; +import java.util.function.LongFunction; +import java.util.function.Supplier; public class PulsarConsumerOp extends SyncPulsarOp { @@ -21,16 +25,25 @@ public class PulsarConsumerOp extends SyncPulsarOp { private final int timeoutSeconds; private final Counter bytesCounter; private final Histogram messagesizeHistogram; + private final boolean useTransaction; + private final Supplier transactionSupplier; + private final Timer transactionCommitTimer; public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds, Counter bytesCounter, - Histogram messagesizeHistogram) { + Histogram messagesizeHistogram, + boolean useTransaction, + Supplier transactionSupplier, + Timer transactionCommitTimer) { this.consumer = consumer; this.pulsarSchema = schema; this.asyncPulsarOp = asyncPulsarOp; this.timeoutSeconds = timeoutSeconds; this.bytesCounter = bytesCounter; this.messagesizeHistogram = messagesizeHistogram; + this.useTransaction = useTransaction; + this.transactionSupplier = transactionSupplier; + this.transactionCommitTimer = transactionCommitTimer; } public void syncConsume() { @@ -64,7 +77,22 @@ public class PulsarConsumerOp extends SyncPulsarOp { int messagesize = message.getData().length; bytesCounter.inc(messagesize); messagesizeHistogram.update(messagesize); - consumer.acknowledge(message.getMessageId()); + + + if (useTransaction) { + Transaction transaction = transactionSupplier.get(); + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + + // little problem: here we are counting the "commit" time + // inside the overall time spent for the execution of the consume operation + // we should refactor this operation as for PulsarProducerOp, and use the passed callback + // to track with precision the time spent for the operation and for the commit + try (Timer.Context ctx = transactionCommitTimer.time()) { + transaction.commit().get(); + } + } else{ + consumer.acknowledge(message.getMessageId()); + } } catch (Exception e) { throw new RuntimeException(e); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 5bacf998f..5c9397881 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -7,8 +7,10 @@ import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; import java.util.function.LongFunction; +import java.util.function.Supplier; /** * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains @@ -25,6 +27,8 @@ public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction keyFunc; private final LongFunction payloadFunc; private final PulsarActivity pulsarActivity; + private final LongFunction useTransactionFunc; + private final LongFunction> transactionSupplierFunc; public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, @@ -32,12 +36,16 @@ public class PulsarProducerMapper extends PulsarOpMapper { LongFunction> producerFunc, LongFunction keyFunc, LongFunction payloadFunc, + LongFunction useTransactionFunc, + LongFunction> transactionSupplierFunc, PulsarActivity pulsarActivity) { super(cmdTpl, clientSpace, asyncApiFunc); this.producerFunc = producerFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; this.pulsarActivity = pulsarActivity; + this.useTransactionFunc = useTransactionFunc; + this.transactionSupplierFunc = transactionSupplierFunc; } @Override @@ -46,11 +54,14 @@ public class PulsarProducerMapper extends PulsarOpMapper { boolean asyncApi = asyncApiFunc.apply(value); String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); - + boolean useTransaction = useTransactionFunc.apply(value); + Supplier transactionSupplier = transactionSupplierFunc.apply(value); return new PulsarProducerOp( producer, clientSpace.getPulsarSchema(), asyncApi, + useTransaction, + transactionSupplier, msgKey, msgPayload, pulsarActivity diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 6973c34f4..c72102813 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; @@ -9,11 +10,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; public class PulsarProducerOp implements PulsarOp { @@ -27,10 +31,14 @@ public class PulsarProducerOp implements PulsarOp { private final Counter bytesCounter; private final Histogram messagesizeHistogram; private final PulsarActivity pulsarActivity; + private final boolean useTransaction; + private final Supplier transactionSupplier; public PulsarProducerOp(Producer producer, Schema schema, boolean asyncPulsarOp, + boolean useTransaction, + Supplier transactionSupplier, String key, String payload, PulsarActivity pulsarActivity) { @@ -42,6 +50,8 @@ public class PulsarProducerOp implements PulsarOp { this.pulsarActivity = pulsarActivity; this.bytesCounter = pulsarActivity.getBytesCounter(); this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram(); + this.useTransaction = useTransaction; + this.transactionSupplier = transactionSupplier; } @Override @@ -49,8 +59,16 @@ public class PulsarProducerOp implements PulsarOp { if ((msgPayload == null) || msgPayload.isEmpty()) { throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); } - - TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema); + TypedMessageBuilder typedMessageBuilder; + final Transaction transaction; + if (useTransaction) { + // if you are in a transaction you cannot set the schema per-message + transaction = transactionSupplier.get(); + typedMessageBuilder = producer.newMessage(transaction); + } else { + transaction = null; + typedMessageBuilder = producer.newMessage(pulsarSchema); + } if ((msgKey != null) && (!msgKey.isEmpty())) { typedMessageBuilder = typedMessageBuilder.key(msgKey); } @@ -79,7 +97,12 @@ public class PulsarProducerOp implements PulsarOp { try { logger.trace("sending message"); typedMessageBuilder.send(); - } catch (PulsarClientException pce) { + if (useTransaction) { + try (Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();) { + transaction.commit().get(); + } + } + } catch (PulsarClientException | ExecutionException | InterruptedException pce) { logger.trace("failed sending message"); throw new RuntimeException(pce); } @@ -87,8 +110,23 @@ public class PulsarProducerOp implements PulsarOp { } else { try { // we rely on blockIfQueueIsFull in order to throttle the request in this case - CompletableFuture future = typedMessageBuilder.sendAsync(); - future.whenComplete((messageId, error) -> timeTracker.run()).exceptionally(ex -> { + CompletableFuture future = typedMessageBuilder.sendAsync(); + if (useTransaction) { + // add commit step + future = future.thenCompose(msg -> { + Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();; + return transaction + .commit() + .whenComplete((m,e) -> { + ctx.close(); + }) + .thenApply(v-> msg); + } + ); + } + future.whenComplete((messageId, error) -> { + timeTracker.run(); + }).exceptionally(ex -> { logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload); pulsarActivity.asyncOperationFailed(ex); return null; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 0cdeba7bd..44b206132 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -7,17 +7,21 @@ import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.transaction.Transaction; import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.function.LongFunction; +import java.util.function.Supplier; public class ReadyPulsarOp implements OpDispenser { - + private final static Logger logger = LogManager.getLogger(ReadyPulsarOp.class); private final OpTemplate opTpl; private final CommandTemplate cmdTpl; private final PulsarSpace clientSpace; @@ -80,11 +84,25 @@ public class ReadyPulsarOp implements OpDispenser { // Global parameter: async_api LongFunction asyncApiFunc = (l) -> false; if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { - if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) - asyncApiFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)); - else - throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!"); + if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { + boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)); + asyncApiFunc = (l) -> value; + } else { + throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!"); + } } + logger.info("async_api: {}", asyncApiFunc.apply(0)); + + LongFunction useTransactionFunc = (l) -> false; + if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) { + if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) { + boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)); + useTransactionFunc = (l) -> value; + } else { + throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!"); + } + } + logger.info("use_transaction: {}", useTransactionFunc.apply(0)); // Global parameter: admin_delop LongFunction adminDelOpFunc = (l) -> false; @@ -103,9 +121,9 @@ public class ReadyPulsarOp implements OpDispenser { } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) { return resolveAdminTopic(clientSpace, topicUriFunc, asyncApiFunc, adminDelOpFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { - return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); + return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { - return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc); + return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) { @@ -230,7 +248,8 @@ public class ReadyPulsarOp implements OpDispenser { private LongFunction resolveMsgSend( PulsarSpace clientSpace, LongFunction topic_uri_func, - LongFunction async_api_func + LongFunction async_api_func, + LongFunction useTransactionFunc ) { LongFunction cycle_producer_name_func; if (cmdTpl.isStatic("producer_name")) { @@ -244,6 +263,9 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction> producerFunc = (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l)); + LongFunction> transactionSupplierFunc = + (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? + LongFunction keyFunc; if (cmdTpl.isStatic("msg_key")) { keyFunc = (l) -> cmdTpl.getStatic("msg_key"); @@ -273,13 +295,16 @@ public class ReadyPulsarOp implements OpDispenser { producerFunc, keyFunc, valueFunc, + useTransactionFunc, + transactionSupplierFunc, pulsarActivity); } private LongFunction resolveMsgConsume( PulsarSpace clientSpace, LongFunction topic_uri_func, - LongFunction async_api_func + LongFunction async_api_func, + LongFunction useTransactionFunc ) { // Topic list (multi-topic) LongFunction topic_names_func; @@ -328,6 +353,9 @@ public class ReadyPulsarOp implements OpDispenser { consumer_name_func = (l) -> null; } + LongFunction> transactionSupplierFunc = + (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? + LongFunction> consumerFunc = (l) -> clientSpace.getConsumer( topic_uri_func.apply(l), @@ -339,7 +367,8 @@ public class ReadyPulsarOp implements OpDispenser { ); return new PulsarConsumerMapper(cmdTpl, clientSpace, async_api_func, consumerFunc, - pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram()); + pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram(), pulsarActivity.getCommitTransactionTimer(), + useTransactionFunc, transactionSupplierFunc); } private LongFunction resolveMsgRead( diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index d04455f52..c12fb2ff2 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -49,6 +49,7 @@ public class PulsarActivityUtil { public enum DOC_LEVEL_PARAMS { TOPIC_URI("topic_uri"), ASYNC_API("async_api"), + USE_TRANSACTION("use_transaction"), ADMIN_DELOP("admin_delop"); public final String label;