diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index de26f15cc..03c5bbdd3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -11,8 +11,10 @@ import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; @@ -451,27 +453,32 @@ public class ReadyPulsarOp implements OpDispenser { (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? LongFunction topicMsgDedupFunc = (l) -> { - String topic = topic_uri_func.apply(l); - String namespace = PulsarActivityUtil.getFullNamespaceName(topic); + String topicName = topic_uri_func.apply(l); + String nsName = PulsarActivityUtil.getFullNamespaceName(topicName); PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin(); // Check namespace-level deduplication setting // - default to broker level deduplication setting - boolean nsMsgDedup = brokerMsgDupFunc.apply(l); + // (as expressed in NB yaml file) + boolean topicMsgDedup = brokerMsgDupFunc.apply(l); try { - nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); - } - catch (PulsarAdminException pae) { + Namespaces namespaces = pulsarAdmin.namespaces(); + if (namespaces != null) { + Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName); + if (dedupStatus != null) topicMsgDedup = dedupStatus; + } + } catch (PulsarAdminException e) { // it is fine if we're unable to check namespace level setting; use default } // Check topic-level deduplication setting - // - default to namespace level deduplication setting - boolean topicMsgDedup = nsMsgDedup; try { - topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic); - } - catch (PulsarAdminException pae) { + Topics topics = pulsarAdmin.topics(); + if (topics != null) { + Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName); + if (dedupStatus != null) topicMsgDedup = dedupStatus; + } + } catch (PulsarAdminException e) { // it is fine if we're unable to check topic level setting; use default }