Workarond for Pulsar bug

This commit is contained in:
Enrico Olivelli 2021-03-16 14:33:26 +01:00
parent 97cfaa5cac
commit 59a5ce8920
2 changed files with 13 additions and 5 deletions

View File

@ -7,6 +7,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class PulsarConsumerOp implements PulsarOp { public class PulsarConsumerOp implements PulsarOp {
@ -27,9 +28,16 @@ public class PulsarConsumerOp implements PulsarOp {
public void syncConsume() { public void syncConsume() {
try { try {
Message<?> message = consumer.receive(timeoutSeconds, TimeUnit.SECONDS); Message<?> message;
if (message == null) { if (timeoutSeconds <= 0) {
throw new RuntimeException("Did not receive a message within "+timeoutSeconds+" seconds"); // wait forever
message = consumer.receive();
} else {
// we cannot use Consumer#receive(timeout, timeunit) due to
// https://github.com/apache/pulsar/issues/9921
message = consumer
.receiveAsync()
.get(timeoutSeconds, TimeUnit.SECONDS);
} }
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
@ -48,7 +56,7 @@ public class PulsarConsumerOp implements PulsarOp {
} }
consumer.acknowledge(message.getMessageId()); consumer.acknowledge(message.getMessageId());
} catch (PulsarClientException e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }

View File

@ -231,7 +231,7 @@ public class PulsarNBClientConf {
public int getConsumerTimeoutSeconds() { public int getConsumerTimeoutSeconds() {
Object confValue = getConsumerConfValue("consumer.timeout"); Object confValue = getConsumerConfValue("consumer.timeout");
if (confValue == null) if (confValue == null)
return 0; // infinite return -1; // infinite
else else
return Integer.parseInt(confValue.toString()); return Integer.parseInt(confValue.toString());
} }