mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
pulsar merge fixes
This commit is contained in:
commit
0880f0cd58
@ -0,0 +1,196 @@
|
||||
package io.nosqlbench.driver.pulsar;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
import org.apache.pulsar.client.api.PulsarClientException;
|
||||
import org.apache.pulsar.client.api.SubscriptionType;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.regex.Pattern;
|
||||
import java.util.regex.PatternSyntaxException;
|
||||
|
||||
public class PulsarConsumerSpace extends PulsarSpace {
|
||||
|
||||
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
|
||||
|
||||
public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) {
|
||||
super(name, pulsarClientConf);
|
||||
}
|
||||
|
||||
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
|
||||
if (!StringUtils.isBlank(cycleTopicNames)) {
|
||||
return cycleTopicNames;
|
||||
}
|
||||
|
||||
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
|
||||
if (!StringUtils.isBlank(globalTopicNames)) {
|
||||
return globalTopicNames;
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private List<String> getEffectiveTopicNames(String cycleTopicNames) {
|
||||
String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
|
||||
|
||||
String[] names = effectiveTopicNamesStr.split("[;,]");
|
||||
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
|
||||
|
||||
for (String name : names) {
|
||||
if (!StringUtils.isBlank(name))
|
||||
effectiveTopicNameList.add(name.trim());
|
||||
}
|
||||
|
||||
|
||||
return effectiveTopicNameList;
|
||||
}
|
||||
|
||||
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
|
||||
if (!StringUtils.isBlank(cycleTopicsPattern)) {
|
||||
return cycleTopicsPattern;
|
||||
}
|
||||
|
||||
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
|
||||
if (!StringUtils.isBlank(globalTopicsPattern)) {
|
||||
return globalTopicsPattern;
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
|
||||
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
|
||||
Pattern topicsPattern;
|
||||
try {
|
||||
if (!StringUtils.isBlank(effectiveTopicsPatternStr))
|
||||
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
|
||||
else
|
||||
topicsPattern = null;
|
||||
} catch (PatternSyntaxException pse) {
|
||||
topicsPattern = null;
|
||||
}
|
||||
return topicsPattern;
|
||||
}
|
||||
|
||||
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
|
||||
if (!StringUtils.isBlank(cycleSubscriptionName)) {
|
||||
return cycleSubscriptionName;
|
||||
}
|
||||
|
||||
String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName();
|
||||
if (!StringUtils.isBlank(globalSubscriptionName)) {
|
||||
return globalSubscriptionName;
|
||||
}
|
||||
|
||||
return "default-subs";
|
||||
}
|
||||
|
||||
private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) {
|
||||
if (!StringUtils.isBlank(cycleSubscriptionType)) {
|
||||
return cycleSubscriptionType;
|
||||
}
|
||||
|
||||
String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType();
|
||||
if (!StringUtils.isBlank(globalSubscriptionType)) {
|
||||
return globalSubscriptionType;
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
|
||||
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
|
||||
SubscriptionType subscriptionType;
|
||||
|
||||
try {
|
||||
subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
subscriptionType = SubscriptionType.Exclusive;
|
||||
}
|
||||
|
||||
return subscriptionType;
|
||||
}
|
||||
|
||||
private String getEffectiveConsumerName(String cycleConsumerName) {
|
||||
if (!StringUtils.isBlank(cycleConsumerName)) {
|
||||
return cycleConsumerName;
|
||||
}
|
||||
|
||||
String globalConsumerName = pulsarNBClientConf.getConsumerName();
|
||||
if (!StringUtils.isBlank(globalConsumerName)) {
|
||||
return globalConsumerName;
|
||||
}
|
||||
|
||||
return "default-cons";
|
||||
}
|
||||
|
||||
public Consumer<?> getConsumer(String cycleTopicNames,
|
||||
String cycleTopicsPattern,
|
||||
String cycleSubscriptionName,
|
||||
String cycleSubscriptionType,
|
||||
String cycleConsumerName) {
|
||||
|
||||
String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
|
||||
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
|
||||
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
|
||||
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
|
||||
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
|
||||
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
|
||||
String consumerName = getEffectiveConsumerName(cycleConsumerName);
|
||||
|
||||
if (topicNames.isEmpty() && (topicsPattern == null)) {
|
||||
throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!");
|
||||
}
|
||||
|
||||
String encodedStr;
|
||||
if (!topicNames.isEmpty()) {
|
||||
encodedStr = PulsarActivityUtil.encode(
|
||||
consumerName,
|
||||
subscriptionName,
|
||||
StringUtils.join(topicNames, "|"));
|
||||
} else {
|
||||
encodedStr = PulsarActivityUtil.encode(
|
||||
consumerName,
|
||||
subscriptionName,
|
||||
topicsPatternStr);
|
||||
}
|
||||
Consumer<?> consumer = consumers.get(encodedStr);
|
||||
|
||||
if (consumer == null) {
|
||||
PulsarClient pulsarClient = getPulsarClient();
|
||||
|
||||
// Get other possible producer settings that are set at global level
|
||||
Map<String, Object> consumerConf = pulsarNBClientConf.getConsumerConfMap();
|
||||
|
||||
// Explicit topic names will take precedence over topics pattern
|
||||
if (!topicNames.isEmpty()) {
|
||||
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
|
||||
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames);
|
||||
} else {
|
||||
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
|
||||
consumerConf.put(
|
||||
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
|
||||
getEffectiveTopicPattern(cycleTopicsPattern));
|
||||
}
|
||||
|
||||
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
|
||||
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
|
||||
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
|
||||
|
||||
try {
|
||||
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();
|
||||
} catch (PulsarClientException ple) {
|
||||
ple.printStackTrace();
|
||||
throw new RuntimeException("Unable to create a Pulsar consumer!");
|
||||
}
|
||||
|
||||
consumers.put(encodedStr, consumer);
|
||||
}
|
||||
|
||||
return consumer;
|
||||
}
|
||||
}
|
@ -0,0 +1,80 @@
|
||||
package io.nosqlbench.driver.pulsar;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
import org.apache.pulsar.client.api.PulsarClientException;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class PulsarProducerSpace extends PulsarSpace {
|
||||
|
||||
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
|
||||
|
||||
public PulsarProducerSpace(String name, PulsarNBClientConf pulsarClientConf) {
|
||||
super(name, pulsarClientConf);
|
||||
}
|
||||
|
||||
// Producer name is NOT mandatory
|
||||
// - It can be set at either global level or cycle level
|
||||
// - If set at both levels, cycle level setting takes precedence
|
||||
private String getEffectiveProducerName(String cycleProducerName) {
|
||||
if (!StringUtils.isBlank(cycleProducerName)) {
|
||||
return cycleProducerName;
|
||||
}
|
||||
|
||||
String globalProducerName = pulsarNBClientConf.getProducerName();
|
||||
if (!StringUtils.isBlank(globalProducerName)) {
|
||||
return globalProducerName;
|
||||
}
|
||||
|
||||
// Default Producer name when it is not set at either cycle or global level
|
||||
return "default-prod";
|
||||
}
|
||||
|
||||
// Topic name IS mandatory
|
||||
// - It must be set at either global level or cycle level
|
||||
// - If set at both levels, cycle level setting takes precedence
|
||||
private String getEffectiveTopicName(String cycleTopicName) {
|
||||
if (!StringUtils.isBlank(cycleTopicName)) {
|
||||
return cycleTopicName;
|
||||
}
|
||||
|
||||
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
|
||||
if (!StringUtils.isBlank(globalTopicName)) {
|
||||
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
|
||||
}
|
||||
|
||||
return globalTopicName;
|
||||
}
|
||||
|
||||
public Producer<?> getProducer(String cycleProducerName, String cycleTopicName) {
|
||||
String producerName = getEffectiveProducerName(cycleProducerName);
|
||||
String topicName = getEffectiveTopicName(cycleTopicName);
|
||||
|
||||
String encodedStr = PulsarActivityUtil.encode(cycleProducerName, cycleTopicName);
|
||||
Producer<?> producer = producers.get(encodedStr);
|
||||
|
||||
if (producer == null) {
|
||||
PulsarClient pulsarClient = getPulsarClient();
|
||||
|
||||
// Get other possible producer settings that are set at global level
|
||||
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
|
||||
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
|
||||
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
|
||||
|
||||
try {
|
||||
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
|
||||
} catch (PulsarClientException ple) {
|
||||
throw new RuntimeException("Unable to create a Pulsar producer!");
|
||||
}
|
||||
|
||||
producers.put(encodedStr, producer);
|
||||
}
|
||||
|
||||
return producer;
|
||||
}
|
||||
}
|
@ -0,0 +1,109 @@
|
||||
package io.nosqlbench.driver.pulsar;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class PulsarReaderSpace extends PulsarSpace {
|
||||
|
||||
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
|
||||
|
||||
public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) {
|
||||
super(name, pulsarClientConf);
|
||||
}
|
||||
|
||||
private String getEffectiveReaderTopicName(String cycleReaderTopicName) {
|
||||
if (!StringUtils.isBlank(cycleReaderTopicName)) {
|
||||
return cycleReaderTopicName;
|
||||
}
|
||||
|
||||
String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName();
|
||||
if (!StringUtils.isBlank(globalReaderTopicName)) {
|
||||
return globalReaderTopicName;
|
||||
}
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private String getEffectiveReaderName(String cycleReaderName) {
|
||||
if (!StringUtils.isBlank(cycleReaderName)) {
|
||||
return cycleReaderName;
|
||||
}
|
||||
|
||||
String globalReaderName = pulsarNBClientConf.getConsumerName();
|
||||
if (!StringUtils.isBlank(globalReaderName)) {
|
||||
return globalReaderName;
|
||||
}
|
||||
|
||||
return "default-read";
|
||||
}
|
||||
|
||||
private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) {
|
||||
if (!StringUtils.isBlank(cycleStartMsgPosStr)) {
|
||||
return cycleStartMsgPosStr;
|
||||
}
|
||||
|
||||
String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr();
|
||||
if (!StringUtils.isBlank(globalStartMsgPosStr)) {
|
||||
return globalStartMsgPosStr;
|
||||
}
|
||||
|
||||
return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label;
|
||||
}
|
||||
|
||||
public Reader<?> getReader(String cycleTopicName,
|
||||
String cycleReaderName,
|
||||
String cycleStartMsgPos) {
|
||||
|
||||
String topicName = getEffectiveReaderTopicName(cycleTopicName);
|
||||
String readerName = getEffectiveReaderName(cycleReaderName);
|
||||
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
|
||||
|
||||
if (StringUtils.isBlank(topicName)) {
|
||||
throw new RuntimeException("Must specify a \"topicName\" for a reader!");
|
||||
}
|
||||
|
||||
String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos);
|
||||
Reader<?> reader = readers.get(encodedStr);
|
||||
|
||||
if (reader == null) {
|
||||
PulsarClient pulsarClient = getPulsarClient();
|
||||
|
||||
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
|
||||
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
|
||||
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
|
||||
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
|
||||
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
|
||||
|
||||
try {
|
||||
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
|
||||
|
||||
MessageId startMsgId = MessageId.latest;
|
||||
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
|
||||
startMsgId = MessageId.earliest;
|
||||
}
|
||||
//TODO: custom start message position is NOT supported yet
|
||||
//else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) {
|
||||
// startMsgId = MessageId.latest;
|
||||
//}
|
||||
|
||||
if (startMsgId != null) {
|
||||
readerBuilder = readerBuilder.startMessageId(startMsgId);
|
||||
}
|
||||
|
||||
reader = readerBuilder.create();
|
||||
} catch (PulsarClientException ple) {
|
||||
ple.printStackTrace();
|
||||
throw new RuntimeException("Unable to create a Pulsar reader!");
|
||||
}
|
||||
|
||||
readers.put(encodedStr, reader);
|
||||
}
|
||||
|
||||
return reader;
|
||||
}
|
||||
}
|
@ -17,17 +17,15 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
public class PulsarSpace {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
|
||||
|
||||
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
|
||||
// TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc.
|
||||
|
||||
private final String name;
|
||||
private final PulsarNBClientConf pulsarNBClientConf;
|
||||
protected final String name;
|
||||
protected final PulsarNBClientConf pulsarNBClientConf;
|
||||
|
||||
private PulsarClient pulsarClient = null;
|
||||
private Schema<?> pulsarSchema = null;
|
||||
protected PulsarClient pulsarClient = null;
|
||||
protected Schema<?> pulsarSchema = null;
|
||||
|
||||
public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf ) {
|
||||
public PulsarSpace(String name, PulsarNBClientConf pulsarClientConf) {
|
||||
this.name = name;
|
||||
this.pulsarNBClientConf = pulsarClientConf;
|
||||
|
||||
@ -35,29 +33,30 @@ public class PulsarSpace {
|
||||
createPulsarSchemaFromConf();
|
||||
}
|
||||
|
||||
private void createPulsarClientFromConf() {
|
||||
protected void createPulsarClientFromConf() {
|
||||
ClientBuilder clientBuilder = PulsarClient.builder();
|
||||
|
||||
String dftSvcUrl = "pulsar://localhost:6650";
|
||||
if ( !pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString()) ) {
|
||||
if (!pulsarNBClientConf.hasClientConfKey(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString())) {
|
||||
pulsarNBClientConf.setClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.serviceUrl.toString(), dftSvcUrl);
|
||||
}
|
||||
|
||||
try {
|
||||
Map<String, Object> clientConf = pulsarNBClientConf.getClientConfMap();
|
||||
pulsarClient = clientBuilder.loadConf(clientConf).build();
|
||||
}
|
||||
catch (PulsarClientException pce) {
|
||||
} catch (PulsarClientException pce) {
|
||||
logger.error("Fail to create PulsarClient from global configuration!");
|
||||
throw new RuntimeException("Fail to create PulsarClient from global configuration!");
|
||||
}
|
||||
}
|
||||
|
||||
private void createPulsarSchemaFromConf() {
|
||||
String schemaType = pulsarNBClientConf.getSchemaConfValue("schema.type").toString();
|
||||
protected void createPulsarSchemaFromConf() {
|
||||
Object value = pulsarNBClientConf.getSchemaConfValue("schema.type");
|
||||
String schemaType = (value != null) ? value.toString() : "";
|
||||
|
||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) {
|
||||
String schemaDefStr = pulsarNBClientConf.getSchemaConfValue("schema.definition").toString();
|
||||
value = pulsarNBClientConf.getSchemaConfValue("schema.definition");
|
||||
String schemaDefStr = (value != null) ? value.toString() : "";
|
||||
pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr);
|
||||
} else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) {
|
||||
pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType));
|
||||
@ -68,77 +67,8 @@ public class PulsarSpace {
|
||||
}
|
||||
|
||||
public PulsarClient getPulsarClient() { return pulsarClient; }
|
||||
|
||||
public PulsarNBClientConf getPulsarClientConf() {
|
||||
return pulsarNBClientConf;
|
||||
}
|
||||
|
||||
public Schema<?> getPulsarSchema() { return pulsarSchema; }
|
||||
|
||||
// Producer name is NOT mandatory
|
||||
// - It can be set at either global level or cycle level
|
||||
// - If set at both levels, cycle level setting takes precedence
|
||||
private String getEffectiveProducerName(String cycleProducerName) {
|
||||
if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) {
|
||||
return cycleProducerName;
|
||||
}
|
||||
|
||||
String globalProducerName = pulsarNBClientConf.getProducerName();
|
||||
if ((globalProducerName != null) && (!globalProducerName.isEmpty())) {
|
||||
return globalProducerName;
|
||||
}
|
||||
|
||||
// Default Producer name when it is not set at either cycle or global level
|
||||
return "default";
|
||||
}
|
||||
|
||||
// Topic name is mandatory
|
||||
// - It must be set at either global level or cycle level
|
||||
// - If set at both levels, cycle level setting takes precedence
|
||||
private String getEffectiveTopicName(String cycleTopicName) {
|
||||
if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) {
|
||||
return cycleTopicName;
|
||||
}
|
||||
|
||||
String globalTopicName = pulsarNBClientConf.getTopicName();
|
||||
if ( (globalTopicName == null) || (globalTopicName.isEmpty()) ) {
|
||||
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
|
||||
}
|
||||
|
||||
return globalTopicName;
|
||||
}
|
||||
|
||||
private Producer createPulsarProducer(String cycleTopicName, String cycleProducerName) {
|
||||
PulsarClient pulsarClient = getPulsarClient();
|
||||
Producer producer = null;
|
||||
|
||||
String producerName = getEffectiveProducerName(cycleProducerName);
|
||||
String topicName = getEffectiveTopicName(cycleTopicName);
|
||||
|
||||
// Get other possible producer settings that are set at global level
|
||||
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
|
||||
producerConf.put("topicName", topicName);
|
||||
producerConf.put("producerName", producerName);
|
||||
|
||||
try {
|
||||
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
|
||||
}
|
||||
catch (PulsarClientException ple) {
|
||||
throw new RuntimeException("Unable to create a client to connect to the Pulsar cluster!");
|
||||
}
|
||||
|
||||
return producer;
|
||||
}
|
||||
|
||||
public Producer<?> getProducer(String cycleProducerName, String cycleTopicName) {
|
||||
String producerName = getEffectiveProducerName(cycleProducerName);
|
||||
Producer producer = producers.get(producerName);
|
||||
|
||||
if (producer == null) {
|
||||
producer = createPulsarProducer(cycleTopicName, cycleProducerName);
|
||||
producers.put(producerName, producer);
|
||||
}
|
||||
|
||||
return producer;
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,7 @@
|
||||
package io.nosqlbench.driver.pulsar;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@ -21,8 +23,20 @@ public class PulsarSpaceCache {
|
||||
}
|
||||
|
||||
public PulsarSpace getPulsarSpace(String name) {
|
||||
PulsarSpace cspace = clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, activity.getPulsarConf()));
|
||||
return cspace;
|
||||
|
||||
String pulsarClientType = activity.getPulsarConf().getPulsarClientType();
|
||||
|
||||
if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString())) {
|
||||
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarProducerSpace(spaceName, activity.getPulsarConf()));
|
||||
} else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString())) {
|
||||
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarConsumerSpace(spaceName, activity.getPulsarConf()));
|
||||
} else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString())) {
|
||||
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarReaderSpace(spaceName, activity.getPulsarConf()));
|
||||
}
|
||||
// TODO: add support for websocket-producer and managed-ledger
|
||||
else {
|
||||
throw new RuntimeException("Unsupported Pulsar client type: " + pulsarClientType);
|
||||
}
|
||||
}
|
||||
|
||||
public PulsarActivity getActivity() {
|
||||
|
@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
@ -16,21 +17,21 @@ import java.util.function.LongFunction;
|
||||
* For additional parameterization, the command template is also provided.
|
||||
*/
|
||||
public class PulsarConsumerMapper implements LongFunction<PulsarOp> {
|
||||
private final LongFunction<Consumer<?>> consumerFunc;
|
||||
private final LongFunction<String> recvInstructions;
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final Schema<?> pulsarSchema;
|
||||
private final LongFunction<Consumer<?>> consumerFunc;
|
||||
|
||||
public PulsarConsumerMapper(LongFunction<Consumer<?>> consumerFunc,
|
||||
LongFunction<String> recvMsg,
|
||||
CommandTemplate cmdTpl) {
|
||||
this.consumerFunc = consumerFunc;
|
||||
this.recvInstructions = recvMsg;
|
||||
public PulsarConsumerMapper(CommandTemplate cmdTpl,
|
||||
Schema<?> pulsarSchema,
|
||||
LongFunction<Consumer<?>> consumerFunc) {
|
||||
this.cmdTpl = cmdTpl;
|
||||
// TODO add schema support
|
||||
this.pulsarSchema = pulsarSchema;
|
||||
this.consumerFunc = consumerFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PulsarOp apply(long value) {
|
||||
return new PulsarConsumerOp((Consumer<byte[]>) consumerFunc.apply(value), recvInstructions.apply(value));
|
||||
Consumer<?> consumer = consumerFunc.apply(value);
|
||||
return new PulsarConsumerOp(consumer, pulsarSchema);
|
||||
}
|
||||
}
|
||||
|
@ -1,31 +1,41 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import org.apache.pulsar.client.api.Consumer;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
import org.apache.pulsar.client.api.PulsarClientException;
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.pulsar.client.api.*;
|
||||
import org.apache.pulsar.client.api.schema.GenericRecord;
|
||||
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class PulsarConsumerOp implements PulsarOp {
|
||||
private final Consumer<byte[]> consumer;
|
||||
private final String recvInstructions;
|
||||
private final Consumer<?> consumer;
|
||||
private final Schema<?> pulsarSchema;
|
||||
|
||||
public PulsarConsumerOp(Consumer<byte[]> consumer, String recvInstructions) {
|
||||
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema) {
|
||||
this.consumer = consumer;
|
||||
this.recvInstructions = recvInstructions;
|
||||
this.pulsarSchema = schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
Message<byte[]> msgbytes = consumer.receive();
|
||||
// TODO: Parameterize the actions taken on a received message
|
||||
// TODO: Properly parameterize all pulsar Op types as with Producer<T> and Consumer<T>
|
||||
String received = new String(msgbytes.getValue(), StandardCharsets.UTF_8);
|
||||
System.out.print("received:" + received);
|
||||
if (!received.endsWith("\n")) {
|
||||
System.out.println("\n");
|
||||
Message<?> message = consumer.receive();
|
||||
|
||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
||||
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
||||
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
||||
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
|
||||
|
||||
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
|
||||
} else {
|
||||
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));
|
||||
}
|
||||
|
||||
consumer.acknowledge(message.getMessageId());
|
||||
|
||||
} catch (PulsarClientException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
@ -18,23 +18,22 @@ import java.util.function.LongFunction;
|
||||
* For additional parameterization, the command template is also provided.
|
||||
*/
|
||||
public class PulsarProducerMapper implements LongFunction<PulsarOp> {
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final Schema<?> pulsarSchema;
|
||||
private final LongFunction<Producer<?>> producerFunc;
|
||||
private final LongFunction<String> keyFunc;
|
||||
private final LongFunction<String> payloadFunc;
|
||||
private final Schema pulsarSchema;
|
||||
private final CommandTemplate cmdTpl;
|
||||
|
||||
public PulsarProducerMapper(
|
||||
LongFunction<Producer<?>> producerFunc,
|
||||
LongFunction<String> keyFunc,
|
||||
LongFunction<String> payloadFunc,
|
||||
Schema pulsarSchema,
|
||||
CommandTemplate cmdTpl) {
|
||||
public PulsarProducerMapper(CommandTemplate cmdTpl,
|
||||
Schema<?> pulsarSchema,
|
||||
LongFunction<Producer<?>> producerFunc,
|
||||
LongFunction<String> keyFunc,
|
||||
LongFunction<String> payloadFunc) {
|
||||
this.cmdTpl = cmdTpl;
|
||||
this.pulsarSchema = pulsarSchema;
|
||||
this.producerFunc = producerFunc;
|
||||
this.keyFunc = keyFunc;
|
||||
this.payloadFunc = payloadFunc;
|
||||
this.pulsarSchema = pulsarSchema;
|
||||
this.cmdTpl = cmdTpl;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -0,0 +1,27 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Reader;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class PulsarReaderMapper implements LongFunction<PulsarOp> {
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final Schema<?> pulsarSchema;
|
||||
private final LongFunction<Reader<?>> readerFunc;
|
||||
|
||||
public PulsarReaderMapper(CommandTemplate cmdTpl,
|
||||
Schema<?> pulsarSchema,
|
||||
LongFunction<Reader<?>> readerFunc) {
|
||||
this.cmdTpl = cmdTpl;
|
||||
this.pulsarSchema = pulsarSchema;
|
||||
this.readerFunc = readerFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PulsarOp apply(long value) {
|
||||
Reader<?> reader = readerFunc.apply(value);
|
||||
return new PulsarReaderOp(reader, pulsarSchema);
|
||||
}
|
||||
}
|
@ -0,0 +1,43 @@
|
||||
package io.nosqlbench.driver.pulsar.ops;
|
||||
|
||||
import io.nosqlbench.driver.pulsar.util.AvroUtil;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
import org.apache.pulsar.client.api.PulsarClientException;
|
||||
import org.apache.pulsar.client.api.Reader;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
import org.apache.pulsar.common.schema.SchemaType;
|
||||
|
||||
public class PulsarReaderOp implements PulsarOp {
|
||||
private final Reader<?> reader;
|
||||
private final Schema<?> pulsarSchema;
|
||||
|
||||
public PulsarReaderOp(Reader<?> reader, Schema<?> schema) {
|
||||
this.reader = reader;
|
||||
this.pulsarSchema = schema;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
||||
|
||||
// TODO: how many messages to read per NB cycle?
|
||||
Message<?> message;
|
||||
while (reader.hasMessageAvailable()) {
|
||||
message = reader.readNext();
|
||||
|
||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
||||
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
||||
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
|
||||
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
|
||||
} else {
|
||||
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));
|
||||
}
|
||||
}
|
||||
} catch (PulsarClientException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.util;
|
||||
|
||||
import org.apache.avro.io.DecoderFactory;
|
||||
import org.apache.avro.io.JsonDecoder;
|
||||
import org.apache.avro.io.BinaryDecoder;
|
||||
import org.apache.pulsar.client.api.schema.Field;
|
||||
import org.apache.pulsar.client.api.schema.GenericRecord;
|
||||
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
|
||||
@ -17,6 +18,7 @@ public class AvroUtil {
|
||||
public static org.apache.avro.Schema GetSchema_ApacheAvro(String avroSchemDef) {
|
||||
return new org.apache.avro.Schema.Parser().parse(avroSchemDef);
|
||||
}
|
||||
|
||||
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, String jsonData) {
|
||||
org.apache.avro.generic.GenericRecord record = null;
|
||||
|
||||
@ -25,11 +27,30 @@ public class AvroUtil {
|
||||
|
||||
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
|
||||
reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
|
||||
|
||||
JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, jsonData);
|
||||
|
||||
record = reader.read(null, decoder);
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
|
||||
return record;
|
||||
}
|
||||
|
||||
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, byte[] bytesData) {
|
||||
org.apache.avro.generic.GenericRecord record = null;
|
||||
|
||||
try {
|
||||
org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef);
|
||||
|
||||
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
|
||||
reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
|
||||
|
||||
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytesData, null);
|
||||
|
||||
record = reader.read(null, decoder);
|
||||
} catch (IOException ioe) {
|
||||
ioe.printStackTrace();
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package io.nosqlbench.driver.pulsar.util;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
@ -11,10 +12,10 @@ import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.InvalidPathException;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.HashMap;
|
||||
|
||||
public class PulsarActivityUtil {
|
||||
@ -31,7 +32,8 @@ public class PulsarActivityUtil {
|
||||
;
|
||||
|
||||
public final String label;
|
||||
private CLIENT_TYPES(String label) {
|
||||
|
||||
CLIENT_TYPES(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
@ -48,7 +50,8 @@ public class PulsarActivityUtil {
|
||||
;
|
||||
|
||||
public final String label;
|
||||
private PERSISTENT_TYPES(String label) {
|
||||
|
||||
PERSISTENT_TYPES(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
@ -59,6 +62,7 @@ public class PulsarActivityUtil {
|
||||
|
||||
///////
|
||||
// Valid Pulsar client configuration (activity-level settings)
|
||||
// - https://pulsar.apache.org/docs/en/client-libraries-java/#client
|
||||
public enum CLNT_CONF_KEY {
|
||||
serviceUrl("serviceUrl"),
|
||||
authPulginClassName("authPluginClassName"),
|
||||
@ -83,7 +87,8 @@ public class PulsarActivityUtil {
|
||||
;
|
||||
|
||||
public final String label;
|
||||
private CLNT_CONF_KEY(String label) {
|
||||
|
||||
CLNT_CONF_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
@ -92,13 +97,11 @@ public class PulsarActivityUtil {
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid producer configuration (activity-level settings)
|
||||
public enum PRODUCER_CONF_KEY {
|
||||
// NOTE:
|
||||
// For "topicName" and "producerName", they're ignore at activity-level.
|
||||
// Instead, op-level settings are respected
|
||||
// topicName("topicName"),
|
||||
// producerName("producerName"),
|
||||
// Standard producer configuration (activity-level settings)
|
||||
// - https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
|
||||
public enum PRODUCER_CONF_STD_KEY {
|
||||
topicName("topicName"),
|
||||
producerName("producerName"),
|
||||
sendTimeoutMs("sendTimeoutMs"),
|
||||
blockIfQueueFull("blockIfQueueFull"),
|
||||
maxPendingMessages("maxPendingMessages"),
|
||||
@ -109,38 +112,103 @@ public class PulsarActivityUtil {
|
||||
batchingMaxPublishDelayMicros("batchingMaxPublishDelayMicros"),
|
||||
batchingMaxMessages("batchingMaxMessages"),
|
||||
batchingEnabled("batchingEnabled"),
|
||||
compressionType("compressionType")
|
||||
;
|
||||
compressionType("compressionType");
|
||||
|
||||
public final String label;
|
||||
private PRODUCER_CONF_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidProducerConfItem(String item) {
|
||||
return Arrays.stream(PRODUCER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid consumer configuration (activity-level settings)
|
||||
// TODO: to be added
|
||||
public enum CONSUMER_CONF_KEY {
|
||||
;
|
||||
|
||||
public final String label;
|
||||
private CONSUMER_CONF_KEY(String label) {
|
||||
PRODUCER_CONF_STD_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isStandardProducerConfItem(String item) {
|
||||
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
|
||||
}
|
||||
|
||||
///////
|
||||
// Valid reader configuration (activity-level settings)
|
||||
// TODO: to be added
|
||||
public enum READER_CONF_KEY {
|
||||
;
|
||||
// Standard consumer configuration (activity-level settings)
|
||||
// - https://pulsar.apache.org/docs/en/client-libraries-java/#consumer
|
||||
public enum CONSUMER_CONF_STD_KEY {
|
||||
topicNames("topicNames"),
|
||||
topicsPattern("topicsPattern"),
|
||||
subscriptionName("subscriptionName"),
|
||||
subscriptionType("subscriptionType"),
|
||||
receiverQueueSize("receiverQueueSize"),
|
||||
acknowledgementsGroupTimeMicros("acknowledgementsGroupTimeMicros"),
|
||||
negativeAckRedeliveryDelayMicros("negativeAckRedeliveryDelayMicros"),
|
||||
maxTotalReceiverQueueSizeAcrossPartitions("maxTotalReceiverQueueSizeAcrossPartitions"),
|
||||
consumerName("consumerName"),
|
||||
ackTimeoutMillis("ackTimeoutMillis"),
|
||||
tickDurationMillis("tickDurationMillis"),
|
||||
priorityLevel("priorityLevel"),
|
||||
cryptoFailureAction("cryptoFailureAction"),
|
||||
properties("properties"),
|
||||
readCompacted("readCompacted"),
|
||||
subscriptionInitialPosition("subscriptionInitialPosition"),
|
||||
patternAutoDiscoveryPeriod("patternAutoDiscoveryPeriod"),
|
||||
regexSubscriptionMode("regexSubscriptionMode"),
|
||||
deadLetterPolicy("deadLetterPolicy"),
|
||||
autoUpdatePartitions("autoUpdatePartitions"),
|
||||
replicateSubscriptionState("replicateSubscriptionState");
|
||||
|
||||
public final String label;
|
||||
private READER_CONF_KEY(String label) {
|
||||
|
||||
CONSUMER_CONF_STD_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isStandardConsumerConfItem(String item) {
|
||||
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
|
||||
}
|
||||
|
||||
///////
|
||||
// Standard reader configuration (activity-level settings)
|
||||
// - https://pulsar.apache.org/docs/en/client-libraries-java/#reader
|
||||
public enum READER_CONF_STD_KEY {
|
||||
topicName("topicName"),
|
||||
receiverQueueSize("receiverQueueSize"),
|
||||
readerListener("readerListener"),
|
||||
readerName("readerName"),
|
||||
subscriptionRolePrefix("subscriptionRolePrefix"),
|
||||
cryptoKeyReader("cryptoKeyReader"),
|
||||
cryptoFailureAction("cryptoFailureAction"),
|
||||
readCompacted("readCompacted"),
|
||||
resetIncludeHead("resetIncludeHead");
|
||||
|
||||
public final String label;
|
||||
|
||||
READER_CONF_STD_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isStandardReaderConfItem(String item) {
|
||||
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
|
||||
}
|
||||
|
||||
public enum READER_CONF_CUSTOM_KEY {
|
||||
startMessagePos("startMessagePos");
|
||||
|
||||
public final String label;
|
||||
|
||||
READER_CONF_CUSTOM_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isCustomReaderConfItem(String item) {
|
||||
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
|
||||
}
|
||||
|
||||
public enum READER_MSG_POSITION_TYPE {
|
||||
earliest("earliest"),
|
||||
latest("latest"),
|
||||
custom("custom");
|
||||
|
||||
public final String label;
|
||||
|
||||
READER_MSG_POSITION_TYPE(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
@ -152,7 +220,8 @@ public class PulsarActivityUtil {
|
||||
;
|
||||
|
||||
public final String label;
|
||||
private WEBSKT_PRODUCER_CONF_KEY(String label) {
|
||||
|
||||
WEBSKT_PRODUCER_CONF_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
@ -164,7 +233,8 @@ public class PulsarActivityUtil {
|
||||
;
|
||||
|
||||
public final String label;
|
||||
private MANAGED_LEDGER_CONF_KEY(String label) {
|
||||
|
||||
MANAGED_LEDGER_CONF_KEY(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
@ -175,25 +245,26 @@ public class PulsarActivityUtil {
|
||||
boolean isPrimitive = false;
|
||||
|
||||
// Use "BYTES" as the default type if the type string is not explicitly specified
|
||||
if ((typeStr == null) || typeStr.isEmpty()) {
|
||||
if (StringUtils.isBlank(typeStr)) {
|
||||
typeStr = "BYTES";
|
||||
}
|
||||
|
||||
if ( typeStr.toUpperCase().equals("BOOLEAN") || typeStr.toUpperCase().equals("INT8") ||
|
||||
typeStr.toUpperCase().equals("INT16") || typeStr.toUpperCase().equals("INT32") ||
|
||||
typeStr.toUpperCase().equals("INT64") || typeStr.toUpperCase().equals("FLOAT") ||
|
||||
typeStr.toUpperCase().equals("DOUBLE") || typeStr.toUpperCase().equals("BYTES") ||
|
||||
typeStr.toUpperCase().equals("DATE") || typeStr.toUpperCase().equals("TIME") ||
|
||||
typeStr.toUpperCase().equals("TIMESTAMP") || typeStr.toUpperCase().equals("INSTANT") ||
|
||||
typeStr.toUpperCase().equals("LOCAL_DATE") || typeStr.toUpperCase().equals("LOCAL_TIME") ||
|
||||
typeStr.toUpperCase().equals("LOCAL_DATE_TIME") ) {
|
||||
if (typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") ||
|
||||
typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") ||
|
||||
typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") ||
|
||||
typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") ||
|
||||
typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") ||
|
||||
typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") ||
|
||||
typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") ||
|
||||
typeStr.equalsIgnoreCase("LOCAL_DATE_TIME")) {
|
||||
isPrimitive = true;
|
||||
}
|
||||
|
||||
return isPrimitive;
|
||||
}
|
||||
public static Schema getPrimitiveTypeSchema(String typeStr) {
|
||||
Schema schema = null;
|
||||
|
||||
public static Schema<?> getPrimitiveTypeSchema(String typeStr) {
|
||||
Schema<?> schema;
|
||||
|
||||
switch (typeStr.toUpperCase()) {
|
||||
case "BOOLEAN":
|
||||
@ -240,7 +311,7 @@ public class PulsarActivityUtil {
|
||||
break;
|
||||
// Use BYTES as the default schema type if the type string is not specified
|
||||
case "":
|
||||
case "BTYES":
|
||||
case "BYTES":
|
||||
schema = Schema.BYTES;
|
||||
break;
|
||||
// Report an error if non-valid, non-empty schema type string is provided
|
||||
@ -255,19 +326,20 @@ public class PulsarActivityUtil {
|
||||
// Complex strut type: Avro or Json
|
||||
public static boolean isAvroSchemaTypeStr(String typeStr) {
|
||||
boolean isAvroType = false;
|
||||
if ( typeStr.toUpperCase().equals("AVRO") ) {
|
||||
if (typeStr.equalsIgnoreCase("AVRO")) {
|
||||
isAvroType = true;
|
||||
}
|
||||
return isAvroType;
|
||||
}
|
||||
public static Schema getAvroSchema(String typeStr, String definitionStr) {
|
||||
|
||||
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
|
||||
String schemaDefinitionStr = definitionStr;
|
||||
String filePrefix = "file://";
|
||||
Schema schema = null;
|
||||
Schema<?> schema;
|
||||
|
||||
// Check if payloadStr points to a file (e.g. "file:///path/to/a/file")
|
||||
if (isAvroSchemaTypeStr(typeStr)) {
|
||||
if ( (schemaDefinitionStr == null) || schemaDefinitionStr.isEmpty()) {
|
||||
if (StringUtils.isBlank(schemaDefinitionStr)) {
|
||||
throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!");
|
||||
} else if (schemaDefinitionStr.startsWith(filePrefix)) {
|
||||
try {
|
||||
@ -278,8 +350,6 @@ public class PulsarActivityUtil {
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println(schemaDefinitionStr);
|
||||
|
||||
SchemaInfo schemaInfo = SchemaInfo.builder()
|
||||
.schema(schemaDefinitionStr.getBytes(StandardCharsets.UTF_8))
|
||||
.type(SchemaType.AVRO)
|
||||
@ -289,12 +359,24 @@ public class PulsarActivityUtil {
|
||||
.build();
|
||||
|
||||
schema = new GenericAvroSchema(schemaInfo);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
|
||||
}
|
||||
|
||||
return schema;
|
||||
}
|
||||
|
||||
public static String encode(String... strings) {
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
for (String str : strings) {
|
||||
if (!StringUtils.isBlank(str))
|
||||
stringBuilder.append(str).append("::");
|
||||
}
|
||||
|
||||
String concatenatedStr =
|
||||
StringUtils.substringAfterLast(stringBuilder.toString(), "::");
|
||||
|
||||
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -22,15 +23,19 @@ public class PulsarNBClientConf {
|
||||
|
||||
private String canonicalFilePath = "";
|
||||
|
||||
private static final String DRIVER_CONF_PREFIX = "driver";
|
||||
private static final String SCHEMA_CONF_PREFIX = "schema";
|
||||
private static final String CLIENT_CONF_PREFIX = "client";
|
||||
private static final String PRODUCER_CONF_PREFIX = "producer";
|
||||
private HashMap<String, Object> driverConfMap = new HashMap<>();
|
||||
private HashMap<String, Object> schemaConfMap = new HashMap<>();
|
||||
private HashMap<String, Object> clientConfMap = new HashMap<>();
|
||||
private HashMap<String, Object> producerConfMap = new HashMap<>();
|
||||
// TODO: add support for other operation types: consumer, reader, websocket-producer, managed-ledger
|
||||
public static final String DRIVER_CONF_PREFIX = "driver";
|
||||
public static final String SCHEMA_CONF_PREFIX = "schema";
|
||||
public static final String CLIENT_CONF_PREFIX = "client";
|
||||
public static final String PRODUCER_CONF_PREFIX = "producer";
|
||||
public static final String CONSUMER_CONF_PREFIX = "consumer";
|
||||
public static final String READER_CONF_PREFIX = "reader";
|
||||
private final HashMap<String, Object> driverConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> schemaConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> clientConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> producerConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> consumerConfMap = new HashMap<>();
|
||||
private final HashMap<String, Object> readerConfMap = new HashMap<>();
|
||||
// TODO: add support for other operation types: websocket-producer, managed-ledger
|
||||
|
||||
public PulsarNBClientConf(String fileName) {
|
||||
File file = new File(fileName);
|
||||
@ -51,67 +56,107 @@ public class PulsarNBClientConf {
|
||||
// Get driver specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey));
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get schema specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length()+1), config.getProperty(confKey));
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get client connection specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length()+1), config.getProperty(confKey));
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get producer specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length()+1), config.getProperty(confKey));
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
|
||||
// Get consumer specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get reader specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(READER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
readerConfMap.put(confKey.substring(READER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
logger.error("Can't read the specified config properties file!");
|
||||
ioe.printStackTrace();
|
||||
}
|
||||
catch (ConfigurationException cex) {
|
||||
} catch (ConfigurationException cex) {
|
||||
logger.error("Error loading configuration items from the specified config properties file!");
|
||||
cex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get NB Driver related config
|
||||
public Map<String, Object> getDriverConfMap() {
|
||||
return this.driverConfMap;
|
||||
}
|
||||
|
||||
public boolean hasDriverConfKey(String key) {
|
||||
if (key.contains(DRIVER_CONF_PREFIX))
|
||||
return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length()+1));
|
||||
return driverConfMap.containsKey(key.substring(DRIVER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return driverConfMap.containsKey(key);
|
||||
}
|
||||
public Object getDriverConfValue(String key) {
|
||||
if (key.contains(DRIVER_CONF_PREFIX))
|
||||
return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length()+1));
|
||||
return driverConfMap.get(key.substring(DRIVER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return driverConfMap.get(key);
|
||||
}
|
||||
|
||||
public void setDriverConfValue(String key, Object value) {
|
||||
if (key.contains(DRIVER_CONF_PREFIX))
|
||||
driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length()+1), value);
|
||||
driverConfMap.put(key.substring(DRIVER_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
driverConfMap.put(key, value);
|
||||
}
|
||||
|
||||
// other driver helper functions ...
|
||||
public String getPulsarClientType() {
|
||||
Object confValue = getDriverConfValue("driver.client-type");
|
||||
|
||||
// If not explicitly specifying Pulsar client type, "producer" is the default type
|
||||
if (confValue == null)
|
||||
return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString();
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get Schema related config
|
||||
public Map<String, Object> getSchemaConfMap() {
|
||||
return this.schemaConfMap;
|
||||
}
|
||||
|
||||
public boolean hasSchemaConfKey(String key) {
|
||||
if (key.contains(SCHEMA_CONF_PREFIX))
|
||||
return schemaConfMap.containsKey(key.substring(SCHEMA_CONF_PREFIX.length()+1));
|
||||
return schemaConfMap.containsKey(key.substring(SCHEMA_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return schemaConfMap.containsKey(key);
|
||||
}
|
||||
@ -121,20 +166,24 @@ public class PulsarNBClientConf {
|
||||
else
|
||||
return schemaConfMap.get(key);
|
||||
}
|
||||
|
||||
public void setSchemaConfValue(String key, Object value) {
|
||||
if (key.contains(SCHEMA_CONF_PREFIX))
|
||||
schemaConfMap.put(key.substring(SCHEMA_CONF_PREFIX.length()+1), value);
|
||||
schemaConfMap.put(key.substring(SCHEMA_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
schemaConfMap.put(key, value);
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar client related config
|
||||
public Map<String, Object> getClientConfMap() {
|
||||
return this.clientConfMap;
|
||||
}
|
||||
|
||||
public boolean hasClientConfKey(String key) {
|
||||
if (key.contains(CLIENT_CONF_PREFIX))
|
||||
return clientConfMap.containsKey(key.substring(CLIENT_CONF_PREFIX.length()+1));
|
||||
return clientConfMap.containsKey(key.substring(CLIENT_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return clientConfMap.containsKey(key);
|
||||
}
|
||||
@ -144,20 +193,24 @@ public class PulsarNBClientConf {
|
||||
else
|
||||
return clientConfMap.get(key);
|
||||
}
|
||||
|
||||
public void setClientConfValue(String key, Object value) {
|
||||
if (key.contains(CLIENT_CONF_PREFIX))
|
||||
clientConfMap.put(key.substring(CLIENT_CONF_PREFIX.length()+1), value);
|
||||
clientConfMap.put(key.substring(CLIENT_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
clientConfMap.put(key, value);
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar producer related config
|
||||
public Map<String, Object> getProducerConfMap() {
|
||||
return this.producerConfMap;
|
||||
}
|
||||
|
||||
public boolean hasProducerConfKey(String key) {
|
||||
if (key.contains(PRODUCER_CONF_PREFIX))
|
||||
return producerConfMap.containsKey(key.substring(PRODUCER_CONF_PREFIX.length()+1));
|
||||
return producerConfMap.containsKey(key.substring(PRODUCER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return producerConfMap.containsKey(key);
|
||||
}
|
||||
@ -174,30 +227,139 @@ public class PulsarNBClientConf {
|
||||
producerConfMap.put(key, value);
|
||||
}
|
||||
|
||||
public String getPulsarClientType() {
|
||||
Object confValue = getDriverConfValue("driver.client-type");
|
||||
|
||||
// If not explicitly specifying Pulsar client type, "producer" is the default type
|
||||
if (confValue == null)
|
||||
return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString();
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
// other producer helper functions ...
|
||||
public String getProducerName() {
|
||||
Object confValue = getProducerConfValue("producer.producerName");
|
||||
|
||||
// If not explicitly specifying Pulsar client type, "producer" is the default type
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getTopicName() {
|
||||
public String getProducerTopicName() {
|
||||
Object confValue = getProducerConfValue("producer.topicName");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
// If not explicitly specifying Pulsar client type, "producer" is the default type
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar consumer related config
|
||||
public Map<String, Object> getConsumerConfMap() {
|
||||
return this.consumerConfMap;
|
||||
}
|
||||
|
||||
public boolean hasConsumerConfKey(String key) {
|
||||
if (key.contains(CONSUMER_CONF_PREFIX))
|
||||
return consumerConfMap.containsKey(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return consumerConfMap.containsKey(key);
|
||||
}
|
||||
|
||||
public Object getConsumerConfValue(String key) {
|
||||
if (key.contains(CONSUMER_CONF_PREFIX))
|
||||
return consumerConfMap.get(key.substring(CONSUMER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return consumerConfMap.get(key);
|
||||
}
|
||||
|
||||
public void setConsumerConfValue(String key, Object value) {
|
||||
if (key.contains(CONSUMER_CONF_PREFIX))
|
||||
consumerConfMap.put(key.substring(CONSUMER_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
consumerConfMap.put(key, value);
|
||||
}
|
||||
|
||||
// Other consumer helper functions ...
|
||||
public String getConsumerTopicNames() {
|
||||
Object confValue = getConsumerConfValue("consumer.topicNames");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getConsumerTopicPattern() {
|
||||
Object confValue = getConsumerConfValue("consumer.topicsPattern");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getConsumerSubscriptionName() {
|
||||
Object confValue = getConsumerConfValue("consumer.subscriptionName");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getConsumerSubscriptionType() {
|
||||
Object confValue = getConsumerConfValue("consumer.subscriptionType");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getConsumerName() {
|
||||
Object confValue = getConsumerConfValue("consumer.consumerName");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
|
||||
//////////////////
|
||||
// Get Pulsar reader related config
|
||||
public Map<String, Object> getReaderConfMap() {
|
||||
return this.readerConfMap;
|
||||
}
|
||||
|
||||
public boolean hasReaderConfKey(String key) {
|
||||
if (key.contains(READER_CONF_PREFIX))
|
||||
return readerConfMap.containsKey(key.substring(READER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return readerConfMap.containsKey(key);
|
||||
}
|
||||
|
||||
public Object getReaderConfValue(String key) {
|
||||
if (key.contains(READER_CONF_PREFIX))
|
||||
return readerConfMap.get(key.substring(READER_CONF_PREFIX.length() + 1));
|
||||
else
|
||||
return readerConfMap.get(key);
|
||||
}
|
||||
|
||||
public void setReaderConfValue(String key, Object value) {
|
||||
if (key.contains(READER_CONF_PREFIX))
|
||||
readerConfMap.put(key.substring(READER_CONF_PREFIX.length() + 1), value);
|
||||
else
|
||||
readerConfMap.put(key, value);
|
||||
}
|
||||
|
||||
// Other consumer helper functions ...
|
||||
public String getReaderTopicName() {
|
||||
Object confValue = getReaderConfValue("reader.topicName");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getReaderName() {
|
||||
Object confValue = getReaderConfValue("reader.readerName");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
return confValue.toString();
|
||||
}
|
||||
|
||||
public String getStartMsgPosStr() {
|
||||
Object confValue = getReaderConfValue("reader.startMessagePos");
|
||||
if (confValue == null)
|
||||
return "";
|
||||
else
|
||||
|
@ -1,11 +1,9 @@
|
||||
### NB Pulsar driver related configuration - driver.xxx
|
||||
driver.client-type = producer
|
||||
# TODO: At the moment, only one producer is publishing messages to one single topic through NB
|
||||
# TODO - consider allowing multiple producers to publish to the same topic
|
||||
# TODO - but this relies on the NB cycle to be able to run Pulsar client asynchronously!
|
||||
driver.num-workers = 1
|
||||
|
||||
|
||||
driver.client-type=producer
|
||||
driver.num-workers=1
|
||||
# TODO: functionalities to be completed
|
||||
driver.sync-mode=sync
|
||||
driver.msg-recv-ouput=console
|
||||
### Schema related configurations - schema.xxx
|
||||
# valid types:
|
||||
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
|
||||
@ -13,26 +11,32 @@ driver.num-workers = 1
|
||||
# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
|
||||
# avro, json, protobuf
|
||||
#
|
||||
# TODO: as a starting point, only supports:
|
||||
# TODO: 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
|
||||
# TODO: 2) Avro for messages with schema
|
||||
schema.type = avro
|
||||
schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc
|
||||
|
||||
|
||||
# TODO: as a starting point, only supports the following types
|
||||
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
|
||||
# 2) Avro for messages with schema
|
||||
schema.type=avro
|
||||
schema.definition=file://<path>/<to>/<avro-definition-file>
|
||||
### Pulsar client related configurations - client.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
|
||||
default: pulsar://localhost:6550
|
||||
# default: 10000
|
||||
client.connectionTimeoutMs = 5000
|
||||
|
||||
|
||||
client.serviceUrl=pulsar://localhost:6650
|
||||
client.connectionTimeoutMs=5000
|
||||
### Producer related configurations (global) - producer.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
|
||||
producer.producerName =
|
||||
producer.topicName = persistent://public/default/mynbtest
|
||||
#producer.sendTimeoutMs =
|
||||
|
||||
producer.producerName=
|
||||
producer.topicName=persistent://public/default/mynbtest
|
||||
producer.sendTimeoutMs=
|
||||
### Consumer related configurations (global) - consumer.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
|
||||
|
||||
consumer.topicNames=
|
||||
consumer.topicsPattern=
|
||||
consumer.subscriptionName=
|
||||
consumer.subscriptionType=
|
||||
consumer.consumerName=
|
||||
consumer.receiverQueueSize=
|
||||
### Reader related configurations (global) - reader.xxx
|
||||
# https://pulsar.apache.org/docs/en/client-libraries-java/#reader
|
||||
# - valid Pos: earliest, latest, custom::file://<path>/<to>/<message_id_file>
|
||||
reader.topicName=persistent://public/default/nbpulsar
|
||||
reader.receiverQueueSize=
|
||||
reader.readerName=
|
||||
#reader.startMessagePos = earliest
|
||||
|
@ -22,7 +22,7 @@ blocks:
|
||||
|
||||
- name: producer-block
|
||||
tags:
|
||||
type: producer
|
||||
op-type: producer
|
||||
statements:
|
||||
- producer-stuff:
|
||||
#######
|
||||
@ -40,20 +40,23 @@ blocks:
|
||||
"ReadingValue": {reading_value}
|
||||
}
|
||||
|
||||
# - name: consumer-block
|
||||
# tags:
|
||||
# type: consumer
|
||||
# statements:
|
||||
# - consumer-stuff:
|
||||
# subscription-name:
|
||||
# subscription-type:
|
||||
#
|
||||
# - reader:
|
||||
# tags:
|
||||
# type: reader
|
||||
# statements:
|
||||
# - reader-stuff:
|
||||
#
|
||||
- name: consumer-block
|
||||
tags:
|
||||
op-type: consumer
|
||||
statements:
|
||||
- consumer-stuff:
|
||||
topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest"
|
||||
topics-pattern: "public/default/.*"
|
||||
subscription-name:
|
||||
subscription-type:
|
||||
consumer-name:
|
||||
|
||||
- reader:
|
||||
tags:
|
||||
op-type: reader
|
||||
statements:
|
||||
- reader-stuff:
|
||||
|
||||
# - websocket-producer:
|
||||
# tags:
|
||||
# type: websocket-produer
|
||||
|
@ -121,7 +121,9 @@ In the above statement block, there are 4 key statement parameters to provide va
|
||||
```
|
||||
* At the moment, only "**\<short-topic-name\>**" part can be dynamically bound (e.g. through NB binding function). All other parts must be static values and the corresponding tenants and namespaces must be created in the Pulsar cluster in advance.
|
||||
|
||||
**TODO**: allow dynamic binding for "\<tenant-name\>" and "\<namespace-name\>" after adding a phase for creating "\<tenant-name\>" and/or "\<namespace-name\>", similar to C* CQL schema creation phase.!
|
||||
**TODO**: allow dynamic binding for "\<tenant-name\>" and "
|
||||
\<namespace-name\>" after adding a phase for creating "\<tenant-name\>"
|
||||
and/or "\<namespace-name\>", similar to C* CQL schema creation phase!
|
||||
|
||||
* **msg-key**: Pulsar message key
|
||||
* **Optional**
|
||||
@ -140,16 +142,19 @@ In the above statement block, there are 4 key statement parameters to provide va
|
||||
Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, etc. At the moment, the NB Pulsar driver provides 2 schema support modes, via the global level schema related settings as below:
|
||||
* Avro schema:
|
||||
```properties
|
||||
shcema.type: avro
|
||||
schema.definition: file:///<file/path/to/the/definition/file>
|
||||
shcema.type= avro
|
||||
schema.definition= file:///<file/path/to/the/definition/file>
|
||||
```
|
||||
* Default byte[] schema:
|
||||
```properties
|
||||
shcema.type:
|
||||
schema.definition:
|
||||
shcema.type=
|
||||
schema.definition=
|
||||
```
|
||||
|
||||
For the previous Producer block statement example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. from a file **iot-example.asvc**)
|
||||
For the previous Producer block statement example, the **msg-value**
|
||||
parameter has the value of a JSON string that follows the following Avro
|
||||
schema definition (e.g. as in the sample schema definition
|
||||
file: **[iot-example.asvc](activities/iot-example.avsc)**)
|
||||
```json
|
||||
{
|
||||
"type": "record",
|
||||
@ -166,7 +171,8 @@ For the previous Producer block statement example, the **msg-value** parameter h
|
||||
|
||||
## 1.5. Activity Parameters
|
||||
|
||||
At the moment, the following Activity Parameter is supported:
|
||||
At the moment, the following Pulsar driver specific Activity Parameter is
|
||||
supported:
|
||||
|
||||
- * config=<file/path/to/global/configuration/properties/file>
|
||||
|
||||
@ -188,8 +194,11 @@ At the moment, the following Activity Parameter is supported:
|
||||
**NOTE**: The following text is based on the original multi-layer API caching design which is not fully implemented at the moment. We need to revisit the original design at some point in order to achieve maximum testing flexibility.
|
||||
|
||||
To summarize, the original caching design has the following key requirements:
|
||||
* **Requirement 1**: Each NB Pulsar activity is able to launch and cache multiple **client spaces**
|
||||
* **Requirement 2**:Each client space can launch and cache multiple Pulsar operators of the same type (producer, consumer, etc.)
|
||||
|
||||
* **Requirement 1**: Each NB Pulsar activity is able to launch and cache
|
||||
multiple **client spaces**
|
||||
* **Requirement 2**: Each client space can launch and cache multiple
|
||||
Pulsar operators of the same type (producer, consumer, etc.)
|
||||
|
||||
In the current implementation, only requirement 2 is implemented. Regarding requirement 1, the current implementation only supports one client space per NB Pulsar activity!
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user