Pulsar: improve AVRO decoding

This commit is contained in:
Enrico Olivelli 2022-01-19 14:43:22 +01:00 committed by Jonathan Shook
parent ac30bf812e
commit 60bad74ed7

View File

@ -7,6 +7,8 @@ import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.Counter;
@ -18,10 +20,8 @@ import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.SchemaType;
@ -52,6 +52,8 @@ public class PulsarConsumerOp implements PulsarOp {
private final Histogram payloadRttHistogram;
private final String payloadRttTrackingField;
private org.apache.avro.Schema avroSchema;
public PulsarConsumerOp(
PulsarActivity pulsarActivity,
boolean asyncPulsarOp,
@ -152,7 +154,7 @@ public class PulsarConsumerOp implements PulsarOp {
msgRecvFuture.thenAccept(message -> {
try {
handleMessage(transaction, message);
} catch (PulsarClientException e) {
} catch (PulsarClientException | TimeoutException e) {
pulsarActivity.asyncOperationFailed(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -171,14 +173,30 @@ public class PulsarConsumerOp implements PulsarOp {
}
private void handleMessage(Transaction transaction, Message<?> message)
throws PulsarClientException, InterruptedException, ExecutionException {
throws PulsarClientException, InterruptedException, ExecutionException, TimeoutException {
// acknowledge the message as soon as possible
if (!useTransaction) {
consumer.acknowledgeAsync(message.getMessageId())
.get(timeoutSeconds, TimeUnit.SECONDS);
} else {
consumer.acknowledgeAsync(message.getMessageId(), transaction)
.get(timeoutSeconds, TimeUnit.SECONDS);
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.Schema avroSchema = getSchemaFromConfiguration();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
@ -198,13 +216,30 @@ public class PulsarConsumerOp implements PulsarOp {
}
if (!payloadRttTrackingField.isEmpty()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
Object decodedPayload = message.getValue();
Long extractedSendTime = null;
// if Pulsar is able to decode this, it is better to let it do the work,
// because Pulsar caches the Schema and handles Schema evolution
// as efficiently as possible
if (decodedPayload instanceof GenericRecord) {
GenericRecord pulsarGenericRecord = (GenericRecord) decodedPayload;
Object field = pulsarGenericRecord.getField(payloadRttTrackingField);
if (field != null) {
if (field instanceof Number) {
extractedSendTime = ((Number) field).longValue();
} else {
extractedSendTime = Long.valueOf(field.toString());
}
}
} else {
org.apache.avro.Schema avroSchema = getSchemaFromConfiguration();
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
extractedSendTime = (Long) avroGenericRecord.get(payloadRttTrackingField);
}
}
if (extractedSendTime != null) {
long delta = System.currentTimeMillis() - extractedSendTime;
payloadRttHistogram.update(delta);
}
@ -237,21 +272,16 @@ public class PulsarConsumerOp implements PulsarOp {
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
}
if (!useTransaction) {
consumer.acknowledge(message.getMessageId());
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
/**
 * Returns the Apache Avro schema built from the schema definition exposed by
 * {@code pulsarSchema}, parsing it on first use and caching the result in the
 * {@code avroSchema} field.
 *
 * @return the cached (or freshly parsed) Avro schema
 */
private org.apache.avro.Schema getSchemaFromConfiguration() {
    // Read the cache field exactly once into a local. The field is accessed
    // without synchronization, so checking it and then returning it through two
    // separate field reads is not guaranteed to observe the same value.
    org.apache.avro.Schema schema = avroSchema;
    if (schema == null) {
        // No need for synchronization, this is only a cache: in case of a race
        // the definition string is parsed twice, which is not a big deal.
        // The definition is fetched only on a cache miss, so the common
        // (already cached) path does no extra work.
        String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
        schema = AvroUtil.GetSchema_ApacheAvro(avroDefStr);
        avroSchema = schema;
    }
    return schema;
}
}