This commit is contained in:
Jonathan Shook
2021-03-30 10:38:44 -05:00
23 changed files with 736 additions and 158 deletions

View File

@@ -0,0 +1,114 @@
# nb -v run driver=http yaml=http-graphql-keyvalue tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# TODO
# - /graphqlv2 will change in future, adapt when needed
# - do we need a truncate schema / namespace at the end
description: |
This workload emulates a key-value data model and access patterns.
This should be identical to the cql variant except for:
- Schema creation GraphQL first, we don't use cql and thus can only create schema with limited options.
- There is no instrumentation with the http driver.
- There is no async mode with the http driver.
Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080).
scenarios:
default:
- run driver=http tags==phase:schema threads==1 cycles==UNDEF
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
# To enable an optional weighted set of hosts in place of a load balancer
# Examples
# single host: stargate_host=host1
# multiple hosts: stargate_host=host1,host2,host3
# multiple weighted hosts: stargate_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
# http request id
request_id: ToHashedUUID(); ToString();
seq_key: Mod(<<keycount:1000000000>>); ToString() -> String
seq_value: Hash(); Mod(<<valuecount:1000000000>>); ToString() -> String
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
blocks:
- tags:
phase: schema
statements:
- create-keyspace: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n createNamespace(name: \"<<keyspace:gqlsf_keyvalue>>\", replicas: <<rf:1>>) {\n namespace {\n name\n }\n }\n}\n"
}
tags:
name: create-keyspace
- create-gql-schema : POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n deploySchema(namespace: \"<<keyspace:gqlsf_keyvalue>>\", schema: \"\"\"\n type KeyValue @cql_input {\n key: String! @cql_column(partitionKey: true)\n value: String!\n }\n type Query {\n getKeyValue(\n key: String!,\n ): KeyValue\n }\n type Mutation {\n \t\tinsertKeyValue(keyValue: KeyValueInput): KeyValue\n }\n \"\"\") {\n version\n }\n}\n"
}
tags:
name: create-gql-schema
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_keyvalue>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertKeyValue(keyValue: {key: \"{seq_key}\", value: \"{seq_value}\"}) {\n key\n value\n }\n}\n"
}
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:1>>
statements:
- main-select: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_keyvalue>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"{\n getKeyValue(key: \"rw_key\") {\n key\n value\n }\n}\n"
}
tags:
name: main-select
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:9>>
statements:
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_keyvalue>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertKeyValue(keyValue: {key: \"{rw_key}\", value: \"{rw_value}\"}) {\n key\n value\n }\n}\n"
}
tags:
name: main-write

View File

@@ -0,0 +1,123 @@
# nb -v run driver=http yaml=http-graphql-tabular tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# TODO
# - /graphqlv2 will change in future, adapt when needed
# - do we need a truncate schema / namespace at the end
# - rest uses limit as it queries only by a single primary key, we can not map this to GQL (also should data be clustering key)
description: |
This workload emulates a time-series data model and access patterns.
This should be identical to the cql variant except for:
- We need to URLEncode the `data` and `data_write` bindings because newlines can't be sent in REST calls.
- Schema creation GraphQL first, we don't use cql and thus can only create schema with limited options.
- There is no instrumentation with the http driver.
- There is no async mode with the http driver.
Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080).
scenarios:
default:
- run driver=http tags==phase:schema threads==1 cycles==UNDEF
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
# To enable an optional weighted set of hosts in place of a load balancer
# Examples
# single host: stargate_host=host1
# multiple hosts: stargate_host=host1,host2,host3
# multiple weighted hosts: stargate_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
# http request id
request_id: ToHashedUUID(); ToString();
# for ramp-up and verify
part_layout: Div(<<partsize:1000000>>); ToString() -> String
clust_layout: Mod(<<partsize:1000000>>); ToString() -> String
data: HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150); URLEncode();
# for read
limit: Uniform(1,10) -> int
part_read: Uniform(0,<<partcount:100>>)->int; ToString() -> String
clust_read: Add(1); Uniform(0,<<partsize:1000000>>)->int; ToString() -> String
# for write
part_write: Hash(); Uniform(0,<<partcount:100>>)->int; ToString() -> String
clust_write: Hash(); Add(1); Uniform(0,<<partsize:1000000>>)->int; ToString() -> String
data_write: Hash(); HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150); URLEncode();
blocks:
- tags:
phase: schema
statements:
- create-keyspace: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n createNamespace(name: \"<<keyspace:gqlsf_tabular>>\", replicas: <<rf:1>>) {\n namespace {\n name\n }\n }\n}\n"
}
tags:
name: create-keyspace
- create-gql-schema : POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n deploySchema(namespace: \"<<keyspace:gqlsf_tabular>>\", schema: \"\"\"\n type Tabular @cql_input {\n part: String! @cql_column(partitionKey: true)\n clust: String! @cql_column(partitionKey: true)\n data: String! \n }\n type SelectTabularResult @cql_payload {\n \t\tdata: [Tabular]\n \t\tpagingState: String\n }\n type Query {\n getTabulars(\n part: String!,\n clust: String!,\n pagingState: String @cql_pagingState\n ): SelectTabularResult @cql_select(pageSize: 10)\n }\n type Mutation {\n \t\tinsertTabular(tabular: TabularInput): Tabular\n }\n \"\"\") {\n version\n }\n}\n"
}
tags:
name: create-gql-schema
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_tabular>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertTabular(tabular: {part: \"{part_layout}\", clust: \"{clust_layout}\", data: \"{data}\"}) {\n part\n clust\n data\n }\n}\n"
}
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:1>>
statements:
- main-select: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_tabular>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"{\n getTabulars(part: \"{part_read}\", clust: \"{clust_read}\") {\n data {\n part\n clust\n data\n }\n pagingState\n }\n}\n"
}
tags:
name: main-select
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:9>>
statements:
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_tabular>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertTabular(tabular: {part: \"{part_write}\", clust: \"{clust_write}\", data: \"{data_write}\"}) {\n part\n clust\n data\n }\n}\n"
}
tags:
name: main-write

View File

@@ -0,0 +1,121 @@
# nb -v run driver=http yaml=http-graphql-timeseries tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# TODO
# - /graphqlv2 will change in future, adapt when needed
# - do we need a truncate schema / namespace at the end
# - support for a paging state?
# - infinity is not handled
# - compression and compaction not defined (limitation in the GQL schema-first API)
description: |
This workload emulates a time-series data model and access patterns.
This should be identical to the cql variant except for:
- We can't specify the write timestamp to make the write idempotent like we can with cql.
- The `time` binding has to have a StringDateWrapper to get the exact format that the REST API/GraphQL needs; See https://github.com/stargate/stargate/issues/532.
- We need to URLEncode the `data` binding because newlines can't be sent in REST calls.
- Schema creation GraphQL first, we don't use cql and thus can only create schema with limited options.
- There is no instrumentation with the http driver.
- There is no async mode with the http driver.
Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080).
scenarios:
default:
- run driver=http tags==phase:schema threads==1 cycles==UNDEF
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
# To enable an optional weighted set of hosts in place of a load balancer
# Examples
# single host: stargate_host=host1
# multiple hosts: stargate_host=host1,host2,host3
# multiple weighted hosts: stargate_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
# http request id
request_id: ToHashedUUID(); ToString();
machine_id: Mod(<<sources:10000>>); ToHashedUUID() -> java.util.UUID
sensor_name: HashedLineToString('data/variable_words.txt')
time: Mul(<<timespeed:100>>L); Div(<<sources:10000>>L); StringDateWrapper("yyyy-MM-dd'T'hh:mm:ss'Z");
sensor_value: Normal(0.0,5.0); Add(100.0) -> double
station_id: Div(<<sources:10000>>);Mod(<<stations:100>>); ToHashedUUID() -> java.util.UUID
data: HashedFileExtractToString('data/lorem_ipsum_full.txt',800,1200); URLEncode();
blocks:
- tags:
phase: schema
statements:
- create-keyspace: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n createNamespace(name: \"<<keyspace:gqlsf_timeseries>>\", replicas: <<rf:1>>) {\n namespace {\n name\n }\n }\n}\n"
}
tags:
name: create-keyspace
- create-gql-schema : POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n deploySchema(namespace: \"<<keyspace:gqlsf_timeseries>>\", schema: \"\"\"\n type Iot @cql_input {\n machine_id: Uuid! @cql_column(partitionKey: true)\n sensor_name: String! @cql_column(partitionKey: true)\n time: Timestamp! @cql_column(clusteringOrder: DESC)\n sensor_value: Float!\n \tstation_id: Uuid!\n data: String!\n }\n type SelectIotResult @cql_payload {\n \t\tdata: [Iot]\n \t\tpagingState: String\n }\n type Query {\n getIots(\n machine_id: Uuid!,\n sensor_name: String!,\n pagingState: String @cql_pagingState\n ): SelectIotResult @cql_select(pageSize: 10)\n }\n type Mutation {\n \t\tinsertIot(iot: IotInput): Iot\n }\n \"\"\") {\n version\n }\n}\n"
}
tags:
name: create-gql-schema
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_timeseries>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertIot(iot: {machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\", time: \"{time}\", sensor_value: {sensor_value}, station_id: \"{station_id}\", data: \"{data}\"}) {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n}\n"
}
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:1>>
statements:
- main-select: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_timeseries>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"{\n getIots(machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\") {\n data {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n }\n}\n"
}
tags:
name: main-select
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:9>>
statements:
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_timeseries>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertIot(iot: {machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\", time: \"{time}\", sensor_value: {sensor_value}, station_id: \"{station_id}\", data: \"{data}\"}) {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n}\n"
}
tags:
name: main-write

View File

@@ -29,6 +29,10 @@ public class PulsarAction implements SyncAction {
@Override
public int runCycle(long cycle) {
// let's fail the action if some async operation failed
activity.failOnAsyncOperationFailure();
long start = System.nanoTime();
PulsarOp pulsarOp;
@@ -44,8 +48,11 @@ public class PulsarAction implements SyncAction {
}
for (int i = 0; i < maxTries; i++) {
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
Timer.Context ctx = activity.getExecuteTimer().time();
try {
// it is up to the pulsarOp to call Context#close when the activity is executed
// this allows us to track time for async operations
pulsarOp.run(ctx::close);
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity

View File

@@ -20,7 +20,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@@ -42,6 +48,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
private NBErrorHandler errorhandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer;
private volatile Throwable asyncOperationFailure;
// private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
@@ -77,18 +84,28 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder = adminBuilder.authentication(authPluginClassName, authParams);
// String tokenFileName = StringUtils.removeStart(authParams, "file://");
// File tokenFile = new File(tokenFileName);
// String token;
// try {
// token = FileUtils.readFileToString(tokenFile, StandardCharsets.UTF_8);
// token = StringUtils.normalizeSpace(token);
// }
// catch (IOException ioe) {
// throw new RuntimeException("Failed to read the specified (\"client.authParams\") token file: " + tokenFileName + "!");
// }
// adminBuilder.authentication(AuthenticationFactory.token(token));
adminBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
adminBuilder = adminBuilder
adminBuilder
.useKeyStoreTls(useTls)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
pulsarAdmin = adminBuilder.build();
@@ -171,4 +188,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public Histogram getMessagesizeHistogram() {
return messagesizeHistogram;
}
public void failOnAsyncOperationFailure() {
if (asyncOperationFailure != null) {
throw new RuntimeException(asyncOperationFailure);
}
}
public void asyncOperationFailed(Throwable ex) {
this.asyncOperationFailure = asyncOperationFailure;
}
}

View File

@@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
@@ -58,11 +59,14 @@ public class PulsarSpace {
createPulsarSchemaFromConf();
try {
List<String> stringList = pulsarAdmin.clusters().getClusters();
Clusters clusters = pulsarAdmin.clusters();
List<String> stringList = clusters.getClusters();
CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator());
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get Pulsar cluster metadata!");
String errMsg = "Fail to create PulsarClient from global configuration: " + e.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
}
}
@@ -79,8 +83,9 @@ public class PulsarSpace {
.serviceUrl(pulsarSvcUrl)
.build();
} catch (PulsarClientException pce) {
logger.error("Fail to create PulsarClient from global configuration!");
throw new RuntimeException("Fail to create PulsarClient from global configuration!");
String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
}
}

View File

@@ -2,7 +2,6 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.Set;
import java.util.function.LongFunction;
@@ -17,18 +16,18 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminMapper extends PulsarOpMapper {
public class PulsarAdminCrtTennamMapper extends PulsarOpMapper {
private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc;
private final LongFunction<String> tenantFunc;
private final LongFunction<String> namespaceFunc;
public PulsarAdminMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
public PulsarAdminCrtTennamMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
super(cmdTpl, clientSpace);
this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc;
@@ -43,7 +42,7 @@ public class PulsarAdminMapper extends PulsarOpMapper {
String tenant = tenantFunc.apply(value);
String namespace = namespaceFunc.apply(value);
return new PulsarAdminOp(
return new PulsarAdminCrtTennamOp(
clientSpace,
adminRoleSet,
allowedClusterSet,

View File

@@ -1,27 +1,18 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminOp implements PulsarOp {
public class PulsarAdminCrtTennamOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class);
private final PulsarSpace clientSpace;
private final Set<String> adminRoleSet;
@@ -29,11 +20,11 @@ public class PulsarAdminOp implements PulsarOp {
private final String tenant;
private final String namespace;
public PulsarAdminOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
public PulsarAdminCrtTennamOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
this.clientSpace = clientSpace;
this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet;

View File

@@ -0,0 +1,66 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminCrtTopMapper extends PulsarOpMapper {
private final LongFunction<String> topicUriFunc;
private final LongFunction<String> enablePartionFunc;
private final LongFunction<String> partitionNumFunc;
public PulsarAdminCrtTopMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<String> topicUriFunc,
LongFunction<String> enablePartionFunc,
LongFunction<String> partitionNumFunc) {
super(cmdTpl, clientSpace);
this.topicUriFunc = topicUriFunc;
this.enablePartionFunc = enablePartionFunc;
this.partitionNumFunc = partitionNumFunc;
}
@Override
public PulsarOp apply(long value) {
String topicUri = topicUriFunc.apply(value);
String enablePartitionStr = enablePartionFunc.apply(value);
String partitionNumStr = partitionNumFunc.apply(value);
if ( StringUtils.isBlank(topicUri) ) {
throw new RuntimeException("\"topic_uri\" parameter can't be empty when creating a Pulsar topic!");
}
boolean partitionTopic = BooleanUtils.toBoolean(enablePartitionStr);
boolean invalidPartStr;
int partitionNum = 0;
if ( StringUtils.isBlank(partitionNumStr) || !StringUtils.isNumeric(partitionNumStr) ) {
invalidPartStr = true;
} else {
partitionNum = Integer.valueOf(partitionNumStr);
invalidPartStr = (partitionNum <= 0);
}
if (partitionTopic && invalidPartStr) {
throw new RuntimeException("Invalid specified value for \"partition_num\" parameter when creating partitioned topic!");
}
return new PulsarAdminCrtTopOp(
clientSpace,
topicUri,
partitionTopic,
partitionNum);
}
}

View File

@@ -0,0 +1,60 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
public class PulsarAdminCrtTopOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTopOp.class);
private final PulsarSpace clientSpace;
private final String topicUri;
private final boolean partitionTopic;
private final int partitionNum;
public PulsarAdminCrtTopOp(PulsarSpace clientSpace,
String topicUri,
boolean partitionTopic,
int partitionNum) {
this.clientSpace = clientSpace;
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
try {
if (!partitionTopic) {
topics.createNonPartitionedTopic(topicUri);
}
else {
topics.createPartitionedTopic(topicUri, partitionNum);
}
} catch (PulsarAdminException e) {
String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d",
topicUri,
partitionTopic,
partitionNum);
processPulsarAdminException(e, errMsg);
}
}
}

View File

@@ -1,17 +1,14 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.pulsar.client.api.BatchMessageContainer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.DefaultBatcherBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerEndOp implements PulsarOp {
public class PulsarBatchProducerEndOp extends SyncPulsarOp {
@Override
public void run() {
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();

View File

@@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp implements PulsarOp {
public class PulsarBatchProducerOp extends SyncPulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;

View File

@@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerStartOp implements PulsarOp {
public class PulsarBatchProducerStartOp extends SyncPulsarOp {
// TODO: ensure sane container lifecycle management
public final static ThreadLocal<List<CompletableFuture<MessageId>>> threadLocalBatchMsgContainer = new ThreadLocal<>();

View File

@@ -9,10 +9,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class PulsarConsumerOp implements PulsarOp {
public class PulsarConsumerOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);

View File

@@ -3,6 +3,12 @@ package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public interface PulsarOp extends Runnable {
public interface PulsarOp {
/**
* Execute the operation, invoke the timeTracker when the operation ended.
* The timeTracker can be invoked in a separate thread, it is only used for metrics.
* @param timeTracker
*/
void run(Runnable timeTracker);
}

View File

@@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
@@ -24,8 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
@@ -33,15 +33,13 @@ public class PulsarProducerMapper extends PulsarOpMapper {
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
PulsarActivity pulsarActivity) {
super(cmdTpl, clientSpace);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.pulsarActivity = pulsarActivity;
}
@Override
@@ -57,7 +55,7 @@ public class PulsarProducerMapper extends PulsarOpMapper {
asyncApi,
msgKey,
msgPayload,
bytesCounter,
messagesizeHistogram);
pulsarActivity
);
}
}

View File

@@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
@@ -25,25 +26,26 @@ public class PulsarProducerOp implements PulsarOp {
private final boolean asyncPulsarOp;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
String key,
String payload,
Counter bytesCounter,
Histogram messagesizeHistogram) {
PulsarActivity pulsarActivity) {
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.pulsarActivity = pulsarActivity;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
}
@Override
public void run() {
public void run(Runnable timeTracker) {
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
@@ -80,17 +82,18 @@ public class PulsarProducerOp implements PulsarOp {
logger.trace("failed sending message");
throw new RuntimeException(pce);
}
timeTracker.run();
} else {
try {
// we rely on blockIfQueueIsFull in order to throttle the request in this case
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
future.get();
/*.thenRun(() -> {
// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload);
}).exceptionally(ex -> {
System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
return ex;
})*/
future.whenComplete((messageId, error) -> {
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
pulsarActivity.asyncOperationFailed(ex);
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);
}

View File

@@ -8,7 +8,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp implements PulsarOp {
public class PulsarReaderOp extends SyncPulsarOp {
private final Reader<?> reader;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;

View File

@@ -50,9 +50,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
this.opFunc = resolve();
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
}
@Override
@@ -75,53 +72,32 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
// Global parameter: topic_uri (applies only to non-Admin API)
// Global parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null;
// Global parameter: async_api (applies only to non-Admin API)
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) {
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
}
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
asyncApiFunc = (l) -> false;
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
}
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
}
// TODO: Complete implementation for websocket-producer and managed-ledger
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) {
return resolveAdminRequest(clientSpace);
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TENNAME.label) ) {
return resolveAdminCrtTenname(clientSpace);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TOP.label)) {
return resolveAdminCrtParttop(clientSpace, topicUriFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
@@ -139,7 +115,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
}
private LongFunction<PulsarOp> resolveAdminRequest(PulsarSpace clientSpace) {
// Admin API: create tenant and namespace
private LongFunction<PulsarOp> resolveAdminCrtTenname(PulsarSpace clientSpace) {
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
@@ -187,7 +164,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc = (l) -> null;
}
return new PulsarAdminMapper(
return new PulsarAdminCrtTennamMapper(
cmdTpl,
clientSpace,
adminRolesFunc,
@@ -196,6 +173,33 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc);
}
// Admin API: create partitioned topic
private LongFunction<PulsarOp> resolveAdminCrtParttop(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_fun
) {
LongFunction<String> enablePartionFunc = (l) -> null;
if (cmdTpl.isStatic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getStatic("enable_partition");
} else if (cmdTpl.isDynamic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getDynamic("enable_partition", l);
}
LongFunction<String> partitionNumFunc = (l) -> null;
if (cmdTpl.isStatic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getStatic("partition_num");
} else if (cmdTpl.isDynamic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l);
}
return new PulsarAdminCrtTopMapper(
cmdTpl,
clientSpace,
topic_uri_fun,
enablePartionFunc,
partitionNumFunc);
}
private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
@@ -242,8 +246,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
async_api_func,
keyFunc,
valueFunc,
pulsarActivity.getBytesCounter(),
pulsarActivity.getMessagesizeHistogram());
pulsarActivity);
}
private LongFunction<PulsarOp> resolveMsgConsume(

View File

@@ -0,0 +1,17 @@
package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Sync Pulsar Operations including Producers and Consumers.
*/
public abstract class SyncPulsarOp implements PulsarOp {
public void run(Runnable timeTracker) {
try {
this.run();
} finally {
timeTracker.run();
}
}
public abstract void run();
}

View File

@@ -26,7 +26,8 @@ public class PulsarActivityUtil {
// Supported message operation types
// TODO: websocket-producer and managed-ledger
public enum OP_TYPES {
ADMIN("admin"),
ADMIN_CRT_TENNAME("admin-crt-tennam"),
ADMIN_CRT_TOP("admin-crt-top"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
@@ -68,7 +69,7 @@ public class PulsarActivityUtil {
public enum CLNT_CONF_KEY {
serviceUrl("serviceUrl"),
authPulginClassName("authPluginClassName"),
authParams("AuthParams"),
authParams("authParams"),
pperationTimeoutMs("operationTimeoutMs"),
statsIntervalSeconds("statsIntervalSeconds"),
numIoThreads("numIoThreads"),

View File

@@ -19,20 +19,32 @@ params:
async_api: "false"
blocks:
- name: admin-block
- name: create-tennam-block
tags:
phase: admin-api
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: admin
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
namespace: "default"
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
- name: batch-producer-block
tags:
phase: batch-producer
admin_task: false
statements:
- name: s1
optype: batch-msg-send-start
@@ -57,6 +69,7 @@ blocks:
- name: producer-block
tags:
phase: producer
admin_task: false
statements:
- name: s1
optype: msg-send
@@ -73,6 +86,7 @@ blocks:
- name: consumer-block
tags:
phase: consumer
admin_task: false
statements:
- name: s1
optype: msg-consume
@@ -85,6 +99,7 @@ blocks:
- name: reader-block
tags:
phase: reader
admin_task: false
statements:
- name: s1
optype: msg-read

View File

@@ -4,15 +4,16 @@
- [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure)
- [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters)
- [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details)
- [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block)
- [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block)
- [1.4.3. Producer Command Block](#143-producer-command-block)
- [1.4.4. Consumer Command Block](#144-consumer-command-block)
- [1.4.5. Reader Command Block](#145-reader-command-block)
- [1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace](#141-pulsar-admin-api-command-block---create-tenant-and-namespace)
- [1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)](#142-pulsar-admin-api-command-block---create-topic-partitioned-or-regular)
- [1.4.3. Batch Producer Command Block](#143-batch-producer-command-block)
- [1.4.4. Producer Command Block](#144-producer-command-block)
- [1.4.5. Consumer Command Block](#145-consumer-command-block)
- [1.4.6. Reader Command Block](#146-reader-command-block)
- [1.5. Schema Support](#15-schema-support)
- [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters)
- [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example)
- [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties)
- [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties)
- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features)
- [2.1. Other Activity Parameters](#21-other-activity-parameters)
- [2.2. API Caching](#22-api-caching)
@@ -72,25 +73,25 @@ There are multiple sections in this file that correspond to different groups of
* This section defines all configuration settings that are related
with defining a PulsarClient object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
* **Pulsar Producer related settings**:
* All settings under this section starts with **producer** prefix.
* This section defines all configuration settings that are related
with defining a Pulsar Producer object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
* **Pulsar Consumer related settings**:
* All settings under this section starts with **consumer** prefix.
* This section defines all configuration settings that are related
with defining a Pulsar Consumer object.
*
See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
* **Pulsar Reader related settings**:
* All settings under this section starts with **reader** prefix.
* This section defines all configuration settings that are related
with defining a Pulsar Reader object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
In the future, when the support for other types of Pulsar workloads is
added in NB Pulsar driver, there will be corresponding configuration
@@ -141,10 +142,10 @@ params:
blocks:
- name: <command_block_1>
tags:
phase: <command_bock_filtering_identifier>
phase: <command_bock_identifier>
statements:
- name: <statement_name_1>
optype: <statement_filtering_identifier>
optype: <statement_identifier>
... <statement_specific_parameters> ...
- name: <statement_name_2>
... ...
@@ -237,43 +238,68 @@ cycle level, **the ycle level setting will take priority!**
## 1.4. Pulsar Driver Yaml File - Command Block Details
### 1.4.1. Pulsar Admin API Command Block
### 1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace
**NOTE**: this functionality is only partially implemented at the moment
and doesn't function yet.
Currently, the Pulsar Admin API Block is (planned) to only support
creating Pulsar tenants and namespaces. It has the following format:
This Pulsar Admin API Block is used to create Pulsar tenants and namespaces. It has the following format:
```yaml
- name: admin-block
- name: create-tennam-block
tags:
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: create-tenant
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
- name: s2
optype: create-namespace
namespace: "{namespace}"
namespace: "default"
```
In this command block, there are 2 statements (s1 and s2):
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar tenant
* (Mandatory) **optype (create-tenant)** is the statement identifier
* Statement **s1** is used for creating a Pulsar tenant and a namespace
* (Mandatory) **optype (admin-crt-tennam)** is the statement identifier
for this statement
* (Mandatory) **tenant** is the only statement parameter that
specifies the Pulsar tenant name which can either be dynamically
bound or statically assigned.
* Statement **s2** is used for creating a Pulsar namespace
* (Mandatory) **optype (create-namespace)** is the statement
identifier for this statement
* (Mandatory) **namespace** is the only statement parameter that
specifies the Pulsar namespace under the tenant created by statement
s1. Its name can either be dynamically bound or statically assigned.
* (Optional) **allowed_clusters** must be statically bound and it
specifies the cluster list that is allowed for a tenant.
* (Optional) **admin_roles** must be statically bound and it specifies
the super user role that is associated with a tenant.
* (Mandatory) **tenant** is the Pulsar tenant name to be created. It
can either be dynamically or statically bound.
* (Mandatory) **namespace** is the Pulsar namespace name to be created
under the above tenant. It also can be dynamically or statically bound.
### 1.4.2. Batch Producer Command Block
### 1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)
This Pulsar Admin API Block is used to create Pulsar topics. It has the following format:
```yaml
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
```
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar tenant and a namespace
* (Mandatory) **optype (admin-crt-top)** is the statement identifier
for this statement
* (Mandatory) **enable_partition** specifies whether to create a
partitioned topic. It can either be dynamically or statically bound.
* (Mandatory) **partition_num** specifies the number of partitions if
a partitioned topic is to be created. It also can be dynamically or
statically bound.
**NOTE**: The topic name is bounded by the document level parameter "topic_uri".
### 1.4.3. Batch Producer Command Block
Batch producer command block is used to produce a batch of messages all at
once by one NB cycle execution. A typical format of this command block is
@@ -334,7 +360,7 @@ ratios: 1, <batch_num>, 1.
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it
is default to 1.
### 1.4.3. Producer Command Block
### 1.4.4. Producer Command Block
This is the regular Pulsar producer command block that produces one Pulsar
message per NB cycle execution. A typical format of this command block is
@@ -370,7 +396,7 @@ This command block only has 1 statements (s1):
* (Mandatory) **msg_payload** specifies the payload of the generated
message
### 1.4.4. Consumer Command Block
### 1.4.5. Consumer Command Block
This is the regular Pulsar consumer command block that consumes one Pulsar
message per NB cycle execution. A typical format of this command block is
@@ -410,7 +436,7 @@ This command block only has 1 statements (s1):
**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**.
### 1.4.5. Reader Command Block
### 1.4.6. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar
message per NB cycle execution. A typical format of this command block is
@@ -528,7 +554,7 @@ environment.
```
## Appendix A. Template Global Setting File (config.properties)
## 1.8. Appendix A. Template Global Setting File (config.properties)
```properties
schema.type =
schema.definition =