From b0da4a149ba1c535ae87276d2fe811fb515ce7f0 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Sat, 27 Feb 2021 21:06:39 -0600 Subject: [PATCH 1/2] Pulsar Consumer API with Avro schema support --- .../driver/pulsar/PulsarConsumerSpace.java | 180 ++++++++++++++++++ .../driver/pulsar/PulsarProducerSpace.java | 81 ++++++++ .../driver/pulsar/PulsarReaderSpace.java | 15 ++ .../nosqlbench/driver/pulsar/PulsarSpace.java | 89 +-------- .../driver/pulsar/PulsarSpaceCache.java | 20 +- .../pulsar/ops/PulsarConsumerMapper.java | 19 +- .../driver/pulsar/ops/PulsarConsumerOp.java | 39 ++-- .../pulsar/ops/PulsarProducerMapper.java | 19 +- .../driver/pulsar/ops/PulsarReaderMapper.java | 27 +++ .../driver/pulsar/ops/PulsarReaderOp.java | 19 ++ .../driver/pulsar/ops/ReadyPulsarOp.java | 135 +++++++++---- .../driver/pulsar/util/AvroUtil.java | 21 ++ .../pulsar/util/PulsarActivityUtil.java | 107 ++++++++--- .../pulsar/util/PulsarNBClientConf.java | 162 +++++++++++++--- .../resources/activities/config.properties | 32 ++-- .../src/main/resources/activities/pulsar.yaml | 17 +- driver-pulsar/src/main/resources/pulsar.md | 8 +- .../api/activityconfig/yaml/StmtDef.java | 10 +- 18 files changed, 774 insertions(+), 226 deletions(-) create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java new file mode 100644 index 000000000..edf99267e --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java @@ -0,0 +1,180 @@ +package io.nosqlbench.driver.pulsar; + +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +public class PulsarConsumerSpace extends PulsarSpace { + + private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); + + public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) { super(name, pulsarClientConf); } + + private String getEffectiveTopicNamesStr(String cycleTopicNames) { + if ((cycleTopicNames != null) && (!cycleTopicNames.isEmpty())) { + return cycleTopicNames; + } + + String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); + if ((globalTopicNames != null) && (!globalTopicNames.isEmpty())) { + return globalTopicNames; + } + + return ""; + } + private List getEffectiveTopicNames(String cycleTopicNames) { + String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); + + String[] names = effectiveTopicNamesStr.split("[;,]"); + ArrayList effectiveTopicNameList = new ArrayList<>(); + + for (String name : names) { + if ( !name.isEmpty() ) + effectiveTopicNameList.add(name.trim()); + } + + + return effectiveTopicNameList; + } + + private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { + if ((cycleTopicsPattern != null) && (!cycleTopicsPattern.isEmpty())) { + return cycleTopicsPattern; + } + + String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); + if ((globalTopicsPattern != null) && (!globalTopicsPattern.isEmpty())) { + return globalTopicsPattern; + } + + return ""; + } + private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { + String effecitveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern; + try { + topicsPattern = Pattern.compile(effecitveTopicsPatternStr); + } + catch (PatternSyntaxException pse) { + topicsPattern = null; + } + return topicsPattern; + } + + private String getEffectiveSubscriptionName(String cycleSubscriptionName) { + if ((cycleSubscriptionName != null) && (!cycleSubscriptionName.isEmpty())) { + return cycleSubscriptionName; + } + + String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName(); + if ((globalSubscriptionName != null) && (!globalSubscriptionName.isEmpty())) { + return globalSubscriptionName; + } + + return "default-subs"; + } + + private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) { + if ((cycleSubscriptionType != null) && (!cycleSubscriptionType.isEmpty())) { + return cycleSubscriptionType; + } + + String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType(); + if ((globalSubscriptionType != null) && (!globalSubscriptionType.isEmpty())) { + return globalSubscriptionType; + } + + return ""; + } + private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) { + String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType); + SubscriptionType subscriptionType; + + try { + subscriptionType = SubscriptionType.valueOf(effectiveSubscriptionStr); + } + catch (IllegalArgumentException iae) { + subscriptionType = SubscriptionType.Exclusive; + } + + return subscriptionType; + } + + private String getEffectiveConsumerName(String cycleConsumerName) { + if ((cycleConsumerName != null) && (!cycleConsumerName.isEmpty())) { + return cycleConsumerName; + } + + String globalConsumerName = pulsarNBClientConf.getConsumerName(); + if ((globalConsumerName != null) && (!globalConsumerName.isEmpty())) { + return globalConsumerName; + } + + return "default-cons"; + } + + public Consumer getConsumer(String cycleTopicNames, + String cycleTopicsPattern, + String cycleSubscriptionName, + String cycleSubscriptionType, + String cycleConsumerName) { + + String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); + List topicNames = getEffectiveTopicNames(cycleTopicNames); + String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); + SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); + String consumerName = getEffectiveConsumerName(cycleConsumerName); + + String encodedStr = PulsarActivityUtil.encode( + consumerName, subscriptionName, topicNamesStr, topicsPatternStr); + Consumer consumer = consumers.get(encodedStr); + + if (consumer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); + + // Explicit topic names will take precedence over topics pattern + if ( !topicNames.isEmpty() ) { + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_KEY.topicsPattern.toString()); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.topicNames.toString(), topicNames); + } + else { + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_KEY.topicNames.toString()); + if ( !topicsPatternStr.isEmpty() ) + consumerConf.put( + PulsarActivityUtil.CONSUMER_CONF_KEY.topicsPattern.toString(), + getEffectiveTopicPattern(cycleTopicsPattern)); + else { + throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!"); + } + } + + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.subscriptionName.toString(), subscriptionName); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.subscriptionType.toString(), subscriptionType); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.consumerName.toString(), consumerName); + + try { + consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); + } + catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar consumer!"); + } + + consumers.put(encodedStr, consumer); + } + + return consumer; + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java new file mode 100644 index 000000000..d625a165b --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java @@ -0,0 +1,81 @@ +package io.nosqlbench.driver.pulsar; + +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; + +import java.util.Base64; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class PulsarProducerSpace extends PulsarSpace{ + + private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); + + public PulsarProducerSpace(String name, PulsarNBClientConf pulsarClientConf) { + super(name, pulsarClientConf); + } + + // Producer name is NOT mandatory + // - It can be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveProducerName(String cycleProducerName) { + if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) { + return cycleProducerName; + } + + String globalProducerName = pulsarNBClientConf.getProducerName(); + if ((globalProducerName != null) && (!globalProducerName.isEmpty())) { + return globalProducerName; + } + + // Default Producer name when it is not set at either cycle or global level + return "default-prod"; + } + + // Topic name IS mandatory + // - It must be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveTopicName(String cycleTopicName) { + if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) { + return cycleTopicName; + } + + String globalTopicName = pulsarNBClientConf.getProducerTopicName(); + if ( (globalTopicName == null) || (globalTopicName.isEmpty()) ) { + throw new RuntimeException("Topic name must be set at either global level or cycle level!"); + } + + return globalTopicName; + } + + public Producer getProducer(String cycleProducerName, String cycleTopicName) { + String producerName = getEffectiveProducerName(cycleProducerName); + String topicName = getEffectiveTopicName(cycleTopicName); + + String encodedStr = PulsarActivityUtil.encode(cycleProducerName, cycleTopicName); + Producer producer = producers.get(encodedStr); + + if (producer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map producerConf = pulsarNBClientConf.getProducerConfMap(); + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_KEY.topicName.toString(), topicName); + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_KEY.producerName.toString(), producerName); + + try { + producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); + } + catch (PulsarClientException ple) { + throw new RuntimeException("Unable to create a Pulsar producer!"); + } + + producers.put(encodedStr, producer); + } + + return producer; + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java new file mode 100644 index 000000000..3a908b652 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java @@ -0,0 +1,15 @@ +package io.nosqlbench.driver.pulsar; + +import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.pulsar.client.api.Reader; + +import java.util.concurrent.ConcurrentHashMap; + +public class PulsarReaderSpace extends PulsarSpace { + + private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); + + public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) { + super(name, pulsarClientConf); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index da754e183..56b50947b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -17,15 +17,13 @@ import java.util.concurrent.ConcurrentHashMap; public class PulsarSpace { private final static Logger logger = LogManager.getLogger(PulsarSpace.class); - - private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); // TODO: add support for other client types: consumer, reader, websocket-producer, managed-ledger, etc. - private final String name; - private final PulsarNBClientConf pulsarNBClientConf; + protected final String name; + protected final PulsarNBClientConf pulsarNBClientConf; - private PulsarClient pulsarClient = null; - private Schema pulsarSchema = null; + protected PulsarClient pulsarClient = null; + protected Schema pulsarSchema = null; public PulsarSpace( String name, PulsarNBClientConf pulsarClientConf ) { this.name = name; @@ -35,7 +33,7 @@ public class PulsarSpace { createPulsarSchemaFromConf(); } - private void createPulsarClientFromConf() { + protected void createPulsarClientFromConf() { ClientBuilder clientBuilder = PulsarClient.builder(); String dftSvcUrl = "pulsar://localhost:6650"; @@ -53,11 +51,13 @@ public class PulsarSpace { } } - private void createPulsarSchemaFromConf() { - String schemaType = pulsarNBClientConf.getSchemaConfValue("schema.type").toString(); + protected void createPulsarSchemaFromConf() { + Object value = pulsarNBClientConf.getSchemaConfValue("schema.type"); + String schemaType = (value != null) ? value.toString() : ""; if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) { - String schemaDefStr = pulsarNBClientConf.getSchemaConfValue("schema.definition").toString(); + value = pulsarNBClientConf.getSchemaConfValue("schema.definition"); + String schemaDefStr = (value != null) ? value.toString() : ""; pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr); } else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) { pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType)); @@ -68,77 +68,8 @@ public class PulsarSpace { } public PulsarClient getPulsarClient() { return pulsarClient; } - public PulsarNBClientConf getPulsarClientConf() { return pulsarNBClientConf; } - public Schema getPulsarSchema() { return pulsarSchema; } - - // Producer name is NOT mandatory - // - It can be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveProducerName(String cycleProducerName) { - if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) { - return cycleProducerName; - } - - String globalProducerName = pulsarNBClientConf.getProducerName(); - if ((globalProducerName != null) && (!globalProducerName.isEmpty())) { - return globalProducerName; - } - - // Default Producer name when it is not set at either cycle or global level - return "default"; - } - - // Topic name is mandatory - // - It must be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveTopicName(String cycleTopicName) { - if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) { - return cycleTopicName; - } - - String globalTopicName = pulsarNBClientConf.getTopicName(); - if ( (globalTopicName == null) || (globalTopicName.isEmpty()) ) { - throw new RuntimeException("Topic name must be set at either global level or cycle level!"); - } - - return globalTopicName; - } - - private Producer createPulsarProducer(String cycleTopicName, String cycleProducerName) { - PulsarClient pulsarClient = getPulsarClient(); - Producer producer = null; - - String producerName = getEffectiveProducerName(cycleProducerName); - String topicName = getEffectiveTopicName(cycleTopicName); - - // Get other possible producer settings that are set at global level - Map producerConf = pulsarNBClientConf.getProducerConfMap(); - producerConf.put("topicName", topicName); - producerConf.put("producerName", producerName); - - try { - producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); - } - catch (PulsarClientException ple) { - throw new RuntimeException("Unable to create a client to connect to the Pulsar cluster!"); - } - - return producer; - } - - public Producer getProducer(String cycleProducerName, String cycleTopicName) { - String producerName = getEffectiveProducerName(cycleProducerName); - Producer producer = producers.get(producerName); - - if (producer == null) { - producer = createPulsarProducer(cycleTopicName, cycleProducerName); - producers.put(producerName, producer); - } - - return producer; - } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java index a9ca5fa77..fe9a64bf7 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java @@ -1,5 +1,7 @@ package io.nosqlbench.driver.pulsar; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; + import java.util.concurrent.ConcurrentHashMap; /** @@ -21,8 +23,22 @@ public class PulsarSpaceCache { } public PulsarSpace getPulsarSpace(String name) { - PulsarSpace cspace = clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, activity.getPulsarConf())); - return cspace; + + String pulsarClientType = activity.getPulsarConf().getPulsarClientType(); + + if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString())) { + return clientScopes.computeIfAbsent(name, spaceName -> new PulsarProducerSpace(spaceName, activity.getPulsarConf())); + } + else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString())) { + return clientScopes.computeIfAbsent(name, spaceName -> new PulsarConsumerSpace(spaceName, activity.getPulsarConf())); + } + else if (pulsarClientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString())) { + return clientScopes.computeIfAbsent(name, spaceName -> new PulsarReaderSpace(spaceName, activity.getPulsarConf())); + } + // TODO: add support for websocket-producer and managed-ledger + else { + throw new RuntimeException("Unsupported Pulsar client type: " + pulsarClientType); + } } public PulsarActivity getActivity() { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index b809dcd07..4a69e6c9f 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Schema; import java.util.function.LongFunction; @@ -16,21 +17,21 @@ import java.util.function.LongFunction; * For additional parameterization, the command template is also provided. */ public class PulsarConsumerMapper implements LongFunction { - private final LongFunction> consumerFunc; - private final LongFunction recvInstructions; private final CommandTemplate cmdTpl; + private final Schema pulsarSchema; + private final LongFunction> consumerFunc; - public PulsarConsumerMapper(LongFunction> consumerFunc, - LongFunction recvMsg, - CommandTemplate cmdTpl) { - this.consumerFunc = consumerFunc; - this.recvInstructions = recvMsg; + public PulsarConsumerMapper(CommandTemplate cmdTpl, + Schema pulsarSchema, + LongFunction> consumerFunc) { this.cmdTpl = cmdTpl; - // TODO add schema support + this.pulsarSchema = pulsarSchema; + this.consumerFunc = consumerFunc; } @Override public PulsarOp apply(long value) { - return new PulsarConsumerOp((Consumer) consumerFunc.apply(value), recvInstructions.apply(value)); + Consumer consumer = consumerFunc.apply(value); + return new PulsarConsumerOp(consumer, pulsarSchema); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 149931a63..1833772e4 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -1,31 +1,42 @@ package io.nosqlbench.driver.pulsar.ops; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; +import io.nosqlbench.driver.pulsar.util.AvroUtil; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; public class PulsarConsumerOp implements PulsarOp { - private final Consumer consumer; - private final String recvInstructions; + private final Consumer consumer; + private final Schema pulsarSchema; - public PulsarConsumerOp(Consumer consumer, String recvInstructions) { + public PulsarConsumerOp(Consumer consumer, Schema schema) { this.consumer = consumer; - this.recvInstructions = recvInstructions; + this.pulsarSchema = schema; } @Override public void run() { try { - Message msgbytes = consumer.receive(); - // TODO: Parameterize the actions taken on a received message - // TODO: Properly parameterize all pulsar Op types as with Producer and Consumer - String received = new String(msgbytes.getValue(), StandardCharsets.UTF_8); - System.out.print("received:" + received); - if (!received.endsWith("\n")) { - System.out.println("\n"); + Message message = consumer.receive(); + + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); + + System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString()); } + else { + System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData())); + } + + consumer.acknowledge(message.getMessageId()); + } catch (PulsarClientException e) { throw new RuntimeException(e); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 30231d1ae..cebc8287c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -18,23 +18,22 @@ import java.util.function.LongFunction; * For additional parameterization, the command template is also provided. */ public class PulsarProducerMapper implements LongFunction { + private final CommandTemplate cmdTpl; + private final Schema pulsarSchema; private final LongFunction> producerFunc; private final LongFunction keyFunc; private final LongFunction payloadFunc; - private final Schema pulsarSchema; - private final CommandTemplate cmdTpl; - public PulsarProducerMapper( - LongFunction> producerFunc, - LongFunction keyFunc, - LongFunction payloadFunc, - Schema pulsarSchema, - CommandTemplate cmdTpl) { + public PulsarProducerMapper(CommandTemplate cmdTpl, + Schema pulsarSchema, + LongFunction> producerFunc, + LongFunction keyFunc, + LongFunction payloadFunc) { + this.cmdTpl = cmdTpl; + this.pulsarSchema = pulsarSchema; this.producerFunc = producerFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; - this.pulsarSchema = pulsarSchema; - this.cmdTpl = cmdTpl; } @Override diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java new file mode 100644 index 000000000..7ae971a7e --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java @@ -0,0 +1,27 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; + +import java.util.function.LongFunction; + +public class PulsarReaderMapper implements LongFunction { + private final CommandTemplate cmdTpl; + private final Schema pulsarSchema; + private final LongFunction> readerFunc; + + public PulsarReaderMapper(CommandTemplate cmdTpl, + Schema pulsarSchema, + LongFunction> readerFunc) { + this.cmdTpl = cmdTpl; + this.pulsarSchema = pulsarSchema; + this.readerFunc = readerFunc; + } + + @Override + public PulsarOp apply(long value) { + Reader reader = readerFunc.apply(value); + return new PulsarReaderOp(reader, pulsarSchema); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java new file mode 100644 index 000000000..d3d7bd94f --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java @@ -0,0 +1,19 @@ +package io.nosqlbench.driver.pulsar.ops; + +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; + +public class PulsarReaderOp implements PulsarOp { + private final Reader reader; + private final Schema pulsarSchema; + + public PulsarReaderOp(Reader reader, Schema schema) { + this.reader = reader; + this.pulsarSchema = schema; + } + + @Override + public void run() { + //TODO: to be added + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 82272150f..8add08167 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -1,13 +1,13 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.*; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; -import io.nosqlbench.driver.pulsar.PulsarSpace; -import io.nosqlbench.driver.pulsar.PulsarSpaceCache; -import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.engine.api.scoping.ScopedSupplier; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import java.util.function.LongFunction; @@ -15,22 +15,23 @@ import java.util.function.Supplier; public class ReadyPulsarOp implements LongFunction { + private final OpTemplate opTpl; private final CommandTemplate cmdTpl; private final PulsarSpace clientSpace; private final LongFunction opFunc; - private final PulsarSpaceCache pcache; - private final Schema pulsarSchema; + private final Schema pulsarSchema; // TODO: Add docs for the command template with respect to the OpTemplate public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) { // TODO: Consider parsing map structures into equivalent binding representation + this.opTpl = opTemplate; this.cmdTpl = new CommandTemplate(opTemplate); + if (cmdTpl.isDynamic("op_scope")) { throw new RuntimeException("op_scope must be static"); } - this.pcache = pcache; // TODO: At the moment, only supports static "client" if (cmdTpl.containsKey("client")) { if (cmdTpl.isDynamic("client")) { @@ -52,7 +53,30 @@ public class ReadyPulsarOp implements LongFunction { } private LongFunction resolve() { + String clientType = clientSpace.getPulsarClientConf().getPulsarClientType(); + // TODO: Complete implementation for reader, websocket-producer and managed-ledger + if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) { + assert clientSpace instanceof PulsarProducerSpace; + return resolveProducer((PulsarProducerSpace) clientSpace, cmdTpl); + } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString()) ) { + assert clientSpace instanceof PulsarConsumerSpace; + return resolveConsumer((PulsarConsumerSpace)clientSpace, cmdTpl); /* + } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString()) ) { + assert clientSpace instanceof PulsarReaderSpace; + return resolveReader((PulsarReaderSpace)clientSpace, cmdTpl); + } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.WSOKT_PRODUCER.toString()) ) { + } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.MANAGED_LEDGER.toString()) ) { + */ + } else { + throw new RuntimeException("Unsupported Pulsar client: " + clientType); + } + } + + private LongFunction resolveProducer( + PulsarProducerSpace clientSpace, + CommandTemplate cmdTpl + ) { if (cmdTpl.containsKey("topic_url")) { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); } @@ -99,31 +123,8 @@ public class ReadyPulsarOp implements LongFunction { topic_uri_func = (l) -> null; } - assert (clientSpace != null); - String clientType = clientSpace.getPulsarClientConf().getPulsarClientType(); - - // TODO: At the moment, only implements "Producer" functionality; add implementation for others later! - if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) { - return resolveProducer(clientSpace, cmdTpl, cycle_producer_name_func, topic_uri_func);/* - } else if ( msgOperation.equalsIgnoreCase(PulsarActivityUtil.MSGOP_TYPES.CONSUMER.toString()) ) { - return resolveConsumer(spaceFunc, cmdTpl, topic_uri_func); - } else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.READER.toString()) ) { - } else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.WSOKT_PRODUCER.toString()) ) { - } else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.MANAGED_LEDGER.toString()) ) { - */ - } else { - throw new RuntimeException("Unsupported Pulsar message operation type."); - } - } - - private LongFunction resolveProducer( - PulsarSpace pulsarSpace, - CommandTemplate cmdTpl, - LongFunction cycle_producer_name_func, - LongFunction topic_uri_func - ) { LongFunction> producerFunc = - (l) -> pulsarSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l)); + (l) -> clientSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l)); LongFunction keyFunc; if (cmdTpl.isStatic("msg-key")) { @@ -147,12 +148,80 @@ public class ReadyPulsarOp implements LongFunction { throw new RuntimeException("\"msg-value\" field must be specified!"); } - return new PulsarProducerMapper(producerFunc, keyFunc, valueFunc, pulsarSchema, cmdTpl); + return new PulsarProducerMapper(cmdTpl, pulsarSchema, producerFunc, keyFunc, valueFunc); + } + + private LongFunction resolveConsumer( + PulsarConsumerSpace clientSpace, + CommandTemplate cmdTpl + ) { + LongFunction topic_names_func; + if (cmdTpl.isStatic("topic-names")) { + topic_names_func = (l) -> cmdTpl.getStatic("topic-names"); + } else if (cmdTpl.isDynamic("topic-names")) { + topic_names_func = (l) -> cmdTpl.getDynamic("topic-names", l); + } else { + topic_names_func = (l) -> null; + } + + LongFunction topics_pattern_func; + if (cmdTpl.isStatic("topics-pattern")) { + topics_pattern_func = (l) -> cmdTpl.getStatic("topics-pattern"); + } else if (cmdTpl.isDynamic("topics-pattern")) { + topics_pattern_func = (l) -> cmdTpl.getDynamic("topics-pattern", l); + } else { + topics_pattern_func = (l) -> null; + } + + LongFunction subscription_name_func; + if (cmdTpl.isStatic("subscription-name")) { + subscription_name_func = (l) -> cmdTpl.getStatic("subscription-name"); + } else if (cmdTpl.isDynamic("subscription-name")) { + subscription_name_func = (l) -> cmdTpl.getDynamic("subscription-name", l); + } else { + subscription_name_func = (l) -> null; + } + + LongFunction subscription_type_func; + if (cmdTpl.isStatic("subscription-type")) { + subscription_type_func = (l) -> cmdTpl.getStatic("subscription-type"); + } else if (cmdTpl.isDynamic("subscription-type")) { + subscription_type_func = (l) -> cmdTpl.getDynamic("subscription-type", l); + } else { + subscription_type_func = (l) -> null; + } + + LongFunction consumer_name_func; + if (cmdTpl.isStatic("consumer-name")) { + consumer_name_func = (l) -> cmdTpl.getStatic("consumer-name"); + } else if (cmdTpl.isDynamic("consumer-name")) { + consumer_name_func = (l) -> cmdTpl.getDynamic("consumer-name", l); + } else { + consumer_name_func = (l) -> null; + } + + LongFunction> consumerFunc = (l) -> + clientSpace.getConsumer( + topic_names_func.apply(l), + topics_pattern_func.apply(l), + subscription_name_func.apply(l), + subscription_type_func.apply(l), + consumer_name_func.apply(l) + ); + + return new PulsarConsumerMapper(cmdTpl, pulsarSchema, consumerFunc); + } + + private LongFunction resolveReader( + PulsarReaderSpace pulsarSpace, + CommandTemplate cmdTpl + ) { + //TODO: to be completed + return null; } @Override public PulsarOp apply(long value) { - PulsarOp op = opFunc.apply(value); - return op; + return opFunc.apply(value); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java index cf7537f5c..2a4f1f75e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.util; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.JsonDecoder; +import org.apache.avro.io.BinaryDecoder; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericRecordBuilder; @@ -25,6 +26,7 @@ public class AvroUtil { org.apache.avro.generic.GenericDatumReader reader; reader = new org.apache.avro.generic.GenericDatumReader<>(schema); + JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, jsonData); record = reader.read(null, decoder); @@ -35,6 +37,25 @@ public class AvroUtil { return record; } + public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, byte[] bytesData) { + org.apache.avro.generic.GenericRecord record = null; + + try { + org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef); + + org.apache.avro.generic.GenericDatumReader reader; + reader = new org.apache.avro.generic.GenericDatumReader<>(schema); + + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytesData, null); + + record = reader.read(null, decoder); + } + catch (IOException ioe) { + ioe.printStackTrace(); + } + + return record; + } public static GenericAvroSchema GetSchema_PulsarAvro(String schemaName, String avroSchemDef) { SchemaInfo schemaInfo = SchemaInfo.builder() diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index 6247a566f..ffeacb61b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -15,6 +15,7 @@ import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; +import java.util.Base64; import java.util.HashMap; public class PulsarActivityUtil { @@ -31,7 +32,7 @@ public class PulsarActivityUtil { ; public final String label; - private CLIENT_TYPES(String label) { + CLIENT_TYPES(String label) { this.label = label; } } @@ -48,7 +49,7 @@ public class PulsarActivityUtil { ; public final String label; - private PERSISTENT_TYPES(String label) { + PERSISTENT_TYPES(String label) { this.label = label; } } @@ -59,6 +60,7 @@ public class PulsarActivityUtil { /////// // Valid Pulsar client configuration (activity-level settings) + // - https://pulsar.apache.org/docs/en/client-libraries-java/#client public enum CLNT_CONF_KEY { serviceUrl("serviceUrl"), authPulginClassName("authPluginClassName"), @@ -83,7 +85,7 @@ public class PulsarActivityUtil { ; public final String label; - private CLNT_CONF_KEY(String label) { + CLNT_CONF_KEY(String label) { this.label = label; } } @@ -93,12 +95,10 @@ public class PulsarActivityUtil { /////// // Valid producer configuration (activity-level settings) + // - https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer public enum PRODUCER_CONF_KEY { - // NOTE: - // For "topicName" and "producerName", they're ignore at activity-level. - // Instead, op-level settings are respected - // topicName("topicName"), - // producerName("producerName"), + topicName("topicName"), + producerName("producerName"), sendTimeoutMs("sendTimeoutMs"), blockIfQueueFull("blockIfQueueFull"), maxPendingMessages("maxPendingMessages"), @@ -113,7 +113,7 @@ public class PulsarActivityUtil { ; public final String label; - private PRODUCER_CONF_KEY(String label) { + PRODUCER_CONF_KEY(String label) { this.label = label; } } @@ -123,27 +123,63 @@ public class PulsarActivityUtil { /////// // Valid consumer configuration (activity-level settings) - // TODO: to be added + // - https://pulsar.apache.org/docs/en/client-libraries-java/#consumer public enum CONSUMER_CONF_KEY { + topicNames("topicNames"), + topicsPattern("topicsPattern"), + subscriptionName("subscriptionName"), + subscriptionType("subscriptionType"), + receiverQueueSize("receiverQueueSize"), + acknowledgementsGroupTimeMicros("acknowledgementsGroupTimeMicros"), + negativeAckRedeliveryDelayMicros("negativeAckRedeliveryDelayMicros"), + maxTotalReceiverQueueSizeAcrossPartitions("maxTotalReceiverQueueSizeAcrossPartitions"), + consumerName("consumerName"), + ackTimeoutMillis("ackTimeoutMillis"), + tickDurationMillis("tickDurationMillis"), + priorityLevel("priorityLevel"), + cryptoFailureAction("cryptoFailureAction"), + properties("properties"), + readCompacted("readCompacted"), + subscriptionInitialPosition("subscriptionInitialPosition"), + patternAutoDiscoveryPeriod("patternAutoDiscoveryPeriod"), + regexSubscriptionMode("regexSubscriptionMode"), + deadLetterPolicy("deadLetterPolicy"), + autoUpdatePartitions("autoUpdatePartitions"), + replicateSubscriptionState("replicateSubscriptionState") ; public final String label; - private CONSUMER_CONF_KEY(String label) { + CONSUMER_CONF_KEY(String label) { this.label = label; } } + public static boolean isValidConsumerConfItem(String item) { + return Arrays.stream(CONSUMER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } /////// // Valid reader configuration (activity-level settings) - // TODO: to be added + // - https://pulsar.apache.org/docs/en/client-libraries-java/#reader public enum READER_CONF_KEY { + topicName("topicName"), + receiverQueueSize("receiverQueueSize"), + readerListener("readerListener"), + readerName("readerName"), + subscriptionRolePrefix("subscriptionRolePrefix"), + cryptoKeyReader("cryptoKeyReader"), + cryptoFailureAction("cryptoFailureAction"), + readCompacted("readCompacted"), + resetIncludeHead("resetIncludeHead") ; public final String label; - private READER_CONF_KEY(String label) { + READER_CONF_KEY(String label) { this.label = label; } } + public static boolean isValidReaderConfItem(String item) { + return Arrays.stream(READER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } /////// // Valid websocket-producer configuration (activity-level settings) @@ -152,7 +188,7 @@ public class PulsarActivityUtil { ; public final String label; - private WEBSKT_PRODUCER_CONF_KEY(String label) { + WEBSKT_PRODUCER_CONF_KEY(String label) { this.label = label; } } @@ -164,7 +200,7 @@ public class PulsarActivityUtil { ; public final String label; - private MANAGED_LEDGER_CONF_KEY(String label) { + MANAGED_LEDGER_CONF_KEY(String label) { this.label = label; } } @@ -179,21 +215,21 @@ public class PulsarActivityUtil { typeStr = "BYTES"; } - if ( typeStr.toUpperCase().equals("BOOLEAN") || typeStr.toUpperCase().equals("INT8") || - typeStr.toUpperCase().equals("INT16") || typeStr.toUpperCase().equals("INT32") || - typeStr.toUpperCase().equals("INT64") || typeStr.toUpperCase().equals("FLOAT") || - typeStr.toUpperCase().equals("DOUBLE") || typeStr.toUpperCase().equals("BYTES") || - typeStr.toUpperCase().equals("DATE") || typeStr.toUpperCase().equals("TIME") || - typeStr.toUpperCase().equals("TIMESTAMP") || typeStr.toUpperCase().equals("INSTANT") || - typeStr.toUpperCase().equals("LOCAL_DATE") || typeStr.toUpperCase().equals("LOCAL_TIME") || - typeStr.toUpperCase().equals("LOCAL_DATE_TIME") ) { + if ( typeStr.equalsIgnoreCase("BOOLEAN") || typeStr.equalsIgnoreCase("INT8") || + typeStr.equalsIgnoreCase("INT16") || typeStr.equalsIgnoreCase("INT32") || + typeStr.equalsIgnoreCase("INT64") || typeStr.equalsIgnoreCase("FLOAT") || + typeStr.equalsIgnoreCase("DOUBLE") || typeStr.equalsIgnoreCase("BYTES") || + typeStr.equalsIgnoreCase("DATE") || typeStr.equalsIgnoreCase("TIME") || + typeStr.equalsIgnoreCase("TIMESTAMP") || typeStr.equalsIgnoreCase("INSTANT") || + typeStr.equalsIgnoreCase("LOCAL_DATE") || typeStr.equalsIgnoreCase("LOCAL_TIME") || + typeStr.equalsIgnoreCase("LOCAL_DATE_TIME") ) { isPrimitive = true; } return isPrimitive; } - public static Schema getPrimitiveTypeSchema(String typeStr) { - Schema schema = null; + public static Schema getPrimitiveTypeSchema(String typeStr) { + Schema schema; switch (typeStr.toUpperCase()) { case "BOOLEAN": @@ -240,7 +276,7 @@ public class PulsarActivityUtil { break; // Use BYTES as the default schema type if the type string is not specified case "": - case "BTYES": + case "BYTES": schema = Schema.BYTES; break; // Report an error if non-valid, non-empty schema type string is provided @@ -255,15 +291,15 @@ public class PulsarActivityUtil { // Complex strut type: Avro or Json public static boolean isAvroSchemaTypeStr(String typeStr) { boolean isAvroType = false; - if ( typeStr.toUpperCase().equals("AVRO") ) { + if ( typeStr.equalsIgnoreCase("AVRO") ) { isAvroType = true; } return isAvroType; } - public static Schema getAvroSchema(String typeStr, String definitionStr) { + public static Schema getAvroSchema(String typeStr, String definitionStr) { String schemaDefinitionStr = definitionStr; String filePrefix = "file://"; - Schema schema = null; + Schema schema; // Check if payloadStr points to a file (e.g. "file:///path/to/a/file") if (isAvroSchemaTypeStr(typeStr)) { @@ -278,8 +314,6 @@ public class PulsarActivityUtil { } } - System.out.println(schemaDefinitionStr); - SchemaInfo schemaInfo = SchemaInfo.builder() .schema(schemaDefinitionStr.getBytes(StandardCharsets.UTF_8)) .type(SchemaType.AVRO) @@ -296,5 +330,16 @@ public class PulsarActivityUtil { return schema; } + + public static String encode(String... strings) { + StringBuilder stringBuilder = new StringBuilder(); + + for (String str : strings) { + if ((str != null) && !str.isEmpty()) + stringBuilder.append(str).append("::"); + } + + return Base64.getEncoder().encodeToString(stringBuilder.toString().getBytes()); + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index ec8c4a0d2..e5b6a58c6 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -26,11 +26,15 @@ public class PulsarNBClientConf { private static final String SCHEMA_CONF_PREFIX = "schema"; private static final String CLIENT_CONF_PREFIX = "client"; private static final String PRODUCER_CONF_PREFIX = "producer"; + private static final String CONSUMER_CONF_PREFIX = "consumer"; + private static final String READER_CONF_PREFIX = "reader"; private HashMap driverConfMap = new HashMap<>(); private HashMap schemaConfMap = new HashMap<>(); private HashMap clientConfMap = new HashMap<>(); private HashMap producerConfMap = new HashMap<>(); - // TODO: add support for other operation types: consumer, reader, websocket-producer, managed-ledger + private HashMap consumerConfMap = new HashMap<>(); + private HashMap readerConfMap = new HashMap<>(); + // TODO: add support for other operation types: websocket-producer, managed-ledger public PulsarNBClientConf(String fileName) { File file = new File(fileName); @@ -51,25 +55,49 @@ public class PulsarNBClientConf { // Get driver specific configuration settings for (Iterator it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); - driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey)); + String confVal = config.getProperty(confKey).toString(); + if ( (confVal != null) && !confVal.isEmpty() ) + driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey)); } // Get schema specific configuration settings for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); - schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length()+1), config.getProperty(confKey)); + String confVal = config.getProperty(confKey).toString(); + if ( (confVal != null) && !confVal.isEmpty() ) + schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length()+1), config.getProperty(confKey)); } // Get client connection specific configuration settings for (Iterator it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); - clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length()+1), config.getProperty(confKey)); + String confVal = config.getProperty(confKey).toString(); + if ( (confVal != null) && !confVal.isEmpty() ) + clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length()+1), config.getProperty(confKey)); } // Get producer specific configuration settings for (Iterator it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); - producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length()+1), config.getProperty(confKey)); + String confVal = config.getProperty(confKey).toString(); + if ( (confVal != null) && !confVal.isEmpty() ) + producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length()+1), config.getProperty(confKey)); + } + + // Get producer specific configuration settings + for (Iterator it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + if ( (confVal != null) && !confVal.isEmpty() ) + consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length()+1), config.getProperty(confKey)); + } + + // Get producer specific configuration settings + for (Iterator it = config.getKeys(READER_CONF_PREFIX); it.hasNext(); ) { + String confKey = it.next(); + String confVal = config.getProperty(confKey).toString(); + if ( (confVal != null) && !confVal.isEmpty() ) + readerConfMap.put(confKey.substring(READER_CONF_PREFIX.length()+1), config.getProperty(confKey)); } } catch (IOException ioe) { @@ -82,6 +110,8 @@ public class PulsarNBClientConf { } } + + ////////////////// // Get NB Driver related config public Map getDriverConfMap() { return this.driverConfMap; @@ -104,7 +134,19 @@ public class PulsarNBClientConf { else driverConfMap.put(key, value); } + // other driver helper functions ... + public String getPulsarClientType() { + Object confValue = getDriverConfValue("driver.client-type"); + // If not explicitly specifying Pulsar client type, "producer" is the default type + if (confValue == null) + return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString(); + else + return confValue.toString(); + } + + + ////////////////// // Get Schema related config public Map getSchemaConfMap() { return this.schemaConfMap; @@ -128,6 +170,8 @@ public class PulsarNBClientConf { schemaConfMap.put(key, value); } + + ////////////////// // Get Pulsar client related config public Map getClientConfMap() { return this.clientConfMap; @@ -151,6 +195,8 @@ public class PulsarNBClientConf { clientConfMap.put(key, value); } + + ////////////////// // Get Pulsar producer related config public Map getProducerConfMap() { return this.producerConfMap; @@ -173,34 +219,106 @@ public class PulsarNBClientConf { else producerConfMap.put(key, value); } - - public String getPulsarClientType() { - Object confValue = getDriverConfValue("driver.client-type"); - - // If not explicitly specifying Pulsar client type, "producer" is the default type - if (confValue == null) - return PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString(); - else - return confValue.toString(); - } - + // other producer helper functions ... public String getProducerName() { Object confValue = getProducerConfValue("producer.producerName"); - - // If not explicitly specifying Pulsar client type, "producer" is the default type if (confValue == null) return ""; else return confValue.toString(); } - - public String getTopicName() { + public String getProducerTopicName() { Object confValue = getProducerConfValue("producer.topicName"); - - // If not explicitly specifying Pulsar client type, "producer" is the default type if (confValue == null) return ""; else return confValue.toString(); } + + + ////////////////// + // Get Pulsar consumer related config + public Map getConsumerConfMap() { + return this.consumerConfMap; + } + public boolean hasConsumerConfKey(String key) { + if (key.contains(CONSUMER_CONF_PREFIX)) + return consumerConfMap.containsKey(key.substring(CONSUMER_CONF_PREFIX.length()+1)); + else + return consumerConfMap.containsKey(key); + } + public Object getConsumerConfValue(String key) { + if (key.contains(CONSUMER_CONF_PREFIX)) + return consumerConfMap.get(key.substring(CONSUMER_CONF_PREFIX.length()+1)); + else + return consumerConfMap.get(key); + } + public void setConsumerConfValue(String key, Object value) { + if (key.contains(CONSUMER_CONF_PREFIX)) + consumerConfMap.put(key.substring(CONSUMER_CONF_PREFIX.length()+1), value); + else + consumerConfMap.put(key, value); + } + // Other consumer helper functions ... + public String getConsumerTopicNames() { + Object confValue = getConsumerConfValue("consumer.topicNames"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + public String getConsumerTopicPattern() { + Object confValue = getConsumerConfValue("consumer.topicsPattern"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + public String getConsumerSubscriptionName() { + Object confValue = getConsumerConfValue("consumer.subscriptionName"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + public String getConsumerSubscriptionType() { + Object confValue = getConsumerConfValue("consumer.subscriptionType"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + public String getConsumerName() { + Object confValue = getConsumerConfValue("consumer.consumerName"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + + + ////////////////// + // Get Pulsar reader related config + public Map getReaderConfMap() { + return this.readerConfMap; + } + public boolean hasReaderConfKey(String key) { + if (key.contains(READER_CONF_PREFIX)) + return readerConfMap.containsKey(key.substring(READER_CONF_PREFIX.length()+1)); + else + return readerConfMap.containsKey(key); + } + public Object getReaderConfValue(String key) { + if (key.contains(READER_CONF_PREFIX)) + return readerConfMap.get(key.substring(READER_CONF_PREFIX.length()+1)); + else + return readerConfMap.get(key); + } + public void setReaderConfValue(String key, Object value) { + if (key.contains(READER_CONF_PREFIX)) + readerConfMap.put(key.substring(READER_CONF_PREFIX.length()+1), value); + else + readerConfMap.put(key, value); + } + // Other consumer helper functions ... } diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index 69666467b..925f03477 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -1,9 +1,9 @@ ### NB Pulsar driver related configuration - driver.xxx -driver.client-type = producer -# TODO: At the moment, only one producer is publishing messages to one single topic through NB -# TODO - consider allowing multiple producers to publish to the same topic -# TODO - but this relies on the NB cycle to be able to run Pulsar client asynchronously! +driver.client-type = consumer driver.num-workers = 1 +# TODO: functionalities to be completed +driver.sync-mode = sync +driver.msg-recv-ouput = console ### Schema related configurations - schema.xxx @@ -13,17 +13,16 @@ driver.num-workers = 1 # - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct) # avro, json, protobuf # -# TODO: as a starting point, only supports: -# TODO: 1) primitive types, including bytearray (byte[]) which is default, for messages without schema -# TODO: 2) Avro for messages with schema -schema.type = avro +# TODO: as a starting point, only supports the following types +# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema +# 2) Avro for messages with schema +schema.type = schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc ### Pulsar client related configurations - client.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#client -default: pulsar://localhost:6550 -# default: 10000 +client.serviceUrl: pulsar://localhost:6650 client.connectionTimeoutMs = 5000 @@ -31,8 +30,19 @@ client.connectionTimeoutMs = 5000 # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer producer.producerName = producer.topicName = persistent://public/default/mynbtest -#producer.sendTimeoutMs = +producer.sendTimeoutMs = ### Consumer related configurations (global) - consumer.xxx # http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer +consumer.topicNames = +consumer.topicsPattern = +consumer.subscriptionName = +consumer.subscriptionType = +consumer.consumerName = +consumer.receiverQueueSize = +### Reader related configurations (global) - reader.xxx +# https://pulsar.apache.org/docs/en/client-libraries-java/#reader +reader.topicName = +reader.receiverQueueSize = +reader.readerName = diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index 5e338c057..4a7fa026f 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -22,7 +22,7 @@ blocks: - name: producer-block tags: - type: producer + op-type: producer statements: - producer-stuff: ####### @@ -40,14 +40,17 @@ blocks: "ReadingValue": {reading_value} } -# - name: consumer-block -# tags: -# type: consumer -# statements: -# - consumer-stuff: + - name: consumer-block + tags: + op-type: consumer + statements: + - consumer-stuff: + topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" +# topics-pattern: "public/default/.*" # subscription-name: # subscription-type: -# +# consumer-name: + # - reader: # tags: # type: reader diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index bb1078a33..00deabf03 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -121,7 +121,7 @@ In the above statement block, there are 4 key statement parameters to provide va ``` * At the moment, only "**\**" part can be dynamically bound (e.g. through NB binding function). All other parts must be static values and the corresponding tenants and namespaces must be created in the Pulsar cluster in advance. -**TODO**: allow dynamic binding for "\" and "\" after adding a phase for creating "\" and/or "\", similar to C* CQL schema creation phase.! +**TODO**: allow dynamic binding for "\" and "\" after adding a phase for creating "\" and/or "\", similar to C* CQL schema creation phase! * **msg-key**: Pulsar message key * **Optional** @@ -149,7 +149,7 @@ Pulsar has built-in schema support. Other than primitive types, Pulsar also supp schema.definition: ``` -For the previous Producer block statement example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. from a file **iot-example.asvc**) +For the previous Producer block statement example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. as in the sample schema definition file: **[iot-example.asvc](activities/iot-example.avsc)**) ```json { "type": "record", @@ -166,7 +166,7 @@ For the previous Producer block statement example, the **msg-value** parameter h ## 1.5. Activity Parameters -At the moment, the following Activity Parameter is supported: +At the moment, the following Pulsar driver specific Activity Parameter is supported: - * config= @@ -189,7 +189,7 @@ At the moment, the following Activity Parameter is supported: To summarize, the original caching design has the following key requirements: * **Requirement 1**: Each NB Pulsar activity is able to launch and cache multiple **client spaces** -* **Requirement 2**:Each client space can launch and cache multiple Pulsar operators of the same type (producer, consumer, etc.) +* **Requirement 2**: Each client space can launch and cache multiple Pulsar operators of the same type (producer, consumer, etc.) In the current implementation, only requirement 2 is implemented. Regarding requirement 1, the current implementation only supports one client space per NB Pulsar activity! diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityconfig/yaml/StmtDef.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityconfig/yaml/StmtDef.java index 1aea3e5da..aae92d9ed 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityconfig/yaml/StmtDef.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityconfig/yaml/StmtDef.java @@ -79,12 +79,14 @@ public class StmtDef implements OpTemplate { Map map = new LinkedHashMap<>(); for (String pname : getParams().keySet()) { Object object = getParams().get(pname); - if (type.isAssignableFrom(object.getClass())) { - map.put(pname, type.cast(object)); - } else { - throw new RuntimeException("With param named '" + pname + "" + + if (object != null) { + if (type.isAssignableFrom(object.getClass())) { + map.put(pname, type.cast(object)); + } else { + throw new RuntimeException("With param named '" + pname + "" + "' You can't assign an object of type '" + object.getClass().getSimpleName() + "" + "' to '" + type.getSimpleName() + "'. Maybe the YAML format is suggesting the wrong type."); + } } } return map; From b3084fdd4f768ca172918d90418b2292652fab43 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Sun, 28 Feb 2021 17:42:16 -0600 Subject: [PATCH 2/2] Pulsar Reader API with Avro schema support --- .../driver/pulsar/PulsarConsumerSpace.java | 72 ++++++++------ .../driver/pulsar/PulsarProducerSpace.java | 14 +-- .../driver/pulsar/PulsarReaderSpace.java | 97 ++++++++++++++++++- .../driver/pulsar/ops/PulsarReaderOp.java | 28 +++++- .../driver/pulsar/ops/ReadyPulsarOp.java | 52 +++++++--- .../pulsar/util/PulsarActivityUtil.java | 66 +++++++++---- .../pulsar/util/PulsarNBClientConf.java | 50 +++++++--- .../resources/activities/config.properties | 10 +- .../src/main/resources/activities/pulsar.yaml | 20 ++-- 9 files changed, 312 insertions(+), 97 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java index edf99267e..56d21d2b0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarConsumerSpace.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; @@ -19,12 +20,12 @@ public class PulsarConsumerSpace extends PulsarSpace { public PulsarConsumerSpace(String name, PulsarNBClientConf pulsarClientConf) { super(name, pulsarClientConf); } private String getEffectiveTopicNamesStr(String cycleTopicNames) { - if ((cycleTopicNames != null) && (!cycleTopicNames.isEmpty())) { + if ( !StringUtils.isBlank(cycleTopicNames) ) { return cycleTopicNames; } String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); - if ((globalTopicNames != null) && (!globalTopicNames.isEmpty())) { + if ( !StringUtils.isBlank(globalTopicNames) ) { return globalTopicNames; } @@ -37,7 +38,7 @@ public class PulsarConsumerSpace extends PulsarSpace { ArrayList effectiveTopicNameList = new ArrayList<>(); for (String name : names) { - if ( !name.isEmpty() ) + if ( !StringUtils.isBlank(name) ) effectiveTopicNameList.add(name.trim()); } @@ -46,22 +47,25 @@ public class PulsarConsumerSpace extends PulsarSpace { } private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { - if ((cycleTopicsPattern != null) && (!cycleTopicsPattern.isEmpty())) { + if ( !StringUtils.isBlank(cycleTopicsPattern) ) { return cycleTopicsPattern; } String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); - if ((globalTopicsPattern != null) && (!globalTopicsPattern.isEmpty())) { + if ( !StringUtils.isBlank(globalTopicsPattern) ) { return globalTopicsPattern; } return ""; } private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { - String effecitveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); Pattern topicsPattern; try { - topicsPattern = Pattern.compile(effecitveTopicsPatternStr); + if ( !StringUtils.isBlank(effectiveTopicsPatternStr) ) + topicsPattern = Pattern.compile(effectiveTopicsPatternStr); + else + topicsPattern = null; } catch (PatternSyntaxException pse) { topicsPattern = null; @@ -70,12 +74,12 @@ public class PulsarConsumerSpace extends PulsarSpace { } private String getEffectiveSubscriptionName(String cycleSubscriptionName) { - if ((cycleSubscriptionName != null) && (!cycleSubscriptionName.isEmpty())) { + if ( !StringUtils.isBlank(cycleSubscriptionName) ) { return cycleSubscriptionName; } String globalSubscriptionName = pulsarNBClientConf.getConsumerSubscriptionName(); - if ((globalSubscriptionName != null) && (!globalSubscriptionName.isEmpty())) { + if ( !StringUtils.isBlank(globalSubscriptionName) ) { return globalSubscriptionName; } @@ -83,12 +87,12 @@ public class PulsarConsumerSpace extends PulsarSpace { } private String getEffectiveSubscriptionTypeStr(String cycleSubscriptionType) { - if ((cycleSubscriptionType != null) && (!cycleSubscriptionType.isEmpty())) { + if ( !StringUtils.isBlank(cycleSubscriptionType) ) { return cycleSubscriptionType; } String globalSubscriptionType = pulsarNBClientConf.getConsumerSubscriptionType(); - if ((globalSubscriptionType != null) && (!globalSubscriptionType.isEmpty())) { + if ( !StringUtils.isBlank(globalSubscriptionType) ) { return globalSubscriptionType; } @@ -109,12 +113,12 @@ public class PulsarConsumerSpace extends PulsarSpace { } private String getEffectiveConsumerName(String cycleConsumerName) { - if ((cycleConsumerName != null) && (!cycleConsumerName.isEmpty())) { + if ( !StringUtils.isBlank(cycleConsumerName) ) { return cycleConsumerName; } String globalConsumerName = pulsarNBClientConf.getConsumerName(); - if ((globalConsumerName != null) && (!globalConsumerName.isEmpty())) { + if ( !StringUtils.isBlank(globalConsumerName) ) { return globalConsumerName; } @@ -130,12 +134,28 @@ public class PulsarConsumerSpace extends PulsarSpace { String topicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); List topicNames = getEffectiveTopicNames(cycleTopicNames); String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern); String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); String consumerName = getEffectiveConsumerName(cycleConsumerName); - String encodedStr = PulsarActivityUtil.encode( - consumerName, subscriptionName, topicNamesStr, topicsPatternStr); + if ( topicNames.isEmpty() && (topicsPattern == null) ) { + throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!"); + } + + String encodedStr; + if ( !topicNames.isEmpty() ) { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + StringUtils.join(topicNames, "|") ); + } + else { + encodedStr = PulsarActivityUtil.encode( + consumerName, + subscriptionName, + topicsPatternStr ); + } Consumer consumer = consumers.get(encodedStr); if (consumer == null) { @@ -146,23 +166,19 @@ public class PulsarConsumerSpace extends PulsarSpace { // Explicit topic names will take precedence over topics pattern if ( !topicNames.isEmpty() ) { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_KEY.topicsPattern.toString()); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.topicNames.toString(), topicNames); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.toString(), topicNames); } else { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_KEY.topicNames.toString()); - if ( !topicsPatternStr.isEmpty() ) - consumerConf.put( - PulsarActivityUtil.CONSUMER_CONF_KEY.topicsPattern.toString(), - getEffectiveTopicPattern(cycleTopicsPattern)); - else { - throw new RuntimeException("\"topicName\" and \"topicsPattern\" can't be empty/invalid at the same time!"); - } + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); + consumerConf.put( + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, + getEffectiveTopicPattern(cycleTopicsPattern)); } - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.subscriptionName.toString(), subscriptionName); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.subscriptionType.toString(), subscriptionType); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_KEY.consumerName.toString(), consumerName); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType); + consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName); try { consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java index d625a165b..325b9e73b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarProducerSpace.java @@ -2,11 +2,11 @@ package io.nosqlbench.driver.pulsar; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; +import org.apache.commons.lang.StringUtils; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import java.util.Base64; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -22,12 +22,12 @@ public class PulsarProducerSpace extends PulsarSpace{ // - It can be set at either global level or cycle level // - If set at both levels, cycle level setting takes precedence private String getEffectiveProducerName(String cycleProducerName) { - if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) { + if ( !StringUtils.isBlank(cycleProducerName) ) { return cycleProducerName; } String globalProducerName = pulsarNBClientConf.getProducerName(); - if ((globalProducerName != null) && (!globalProducerName.isEmpty())) { + if ( !StringUtils.isBlank(globalProducerName) ) { return globalProducerName; } @@ -39,12 +39,12 @@ public class PulsarProducerSpace extends PulsarSpace{ // - It must be set at either global level or cycle level // - If set at both levels, cycle level setting takes precedence private String getEffectiveTopicName(String cycleTopicName) { - if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) { + if ( !StringUtils.isBlank(cycleTopicName) ) { return cycleTopicName; } String globalTopicName = pulsarNBClientConf.getProducerTopicName(); - if ( (globalTopicName == null) || (globalTopicName.isEmpty()) ) { + if ( !StringUtils.isBlank(globalTopicName) ) { throw new RuntimeException("Topic name must be set at either global level or cycle level!"); } @@ -63,8 +63,8 @@ public class PulsarProducerSpace extends PulsarSpace{ // Get other possible producer settings that are set at global level Map producerConf = pulsarNBClientConf.getProducerConfMap(); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_KEY.topicName.toString(), topicName); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_KEY.producerName.toString(), producerName); + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName); + producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName); try { producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java index 3a908b652..f8e8de38d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarReaderSpace.java @@ -1,8 +1,11 @@ package io.nosqlbench.driver.pulsar; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; -import org.apache.pulsar.client.api.Reader; +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.client.api.*; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class PulsarReaderSpace extends PulsarSpace { @@ -12,4 +15,96 @@ public class PulsarReaderSpace extends PulsarSpace { public PulsarReaderSpace(String name, PulsarNBClientConf pulsarClientConf) { super(name, pulsarClientConf); } + + private String getEffectiveReaderTopicName(String cycleReaderTopicName) { + if ( !StringUtils.isBlank(cycleReaderTopicName) ) { + return cycleReaderTopicName; + } + + String globalReaderTopicName = pulsarNBClientConf.getReaderTopicName(); + if ( !StringUtils.isBlank(globalReaderTopicName) ) { + return globalReaderTopicName; + } + + return ""; + } + + private String getEffectiveReaderName(String cycleReaderName) { + if ( !StringUtils.isBlank(cycleReaderName) ) { + return cycleReaderName; + } + + String globalReaderName = pulsarNBClientConf.getConsumerName(); + if ( !StringUtils.isBlank(globalReaderName) ) { + return globalReaderName; + } + + return "default-read"; + } + + private String getEffectiveStartMsgPosStr(String cycleStartMsgPosStr) { + if ( !StringUtils.isBlank(cycleStartMsgPosStr) ) { + return cycleStartMsgPosStr; + } + + String globalStartMsgPosStr = pulsarNBClientConf.getStartMsgPosStr(); + if ( !StringUtils.isBlank(globalStartMsgPosStr) ) { + return globalStartMsgPosStr; + } + + return PulsarActivityUtil.READER_MSG_POSITION_TYPE.latest.label; + } + + public Reader getReader(String cycleTopicName, + String cycleReaderName, + String cycleStartMsgPos) { + + String topicName = getEffectiveReaderTopicName(cycleTopicName); + String readerName = getEffectiveReaderName(cycleReaderName); + String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos); + + if ( StringUtils.isBlank(topicName) ) { + throw new RuntimeException("Must specify a \"topicName\" for a reader!"); + } + + String encodedStr = PulsarActivityUtil.encode(cycleTopicName, cycleReaderName, cycleStartMsgPos); + Reader reader = readers.get(encodedStr); + + if (reader == null) { + PulsarClient pulsarClient = getPulsarClient(); + + Map readerConf = pulsarNBClientConf.getReaderConfMap(); + readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName); + readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName); + // "reader.startMessagePos" is NOT a standard Pulsar reader conf + readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); + + try { + ReaderBuilder readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf); + + MessageId startMsgId = MessageId.latest; + if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) { + startMsgId = MessageId.earliest; + } + //TODO: custom start message position is NOT supported yet + //else if (startMsgPosStr.startsWith(PulsarActivityUtil.READER_MSG_POSITION_TYPE.custom.label)) { + // startMsgId = MessageId.latest; + //} + + if (startMsgId != null) { + readerBuilder = readerBuilder.startMessageId(startMsgId); + } + + reader = readerBuilder.create(); + } + catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar reader!"); + } + + readers.put(encodedStr, reader); + } + + return reader; + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java index d3d7bd94f..6551f3abf 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java @@ -1,7 +1,12 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.util.AvroUtil; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaType; public class PulsarReaderOp implements PulsarOp { private final Reader reader; @@ -14,6 +19,27 @@ public class PulsarReaderOp implements PulsarOp { @Override public void run() { - //TODO: to be added + try { + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + + // TODO: how many messages to read per NB cycle? + Message message; + while (reader.hasMessageAvailable()) { + message = reader.readNext(); + + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); + System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString()); + } + else { + System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData())); + } + } + } + catch (PulsarClientException e) { + throw new RuntimeException(e); + } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8add08167..1a1086075 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -58,13 +58,13 @@ public class ReadyPulsarOp implements LongFunction { // TODO: Complete implementation for reader, websocket-producer and managed-ledger if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) { assert clientSpace instanceof PulsarProducerSpace; - return resolveProducer((PulsarProducerSpace) clientSpace, cmdTpl); + return resolveProducer((PulsarProducerSpace) clientSpace); } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.CONSUMER.toString()) ) { assert clientSpace instanceof PulsarConsumerSpace; - return resolveConsumer((PulsarConsumerSpace)clientSpace, cmdTpl); /* + return resolveConsumer((PulsarConsumerSpace)clientSpace); } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.READER.toString()) ) { assert clientSpace instanceof PulsarReaderSpace; - return resolveReader((PulsarReaderSpace)clientSpace, cmdTpl); + return resolveReader((PulsarReaderSpace)clientSpace); /* } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.WSOKT_PRODUCER.toString()) ) { } else if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.MANAGED_LEDGER.toString()) ) { */ @@ -74,8 +74,7 @@ public class ReadyPulsarOp implements LongFunction { } private LongFunction resolveProducer( - PulsarProducerSpace clientSpace, - CommandTemplate cmdTpl + PulsarProducerSpace clientSpace ) { if (cmdTpl.containsKey("topic_url")) { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); @@ -152,8 +151,7 @@ public class ReadyPulsarOp implements LongFunction { } private LongFunction resolveConsumer( - PulsarConsumerSpace clientSpace, - CommandTemplate cmdTpl + PulsarConsumerSpace clientSpace ) { LongFunction topic_names_func; if (cmdTpl.isStatic("topic-names")) { @@ -213,11 +211,43 @@ public class ReadyPulsarOp implements LongFunction { } private LongFunction resolveReader( - PulsarReaderSpace pulsarSpace, - CommandTemplate cmdTpl + PulsarReaderSpace clientSpace ) { - //TODO: to be completed - return null; + LongFunction topic_name_func; + if (cmdTpl.isStatic("topic-name")) { + topic_name_func = (l) -> cmdTpl.getStatic("topic-name"); + } else if (cmdTpl.isDynamic("topic-name")) { + topic_name_func = (l) -> cmdTpl.getDynamic("topic-name", l); + } else { + topic_name_func = (l) -> null; + } + + LongFunction reader_name_func; + if (cmdTpl.isStatic("reader-name")) { + reader_name_func = (l) -> cmdTpl.getStatic("reader-name"); + } else if (cmdTpl.isDynamic("reader-name")) { + reader_name_func = (l) -> cmdTpl.getDynamic("reader-name", l); + } else { + reader_name_func = (l) -> null; + } + + LongFunction start_msg_pos_str_func; + if (cmdTpl.isStatic("start-msg-position")) { + start_msg_pos_str_func = (l) -> cmdTpl.getStatic("start-msg-position"); + } else if (cmdTpl.isDynamic("start-msg-position")) { + start_msg_pos_str_func = (l) -> cmdTpl.getDynamic("start-msg-position", l); + } else { + start_msg_pos_str_func = (l) -> null; + } + + LongFunction> readerFunc = (l) -> + clientSpace.getReader( + topic_name_func.apply(l), + reader_name_func.apply(l), + start_msg_pos_str_func.apply(l) + ); + + return new PulsarReaderMapper(cmdTpl, pulsarSchema, readerFunc); } @Override diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index ffeacb61b..7ddf858b5 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.util; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Schema; @@ -11,7 +12,6 @@ import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Files; -import java.nio.file.InvalidPathException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; @@ -94,9 +94,9 @@ public class PulsarActivityUtil { } /////// - // Valid producer configuration (activity-level settings) + // Standard producer configuration (activity-level settings) // - https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer - public enum PRODUCER_CONF_KEY { + public enum PRODUCER_CONF_STD_KEY { topicName("topicName"), producerName("producerName"), sendTimeoutMs("sendTimeoutMs"), @@ -113,18 +113,18 @@ public class PulsarActivityUtil { ; public final String label; - PRODUCER_CONF_KEY(String label) { + PRODUCER_CONF_STD_KEY(String label) { this.label = label; } } - public static boolean isValidProducerConfItem(String item) { - return Arrays.stream(PRODUCER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + public static boolean isStandardProducerConfItem(String item) { + return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); } /////// - // Valid consumer configuration (activity-level settings) + // Standard consumer configuration (activity-level settings) // - https://pulsar.apache.org/docs/en/client-libraries-java/#consumer - public enum CONSUMER_CONF_KEY { + public enum CONSUMER_CONF_STD_KEY { topicNames("topicNames"), topicsPattern("topicsPattern"), subscriptionName("subscriptionName"), @@ -149,18 +149,18 @@ public class PulsarActivityUtil { ; public final String label; - CONSUMER_CONF_KEY(String label) { + CONSUMER_CONF_STD_KEY(String label) { this.label = label; } } - public static boolean isValidConsumerConfItem(String item) { - return Arrays.stream(CONSUMER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + public static boolean isStandardConsumerConfItem(String item) { + return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); } /////// - // Valid reader configuration (activity-level settings) + // Standard reader configuration (activity-level settings) // - https://pulsar.apache.org/docs/en/client-libraries-java/#reader - public enum READER_CONF_KEY { + public enum READER_CONF_STD_KEY { topicName("topicName"), receiverQueueSize("receiverQueueSize"), readerListener("readerListener"), @@ -173,12 +173,34 @@ public class PulsarActivityUtil { ; public final String label; - READER_CONF_KEY(String label) { + READER_CONF_STD_KEY(String label) { this.label = label; } } - public static boolean isValidReaderConfItem(String item) { - return Arrays.stream(READER_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + public static boolean isStandardReaderConfItem(String item) { + return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } + + public enum READER_CONF_CUSTOM_KEY { + startMessagePos("startMessagePos") + ; + + public final String label; + READER_CONF_CUSTOM_KEY(String label) { + this.label = label; + } + } + public static boolean isCustomReaderConfItem(String item) { + return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase())); + } + + public enum READER_MSG_POSITION_TYPE { + earliest("earliest"), + latest("latest"), + custom("custom"); + + public final String label; + READER_MSG_POSITION_TYPE(String label) { this.label = label; } } /////// @@ -211,7 +233,7 @@ public class PulsarActivityUtil { boolean isPrimitive = false; // Use "BYTES" as the default type if the type string is not explicitly specified - if ((typeStr == null) || typeStr.isEmpty()) { + if (StringUtils.isBlank(typeStr)) { typeStr = "BYTES"; } @@ -303,7 +325,7 @@ public class PulsarActivityUtil { // Check if payloadStr points to a file (e.g. "file:///path/to/a/file") if (isAvroSchemaTypeStr(typeStr)) { - if ( (schemaDefinitionStr == null) || schemaDefinitionStr.isEmpty()) { + if ( StringUtils.isBlank(schemaDefinitionStr) ) { throw new RuntimeException("Schema definition must be provided for \"Avro\" schema type!"); } else if (schemaDefinitionStr.startsWith(filePrefix)) { try { @@ -333,13 +355,15 @@ public class PulsarActivityUtil { public static String encode(String... strings) { StringBuilder stringBuilder = new StringBuilder(); - for (String str : strings) { - if ((str != null) && !str.isEmpty()) + if ( !StringUtils.isBlank(str) ) stringBuilder.append(str).append("::"); } - return Base64.getEncoder().encodeToString(stringBuilder.toString().getBytes()); + String concatenatedStr = + StringUtils.substringAfterLast(stringBuilder.toString(), "::"); + + return Base64.getEncoder().encodeToString(concatenatedStr.getBytes()); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index e5b6a58c6..9d656caac 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -7,6 +7,7 @@ import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder; import org.apache.commons.configuration2.builder.fluent.Parameters; import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler; import org.apache.commons.configuration2.ex.ConfigurationException; +import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -22,12 +23,12 @@ public class PulsarNBClientConf { private String canonicalFilePath = ""; - private static final String DRIVER_CONF_PREFIX = "driver"; - private static final String SCHEMA_CONF_PREFIX = "schema"; - private static final String CLIENT_CONF_PREFIX = "client"; - private static final String PRODUCER_CONF_PREFIX = "producer"; - private static final String CONSUMER_CONF_PREFIX = "consumer"; - private static final String READER_CONF_PREFIX = "reader"; + public static final String DRIVER_CONF_PREFIX = "driver"; + public static final String SCHEMA_CONF_PREFIX = "schema"; + public static final String CLIENT_CONF_PREFIX = "client"; + public static final String PRODUCER_CONF_PREFIX = "producer"; + public static final String CONSUMER_CONF_PREFIX = "consumer"; + public static final String READER_CONF_PREFIX = "reader"; private HashMap driverConfMap = new HashMap<>(); private HashMap schemaConfMap = new HashMap<>(); private HashMap clientConfMap = new HashMap<>(); @@ -56,7 +57,7 @@ public class PulsarNBClientConf { for (Iterator it = config.getKeys(DRIVER_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); String confVal = config.getProperty(confKey).toString(); - if ( (confVal != null) && !confVal.isEmpty() ) + if ( !StringUtils.isBlank(confVal) ) driverConfMap.put(confKey.substring(DRIVER_CONF_PREFIX.length()+1), config.getProperty(confKey)); } @@ -64,7 +65,7 @@ public class PulsarNBClientConf { for (Iterator it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); String confVal = config.getProperty(confKey).toString(); - if ( (confVal != null) && !confVal.isEmpty() ) + if ( !StringUtils.isBlank(confVal) ) schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length()+1), config.getProperty(confKey)); } @@ -72,7 +73,7 @@ public class PulsarNBClientConf { for (Iterator it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); String confVal = config.getProperty(confKey).toString(); - if ( (confVal != null) && !confVal.isEmpty() ) + if ( !StringUtils.isBlank(confVal) ) clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length()+1), config.getProperty(confKey)); } @@ -80,23 +81,23 @@ public class PulsarNBClientConf { for (Iterator it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); String confVal = config.getProperty(confKey).toString(); - if ( (confVal != null) && !confVal.isEmpty() ) + if ( !StringUtils.isBlank(confVal) ) producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length()+1), config.getProperty(confKey)); } - // Get producer specific configuration settings + // Get consumer specific configuration settings for (Iterator it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); String confVal = config.getProperty(confKey).toString(); - if ( (confVal != null) && !confVal.isEmpty() ) + if ( !StringUtils.isBlank(confVal) ) consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length()+1), config.getProperty(confKey)); } - // Get producer specific configuration settings + // Get reader specific configuration settings for (Iterator it = config.getKeys(READER_CONF_PREFIX); it.hasNext(); ) { String confKey = it.next(); String confVal = config.getProperty(confKey).toString(); - if ( (confVal != null) && !confVal.isEmpty() ) + if ( !StringUtils.isBlank(confVal) ) readerConfMap.put(confKey.substring(READER_CONF_PREFIX.length()+1), config.getProperty(confKey)); } } @@ -321,4 +322,25 @@ public class PulsarNBClientConf { readerConfMap.put(key, value); } // Other consumer helper functions ... + public String getReaderTopicName() { + Object confValue = getReaderConfValue("reader.topicName"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + public String getReaderName() { + Object confValue = getReaderConfValue("reader.readerName"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } + public String getStartMsgPosStr() { + Object confValue = getReaderConfValue("reader.startMessagePos"); + if (confValue == null) + return ""; + else + return confValue.toString(); + } } diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index 925f03477..ec954c961 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -1,5 +1,5 @@ ### NB Pulsar driver related configuration - driver.xxx -driver.client-type = consumer +driver.client-type = producer driver.num-workers = 1 # TODO: functionalities to be completed driver.sync-mode = sync @@ -16,8 +16,8 @@ driver.msg-recv-ouput = console # TODO: as a starting point, only supports the following types # 1) primitive types, including bytearray (byte[]) which is default, for messages without schema # 2) Avro for messages with schema -schema.type = -schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc +schema.type = avro +schema.definition = file://// ### Pulsar client related configurations - client.xxx @@ -43,6 +43,8 @@ consumer.receiverQueueSize = ### Reader related configurations (global) - reader.xxx # https://pulsar.apache.org/docs/en/client-libraries-java/#reader -reader.topicName = +# - valid Pos: earliest, latest, custom::file://// +reader.topicName = persistent://public/default/nbpulsar reader.receiverQueueSize = reader.readerName = +#reader.startMessagePos = earliest diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index 4a7fa026f..c329e193f 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -46,17 +46,17 @@ blocks: statements: - consumer-stuff: topic-names: "persistent://public/default/nbpulsar, persistent://public/default/mynbtest" -# topics-pattern: "public/default/.*" -# subscription-name: -# subscription-type: -# consumer-name: + topics-pattern: "public/default/.*" + subscription-name: + subscription-type: + consumer-name: + + - reader: + tags: + op-type: reader + statements: + - reader-stuff: -# - reader: -# tags: -# type: reader -# statements: -# - reader-stuff: -# # - websocket-producer: # tags: # type: websocket-produer