diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 7ee2320d5..03c5bbdd3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -11,7 +11,10 @@ import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; @@ -450,29 +453,32 @@ public class ReadyPulsarOp implements OpDispenser { (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? LongFunction topicMsgDedupFunc = (l) -> { - String topic = topic_uri_func.apply(l); - String namespace = PulsarActivityUtil.getFullNamespaceName(topic); + String topicName = topic_uri_func.apply(l); + String nsName = PulsarActivityUtil.getFullNamespaceName(topicName); PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin(); // Check namespace-level deduplication setting // - default to broker level deduplication setting - boolean nsMsgDedup = brokerMsgDupFunc.apply(l); + // (as expressed in NB yaml file) + boolean topicMsgDedup = brokerMsgDupFunc.apply(l); try { - Boolean dedupStatus = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); - if (dedupStatus != null) nsMsgDedup = dedupStatus; - } - catch (Exception e) { + Namespaces namespaces = pulsarAdmin.namespaces(); + if (namespaces != null) { + Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName); + if (dedupStatus != null) topicMsgDedup = dedupStatus; + } + } catch (PulsarAdminException e) { // it is fine if we're unable to check namespace level setting; use default } // Check topic-level deduplication setting - // - default to namespace level deduplication setting - boolean topicMsgDedup = nsMsgDedup; try { - Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(namespace); - if (dedupStatus != null) topicMsgDedup = dedupStatus; - } - catch (Exception e) { + Topics topics = pulsarAdmin.topics(); + if (topics != null) { + Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName); + if (dedupStatus != null) topicMsgDedup = dedupStatus; + } + } catch (PulsarAdminException e) { // it is fine if we're unable to check topic level setting; use default }