Merge pull request #291 from eolivelli/impl/pulsar-async-adv

Allow to use  the Pulsar Producer Async API in order to generate the maximum load
This commit is contained in:
Jonathan Shook 2021-03-30 10:33:53 -05:00 committed by GitHub
commit 214ff53128
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 73 additions and 39 deletions

View File

@ -29,6 +29,10 @@ public class PulsarAction implements SyncAction {
@Override
public int runCycle(long cycle) {
// let's fail the action if some async operation failed
activity.failOnAsyncOperationFailure();
long start = System.nanoTime();
PulsarOp pulsarOp;
@ -44,8 +48,11 @@ public class PulsarAction implements SyncAction {
}
for (int i = 0; i < maxTries; i++) {
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
Timer.Context ctx = activity.getExecuteTimer().time();
try {
// it is up to the pulsarOp to call Context#close when the activity is executed
// this allows us to track time for async operations
pulsarOp.run(ctx::close);
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity

View File

@ -48,6 +48,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
private NBErrorHandler errorhandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer;
private volatile Throwable asyncOperationFailure;
// private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
@ -187,4 +188,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Histogram getMessagesizeHistogram() {
return messagesizeHistogram;
}
public void failOnAsyncOperationFailure() {
if (asyncOperationFailure != null) {
throw new RuntimeException(asyncOperationFailure);
}
}
public void asyncOperationFailed(Throwable ex) {
this.asyncOperationFailure = asyncOperationFailure;
}
}

View File

@ -10,7 +10,7 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
public class PulsarAdminCrtTennamOp implements PulsarOp {
public class PulsarAdminCrtTennamOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class);

View File

@ -1,17 +1,14 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerEndOp implements PulsarOp {
public class PulsarBatchProducerEndOp extends SyncPulsarOp {
@Override
public void run() {
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();

View File

@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp implements PulsarOp {
public class PulsarBatchProducerOp extends SyncPulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;

View File

@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerStartOp implements PulsarOp {
public class PulsarBatchProducerStartOp extends SyncPulsarOp {
// TODO: ensure sane container lifecycle management
public final static ThreadLocal<List<CompletableFuture<MessageId>>> threadLocalBatchMsgContainer = new ThreadLocal<>();

View File

@ -9,10 +9,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class PulsarConsumerOp implements PulsarOp {
public class PulsarConsumerOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);

View File

@ -3,6 +3,12 @@ package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public interface PulsarOp extends Runnable {
public interface PulsarOp {
/**
* Execute the operation, invoke the timeTracker when the operation ended.
* The timeTracker can be invoked in a separate thread, it is only used for metrics.
* @param timeTracker
*/
void run(Runnable timeTracker);
}

View File

@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
@ -24,8 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
@ -33,15 +33,13 @@ public class PulsarProducerMapper extends PulsarOpMapper {
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
PulsarActivity pulsarActivity) {
super(cmdTpl, clientSpace);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.pulsarActivity = pulsarActivity;
}
@Override
@ -57,7 +55,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
asyncApi,
msgKey,
msgPayload,
bytesCounter,
messagesizeHistogram);
pulsarActivity
);
}
}

View File

@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
@ -25,25 +26,26 @@ public class PulsarProducerOp implements PulsarOp {
private final boolean asyncPulsarOp;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
String key,
String payload,
Counter bytesCounter,
Histogram messagesizeHistogram) {
PulsarActivity pulsarActivity) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.pulsarActivity = pulsarActivity;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
}
@Override
public void run() {
public void run(Runnable timeTracker) {
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
@ -80,17 +82,18 @@ public class PulsarProducerOp implements PulsarOp {
logger.trace("failed sending message");
throw new RuntimeException(pce);
}
timeTracker.run();
} else {
try {
// we rely on blockIfQueueIsFull in order to throttle the request in this case
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
future.get();
/*.thenRun(() -> {
// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload);
}).exceptionally(ex -> {
System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
return ex;
})*/
future.whenComplete((messageId, error) -> {
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
pulsarActivity.asyncOperationFailed(ex);
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@ -8,7 +8,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp implements PulsarOp {
public class PulsarReaderOp extends SyncPulsarOp {
private final Reader<?> reader;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;

View File

@ -50,9 +50,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
this.opFunc = resolve();
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
}
@Override
@ -249,8 +246,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
async_api_func,
keyFunc,
valueFunc,
pulsarActivity.getBytesCounter(),
pulsarActivity.getMessagesizeHistogram());
pulsarActivity);
}
private LongFunction<PulsarOp> resolveMsgConsume(

View File

@ -0,0 +1,17 @@
package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Sync Pulsar Operations including Producers and Consumers.
*/
public abstract class SyncPulsarOp implements PulsarOp {
public void run(Runnable timeTracker) {
try {
this.run();
} finally {
timeTracker.run();
}
}
public abstract void run();
}