From 60bad74ed7d7bb5d42ec6d5d0a2d345025f4cfe1 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 19 Jan 2022 14:43:22 +0100 Subject: [PATCH] Pulsar: improve AVRO decoding --- .../driver/pulsar/ops/PulsarConsumerOp.java | 86 +++++++++++++------ 1 file changed, 58 insertions(+), 28 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 1eb6ab20c..c70a3fa2b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -7,6 +7,8 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.commons.lang3.StringUtils; import com.codahale.metrics.Counter; @@ -18,10 +20,8 @@ import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.schema.SchemaType; @@ -52,6 +52,8 @@ public class PulsarConsumerOp implements PulsarOp { private final Histogram payloadRttHistogram; private final String payloadRttTrackingField; + private org.apache.avro.Schema avroSchema; + public PulsarConsumerOp( PulsarActivity pulsarActivity, boolean asyncPulsarOp, @@ -152,7 +154,7 @@ public class PulsarConsumerOp implements PulsarOp { msgRecvFuture.thenAccept(message -> { try { 
handleMessage(transaction, message); - } catch (PulsarClientException e) { + } catch (PulsarClientException | TimeoutException e) { pulsarActivity.asyncOperationFailed(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -171,14 +173,30 @@ public class PulsarConsumerOp implements PulsarOp { } private void handleMessage(Transaction transaction, Message message) - throws PulsarClientException, InterruptedException, ExecutionException { + throws PulsarClientException, InterruptedException, ExecutionException, TimeoutException { + + // acknowledge the message as soon as possible + if (!useTransaction) { + consumer.acknowledgeAsync(message.getMessageId()) + .get(timeoutSeconds, TimeUnit.SECONDS); + } else { + consumer.acknowledgeAsync(message.getMessageId(), transaction) + .get(timeoutSeconds, TimeUnit.SECONDS); + + // little problem: here we are counting the "commit" time + // inside the overall time spent for the execution of the consume operation + // we should refactor this operation as for PulsarProducerOp, and use the passed callback + // to track with precision the time spent for the operation and for the commit + try (Timer.Context ctx = transactionCommitTimer.time()) { + transaction.commit().get(); + } + } + if (logger.isDebugEnabled()) { SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.Schema avroSchema = - AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.Schema avroSchema = getSchemaFromConfiguration(); org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); @@ -198,13 +216,30 @@ public class PulsarConsumerOp implements PulsarOp { } if (!payloadRttTrackingField.isEmpty()) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.Schema avroSchema = 
- AvroUtil.GetSchema_ApacheAvro(avroDefStr); - org.apache.avro.generic.GenericRecord avroGenericRecord = + Object decodedPayload = message.getValue(); + Long extractedSendTime = null; + // if Pulsar is able to decode this it is better to let it do the work + // because Pulsar caches the Schema, handles Schema evolution + // as efficiently as possible + if (decodedPayload instanceof GenericRecord) { + GenericRecord pulsarGenericRecord = (GenericRecord) decodedPayload; + Object field = pulsarGenericRecord.getField(payloadRttTrackingField); + if (field != null) { + if (field instanceof Number) { + extractedSendTime = ((Number) field).longValue(); + } else { + extractedSendTime = Long.valueOf(field.toString()); + } + } + } else { + org.apache.avro.Schema avroSchema = getSchemaFromConfiguration(); + org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); - if (avroGenericRecord.hasField(payloadRttTrackingField)) { - long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField); + if (avroGenericRecord.hasField(payloadRttTrackingField)) { + extractedSendTime = (Long) avroGenericRecord.get(payloadRttTrackingField); + } + } + if (extractedSendTime != null) { long delta = System.currentTimeMillis() - extractedSendTime; payloadRttHistogram.update(delta); } @@ -237,21 +272,16 @@ public class PulsarConsumerOp implements PulsarOp { int messageSize = message.getData().length; bytesCounter.inc(messageSize); messageSizeHistogram.update(messageSize); + } - if (!useTransaction) { - consumer.acknowledge(message.getMessageId()); - } - else { - consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); - - // little problem: here we are counting the "commit" time - // inside the overall time spent for the execution of the consume operation - // we should refactor this operation as for PulsarProducerOp, and use the passed callback - // to track with precision the time spent for the 
operation and for the commit - try (Timer.Context ctx = transactionCommitTimer.time()) { - transaction.commit().get(); - } + private org.apache.avro.Schema getSchemaFromConfiguration() { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + // no need for synchronization, this is only a cache + // in case of the race we will parse the string twice, not a big deal + if (avroSchema == null) { + avroSchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr); } + return avroSchema; } }