mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-01-26 15:36:33 -06:00
- Update pulsar.md
- Code changes based on review comments from Shooky
This commit is contained in:
parent
ddc7544b8f
commit
f78a9e928c
@ -67,51 +67,45 @@ public class PulsarSpace {
|
||||
}
|
||||
}
|
||||
|
||||
public PulsarClient getPulsarClient() {
|
||||
return pulsarClient;
|
||||
}
|
||||
public PulsarClient getPulsarClient() { return pulsarClient; }
|
||||
|
||||
public PulsarNBClientConf getPulsarClientConf() {
|
||||
return pulsarNBClientConf;
|
||||
}
|
||||
|
||||
public Schema<?> getPulsarSchema() {
|
||||
return pulsarSchema;
|
||||
}
|
||||
public Schema<?> getPulsarSchema() { return pulsarSchema; }
|
||||
|
||||
// Producer name is NOT mandatory
|
||||
// - It can be set at either global level or cycle level
|
||||
// - If set at both levels, cycle level setting takes precedence
|
||||
private String getEffectiveProducerName(String cycleProducerName) {
|
||||
// TODO: Maybe using NB run specific string as the producer name?
|
||||
String producerName = "default";
|
||||
if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) {
|
||||
return cycleProducerName;
|
||||
}
|
||||
|
||||
String globalProducerName = pulsarNBClientConf.getProducerName();
|
||||
if ((globalProducerName != null) && (!globalProducerName.isEmpty())) {
|
||||
producerName = globalProducerName;
|
||||
}
|
||||
if ((cycleProducerName != null) && (!cycleProducerName.isEmpty())) {
|
||||
producerName = cycleProducerName;
|
||||
return globalProducerName;
|
||||
}
|
||||
|
||||
return producerName;
|
||||
// Default Producer name when it is not set at either cycle or global level
|
||||
return "default";
|
||||
}
|
||||
|
||||
// Topic name is mandatory
|
||||
// - It must be set at either global level or cycle level
|
||||
// - If set at both levels, cycle level setting takes precedence
|
||||
private String getEffectiveTopicName(String cycleTopicName) {
|
||||
String globalTopicName = pulsarNBClientConf.getTopicName();
|
||||
String topicName = globalTopicName;
|
||||
|
||||
if ( ((globalTopicName == null) || (globalTopicName.isEmpty())) &&
|
||||
((cycleTopicName == null) || (cycleTopicName.isEmpty())) ) {
|
||||
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
|
||||
} else if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) {
|
||||
topicName = cycleTopicName;
|
||||
if ((cycleTopicName != null) && (!cycleTopicName.isEmpty())) {
|
||||
return cycleTopicName;
|
||||
}
|
||||
|
||||
return topicName;
|
||||
String globalTopicName = pulsarNBClientConf.getTopicName();
|
||||
if ( (globalTopicName == null) || (globalTopicName.isEmpty()) ) {
|
||||
throw new RuntimeException("Topic name must be set at either global level or cycle level!");
|
||||
}
|
||||
|
||||
return globalTopicName;
|
||||
}
|
||||
|
||||
private Producer createPulsarProducer(String cycleTopicName, String cycleProducerName) {
|
||||
@ -124,9 +118,7 @@ public class PulsarSpace {
|
||||
// Get other possible producer settings that are set at global level
|
||||
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
|
||||
producerConf.put("topicName", topicName);
|
||||
if ((producerName != null) && (!producerName.isEmpty())) {
|
||||
producerConf.put("producerName", producerName);
|
||||
}
|
||||
producerConf.put("producerName", producerName);
|
||||
|
||||
try {
|
||||
producer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
|
||||
@ -140,14 +132,11 @@ public class PulsarSpace {
|
||||
|
||||
public Producer<?> getProducer(String cycleProducerName, String cycleTopicName) {
|
||||
String producerName = getEffectiveProducerName(cycleProducerName);
|
||||
String topicName = getEffectiveTopicName(cycleTopicName);
|
||||
|
||||
String identifierStr = producerName.toLowerCase() + "::" + topicName.toLowerCase();
|
||||
Producer producer = producers.get(identifierStr);
|
||||
Producer producer = producers.get(producerName);
|
||||
|
||||
if (producer == null) {
|
||||
producer = createPulsarProducer(cycleTopicName, cycleProducerName);
|
||||
producers.put(identifierStr, producer);
|
||||
producers.put(producerName, producer);
|
||||
}
|
||||
|
||||
return producer;
|
||||
|
@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar.ops;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
@ -20,28 +21,28 @@ public class PulsarProducerMapper implements LongFunction<PulsarOp> {
|
||||
private final LongFunction<Producer<?>> producerFunc;
|
||||
private final LongFunction<String> keyFunc;
|
||||
private final LongFunction<String> payloadFunc;
|
||||
private final PulsarSpace clientSpace;
|
||||
private final Schema pulsarSchema;
|
||||
private final CommandTemplate cmdTpl;
|
||||
|
||||
public PulsarProducerMapper(
|
||||
LongFunction<Producer<?>> producerFunc,
|
||||
LongFunction<String> keyFunc,
|
||||
LongFunction<String> payloadFunc,
|
||||
PulsarSpace clientSpace,
|
||||
Schema pulsarSchema,
|
||||
CommandTemplate cmdTpl) {
|
||||
this.producerFunc = producerFunc;
|
||||
this.keyFunc = keyFunc;
|
||||
this.payloadFunc = payloadFunc;
|
||||
this.clientSpace = clientSpace;
|
||||
this.pulsarSchema = pulsarSchema;
|
||||
this.cmdTpl = cmdTpl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PulsarOp apply(long value) {
|
||||
Producer<?> producer = producerFunc.apply(value);
|
||||
String msgKey = keyFunc != null ? keyFunc.apply(value) : null;
|
||||
String msgKey = keyFunc.apply(value);
|
||||
String msgPayload = payloadFunc.apply(value);
|
||||
|
||||
return new PulsarProducerOp(producer, clientSpace.getPulsarSchema(), msgKey, msgPayload);
|
||||
return new PulsarProducerOp(producer, pulsarSchema, msgKey, msgPayload);
|
||||
}
|
||||
}
|
||||
|
@ -25,13 +25,16 @@ public class PulsarProducerOp implements PulsarOp {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||
if ( (msgPayload == null) || msgPayload.isEmpty() ) {
|
||||
throw new RuntimeException("Message payload (\"msg-value\" can't be empty!");
|
||||
}
|
||||
|
||||
TypedMessageBuilder typedMessageBuilder = producer.newMessage(pulsarSchema);
|
||||
if ( (msgKey != null) && (!msgKey.isEmpty()) ) {
|
||||
typedMessageBuilder = typedMessageBuilder.key(msgKey);
|
||||
}
|
||||
|
||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
||||
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
||||
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
||||
|
@ -3,10 +3,12 @@ package io.nosqlbench.driver.pulsar.ops;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpace;
|
||||
import io.nosqlbench.driver.pulsar.PulsarSpaceCache;
|
||||
import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
|
||||
import io.nosqlbench.engine.api.templating.CommandTemplate;
|
||||
import org.apache.pulsar.client.api.Producer;
|
||||
import org.apache.pulsar.client.api.Schema;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
import java.util.function.Supplier;
|
||||
@ -14,18 +16,36 @@ import java.util.function.Supplier;
|
||||
public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
|
||||
private final CommandTemplate cmdTpl;
|
||||
private final PulsarSpace clientSpace;
|
||||
private final LongFunction<PulsarOp> opFunc;
|
||||
private final PulsarSpaceCache pcache;
|
||||
private final Schema pulsarSchema;
|
||||
|
||||
// TODO: Add docs for the command template with respect to the OpTemplate
|
||||
|
||||
public ReadyPulsarOp(OpTemplate opTemplate, PulsarSpaceCache pcache) {
|
||||
PulsarSpace clientSpace1;
|
||||
// TODO: Consider parsing map structures into equivalent binding representation
|
||||
this.cmdTpl = new CommandTemplate(opTemplate);
|
||||
this.pcache = pcache;
|
||||
if (cmdTpl.isDynamic("op_scope")) {
|
||||
throw new RuntimeException("op_scope must be static");
|
||||
}
|
||||
|
||||
this.pcache = pcache;
|
||||
// TODO: At the moment, only supports static "client"
|
||||
if (cmdTpl.containsKey("client")) {
|
||||
if (cmdTpl.isDynamic("client")) {
|
||||
throw new RuntimeException("\"client\" can't be made dynamic!");
|
||||
} else {
|
||||
String client_name = cmdTpl.getStatic("client");
|
||||
this.clientSpace = pcache.getPulsarSpace(client_name);
|
||||
}
|
||||
} else {
|
||||
this.clientSpace = pcache.getPulsarSpace("default");
|
||||
}
|
||||
|
||||
this.pulsarSchema = clientSpace.getPulsarSchema();
|
||||
|
||||
this.opFunc = resolve();
|
||||
|
||||
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
|
||||
@ -38,6 +58,15 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
|
||||
}
|
||||
|
||||
LongFunction<String> cycle_producer_name_func;
|
||||
if (cmdTpl.isStatic("producer-name")) {
|
||||
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer-name");
|
||||
} else if (cmdTpl.isDynamic("producer-name")) {
|
||||
cycle_producer_name_func = (l) -> cmdTpl.getDynamic("producer-name", l);
|
||||
} else {
|
||||
cycle_producer_name_func = (l) -> null;
|
||||
}
|
||||
|
||||
LongFunction<String> topic_uri_func;
|
||||
if (cmdTpl.containsKey("topic_uri")) {
|
||||
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
|
||||
@ -48,42 +77,27 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
topic_uri_func = (l) -> cmdTpl.getDynamic("topic_uri", l);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (cmdTpl.containsKey("topic")) {
|
||||
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
|
||||
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
|
||||
.replaceAll("true", "persistent");
|
||||
else if (cmdTpl.containsKey("topic")) {
|
||||
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
|
||||
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
|
||||
.replaceAll("true", "persistent");
|
||||
|
||||
String tenant = cmdTpl.getStaticOr("tenant", "public");
|
||||
String namespace = cmdTpl.getStaticOr("namespace", "default");
|
||||
String topic = cmdTpl.getStaticOr("topic", "");
|
||||
String tenant = cmdTpl.getStaticOr("tenant", "public");
|
||||
String namespace = cmdTpl.getStaticOr("namespace", "default");
|
||||
String topic = cmdTpl.getStaticOr("topic", "");
|
||||
|
||||
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
|
||||
topic_uri_func = (l) -> composited;
|
||||
} else { // some or all dynamic fields, composite into a single dynamic call
|
||||
topic_uri_func = (l) ->
|
||||
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
|
||||
+ "://" + cmdTpl.getOr("tenant", l, "public")
|
||||
+ "/" + cmdTpl.getOr("namespace", l, "default")
|
||||
+ "/" + cmdTpl.getOr("topic", l, "");
|
||||
}
|
||||
}
|
||||
else {
|
||||
topic_uri_func = null;
|
||||
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
|
||||
topic_uri_func = (l) -> composited;
|
||||
} else { // some or all dynamic fields, composite into a single dynamic call
|
||||
topic_uri_func = (l) ->
|
||||
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
|
||||
+ "://" + cmdTpl.getOr("tenant", l, "public")
|
||||
+ "/" + cmdTpl.getOr("namespace", l, "default")
|
||||
+ "/" + cmdTpl.getOr("topic", l, "");
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: At the moment, only supports static "client"
|
||||
PulsarSpace clientSpace;
|
||||
if (cmdTpl.containsKey("client")) {
|
||||
if (cmdTpl.isDynamic("client")) {
|
||||
throw new RuntimeException("\"client\" can't be made dynamic!");
|
||||
} else {
|
||||
String client_name = cmdTpl.getStatic("client");
|
||||
clientSpace = pcache.getPulsarSpace(client_name);
|
||||
}
|
||||
} else {
|
||||
clientSpace = pcache.getPulsarSpace("default");
|
||||
else {
|
||||
topic_uri_func = (l) -> null;
|
||||
}
|
||||
|
||||
assert (clientSpace != null);
|
||||
@ -91,7 +105,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
|
||||
// TODO: At the moment, only implements "Producer" functionality; add implementation for others later!
|
||||
if ( clientType.equalsIgnoreCase(PulsarActivityUtil.CLIENT_TYPES.PRODUCER.toString()) ) {
|
||||
return resolveProducer(clientSpace, cmdTpl, topic_uri_func);/*
|
||||
return resolveProducer(clientSpace, cmdTpl, cycle_producer_name_func, topic_uri_func);/*
|
||||
} else if ( msgOperation.equalsIgnoreCase(PulsarActivityUtil.MSGOP_TYPES.CONSUMER.toString()) ) {
|
||||
return resolveConsumer(spaceFunc, cmdTpl, topic_uri_func);
|
||||
} else if ( msgOperation.equalsIgnoreCase(PulsarOpUtil.MSGOP_TYPES.READER.toString()) ) {
|
||||
@ -106,20 +120,11 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
private LongFunction<PulsarOp> resolveProducer(
|
||||
PulsarSpace pulsarSpace,
|
||||
CommandTemplate cmdTpl,
|
||||
LongFunction<String> cycle_producer_name_func,
|
||||
LongFunction<String> topic_uri_func
|
||||
) {
|
||||
LongFunction<Producer<?>> producerFunc;
|
||||
|
||||
if (cmdTpl.isStatic("producer-name")) {
|
||||
producerFunc = (l) -> pulsarSpace.getProducer(cmdTpl.getStatic("producer-name"),
|
||||
(topic_uri_func == null) ? null : topic_uri_func.apply(l));
|
||||
} else if (cmdTpl.isDynamic("producer-name")) {
|
||||
producerFunc = (l) -> pulsarSpace.getProducer(cmdTpl.getDynamic("producer-name", l),
|
||||
(topic_uri_func == null) ? null : topic_uri_func.apply(l));
|
||||
} else {
|
||||
producerFunc = (l) -> pulsarSpace.getProducer(null,
|
||||
(topic_uri_func == null) ? null : topic_uri_func.apply(l));
|
||||
}
|
||||
LongFunction<Producer<?>> producerFunc =
|
||||
(l) -> pulsarSpace.getProducer(cycle_producer_name_func.apply(l), topic_uri_func.apply(l));
|
||||
|
||||
LongFunction<String> keyFunc;
|
||||
if (cmdTpl.isStatic("msg-key")) {
|
||||
@ -127,7 +132,7 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
} else if (cmdTpl.isDynamic("msg-key")) {
|
||||
keyFunc = (l) -> cmdTpl.getDynamic("msg-key", l);
|
||||
} else {
|
||||
keyFunc = null;
|
||||
keyFunc = (l) -> null;
|
||||
}
|
||||
|
||||
LongFunction<String> valueFunc;
|
||||
@ -137,13 +142,13 @@ public class ReadyPulsarOp implements LongFunction<PulsarOp> {
|
||||
} else if (cmdTpl.isDynamic("msg-value")) {
|
||||
valueFunc = (l) -> cmdTpl.getDynamic("msg-value", l);
|
||||
} else {
|
||||
valueFunc = null;
|
||||
valueFunc = (l) -> null;
|
||||
}
|
||||
} else {
|
||||
throw new RuntimeException("\"msg-value\" field must be specified!");
|
||||
}
|
||||
|
||||
return new PulsarProducerMapper(producerFunc, keyFunc, valueFunc, pulsarSpace, cmdTpl);
|
||||
return new PulsarProducerMapper(producerFunc, keyFunc, valueFunc, pulsarSchema, cmdTpl);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -22,16 +22,15 @@ schema.definition = file:///Users/yabinmeng/DataStax/nosqlbench/driver-pulsar/sr
|
||||
|
||||
### Pulsar client related configurations - client.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
|
||||
# default: pulsar://localhost:6550
|
||||
client.serviceUrl = pulsar://10.101.34.64:6650
|
||||
default: pulsar://localhost:6550
|
||||
# default: 10000
|
||||
client.connectionTimeoutMs = 5000
|
||||
|
||||
|
||||
### Producer related configurations (global) - producer.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
|
||||
producer.topicName = persistent://public/default/mynbtest
|
||||
producer.producerName =
|
||||
producer.topicName = persistent://public/default/mynbtest
|
||||
#producer.sendTimeoutMs =
|
||||
|
||||
### Consumer related configurations (global) - consumer.xxx
|
||||
|
@ -1,54 +1,195 @@
|
||||
# Pulsar driver
|
||||
# NoSQLBench (NB) Pulsar Driver Overview
|
||||
|
||||
This driver allows you to produce and consume Apache Pulsar messages with
|
||||
NoSQLBench.
|
||||
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
|
||||
* Producer
|
||||
* Consumer
|
||||
* (Future) Reader
|
||||
* (Future) WebSocket Producer
|
||||
* (Future) Managed Ledger
|
||||
|
||||
**NOTE**: At the moment, only Producer workload type is fully supported in NB. The support for Consumer type is partially added but not completed yet; and the support for other types of workloads will be added in NB in future releases.
|
||||
|
||||
## Issues Tracker
|
||||
|
||||
If you have issues or new requirements for this driver, please add them at
|
||||
the
|
||||
[pulsar issues tracker](
|
||||
https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
|
||||
If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
|
||||
|
||||
## Example Statements
|
||||
## Global Level Pulsar Configuration Settings
|
||||
|
||||
The simplest pulsar statement looks like this:
|
||||
The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish and consume messages from the Pulsar cluster. In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload-specific actions (e.g. publishing or consuming messages).
|
||||
|
||||
```yaml
|
||||
statement: send='{"msg":"test message"}'
|
||||
When creating these objects (e.g. PulsarClient, Producer), there are different configuration options that can be used. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration options when creating a Pulsar Producer object.
|
||||
|
||||
The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below:
|
||||
|
||||
```properties
|
||||
### NB Pulsar driver related configuration - driver.xxx
|
||||
driver.client-type = producer
|
||||
|
||||
### Schema related configurations - schema.xxx
|
||||
schema.type = avro
|
||||
schema.definition = file:///<path/to/avro/schema/definition/file>
|
||||
|
||||
### Pulsar client related configurations - client.xxx
|
||||
client.serviceUrl = pulsar://<pulsar_broker_ip>:6650
|
||||
client.connectionTimeoutMs = 5000
|
||||
|
||||
### Producer related configurations (global) - producer.xxx
|
||||
producer.topicName = persistent://public/default/mynbtest
|
||||
producer.producerName =
|
||||
producer.sendTimeoutMs =
|
||||
```
|
||||
|
||||
In this example, the statement is sent by a producer with a default
|
||||
_topic_uri_ of `persistent://public/default/default` at at the pulsar
|
||||
endpoint `pulsar://localhost:6650`
|
||||
There are multiple sections in this file that correspond to different groups of configuration settings:
|
||||
* **NB pulsar driver related settings**:
|
||||
* All settings under this section starts with **driver.** prefix.
|
||||
* Right now there is only valid option under this section:
|
||||
* *driver.client-type* determines what type of Pulsar workload to be simulated by NB.
|
||||
* **Schema related settings**:
|
||||
* All settings under this section starts with **schema.** prefix.
|
||||
* The NB Pulsar driver supports schema-based message publishing and consuming. This section defines configuration settings that are schema related.
|
||||
* There are 2 valid options under this section.
|
||||
* *shcema.type*: Pulsar message schema type. When unset or set as an empty string, Pulsar messages will be handled in raw *byte[]* format. The other valid option is **avro** which the Pulsar message will follow a specific Avro format.
|
||||
* *schema.definition*: This only applies when an Avro schema type is specified and the value is the (full) file path that contains the Avro schema definition.
|
||||
* **Pulsar Client related settings**:
|
||||
* All settings under this section starts with **client.** prefix.
|
||||
* This section defines all configuration settings that are related with defining a PulsarClient object.
|
||||
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
|
||||
* **Pulsar Producer related settings**:
|
||||
* All settings under this section starts with **producer.** prefix.
|
||||
* This section defines all configuration settings that are related with defining a Pulsar Producer object.
|
||||
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
|
||||
|
||||
A complete example which uses all the available fields:
|
||||
In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration sections in this file as well.
|
||||
|
||||
## Pulsar Driver Yaml File: Statement Blocks
|
||||
|
||||
Just like other NB driver types, the actual Pulsar workload generation is determined by the statement blocks in the NB driver Yaml file. Depending on the Pulsar workload type, the corresponding statement block may have different contents.
|
||||
|
||||
### Producer Statement block
|
||||
|
||||
A complete example of defining Pulsar **Producer** workload is as below:
|
||||
|
||||
```yaml
|
||||
statements:
|
||||
- name1:
|
||||
send: "Payload test with number {numbername}"
|
||||
persistent: true
|
||||
tenant: {test_tenant}
|
||||
namespace: {test_namespace}
|
||||
topic: {test_topic}
|
||||
client: {clientid}
|
||||
producer: {producerid}
|
||||
blocks:
|
||||
- name: producer-block
|
||||
tags:
|
||||
type: producer
|
||||
statements:
|
||||
- producer-stuff:
|
||||
# producer-name:
|
||||
# topic_uri: "persistent://public/default/{topic}"
|
||||
topic_uri: "persistent://public/default/nbpulsar"
|
||||
msg-key: "{mykey}"
|
||||
msg-value: |
|
||||
{
|
||||
"SensorID": "{sensor_id}",
|
||||
"SensorType": "Temperature",
|
||||
"ReadingTime": "{reading_time}",
|
||||
"ReadingValue": {reading_value}
|
||||
}
|
||||
```
|
||||
|
||||
In this example, we define a producer send operation, identified as
|
||||
`name1` in metrics. The full topic_uri is constructed piece-wise by the
|
||||
provided values of `persistent`, `tenant`, `namespace`, and
|
||||
`topic`. The client instance used by NoSQLBench is controlled by the name
|
||||
of the `client` field. The instance of the producer used to send messages
|
||||
with that client is controlled by the `producer` property.
|
||||
In the above statement block, there are 4 key statement parameters to provide values:
|
||||
* **producer-name**: cycle-level Pulsar producer name (can be dynamically bound)
|
||||
* **Optional**
|
||||
* If not set, global level producer name in *config.properties* file will be used.
|
||||
* Use a default producer name, "default", if it is neither set at global level.
|
||||
* If set, cycle level producer name will take precedence over the global level setting
|
||||
|
||||
Details on object instancing are provided below under __Instancing
|
||||
Controls__
|
||||
* **topic_uri**: cycle-level Pulsar topic name (can be dynamically bound)
|
||||
* **Optional**
|
||||
* If not set, global level topic_uri in *config.properties* file will be used
|
||||
* Throw a Runtime Error if it is neither set at global level
|
||||
* If set, cycle level topic_uri will take precedence over the global level setting; and the provided value must follow several guidelines:
|
||||
* It must be in valid Pulsar topic format as below:
|
||||
```
|
||||
[persistent|non-persistent]://<tenant-name>/<namespace-name>/<short-topic-name>
|
||||
```
|
||||
* At the moment, only **<short-topic-name>** part can be dynamically bound (e.g. through NB binding function). All other parts (e.g. <tenant-name> and <namespace-name>) must be static values and the corresponding tenants and namespaces must be created in the Pulsar cluster in advance.
|
||||
|
||||
## Driver Features
|
||||
**TODO**: allow dynamic binding for <tenant-name> and <namespace-name> after first adding a phase for creating <tenant-name> and/or <namespace-name>, similar to C* CQL schema creation phase.!
|
||||
|
||||
### API Caching
|
||||
* **msg-key**: Pulsar message key
|
||||
* **Optional**
|
||||
* If not set, the generated Pulsar messages (to be published by the Producer) doesn't have **keys**.
|
||||
|
||||
* **msg-value**: Pulsar message payload
|
||||
* **Mandatory**
|
||||
* If not set, throw a Runtime Error.
|
||||
|
||||
### Consumer Statement block
|
||||
|
||||
**TBD ...**
|
||||
|
||||
## Schema Support
|
||||
|
||||
Pulsar has built-in schema support. Other than primitive types, Pulsar also supports complex types like **Avro**, and etc. At the moment, the NB Pulsar driver provides 2 schema support modes, via the global level schema related settings as below:
|
||||
* Avro schema:
|
||||
```properties
|
||||
shcema.type: avro
|
||||
schema.definition: file:///<file/path/to/the/definition/file>
|
||||
```
|
||||
* Default byte[] schema:
|
||||
```properties
|
||||
shcema.type:
|
||||
schema.definition:
|
||||
```
|
||||
|
||||
For the previous Producer block statement example, the **msg-value** parameter has the value of a JSON string that follows the following Avro schema definition (e.g. from a file **iot-example.asvc**)
|
||||
```json
|
||||
{
|
||||
"type": "record",
|
||||
"name": "IotSensor",
|
||||
"namespace": "TestNS",
|
||||
"fields" : [
|
||||
{"name": "SensorID", "type": "string"},
|
||||
{"name": "SensorType", "type": "string"},
|
||||
{"name": "ReadingTime", "type": "string"},
|
||||
{"name": "ReadingValue", "type": "float"}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
# Advanced Driver Features
|
||||
|
||||
## Activity Parameters
|
||||
|
||||
At the moment, the following Activity Parameter is supported:
|
||||
|
||||
- * config=<file/path/to/global/configuration/properties/file>
|
||||
|
||||
---
|
||||
|
||||
## TODO: Design Revisit
|
||||
|
||||
**NOTE**: The following text is based on the original multi-layer API caching design which is not fully implemented at the moment. We need to revisit the original design at some point in order to achieve maximum testing flexibility.
|
||||
|
||||
To summarize, the original caching design has the following key requirements:
|
||||
* **Requirement 1**: Each NB Pulsar activity is able to launch and cache multiple **client spaces**
|
||||
* **Requirement 2**:Each client space can launch and cache multiple Pulsar operators of the same type (producer, consumer, etc.)
|
||||
|
||||
In the current implementation, only requirement 2 is implemented. Regarding requirement 1, the current implementation only supports one client space per NB Pulsar activity!
|
||||
|
||||
---
|
||||
|
||||
- **url** - The pulsar url to connect to.
|
||||
- **default** - `url=pulsar://localhost:6650`
|
||||
- **maxcached** - A default value to be applied to `max_clients`,
|
||||
`max_producers`, `max_consumers`.
|
||||
- default: `max_cached=100`
|
||||
- **max_clients** - Clients cache size. This is the number of client
|
||||
instances which are allowed to be cached in the NoSQLBench client
|
||||
runtime. The clients cache automatically maintains a cache of unique
|
||||
client instances internally. default: _maxcached_
|
||||
|
||||
- **max_producers** - Producers cache size (per client instance). Limits
|
||||
the number of producer instances which are allowed to be cached per
|
||||
client instance. default: _maxcached_
|
||||
- **max_consumers** - Consumers cache size (per client instance). Limits
|
||||
the number of consumer instances which are allowed to be cached per
|
||||
client instance.
|
||||
|
||||
## API Caching
|
||||
|
||||
This driver is tailored around the multi-tenancy and topic naming scheme
|
||||
that is part of Apache Pulsar. Specifically, you can create an arbitrary
|
||||
@ -60,10 +201,6 @@ Further, the topic URI is composed from the provided qualifiers of
|
||||
fully-composed value in the `persistence://tenant/namespace/topic`
|
||||
form.
|
||||
|
||||
### Schema Support
|
||||
|
||||
Schema support will be added after the initial version is working.
|
||||
|
||||
### Instancing Controls
|
||||
|
||||
Normative usage of the Apache Pulsar API follows a strictly enforced
|
||||
@ -104,101 +241,3 @@ specific order:
|
||||
The most important detail for understanding the instancing controls is
|
||||
that clients, producers, and consumers are all named and cached in the
|
||||
specific order above.
|
||||
|
||||
## Op Fields
|
||||
|
||||
Thees fields are used to define of a single pulsar client operation. These
|
||||
fields, taken together, are called the _op template_, and each one is
|
||||
called an _op template field_, or simply _template field_. You may specify
|
||||
them as `literal values` or as `{binding_anchors}` to be qualified at
|
||||
runtime for each and every cycle. When necessary, the appropriate API
|
||||
scaffolding is created automatically and cached by the NoSQLBench driver
|
||||
for Apache Pulsar such as clients, producers, and consumers.
|
||||
|
||||
- **send** - If this op field is provided, then its value is used as the
|
||||
payload for a send operation. The value may be static or dynamic as in
|
||||
a `{binding_anchor}`.
|
||||
- default: _undefined_
|
||||
- **producer** - If provided, the string value of this field determines
|
||||
the name of the producer to use in the operation. The named producer
|
||||
will be created (if needed) and cached under the designated client
|
||||
instance. Because producers are bound to a topic at initialization, the
|
||||
default behavior is to create a separate producer per topic_uri per
|
||||
client. The producer field is only consulted if the _send_ field is
|
||||
defined.
|
||||
- default: `{topic_uri}`
|
||||
|
||||
- **recv** - If this op field is provided, then its value is used to
|
||||
control how a message is received. Special handling of received data is
|
||||
possible but will be added in a future version. For now, the default
|
||||
behavior of a recv operation is simply to receive a single message.
|
||||
- default: _undefined_
|
||||
- **consumer** - If provided, the string value of this field determines
|
||||
the name of the consumer to use in the operation. The named consumer
|
||||
will be created (if needed) and cached under the designated client
|
||||
instance. Because consumers are bound to a topic at instantiation, the
|
||||
default behavior is to create a separate consumer per topic_uri per
|
||||
client. The _consumer_ field is only consulted if the _recv_ field is
|
||||
defined.
|
||||
- default: `{topic_uri}`
|
||||
|
||||
- **topic_uri** - The fully defined topic URI for a producer or consumer (
|
||||
for a send or recv operation). The format is
|
||||
`{persistent|non-persistent}://tenant/namespace/topic` as explained
|
||||
at [Topics](https://pulsar.apache.org/docs/en/concepts-messaging/#topics)
|
||||
. You may provide the full topic_uri in any valid op template form -- A
|
||||
full or partial binding, or a static string. However, it is an error to
|
||||
provide this field when any of `topic`, `tenant`, `namespace`, or
|
||||
`persistence` are provided, as these fields are used to build the
|
||||
_topic_uri_ piece-wise. On the contrary, it is not an error to leave all
|
||||
of these fields undefined, as there are defaults that will fill in the
|
||||
missing pieces.
|
||||
- **persistence** - Whether or not the topic should be persistent. This
|
||||
value can any one of (boolean) true or false, "true", "false",
|
||||
"persistent" or "non-persistent".
|
||||
- default: `persistent`
|
||||
- **tenant** - Defines the name of the tenant to use for the operation.
|
||||
- default: `public`
|
||||
- **namespace** - Defines the namespace to use for the operation.
|
||||
- default: `default`
|
||||
- **topic** - Defines the topic to be used for the operation, and thus the
|
||||
topic to be used for the producer or consumer of the operation.
|
||||
-default: `default`
|
||||
- **client** - If this op field is provided, then the value is used to
|
||||
name a client instance. If this client instance is not already cached,
|
||||
it will be created and used for this operation.
|
||||
- default: `default`
|
||||
|
||||
## Activity Parameters
|
||||
|
||||
- **url** - The pulsar url to connect to.
|
||||
- **default** - `url=pulsar://localhost:6650`
|
||||
- **maxcached** - A default value to be applied to `max_clients`,
|
||||
`max_producers`, `max_consumers`.
|
||||
- default: `max_cached=100`
|
||||
- **max_clients** - Clients cache size. This is the number of client
|
||||
instances which are allowed to be cached in the NoSQLBench client
|
||||
runtime. The clients cache automatically maintains a cache of unique
|
||||
client instances internally. default: _maxcached_
|
||||
- **max_producers** - Producers cache size (per client instance). Limits
|
||||
the number of producer instances which are allowed to be cached per
|
||||
client instance. default: _maxcached_
|
||||
- **max_consumers** - Consumers cache size (per client instance). Limits
|
||||
the number of consumer instances which are allowed to be cached per
|
||||
client instance.
|
||||
|
||||
## Metrics
|
||||
|
||||
- clients
|
||||
- avg_producers
|
||||
- avg_consumers
|
||||
- standard metrics ... TBD
|
||||
|
||||
.. this need to be configurable
|
||||
|
||||
- sent-{tenant}-{namespace}-{topic}
|
||||
- recv-{tenant}-{namespace}-{topic}
|
||||
- sent-{tenant}-{topic}
|
||||
- recv-{tenant}-{topic}
|
||||
- sent-{namespace}-{topic}
|
||||
-
|
||||
|
Loading…
Reference in New Issue
Block a user