Merge pull request #364 from yabinmeng/main

Fix NullPointerException to check namespce/topic dedup status
This commit is contained in:
Jonathan Shook 2021-10-07 15:32:40 -05:00 committed by GitHub
commit 29dafd9133
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -11,8 +11,10 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Reader;
@ -451,27 +453,32 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Boolean> topicMsgDedupFunc = (l) -> { LongFunction<Boolean> topicMsgDedupFunc = (l) -> {
String topic = topic_uri_func.apply(l); String topicName = topic_uri_func.apply(l);
String namespace = PulsarActivityUtil.getFullNamespaceName(topic); String nsName = PulsarActivityUtil.getFullNamespaceName(topicName);
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin(); PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin();
// Check namespace-level deduplication setting // Check namespace-level deduplication setting
// - default to broker level deduplication setting // - default to broker level deduplication setting
boolean nsMsgDedup = brokerMsgDupFunc.apply(l); // (as expressed in NB yaml file)
boolean topicMsgDedup = brokerMsgDupFunc.apply(l);
try { try {
nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace); Namespaces namespaces = pulsarAdmin.namespaces();
} if (namespaces != null) {
catch (PulsarAdminException pae) { Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check namespace level setting; use default // it is fine if we're unable to check namespace level setting; use default
} }
// Check topic-level deduplication setting // Check topic-level deduplication setting
// - default to namespace level deduplication setting
boolean topicMsgDedup = nsMsgDedup;
try { try {
topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic); Topics topics = pulsarAdmin.topics();
} if (topics != null) {
catch (PulsarAdminException pae) { Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check topic level setting; use default // it is fine if we're unable to check topic level setting; use default
} }