Merge pull request #381 from lhotari/lh-workaround-ordering-bug

Pulsar: Workaround an ordering bug in the Pulsar client
This commit is contained in:
Jonathan Shook 2021-11-11 12:09:53 -06:00 committed by GitHub
commit 3429765280
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
@ -115,11 +116,11 @@ public class PulsarConsumerOp implements PulsarOp {
message = consumer.receive();
}
else {
// we cannot use Consumer#receive(timeout, timeunit) due to
// https://github.com/apache/pulsar/issues/9921
message = consumer
.receiveAsync()
.get(timeoutSeconds, TimeUnit.SECONDS);
.receive(timeoutSeconds, TimeUnit.SECONDS);
if (message == null) {
throw new TimeoutException("Did not receive a message within "+timeoutSeconds+" seconds");
}
}
if (logger.isDebugEnabled()) {