diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index b7fca4a36..85c906944 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.schema.SchemaType; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class PulsarConsumerOp implements PulsarOp { @@ -27,9 +28,16 @@ public class PulsarConsumerOp implements PulsarOp { public void syncConsume() { try { - Message message = consumer.receive(timeoutSeconds, TimeUnit.SECONDS); - if (message == null) { - throw new RuntimeException("Did not receive a message within "+timeoutSeconds+" seconds"); + Message message; + if (timeoutSeconds <= 0) { + // wait forever + message = consumer.receive(); + } else { + // we cannot use Consumer#receive(timeout, timeunit) due to + // https://github.com/apache/pulsar/issues/9921 + message = consumer + .receiveAsync() + .get(timeoutSeconds, TimeUnit.SECONDS); } SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); @@ -48,7 +56,7 @@ public class PulsarConsumerOp implements PulsarOp { } consumer.acknowledge(message.getMessageId()); - } catch (PulsarClientException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index c03ae9cae..2c42b054c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -231,7 +231,7 @@ public class PulsarNBClientConf { public int getConsumerTimeoutSeconds() { Object confValue = getConsumerConfValue("consumer.timeout"); if (confValue == null) - return 0; // infinite + return -1; // infinite else return Integer.parseInt(confValue.toString()); }