diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 896631293..615c82dd5 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; @@ -115,11 +116,11 @@ public class PulsarConsumerOp implements PulsarOp { message = consumer.receive(); } else { - // we cannot use Consumer#receive(timeout, timeunit) due to - // https://github.com/apache/pulsar/issues/9921 message = consumer - .receiveAsync() - .get(timeoutSeconds, TimeUnit.SECONDS); + .receive(timeoutSeconds, TimeUnit.SECONDS); + if (message == null) { + throw new TimeoutException("Did not receive a message within "+timeoutSeconds+" seconds"); + } } if (logger.isDebugEnabled()) {