Merge pull request #366 from yabinmeng/main

Remove namespace and topic level dedup setting check on the fly
This commit is contained in:
Jonathan Shook 2021-10-11 17:25:23 -05:00 committed by GitHub
commit 21b96cfdf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -452,39 +452,11 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Boolean> topicMsgDedupFunc = (l) -> {
String topicName = topic_uri_func.apply(l);
String nsName = PulsarActivityUtil.getFullNamespaceName(topicName);
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin();
// Check namespace-level deduplication setting
// - default to broker level deduplication setting
// (as expressed in NB yaml file)
boolean topicMsgDedup = brokerMsgDupFunc.apply(l);
try {
Namespaces namespaces = pulsarAdmin.namespaces();
if (namespaces != null) {
Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check namespace level setting; use default
}
// Check topic-level deduplication setting
try {
Topics topics = pulsarAdmin.topics();
if (topics != null) {
Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check topic level setting; use default
}
return topicMsgDedup;
};
// TODO: Ignore namespace and topic level dedup check on the fly
// this will impact the consumer performance significantly
// Consider using caching or Memoizer in the future?
// (https://www.baeldung.com/guava-memoizer)
LongFunction<Boolean> topicMsgDedupFunc = brokerMsgDupFunc;
LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer(