Address Shooky's comment in PR#364.

This commit is contained in:
Yabin Meng 2021-10-07 14:56:51 -05:00
parent cdfb7c3224
commit 11073cf0aa

View File

@ -11,7 +11,10 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader;
@ -450,29 +453,32 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Boolean> topicMsgDedupFunc = (l) -> {
String topic = topic_uri_func.apply(l);
String namespace = PulsarActivityUtil.getFullNamespaceName(topic);
String topicName = topic_uri_func.apply(l);
String nsName = PulsarActivityUtil.getFullNamespaceName(topicName);
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin();
// Check namespace-level deduplication setting
// - default to broker level deduplication setting
boolean nsMsgDedup = brokerMsgDupFunc.apply(l);
// (as expressed in NB yaml file)
boolean topicMsgDedup = brokerMsgDupFunc.apply(l);
try {
Boolean dedupStatus = pulsarAdmin.namespaces().getDeduplicationStatus(namespace);
if (dedupStatus != null) nsMsgDedup = dedupStatus;
}
catch (Exception e) {
Namespaces namespaces = pulsarAdmin.namespaces();
if (namespaces != null) {
Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check namespace level setting; use default
}
// Check topic-level deduplication setting
// - default to namespace level deduplication setting
boolean topicMsgDedup = nsMsgDedup;
try {
Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(namespace);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
catch (Exception e) {
Topics topics = pulsarAdmin.topics();
if (topics != null) {
Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check topic level setting; use default
}