From eb5f42f46d0cb5503d581f7cdafa14a147adc36c Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 6 Oct 2021 10:55:32 -0500 Subject: [PATCH 1/3] Fix NullPointerException of checking namespace and topic deduplication status. --- .../java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index de26f15cc..8688627e0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -461,7 +461,7 @@ public class ReadyPulsarOp implements OpDispenser { try { nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); } - catch (PulsarAdminException pae) { + catch (Exception e) { // it is fine if we're unable to check namespace level setting; use default } @@ -471,7 +471,7 @@ public class ReadyPulsarOp implements OpDispenser { try { topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic); } - catch (PulsarAdminException pae) { + catch (Exception e) { // it is fine if we're unable to check topic level setting; use default } From 265760d18471aa200dc995ab9a583b9a0a1b0adb Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 6 Oct 2021 11:41:50 -0500 Subject: [PATCH 2/3] Address Lari's comment in PR#364. --- .../io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8688627e0..7ee2320d5 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -12,7 +12,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; @@ -459,7 +458,8 @@ public class ReadyPulsarOp implements OpDispenser { // - default to broker level deduplication setting boolean nsMsgDedup = brokerMsgDupFunc.apply(l); try { - nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); + Boolean dedupStatus = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); + if (dedupStatus != null) nsMsgDedup = dedupStatus; } catch (Exception e) { // it is fine if we're unable to check namespace level setting; use default @@ -469,7 +469,8 @@ public class ReadyPulsarOp implements OpDispenser { // - default to namespace level deduplication setting boolean topicMsgDedup = nsMsgDedup; try { - topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic); + Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(namespace); + if (dedupStatus != null) topicMsgDedup = dedupStatus; } catch (Exception e) { // it is fine if we're unable to check topic level setting; use default From 11073cf0aae321b8401e54c0d40a14332e020162 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Thu, 7 Oct 2021 14:56:51 -0500 Subject: [PATCH 3/3] Address Shooky's comment in PR#364. --- .../driver/pulsar/ops/ReadyPulsarOp.java | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 7ee2320d5..03c5bbdd3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -11,7 +11,10 @@ import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Reader; @@ -450,29 +453,32 @@ public class ReadyPulsarOp implements OpDispenser { (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? LongFunction topicMsgDedupFunc = (l) -> { - String topic = topic_uri_func.apply(l); - String namespace = PulsarActivityUtil.getFullNamespaceName(topic); + String topicName = topic_uri_func.apply(l); + String nsName = PulsarActivityUtil.getFullNamespaceName(topicName); PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin(); // Check namespace-level deduplication setting // - default to broker level deduplication setting - boolean nsMsgDedup = brokerMsgDupFunc.apply(l); + // (as expressed in NB yaml file) + boolean topicMsgDedup = brokerMsgDupFunc.apply(l); try { - Boolean dedupStatus = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); - if (dedupStatus != null) nsMsgDedup = dedupStatus; - } - catch (Exception e) { + Namespaces namespaces = pulsarAdmin.namespaces(); + if (namespaces != null) { + Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName); + if (dedupStatus != null) topicMsgDedup = dedupStatus; + } + } catch (PulsarAdminException e) { // it is fine if we're unable to check namespace level setting; use default } // Check topic-level deduplication setting - // - default to namespace level deduplication setting - boolean topicMsgDedup = nsMsgDedup; try { - Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(namespace); - if (dedupStatus != null) topicMsgDedup = dedupStatus; - } - catch (Exception e) { + Topics topics = pulsarAdmin.topics(); + if (topics != null) { + Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName); + if (dedupStatus != null) topicMsgDedup = dedupStatus; + } + } catch (PulsarAdminException e) { // it is fine if we're unable to check topic level setting; use default }