Look for Payload RTT field in the Key as well

This commit is contained in:
Enrico Olivelli
2022-01-26 08:42:25 +01:00
parent e3ed838354
commit 614fb3bb51

View File

@@ -25,7 +25,8 @@ import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType; // this is from the Shaded version of Avro inside the Pulsar client jar
import org.apache.pulsar.shade.org.apache.avro.AvroRuntimeException;
public class PulsarConsumerOp implements PulsarOp { public class PulsarConsumerOp implements PulsarOp {
@@ -234,12 +235,20 @@ public class PulsarConsumerOp implements PulsarOp {
// look into the Key // look into the Key
if (keyValue.getKey() instanceof GenericRecord) { if (keyValue.getKey() instanceof GenericRecord) {
GenericRecord keyPart = (GenericRecord) keyValue.getKey(); GenericRecord keyPart = (GenericRecord) keyValue.getKey();
field = keyPart.getField(payloadRttTrackingField); try {
field = keyPart.getField(payloadRttTrackingField);
} catch (AvroRuntimeException err) {
// field is not in the key
}
} }
// look into the Value // look into the Value
if (keyValue.getValue() instanceof GenericRecord && field == null) { if (keyValue.getValue() instanceof GenericRecord && field == null) {
GenericRecord valuePart = (GenericRecord) keyValue.getValue(); GenericRecord valuePart = (GenericRecord) keyValue.getValue();
field = valuePart.getField(payloadRttTrackingField); try {
field = valuePart.getField(payloadRttTrackingField);
} catch (AvroRuntimeException err) {
// field is not in the value
}
} }
} else { } else {
field = pulsarGenericRecord.getField(payloadRttTrackingField); field = pulsarGenericRecord.getField(payloadRttTrackingField);