diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 7dfc3dd95..7b2e5057d 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -200,13 +200,15 @@ public class PulsarConsumerOp implements PulsarOp { org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); - logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}", + logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}", + consumer.getConsumerName(), message.getKey(), message.getProperties(), avroGenericRecord.toString()); } else { - logger.debug("Sync message received: msg-key={}; msg-properties={}; msg-payload={}", + logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}", + consumer.getConsumerName(), message.getKey(), message.getProperties(), new String(message.getData())); @@ -280,13 +282,15 @@ public class PulsarConsumerOp implements PulsarOp { org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); - logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})", + logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})", + consumer.getConsumerName(), message.getKey(), message.getProperties(), avroGenericRecord.toString()); } else { - logger.debug("Async message received: msg-key={}; msg-properties={}; msg-payload={})", + logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})", + consumer.getConsumerName(), message.getKey(), message.getProperties(), new String(message.getData())); @@ -315,7 +319,7 @@ public class PulsarConsumerOp implements PulsarOp { }); } catch (Exception e) { - throw new PulsarDriverUnexpectedException("Async message receiving failed"); + throw new PulsarDriverUnexpectedException(e); } } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java index 796621df9..9875d2586 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarProducerOp.java @@ -138,13 +138,15 @@ public class PulsarProducerOp implements PulsarOp { org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); - logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={})", + logger.debug("({}) Sync message sent: msg-key={}; msg-properties={}; msg-payload={})", + producer.getProducerName(), msgKey, msgProperties, avroGenericRecord.toString()); } else { - logger.debug("Sync message sent: msg-key={}; msg-properties={}; msg-payload={}", + logger.debug("({}) Sync message sent; msg-key={}; msg-properties={}; msg-payload={}", + producer.getProducerName(), msgKey, msgProperties, msgPayload); @@ -191,13 +193,15 @@ public class PulsarProducerOp implements PulsarOp { org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, msgPayload); - logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})", + logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={})", + producer.getProducerName(), msgKey, msgProperties, avroGenericRecord.toString()); } else { - logger.debug("Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}", + logger.debug("({}) Aysnc message sent: msg-key={}; msg-properties={}; msg-payload={}", + producer.getProducerName(), msgKey, msgProperties, msgPayload); diff --git a/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml b/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml index 61f3258b4..024bccd0b 100644 --- a/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml +++ b/driver-pulsar/src/main/resources/activities/pulsar_client_sanity_seqloss.yaml @@ -23,7 +23,7 @@ blocks: #seqerr_simu: "out_of_order" #seqerr_simu: "msg_loss" #seqerr_simu: "msg_dup" - seqerr_simu: "out_of_order, msg_loss" + #seqerr_simu: "out_of_order, msg_loss" msg_key: msg_property: msg_value: "{myvalue}" @@ -36,5 +36,5 @@ blocks: - name: s1 optype: msg-consume subscription_name: "mysub" - subscription_type: + subscription_type: "Shared" consumer_name: