From 3ef807172b5abca200eadef86c05f646602d8094 Mon Sep 17 00:00:00 2001 From: Yabin Meng Date: Wed, 22 Sep 2021 17:17:53 -0500 Subject: [PATCH] NB Pulsar driver enhancement (milestone 11: https://github.com/nosqlbench/nosqlbench/milestone/11) - add support for message properties - add support for end-to-end latency measurement (latency histogram) - add support for message out-of-order and loss detect - add async api for consumer - split single-topic consumer and multi-topic consumer - SSL/TLS bug fix - Code cleanup --- .../driver/pulsar/PulsarAction.java | 4 +- .../driver/pulsar/PulsarActivity.java | 263 ++++--- .../nosqlbench/driver/pulsar/PulsarSpace.java | 715 +++++++++++------- .../driver/pulsar/PulsarSpaceCache.java | 21 +- .../driver/pulsar/ops/PulsarAdminMapper.java | 4 +- .../ops/PulsarAdminNamespaceMapper.java | 4 +- .../driver/pulsar/ops/PulsarAdminOp.java | 9 - .../pulsar/ops/PulsarAdminTenantMapper.java | 4 +- .../pulsar/ops/PulsarAdminTenantOp.java | 16 +- .../pulsar/ops/PulsarAdminTopicMapper.java | 4 +- .../ops/PulsarBatchProducerEndMapper.java | 4 +- .../pulsar/ops/PulsarBatchProducerEndOp.java | 3 +- .../pulsar/ops/PulsarBatchProducerMapper.java | 30 +- .../pulsar/ops/PulsarBatchProducerOp.java | 8 +- .../ops/PulsarBatchProducerStartMapper.java | 4 +- .../pulsar/ops/PulsarConsumerMapper.java | 47 +- .../driver/pulsar/ops/PulsarConsumerOp.java | 315 ++++++-- .../driver/pulsar/ops/PulsarOpMapper.java | 6 + .../pulsar/ops/PulsarProducerMapper.java | 99 ++- .../driver/pulsar/ops/PulsarProducerOp.java | 163 +++- .../driver/pulsar/ops/PulsarReaderMapper.java | 4 +- .../pulsar/ops/PulsarTransactOpMapper.java | 29 + .../driver/pulsar/ops/ReadyPulsarOp.java | 229 +++++- .../pulsar/util/PulsarActivityUtil.java | 107 ++- .../pulsar/util/PulsarNBClientConf.java | 54 +- .../resources/activities/config.properties | 2 + .../activities/pulsar_client_avro.yaml | 1 + .../activities/pulsar_client_kv.yaml | 25 +- .../activities/pulsar_client_sanity_e2e.yaml | 30 + .../pulsar_client_sanity_seqloss.yaml | 36 + 30 files changed, 1561 insertions(+), 679 deletions(-) create mode 100644 driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarTransactOpMapper.java create mode 100644 driver-pulsar/src/main/resources/activities/pulsar_client_sanity_e2e.yaml create mode 100644 driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index 964ebee72..d9c8c0623 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -41,7 +41,7 @@ public class PulsarAction implements SyncAction { pulsarOp = readyPulsarOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... - activity.getErrorhandler().handleError(bindException, cycle, 0); + activity.getErrorHandler().handleError(bindException, cycle, 0); throw new RuntimeException( "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException ); @@ -56,7 +56,7 @@ public class PulsarAction implements SyncAction { break; } catch (RuntimeException err) { ErrorDetail errorDetail = activity - .getErrorhandler() + .getErrorHandler() .handleError(err, cycle, System.nanoTime() - start); if (!errorDetail.isRetryable()) { break; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index bb0e521bf..6d24f312a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -20,117 +20,74 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.api.*; + +import java.util.Map; public class PulsarActivity extends SimpleActivity implements ActivityDefObserver { private final static Logger logger = LogManager.getLogger(PulsarActivity.class); - public Timer bindTimer; - public Timer executeTimer; - public Counter bytesCounter; - public Histogram messagesizeHistogram; - public Timer createTransactionTimer; - public Timer commitTransactionTimer; + private Counter bytesCounter; + private Histogram messageSizeHistogram; + private Timer bindTimer; + private Timer executeTimer; + private Timer createTransactionTimer; + private Timer commitTransactionTimer; + + // Metrics for NB Pulsar driver milestone: https://github.com/nosqlbench/nosqlbench/milestone/11 + // - end-to-end latency + private Histogram e2eMsgProcLatencyHistogram; private PulsarSpaceCache pulsarCache; - private PulsarAdmin pulsarAdmin; - private PulsarNBClientConf clientConf; - // e.g. pulsar://localhost:6650 + private PulsarNBClientConf pulsarNBClientConf; private String pulsarSvcUrl; - // e.g. http://localhost:8080 private String webSvcUrl; + private PulsarAdmin pulsarAdmin; + private PulsarClient pulsarClient; + private Schema pulsarSchema; - private NBErrorHandler errorhandler; + private NBErrorHandler errorHandler; private OpSequence> sequencer; private volatile Throwable asyncOperationFailure; - // private Supplier clientSupplier; - // private ThreadLocal> tlClientSupplier; - public PulsarActivity(ActivityDef activityDef) { super(activityDef); } - private void initPulsarAdmin() { + @Override + public void shutdownActivity() { + super.shutdownActivity(); - PulsarAdminBuilder adminBuilder = - PulsarAdmin.builder() - .serviceHttpUrl(webSvcUrl); - - try { - String authPluginClassName = - (String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label); - String authParams = - (String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label); - - String useTlsStr = - (String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label); - boolean useTls = BooleanUtils.toBoolean(useTlsStr); - - String tlsTrustCertsFilePath = - (String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label); - - String tlsAllowInsecureConnectionStr = - (String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label); - boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr); - - String tlsHostnameVerificationEnableStr = - (String) clientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label); - boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr); - - if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) { - adminBuilder.authentication(authPluginClassName, authParams); - } - - if ( useTls ) { - adminBuilder - .useKeyStoreTls(true) - .enableTlsHostnameVerification(tlsHostnameVerificationEnable); - - if (!StringUtils.isBlank(tlsTrustCertsFilePath)) - adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); - } - - // Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection" - adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection); - pulsarAdmin = adminBuilder.build(); - - // Not supported in Pulsar 2.8.0 -// ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData(); -// logger.debug(configurationData.toString()); - - } catch (PulsarClientException e) { - logger.error("Fail to create PulsarAdmin from global configuration!"); - throw new RuntimeException("Fail to create PulsarAdmin from global configuration!"); + for (PulsarSpace pulsarSpace : pulsarCache.getAssociatedPulsarSpace()) { + pulsarSpace.shutdownPulsarSpace(); } } @Override public void initActivity() { super.initActivity(); - + bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); + messageSizeHistogram = ActivityMetrics.histogram(activityDef, "message_size"); bindTimer = ActivityMetrics.timer(activityDef, "bind"); executeTimer = ActivityMetrics.timer(activityDef, "execute"); - createTransactionTimer = ActivityMetrics.timer(activityDef, "createtransaction"); - commitTransactionTimer = ActivityMetrics.timer(activityDef, "committransaction"); + createTransactionTimer = ActivityMetrics.timer(activityDef, "create_transaction"); + commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction"); - bytesCounter = ActivityMetrics.counter(activityDef, "bytes"); - messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize"); + e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency"); String pulsarClntConfFile = activityDef.getParams().getOptionalString("config").orElse("config.properties"); - clientConf = new PulsarNBClientConf(pulsarClntConfFile); + pulsarNBClientConf = new PulsarNBClientConf(pulsarClntConfFile); pulsarSvcUrl = activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650"); webSvcUrl = activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080"); - initPulsarAdmin(); + initPulsarAdminAndClientObj(); + createPulsarSchemaFromConf(); pulsarCache = new PulsarSpaceCache(this); @@ -138,60 +95,20 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve setDefaultsFromOpSequence(sequencer); onActivityDefUpdate(activityDef); - this.errorhandler = new NBErrorHandler( + this.errorHandler = new NBErrorHandler( () -> activityDef.getParams().getOptionalString("errors").orElse("stop"), this::getExceptionMetrics ); } - public NBErrorHandler getErrorhandler() { - return errorhandler; - } - @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); } - public OpSequence> getSequencer() { - return sequencer; - } + public NBErrorHandler getErrorHandler() { return errorHandler; } - public PulsarNBClientConf getPulsarConf() { - return clientConf; - } - - public String getPulsarSvcUrl() { - return pulsarSvcUrl; - } - - public String getWebSvcUrl() { return webSvcUrl; } - - public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; } - - public Timer getBindTimer() { - return bindTimer; - } - - public Timer getExecuteTimer() { - return this.executeTimer; - } - - public Counter getBytesCounter() { - return bytesCounter; - } - - public Timer getCreateTransactionTimer() { - return createTransactionTimer; - } - - public Timer getCommitTransactionTimer() { - return commitTransactionTimer; - } - - public Histogram getMessagesizeHistogram() { - return messagesizeHistogram; - } + public OpSequence> getSequencer() { return sequencer; } public void failOnAsyncOperationFailure() { if (asyncOperationFailure != null) { @@ -202,4 +119,116 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve public void asyncOperationFailed(Throwable ex) { this.asyncOperationFailure = ex; } + + /** + * Initialize + * - PulsarAdmin object for adding/deleting tenant, namespace, and topic + * - PulsarClient object for message publishing and consuming + */ + private void initPulsarAdminAndClientObj() { + PulsarAdminBuilder adminBuilder = + PulsarAdmin.builder() + .serviceHttpUrl(webSvcUrl); + + ClientBuilder clientBuilder = PulsarClient.builder(); + + try { + Map clientConfMap = pulsarNBClientConf.getClientConfMap(); + + // Override "client.serviceUrl" setting in config.properties + clientConfMap.remove("serviceUrl"); + clientBuilder.loadConf(clientConfMap).serviceUrl(pulsarSvcUrl); + + // Pulsar Authentication + String authPluginClassName = + (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label); + String authParams = + (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label); + + if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) { + adminBuilder.authentication(authPluginClassName, authParams); + clientBuilder.authentication(authPluginClassName, authParams); + } + + String useTlsStr = + (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label); + boolean useTls = BooleanUtils.toBoolean(useTlsStr); + + String tlsTrustCertsFilePath = + (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label); + + String tlsAllowInsecureConnectionStr = + (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label); + boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr); + + String tlsHostnameVerificationEnableStr = + (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label); + boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr); + + if ( useTls ) { + adminBuilder + .enableTlsHostnameVerification(tlsHostnameVerificationEnable); + + clientBuilder + .enableTlsHostnameVerification(tlsHostnameVerificationEnable); + + if (!StringUtils.isBlank(tlsTrustCertsFilePath)) { + adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); + clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); + } + } + + // Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection" + adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection); + clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection); + + pulsarAdmin = adminBuilder.build(); + pulsarClient = clientBuilder.build(); + + //////////////// + // Not supported in Pulsar 2.8.0 + // + // ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData(); + // logger.debug(configurationData.toString()); + + } catch (PulsarClientException e) { + logger.error("Fail to create PulsarAdmin and/or PulsarClient object from the global configuration!"); + throw new RuntimeException("Fail to create PulsarAdmin and/or PulsarClient object from global configuration!"); + } + } + + /** + * Get Pulsar schema from the definition string + */ + private void createPulsarSchemaFromConf() { + Object value = pulsarNBClientConf.getSchemaConfValue("schema.type"); + String schemaType = (value != null) ? value.toString() : ""; + + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) { + value = pulsarNBClientConf.getSchemaConfValue("schema.definition"); + String schemaDefStr = (value != null) ? value.toString() : ""; + pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr); + } else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) { + pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType)); + } else { + throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " + + "Only primitive type and Avro type are supported at the moment!"); + } + } + + public PulsarNBClientConf getPulsarConf() { return this.pulsarNBClientConf;} + public String getPulsarSvcUrl() { return this.pulsarSvcUrl;} + public String getWebSvcUrl() { return this.webSvcUrl; } + public PulsarAdmin getPulsarAdmin() { return this.pulsarAdmin; } + public PulsarClient getPulsarClient() { return this.pulsarClient; } + public Schema getPulsarSchema() { return pulsarSchema; } + + public Counter getBytesCounter() { return bytesCounter; } + public Histogram getMessageSizeHistogram() { return messageSizeHistogram; } + public Timer getBindTimer() { return bindTimer; } + public Timer getExecuteTimer() { return this.executeTimer; } + public Timer getCreateTransactionTimer() { return createTransactionTimer; } + public Timer getCommitTransactionTimer() { return commitTransactionTimer; } + + public Histogram getE2eMsgProcLatencyHistogram() { return e2eMsgProcLatencyHistogram; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index 5101642d1..ef4a9e4fc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -7,7 +7,6 @@ import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.metrics.ActivityMetrics; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.BooleanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -17,6 +16,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; +import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -39,40 +39,36 @@ public class PulsarSpace { private final static Logger logger = LogManager.getLogger(PulsarSpace.class); + private final String spaceName; + private final ConcurrentHashMap> producers = new ConcurrentHashMap<>(); private final ConcurrentHashMap> consumers = new ConcurrentHashMap<>(); private final ConcurrentHashMap> readers = new ConcurrentHashMap<>(); - private final String spaceName; + private final PulsarActivity pulsarActivity; + private final ActivityDef activityDef; + private final PulsarNBClientConf pulsarNBClientConf; private final String pulsarSvcUrl; private final String webSvcUrl; private final PulsarAdmin pulsarAdmin; + private final PulsarClient pulsarClient; + private final Schema pulsarSchema; + private final Set pulsarClusterMetadata = new HashSet<>(); private final Timer createTransactionTimer; - private final Set pulsarClusterMetadata = new HashSet<>(); - - private PulsarClient pulsarClient = null; - private Schema pulsarSchema = null; - private final ActivityDef activityDef; - - public PulsarSpace(String name, - PulsarNBClientConf pulsarClientConf, - String pulsarSvcUrl, - String webSvcUrl, - PulsarAdmin pulsarAdmin, - ActivityDef activityDef, - Timer createTransactionTimer) { + public PulsarSpace(String name, PulsarActivity pulsarActivity) { this.spaceName = name; - this.pulsarNBClientConf = pulsarClientConf; - this.pulsarSvcUrl = pulsarSvcUrl; - this.webSvcUrl = webSvcUrl; - this.pulsarAdmin = pulsarAdmin; - this.activityDef = activityDef; - this.createTransactionTimer = createTransactionTimer; + this.pulsarActivity = pulsarActivity; - createPulsarClientFromConf(); - createPulsarSchemaFromConf(); + this.pulsarNBClientConf = pulsarActivity.getPulsarConf(); + this.pulsarSvcUrl = pulsarActivity.getPulsarSvcUrl(); + this.webSvcUrl = pulsarActivity.getWebSvcUrl(); + this.pulsarAdmin = pulsarActivity.getPulsarAdmin(); + this.pulsarClient = pulsarActivity.getPulsarClient(); + this.pulsarSchema = pulsarActivity.getPulsarSchema(); + this.activityDef = pulsarActivity.getActivityDef(); + this.createTransactionTimer = pulsarActivity.getCreateTransactionTimer(); try { Clusters clusters = pulsarAdmin.clusters(); @@ -86,114 +82,111 @@ public class PulsarSpace { } } - private void createPulsarClientFromConf() { - ClientBuilder clientBuilder = PulsarClient.builder(); - - try { - Map clientConf = pulsarNBClientConf.getClientConfMap(); - - // Override "client.serviceUrl" setting in config.properties - clientConf.remove("serviceUrl"); - clientBuilder.loadConf(clientConf).serviceUrl(pulsarSvcUrl); - - String authPluginClassName = - (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label); - String authParams = - (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label); - - String useTlsStr = - (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label); - boolean useTls = BooleanUtils.toBoolean(useTlsStr); - - String tlsTrustCertsFilePath = - (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label); - - String tlsAllowInsecureConnectionStr = - (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label); - boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr); - - String tlsHostnameVerificationEnableStr = - (String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label); - boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr); - - if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) { - clientBuilder.authentication(authPluginClassName, authParams); - } - - if ( useTls ) { - clientBuilder - .useKeyStoreTls(useTls) - .enableTlsHostnameVerification(tlsHostnameVerificationEnable); - - if (!StringUtils.isBlank(tlsTrustCertsFilePath)) - clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath); - } - - // Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection" - clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection); - - pulsarClient = clientBuilder.build(); - } - catch (PulsarClientException pce) { - String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage(); - logger.error(errMsg); - throw new RuntimeException(errMsg); - } - } - - private void createPulsarSchemaFromConf() { - Object value = pulsarNBClientConf.getSchemaConfValue("schema.type"); - String schemaType = (value != null) ? value.toString() : ""; - - if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) { - value = pulsarNBClientConf.getSchemaConfValue("schema.definition"); - String schemaDefStr = (value != null) ? value.toString() : ""; - pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr); - } else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) { - pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType)); - } else { - throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " + - "Only primitive type and Avro type are supported at the moment!"); - } - } - - public PulsarClient getPulsarClient() { return pulsarClient; } - - public PulsarNBClientConf getPulsarClientConf() { - return pulsarNBClientConf; - } - - public Schema getPulsarSchema() { - return pulsarSchema; - } - + public PulsarNBClientConf getPulsarClientConf() { return pulsarNBClientConf; } public PulsarAdmin getPulsarAdmin() { return pulsarAdmin; } + public PulsarClient getPulsarClient() { return pulsarClient; } + public Schema getPulsarSchema() { return pulsarSchema; } + public String getPulsarSvcUrl() { return pulsarSvcUrl;} + public String getWebSvcUrl() { return webSvcUrl; } + public Set getPulsarClusterMetadata() { return pulsarClusterMetadata; } - public String getPulsarSvcUrl() { - return pulsarSvcUrl; + + // Properly shut down all Pulsar objects (producers, consumers, etc.) that are associated with this space + public void shutdownPulsarSpace() { + try { + for (Producer producer : producers.values()) { + if (producer != null) producer.close(); + } + + for (Consumer consumer : consumers.values()) { + if (consumer != null) consumer.close(); + } + + for (Reader reader : readers.values()) { + if (reader != null) reader.close(); + } + + if (pulsarAdmin != null) pulsarAdmin.close(); + + if (pulsarClient != null) pulsarClient.close(); + } + catch (Exception e) { + throw new RuntimeException("Unexpected error when closing Pulsar objects!"); + } } - public String getWebSvcUrl() { return webSvcUrl; } + /** + * Get a proper Pulsar API metrics prefix depending on the API type + * + * @param apiType - Pulsar API type: producer, consumer, reader, etc. + * @param apiObjName - actual name of a producer, a consumer, a reader, etc. + * @param topicName - topic name + * @return String + */ + private String getPulsarAPIMetricsPrefix(String apiType, String apiObjName, String topicName) { + String apiMetricsPrefix; + + if (!PulsarActivityUtil.isValidPulsarApiType(apiType)) { + throw new RuntimeException( + "Incorrect Pulsar API type. Valid type list: " + PulsarActivityUtil.getValidPulsarApiTypeList()); + } + + if (!StringUtils.isBlank(apiObjName)) { + apiMetricsPrefix = apiObjName + "_"; + } + else { + // we want a meaningful name for the API object (producer, consumer, reader, etc.) + // we are not appending the topic name + apiMetricsPrefix = apiType; + + if (apiType.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label)) + apiMetricsPrefix += producers.size(); + else if (apiType.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.CONSUMER.label)) + apiMetricsPrefix += consumers.size(); + else if (apiType.equalsIgnoreCase(PulsarActivityUtil.PULSAR_API_TYPE.READER.label)) + apiMetricsPrefix += readers.size(); + + apiMetricsPrefix += "_"; + } + + apiMetricsPrefix += topicName + "_"; + apiMetricsPrefix = apiMetricsPrefix + // default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test + .replace("persistent://public/default/", "") + // always remove topic type + .replace("non-persistent://", "") + .replace("persistent://", "") + // persistent://tenant/namespace/topicname -> tenant_namespace_topicname + .replace("/","_"); + + return apiMetricsPrefix; + } - public Set getPulsarClusterMetadata() { return pulsarClusterMetadata; } ////////////////////////////////////// // Producer Processing --> start ////////////////////////////////////// - // Topic name IS mandatory - // - It must be set at either global level or cycle level - // - If set at both levels, cycle level setting takes precedence - private String getEffectiveProducerTopicName(String cycleTopicName) { - if (!StringUtils.isBlank(cycleTopicName)) { - return cycleTopicName; + // + private static class ProducerGaugeImpl implements Gauge { + private final Producer producer; + private final Function valueExtractor; + + ProducerGaugeImpl(Producer producer, Function valueExtractor) { + this.producer = producer; + this.valueExtractor = valueExtractor; } - String globalTopicName = pulsarNBClientConf.getProducerTopicName(); - if (!StringUtils.isBlank(globalTopicName)) { - return globalTopicName; + @Override + public Object getValue() { + // see Pulsar bug https://github.com/apache/pulsar/issues/10100 + // we need to synchronize on producer otherwise we could receive corrupted data + synchronized(producer) { + return valueExtractor.apply(producer.getStats()); + } } - - throw new RuntimeException(" topic name must be set at either global level or cycle level!"); + } + static Gauge producerSafeExtractMetric(Producer producer, Function valueExtractor) { + return new ProducerGaugeImpl(producer, valueExtractor); } // Producer name is NOT mandatory @@ -212,7 +205,6 @@ public class PulsarSpace { return ""; } - public Supplier getTransactionSupplier() { PulsarClient pulsarClient = getPulsarClient(); return () -> { @@ -233,8 +225,20 @@ public class PulsarSpace { }; } - private static String buildCacheKey(String... keyParts) { - return String.join("::", keyParts); + // Topic name IS mandatory + // - It must be set at either global level or cycle level + // - If set at both levels, cycle level setting takes precedence + private String getEffectiveProducerTopicName(String cycleTopicName) { + if (!StringUtils.isBlank(cycleTopicName)) { + return cycleTopicName; + } + + String globalTopicName = pulsarNBClientConf.getProducerTopicName(); + if (!StringUtils.isBlank(globalTopicName)) { + return globalTopicName; + } + + throw new RuntimeException("Producer topic name must be set at either global level or cycle level!"); } public Producer getProducer(String cycleTopicName, String cycleProducerName) { @@ -242,10 +246,10 @@ public class PulsarSpace { String producerName = getEffectiveProducerName(cycleProducerName); if (StringUtils.isBlank(topicName)) { - throw new RuntimeException("Producer:: must specify a topic name either at the global level or the cycle level"); + throw new RuntimeException("Producer:: must specify a topic name"); } - String producerCacheKey = buildCacheKey(producerName, topicName); + String producerCacheKey = PulsarActivityUtil.buildCacheKey(producerName, topicName); Producer producer = producers.get(producerCacheKey); if (producer == null) { @@ -253,37 +257,47 @@ public class PulsarSpace { // Get other possible producer settings that are set at global level Map producerConf = pulsarNBClientConf.getProducerConfMap(); - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName); - String producerMetricsPrefix; - if (!StringUtils.isBlank(producerName)) { - producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName); - producerMetricsPrefix = producerName + "_"; - } else { - // we want a meaningful name for the producer - // we are not appending the topic name - producerMetricsPrefix = "producer" + producers.size() + "_" ; - } + // Remove global level settings: "topicName" and "producerName" + producerConf.remove(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label); + producerConf.remove(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label); - producerMetricsPrefix += topicName + "_"; - producerMetricsPrefix = producerMetricsPrefix - .replace("persistent://public/default/", "") // default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test - .replace("non-persistent://", "") // always remove topic type - .replace("persistent://", "") - .replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname + String producerMetricsPrefix = getPulsarAPIMetricsPrefix( + PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label, + producerName, + topicName); try { - ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema); - producerBuilder.loadConf(producerConf); + ProducerBuilder producerBuilder = pulsarClient. + newProducer(pulsarSchema). + loadConf(producerConf). + topic(topicName); + + if (!StringUtils.isAnyBlank(producerName)) { + producerBuilder = producerBuilder.producerName(producerName); + } + producer = producerBuilder.create(); producers.put(producerCacheKey, producer); - ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent()))); - ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent()))); - ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalsendfailed", safeExtractMetric(producer, (s -> s.getTotalSendFailed() + s.getNumSendFailed()))); - ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalacksreceived", safeExtractMetric(producer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived()))); - ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendbytesrate", safeExtractMetric(producer, ProducerStats::getSendBytesRate)); - ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendmsgsrate", safeExtractMetric(producer, ProducerStats::getSendMsgsRate)); + ActivityMetrics.gauge(activityDef, + producerMetricsPrefix + "total_bytes_sent", + producerSafeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent()))); + ActivityMetrics.gauge(activityDef, + producerMetricsPrefix + "total_msg_sent", + producerSafeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent()))); + ActivityMetrics.gauge(activityDef, + producerMetricsPrefix + "total_send_failed", + producerSafeExtractMetric(producer, (s -> s.getTotalSendFailed() + s.getNumSendFailed()))); + ActivityMetrics.gauge(activityDef, + producerMetricsPrefix + "total_ack_received", + producerSafeExtractMetric(producer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived()))); + ActivityMetrics.gauge(activityDef, + producerMetricsPrefix + "send_bytes_rate", + producerSafeExtractMetric(producer, ProducerStats::getSendBytesRate)); + ActivityMetrics.gauge(activityDef, + producerMetricsPrefix + "send_msg_rate", + producerSafeExtractMetric(producer, ProducerStats::getSendMsgsRate)); } catch (PulsarClientException ple) { throw new RuntimeException("Unable to create a Pulsar producer!", ple); @@ -292,30 +306,7 @@ public class PulsarSpace { return producer; } - - static Gauge safeExtractMetric(Producer producer, Function valueExtractor) { - return new GaugeImpl(producer, valueExtractor); - } - - private static class GaugeImpl implements Gauge { - private final Producer producer; - private final Function valueExtractor; - - GaugeImpl(Producer producer, Function valueExtractor) { - this.producer = producer; - this.valueExtractor = valueExtractor; - } - - @Override - public Object getValue() { - // see Pulsar bug https://github.com/apache/pulsar/issues/10100 - // we need to synchronize on producer otherwise we could receive corrupted data - synchronized(producer) { - return valueExtractor.apply(producer.getStats()); - } - } - } - + // ////////////////////////////////////// // Producer Processing <-- end ////////////////////////////////////// @@ -324,59 +315,28 @@ public class PulsarSpace { ////////////////////////////////////// // Consumer Processing --> start ////////////////////////////////////// - private String getEffectiveTopicNamesStr(String cycleTopicNames) { - if (!StringUtils.isBlank(cycleTopicNames)) { - return cycleTopicNames; + // + private static class ConsumerGaugeImpl implements Gauge { + private final Consumer consumer; + private final Function valueExtractor; + + ConsumerGaugeImpl(Consumer consumer, Function valueExtractor) { + this.consumer = consumer; + this.valueExtractor = valueExtractor; } - String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); - if (!StringUtils.isBlank(globalTopicNames)) { - return globalTopicNames; + @Override + public Object getValue() { + // see Pulsar bug https://github.com/apache/pulsar/issues/10100 + // - this is a bug report for producer stats. + // - assume this also applies to consumer stats. + synchronized(consumer) { + return valueExtractor.apply(consumer.getStats()); + } } - - return ""; } - - private List getEffectiveTopicNames(String cycleTopicNames) { - String effectiveTopicNamesStr = getEffectiveTopicNamesStr(cycleTopicNames); - - String[] names = effectiveTopicNamesStr.split("[;,]"); - ArrayList effectiveTopicNameList = new ArrayList<>(); - - for (String name : names) { - if (!StringUtils.isBlank(name)) - effectiveTopicNameList.add(name.trim()); - } - - - return effectiveTopicNameList; - } - - private String getEffectiveTopicPatternStr(String cycleTopicsPattern) { - if (!StringUtils.isBlank(cycleTopicsPattern)) { - return cycleTopicsPattern; - } - - String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); - if (!StringUtils.isBlank(globalTopicsPattern)) { - return globalTopicsPattern; - } - - return ""; - } - - private Pattern getEffectiveTopicPattern(String cycleTopicsPattern) { - String effectiveTopicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); - Pattern topicsPattern; - try { - if (!StringUtils.isBlank(effectiveTopicsPatternStr)) - topicsPattern = Pattern.compile(effectiveTopicsPatternStr); - else - topicsPattern = null; - } catch (PatternSyntaxException pse) { - topicsPattern = null; - } - return topicsPattern; + static Gauge consumerSafeExtractMetric(Consumer consumer, Function valueExtractor) { + return new ConsumerGaugeImpl(consumer, valueExtractor); } private String getEffectiveSubscriptionName(String cycleSubscriptionName) { @@ -404,7 +364,6 @@ public class PulsarSpace { return ""; } - private SubscriptionType getEffectiveSubscriptionType(String cycleSubscriptionType) { String effectiveSubscriptionStr = getEffectiveSubscriptionTypeStr(cycleSubscriptionType); SubscriptionType subscriptionType = SubscriptionType.Exclusive; @@ -434,78 +393,79 @@ public class PulsarSpace { return ""; } - public Consumer getConsumer(String cycleTopicUri, - String cycleTopicNames, - String cycleTopicsPattern, + public Consumer getConsumer(String cycleTopicName, String cycleSubscriptionName, String cycleSubscriptionType, String cycleConsumerName) { - - List topicNames = getEffectiveTopicNames(cycleTopicNames); - String topicsPatternStr = getEffectiveTopicPatternStr(cycleTopicsPattern); - Pattern topicsPattern = getEffectiveTopicPattern(cycleTopicsPattern); String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); String consumerName = getEffectiveConsumerName(cycleConsumerName); - if (StringUtils.isBlank(cycleTopicUri) && topicNames.isEmpty() && (topicsPattern == null)) { - throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!"); + if ( subscriptionType.equals(SubscriptionType.Exclusive) && (activityDef.getThreads() > 1) ) { + throw new RuntimeException("Consumer:: trying to create multiple consumers of " + + "\"Exclusive\" subscription type under the same subscription name to the same topic!"); } - String consumerCacheKey; - // precedence sequence: - // topic_names (consumer statement param) > - // topics_pattern (consumer statement param) > - // topic_uri (document level param) - if (!topicNames.isEmpty()) { - consumerCacheKey = buildCacheKey( - consumerName, - subscriptionName, - String.join("|", topicNames)); - } else if (topicsPattern != null) { - consumerCacheKey = buildCacheKey( - consumerName, - subscriptionName, - topicsPatternStr); - } else { - consumerCacheKey = buildCacheKey( - consumerName, - subscriptionName, - cycleTopicUri); + if (StringUtils.isAnyBlank(cycleTopicName, subscriptionName)) { + throw new RuntimeException("Consumer:: must specify a topic name and a subscription name"); } + String consumerCacheKey = PulsarActivityUtil.buildCacheKey(consumerName, subscriptionName, cycleTopicName); Consumer consumer = consumers.get(consumerCacheKey); if (consumer == null) { PulsarClient pulsarClient = getPulsarClient(); - // Get other possible producer settings that are set at global level + // Get other possible consumer settings that are set at global level Map consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap()); - consumerConf.remove("timeout"); - // Explicit topic names will take precedence over topics pattern - if (!topicNames.isEmpty()) { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames); - } else if (topicsPattern != null) { - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); - consumerConf.put( - PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label, - getEffectiveTopicPattern(cycleTopicsPattern)); - } else { - topicNames.add(cycleTopicUri); - consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label, topicNames); - } - - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label, subscriptionName); - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label, subscriptionType); - if (!StringUtils.isBlank(consumerName)) { - consumerConf.put(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label, consumerName); - } + // Remove global level settings: + // - "topicNames", "topicsPattern", "subscriptionName", "subscriptionType", "consumerName" + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label); + // Remove non-standard consumer configuration properties + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label); try { - consumer = pulsarClient.newConsumer(pulsarSchema).loadConf(consumerConf).subscribe(); + ConsumerBuilder consumerBuilder = pulsarClient. + newConsumer(pulsarSchema). + loadConf(consumerConf). + topic(cycleTopicName). + subscriptionName(subscriptionName). + subscriptionType(subscriptionType); + + if (!StringUtils.isBlank(consumerName)) { + consumerBuilder = consumerBuilder.consumerName(consumerName); + } + + consumer = consumerBuilder.subscribe(); + + String consumerMetricsPrefix = getPulsarAPIMetricsPrefix( + PulsarActivityUtil.PULSAR_API_TYPE.CONSUMER.label, + consumerName, + cycleTopicName); + + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "total_bytes_recv", + consumerSafeExtractMetric(consumer, (s -> s.getTotalBytesReceived() + s.getNumBytesReceived()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "total_msg_recv", + consumerSafeExtractMetric(consumer, (s -> s.getTotalMsgsReceived() + s.getNumMsgsReceived()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "total_recv_failed", + consumerSafeExtractMetric(consumer, (s -> s.getTotalReceivedFailed() + s.getNumReceiveFailed()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "total_acks_sent", + consumerSafeExtractMetric(consumer,(s -> s.getTotalAcksSent() + s.getNumAcksSent()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "recv_bytes_rate", + consumerSafeExtractMetric(consumer, ConsumerStats::getRateBytesReceived)); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "recv_msg_rate", + consumerSafeExtractMetric(consumer, ConsumerStats::getRateMsgsReceived)); } catch (PulsarClientException ple) { ple.printStackTrace(); throw new RuntimeException("Unable to create a Pulsar consumer!"); @@ -516,11 +476,186 @@ public class PulsarSpace { return consumer; } + // ////////////////////////////////////// // Consumer Processing <-- end ////////////////////////////////////// + ////////////////////////////////////// + // Multi-topic Consumer Processing --> start + ////////////////////////////////////// + // + private String getEffectiveConsumerTopicNameListStr(String cycleTopicNames) { + if (!StringUtils.isBlank(cycleTopicNames)) { + return cycleTopicNames; + } + + String globalTopicNames = pulsarNBClientConf.getConsumerTopicNames(); + if (!StringUtils.isBlank(globalTopicNames)) { + return globalTopicNames; + } + + return ""; + } + + private List getEffectiveConsumerTopicNameList(String cycleTopicNames) { + String effectiveTopicNamesStr = getEffectiveConsumerTopicNameListStr(cycleTopicNames); + + String[] names = effectiveTopicNamesStr.split("[;,]"); + ArrayList effectiveTopicNameList = new ArrayList<>(); + + for (String name : names) { + if (!StringUtils.isBlank(name)) + effectiveTopicNameList.add(name.trim()); + } + + return effectiveTopicNameList; + } + + private String getEffectiveConsumerTopicPatternStr(String cycleTopicsPattern) { + if (!StringUtils.isBlank(cycleTopicsPattern)) { + return cycleTopicsPattern; + } + + String globalTopicsPattern = pulsarNBClientConf.getConsumerTopicPattern(); + if (!StringUtils.isBlank(globalTopicsPattern)) { + return globalTopicsPattern; + } + + return ""; + } + + private Pattern getEffectiveConsumerTopicPattern(String cycleTopicsPattern) { + String effectiveTopicsPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern; + try { + if (!StringUtils.isBlank(effectiveTopicsPatternStr)) + topicsPattern = Pattern.compile(effectiveTopicsPatternStr); + else + topicsPattern = null; + } catch (PatternSyntaxException pse) { + topicsPattern = null; + } + return topicsPattern; + } + + public Consumer getMultiTopicConsumer( + String cycleTopicUri, + String cycleTopicNameList, + String cycleTopicsPattern, + String cycleSubscriptionName, + String cycleSubscriptionType, + String cycleConsumerName) { + + List topicNameList = getEffectiveConsumerTopicNameList(cycleTopicNameList); + String topicsPatternStr = getEffectiveConsumerTopicPatternStr(cycleTopicsPattern); + Pattern topicsPattern = getEffectiveConsumerTopicPattern(cycleTopicsPattern); + String subscriptionName = getEffectiveSubscriptionName(cycleSubscriptionName); + SubscriptionType subscriptionType = getEffectiveSubscriptionType(cycleSubscriptionType); + String consumerName = getEffectiveConsumerName(cycleConsumerName); + + if ( subscriptionType.equals(SubscriptionType.Exclusive) && (activityDef.getThreads() > 1) ) { + throw new RuntimeException("Consumer:: trying to create multiple consumers of " + + "\"Exclusive\" subscription type under the same subscription name to the same topic!"); + } + + if (StringUtils.isBlank(cycleTopicUri) && topicNameList.isEmpty() && (topicsPattern == null)) { + throw new RuntimeException("Consumer:: \"topic_uri\", \"topic_names\" and \"topics_pattern\" parameters can't be all empty/invalid!"); + } + + // precedence sequence: + // topic_names (consumer statement param) > + // topics_pattern (consumer statement param) > + // topic_uri (document level param) + String consumerTopicListString; + if (!topicNameList.isEmpty()) { + consumerTopicListString = String.join("|", topicNameList); + } else if (topicsPattern != null) { + consumerTopicListString = topicsPatternStr; + } else { + consumerTopicListString = cycleTopicUri; + } + String consumerCacheKey = PulsarActivityUtil.buildCacheKey( + consumerName, + subscriptionName, + consumerTopicListString); + + Consumer consumer = consumers.get(consumerCacheKey); + + if (consumer == null) { + PulsarClient pulsarClient = getPulsarClient(); + + // Get other possible producer settings that are set at global level + Map consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap()); + + // Remove global level settings: + // - "topicNameList", "topicsPattern", "subscriptionName", "subscriptionType", "consumerName" + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label); + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label); + // Remove non-standard consumer configuration properties + consumerConf.remove(PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label); + + try { + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(pulsarSchema). + loadConf(consumerConf). + subscriptionName(subscriptionName). + subscriptionType(subscriptionType). + consumerName(consumerName); + + if (!topicNameList.isEmpty()) { + consumerBuilder = consumerBuilder.topics(topicNameList); + } else if (topicsPattern != null) { + consumerBuilder = consumerBuilder.topicsPattern(topicsPattern); + } else { + consumerBuilder = consumerBuilder.topic(cycleTopicUri); + } + + consumer = consumerBuilder.subscribe(); + + String consumerMetricsPrefix = getPulsarAPIMetricsPrefix( + PulsarActivityUtil.PULSAR_API_TYPE.PRODUCER.label, + consumerName, + consumerTopicListString); + + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "totalBytesRecvd", + consumerSafeExtractMetric(consumer, (s -> s.getTotalBytesReceived() + s.getNumBytesReceived()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "totalMsgsRecvd", + consumerSafeExtractMetric(consumer, (s -> s.getTotalMsgsReceived() + s.getNumMsgsReceived()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "totalRecvdFailed", + consumerSafeExtractMetric(consumer, (s -> s.getTotalReceivedFailed() + s.getNumReceiveFailed()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "totalAcksSent", + consumerSafeExtractMetric(consumer,(s -> s.getTotalAcksSent() + s.getNumAcksSent()))); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "recvdBytesRate", + consumerSafeExtractMetric(consumer, ConsumerStats::getRateBytesReceived)); + ActivityMetrics.gauge(activityDef, + consumerMetricsPrefix + "recvdMsgsRate", + consumerSafeExtractMetric(consumer, ConsumerStats::getRateMsgsReceived)); + + } catch (PulsarClientException ple) { + ple.printStackTrace(); + throw new RuntimeException("Unable to create a Pulsar consumer!"); + } + + consumers.put(consumerCacheKey, consumer); + } + + return consumer; + } + // + ////////////////////////////////////// + // Multi-topic Consumer Processing <-- end + ////////////////////////////////////// + + ////////////////////////////////////// // Reader Processing --> Start ////////////////////////////////////// @@ -534,7 +669,7 @@ public class PulsarSpace { return globalReaderTopicName; } - throw new RuntimeException("Reader topic name must be set at either global level or cycle level!"); + throw new RuntimeException("Reader:: Reader topic name must be set at either global level or cycle level!"); } private String getEffectiveReaderName(String cycleReaderName) { @@ -568,35 +703,32 @@ public class PulsarSpace { String cycleStartMsgPos) { String topicName = getEffectiveReaderTopicName(cycleTopicName); - if (StringUtils.isBlank(topicName)) { - throw new RuntimeException("Reader:: must specify a topic name either at the global level or the cycle level"); - } - String readerName = getEffectiveReaderName(cycleReaderName); - String startMsgPosStr = getEffectiveStartMsgPosStr(cycleStartMsgPos); if (!PulsarActivityUtil.isValideReaderStartPosition(startMsgPosStr)) { - throw new RuntimeException("Reader:: Invalid value for Reader start message position!"); + throw new RuntimeException("Reader:: Invalid value for reader start message position!"); } - String readerCacheKey = buildCacheKey(topicName, readerName, startMsgPosStr); + String readerCacheKey = PulsarActivityUtil.buildCacheKey(topicName, readerName, startMsgPosStr); Reader reader = readers.get(readerCacheKey); if (reader == null) { PulsarClient pulsarClient = getPulsarClient(); Map readerConf = pulsarNBClientConf.getReaderConfMap(); - readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.toString(), topicName); - if (!StringUtils.isBlank(readerName)) { - readerConf.put(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.toString(), readerName); - } - - // "reader.startMessagePos" is NOT a standard Pulsar reader conf + // Remove global level settings: "topicName" and "readerName" + readerConf.remove(PulsarActivityUtil.READER_CONF_STD_KEY.topicName.label); + readerConf.remove(PulsarActivityUtil.READER_CONF_STD_KEY.readerName.label); + // Remove non-standard reader configuration properties readerConf.remove(PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); try { - ReaderBuilder readerBuilder = pulsarClient.newReader(pulsarSchema).loadConf(readerConf); + ReaderBuilder readerBuilder = pulsarClient. + newReader(pulsarSchema). + loadConf(readerConf). + topic(topicName). + readerName(readerName); MessageId startMsgId = MessageId.latest; if (startMsgPosStr.equalsIgnoreCase(PulsarActivityUtil.READER_MSG_POSITION_TYPE.earliest.label)) { @@ -607,11 +739,8 @@ public class PulsarSpace { // startMsgId = MessageId.latest; //} - if (startMsgId != null) { - readerBuilder = readerBuilder.startMessageId(startMsgId); - } + reader = readerBuilder.startMessageId(startMsgId).create(); - reader = readerBuilder.create(); } catch (PulsarClientException ple) { ple.printStackTrace(); throw new RuntimeException("Unable to create a Pulsar reader!"); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java index 262384032..62ab366ed 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpaceCache.java @@ -20,20 +20,17 @@ public class PulsarSpaceCache { this.activity = pulsarActivity; } - public PulsarSpace getPulsarSpace(String name) { - return clientScopes.computeIfAbsent(name, spaceName -> - new PulsarSpace( - spaceName, - activity.getPulsarConf(), - activity.getPulsarSvcUrl(), - activity.getWebSvcUrl(), - activity.getPulsarAdmin(), - activity.getActivityDef(), - activity.getCreateTransactionTimer() - )); + public Iterable getAssociatedPulsarSpace() { + return clientScopes.values(); } - public PulsarActivity getActivity() { + public PulsarActivity getAssociatedPulsarActivity() { return activity; } + + public PulsarSpace getPulsarSpace(String name) { + return clientScopes.computeIfAbsent(name, spaceName -> new PulsarSpace(spaceName, activity)); + } + + public PulsarActivity getActivity() { return activity; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java index ba0cf5140..612f84aff 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; @@ -21,9 +22,10 @@ public abstract class PulsarAdminMapper extends PulsarOpMapper { protected PulsarAdminMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction adminDelOpFunc) { - super(cmdTpl, clientSpace, asyncApiFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc); this.adminDelOpFunc = adminDelOpFunc; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminNamespaceMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminNamespaceMapper.java index c95293072..28872671b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminNamespaceMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminNamespaceMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; @@ -21,11 +22,12 @@ public class PulsarAdminNamespaceMapper extends PulsarAdminMapper { public PulsarAdminNamespaceMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction adminDelOpFunc, LongFunction namespaceFunc) { - super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, adminDelOpFunc); this.namespaceFunc = namespaceFunc; } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java index 49273c7ae..ee1f68620 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminOp.java @@ -1,17 +1,8 @@ package io.nosqlbench.driver.pulsar.ops; import io.nosqlbench.driver.pulsar.PulsarSpace; -import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.admin.Namespaces; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.admin.Tenants; -import org.apache.pulsar.common.policies.data.TenantInfo; - -import java.util.Set; -import java.util.concurrent.CompletableFuture; public abstract class PulsarAdminOp extends SyncPulsarOp { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantMapper.java index 78cc441e5..177333184 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; @@ -23,13 +24,14 @@ public class PulsarAdminTenantMapper extends PulsarAdminMapper { public PulsarAdminTenantMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction adminDelOpFunc, LongFunction> adminRolesFunc, LongFunction> allowedClustersFunc, LongFunction tenantFunc) { - super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, adminDelOpFunc); this.adminRolesFunc = adminRolesFunc; this.allowedClustersFunc = allowedClustersFunc; this.tenantFunc = tenantFunc; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java index 6f0102320..77ad3c8fe 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTenantOp.java @@ -8,8 +8,7 @@ import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Tenants; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; - +import org.apache.pulsar.common.policies.data.TenantInfo; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -45,15 +44,10 @@ public class PulsarAdminTenantOp extends PulsarAdminOp { // Admin API - create tenants and namespaces if (!adminDelOp) { - - TenantInfoImpl tenantInfo = TenantInfoImpl.builder().build(); - tenantInfo.setAdminRoles(adminRoleSet); - - if ( !allowedClusterSet.isEmpty() ) { - tenantInfo.setAllowedClusters(allowedClusterSet); - } else { - tenantInfo.setAllowedClusters(clientSpace.getPulsarClusterMetadata()); - } + TenantInfo tenantInfo = TenantInfo.builder() + .adminRoles(adminRoleSet) + .allowedClusters(!allowedClusterSet.isEmpty() ? allowedClusterSet : clientSpace.getPulsarClusterMetadata()) + .build(); try { if (!asyncApi) { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTopicMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTopicMapper.java index c9b2512ba..ee906502c 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTopicMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarAdminTopicMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.commons.lang3.BooleanUtils; @@ -24,13 +25,14 @@ public class PulsarAdminTopicMapper extends PulsarAdminMapper { public PulsarAdminTopicMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction adminDelOpFunc, LongFunction topicUriFunc, LongFunction enablePartionFunc, LongFunction partitionNumFunc) { - super(cmdTpl, clientSpace, asyncApiFunc, adminDelOpFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, adminDelOpFunc); this.topicUriFunc = topicUriFunc; this.enablePartionFunc = enablePartionFunc; this.partitionNumFunc = partitionNumFunc; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java index 94ccf27c4..2463ec14d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; @@ -9,9 +10,10 @@ public class PulsarBatchProducerEndMapper extends PulsarOpMapper { public PulsarBatchProducerEndMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc) { - super(cmdTpl, clientSpace, asyncApiFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc); } @Override diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java index 7d2fe51b0..9aba10001 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerEndOp.java @@ -24,7 +24,8 @@ public class PulsarBatchProducerEndOp extends SyncPulsarOp { container.clear(); PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.set(null); - } else { + } + else { throw new BasicError("You tried to end an empty batch message container. This means you" + " did initiate the batch container properly, or there is an error in your" + " pulsar op sequencing and ratios."); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java index 6bbc3a8c6..13605134a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java @@ -1,22 +1,35 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Producer; +import java.util.HashMap; +import java.util.Map; import java.util.function.LongFunction; public class PulsarBatchProducerMapper extends PulsarOpMapper { + + private final static Logger logger = LogManager.getLogger(PulsarBatchProducerMapper.class); + private final LongFunction keyFunc; + private final LongFunction propFunc; private final LongFunction payloadFunc; public PulsarBatchProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction keyFunc, + LongFunction propFunc, LongFunction payloadFunc) { - super(cmdTpl, clientSpace, asyncApiFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc); this.keyFunc = keyFunc; + this.propFunc = propFunc; this.payloadFunc = payloadFunc; } @@ -25,9 +38,24 @@ public class PulsarBatchProducerMapper extends PulsarOpMapper { String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); + // Check if msgPropJonStr is valid JSON string with a collection of key/value pairs + // - if Yes, convert it to a map + // - otherwise, log an error message and ignore message properties without throwing a runtime exception + Map msgProperties = new HashMap<>(); + String msgPropJsonStr = propFunc.apply(value); + try { + msgProperties = PulsarActivityUtil.convertJsonToMap(msgPropJsonStr); + } + catch (Exception e) { + logger.error( + "PulsarProducerMapper:: Error parsing message property JSON string {}, ignore message properties!", + msgPropJsonStr); + } + return new PulsarBatchProducerOp( clientSpace.getPulsarSchema(), msgKey, + msgProperties, msgPayload ); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java index 95877493f..8b93c6ffa 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerOp.java @@ -12,23 +12,26 @@ import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class PulsarBatchProducerOp extends SyncPulsarOp { private final Schema pulsarSchema; private final String msgKey; + private final Map msgProperties; private final String msgPayload; public PulsarBatchProducerOp(Schema schema, String key, + Map msgProperties, String payload) { this.pulsarSchema = schema; this.msgKey = key; + this.msgProperties = msgProperties; this.msgPayload = payload; } - @Override public void run() { if ((msgPayload == null) || msgPayload.isEmpty()) { @@ -43,6 +46,9 @@ public class PulsarBatchProducerOp extends SyncPulsarOp { if ((msgKey != null) && (!msgKey.isEmpty())) { typedMessageBuilder = typedMessageBuilder.key(msgKey); } + if (!msgProperties.isEmpty()) { + typedMessageBuilder = typedMessageBuilder.properties(msgProperties); + } SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java index a1a2405a1..40f0428b3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerStartMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; @@ -12,9 +13,10 @@ public class PulsarBatchProducerStartMapper extends PulsarOpMapper { public PulsarBatchProducerStartMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction> batchProducerFunc) { - super(cmdTpl, clientSpace, asyncApiFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc); this.batchProducerFunc = batchProducerFunc; } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java index 46cc6e2f2..6b610bef4 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerMapper.java @@ -3,8 +3,11 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; @@ -22,30 +25,25 @@ import java.util.function.Supplier; * * For additional parameterization, the command template is also provided. */ -public class PulsarConsumerMapper extends PulsarOpMapper { +public class PulsarConsumerMapper extends PulsarTransactOpMapper { + + private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class); + private final LongFunction> consumerFunc; - private final Counter bytesCounter; - private final Histogram messagesizeHistogram; - private final LongFunction useTransactionFunc; - private final LongFunction> transactionSupplierFunc; - private final Timer transactionCommitTimer; + private final boolean e2eMsProc; public PulsarConsumerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, - LongFunction> consumerFunc, - Counter bytesCounter, - Histogram messagesizeHistogram, - Timer transactionCommitTimer, LongFunction useTransactionFunc, - LongFunction> transactionSupplierFunc) { - super(cmdTpl, clientSpace, asyncApiFunc); + LongFunction seqTrackingFunc, + LongFunction> transactionSupplierFunc, + LongFunction> consumerFunc, + boolean e2eMsgProc) { + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc); this.consumerFunc = consumerFunc; - this.bytesCounter = bytesCounter; - this.messagesizeHistogram = messagesizeHistogram; - this.transactionCommitTimer = transactionCommitTimer; - this.useTransactionFunc = useTransactionFunc; - this.transactionSupplierFunc = transactionSupplierFunc; + this.e2eMsProc = e2eMsgProc; } @Override @@ -53,18 +51,19 @@ public class PulsarConsumerMapper extends PulsarOpMapper { Consumer consumer = consumerFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); boolean useTransaction = useTransactionFunc.apply(value); + boolean seqTracking = seqTrackingFunc.apply(value); Supplier transactionSupplier = transactionSupplierFunc.apply(value); return new PulsarConsumerOp( + pulsarActivity, + asyncApi, + useTransaction, + seqTracking, + transactionSupplier, consumer, clientSpace.getPulsarSchema(), - asyncApi, clientSpace.getPulsarClientConf().getConsumerTimeoutSeconds(), - bytesCounter, - messagesizeHistogram, - useTransaction, - transactionSupplier, - transactionCommitTimer - ); + value, + e2eMsProc); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index e8d5138aa..ac994edfc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar.ops; import com.codahale.metrics.Counter; import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; @@ -11,106 +12,260 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.schema.SchemaType; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.function.LongFunction; import java.util.function.Supplier; -public class PulsarConsumerOp extends SyncPulsarOp { +public class PulsarConsumerOp implements PulsarOp { private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class); + private final PulsarActivity pulsarActivity; + + private final boolean asyncPulsarOp; + private final boolean useTransaction; + private final boolean seqTracking; + private final Supplier transactionSupplier; + private final Consumer consumer; private final Schema pulsarSchema; - private final boolean asyncPulsarOp; private final int timeoutSeconds; + private final boolean e2eMsgProc; + private final long curCycleNum; + + private long curMsgSeqId; + private long prevMsgSeqid; + private final Counter bytesCounter; - private final Histogram messagesizeHistogram; - private final boolean useTransaction; - private final Supplier transactionSupplier; + private final Histogram messageSizeHistogram; private final Timer transactionCommitTimer; - public PulsarConsumerOp(Consumer consumer, Schema schema, boolean asyncPulsarOp, int timeoutSeconds, - Counter bytesCounter, - Histogram messagesizeHistogram, - boolean useTransaction, - Supplier transactionSupplier, - Timer transactionCommitTimer) { + // keep track of end-to-end message latency + private final Histogram e2eMsgProcLatencyHistogram; + + public PulsarConsumerOp( + PulsarActivity pulsarActivity, + boolean asyncPulsarOp, + boolean useTransaction, + boolean seqTracking, + Supplier transactionSupplier, + Consumer consumer, + Schema schema, + int timeoutSeconds, + long curCycleNum, + boolean e2eMsgProc) + { + this.pulsarActivity = pulsarActivity; + + this.asyncPulsarOp = asyncPulsarOp; + this.useTransaction = useTransaction; + this.seqTracking = seqTracking; + this.transactionSupplier = transactionSupplier; + this.consumer = consumer; this.pulsarSchema = schema; - this.asyncPulsarOp = asyncPulsarOp; this.timeoutSeconds = timeoutSeconds; - this.bytesCounter = bytesCounter; - this.messagesizeHistogram = messagesizeHistogram; - this.useTransaction = useTransaction; - this.transactionSupplier = transactionSupplier; - this.transactionCommitTimer = transactionCommitTimer; - } + this.curCycleNum = curCycleNum; + this.e2eMsgProc = e2eMsgProc; - public void syncConsume() { - try { - Message message; - if (timeoutSeconds <= 0) { - // wait forever - message = consumer.receive(); - } else { - // we cannot use Consumer#receive(timeout, timeunit) due to - // https://github.com/apache/pulsar/issues/9921 - message = consumer - .receiveAsync() - .get(timeoutSeconds, TimeUnit.SECONDS); - } + this.curMsgSeqId = 0; + this.prevMsgSeqid = 0; - SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); - if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - if (logger.isDebugEnabled()) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + this.bytesCounter = pulsarActivity.getBytesCounter(); + this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram(); + this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer(); - org.apache.avro.Schema avroSchema = - AvroUtil.GetSchema_ApacheAvro(avroDefStr); - - org.apache.avro.generic.GenericRecord avroGenericRecord = - AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); - - logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString()); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("msg-key={} msg-payload={}", message.getKey(), new String(message.getData())); - } - } - int messagesize = message.getData().length; - bytesCounter.inc(messagesize); - messagesizeHistogram.update(messagesize); - - - if (useTransaction) { - Transaction transaction = transactionSupplier.get(); - consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); - - // little problem: here we are counting the "commit" time - // inside the overall time spent for the execution of the consume operation - // we should refactor this operation as for PulsarProducerOp, and use the passed callback - // to track with precision the time spent for the operation and for the commit - try (Timer.Context ctx = transactionCommitTimer.time()) { - transaction.commit().get(); - } - } else{ - consumer.acknowledge(message.getMessageId()); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void asyncConsume() { - //TODO: add support for async consume + this.e2eMsgProcLatencyHistogram = pulsarActivity.getE2eMsgProcLatencyHistogram(); } @Override - public void run() { - if (!asyncPulsarOp) - syncConsume(); - else - asyncConsume(); + public void run(Runnable timeTracker) { + + final Transaction transaction; + if (useTransaction) { + // if you are in a transaction you cannot set the schema per-message + transaction = transactionSupplier.get(); + } + else { + transaction = null; + } + + if (!asyncPulsarOp) { + Message message; + + try { + if (timeoutSeconds <= 0) { + // wait forever + message = consumer.receive(); + } + else { + // we cannot use Consumer#receive(timeout, timeunit) due to + // https://github.com/apache/pulsar/issues/9921 + message = consumer + .receiveAsync() + .get(timeoutSeconds, TimeUnit.SECONDS); + } + + if (logger.isDebugEnabled()) { + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.Schema avroSchema = + AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); + + logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}", + message.getKey(), + message.getProperties(), + avroGenericRecord.toString()); + } + else { + logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}", + message.getKey(), + message.getProperties(), + new String(message.getData())); + } + } + + // keep track end-to-end message processing latency + long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime(); + if (e2eMsgProc) { + e2eMsgProcLatencyHistogram.update(e2eMsgLatency); + } + + // keep track of message ordering and message loss + if (seqTracking) { + String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID); + curMsgSeqId = Long.parseLong(msgSeqIdStr); + + // normal case: message sequence id is monotonically increasing by 1 + if ((curMsgSeqId - prevMsgSeqid) == 1) { + prevMsgSeqid = curMsgSeqId; + } + else { + // abnormal case: out of ordering + if (curMsgSeqId < prevMsgSeqid) { + throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!"); + } + // abnormal case: message loss + else if ( (curMsgSeqId - prevMsgSeqid) > 1 ) { + throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!"); + } + } + } + + int messageSize = message.getData().length; + bytesCounter.inc(messageSize); + messageSizeHistogram.update(messageSize); + + if (useTransaction) { + consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); + + // little problem: here we are counting the "commit" time + // inside the overall time spent for the execution of the consume operation + // we should refactor this operation as for PulsarProducerOp, and use the passed callback + // to track with precision the time spent for the operation and for the commit + try (Timer.Context ctx = transactionCommitTimer.time()) { + transaction.commit().get(); + } + } + else { + consumer.acknowledge(message.getMessageId()); + } + + } + catch (Exception e) { + logger.error( + "Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds); + throw new RuntimeException(e); + } + } + else { + try { + CompletableFuture> msgRecvFuture = consumer.receiveAsync(); + if (useTransaction) { + // add commit step + msgRecvFuture = msgRecvFuture.thenCompose(msg -> { + Timer.Context ctx = transactionCommitTimer.time(); + return transaction + .commit() + .whenComplete((m,e) -> ctx.close()) + .thenApply(v-> msg); + } + ); + } + + msgRecvFuture.whenComplete((message, error) -> { + int messageSize = message.getData().length; + bytesCounter.inc(messageSize); + messageSizeHistogram.update(messageSize); + + if (logger.isDebugEnabled()) { + SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); + + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.Schema avroSchema = + AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); + + logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})", + message.getKey(), + message.getProperties(), + avroGenericRecord.toString()); + } + else { + logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})", + message.getKey(), + message.getProperties(), + new String(message.getData())); + } + } + + long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime(); + if (e2eMsgProc) { + e2eMsgProcLatencyHistogram.update(e2eMsgLatency); + } + + // keep track of message ordering and message loss + if (seqTracking) { + String msgSeqIdStr = message.getProperties().get(PulsarActivityUtil.MSG_SEQUENCE_ID); + curMsgSeqId = Long.parseLong(msgSeqIdStr); + + // normal case: message sequence id is monotonically increasing by 1 + if ((curMsgSeqId - prevMsgSeqid) == 1) { + prevMsgSeqid = curMsgSeqId; + } else { + // abnormal case: out of ordering + if (curMsgSeqId < prevMsgSeqid) { + throw new RuntimeException("Detected message ordering is not guaranteed. Older messages are received earlier!"); + } + // abnormal case: message loss + else if ((curMsgSeqId - prevMsgSeqid) > 1) { + throw new RuntimeException("Detected message sequence id gap. Some published messages are not received!"); + } + } + } + + if (useTransaction) { + consumer.acknowledgeAsync(message.getMessageId(), transaction); + } + else { + consumer.acknowledgeAsync(message); + } + + timeTracker.run(); + }).exceptionally(ex -> { + pulsarActivity.asyncOperationFailed(ex); + return null; + }); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java index 6139243de..12ce0626a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarOpMapper.java @@ -1,23 +1,29 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.Transaction; import java.util.function.LongFunction; +import java.util.function.Supplier; public abstract class PulsarOpMapper implements LongFunction { protected final CommandTemplate cmdTpl; protected final PulsarSpace clientSpace; + protected final PulsarActivity pulsarActivity; protected final LongFunction asyncApiFunc; public PulsarOpMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc) { this.cmdTpl = cmdTpl; this.clientSpace = clientSpace; + this.pulsarActivity = pulsarActivity; this.asyncApiFunc = asyncApiFunc; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java index 5c9397881..c6e3f2cbf 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerMapper.java @@ -1,14 +1,20 @@ package io.nosqlbench.driver.pulsar.ops; -import com.codahale.metrics.Counter; -import com.codahale.metrics.Histogram; +import com.fasterxml.jackson.databind.ObjectMapper; import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.commons.lang3.RandomUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.transaction.Transaction; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.function.LongFunction; import java.util.function.Supplier; @@ -22,49 +28,98 @@ import java.util.function.Supplier; * * For additional parameterization, the command template is also provided. */ -public class PulsarProducerMapper extends PulsarOpMapper { +public class PulsarProducerMapper extends PulsarTransactOpMapper { + + private final static Logger logger = LogManager.getLogger(PulsarProducerMapper.class); + private final LongFunction> producerFunc; + private final LongFunction seqErrSimuTypeFunc; private final LongFunction keyFunc; + private final LongFunction propFunc; private final LongFunction payloadFunc; - private final PulsarActivity pulsarActivity; - private final LongFunction useTransactionFunc; - private final LongFunction> transactionSupplierFunc; public PulsarProducerMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, - LongFunction> producerFunc, - LongFunction keyFunc, - LongFunction payloadFunc, LongFunction useTransactionFunc, + LongFunction seqTrackingFunc, LongFunction> transactionSupplierFunc, - PulsarActivity pulsarActivity) { - super(cmdTpl, clientSpace, asyncApiFunc); + LongFunction> producerFunc, + LongFunction seqErrSimuTypeFunc, + LongFunction keyFunc, + LongFunction propFunc, + LongFunction payloadFunc) { + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc, useTransactionFunc, seqTrackingFunc, transactionSupplierFunc); + this.producerFunc = producerFunc; + this.seqErrSimuTypeFunc = seqErrSimuTypeFunc; this.keyFunc = keyFunc; + this.propFunc = propFunc; this.payloadFunc = payloadFunc; - this.pulsarActivity = pulsarActivity; - this.useTransactionFunc = useTransactionFunc; - this.transactionSupplierFunc = transactionSupplierFunc; } @Override public PulsarOp apply(long value) { - Producer producer = producerFunc.apply(value); boolean asyncApi = asyncApiFunc.apply(value); + boolean useTransaction = useTransactionFunc.apply(value); + boolean seqTracking = seqTrackingFunc.apply(value); + Supplier transactionSupplier = transactionSupplierFunc.apply(value); + + Producer producer = producerFunc.apply(value); + + // Simulate error 10% of the time + float rndVal = RandomUtils.nextFloat(0, 1.0f); + boolean simulationError = (rndVal > 0) && (rndVal < 0.1f); + String seqErrSimuType = seqErrSimuTypeFunc.apply(value); + boolean simulateMsgOutofOrder = simulationError && + !StringUtils.isBlank(seqErrSimuType) && + StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.OutOfOrder.label); + boolean simulateMsgLoss = simulationError && + !StringUtils.isBlank(seqErrSimuType) && + StringUtils.equalsIgnoreCase(seqErrSimuType, PulsarActivityUtil.SEQ_ERROR_SIMU_TYPE.DataLoss.label); + String msgKey = keyFunc.apply(value); String msgPayload = payloadFunc.apply(value); - boolean useTransaction = useTransactionFunc.apply(value); - Supplier transactionSupplier = transactionSupplierFunc.apply(value); + + // Check if msgPropJonStr is valid JSON string with a collection of key/value pairs + // - if Yes, convert it to a map + // - otherwise, log an error message and ignore message properties without throwing a runtime exception + Map msgProperties = new HashMap<>(); + String msgPropJsonStr = propFunc.apply(value); + if (!StringUtils.isBlank(msgPropJsonStr)) { + try { + msgProperties = PulsarActivityUtil.convertJsonToMap(msgPropJsonStr); + + } catch (Exception e) { + logger.error( + "Error parsing message property JSON string {}, ignore message properties!", + msgPropJsonStr); + } + } + + // Set message sequence tracking property + if (seqTracking) { + if (!simulateMsgOutofOrder) { + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value)); + } + else { + int rndmOffset = 2; + if (value > rndmOffset) + msgProperties.put(PulsarActivityUtil.MSG_SEQUENCE_ID, String.valueOf(value-rndmOffset)); + } + } + return new PulsarProducerOp( - producer, - clientSpace.getPulsarSchema(), + pulsarActivity, asyncApi, useTransaction, transactionSupplier, + producer, + clientSpace.getPulsarSchema(), msgKey, + msgProperties, msgPayload, - pulsarActivity - ); + simulateMsgLoss); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index c72102813..47a9e3381 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -6,6 +6,7 @@ import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; @@ -15,6 +16,7 @@ import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; @@ -23,57 +25,87 @@ public class PulsarProducerOp implements PulsarOp { private final static Logger logger = LogManager.getLogger(PulsarProducerOp.class); - private final Producer producer; - private final Schema pulsarSchema; - private final String msgKey; - private final String msgPayload; - private final boolean asyncPulsarOp; - private final Counter bytesCounter; - private final Histogram messagesizeHistogram; private final PulsarActivity pulsarActivity; + + private final boolean asyncPulsarOp; private final boolean useTransaction; private final Supplier transactionSupplier; - public PulsarProducerOp(Producer producer, - Schema schema, - boolean asyncPulsarOp, - boolean useTransaction, - Supplier transactionSupplier, - String key, - String payload, - PulsarActivity pulsarActivity) { + private final Producer producer; + private final Schema pulsarSchema; + private final String msgKey; + private final Map msgProperties; + private final String msgPayload; + private final boolean simulateMsgLoss; + + private final Counter bytesCounter; + private final Histogram messageSizeHistogram; + private final Timer transactionCommitTimer; + + public PulsarProducerOp( PulsarActivity pulsarActivity, + boolean asyncPulsarOp, + boolean useTransaction, + Supplier transactionSupplier, + Producer producer, + Schema schema, + String key, + Map msgProperties, + String payload, + boolean simulateMsgLoss) { + this.pulsarActivity = pulsarActivity; + + this.asyncPulsarOp = asyncPulsarOp; + this.useTransaction = useTransaction; + this.transactionSupplier = transactionSupplier; + this.producer = producer; this.pulsarSchema = schema; this.msgKey = key; + this.msgProperties = msgProperties; this.msgPayload = payload; - this.asyncPulsarOp = asyncPulsarOp; - this.pulsarActivity = pulsarActivity; + this.simulateMsgLoss = simulateMsgLoss; + this.bytesCounter = pulsarActivity.getBytesCounter(); - this.messagesizeHistogram = pulsarActivity.getMessagesizeHistogram(); - this.useTransaction = useTransaction; - this.transactionSupplier = transactionSupplier; + this.messageSizeHistogram = pulsarActivity.getMessageSizeHistogram(); + this.transactionCommitTimer = pulsarActivity.getCommitTransactionTimer(); } @Override public void run(Runnable timeTracker) { + // Skip this cycle (without sending messages) if we're doing message loss simulation + if (simulateMsgLoss) { + return; + } + if ((msgPayload == null) || msgPayload.isEmpty()) { throw new RuntimeException("Message payload (\"msg-value\") can't be empty!"); } + TypedMessageBuilder typedMessageBuilder; + final Transaction transaction; if (useTransaction) { // if you are in a transaction you cannot set the schema per-message transaction = transactionSupplier.get(); typedMessageBuilder = producer.newMessage(transaction); - } else { + } + else { transaction = null; typedMessageBuilder = producer.newMessage(pulsarSchema); } - if ((msgKey != null) && (!msgKey.isEmpty())) { + + // set message key + if (!StringUtils.isBlank(msgKey)) { typedMessageBuilder = typedMessageBuilder.key(msgKey); } - int messagesize; + // set message properties + if ( !msgProperties.isEmpty() ) { + typedMessageBuilder = typedMessageBuilder.properties(msgProperties); + } + + // set message payload + int messageSize; SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( @@ -83,55 +115,110 @@ public class PulsarProducerOp implements PulsarOp { ); typedMessageBuilder = typedMessageBuilder.value(payload); // TODO: add a way to calculate the message size for AVRO messages - messagesize = msgPayload.length(); + messageSize = msgPayload.length(); } else { byte[] array = msgPayload.getBytes(StandardCharsets.UTF_8); typedMessageBuilder = typedMessageBuilder.value(array); - messagesize = array.length; + messageSize = array.length; } - messagesizeHistogram.update(messagesize); - bytesCounter.inc(messagesize); + messageSizeHistogram.update(messageSize); + bytesCounter.inc(messageSize); //TODO: add error handling with failed message production if (!asyncPulsarOp) { try { - logger.trace("sending message"); + logger.trace("Sending message"); typedMessageBuilder.send(); + if (useTransaction) { - try (Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();) { + try (Timer.Context ctx = transactionCommitTimer.time()) { transaction.commit().get(); } } - } catch (PulsarClientException | ExecutionException | InterruptedException pce) { - logger.trace("failed sending message"); + + if (logger.isDebugEnabled()) { + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.Schema avroSchema = + AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); + + logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={})", + msgKey, + msgProperties, + avroGenericRecord.toString()); + } + else { + logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={}", + msgKey, + msgProperties, + msgPayload); + } + } + } + catch (PulsarClientException | ExecutionException | InterruptedException pce) { + logger.trace( + "Sync message sending failed: " + + "key - " + msgKey + "; " + + "properties - " + msgProperties + "; " + + "payload - " + msgPayload); throw new RuntimeException(pce); } + timeTracker.run(); - } else { + } + else { try { // we rely on blockIfQueueIsFull in order to throttle the request in this case CompletableFuture future = typedMessageBuilder.sendAsync(); + if (useTransaction) { // add commit step future = future.thenCompose(msg -> { - Timer.Context ctx = pulsarActivity.getCommitTransactionTimer().time();; + Timer.Context ctx = transactionCommitTimer.time(); return transaction .commit() - .whenComplete((m,e) -> { - ctx.close(); - }) + .whenComplete((m,e) -> ctx.close()) .thenApply(v-> msg); } ); } + future.whenComplete((messageId, error) -> { + if (logger.isDebugEnabled()) { + if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + org.apache.avro.Schema avroSchema = + AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.generic.GenericRecord avroGenericRecord = + AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); + + logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})", + msgKey, + msgProperties, + avroGenericRecord.toString()); + } + else { + logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}", + msgKey, + msgProperties, + msgPayload); + } + } + timeTracker.run(); }).exceptionally(ex -> { - logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload); + logger.error("Async message sending failed: " + + "key - " + msgKey + "; " + + "properties - " + msgProperties + "; " + + "payload - " + msgPayload); + pulsarActivity.asyncOperationFailed(ex); return null; }); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException(e); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java index 6d8d45ced..4ebbc965f 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.ops; +import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Reader; @@ -13,10 +14,11 @@ public class PulsarReaderMapper extends PulsarOpMapper { public PulsarReaderMapper(CommandTemplate cmdTpl, PulsarSpace clientSpace, + PulsarActivity pulsarActivity, LongFunction asyncApiFunc, LongFunction> readerFunc) { - super(cmdTpl, clientSpace, asyncApiFunc); + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc); this.readerFunc = readerFunc; } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarTransactOpMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarTransactOpMapper.java new file mode 100644 index 000000000..343ca9a7a --- /dev/null +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarTransactOpMapper.java @@ -0,0 +1,29 @@ +package io.nosqlbench.driver.pulsar.ops; + +import io.nosqlbench.driver.pulsar.PulsarActivity; +import io.nosqlbench.driver.pulsar.PulsarSpace; +import io.nosqlbench.engine.api.templating.CommandTemplate; +import org.apache.pulsar.client.api.transaction.Transaction; + +import java.util.function.LongFunction; +import java.util.function.Supplier; + +public abstract class PulsarTransactOpMapper extends PulsarOpMapper { + protected final LongFunction useTransactionFunc; + protected final LongFunction seqTrackingFunc; + protected final LongFunction> transactionSupplierFunc; + + public PulsarTransactOpMapper(CommandTemplate cmdTpl, + PulsarSpace clientSpace, + PulsarActivity pulsarActivity, + LongFunction asyncApiFunc, + LongFunction useTransactionFunc, + LongFunction seqTrackingFunc, + LongFunction> transactionSupplierFunc) + { + super(cmdTpl, clientSpace, pulsarActivity, asyncApiFunc); + this.useTransactionFunc = useTransactionFunc; + this.seqTrackingFunc = seqTrackingFunc; + this.transactionSupplierFunc = transactionSupplierFunc; + } +} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java index 44b206132..730ff14dc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/ReadyPulsarOp.java @@ -21,7 +21,9 @@ import java.util.function.LongFunction; import java.util.function.Supplier; public class ReadyPulsarOp implements OpDispenser { + private final static Logger logger = LogManager.getLogger(ReadyPulsarOp.class); + private final OpTemplate opTpl; private final CommandTemplate cmdTpl; private final PulsarSpace clientSpace; @@ -71,7 +73,7 @@ public class ReadyPulsarOp implements OpDispenser { throw new RuntimeException("topic_url is not valid. Perhaps you mean topic_uri ?"); } - // Global parameter: topic_uri + // Doc-level parameter: topic_uri LongFunction topicUriFunc = (l) -> null; if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) { if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label)) { @@ -80,8 +82,9 @@ public class ReadyPulsarOp implements OpDispenser { topicUriFunc = (l) -> cmdTpl.getDynamic(PulsarActivityUtil.DOC_LEVEL_PARAMS.TOPIC_URI.label, l); } } + logger.info("topic_uri: {}", topicUriFunc.apply(0)); - // Global parameter: async_api + // Doc-level parameter: async_api LongFunction asyncApiFunc = (l) -> false; if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ASYNC_API.label)) { @@ -93,6 +96,7 @@ public class ReadyPulsarOp implements OpDispenser { } logger.info("async_api: {}", asyncApiFunc.apply(0)); + // Doc-level parameter: async_api LongFunction useTransactionFunc = (l) -> false; if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) { if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.USE_TRANSACTION.label)) { @@ -104,7 +108,7 @@ public class ReadyPulsarOp implements OpDispenser { } logger.info("use_transaction: {}", useTransactionFunc.apply(0)); - // Global parameter: admin_delop + // Doc-level parameter: admin_delop LongFunction adminDelOpFunc = (l) -> false; if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label)) { if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label)) @@ -112,27 +116,70 @@ public class ReadyPulsarOp implements OpDispenser { else throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.ADMIN_DELOP.label + "\" parameter cannot be dynamic!"); } + logger.info("admin_delop: {}", adminDelOpFunc.apply(0)); + + // Doc-level parameter: seq_tracking + LongFunction seqTrackingFunc = (l) -> false; + if (cmdTpl.containsKey(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label)) { + if (cmdTpl.isStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label)) + seqTrackingFunc = (l) -> BooleanUtils.toBoolean(cmdTpl.getStatic(PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label)); + else + throw new RuntimeException("\"" + PulsarActivityUtil.DOC_LEVEL_PARAMS.SEQ_TRACKING.label + "\" parameter cannot be dynamic!"); + } + logger.info("seq_tracking: {}", seqTrackingFunc.apply(0)); + // TODO: Complete implementation for websocket-producer and managed-ledger + // Admin operation: create/delete tenant if ( StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TENANT.label) ) { return resolveAdminTenant(clientSpace, asyncApiFunc, adminDelOpFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_NAMESPACE.label)) { + } + // Admin operation: create/delete namespace + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_NAMESPACE.label)) { return resolveAdminNamespace(clientSpace, asyncApiFunc, adminDelOpFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) { + } + // Admin operation: create/delete topic + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.ADMIN_TOPIC.label)) { return resolveAdminTopic(clientSpace, topicUriFunc, asyncApiFunc, adminDelOpFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { - return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { - return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { + } + // Regular/non-admin operation: single message sending (producer) + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_SEND.label)) { + return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc); + } + // Regular/non-admin operation: single message consuming from a single topic (consumer) + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_CONSUME.label)) { + return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, false); + } + // Regular/non-admin operation: single message consuming from multiple-topics (consumer) + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_MULTI_CONSUME.label)) { + return resolveMultiTopicMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc); + } + // Regular/non-admin operation: single message consuming a single topic (reader) + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.MSG_READ.label)) { return resolveMsgRead(clientSpace, topicUriFunc, asyncApiFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) { + } + // Regular/non-admin operation: batch message processing - batch start + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_START.label)) { return resolveMsgBatchSendStart(clientSpace, topicUriFunc, asyncApiFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) { + } + // Regular/non-admin operation: batch message processing - message sending (producer) + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND.label)) { return resolveMsgBatchSend(clientSpace, asyncApiFunc); - } else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) { + } + // Regular/non-admin operation: batch message processing - batch send + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.BATCH_MSG_SEND_END.label)) { return resolveMsgBatchSendEnd(clientSpace, asyncApiFunc); - } else { + } + // Regular/non-admin operation: end-to-end message processing - sending message + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_SEND.label)) { + return resolveMsgSend(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc); + } + // Regular/non-admin operation: end-to-end message processing - consuming message + else if (StringUtils.equalsIgnoreCase(stmtOpType, PulsarActivityUtil.OP_TYPES.E2E_MSG_PROC_CONSUME.label)) { + return resolveMsgConsume(clientSpace, topicUriFunc, asyncApiFunc, useTransactionFunc, seqTrackingFunc, true); + } + // Invalid operation type + else { throw new RuntimeException("Unsupported Pulsar operation type"); } } @@ -184,6 +231,7 @@ public class ReadyPulsarOp implements OpDispenser { return new PulsarAdminTenantMapper( cmdTpl, clientSpace, + pulsarActivity, asyncApiFunc, adminDelOpFunc, adminRolesFunc, @@ -209,6 +257,7 @@ public class ReadyPulsarOp implements OpDispenser { return new PulsarAdminNamespaceMapper( cmdTpl, clientSpace, + pulsarActivity, asyncApiFunc, adminDelOpFunc, namespaceFunc); @@ -238,6 +287,7 @@ public class ReadyPulsarOp implements OpDispenser { return new PulsarAdminTopicMapper( cmdTpl, clientSpace, + pulsarActivity, asyncApiFunc, adminDelOpFunc, topic_uri_fun, @@ -249,8 +299,12 @@ public class ReadyPulsarOp implements OpDispenser { PulsarSpace clientSpace, LongFunction topic_uri_func, LongFunction async_api_func, - LongFunction useTransactionFunc + LongFunction useTransactionFunc, + LongFunction seqTrackingFunc ) { + LongFunction> transactionSupplierFunc = + (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? + LongFunction cycle_producer_name_func; if (cmdTpl.isStatic("producer_name")) { cycle_producer_name_func = (l) -> cmdTpl.getStatic("producer_name"); @@ -263,9 +317,19 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction> producerFunc = (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_producer_name_func.apply(l)); - LongFunction> transactionSupplierFunc = - (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? + // check if we're going to simulate producer message out-of-sequence error + // - message ordering + // - message loss + LongFunction seqErrSimuTypeFunc = (l) -> null; + if (cmdTpl.containsKey("seqerr_simu")) { + if (cmdTpl.isStatic("seqerr_simu")) { + seqErrSimuTypeFunc = (l) -> cmdTpl.getStatic("seqerr_simu"); + } else { + throw new RuntimeException("\"seqerr_simu\" parameter cannot be dynamic!"); + } + } + // message key LongFunction keyFunc; if (cmdTpl.isStatic("msg_key")) { keyFunc = (l) -> cmdTpl.getStatic("msg_key"); @@ -275,6 +339,16 @@ public class ReadyPulsarOp implements OpDispenser { keyFunc = (l) -> null; } + // message property + LongFunction propFunc; + if (cmdTpl.isStatic("msg_property")) { + propFunc = (l) -> cmdTpl.getStatic("msg_property"); + } else if (cmdTpl.isDynamic("msg_property")) { + propFunc = (l) -> cmdTpl.getDynamic("msg_property", l); + } else { + propFunc = (l) -> null; + } + LongFunction valueFunc; if (cmdTpl.containsKey("msg_value")) { if (cmdTpl.isStatic("msg_value")) { @@ -291,20 +365,82 @@ public class ReadyPulsarOp implements OpDispenser { return new PulsarProducerMapper( cmdTpl, clientSpace, + pulsarActivity, async_api_func, - producerFunc, - keyFunc, - valueFunc, useTransactionFunc, + seqTrackingFunc, transactionSupplierFunc, - pulsarActivity); + producerFunc, + seqErrSimuTypeFunc, + keyFunc, + propFunc, + valueFunc); } private LongFunction resolveMsgConsume( PulsarSpace clientSpace, LongFunction topic_uri_func, LongFunction async_api_func, - LongFunction useTransactionFunc + LongFunction useTransactionFunc, + LongFunction seqTrackingFunc, + boolean e2eMsgProc + ) { + LongFunction subscription_name_func; + if (cmdTpl.isStatic("subscription_name")) { + subscription_name_func = (l) -> cmdTpl.getStatic("subscription_name"); + } else if (cmdTpl.isDynamic("subscription_name")) { + subscription_name_func = (l) -> cmdTpl.getDynamic("subscription_name", l); + } else { + subscription_name_func = (l) -> null; + } + + LongFunction subscription_type_func; + if (cmdTpl.isStatic("subscription_type")) { + subscription_type_func = (l) -> cmdTpl.getStatic("subscription_type"); + } else if (cmdTpl.isDynamic("subscription_type")) { + subscription_type_func = (l) -> cmdTpl.getDynamic("subscription_type", l); + } else { + subscription_type_func = (l) -> null; + } + + LongFunction consumer_name_func; + if (cmdTpl.isStatic("consumer_name")) { + consumer_name_func = (l) -> cmdTpl.getStatic("consumer_name"); + } else if (cmdTpl.isDynamic("consumer_name")) { + consumer_name_func = (l) -> cmdTpl.getDynamic("consumer_name", l); + } else { + consumer_name_func = (l) -> null; + } + + LongFunction> transactionSupplierFunc = + (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? + + LongFunction> consumerFunc = (l) -> + clientSpace.getConsumer( + topic_uri_func.apply(l), + subscription_name_func.apply(l), + subscription_type_func.apply(l), + consumer_name_func.apply(l) + ); + + return new PulsarConsumerMapper( + cmdTpl, + clientSpace, + pulsarActivity, + async_api_func, + useTransactionFunc, + seqTrackingFunc, + transactionSupplierFunc, + consumerFunc, + e2eMsgProc); + } + + private LongFunction resolveMultiTopicMsgConsume( + PulsarSpace clientSpace, + LongFunction topic_uri_func, + LongFunction async_api_func, + LongFunction useTransactionFunc, + LongFunction seqTrackingFunc ) { // Topic list (multi-topic) LongFunction topic_names_func; @@ -356,8 +492,8 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction> transactionSupplierFunc = (l) -> clientSpace.getTransactionSupplier(); //TODO make it dependant on current cycle? - LongFunction> consumerFunc = (l) -> - clientSpace.getConsumer( + LongFunction> mtConsumerFunc = (l) -> + clientSpace.getMultiTopicConsumer( topic_uri_func.apply(l), topic_names_func.apply(l), topics_pattern_func.apply(l), @@ -366,9 +502,16 @@ public class ReadyPulsarOp implements OpDispenser { consumer_name_func.apply(l) ); - return new PulsarConsumerMapper(cmdTpl, clientSpace, async_api_func, consumerFunc, - pulsarActivity.getBytesCounter(), pulsarActivity.getMessagesizeHistogram(), pulsarActivity.getCommitTransactionTimer(), - useTransactionFunc, transactionSupplierFunc); + return new PulsarConsumerMapper( + cmdTpl, + clientSpace, + pulsarActivity, + async_api_func, + useTransactionFunc, + seqTrackingFunc, + transactionSupplierFunc, + mtConsumerFunc, + false); } private LongFunction resolveMsgRead( @@ -401,7 +544,12 @@ public class ReadyPulsarOp implements OpDispenser { start_msg_pos_str_func.apply(l) ); - return new PulsarReaderMapper(cmdTpl, clientSpace, async_api_func, readerFunc); + return new PulsarReaderMapper( + cmdTpl, + clientSpace, + pulsarActivity, + async_api_func, + readerFunc); } private LongFunction resolveMsgBatchSendStart( @@ -421,7 +569,12 @@ public class ReadyPulsarOp implements OpDispenser { LongFunction> batchProducerFunc = (l) -> clientSpace.getProducer(topic_uri_func.apply(l), cycle_batch_producer_name_func.apply(l)); - return new PulsarBatchProducerStartMapper(cmdTpl, clientSpace, asyncApiFunc, batchProducerFunc); + return new PulsarBatchProducerStartMapper( + cmdTpl, + clientSpace, + pulsarActivity, + asyncApiFunc, + batchProducerFunc); } private LongFunction resolveMsgBatchSend(PulsarSpace clientSpace, @@ -436,6 +589,16 @@ public class ReadyPulsarOp implements OpDispenser { keyFunc = (l) -> null; } + // message property + LongFunction propFunc; + if (cmdTpl.isStatic("msg_property")) { + propFunc = (l) -> cmdTpl.getStatic("msg_property"); + } else if (cmdTpl.isDynamic("msg_property")) { + propFunc = (l) -> cmdTpl.getDynamic("msg_property", l); + } else { + propFunc = (l) -> null; + } + LongFunction valueFunc; if (cmdTpl.containsKey("msg_value")) { if (cmdTpl.isStatic("msg_value")) { @@ -452,14 +615,20 @@ public class ReadyPulsarOp implements OpDispenser { return new PulsarBatchProducerMapper( cmdTpl, clientSpace, + pulsarActivity, asyncApiFunc, keyFunc, + propFunc, valueFunc); } private LongFunction resolveMsgBatchSendEnd(PulsarSpace clientSpace, LongFunction asyncApiFunc) { - return new PulsarBatchProducerEndMapper(cmdTpl, clientSpace, asyncApiFunc); + return new PulsarBatchProducerEndMapper( + cmdTpl, + clientSpace, + pulsarActivity, + asyncApiFunc); } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index 6af629860..40c468474 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -1,5 +1,6 @@ package io.nosqlbench.driver.pulsar.util; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,8 +13,10 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Arrays; -import java.util.HashMap; +import java.util.Base64; +import java.util.Map; import java.util.stream.Collectors; +import java.util.stream.Stream; public class PulsarActivityUtil { @@ -25,12 +28,15 @@ public class PulsarActivityUtil { ADMIN_TENANT("admin-tenant"), ADMIN_NAMESPACE("admin-namespace"), ADMIN_TOPIC("admin-topic"), + E2E_MSG_PROC_SEND("ec2-msg-proc-send"), + E2E_MSG_PROC_CONSUME("ec2-msg-proc-consume"), BATCH_MSG_SEND_START("batch-msg-send-start"), BATCH_MSG_SEND("batch-msg-send"), BATCH_MSG_SEND_END("batch-msg-send-end"), MSG_SEND("msg-send"), MSG_CONSUME("msg-consume"), - MSG_READ("msg-read"); + MSG_READ("msg-read"), + MSG_MULTI_CONSUME("msg-mt-consume"); public final String label; @@ -42,11 +48,17 @@ public class PulsarActivityUtil { return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type)); } + public static final String MSG_SEQUENCE_ID = "sequence_id"; + public static final String MSG_SEQUENCE_TGTMAX = "sequence_tgtmax"; + + /////// + // Valid document level parameters for Pulsar NB yaml file public enum DOC_LEVEL_PARAMS { TOPIC_URI("topic_uri"), ASYNC_API("async_api"), USE_TRANSACTION("use_transaction"), - ADMIN_DELOP("admin_delop"); + ADMIN_DELOP("admin_delop"), + SEQ_TRACKING("seq_tracking"); public final String label; @@ -55,9 +67,28 @@ public class PulsarActivityUtil { } } public static boolean isValidDocLevelParam(String param) { - return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(param)); + return Arrays.stream(DOC_LEVEL_PARAMS.values()).anyMatch(t -> t.label.equals(param)); } + /////// + // Valid Pulsar API type + public enum PULSAR_API_TYPE { + PRODUCER("producer"), + CONSUMER("consumer"), + READER("reader"); + + public final String label; + + PULSAR_API_TYPE(String label) { + this.label = label; + } + } + public static boolean isValidPulsarApiType(String param) { + return Arrays.stream(PULSAR_API_TYPE.values()).anyMatch(t -> t.label.equals(param)); + } + public static String getValidPulsarApiTypeList() { + return Arrays.stream(PULSAR_API_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } /////// // Valid persistence type @@ -75,7 +106,6 @@ public class PulsarActivityUtil { return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type)); } - /////// // Valid Pulsar client configuration (activity-level settings) // - https://pulsar.apache.org/docs/en/client-libraries-java/#client @@ -171,11 +201,29 @@ public class PulsarActivityUtil { this.label = label; } } - public static boolean isStandardConsumerConfItem(String item) { return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); } + /////// + // Custom consumer configuration (activity-level settings) + // - NOT part of https://pulsar.apache.org/docs/en/client-libraries-java/#consumer + // - NB Pulsar driver consumer operation specific + public enum CONSUMER_CONF_CUSTOM_KEY { + timeout("timeout"); + + public final String label; + + CONSUMER_CONF_CUSTOM_KEY(String label) { + this.label = label; + } + } + public static boolean isCustomConsumerConfItem(String item) { + return Arrays.stream(CONSUMER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item)); + } + + /////// + // Pulsar subscription type public enum SUBSCRIPTION_TYPE { Exclusive("Exclusive"), Failover("Failover"), @@ -188,7 +236,6 @@ public class PulsarActivityUtil { this.label = label; } } - public static boolean isValidSubscriptionType(String item) { return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item)); } @@ -220,6 +267,10 @@ public class PulsarActivityUtil { return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item)); } + /////// + // Custom reader configuration (activity-level settings) + // - NOT part of https://pulsar.apache.org/docs/en/client-libraries-java/#reader + // - NB Pulsar driver reader operation specific public enum READER_CONF_CUSTOM_KEY { startMessagePos("startMessagePos"); @@ -229,11 +280,12 @@ public class PulsarActivityUtil { this.label = label; } } - public static boolean isCustomReaderConfItem(String item) { return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item)); } + /////// + // Valid read positions for a Pulsar reader public enum READER_MSG_POSITION_TYPE { earliest("earliest"), latest("latest"), @@ -245,11 +297,29 @@ public class PulsarActivityUtil { this.label = label; } } - public static boolean isValideReaderStartPosition(String item) { return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item)); } + /////// + // Pulsar subscription type + public enum SEQ_ERROR_SIMU_TYPE { + OutOfOrder("out_of_order"), + DataLoss("data_loss"); + + public final String label; + + SEQ_ERROR_SIMU_TYPE(String label) { + this.label = label; + } + } + public static boolean isValidSeqErrSimuType(String item) { + return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).anyMatch(t -> t.label.equals(item)); + } + public static String getValidSeqErrSimuTypeList() { + return Arrays.stream(SEQ_ERROR_SIMU_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", ")); + } + /////// // Valid websocket-producer configuration (activity-level settings) // TODO: to be added @@ -387,5 +457,24 @@ public class PulsarActivityUtil { return schema; } + + /////// + // Generate effective key string + public static String buildCacheKey(String... keyParts) { + // Ignore blank keyPart + String joinedKeyStr = + Stream.of(keyParts) + .filter(s -> !StringUtils.isBlank(s)) + .collect(Collectors.joining(",")); + + return Base64.getEncoder().encodeToString(joinedKeyStr.getBytes()); + } + + /////// + // Convert JSON string to a key/value map + public static Map convertJsonToMap(String jsonStr) throws Exception { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonStr, Map.class); + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java index 0bbabd8c4..5e5da059e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarNBClientConf.java @@ -173,14 +173,16 @@ public class PulsarNBClientConf { } // other producer helper functions ... public String getProducerName() { - Object confValue = getProducerConfValue("producer.producerName"); + Object confValue = getProducerConfValue( + "producer." + PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label); if (confValue == null) return ""; else return confValue.toString(); } public String getProducerTopicName() { - Object confValue = getProducerConfValue("producer.topicName"); + Object confValue = getProducerConfValue( + "producer." + PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName); if (confValue == null) return ""; else @@ -213,48 +215,56 @@ public class PulsarNBClientConf { } // Other consumer helper functions ... public String getConsumerTopicNames() { - Object confValue = getConsumerConfValue("consumer.topicNames"); + Object confValue = getConsumerConfValue( + "consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicNames.label); if (confValue == null) return ""; else return confValue.toString(); } public String getConsumerTopicPattern() { - Object confValue = getConsumerConfValue("consumer.topicsPattern"); + Object confValue = getConsumerConfValue( + "consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.topicsPattern.label); if (confValue == null) return ""; else return confValue.toString(); } - public int getConsumerTimeoutSeconds() { - Object confValue = getConsumerConfValue("consumer.timeout"); - if (confValue == null) - return -1; // infinite - else - return Integer.parseInt(confValue.toString()); - } public String getConsumerSubscriptionName() { - Object confValue = getConsumerConfValue("consumer.subscriptionName"); + Object confValue = getConsumerConfValue( + "consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionName.label); if (confValue == null) return ""; else return confValue.toString(); } public String getConsumerSubscriptionType() { - Object confValue = getConsumerConfValue("consumer.subscriptionType"); + Object confValue = getConsumerConfValue( + "consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.subscriptionType.label); if (confValue == null) return ""; else return confValue.toString(); } public String getConsumerName() { - Object confValue = getConsumerConfValue("consumer.consumerName"); + Object confValue = getConsumerConfValue( + "consumer." + PulsarActivityUtil.CONSUMER_CONF_STD_KEY.consumerName.label); if (confValue == null) return ""; else return confValue.toString(); } - + // NOTE: Below are not a standard Pulsar consumer configuration parameter as + // listed in "https://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer" + // They're custom-made configuration properties for NB pulsar driver consumer. + public int getConsumerTimeoutSeconds() { + Object confValue = getConsumerConfValue( + "consumer." + PulsarActivityUtil.CONSUMER_CONF_CUSTOM_KEY.timeout.label); + if (confValue == null) + return -1; // infinite + else + return Integer.parseInt(confValue.toString()); + } ////////////////// // Get Pulsar reader related config @@ -279,23 +289,29 @@ public class PulsarNBClientConf { else readerConfMap.put(key, value); } - // Other consumer helper functions ... + // Other reader helper functions ... public String getReaderTopicName() { - Object confValue = getReaderConfValue("reader.topicName"); + Object confValue = getReaderConfValue( + "reader." + PulsarActivityUtil.READER_CONF_STD_KEY.topicName.label); if (confValue == null) return ""; else return confValue.toString(); } public String getReaderName() { - Object confValue = getReaderConfValue("reader.readerName"); + Object confValue = getReaderConfValue( + "reader." + PulsarActivityUtil.READER_CONF_STD_KEY.readerName.label); if (confValue == null) return ""; else return confValue.toString(); } + // NOTE: Below are not a standard Pulsar reader configuration parameter as + // listed in "https://pulsar.apache.org/docs/en/client-libraries-java/#reader" + // They're custom-made configuration properties for NB pulsar driver reader. public String getStartMsgPosStr() { - Object confValue = getReaderConfValue("reader.startMessagePos"); + Object confValue = getReaderConfValue( + "reader." + PulsarActivityUtil.READER_CONF_CUSTOM_KEY.startMessagePos.label); if (confValue == null) return ""; else diff --git a/driver-pulsar/src/main/resources/activities/config.properties b/driver-pulsar/src/main/resources/activities/config.properties index 92842b9c2..37be64384 100644 --- a/driver-pulsar/src/main/resources/activities/config.properties +++ b/driver-pulsar/src/main/resources/activities/config.properties @@ -8,6 +8,8 @@ # TODO: as a starting point, only supports the following types # 1) primitive types, including bytearray (byte[]) which is default, for messages without schema # 2) Avro for messages with schema +#schema.type=avro +#schema.definition=file:///Users/yabinmeng/DataStax/MyNoSQLBench/nosqlbench/driver-pulsar/src/main/resources/activities/iot-example.avsc schema.type= schema.definition= diff --git a/driver-pulsar/src/main/resources/activities/pulsar_client_avro.yaml b/driver-pulsar/src/main/resources/activities/pulsar_client_avro.yaml index 6fe4b147e..709cd6256 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar_client_avro.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar_client_avro.yaml @@ -49,6 +49,7 @@ blocks: optype: msg-send # producer_name: {producer_name} msg_key: "{mykey}" + msg_property: "{myprop}" msg_value: | { "SensorID": "{sensor_id}", diff --git a/driver-pulsar/src/main/resources/activities/pulsar_client_kv.yaml b/driver-pulsar/src/main/resources/activities/pulsar_client_kv.yaml index 834efda79..a15a7a58e 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar_client_kv.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar_client_kv.yaml @@ -1,7 +1,10 @@ bindings: - # message key and value + # message key, property and value mykey: + int_prop_val: ToString(); Prefix("IntProp_") + text_prop_val: AlphaNumericString(10); Prefix("TextProp_") myvalue: NumberNameToString() #AlphaNumericString(20) + # tenant, namespace, and core topic name (without tenant and namespace) tenant: Mod(100); Div(10L); ToString(); Prefix("tnt") namespace: Mod(10); Div(5L); ToString(); Prefix("ns") core_topic_name: Mod(5); ToString(); Prefix("t") @@ -25,6 +28,11 @@ blocks: - name: s2 optype: batch-msg-send msg_key: "{mykey}" + msg_property: | + { + "prop1": "{int_prop_val}", + "prop2": "{text_prop_val}}" + } msg_value: "{myvalue}" ratio: 100 - name: s3 @@ -49,8 +57,6 @@ blocks: statements: - name: s1 optype: msg-consume - topic_names: - topics_pattern: subscription_name: "mysub" subscription_type: consumer_name: @@ -64,6 +70,19 @@ blocks: optype: msg-read reader_name: + - name: multi-topic-consumer-block + tags: + phase: multi-topic-consumer + admin_task: false + statements: + - name: s1 + optype: msg-mt-consume + topic_names: + topics_pattern: + subscription_name: "mysub" + subscription_type: + consumer_name: + # - websocket-producer: # tags: # type: websocket-produer diff --git a/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_e2e.yaml b/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_e2e.yaml new file mode 100644 index 000000000..50ac9dd4a --- /dev/null +++ b/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_e2e.yaml @@ -0,0 +1,30 @@ +bindings: + # message key, property and value + myprop1: AlphaNumericString(10); Prefix("PropVal_") + myvalue: NumberNameToString() #AlphaNumericString(20) + +# document level parameters that apply to all Pulsar client types: +params: + topic_uri: "persistent://public/default/sanity_e2e_2" + async_api: "true" + +blocks: + - name: e2e-msg-proc-block + tags: + phase: e2e-msg-proc + admin_task: false + statements: + - name: s1 + optype: ec2-msg-proc-send + msg_key: + msg_property: | + { + "prop1": "{myprop1}" + } + msg_value: "{myvalue}" + ratio: 1 + - name: s2 + optype: ec2-msg-proc-consume + ratio: 1 + subscription_name: "mysub" + subscription_type: diff --git a/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml b/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml new file mode 100644 index 000000000..7eccdd68e --- /dev/null +++ b/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml @@ -0,0 +1,36 @@ +bindings: + # message key, property and value + myprop1: AlphaNumericString(10) + myvalue: NumberNameToString() + +# document level parameters that apply to all Pulsar client types: +params: + topic_uri: "persistent://public/default/sanity_seqloss2" + # Only applicable to producer and consumer + # - used for message ordering and message loss check + seq_tracking: "false" + +blocks: + - name: producer-block + tags: + phase: producer + admin_task: false + statements: + - name: s1 + optype: msg-send + #seqerr_simu: "out_of_order" + seqerr_simu: "data_loass" + msg_key: + msg_property: + msg_value: "{myvalue}" + + - name: consumer-block + tags: + phase: consumer + admin_task: false + statements: + - name: s1 + optype: msg-consume + subscription_name: "mysub" + subscription_type: + consumer_name: