Reduce code duplication in PulsarConsumerOp

This commit is contained in:
Lari Hotari 2021-12-08 09:08:21 +02:00
parent 333e277b20
commit fb30deb1ff

View File

@ -1,6 +1,7 @@
package io.nosqlbench.driver.pulsar.ops;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
@ -19,6 +20,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.SchemaType;
@ -85,7 +87,7 @@ public class PulsarConsumerOp implements PulsarOp {
this.payloadRttTrackingField = payloadRttTrackingField;
}
private void checkAndUpdateMessageErrorCounter(Message message) {
private void checkAndUpdateMessageErrorCounter(Message<?> message) {
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER);
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
@ -108,9 +110,9 @@ public class PulsarConsumerOp implements PulsarOp {
}
if (!asyncPulsarOp) {
Message<?> message;
try {
Message<?> message;
if (timeoutSeconds <= 0) {
// wait forever
message = consumer.receive();
@ -123,77 +125,11 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
}
if (!payloadRttTrackingField.isEmpty()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
long delta = System.currentTimeMillis() - extractedSendTime;
payloadRttHistogram.update(delta);
}
}
// keep track end-to-end message processing latency
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (!useTransaction) {
consumer.acknowledge(message.getMessageId());
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
handleMessage(transaction, message);
}
catch (Exception e) {
logger.error(
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
e.printStackTrace();
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds, e);
throw new PulsarDriverUnexpectedException("" +
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
}
@ -213,52 +149,16 @@ public class PulsarConsumerOp implements PulsarOp {
);
}
msgRecvFuture.whenComplete((message, error) -> {
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
msgRecvFuture.thenAccept(message -> {
try {
handleMessage(transaction, message);
} catch (PulsarClientException e) {
pulsarActivity.asyncOperationFailed(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
pulsarActivity.asyncOperationFailed(e.getCause());
}
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
if (!useTransaction) {
consumer.acknowledgeAsync(message);
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction);
}
timeTracker.run();
}).exceptionally(ex -> {
pulsarActivity.asyncOperationFailed(ex);
return null;
@ -270,4 +170,73 @@ public class PulsarConsumerOp implements PulsarOp {
}
}
private void handleMessage(Transaction transaction, Message<?> message)
throws PulsarClientException, InterruptedException, ExecutionException {
if (logger.isDebugEnabled()) {
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
avroGenericRecord.toString());
}
else {
logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
consumer.getConsumerName(),
message.getKey(),
message.getProperties(),
new String(message.getData()));
}
}
if (!payloadRttTrackingField.isEmpty()) {
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
long delta = System.currentTimeMillis() - extractedSendTime;
payloadRttHistogram.update(delta);
}
}
// keep track end-to-end message processing latency
if (e2eMsgProc) {
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
}
// keep track of message errors and update error counters
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
int messageSize = message.getData().length;
bytesCounter.inc(messageSize);
messageSizeHistogram.update(messageSize);
if (!useTransaction) {
consumer.acknowledge(message.getMessageId());
}
else {
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
// little problem: here we are counting the "commit" time
// inside the overall time spent for the execution of the consume operation
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
// to track with precision the time spent for the operation and for the commit
try (Timer.Context ctx = transactionCommitTimer.time()) {
transaction.commit().get();
}
}
}
}