Cleanup parameter parsing in ReadyPulsarOp

- reduce duplication
This commit is contained in:
Lari Hotari 2021-12-08 10:21:41 +02:00
parent 725b1473ef
commit 72e32acd49

View File

@ -43,21 +43,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
this.opTpl = optpl;
this.cmdTpl = new CommandTemplate(optpl);
if (cmdTpl.isDynamic("op_scope")) {
throw new PulsarDriverParamException("\"op_scope\" parameter must be static");
}
// TODO: At the moment, only supports static "client"
if (cmdTpl.containsKey("client")) {
if (cmdTpl.isDynamic("client")) {
throw new PulsarDriverParamException("\"client\" parameter can't be made dynamic!");
} else {
String client_name = cmdTpl.getStatic("client");
this.clientSpace = pcache.getPulsarSpace(client_name);
}
} else {
this.clientSpace = pcache.getPulsarSpace("default");
}
String client_name = lookupStaticParameter("client", false, "default");
this.clientSpace = pcache.getPulsarSpace(client_name);
this.opFunc = resolve();
}
@ -69,80 +57,39 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private LongFunction<PulsarOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new PulsarDriverParamException("[resolve()] \"optype\" parameter must be static and have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
String stmtOpType = lookupStaticParameter("optype", true, null);
if (cmdTpl.containsKey("topic_url")) {
throw new PulsarDriverParamException("[resolve()] \"topic_url\" parameter is not valid. Perhaps you mean \"topic_uri\"?");
}
// Doc-level parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
topicUriFunc = (l) -> cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label);
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l);
}
}
LongFunction<String> topicUriFunc = lookupParameterFunc(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label);
logger.info("topic_uri: {}", topicUriFunc.apply(0));
// Doc-level parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label));
asyncApiFunc = (l) -> value;
} else {
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
}
}
logger.info("async_api: {}", asyncApiFunc.apply(0));
boolean useAsyncApi = BooleanUtils.toBoolean(lookupStaticParameter(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label, false, "false"));
LongFunction<Boolean> asyncApiFunc = (l) -> useAsyncApi;
logger.info("async_api: {}", useAsyncApi);
// Doc-level parameter: async_api
LongFunction<Boolean> useTransactionFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) {
boolean value = BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label));
useTransactionFunc = (l) -> value;
} else {
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label + "\" parameter cannot be dynamic!");
}
}
logger.info("use_transaction: {}", useTransactionFunc.apply(0));
// Doc-level parameter: use_transaction
boolean useTransaction = BooleanUtils.toBoolean(lookupStaticParameter(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label, false, "false"));
LongFunction<Boolean> useTransactionFunc = (l) -> useTransaction;
logger.info("use_transaction: {}", useTransaction);
// Doc-level parameter: admin_delop
LongFunction<Boolean> adminDelOpFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label))
adminDelOpFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label));
else
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
}
logger.info("admin_delop: {}", adminDelOpFunc.apply(0));
boolean adminDelOp = BooleanUtils.toBoolean(lookupStaticParameter(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label, false, "false"));
LongFunction<Boolean> adminDelOpFunc = (l) -> adminDelOp;
logger.info("admin_delop: {}", adminDelOp);
// Doc-level parameter: seq_tracking
LongFunction<Boolean> seqTrackingFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label))
seqTrackingFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label));
else
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!");
}
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
boolean seqTracking = BooleanUtils.toBoolean(lookupStaticParameter(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label, false, "false"));
LongFunction<Boolean> seqTrackingFunc = (l) -> seqTracking;
logger.info("seq_tracking: {}", seqTracking);
// TODO: Collapse this pattern into a simple version and flatten out all call sites
LongFunction<String> payloadRttFieldFunc = (l) -> "";
if (cmdTpl.isStatic(RTT_TRACKING_FIELD)) {
payloadRttFieldFunc = l -> cmdTpl.getStatic(RTT_TRACKING_FIELD);
logger.info("payload_rtt_field: {}", cmdTpl.getStatic(RTT_TRACKING_FIELD));
} else if (cmdTpl.isDynamic(RTT_TRACKING_FIELD)) {
payloadRttFieldFunc = l -> cmdTpl.getDynamic(RTT_TRACKING_FIELD,l);
logger.info("payload_rtt_field: {}", cmdTpl.getFieldDescription(RTT_TRACKING_FIELD));
}
logger.info("payload_rtt_field_func: {}", payloadRttFieldFunc.toString());
LongFunction<String> payloadRttFieldFunc = lookupParameterFunc(RTT_TRACKING_FIELD, false, "");
logger.info("payload_rtt_field_func: {}", payloadRttFieldFunc.apply(0));
// TODO: Complete implementation for websocket-producer and managed-ledger
// Admin operation: create/delete tenant
@ -225,43 +172,17 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc)
{
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new PulsarDriverParamException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
}
// "admin_roles" includes comma-separated admin roles:
// e.g. role1, role2
Set<String> roleSet = lookupStaticParameterSet("admin_roles");
LongFunction<Set<String>> adminRolesFunc = (l) -> roleSet;
LongFunction<Set<String>> adminRolesFunc;
Set<String> roleSet = new HashSet<>();
if (cmdTpl.isStatic("admin_roles")) {
// "admin_roles" includes comma-separated admin roles:
// e.g. role1, role2
String adminRolesStr = cmdTpl.getStatic("admin_roles");
String[] roleArr = adminRolesStr.split(",");
Set<String> stringSet = new HashSet<>(Arrays.asList(roleArr));
roleSet.addAll(stringSet);
}
adminRolesFunc = (l) -> roleSet;
// "allowed_cluster" includes comma-separated cluster names:
// e.g. cluster1, cluster2
Set<String> clusterSet = lookupStaticParameterSet("allowed_clusters");
LongFunction<Set<String>> allowedClustersFunc = (l) -> clusterSet;
LongFunction<Set<String>> allowedClustersFunc;
Set<String> clusterSet = new HashSet<>();
if (cmdTpl.isStatic("allowed_clusters")) {
// "allowed_cluster" includes comma-separated cluster names:
// e.g. cluster1, cluster2
String allowedClustersStr = cmdTpl.getStatic("allowed_clusters");
String[] clusterArr = allowedClustersStr.split(",");
Set<String> stringSet = new HashSet<>(Arrays.asList(clusterArr));
clusterSet.addAll(stringSet);
}
allowedClustersFunc = (l) -> clusterSet;
LongFunction<String> tenantFunc;
if (cmdTpl.isStatic("tenant")) {
tenantFunc = (l) -> cmdTpl.getStatic("tenant");
} else if (cmdTpl.isDynamic("tenant")) {
tenantFunc = (l) -> cmdTpl.getDynamic("tenant", l);
} else {
tenantFunc = (l) -> null;
}
LongFunction<String> tenantFunc = lookupParameterFunc("tenant");
return new PulsarAdminTenantMapper(
cmdTpl,
@ -274,20 +195,23 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
tenantFunc);
}
private Set<String> lookupStaticParameterSet(String parameterName) {
return Optional.ofNullable(lookupStaticParameter(parameterName))
.map(value -> {
Set<String> set = Arrays.stream(value.split(","))
.map(String::trim)
.collect(Collectors.toCollection(LinkedHashSet::new));
return set;
}).orElse(Collections.emptySet());
}
// Admin API: create tenant
private LongFunction<PulsarOp> resolveAdminNamespace(
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc)
{
LongFunction<String> namespaceFunc;
if (cmdTpl.isStatic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getStatic("namespace");
} else if (cmdTpl.isDynamic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getDynamic("namespace", l);
} else {
namespaceFunc = (l) -> null;
}
LongFunction<String> namespaceFunc = lookupParameterFunc("namespace");
return new PulsarAdminNamespaceMapper(
cmdTpl,
@ -305,19 +229,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc
) {
LongFunction<String> enablePartionFunc = (l) -> null;
if (cmdTpl.isStatic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getStatic("enable_partition");
} else if (cmdTpl.isDynamic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getDynamic("enable_partition", l);
}
LongFunction<String> enablePartionFunc = lookupParameterFunc("enable_partition");
LongFunction<String> partitionNumFunc = (l) -> null;
if (cmdTpl.isStatic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getStatic("partition_num");
} else if (cmdTpl.isDynamic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l);
}
LongFunction<String> partitionNumFunc = lookupParameterFunc("partition_num");
return new PulsarAdminTopicMapper(
cmdTpl,
@ -340,14 +254,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<String> cycle_producer_name_func;
if (cmdTpl.isStatic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name");
} else if (cmdTpl.isDynamic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer_name", l);
} else {
cycle_producer_name_func = (l) -> null;
}
LongFunction<String> cycle_producer_name_func = lookupParameterFunc("producer_name");
LongFunction<Producer<?>> producerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l));
@ -355,47 +262,15 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
// check if we're going to simulate producer message out-of-sequence error
// - message ordering
// - message loss
Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> seqErrSimuTypes = Collections.emptySet();
if (cmdTpl.containsKey("seqerr_simu")) {
if (cmdTpl.isStatic("seqerr_simu")) {
seqErrSimuTypes = parseSimulatedErrorTypes(cmdTpl.getStatic("seqerr_simu"));
} else {
throw new PulsarDriverParamException("[resolveMsgSend()] \"seqerr_simu\" parameter cannot be dynamic!");
}
}
Set<PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE> seqErrSimuTypes = parseSimulatedErrorTypes(lookupStaticParameter("seqerr_simu"));
// message key
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
} else if (cmdTpl.isDynamic("msg_key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l);
} else {
keyFunc = (l) -> null;
}
LongFunction<String> keyFunc = lookupParameterFunc("msg_key");
// message property
LongFunction<String> propFunc;
if (cmdTpl.isStatic("msg_property")) {
propFunc = (l) -> cmdTpl.getStatic("msg_property");
} else if (cmdTpl.isDynamic("msg_property")) {
propFunc = (l) -> cmdTpl.getDynamic("msg_property", l);
} else {
propFunc = (l) -> null;
}
LongFunction<String> propFunc = lookupParameterFunc("msg_property");
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg_value");
} else if (cmdTpl.isDynamic("msg_value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l);
} else {
valueFunc = (l) -> null;
}
} else {
throw new PulsarDriverParamException("[resolveMsgSend()] \"msg_value\" field must be specified!");
}
LongFunction<String> valueFunc = lookupParameterFunc("msg_value", true);
return new PulsarProducerMapper(
cmdTpl,
@ -420,7 +295,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
.map(PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE::parseSimuType)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
.collect(Collectors.toCollection(LinkedHashSet::new));
}
private LongFunction<PulsarOp> resolveMsgConsume(
@ -432,32 +307,11 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
boolean e2eMsgProc,
LongFunction<String> rttTrackingFieldFunc
) {
LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name");
} else if (cmdTpl.isDynamic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l);
} else {
subscription_name_func = (l) -> null;
}
LongFunction<String> subscription_name_func = lookupParameterFunc("subscription_name");
LongFunction<String> subscription_type_func;
if (cmdTpl.isStatic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type");
} else if (cmdTpl.isDynamic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l);
} else {
subscription_type_func = (l) -> null;
}
LongFunction<String> subscription_type_func = lookupParameterFunc("subscription_type");
LongFunction<String> consumer_name_func;
if (cmdTpl.isStatic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name");
} else if (cmdTpl.isDynamic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l);
} else {
consumer_name_func = (l) -> null;
}
LongFunction<String> consumer_name_func = lookupParameterFunc("consumer_name");
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
@ -492,51 +346,16 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<String> payloadRttFieldFunc
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
if (cmdTpl.isStatic("topic_names")) {
topic_names_func = (l) -> cmdTpl.getStatic("topic_names");
} else if (cmdTpl.isDynamic("topic_names")) {
topic_names_func = (l) -> cmdTpl.getDynamic("topic_names", l);
} else {
topic_names_func = (l) -> null;
}
LongFunction<String> topic_names_func = lookupParameterFunc("topic_names");
// Topic pattern (multi-topic)
LongFunction<String> topics_pattern_func;
if (cmdTpl.isStatic("topics_pattern")) {
topics_pattern_func = (l) -> cmdTpl.getStatic("topics_pattern");
} else if (cmdTpl.isDynamic("topics_pattern")) {
topics_pattern_func = (l) -> cmdTpl.getDynamic("topics_pattern", l);
} else {
topics_pattern_func = (l) -> null;
}
LongFunction<String> topics_pattern_func = lookupParameterFunc("topics_pattern");
LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name");
} else if (cmdTpl.isDynamic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l);
} else {
subscription_name_func = (l) -> null;
}
LongFunction<String> subscription_name_func = lookupParameterFunc("subscription_name");
LongFunction<String> subscription_type_func;
if (cmdTpl.isStatic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type");
} else if (cmdTpl.isDynamic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l);
} else {
subscription_type_func = (l) -> null;
}
LongFunction<String> subscription_type_func = lookupParameterFunc("subscription_type");
LongFunction<String> consumer_name_func;
if (cmdTpl.isStatic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name");
} else if (cmdTpl.isDynamic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l);
} else {
consumer_name_func = (l) -> null;
}
LongFunction<String> consumer_name_func = lookupParameterFunc("consumer_name");
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
@ -564,28 +383,69 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
payloadRttFieldFunc);
}
private LongFunction<String> lookupParameterFunc(String parameterName) {
return lookupParameterFunc(parameterName, false, null);
}
private LongFunction<String> lookupParameterFunc(String parameterName, boolean required) {
return lookupParameterFunc(parameterName, required, null);
}
private LongFunction<String> lookupParameterFunc(String parameterName, boolean required, String defaultValue) {
if (cmdTpl.containsKey(parameterName)) {
LongFunction<String> lookupFunc;
if (cmdTpl.isStatic(parameterName)) {
String staticValue = cmdTpl.getStatic(parameterName);
lookupFunc = (l) -> staticValue;
} else if (cmdTpl.isDynamic(parameterName)) {
lookupFunc = (l) -> cmdTpl.getDynamic(parameterName, l);
} else {
lookupFunc = (l) -> defaultValue;
}
return lookupFunc;
} else {
if (required) {
throw new PulsarDriverParamException("\"" + parameterName + "\" field must be specified!");
} else {
return (l) -> defaultValue;
}
}
}
private String lookupStaticParameter(String parameterName) {
return lookupStaticParameter(parameterName, false, null);
}
private String lookupStaticParameter(String parameterName, boolean required, String defaultValue) {
if (cmdTpl.containsKey(parameterName)) {
if (cmdTpl.isStatic(parameterName)) {
return cmdTpl.getStatic(parameterName);
} else if (cmdTpl.isDynamic(parameterName)) {
throw new PulsarDriverParamException("\"" + parameterName + "\" parameter must be static");
} else {
return defaultValue;
}
} else {
if (required) {
throw new PulsarDriverParamException("\"" + parameterName + "\" field must be specified!");
} else {
return defaultValue;
}
}
}
private LongFunction<Boolean> toBooleanFunc(LongFunction<String> parameterFunc) {
return (l) -> BooleanUtils.toBoolean(parameterFunc.apply(l));
}
private LongFunction<PulsarOp> resolveMsgRead(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func
) {
LongFunction<String> reader_name_func;
if (cmdTpl.isStatic("reader_name")) {
reader_name_func = (l) -> cmdTpl.getStatic("reader_name");
} else if (cmdTpl.isDynamic("reader_name")) {
reader_name_func = (l) -> cmdTpl.getDynamic("reader_name", l);
} else {
reader_name_func = (l) -> null;
}
LongFunction<String> reader_name_func = lookupParameterFunc("reader_name");
LongFunction<String> start_msg_pos_str_func;
if (cmdTpl.isStatic("start_msg_position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start_msg_position");
} else if (cmdTpl.isDynamic("start_msg_position")) {
start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start_msg_position", l);
} else {
start_msg_pos_str_func = (l) -> null;
}
LongFunction<String> start_msg_pos_str_func = lookupParameterFunc("start_msg_position");
LongFunction<Reader<?>> readerFunc = (l) ->
clientSpace.getReader(
@ -607,14 +467,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<String> topic_uri_func,
LongFunction<Boolean> asyncApiFunc)
{
LongFunction<String> cycle_batch_producer_name_func;
if (cmdTpl.isStatic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getStatic("batch_producer_name");
} else if (cmdTpl.isDynamic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getDynamic("batch_producer_name", l);
} else {
cycle_batch_producer_name_func = (l) -> null;
}
LongFunction<String> cycle_batch_producer_name_func = lookupParameterFunc("batch_producer_name");
LongFunction<Producer<?>> batchProducerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l));
@ -630,37 +483,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private LongFunction<PulsarOp> resolveMsgBatchSend(PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc)
{
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
} else if (cmdTpl.isDynamic("msg_key")) {
keyFunc = (l) -> cmdTpl.getDynamic("msg_key", l);
} else {
keyFunc = (l) -> null;
}
LongFunction<String> keyFunc = lookupParameterFunc("msg_key");
// message property
LongFunction<String> propFunc;
if (cmdTpl.isStatic("msg_property")) {
propFunc = (l) -> cmdTpl.getStatic("msg_property");
} else if (cmdTpl.isDynamic("msg_property")) {
propFunc = (l) -> cmdTpl.getDynamic("msg_property", l);
} else {
propFunc = (l) -> null;
}
LongFunction<String> propFunc = lookupParameterFunc("msg_property");
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
valueFunc = (l) -> cmdTpl.getStatic("msg_value");
} else if (cmdTpl.isDynamic("msg_value")) {
valueFunc = (l) -> cmdTpl.getDynamic("msg_value", l);
} else {
valueFunc = (l) -> null;
}
} else {
throw new PulsarDriverParamException("[resolveMsgBatchSend()] \"msg_value\" field must be specified!");
}
LongFunction<String> valueFunc = lookupParameterFunc("msg_value", true);
return new PulsarBatchProducerMapper(
cmdTpl,