mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-01-11 08:22:04 -06:00
Add Pulsar client, producer, and consumer configuration support
This commit is contained in:
parent
b1fe01e591
commit
e6491433ae
@ -64,12 +64,19 @@
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/javax.jms/javax.jms-api -->
|
||||
<!-- <dependency>-->
|
||||
<!-- <groupId>javax.jms</groupId>-->
|
||||
<!-- <artifactId>javax.jms-api</artifactId>-->
|
||||
<!-- <version>2.0.1</version>-->
|
||||
<!-- </dependency>-->
|
||||
<!-- https://mvnrepository.com/artifact/commons-beanutils/commons-beanutils -->
|
||||
<dependency>
|
||||
<groupId>commons-beanutils</groupId>
|
||||
<artifactId>commons-beanutils</artifactId>
|
||||
<version>1.9.4</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-configuration2 -->
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-configuration2</artifactId>
|
||||
<version>2.7</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<artifactId>pulsar-jms</artifactId>
|
||||
|
@ -7,22 +7,19 @@ import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
|
||||
import io.nosqlbench.driver.jms.conn.JmsConnInfo;
|
||||
import io.nosqlbench.driver.jms.conn.JmsPulsarConnInfo;
|
||||
import io.nosqlbench.driver.jms.ops.JmsOp;
|
||||
import io.nosqlbench.driver.jms.util.JmsHeader;
|
||||
import io.nosqlbench.driver.jms.util.JmsUtil;
|
||||
import io.nosqlbench.driver.jms.util.PulsarConfig;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
@ -53,33 +50,54 @@ public class JmsActivity extends SimpleActivity {
|
||||
super.initActivity();
|
||||
|
||||
// default JMS type: Pulsar
|
||||
// - currently this is the only supported JMS provider
|
||||
jmsProviderType =
|
||||
activityDef.getParams()
|
||||
.getOptionalString(JmsUtil.JMS_PROVIDER_TYPE_KEY_STR)
|
||||
.orElse(JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label);
|
||||
|
||||
// "Pulsar" as the JMS provider
|
||||
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
|
||||
jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, activityDef);
|
||||
}
|
||||
|
||||
PulsarConnectionFactory factory;
|
||||
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
|
||||
Map<String, Object> configuration = new HashMap<>();
|
||||
configuration.put("webServiceUrl", ((JmsPulsarConnInfo)jmsConnInfo).getWebSvcUrl());
|
||||
configuration.put("brokerServiceUrl",((JmsPulsarConnInfo)jmsConnInfo).getPulsarSvcUrl());
|
||||
String webSvcUrl =
|
||||
activityDef.getParams()
|
||||
.getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR)
|
||||
.orElse("http://localhost:8080");
|
||||
String pulsarSvcUrl =
|
||||
activityDef.getParams()
|
||||
.getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR)
|
||||
.orElse("pulsar://localhost:6650");
|
||||
|
||||
try {
|
||||
factory = new PulsarConnectionFactory(configuration);
|
||||
this.jmsContext = factory.createContext();
|
||||
} catch (JMSException e) {
|
||||
throw new RuntimeException(
|
||||
"Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!");
|
||||
if (StringUtils.isAnyBlank(webSvcUrl, pulsarSvcUrl)) {
|
||||
throw new RuntimeException("For \"" + JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label + "\" type, " +
|
||||
"\"" + JmsUtil.JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR + "\" and " +
|
||||
"\"" + JmsUtil.JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR + "\" parameters are manadatory!");
|
||||
}
|
||||
|
||||
// Check if extra Pulsar config. file is in place
|
||||
// - default file: "pulsar_config.properties" under the current directory
|
||||
String pulsarCfgFile =
|
||||
activityDef.getParams()
|
||||
.getOptionalString(JmsUtil.JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR)
|
||||
.orElse(JmsUtil.JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME);
|
||||
|
||||
PulsarConfig pulsarConfig = new PulsarConfig(pulsarCfgFile);
|
||||
|
||||
jmsConnInfo = new JmsPulsarConnInfo(jmsProviderType, webSvcUrl, pulsarSvcUrl, pulsarConfig);
|
||||
}
|
||||
else {
|
||||
throw new RuntimeException("Unsupported JMS driver type : " + jmsProviderType);
|
||||
}
|
||||
|
||||
PulsarConnectionFactory factory;
|
||||
try {
|
||||
factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig());
|
||||
this.jmsContext = factory.createContext();
|
||||
} catch (JMSException e) {
|
||||
throw new RuntimeException(
|
||||
"Unable to initialize JMS connection factory (driver type: " + jmsProviderType + ")!");
|
||||
}
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||
|
@ -1,10 +1,21 @@
|
||||
package io.nosqlbench.driver.jms.conn;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class JmsConnInfo {
|
||||
|
||||
private final String jmsProviderType;
|
||||
protected final String jmsProviderType;
|
||||
protected final Map<String, Object> jmsConnConfig;
|
||||
|
||||
public JmsConnInfo(String jmsProviderType) {
|
||||
protected JmsConnInfo(String jmsProviderType) {
|
||||
this.jmsProviderType = jmsProviderType;
|
||||
this.jmsConnConfig = new HashMap<>();
|
||||
}
|
||||
|
||||
public Map<String, Object> getJmsConnConfig() { return this.jmsConnConfig; }
|
||||
public void resetJmsConnConfig() { this.jmsConnConfig.clear(); }
|
||||
public void addJmsConnConfigItems(Map<String, Object> cfgItems) { this.jmsConnConfig.putAll(cfgItems); }
|
||||
public void addJmsConnConfigItem(String key, Object value) { this.jmsConnConfig.put(key, value); }
|
||||
public void removeJmsConnConfigItem(String key) { this.jmsConnConfig.remove(key); }
|
||||
}
|
||||
|
@ -1,24 +1,42 @@
|
||||
package io.nosqlbench.driver.jms.conn;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.driver.jms.util.PulsarConfig;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class JmsPulsarConnInfo extends JmsConnInfo {
|
||||
|
||||
private String pulsarSvcUrl;
|
||||
private String webSvcUrl;
|
||||
private final String webSvcUrl;
|
||||
private final String pulsarSvcUrl;
|
||||
private final PulsarConfig extraPulsarConfig;
|
||||
|
||||
public JmsPulsarConnInfo(String jmsProviderType, ActivityDef activityDef) {
|
||||
public JmsPulsarConnInfo(String jmsProviderType, String webSvcUrl, String pulsarSvcUrl, PulsarConfig pulsarConfig) {
|
||||
super(jmsProviderType);
|
||||
|
||||
webSvcUrl =
|
||||
activityDef.getParams().getOptionalString("web_url").orElse("http://localhost:8080");
|
||||
pulsarSvcUrl =
|
||||
activityDef.getParams().getOptionalString("service_url").orElse("pulsar://localhost:6650");
|
||||
this.webSvcUrl = webSvcUrl;
|
||||
this.pulsarSvcUrl = pulsarSvcUrl;
|
||||
this.extraPulsarConfig = pulsarConfig;
|
||||
|
||||
this.addJmsConnConfigItem("webServiceUrl", this.webSvcUrl);
|
||||
this.addJmsConnConfigItem("brokerServiceUrl", this.pulsarSvcUrl);
|
||||
|
||||
Map<String, Object> clientCfgMap = this.extraPulsarConfig.getClientConfMap();
|
||||
if (!clientCfgMap.isEmpty()) {
|
||||
this.addJmsConnConfigItems(clientCfgMap);
|
||||
}
|
||||
|
||||
Map<String, Object> producerCfgMap = this.extraPulsarConfig.getProducerConfMap();
|
||||
if (!producerCfgMap.isEmpty()) {
|
||||
this.addJmsConnConfigItem("producerConfig", producerCfgMap);
|
||||
}
|
||||
|
||||
Map<String, Object> consumerCfgMap = this.extraPulsarConfig.getConsumerConfMap();
|
||||
if (!consumerCfgMap.isEmpty()) {
|
||||
this.addJmsConnConfigItem("consumerConfig", consumerCfgMap);
|
||||
}
|
||||
}
|
||||
|
||||
public void setPulsarSvcUrl(String pulsarSvcUrl) { this.pulsarSvcUrl = pulsarSvcUrl; }
|
||||
public String getPulsarSvcUrl() { return this.pulsarSvcUrl; }
|
||||
|
||||
public void setWebSvcUrl(String webSvcUrl) { this.webSvcUrl = webSvcUrl; }
|
||||
public String getWebSvcUrl() { return this.webSvcUrl; }
|
||||
public String getPulsarSvcUrl() { return this.pulsarSvcUrl; }
|
||||
public PulsarConfig getExtraPulsarConfig() { return this.extraPulsarConfig; }
|
||||
}
|
||||
|
@ -7,9 +7,7 @@ import io.nosqlbench.driver.jms.util.JmsHeader;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSContext;
|
||||
import javax.jms.JMSProducer;
|
||||
import javax.jms.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
@ -69,10 +67,37 @@ public class JmsMsgSendOp extends JmsTimeTrackOp {
|
||||
jmsProducer.setDisableMessageTimestamp(this.jmsHeader.isDisableMsgTimestamp());
|
||||
jmsProducer.setDisableMessageID(this.jmsHeader.isDisableMsgId());
|
||||
|
||||
// TODO: async producer
|
||||
// if (this.asyncJmsOp) {
|
||||
// jmsProducer.setAsync();
|
||||
// }
|
||||
if (this.asyncJmsOp) {
|
||||
jmsProducer.setAsync(new CompletionListener() {
|
||||
@Override
|
||||
public void onCompletion(Message msg) {
|
||||
try {
|
||||
byte[] msgBody = msg.getBody(byte[].class);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Async message send success - message body: " + new String(msgBody));
|
||||
}
|
||||
}
|
||||
catch (JMSException jmsException) {
|
||||
jmsException.printStackTrace();
|
||||
logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Message msg, Exception e) {
|
||||
try {
|
||||
byte[] msgBody = msg.getBody(byte[].class);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Async message send failure - message body: " + new String(msgBody));
|
||||
}
|
||||
}
|
||||
catch (JMSException jmsException) {
|
||||
jmsException.printStackTrace();
|
||||
logger.warn("Unexpected error when parsing message body: " + jmsException.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Object> entry : jmsMsgProperties.entrySet()) {
|
||||
jmsProducer.setProperty(entry.getKey(), entry.getValue());
|
||||
|
@ -11,8 +11,35 @@ public class JmsUtil {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(JmsUtil.class);
|
||||
|
||||
// Supported JMS provider type
|
||||
public enum JMS_PROVIDER_TYPES {
|
||||
PULSAR("pulsar");
|
||||
|
||||
public final String label;
|
||||
JMS_PROVIDER_TYPES(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidJmsProviderType(String type) {
|
||||
return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||
}
|
||||
|
||||
/////
|
||||
// NB command line parameters
|
||||
// - JMS provider type
|
||||
public final static String JMS_PROVIDER_TYPE_KEY_STR = "provider_type";
|
||||
|
||||
/// Only applicable when the provider is "Pulsar"
|
||||
// - Pulsar configuration properties file
|
||||
public final static String JMS_PULSAR_PROVIDER_CFG_FILE_KEY_STR = "pulsar_cfg_file";
|
||||
public final static String JMS_PULSAR_PROVIDER_DFT_CFG_FILE_NAME = "pulsar_config.properties";
|
||||
// - Pulsar web url
|
||||
public final static String JMS_PULSAR_PROVIDER_WEB_URL_KEY_STR = "web_url";
|
||||
// - Pulsar service url
|
||||
public final static String JMS_PULSAR_PROVIDER_SVC_URL_KEY_STR = "service_url";
|
||||
|
||||
|
||||
public final static String ASYNC_API_KEY_STR = "async_api";
|
||||
public final static String JMS_PROVIDER_TYPE_KEY_STR = "jms_provider_type";
|
||||
public final static String JMS_DESTINATION_TYPE_KEY_STR = "jms_desitation_type";
|
||||
|
||||
///// JMS Producer
|
||||
@ -62,19 +89,6 @@ public class JmsUtil {
|
||||
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||
}
|
||||
|
||||
// Supported JMS provider type
|
||||
public enum JMS_PROVIDER_TYPES {
|
||||
PULSAR("pulsar");
|
||||
|
||||
public final String label;
|
||||
JMS_PROVIDER_TYPES(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
}
|
||||
public static boolean isValidJmsProviderType(String type) {
|
||||
return Arrays.stream(JMS_PROVIDER_TYPES.values()).anyMatch(t -> t.label.equals(type));
|
||||
}
|
||||
|
||||
// JMS Destination Types
|
||||
public enum JMS_DESTINATION_TYPES {
|
||||
QUEUE("queue"),
|
||||
|
@ -0,0 +1,99 @@
|
||||
package io.nosqlbench.driver.jms.util;
|
||||
|
||||
import org.apache.commons.configuration2.Configuration;
|
||||
import org.apache.commons.configuration2.FileBasedConfiguration;
|
||||
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
public class PulsarConfig {
|
||||
private final static Logger logger = LogManager.getLogger(PulsarConfig.class);
|
||||
|
||||
public static final String SCHEMA_CONF_PREFIX = "schema";
|
||||
public static final String CLIENT_CONF_PREFIX = "client";
|
||||
public static final String PRODUCER_CONF_PREFIX = "producer";
|
||||
public static final String CONSUMER_CONF_PREFIX = "consumer";
|
||||
|
||||
private final Map<String, Object> schemaConfMap = new HashMap<>();
|
||||
private final Map<String, Object> clientConfMap = new HashMap<>();
|
||||
private final Map<String, Object> producerConfMap = new HashMap<>();
|
||||
private final Map<String, Object> consumerConfMap = new HashMap<>();
|
||||
|
||||
public PulsarConfig(String fileName) {
|
||||
File file = new File(fileName);
|
||||
|
||||
try {
|
||||
String canonicalFilePath = file.getCanonicalPath();
|
||||
|
||||
Parameters params = new Parameters();
|
||||
|
||||
FileBasedConfigurationBuilder<FileBasedConfiguration> builder =
|
||||
new FileBasedConfigurationBuilder<FileBasedConfiguration>(PropertiesConfiguration.class)
|
||||
.configure(params.properties()
|
||||
.setFileName(fileName));
|
||||
|
||||
Configuration config = builder.getConfiguration();
|
||||
|
||||
// Get schema specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(SCHEMA_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
schemaConfMap.put(confKey.substring(SCHEMA_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get client connection specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(CLIENT_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
clientConfMap.put(confKey.substring(CLIENT_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get producer specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(PRODUCER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
producerConfMap.put(confKey.substring(PRODUCER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
|
||||
// Get consumer specific configuration settings
|
||||
for (Iterator<String> it = config.getKeys(CONSUMER_CONF_PREFIX); it.hasNext(); ) {
|
||||
String confKey = it.next();
|
||||
String confVal = config.getProperty(confKey).toString();
|
||||
if (!StringUtils.isBlank(confVal))
|
||||
consumerConfMap.put(confKey.substring(CONSUMER_CONF_PREFIX.length() + 1), config.getProperty(confKey));
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
logger.error("Can't read the specified config properties file: " + fileName);
|
||||
ioe.printStackTrace();
|
||||
} catch (ConfigurationException cex) {
|
||||
logger.error("Error loading configuration items from the specified config properties file: " + fileName);
|
||||
cex.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Object> getSchemaConfMap() {
|
||||
return this.schemaConfMap;
|
||||
}
|
||||
public Map<String, Object> getClientConfMap() {
|
||||
return this.clientConfMap;
|
||||
}
|
||||
public Map<String, Object> getProducerConfMap() {
|
||||
return this.producerConfMap;
|
||||
}
|
||||
public Map<String, Object> getConsumerConfMap() {
|
||||
return this.consumerConfMap;
|
||||
}
|
||||
}
|
33
driver-jms/src/main/resources/pulsar_config.properties
Normal file
33
driver-jms/src/main/resources/pulsar_config.properties
Normal file
@ -0,0 +1,33 @@
|
||||
### Schema related configurations - schema.xxx
|
||||
# valid types:
|
||||
# - primitive type (https://pulsar.apache.org/docs/en/schema-understand/#primitive-type)
|
||||
# - keyvalue (https://pulsar.apache.org/docs/en/schema-understand/#keyvalue)
|
||||
# - strut (complex type) (https://pulsar.apache.org/docs/en/schema-understand/#struct)
|
||||
# avro, json, protobuf
|
||||
#
|
||||
# NOTE: for JMS client, Pulsar "schema" is NOT supported yet
|
||||
schema.type=
|
||||
schema.definition=
|
||||
|
||||
|
||||
### Pulsar client related configurations - client.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#client
|
||||
client.connectionTimeoutMs=5000
|
||||
#client.authPluginClassName=org.apache.pulsar.client.impl.auth.AuthenticationToken
|
||||
#client.authParams=
|
||||
#client.tlsAllowInsecureConnection=true
|
||||
client.numIoThreads=10
|
||||
client.numListenerThreads=10
|
||||
|
||||
|
||||
### Producer related configurations (global) - producer.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
|
||||
producer.sendTimeoutMs=
|
||||
producer.blockIfQueueFull=true
|
||||
producer.maxPendingMessages=10000
|
||||
producer.batchingMaxMessages=10000
|
||||
|
||||
|
||||
### Consumer related configurations (global) - consumer.xxx
|
||||
# http://pulsar.apache.org/docs/en/client-libraries-java/#configure-consumer
|
||||
consumer.receiverQueueSize=2000
|
@ -7,7 +7,7 @@ bindings:
|
||||
# document level parameters that apply to all Pulsar client types:
|
||||
params:
|
||||
### static only
|
||||
async_api: "false"
|
||||
async_api: "true"
|
||||
|
||||
### Static only
|
||||
# Valid values: queue (point-to-point) or topic (pub-sub)
|
||||
@ -16,7 +16,12 @@ params:
|
||||
### Static Only
|
||||
# NOTE: ONLY relevant when the JMS provider is Pulsar
|
||||
#pulsar_topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
|
||||
pulsar_topic_uri: "persistent://public/default/t0"
|
||||
#pulsar_topic_uri: "persistent://public/default/pt100"
|
||||
#pulsar_topic_uri: "persistent://public/default/t0"
|
||||
pulsar_topic_uri: "persistent://public/default/pt100_10"
|
||||
#pulsar_topic_uri: "persistent://public/default/pt200_10"
|
||||
#pulsar_topic_uri: "persistent://public/default/pt300_10"
|
||||
#pulsar_topic_uri: "persistent://public/default/pt400_10"
|
||||
|
||||
blocks:
|
||||
- name: "producer-block"
|
@ -5,7 +5,6 @@ import org.apache.commons.configuration2.FileBasedConfiguration;
|
||||
import org.apache.commons.configuration2.PropertiesConfiguration;
|
||||
import org.apache.commons.configuration2.builder.FileBasedConfigurationBuilder;
|
||||
import org.apache.commons.configuration2.builder.fluent.Parameters;
|
||||
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler;
|
||||
import org.apache.commons.configuration2.ex.ConfigurationException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
|
Loading…
Reference in New Issue
Block a user