fail if field is not found

This commit is contained in:
Enrico Olivelli
2022-01-26 17:00:41 +01:00
parent 614fb3bb51
commit 2e8fbd7102

View File

@@ -239,6 +239,7 @@ public class PulsarConsumerOp implements PulsarOp {
field = keyPart.getField(payloadRttTrackingField); field = keyPart.getField(payloadRttTrackingField);
} catch (AvroRuntimeException err) { } catch (AvroRuntimeException err) {
// field is not in the key // field is not in the key
logger.error("Cannot find "+payloadRttTrackingField+" in key " + keyPart + ": " + err);
} }
} }
// look into the Value // look into the Value
@@ -248,8 +249,12 @@ public class PulsarConsumerOp implements PulsarOp {
field = valuePart.getField(payloadRttTrackingField); field = valuePart.getField(payloadRttTrackingField);
} catch (AvroRuntimeException err) { } catch (AvroRuntimeException err) {
// field is not in the value // field is not in the value
logger.error("Cannot find "+payloadRttTrackingField+" in value " + valuePart + ": " + err);
} }
} }
if (field == null) {
throw new RuntimeException("Cannot find field " + payloadRttTrackingField + " in " + keyValue.getKey() + " and " + keyValue.getValue());
}
} else { } else {
field = pulsarGenericRecord.getField(payloadRttTrackingField); field = pulsarGenericRecord.getField(payloadRttTrackingField);
} }
@@ -260,6 +265,8 @@ public class PulsarConsumerOp implements PulsarOp {
} else { } else {
extractedSendTime = Long.valueOf(field.toString()); extractedSendTime = Long.valueOf(field.toString());
} }
} else {
logger.error("Cannot find " + payloadRttTrackingField + " in value " + pulsarGenericRecord);
} }
done = true; done = true;
} }