Remove namespace and topic level dedup setting check for each run, which is too much overhead.

This commit is contained in:
Yabin Meng 2021-10-09 12:29:22 -05:00
parent 29dafd9133
commit 2876e102a6

View File

@ -452,39 +452,11 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Supplier<Transaction>> transactionSupplierFunc = LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Boolean> topicMsgDedupFunc = (l) -> { // TODO: Ignore namespace and topic level dedup check on the fly
String topicName = topic_uri_func.apply(l); // this will impact the consumer performance significantly
String nsName = PulsarActivityUtil.getFullNamespaceName(topicName); // Consider using caching or Memoizer in the future?
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin(); // (https://www.baeldung.com/guava-memoizer)
LongFunction<Boolean> topicMsgDedupFunc = brokerMsgDupFunc;
// Check namespace-level deduplication setting
// - default to broker level deduplication setting
// (as expressed in NB yaml file)
boolean topicMsgDedup = brokerMsgDupFunc.apply(l);
try {
Namespaces namespaces = pulsarAdmin.namespaces();
if (namespaces != null) {
Boolean dedupStatus = namespaces.getDeduplicationStatus(nsName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check namespace level setting; use default
}
// Check topic-level deduplication setting
try {
Topics topics = pulsarAdmin.topics();
if (topics != null) {
Boolean dedupStatus = pulsarAdmin.topics().getDeduplicationStatus(topicName);
if (dedupStatus != null) topicMsgDedup = dedupStatus;
}
} catch (PulsarAdminException e) {
// it is fine if we're unable to check topic level setting; use default
}
return topicMsgDedup;
};
LongFunction<Consumer<?>> consumerFunc = (l) -> LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer( clientSpace.getConsumer(