1) Address NB issue #1283 (overlapping label name \"name\" in both parent and child labels when adding Pulsar adapter metrics).

2) Add named scenarios for NB Pulsar and NB Kafka adapters.
This commit is contained in:
yabinmeng 2023-06-28 16:32:55 -05:00
parent a264291c04
commit f7d7fd56c7
28 changed files with 125 additions and 46 deletions

View File

@ -1,22 +1,33 @@
# Overview
---
weight: 0
title: S4J
---
- [1. Overview](#1-overview)
- [1.1. Example NB Yaml](#11-example-nb-yaml)
- [2. Usage](#2-usage)
- [2.1. NB Kafka adapter specific CLI parameters](#21-nb-kafka-adapter-specific-cli-parameters)
This NB Kafka adapter allows publishing messages to or consuming messages from
---
# 1. Overview
The NB Kafka adapter allows publishing messages to or consuming messages from
* a Kafka cluster, or
* a Pulsar cluster with [S4K](https://github.com/datastax/starlight-for-kafka) or [KoP](https://github.com/streamnative/kop) Kafka Protocol handler for Pulsar.
At high level, this adapter supports the following Kafka functionalities
* Publishing messages to one Kafka topic with sync. or async. message-send acknowledgements (from brokers)
* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowlegements (to brokers) (aka, message commits)
* Subscribing messages from one or multiple Kafka topics with sync. or async. message-recv acknowledgements (to brokers) (aka, message commits)
* auto message commit
* manual message commit with a configurable number of message commits in one batch
* Kafka Transaction support
## Example NB Yaml
* [kafka_producer.yaml](./kafka_producer.yaml)
## 1.1. Example NB Yaml
* [kafka_producer.yaml](scenarios/kafka_producer.yaml)
*
* [kafka_consumer.yaml](./kafka_consumer.yaml)
* [kafka_consumer.yaml](scenarios/kafka_consumer.yaml)
# Usage
# 2. Usage
```bash
## Kafka Producer
@ -26,7 +37,7 @@ $ <nb_cmd> run driver=kafka -vv cycles=100 threads=2 num_clnt=2 yaml=kafka_produ
$ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 yaml=kafka_producer.yaml config=kafka_config.properties bootstrap_server=PLAINTEXT://localhost:9092
```
## NB Kafka adapter specific CLI parameters
## 2.1. NB Kafka adapter specific CLI parameters
* `num_clnt`: the number of Kafka clients to publish messages to or to receive messages from
* For producer workload, this is the number of the producer threads to publish messages to the same topic
@ -40,8 +51,6 @@ $ <nb_cmd> run driver=kafka -vv cycles=100 threads=4 num_clnt=2 num_cons_grp=2 y
* `num_cons_grp`: the number of consumer groups
* Only relevant for consumer workload
For the Kafka NB adapter, Document level parameters can only be statically bound; and currently, the following Document level configuration parameters are supported:
* `async_api` (boolean):

View File

@ -0,0 +1,34 @@
scenarios:
msg_pub: run driver=kafka cycles=1000 threads=2 num_clnt=2 config=../conf/kafka_config.properties bootstrap_server=localhost:9092
msg_sub: run driver=kafka cycles=1000 threads=4 num_clnt=2 num_cons_grp=2 config=../conf/s4j_config.properties bootstrap_server=localhost:9092
bindings:
mykey: Mod(5); ToString(); Prefix("key-")
mytext_val: AlphaNumericString(30)
random_text_val1: AlphaNumericString(10)
random_text_val2: AlphaNumericString(20)
# document level parameters that apply to all Pulsar client types:
params:
async_api: "true"
blocks:
msg_pub:
ops:
op1:
MessageProduce: "persistent://nbtest/default/s4ktest"
txn_batch_num: 1
msg_header: |
{
"header-1": "{random_text_val1}",
"header-2": "{random_text_val2}"
}
msg_key: "{mykey}"
msg_body: "{mytext_val}"
msg_sub:
ops:
op1:
MessageConsume: "persistent://nbtest/default/s4ktest"
msg_poll_interval: "10"
manual_commit_batch_num: "0"

View File

@ -31,6 +31,6 @@ java -jar nb5/target/nb5.jar \
threads=1 \
num_clnt=1 \
num_cons_grp=1 \
yaml="${SCRIPT_DIR}/kafka_consumer.yaml" \
config="${SCRIPT_DIR}/kafka_config.properties" \
yaml="${SCRIPT_DIR}/scenarios/kafka_consumer.yaml" \
config="${SCRIPT_DIR}/conf/kafka_config.properties" \
bootstrap_server=PLAINTEXT://localhost:9092

View File

@ -31,8 +31,8 @@ while [[ 1 -eq 1 ]]; do
cycles="${CYCLES}" \
threads=1 \
num_clnt=1 \
yaml="${SCRIPT_DIR}/kafka_producer.yaml" \
config="${SCRIPT_DIR}/kafka_config.properties" \
yaml="${SCRIPT_DIR}/scenarios/kafka_producer.yaml" \
config="${SCRIPT_DIR}/conf/kafka_config.properties" \
bootstrap_server=PLAINTEXT://localhost:9092
sleep 10
done

View File

@ -85,7 +85,7 @@ public class PulsarSpace implements AutoCloseable {
.setDescription("Pulsar broker service URL."))
.add(Param.defaultTo("web_url", "http://localhost:8080")
.setDescription("Pulsar web service URL."))
.add(Param.defaultTo("config", "config.properties")
.add(Param.defaultTo("config", "conf/pulsar_config.properties")
.setDescription("Pulsar client connection configuration property file."))
.add(Param.defaultTo("cyclerate_per_thread", false)
.setDescription("Apply cycle rate per NB thread"))
@ -130,7 +130,7 @@ public class PulsarSpace implements AutoCloseable {
try {
Map clientConfMap = pulsarClientConf.getClientConfMapRaw();
// Override "client.serviceUrl" setting in config.properties
// Override "client.serviceUrl" setting in pulsar_config.properties
clientConfMap.remove("serviceUrl");
clientBuilder.loadConf(clientConfMap).serviceUrl(pulsarSvcUrl);

View File

@ -86,16 +86,6 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, Pu
this.totalCycleNum = NumberUtils.toLong(this.parsedOp.getStaticValue("cycles"));
}
public String getName() {
return "PulsarBaseOpDispenser";
}
@Override
public NBLabels getLabels() {
return NBLabels.forKV("name", this.getName());
}
public PulsarSpace getPulsarSpace() { return this.pulsarSpace; }
protected LongFunction<Boolean> lookupStaticBoolConfigValueFunc(final String paramName, final boolean defaultValue) {
@ -210,7 +200,7 @@ public abstract class PulsarBaseOpDispenser extends BaseOpDispenser<PulsarOp, Pu
}
// A configuration parameter can be set either at the global level (config.properties file),
// A configuration parameter can be set either at the global level (pulsar_config.properties file),
// or at the cycle level (<nb_scenario>.yaml file).
// If set at both levels, cycle level setting takes precedence
private String getEffectiveConValue(final String confCategory, final String confParamName, final String cycleConfValue) {

View File

@ -285,14 +285,16 @@ public enum PulsarAdapterUtil {
this.label = label;
}
private static final Set<String> LABELS = Stream.of(SUBSCRIPTION_TYPE.values()).map(v -> v.label)
private static final Set<String> LABELS = Stream.of(SUBSCRIPTION_TYPE.values())
.map(v -> v.label)
.collect(Collectors.toUnmodifiableSet());
public static boolean isValidLabel(final String label) {
return SUBSCRIPTION_TYPE.LABELS.contains(label);
}
private static final String TYPE_LIST = Stream.of(COMPRESSION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
private static final String TYPE_LIST =
Stream.of(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
public static boolean isValidSubscriptionType(final String item) {
return SUBSCRIPTION_TYPE.isValidLabel(item);

View File

@ -9,9 +9,9 @@
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# 2) Avro for messages with schema
#schema.key.type=avro
#schema.key.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-key-example.avsc
#schema.key.definition=file:///path/to/iot-key-example.avsc
#schema.type=avro
#schema.definition=file:///Users/yabinmeng/DataStax/MyNBMain/nosqlbench/adapter-pulsar/src/main/resources/iot-value-example.avsc
#schema.definition=file:///path/to/iot-value-example.avsc
schema.type=
schema.definition=
@ -21,7 +21,7 @@ schema.definition=
client.connectionTimeoutMs=5000
client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
# Cluster admin
client.authParams=
client.authParams=token:eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE2ODc5ODM5ODMsImlzcyI6ImRhdGFzdGF4Iiwic3ViIjoiY2xpZW50O2I0NzI2MWQ2LTQ2NzAtNGMwZi04NzhhLWEzMDUzZTMxNmM1NztibUowWlhOMDtkMzg2NmE1Y2RiIiwidG9rZW5pZCI6ImQzODY2YTVjZGIifQ.G3c_vqGEvhesqIvdir9nHMwIf7my25wzjoL4ILG_BOqYwH76mJyCEHMLL_0-0zvkIff7uZkIsIheYoXvOzcUbS6tiVpN8I2nYakGeTi3hMyExkg6vNjryj1N1zLWZ3mY_om0afmW4ibslyENJm9zR6wEm2cjE8_P6iPl62WahGNV6r-ZMm9gg6i5DGAOouXb2rclg9JptXk4y7CnzXiyGzCfL2P4bLnI9SCQwBuc45Pl95rQJv_367VyV3J5iUiZXxtSVhq1Dih31PfkPpMUUgrE59o7DV4pulfqf9pveXIi5Bvbewl_GFIvLjcQsrgU4crIjnybtPLetB54K7pk-A
### Producer related configurations (global) - producer.xxx

View File

@ -1,8 +1,25 @@
# pulsar
---
weight: 0
title: S4R
---
- [1. Overview](#1-overview)
- [1.1. Issues Tracker](#11-issues-tracker)
- [2. Execute the NB Pulsar Driver Workload](#2-execute-the-nb-pulsar-driver-workload)
- [2.1. NB Pulsar Driver Yaml File High Level Structure](#21-nb-pulsar-driver-yaml-file-high-level-structure)
- [2.2. NB Pulsar Driver Configuration Parameters](#22-nb-pulsar-driver-configuration-parameters)
- [2.2.1. Global Level Parameters](#221-global-level-parameters)
- [2.2.2. Document Level Parameters](#222-document-level-parameters)
- [3. NB Pulsar Driver OpTemplates](#3-nb-pulsar-driver-optemplates)
- [4. Message Generation and Schema Support](#4-message-generation-and-schema-support)
- [4.1. Message Generation](#41-message-generation)
- [4.2. Schema Support](#42-schema-support)
---
# 1. Overview
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
The NB Pulsar driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
* Admin API - create/delete tenants
* Admin API - create/delete namespaces
* Admin API - create/delete topics

View File

@ -0,0 +1,31 @@
scenarios:
msg_pub: run driver=pulsar cycles=200 threads=2 config=../conf/pulsar_config.properties service_url=pulsar://localhost:6650 web_url=http://localhost:8080
msg_sub: run driver=pulsar cycles=200 threads=2 config=../conf/pulsar_config.properties service_url=pulsar://localhost:6650 web_url=http://localhost:8080
bindings:
mykey: NumberNameToString()
mypropval: AlphaNumericString(5)
myvalue: AlphaNumericString(20)
params:
async_api: "true"
blocks:
msg_pub:
ops:
op1:
MessageProduce: "persistent://nbtest/default/ptest_str"
msg_key: "{mykey}"
msg_prop: |
{
"prop1": "{mypropval}"
}
msg_value: "{myvalue}"
msg_sub:
ops:
op1:
MessageConsume: "persistent://nbtest/default/ptest_str"
subscriptionName: "mynbsub_str"
# Case sensitive -- valid values: Exclusive, Shared, Failover, Key_Shared
subscriptionType: "Shared"

View File

@ -28,5 +28,5 @@ java -jar nb5/target/nb5.jar \
--report-interval 5 \
--docker-metrics \
cycles=${CYCLES} \
yaml="${SCRIPT_DIR}/yaml_examples/consumer_4KB_workload.yaml" \
config="${SCRIPT_DIR}/config.properties"
yaml="${SCRIPT_DIR}/scenarios/consumer_4KB_workload.yaml" \
config="${SCRIPT_DIR}/conf/pulsar_config.properties"

View File

@ -32,7 +32,7 @@ while [[ 1 -eq 1 ]]; do
cycles="${CYCLES}" \
cyclerate="${CYCLERATE}" \
threads=1 \
yaml="${SCRIPT_DIR}/yaml_examples/producer_4KB_workload.yaml" \
config="${SCRIPT_DIR}/config.properties"
yaml="${SCRIPT_DIR}/scenarios/producer_4KB_workload.yaml" \
config="${SCRIPT_DIR}/conf/pulsar_config.properties"
sleep 10
done

View File

@ -1,8 +1,6 @@
scenarios:
msg_send:
op: run driver=s4j cycles=100 threads=4 num_conn=2 num_session=2 session_mode=client_ack config=../conf/s4j_config.properties service_url=pulsar://localhost:6650 web_url=http://localhost:8080
msg_recv:
op: run driver=s4j cycles=100 threads=4 num_conn=2 num_session=2 session_mode=client_ack config=../conf/s4j_config.properties service_url=pulsar://localhost:6650 web_url=http://localhost:8080
msg_send: run driver=s4j cycles=100 threads=4 num_conn=2 num_session=2 session_mode=client_ack config=../conf/s4j_config.properties service_url=pulsar://localhost:6650 web_url=http://localhost:8080
msg_recv: run driver=s4j cycles=100 threads=4 num_conn=2 num_session=2 session_mode=client_ack config=../conf/s4j_config.properties service_url=pulsar://localhost:6650 web_url=http://localhost:8080
bindings:
mykey: Mod(5); ToString(); Prefix("key-")

View File

@ -1,8 +1,6 @@
scenarios:
msg_send:
op: run driver=s4r cycles=200 threads=16 num_conn=2 num_channel=2 num_exchange=2 num_msg_clnt=2 config=../conf/s4r_config.properties
msg_recv:
op: run driver=s4r cycles=200 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 config=../conf/s4r_config.properties
msg_send: run driver=s4r cycles=200 threads=16 num_conn=2 num_channel=2 num_exchange=2 num_msg_clnt=2 config=../conf/s4r_config.properties
msg_recv: run driver=s4r cycles=200 threads=16 num_conn=1 num_channel=2 num_exchange=2 num_queue=2 num_msg_clnt=2 config=../conf/s4r_config.properties
bindings:
mytext_val: AlphaNumericString(100)