- AuthN and AuthR for Client API

- Pulsar client caching fix
- Other minor fix
This commit is contained in:
Yabin Meng 2021-04-01 21:49:19 -05:00
parent 1c64031b93
commit d5cfdf13da
6 changed files with 84 additions and 53 deletions

View File

@ -84,30 +84,21 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
// String tokenFileName = StringUtils.removeStart(authParams, "file://");
// File tokenFile = new File(tokenFileName);
// String token;
// try {
// token = FileUtils.readFileToString(tokenFile, StandardCharsets.UTF_8);
// token = StringUtils.normalizeSpace(token);
// }
// catch (IOException ioe) {
// throw new RuntimeException("Failed to read the specified (\"client.authParams\") token file: " + tokenFileName + "!");
// }
// adminBuilder.authentication(AuthenticationFactory.token(token));
adminBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
adminBuilder
.useKeyStoreTls(useTls)
.allowTlsInsecureConnection(tlsAllowInsecureConnection)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
adminBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
// Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection"
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarAdmin = adminBuilder.build();
} catch (PulsarClientException e) {

View File

@ -6,6 +6,7 @@ import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -82,14 +83,50 @@ public class PulsarSpace {
try {
Map<String, Object> clientConf = pulsarNBClientConf.getClientConfMap();
// Override "client.serviceUrl" setting in config.properties
clientConf.remove("serviceUrl", pulsarSvcUrl);
pulsarClient = clientBuilder
.loadConf(clientConf)
.serviceUrl(pulsarSvcUrl)
.build();
} catch (PulsarClientException pce) {
// Override "client.serviceUrl" setting in config.properties
clientConf.remove("serviceUrl");
clientBuilder.loadConf(clientConf).serviceUrl(pulsarSvcUrl);
String authPluginClassName =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authPulginClassName.label);
String authParams =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.authParams.label);
String useTlsStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.useTls.label);
boolean useTls = BooleanUtils.toBoolean(useTlsStr);
String tlsTrustCertsFilePath =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsTrustCertsFilePath.label);
String tlsAllowInsecureConnectionStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsAllowInsecureConnection.label);
boolean tlsAllowInsecureConnection = BooleanUtils.toBoolean(tlsAllowInsecureConnectionStr);
String tlsHostnameVerificationEnableStr =
(String) pulsarNBClientConf.getClientConfValue(PulsarActivityUtil.CLNT_CONF_KEY.tlsHostnameVerificationEnable.label);
boolean tlsHostnameVerificationEnable = BooleanUtils.toBoolean(tlsHostnameVerificationEnableStr);
if ( !StringUtils.isAnyBlank(authPluginClassName, authParams) ) {
clientBuilder.authentication(authPluginClassName, authParams);
}
if ( useTls ) {
clientBuilder
.useKeyStoreTls(useTls)
.enableTlsHostnameVerification(tlsHostnameVerificationEnable);
if (!StringUtils.isBlank(tlsTrustCertsFilePath))
clientBuilder.tlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
// Put this outside "if (useTls)" block for easier handling of "tlsAllowInsecureConnection"
clientBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarClient = clientBuilder.build();
}
catch (PulsarClientException pce) {
String errMsg = "Fail to create PulsarClient from global configuration: " + pce.getMessage();
logger.error(errMsg);
throw new RuntimeException(errMsg);
@ -176,14 +213,15 @@ public class PulsarSpace {
}
String encodedStr = PulsarActivityUtil.encode(producerName, topicName);
Producer<?> producer = producers.computeIfAbsent(encodedStr, (pn -> {
Producer<?> producer = producers.get(encodedStr);
if (producer == null) {
PulsarClient pulsarClient = getPulsarClient();
// Get other possible producer settings that are set at global level
Map<String, Object> producerConf = pulsarNBClientConf.getProducerConfMap();
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.topicName.label, topicName);
String producerMetricsPrefix;
if (!StringUtils.isBlank(producerName)) {
producerConf.put(PulsarActivityUtil.PRODUCER_CONF_STD_KEY.producerName.label, producerName);
@ -195,7 +233,6 @@ public class PulsarSpace {
}
producerMetricsPrefix += topicName + "_";
producerMetricsPrefix = producerMetricsPrefix
.replace("persistent://public/default/", "") // default name for tests/demos (in all Pulsar examples) is persistent://public/default/test -> use just the topic name test
.replace("non-persistent://", "") // always remove topic type
@ -203,19 +240,22 @@ public class PulsarSpace {
.replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname
try {
Producer<?> newProducer = pulsarClient.newProducer(pulsarSchema).loadConf(producerConf).create();
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent",safeExtractMetric(newProducer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(newProducer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalsendfailed", safeExtractMetric(newProducer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalacksreceived", safeExtractMetric(newProducer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendbytesrate", safeExtractMetric(newProducer, ProducerStats::getSendBytesRate));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendmsgsrate", safeExtractMetric(newProducer, ProducerStats::getSendMsgsRate));
return newProducer;
} catch (PulsarClientException ple) {
ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema);
producerBuilder.loadConf(producerConf);
producer = producerBuilder.create();
producers.put(encodedStr, producer);
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalbytessent", safeExtractMetric(producer, (s -> s.getTotalBytesSent() + s.getNumBytesSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalmsgssent", safeExtractMetric(producer, (s -> s.getTotalMsgsSent() + s.getNumMsgsSent())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalsendfailed", safeExtractMetric(producer, (s -> s.getTotalSendFailed() + s.getNumSendFailed())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "totalacksreceived", safeExtractMetric(producer,(s -> s.getTotalAcksReceived() + s.getNumAcksReceived())));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendbytesrate", safeExtractMetric(producer, ProducerStats::getSendBytesRate));
ActivityMetrics.gauge(activityDef, producerMetricsPrefix + "sendmsgsrate", safeExtractMetric(producer, ProducerStats::getSendMsgsRate));
}
catch (PulsarClientException ple) {
throw new RuntimeException("Unable to create a Pulsar producer!", ple);
}
}));
}
return producer;
}

View File

@ -42,7 +42,7 @@ public class PulsarActivityUtil {
}
}
public static boolean isValidClientType(String type) {
return Arrays.stream(OP_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
return Arrays.stream(OP_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
@ -59,7 +59,7 @@ public class PulsarActivityUtil {
}
}
public static boolean isValidPersistenceType(String type) {
return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch((t) -> t.name().equals(type.toLowerCase()));
return Arrays.stream(PERSISTENT_TYPES.values()).anyMatch(t -> t.label.equals(type));
}
@ -95,7 +95,7 @@ public class PulsarActivityUtil {
}
}
public static boolean isValidClientConfItem(String item) {
return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(CLNT_CONF_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
@ -123,7 +123,7 @@ public class PulsarActivityUtil {
}
}
public static boolean isStandardProducerConfItem(String item) {
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(PRODUCER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
///////
@ -160,7 +160,7 @@ public class PulsarActivityUtil {
}
public static boolean isStandardConsumerConfItem(String item) {
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(CONSUMER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
public enum SUBSCRIPTION_TYPE {
@ -177,10 +177,10 @@ public class PulsarActivityUtil {
}
public static boolean isValidSubscriptionType(String item) {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch((t) -> t.name().equals(item));
return Arrays.stream(SUBSCRIPTION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
public static String getValidSubscriptionTypeList() {
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(Object::toString).collect(Collectors.joining(", "));
return Arrays.stream(SUBSCRIPTION_TYPE.values()).map(t -> t.label).collect(Collectors.joining(", "));
}
///////
@ -204,7 +204,7 @@ public class PulsarActivityUtil {
}
}
public static boolean isStandardReaderConfItem(String item) {
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(READER_CONF_STD_KEY.values()).anyMatch(t -> t.label.equals(item));
}
public enum READER_CONF_CUSTOM_KEY {
@ -218,7 +218,7 @@ public class PulsarActivityUtil {
}
public static boolean isCustomReaderConfItem(String item) {
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(READER_CONF_CUSTOM_KEY.values()).anyMatch(t -> t.label.equals(item));
}
public enum READER_MSG_POSITION_TYPE {
@ -234,7 +234,7 @@ public class PulsarActivityUtil {
}
public static boolean isValideReaderStartPosition(String item) {
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch((t) -> t.name().equals(item.toLowerCase()));
return Arrays.stream(READER_MSG_POSITION_TYPE.values()).anyMatch(t -> t.label.equals(item));
}
///////
@ -395,7 +395,7 @@ public class PulsarActivityUtil {
}
String concatenatedStr =
StringUtils.substringAfterLast(stringBuilder.toString(), "::");
StringUtils.substringBeforeLast(stringBuilder.toString(), "::");
return Base64.getEncoder().encodeToString(concatenatedStr.getBytes());
}

View File

@ -93,7 +93,7 @@ public class PulsarNBClientConf {
logger.error("Can't read the specified config properties file!");
ioe.printStackTrace();
} catch (ConfigurationException cex) {
logger.error("Error loading configuration items from the specified config properties file!");
logger.error("Error loading configuration items from the specified config properties file: " + canonicalFilePath);
cex.printStackTrace();
}
}

View File

@ -10,13 +10,13 @@ bindings:
# sensor_type:
reading_time: ToDateTime();
reading_value: ToFloat(100);
# tenant:
tenant: Mod(1000); Div(10L); ToString(); Prefix("tnt")
# document level parameters that apply to all Pulsar client types:
params:
#topic_uri: "persistent://public/default/{topic}"
topic_uri: "persistent://public/default/nbpulsar"
async_api: "false"
topic_uri: "persistent://public/default/nbpulsar2"
async_api: "true"
blocks:
- name: create-tennam-block
@ -38,7 +38,7 @@ blocks:
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
enable_partition: "false"
partition_num: "5"
- name: batch-producer-block

View File

@ -9,14 +9,14 @@ bindings:
mykey: NumberNameToString()
myvalue: AlphaNumericString(20)
# Admin API - create tenant, namespace, and topic
tenant: Mod(1000); Div(10L); ToString(); Prefix("tenant_")
namespace: Mod(10); Div(5L); ToString(); Prefix("default_")
tenant: Mod(1000); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
async_api: "false"
async_api: "true"
blocks:
- name: create-tennam-block
@ -38,7 +38,7 @@ blocks:
statements:
- name: s1
optype: admin-crt-top
enable_partition: "true"
enable_partition: "false"
partition_num: "5"
- name: batch-producer-block