Merge remote-tracking branch 'origin/main' into impl/pulsar-async-adv

Enrico Olivelli 2021-03-30 17:00:05 +02:00
commit 937b10a71c
16 changed files with 667 additions and 114 deletions

View File

@ -1,4 +1,4 @@
# nb -v run driver=http yaml=http-keyvalue tags=phase:schema stargate_host=my_stargate_host host=my_stargate_host auth_token=$AUTH_TOKEN
# nb -v run driver=http yaml=http-rest-keyvalue tags=phase:schema stargate_host=my_stargate_host host=my_stargate_host auth_token=$AUTH_TOKEN
description: |
This workload emulates a key-value data model and access patterns.
This should be identical to the cql variant except for:

View File

@ -1,4 +1,4 @@
# nb -v http-tabular rampup-cycles=1E6 main-cycles=1E9 stargate_host=my_stargate_host host=my_stargate_host auth_token=$AUTH_TOKEN
# nb -v http-rest-tabular rampup-cycles=1E6 main-cycles=1E9 stargate_host=my_stargate_host host=my_stargate_host auth_token=$AUTH_TOKEN
description: |
This workload emulates a time-series data model and access patterns.
This should be identical to the cql variant except for:

View File

@ -1,4 +1,4 @@
# nb -v run driver=http yaml=http-iot tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# nb -v run driver=http yaml=http-rest-timeseries tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
description: |
This workload emulates a time-series data model and access patterns.
This should be identical to the cql variant except for:

View File

@ -0,0 +1,114 @@
# nb -v run driver=http yaml=http-graphql-keyvalue tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# TODO
# - /graphqlv2 will change in future, adapt when needed
# - do we need a truncate schema / namespace at the end
description: |
This workload emulates a key-value data model and access patterns.
This should be identical to the cql variant except for:
- Schema creation uses the GraphQL schema-first API; we don't use cql and thus can only create the schema with limited options.
- There is no instrumentation with the http driver.
- There is no async mode with the http driver.
Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080).
scenarios:
default:
- run driver=http tags==phase:schema threads==1 cycles==UNDEF
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
# To enable an optional weighted set of hosts in place of a load balancer
# Examples
# single host: stargate_host=host1
# multiple hosts: stargate_host=host1,host2,host3
# multiple weighted hosts: stargate_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
# http request id
request_id: ToHashedUUID(); ToString();
seq_key: Mod(<<keycount:1000000000>>); ToString() -> String
seq_value: Hash(); Mod(<<valuecount:1000000000>>); ToString() -> String
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
blocks:
- tags:
phase: schema
statements:
- create-keyspace: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n createNamespace(name: \"<<keyspace:gqlsf_keyvalue>>\", replicas: <<rf:1>>) {\n namespace {\n name\n }\n }\n}\n"
}
tags:
name: create-keyspace
- create-gql-schema : POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n deploySchema(namespace: \"<<keyspace:gqlsf_keyvalue>>\", schema: \"\"\"\n type KeyValue @cql_input {\n key: String! @cql_column(partitionKey: true)\n value: String!\n }\n type Query {\n getKeyValue(\n key: String!,\n ): KeyValue\n }\n type Mutation {\n \t\tinsertKeyValue(keyValue: KeyValueInput): KeyValue\n }\n \"\"\") {\n version\n }\n}\n"
}
tags:
name: create-gql-schema
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_keyvalue>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertKeyValue(keyValue: {key: \"{seq_key}\", value: \"{seq_value}\"}) {\n key\n value\n }\n}\n"
}
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:1>>
statements:
- main-select: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_keyvalue>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"{\n getKeyValue(key: \"{rw_key}\") {\n key\n value\n }\n}\n"
}
tags:
name: main-select
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:9>>
statements:
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_keyvalue>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertKeyValue(keyValue: {key: \"{rw_key}\", value: \"{rw_value}\"}) {\n key\n value\n }\n}\n"
}
tags:
name: main-write
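
The `stargate_host` examples in the bindings comments above (`host1`, `host1,host2,host3`, `host1:3,host2:7`) can be illustrated with a minimal sketch. This is not the nosqlbench `WeightedStrings` implementation: the class `WeightedHostsSketch`, its methods, the default weight of 1 for unweighted entries, and the round-robin selection are all assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; NOT the nosqlbench WeightedStrings function. */
final class WeightedHostsSketch {

    /** Expands "host1:3,host2:7" into a list in which each host appears `weight` times. */
    static List<String> expand(String spec) {
        List<String> slots = new ArrayList<>();
        for (String entry : spec.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            // Assumption: an entry without an explicit weight counts as weight 1.
            int weight = parts.length == 2 ? Integer.parseInt(parts[1].trim()) : 1;
            for (int i = 0; i < weight; i++) {
                slots.add(parts[0]);
            }
        }
        return slots;
    }

    /** Picks a host for a cycle; over a full pass each host is chosen in proportion to its weight. */
    static String pick(String spec, long cycle) {
        List<String> slots = expand(spec);
        return slots.get((int) Math.floorMod(cycle, (long) slots.size()));
    }

    public static void main(String[] args) {
        for (long cycle = 0; cycle < 10; cycle++) {
            System.out.println(cycle + " -> " + pick("host1:3,host2:7", cycle));
        }
    }
}
```

A real weighted selector would more likely hash the cycle value than walk the slots in order; the round-robin choice here only keeps the proportions easy to check.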

View File

@ -0,0 +1,123 @@
# nb -v run driver=http yaml=http-graphql-tabular tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# TODO
# - /graphqlv2 will change in future, adapt when needed
# - do we need a truncate schema / namespace at the end
# - REST uses limit because it queries by a single primary key only; we cannot map this to GQL (also, should data be a clustering key?)
description: |
This workload emulates a tabular data model and access patterns.
This should be identical to the cql variant except for:
- We need to URLEncode the `data` and `data_write` bindings because newlines can't be sent in REST calls.
- Schema creation uses the GraphQL schema-first API; we don't use cql and thus can only create the schema with limited options.
- There is no instrumentation with the http driver.
- There is no async mode with the http driver.
Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080).
scenarios:
default:
- run driver=http tags==phase:schema threads==1 cycles==UNDEF
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
# To enable an optional weighted set of hosts in place of a load balancer
# Examples
# single host: stargate_host=host1
# multiple hosts: stargate_host=host1,host2,host3
# multiple weighted hosts: stargate_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
# http request id
request_id: ToHashedUUID(); ToString();
# for ramp-up and verify
part_layout: Div(<<partsize:1000000>>); ToString() -> String
clust_layout: Mod(<<partsize:1000000>>); ToString() -> String
data: HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150); URLEncode();
# for read
limit: Uniform(1,10) -> int
part_read: Uniform(0,<<partcount:100>>)->int; ToString() -> String
clust_read: Add(1); Uniform(0,<<partsize:1000000>>)->int; ToString() -> String
# for write
part_write: Hash(); Uniform(0,<<partcount:100>>)->int; ToString() -> String
clust_write: Hash(); Add(1); Uniform(0,<<partsize:1000000>>)->int; ToString() -> String
data_write: Hash(); HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150); URLEncode();
blocks:
- tags:
phase: schema
statements:
- create-keyspace: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n createNamespace(name: \"<<keyspace:gqlsf_tabular>>\", replicas: <<rf:1>>) {\n namespace {\n name\n }\n }\n}\n"
}
tags:
name: create-keyspace
- create-gql-schema : POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n deploySchema(namespace: \"<<keyspace:gqlsf_tabular>>\", schema: \"\"\"\n type Tabular @cql_input {\n part: String! @cql_column(partitionKey: true)\n clust: String! @cql_column(partitionKey: true)\n data: String! \n }\n type SelectTabularResult @cql_payload {\n \t\tdata: [Tabular]\n \t\tpagingState: String\n }\n type Query {\n getTabulars(\n part: String!,\n clust: String!,\n pagingState: String @cql_pagingState\n ): SelectTabularResult @cql_select(pageSize: 10)\n }\n type Mutation {\n \t\tinsertTabular(tabular: TabularInput): Tabular\n }\n \"\"\") {\n version\n }\n}\n"
}
tags:
name: create-gql-schema
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_tabular>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertTabular(tabular: {part: \"{part_layout}\", clust: \"{clust_layout}\", data: \"{data}\"}) {\n part\n clust\n data\n }\n}\n"
}
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:1>>
statements:
- main-select: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_tabular>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"{\n getTabulars(part: \"{part_read}\", clust: \"{clust_read}\") {\n data {\n part\n clust\n data\n }\n pagingState\n }\n}\n"
}
tags:
name: main-select
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:9>>
statements:
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_tabular>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertTabular(tabular: {part: \"{part_write}\", clust: \"{clust_write}\", data: \"{data_write}\"}) {\n part\n clust\n data\n }\n}\n"
}
tags:
name: main-write
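
The `part_layout` and `clust_layout` bindings in the workload above split each ramp-up cycle into a partition key and a clustering value with `Div(partsize)` and `Mod(partsize)`. A minimal sketch of that arithmetic, assuming non-negative cycle numbers and plain long division; the class and method names are hypothetical and not part of nosqlbench:

```java
/** Hypothetical illustration of the Div/Mod layout used by the ramp-up bindings. */
final class TabularLayoutSketch {

    /** Partition key: which block of `partSize` consecutive cycles this cycle falls into. */
    static String partLayout(long cycle, long partSize) {
        return Long.toString(cycle / partSize);
    }

    /** Clustering value: the position of the cycle inside its partition. */
    static String clustLayout(long cycle, long partSize) {
        return Long.toString(cycle % partSize);
    }

    public static void main(String[] args) {
        long partSize = 1_000_000L; // default of <<partsize:1000000>>
        long cycle = 2_500_000L;
        System.out.println(partLayout(cycle, partSize) + " / " + clustLayout(cycle, partSize));
    }
}
```

With the default `partsize`, every run of one million consecutive cycles fills one partition before the next one starts.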

View File

@ -0,0 +1,121 @@
# nb -v run driver=http yaml=http-graphql-timeseries tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN
# TODO
# - /graphqlv2 will change in future, adapt when needed
# - do we need a truncate schema / namespace at the end
# - support for a paging state?
# - infinity is not handled
# - compression and compaction not defined (limitation in the GQL schema-first API)
description: |
This workload emulates a time-series data model and access patterns.
This should be identical to the cql variant except for:
- We can't specify the write timestamp to make the write idempotent like we can with cql.
- The `time` binding has to have a StringDateWrapper to get the exact format that the REST API/GraphQL needs; See https://github.com/stargate/stargate/issues/532.
- We need to URLEncode the `data` binding because newlines can't be sent in REST calls.
- Schema creation uses the GraphQL schema-first API; we don't use cql and thus can only create the schema with limited options.
- There is no instrumentation with the http driver.
- There is no async mode with the http driver.
Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080).
scenarios:
default:
- run driver=http tags==phase:schema threads==1 cycles==UNDEF
- run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
- run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
# To enable an optional weighted set of hosts in place of a load balancer
# Examples
# single host: stargate_host=host1
# multiple hosts: stargate_host=host1,host2,host3
# multiple weighted hosts: stargate_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<stargate_host:stargate>>')
# http request id
request_id: ToHashedUUID(); ToString();
machine_id: Mod(<<sources:10000>>); ToHashedUUID() -> java.util.UUID
sensor_name: HashedLineToString('data/variable_words.txt')
time: Mul(<<timespeed:100>>L); Div(<<sources:10000>>L); StringDateWrapper("yyyy-MM-dd'T'hh:mm:ss'Z");
sensor_value: Normal(0.0,5.0); Add(100.0) -> double
station_id: Div(<<sources:10000>>);Mod(<<stations:100>>); ToHashedUUID() -> java.util.UUID
data: HashedFileExtractToString('data/lorem_ipsum_full.txt',800,1200); URLEncode();
blocks:
- tags:
phase: schema
statements:
- create-keyspace: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n createNamespace(name: \"<<keyspace:gqlsf_timeseries>>\", replicas: <<rf:1>>) {\n namespace {\n name\n }\n }\n}\n"
}
tags:
name: create-keyspace
- create-gql-schema : POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/admin
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n deploySchema(namespace: \"<<keyspace:gqlsf_timeseries>>\", schema: \"\"\"\n type Iot @cql_input {\n machine_id: Uuid! @cql_column(partitionKey: true)\n sensor_name: String! @cql_column(partitionKey: true)\n time: Timestamp! @cql_column(clusteringOrder: DESC)\n sensor_value: Float!\n \tstation_id: Uuid!\n data: String!\n }\n type SelectIotResult @cql_payload {\n \t\tdata: [Iot]\n \t\tpagingState: String\n }\n type Query {\n getIots(\n machine_id: Uuid!,\n sensor_name: String!,\n pagingState: String @cql_pagingState\n ): SelectIotResult @cql_select(pageSize: 10)\n }\n type Mutation {\n \t\tinsertIot(iot: IotInput): Iot\n }\n \"\"\") {\n version\n }\n}\n"
}
tags:
name: create-gql-schema
- name: rampup
tags:
phase: rampup
statements:
- rampup-insert: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_timeseries>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertIot(iot: {machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\", time: \"{time}\", sensor_value: {sensor_value}, station_id: \"{station_id}\", data: \"{data}\"}) {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n}\n"
}
tags:
name: rampup-insert
- name: main-read
tags:
phase: main
type: read
params:
ratio: <<read_ratio:1>>
statements:
- main-select: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_timeseries>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"{\n getIots(machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\") {\n data {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n }\n}\n"
}
tags:
name: main-select
- name: main-write
tags:
phase: main
type: write
params:
ratio: <<write_ratio:9>>
statements:
- main-write: POST <<protocol:http>>://{weighted_hosts}:<<stargate_port:8080>><<path_prefix:>>/graphqlv2/namespace/<<keyspace:gqlsf_timeseries>>
Accept: "application/json"
X-Cassandra-Request-Id: "{request_id}"
X-Cassandra-Token: "<<auth_token:my_auth_token>>"
Content-Type: "application/json"
body: |
{
"query":"mutation {\n insertIot(iot: {machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\", time: \"{time}\", sensor_value: {sensor_value}, station_id: \"{station_id}\", data: \"{data}\"}) {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n}\n"
}
tags:
name: main-write
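
The description above notes that the `time` binding has to produce the exact timestamp format the API expects. The sketch below shows one way to render such a value with `java.time`, assuming the value produced by `Mul(timespeed); Div(sources)` is interpreted as epoch milliseconds in UTC. It is not the `StringDateWrapper` implementation, and it deliberately uses the 24-hour `HH` field and a closed `'Z'` literal, which differs from the pattern string in the binding. All names are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration of an ISO-8601 style timestamp binding. */
final class IsoTimestampSketch {

    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'").withZone(ZoneOffset.UTC);

    /** Formats epoch milliseconds as a UTC timestamp string. */
    static String format(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Mirrors Mul(timespeed); Div(sources): scale the cycle, then spread it across sources. */
    static String bindingTime(long cycle, long timeSpeed, long sources) {
        return format(cycle * timeSpeed / sources);
    }

    public static void main(String[] args) {
        // Defaults taken from <<timespeed:100>> and <<sources:10000>>.
        System.out.println(bindingTime(1_000_000L, 100L, 10_000L));
    }
}
```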

View File

@ -20,7 +20,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -78,18 +84,28 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder = adminBuilder.authentication(authPluginClassName, authParams);
// String tokenFileName = StringUtils.removeStart(authParams, "file://");
// File tokenFile = new File(tokenFileName);
// String token;
// try {
// token = FileUtils.readFileToString(tokenFile, StandardCharsets.UTF_8);
// token = StringUtils.normalizeSpace(token);
// }
// catch (IOException ioe) {
// throw new RuntimeException("Failed to read the specified (\"client.authParams\") token file: " + tokenFileName + "!");
// }
// adminBuilder.authentication(AuthenticationFactory.token(token));
adminBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
adminBuilder = adminBuilder
adminBuilder
.useKeyStoreTls(useTls)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
pulsarAdmin = adminBuilder.build();

View File

@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
@ -58,11 +59,14 @@ public class PulsarSpace {
createPulsarSchemaFromConf();
try {
List<String> stringList = pulsarAdmin.clusters().getClusters();
Clusters clusters = pulsarAdmin.clusters();
List<String> stringList = clusters.getClusters();
CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator());
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get Pulsar cluster metadata!");
String errMsg = "Failed to get Pulsar cluster metadata: " + e.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
}
}
@ -79,8 +83,9 @@ public class PulsarSpace {
.serviceUrl(pulsarSvcUrl)
.build();
} catch (PulsarClientException pce) {
logger.error("Fail to create PulsarClient from global configuration!");
throw new RuntimeException("Fail to create PulsarClient from global configuration!");
String errMsg = "Failed to create PulsarClient from global configuration: " + pce.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
}
}

View File

@ -2,7 +2,6 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import java.util.Set;
import java.util.function.LongFunction;
@ -17,18 +16,18 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminMapper extends PulsarOpMapper {
public class PulsarAdminCrtTennamMapper extends PulsarOpMapper {
private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc;
private final LongFunction<String> tenantFunc;
private final LongFunction<String> namespaceFunc;
public PulsarAdminMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
public PulsarAdminCrtTennamMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
super(cmdTpl, clientSpace);
this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc;
@ -43,7 +42,7 @@ public class PulsarAdminMapper extends PulsarOpMapper {
String tenant = tenantFunc.apply(value);
String namespace = namespaceFunc.apply(value);
return new PulsarAdminOp(
return new PulsarAdminCrtTennamOp(
clientSpace,
adminRoleSet,
allowedClusterSet,

View File

@ -10,9 +10,9 @@ import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
public class PulsarAdminOp extends SyncPulsarOp {
public class PulsarAdminCrtTennamOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class);
private final PulsarSpace clientSpace;
private final Set<String> adminRoleSet;
@ -20,11 +20,11 @@ public class PulsarAdminOp extends SyncPulsarOp {
private final String tenant;
private final String namespace;
public PulsarAdminOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
public PulsarAdminCrtTennamOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
this.clientSpace = clientSpace;
this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet;

View File

@ -0,0 +1,66 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminCrtTopMapper extends PulsarOpMapper {
private final LongFunction<String> topicUriFunc;
private final LongFunction<String> enablePartionFunc;
private final LongFunction<String> partitionNumFunc;
public PulsarAdminCrtTopMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<String> topicUriFunc,
LongFunction<String> enablePartionFunc,
LongFunction<String> partitionNumFunc) {
super(cmdTpl, clientSpace);
this.topicUriFunc = topicUriFunc;
this.enablePartionFunc = enablePartionFunc;
this.partitionNumFunc = partitionNumFunc;
}
@Override
public PulsarOp apply(long value) {
String topicUri = topicUriFunc.apply(value);
String enablePartitionStr = enablePartionFunc.apply(value);
String partitionNumStr = partitionNumFunc.apply(value);
if ( StringUtils.isBlank(topicUri) ) {
throw new RuntimeException("\"topic_uri\" parameter can't be empty when creating a Pulsar topic!");
}
boolean partitionTopic = BooleanUtils.toBoolean(enablePartitionStr);
boolean invalidPartStr;
int partitionNum = 0;
if ( StringUtils.isBlank(partitionNumStr) || !StringUtils.isNumeric(partitionNumStr) ) {
invalidPartStr = true;
} else {
partitionNum = Integer.parseInt(partitionNumStr);
invalidPartStr = (partitionNum <= 0);
}
if (partitionTopic && invalidPartStr) {
throw new RuntimeException("Invalid value specified for \"partition_num\" parameter when creating a partitioned topic!");
}
return new PulsarAdminCrtTopOp(
clientSpace,
topicUri,
partitionTopic,
partitionNum);
}
}
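
The `partition_num` check in the mapper above can be sketched without the commons-lang helpers. One difference is worth noting: an all-digit value too large for an `int` (for example `99999999999`) passes `StringUtils.isNumeric` but makes the integer conversion throw `NumberFormatException`, whereas the sketch treats it as invalid. The class and method names are hypothetical, and trimming surrounding whitespace is an assumption the original code does not make.

```java
/** Hypothetical, dependency-free variant of the partition_num validation. */
final class PartitionParamSketch {

    /** Returns the partition count, or 0 when the value is blank, non-numeric, or not positive. */
    static int parsePartitionNum(String raw) {
        if (raw == null || raw.isBlank()) {
            return 0;
        }
        try {
            // Assumption: surrounding whitespace is tolerated.
            return Math.max(Integer.parseInt(raw.trim()), 0);
        } catch (NumberFormatException e) {
            // Covers non-numeric input and values that overflow an int.
            return 0;
        }
    }

    /** Rejects a partitioned topic request that has no usable partition count. */
    static void validate(boolean partitioned, String rawPartitionNum) {
        if (partitioned && parsePartitionNum(rawPartitionNum) <= 0) {
            throw new IllegalArgumentException(
                "Invalid \"partition_num\" value for a partitioned topic: " + rawPartitionNum);
        }
    }

    public static void main(String[] args) {
        validate(true, "5");     // accepted
        validate(false, null);   // accepted: not a partitioned topic
        System.out.println(parsePartitionNum("5"));
    }
}
```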

View File

@ -0,0 +1,60 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
public class PulsarAdminCrtTopOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTopOp.class);
private final PulsarSpace clientSpace;
private final String topicUri;
private final boolean partitionTopic;
private final int partitionNum;
public PulsarAdminCrtTopOp(PulsarSpace clientSpace,
String topicUri,
boolean partitionTopic,
int partitionNum) {
this.clientSpace = clientSpace;
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
try {
if (!partitionTopic) {
topics.createNonPartitionedTopic(topicUri);
}
else {
topics.createPartitionedTopic(topicUri, partitionNum);
}
} catch (PulsarAdminException e) {
String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d)",
topicUri,
partitionTopic,
partitionNum);
processPulsarAdminException(e, errMsg);
}
}
}
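
`processPulsarAdminException` above tolerates HTTP 409 so that re-running the topic creation phase does not fail when the topic already exists. The same idea can be sketched without the Pulsar client on the classpath. `StatusException` and `AdminCall` are hypothetical stand-ins for `PulsarAdminException` and the admin call, and passing the original exception along as the cause is an addition of this sketch, not something the commit does.

```java
/** Hypothetical, Pulsar-free sketch of "create if absent" error handling. */
final class IdempotentCreateSketch {

    static final int CONFLICT = 409; // resource already exists

    /** Stand-in for an admin exception that carries an HTTP status code. */
    static final class StatusException extends Exception {
        final int statusCode;

        StatusException(int statusCode, String message) {
            super(message);
            this.statusCode = statusCode;
        }
    }

    /** Stand-in for an admin call such as creating a topic. */
    interface AdminCall {
        void run() throws StatusException;
    }

    /** Same rule as the op above: any status >= 400 other than 409 is fatal. */
    static boolean isFatal(int statusCode) {
        return statusCode >= 400 && statusCode != CONFLICT;
    }

    /** Returns true when the call succeeded, false when its failure was tolerated. */
    static boolean createIfAbsent(AdminCall call, String what) {
        try {
            call.run();
            return true;
        } catch (StatusException e) {
            if (isFatal(e.statusCode)) {
                // Keep the original exception as the cause so the stack trace is not lost.
                throw new RuntimeException("Failed to create " + what + ": " + e.getMessage(), e);
            }
            return false;
        }
    }

    public static void main(String[] args) {
        boolean created = createIfAbsent(() -> { }, "persistent://public/default/demo");
        System.out.println("created: " + created);
    }
}
```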

View File

@ -72,53 +72,32 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
// Global parameter: topic_uri (applies only to non-Admin API)
// Global parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null;
// Global parameter: async_api (applies only to non-Admin API)
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) {
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) {
throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'.");
} else if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
} else if (cmdTpl.containsKey("topic")) {
if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) {
String persistence = cmdTpl.getStaticOr("persistence", "persistent")
.replaceAll("true", "persistent");
String tenant = cmdTpl.getStaticOr("tenant", "public");
String namespace = cmdTpl.getStaticOr("namespace", "default");
String topic = cmdTpl.getStaticOr("topic", "");
String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic;
topicUriFunc = (l) -> composited;
} else { // some or all dynamic fields, composite into a single dynamic call
topicUriFunc = (l) ->
cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent")
+ "://" + cmdTpl.getOr("tenant", l, "public")
+ "/" + cmdTpl.getOr("namespace", l, "default")
+ "/" + cmdTpl.getOr("topic", l, "");
}
}
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
} else {
asyncApiFunc = (l) -> false;
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
}
}
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
}
// TODO: Complete implementation for websocket-producer and managed-ledger
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) {
return resolveAdminRequest(clientSpace);
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TENNAME.label) ) {
return resolveAdminCrtTenname(clientSpace);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TOP.label)) {
return resolveAdminCrtParttop(clientSpace, topicUriFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
@ -136,7 +115,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
}
private LongFunction<PulsarOp> resolveAdminRequest(PulsarSpace clientSpace) {
// Admin API: create tenant and namespace
private LongFunction<PulsarOp> resolveAdminCrtTenname(PulsarSpace clientSpace) {
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
@ -184,7 +164,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc = (l) -> null;
}
return new PulsarAdminMapper(
return new PulsarAdminCrtTennamMapper(
cmdTpl,
clientSpace,
adminRolesFunc,
@ -193,6 +173,33 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc);
}
// Admin API: create a topic (partitioned or non-partitioned)
private LongFunction<PulsarOp> resolveAdminCrtParttop(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_fun
) {
LongFunction<String> enablePartionFunc = (l) -> null;
if (cmdTpl.isStatic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getStatic("enable_partition");
} else if (cmdTpl.isDynamic("enable_partition")) {
enablePartionFunc = (l) -> cmdTpl.getDynamic("enable_partition", l);
}
LongFunction<String> partitionNumFunc = (l) -> null;
if (cmdTpl.isStatic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getStatic("partition_num");
} else if (cmdTpl.isDynamic("partition_num")) {
partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l);
}
return new PulsarAdminCrtTopMapper(
cmdTpl,
clientSpace,
topic_uri_fun,
enablePartionFunc,
partitionNumFunc);
}
private LongFunction<PulsarOp> resolveMsgSend(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,

View File

@ -26,7 +26,8 @@ public class PulsarActivityUtil {
// Supported message operation types
// TODO: websocket-producer and managed-ledger
public enum OP_TYPES {
ADMIN("admin"),
ADMIN_CRT_TENNAME("admin-crt-tennam"),
ADMIN_CRT_TOP("admin-crt-top"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
@ -68,7 +69,7 @@ public class PulsarActivityUtil {
public enum CLNT_CONF_KEY {
serviceUrl("serviceUrl"),
authPulginClassName("authPluginClassName"),
authParams("AuthParams"),
authParams("authParams"),
pperationTimeoutMs("operationTimeoutMs"),
statsIntervalSeconds("statsIntervalSeconds"),
numIoThreads("numIoThreads"),


@ -19,20 +19,32 @@ params:
async_api: "false"
blocks:
- name: admin-block
- name: create-tennam-block
tags:
phase: admin-api
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: admin
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
namespace: "default"
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
- name: batch-producer-block
tags:
phase: batch-producer
admin_task: false
statements:
- name: s1
optype: batch-msg-send-start
@ -57,6 +69,7 @@ blocks:
- name: producer-block
tags:
phase: producer
admin_task: false
statements:
- name: s1
optype: msg-send
@ -73,6 +86,7 @@ blocks:
- name: consumer-block
tags:
phase: consumer
admin_task: false
statements:
- name: s1
optype: msg-consume
@ -85,6 +99,7 @@ blocks:
- name: reader-block
tags:
phase: reader
admin_task: false
statements:
- name: s1
optype: msg-read


@ -4,15 +4,16 @@
- [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure)
- [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters)
- [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details)
- [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block)
- [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block)
- [1.4.3. Producer Command Block](#143-producer-command-block)
- [1.4.4. Consumer Command Block](#144-consumer-command-block)
- [1.4.5. Reader Command Block](#145-reader-command-block)
- [1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace](#141-pulsar-admin-api-command-block---create-tenant-and-namespace)
- [1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)](#142-pulsar-admin-api-command-block---create-topic-partitioned-or-regular)
- [1.4.3. Batch Producer Command Block](#143-batch-producer-command-block)
- [1.4.4. Producer Command Block](#144-producer-command-block)
- [1.4.5. Consumer Command Block](#145-consumer-command-block)
- [1.4.6. Reader Command Block](#146-reader-command-block)
- [1.5. Schema Support](#15-schema-support)
- [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters)
- [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example)
- [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties)
- [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties)
- [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features)
- [2.1. Other Activity Parameters](#21-other-activity-parameters)
- [2.2. API Caching](#22-api-caching)
@ -72,25 +73,25 @@ There are multiple sections in this file that correspond to different groups of
* This section defines all configuration settings related to
defining a PulsarClient object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters)
* **Pulsar Producer related settings**:
* All settings under this section start with the **producer** prefix.
* This section defines all configuration settings related to
defining a Pulsar Producer object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer)
* **Pulsar Consumer related settings**:
* All settings under this section start with the **consumer** prefix.
* This section defines all configuration settings related to
defining a Pulsar Consumer object.
*
See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer)
* **Pulsar Reader related settings**:
* All settings under this section start with the **reader** prefix.
* This section defines all configuration settings related to
defining a Pulsar Reader object.
*
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader)
In the future, when the support for other types of Pulsar workloads is
added in NB Pulsar driver, there will be corresponding configuration
@ -141,10 +142,10 @@ params:
blocks:
- name: <command_block_1>
tags:
phase: <command_bock_filtering_identifier>
phase: <command_block_identifier>
statements:
- name: <statement_name_1>
optype: <statement_filtering_identifier>
optype: <statement_identifier>
... <statement_specific_parameters> ...
- name: <statement_name_2>
... ...
@ -237,43 +238,68 @@ cycle level, **the ycle level setting will take priority!**
## 1.4. Pulsar Driver Yaml File - Command Block Details
### 1.4.1. Pulsar Admin API Command Block
### 1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace
**NOTE**: this functionality is only partially implemented at the moment
and doesn't function yet.
Currently, the Pulsar Admin API Block is (planned) to only support
creating Pulsar tenants and namespaces. It has the following format:
This Pulsar Admin API Block is used to create Pulsar tenants and namespaces. It has the following format:
```yaml
- name: admin-block
- name: create-tennam-block
tags:
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: create-tenant
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
- name: s2
optype: create-namespace
namespace: "{namespace}"
namespace: "default"
```
In this command block, there are 2 statements (s1 and s2):
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar tenant
* (Mandatory) **optype (create-tenant)** is the statement identifier
* Statement **s1** is used for creating a Pulsar tenant and a namespace
* (Mandatory) **optype (admin-crt-tennam)** is the statement identifier
for this statement
* (Mandatory) **tenant** is the only statement parameter that
specifies the Pulsar tenant name which can either be dynamically
bound or statically assigned.
* Statement **s2** is used for creating a Pulsar namespace
* (Mandatory) **optype (create-namespace)** is the statement
identifier for this statement
* (Mandatory) **namespace** is the only statement parameter that
specifies the Pulsar namespace under the tenant created by statement
s1. Its name can either be dynamically bound or statically assigned.
* (Optional) **allowed_clusters** must be statically bound and it
specifies the cluster list that is allowed for a tenant.
* (Optional) **admin_roles** must be statically bound and it specifies
the super user role that is associated with a tenant.
* (Mandatory) **tenant** is the Pulsar tenant name to be created. It
can either be dynamically or statically bound.
* (Mandatory) **namespace** is the Pulsar namespace name to be created
under the above tenant. It also can be dynamically or statically bound.
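
The difference between statically and dynamically bound parameters can be
sketched in plain Java. This is a minimal illustration under stated
assumptions, not the driver's code: `ParamResolver` and its two maps are
hypothetical stand-ins for the driver's command template, and the error
message merely mirrors the wording the driver uses for **admin_roles** and
**allowed_clusters**.

```java
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical stand-in for the driver's command template; not the real NB API.
final class ParamResolver {
    private final Map<String, String> statics;
    private final Map<String, LongFunction<String>> dynamics;

    ParamResolver(Map<String, String> statics,
                  Map<String, LongFunction<String>> dynamics) {
        this.statics = statics;
        this.dynamics = dynamics;
    }

    // Returns a per-cycle function for the named parameter.
    // A static value ignores the cycle number; a dynamic one is evaluated per cycle.
    LongFunction<String> resolve(String name) {
        if (statics.containsKey(name)) {
            String value = statics.get(name);
            return cycle -> value;
        }
        LongFunction<String> binding = dynamics.get(name);
        if (binding != null) {
            return binding;
        }
        return cycle -> null; // parameter was not provided
    }

    // Rejects a dynamic binding for parameters that must be static.
    String requireStatic(String name) {
        if (dynamics.containsKey(name)) {
            throw new IllegalArgumentException(
                "\"" + name + "\" parameter must NOT be dynamic!");
        }
        return statics.get(name);
    }
}
```

With `namespace` given as the literal `"default"` and `tenant` given as a
binding, `resolve("namespace")` yields the same value for every cycle, while
`resolve("tenant")` may yield a different value for each cycle number.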
### 1.4.2. Batch Producer Command Block
### 1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)
This Pulsar Admin API Block is used to create Pulsar topics. It has the following format:
```yaml
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
partition_num: "5"
```
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar topic
* (Mandatory) **optype (admin-crt-top)** is the statement identifier
for this statement
* (Mandatory) **enable_partition** specifies whether to create a
partitioned topic. It can either be dynamically or statically bound.
* (Mandatory) **partition_num** specifies the number of partitions if
a partitioned topic is to be created. It also can be dynamically or
statically bound.
**NOTE**: The topic name is taken from the document level parameter "topic_uri".
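
How the two statement parameters combine with "topic_uri" can be sketched
as follows. This is an illustration only: `TopicPlan` is a hypothetical
helper that does not exist in the driver, and the validation rules (a
missing or non-positive **partition_num** is rejected when partitioning is
enabled) are assumptions, since this document does not state how the driver
treats invalid values. In the Pulsar admin Java client the resulting plan
would map to `Topics.createPartitionedTopic` or
`Topics.createNonPartitionedTopic`; whether the driver calls exactly these
methods is also an assumption.

```java
// Hypothetical value type describing the topic to create; not part of the driver.
final class TopicPlan {
    final String topicUri;
    final boolean partitioned;
    final int partitions; // 0 for a regular (non-partitioned) topic

    private TopicPlan(String topicUri, boolean partitioned, int partitions) {
        this.topicUri = topicUri;
        this.partitioned = partitioned;
        this.partitions = partitions;
    }

    // Builds a plan from the raw string values as they appear in the yaml file.
    static TopicPlan from(String topicUri, String enablePartition, String partitionNum) {
        if (topicUri == null || topicUri.trim().isEmpty()) {
            throw new IllegalArgumentException("topic_uri must be provided");
        }
        // Boolean.parseBoolean returns false for null and for anything but "true".
        boolean partitioned = Boolean.parseBoolean(enablePartition);
        if (!partitioned) {
            return new TopicPlan(topicUri, false, 0);
        }
        if (partitionNum == null) {
            throw new IllegalArgumentException("partition_num must be provided");
        }
        // Integer.parseInt throws NumberFormatException for non-numeric input.
        int n = Integer.parseInt(partitionNum.trim());
        if (n < 1) {
            throw new IllegalArgumentException("partition_num must be at least 1");
        }
        return new TopicPlan(topicUri, true, n);
    }
}
```

With the values from the yaml example above (`enable_partition: "true"`,
`partition_num: "5"`), the plan describes a partitioned topic with 5
partitions; with `enable_partition: "false"`, **partition_num** is ignored.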
### 1.4.3. Batch Producer Command Block
Batch producer command block is used to produce a batch of messages all at
once by one NB cycle execution. A typical format of this command block is
@ -334,7 +360,7 @@ ratios: 1, <batch_num>, 1.
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it
defaults to 1.
### 1.4.3. Producer Command Block
### 1.4.4. Producer Command Block
This is the regular Pulsar producer command block that produces one Pulsar
message per NB cycle execution. A typical format of this command block is
@ -370,7 +396,7 @@ This command block only has 1 statements (s1):
* (Mandatory) **msg_payload** specifies the payload of the generated
message
### 1.4.4. Consumer Command Block
### 1.4.5. Consumer Command Block
This is the regular Pulsar consumer command block that consumes one Pulsar
message per NB cycle execution. A typical format of this command block is
@ -410,7 +436,7 @@ This command block only has 1 statements (s1):
**NOTE 2**: if neither **topic_names** nor **topics_pattern** is provided, the consumer topic name defaults to the document level parameter **topic_uri**.
### 1.4.5. Reader Command Block
### 1.4.6. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar
message per NB cycle execution. A typical format of this command block is
@ -528,7 +554,7 @@ environment.
```
## Appendix A. Template Global Setting File (config.properties)
## 1.8. Appendix A. Template Global Setting File (config.properties)
```properties
schema.type =
schema.definition =