mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Add support for
1) producer message compression 2) consumer DLT/negAck/ackTimeout policy Add sample NB scenario yaml files
This commit is contained in:
@@ -17,7 +17,6 @@
|
||||
package io.nosqlbench.adapter.pulsar;
|
||||
|
||||
import io.nosqlbench.adapter.pulsar.dispensers.*;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnsupportedOpException;
|
||||
import io.nosqlbench.adapter.pulsar.ops.PulsarOp;
|
||||
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
@@ -28,9 +27,6 @@ import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.nosqlbench.engine.api.templating.TypeAndTarget;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.admin.PulsarAdmin;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
|
||||
public class PulsarOpMapper implements OpMapper<PulsarOp> {
|
||||
|
||||
@@ -51,10 +47,6 @@ public class PulsarOpMapper implements OpMapper<PulsarOp> {
|
||||
String spaceName = op.getStaticConfigOr("space", "default");
|
||||
PulsarSpace pulsarSpace = spaceCache.get(spaceName);
|
||||
|
||||
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
|
||||
PulsarAdmin pulsarAdmin = pulsarSpace.getPulsarAdmin();
|
||||
Schema<?> pulsarSchema = pulsarSpace.getPulsarSchema();
|
||||
|
||||
/*
|
||||
* If the user provides a body element, then they want to provide the JSON or
|
||||
* a data structure that can be converted into JSON, bypassing any further
|
||||
@@ -66,12 +58,6 @@ public class PulsarOpMapper implements OpMapper<PulsarOp> {
|
||||
else {
|
||||
TypeAndTarget<PulsarOpType, String> opType = op.getTypeAndTarget(PulsarOpType.class, String.class);
|
||||
|
||||
if (PulsarOpType.isValidPulsarOpType(opType.enumId.label)) {
|
||||
throw new PulsarAdapterUnsupportedOpException(
|
||||
"Unrecognized Pulsar Adapter Op Type -- must be one of the following values: \"" +
|
||||
PulsarOpType.getValidPulsarOpTypeList() + "\"!");
|
||||
}
|
||||
|
||||
return switch (opType.enumId) {
|
||||
case AdminTenant ->
|
||||
new AdminTenantOpDispenser(adapter, op, opType.targetFunction, pulsarSpace);
|
||||
@@ -87,7 +73,7 @@ public class PulsarOpMapper implements OpMapper<PulsarOp> {
|
||||
// NOTE: not sure how useful to have Pulsar message reader API in the NB performance testing
|
||||
// currently, the reader API in NB Pulsar driver is no-op (see TDOD in MessageReaderOp)
|
||||
//////////////////////////
|
||||
case MessageRead ->
|
||||
case MessageRead ->
|
||||
new MessageReaderOpDispenser(adapter, op, opType.targetFunction, pulsarSpace);
|
||||
};
|
||||
}
|
||||
|
||||
@@ -16,37 +16,14 @@
|
||||
|
||||
package io.nosqlbench.adapter.pulsar;
|
||||
|
||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public enum PulsarOpType {
|
||||
AdminTenant("admin-tenant"),
|
||||
AdminNamespace("admin-namespace"),
|
||||
AdminTopic("admin-topic"),
|
||||
MessageProduce("msg-send"),
|
||||
AdminTenant,
|
||||
AdminNamespace,
|
||||
AdminTopic,
|
||||
MessageProduce,
|
||||
// This also supports multi-topic message consumption
|
||||
MessageConsume("msg-consume"),
|
||||
MessageRead("msg-read");
|
||||
|
||||
public final String label;
|
||||
|
||||
PulsarOpType(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
|
||||
public static boolean isValidPulsarOpType(String type) {
|
||||
return Arrays.stream(values())
|
||||
.anyMatch(t -> t.label.equalsIgnoreCase(type));
|
||||
}
|
||||
|
||||
public static String getValidPulsarOpTypeList() {
|
||||
return Arrays.stream(values())
|
||||
.map(t -> t.label)
|
||||
.collect(Collectors.joining(", "));
|
||||
}
|
||||
MessageConsume,
|
||||
MessageRead;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ package io.nosqlbench.adapter.pulsar;
|
||||
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||
import io.nosqlbench.adapter.pulsar.util.PulsarAdapterUtil;
|
||||
import io.nosqlbench.adapter.pulsar.util.PulsarNBClientConf;
|
||||
import io.nosqlbench.adapter.pulsar.util.PulsarClientConf;
|
||||
import io.nosqlbench.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||
@@ -45,7 +45,7 @@ public class PulsarSpace implements AutoCloseable {
|
||||
private final String pulsarSvcUrl;
|
||||
private final String webSvcUrl;
|
||||
|
||||
private PulsarNBClientConf pulsarNBClientConf;
|
||||
private PulsarClientConf pulsarClientConf;
|
||||
private PulsarClient pulsarClient;
|
||||
private PulsarAdmin pulsarAdmin;
|
||||
private Schema<?> pulsarSchema;
|
||||
@@ -61,7 +61,7 @@ public class PulsarSpace implements AutoCloseable {
|
||||
|
||||
this.pulsarSvcUrl = cfg.get("service_url");
|
||||
this.webSvcUrl = cfg.get("web_url");
|
||||
this.pulsarNBClientConf = new PulsarNBClientConf(cfg.get("config"));
|
||||
this.pulsarClientConf = new PulsarClientConf(cfg.get("config"));
|
||||
|
||||
initPulsarAdminAndClientObj();
|
||||
createPulsarSchemaFromConf();
|
||||
@@ -82,7 +82,7 @@ public class PulsarSpace implements AutoCloseable {
|
||||
|
||||
public String getPulsarSvcUrl() { return pulsarSvcUrl; }
|
||||
public String getWebSvcUrl() { return webSvcUrl; }
|
||||
public PulsarNBClientConf getPulsarNBClientConf() { return pulsarNBClientConf; }
|
||||
public PulsarClientConf getPulsarNBClientConf() { return pulsarClientConf; }
|
||||
public PulsarClient getPulsarClient() { return pulsarClient; }
|
||||
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
|
||||
public Schema<?> getPulsarSchema() { return pulsarSchema; }
|
||||
@@ -111,7 +111,7 @@ public class PulsarSpace implements AutoCloseable {
|
||||
ClientBuilder clientBuilder = PulsarClient.builder();
|
||||
|
||||
try {
|
||||
Map<String, Object> clientConfMap = pulsarNBClientConf.getClientConfMap();
|
||||
Map clientConfMap = pulsarClientConf.getClientConfMapRaw();
|
||||
|
||||
// Override "client.serviceUrl" setting in config.properties
|
||||
clientConfMap.remove("serviceUrl");
|
||||
@@ -119,9 +119,9 @@ public class PulsarSpace implements AutoCloseable {
|
||||
|
||||
// Pulsar Authentication
|
||||
String authPluginClassName =
|
||||
(String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authPulginClassName.label);
|
||||
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authPulginClassName.label);
|
||||
String authParams =
|
||||
(String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authParams.label);
|
||||
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.authParams.label);
|
||||
|
||||
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
|
||||
adminBuilder.authentication(authPluginClassName, authParams);
|
||||
@@ -131,7 +131,7 @@ public class PulsarSpace implements AutoCloseable {
|
||||
boolean useTls = StringUtils.contains(pulsarSvcUrl, "pulsar+ssl");
|
||||
if ( useTls ) {
|
||||
String tlsHostnameVerificationEnableStr =
|
||||
(String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
|
||||
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
|
||||
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
|
||||
|
||||
adminBuilder
|
||||
@@ -140,14 +140,14 @@ public class PulsarSpace implements AutoCloseable {
|
||||
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
|
||||
|
||||
String tlsTrustCertsFilePath =
|
||||
(String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
|
||||
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
|
||||
if (!StringUtils.isBlank(tlsTrustCertsFilePath)) {
|
||||
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
|
||||
clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
|
||||
}
|
||||
|
||||
String tlsAllowInsecureConnectionStr =
|
||||
(String) pulsarNBClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
|
||||
pulsarClientConf.getClientConfValue(PulsarAdapterUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
|
||||
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
|
||||
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
|
||||
clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
|
||||
@@ -188,14 +188,12 @@ public class PulsarSpace implements AutoCloseable {
|
||||
|
||||
private Schema<?> buildSchemaFromDefinition(String schemaTypeConfEntry,
|
||||
String schemaDefinitionConfEntry) {
|
||||
Object value = pulsarNBClientConf.getSchemaConfValue(schemaTypeConfEntry);
|
||||
Object schemaDefinition = pulsarNBClientConf.getSchemaConfValue(schemaDefinitionConfEntry);
|
||||
String schemaType = (value != null) ? value.toString() : "";
|
||||
String schemaType = pulsarClientConf.getSchemaConfValue(schemaTypeConfEntry);
|
||||
String schemaDef = pulsarClientConf.getSchemaConfValue(schemaDefinitionConfEntry);
|
||||
|
||||
Schema<?> result;
|
||||
if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType)) {
|
||||
String schemaDefStr = (schemaDefinition != null) ? schemaDefinition.toString() : "";
|
||||
result = PulsarAdapterUtil.getAvroSchema(schemaType, schemaDefStr);
|
||||
result = PulsarAdapterUtil.getAvroSchema(schemaType, schemaDef);
|
||||
} else if (PulsarAdapterUtil.isPrimitiveSchemaTypeStr(schemaType)) {
|
||||
result = PulsarAdapterUtil.getPrimitiveTypeSchema(schemaType);
|
||||
} else if (PulsarAdapterUtil.isAutoConsumeSchemaTypeStr(schemaType)) {
|
||||
@@ -210,12 +208,12 @@ public class PulsarSpace implements AutoCloseable {
|
||||
pulsarSchema = buildSchemaFromDefinition("schema.type", "schema.definition");
|
||||
|
||||
// this is to allow KEY_VALUE schema
|
||||
if (pulsarNBClientConf.hasSchemaConfKey("schema.key.type")) {
|
||||
if (pulsarClientConf.hasSchemaConfKey("schema.key.type")) {
|
||||
Schema<?> pulsarKeySchema = buildSchemaFromDefinition("schema.key.type", "schema.key.definition");
|
||||
Object encodingType = pulsarNBClientConf.getSchemaConfValue("schema.keyvalue.encodingtype");
|
||||
String encodingType = pulsarClientConf.getSchemaConfValue("schema.keyvalue.encodingtype");
|
||||
KeyValueEncodingType keyValueEncodingType = KeyValueEncodingType.SEPARATED;
|
||||
if (encodingType != null) {
|
||||
keyValueEncodingType = KeyValueEncodingType.valueOf(encodingType.toString());
|
||||
keyValueEncodingType = KeyValueEncodingType.valueOf(encodingType);
|
||||
}
|
||||
pulsarSchema = Schema.KeyValue(pulsarKeySchema, pulsarSchema, keyValueEncodingType);
|
||||
}
|
||||
|
||||
@@ -246,7 +246,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
|
||||
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
|
||||
|
||||
// Get other possible producer settings that are set at global level
|
||||
Map<String, Object> producerConf = pulsarSpace.getPulsarNBClientConf().getProducerConfMap();
|
||||
Map<String, Object> producerConf = pulsarSpace.getPulsarNBClientConf().getProducerConfMapTgt();
|
||||
|
||||
// Remove global level settings: "topicName" and "producerName"
|
||||
producerConf.remove(PulsarAdapterUtil.PRODUCER_CONF_STD_KEY.topicName.label);
|
||||
@@ -265,13 +265,11 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
|
||||
producer = producerBuilder.create();
|
||||
pulsarSpace.setProducer(producerCacheKey, producer);
|
||||
|
||||
if (instrument) {
|
||||
pulsarAdapterMetrics.registerProducerApiMetrics(producer,
|
||||
getPulsarAPIMetricsPrefix(
|
||||
PulsarAdapterUtil.PULSAR_API_TYPE.PRODUCER.label,
|
||||
producerName,
|
||||
topicName));
|
||||
}
|
||||
pulsarAdapterMetrics.registerProducerApiMetrics(producer,
|
||||
getPulsarAPIMetricsPrefix(
|
||||
PulsarAdapterUtil.PULSAR_API_TYPE.PRODUCER.label,
|
||||
producerName,
|
||||
topicName));
|
||||
}
|
||||
catch (PulsarClientException ple) {
|
||||
throw new PulsarAdapterUnexpectedException("Failed to create a Pulsar producer.");
|
||||
@@ -458,7 +456,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
|
||||
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();
|
||||
|
||||
// Get other possible consumer settings that are set at global level
|
||||
Map<String, Object> consumerConf = new HashMap<>(pulsarSpace.getPulsarNBClientConf().getConsumerConfMap());
|
||||
Map<String, Object> consumerConf = new HashMap<>(pulsarSpace.getPulsarNBClientConf().getConsumerConfMapTgt());
|
||||
|
||||
// Remove global level settings:
|
||||
// - "topicNames", "topicsPattern", "subscriptionName", "subscriptionType", "consumerName"
|
||||
@@ -628,7 +626,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, P
|
||||
if (reader == null) {
|
||||
PulsarClient pulsarClient = pulsarSpace.getPulsarClient();;
|
||||
|
||||
Map<String, Object> readerConf = pulsarSpace.getPulsarNBClientConf().getReaderConfMap();
|
||||
Map<String, Object> readerConf = pulsarSpace.getPulsarNBClientConf().getReaderConfMapTgt();
|
||||
|
||||
// Remove global level settings: "topicName" and "readerName"
|
||||
readerConf.remove(PulsarAdapterUtil.READER_CONF_STD_KEY.topicName.label);
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
package io.nosqlbench.adapter.pulsar.ops;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterAsyncOperationFailedException;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||
@@ -84,7 +83,6 @@ public class MessageProducerOp extends PulsarClientOp {
|
||||
this.msgValue = msgValue;
|
||||
|
||||
getMsgPropMapFromRawJsonStr();
|
||||
getMsgPropMapFromRawJsonStr();
|
||||
}
|
||||
|
||||
private MessageSequenceNumberSendingHandler getMessageSequenceNumberSendingHandler(String topicName) {
|
||||
@@ -131,7 +129,7 @@ public class MessageProducerOp extends PulsarClientOp {
|
||||
}
|
||||
|
||||
// set message key
|
||||
if (!StringUtils.isBlank(msgKey)) {
|
||||
if ( !StringUtils.isBlank(msgKey) && !(pulsarSchema instanceof KeyValueSchema) ) {
|
||||
typedMessageBuilder = typedMessageBuilder.key(msgKey);
|
||||
}
|
||||
|
||||
@@ -145,31 +143,32 @@ public class MessageProducerOp extends PulsarClientOp {
|
||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||
if (pulsarSchema instanceof KeyValueSchema) {
|
||||
|
||||
// {KEY IN JSON}||{VALUE IN JSON}
|
||||
int separator = msgValue.indexOf("}||{");
|
||||
if (separator < 0) {
|
||||
throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)");
|
||||
}
|
||||
String keyInput = msgValue.substring(0, separator + 1);
|
||||
String valueInput = msgValue.substring(separator + 3);
|
||||
// // {KEY IN JSON}||{VALUE IN JSON}
|
||||
// int separator = msgValue.indexOf("}||{");
|
||||
// if (separator < 0) {
|
||||
// throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)");
|
||||
// }
|
||||
// String keyInput = msgValue.substring(0, separator + 1);
|
||||
// String valueInput = msgValue.substring(separator + 3);
|
||||
|
||||
KeyValueSchema keyValueSchema = (KeyValueSchema) pulsarSchema;
|
||||
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
|
||||
GenericRecord payload = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(
|
||||
(GenericAvroSchema) keyValueSchema.getValueSchema(),
|
||||
avroSchema,
|
||||
valueInput
|
||||
msgValue
|
||||
);
|
||||
|
||||
org.apache.avro.Schema avroSchemaForKey = getKeyAvroSchemaFromConfiguration();
|
||||
GenericRecord key = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(
|
||||
(GenericAvroSchema) keyValueSchema.getKeySchema(),
|
||||
avroSchemaForKey,
|
||||
keyInput
|
||||
msgKey
|
||||
);
|
||||
|
||||
typedMessageBuilder = typedMessageBuilder.value(new KeyValue(key, payload));
|
||||
// TODO: add a way to calculate the message size for KEY_VALUE messages
|
||||
messageSize = msgValue.length();
|
||||
messageSize = msgKey.length() + msgValue.length();
|
||||
}
|
||||
else if (PulsarAdapterUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
||||
GenericRecord payload = PulsarAvroSchemaUtil.GetGenericRecord_PulsarAvro(
|
||||
@@ -209,14 +208,14 @@ public class MessageProducerOp extends PulsarClientOp {
|
||||
logger.debug("({}) Sync message sent: msg-key={}; msg-properties={}; msg-payload={})",
|
||||
producer.getProducerName(),
|
||||
msgKey,
|
||||
msgPropRawJsonStr,
|
||||
msgProperties,
|
||||
avroGenericRecord.toString());
|
||||
}
|
||||
else {
|
||||
logger.debug("({}) Sync message sent; msg-key={}; msg-properties={}; msg-payload={}",
|
||||
producer.getProducerName(),
|
||||
msgKey,
|
||||
msgPropRawJsonStr,
|
||||
msgProperties,
|
||||
msgValue);
|
||||
}
|
||||
}
|
||||
@@ -225,7 +224,7 @@ public class MessageProducerOp extends PulsarClientOp {
|
||||
String errMsg =
|
||||
"Sync message sending failed: " +
|
||||
"key - " + msgKey + "; " +
|
||||
"properties - " + msgPropRawJsonStr + "; " +
|
||||
"properties - " + msgProperties + "; " +
|
||||
"payload - " + msgValue;
|
||||
|
||||
logger.trace(errMsg);
|
||||
@@ -260,21 +259,21 @@ public class MessageProducerOp extends PulsarClientOp {
|
||||
logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})",
|
||||
producer.getProducerName(),
|
||||
msgKey,
|
||||
msgPropRawJsonStr,
|
||||
msgProperties,
|
||||
avroGenericRecord.toString());
|
||||
}
|
||||
else {
|
||||
logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}",
|
||||
producer.getProducerName(),
|
||||
msgKey,
|
||||
msgPropRawJsonStr,
|
||||
msgProperties,
|
||||
msgValue);
|
||||
}
|
||||
}
|
||||
}).exceptionally(ex -> {
|
||||
logger.error("Async message sending failed: " +
|
||||
"key - " + msgKey + "; " +
|
||||
"properties - " + msgPropRawJsonStr + "; " +
|
||||
"properties - " + msgProperties + "; " +
|
||||
"payload - " + msgValue);
|
||||
|
||||
throw new PulsarAdapterAsyncOperationFailedException(ex);
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
package io.nosqlbench.adapter.pulsar.util;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterInvalidParamException;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterUnexpectedException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@@ -460,19 +462,25 @@ public class PulsarAdapterUtil {
|
||||
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
|
||||
if (isAvroSchemaTypeStr(typeStr)) {
|
||||
if (StringUtils.isBlank(schemaDefinitionStr)) {
|
||||
throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!");
|
||||
} else if (schemaDefinitionStr.startsWith(filePrefix)) {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
"Schema definition must be provided for \"Avro\" schema type!");
|
||||
}
|
||||
else if (schemaDefinitionStr.startsWith(filePrefix)) {
|
||||
try {
|
||||
Path filePath = Paths.get(URI.create(schemaDefinitionStr));
|
||||
schemaDefinitionStr = Files.readString(filePath, StandardCharsets.US_ASCII);
|
||||
} catch (IOException ioe) {
|
||||
throw new RuntimeException("Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
throw new PulsarAdapterUnexpectedException(
|
||||
"Error reading the specified \"Avro\" schema definition file: " + definitionStr + ": " + ioe.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
schema = PulsarAvroSchemaUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
|
||||
} else {
|
||||
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
|
||||
}
|
||||
else {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
"Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
|
||||
}
|
||||
|
||||
return schema;
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.builder.ToStringBuilder;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@@ -32,9 +33,9 @@ import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
public class PulsarNBClientConf {
|
||||
public class PulsarClientConf {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(PulsarNBClientConf.class);
|
||||
private final static Logger logger = LogManager.getLogger(PulsarClientConf.class);
|
||||
|
||||
private String canonicalFilePath = "";
|
||||
|
||||
@@ -43,14 +44,44 @@ public class PulsarNBClientConf {
|
||||
public static final String PRODUCER_CONF_PREFIX = "producer";
|
||||
public static final String CONSUMER_CONF_PREFIX = "consumer";
|
||||
public static final String READER_CONF_PREFIX = "reader";
|
||||
private final HashMap<String, Object> schemaConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> clientConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> producerConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> consumerConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> readerConfMap = new HashMap<>();
|
||||
// TODO: add support for other operation types: websocket-producer, managed-ledger
|
||||
private final Map<String, String> schemaConfMapRaw = new HashMap<>();
|
||||
private final Map<String, String> clientConfMapRaw = new HashMap<>();
|
||||
|
||||
public PulsarNBClientConf(String fileName) {
|
||||
// "Raw" map is what is read from the config properties file
|
||||
// "Tgt" map is what is really needed in the Pulsar producer/consumer/reader API
|
||||
private final Map<String, String> producerConfMapRaw = new HashMap<>();
|
||||
private final Map<String, Object> producerConfMapTgt = new HashMap<>();
|
||||
|
||||
private final Map<String, String> consumerConfMapRaw = new HashMap<>();
|
||||
private final Map<String, Object> consumerConfMapTgt = new HashMap<>();
|
||||
|
||||
private final Map<String, String> readerConfMapRaw = new HashMap<>();
|
||||
private final Map<String, Object> readerConfMapTgt = new HashMap<>();
|
||||
|
||||
public PulsarClientConf(String fileName) {
|
||||
|
||||
//////////////////
|
||||
// Read related Pulsar client configuration settings from a file
|
||||
readRawConfFromFile(fileName);
|
||||
|
||||
|
||||
//////////////////
|
||||
// Ignores the following Pulsar client/producer/consumer configurations since
|
||||
// they need to be specified either as the NB CLI parameters or as the NB yaml
|
||||
// OpTemplate parameters.
|
||||
clientConfMapRaw.remove("brokerServiceUrl");
|
||||
clientConfMapRaw.remove("webServiceUrl");
|
||||
|
||||
|
||||
//////////////////
|
||||
// Convert the raw configuration map (<String,String>) to the required map (<String,Object>)
|
||||
producerConfMapTgt.putAll(PulsarConfConverter.convertRawProducerConf(producerConfMapRaw));
|
||||
consumerConfMapTgt.putAll(PulsarConfConverter.convertRawConsumerConf(consumerConfMapRaw));
|
||||
// TODO: Reader API is not disabled at the moment. Revisit when needed
|
||||
}
|
||||
|
||||
|
||||
public void readRawConfFromFile(String fileName) {
|
||||
File file = new File(fileName);
|
||||
|
||||
try {
|
||||
@@ -65,44 +96,37 @@ public class PulsarNBClientConf {
|
||||
|
||||
Configuration config = builder.getConfiguration();
|
||||
|
||||
// Get schema specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
|
||||
for (Iterator<String> it = config.getKeys(); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get client connection specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
if (!StringUtils.isBlank(confVal)) {
|
||||
|
||||
// Get producer specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get consumer specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get reader specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(READER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
readerConfMap.put(confKey.substring(READER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
// Get schema specific configuration settings, removing "schema." prefix
|
||||
if (StringUtils.startsWith(confKey, SCHEMA_CONF_PREFIX)) {
|
||||
schemaConfMapRaw.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), confVal);
|
||||
}
|
||||
// Get client connection specific configuration settings, removing "client." prefix
|
||||
// <<< https://pulsar.apache.org/docs/reference-configuration/#client >>>
|
||||
else if (StringUtils.startsWith(confKey, CLIENT_CONF_PREFIX)) {
|
||||
clientConfMapRaw.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), confVal);
|
||||
}
|
||||
// Get producer specific configuration settings, removing "producer." prefix
|
||||
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>>
|
||||
else if (StringUtils.startsWith(confKey, PRODUCER_CONF_PREFIX)) {
|
||||
producerConfMapRaw.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), confVal);
|
||||
}
|
||||
// Get consumer specific configuration settings, removing "consumer." prefix
|
||||
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer >>>
|
||||
else if (StringUtils.startsWith(confKey, CONSUMER_CONF_PREFIX)) {
|
||||
consumerConfMapRaw.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), confVal);
|
||||
}
|
||||
// Get reader specific configuration settings, removing "reader." prefix
|
||||
// <<< https://pulsar.apache.org/docs/2.10.x/client-libraries-java/#configure-reader >>>
|
||||
else if (StringUtils.startsWith(confKey, READER_CONF_PREFIX)) {
|
||||
readerConfMapRaw.put(confKey.substring(READER_CONF_PREFIX.length() + 1), confVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
logger.error("Can't read the specified config properties file!");
|
||||
@@ -114,78 +138,59 @@ public class PulsarNBClientConf {
|
||||
}
|
||||
|
||||
|
||||
public Map<String, String> getSchemaConfMapRaw() { return this.schemaConfMapRaw; }
|
||||
public Map<String, String> getClientConfMapRaw() { return this.clientConfMapRaw; }
|
||||
public Map<String, String> getProducerConfMapRaw() { return this.producerConfMapRaw; }
|
||||
public Map<String, Object> getProducerConfMapTgt() { return this.producerConfMapTgt; }
|
||||
public Map<String, String> getConsumerConfMapRaw() { return this.consumerConfMapRaw; }
|
||||
public Map<String, Object> getConsumerConfMapTgt() { return this.consumerConfMapTgt; }
|
||||
public Map<String, String> getReaderConfMapRaw() { return this.readerConfMapRaw; }
|
||||
public Map<String, Object> getReaderConfMapTgt() { return this.readerConfMapTgt; }
|
||||
|
||||
|
||||
public String toString() {
|
||||
return new ToStringBuilder(this).
|
||||
append("schemaConfMapRaw", schemaConfMapRaw.toString()).
|
||||
append("clientConfMapRaw", clientConfMapRaw.toString()).
|
||||
append("producerConfMapRaw", producerConfMapRaw.toString()).
|
||||
append("consumerConfMapRaw", consumerConfMapRaw.toString()).
|
||||
append("readerConfMapRaw", readerConfMapRaw.toString()).
|
||||
toString();
|
||||
}
|
||||
|
||||
//////////////////
|
||||
// Get Schema related config
|
||||
public Map<String, Object> getSchemaConfMap() {
|
||||
return this.schemaConfMap;
|
||||
}
|
||||
public boolean hasSchemaConfKey(String key) {
|
||||
if (key.contains(SCHEMA_CONF_PREFIX))
|
||||
return schemaConfMap.containsKey(key.substring(SCHEMA_CONF_PREFIX.length() + 1));
|
||||
return schemaConfMapRaw.containsKey(key.substring(SCHEMA_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return schemaConfMap.containsKey(key);
|
||||
return schemaConfMapRaw.containsKey(key);
|
||||
}
|
||||
public Object getSchemaConfValue(String key) {
|
||||
public String getSchemaConfValue(String key) {
|
||||
if (key.contains(SCHEMA_CONF_PREFIX))
|
||||
return schemaConfMap.get(key.substring(SCHEMA_CONF_PREFIX.length()+1));
|
||||
return schemaConfMapRaw.get(key.substring(SCHEMA_CONF_PREFIX.length()+1));
|
||||
else
|
||||
return schemaConfMap.get(key);
|
||||
}
|
||||
public void setSchemaConfValue(String key, Object value) {
|
||||
if (key.contains(SCHEMA_CONF_PREFIX))
|
||||
schemaConfMap.put(key.substring(SCHEMA_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
schemaConfMap.put(key, value);
|
||||
return schemaConfMapRaw.get(key);
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar client related config
|
||||
public Map<String, Object> getClientConfMap() {
|
||||
return this.clientConfMap;
|
||||
}
|
||||
public boolean hasClientConfKey(String key) {
|
||||
public String getClientConfValue(String key) {
|
||||
if (key.contains(CLIENT_CONF_PREFIX))
|
||||
return clientConfMap.containsKey(key.substring(CLIENT_CONF_PREFIX.length() + 1));
|
||||
return clientConfMapRaw.get(key.substring(CLIENT_CONF_PREFIX.length()+1));
|
||||
else
|
||||
return clientConfMap.containsKey(key);
|
||||
}
|
||||
public Object getClientConfValue(String key) {
|
||||
if (key.contains(CLIENT_CONF_PREFIX))
|
||||
return clientConfMap.get(key.substring(CLIENT_CONF_PREFIX.length()+1));
|
||||
else
|
||||
return clientConfMap.get(key);
|
||||
}
|
||||
public void setClientConfValue(String key, Object value) {
|
||||
if (key.contains(CLIENT_CONF_PREFIX))
|
||||
clientConfMap.put(key.substring(CLIENT_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
clientConfMap.put(key, value);
|
||||
return clientConfMapRaw.get(key);
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar producer related config
|
||||
public Map<String, Object> getProducerConfMap() {
|
||||
return this.producerConfMap;
|
||||
}
|
||||
public boolean hasProducerConfKey(String key) {
|
||||
if (key.contains(PRODUCER_CONF_PREFIX))
|
||||
return producerConfMap.containsKey(key.substring(PRODUCER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return producerConfMap.containsKey(key);
|
||||
}
|
||||
public Object getProducerConfValue(String key) {
|
||||
if (key.contains(PRODUCER_CONF_PREFIX))
|
||||
return producerConfMap.get(key.substring(PRODUCER_CONF_PREFIX.length()+1));
|
||||
return producerConfMapTgt.get(key.substring(PRODUCER_CONF_PREFIX.length()+1));
|
||||
else
|
||||
return producerConfMap.get(key);
|
||||
}
|
||||
public void setProducerConfValue(String key, Object value) {
|
||||
if (key.contains(PRODUCER_CONF_PREFIX))
|
||||
producerConfMap.put(key.substring(PRODUCER_CONF_PREFIX.length()+1), value);
|
||||
else
|
||||
producerConfMap.put(key, value);
|
||||
return producerConfMapTgt.get(key);
|
||||
}
|
||||
// other producer helper functions ...
|
||||
public String getProducerName() {
|
||||
@@ -208,30 +213,15 @@ public class PulsarNBClientConf {
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar consumer related config
|
||||
public Map<String, Object> getConsumerConfMap() {
|
||||
return this.consumerConfMap;
|
||||
}
|
||||
public boolean hasConsumerConfKey(String key) {
|
||||
public String getConsumerConfValue(String key) {
|
||||
if (key.contains(CONSUMER_CONF_PREFIX))
|
||||
return consumerConfMap.containsKey(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
|
||||
return consumerConfMapRaw.get(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return consumerConfMap.containsKey(key);
|
||||
}
|
||||
public Object getConsumerConfValue(String key) {
|
||||
if (key.contains(CONSUMER_CONF_PREFIX))
|
||||
return consumerConfMap.get(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return consumerConfMap.get(key);
|
||||
}
|
||||
public void setConsumerConfValue(String key, Object value) {
|
||||
if (key.contains(CONSUMER_CONF_PREFIX))
|
||||
consumerConfMap.put(key.substring(CONSUMER_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
consumerConfMap.put(key, value);
|
||||
return consumerConfMapRaw.get(key);
|
||||
}
|
||||
// Other consumer helper functions ...
|
||||
public String getConsumerTopicNames() {
|
||||
Object confValue = getConsumerConfValue(
|
||||
String confValue = getConsumerConfValue(
|
||||
"consumer." + PulsarAdapterUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
|
||||
if (confValue == null)
|
||||
return "";
|
||||
@@ -284,26 +274,23 @@ public class PulsarNBClientConf {
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar reader related config
|
||||
public Map<String, Object> getReaderConfMap() {
|
||||
return this.readerConfMap;
|
||||
}
|
||||
public boolean hasReaderConfKey(String key) {
|
||||
if (key.contains(READER_CONF_PREFIX))
|
||||
return readerConfMap.containsKey(key.substring(READER_CONF_PREFIX.length() + 1));
|
||||
return readerConfMapRaw.containsKey(key.substring(READER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return readerConfMap.containsKey(key);
|
||||
return readerConfMapRaw.containsKey(key);
|
||||
}
|
||||
public Object getReaderConfValue(String key) {
|
||||
if (key.contains(READER_CONF_PREFIX))
|
||||
return readerConfMap.get(key.substring(READER_CONF_PREFIX.length() + 1));
|
||||
return readerConfMapRaw.get(key.substring(READER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return readerConfMap.get(key);
|
||||
return readerConfMapRaw.get(key);
|
||||
}
|
||||
public void setReaderConfValue(String key, Object value) {
|
||||
public void setReaderConfValue(String key, String value) {
|
||||
if (key.contains(READER_CONF_PREFIX))
|
||||
readerConfMap.put(key.substring(READER_CONF_PREFIX.length() + 1), value);
|
||||
readerConfMapRaw.put(key.substring(READER_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
readerConfMap.put(key, value);
|
||||
readerConfMapRaw.put(key, value);
|
||||
}
|
||||
// Other reader helper functions ...
|
||||
public String getReaderTopicName() {
|
||||
@@ -0,0 +1,367 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.pulsar.util;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.nosqlbench.adapter.pulsar.exception.PulsarAdapterInvalidParamException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.pulsar.client.api.CompressionType;
|
||||
import org.apache.pulsar.client.api.DeadLetterPolicy;
|
||||
import org.apache.pulsar.client.api.RedeliveryBackoff;
|
||||
import org.apache.pulsar.client.impl.MultiplierRedeliveryBackoff;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class PulsarConfConverter {
|
||||
|
||||
// <<< https://pulsar.apache.org/docs/client-libraries-java/#configure-producer >>>
|
||||
private final static Map<String, String> validPulsarProducerConfKeyTypeMap = Map.ofEntries(
|
||||
Map.entry("topicName", "String"),
|
||||
Map.entry("producerName","String"),
|
||||
Map.entry("sendTimeoutMs","long"),
|
||||
Map.entry("blockIfQueueFull","boolean"),
|
||||
Map.entry("maxPendingMessages","int"),
|
||||
Map.entry("maxPendingMessagesAcrossPartitions","int"),
|
||||
Map.entry("messageRoutingMode","MessageRoutingMode"),
|
||||
Map.entry("hashingScheme","HashingScheme"),
|
||||
Map.entry("cryptoFailureAction","ProducerCryptoFailureAction"),
|
||||
Map.entry("batchingMaxPublishDelayMicros","long"),
|
||||
Map.entry("batchingMaxMessages","int"),
|
||||
Map.entry("batchingEnabled","boolean"),
|
||||
Map.entry("chunkingEnabled","boolean"),
|
||||
Map.entry("compressionType","CompressionType"),
|
||||
Map.entry("initialSubscriptionName","string")
|
||||
);
|
||||
public static Map<String, Object> convertRawProducerConf(Map<String, String> pulsarProducerConfMapRaw) {
|
||||
Map<String, Object> producerConfObjMap = new HashMap<>();
|
||||
setConfObjMapForPrimitives(producerConfObjMap, pulsarProducerConfMapRaw, validPulsarProducerConfKeyTypeMap);
|
||||
|
||||
/**
|
||||
* Non-primitive type processing for Pulsar producer configuration items
|
||||
*/
|
||||
// TODO: Skip the following Pulsar configuration items for now because they're not really
|
||||
// needed in the NB S4J testing at the moment. Add support for them when needed.
|
||||
// * messageRoutingMode
|
||||
// * hashingScheme
|
||||
// * cryptoFailureAction
|
||||
|
||||
// "compressionType" has value type "CompressionType"
|
||||
// - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY'
|
||||
String confKeyName = "compressionType";
|
||||
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
|
||||
String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)";
|
||||
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) {
|
||||
CompressionType compressionType = CompressionType.NONE;
|
||||
|
||||
switch (StringUtils.upperCase(confVal)) {
|
||||
case "LZ4":
|
||||
compressionType = CompressionType.LZ4;
|
||||
case "ZLIB":
|
||||
compressionType = CompressionType.ZLIB;
|
||||
case "ZSTD":
|
||||
compressionType = CompressionType.ZSTD;
|
||||
case "SNAPPY":
|
||||
compressionType = CompressionType.SNAPPY;
|
||||
}
|
||||
|
||||
producerConfObjMap.put(confKeyName, compressionType);
|
||||
} else {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal));
|
||||
}
|
||||
}
|
||||
|
||||
return producerConfObjMap;
|
||||
}
|
||||
|
||||
|
||||
// https://pulsar.apache.org/docs/client-libraries-java/#configure-consumer
|
||||
private final static Map<String, String> validPulsarConsumerConfKeyTypeMap = Map.ofEntries(
|
||||
Map.entry("topicNames", "Set<String>"),
|
||||
Map.entry("topicsPattern","Pattern"),
|
||||
Map.entry("subscriptionName","String"),
|
||||
Map.entry("subscriptionType","SubscriptionType"),
|
||||
Map.entry("receiverQueueSize","int"),
|
||||
Map.entry("acknowledgementsGroupTimeMicros","long"),
|
||||
Map.entry("negativeAckRedeliveryDelayMicros","long"),
|
||||
Map.entry("maxTotalReceiverQueueSizeAcrossPartitions","int"),
|
||||
Map.entry("consumerName","String"),
|
||||
Map.entry("ackTimeoutMillis","long"),
|
||||
Map.entry("tickDurationMillis","long"),
|
||||
Map.entry("priorityLevel","int"),
|
||||
Map.entry("cryptoFailureAction","ConsumerCryptoFailureAction"),
|
||||
Map.entry("properties","SortedMap<String, String>"),
|
||||
Map.entry("readCompacted","boolean"),
|
||||
Map.entry("subscriptionInitialPosition", "SubscriptionInitialPosition"),
|
||||
Map.entry("patternAutoDiscoveryPeriod", "int"),
|
||||
Map.entry("regexSubscriptionMode", "RegexSubscriptionMode"),
|
||||
Map.entry("deadLetterPolicy", "DeadLetterPolicy"),
|
||||
Map.entry("autoUpdatePartitions", "boolean"),
|
||||
Map.entry("replicateSubscriptionState", "boolean"),
|
||||
Map.entry("negativeAckRedeliveryBackoff", "RedeliveryBackoff"),
|
||||
Map.entry("ackTimeoutRedeliveryBackoff", "RedeliveryBackoff"),
|
||||
Map.entry("autoAckOldestChunkedMessageOnQueueFull", "boolean"),
|
||||
Map.entry("maxPendingChunkedMessage", "int"),
|
||||
Map.entry("expireTimeOfIncompleteChunkedMessageMillis", "long")
|
||||
);
|
||||
public static Map<String, Object> convertRawConsumerConf(Map<String, String> pulsarConsumerConfMapRaw) {
|
||||
Map<String, Object> consumerConfObjMap = new HashMap<>();
|
||||
setConfObjMapForPrimitives(consumerConfObjMap, pulsarConsumerConfMapRaw, validPulsarConsumerConfKeyTypeMap);
|
||||
|
||||
/**
|
||||
* Non-primitive type processing for Pulsar consumer configuration items
|
||||
*/
|
||||
// NOTE: The following non-primitive type configuration items are excluded since
|
||||
// they'll be handled in PulsarBasedOpDispenser.getConsumer() method directly
|
||||
// * topicNames
|
||||
// * topicPattern
|
||||
// * subscriptionType
|
||||
|
||||
|
||||
// TODO: Skip the following Pulsar configuration items for now because they're not really
|
||||
// needed in the NB S4J testing right now. Add the support for them when needed.
|
||||
// * subscriptionInitialPosition
|
||||
// * regexSubscriptionMode
|
||||
// * cryptoFailureAction
|
||||
|
||||
|
||||
// "properties" has value type "SortedMap<String, String>"
|
||||
// - expecting the value string has the format: a JSON string that includes a set of key/value pairs
|
||||
String confKeyName = "properties";
|
||||
String confVal = pulsarConsumerConfMapRaw.get(confKeyName);
|
||||
String expectedVal = "{\"property1\":\"value1\", \"property2\":\"value2\"}, ...";
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
try {
|
||||
Map<String, String> consumerProperties = mapper.readValue(confVal, Map.class);
|
||||
|
||||
// Empty map value is considered as no value
|
||||
if (!consumerProperties.isEmpty()) {
|
||||
consumerConfObjMap.put(confKeyName, consumerProperties);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
|
||||
}
|
||||
}
|
||||
|
||||
// "deadLetterPolicy"
|
||||
// - expecting the value is a JSON string has the format:
|
||||
// {"maxRedeliverCount":"<int_value>","deadLetterTopic":"<topic_name>","initialSubscriptionName":"<sub_name>"}
|
||||
confKeyName = "deadLetterPolicy";
|
||||
confVal = pulsarConsumerConfMapRaw.get(confKeyName);
|
||||
expectedVal = "{" +
|
||||
"\"maxRedeliverCount\":\"<int_value>\"," +
|
||||
"\"deadLetterTopic\":\"<topic_name>\"," +
|
||||
"\"initialSubscriptionName\":\"<sub_name>\"}";
|
||||
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
try {
|
||||
Map<String, String> dlqPolicyMap = mapper.readValue(confVal, Map.class);
|
||||
|
||||
// Empty map value is considered as no value
|
||||
if (!dlqPolicyMap.isEmpty()) {
|
||||
boolean valid = true;
|
||||
|
||||
// The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName"
|
||||
for (String key : dlqPolicyMap.keySet()) {
|
||||
if (!StringUtils.equalsAnyIgnoreCase(key,
|
||||
"maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName")) {
|
||||
valid = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// DLQ.maxRedeliverCount is mandatory
|
||||
if (valid && !dlqPolicyMap.containsKey("maxRedeliverCount")) {
|
||||
valid = false;
|
||||
}
|
||||
|
||||
String maxRedeliverCountStr = dlqPolicyMap.get("maxRedeliverCount");
|
||||
if (!NumberUtils.isCreatable(maxRedeliverCountStr)) {
|
||||
valid = false;
|
||||
}
|
||||
|
||||
if (valid) {
|
||||
DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
|
||||
.maxRedeliverCount(NumberUtils.toInt(maxRedeliverCountStr))
|
||||
.deadLetterTopic(dlqPolicyMap.get("deadLetterTopic"))
|
||||
.initialSubscriptionName(dlqPolicyMap.get("initialSubscriptionName"))
|
||||
.build();
|
||||
|
||||
consumerConfObjMap.put(confKeyName, deadLetterPolicy);
|
||||
} else {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
getInvalidConfValStr(confKeyName, confVal, "consumer", expectedVal));
|
||||
}
|
||||
}
|
||||
|
||||
// "negativeAckRedeliveryBackoff" or "ackTimeoutRedeliveryBackoff"
|
||||
// - expecting the value is a JSON string has the format:
|
||||
// {"minDelayMs":"<int_value>", "maxDelayMs":"<int_value>", "multiplier":"<double_value>"}
|
||||
String[] redeliveryBackoffConfigSet = {"negativeAckRedeliveryBackoff", "ackTimeoutRedeliveryBackoff"};
|
||||
expectedVal = "{" +
|
||||
"\"minDelayMs\":\"<int_value>\"," +
|
||||
"\"maxDelayMs\":\"<int_value>\"," +
|
||||
"\"multiplier\":\"<double_value>\"}";
|
||||
|
||||
for (String confKey : redeliveryBackoffConfigSet) {
|
||||
confVal = pulsarConsumerConfMapRaw.get(confKey);
|
||||
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
try {
|
||||
Map<String, String> redliveryBackoffMap = mapper.readValue(confVal, Map.class);
|
||||
|
||||
// Empty map value is considered as no value
|
||||
if (! redliveryBackoffMap.isEmpty()) {
|
||||
boolean valid = true;
|
||||
|
||||
// The JSON key must be one of "maxRedeliverCount", "deadLetterTopic", "initialSubscriptionName"
|
||||
for (String key : redliveryBackoffMap.keySet()) {
|
||||
if (!StringUtils.equalsAnyIgnoreCase(key,
|
||||
"minDelayMs", "maxDelayMs", "multiplier")) {
|
||||
valid = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
String minDelayMsStr = redliveryBackoffMap.get("minDelayMs");
|
||||
String maxDelayMsStr = redliveryBackoffMap.get("maxDelayMs");
|
||||
String multiplierStr = redliveryBackoffMap.get("multiplier");
|
||||
|
||||
if ((StringUtils.isNotBlank(minDelayMsStr) && !NumberUtils.isCreatable(minDelayMsStr)) ||
|
||||
(StringUtils.isNotBlank(maxDelayMsStr) && !NumberUtils.isCreatable(maxDelayMsStr)) ||
|
||||
(StringUtils.isNotBlank(multiplierStr) && !NumberUtils.isCreatable(multiplierStr))) {
|
||||
valid = false;
|
||||
}
|
||||
|
||||
if (valid) {
|
||||
RedeliveryBackoff redeliveryBackoff = MultiplierRedeliveryBackoff.builder()
|
||||
.minDelayMs(NumberUtils.toLong(minDelayMsStr))
|
||||
.maxDelayMs(NumberUtils.toLong(maxDelayMsStr))
|
||||
.multiplier(NumberUtils.toDouble(multiplierStr))
|
||||
.build();
|
||||
|
||||
consumerConfObjMap.put(confKey, redeliveryBackoff);
|
||||
|
||||
} else {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
getInvalidConfValStr(confKey, confVal, "consumer", expectedVal));
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new PulsarAdapterInvalidParamException(
|
||||
getInvalidConfValStr(confKey, confVal, "consumer", expectedVal));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return consumerConfObjMap;
|
||||
}
|
||||
|
||||
|
||||
// Utility function
|
||||
// - get configuration key names by the value type
|
||||
private static List<String> getConfKeyNameByValueType(Map<String, String> confKeyTypeMap, String tgtValType) {
|
||||
ArrayList<String> confKeyNames = new ArrayList<>();
|
||||
|
||||
for (Map.Entry entry: confKeyTypeMap.entrySet()) {
|
||||
if (StringUtils.equalsIgnoreCase(entry.getValue().toString(), tgtValType)) {
|
||||
confKeyNames.add(entry.getKey().toString());
|
||||
}
|
||||
}
|
||||
|
||||
return confKeyNames;
|
||||
}
|
||||
|
||||
// Conversion from Map<String, String> to Map<String, Object> for configuration items with primitive
|
||||
// value types
|
||||
private static void setConfObjMapForPrimitives(
|
||||
Map<String, Object> tgtConfObjMap,
|
||||
Map<String, String> srcConfMapRaw,
|
||||
Map<String, String> validConfKeyTypeMap)
|
||||
{
|
||||
List<String> confKeyList = new ArrayList<>();
|
||||
|
||||
// All configuration items with "String" as the value type
|
||||
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "String");
|
||||
for (String confKey : confKeyList) {
|
||||
if (srcConfMapRaw.containsKey(confKey)) {
|
||||
String confVal = srcConfMapRaw.get(confKey);
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
tgtConfObjMap.put(confKey, confVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// All configuration items with "long" as the value type
|
||||
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "long");
|
||||
for (String confKey : confKeyList) {
|
||||
if (srcConfMapRaw.containsKey(confKey)) {
|
||||
String confVal = srcConfMapRaw.get(confKey);
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
tgtConfObjMap.put(confKey, Long.valueOf(confVal));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// All configuration items with "int" as the value type
|
||||
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "int");
|
||||
for (String confKey : confKeyList) {
|
||||
if (srcConfMapRaw.containsKey(confKey)) {
|
||||
String confVal = srcConfMapRaw.get(confKey);
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
tgtConfObjMap.put(confKey, Integer.valueOf(confVal));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// All configuration items with "boolean" as the value type
|
||||
confKeyList = getConfKeyNameByValueType(validConfKeyTypeMap, "boolean");
|
||||
for (String confKey : confKeyList) {
|
||||
if (srcConfMapRaw.containsKey(confKey)) {
|
||||
String confVal = srcConfMapRaw.get(confKey);
|
||||
if (StringUtils.isNotBlank(confVal)) {
|
||||
tgtConfObjMap.put(confKey, Boolean.valueOf(confVal));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: So far the above primitive types should be good enough.
|
||||
// Add support for other types when needed
|
||||
}
|
||||
|
||||
private static String getInvalidConfValStr(String confKey, String confVal, String configCategory, String expectedVal) {
|
||||
return "Incorrect value \"" + confVal + "\" for Pulsar " + configCategory +
|
||||
" configuration item of \"" + confKey + "\". Expecting the following value (format): " + expectedVal;
|
||||
}
|
||||
}
|
||||
@@ -5,7 +5,7 @@ bindings:
|
||||
|
||||
params:
|
||||
async_api: "false"
|
||||
admin_delop: "true"
|
||||
admin_delop: "false"
|
||||
|
||||
blocks:
|
||||
admin-namespace-block:
|
||||
@@ -4,7 +4,7 @@ bindings:
|
||||
|
||||
params:
|
||||
async_api: "false"
|
||||
admin_delop: "true"
|
||||
admin_delop: "false"
|
||||
|
||||
blocks:
|
||||
admin-tenant-block:
|
||||
@@ -12,6 +12,6 @@ blocks:
|
||||
phase: admin-tenant
|
||||
ops:
|
||||
op1:
|
||||
AdminTenant: "{tenant}"
|
||||
AdminTopic: "{tenant}"
|
||||
admin_roles: ""
|
||||
allowed_clusters: ""
|
||||
@@ -6,7 +6,7 @@ bindings:
|
||||
|
||||
params:
|
||||
async_api: "false"
|
||||
admin_delop: "true"
|
||||
admin_delop: "false"
|
||||
|
||||
blocks:
|
||||
admin-topic-block:
|
||||
@@ -15,5 +15,5 @@ blocks:
|
||||
ops:
|
||||
op1:
|
||||
AdminTopic: "{tenant}/{namespace}/{topic}"
|
||||
enable_partition: "true"
|
||||
enable_partition: "false"
|
||||
partition_num: "5"
|
||||
@@ -1,4 +1,4 @@
|
||||
### Schema related configurations - schema.xxx
|
||||
9### Schema related configurations - schema.xxx
|
||||
# valid types:
|
||||
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
|
||||
# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
|
||||
@@ -8,8 +8,10 @@
|
||||
# TODO: as a starting point, only supports the following types
|
||||
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
|
||||
# 2) Avro for messages with schema
|
||||
#schema.key.type=avro
|
||||
#schema.key.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-key-example.avsc
|
||||
#schema.type=avro
|
||||
#schema.definition=file:///Users/yabinmeng/DataStax/MyNoSQLBench/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc
|
||||
#schema.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-value-example.avsc
|
||||
schema.type=
|
||||
schema.definition=
|
||||
|
||||
@@ -29,8 +31,6 @@ producer.producerName=
|
||||
producer.topicName=
|
||||
producer.sendTimeoutMs=
|
||||
producer.blockIfQueueFull=true
|
||||
producer.maxPendingMessages=5000
|
||||
producer.batchingMaxMessages=5000
|
||||
|
||||
|
||||
### Consumer related configurations (global) - consumer.xxx
|
||||
|
||||
9
adapter-pulsar/src/main/resources/iot-key-example.avsc
Normal file
9
adapter-pulsar/src/main/resources/iot-key-example.avsc
Normal file
@@ -0,0 +1,9 @@
|
||||
{
|
||||
"type": "record",
|
||||
"name": "IotSensorKey",
|
||||
"namespace": "TestNS",
|
||||
"fields" : [
|
||||
{"name": "Location", "type": "string"},
|
||||
{"name": "WellID", "type": "string"}
|
||||
]
|
||||
}
|
||||
11
adapter-pulsar/src/main/resources/iot-value-example.avsc
Normal file
11
adapter-pulsar/src/main/resources/iot-value-example.avsc
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"type": "record",
|
||||
"name": "IotSensor",
|
||||
"namespace": "TestNS",
|
||||
"fields" : [
|
||||
{"name": "SensorID", "type": "string"},
|
||||
{"name": "SensorType", "type": "string"},
|
||||
{"name": "ReadingTime", "type": "string"},
|
||||
{"name": "ReadingValue", "type": "float"}
|
||||
]
|
||||
}
|
||||
41
adapter-pulsar/src/main/resources/msg_proc_avro.yaml
Normal file
41
adapter-pulsar/src/main/resources/msg_proc_avro.yaml
Normal file
@@ -0,0 +1,41 @@
|
||||
bindings:
|
||||
# message key and value
|
||||
mykey: NumberNameToString()
|
||||
location: Cities();
|
||||
well_id: ToUUID();ToString();
|
||||
sensor_id: ToUUID();ToString();
|
||||
reading_time: ToDateTime();
|
||||
reading_value: ToFloat(100);
|
||||
|
||||
# document level parameters that apply to all Pulsar client types:
|
||||
params:
|
||||
async_api: "true"
|
||||
|
||||
blocks:
|
||||
msg-produce-block:
|
||||
tags:
|
||||
phase: msg-send
|
||||
ops:
|
||||
op1:
|
||||
MessageProduce: "tnt0/ns0/tp1"
|
||||
msg_key: |
|
||||
{
|
||||
"Location": "{location}",
|
||||
"WellID": "{well_id}"
|
||||
}
|
||||
msg_value: |
|
||||
{
|
||||
"SensorID": "{sensor_id}",
|
||||
"SensorType": "Temperature",
|
||||
"ReadingTime": "{reading_time}",
|
||||
"ReadingValue": {reading_value}
|
||||
}
|
||||
|
||||
msg-consume-block:
|
||||
tags:
|
||||
phase: msg-recv
|
||||
ops:
|
||||
op1:
|
||||
MessageConsume: "tnt0/ns0/tp0"
|
||||
subscription_name: "mynbsub"
|
||||
# subscription_type: "shared"
|
||||
34
adapter-pulsar/src/main/resources/msg_proc_kvraw.yaml
Normal file
34
adapter-pulsar/src/main/resources/msg_proc_kvraw.yaml
Normal file
@@ -0,0 +1,34 @@
|
||||
bindings:
|
||||
# message key, property and value
|
||||
mykey: NumberNameToString()
|
||||
int_prop_val: ToString(); Prefix("IntProp_")
|
||||
text_prop_val: AlphaNumericString(5); Prefix("TextProp_")
|
||||
myvalue: AlphaNumericString(20)
|
||||
|
||||
# document level parameters that apply to all Pulsar client types:
|
||||
params:
|
||||
async_api: "true"
|
||||
|
||||
blocks:
|
||||
msg-produce-block:
|
||||
tags:
|
||||
phase: msg-send
|
||||
ops:
|
||||
op1:
|
||||
MessageProduce: "tnt0/ns0/tp0"
|
||||
msg_key: "{mykey}"
|
||||
msg_prop: |
|
||||
{
|
||||
"prop1": "{int_prop_val}",
|
||||
"prop2": "{text_prop_val}"
|
||||
}
|
||||
msg_value: "{myvalue}"
|
||||
|
||||
msg-consume-block:
|
||||
tags:
|
||||
phase: msg-recv
|
||||
ops:
|
||||
op1:
|
||||
MessageConsume: "tnt0/ns0/tp0"
|
||||
subscription_name: "mynbsub"
|
||||
# subscription_type: "shared"
|
||||
1
adapter-pulsar/src/main/resources/pulsar.md
Normal file
1
adapter-pulsar/src/main/resources/pulsar.md
Normal file
@@ -0,0 +1 @@
|
||||
<< to be added ... >>
|
||||
Reference in New Issue
Block a user