mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge pull request #377 from yabinmeng/main
README and minor code cleanup
This commit is contained in:
commit
9bb13604ce
@ -28,8 +28,6 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
|||||||
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
|
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
|
||||||
|
|
||||||
private final LongFunction<Consumer<?>> consumerFunc;
|
private final LongFunction<Consumer<?>> consumerFunc;
|
||||||
private final LongFunction<Boolean> topicMsgDedupFunc;
|
|
||||||
private final LongFunction<String> subscriptionTypeFunc;
|
|
||||||
private final boolean e2eMsProc;
|
private final boolean e2eMsProc;
|
||||||
|
|
||||||
public PulsarConsumerMapper(CommandTemplate cmdTpl,
|
public PulsarConsumerMapper(CommandTemplate cmdTpl,
|
||||||
@ -39,14 +37,10 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
|||||||
LongFunction<Boolean> useTransactionFunc,
|
LongFunction<Boolean> useTransactionFunc,
|
||||||
LongFunction<Boolean> seqTrackingFunc,
|
LongFunction<Boolean> seqTrackingFunc,
|
||||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
|
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
|
||||||
LongFunction<Boolean> topicMsgDedupFunc,
|
|
||||||
LongFunction<Consumer<?>> consumerFunc,
|
LongFunction<Consumer<?>> consumerFunc,
|
||||||
LongFunction<String> subscriptionTypeFunc,
|
|
||||||
boolean e2eMsgProc) {
|
boolean e2eMsgProc) {
|
||||||
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
|
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
|
||||||
this.consumerFunc = consumerFunc;
|
this.consumerFunc = consumerFunc;
|
||||||
this.topicMsgDedupFunc = topicMsgDedupFunc;
|
|
||||||
this.subscriptionTypeFunc = subscriptionTypeFunc;
|
|
||||||
this.e2eMsProc = e2eMsgProc;
|
this.e2eMsProc = e2eMsgProc;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -57,22 +51,16 @@ public class PulsarConsumerMapper extends PulsarTransactOpMapper {
|
|||||||
boolean asyncApi = asyncApiFunc.apply(value);
|
boolean asyncApi = asyncApiFunc.apply(value);
|
||||||
boolean useTransaction = useTransactionFunc.apply(value);
|
boolean useTransaction = useTransactionFunc.apply(value);
|
||||||
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
|
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
|
||||||
boolean topicMsgDedup = topicMsgDedupFunc.apply(value);
|
|
||||||
String subscriptionType = subscriptionTypeFunc.apply(value);
|
|
||||||
|
|
||||||
return new PulsarConsumerOp(
|
return new PulsarConsumerOp(
|
||||||
this,
|
|
||||||
pulsarActivity,
|
pulsarActivity,
|
||||||
asyncApi,
|
asyncApi,
|
||||||
useTransaction,
|
useTransaction,
|
||||||
seqTracking,
|
seqTracking,
|
||||||
transactionSupplier,
|
transactionSupplier,
|
||||||
topicMsgDedup,
|
|
||||||
consumer,
|
consumer,
|
||||||
subscriptionType,
|
|
||||||
clientSpace.getPulsarSchema(),
|
clientSpace.getPulsarSchema(),
|
||||||
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
|
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
|
||||||
value,
|
|
||||||
e2eMsProc,
|
e2eMsProc,
|
||||||
this::getReceivedMessageSequenceTracker);
|
this::getReceivedMessageSequenceTracker);
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,6 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
|
|
||||||
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
|
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
|
||||||
|
|
||||||
private final PulsarConsumerMapper consumerMapper;
|
|
||||||
private final PulsarActivity pulsarActivity;
|
private final PulsarActivity pulsarActivity;
|
||||||
|
|
||||||
private final boolean asyncPulsarOp;
|
private final boolean asyncPulsarOp;
|
||||||
@ -31,13 +30,10 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
private final boolean seqTracking;
|
private final boolean seqTracking;
|
||||||
private final Supplier<Transaction> transactionSupplier;
|
private final Supplier<Transaction> transactionSupplier;
|
||||||
|
|
||||||
private final boolean topicMsgDedup;
|
|
||||||
private final Consumer<?> consumer;
|
private final Consumer<?> consumer;
|
||||||
private final String subscriptionType;
|
|
||||||
private final Schema<?> pulsarSchema;
|
private final Schema<?> pulsarSchema;
|
||||||
private final int timeoutSeconds;
|
private final int timeoutSeconds;
|
||||||
private final boolean e2eMsgProc;
|
private final boolean e2eMsgProc;
|
||||||
private final long curCycleNum;
|
|
||||||
|
|
||||||
private final Counter bytesCounter;
|
private final Counter bytesCounter;
|
||||||
private final Histogram messageSizeHistogram;
|
private final Histogram messageSizeHistogram;
|
||||||
@ -48,22 +44,17 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
|
private final Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic;
|
||||||
|
|
||||||
public PulsarConsumerOp(
|
public PulsarConsumerOp(
|
||||||
PulsarConsumerMapper consumerMapper,
|
|
||||||
PulsarActivity pulsarActivity,
|
PulsarActivity pulsarActivity,
|
||||||
boolean asyncPulsarOp,
|
boolean asyncPulsarOp,
|
||||||
boolean useTransaction,
|
boolean useTransaction,
|
||||||
boolean seqTracking,
|
boolean seqTracking,
|
||||||
Supplier<Transaction> transactionSupplier,
|
Supplier<Transaction> transactionSupplier,
|
||||||
boolean topicMsgDedup,
|
|
||||||
Consumer<?> consumer,
|
Consumer<?> consumer,
|
||||||
String subscriptionType,
|
|
||||||
Schema<?> schema,
|
Schema<?> schema,
|
||||||
int timeoutSeconds,
|
int timeoutSeconds,
|
||||||
long curCycleNum,
|
|
||||||
boolean e2eMsgProc,
|
boolean e2eMsgProc,
|
||||||
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic)
|
Function<String, ReceivedMessageSequenceTracker> receivedMessageSequenceTrackerForTopic)
|
||||||
{
|
{
|
||||||
this.consumerMapper = consumerMapper;
|
|
||||||
this.pulsarActivity = pulsarActivity;
|
this.pulsarActivity = pulsarActivity;
|
||||||
|
|
||||||
this.asyncPulsarOp = asyncPulsarOp;
|
this.asyncPulsarOp = asyncPulsarOp;
|
||||||
@ -71,12 +62,9 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
this.seqTracking = seqTracking;
|
this.seqTracking = seqTracking;
|
||||||
this.transactionSupplier = transactionSupplier;
|
this.transactionSupplier = transactionSupplier;
|
||||||
|
|
||||||
this.topicMsgDedup = topicMsgDedup;
|
|
||||||
this.consumer = consumer;
|
this.consumer = consumer;
|
||||||
this.subscriptionType = subscriptionType;
|
|
||||||
this.pulsarSchema = schema;
|
this.pulsarSchema = schema;
|
||||||
this.timeoutSeconds = timeoutSeconds;
|
this.timeoutSeconds = timeoutSeconds;
|
||||||
this.curCycleNum = curCycleNum;
|
|
||||||
this.e2eMsgProc = e2eMsgProc;
|
this.e2eMsgProc = e2eMsgProc;
|
||||||
|
|
||||||
this.bytesCounter = pulsarActivity.getBytesCounter();
|
this.bytesCounter = pulsarActivity.getBytesCounter();
|
||||||
|
@ -129,17 +129,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
}
|
}
|
||||||
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
|
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
|
||||||
|
|
||||||
// Doc-level parameter: msg_dedup_broker
|
|
||||||
LongFunction<Boolean> brokerMsgDedupFunc = (l) -> false;
|
|
||||||
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label)) {
|
|
||||||
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label))
|
|
||||||
brokerMsgDedupFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label));
|
|
||||||
else
|
|
||||||
throw new PulsarDriverParamException("[resolve()] \"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.MSG_DEDUP_BROKER.label + "\" parameter cannot be dynamic!");
|
|
||||||
}
|
|
||||||
logger.info("msg_dedup_broker: {}", seqTrackingFunc.apply(0));
|
|
||||||
|
|
||||||
|
|
||||||
// TODO: Complete implementation for websocket-producer and managed-ledger
|
// TODO: Complete implementation for websocket-producer and managed-ledger
|
||||||
// Admin operation: create/delete tenant
|
// Admin operation: create/delete tenant
|
||||||
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) {
|
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) {
|
||||||
@ -165,7 +154,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
asyncApiFunc,
|
asyncApiFunc,
|
||||||
useTransactionFunc,
|
useTransactionFunc,
|
||||||
seqTrackingFunc,
|
seqTrackingFunc,
|
||||||
brokerMsgDedupFunc,
|
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
|
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
|
||||||
@ -175,8 +163,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
topicUriFunc,
|
topicUriFunc,
|
||||||
asyncApiFunc,
|
asyncApiFunc,
|
||||||
useTransactionFunc,
|
useTransactionFunc,
|
||||||
seqTrackingFunc,
|
seqTrackingFunc);
|
||||||
brokerMsgDedupFunc);
|
|
||||||
}
|
}
|
||||||
// Regular/non-admin operation: single message consuming a single topic (reader)
|
// Regular/non-admin operation: single message consuming a single topic (reader)
|
||||||
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
|
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
|
||||||
@ -206,7 +193,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
asyncApiFunc,
|
asyncApiFunc,
|
||||||
useTransactionFunc,
|
useTransactionFunc,
|
||||||
seqTrackingFunc,
|
seqTrackingFunc,
|
||||||
brokerMsgDedupFunc,
|
|
||||||
true);
|
true);
|
||||||
}
|
}
|
||||||
// Invalid operation type
|
// Invalid operation type
|
||||||
@ -425,7 +411,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
LongFunction<Boolean> async_api_func,
|
LongFunction<Boolean> async_api_func,
|
||||||
LongFunction<Boolean> useTransactionFunc,
|
LongFunction<Boolean> useTransactionFunc,
|
||||||
LongFunction<Boolean> seqTrackingFunc,
|
LongFunction<Boolean> seqTrackingFunc,
|
||||||
LongFunction<Boolean> brokerMsgDupFunc,
|
|
||||||
boolean e2eMsgProc
|
boolean e2eMsgProc
|
||||||
) {
|
) {
|
||||||
LongFunction<String> subscription_name_func;
|
LongFunction<String> subscription_name_func;
|
||||||
@ -458,12 +443,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
|
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
|
||||||
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
|
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
|
||||||
|
|
||||||
// TODO: Ignore namespace and topic level dedup check on the fly
|
|
||||||
// this will impact the consumer performance significantly
|
|
||||||
// Consider using caching or Memoizer in the future?
|
|
||||||
// (https://www.baeldung.com/guava-memoizer)
|
|
||||||
LongFunction<Boolean> topicMsgDedupFunc = brokerMsgDupFunc;
|
|
||||||
|
|
||||||
LongFunction<Consumer<?>> consumerFunc = (l) ->
|
LongFunction<Consumer<?>> consumerFunc = (l) ->
|
||||||
clientSpace.getConsumer(
|
clientSpace.getConsumer(
|
||||||
topic_uri_func.apply(l),
|
topic_uri_func.apply(l),
|
||||||
@ -480,9 +459,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
useTransactionFunc,
|
useTransactionFunc,
|
||||||
seqTrackingFunc,
|
seqTrackingFunc,
|
||||||
transactionSupplierFunc,
|
transactionSupplierFunc,
|
||||||
topicMsgDedupFunc,
|
|
||||||
consumerFunc,
|
consumerFunc,
|
||||||
subscription_type_func,
|
|
||||||
e2eMsgProc);
|
e2eMsgProc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -491,8 +468,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
LongFunction<String> topic_uri_func,
|
LongFunction<String> topic_uri_func,
|
||||||
LongFunction<Boolean> async_api_func,
|
LongFunction<Boolean> async_api_func,
|
||||||
LongFunction<Boolean> useTransactionFunc,
|
LongFunction<Boolean> useTransactionFunc,
|
||||||
LongFunction<Boolean> seqTrackingFunc,
|
LongFunction<Boolean> seqTrackingFunc
|
||||||
LongFunction<Boolean> brokerMsgDupFunc
|
|
||||||
) {
|
) {
|
||||||
// Topic list (multi-topic)
|
// Topic list (multi-topic)
|
||||||
LongFunction<String> topic_names_func;
|
LongFunction<String> topic_names_func;
|
||||||
@ -562,17 +538,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
|
|||||||
useTransactionFunc,
|
useTransactionFunc,
|
||||||
seqTrackingFunc,
|
seqTrackingFunc,
|
||||||
transactionSupplierFunc,
|
transactionSupplierFunc,
|
||||||
// For multi-topic subscription message consumption,
|
|
||||||
// - Only consider broker-level message deduplication setting
|
|
||||||
// - Ignore namespace- and topic-level message deduplication setting
|
|
||||||
//
|
|
||||||
// This is because Pulsar is able to specify a list of topics from
|
|
||||||
// different namespaces. In theory, we can get topic deduplication
|
|
||||||
// status from each message, but this will be too much overhead.
|
|
||||||
// e.g. pulsarAdmin.getPulsarAdmin().topics().getDeduplicationStatus(message.getTopicName())
|
|
||||||
brokerMsgDupFunc,
|
|
||||||
mtConsumerFunc,
|
mtConsumerFunc,
|
||||||
subscription_type_func,
|
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,7 @@ bindings:
|
|||||||
params:
|
params:
|
||||||
# "true" - asynchronous Pulsar Admin API
|
# "true" - asynchronous Pulsar Admin API
|
||||||
# "false" - synchronous Pulsar Admin API
|
# "false" - synchronous Pulsar Admin API
|
||||||
async_api: "true"
|
async_api: "false"
|
||||||
# "true" - delete namespace
|
# "true" - delete namespace
|
||||||
# "false" - create namespace
|
# "false" - create namespace
|
||||||
admin_delop: "false"
|
admin_delop: "false"
|
||||||
|
@ -8,7 +8,7 @@ params:
|
|||||||
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
|
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
|
||||||
# "true" - asynchronous Pulsar Admin API
|
# "true" - asynchronous Pulsar Admin API
|
||||||
# "false" - synchronous Pulsar Admin API
|
# "false" - synchronous Pulsar Admin API
|
||||||
async_api: "true"
|
async_api: "false"
|
||||||
# "true" - delete topic
|
# "true" - delete topic
|
||||||
# "false" - create topic
|
# "false" - create topic
|
||||||
admin_delop: "false"
|
admin_delop: "false"
|
||||||
|
89
driver-pulsar/src/main/resources/design_revisit.md
Normal file
89
driver-pulsar/src/main/resources/design_revisit.md
Normal file
@ -0,0 +1,89 @@
|
|||||||
|
# TODO : Design Revisit -- Advanced Driver Features
|
||||||
|
|
||||||
|
**NOTE**: The following text is based on the original multi-layer API
|
||||||
|
caching design which is not fully implemented at the moment. We need to
|
||||||
|
revisit the original design at some point in order to achieve maximum
|
||||||
|
testing flexibility.
|
||||||
|
|
||||||
|
To summarize, the original caching design has the following key
|
||||||
|
requirements:
|
||||||
|
|
||||||
|
* **Requirement 1**: Each NB Pulsar activity is able to launch and cache
|
||||||
|
multiple **client spaces**
|
||||||
|
* **Requirement 2**: Each client space can launch and cache multiple
|
||||||
|
Pulsar operators of the same type (producer, consumer, etc.)
|
||||||
|
* **Requirement 3**: The size of each Pulsar operator specific cached
|
||||||
|
space can be configurable.
|
||||||
|
|
||||||
|
In the current implementation, only requirement 2 is implemented.
|
||||||
|
|
||||||
|
* For requirement 1, the current implementation only supports one client
|
||||||
|
space per NB Pulsar activity
|
||||||
|
* For requirement 3, the cache space size is not configurable (no limit at
|
||||||
|
the moment)
|
||||||
|
|
||||||
|
## Other Activity Parameters
|
||||||
|
|
||||||
|
- **maxcached** - A default value to be applied to `max_clients`,
|
||||||
|
`max_producers`, `max_consumers`.
|
||||||
|
- default: `max_cached=100`
|
||||||
|
- **max_clients** - Clients cache size. This is the number of client
|
||||||
|
instances which are allowed to be cached in the NoSQLBench client
|
||||||
|
runtime. The clients cache automatically maintains a cache of unique
|
||||||
|
client instances internally. default: _maxcached_
|
||||||
|
- **max_operators** - Producers/Consumers/Readers cache size (per client
|
||||||
|
instance). Limits the number of instances which are allowed to be cached
|
||||||
|
per client instance. default: _maxcached_
|
||||||
|
|
||||||
|
## API Caching
|
||||||
|
|
||||||
|
This driver is tailored around the multi-tenancy and topic naming scheme
|
||||||
|
that is part of Apache Pulsar. Specifically, you can create an arbitrary
|
||||||
|
number of client instances, producers (per client), and consumers (per
|
||||||
|
client) depending on your testing requirements.
|
||||||
|
|
||||||
|
Further, the topic URI is composed from the provided qualifiers of
|
||||||
|
`persistence`, `tenant`, `namespace`, and `topic`, or you can provide a
|
||||||
|
fully-composed value in the `persistence://tenant/namespace/topic`
|
||||||
|
form.
|
||||||
|
|
||||||
|
### Instancing Controls
|
||||||
|
|
||||||
|
Normative usage of the Apache Pulsar API follows a strictly enforced
|
||||||
|
binding of topics to producers and consumers. As well, clients may be
|
||||||
|
customized with different behavior for advanced testing scenarios. There
|
||||||
|
is a significant variety of messaging and concurrency schemes seen in
|
||||||
|
modern architectures. Thus, it is important that testing tools rise to the
|
||||||
|
occasion by letting users configure their testing runtimes to emulate
|
||||||
|
applications as they are found in practice. To this end, the NoSQLBench
|
||||||
|
driver for Apache Pulsar provides a set controls within its op template
|
||||||
|
format which allow for flexible yet intuitive instancing in the client
|
||||||
|
runtime. This is enabled directly by using nominative variables for
|
||||||
|
instance names where needed. When the instance names are not provided for
|
||||||
|
an operation, defaults are used to emulate a simple configuration.
|
||||||
|
|
||||||
|
Since this is a new capability in a NoSQLBench driver, how it works is
|
||||||
|
explained below:
|
||||||
|
|
||||||
|
When a pulsar cycles is executed, the operation is synthesized from the op
|
||||||
|
template fields as explained below under _Op Fields_. This happens in a
|
||||||
|
specific order:
|
||||||
|
|
||||||
|
1. The client instance name is resolved. If a `client` field is provided,
|
||||||
|
this is taken as the client instance name. If not, it is set
|
||||||
|
to `default`.
|
||||||
|
2. The named client instance is fetched from the cache, or created and
|
||||||
|
cached if it does not yet exist.
|
||||||
|
3. The topic_uri is resolved. This is the value to be used with
|
||||||
|
`.topic(...)` calls in the API. The op fields below explain how to
|
||||||
|
control this value.
|
||||||
|
4. For _send_ operations, a producer is named and created if needed. By
|
||||||
|
default, the producer is named after the topic_uri above. You can
|
||||||
|
override this by providing a value for `producer`.
|
||||||
|
5. For _recv_ operations, a consumer is named and created if needed. By
|
||||||
|
default, the consumer is named after the topic_uri above. You can
|
||||||
|
override this by providing a value for `consumer`.
|
||||||
|
|
||||||
|
The most important detail for understanding the instancing controls is
|
||||||
|
that clients, producers, and consumers are all named and cached in the
|
||||||
|
specific order above.
|
@ -1,38 +1,34 @@
|
|||||||
- [1. NoSQLBench (NB) Pulsar Driver Overview](#1-nosqlbench-nb-pulsar-driver-overview)
|
- [1. Overview](#1-overview)
|
||||||
- [1.1. Issues Tracker](#11-issues-tracker)
|
- [1.1. Issues Tracker](#11-issues-tracker)
|
||||||
- [1.2. Global Level Pulsar Configuration Settings](#12-global-level-pulsar-configuration-settings)
|
- [2. NB Pulsar Driver Yaml File - High Level Structure](#2-nb-pulsar-driver-yaml-file---high-level-structure)
|
||||||
- [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure)
|
- [3. Global Level Pulsar Configuration Settings](#3-global-level-pulsar-configuration-settings)
|
||||||
- [1.3.1. Configuration Parameter Levels](#131-configuration-parameter-levels)
|
- [4. Global, Document, and Statement Level Configuration Items](#4-global-document-and-statement-level-configuration-items)
|
||||||
- [1.4. Pulsar Driver Yaml File - Command Blocks](#14-pulsar-driver-yaml-file---command-blocks)
|
- [5. NB Pulsar Driver Yaml File - Command Blocks](#5-nb-pulsar-driver-yaml-file---command-blocks)
|
||||||
- [1.4.1. Pulsar Admin API Command Block - Create Tenants](#141-pulsar-admin-api-command-block---create-tenants)
|
- [5.1. Pulsar Admin API Command Block - Create/Delete Tenants](#51-pulsar-admin-api-command-block---createdelete-tenants)
|
||||||
- [1.4.2. Pulsar Admin API Command Block - Create Namespaces](#142-pulsar-admin-api-command-block---create-namespaces)
|
- [5.2. Pulsar Admin API Command Block - Create/Delete Namespaces](#52-pulsar-admin-api-command-block---createdelete-namespaces)
|
||||||
- [1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)](#143-pulsar-admin-api-command-block---create-topics-partitioned-or-regular)
|
- [5.3. Pulsar Admin API Command Block - Create/Delete Topics (Partitioned or Regular)](#53-pulsar-admin-api-command-block---createdelete-topics-partitioned-or-regular)
|
||||||
- [1.4.4. Batch Producer Command Block](#144-batch-producer-command-block)
|
- [5.4. Batch Producer Command Block](#54-batch-producer-command-block)
|
||||||
- [1.4.5. Producer Command Block](#145-producer-command-block)
|
- [5.5. Producer Command Block](#55-producer-command-block)
|
||||||
- [1.4.6. (Single-Topic) Consumer Command Block](#146-single-topic-consumer-command-block)
|
- [5.6. (Single-Topic) Consumer Command Block](#56-single-topic-consumer-command-block)
|
||||||
- [1.4.7. Reader Command Block](#147-reader-command-block)
|
- [5.7. Reader Command Block](#57-reader-command-block)
|
||||||
- [1.4.8. Multi-topic Consumer Command Block](#148-multi-topic-consumer-command-block)
|
- [5.8. Multi-topic Consumer Command Block](#58-multi-topic-consumer-command-block)
|
||||||
- [1.4.9. End-to-end Message Processing Command Block](#149-end-to-end-message-processing-command-block)
|
- [5.9. End-to-end Message Processing Command Block](#59-end-to-end-message-processing-command-block)
|
||||||
- [1.5. Message Properties](#15-message-properties)
|
- [6. Message Properties](#6-message-properties)
|
||||||
- [1.6. Schema Support](#16-schema-support)
|
- [7. Schema Support](#7-schema-support)
|
||||||
- [1.7. Measure End-to-end Message Processing Latency](#17-measure-end-to-end-message-processing-latency)
|
- [8. Measure End-to-end Message Processing Latency](#8-measure-end-to-end-message-processing-latency)
|
||||||
- [1.8. Detect Message Out-of-order, Message Loss, and Message Duplication](#18-detect-message-out-of-order-message-loss-and-message-duplication)
|
- [9. Detect Message Out-of-order, Message Loss, and Message Duplication](#9-detect-message-out-of-order-message-loss-and-message-duplication)
|
||||||
- [1.9. NB Activity Execution Parameters](#19-nb-activity-execution-parameters)
|
- [10. NB Activity Execution Parameters](#10-nb-activity-execution-parameters)
|
||||||
- [1.10. NB Pulsar Driver Execution Example](#110-nb-pulsar-driver-execution-example)
|
- [11. NB Pulsar Driver Execution Example](#11-nb-pulsar-driver-execution-example)
|
||||||
- [1.11. Appendix A. Template Global Setting File (config.properties)](#111-appendix-a-template-global-setting-file-configproperties)
|
- [12. Appendix A. Template Global Setting File (config.properties)](#12-appendix-a-template-global-setting-file-configproperties)
|
||||||
- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features)
|
|
||||||
- [2.1. Other Activity Parameters](#21-other-activity-parameters)
|
|
||||||
- [2.2. API Caching](#22-api-caching)
|
|
||||||
- [2.2.1. Instancing Controls](#221-instancing-controls)
|
|
||||||
|
|
||||||
# 1. NoSQLBench (NB) Pulsar Driver Overview
|
# 1. Overview
|
||||||
|
|
||||||
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
|
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
|
||||||
* Admin API - create tenants
|
* Admin API - create/delete tenants
|
||||||
* Admin API - create namespaces
|
* Admin API - create/delete namespaces
|
||||||
* Admin API - create topics
|
* Admin API - create/delete topics, partitioned or not
|
||||||
* Producer
|
* Producer - publish messages with Avro schema support
|
||||||
* Consumer
|
* Consumer - consume messages with all subscription types
|
||||||
* Reader
|
* Reader
|
||||||
* (Future) WebSocket Producer
|
* (Future) WebSocket Producer
|
||||||
* (Future) Managed Ledger
|
* (Future) Managed Ledger
|
||||||
@ -41,68 +37,7 @@ This driver allows you to simulate and run different types of workloads (as belo
|
|||||||
|
|
||||||
If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
|
If you have issues or new requirements for this driver, please add them at the [pulsar issues tracker](https://github.com/nosqlbench/nosqlbench/issues/new?labels=pulsar).
|
||||||
|
|
||||||
## 1.2. Global Level Pulsar Configuration Settings
|
# 2. NB Pulsar Driver Yaml File - High Level Structure
|
||||||
|
|
||||||
The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish messages to and consume messages from a Pulsar cluster. In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload-specific actions (e.g. publishing or consuming messages).
|
|
||||||
|
|
||||||
When creating these objects (e.g. PulsarClient, Producer), there are different configuration options that can be used. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration options when creating a Pulsar Producer object.
|
|
||||||
|
|
||||||
The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below:
|
|
||||||
|
|
||||||
```properties
|
|
||||||
### Schema related configurations - schema.xxx
|
|
||||||
schema.type = avro
|
|
||||||
schema.definition = file:///<path/to/avro/schema/definition/file>
|
|
||||||
|
|
||||||
### Pulsar client related configurations - client.xxx
|
|
||||||
client.connectionTimeoutMs = 5000
|
|
||||||
|
|
||||||
### Producer related configurations (global) - producer.xxx
|
|
||||||
producer.topicName = persistent://public/default/mynbtest
|
|
||||||
producer.producerName =
|
|
||||||
producer.sendTimeoutMs =
|
|
||||||
```
|
|
||||||
|
|
||||||
There are multiple sections in this file that correspond to different groups of configuration settings:
|
|
||||||
* **Schema related settings**:
|
|
||||||
* All settings under this section starts with **schema.** prefix.
|
|
||||||
* The NB Pulsar driver supports schema-based message publishing and
|
|
||||||
consuming. This section defines configuration settings that are
|
|
||||||
schema related.
|
|
||||||
* There are 2 valid options under this section.
|
|
||||||
* *schema.type*: Pulsar message schema type. When unset or set as
|
|
||||||
an empty string, Pulsar messages will be handled in raw *byte[]*
|
|
||||||
format. The other valid option is **avro** which the Pulsar
|
|
||||||
message will follow a specific Avro format.
|
|
||||||
* *schema.definition*: This only applies when an Avro schema type
|
|
||||||
is specified. The value of this configuration is the (full) file
|
|
||||||
path that contains the Avro schema definition.
|
|
||||||
* **Pulsar Client related settings**:
|
|
||||||
* All settings under this section starts with **client.** prefix.
|
|
||||||
* This section defines all configuration settings that are related
|
|
||||||
with defining a PulsarClient object.
|
|
||||||
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
|
|
||||||
* **Pulsar Producer related settings**:
|
|
||||||
* All settings under this section starts with **producer** prefix.
|
|
||||||
* This section defines all configuration settings that are related
|
|
||||||
with defining a Pulsar Producer object.
|
|
||||||
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
|
|
||||||
* **Pulsar Consumer related settings**:
|
|
||||||
* All settings under this section starts with **consumer** prefix.
|
|
||||||
* This section defines all configuration settings that are related
|
|
||||||
with defining a Pulsar Consumer object.
|
|
||||||
* See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
|
|
||||||
* **Pulsar Reader related settings**:
|
|
||||||
* All settings under this section starts with **reader** prefix.
|
|
||||||
* This section defines all configuration settings that are related
|
|
||||||
with defining a Pulsar Reader object.
|
|
||||||
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
|
|
||||||
|
|
||||||
In the future, when the support for other types of Pulsar workloads is
|
|
||||||
added in NB Pulsar driver, there will be corresponding configuration
|
|
||||||
sections in this file as well.
|
|
||||||
|
|
||||||
## 1.3. NB Pulsar Driver Yaml File - High Level Structure
|
|
||||||
|
|
||||||
Just like other NB driver types, the actual Pulsar workload generation is
|
Just like other NB driver types, the actual Pulsar workload generation is
|
||||||
determined by the statement blocks in an NB driver Yaml file. Depending
|
determined by the statement blocks in an NB driver Yaml file. Depending
|
||||||
@ -127,9 +62,7 @@ At high level, Pulsar driver yaml file has the following structure:
|
|||||||
* **seq_tracking**: Whether to do message sequence tracking. This is
|
* **seq_tracking**: Whether to do message sequence tracking. This is
|
||||||
used for message out-of-order and message loss detection (more on
|
used for message out-of-order and message loss detection (more on
|
||||||
this later).
|
this later).
|
||||||
* **msg_dedup_broker**: Whether or not broker level message deduplication
|
* **statement blocks**: includes a series of command blocks. Each command block
|
||||||
is enabled.
|
|
||||||
* **blocks**: includes a series of command blocks. Each command block
|
|
||||||
defines one major Pulsar operation such as *producer*, *consumer*, etc.
|
defines one major Pulsar operation such as *producer*, *consumer*, etc.
|
||||||
Right now, the following command blocks are already supported or will be
|
Right now, the following command blocks are already supported or will be
|
||||||
added in the near future. We'll go through each of these command blocks
|
added in the near future. We'll go through each of these command blocks
|
||||||
@ -203,7 +136,68 @@ multiple Pulsar operations in one run! But if we want to focus the testing
|
|||||||
on one particular operation, we can use the tag to filter the command
|
on one particular operation, we can use the tag to filter the command
|
||||||
block as listed above!
|
block as listed above!
|
||||||
|
|
||||||
### 1.3.1. Configuration Parameter Levels
|
# 3. Global Level Pulsar Configuration Settings
|
||||||
|
|
||||||
|
The NB Pulsar driver relies on Pulsar's [Java Client API](https://pulsar.apache.org/docs/en/client-libraries-java/) to publish messages to and consume messages from a Pulsar cluster. In order to do so, a [PulsarClient](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/PulsarClient) object needs to be created first in order to establish the connection to the Pulsar cluster; then a workload-specific object (e.g. [Producer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Producer) or [Consumer](https://pulsar.incubator.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Consumer)) is required in order to execute workload related actions (e.g. publishing or consuming messages).
|
||||||
|
|
||||||
|
When creating these objects (e.g. PulsarClient, Producer), there are different configuration options that can be used. For example, [this document](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) lists all possible configuration options when creating a Pulsar Producer object.
|
||||||
|
|
||||||
|
The NB pulsar driver supports these options via a global properties file (e.g. **config.properties**). An example of this file is as below:
|
||||||
|
|
||||||
|
```properties
|
||||||
|
### Schema related configurations - schema.xxx
|
||||||
|
schema.type = avro
|
||||||
|
schema.definition = file:///<path/to/avro/schema/definition/file>
|
||||||
|
|
||||||
|
### Pulsar client related configurations - client.xxx
|
||||||
|
client.connectionTimeoutMs = 5000
|
||||||
|
|
||||||
|
### Producer related configurations (global) - producer.xxx
|
||||||
|
producer.topicName = persistent://public/default/mynbtest
|
||||||
|
producer.producerName =
|
||||||
|
producer.sendTimeoutMs =
|
||||||
|
```
|
||||||
|
|
||||||
|
There are multiple sections in this file that correspond to different groups of configuration settings:
|
||||||
|
* **Schema related settings**:
|
||||||
|
* All settings under this section starts with **schema.** prefix.
|
||||||
|
* The NB Pulsar driver supports schema-based message publishing and
|
||||||
|
consuming. This section defines configuration settings that are
|
||||||
|
schema related.
|
||||||
|
* There are 2 valid options under this section.
|
||||||
|
* *schema.type*: Pulsar message schema type. When unset or set as
|
||||||
|
an empty string, Pulsar messages will be handled in raw *byte[]*
|
||||||
|
format. The other valid option is **avro** which the Pulsar
|
||||||
|
message will follow Avro schema format.
|
||||||
|
* *schema.definition*: This only applies when an Avro schema type
|
||||||
|
is specified. The value of this configuration is the (full) file
|
||||||
|
path that contains the Avro schema definition.
|
||||||
|
* **Pulsar Client related settings**:
|
||||||
|
* All settings under this section starts with **client.** prefix.
|
||||||
|
* This section defines all configuration settings that are related
|
||||||
|
with defining a PulsarClient object.
|
||||||
|
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
|
||||||
|
* **Pulsar Producer related settings**:
|
||||||
|
* All settings under this section starts with **producer** prefix.
|
||||||
|
* This section defines all configuration settings that are related
|
||||||
|
with defining a Pulsar Producer object.
|
||||||
|
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
|
||||||
|
* **Pulsar Consumer related settings**:
|
||||||
|
* All settings under this section starts with **consumer** prefix.
|
||||||
|
* This section defines all configuration settings that are related
|
||||||
|
with defining a Pulsar Consumer object.
|
||||||
|
* See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
|
||||||
|
* **Pulsar Reader related settings**:
|
||||||
|
* All settings under this section starts with **reader** prefix.
|
||||||
|
* This section defines all configuration settings that are related
|
||||||
|
with defining a Pulsar Reader object.
|
||||||
|
* See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
|
||||||
|
|
||||||
|
In the future, when the support for other types of Pulsar workloads is
|
||||||
|
added in NB Pulsar driver, there will be corresponding configuration
|
||||||
|
sections in this file as well.
|
||||||
|
|
||||||
|
# 4. Global, Document, and Statement Level Configuration Items
|
||||||
|
|
||||||
The NB Pulsar driver configuration parameters can be set at 3 different
|
The NB Pulsar driver configuration parameters can be set at 3 different
|
||||||
levels:
|
levels:
|
||||||
@ -213,13 +207,13 @@ levels:
|
|||||||
schema.type=
|
schema.type=
|
||||||
```
|
```
|
||||||
* **document level**: parameters that are set within NB yaml file and under
|
* **document level**: parameters that are set within NB yaml file and under
|
||||||
the ***params*** section
|
the ***params*** section
|
||||||
```
|
```
|
||||||
params:
|
params:
|
||||||
topic_uri: ...
|
topic_uri: ...
|
||||||
```
|
```
|
||||||
* **statement level**: parameters that are set within NB yaml file, but
|
* **statement level**: parameters that are set within NB yaml file, but
|
||||||
under different block statements
|
under different block statements
|
||||||
```
|
```
|
||||||
- name: producer-block
|
- name: producer-block
|
||||||
statements:
|
statements:
|
||||||
@ -230,15 +224,15 @@ under different block statements
|
|||||||
**NOTE**: If one parameter is set at multiple levels (e.g. producer name),
|
**NOTE**: If one parameter is set at multiple levels (e.g. producer name),
|
||||||
the parameter at lower level will take precedence.
|
the parameter at lower level will take precedence.
|
||||||
|
|
||||||
## 1.4. Pulsar Driver Yaml File - Command Blocks
|
# 5. NB Pulsar Driver Yaml File - Command Blocks
|
||||||
|
|
||||||
### 1.4.1. Pulsar Admin API Command Block - Create Tenants
|
## 5.1. Pulsar Admin API Command Block - Create/Delete Tenants
|
||||||
|
|
||||||
This Pulsar Admin API Block is used to create or delete Pulsar tenants. It
|
This Pulsar Admin API Block is used to create or delete Pulsar tenants. It
|
||||||
has the following format.
|
has the following format.
|
||||||
|
|
||||||
Please note that when document level parameter **admin_delop** is set to be
|
Please note that when document level parameter **admin_delop** is set to be
|
||||||
true, then this command block will delete Pulsar tenants instead. Similarly
|
true, then this command block will delete Pulsar tenants instead. Similarly,
|
||||||
this applies to other Admin API blocks for namespace and topic management.
|
this applies to other Admin API blocks for namespace and topic management.
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
@ -266,7 +260,7 @@ In this command block, there is only 1 statement (s1):
|
|||||||
* (Mandatory) **tenant** is the Pulsar tenant name to be created. It
|
* (Mandatory) **tenant** is the Pulsar tenant name to be created. It
|
||||||
can either be dynamically or statically bound.
|
can either be dynamically or statically bound.
|
||||||
|
|
||||||
### 1.4.2. Pulsar Admin API Command Block - Create Namespaces
|
## 5.2. Pulsar Admin API Command Block - Create/Delete Namespaces
|
||||||
|
|
||||||
This Pulsar Admin API Block is used to create Pulsar namespaces. It has the following format:
|
This Pulsar Admin API Block is used to create Pulsar namespaces. It has the following format:
|
||||||
|
|
||||||
@ -289,7 +283,7 @@ In this command block, there is only 1 statement (s1):
|
|||||||
* (Mandatory) **namespace** is the Pulsar namespace name to be created
|
* (Mandatory) **namespace** is the Pulsar namespace name to be created
|
||||||
under a tenant. It can be either statically or dynamically bound.
|
under a tenant. It can be either statically or dynamically bound.
|
||||||
|
|
||||||
### 1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)
|
## 5.3. Pulsar Admin API Command Block - Create/Delete Topics (Partitioned or Regular)
|
||||||
|
|
||||||
This Pulsar Admin API Block is used to create Pulsar topics. It has the following format:
|
This Pulsar Admin API Block is used to create Pulsar topics. It has the following format:
|
||||||
|
|
||||||
@ -318,7 +312,7 @@ In this command block, there is only 1 statement (s1):
|
|||||||
|
|
||||||
**NOTE**: The topic name is bound by the document level parameter "topic_uri".
|
**NOTE**: The topic name is bound by the document level parameter "topic_uri".
|
||||||
|
|
||||||
### 1.4.4. Batch Producer Command Block
|
## 5.4. Batch Producer Command Block
|
||||||
|
|
||||||
Batch producer command block is used to produce a batch of messages all at
|
Batch producer command block is used to produce a batch of messages all at
|
||||||
once by one NB cycle execution. A typical format of this command block is
|
once by one NB cycle execution. A typical format of this command block is
|
||||||
@ -390,7 +384,7 @@ ratios: 1, <batch_num>, 1.
|
|||||||
**NOTE**: the topic that the producer needs to publish messages to is
|
**NOTE**: the topic that the producer needs to publish messages to is
|
||||||
specified by the document level parameter ***topic_uri***.
|
specified by the document level parameter ***topic_uri***.
|
||||||
|
|
||||||
### 1.4.5. Producer Command Block
|
## 5.5. Producer Command Block
|
||||||
|
|
||||||
This is the regular Pulsar producer command block that produces one Pulsar
|
This is the regular Pulsar producer command block that produces one Pulsar
|
||||||
message per NB cycle execution. A typical format of this command block is
|
message per NB cycle execution. A typical format of this command block is
|
||||||
@ -432,15 +426,16 @@ This command block only has 1 statements (s1):
|
|||||||
of the generated message. It must be a JSON string that contains a
|
of the generated message. It must be a JSON string that contains a
|
||||||
series of key-value pairs.
|
series of key-value pairs.
|
||||||
* (Mandatory) **msg_payload** specifies the payload of the generated
|
* (Mandatory) **msg_payload** specifies the payload of the generated
|
||||||
message
|
message. If the message schema type is specified as Avro schema type,
|
||||||
|
then the message value format must be in proper Avro format.
|
||||||
|
|
||||||
**NOTE**: the topic that the producer needs to publish messages to is
|
**NOTE**: the topic that the producer needs to publish messages to is
|
||||||
specified by the document level parameter ***topic_uri***.
|
specified by the document level parameter ***topic_uri***.
|
||||||
|
|
||||||
### 1.4.6. (Single-Topic) Consumer Command Block
|
## 5.6. (Single-Topic) Consumer Command Block
|
||||||
|
|
||||||
This is the regular Pulsar consumer command block that consumes one Pulsar
|
This is the regular Pulsar consumer command block that consumes messages
|
||||||
message from one single Pulsar topic per NB cycle execution. A typical
|
from one single Pulsar topic per NB cycle execution. A typical
|
||||||
format of this command block is as below:
|
format of this command block is as below:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
@ -463,18 +458,18 @@ This command block only has 1 statements (s1):
|
|||||||
this statement
|
this statement
|
||||||
* (Mandatory) **subscription_name** specifies subscription name.
|
* (Mandatory) **subscription_name** specifies subscription name.
|
||||||
* (Optional) **subscription_type**, when provided, specifies
|
* (Optional) **subscription_type**, when provided, specifies
|
||||||
subscription type. Default to **exclusive** subscription type.
|
subscription type. Default to **Exclusive** subscription type.
|
||||||
* (Optional) **consumer_name**, when provided, specifies the
|
* (Optional) **consumer_name**, when provided, specifies the
|
||||||
associated consumer name.
|
associated consumer name.
|
||||||
|
|
||||||
**NOTE**: the single topic that the consumer needs to consume messages from
|
**NOTE**: the single topic that the consumer receives messages from is
|
||||||
is specified by the document level parameter ***topic_uri***.
|
specified by the document level parameter ***topic_uri***.
|
||||||
|
|
||||||
### 1.4.7. Reader Command Block
|
## 5.7. Reader Command Block
|
||||||
|
|
||||||
This is the regular Pulsar reader command block that reads one Pulsar
|
This is the regular Pulsar reader command block that reads messages from
|
||||||
message per NB cycle execution. A typical format of this command block is
|
one Pulsar topic per NB cycle execution. A typical format of this command
|
||||||
as below:
|
block is as below:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
- name: reader-block
|
- name: reader-block
|
||||||
@ -513,11 +508,11 @@ Reader reader = pulsarClient.newReader()
|
|||||||
.create();
|
.create();
|
||||||
```
|
```
|
||||||
|
|
||||||
### 1.4.8. Multi-topic Consumer Command Block
|
## 5.8. Multi-topic Consumer Command Block
|
||||||
|
|
||||||
This is the regular Pulsar consumer command block that consumes one Pulsar
|
This is the Pulsar consumer command block that consumes messages from
|
||||||
message from multiple Pulsar topics per NB cycle execution. A typical format
|
multiple Pulsar topics per NB cycle execution. A typical format of
|
||||||
of this command block is as below:
|
this command block is as below:
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
- name: multi-topic-consumer-block
|
- name: multi-topic-consumer-block
|
||||||
@ -541,22 +536,23 @@ This command block only has 1 statements (s1):
|
|||||||
* (Mandatory) **optype (msg-consume)** is the statement identifier for
|
* (Mandatory) **optype (msg-consume)** is the statement identifier for
|
||||||
this statement
|
this statement
|
||||||
* (Optional) **topic_names**, when provided, specifies multiple topic
|
* (Optional) **topic_names**, when provided, specifies multiple topic
|
||||||
names from which to consume messages for multi-topic message consumption.
|
names from which to consume messages.
|
||||||
* (Optional) **topics_pattern**, when provided, specifies pulsar
|
* (Optional) **topics_pattern**, when provided, specifies pulsar
|
||||||
topic regex pattern for multi-topic message consumption
|
topic regex pattern for multi-topic message consumption
|
||||||
* (Mandatory) **subscription_name** specifies subscription name.
|
* (Mandatory) **subscription_name** specifies subscription name.
|
||||||
* (Optional) **subscription_type**, when provided, specifies
|
* (Optional) **subscription_type**, when provided, specifies
|
||||||
subscription type. Default to **exclusive** subscription type.
|
subscription type. Default to **Exclusive** subscription type.
|
||||||
* (Optional) **consumer_name**, when provided, specifies the
|
* (Optional) **consumer_name**, when provided, specifies the
|
||||||
associated consumer name.
|
associated consumer name.
|
||||||
|
|
||||||
**NOTE 1**: when both **topic_names** and **topics_pattern** are provided,
|
**NOTE 1**: if neither **topic_names** and **topics_pattern** is provided,
|
||||||
|
consumer topic name is default to the document level parameter **topic_uri**.
|
||||||
|
Otherwise, the document level parameter **topic_uri** is ignored.
|
||||||
|
|
||||||
|
**NOTE 2**: when both **topic_names** and **topics_pattern** are provided,
|
||||||
**topic_names** takes precedence over **topics_pattern**.
|
**topic_names** takes precedence over **topics_pattern**.
|
||||||
|
|
||||||
**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided,
|
## 5.9. End-to-end Message Processing Command Block
|
||||||
consumer topic name is default to the document level parameter **topic_uri**.
|
|
||||||
|
|
||||||
### 1.4.9. End-to-end Message Processing Command Block
|
|
||||||
|
|
||||||
End-to-end message processing command block is used to simplify measuring
|
End-to-end message processing command block is used to simplify measuring
|
||||||
the end-to-end message processing (from being published to being consumed)
|
the end-to-end message processing (from being published to being consumed)
|
||||||
@ -600,19 +596,19 @@ ratios: 1, 1.
|
|||||||
* (Optional) **ratio**, must be 1 when provided.
|
* (Optional) **ratio**, must be 1 when provided.
|
||||||
Otherwise, default to 1.
|
Otherwise, default to 1.
|
||||||
* Statement **s2** is used to consume the message that just got published
|
* Statement **s2** is used to consume the message that just got published
|
||||||
from the same topic
|
from the same topic
|
||||||
* (Mandatory) **optype (ec2-msg-proc-consume)** is the statement
|
* (Mandatory) **optype (ec2-msg-proc-consume)** is the statement
|
||||||
identifier for this statement
|
identifier for this statement
|
||||||
* (Mandatory) **subscription_name** specifies subscription name.
|
* (Mandatory) **subscription_name** specifies subscription name.
|
||||||
* (Optional) **subscription_type**, when provided, specifies
|
* (Optional) **subscription_type**, when provided, specifies
|
||||||
subscription type. Default to **exclusive** subscription type.
|
subscription type. Default to **exclusive** subscription type.
|
||||||
* (Optional) **ratio**, must be 1 when provided.
|
* (Optional) **ratio**, must be 1 when provided.
|
||||||
Otherwise, default to 1.
|
Otherwise, default to 1.
|
||||||
|
|
||||||
**NOTE**: the topic that the producer needs to publish messages to is
|
**NOTE**: the topic that the producer needs to publish messages to is
|
||||||
specified by the document level parameter ***topic_uri***.
|
specified by the document level parameter ***topic_uri***.
|
||||||
|
|
||||||
## 1.5. Message Properties
|
# 6. Message Properties
|
||||||
|
|
||||||
In the producer command block, it is optional to specify message properties:
|
In the producer command block, it is optional to specify message properties:
|
||||||
```
|
```
|
||||||
@ -630,7 +626,7 @@ contains a list of key value pairs. Otherwise, if it is not a valid
|
|||||||
JSON string as expected, the driver will ignore it and treat the
|
JSON string as expected, the driver will ignore it and treat the
|
||||||
message as having no properties.
|
message as having no properties.
|
||||||
|
|
||||||
## 1.6. Schema Support
|
# 7. Schema Support
|
||||||
|
|
||||||
Pulsar has built-in schema support. Other than primitive types, Pulsar
|
Pulsar has built-in schema support. Other than primitive types, Pulsar
|
||||||
also supports complex types like **Avro**, etc. At the moment, the NB
|
also supports complex types like **Avro**, etc. At the moment, the NB
|
||||||
@ -648,8 +644,8 @@ related settings as below:
|
|||||||
```
|
```
|
||||||
|
|
||||||
Take the previous Producer command block as an example, the **msg-value**
|
Take the previous Producer command block as an example, the **msg-value**
|
||||||
parameter has the value of a JSON string that follows the following Avro
|
parameter has the value of a JSON string that follows the specified Avro
|
||||||
schema definition:
|
schema definition, an example of which is as below:
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"type": "record",
|
"type": "record",
|
||||||
@ -664,10 +660,10 @@ schema definition:
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## 1.7. Measure End-to-end Message Processing Latency
|
# 8. Measure End-to-end Message Processing Latency
|
||||||
|
|
||||||
**e2e-msg-proc-block** measures the end-to-end message latency metrics. It
|
The built-in **e2e-msg-proc-block** measures the end-to-end message latency metrics.
|
||||||
contains one message producing statement and one message consuming statement.
|
It contains one message producing statement and one message consuming statement.
|
||||||
When the message that is published by the producer is received by the consumer,
|
When the message that is published by the producer is received by the consumer,
|
||||||
the consumer calculates the time difference between when the time is received
|
the consumer calculates the time difference between when the time is received
|
||||||
and when the time is published.
|
and when the time is published.
|
||||||
@ -675,8 +671,8 @@ and when the time is published.
|
|||||||
The measured end-to-end message processing latency is captured as a histogram
|
The measured end-to-end message processing latency is captured as a histogram
|
||||||
metrics name "e2e_msg_latency".
|
metrics name "e2e_msg_latency".
|
||||||
|
|
||||||
This command block uses one single machine to act as both a producer and a
|
This built-in command block uses one single machine to act as both a producer and
|
||||||
consumer. We do so just for convenience purposes. In reality, we can use
|
a consumer. We do so just for convenience purposes. In reality, we can use
|
||||||
**producer-block** and **consumer-block** command blocks on separate machines
|
**producer-block** and **consumer-block** command blocks on separate machines
|
||||||
to achieve the same goal, which is probably closer to the actual use case and
|
to achieve the same goal, which is probably closer to the actual use case and
|
||||||
probably more accurate measurement (to avoid the situation of always reading
|
probably more accurate measurement (to avoid the situation of always reading
|
||||||
@ -685,11 +681,11 @@ messages from the managed ledger cache).
|
|||||||
One thing to remember though if we're using multiple machines to measure the
|
One thing to remember though if we're using multiple machines to measure the
|
||||||
end-to-end message processing latency, we need to make sure:
|
end-to-end message processing latency, we need to make sure:
|
||||||
1) The time of the two machines are synced up with each other, e.g. through
|
1) The time of the two machines are synced up with each other, e.g. through
|
||||||
NTP protocol.
|
NTP protocol.
|
||||||
2) If there is some time lag of starting the consumer, we need to count that
|
2) If there is some time lag of starting the consumer, we need to count that
|
||||||
into consideration when interpreting the end-to-end message processing latency.
|
into consideration when interpreting the end-to-end message processing latency.
|
||||||
|
|
||||||
## 1.8. Detect Message Out-of-order, Message Loss, and Message Duplication
|
# 9. Detect Message Out-of-order, Message Loss, and Message Duplication
|
||||||
|
|
||||||
In order to detect errors like message out-of-order and message loss through
|
In order to detect errors like message out-of-order and message loss through
|
||||||
the NB Pulsar driver, we need to set the following document level parameter
|
the NB Pulsar driver, we need to set the following document level parameter
|
||||||
@ -701,44 +697,7 @@ params:
|
|||||||
seq_tracking: "true"
|
seq_tracking: "true"
|
||||||
```
|
```
|
||||||
|
|
||||||
For message duplication detection, if broker level message dedup configuration
|
# 10. NB Activity Execution Parameters
|
||||||
is enabled ("brokerDeduplicationEnabled=true" in broker.conf), we also need to
|
|
||||||
enable this document level parameter:
|
|
||||||
```
|
|
||||||
params:
|
|
||||||
msg_dedup_broker: "true"
|
|
||||||
```
|
|
||||||
|
|
||||||
However, since message dedup. can be also enabled or disabled at namespace level
|
|
||||||
or topic level, the NB Pulsar driver will also check the settings at these layers
|
|
||||||
through API. Basically, the final message dedup setting for a topic is determined
|
|
||||||
by the following rules:
|
|
||||||
* if topic level message dedup is not set, check namespace level setting
|
|
||||||
* if namespace level message dedup is not set, check broker level setting which
|
|
||||||
in turn is determined by the document level NB parameter **msg_dedup_broker**
|
|
||||||
* if message dedup is enabled at multiple levels, the priority sequence follows:
|
|
||||||
* topic level > namespace level > broker level
|
|
||||||
|
|
||||||
The logic of how this works is based on the fact that NB execution cycle number
|
|
||||||
is monotonically increasing by 1 for every cycle moving forward. When publishing
|
|
||||||
a series of messages, we use the current NB cycle number as one message property
|
|
||||||
which is also monotonically increasing by 1.
|
|
||||||
|
|
||||||
When receiving the messages, if the message sequence number stored in the message
|
|
||||||
property is not monotonically increasing or if there is a gap larger than 1, then
|
|
||||||
it must be one of the following errors:
|
|
||||||
* If the current message sequence ID is less than the previous message sequence ID,
|
|
||||||
then it is message out-of-order error. Exception **PulsarMsgOutOfOrderException**
|
|
||||||
will be thrown out.
|
|
||||||
* if the current message sequence ID is more than 1 bigger than the previous message
|
|
||||||
sequence ID, then it is message loss error. Exception **PulsarMsgLossException**
|
|
||||||
will be thrown out.
|
|
||||||
* if message dedup is enabled and the current message sequence ID is equal to the
|
|
||||||
previous message sequence ID, then it is message duplication error. Exception **PulsarMsgDuplicateException** will be thrown out.
|
|
||||||
|
|
||||||
In either case, a runtime error will be thrown out with corresponding error messages.
|
|
||||||
|
|
||||||
## 1.9. NB Activity Execution Parameters
|
|
||||||
|
|
||||||
At the moment, the following NB Pulsar driver **specific** activity
|
At the moment, the following NB Pulsar driver **specific** activity
|
||||||
parameters are supported:
|
parameters are supported:
|
||||||
@ -756,7 +715,7 @@ reference to NB documentation for more parameters
|
|||||||
* cycles=<total_NB_cycle_execution_number>
|
* cycles=<total_NB_cycle_execution_number>
|
||||||
* --report-csv-to <metrics_output_dir_name>
|
* --report-csv-to <metrics_output_dir_name>
|
||||||
|
|
||||||
## 1.10. NB Pulsar Driver Execution Example
|
# 11. NB Pulsar Driver Execution Example
|
||||||
|
|
||||||
**NOTE**: in the following examples, the Pulsar service URL is **pulsar:
|
**NOTE**: in the following examples, the Pulsar service URL is **pulsar:
|
||||||
//localhost:6650**, please change it accordingly for your own Pulsar
|
//localhost:6650**, please change it accordingly for your own Pulsar
|
||||||
@ -769,7 +728,7 @@ environment.
|
|||||||
```
|
```
|
||||||
|
|
||||||
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads.
|
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads.
|
||||||
**NOTE**: *seq=* must have **concat** value in order to make the batch API working properly!
|
**NOTE**: *seq=* must have **concat** value in order to make the batch API working properly!
|
||||||
```bash
|
```bash
|
||||||
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
|
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
|
||||||
```
|
```
|
||||||
@ -780,8 +739,7 @@ environment.
|
|||||||
<nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
|
<nb_cmd> run driver=pulsar tags=phase:consumer cycles=100 web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
|
||||||
```
|
```
|
||||||
|
|
||||||
|
# 12. Appendix A. Template Global Setting File (config.properties)
|
||||||
## 1.11. Appendix A. Template Global Setting File (config.properties)
|
|
||||||
```properties
|
```properties
|
||||||
schema.type =
|
schema.type =
|
||||||
schema.definition =
|
schema.definition =
|
||||||
@ -812,95 +770,3 @@ reader.receiverQueueSize =
|
|||||||
reader.readerName =
|
reader.readerName =
|
||||||
reader.startMessagePos =
|
reader.startMessagePos =
|
||||||
```
|
```
|
||||||
|
|
||||||
---
|
|
||||||
|
|
||||||
# 2. TODO : Design Revisit -- Advanced Driver Features
|
|
||||||
|
|
||||||
**NOTE**: The following text is based on the original multi-layer API
|
|
||||||
caching design which is not fully implemented at the moment. We need to
|
|
||||||
revisit the original design at some point in order to achieve maximum
|
|
||||||
testing flexibility.
|
|
||||||
|
|
||||||
To summarize, the original caching design has the following key
|
|
||||||
requirements:
|
|
||||||
|
|
||||||
* **Requirement 1**: Each NB Pulsar activity is able to launch and cache
|
|
||||||
multiple **client spaces**
|
|
||||||
* **Requirement 2**: Each client space can launch and cache multiple
|
|
||||||
Pulsar operators of the same type (producer, consumer, etc.)
|
|
||||||
* **Requirement 3**: The size of each Pulsar operator specific cached
|
|
||||||
space can be configurable.
|
|
||||||
|
|
||||||
In the current implementation, only requirement 2 is implemented.
|
|
||||||
|
|
||||||
* For requirement 1, the current implementation only supports one client
|
|
||||||
space per NB Pulsar activity
|
|
||||||
* For requirement 3, the cache space size is not configurable (no limit at
|
|
||||||
the moment)
|
|
||||||
|
|
||||||
## 2.1. Other Activity Parameters
|
|
||||||
|
|
||||||
- **maxcached** - A default value to be applied to `max_clients`,
|
|
||||||
`max_producers`, `max_consumers`.
|
|
||||||
- default: `max_cached=100`
|
|
||||||
- **max_clients** - Clients cache size. This is the number of client
|
|
||||||
instances which are allowed to be cached in the NoSQLBench client
|
|
||||||
runtime. The clients cache automatically maintains a cache of unique
|
|
||||||
client instances internally. default: _maxcached_
|
|
||||||
- **max_operators** - Producers/Consumers/Readers cache size (per client
|
|
||||||
instance). Limits the number of instances which are allowed to be cached
|
|
||||||
per client instance. default: _maxcached_
|
|
||||||
|
|
||||||
## 2.2. API Caching
|
|
||||||
|
|
||||||
This driver is tailored around the multi-tenancy and topic naming scheme
|
|
||||||
that is part of Apache Pulsar. Specifically, you can create an arbitrary
|
|
||||||
number of client instances, producers (per client), and consumers (per
|
|
||||||
client) depending on your testing requirements.
|
|
||||||
|
|
||||||
Further, the topic URI is composed from the provided qualifiers of
|
|
||||||
`persistence`, `tenant`, `namespace`, and `topic`, or you can provide a
|
|
||||||
fully-composed value in the `persistence://tenant/namespace/topic`
|
|
||||||
form.
|
|
||||||
|
|
||||||
### 2.2.1. Instancing Controls
|
|
||||||
|
|
||||||
Normative usage of the Apache Pulsar API follows a strictly enforced
|
|
||||||
binding of topics to producers and consumers. As well, clients may be
|
|
||||||
customized with different behavior for advanced testing scenarios. There
|
|
||||||
is a significant variety of messaging and concurrency schemes seen in
|
|
||||||
modern architectures. Thus, it is important that testing tools rise to the
|
|
||||||
occasion by letting users configure their testing runtimes to emulate
|
|
||||||
applications as they are found in practice. To this end, the NoSQLBench
|
|
||||||
driver for Apache Pulsar provides a set controls within its op template
|
|
||||||
format which allow for flexible yet intuitive instancing in the client
|
|
||||||
runtime. This is enabled directly by using nominative variables for
|
|
||||||
instance names where needed. When the instance names are not provided for
|
|
||||||
an operation, defaults are used to emulate a simple configuration.
|
|
||||||
|
|
||||||
Since this is a new capability in a NoSQLBench driver, how it works is
|
|
||||||
explained below:
|
|
||||||
|
|
||||||
When a pulsar cycles is executed, the operation is synthesized from the op
|
|
||||||
template fields as explained below under _Op Fields_. This happens in a
|
|
||||||
specific order:
|
|
||||||
|
|
||||||
1. The client instance name is resolved. If a `client` field is provided,
|
|
||||||
this is taken as the client instance name. If not, it is set
|
|
||||||
to `default`.
|
|
||||||
2. The named client instance is fetched from the cache, or created and
|
|
||||||
cached if it does not yet exist.
|
|
||||||
3. The topic_uri is resolved. This is the value to be used with
|
|
||||||
`.topic(...)` calls in the API. The op fields below explain how to
|
|
||||||
control this value.
|
|
||||||
4. For _send_ operations, a producer is named and created if needed. By
|
|
||||||
default, the producer is named after the topic_uri above. You can
|
|
||||||
override this by providing a value for `producer`.
|
|
||||||
5. For _recv_ operations, a consumer is named and created if needed. By
|
|
||||||
default, the consumer is named after the topic_uri above. You can
|
|
||||||
override this by providing a value for `consumer`.
|
|
||||||
|
|
||||||
The most important detail for understanding the instancing controls is
|
|
||||||
that clients, producers, and consumers are all named and cached in the
|
|
||||||
specific order above.
|
|
||||||
|
Loading…
Reference in New Issue
Block a user