From d940b40b104cdef96c76e4110f78b440ad1b68e5 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 2 Feb 2022 17:01:38 +0100 Subject: [PATCH] Pulsar: implement KeyValue schema for PulsarProducerOp --- .../driver/pulsar/PulsarActivity.java | 32 ++++++-- .../nosqlbench/driver/pulsar/PulsarSpace.java | 2 +- .../pulsar/ops/PulsarBatchProducerMapper.java | 1 - .../driver/pulsar/ops/PulsarProducerOp.java | 80 +++++++++++++++++-- .../driver/pulsar/ops/PulsarReaderMapper.java | 1 - .../driver/pulsar/util/AvroUtil.java | 4 + .../pulsar/util/PulsarActivityUtil.java | 4 + 7 files changed, 108 insertions(+), 16 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index 0d95037b4..574f2c0f3 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.*; +import org.apache.pulsar.common.schema.KeyValueEncodingType; import java.util.Map; @@ -244,21 +245,40 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve * Get Pulsar schema from the definition string */ private void createPulsarSchemaFromConf() { - Object value = pulsarNBClientConf.getSchemaConfValue("schema.type"); + pulsarSchema = buldSchemaFromDefinition("schema.type", "schema.definition"); + + // this is to allow KEY_VALUE schema + if (pulsarNBClientConf.hasSchemaConfKey("schema.key.type")) { + Schema pulsarKeySchema = buldSchemaFromDefinition("schema.key.type", "schema.key.definition"); + Object encodingType = pulsarNBClientConf.getSchemaConfValue("schema.keyvalue.encodingtype"); + KeyValueEncodingType keyValueEncodingType = KeyValueEncodingType.SEPARATED; + if (encodingType != null) { + keyValueEncodingType = KeyValueEncodingType.valueOf(encodingType.toString()); + } + pulsarSchema = Schema.KeyValue(pulsarKeySchema, pulsarSchema, keyValueEncodingType); + } + } + + private Schema buldSchemaFromDefinition(String schemaTypeConfEntry, + String schemaDefinitionConfEntry) { + Object value = pulsarNBClientConf.getSchemaConfValue(schemaTypeConfEntry); + Object schemaDefinition = pulsarNBClientConf.getSchemaConfValue(schemaDefinitionConfEntry); String schemaType = (value != null) ? value.toString() : ""; + Schema result; if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType)) { - value = pulsarNBClientConf.getSchemaConfValue("schema.definition"); - String schemaDefStr = (value != null) ? value.toString() : ""; - pulsarSchema = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr); + String schemaDefStr = (schemaDefinition != null) ? schemaDefinition.toString() : ""; + result = PulsarActivityUtil.getAvroSchema(schemaType, schemaDefStr); } else if (PulsarActivityUtil.isPrimitiveSchemaTypeStr(schemaType)) { - pulsarSchema = PulsarActivityUtil.getPrimitiveTypeSchema((schemaType)); + result = PulsarActivityUtil.getPrimitiveTypeSchema(schemaType); } else if (PulsarActivityUtil.isAutoConsumeSchemaTypeStr(schemaType)) { - pulsarSchema = Schema.AUTO_CONSUME(); + result = Schema.AUTO_CONSUME(); } else { throw new RuntimeException("Unsupported schema type string: " + schemaType + "; " + "Only primitive type, Avro type and AUTO_CONSUME are supported at the moment!"); } + logger.info("Generated schema from {}={} {}={}: {}", schemaTypeConfEntry, value, schemaDefinitionConfEntry, schemaDefinition, result.getSchemaInfo().getSchemaDefinition()); + return result; } public PulsarNBClientConf getPulsarConf() { return this.pulsarNBClientConf;} diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index b7978ad02..7c0f44704 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -15,8 +15,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.common.schema.KeyValueEncodingType; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java index 13605134a..40512dc30 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarBatchProducerMapper.java @@ -6,7 +6,6 @@ import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.api.Producer; import java.util.HashMap; import java.util.Map; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 9875d2586..8f3413fdf 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -13,8 +13,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.KeyValueSchema; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema; +import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; import java.nio.charset.StandardCharsets; @@ -43,6 +45,9 @@ public class PulsarProducerOp implements PulsarOp { private final Histogram messageSizeHistogram; private final Timer transactionCommitTimer; + private org.apache.avro.Schema avroSchema; + private org.apache.avro.Schema avroKeySchema; + public PulsarProducerOp( PulsarActivity pulsarActivity, boolean asyncPulsarOp, boolean useTransaction, @@ -101,7 +106,36 @@ public class PulsarProducerOp implements PulsarOp { // set message payload int messageSize; SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); - if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { + if (pulsarSchema instanceof KeyValueSchema) { + + int separator = msgPayload.indexOf("}||{"); + if (separator < 0) { + throw new IllegalArgumentException("KeyValue payload MUST be in form {KEY IN JSON}||{VALUE IN JSON} (with 2 pipes that separate the KEY part from the VALUE part)"); + } + String keyInput = msgPayload.substring(0, separator + 1); + String valueInput = msgPayload.substring(separator + 3); + logger.info("msgPayload: {}", msgPayload); + logger.info("keyInput: {}", keyInput); + logger.info("valueInput: {}", valueInput); + + KeyValueSchema keyValueSchema = (KeyValueSchema) pulsarSchema; + org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration(); + GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( + (GenericAvroSchema) keyValueSchema.getValueSchema(), + avroSchema, + valueInput + ); + + org.apache.avro.Schema avroSchemaForKey = getKeyAvroSchemaFromConfiguration(); + GenericRecord key = AvroUtil.GetGenericRecord_PulsarAvro( + (GenericAvroSchema) keyValueSchema.getKeySchema(), + avroSchemaForKey, + keyInput + ); + typedMessageBuilder = typedMessageBuilder.value(new KeyValue(key, payload)); + // TODO: add a way to calculate the message size for KEY_VALUE messages + messageSize = msgPayload.length(); + } else if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { GenericRecord payload = AvroUtil.GetGenericRecord_PulsarAvro( (GenericAvroSchema) pulsarSchema, pulsarSchema.getSchemaInfo().getSchemaDefinition(), @@ -132,9 +166,7 @@ public class PulsarProducerOp implements PulsarOp { if (logger.isDebugEnabled()) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.Schema avroSchema = - AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration(); org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); @@ -187,9 +219,7 @@ public class PulsarProducerOp implements PulsarOp { future.whenComplete((messageId, error) -> { if (logger.isDebugEnabled()) { if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.Schema avroSchema = - AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.Schema avroSchema = getAvroSchemaFromConfiguration(); org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); @@ -224,4 +254,40 @@ public class PulsarProducerOp implements PulsarOp { } } } + + private org.apache.avro.Schema getAvroSchemaFromConfiguration() { + // no need for synchronization, this is only a cache + // in case of the race we will parse the string twice, not a big + if (avroSchema == null) { + logger.info("getAvroSchemaFromConfiguration...{}", pulsarSchema); + if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { + KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema; + Schema valueSchema = kvSchema.getValueSchema(); + String avroDefStr = valueSchema.getSchemaInfo().getSchemaDefinition(); + avroSchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr); + } else { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + avroSchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr); + } + } + logger.info("getAvroSchemaFromConfiguration...avroSchema {}", avroSchema); + return avroSchema; + } + + private org.apache.avro.Schema getKeyAvroSchemaFromConfiguration() { + // no need for synchronization, this is only a cache + // in case of the race we will parse the string twice, not a big + if (avroKeySchema == null) { + if (pulsarSchema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) { + KeyValueSchema kvSchema = (KeyValueSchema) pulsarSchema; + Schema keySchema = kvSchema.getKeySchema(); + String avroDefStr = keySchema.getSchemaInfo().getSchemaDefinition(); + avroKeySchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr); + } else { + throw new RuntimeException("We are not using KEY_VALUE schema, so no Schema for the Key!"); + } + } + logger.info("getKeyAvroSchemaFromConfiguration...avroSchema {}", avroKeySchema); + return avroKeySchema; + } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java index 4ebbc965f..ff88d442d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarReaderMapper.java @@ -4,7 +4,6 @@ import io.nosqlbench.driver.pulsar.PulsarActivity; import io.nosqlbench.driver.pulsar.PulsarSpace; import io.nosqlbench.engine.api.templating.CommandTemplate; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.Schema; import java.util.function.LongFunction; diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java index 1df1f64bf..9ee531324 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/AvroUtil.java @@ -90,6 +90,10 @@ public class AvroUtil { // Get a Pulsar Avro record (GenericRecord) from a JSON string that matches a specific Pulsar Avro schema public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, String avroSchemDefStr, String jsonData) { org.apache.avro.Schema avroSchema = GetSchema_ApacheAvro(avroSchemDefStr); + return GetGenericRecord_PulsarAvro(genericAvroSchema, avroSchema, jsonData); + } + + public static GenericRecord GetGenericRecord_PulsarAvro(GenericAvroSchema genericAvroSchema, org.apache.avro.Schema avroSchema, String jsonData) { org.apache.avro.generic.GenericRecord apacheAvroRecord = GetGenericRecord_ApacheAvro(avroSchema, jsonData); return GetGenericRecord_PulsarAvro(genericAvroSchema, apacheAvroRecord); } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java index 166000055..b6dca8261 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/util/PulsarActivityUtil.java @@ -451,6 +451,10 @@ public class PulsarActivityUtil { public static boolean isAvroSchemaTypeStr(String typeStr) { return typeStr.equalsIgnoreCase("AVRO"); } + public static boolean isKeyValueTypeStr(String typeStr) { + return typeStr.equalsIgnoreCase("KEY_VALUE"); + } + // automatic decode the type from the Registry public static boolean isAutoConsumeSchemaTypeStr(String typeStr) { return typeStr.equalsIgnoreCase("AUTO_CONSUME");