diff --git a/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-keyvalue.yaml b/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-keyvalue.yaml new file mode 100644 index 000000000..71a498bb2 --- /dev/null +++ b/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-keyvalue.yaml @@ -0,0 +1,114 @@ +# nb -v run driver=http yaml=http-graphql-keyvalue tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN + +# TODO +# - /graphqlv2 will change in future, adapt when needed +# - do we need a truncate schema / namespace at the end + +description: | + This workload emulates a key-value data model and access patterns. + This should be identical to the cql variant except for: + - Schema creation GraphQL first, we don't use cql and thus can only create schema with limited options. + - There is no instrumentation with the http driver. + - There is no async mode with the http driver. + Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080). + +scenarios: + default: + - run driver=http tags==phase:schema threads==1 cycles==UNDEF + - run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto + - run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto +bindings: + # To enable an optional weighted set of hosts in place of a load balancer + # Examples + # single host: stargate_host=host1 + # multiple hosts: stargate_host=host1,host2,host3 + # multiple weighted hosts: stargate_host=host1:3,host2:7 + weighted_hosts: WeightedStrings('<>') + # http request id + request_id: ToHashedUUID(); ToString(); + + seq_key: Mod(<>); ToString() -> String + seq_value: Hash(); Mod(<>); ToString() -> String + rw_key: <int>>; ToString() -> String + rw_value: Hash(); <int>>; ToString() -> String + +blocks: + - tags: + phase: schema + statements: + - create-keyspace: POST <>://{weighted_hosts}:<><>/graphqlv2/admin + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n createNamespace(name: \"<>\", replicas: <>) {\n namespace {\n name\n }\n }\n}\n" + } + tags: + name: create-keyspace + - create-gql-schema : POST <>://{weighted_hosts}:<><>/graphqlv2/admin + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n deploySchema(namespace: \"<>\", schema: \"\"\"\n type KeyValue @cql_input {\n key: String! @cql_column(partitionKey: true)\n value: String!\n }\n type Query {\n getKeyValue(\n key: String!,\n ): KeyValue\n }\n type Mutation {\n \t\tinsertKeyValue(keyValue: KeyValueInput): KeyValue\n }\n \"\"\") {\n version\n }\n}\n" + } + tags: + name: create-gql-schema + + - name: rampup + tags: + phase: rampup + statements: + - rampup-insert: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n insertKeyValue(keyValue: {key: \"{seq_key}\", value: \"{seq_value}\"}) {\n key\n value\n }\n}\n" + } + tags: + name: rampup-insert + + - name: main-read + tags: + phase: main + type: read + params: + ratio: <> + statements: + - main-select: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"{\n getKeyValue(key: \"rw_key\") {\n key\n value\n }\n}\n" + } + tags: + name: main-select + + - name: main-write + tags: + phase: main + type: write + params: + ratio: <> + statements: + - main-write: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n insertKeyValue(keyValue: {key: \"{rw_key}\", value: \"{rw_value}\"}) {\n key\n value\n }\n}\n" + } + tags: + name: main-write diff --git a/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-tabular.yaml b/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-tabular.yaml new file mode 100644 index 000000000..958264a0f --- /dev/null +++ b/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-tabular.yaml @@ -0,0 +1,123 @@ +# nb -v run driver=http yaml=http-graphql-tabular tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN + +# TODO +# - /graphqlv2 will change in future, adapt when needed +# - do we need a truncate schema / namespace at the end +# - rest uses limit as it queries only by a single primary key, we can not map this to GQL (also should data be clustering key) + +description: | + This workload emulates a time-series data model and access patterns. + This should be identical to the cql variant except for: + - We need to URLEncode the `data` and `data_write` bindings because newlines can't be sent in REST calls. + - Schema creation GraphQL first, we don't use cql and thus can only create schema with limited options. + - There is no instrumentation with the http driver. + - There is no async mode with the http driver. + Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080). + +scenarios: + default: + - run driver=http tags==phase:schema threads==1 cycles==UNDEF + - run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto + - run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto +bindings: + # To enable an optional weighted set of hosts in place of a load balancer + # Examples + # single host: stargate_host=host1 + # multiple hosts: stargate_host=host1,host2,host3 + # multiple weighted hosts: stargate_host=host1:3,host2:7 + weighted_hosts: WeightedStrings('<>') + # http request id + request_id: ToHashedUUID(); ToString(); + + # for ramp-up and verify + part_layout: Div(<>); ToString() -> String + clust_layout: Mod(<>); ToString() -> String + data: HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150); URLEncode(); + # for read + limit: Uniform(1,10) -> int + part_read: Uniform(0,<>)->int; ToString() -> String + clust_read: Add(1); Uniform(0,<>)->int; ToString() -> String + # for write + part_write: Hash(); Uniform(0,<>)->int; ToString() -> String + clust_write: Hash(); Add(1); Uniform(0,<>)->int; ToString() -> String + data_write: Hash(); HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150); URLEncode(); +blocks: + - tags: + phase: schema + statements: + - create-keyspace: POST <>://{weighted_hosts}:<><>/graphqlv2/admin + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n createNamespace(name: \"<>\", replicas: <>) {\n namespace {\n name\n }\n }\n}\n" + } + tags: + name: create-keyspace + - create-gql-schema : POST <>://{weighted_hosts}:<><>/graphqlv2/admin + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n deploySchema(namespace: \"<>\", schema: \"\"\"\n type Tabular @cql_input {\n part: String! @cql_column(partitionKey: true)\n clust: String! @cql_column(partitionKey: true)\n data: String! \n }\n type SelectTabularResult @cql_payload {\n \t\tdata: [Tabular]\n \t\tpagingState: String\n }\n type Query {\n getTabulars(\n part: String!,\n clust: String!,\n pagingState: String @cql_pagingState\n ): SelectTabularResult @cql_select(pageSize: 10)\n }\n type Mutation {\n \t\tinsertTabular(tabular: TabularInput): Tabular\n }\n \"\"\") {\n version\n }\n}\n" + } + tags: + name: create-gql-schema + + - name: rampup + tags: + phase: rampup + statements: + - rampup-insert: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n insertTabular(tabular: {part: \"{part_layout}\", clust: \"{clust_layout}\", data: \"{data}\"}) {\n part\n clust\n data\n }\n}\n" + } + tags: + name: rampup-insert + + - name: main-read + tags: + phase: main + type: read + params: + ratio: <> + statements: + - main-select: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"{\n getTabulars(part: \"{part_read}\", clust: \"{clust_read}\") {\n data {\n part\n clust\n data\n }\n pagingState\n }\n}\n" + } + tags: + name: main-select + + - name: main-write + tags: + phase: main + type: write + params: + ratio: <> + statements: + - main-write: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n insertTabular(tabular: {part: \"{part_write}\", clust: \"{clust_write}\", data: \"{data_write}\"}) {\n part\n clust\n data\n }\n}\n" + } + tags: + name: main-write diff --git a/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-timeseries.yaml b/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-timeseries.yaml new file mode 100644 index 000000000..f92f9c76e --- /dev/null +++ b/driver-http/src/main/resources/activities/graphql-schema-first/http-graphql-timeseries.yaml @@ -0,0 +1,121 @@ +# nb -v run driver=http yaml=http-graphql-timeseries tags=phase:schema host=my_stargate_host stargate_host=my_stargate_host auth_token=$AUTH_TOKEN + +# TODO +# - /graphqlv2 will change in future, adapt when needed +# - do we need a truncate schema / namespace at the end +# - support for a paging state? +# - infinity is not handled +# - compression and compaction not defined (limitation in the GQL schema-first API) + +description: | + This workload emulates a time-series data model and access patterns. + This should be identical to the cql variant except for: + - We can't specify the write timestamp to make the write idempotent like we can with cql. + - The `time` binding has to have a StringDateWrapper to get the exact format that the REST API/GraphQL needs; See https://github.com/stargate/stargate/issues/532. + - We need to URLEncode the `data` binding because newlines can't be sent in REST calls. + - Schema creation GraphQL first, we don't use cql and thus can only create schema with limited options. + - There is no instrumentation with the http driver. + - There is no async mode with the http driver. + Note that stargate_port should reflect the port where GraphQL API V2 is exposed (defaults to 8080). + +scenarios: + default: + - run driver=http tags==phase:schema threads==1 cycles==UNDEF + - run driver=http tags==phase:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto + - run driver=http tags==phase:main cycles===TEMPLATE(main-cycles,10000000) threads=auto +bindings: + # To enable an optional weighted set of hosts in place of a load balancer + # Examples + # single host: stargate_host=host1 + # multiple hosts: stargate_host=host1,host2,host3 + # multiple weighted hosts: stargate_host=host1:3,host2:7 + weighted_hosts: WeightedStrings('<>') + # http request id + request_id: ToHashedUUID(); ToString(); + + machine_id: Mod(<>); ToHashedUUID() -> java.util.UUID + sensor_name: HashedLineToString('data/variable_words.txt') + time: Mul(<>L); Div(<>L); StringDateWrapper("yyyy-MM-dd'T'hh:mm:ss'Z"); + sensor_value: Normal(0.0,5.0); Add(100.0) -> double + station_id: Div(<>);Mod(<>); ToHashedUUID() -> java.util.UUID + data: HashedFileExtractToString('data/lorem_ipsum_full.txt',800,1200); URLEncode(); +blocks: + - tags: + phase: schema + statements: + - create-keyspace: POST <>://{weighted_hosts}:<><>/graphqlv2/admin + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n createNamespace(name: \"<>\", replicas: <>) {\n namespace {\n name\n }\n }\n}\n" + } + tags: + name: create-keyspace + - create-gql-schema : POST <>://{weighted_hosts}:<><>/graphqlv2/admin + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n deploySchema(namespace: \"<>\", schema: \"\"\"\n type Iot @cql_input {\n machine_id: Uuid! @cql_column(partitionKey: true)\n sensor_name: String! @cql_column(partitionKey: true)\n time: Timestamp! @cql_column(clusteringOrder: DESC)\n sensor_value: Float!\n \tstation_id: Uuid!\n data: String!\n }\n type SelectIotResult @cql_payload {\n \t\tdata: [Iot]\n \t\tpagingState: String\n }\n type Query {\n getIots(\n machine_id: Uuid!,\n sensor_name: String!,\n pagingState: String @cql_pagingState\n ): SelectIotResult @cql_select(pageSize: 10)\n }\n type Mutation {\n \t\tinsertIot(iot: IotInput): Iot\n }\n \"\"\") {\n version\n }\n}\n" + } + tags: + name: create-gql-schema + + - name: rampup + tags: + phase: rampup + statements: + - rampup-insert: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n insertIot(iot: {machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\", time: \"{time}\", sensor_value: {sensor_value}, station_id: \"{station_id}\", data: \"{data}\"}) {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n}\n" + } + tags: + name: rampup-insert + + - name: main-read + tags: + phase: main + type: read + params: + ratio: <> + statements: + - main-select: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"{\n getIots(machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\") {\n data {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n }\n}\n" + } + tags: + name: main-select + + - name: main-write + tags: + phase: main + type: write + params: + ratio: <> + statements: + - main-write: POST <>://{weighted_hosts}:<><>/graphqlv2/namespace/<> + Accept: "application/json" + X-Cassandra-Request-Id: "{request_id}" + X-Cassandra-Token: "<>" + Content-Type: "application/json" + body: | + { + "query":"mutation {\n insertIot(iot: {machine_id: \"{machine_id}\", sensor_name: \"{sensor_name}\", time: \"{time}\", sensor_value: {sensor_value}, station_id: \"{station_id}\", data: \"{data}\"}) {\n machine_id\n sensor_name\n time\n sensor_value\n station_id\n data\n }\n}\n" + } + tags: + name: main-write diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index f5f64609a..964ebee72 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -29,6 +29,10 @@ public class PulsarAction implements SyncAction { @Override public int runCycle(long cycle) { + + // let's fail the action if some async operation failed + activity.failOnAsyncOperationFailure(); + long start = System.nanoTime(); PulsarOp pulsarOp; @@ -44,8 +48,11 @@ public class PulsarAction implements SyncAction { } for (int i = 0; i < maxTries; i++) { - try (Timer.Context ctx = activity.getExecuteTimer().time()) { - pulsarOp.run(); + Timer.Context ctx = activity.getExecuteTimer().time(); + try { + // it is up to the pulsarOp to call Context#close when the activity is executed + // this allows us to track time for async operations + pulsarOp.run(ctx::close); break; } catch (RuntimeException err) { ErrorDetail errorDetail = activity diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index 5feb202b1..e5d4fa85a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -20,7 +20,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.shade.org.apache.commons.io.FileUtils; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { @@ -42,6 +48,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve private NBErrorHandler errorhandler; private OpSequence> sequencer; + private volatile Throwable asyncOperationFailure; // private Supplier clientSupplier; // private ThreadLocal> tlClientSupplier; @@ -77,18 +84,28 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr); if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) { - adminBuilder = adminBuilder.authentication(authPluginClassName, authParams); +// String tokenFileName = StringUtils.removeStart(authParams, "file://"); +// File tokenFile = new File(tokenFileName); +// String token; +// try { +// token = FileUtils.readFileToString(tokenFile, StandardCharsets.UTF_8); +// token = StringUtils.normalizeSpace(token); +// } +// catch (IOException ioe) { +// throw new RuntimeException("Failed to read the specified (\"client.authParams\") token file: " + tokenFileName + "!"); +// } +// adminBuilder.authentication(AuthenticationFactory.token(token)); + adminBuilder.authentication(authPluginClassName, authParams); } if ( useTls ) { - adminBuilder = adminBuilder + adminBuilder .useKeyStoreTls(useTls) .allowTlsInsecureConnection(tlsAllowInsecureConnection) .enableTlsHostnameVerification(tlsHostnameVerificationEnable); if (!StringUtils.isBlank(tlsTrustCertsFilePath)) - adminBuilder = adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); - + adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); } pulsarAdmin = adminBuilder.build(); @@ -171,4 +188,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public Histogram getMessagesizeHistogram() { return messagesizeHistogram; } + + public void failOnAsyncOperationFailure() { + if (asyncOperationFailure != null) { + throw new RuntimeException(asyncOperationFailure); + } + } + + public void asyncOperationFailed(Throwable ex) { + this.asyncOperationFailure = asyncOperationFailure; + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index 492a282e2..6014ae21d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -6,6 +6,7 @@ import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.Clusters; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; @@ -58,11 +59,14 @@ public class PulsarSpace { createPulsarSchemaFromConf(); try { - List stringList = pulsarAdmin.clusters().getClusters(); + Clusters clusters = pulsarAdmin.clusters(); + List stringList = clusters.getClusters(); CollectionUtils.addAll(pulsarClusterMetadata, stringList.listIterator()); } catch (PulsarAdminException e) { - throw new RuntimeException("Failed to get Pulsar cluster metadata!"); + String errMsg = "Fail to create PulsarClient from global configuration: " + e.getMessage(); + logger.error(errMsg); + throw new RuntimeException(errMsg); } } @@ -79,8 +83,9 @@ public class PulsarSpace { .serviceUrl(pulsarSvcUrl) .build(); } catch (PulsarClientException pce) { - logger.error("Fail to create PulsarClient from global configuration!"); - throw new RuntimeException("Fail to create PulsarClient from global configuration!"); + String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage(); + logger.error(errMsg); + throw new RuntimeException(errMsg); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamMapper.java similarity index 74% rename from driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java rename to driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamMapper.java index ac47afeaa..f821061fc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamMapper.java @@ -2,7 +2,6 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; -import org.apache.pulsar.client.api.Producer; import java.util.Set; import java.util.function.LongFunction; @@ -17,18 +16,18 @@ import java.util.function.LongFunction; * * For additional parameterization, the command template is also provided. */ -public class PulsarAdminMapper extends PulsarOpMapper { +public class PulsarAdminCrtTennamMapper extends PulsarOpMapper { private final LongFunction> adminRolesFunc; private final LongFunction> allowedClustersFunc; private final LongFunction tenantFunc; private final LongFunction namespaceFunc; - public PulsarAdminMapper(CommandTemplate cmdTpl, - PulsarSpace clientSpace, - LongFunction> adminRolesFunc, - LongFunction> allowedClustersFunc, - LongFunction tenantFunc, - LongFunction namespaceFunc) { + public PulsarAdminCrtTennamMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction> adminRolesFunc, + LongFunction> allowedClustersFunc, + LongFunction tenantFunc, + LongFunction namespaceFunc) { super(cmdTpl, clientSpace); this.adminRolesFunc = adminRolesFunc; this.allowedClustersFunc = allowedClustersFunc; @@ -43,7 +42,7 @@ public class PulsarAdminMapper extends PulsarOpMapper { String tenant = tenantFunc.apply(value); String namespace = namespaceFunc.apply(value); - return new PulsarAdminOp( + return new PulsarAdminCrtTennamOp( clientSpace, adminRoleSet, allowedClusterSet, diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java similarity index 76% rename from driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java rename to driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java index 8614b294c..30c0f717e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTennamOp.java @@ -1,27 +1,18 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarSpace; -import io.nosqlbench.driver.pulsar.util.AvroUtil; -import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.*; -import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.policies.data.TenantInfo; -import org.apache.pulsar.common.schema.SchemaType; -import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; -public class PulsarAdminOp implements PulsarOp { +public class PulsarAdminCrtTennamOp extends SyncPulsarOp { - private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class); + private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class); private final PulsarSpace clientSpace; private final Set adminRoleSet; @@ -29,11 +20,11 @@ public class PulsarAdminOp implements PulsarOp { private final String tenant; private final String namespace; - public PulsarAdminOp(PulsarSpace clientSpace, - Set adminRoleSet, - Set allowedClusterSet, - String tenant, - String namespace) { + public PulsarAdminCrtTennamOp(PulsarSpace clientSpace, + Set adminRoleSet, + Set allowedClusterSet, + String tenant, + String namespace) { this.clientSpace = clientSpace; this.adminRoleSet = adminRoleSet; this.allowedClusterSet = allowedClusterSet; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java new file mode 100644 index 000000000..a236e4cd6 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopMapper.java @@ -0,0 +1,66 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.function.LongFunction; + +/** + * This maps a set of specifier functions to a pulsar operation. The pulsar operation contains + * enough state to define a pulsar operation such that it can be executed, measured, and possibly + * retried if needed. + * + * This function doesn't act *as* the operation. It merely maps the construction logic into + * a simple functional type, given the component functions. + * + * For additional parameterization, the command template is also provided. + */ +public class PulsarAdminCrtTopMapper extends PulsarOpMapper { + private final LongFunction topicUriFunc; + private final LongFunction enablePartionFunc; + private final LongFunction partitionNumFunc; + + public PulsarAdminCrtTopMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + LongFunction topicUriFunc, + LongFunction enablePartionFunc, + LongFunction partitionNumFunc) { + super(cmdTpl, clientSpace); + this.topicUriFunc = topicUriFunc; + this.enablePartionFunc = enablePartionFunc; + this.partitionNumFunc = partitionNumFunc; + } + + @Override + public PulsarOp apply(long value) { + String topicUri = topicUriFunc.apply(value); + String enablePartitionStr = enablePartionFunc.apply(value); + String partitionNumStr = partitionNumFunc.apply(value); + + if ( StringUtils.isBlank(topicUri) ) { + throw new RuntimeException("\"topic_uri\" parameter can't be empty when creating a Pulsar topic!"); + } + + boolean partitionTopic = BooleanUtils.toBoolean(enablePartitionStr); + + boolean invalidPartStr; + int partitionNum = 0; + if ( StringUtils.isBlank(partitionNumStr) || !StringUtils.isNumeric(partitionNumStr) ) { + invalidPartStr = true; + } else { + partitionNum = Integer.valueOf(partitionNumStr); + invalidPartStr = (partitionNum <= 0); + } + if (partitionTopic && invalidPartStr) { + throw new RuntimeException("Invalid specified value for \"partition_num\" parameter when creating partitioned topic!"); + } + + return new PulsarAdminCrtTopOp( + clientSpace, + topicUri, + partitionTopic, + partitionNum); + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java new file mode 100644 index 000000000..483580ba3 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminCrtTopOp.java @@ -0,0 +1,60 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarSpace; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.admin.Topics; + +public class PulsarAdminCrtTopOp implements PulsarOp { + + private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTopOp.class); + + private final PulsarSpace clientSpace; + private final String topicUri; + private final boolean partitionTopic; + private final int partitionNum; + + public PulsarAdminCrtTopOp(PulsarSpace clientSpace, + String topicUri, + boolean partitionTopic, + int partitionNum) { + this.clientSpace = clientSpace; + this.topicUri = topicUri; + this.partitionTopic = partitionTopic; + this.partitionNum = partitionNum; + } + + private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) { + int statusCode = e.getStatusCode(); + + // 409 conflict: resource already exists + if ( (statusCode >= 400) && (statusCode != 409) ) { + throw new RuntimeException(finalErrMsg); + } + } + + @Override + public void run() { + PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin(); + + Topics topics = pulsarAdmin.topics(); + + try { + if (!partitionTopic) { + topics.createNonPartitionedTopic(topicUri); + } + else { + topics.createPartitionedTopic(topicUri, partitionNum); + } + } catch (PulsarAdminException e) { + String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d", + topicUri, + partitionTopic, + partitionNum); + + processPulsarAdminException(e, errMsg); + } + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java index c9061e4f5..7d2fe51b0 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java @@ -1,17 +1,14 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.nb.api.errors.BasicError; -import org.apache.pulsar.client.api.BatchMessageContainer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.impl.BatchMessageContainerBase; -import org.apache.pulsar.client.impl.DefaultBatcherBuilder; import org.apache.pulsar.common.util.FutureUtil; import java.util.List; import java.util.concurrent.CompletableFuture; -public class PulsarBatchProducerEndOp implements PulsarOp { +public class PulsarBatchProducerEndOp extends SyncPulsarOp { @Override public void run() { List> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java index e16a9bfa3..95877493f 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java @@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; -public class PulsarBatchProducerOp implements PulsarOp { +public class PulsarBatchProducerOp extends SyncPulsarOp { private final Schema pulsarSchema; private final String msgKey; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java index 1798df3a5..eca7bbd56 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartOp.java @@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.*; import java.util.List; import java.util.concurrent.CompletableFuture; -public class PulsarBatchProducerStartOp implements PulsarOp { +public class PulsarBatchProducerStartOp extends SyncPulsarOp { // TODO: ensure sane container lifecycle management public final static ThreadLocal>> threadLocalBatchMsgContainer = new ThreadLocal<>(); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index c221787c3..28b9da842 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -9,10 +9,9 @@ import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.schema.SchemaType; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -public class PulsarConsumerOp implements PulsarOp { +public class PulsarConsumerOp extends SyncPulsarOp { private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOp.java index cb865e2e6..9e4407c48 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOp.java @@ -3,6 +3,12 @@ package io.nosqlbench.driver.pulsar.ops; /** * Base type of all Pulsar Operations including Producers and Consumers. */ -public interface PulsarOp extends Runnable { +public interface PulsarOp { + /** + * Execute the operation, invoke the timeTracker when the operation ended. + * The timeTracker can be invoked in a separate thread, it is only used for metrics. + * @param timeTracker + */ + void run(Runnable timeTracker); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 0333d9b3e..5e1fcfa24 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; @@ -24,8 +25,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { private final LongFunction asyncApiFunc; private final LongFunction keyFunc; private final LongFunction payloadFunc; - private final Counter bytesCounter; - private final Histogram messagesizeHistogram; + private final PulsarActivity pulsarActivity; public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, @@ -33,15 +33,13 @@ public class PulsarProducerMapper extends PulsarOpMapper { LongFunction asyncApiFunc, LongFunction keyFunc, LongFunction payloadFunc, - Counter bytesCounter, - Histogram messagesizeHistogram) { + PulsarActivity pulsarActivity) { super(cmdTpl, clientSpace); this.producerFunc = producerFunc; this.asyncApiFunc = asyncApiFunc; this.keyFunc = keyFunc; this.payloadFunc = payloadFunc; - this.bytesCounter = bytesCounter; - this.messagesizeHistogram = messagesizeHistogram; + this.pulsarActivity = pulsarActivity; } @Override @@ -57,7 +55,7 @@ public class PulsarProducerMapper extends PulsarOpMapper { asyncApi, msgKey, msgPayload, - bytesCounter, - messagesizeHistogram); + pulsarActivity + ); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 537e04833..3c7066591 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -2,6 +2,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -25,25 +26,26 @@ public class PulsarProducerOp implements PulsarOp { private final boolean asyncPulsarOp; private final Counter bytesCounter; private final Histogram messagesizeHistogram; + private final PulsarActivity pulsarActivity; public PulsarProducerOp(Producer producer, Schema schema, boolean asyncPulsarOp, String key, String payload, - Counter bytesCounter, - Histogram messagesizeHistogram) { + PulsarActivity pulsarActivity) { this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; this.msgPayload = payload; this.asyncPulsarOp = asyncPulsarOp; - this.bytesCounter = bytesCounter; - this.messagesizeHistogram = messagesizeHistogram; + this.pulsarActivity = pulsarActivity; + this.bytesCounter = pulsarActivity.getBytesCounter(); + this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram(); } @Override - public void run() { + public void run(Runnable timeTracker) { if ((msgPayload == null) || msgPayload.isEmpty()) { throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); } @@ -80,17 +82,18 @@ public class PulsarProducerOp implements PulsarOp { logger.trace("failed sending message"); throw new RuntimeException(pce); } + timeTracker.run(); } else { try { + // we rely on blockIfQueueIsFull in order to throttle the request in this case CompletableFuture future = typedMessageBuilder.sendAsync(); - future.get(); - - /*.thenRun(() -> { -// System.out.println("Producing message succeeded: key - " + msgKey + "; payload - " + msgPayload); - }).exceptionally(ex -> { - System.out.println("Producing message failed: key - " + msgKey + "; payload - " + msgPayload); - return ex; - })*/ + future.whenComplete((messageId, error) -> { + timeTracker.run(); + }).exceptionally(ex -> { + logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload); + pulsarActivity.asyncOperationFailed(ex); + return null; + }); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java index a29fd9721..8a083d63c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderOp.java @@ -8,7 +8,7 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; -public class PulsarReaderOp implements PulsarOp { +public class PulsarReaderOp extends SyncPulsarOp { private final Reader reader; private final Schema pulsarSchema; private final boolean asyncPulsarOp; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 8fabe1ef1..50ba9abab 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -50,9 +50,6 @@ public class ReadyPulsarOp implements OpDispenser { } this.opFunc = resolve(); - - ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton")); - Supplier> opSupplier = scope.supplier(this::resolve); } @Override @@ -75,53 +72,32 @@ public class ReadyPulsarOp implements OpDispenser { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); } - // Global parameter: topic_uri (applies only to non-Admin API) + // Global parameter: topic_uri LongFunction topicUriFunc = (l) -> null; - // Global parameter: async_api (applies only to non-Admin API) - LongFunction asyncApiFunc = (l) -> false; - if (!StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label)) { - if (cmdTpl.containsKey("topic_uri")) { - if (cmdTpl.containsAny("tenant", "namespace", "topic", "persistent")) { - throw new RuntimeException("You may not specify topic_uri with any of the piece-wise components 'persistence','tenant','namespace','topic'."); - } else if (cmdTpl.isStatic("topic_uri")) { - topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri"); - } else { - topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l); - } - } else if (cmdTpl.containsKey("topic")) { - if (cmdTpl.isStaticOrUnsetSet("persistence", "tenant", "namespace", "topic")) { - String persistence = cmdTpl.getStaticOr("persistence", "persistent") - .replaceAll("true", "persistent"); - - String tenant = cmdTpl.getStaticOr("tenant", "public"); - String namespace = cmdTpl.getStaticOr("namespace", "default"); - String topic = cmdTpl.getStaticOr("topic", ""); - - String composited = persistence + "://" + tenant + "/" + namespace + "/" + topic; - topicUriFunc = (l) -> composited; - } else { // some or all dynamic fields, composite into a single dynamic call - topicUriFunc = (l) -> - cmdTpl.getOr("persistent", l, "persistent").replaceAll("true", "persistent") - + "://" + cmdTpl.getOr("tenant", l, "public") - + "/" + cmdTpl.getOr("namespace", l, "default") - + "/" + cmdTpl.getOr("topic", l, ""); - } - } - - if (cmdTpl.containsKey("async_api")) { - if (cmdTpl.isStatic("async_api")) - asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); - else - throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); + if (cmdTpl.containsKey("topic_uri")) { + if (cmdTpl.isStatic("topic_uri")) { + topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri"); } else { - asyncApiFunc = (l) -> false; + topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l); } } + // Global parameter: async_api + LongFunction asyncApiFunc = (l) -> false; + + if (cmdTpl.containsKey("async_api")) { + if (cmdTpl.isStatic("async_api")) + asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api")); + else + throw new RuntimeException("\"async_api\" parameter cannot be dynamic!"); + } + // TODO: Complete implementation for websocket-producer and managed-ledger - if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN.label) ) { - return resolveAdminRequest(clientSpace); + if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TENNAME.label) ) { + return resolveAdminCrtTenname(clientSpace); + } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TOP.label)) { + return resolveAdminCrtParttop(clientSpace, topicUriFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc); } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { @@ -139,7 +115,8 @@ public class ReadyPulsarOp implements OpDispenser { } } - private LongFunction resolveAdminRequest(PulsarSpace clientSpace) { + // Admin API: create tenant and namespace + private LongFunction resolveAdminCrtTenname(PulsarSpace clientSpace) { if ( cmdTpl.isDynamic("admin_roles") || cmdTpl.isDynamic("allowed_clusters") ) { throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!"); @@ -187,7 +164,7 @@ public class ReadyPulsarOp implements OpDispenser { namespaceFunc = (l) -> null; } - return new PulsarAdminMapper( + return new PulsarAdminCrtTennamMapper( cmdTpl, clientSpace, adminRolesFunc, @@ -196,6 +173,33 @@ public class ReadyPulsarOp implements OpDispenser { namespaceFunc); } + // Admin API: create partitioned topic + private LongFunction resolveAdminCrtParttop( + PulsarSpace clientSpace, + LongFunction topic_uri_fun + ) { + LongFunction enablePartionFunc = (l) -> null; + if (cmdTpl.isStatic("enable_partition")) { + enablePartionFunc = (l) -> cmdTpl.getStatic("enable_partition"); + } else if (cmdTpl.isDynamic("enable_partition")) { + enablePartionFunc = (l) -> cmdTpl.getDynamic("enable_partition", l); + } + + LongFunction partitionNumFunc = (l) -> null; + if (cmdTpl.isStatic("partition_num")) { + partitionNumFunc = (l) -> cmdTpl.getStatic("partition_num"); + } else if (cmdTpl.isDynamic("partition_num")) { + partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l); + } + + return new PulsarAdminCrtTopMapper( + cmdTpl, + clientSpace, + topic_uri_fun, + enablePartionFunc, + partitionNumFunc); + } + private LongFunction resolveMsgSend( PulsarSpace clientSpace, LongFunction topic_uri_func, @@ -242,8 +246,7 @@ public class ReadyPulsarOp implements OpDispenser { async_api_func, keyFunc, valueFunc, - pulsarActivity.getBytesCounter(), - pulsarActivity.getMessagesizeHistogram()); + pulsarActivity); } private LongFunction resolveMsgConsume( diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/SyncPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/SyncPulsarOp.java new file mode 100644 index 000000000..0ab5805d8 --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/SyncPulsarOp.java @@ -0,0 +1,17 @@ +package io.nosqlbench.driver.pulsar.ops; + +/** + * Base type of all Sync Pulsar Operations including Producers and Consumers. + */ +public abstract class SyncPulsarOp implements PulsarOp { + + public void run(Runnable timeTracker) { + try { + this.run(); + } finally { + timeTracker.run(); + } + } + + public abstract void run(); +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index fa898b02f..060452115 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -26,7 +26,8 @@ public class PulsarActivityUtil { // Supported message operation types // TODO: websocket-producer and managed-ledger public enum OP_TYPES { - ADMIN("admin"), + ADMIN_CRT_TENNAME("admin-crt-tennam"), + ADMIN_CRT_TOP("admin-crt-top"), BATCH_MSG_SEND_START("batch-msg-send-start"), BATCH_MSG_SEND("batch-msg-send"), BATCH_MSG_SEND_END("batch-msg-send-end"), @@ -68,7 +69,7 @@ public class PulsarActivityUtil { public enum CLNT_CONF_KEY { serviceUrl("serviceUrl"), authPulginClassName("authPluginClassName"), - authParams("AuthParams"), + authParams("authParams"), pperationTimeoutMs("operationTimeoutMs"), statsIntervalSeconds("statsIntervalSeconds"), numIoThreads("numIoThreads"), diff --git a/driver-pulsar/src/main/resources/activities/pulsar.yaml b/driver-pulsar/src/main/resources/activities/pulsar.yaml index 4c20423f7..b89d120f3 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar.yaml @@ -19,20 +19,32 @@ params: async_api: "false" blocks: - - name: admin-block + - name: create-tennam-block tags: - phase: admin-api + phase: create-tenant-namespace + admin_task: true statements: - name: s1 - optype: admin + optype: admin-crt-tennam admin_roles: allowed_clusters: tenant: "{tenant}" namespace: "default" + - name: create-parttop-block + tags: + phase: create-topic + admin_task: true + statements: + - name: s1 + optype: admin-crt-top + enable_partition: "true" + partition_num: "5" + - name: batch-producer-block tags: phase: batch-producer + admin_task: false statements: - name: s1 optype: batch-msg-send-start @@ -57,6 +69,7 @@ blocks: - name: producer-block tags: phase: producer + admin_task: false statements: - name: s1 optype: msg-send @@ -73,6 +86,7 @@ blocks: - name: consumer-block tags: phase: consumer + admin_task: false statements: - name: s1 optype: msg-consume @@ -85,6 +99,7 @@ blocks: - name: reader-block tags: phase: reader + admin_task: false statements: - name: s1 optype: msg-read diff --git a/driver-pulsar/src/main/resources/pulsar.md b/driver-pulsar/src/main/resources/pulsar.md index 4e2d253d9..225a38eb2 100644 --- a/driver-pulsar/src/main/resources/pulsar.md +++ b/driver-pulsar/src/main/resources/pulsar.md @@ -4,15 +4,16 @@ - [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure) - [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters) - [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details) - - [1.4.1. Pulsar Admin API Command Block](#141-pulsar-admin-api-command-block) - - [1.4.2. Batch Producer Command Block](#142-batch-producer-command-block) - - [1.4.3. Producer Command Block](#143-producer-command-block) - - [1.4.4. Consumer Command Block](#144-consumer-command-block) - - [1.4.5. Reader Command Block](#145-reader-command-block) + - [1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace](#141-pulsar-admin-api-command-block---create-tenant-and-namespace) + - [1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)](#142-pulsar-admin-api-command-block---create-topic-partitioned-or-regular) + - [1.4.3. Batch Producer Command Block](#143-batch-producer-command-block) + - [1.4.4. Producer Command Block](#144-producer-command-block) + - [1.4.5. Consumer Command Block](#145-consumer-command-block) + - [1.4.6. Reader Command Block](#146-reader-command-block) - [1.5. Schema Support](#15-schema-support) - [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters) - [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example) - - [Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) + - [1.8. Appendix A. Template Global Setting File (config.properties)](#18-appendix-a-template-global-setting-file-configproperties) - [2. TODO : Design Revisit -- Advanced Driver Features](#2-todo--design-revisit----advanced-driver-features) - [2.1. Other Activity Parameters](#21-other-activity-parameters) - [2.2. API Caching](#22-api-caching) @@ -72,25 +73,25 @@ There are multiple sections in this file that correspond to different groups of * This section defines all configuration settings that are related with defining a PulsarClient object. * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#default-broker-urls-for-standalone-clusters) * **Pulsar Producer related settings**: * All settings under this section starts with **producer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Producer object. * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer) * **Pulsar Consumer related settings**: * All settings under this section starts with **consumer** prefix. * This section defines all configuration settings that are related with defining a Pulsar Consumer object. * - See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) + See [Pulsar Doc Reference](http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer) * **Pulsar Reader related settings**: * All settings under this section starts with **reader** prefix. * This section defines all configuration settings that are related with defining a Pulsar Reader object. * - See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) + See [Pulsar Doc Reference](https://pulsar.apache.org/docs/en/client-libraries-java/#reader) In the future, when the support for other types of Pulsar workloads is added in NB Pulsar driver, there will be corresponding configuration @@ -141,10 +142,10 @@ params: blocks: - name: tags: - phase: + phase: statements: - name: - optype: + optype: ... ... - name: ... ... @@ -237,43 +238,68 @@ cycle level, **the ycle level setting will take priority!** ## 1.4. Pulsar Driver Yaml File - Command Block Details -### 1.4.1. Pulsar Admin API Command Block +### 1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace -**NOTE**: this functionality is only partially implemented at the moment -and doesn't function yet. - -Currently, the Pulsar Admin API Block is (planned) to only support -creating Pulsar tenants and namespaces. It has the following format: +This Pulsar Admin API Block is used to create Pulsar tenants and namespaces. It has the following format: ```yaml - - name: admin-block + - name: create-tennam-block tags: phase: create-tenant-namespace + admin_task: true statements: - name: s1 - optype: create-tenant + optype: admin-crt-tennam + admin_roles: + allowed_clusters: tenant: "{tenant}" - - name: s2 - optype: create-namespace - namespace: "{namespace}" + namespace: "default" ``` -In this command block, there are 2 statements (s1 and s2): +In this command block, there is only 1 statement (s1): -* Statement **s1** is used for creating a Pulsar tenant - * (Mandatory) **optype (create-tenant)** is the statement identifier +* Statement **s1** is used for creating a Pulsar tenant and a namespace + * (Mandatory) **optype (admin-crt-tennam)** is the statement identifier for this statement - * (Mandatory) **tenant** is the only statement parameter that - specifies the Pulsar tenant name which can either be dynamically - bound or statically assigned. -* Statement **s2** is used for creating a Pulsar namespace - * (Mandatory) **optype (create-namespace)** is the statement - identifier for this statement - * (Mandatory) **namespace** is the only statement parameter that - specifies the Pulsar namespace under the tenant created by statement - s1. Its name can either be dynamically bound or statically assigned. + * (Optional) **allowed_clusters** must be statically bound and it + specifies the cluster list that is allowed for a tenant. + * (Optional) **admin_roles** must be statically bound and it specifies + the super user role that is associated with a tenant. + * (Mandatory) **tenant** is the Pulsar tenant name to be created. It + can either be dynamically or statically bound. + * (Mandatory) **namespace** is the Pulsar namespace name to be created + under the above tenant. It also can be dynamically or statically bound. -### 1.4.2. Batch Producer Command Block +### 1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular) + +This Pulsar Admin API Block is used to create Pulsar topics. It has the following format: + +```yaml + - name: create-parttop-block + tags: + phase: create-topic + admin_task: true + statements: + - name: s1 + optype: admin-crt-top + enable_partition: "true" + partition_num: "5" +``` + +In this command block, there is only 1 statement (s1): + +* Statement **s1** is used for creating a Pulsar tenant and a namespace + * (Mandatory) **optype (admin-crt-top)** is the statement identifier + for this statement + * (Mandatory) **enable_partition** specifies whether to create a + partitioned topic. It can either be dynamically or statically bound. + * (Mandatory) **partition_num** specifies the number of partitions if + a partitioned topic is to be created. It also can be dynamically or + statically bound. + +**NOTE**: The topic name is bounded by the document level parameter "topic_uri". + +### 1.4.3. Batch Producer Command Block Batch producer command block is used to produce a batch of messages all at once by one NB cycle execution. A typical format of this command block is @@ -334,7 +360,7 @@ ratios: 1, , 1. * (Optional) **ratio**, when provided, MUST be 1. If not provided, it is default to 1. -### 1.4.3. Producer Command Block +### 1.4.4. Producer Command Block This is the regular Pulsar producer command block that produces one Pulsar message per NB cycle execution. A typical format of this command block is @@ -370,7 +396,7 @@ This command block only has 1 statements (s1): * (Mandatory) **msg_payload** specifies the payload of the generated message -### 1.4.4. Consumer Command Block +### 1.4.5. Consumer Command Block This is the regular Pulsar consumer command block that consumes one Pulsar message per NB cycle execution. A typical format of this command block is @@ -410,7 +436,7 @@ This command block only has 1 statements (s1): **NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**. -### 1.4.5. Reader Command Block +### 1.4.6. Reader Command Block This is the regular Pulsar reader command block that reads one Pulsar message per NB cycle execution. A typical format of this command block is @@ -528,7 +554,7 @@ environment. ``` -## Appendix A. Template Global Setting File (config.properties) +## 1.8. Appendix A. Template Global Setting File (config.properties) ```properties schema.type = schema.definition =