Convert message error runtime exception to counter metrics

This commit is contained in:
Yabin Meng 2021-10-13 11:27:54 -05:00
parent ac2fdf2a2e
commit ed4ba2725e
3 changed files with 19 additions and 11 deletions

View File

@ -200,13 +200,15 @@ public class PulsarConsumerOp implements PulsarOp {
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}", logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(), message.getKey(),
message.getProperties(), message.getProperties(),
avroGenericRecord.toString()); avroGenericRecord.toString());
} }
else { else {
logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}", logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(), message.getKey(),
message.getProperties(), message.getProperties(),
new String(message.getData())); new String(message.getData()));
@ -280,13 +282,15 @@ public class PulsarConsumerOp implements PulsarOp {
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})", logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
consumer.getConsumerName(),
message.getKey(), message.getKey(),
message.getProperties(), message.getProperties(),
avroGenericRecord.toString()); avroGenericRecord.toString());
} }
else { else {
logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})", logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
consumer.getConsumerName(),
message.getKey(), message.getKey(),
message.getProperties(), message.getProperties(),
new String(message.getData())); new String(message.getData()));
@ -315,7 +319,7 @@ public class PulsarConsumerOp implements PulsarOp {
}); });
} }
catch (Exception e) { catch (Exception e) {
throw new PulsarDriverUnexpectedException("Async message receiving failed"); throw new PulsarDriverUnexpectedException(e);
} }
} }
} }

View File

@ -138,13 +138,15 @@ public class PulsarProducerOp implements PulsarOp {
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload);
logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={})", logger.debug("({}) Sync message sent: msg-key={}; msg-properties={}; msg-payload={})",
producer.getProducerName(),
msgKey, msgKey,
msgProperties, msgProperties,
avroGenericRecord.toString()); avroGenericRecord.toString());
} }
else { else {
logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={}", logger.debug("({}) Sync message sent; msg-key={}; msg-properties={}; msg-payload={}",
producer.getProducerName(),
msgKey, msgKey,
msgProperties, msgProperties,
msgPayload); msgPayload);
@ -191,13 +193,15 @@ public class PulsarProducerOp implements PulsarOp {
org.apache.avro.generic.GenericRecord avroGenericRecord = org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload);
logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})", logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})",
producer.getProducerName(),
msgKey, msgKey,
msgProperties, msgProperties,
avroGenericRecord.toString()); avroGenericRecord.toString());
} }
else { else {
logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}", logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}",
producer.getProducerName(),
msgKey, msgKey,
msgProperties, msgProperties,
msgPayload); msgPayload);

View File

@ -23,7 +23,7 @@ blocks:
#seqerr_simu: "out_of_order" #seqerr_simu: "out_of_order"
#seqerr_simu: "msg_loss" #seqerr_simu: "msg_loss"
#seqerr_simu: "msg_dup" #seqerr_simu: "msg_dup"
seqerr_simu: "out_of_order, msg_loss" #seqerr_simu: "out_of_order, msg_loss"
msg_key: msg_key:
msg_property: msg_property:
msg_value: "{myvalue}" msg_value: "{myvalue}"
@ -36,5 +36,5 @@ blocks:
- name: s1 - name: s1
optype: msg-consume optype: msg-consume
subscription_name: "mysub" subscription_name: "mysub"
subscription_type: subscription_type: "Shared"
consumer_name: consumer_name: