diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index c1ff0332d..ffccca6b5 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -38,7 +38,8 @@ public class PulsarConsumerMapper extends PulsarOpMapper { return new PulsarConsumerOp( consumer, clientSpace.getPulsarSchema(), - asyncApi + asyncApi, + clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds() ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index d3606a059..85c906944 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -2,37 +2,61 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.schema.SchemaType; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + public class PulsarConsumerOp implements PulsarOp { + + private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class); + private final Consumer consumer; private final Schema pulsarSchema; private final boolean asyncPulsarOp; + private final int timeoutSeconds; - public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp) { + public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds) { this.consumer = consumer; this.pulsarSchema = schema; this.asyncPulsarOp = asyncPulsarOp; + this.timeoutSeconds = timeoutSeconds; } public void syncConsume() { try { - Message message = consumer.receive(); + Message message; + if (timeoutSeconds <= 0) { + // wait forever + message = consumer.receive(); + } else { + // we cannot use Consumer#receive(timeout, timeunit) due to + // https://github.com/apache/pulsar/issues/9921 + message = consumer + .receiveAsync() + .get(timeoutSeconds, TimeUnit.SECONDS); + } SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.generic.GenericRecord avroGenericRecord = - AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); + if (logger.isDebugEnabled()) { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); - System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString()); + logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString()); + } } else { - System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData())); + if (logger.isDebugEnabled()) { + logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData())); + } } consumer.acknowledge(message.getMessageId()); - } catch (PulsarClientException e) { + } catch (Exception e) { throw new RuntimeException(e); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index 5b5814759..2c42b054c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -228,6 +228,13 @@ public class PulsarNBClientConf { else return confValue.toString(); } + public int getConsumerTimeoutSeconds() { + Object confValue = getConsumerConfValue("consumer.timeout"); + if (confValue == null) + return -1; // infinite + else + return Integer.parseInt(confValue.toString()); + } public String getConsumerSubscriptionName() { Object confValue = getConsumerConfValue("consumer.subscriptionName"); if (confValue == null)