merge fixups

This commit is contained in:
Jonathan Shook 2021-07-20 20:08:03 -05:00
commit 15eedea561
11 changed files with 49 additions and 40 deletions

View File

@ -18,7 +18,7 @@
</description> </description>
<properties> <properties>
<pulsar.version>2.7.1</pulsar.version> <pulsar.version>2.8.0</pulsar.version>
</properties> </properties>
<dependencies> <dependencies>

View File

@ -20,6 +20,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@ -55,6 +56,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
} }
private void initPulsarAdmin() { private void initPulsarAdmin() {
PulsarAdminBuilder adminBuilder = PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder() PulsarAdmin.builder()
.serviceHttpUrl(webSvcUrl); .serviceHttpUrl(webSvcUrl);
@ -97,8 +99,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection); adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarAdmin = adminBuilder.build(); pulsarAdmin = adminBuilder.build();
ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData(); // Not supported in Pulsar 2.8.0
logger.debug(configurationData.toString()); // ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData();
// logger.debug(configurationData.toString());
} catch (PulsarClientException e) { } catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin from global configuration!"); logger.error("Fail to create PulsarAdmin from global configuration!");

View File

@ -216,7 +216,7 @@ public class PulsarSpace {
public Supplier<Transaction> getTransactionSupplier() { public Supplier<Transaction> getTransactionSupplier() {
PulsarClient pulsarClient = getPulsarClient(); PulsarClient pulsarClient = getPulsarClient();
return () -> { return () -> {
try (Timer.Context time = createTransactionTimer.time()){ try (Timer.Context time = createTransactionTimer.time() ){
return pulsarClient return pulsarClient
.newTransaction() .newTransaction()
.build() .build()
@ -226,7 +226,7 @@ public class PulsarSpace {
logger.warn("Error while starting a new transaction", err); logger.warn("Error while starting a new transaction", err);
} }
throw new RuntimeException(err); throw new RuntimeException(err);
} catch (NullPointerException err) { // Unfortunately Pulsar 2.7.1 client does not report a better error } catch (PulsarClientException err) {
throw new RuntimeException("Transactions are not enabled on Pulsar Client, " + throw new RuntimeException("Transactions are not enabled on Pulsar Client, " +
"please set client.enableTransaction=true in your Pulsar Client configuration"); "please set client.enableTransaction=true in your Pulsar Client configuration");
} }
@ -273,7 +273,7 @@ public class PulsarSpace {
.replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname .replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname
try { try {
ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema); ProducerBuilder<?> producerBuilder = pulsarClient.newProducer(pulsarSchema);
producerBuilder.loadConf(producerConf); producerBuilder.loadConf(producerConf);
producer = producerBuilder.create(); producer = producerBuilder.create();
producers.put(producerCacheKey, producer); producers.put(producerCacheKey, producer);

View File

@ -8,7 +8,7 @@ import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants; import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -45,7 +45,8 @@ public class PulsarAdminTenantOp extends PulsarAdminOp {
// Admin API - create tenants and namespaces // Admin API - create tenants and namespaces
if (!adminDelOp) { if (!adminDelOp) {
TenantInfo tenantInfo = new TenantInfo();
TenantInfoImpl tenantInfo = TenantInfoImpl.builder().build();
tenantInfo.setAdminRoles(adminRoleSet); tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) { if ( !allowedClusterSet.isEmpty() ) {

View File

@ -64,8 +64,12 @@ public class PulsarConsumerOp extends SyncPulsarOp {
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString()); logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString());
} }

View File

@ -30,8 +30,12 @@ public class PulsarReaderOp extends SyncPulsarOp {
message = reader.readNext(); message = reader.readNext();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData()); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString()); System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
} else { } else {
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData())); System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));

View File

@ -6,6 +6,7 @@ import org.apache.avro.io.BinaryDecoder;
import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder; import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
@ -15,16 +16,17 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
public class AvroUtil { public class AvroUtil {
////////////////////////
// Get an OSS Apache Avro schema from a string definition
public static org.apache.avro.Schema GetSchema_ApacheAvro(String avroSchemDef) { public static org.apache.avro.Schema GetSchema_ApacheAvro(String avroSchemDef) {
return new org.apache.avro.Schema.Parser().parse(avroSchemDef); return new org.apache.avro.Schema.Parser().parse(avroSchemDef);
} }
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, String jsonData) { // Get an OSS Apache Avro schema record from a JSON string that matches a specific OSS Apache Avro schema
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(org.apache.avro.Schema schema, String jsonData) {
org.apache.avro.generic.GenericRecord record = null; org.apache.avro.generic.GenericRecord record = null;
try { try {
org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef);
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader; org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
reader = new org.apache.avro.generic.GenericDatumReader<>(schema); reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
@ -38,12 +40,11 @@ public class AvroUtil {
return record; return record;
} }
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, byte[] bytesData) { // Get an OSS Apache Avro schema record from a byte array that matches a specific OSS Apache Avro schema
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(org.apache.avro.Schema schema, byte[] bytesData) {
org.apache.avro.generic.GenericRecord record = null; org.apache.avro.generic.GenericRecord record = null;
try { try {
org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef);
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader; org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
reader = new org.apache.avro.generic.GenericDatumReader<>(schema); reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
@ -57,8 +58,11 @@ public class AvroUtil {
return record; return record;
} }
////////////////////////
// Get a Pulsar Avro schema from a string definition
public static GenericAvroSchema GetSchema_PulsarAvro(String schemaName, String avroSchemDef) { public static GenericAvroSchema GetSchema_PulsarAvro(String schemaName, String avroSchemDef) {
SchemaInfo schemaInfo = SchemaInfo.builder() SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.schema(avroSchemDef.getBytes(StandardCharsets.UTF_8)) .schema(avroSchemDef.getBytes(StandardCharsets.UTF_8))
.type(SchemaType.AVRO) .type(SchemaType.AVRO)
.properties(new HashMap<>()) .properties(new HashMap<>())
@ -66,6 +70,8 @@ public class AvroUtil {
.build(); .build();
return new GenericAvroSchema(schemaInfo); return new GenericAvroSchema(schemaInfo);
} }
// Get a Pulsar Avro record from an OSS Avro schema record, matching a specific Pulsar Avro schema
public static GenericRecord GetGenericRecord_PulsarAvro( public static GenericRecord GetGenericRecord_PulsarAvro(
GenericAvroSchema pulsarGenericAvroSchema, GenericAvroSchema pulsarGenericAvroSchema,
org.apache.avro.generic.GenericRecord apacheAvroGenericRecord) org.apache.avro.generic.GenericRecord apacheAvroGenericRecord)
@ -81,14 +87,16 @@ public class AvroUtil {
return recordBuilder.build(); return recordBuilder.build();
} }
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDef, String jsonData) { // Get a Pulsar Avro record (GenericRecord) from a JSON string that matches a specific Pulsar Avro schema
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData); public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDefStr, String jsonData) {
org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord); return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
} }
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDefStr, String jsonData) {
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) { GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDefStr);
GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef); org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData); org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord); return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
} }

View File

@ -4,9 +4,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@ -363,8 +360,7 @@ public class PulsarActivityUtil {
/////// ///////
// Complex strut type: Avro or Json // Complex strut type: Avro or Json
public static boolean isAvroSchemaTypeStr(String typeStr) { public static boolean isAvroSchemaTypeStr(String typeStr) {
boolean isAvroType = typeStr.equalsIgnoreCase("AVRO"); return typeStr.equalsIgnoreCase("AVRO");
return isAvroType;
} }
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) { public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr; String schemaDefinitionStr = definitionStr;
@ -384,15 +380,7 @@ public class PulsarActivityUtil {
} }
} }
SchemaInfo schemaInfo = SchemaInfo.builder() schema = AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
.schema(schemaDefinitionStr.getBytes(StandardCharsets.UTF_8))
.type(SchemaType.AVRO)
.properties(new HashMap<>())
//TODO: A unique name for each NB run?
.name("NBAvro")
.build();
schema = new GenericAvroSchema(schemaInfo);
} else { } else {
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr); throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
} }

View File

@ -1,6 +1,6 @@
bindings: bindings:
# 100 topics: 10 tenants, 2 namespaces/tenant, 5 topics/namespace # 100 topics: 10 tenants, 2 namespaces/tenant, 5 topics/namespace
tenant: Mod(100); ToString(); Prefix("tnt") tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns") namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t") core_topic_name: Mod(5); ToString(); Prefix("t")

View File

@ -4,13 +4,14 @@ bindings:
sensor_id: ToUUID();ToString(); sensor_id: ToUUID();ToString();
reading_time: ToDateTime(); reading_time: ToDateTime();
reading_value: ToFloat(100); reading_value: ToFloat(100);
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns") namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t") core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types: # document level parameters that apply to all Pulsar client types:
params: params:
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}" topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
# topic_uri: "persistent://public/default/mytopic"
async_api: "true" async_api: "true"
blocks: blocks:

View File

@ -2,7 +2,7 @@ bindings:
# message key and value # message key and value
mykey: mykey:
myvalue: NumberNameToString() #AlphaNumericString(20) myvalue: NumberNameToString() #AlphaNumericString(20)
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt") tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns") namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t") core_topic_name: Mod(5); ToString(); Prefix("t")