NB Pulsar driver enhancement (milestone 11: https://github.com/nosqlbench/nosqlbench/milestone/11)

- add support for message properties
- add support for end-to-end latency measurement (latency histogram)
- add support for message out-of-order and loss detect
- add async api for consumer
- split single-topic consumer and multi-topic consumer
- SSL/TLS bug fix
- Code cleanup
This commit is contained in:
Yabin Meng 2021-09-22 17:17:53 -05:00
parent 99982ce74d
commit 3ef807172b
30 changed files with 1561 additions and 679 deletions

View File

@ -41,7 +41,7 @@ public class PulsarAction implements SyncAction {
pulsarOp = readyPulsarOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...
activity.getErrorhandler().handleError(bindException, cycle, 0);
activity.getErrorHandler().handleError(bindException, cycle, 0);
throw new RuntimeException(
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
);
@ -56,7 +56,7 @@ public class PulsarAction implements SyncAction {
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity
.getErrorhandler()
.getErrorHandler()
.handleError(err, cycle, System.nanoTime() - start);
if (!errorDetail.isRetryable()) {
break;

View File

@ -20,117 +20,74 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.api.*;
import java.util.Map;
public class PulsarActivity extends SimpleActivity implements ActivityDefObserver {
private final static Logger logger = LogManager.getLogger(PulsarActivity.class);
public Timer bindTimer;
public Timer executeTimer;
public Counter bytesCounter;
public Histogram messagesizeHistogram;
public Timer createTransactionTimer;
public Timer commitTransactionTimer;
private Counter bytesCounter;
private Histogram messageSizeHistogram;
private Timer bindTimer;
private Timer executeTimer;
private Timer createTransactionTimer;
private Timer commitTransactionTimer;
// Metrics for NB Pulsar driver milestone: https://github.com/nosqlbench/nosqlbench/milestone/11
// - end-to-end latency
private Histogram e2eMsgProcLatencyHistogram;
private PulsarSpaceCache pulsarCache;
private PulsarAdmin pulsarAdmin;
private PulsarNBClientConf clientConf;
// e.g. pulsar://localhost:6650
private PulsarNBClientConf pulsarNBClientConf;
private String pulsarSvcUrl;
// e.g. http://localhost:8080
private String webSvcUrl;
private PulsarAdmin pulsarAdmin;
private PulsarClient pulsarClient;
private Schema<?> pulsarSchema;
private NBErrorHandler errorhandler;
private NBErrorHandler errorHandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer;
private volatile Throwable asyncOperationFailure;
// private Supplier<PulsarSpace> clientSupplier;
// private ThreadLocal<Supplier<PulsarClient>> tlClientSupplier;
public PulsarActivity(ActivityDef activityDef) {
super(activityDef);
}
private void initPulsarAdmin() {
@Override
public void shutdownActivity() {
super.shutdownActivity();
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder()
.serviceHttpUrl(webSvcUrl);
try {
String authPluginClassName =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
String useTlsStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label);
boolean useTls = BooleanUtils.toBoolean(useTlsStr);
String tlsTrustCertsFilePath =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
String tlsAllowInsecureConnectionStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
String tlsHostnameVerificationEnableStr =
(String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
adminBuilder
.useKeyStoreTls(true)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
// Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection"
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarAdmin = adminBuilder.build();
// Not supported in Pulsar 2.8.0
// ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData();
// logger.debug(configurationData.toString());
} catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin from global configuration!");
throw new RuntimeException("Fail to create PulsarAdmin from global configuration!");
for (PulsarSpace pulsarSpace : pulsarCache.getAssociatedPulsarSpace()) {
pulsarSpace.shutdownPulsarSpace();
}
}
@Override
public void initActivity() {
super.initActivity();
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messageSizeHistogram = ActivityMetrics.histogram(activityDef, "message_size");
bindTimer = ActivityMetrics.timer(activityDef, "bind");
executeTimer = ActivityMetrics.timer(activityDef, "execute");
createTransactionTimer = ActivityMetrics.timer(activityDef, "createtransaction");
commitTransactionTimer = ActivityMetrics.timer(activityDef, "committransaction");
createTransactionTimer = ActivityMetrics.timer(activityDef, "create_transaction");
commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction");
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency");
String pulsarClntConfFile =
activityDef.getParams().getOptionalString("config").orElse("config.properties");
clientConf = new PulsarNBClientConf(pulsarClntConfFile);
pulsarNBClientConf = new PulsarNBClientConf(pulsarClntConfFile);
pulsarSvcUrl =
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
webSvcUrl =
activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080");
initPulsarAdmin();
initPulsarAdminAndClientObj();
createPulsarSchemaFromConf();
pulsarCache = new PulsarSpaceCache(this);
@ -138,60 +95,20 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
setDefaultsFromOpSequence(sequencer);
onActivityDefUpdate(activityDef);
this.errorhandler = new NBErrorHandler(
this.errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics
);
}
public NBErrorHandler getErrorhandler() {
return errorhandler;
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
}
public OpSequence<OpDispenser<PulsarOp>> getSequencer() {
return sequencer;
}
public NBErrorHandler getErrorHandler() { return errorHandler; }
public PulsarNBClientConf getPulsarConf() {
return clientConf;
}
public String getPulsarSvcUrl() {
return pulsarSvcUrl;
}
public String getWebSvcUrl() { return webSvcUrl; }
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public Timer getBindTimer() {
return bindTimer;
}
public Timer getExecuteTimer() {
return this.executeTimer;
}
public Counter getBytesCounter() {
return bytesCounter;
}
public Timer getCreateTransactionTimer() {
return createTransactionTimer;
}
public Timer getCommitTransactionTimer() {
return commitTransactionTimer;
}
public Histogram getMessagesizeHistogram() {
return messagesizeHistogram;
}
public OpSequence<OpDispenser<PulsarOp>> getSequencer() { return sequencer; }
public void failOnAsyncOperationFailure() {
if (asyncOperationFailure != null) {
@ -202,4 +119,116 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
public void asyncOperationFailed(Throwable ex) {
this.asyncOperationFailure = ex;
}
/**
* Initialize
* - PulsarAdmin object for adding/deleting tenant, namespace, and topic
* - PulsarClient object for message publishing and consuming
*/
private void initPulsarAdminAndClientObj() {
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder()
.serviceHttpUrl(webSvcUrl);
ClientBuilder clientBuilder = PulsarClient.builder();
try {
Map<String, Object> clientConfMap = pulsarNBClientConf.getClientConfMap();
// Override "client.serviceUrl" setting in config.properties
clientConfMap.remove("serviceUrl");
clientBuilder.loadConf(clientConfMap).serviceUrl(pulsarSvcUrl);
// Pulsar Authentication
String authPluginClassName =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
adminBuilder.authentication(authPluginClassName, authParams);
clientBuilder.authentication(authPluginClassName, authParams);
}
String useTlsStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label);
boolean useTls = BooleanUtils.toBoolean(useTlsStr);
String tlsTrustCertsFilePath =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
String tlsAllowInsecureConnectionStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
String tlsHostnameVerificationEnableStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( useTls ) {
adminBuilder
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
clientBuilder
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath)) {
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
}
// Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection"
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarAdmin = adminBuilder.build();
pulsarClient = clientBuilder.build();
////////////////
// Not supported in Pulsar 2.8.0
//
// ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData();
// logger.debug(configurationData.toString());
} catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin and/or PulsarClient object from the global configuration!");
throw new RuntimeException("Fail to create PulsarAdmin and/or PulsarClient object from global configuration!");
}
}
/**
* Get Pulsar schema from the definition string
*/
private void createPulsarSchemaFromConf() {
Object value = pulsarNBClientConf.getSchemaConfValue("schema.type");
String schemaType = (value != null) ? value.toString() : "";
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) {
value = pulsarNBClientConf.getSchemaConfValue("schema.definition");
String schemaDefStr = (value != null) ? value.toString() : "";
pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr);
} else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) {
pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType));
} else {
throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " +
"Only primitive type and Avro type are supported at the moment!");
}
}
public PulsarNBClientConf getPulsarConf() { return this.pulsarNBClientConf;}
public String getPulsarSvcUrl() { return this.pulsarSvcUrl;}
public String getWebSvcUrl() { return this.webSvcUrl; }
public PulsarAdmin getPulsarAdmin() { return this.pulsarAdmin; }
public PulsarClient getPulsarClient() { return this.pulsarClient; }
public Schema<?> getPulsarSchema() { return pulsarSchema; }
public Counter getBytesCounter() { return bytesCounter; }
public Histogram getMessageSizeHistogram() { return messageSizeHistogram; }
public Timer getBindTimer() { return bindTimer; }
public Timer getExecuteTimer() { return this.executeTimer; }
public Timer getCreateTransactionTimer() { return createTransactionTimer; }
public Timer getCommitTransactionTimer() { return commitTransactionTimer; }
public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; }
}

View File

@ -7,7 +7,6 @@ import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -17,6 +16,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -39,40 +39,36 @@ public class PulsarSpace {
private final static Logger logger = LogManager.getLogger(PulsarSpace.class);
private final String spaceName;
private final ConcurrentHashMap<String, Producer<?>> producers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Reader<?>> readers = new ConcurrentHashMap<>();
private final String spaceName;
private final PulsarActivity pulsarActivity;
private final ActivityDef activityDef;
private final PulsarNBClientConf pulsarNBClientConf;
private final String pulsarSvcUrl;
private final String webSvcUrl;
private final PulsarAdmin pulsarAdmin;
private final PulsarClient pulsarClient;
private final Schema<?> pulsarSchema;
private final Set<String> pulsarClusterMetadata = new HashSet<>();
private final Timer createTransactionTimer;
private final Set<String> pulsarClusterMetadata = new HashSet<>();
private PulsarClient pulsarClient = null;
private Schema<?> pulsarSchema = null;
private final ActivityDef activityDef;
public PulsarSpace(String name,
PulsarNBClientConf pulsarClientConf,
String pulsarSvcUrl,
String webSvcUrl,
PulsarAdmin pulsarAdmin,
ActivityDef activityDef,
Timer createTransactionTimer) {
public PulsarSpace(String name, PulsarActivity pulsarActivity) {
this.spaceName = name;
this.pulsarNBClientConf = pulsarClientConf;
this.pulsarSvcUrl = pulsarSvcUrl;
this.webSvcUrl = webSvcUrl;
this.pulsarAdmin = pulsarAdmin;
this.activityDef = activityDef;
this.createTransactionTimer = createTransactionTimer;
this.pulsarActivity = pulsarActivity;
createPulsarClientFromConf();
createPulsarSchemaFromConf();
this.pulsarNBClientConf = pulsarActivity.getPulsarConf();
this.pulsarSvcUrl = pulsarActivity.getPulsarSvcUrl();
this.webSvcUrl = pulsarActivity.getWebSvcUrl();
this.pulsarAdmin = pulsarActivity.getPulsarAdmin();
this.pulsarClient = pulsarActivity.getPulsarClient();
this.pulsarSchema = pulsarActivity.getPulsarSchema();
this.activityDef = pulsarActivity.getActivityDef();
this.createTransactionTimer = pulsarActivity.getCreateTransactionTimer();
try {
Clusters clusters = pulsarAdmin.clusters();
@ -86,114 +82,111 @@ public class PulsarSpace {
}
}
private void createPulsarClientFromConf() {
ClientBuilder clientBuilder = PulsarClient.builder();
try {
Map<String, Object> clientConf = pulsarNBClientConf.getClientConfMap();
// Override "client.serviceUrl" setting in config.properties
clientConf.remove("serviceUrl");
clientBuilder.loadConf(clientConf).serviceUrl(pulsarSvcUrl);
String authPluginClassName =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
String useTlsStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label);
boolean useTls = BooleanUtils.toBoolean(useTlsStr);
String tlsTrustCertsFilePath =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
String tlsAllowInsecureConnectionStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
String tlsHostnameVerificationEnableStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
clientBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
clientBuilder
.useKeyStoreTls(useTls)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
// Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection"
clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarClient = clientBuilder.build();
}
catch (PulsarClientException pce) {
String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
}
}
private void createPulsarSchemaFromConf() {
Object value = pulsarNBClientConf.getSchemaConfValue("schema.type");
String schemaType = (value != null) ? value.toString() : "";
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) {
value = pulsarNBClientConf.getSchemaConfValue("schema.definition");
String schemaDefStr = (value != null) ? value.toString() : "";
pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr);
} else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) {
pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType));
} else {
throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " +
"Only primitive type and Avro type are supported at the moment!");
}
}
public PulsarClient getPulsarClient() { return pulsarClient; }
public PulsarNBClientConf getPulsarClientConf() {
return pulsarNBClientConf;
}
public Schema<?> getPulsarSchema() {
return pulsarSchema;
}
public PulsarNBClientConf getPulsarClientConf() { return pulsarNBClientConf; }
public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; }
public PulsarClient getPulsarClient() { return pulsarClient; }
public Schema<?> getPulsarSchema() { return pulsarSchema; }
public String getPulsarSvcUrl() { return pulsarSvcUrl;}
public String getWebSvcUrl() { return webSvcUrl; }
public Set<String> getPulsarClusterMetadata() { return pulsarClusterMetadata; }
public String getPulsarSvcUrl() {
return pulsarSvcUrl;
// Properly shut down all Pulsar objects (producers, consumers, etc.) that are associated with this space
public void shutdownPulsarSpace() {
try {
for (Producer<?> producer : producers.values()) {
if (producer != null) producer.close();
}
for (Consumer<?> consumer : consumers.values()) {
if (consumer != null) consumer.close();
}
for (Reader<?> reader : readers.values()) {
if (reader != null) reader.close();
}
if (pulsarAdmin != null) pulsarAdmin.close();
if (pulsarClient != null) pulsarClient.close();
}
catch (Exception e) {
throw new RuntimeException("Unexpected error when closing Pulsar objects!");
}
}
public String getWebSvcUrl() { return webSvcUrl; }
/**
* Get a proper Pulsar API metrics prefix depending on the API type
*
* @param apiType - Pulsar API type: producer, consumer, reader, etc.
* @param apiObjName - actual name of a producer, a consumer, a reader, etc.
* @param topicName - topic name
* @return String
*/
private String getPulsarAPIMetricsPrefix(String apiType, String apiObjName, String topicName) {
String apiMetricsPrefix;
if (!PulsarActivityUtil.isValidPulsarApiType(apiType)) {
throw new RuntimeException(
"Incorrect Pulsar API type. Valid type list: " + PulsarActivityUtil.getValidPulsarApiTypeList());
}
if (!StringUtils.isBlank(apiObjName)) {
apiMetricsPrefix = apiObjName + "_";
}
else {
// we want a meaningful name for the API object (producer, consumer, reader, etc.)
// we are not appending the topic name
apiMetricsPrefix = apiType;
if (apiType.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label))
apiMetricsPrefix += producers.size();
else if (apiType.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.CONSUMER.label))
apiMetricsPrefix += consumers.size();
else if (apiType.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.READER.label))
apiMetricsPrefix += readers.size();
apiMetricsPrefix += "_";
}
apiMetricsPrefix += topicName + "_";
apiMetricsPrefix = apiMetricsPrefix
// default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test
.replace("persistent://public/default/", "")
// always remove topic type
.replace("non-persistent://", "")
.replace("persistent://", "")
// persistent://tenant/namespace/topicname -> tenant_namespace_topicname
.replace("/","_");
return apiMetricsPrefix;
}
public Set<String> getPulsarClusterMetadata() { return pulsarClusterMetadata; }
//////////////////////////////////////
// Producer Processing --> start
//////////////////////////////////////
// Topic name IS mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerTopicName(String cycleTopicName) {
if (!StringUtils.isBlank(cycleTopicName)) {
return cycleTopicName;
//
private static class ProducerGaugeImpl implements Gauge<Object> {
private final Producer<?> producer;
private final Function<ProducerStats, Object> valueExtractor;
ProducerGaugeImpl(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
this.producer = producer;
this.valueExtractor = valueExtractor;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if (!StringUtils.isBlank(globalTopicName)) {
return globalTopicName;
@Override
public Object getValue() {
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
// we need to synchronize on producer otherwise we could receive corrupted data
synchronized(producer) {
return valueExtractor.apply(producer.getStats());
}
}
throw new RuntimeException(" topic name must be set at either global level or cycle level!");
}
static Gauge<Object> producerSafeExtractMetric(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
return new ProducerGaugeImpl(producer, valueExtractor);
}
// Producer name is NOT mandatory
@ -212,7 +205,6 @@ public class PulsarSpace {
return "";
}
public Supplier<Transaction> getTransactionSupplier() {
PulsarClient pulsarClient = getPulsarClient();
return () -> {
@ -233,8 +225,20 @@ public class PulsarSpace {
};
}
private static String buildCacheKey(String... keyParts) {
return String.join("::", keyParts);
// Topic name IS mandatory
// - It must be set at either global level or cycle level
// - If set at both levels, cycle level setting takes precedence
private String getEffectiveProducerTopicName(String cycleTopicName) {
if (!StringUtils.isBlank(cycleTopicName)) {
return cycleTopicName;
}
String globalTopicName = pulsarNBClientConf.getProducerTopicName();
if (!StringUtils.isBlank(globalTopicName)) {
return globalTopicName;
}
throw new RuntimeException("Producer topic name must be set at either global level or cycle level!");
}
public Producer<?> getProducer(String cycleTopicName, String cycleProducerName) {
@ -242,10 +246,10 @@ public class PulsarSpace {
String producerName = getEffectiveProducerName(cycleProducerName);
if (StringUtils.isBlank(topicName)) {
throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level");
throw new RuntimeException("Producer:: must specify a topic name");
}
String producerCacheKey = buildCacheKey(producerName, topicName);
String producerCacheKey = PulsarActivityUtil.buildCacheKey(producerName, topicName);
Producer<?> producer = producers.get(producerCacheKey);
if (producer == null) {
@ -253,37 +257,47 @@ public class PulsarSpace {
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
String producerMetricsPrefix;
if (!StringUtils.isBlank(producerName)) {
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
producerMetricsPrefix = producerName + "_";
} else {
// we want a meaningful name for the producer
// we are not appending the topic name
producerMetricsPrefix = "producer" + producers.size() + "_" ;
}
// Remove global level settings: "topicName" and "producerName"
producerConf.remove(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label);
producerConf.remove(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label);
producerMetricsPrefix += topicName + "_";
producerMetricsPrefix = producerMetricsPrefix
.replace("persistent://public/default/", "") // default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test
.replace("non-persistent://", "") // always remove topic type
.replace("persistent://", "")
.replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname
String producerMetricsPrefix = getPulsarAPIMetricsPrefix(
PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label,
producerName,
topicName);
try {
ProducerBuilder<?> producerBuilder = pulsarClient.newProducer(pulsarSchema);
producerBuilder.loadConf(producerConf);
ProducerBuilder<?> producerBuilder = pulsarClient.
newProducer(pulsarSchema).
loadConf(producerConf).
topic(topicName);
if (!StringUtils.isAnyBlank(producerName)) {
producerBuilder = producerBuilder.producerName(producerName);
}
producer = producerBuilder.create();
producers.put(producerCacheKey, producer);
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalsendfailed", safeExtractMetric(producer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalacksreceived", safeExtractMetric(producer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendbytesrate", safeExtractMetric(producer, ProducerStats::getSendBytesRate));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendmsgsrate", safeExtractMetric(producer, ProducerStats::getSendMsgsRate));
ActivityMetrics.gauge(activityDef,
producerMetricsPrefix + "total_bytes_sent",
producerSafeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef,
producerMetricsPrefix + "total_msg_sent",
producerSafeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
ActivityMetrics.gauge(activityDef,
producerMetricsPrefix + "total_send_failed",
producerSafeExtractMetric(producer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
ActivityMetrics.gauge(activityDef,
producerMetricsPrefix + "total_ack_received",
producerSafeExtractMetric(producer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
ActivityMetrics.gauge(activityDef,
producerMetricsPrefix + "send_bytes_rate",
producerSafeExtractMetric(producer, ProducerStats::getSendBytesRate));
ActivityMetrics.gauge(activityDef,
producerMetricsPrefix + "send_msg_rate",
producerSafeExtractMetric(producer, ProducerStats::getSendMsgsRate));
}
catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!", ple);
@ -292,30 +306,7 @@ public class PulsarSpace {
return producer;
}
static Gauge<Object> safeExtractMetric(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
return new GaugeImpl(producer, valueExtractor);
}
private static class GaugeImpl implements Gauge<Object> {
private final Producer<?> producer;
private final Function<ProducerStats, Object> valueExtractor;
GaugeImpl(Producer<?> producer, Function<ProducerStats, Object> valueExtractor) {
this.producer = producer;
this.valueExtractor = valueExtractor;
}
@Override
public Object getValue() {
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
// we need to synchronize on producer otherwise we could receive corrupted data
synchronized(producer) {
return valueExtractor.apply(producer.getStats());
}
}
}
//
//////////////////////////////////////
// Producer Processing <-- end
//////////////////////////////////////
@ -324,59 +315,28 @@ public class PulsarSpace {
//////////////////////////////////////
// Consumer Processing --> start
//////////////////////////////////////
private String getEffectiveTopicNamesStr(String cycleTopicNames) {
if (!StringUtils.isBlank(cycleTopicNames)) {
return cycleTopicNames;
//
private static class ConsumerGaugeImpl implements Gauge<Object> {
private final Consumer<?> consumer;
private final Function<ConsumerStats, Object> valueExtractor;
ConsumerGaugeImpl(Consumer<?> consumer, Function<ConsumerStats, Object> valueExtractor) {
this.consumer = consumer;
this.valueExtractor = valueExtractor;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if (!StringUtils.isBlank(globalTopicNames)) {
return globalTopicNames;
@Override
public Object getValue() {
// see Pulsar bug https://github.com/apache/pulsar/issues/10100
// - this is a bug report for producer stats.
// - assume this also applies to consumer stats.
synchronized(consumer) {
return valueExtractor.apply(consumer.getStats());
}
}
return "";
}
private List<String> getEffectiveTopicNames(String cycleTopicNames) {
String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if (!StringUtils.isBlank(name))
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveTopicPatternStr(String cycleTopicsPattern) {
if (!StringUtils.isBlank(cycleTopicsPattern)) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if (!StringUtils.isBlank(globalTopicsPattern)) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) {
String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicsPatternStr))
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
} catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
static Gauge<Object> consumerSafeExtractMetric(Consumer<?> consumer, Function<ConsumerStats, Object> valueExtractor) {
return new ConsumerGaugeImpl(consumer, valueExtractor);
}
private String getEffectiveSubscriptionName(String cycleSubscriptionName) {
@ -404,7 +364,6 @@ public class PulsarSpace {
return "";
}
private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) {
String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType);
SubscriptionType subscriptionType = SubscriptionType.Exclusive;
@ -434,78 +393,79 @@ public class PulsarSpace {
return "";
}
public Consumer<?> getConsumer(String cycleTopicUri,
String cycleTopicNames,
String cycleTopicsPattern,
public Consumer<?> getConsumer(String cycleTopicName,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
List<String> topicNames = getEffectiveTopicNames(cycleTopicNames);
String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if (StringUtils.isBlank(cycleTopicUri) && topicNames.isEmpty() && (topicsPattern == null)) {
throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
if ( subscriptionType.equals(SubscriptionType.Exclusive) && (activityDef.getThreads() > 1) ) {
throw new RuntimeException("Consumer:: trying to create multiple consumers of " +
"\"Exclusive\" subscription type under the same subscription name to the same topic!");
}
String consumerCacheKey;
// precedence sequence:
// topic_names (consumer statement param) >
// topics_pattern (consumer statement param) >
// topic_uri (document level param)
if (!topicNames.isEmpty()) {
consumerCacheKey = buildCacheKey(
consumerName,
subscriptionName,
String.join("|", topicNames));
} else if (topicsPattern != null) {
consumerCacheKey = buildCacheKey(
consumerName,
subscriptionName,
topicsPatternStr);
} else {
consumerCacheKey = buildCacheKey(
consumerName,
subscriptionName,
cycleTopicUri);
if (StringUtils.isAnyBlank(cycleTopicName, subscriptionName)) {
throw new RuntimeException("Consumer:: must specify a topic name and a subscription name");
}
String consumerCacheKey = PulsarActivityUtil.buildCacheKey(consumerName, subscriptionName, cycleTopicName);
Consumer<?> consumer = consumers.get(consumerCacheKey);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
// Get other possible consumer settings that are set at global level
Map<String, Object> consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap());
consumerConf.remove("timeout");
// Explicit topic names will take precedence over topics pattern
if (!topicNames.isEmpty()) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames);
} else if (topicsPattern != null) {
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.put(
PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label,
getEffectiveTopicPattern(cycleTopicsPattern));
} else {
topicNames.add(cycleTopicUri);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames);
}
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName);
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType);
if (!StringUtils.isBlank(consumerName)) {
consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName);
}
// Remove global level settings:
// - "topicNames", "topicsPattern", "subscriptionName", "subscriptionType", "consumerName"
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
// Remove non-standard consumer configuration properties
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
try {
consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe();
ConsumerBuilder<?> consumerBuilder = pulsarClient.
newConsumer(pulsarSchema).
loadConf(consumerConf).
topic(cycleTopicName).
subscriptionName(subscriptionName).
subscriptionType(subscriptionType);
if (!StringUtils.isBlank(consumerName)) {
consumerBuilder = consumerBuilder.consumerName(consumerName);
}
consumer = consumerBuilder.subscribe();
String consumerMetricsPrefix = getPulsarAPIMetricsPrefix(
PulsarActivityUtil.PULSAR_API_TYPE.CONSUMER.label,
consumerName,
cycleTopicName);
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "total_bytes_recv",
consumerSafeExtractMetric(consumer, (s -> s.getTotalBytesReceived() + s.getNumBytesReceived())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "total_msg_recv",
consumerSafeExtractMetric(consumer, (s -> s.getTotalMsgsReceived() + s.getNumMsgsReceived())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "total_recv_failed",
consumerSafeExtractMetric(consumer, (s -> s.getTotalReceivedFailed() + s.getNumReceiveFailed())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "total_acks_sent",
consumerSafeExtractMetric(consumer,(s -> s.getTotalAcksSent() + s.getNumAcksSent())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "recv_bytes_rate",
consumerSafeExtractMetric(consumer, ConsumerStats::getRateBytesReceived));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "recv_msg_rate",
consumerSafeExtractMetric(consumer, ConsumerStats::getRateMsgsReceived));
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar consumer!");
@ -516,11 +476,186 @@ public class PulsarSpace {
return consumer;
}
//
//////////////////////////////////////
// Consumer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Multi-topic Consumer Processing --> start
//////////////////////////////////////
//
private String getEffectiveConsumerTopicNameListStr(String cycleTopicNames) {
if (!StringUtils.isBlank(cycleTopicNames)) {
return cycleTopicNames;
}
String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames();
if (!StringUtils.isBlank(globalTopicNames)) {
return globalTopicNames;
}
return "";
}
private List<String> getEffectiveConsumerTopicNameList(String cycleTopicNames) {
String effectiveTopicNamesStr = getEffectiveConsumerTopicNameListStr(cycleTopicNames);
String[] names = effectiveTopicNamesStr.split("[;,]");
ArrayList<String> effectiveTopicNameList = new ArrayList<>();
for (String name : names) {
if (!StringUtils.isBlank(name))
effectiveTopicNameList.add(name.trim());
}
return effectiveTopicNameList;
}
private String getEffectiveConsumerTopicPatternStr(String cycleTopicsPattern) {
if (!StringUtils.isBlank(cycleTopicsPattern)) {
return cycleTopicsPattern;
}
String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern();
if (!StringUtils.isBlank(globalTopicsPattern)) {
return globalTopicsPattern;
}
return "";
}
private Pattern getEffectiveConsumerTopicPattern(String cycleTopicsPattern) {
String effectiveTopicsPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern;
try {
if (!StringUtils.isBlank(effectiveTopicsPatternStr))
topicsPattern = Pattern.compile(effectiveTopicsPatternStr);
else
topicsPattern = null;
} catch (PatternSyntaxException pse) {
topicsPattern = null;
}
return topicsPattern;
}
public Consumer<?> getMultiTopicConsumer(
String cycleTopicUri,
String cycleTopicNameList,
String cycleTopicsPattern,
String cycleSubscriptionName,
String cycleSubscriptionType,
String cycleConsumerName) {
List<String> topicNameList = getEffectiveConsumerTopicNameList(cycleTopicNameList);
String topicsPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicsPattern);
Pattern topicsPattern = getEffectiveConsumerTopicPattern(cycleTopicsPattern);
String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName);
SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType);
String consumerName = getEffectiveConsumerName(cycleConsumerName);
if ( subscriptionType.equals(SubscriptionType.Exclusive) && (activityDef.getThreads() > 1) ) {
throw new RuntimeException("Consumer:: trying to create multiple consumers of " +
"\"Exclusive\" subscription type under the same subscription name to the same topic!");
}
if (StringUtils.isBlank(cycleTopicUri) && topicNameList.isEmpty() && (topicsPattern == null)) {
throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!");
}
// precedence sequence:
// topic_names (consumer statement param) >
// topics_pattern (consumer statement param) >
// topic_uri (document level param)
String consumerTopicListString;
if (!topicNameList.isEmpty()) {
consumerTopicListString = String.join("|", topicNameList);
} else if (topicsPattern != null) {
consumerTopicListString = topicsPatternStr;
} else {
consumerTopicListString = cycleTopicUri;
}
String consumerCacheKey = PulsarActivityUtil.buildCacheKey(
consumerName,
subscriptionName,
consumerTopicListString);
Consumer<?> consumer = consumers.get(consumerCacheKey);
if (consumer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap());
// Remove global level settings:
// - "topicNameList", "topicsPattern", "subscriptionName", "subscriptionType", "consumerName"
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
// Remove non-standard consumer configuration properties
consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
try {
ConsumerBuilder<?> consumerBuilder = pulsarClient.newConsumer(pulsarSchema).
loadConf(consumerConf).
subscriptionName(subscriptionName).
subscriptionType(subscriptionType).
consumerName(consumerName);
if (!topicNameList.isEmpty()) {
consumerBuilder = consumerBuilder.topics(topicNameList);
} else if (topicsPattern != null) {
consumerBuilder = consumerBuilder.topicsPattern(topicsPattern);
} else {
consumerBuilder = consumerBuilder.topic(cycleTopicUri);
}
consumer = consumerBuilder.subscribe();
String consumerMetricsPrefix = getPulsarAPIMetricsPrefix(
PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label,
consumerName,
consumerTopicListString);
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "totalBytesRecvd",
consumerSafeExtractMetric(consumer, (s -> s.getTotalBytesReceived() + s.getNumBytesReceived())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "totalMsgsRecvd",
consumerSafeExtractMetric(consumer, (s -> s.getTotalMsgsReceived() + s.getNumMsgsReceived())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "totalRecvdFailed",
consumerSafeExtractMetric(consumer, (s -> s.getTotalReceivedFailed() + s.getNumReceiveFailed())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "totalAcksSent",
consumerSafeExtractMetric(consumer,(s -> s.getTotalAcksSent() + s.getNumAcksSent())));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "recvdBytesRate",
consumerSafeExtractMetric(consumer, ConsumerStats::getRateBytesReceived));
ActivityMetrics.gauge(activityDef,
consumerMetricsPrefix + "recvdMsgsRate",
consumerSafeExtractMetric(consumer, ConsumerStats::getRateMsgsReceived));
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar consumer!");
}
consumers.put(consumerCacheKey, consumer);
}
return consumer;
}
//
//////////////////////////////////////
// Multi-topic Consumer Processing <-- end
//////////////////////////////////////
//////////////////////////////////////
// Reader Processing --> Start
//////////////////////////////////////
@ -534,7 +669,7 @@ public class PulsarSpace {
return globalReaderTopicName;
}
throw new RuntimeException("Reader topic name must be set at either global level or cycle level!");
throw new RuntimeException("Reader:: Reader topic name must be set at either global level or cycle level!");
}
private String getEffectiveReaderName(String cycleReaderName) {
@ -568,35 +703,32 @@ public class PulsarSpace {
String cycleStartMsgPos) {
String topicName = getEffectiveReaderTopicName(cycleTopicName);
if (StringUtils.isBlank(topicName)) {
throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level");
}
String readerName = getEffectiveReaderName(cycleReaderName);
String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos);
if (!PulsarActivityUtil.isValideReaderStartPosition(startMsgPosStr)) {
throw new RuntimeException("Reader:: Invalid value for Reader start message position!");
throw new RuntimeException("Reader:: Invalid value for reader start message position!");
}
String readerCacheKey = buildCacheKey(topicName, readerName, startMsgPosStr);
String readerCacheKey = PulsarActivityUtil.buildCacheKey(topicName, readerName, startMsgPosStr);
Reader<?> reader = readers.get(readerCacheKey);
if (reader == null) {
PulsarClient pulsarClient = getPulsarClient();
Map<String, Object> readerConf = pulsarNBClientConf.getReaderConfMap();
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName);
if (!StringUtils.isBlank(readerName)) {
readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName);
}
// "reader.startMessagePos" is NOT a standard Pulsar reader conf
// Remove global level settings: "topicName" and "readerName"
readerConf.remove(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.label);
readerConf.remove(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.label);
// Remove non-standard reader configuration properties
readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
try {
ReaderBuilder<?> readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf);
ReaderBuilder<?> readerBuilder = pulsarClient.
newReader(pulsarSchema).
loadConf(readerConf).
topic(topicName).
readerName(readerName);
MessageId startMsgId = MessageId.latest;
if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) {
@ -607,11 +739,8 @@ public class PulsarSpace {
// startMsgId = MessageId.latest;
//}
if (startMsgId != null) {
readerBuilder = readerBuilder.startMessageId(startMsgId);
}
reader = readerBuilder.startMessageId(startMsgId).create();
reader = readerBuilder.create();
} catch (PulsarClientException ple) {
ple.printStackTrace();
throw new RuntimeException("Unable to create a Pulsar reader!");

View File

@ -20,20 +20,17 @@ public class PulsarSpaceCache {
this.activity = pulsarActivity;
}
public PulsarSpace getPulsarSpace(String name) {
return clientScopes.computeIfAbsent(name, spaceName ->
new PulsarSpace(
spaceName,
activity.getPulsarConf(),
activity.getPulsarSvcUrl(),
activity.getWebSvcUrl(),
activity.getPulsarAdmin(),
activity.getActivityDef(),
activity.getCreateTransactionTimer()
));
public Iterable<PulsarSpace> getAssociatedPulsarSpace() {
return clientScopes.values();
}
public PulsarActivity getActivity() {
public PulsarActivity getAssociatedPulsarActivity() {
return activity;
}
public PulsarSpace getPulsarSpace(String name) {
return clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, activity));
}
public PulsarActivity getActivity() { return activity; }
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
@ -21,9 +22,10 @@ public abstract class PulsarAdminMapper extends PulsarOpMapper {
protected PulsarAdminMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc) {
super(cmdTpl, clientSpace, asyncApiFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc);
this.adminDelOpFunc = adminDelOpFunc;
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
@ -21,11 +22,12 @@ public class PulsarAdminNamespaceMapper extends PulsarAdminMapper {
public PulsarAdminNamespaceMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc,
LongFunction<String> namespaceFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, adminDelOpFunc);
this.namespaceFunc = namespaceFunc;
}

View File

@ -1,17 +1,8 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public abstract class PulsarAdminOp extends SyncPulsarOp {

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
@ -23,13 +24,14 @@ public class PulsarAdminTenantMapper extends PulsarAdminMapper {
public PulsarAdminTenantMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc,
LongFunction<Set<String>> adminRolesFunc,
LongFunction<Set<String>> allowedClustersFunc,
LongFunction<String> tenantFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, adminDelOpFunc);
this.adminRolesFunc = adminRolesFunc;
this.allowedClustersFunc = allowedClustersFunc;
this.tenantFunc = tenantFunc;

View File

@ -8,8 +8,7 @@ import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantInfo;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -45,15 +44,10 @@ public class PulsarAdminTenantOp extends PulsarAdminOp {
// Admin API - create tenants and namespaces
if (!adminDelOp) {
TenantInfoImpl tenantInfo = TenantInfoImpl.builder().build();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {
tenantInfo.setAllowedClusters(allowedClusterSet);
} else {
tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata());
}
TenantInfo tenantInfo = TenantInfo.builder()
.adminRoles(adminRoleSet)
.allowedClusters(!allowedClusterSet.isEmpty() ? allowedClusterSet : clientSpace.getPulsarClusterMetadata())
.build();
try {
if (!asyncApi) {

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.BooleanUtils;
@ -24,13 +25,14 @@ public class PulsarAdminTopicMapper extends PulsarAdminMapper {
public PulsarAdminTopicMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> adminDelOpFunc,
LongFunction<String> topicUriFunc,
LongFunction<String> enablePartionFunc,
LongFunction<String> partitionNumFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, adminDelOpFunc);
this.topicUriFunc = topicUriFunc;
this.enablePartionFunc = enablePartionFunc;
this.partitionNumFunc = partitionNumFunc;

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
@ -9,9 +10,10 @@ public class PulsarBatchProducerEndMapper extends PulsarOpMapper {
public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc);
}
@Override

View File

@ -24,7 +24,8 @@ public class PulsarBatchProducerEndOp extends SyncPulsarOp {
container.clear();
PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.set(null);
} else {
}
else {
throw new BasicError("You tried to end an empty batch message container. This means you" +
" did initiate the batch container properly, or there is an error in your" +
" pulsar op sequencing and ratios.");

View File

@ -1,22 +1,35 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
public class PulsarBatchProducerMapper extends PulsarOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarBatchProducerMapper.class);
private final LongFunction<String> keyFunc;
private final LongFunction<String> propFunc;
private final LongFunction<String> payloadFunc;
public PulsarBatchProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<String> keyFunc,
LongFunction<String> propFunc,
LongFunction<String> payloadFunc) {
super(cmdTpl, clientSpace, asyncApiFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc);
this.keyFunc = keyFunc;
this.propFunc = propFunc;
this.payloadFunc = payloadFunc;
}
@ -25,9 +38,24 @@ public class PulsarBatchProducerMapper extends PulsarOpMapper {
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
// Check if msgPropJonStr is valid JSON string with a collection of key/value pairs
// - if Yes, convert it to a map
// - otherwise, log an error message and ignore message properties without throwing a runtime exception
Map<String, String> msgProperties = new HashMap<>();
String msgPropJsonStr = propFunc.apply(value);
try {
msgProperties = PulsarActivityUtil.convertJsonToMap(msgPropJsonStr);
}
catch (Exception e) {
logger.error(
"PulsarProducerMapper:: Error parsing message property JSON string {}, ignore message properties!",
msgPropJsonStr);
}
return new PulsarBatchProducerOp(
clientSpace.getPulsarSchema(),
msgKey,
msgProperties,
msgPayload
);
}

View File

@ -12,23 +12,26 @@ import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp extends SyncPulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;
private final Map<String, String> msgProperties;
private final String msgPayload;
public PulsarBatchProducerOp(Schema<?> schema,
String key,
Map<String, String> msgProperties,
String payload) {
this.pulsarSchema = schema;
this.msgKey = key;
this.msgProperties = msgProperties;
this.msgPayload = payload;
}
@Override
public void run() {
if ((msgPayload == null) || msgPayload.isEmpty()) {
@ -43,6 +46,9 @@ public class PulsarBatchProducerOp extends SyncPulsarOp {
if ((msgKey != null) && (!msgKey.isEmpty())) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
if (!msgProperties.isEmpty()) {
typedMessageBuilder = typedMessageBuilder.properties(msgProperties);
}
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
@ -12,9 +13,10 @@ public class PulsarBatchProducerStartMapper extends PulsarOpMapper {
public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Producer<?>> batchProducerFunc) {
super(cmdTpl, clientSpace, asyncApiFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc);
this.batchProducerFunc = batchProducerFunc;
}

View File

@ -3,8 +3,11 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
@ -22,30 +25,25 @@ import java.util.function.Supplier;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarConsumerMapper extends PulsarOpMapper {
public class PulsarConsumerMapper extends PulsarTransactOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
private final LongFunction<Consumer<?>> consumerFunc;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final LongFunction<Boolean> useTransactionFunc;
private final LongFunction<Supplier<Transaction>> transactionSupplierFunc;
private final Timer transactionCommitTimer;
private final boolean e2eMsProc;
public PulsarConsumerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Consumer<?>> consumerFunc,
Counter bytesCounter,
Histogram messagesizeHistogram,
Timer transactionCommitTimer,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc) {
super(cmdTpl, clientSpace, asyncApiFunc);
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
LongFunction<Consumer<?>> consumerFunc,
boolean e2eMsgProc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.consumerFunc = consumerFunc;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.transactionCommitTimer = transactionCommitTimer;
this.useTransactionFunc = useTransactionFunc;
this.transactionSupplierFunc = transactionSupplierFunc;
this.e2eMsProc = e2eMsgProc;
}
@Override
@ -53,18 +51,19 @@ public class PulsarConsumerMapper extends PulsarOpMapper {
Consumer<?> consumer = consumerFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
boolean seqTracking = seqTrackingFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
return new PulsarConsumerOp(
pulsarActivity,
asyncApi,
useTransaction,
seqTracking,
transactionSupplier,
consumer,
clientSpace.getPulsarSchema(),
asyncApi,
clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(),
bytesCounter,
messagesizeHistogram,
useTransaction,
transactionSupplier,
transactionCommitTimer
);
value,
e2eMsProc);
}
}

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
@ -11,106 +12,260 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public class PulsarConsumerOp extends SyncPulsarOp {
public class PulsarConsumerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
private final PulsarActivity pulsarActivity;
private final boolean asyncPulsarOp;
private final boolean useTransaction;
private final boolean seqTracking;
private final Supplier<Transaction> transactionSupplier;
private final Consumer<?> consumer;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;
private final int timeoutSeconds;
private final boolean e2eMsgProc;
private final long curCycleNum;
private long curMsgSeqId;
private long prevMsgSeqid;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final boolean useTransaction;
private final Supplier<Transaction> transactionSupplier;
private final Histogram messageSizeHistogram;
private final Timer transactionCommitTimer;
public PulsarConsumerOp(Consumer<?> consumer, Schema<?> schema, boolean asyncPulsarOp, int timeoutSeconds,
Counter bytesCounter,
Histogram messagesizeHistogram,
boolean useTransaction,
Supplier<Transaction> transactionSupplier,
Timer transactionCommitTimer) {
// keep track of end-to-end message latency
private final Histogram e2eMsgProcLatencyHistogram;
public PulsarConsumerOp(
PulsarActivity pulsarActivity,
boolean asyncPulsarOp,
boolean useTransaction,
boolean seqTracking,
Supplier<Transaction> transactionSupplier,
Consumer<?> consumer,
Schema<?> schema,
int timeoutSeconds,
long curCycleNum,
boolean e2eMsgProc)
{
this.pulsarActivity = pulsarActivity;
this.asyncPulsarOp = asyncPulsarOp;
this.useTransaction = useTransaction;
this.seqTracking = seqTracking;
this.transactionSupplier = transactionSupplier;
this.consumer = consumer;
this.pulsarSchema = schema;
this.asyncPulsarOp = asyncPulsarOp;
this.timeoutSeconds = timeoutSeconds;
this.bytesCounter = bytesCounter;
this.messagesizeHistogram = messagesizeHistogram;
this.useTransaction = useTransaction;
this.transactionSupplier = transactionSupplier;
this.transactionCommitTimer = transactionCommitTimer;
}
this.curCycleNum = curCycleNum;
this.e2eMsgProc = e2eMsgProc;
public void syncConsume() {
try {
Message<?> message;
if (timeoutSeconds <= 0) {
// wait forever
message = consumer.receive();
} else {
// we cannot use Consumer#receive(timeout, timeunit) due to
// https://github.com/apache/pulsar/issues/9921
message = consumer
.receiveAsync()
.get(timeoutSeconds, TimeUnit.SECONDS);
}
this.curMsgSeqId = 0;
this.prevMsgSeqid = 0;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
if (logger.isDebugEnabled()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData()));
}
}
int messagesize = message.getData().length;
bytesCounter.inc(messagesize);
messagesizeHistogram.update(messagesize);
if (useTransaction) {
Transaction transaction = transactionSupplier.get();
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
} else{
consumer.acknowledge(message.getMessageId());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void asyncConsume() {
//TODO: add support for async consume
this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram();
}
@Override
public void run() {
if (!asyncPulsarOp)
syncConsume();
else
asyncConsume();
public void run(Runnable timeTracker) {
final Transaction transaction;
if (useTransaction) {
// if you are in a transaction you cannot set the schema per-message
transaction = transactionSupplier.get();
}
else {
transaction = null;
}
if (!asyncPulsarOp) {
Message<?> message;
try {
if (timeoutSeconds <= 0) {
// wait forever
message = consumer.receive();
}
else {
// we cannot use Consumer#receive(timeout, timeunit) due to
// https://github.com/apache/pulsar/issues/9921
message = consumer
.receiveAsync()
.get(timeoutSeconds, TimeUnit.SECONDS);
}
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
}
// keep track end-to-end message processing latency
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
if (e2eMsgProc) {
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message ordering and message loss
if (seqTracking) {
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
curMsgSeqId = Long.parseLong(msgSeqIdStr);
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqid) == 1) {
prevMsgSeqid = curMsgSeqId;
}
else {
// abnormal case: out of ordering
if (curMsgSeqId < prevMsgSeqid) {
throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!");
}
// abnormal case: message loss
else if ( (curMsgSeqId - prevMsgSeqid) > 1 ) {
throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!");
}
}
}
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (useTransaction) {
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
else {
consumer.acknowledge(message.getMessageId());
}
}
catch (Exception e) {
logger.error(
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
throw new RuntimeException(e);
}
}
else {
try {
CompletableFuture<? extends Message<?>> msgRecvFuture = consumer.receiveAsync();
if (useTransaction) {
// add commit step
msgRecvFuture = msgRecvFuture.thenCompose(msg -> {
Timer.Context ctx = transactionCommitTimer.time();
return transaction
.commit()
.whenComplete((m,e) -> ctx.close())
.thenApply(v-> msg);
}
);
}
msgRecvFuture.whenComplete((message, error) -> {
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})",
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})",
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
}
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
if (e2eMsgProc) {
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message ordering and message loss
if (seqTracking) {
String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID);
curMsgSeqId = Long.parseLong(msgSeqIdStr);
// normal case: message sequence id is monotonically increasing by 1
if ((curMsgSeqId - prevMsgSeqid) == 1) {
prevMsgSeqid = curMsgSeqId;
} else {
// abnormal case: out of ordering
if (curMsgSeqId < prevMsgSeqid) {
throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!");
}
// abnormal case: message loss
else if ((curMsgSeqId - prevMsgSeqid) > 1) {
throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!");
}
}
}
if (useTransaction) {
consumer.acknowledgeAsync(message.getMessageId(), transaction);
}
else {
consumer.acknowledgeAsync(message);
}
timeTracker.run();
}).exceptionally(ex -> {
pulsarActivity.asyncOperationFailed(ex);
return null;
});
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -1,23 +1,29 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public abstract class PulsarOpMapper implements LongFunction<PulsarOp> {
protected final CommandTemplate cmdTpl;
protected final PulsarSpace clientSpace;
protected final PulsarActivity pulsarActivity;
protected final LongFunction<Boolean> asyncApiFunc;
public PulsarOpMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc)
{
this.cmdTpl = cmdTpl;
this.clientSpace = clientSpace;
this.pulsarActivity = pulsarActivity;
this.asyncApiFunc = asyncApiFunc;
}
}

View File

@ -1,14 +1,20 @@
package io.nosqlbench.driver.pulsar.ops;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;
import java.util.function.Supplier;
@ -22,49 +28,98 @@ import java.util.function.Supplier;
*
* For additional parameterization, the command template is also provided.
*/
public class PulsarProducerMapper extends PulsarOpMapper {
public class PulsarProducerMapper extends PulsarTransactOpMapper {
private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class);
private final LongFunction<Producer<?>> producerFunc;
private final LongFunction<String> seqErrSimuTypeFunc;
private final LongFunction<String> keyFunc;
private final LongFunction<String> propFunc;
private final LongFunction<String> payloadFunc;
private final PulsarActivity pulsarActivity;
private final LongFunction<Boolean> useTransactionFunc;
private final LongFunction<Supplier<Transaction>> transactionSupplierFunc;
public PulsarProducerMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Producer<?>> producerFunc,
LongFunction<String> keyFunc,
LongFunction<String> payloadFunc,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc,
PulsarActivity pulsarActivity) {
super(cmdTpl, clientSpace, asyncApiFunc);
LongFunction<Producer<?>> producerFunc,
LongFunction<String> seqErrSimuTypeFunc,
LongFunction<String> keyFunc,
LongFunction<String> propFunc,
LongFunction<String> payloadFunc) {
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc);
this.producerFunc = producerFunc;
this.seqErrSimuTypeFunc = seqErrSimuTypeFunc;
this.keyFunc = keyFunc;
this.propFunc = propFunc;
this.payloadFunc = payloadFunc;
this.pulsarActivity = pulsarActivity;
this.useTransactionFunc = useTransactionFunc;
this.transactionSupplierFunc = transactionSupplierFunc;
}
@Override
public PulsarOp apply(long value) {
Producer<?> producer = producerFunc.apply(value);
boolean asyncApi = asyncApiFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
boolean seqTracking = seqTrackingFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
Producer<?> producer = producerFunc.apply(value);
// Simulate error 10% of the time
float rndVal = RandomUtils.nextFloat(0, 1.0f);
boolean simulationError = (rndVal > 0) && (rndVal < 0.1f);
String seqErrSimuType = seqErrSimuTypeFunc.apply(value);
boolean simulateMsgOutofOrder = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label);
boolean simulateMsgLoss = simulationError &&
!StringUtils.isBlank(seqErrSimuType) &&
StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.DataLoss.label);
String msgKey = keyFunc.apply(value);
String msgPayload = payloadFunc.apply(value);
boolean useTransaction = useTransactionFunc.apply(value);
Supplier<Transaction> transactionSupplier = transactionSupplierFunc.apply(value);
// Check if msgPropJonStr is valid JSON string with a collection of key/value pairs
// - if Yes, convert it to a map
// - otherwise, log an error message and ignore message properties without throwing a runtime exception
Map<String, String> msgProperties = new HashMap<>();
String msgPropJsonStr = propFunc.apply(value);
if (!StringUtils.isBlank(msgPropJsonStr)) {
try {
msgProperties = PulsarActivityUtil.convertJsonToMap(msgPropJsonStr);
} catch (Exception e) {
logger.error(
"Error parsing message property JSON string {}, ignore message properties!",
msgPropJsonStr);
}
}
// Set message sequence tracking property
if (seqTracking) {
if (!simulateMsgOutofOrder) {
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value));
}
else {
int rndmOffset = 2;
if (value > rndmOffset)
msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-rndmOffset));
}
}
return new PulsarProducerOp(
producer,
clientSpace.getPulsarSchema(),
pulsarActivity,
asyncApi,
useTransaction,
transactionSupplier,
producer,
clientSpace.getPulsarSchema(),
msgKey,
msgProperties,
msgPayload,
pulsarActivity
);
simulateMsgLoss);
}
}

View File

@ -6,6 +6,7 @@ import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*;
@ -15,6 +16,7 @@ import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
@ -23,57 +25,87 @@ public class PulsarProducerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarProducerOp.class);
private final Producer<?> producer;
private final Schema<?> pulsarSchema;
private final String msgKey;
private final String msgPayload;
private final boolean asyncPulsarOp;
private final Counter bytesCounter;
private final Histogram messagesizeHistogram;
private final PulsarActivity pulsarActivity;
private final boolean asyncPulsarOp;
private final boolean useTransaction;
private final Supplier<Transaction> transactionSupplier;
public PulsarProducerOp(Producer<?> producer,
Schema<?> schema,
boolean asyncPulsarOp,
boolean useTransaction,
Supplier<Transaction> transactionSupplier,
String key,
String payload,
PulsarActivity pulsarActivity) {
private final Producer<?> producer;
private final Schema<?> pulsarSchema;
private final String msgKey;
private final Map<String, String> msgProperties;
private final String msgPayload;
private final boolean simulateMsgLoss;
private final Counter bytesCounter;
private final Histogram messageSizeHistogram;
private final Timer transactionCommitTimer;
public PulsarProducerOp( PulsarActivity pulsarActivity,
boolean asyncPulsarOp,
boolean useTransaction,
Supplier<Transaction> transactionSupplier,
Producer<?> producer,
Schema<?> schema,
String key,
Map<String, String> msgProperties,
String payload,
boolean simulateMsgLoss) {
this.pulsarActivity = pulsarActivity;
this.asyncPulsarOp = asyncPulsarOp;
this.useTransaction = useTransaction;
this.transactionSupplier = transactionSupplier;
this.producer = producer;
this.pulsarSchema = schema;
this.msgKey = key;
this.msgProperties = msgProperties;
this.msgPayload = payload;
this.asyncPulsarOp = asyncPulsarOp;
this.pulsarActivity = pulsarActivity;
this.simulateMsgLoss = simulateMsgLoss;
this.bytesCounter = pulsarActivity.getBytesCounter();
this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram();
this.useTransaction = useTransaction;
this.transactionSupplier = transactionSupplier;
this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram();
this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer();
}
@Override
public void run(Runnable timeTracker) {
// Skip this cycle (without sending messages) if we're doing message loss simulation
if (simulateMsgLoss) {
return;
}
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
TypedMessageBuilder typedMessageBuilder;
final Transaction transaction;
if (useTransaction) {
// if you are in a transaction you cannot set the schema per-message
transaction = transactionSupplier.get();
typedMessageBuilder = producer.newMessage(transaction);
} else {
}
else {
transaction = null;
typedMessageBuilder = producer.newMessage(pulsarSchema);
}
if ((msgKey != null) && (!msgKey.isEmpty())) {
// set message key
if (!StringUtils.isBlank(msgKey)) {
typedMessageBuilder = typedMessageBuilder.key(msgKey);
}
int messagesize;
// set message properties
if ( !msgProperties.isEmpty() ) {
typedMessageBuilder = typedMessageBuilder.properties(msgProperties);
}
// set message payload
int messageSize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
@ -83,55 +115,110 @@ public class PulsarProducerOp implements PulsarOp {
);
typedMessageBuilder = typedMessageBuilder.value(payload);
// TODO: add a way to calculate the message size for AVRO messages
messagesize = msgPayload.length();
messageSize = msgPayload.length();
} else {
byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8);
typedMessageBuilder = typedMessageBuilder.value(array);
messagesize = array.length;
messageSize = array.length;
}
messagesizeHistogram.update(messagesize);
bytesCounter.inc(messagesize);
messageSizeHistogram.update(messageSize);
bytesCounter.inc(messageSize);
//TODO: add error handling with failed message production
if (!asyncPulsarOp) {
try {
logger.trace("sending message");
logger.trace("Sending message");
typedMessageBuilder.send();
if (useTransaction) {
try (Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();) {
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
} catch (PulsarClientException | ExecutionException | InterruptedException pce) {
logger.trace("failed sending message");
if (logger.isDebugEnabled()) {
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload);
logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={})",
msgKey,
msgProperties,
avroGenericRecord.toString());
}
else {
logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={}",
msgKey,
msgProperties,
msgPayload);
}
}
}
catch (PulsarClientException | ExecutionException | InterruptedException pce) {
logger.trace(
"Sync message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgProperties + "; " +
"payload - " + msgPayload);
throw new RuntimeException(pce);
}
timeTracker.run();
} else {
}
else {
try {
// we rely on blockIfQueueIsFull in order to throttle the request in this case
CompletableFuture<?> future = typedMessageBuilder.sendAsync();
if (useTransaction) {
// add commit step
future = future.thenCompose(msg -> {
Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();;
Timer.Context ctx = transactionCommitTimer.time();
return transaction
.commit()
.whenComplete((m,e) -> {
ctx.close();
})
.whenComplete((m,e) -> ctx.close())
.thenApply(v-> msg);
}
);
}
future.whenComplete((messageId, error) -> {
if (logger.isDebugEnabled()) {
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload);
logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})",
msgKey,
msgProperties,
avroGenericRecord.toString());
}
else {
logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}",
msgKey,
msgProperties,
msgPayload);
}
}
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
logger.error("Async message sending failed: " +
"key - " + msgKey + "; " +
"properties - " + msgProperties + "; " +
"payload - " + msgPayload);
pulsarActivity.asyncOperationFailed(ex);
return null;
});
} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Reader;
@ -13,10 +14,11 @@ public class PulsarReaderMapper extends PulsarOpMapper {
public PulsarReaderMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Reader<?>> readerFunc)
{
super(cmdTpl, clientSpace, asyncApiFunc);
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc);
this.readerFunc = readerFunc;
}

View File

@ -0,0 +1,29 @@
package io.nosqlbench.driver.pulsar.ops;
import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.transaction.Transaction;
import java.util.function.LongFunction;
import java.util.function.Supplier;
public abstract class PulsarTransactOpMapper extends PulsarOpMapper {
protected final LongFunction<Boolean> useTransactionFunc;
protected final LongFunction<Boolean> seqTrackingFunc;
protected final LongFunction<Supplier<Transaction>> transactionSupplierFunc;
public PulsarTransactOpMapper(CommandTemplate cmdTpl,
PulsarSpace clientSpace,
PulsarActivity pulsarActivity,
LongFunction<Boolean> asyncApiFunc,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
LongFunction<Supplier<Transaction>> transactionSupplierFunc)
{
super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc);
this.useTransactionFunc = useTransactionFunc;
this.seqTrackingFunc = seqTrackingFunc;
this.transactionSupplierFunc = transactionSupplierFunc;
}
}

View File

@ -21,7 +21,9 @@ import java.util.function.LongFunction;
import java.util.function.Supplier;
public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
private final static Logger logger = LogManager.getLogger(ReadyPulsarOp.class);
private final OpTemplate opTpl;
private final CommandTemplate cmdTpl;
private final PulsarSpace clientSpace;
@ -71,7 +73,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?");
}
// Global parameter: topic_uri
// Doc-level parameter: topic_uri
LongFunction<String> topicUriFunc = (l) -> null;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) {
@ -80,8 +82,9 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l);
}
}
logger.info("topic_uri: {}", topicUriFunc.apply(0));
// Global parameter: async_api
// Doc-level parameter: async_api
LongFunction<Boolean> asyncApiFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) {
@ -93,6 +96,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
logger.info("async_api: {}", asyncApiFunc.apply(0));
// Doc-level parameter: async_api
LongFunction<Boolean> useTransactionFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) {
@ -104,7 +108,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
logger.info("use_transaction: {}", useTransactionFunc.apply(0));
// Global parameter: admin_delop
// Doc-level parameter: admin_delop
LongFunction<Boolean> adminDelOpFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label))
@ -112,27 +116,70 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!");
}
logger.info("admin_delop: {}", adminDelOpFunc.apply(0));
// Doc-level parameter: seq_tracking
LongFunction<Boolean> seqTrackingFunc = (l) -> false;
if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label)) {
if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label))
seqTrackingFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label));
else
throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!");
}
logger.info("seq_tracking: {}", seqTrackingFunc.apply(0));
// TODO: Complete implementation for websocket-producer and managed-ledger
// Admin operation: create/delete tenant
if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) {
return resolveAdminTenant(clientSpace, asyncApiFunc, adminDelOpFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_NAMESPACE.label)) {
}
// Admin operation: create/delete namespace
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_NAMESPACE.label)) {
return resolveAdminNamespace(clientSpace, asyncApiFunc, adminDelOpFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) {
}
// Admin operation: create/delete topic
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) {
return resolveAdminTopic(clientSpace, topicUriFunc, asyncApiFunc, adminDelOpFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
}
// Regular/non-admin operation: single message sending (producer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc);
}
// Regular/non-admin operation: single message consuming from a single topic (consumer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, false);
}
// Regular/non-admin operation: single message consuming from multiple-topics (consumer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_MULTI_CONSUME.label)) {
return resolveMultiTopicMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc);
}
// Regular/non-admin operation: single message consuming a single topic (reader)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) {
return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) {
}
// Regular/non-admin operation: batch message processing - batch start
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) {
return resolveMsgBatchSendStart(clientSpace, topicUriFunc, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) {
}
// Regular/non-admin operation: batch message processing - message sending (producer)
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) {
return resolveMsgBatchSend(clientSpace, asyncApiFunc);
} else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) {
}
// Regular/non-admin operation: batch message processing - batch send
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) {
return resolveMsgBatchSendEnd(clientSpace, asyncApiFunc);
} else {
}
// Regular/non-admin operation: end-to-end message processing - sending message
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_SEND.label)) {
return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc);
}
// Regular/non-admin operation: end-to-end message processing - consuming message
else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_CONSUME.label)) {
return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, true);
}
// Invalid operation type
else {
throw new RuntimeException("Unsupported Pulsar operation type");
}
}
@ -184,6 +231,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarAdminTenantMapper(
cmdTpl,
clientSpace,
pulsarActivity,
asyncApiFunc,
adminDelOpFunc,
adminRolesFunc,
@ -209,6 +257,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarAdminNamespaceMapper(
cmdTpl,
clientSpace,
pulsarActivity,
asyncApiFunc,
adminDelOpFunc,
namespaceFunc);
@ -238,6 +287,7 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarAdminTopicMapper(
cmdTpl,
clientSpace,
pulsarActivity,
asyncApiFunc,
adminDelOpFunc,
topic_uri_fun,
@ -249,8 +299,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc
) {
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<String> cycle_producer_name_func;
if (cmdTpl.isStatic("producer_name")) {
cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name");
@ -263,9 +317,19 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Producer<?>> producerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l));
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
// check if we're going to simulate producer message out-of-sequence error
// - message ordering
// - message loss
LongFunction<String> seqErrSimuTypeFunc = (l) -> null;
if (cmdTpl.containsKey("seqerr_simu")) {
if (cmdTpl.isStatic("seqerr_simu")) {
seqErrSimuTypeFunc = (l) -> cmdTpl.getStatic("seqerr_simu");
} else {
throw new RuntimeException("\"seqerr_simu\" parameter cannot be dynamic!");
}
}
// message key
LongFunction<String> keyFunc;
if (cmdTpl.isStatic("msg_key")) {
keyFunc = (l) -> cmdTpl.getStatic("msg_key");
@ -275,6 +339,16 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
keyFunc = (l) -> null;
}
// message property
LongFunction<String> propFunc;
if (cmdTpl.isStatic("msg_property")) {
propFunc = (l) -> cmdTpl.getStatic("msg_property");
} else if (cmdTpl.isDynamic("msg_property")) {
propFunc = (l) -> cmdTpl.getDynamic("msg_property", l);
} else {
propFunc = (l) -> null;
}
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
@ -291,20 +365,82 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarProducerMapper(
cmdTpl,
clientSpace,
pulsarActivity,
async_api_func,
producerFunc,
keyFunc,
valueFunc,
useTransactionFunc,
seqTrackingFunc,
transactionSupplierFunc,
pulsarActivity);
producerFunc,
seqErrSimuTypeFunc,
keyFunc,
propFunc,
valueFunc);
}
private LongFunction<PulsarOp> resolveMsgConsume(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc,
boolean e2eMsgProc
) {
LongFunction<String> subscription_name_func;
if (cmdTpl.isStatic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name");
} else if (cmdTpl.isDynamic("subscription_name")) {
subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l);
} else {
subscription_name_func = (l) -> null;
}
LongFunction<String> subscription_type_func;
if (cmdTpl.isStatic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type");
} else if (cmdTpl.isDynamic("subscription_type")) {
subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l);
} else {
subscription_type_func = (l) -> null;
}
LongFunction<String> consumer_name_func;
if (cmdTpl.isStatic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name");
} else if (cmdTpl.isDynamic("consumer_name")) {
consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l);
} else {
consumer_name_func = (l) -> null;
}
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer(
topic_uri_func.apply(l),
subscription_name_func.apply(l),
subscription_type_func.apply(l),
consumer_name_func.apply(l)
);
return new PulsarConsumerMapper(
cmdTpl,
clientSpace,
pulsarActivity,
async_api_func,
useTransactionFunc,
seqTrackingFunc,
transactionSupplierFunc,
consumerFunc,
e2eMsgProc);
}
private LongFunction<PulsarOp> resolveMultiTopicMsgConsume(
PulsarSpace clientSpace,
LongFunction<String> topic_uri_func,
LongFunction<Boolean> async_api_func,
LongFunction<Boolean> useTransactionFunc,
LongFunction<Boolean> seqTrackingFunc
) {
// Topic list (multi-topic)
LongFunction<String> topic_names_func;
@ -356,8 +492,8 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Supplier<Transaction>> transactionSupplierFunc =
(l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle?
LongFunction<Consumer<?>> consumerFunc = (l) ->
clientSpace.getConsumer(
LongFunction<Consumer<?>> mtConsumerFunc = (l) ->
clientSpace.getMultiTopicConsumer(
topic_uri_func.apply(l),
topic_names_func.apply(l),
topics_pattern_func.apply(l),
@ -366,9 +502,16 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
consumer_name_func.apply(l)
);
return new PulsarConsumerMapper(cmdTpl, clientSpace, async_api_func, consumerFunc,
pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram(), pulsarActivity.getCommitTransactionTimer(),
useTransactionFunc, transactionSupplierFunc);
return new PulsarConsumerMapper(
cmdTpl,
clientSpace,
pulsarActivity,
async_api_func,
useTransactionFunc,
seqTrackingFunc,
transactionSupplierFunc,
mtConsumerFunc,
false);
}
private LongFunction<PulsarOp> resolveMsgRead(
@ -401,7 +544,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
start_msg_pos_str_func.apply(l)
);
return new PulsarReaderMapper(cmdTpl, clientSpace, async_api_func, readerFunc);
return new PulsarReaderMapper(
cmdTpl,
clientSpace,
pulsarActivity,
async_api_func,
readerFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSendStart(
@ -421,7 +569,12 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
LongFunction<Producer<?>> batchProducerFunc =
(l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l));
return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, asyncApiFunc, batchProducerFunc);
return new PulsarBatchProducerStartMapper(
cmdTpl,
clientSpace,
pulsarActivity,
asyncApiFunc,
batchProducerFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSend(PulsarSpace clientSpace,
@ -436,6 +589,16 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
keyFunc = (l) -> null;
}
// message property
LongFunction<String> propFunc;
if (cmdTpl.isStatic("msg_property")) {
propFunc = (l) -> cmdTpl.getStatic("msg_property");
} else if (cmdTpl.isDynamic("msg_property")) {
propFunc = (l) -> cmdTpl.getDynamic("msg_property", l);
} else {
propFunc = (l) -> null;
}
LongFunction<String> valueFunc;
if (cmdTpl.containsKey("msg_value")) {
if (cmdTpl.isStatic("msg_value")) {
@ -452,14 +615,20 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
return new PulsarBatchProducerMapper(
cmdTpl,
clientSpace,
pulsarActivity,
asyncApiFunc,
keyFunc,
propFunc,
valueFunc);
}
private LongFunction<PulsarOp> resolveMsgBatchSendEnd(PulsarSpace clientSpace,
LongFunction<Boolean> asyncApiFunc)
{
return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace, asyncApiFunc);
return new PulsarBatchProducerEndMapper(
cmdTpl,
clientSpace,
pulsarActivity,
asyncApiFunc);
}
}

View File

@ -1,5 +1,6 @@
package io.nosqlbench.driver.pulsar.util;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -12,8 +13,10 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Base64;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class PulsarActivityUtil {
@ -25,12 +28,15 @@ public class PulsarActivityUtil {
ADMIN_TENANT("admin-tenant"),
ADMIN_NAMESPACE("admin-namespace"),
ADMIN_TOPIC("admin-topic"),
E2E_MSG_PROC_SEND("ec2-msg-proc-send"),
E2E_MSG_PROC_CONSUME("ec2-msg-proc-consume"),
BATCH_MSG_SEND_START("batch-msg-send-start"),
BATCH_MSG_SEND("batch-msg-send"),
BATCH_MSG_SEND_END("batch-msg-send-end"),
MSG_SEND("msg-send"),
MSG_CONSUME("msg-consume"),
MSG_READ("msg-read");
MSG_READ("msg-read"),
MSG_MULTI_CONSUME("msg-mt-consume");
public final String label;
@ -42,11 +48,17 @@ public class PulsarActivityUtil {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
public static final String MSG_SEQUENCE_ID = "sequence_id";
public static final String MSG_SEQUENCE_TGTMAX = "sequence_tgtmax";
///////
// Valid document level parameters for Pulsar NB yaml file
public enum DOC_LEVEL_PARAMS {
TOPIC_URI("topic_uri"),
ASYNC_API("async_api"),
USE_TRANSACTION("use_transaction"),
ADMIN_DELOP("admin_delop");
ADMIN_DELOP("admin_delop"),
SEQ_TRACKING("seq_tracking");
public final String label;
@ -55,9 +67,28 @@ public class PulsarActivityUtil {
}
}
public static boolean isValidDocLevelParam(String param) {
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param));
return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param));
}
///////
// Valid Pulsar API type
public enum PULSAR_API_TYPE {
PRODUCER("producer"),
CONSUMER("consumer"),
READER("reader");
public final String label;
PULSAR_API_TYPE(String label) {
this.label = label;
}
}
public static boolean isValidPulsarApiType(String param) {
return Arrays.stream(PULSAR_API_TYPE.values()).anyMatch(t -> t.label.equals(param));
}
public static String getValidPulsarApiTypeList() {
return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid persistence type
@ -75,7 +106,6 @@ public class PulsarActivityUtil {
return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
///////
// Valid Pulsar client configuration (activity-level settings)
// - https://pulsar.apache.org/docs/en/client-libraries-java/#client
@ -171,11 +201,29 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isStandardConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Custom consumer configuration (activity-level settings)
// - NOT part of https://pulsar.apache.org/docs/en/client-libraries-java/#consumer
// - NB Pulsar driver consumer operation specific
public enum CONSUMER_CONF_CUSTOM_KEY {
timeout("timeout");
public final String label;
CONSUMER_CONF_CUSTOM_KEY(String label) {
this.label = label;
}
}
public static boolean isCustomConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Pulsar subscription type
public enum SUBSCRIPTION_TYPE {
Exclusive("Exclusive"),
Failover("Failover"),
@ -188,7 +236,6 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
@ -220,6 +267,10 @@ public class PulsarActivityUtil {
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Custom reader configuration (activity-level settings)
// - NOT part of https://pulsar.apache.org/docs/en/client-libraries-java/#reader
// - NB Pulsar driver reader operation specific
public enum READER_CONF_CUSTOM_KEY {
startMessagePos("startMessagePos");
@ -229,11 +280,12 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isCustomReaderConfItem(String item) {
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Valid read positions for a Pulsar reader
public enum READER_MSG_POSITION_TYPE {
earliest("earliest"),
latest("latest"),
@ -245,11 +297,29 @@ public class PulsarActivityUtil {
this.label = label;
}
}
public static boolean isValideReaderStartPosition(String item) {
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
///////
// Pulsar subscription type
public enum SEQ_ERROR_SIMU_TYPE {
OutOfOrder("out_of_order"),
DataLoss("data_loss");
public final String label;
SEQ_ERROR_SIMU_TYPE(String label) {
this.label = label;
}
}
public static boolean isValidSeqErrSimuType(String item) {
return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSeqErrSimuTypeList() {
return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
// Valid websocket-producer configuration (activity-level settings)
// TODO: to be added
@ -387,5 +457,24 @@ public class PulsarActivityUtil {
return schema;
}
///////
// Generate effective key string
public static String buildCacheKey(String... keyParts) {
// Ignore blank keyPart
String joinedKeyStr =
Stream.of(keyParts)
.filter(s -> !StringUtils.isBlank(s))
.collect(Collectors.joining(","));
return Base64.getEncoder().encodeToString(joinedKeyStr.getBytes());
}
///////
// Convert JSON string to a key/value map
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonStr, Map.class);
}
}

View File

@ -173,14 +173,16 @@ public class PulsarNBClientConf {
}
// other producer helper functions ...
public String getProducerName() {
Object confValue = getProducerConfValue("producer.producerName");
Object confValue = getProducerConfValue(
"producer." + PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getProducerTopicName() {
Object confValue = getProducerConfValue("producer.topicName");
Object confValue = getProducerConfValue(
"producer." + PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName);
if (confValue == null)
return "";
else
@ -213,48 +215,56 @@ public class PulsarNBClientConf {
}
// Other consumer helper functions ...
public String getConsumerTopicNames() {
Object confValue = getConsumerConfValue("consumer.topicNames");
Object confValue = getConsumerConfValue(
"consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getConsumerTopicPattern() {
Object confValue = getConsumerConfValue("consumer.topicsPattern");
Object confValue = getConsumerConfValue(
"consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public int getConsumerTimeoutSeconds() {
Object confValue = getConsumerConfValue("consumer.timeout");
if (confValue == null)
return -1; // infinite
else
return Integer.parseInt(confValue.toString());
}
public String getConsumerSubscriptionName() {
Object confValue = getConsumerConfValue("consumer.subscriptionName");
Object confValue = getConsumerConfValue(
"consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getConsumerSubscriptionType() {
Object confValue = getConsumerConfValue("consumer.subscriptionType");
Object confValue = getConsumerConfValue(
"consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getConsumerName() {
Object confValue = getConsumerConfValue("consumer.consumerName");
Object confValue = getConsumerConfValue(
"consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
// NOTE: Below are not a standard Pulsar consumer configuration parameter as
// listed in "https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer"
// They're custom-made configuration properties for NB pulsar driver consumer.
public int getConsumerTimeoutSeconds() {
Object confValue = getConsumerConfValue(
"consumer." + PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label);
if (confValue == null)
return -1; // infinite
else
return Integer.parseInt(confValue.toString());
}
//////////////////
// Get Pulsar reader related config
@ -279,23 +289,29 @@ public class PulsarNBClientConf {
else
readerConfMap.put(key, value);
}
// Other consumer helper functions ...
// Other reader helper functions ...
public String getReaderTopicName() {
Object confValue = getReaderConfValue("reader.topicName");
Object confValue = getReaderConfValue(
"reader." + PulsarActivityUtil.READER_CONF_STD_KEY.topicName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
public String getReaderName() {
Object confValue = getReaderConfValue("reader.readerName");
Object confValue = getReaderConfValue(
"reader." + PulsarActivityUtil.READER_CONF_STD_KEY.readerName.label);
if (confValue == null)
return "";
else
return confValue.toString();
}
// NOTE: Below are not a standard Pulsar reader configuration parameter as
// listed in "https://pulsar.apache.org/docs/en/client-libraries-java/#reader"
// They're custom-made configuration properties for NB pulsar driver reader.
public String getStartMsgPosStr() {
Object confValue = getReaderConfValue("reader.startMessagePos");
Object confValue = getReaderConfValue(
"reader." + PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label);
if (confValue == null)
return "";
else

View File

@ -8,6 +8,8 @@
# TODO: as a starting point, only supports the following types
# 1) primitive types, including bytearray (byte[]) which is default, for messages without schema
# 2) Avro for messages with schema
#schema.type=avro
#schema.definition=file:///Users/yabinmeng/DataStax/MyNoSQLBench/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc
schema.type=
schema.definition=

View File

@ -49,6 +49,7 @@ blocks:
optype: msg-send
# producer_name: {producer_name}
msg_key: "{mykey}"
msg_property: "{myprop}"
msg_value: |
{
"SensorID": "{sensor_id}",

View File

@ -1,7 +1,10 @@
bindings:
# message key and value
# message key, property and value
mykey:
int_prop_val: ToString(); Prefix("IntProp_")
text_prop_val: AlphaNumericString(10); Prefix("TextProp_")
myvalue: NumberNameToString() #AlphaNumericString(20)
# tenant, namespace, and core topic name (without tenant and namespace)
tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
@ -25,6 +28,11 @@ blocks:
- name: s2
optype: batch-msg-send
msg_key: "{mykey}"
msg_property: |
{
"prop1": "{int_prop_val}",
"prop2": "{text_prop_val}}"
}
msg_value: "{myvalue}"
ratio: 100
- name: s3
@ -49,8 +57,6 @@ blocks:
statements:
- name: s1
optype: msg-consume
topic_names:
topics_pattern:
subscription_name: "mysub"
subscription_type:
consumer_name:
@ -64,6 +70,19 @@ blocks:
optype: msg-read
reader_name:
- name: multi-topic-consumer-block
tags:
phase: multi-topic-consumer
admin_task: false
statements:
- name: s1
optype: msg-mt-consume
topic_names:
topics_pattern:
subscription_name: "mysub"
subscription_type:
consumer_name:
# - websocket-producer:
# tags:
# type: websocket-produer

View File

@ -0,0 +1,30 @@
bindings:
# message key, property and value
myprop1: AlphaNumericString(10); Prefix("PropVal_")
myvalue: NumberNameToString() #AlphaNumericString(20)
# document level parameters that apply to all Pulsar client types:
params:
topic_uri: "persistent://public/default/sanity_e2e_2"
async_api: "true"
blocks:
- name: e2e-msg-proc-block
tags:
phase: e2e-msg-proc
admin_task: false
statements:
- name: s1
optype: ec2-msg-proc-send
msg_key:
msg_property: |
{
"prop1": "{myprop1}"
}
msg_value: "{myvalue}"
ratio: 1
- name: s2
optype: ec2-msg-proc-consume
ratio: 1
subscription_name: "mysub"
subscription_type:

View File

@ -0,0 +1,36 @@
bindings:
# message key, property and value
myprop1: AlphaNumericString(10)
myvalue: NumberNameToString()
# document level parameters that apply to all Pulsar client types:
params:
topic_uri: "persistent://public/default/sanity_seqloss2"
# Only applicable to producer and consumer
# - used for message ordering and message loss check
seq_tracking: "false"
blocks:
- name: producer-block
tags:
phase: producer
admin_task: false
statements:
- name: s1
optype: msg-send
#seqerr_simu: "out_of_order"
seqerr_simu: "data_loass"
msg_key:
msg_property:
msg_value: "{myvalue}"
- name: consumer-block
tags:
phase: consumer
admin_task: false
statements:
- name: s1
optype: msg-consume
subscription_name: "mysub"
subscription_type:
consumer_name: