Merge pull request #282 from eolivelli/impl/pulsar-enhancements

Allow PulsarConsumer to configure a Timeout
This commit is contained in:
Jonathan Shook 2021-03-16 10:08:11 -05:00 committed by GitHub
commit d4ecf0e8a9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 9 deletions

View File

@ -38,7 +38,8 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
return new PulsarConsumerOp(
consumer,
clientSpace.getPulsarSchema(),
asyncApi
asyncApi,
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds()
);
}
}

View File

@ -2,37 +2,61 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class PulsarConsumerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
private final int timeoutSeconds;
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp) {
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds) {
this.consumer = consumer;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
this.timeoutSeconds = timeoutSeconds;
}
public void syncConsume() {
try {
Message<?> message = consumer.receive();
Message<?> message;
if (timeoutSeconds <= 0) {
// wait forever
message = consumer.receive();
} else {
// we cannot use Consumer#receive(timeout, timeunit) due to
// https://github.com/apache/pulsar/issues/9921
message = consumer
.receiveAsync()
.get(timeoutSeconds, TimeUnit.SECONDS);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
if (logger.isDebugEnabled()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString());
}
} else {
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));
if (logger.isDebugEnabled()) {
logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData()));
}
}
consumer.acknowledge(message.getMessageId());
} catch (PulsarClientException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}

View File

@ -228,6 +228,13 @@ public class PulsarNBClientConf {
else
return confValue.toString();
}
public int getConsumerTimeoutSeconds() {
Object confValue = getConsumerConfValue("consumer.timeout");
if (confValue == null)
return -1; // infinite
else
return Integer.parseInt(confValue.toString());
}
public String getConsumerSubscriptionName() {
Object confValue = getConsumerConfValue("consumer.subscriptionName");
if (confValue == null)