Merge branch 'main' of github.com:nosqlbench/nosqlbench

This commit is contained in:
Jonathan Shook 2021-10-11 17:25:31 -05:00
commit 4418500d20
2 changed files with 9 additions and 29 deletions

View File

@ -94,7 +94,8 @@ blocks:
create table if not exists <<keyspace:baselines>>.<<table:tabular>> ( create table if not exists <<keyspace:baselines>>.<<table:tabular>> (
part text, part text,
clust text, clust text,
data text, data0 text, data1 text, data2 text, data3 text,
data4 text, data5 text, data6 text, data7 text,
PRIMARY KEY (part,clust) PRIMARY KEY (part,clust)
); );
tags: tags:

View File

@ -11,8 +11,10 @@ import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Reader;
@ -450,34 +452,11 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Supplier<Transaction>> transactionSupplierFunc = LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Boolean> topicMsgDedupFunc = (l) -> { // TODO: Ignore namespace and topic level dedup check on the fly
String topic = topic_uri_func.apply(l); // this will impact the consumer performance significantly
String namespace = PulsarActivityUtil.getFullNamespaceName(topic); // Consider using caching or Memoizer in the future?
PulsarAdmin pulsarAdmin = pulsarActivity.getPulsarAdmin(); // (https://www.baeldung.com/guava-memoizer)
LongFunction<Boolean> topicMsgDedupFunc = brokerMsgDupFunc;
// Check namespace-level deduplication setting
// - default to broker level deduplication setting
boolean nsMsgDedup = brokerMsgDupFunc.apply(l);
try {
nsMsgDedup = pulsarAdmin.namespaces().getDeduplicationStatus(namespace);
}
catch (PulsarAdminException pae) {
// it is fine if we're unable to check namespace level setting; use default
}
// Check topic-level deduplication setting
// - default to namespace level deduplication setting
boolean topicMsgDedup = nsMsgDedup;
try {
topicMsgDedup = pulsarAdmin.topics().getDeduplicationStatus(topic);
}
catch (PulsarAdminException pae) {
// it is fine if we're unable to check topic level setting; use default
}
return topicMsgDedup;
};
LongFunction<Consumer<?>> consumerFunc = (l) -> LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer( clientSpace.getConsumer(