Update Pulsar API to 2.8.0

This commit is contained in:
Yabin Meng 2021-07-20 13:31:40 -05:00
parent 98ec7718b0
commit c175411cdb
11 changed files with 49 additions and 43 deletions

View File

@ -18,7 +18,7 @@
</description>
<properties>
<pulsar.version>2.7.1</pulsar.version>
<pulsar.version>2.8.0</pulsar.version>
</properties>
<dependencies>

View File

@ -20,6 +20,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.internal.PulsarAdminImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@ -55,6 +56,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
}
private void initPulsarAdmin() {
PulsarAdminBuilder adminBuilder =
PulsarAdmin.builder()
.serviceHttpUrl(webSvcUrl);
@ -97,8 +99,9 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
adminBuilder.allowTlsInsecureConnection(tlsAllowInsecureConnection);
pulsarAdmin = adminBuilder.build();
ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData();
logger.debug(configurationData.toString());
// Not supported in Pulsar 2.8.0
// ClientConfigurationData configurationData = pulsarAdmin.getClientConfigData();
// logger.debug(configurationData.toString());
} catch (PulsarClientException e) {
logger.error("Fail to create PulsarAdmin from global configuration!");

View File

@ -216,7 +216,7 @@ public class PulsarSpace {
public Supplier<Transaction> getTransactionSupplier() {
PulsarClient pulsarClient = getPulsarClient();
return () -> {
try (Timer.Context time = createTransactionTimer.time(); ){
try (Timer.Context time = createTransactionTimer.time() ){
return pulsarClient
.newTransaction()
.build()
@ -226,7 +226,7 @@ public class PulsarSpace {
logger.warn("Error while starting a new transaction", err);
}
throw new RuntimeException(err);
} catch (NullPointerException err) { // Unfortunately Pulsar 2.7.1 client does not report a better error
} catch (PulsarClientException err) {
throw new RuntimeException("Transactions are not enabled on Pulsar Client, " +
"please set client.enableTransaction=true in your Pulsar Client configuration");
}
@ -273,7 +273,7 @@ public class PulsarSpace {
.replace("/","_"); // persistent://tenant/namespace/topicname -> tenant_namespace_topicname
try {
ProducerBuilder producerBuilder = pulsarClient.newProducer(pulsarSchema);
ProducerBuilder<?> producerBuilder = pulsarClient.newProducer(pulsarSchema);
producerBuilder.loadConf(producerConf);
producer = producerBuilder.create();
producers.put(producerCacheKey, producer);

View File

@ -8,7 +8,7 @@ import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@ -45,7 +45,8 @@ public class PulsarAdminTenantOp extends PulsarAdminOp {
// Admin API - create tenants and namespaces
if (!adminDelOp) {
TenantInfo tenantInfo = new TenantInfo();
TenantInfoImpl tenantInfo = TenantInfoImpl.builder().build();
tenantInfo.setAdminRoles(adminRoleSet);
if ( !allowedClusterSet.isEmpty() ) {

View File

@ -64,8 +64,12 @@ public class PulsarConsumerOp extends SyncPulsarOp {
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
if (logger.isDebugEnabled()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("msg-key={} msg-payload={}", message.getKey(), avroGenericRecord.toString());
}

View File

@ -30,8 +30,12 @@ public class PulsarReaderOp extends SyncPulsarOp {
message = reader.readNext();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroDefStr, message.getData());
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + avroGenericRecord.toString());
} else {
System.out.println("msg-key=" + message.getKey() + " msg-payload=" + new String(message.getData()));

View File

@ -6,6 +6,7 @@ import org.apache.avro.io.BinaryDecoder;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@ -15,16 +16,17 @@ import java.util.HashMap;
import java.util.List;
public class AvroUtil {
////////////////////////
// Get an OSS Apache Avro schema from a string definition
public static org.apache.avro.Schema GetSchema_ApacheAvro(String avroSchemDef) {
return new org.apache.avro.Schema.Parser().parse(avroSchemDef);
}
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, String jsonData) {
// Get an OSS Apache Avro schema record from a JSON string that matches a specific OSS Apache Avro schema
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(org.apache.avro.Schema schema, String jsonData) {
org.apache.avro.generic.GenericRecord record = null;
try {
org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef);
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
@ -38,12 +40,11 @@ public class AvroUtil {
return record;
}
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(String avroSchemDef, byte[] bytesData) {
// Get an OSS Apache Avro schema record from a byte array that matches a specific OSS Apache Avro schema
public static org.apache.avro.generic.GenericRecord GetGenericRecord_ApacheAvro(org.apache.avro.Schema schema, byte[] bytesData) {
org.apache.avro.generic.GenericRecord record = null;
try {
org.apache.avro.Schema schema = GetSchema_ApacheAvro(avroSchemDef);
org.apache.avro.generic.GenericDatumReader<org.apache.avro.generic.GenericData.Record> reader;
reader = new org.apache.avro.generic.GenericDatumReader<>(schema);
@ -57,8 +58,11 @@ public class AvroUtil {
return record;
}
////////////////////////
// Get a Pulsar Avro schema from a string definition
public static GenericAvroSchema GetSchema_PulsarAvro(String schemaName, String avroSchemDef) {
SchemaInfo schemaInfo = SchemaInfo.builder()
SchemaInfo schemaInfo = SchemaInfoImpl.builder()
.schema(avroSchemDef.getBytes(StandardCharsets.UTF_8))
.type(SchemaType.AVRO)
.properties(new HashMap<>())
@ -66,6 +70,8 @@ public class AvroUtil {
.build();
return new GenericAvroSchema(schemaInfo);
}
// Get a Pulsar Avro record from an OSS Avro schema record, matching a specific Pulsar Avro schema
public static GenericRecord GetGenericRecord_PulsarAvro(
GenericAvroSchema pulsarGenericAvroSchema,
org.apache.avro.generic.GenericRecord apacheAvroGenericRecord)
@ -81,14 +87,16 @@ public class AvroUtil {
return recordBuilder.build();
}
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDef, String jsonData) {
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);
// Get a Pulsar Avro record (GenericRecord) from a JSON string that matches a specific Pulsar Avro schema
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDefStr, String jsonData) {
org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
}
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDef, String jsonData) {
GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDef);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchemDef, jsonData);
public static GenericRecord GetGenericRecord_PulsarAvro(String schemaName, String avroSchemDefStr, String jsonData) {
GenericAvroSchema genericAvroSchema = GetSchema_PulsarAvro(schemaName, avroSchemDefStr);
org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr);
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
}

View File

@ -4,9 +4,6 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.net.URI;
@ -363,11 +360,7 @@ public class PulsarActivityUtil {
///////
// Complex strut type: Avro or Json
public static boolean isAvroSchemaTypeStr(String typeStr) {
boolean isAvroType = false;
if (typeStr.equalsIgnoreCase("AVRO")) {
isAvroType = true;
}
return isAvroType;
return typeStr.equalsIgnoreCase("AVRO");
}
public static Schema<?> getAvroSchema(String typeStr, String definitionStr) {
String schemaDefinitionStr = definitionStr;
@ -387,15 +380,7 @@ public class PulsarActivityUtil {
}
}
SchemaInfo schemaInfo = SchemaInfo.builder()
.schema(schemaDefinitionStr.getBytes(StandardCharsets.UTF_8))
.type(SchemaType.AVRO)
.properties(new HashMap<>())
//TODO: A unique name for each NB run?
.name("NBAvro")
.build();
schema = new GenericAvroSchema(schemaInfo);
schema = AvroUtil.GetSchema_PulsarAvro("NBAvro", schemaDefinitionStr);
} else {
throw new RuntimeException("Trying to create a \"Avro\" schema for a non-Avro schema type string: " + typeStr);
}

View File

@ -1,6 +1,6 @@
bindings:
# 100 topics: 10 tenants, 2 namespaces/tenant, 5 topics/namespace
tenant: Mod(100); ToString(); Prefix("tnt")
tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")

View File

@ -4,13 +4,14 @@ bindings:
sensor_id: ToUUID();ToString();
reading_time: ToDateTime();
reading_value: ToFloat(100);
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")
# document level parameters that apply to all Pulsar client types:
params:
topic_uri: "persistent://{tenant}/{namespace}/{core_topic_name}"
# topic_uri: "persistent://public/default/mytopic"
async_api: "true"
blocks:

View File

@ -2,7 +2,7 @@ bindings:
# message key and value
mykey:
myvalue: NumberNameToString() #AlphaNumericString(20)
tenant: Mod(10000); Div(10L); ToString(); Prefix("tnt")
tenant: Mod(100); Div(10L); ToString(); Prefix("tnt")
namespace: Mod(10); Div(5L); ToString(); Prefix("ns")
core_topic_name: Mod(5); ToString(); Prefix("t")