Pulsar: implement KeyValue schema for PulsarProducerOp

This commit is contained in:
Enrico Olivelli 2022-02-02 17:01:38 +01:00
parent 77e4de0968
commit d940b40b10
7 changed files with 108 additions and 16 deletions

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import java.util.Map; import java.util.Map;
@ -244,21 +245,40 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
* Get Pulsar schema from the definition string * Get Pulsar schema from the definition string
*/ */
private void createPulsarSchemaFromConf() { private void createPulsarSchemaFromConf() {
Object value = pulsarNBClientConf.getSchemaConfValue("schema.type"); pulsarSchema = buldSchemaFromDefinition("schema.type", "schema.definition");
// A configured "schema.key.type" switches to a KEY_VALUE schema:
// the schema built above becomes the value part and the key schema is built the same way.
if (pulsarNBClientConf.hasSchemaConfKey("schema.key.type")) {
Schema<?> pulsarKeySchema = buldSchemaFromDefinition("schema.key.type", "schema.key.definition");
Object encodingType = pulsarNBClientConf.getSchemaConfValue("schema.keyvalue.encodingtype");
// SEPARATED is used unless "schema.keyvalue.encodingtype" is configured
KeyValueEncodingType keyValueEncodingType = KeyValueEncodingType.SEPARATED;
if (encodingType != null) {
// Enum valueOf needs the exact constant name; any other text raises IllegalArgumentException
keyValueEncodingType = KeyValueEncodingType.valueOf(encodingType.toString());
}
pulsarSchema = Schema.KeyValue(pulsarKeySchema, pulsarSchema, keyValueEncodingType);
}
}
/**
 * Build a Pulsar schema from two entries of the schema configuration.
 * The type entry selects the kind of schema (Avro, primitive or AUTO_CONSUME);
 * the definition entry is only consulted when the type is Avro.
 * A missing type or definition entry is treated as an empty string.
 *
 * @param schemaTypeConfEntry name of the configuration entry holding the schema type
 * @param schemaDefinitionConfEntry name of the configuration entry holding the schema definition
 * @return the schema built from the two entries
 * @throws RuntimeException if the schema type is none of the supported kinds
 */
private Schema<?> buldSchemaFromDefinition(String schemaTypeConfEntry,
String schemaDefinitionConfEntry) {
Object value = pulsarNBClientConf.getSchemaConfValue(schemaTypeConfEntry);
Object schemaDefinition = pulsarNBClientConf.getSchemaConfValue(schemaDefinitionConfEntry);
String schemaType = (value != null) ? value.toString() : ""; String schemaType = (value != null) ? value.toString() : "";
Schema<?> result;
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) {
value = pulsarNBClientConf.getSchemaConfValue("schema.definition"); String schemaDefStr = (schemaDefinition != null) ? schemaDefinition.toString() : "";
String schemaDefStr = (value != null) ? value.toString() : ""; result = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr);
pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr);
} else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) { } else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) {
pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType)); result = PulsarActivityUtil.getPrimitiveTypeSchema(schemaType);
} else if (PulsarActivityUtil.isAutoConsumeSchemaTypeStr(schemaType)) { } else if (PulsarActivityUtil.isAutoConsumeSchemaTypeStr(schemaType)) {
pulsarSchema = Schema.AUTO_CONSUME(); result = Schema.AUTO_CONSUME();
} else { } else {
throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " + throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " +
"Only primitive type, Avro type and AUTO_CONSUME are supported at the moment!"); "Only primitive type, Avro type and AUTO_CONSUME are supported at the moment!");
} }
// record which configuration entries produced which schema definition
logger.info("Generated schema from {}={} {}={}: {}", schemaTypeConfEntry, value, schemaDefinitionConfEntry, schemaDefinition, result.getSchemaInfo().getSchemaDefinition());
return result;
} }
// Returns the PulsarNBClientConf instance held by this activity.
public PulsarNBClientConf getPulsarConf() { return this.pulsarNBClientConf;} public PulsarNBClientConf getPulsarConf() { return this.pulsarNBClientConf;}

View File

@ -15,8 +15,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;

View File

@ -6,7 +6,6 @@ import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import io.nosqlbench.engine.api.templating.CommandTemplate; import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Producer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;

View File

@ -13,8 +13,10 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaType;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
@ -43,6 +45,9 @@ public class PulsarProducerOp implements PulsarOp {
private final Histogram messageSizeHistogram; private final Histogram messageSizeHistogram;
private final Timer transactionCommitTimer; private final Timer transactionCommitTimer;
private org.apache.avro.Schema avroSchema;
private org.apache.avro.Schema avroKeySchema;
public PulsarProducerOp( PulsarActivity pulsarActivity, public PulsarProducerOp( PulsarActivity pulsarActivity,
boolean asyncPulsarOp, boolean asyncPulsarOp,
boolean useTransaction, boolean useTransaction,
@ -101,7 +106,36 @@ public class PulsarProducerOp implements PulsarOp {
// set message payload // set message payload
int messageSize; int messageSize;
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { if (pulsarSchema instanceof KeyValueSchema) {
int separator = msgPayload.indexOf("}||{");
if (separator < 0) {
throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)");
}
String keyInput = msgPayload.substring(0, separator + 1);
String valueInput = msgPayload.substring(separator + 3);
logger.info("msgPayload: {}", msgPayload);
logger.info("keyInput: {}", keyInput);
logger.info("valueInput: {}", valueInput);
KeyValueSchema keyValueSchema = (KeyValueSchema) pulsarSchema;
org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) keyValueSchema.getValueSchema(),
avroSchema,
valueInput
);
org.apache.avro.Schema avroSchemaForKey = getKeyAvroSchemaFromConfiguration();
GenericRecord key = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) keyValueSchema.getKeySchema(),
avroSchemaForKey,
keyInput
);
typedMessageBuilder = typedMessageBuilder.value(new KeyValue(key, payload));
// TODO: add a way to calculate the message size for KEY_VALUE messages
messageSize = msgPayload.length();
} else if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro(
(GenericAvroSchema) pulsarSchema, (GenericAvroSchema) pulsarSchema,
pulsarSchema.getSchemaInfo().getSchemaDefinition(), pulsarSchema.getSchemaInfo().getSchemaDefinition(),
@ -132,9 +166,7 @@ public class PulsarProducerOp implements PulsarOp {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload);
@ -187,9 +219,7 @@ public class PulsarProducerOp implements PulsarOp {
future.whenComplete((messageId, error) -> { future.whenComplete((messageId, error) -> {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload);
@ -224,4 +254,40 @@ public class PulsarProducerOp implements PulsarOp {
} }
} }
} }
/**
 * Returns the Apache Avro schema for the message payload, parsing it on first use.
 * <p>
 * For a KEY_VALUE Pulsar schema the definition is read from the value schema;
 * otherwise it is read from {@code pulsarSchema} itself. The parsed schema is
 * stored in the {@code avroSchema} field and reused by later calls.
 *
 * @return the parsed Avro schema
 */
private org.apache.avro.Schema getAvroSchemaFromConfiguration() {
    // no need for synchronization, this is only a cache:
    // in case of a race we will parse the string twice, not a big deal.
    // The field is read once so the value that is null-checked is the value returned.
    org.apache.avro.Schema cached = avroSchema;
    if (cached == null) {
        // debug, not info: this method is invoked on the per-message send path
        logger.debug("getAvroSchemaFromConfiguration...{}", pulsarSchema);
        Schema<?> definitionSource = pulsarSchema;
        if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
            definitionSource = ((KeyValueSchema<?, ?>) pulsarSchema).getValueSchema();
        }
        String avroDefStr = definitionSource.getSchemaInfo().getSchemaDefinition();
        cached = AvroUtil.GetSchema_ApacheAvro(avroDefStr);
        avroSchema = cached;
    }
    logger.debug("getAvroSchemaFromConfiguration...avroSchema {}", cached);
    return cached;
}
/**
 * Returns the Apache Avro schema for the key part of a KEY_VALUE schema,
 * parsing it on first use and storing it in the {@code avroKeySchema} field.
 *
 * @return the parsed Avro schema of the key
 * @throws RuntimeException if {@code pulsarSchema} is not a KEY_VALUE schema
 */
private org.apache.avro.Schema getKeyAvroSchemaFromConfiguration() {
    // no need for synchronization, this is only a cache:
    // in case of a race we will parse the string twice, not a big deal.
    // The field is read once so the value that is null-checked is the value returned.
    org.apache.avro.Schema cached = avroKeySchema;
    if (cached == null) {
        if (pulsarSchema.getSchemaInfo().getType() != SchemaType.KEY_VALUE) {
            throw new RuntimeException("We are not using KEY_VALUE schema, so no Schema for the Key!");
        }
        Schema<?> keySchema = ((KeyValueSchema<?, ?>) pulsarSchema).getKeySchema();
        String avroDefStr = keySchema.getSchemaInfo().getSchemaDefinition();
        cached = AvroUtil.GetSchema_ApacheAvro(avroDefStr);
        avroKeySchema = cached;
    }
    // debug, not info: this method is invoked on the per-message send path
    logger.debug("getKeyAvroSchemaFromConfiguration...avroSchema {}", cached);
    return cached;
}
} }

View File

@ -4,7 +4,6 @@ import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.driver.pulsar.PulsarSpace;
import io.nosqlbench.engine.api.templating.CommandTemplate; import io.nosqlbench.engine.api.templating.CommandTemplate;
import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import java.util.function.LongFunction; import java.util.function.LongFunction;

View File

@ -90,6 +90,10 @@ public class AvroUtil {
// Get a Pulsar Avro record (GenericRecord) from a JSON string that matches a specific Pulsar Avro schema // Get a Pulsar Avro record (GenericRecord) from a JSON string that matches a specific Pulsar Avro schema
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDefStr, String jsonData) { public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDefStr, String jsonData) {
org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr); org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr);
// The definition string is converted on every call; the rest of the work is
// delegated to the overload that accepts an org.apache.avro.Schema directly.
return GetGenericRecord_PulsarAvro(genericAvroSchema, avroSchema, jsonData);
}
// Get a Pulsar Avro record (GenericRecord) from a JSON string, using an Apache Avro schema object
// supplied by the caller instead of a schema definition string.
public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, org.apache.avro.Schema avroSchema, String jsonData) {
org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData); org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData);
return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord); return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord);
} }

View File

@ -451,6 +451,10 @@ public class PulsarActivityUtil {
// True when typeStr equals "AVRO", ignoring case; typeStr must not be null.
public static boolean isAvroSchemaTypeStr(String typeStr) { public static boolean isAvroSchemaTypeStr(String typeStr) {
return typeStr.equalsIgnoreCase("AVRO"); return typeStr.equalsIgnoreCase("AVRO");
} }
// True when typeStr equals "KEY_VALUE", ignoring case; a null typeStr yields false.
public static boolean isKeyValueTypeStr(String typeStr) {
    // compare from the constant side so a null argument cannot raise NullPointerException
    return "KEY_VALUE".equalsIgnoreCase(typeStr);
}
// automatic decode the type from the Registry // automatic decode the type from the Registry
public static boolean isAutoConsumeSchemaTypeStr(String typeStr) { public static boolean isAutoConsumeSchemaTypeStr(String typeStr) {
return typeStr.equalsIgnoreCase("AUTO_CONSUME"); return typeStr.equalsIgnoreCase("AUTO_CONSUME");