Merge pull request #299 from yabinmeng/main

- Pulsar Admin API for deletion and Async Admin API
This commit is contained in:
Jonathan Shook 2021-04-06 10:16:26 -05:00 committed by GitHub
commit f0548a60ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 756 additions and 417 deletions

View File

@ -20,13 +20,8 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
@ -89,7 +84,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
if ( useTls ) {
adminBuilder
.useKeyStoreTls(useTls)
.useKeyStoreTls(true)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
@ -98,9 +93,11 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
// Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection"
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarAdmin = adminBuilder.build();
ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData();
logger.debug(configurationData.toString());
} catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin from global configuration!");
throw new RuntimeException("Fail to create PulsarAdmin from global configuration!");

View File

@ -1,113 +0,0 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class PulsarAdminCrtTennamOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTennamOp.class);
private final PulsarSpace clientSpace;
private final Set<String> adminRoleSet;
private final Set<String> allowedClusterSet;
private final String tenant;
private final String namespace;
public PulsarAdminCrtTennamOp(PulsarSpace clientSpace,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant,
String namespace) {
this.clientSpace = clientSpace;
this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet;
this.tenant = tenant;
this.namespace = namespace;
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
if (StringUtils.isBlank(tenant) && !StringUtils.isBlank(namespace)) {
throw new RuntimeException("Can't create a namespace without a tenant!");
}
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
if (!StringUtils.isBlank(tenant)) {
Tenants tenants = pulsarAdmin.tenants();
// Check if the tenant already exists
TenantInfo tenantInfo = null;
try {
tenantInfo = pulsarAdmin.tenants().getTenantInfo(tenant);
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to retrieve tenant info. for pulsar tenant: " + tenant);
}
if (tenantInfo == null) {
tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
} else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
try {
tenants.createTenant(tenant, tenantInfo);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar tenant: " + tenant);
}
}
}
if (!StringUtils.isBlank(namespace)) {
Namespaces namespaces = pulsarAdmin.namespaces();
List<String> nsListWorkingArea = new ArrayList<>();
try {
nsListWorkingArea = namespaces.getNamespaces(tenant);
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to retrieve namespace info. for pulsar tenant: " + tenant);
}
// If te specified namespace doesn't exist yet, create it!
String fullNsName = tenant + "/" + namespace;
if (nsListWorkingArea.isEmpty() || !nsListWorkingArea.contains(fullNsName)) {
try {
namespaces.createNamespace(fullNsName);
} catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to create pulsar namespace: " + fullNsName);
}
}
}
}
}

View File

@ -1,92 +0,0 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import java.util.ArrayList;
import java.util.List;
public class PulsarAdminCrtTopOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminCrtTopOp.class);
private final PulsarSpace clientSpace;
private final String topicUri;
private final boolean partitionTopic;
private final int partitionNum;
private final String fullNsName;
public PulsarAdminCrtTopOp(PulsarSpace clientSpace,
String topicUri,
boolean partitionTopic,
int partitionNum) {
this.clientSpace = clientSpace;
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
// Get tenant/namespace string
// - topicUri : persistent://<tenant>/<namespace>/<topic>
// - tmpStr : <tenant>/<namespace>/<topic>
// - fullNsName : <tenant>/<namespace>
String tmpStr = StringUtils.substringAfter(this.topicUri,"://");
this.fullNsName = StringUtils.substringBeforeLast(tmpStr, "/");
}
private void processPulsarAdminException(PulsarAdminException e, String finalErrMsg) {
int statusCode = e.getStatusCode();
// 409 conflict: resource already exists
if ( (statusCode >= 400) && (statusCode != 409) ) {
throw new RuntimeException(finalErrMsg);
}
}
@Override
public void run() {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
List<String> topicListWorkingArea = new ArrayList<>();
try {
if (!partitionTopic) {
topicListWorkingArea = topics.getList(fullNsName);
}
else {
topicListWorkingArea = topics.getPartitionedTopicList(fullNsName);
}
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
processPulsarAdminException(e, "Failed to retrieve topic info.for pulsar namespace: " + fullNsName);
}
// If the topic doesn't exist, create it.
if (topicListWorkingArea.isEmpty() || !topicListWorkingArea.contains(topicUri)) {
try {
if (!partitionTopic) {
topics.createNonPartitionedTopic(topicUri);
}
else {
topics.createPartitionedTopic(topicUri, partitionNum);
}
} catch (PulsarAdminException e) {
String errMsg = String.format("Failed to create pulsar topic: %s (partition topic: %b; partition number: %d",
topicUri,
partitionTopic,
partitionNum);
processPulsarAdminException(e, errMsg);
}
}
}
}

View File

@ -0,0 +1,29 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.Set;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public abstract class PulsarAdminMapper extends PulsarOpMapper {
protected final LongFunction<Boolean> adminDelOpFunc;
protected PulsarAdminMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc) {
super(cmdTpl, clientSpace, asyncApiFunc);
this.adminDelOpFunc = adminDelOpFunc;
}
}

View File

@ -0,0 +1,44 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.Set;
import java.util.function.LongFunction;
/**
* This maps a set of specifier functions to a pulsar operation. The pulsar operation contains
* enough state to define a pulsar operation such that it can be executed, measured, and possibly
* retried if needed.
*
* This function doesn't act *as* the operation. It merely maps the construction logic into
* a simple functional type, given the component functions.
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminNamespaceMapper extends PulsarAdminMapper {
private final LongFunction<String> namespaceFunc;
public PulsarAdminNamespaceMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc,
LongFunction<String> namespaceFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc);
this.namespaceFunc = namespaceFunc;
}
@Override
public PulsarOp apply(long value) {
boolean asyncApi = asyncApiFunc.apply(value);
boolean adminDelOp = adminDelOpFunc.apply(value);
String namespace = namespaceFunc.apply(value);
return new PulsarAdminNamespaceOp(
clientSpace,
asyncApi,
adminDelOp,
namespace);
}
}

View File

@ -0,0 +1,88 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminNamespaceOp extends PulsarAdminOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminNamespaceOp.class);
private final String fullNsName;
public PulsarAdminNamespaceOp(PulsarSpace clientSpace,
boolean asyncApi,
boolean adminDelOp,
String fullNsName)
{
super(clientSpace, asyncApi, adminDelOp);
this.fullNsName = fullNsName;
}
@Override
public void run() {
// Do nothing if the namespace name is empty
if ( StringUtils.isBlank(fullNsName) ) return;
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Namespaces namespaces = pulsarAdmin.namespaces();
// Admin API - create tenants and namespaces
if (!adminDelOp) {
try {
if (!asyncApi) {
namespaces.createNamespace(fullNsName);
logger.trace("Successfully created namespace \"" + fullNsName + "\" synchronously!");
} else {
CompletableFuture<Void> future = namespaces.createNamespaceAsync(fullNsName);
future.whenComplete((unused, throwable) ->
logger.trace("Successfully created namespace \"" + fullNsName + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to create namespace \"" + fullNsName + "\" asynchronously!");
return null;
});
}
}
catch (PulsarAdminException.ConflictException ce) {
// do nothing if the namespace already exists
}
catch (PulsarAdminException e) {
e.printStackTrace();
throw new RuntimeException("Unexpected error when creating pulsar namespace: " + fullNsName);
}
}
// Admin API - delete tenants and namespaces
else {
try {
if (!asyncApi) {
namespaces.deleteNamespace(fullNsName, true);
logger.trace("Successfully deleted namespace \"" + fullNsName + "\" synchronously!");
} else {
CompletableFuture<Void> future = namespaces.deleteNamespaceAsync(fullNsName, true);
future.whenComplete((unused, throwable) ->
logger.trace("Successfully deleted namespace \"" + fullNsName + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to delete namespace \"" + fullNsName + "\" asynchronously!");
return null;
});
}
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing if the namespace doesn't exist
}
catch (PulsarAdminException e) {
e.printStackTrace();
throw new RuntimeException("Unexpected error when deleting pulsar namespace: " + fullNsName);
}
}
}
}

View File

@ -0,0 +1,32 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public abstract class PulsarAdminOp extends SyncPulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);
protected final PulsarSpace clientSpace;
protected final boolean asyncApi;
protected final boolean adminDelOp;
protected PulsarAdminOp(PulsarSpace clientSpace,
boolean asyncApi,
boolean adminDelOp)
{
this.clientSpace = clientSpace;
this.asyncApi = asyncApi;
this.adminDelOp = adminDelOp;
}
}

View File

@ -16,37 +16,39 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminCrtTennamMapper extends PulsarOpMapper {
public class PulsarAdminTenantMapper extends PulsarAdminMapper {
private final LongFunction<Set<String>> adminRolesFunc;
private final LongFunction<Set<String>> allowedClustersFunc;
private final LongFunction<String> tenantFunc;
private final LongFunction<String> namespaceFunc;
public PulsarAdminCrtTennamMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc,
LongFunction<String> namespaceFunc) {
super(cmdTpl, clientSpace);
public PulsarAdminTenantMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc);
this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc;
this.tenantFunc = tenantFunc;
this.namespaceFunc = namespaceFunc;
}
@Override
public PulsarOp apply(long value) {
boolean asyncApi = asyncApiFunc.apply(value);
boolean adminDelOp = adminDelOpFunc.apply(value);
Set<String> adminRoleSet = adminRolesFunc.apply(value);
Set<String> allowedClusterSet = allowedClustersFunc.apply(value);
String tenant = tenantFunc.apply(value);
String namespace = namespaceFunc.apply(value);
return new PulsarAdminCrtTennamOp(
return new PulsarAdminTenantOp(
clientSpace,
asyncApi,
adminDelOp,
adminRoleSet,
allowedClusterSet,
tenant,
namespace);
tenant);
}
}

View File

@ -0,0 +1,109 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminTenantOp extends PulsarAdminOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminTenantOp.class);
private final Set<String> adminRoleSet;
private final Set<String> allowedClusterSet;
private final String tenant;
public PulsarAdminTenantOp(PulsarSpace clientSpace,
boolean asyncApi,
boolean adminDelOp,
Set<String> adminRoleSet,
Set<String> allowedClusterSet,
String tenant)
{
super(clientSpace, asyncApi, adminDelOp);
this.adminRoleSet = adminRoleSet;
this.allowedClusterSet = allowedClusterSet;
this.tenant = tenant;
}
@Override
public void run() {
// Do nothing if the tenant name is empty
if ( StringUtils.isBlank(tenant) ) return;
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Tenants tenants = pulsarAdmin.tenants();
Namespaces namespaces = pulsarAdmin.namespaces();
// Admin API - create tenants and namespaces
if (!adminDelOp) {
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
} else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
try {
if (!asyncApi) {
tenants.createTenant(tenant, tenantInfo);
logger.trace("Successfully created tenant \"" + tenant + "\" synchronously!");
} else {
CompletableFuture<Void> future = tenants.createTenantAsync(tenant, tenantInfo);
future.whenComplete((unused, throwable) ->
logger.trace("Successfully created tenant \"" + tenant + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to create tenant \"" + tenant + "\" asynchronously!");
return null;
});
}
}
catch (PulsarAdminException.ConflictException ce) {
// do nothing if the tenant already exists
}
catch (PulsarAdminException e) {
e.printStackTrace();
throw new RuntimeException("Unexpected error when creating pulsar tenant: " + tenant);
}
}
// Admin API - delete tenants and namespaces
else {
try {
int nsNum = namespaces.getNamespaces(tenant).size();
// Only delete a tenant when there is no underlying namespaces
if ( nsNum == 0 ) {
if (!asyncApi) {
tenants.deleteTenant(tenant);
logger.trace("Successfully deleted tenant \"" + tenant + "\" synchronously!");
} else {
CompletableFuture<Void> future = tenants.deleteTenantAsync(tenant);
future.whenComplete((unused, throwable)
-> logger.trace("Successfully deleted tenant \"" + tenant + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to delete tenant \"" + tenant + "\" asynchronously!");
return null;
});
}
}
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing if the tenant doesn't exist
}
catch (PulsarAdminException e) {
e.printStackTrace();
throw new RuntimeException("Unexpected error when deleting pulsar tenant: " + tenant);
}
}
}
}

View File

@ -17,17 +17,20 @@ import java.util.function.LongFunction;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarAdminCrtTopMapper extends PulsarOpMapper {
public class PulsarAdminTopicMapper extends PulsarAdminMapper {
private final LongFunction<String> topicUriFunc;
private final LongFunction<String> enablePartionFunc;
private final LongFunction<String> partitionNumFunc;
public PulsarAdminCrtTopMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<String> topicUriFunc,
LongFunction<String> enablePartionFunc,
LongFunction<String> partitionNumFunc) {
super(cmdTpl, clientSpace);
public PulsarAdminTopicMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc,
LongFunction<String> topicUriFunc,
LongFunction<String> enablePartionFunc,
LongFunction<String> partitionNumFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc);
this.topicUriFunc = topicUriFunc;
this.enablePartionFunc = enablePartionFunc;
this.partitionNumFunc = partitionNumFunc;
@ -38,6 +41,8 @@ public class PulsarAdminCrtTopMapper extends PulsarOpMapper {
String topicUri = topicUriFunc.apply(value);
String enablePartitionStr = enablePartionFunc.apply(value);
String partitionNumStr = partitionNumFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
boolean adminDelOp = adminDelOpFunc.apply(value);
if ( StringUtils.isBlank(topicUri) ) {
throw new RuntimeException("\"topic_uri\" parameter can't be empty when creating a Pulsar topic!");
@ -50,17 +55,19 @@ public class PulsarAdminCrtTopMapper extends PulsarOpMapper {
if ( StringUtils.isBlank(partitionNumStr) || !StringUtils.isNumeric(partitionNumStr) ) {
invalidPartStr = true;
} else {
partitionNum = Integer.valueOf(partitionNumStr);
partitionNum = Integer.parseInt(partitionNumStr);
invalidPartStr = (partitionNum <= 0);
}
if (partitionTopic && invalidPartStr) {
throw new RuntimeException("Invalid specified value for \"partition_num\" parameter when creating partitioned topic!");
}
return new PulsarAdminCrtTopOp(
return new PulsarAdminTopicOp(
clientSpace,
topicUri,
partitionTopic,
partitionNum);
partitionNum,
asyncApi,
adminDelOp);
}
}

View File

@ -0,0 +1,148 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminTopicOp extends PulsarAdminOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminTopicOp.class);
private final String topicUri;
private final boolean partitionTopic;
private final int partitionNum;
private final String fullNsName;
public PulsarAdminTopicOp(PulsarSpace clientSpace,
String topicUri,
boolean partitionTopic,
int partitionNum,
boolean asyncApi,
boolean adminDelOp)
{
super(clientSpace, asyncApi, adminDelOp);
this.topicUri = topicUri;
this.partitionTopic = partitionTopic;
this.partitionNum = partitionNum;
// Get tenant/namespace string
// - topicUri : persistent://<tenant>/<namespace>/<topic>
// - tmpStr : <tenant>/<namespace>/<topic>
// - fullNsName : <tenant>/<namespace>
String tmpStr = StringUtils.substringAfter(this.topicUri,"://");
this.fullNsName = StringUtils.substringBeforeLast(tmpStr, "/");
}
// Check whether the specified topic already exists
private boolean checkTopicExistence(Topics topics, String topicUri) {
// Check the existence of the topic
List<String> topicListWorkingArea = new ArrayList<>();
try {
if (!partitionTopic) {
topicListWorkingArea = topics.getList(fullNsName);
}
else {
topicListWorkingArea = topics.getPartitionedTopicList(fullNsName);
}
}
catch (PulsarAdminException.NotFoundException nfe) {
// do nothing
}
catch (PulsarAdminException e) {
e.printStackTrace();
throw new RuntimeException("Failed to retrieve topic info.for pulsar namespace: " + fullNsName);
}
return ( !topicListWorkingArea.isEmpty() && topicListWorkingArea.contains(topicUri) );
}
@Override
public void run() {
PulsarAdmin pulsarAdmin = clientSpace.getPulsarAdmin();
Topics topics = pulsarAdmin.topics();
try {
// Create the topic
if (!adminDelOp) {
if (!partitionTopic) {
if (!asyncApi) {
topics.createNonPartitionedTopic(topicUri);
logger.trace("Successfully created non-partitioned topic \"" + topicUri + "\" synchronously!");
} else {
CompletableFuture<Void> future = topics.createNonPartitionedTopicAsync(topicUri);
future.whenComplete((unused, throwable)
-> logger.trace("Successfully created non-partitioned topic \"" + topicUri + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to create non-partitioned topic \"" + topicUri + "\" asynchronously!");
return null;
});
}
} else {
if (!asyncApi) {
topics.createPartitionedTopic(topicUri, partitionNum);
logger.trace("Successfully created partitioned topic \"" + topicUri + "\"" +
"(partition_num: " + partitionNum + ") synchronously!");
} else {
CompletableFuture<Void> future = topics.createPartitionedTopicAsync(topicUri, partitionNum);
future.whenComplete((unused, throwable)
-> logger.trace("Successfully created partitioned topic \"" + topicUri + "\"" +
"(partition_num: " + partitionNum + ") asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to create partitioned topic \"" + topicUri + "\"" +
"(partition_num: " + partitionNum + ") asynchronously!");
return null;
});
}
}
}
// Delete the topic
else {
if (!partitionTopic) {
if (!asyncApi) {
topics.delete(topicUri, true);
logger.trace("Successfully deleted non-partitioned topic \"" + topicUri + "\" synchronously!");
} else {
CompletableFuture<Void> future = topics.deleteAsync(topicUri, true);
future.whenComplete((unused, throwable)
-> logger.trace("Successfully deleted non-partitioned topic \"" + topicUri + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to delete non-partitioned topic \"" + topicUri + "\" asynchronously!");
return null;
});
}
} else {
if (!asyncApi) {
topics.deletePartitionedTopic(topicUri, true);
logger.trace("Successfully deleted partitioned topic \"" + topicUri + "\" synchronously!");
} else {
CompletableFuture<Void> future = topics.deletePartitionedTopicAsync(topicUri, true);
future.whenComplete((unused, throwable)
-> logger.trace("Successfully deleted partitioned topic \"" + topicUri + "\" asynchronously!"))
.exceptionally(ex -> {
logger.error("Failed to delete partitioned topic \"" + topicUri + "\" asynchronously!");
return null;
});
}
}
}
}
catch (PulsarAdminException e) {
e.printStackTrace();
String errMsg = String.format("Unexpected error when %s pulsar topic: %s (partition topic: %b; partition number: %d)",
(!adminDelOp ? "creating" : "deleting"),
topicUri,
partitionTopic,
partitionNum);
throw new RuntimeException(errMsg);
}
}
}

View File

@ -3,11 +3,15 @@ package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import java.util.function.LongFunction;
public class PulsarBatchProducerEndMapper extends PulsarOpMapper {
public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace) {
super(cmdTpl, clientSpace);
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc);
}
@Override

View File

@ -12,9 +12,10 @@ public class PulsarBatchProducerMapper extends PulsarOpMapper {
public PulsarBatchProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc) {
super(cmdTpl, clientSpace);
super(cmdTpl, clientSpace, asyncApiFunc);
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
}

View File

@ -12,15 +12,15 @@ public class PulsarBatchProducerStartMapper extends PulsarOpMapper {
public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Producer<?>> batchProducerFunc) {
super(cmdTpl, clientSpace);
super(cmdTpl, clientSpace, asyncApiFunc);
this.batchProducerFunc = batchProducerFunc;
}
@Override
public PulsarOp apply(long value) {
Producer<?> batchProducer = batchProducerFunc.apply(value);
return new PulsarBatchProducerStartOp(batchProducer);
}
}

View File

@ -21,19 +21,17 @@ import java.util.function.LongFunction;
*/
public class PulsarConsumerMapper extends PulsarOpMapper {
private final LongFunction<Consumer<?>> consumerFunc;
private final LongFunction<Boolean> asyncApiFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Consumer<?>> consumerFunc,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Consumer<?>> consumerFunc,
Counter bytesCounter,
Histogram messagesizeHistogram) {
super(cmdTpl, clientSpace);
super(cmdTpl, clientSpace, asyncApiFunc);
this.consumerFunc = consumerFunc;
this.asyncApiFunc = asyncApiFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
}

View File

@ -10,10 +10,14 @@ import java.util.function.LongFunction;
public abstract class PulsarOpMapper implements LongFunction<PulsarOp> {
protected final CommandTemplate cmdTpl;
protected final PulsarSpace clientSpace;
protected final LongFunction<Boolean> asyncApiFunc;
public PulsarOpMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace) {
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc)
{
this.cmdTpl = cmdTpl;
this.clientSpace = clientSpace;
this.asyncApiFunc = asyncApiFunc;
}
}

View File

@ -22,21 +22,19 @@ import java.util.function.LongFunction;
*/
public class PulsarProducerMapper extends PulsarOpMapper {
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<Boolean> asyncApiFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> payloadFunc;
private final PulsarActivity pulsarActivity;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Producer<?>> producerFunc,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Producer<?>> producerFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
PulsarActivity pulsarActivity) {
super(cmdTpl, clientSpace);
super(cmdTpl, clientSpace, asyncApiFunc);
this.producerFunc = producerFunc;
this.asyncApiFunc = asyncApiFunc;
this.keyFunc = keyFunc;
this.payloadFunc = payloadFunc;
this.pulsarActivity = pulsarActivity;

View File

@ -54,6 +54,7 @@ public class PulsarProducerOp implements PulsarOp {
if ((msgKey != null) && (!msgKey.isEmpty())) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
int messagesize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
@ -87,9 +88,7 @@ public class PulsarProducerOp implements PulsarOp {
try {
// we rely on blockIfQueueIsFull in order to throttle the request in this case
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
future.whenComplete((messageId, error) -> {
timeTracker.run();
}).exceptionally(ex -> {
future.whenComplete((messageId, error) -> timeTracker.run()).exceptionally(ex -> {
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
pulsarActivity.asyncOperationFailed(ex);
return null;

View File

@ -10,15 +10,14 @@ import java.util.function.LongFunction;
public class PulsarReaderMapper extends PulsarOpMapper {
private final LongFunction<Reader<?>> readerFunc;
private final LongFunction<Boolean> asyncApiFunc;
public PulsarReaderMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
LongFunction<Reader<?>> readerFunc,
LongFunction<Boolean> asyncApiFunc) {
super(cmdTpl, clientSpace);
LongFunction<Boolean> asyncApiFunc,
LongFunction<Reader<?>> readerFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc);
this.readerFunc = readerFunc;
this.asyncApiFunc = asyncApiFunc;
}
@Override

View File

@ -4,8 +4,8 @@ import io.nosqlbench.driver.pulsar.*;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.scoping.ScopedSupplier;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Consumer;
@ -15,7 +15,6 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
@ -57,14 +56,10 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return opFunc.apply(value);
}
private boolean isBoolean(String str) {
return StringUtils.equalsAnyIgnoreCase(str, "yes", "true");
}
private LongFunction<PulsarOp> resolve() {
if (!cmdTpl.containsKey("optype") || !cmdTpl.isStatic("optype")) {
throw new RuntimeException("Statement parameter \"optype\" must have a valid value!");
throw new RuntimeException("Statement parameter \"optype\" must be static and have a valid value!");
}
String stmtOpType = cmdTpl.getStatic("optype");
@ -74,30 +69,39 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
// Global parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null;
if (cmdTpl.containsKey("topic_uri")) {
if (cmdTpl.isStatic("topic_uri")) {
topicUriFunc = (l) -> cmdTpl.getStatic("topic_uri");
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
topicUriFunc = (l) -> cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label);
} else {
topicUriFunc = (l) -> cmdTpl.getDynamic("topic_uri", l);
topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l);
}
}
// Global parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey("async_api")) {
if (cmdTpl.isStatic("async_api"))
asyncApiFunc = (l) -> isBoolean(cmdTpl.getStatic("async_api"));
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label))
asyncApiFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic("PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label"));
else
throw new RuntimeException("\"async_api\" parameter cannot be dynamic!");
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label + "\" parameter cannot be dynamic!");
}
// Global parameter: admin_delop
LongFunction<Boolean> adminDelOpFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label))
adminDelOpFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
}
// TODO: Complete implementation for websocket-producer and managed-ledger
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TENNAME.label) ) {
return resolveAdminCrtTenname(clientSpace);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_CRT_TOP.label)) {
return resolveAdminCrtParttop(clientSpace, topicUriFunc);
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) {
return resolveAdminTenant(clientSpace, asyncApiFunc, adminDelOpFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_NAMESPACE.label)) {
return resolveAdminNamespace(clientSpace, asyncApiFunc, adminDelOpFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) {
return resolveAdminTopic(clientSpace, topicUriFunc, asyncApiFunc, adminDelOpFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
@ -105,18 +109,22 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) {
return resolveMsgBatchSendStart(clientSpace, topicUriFunc);
return resolveMsgBatchSendStart(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) {
return resolveMsgBatchSend(clientSpace);
return resolveMsgBatchSend(clientSpace, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) {
return resolveMsgBatchSendEnd(clientSpace);
return resolveMsgBatchSendEnd(clientSpace, asyncApiFunc);
} else {
throw new RuntimeException("Unsupported Pulsar operation type");
}
}
// Admin API: create tenant and namespace
private LongFunction<PulsarOp> resolveAdminCrtTenname(PulsarSpace clientSpace) {
// Admin API: create tenant
private LongFunction<PulsarOp> resolveAdminTenant(
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc)
{
if ( cmdTpl.isDynamic("admin_roles") ||
cmdTpl.isDynamic("allowed_clusters") ) {
throw new RuntimeException("\"admin_roles\" or \"allowed_clusters\" parameter must NOT be dynamic!");
@ -155,6 +163,22 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
tenantFunc = (l) -> null;
}
return new PulsarAdminTenantMapper(
cmdTpl,
clientSpace,
asyncApiFunc,
adminDelOpFunc,
adminRolesFunc,
allowedClustersFunc,
tenantFunc);
}
// Admin API: create tenant
private LongFunction<PulsarOp> resolveAdminNamespace(
PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc)
{
LongFunction<String> namespaceFunc;
if (cmdTpl.isStatic("namespace")) {
namespaceFunc = (l) -> cmdTpl.getStatic("namespace");
@ -164,19 +188,20 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
namespaceFunc = (l) -> null;
}
return new PulsarAdminCrtTennamMapper(
return new PulsarAdminNamespaceMapper(
cmdTpl,
clientSpace,
adminRolesFunc,
allowedClustersFunc,
tenantFunc,
asyncApiFunc,
adminDelOpFunc,
namespaceFunc);
}
// Admin API: create partitioned topic
private LongFunction<PulsarOp> resolveAdminCrtParttop(
private LongFunction<PulsarOp> resolveAdminTopic(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_fun
LongFunction<String> topic_uri_fun,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc
) {
LongFunction<String> enablePartionFunc = (l) -> null;
if (cmdTpl.isStatic("enable_partition")) {
@ -192,9 +217,11 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
partitionNumFunc = (l) -> cmdTpl.getDynamic("partition_num", l);
}
return new PulsarAdminCrtTopMapper(
return new PulsarAdminTopicMapper(
cmdTpl,
clientSpace,
asyncApiFunc,
adminDelOpFunc,
topic_uri_fun,
enablePartionFunc,
partitionNumFunc);
@ -242,8 +269,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarProducerMapper(
cmdTpl,
clientSpace,
producerFunc,
async_api_func,
producerFunc,
keyFunc,
valueFunc,
pulsarActivity);
@ -311,7 +338,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
consumer_name_func.apply(l)
);
return new PulsarConsumerMapper(cmdTpl, clientSpace, consumerFunc, async_api_func,
return new PulsarConsumerMapper(cmdTpl, clientSpace, async_api_func, consumerFunc,
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram());
}
@ -345,13 +372,14 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
start_msg_pos_str_func.apply(l)
);
return new PulsarReaderMapper(cmdTpl, clientSpace, readerFunc, async_api_func);
return new PulsarReaderMapper(cmdTpl, clientSpace, async_api_func, readerFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSendStart(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func
) {
LongFunction<String> topic_uri_func,
LongFunction<Boolean> asyncApiFunc)
{
LongFunction<String> cycle_batch_producer_name_func;
if (cmdTpl.isStatic("batch_producer_name")) {
cycle_batch_producer_name_func = (l) -> cmdTpl.getStatic("batch_producer_name");
@ -364,10 +392,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Producer<?>> batchProducerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l));
return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, batchProducerFunc);
return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, asyncApiFunc, batchProducerFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSend(PulsarSpace clientSpace) {
private LongFunction<PulsarOp> resolveMsgBatchSend(PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc)
{
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
@ -393,11 +423,14 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarBatchProducerMapper(
cmdTpl,
clientSpace,
asyncApiFunc,
keyFunc,
valueFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSendEnd(PulsarSpace clientSpace) {
return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace);
private LongFunction<PulsarOp> resolveMsgBatchSendEnd(PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc)
{
return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace, asyncApiFunc);
}
}

View File

@ -26,8 +26,9 @@ public class PulsarActivityUtil {
// Supported message operation types
// TODO: websocket-producer and managed-ledger
public enum OP_TYPES {
ADMIN_CRT_TENNAME("admin-crt-tennam"),
ADMIN_CRT_TOP("admin-crt-top"),
ADMIN_TENANT("admin-tenant"),
ADMIN_NAMESPACE("admin-namespace"),
ADMIN_TOPIC("admin-topic"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
@ -45,6 +46,21 @@ public class PulsarActivityUtil {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public enum DOC_LEVEL_PARAMS {
TOPIC_URI("topic_uri"),
ASYNC_API("async_api"),
ADMIN_DELOP("admin_delop");
public final String label;
DOC_LEVEL_PARAMS(String label) {
this.label = label;
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param));
}
///////
// Valid persistence type

View File

@ -0,0 +1,22 @@
bindings:
# 20 namespaces: 10 tenants, 2 namespaces/tenant
tenant: Mod(20); Div(2L); ToString(); Prefix("tnt")
namespace: Mod(2); ToString(); Prefix("ns")
params:
# "true" - asynchronous Pulsar Admin API
# "false" - synchronous Pulsar Admin API
async_api: "true"
# "true" - delete namespace
# "false" - create namespace
admin_delop: "false"
blocks:
- name: create-namespace-block
tags:
phase: admin-namespace
admin_task: true
statements:
- name: s1
optype: admin-namespace
namespace: "{tenant}/{namespace}"

View File

@ -0,0 +1,23 @@
bindings:
# 10 tenants
tenant: Mod(10); ToString(); Prefix("tnt")
params:
# "true" - asynchronous Pulsar Admin API
# "false" - synchronous Pulsar Admin API
async_api: "true"
# "true" - delete tenant
# "false" - create tenant
admin_delop: "false"
blocks:
- name: create-tenant-block
tags:
phase: admin-tenant
admin_task: true
statements:
- name: s1
optype: admin-tenant
admin_roles:
allowed_clusters:
tenant: "{tenant}"

View File

@ -0,0 +1,25 @@
bindings:
# 100 topics: 10 tenants, 2 namespaces/tenant, 5 topics/namespace
tenant: Mod(100); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
params:
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
# "true" - asynchronous Pulsar Admin API
# "false" - synchronous Pulsar Admin API
async_api: "true"
# "true" - delete topic
# "false" - create topic
admin_delop: "false"
blocks:
- name: create-topic-block
tags:
phase: admin-topic
admin_task: true
statements:
- name: s1
optype: admin-topic
enable_partition: "false"
partition_num: "5"

View File

@ -1,46 +1,19 @@
description: |
Test workload for new pulsar driver.
There is no default scenario. You must specify one of the named scenarios
below like send100 or recv100.
You can specify the number of tenants like `tenants=100`. This is the default.
bindings:
mykey: NumberNameToString();
# message key and value
mykey: NumberNameToString()
sensor_id: ToUUID();ToString();
# sensor_type:
reading_time: ToDateTime();
reading_value: ToFloat(100);
tenant: Mod(1000); Div(10L); ToString(); Prefix("tnt")
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
#topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar2"
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
async_api: "true"
blocks:
- name: create-tennam-block
tags:
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
namespace: "default"
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "false"
partition_num: "5"
- name: batch-producer-block
tags:
phase: batch-producer
@ -54,13 +27,13 @@ blocks:
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
ratio: 100
- name: s3
optype: batch-msg-send-end
@ -75,13 +48,13 @@ blocks:
optype: msg-send
# producer_name: {producer_name}
msg_key: "{mykey}"
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
msg_value: |
{
"SensorID": "{sensor_id}",
"SensorType": "Temperature",
"ReadingTime": "{reading_time}",
"ReadingValue": {reading_value}
}
- name: consumer-block
tags:
@ -92,7 +65,7 @@ blocks:
optype: msg-consume
topic_names:
topics_pattern:
subscription_name: "mynbsub_test"
subscription_name: "mysub"
subscription_type:
consumer_name:

View File

@ -1,15 +1,8 @@
description: |
Test workload for new pulsar driver.
There is no default scenario. You must specify one of the named scenarios
below like send100 or recv100.
You can specify the number of tenants like `tenants=100`. This is the default.
bindings:
# message key and value
mykey: NumberNameToString()
myvalue: AlphaNumericString(20)
# Admin API - create tenant, namespace, and topic
tenant: Mod(1000); Div(10L); ToString(); Prefix("tnt")
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
@ -19,28 +12,6 @@ params:
async_api: "true"
blocks:
- name: create-tennam-block
tags:
phase: create-tenant-namespace
admin_task: true
statements:
- name: s1
optype: admin-crt-tennam
admin_roles:
allowed_clusters:
tenant: "{tenant}"
namespace: "{namespace}"
- name: create-parttop-block
tags:
phase: create-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "false"
partition_num: "5"
- name: batch-producer-block
tags:
phase: batch-producer
@ -80,7 +51,7 @@ blocks:
optype: msg-consume
topic_names:
topics_pattern:
subscription_name: "mynbsub_test"
subscription_name: "mysub"
subscription_type:
consumer_name:

View File

@ -4,12 +4,13 @@
- [1.3. NB Pulsar Driver Yaml File - High Level Structure](#13-nb-pulsar-driver-yaml-file---high-level-structure)
- [1.3.1. NB Cycle Level Parameters vs. Global Level Parameters](#131-nb-cycle-level-parameters-vs-global-level-parameters)
- [1.4. Pulsar Driver Yaml File - Command Block Details](#14-pulsar-driver-yaml-file---command-block-details)
- [1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace](#141-pulsar-admin-api-command-block---create-tenant-and-namespace)
- [1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)](#142-pulsar-admin-api-command-block---create-topic-partitioned-or-regular)
- [1.4.3. Batch Producer Command Block](#143-batch-producer-command-block)
- [1.4.4. Producer Command Block](#144-producer-command-block)
- [1.4.5. Consumer Command Block](#145-consumer-command-block)
- [1.4.6. Reader Command Block](#146-reader-command-block)
- [1.4.1. Pulsar Admin API Command Block - Create Tenants](#141-pulsar-admin-api-command-block---create-tenants)
- [1.4.2. Pulsar Admin API Command Block - Create Namespaces](#142-pulsar-admin-api-command-block---create-namespaces)
- [1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)](#143-pulsar-admin-api-command-block---create-topics-partitioned-or-regular)
- [1.4.4. Batch Producer Command Block](#144-batch-producer-command-block)
- [1.4.5. Producer Command Block](#145-producer-command-block)
- [1.4.6. Consumer Command Block](#146-consumer-command-block)
- [1.4.7. Reader Command Block](#147-reader-command-block)
- [1.5. Schema Support](#15-schema-support)
- [1.6. NB Activity Execution Parameters](#16-nb-activity-execution-parameters)
- [1.7. NB Pulsar Driver Execution Example](#17-nb-pulsar-driver-execution-example)
@ -22,8 +23,9 @@
# 1. NoSQLBench (NB) Pulsar Driver Overview
This driver allows you to simulate and run different types of workloads (as below) against a Pulsar cluster through NoSQLBench (NB).
* Admin API - create tenant and namespace
* Admin API - create topic, partition or not
* Admin API - create tenants
* Admin API - create namespaces
* Admin API - create topics
* Producer
* Consumer
* Reader
@ -122,12 +124,13 @@ At high level, Pulsar driver yaml file has the following structure:
Right now, the following command blocks are already supported or will be
added in the near future. We'll go through each of these command blocks
with more details in later sections.
* (future) **admin-block**: support for Pulsar Admin API, starting
with using NB to create tenants and namespaces.
* **batch-producer-block**: Pulsar batch producer
* **producer-block**: Pulsar producer
* **consumer-block**: Pulsar consumer
* **reader-block**: Pulsar reader
* (Pulsar Admin API) **create-tenant-block**: create/delete tenants
* (Pulsar Admin API) **create-namespace-block**: create/delete namespaces
* (Pulsar Admin API) **create-topic-block**: create/delete topics
* (Pulsar Client API) **batch-producer-block**: batch producer
* (Pulsar Client API) **producer-block**: producer
* (Pulsar Client API) **consumer-block**: consumer
* (Pulsar Client API) **reader-block**: reader
```yaml
description: |
@ -136,10 +139,10 @@ description: |
bindings:
... ...
# global parameters that apply to all Pulsar client types:
params:
topic_uri: "<pulsar_topic_name>"
async_api: "false"
admin_delop: "false"
blocks:
- name: <command_block_1>
@ -240,28 +243,27 @@ cycle level, **the cycle level setting will take priority!**
## 1.4. Pulsar Driver Yaml File - Command Block Details
### 1.4.1. Pulsar Admin API Command Block - Create Tenant and Namespace
### 1.4.1. Pulsar Admin API Command Block - Create Tenants
This Pulsar Admin API Block is used to create Pulsar tenants and namespaces. It has the following format:
This Pulsar Admin API Block is used to create Pulsar tenants. It has the following format:
```yaml
- name: create-tennam-block
- name: create-tenant-block
tags:
phase: create-tenant-namespace
phase: admin-tenant
admin_task: true
statements:
- name: s1
optype: admin-crt-tennam
optype: admin-tenant
admin_roles:
allowed_clusters:
tenant: "{tenant}"
namespace: "default"
```
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar tenant and a namespace
* (Mandatory) **optype (admin-crt-tennam)** is the statement identifier
* Statement **s1** is used for creating a Pulsar tenant
* (Mandatory) **optype (admin-tenant)** is the statement identifier
for this statement
* (Optional) **allowed_clusters** must be statically bound and it
specifies the cluster list that is allowed for a tenant.
@ -269,22 +271,43 @@ In this command block, there is only 1 statement (s1):
the super user role that is associated with a tenant.
* (Mandatory) **tenant** is the Pulsar tenant name to be created. It
can either be dynamically or statically bound.
### 1.4.2. Pulsar Admin API Command Block - Create Namespaces
This Pulsar Admin API Block is used to create Pulsar namespaces. It has the following format:
```yaml
- name: create-namespace-block
tags:
phase: admin-namespace
admin_task: true
statements:
- name: s1
optype: admin-namespace
namespace: "{tenant}/{namespace}"
```
In this command block, there is only 1 statement (s1):
* Statement **s1** is used for creating a Pulsar namespace in format "<tenant>/<namespace>"
* (Mandatory) **optype (admin-namespace)** is the statement identifier
for this statement
* (Mandatory) **namespace** is the Pulsar namespace name to be created
under the above tenant. It also can be dynamically or statically bound.
### 1.4.2. Pulsar Admin API Command Block - Create Topic (Partitioned or Regular)
### 1.4.3. Pulsar Admin API Command Block - Create Topics (Partitioned or Regular)
This Pulsar Admin API Block is used to create Pulsar topics. It has the following format:
```yaml
- name: create-parttop-block
- name: create-topic-block
tags:
phase: create-topic
phase: admin-topic
admin_task: true
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
optype: admin-topic
enable_partition: "false"
partition_num: "5"
```
@ -301,7 +324,7 @@ In this command block, there is only 1 statement (s1):
**NOTE**: The topic name is bounded by the document level parameter "topic_uri".
### 1.4.3. Batch Producer Command Block
### 1.4.4. Batch Producer Command Block
Batch producer command block is used to produce a batch of messages all at
once by one NB cycle execution. A typical format of this command block is
@ -362,7 +385,7 @@ ratios: 1, <batch_num>, 1.
* (Optional) **ratio**, when provided, MUST be 1. If not provided, it
is default to 1.
### 1.4.4. Producer Command Block
### 1.4.5. Producer Command Block
This is the regular Pulsar producer command block that produces one Pulsar
message per NB cycle execution. A typical format of this command block is
@ -398,7 +421,7 @@ This command block only has 1 statements (s1):
* (Mandatory) **msg_payload** specifies the payload of the generated
message
### 1.4.5. Consumer Command Block
### 1.4.6. Consumer Command Block
This is the regular Pulsar consumer command block that consumes one Pulsar
message per NB cycle execution. A typical format of this command block is
@ -438,7 +461,7 @@ This command block only has 1 statements (s1):
**NOTE 2**: if both **topic_names** and **topics_pattern** are not provided, consumer topic name is default to the document level parameter **topic_uri**.
### 1.4.6. Reader Command Block
### 1.4.7. Reader Command Block
This is the regular Pulsar reader command block that reads one Pulsar
message per NB cycle execution. A typical format of this command block is
@ -542,9 +565,8 @@ environment.
<nb_cmd> run driver=pulsar tags=phase:producer threads=100 cycles=100K web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml
```
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads;
put NB execution metrics in a specified metrics folder
2. Run Pulsar producer batch API to produce 1M messages with 2 NB threads.
**NOTE**: *seq=* must have **concat** value in order to make the batch API working properly!
```bash
<nb_cmd> run driver=pulsar seq=concat tags=phase:batch-producer threads=2 cycles=1M web_url=http://localhost:8080 service_url=pulsar://localhost:6650 config=<dir>/config.properties yaml=<dir>/pulsar.yaml --report-csv-to <metrics_folder_path>
```