mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Reduce code duplication in PulsarConsumerOp
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package io.nosqlbench.driver.pulsar.ops;
|
package io.nosqlbench.driver.pulsar.ops;
|
||||||
|
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
@@ -19,6 +20,7 @@ import org.apache.logging.log4j.LogManager;
|
|||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.pulsar.client.api.Consumer;
|
import org.apache.pulsar.client.api.Consumer;
|
||||||
import org.apache.pulsar.client.api.Message;
|
import org.apache.pulsar.client.api.Message;
|
||||||
|
import org.apache.pulsar.client.api.PulsarClientException;
|
||||||
import org.apache.pulsar.client.api.Schema;
|
import org.apache.pulsar.client.api.Schema;
|
||||||
import org.apache.pulsar.client.api.transaction.Transaction;
|
import org.apache.pulsar.client.api.transaction.Transaction;
|
||||||
import org.apache.pulsar.common.schema.SchemaType;
|
import org.apache.pulsar.common.schema.SchemaType;
|
||||||
@@ -85,7 +87,7 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
this.payloadRttTrackingField = payloadRttTrackingField;
|
this.payloadRttTrackingField = payloadRttTrackingField;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkAndUpdateMessageErrorCounter(Message message) {
|
private void checkAndUpdateMessageErrorCounter(Message<?> message) {
|
||||||
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER);
|
String msgSeqIdStr = message.getProperty(PulsarActivityUtil.MSG_SEQUENCE_NUMBER);
|
||||||
|
|
||||||
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
|
if ( !StringUtils.isBlank(msgSeqIdStr) ) {
|
||||||
@@ -108,9 +110,9 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!asyncPulsarOp) {
|
if (!asyncPulsarOp) {
|
||||||
Message<?> message;
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
Message<?> message;
|
||||||
|
|
||||||
if (timeoutSeconds <= 0) {
|
if (timeoutSeconds <= 0) {
|
||||||
// wait forever
|
// wait forever
|
||||||
message = consumer.receive();
|
message = consumer.receive();
|
||||||
@@ -123,77 +125,11 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logger.isDebugEnabled()) {
|
handleMessage(transaction, message);
|
||||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
|
||||||
|
|
||||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
|
||||||
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
|
||||||
org.apache.avro.Schema avroSchema =
|
|
||||||
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
|
|
||||||
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
|
||||||
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
|
|
||||||
|
|
||||||
logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
|
|
||||||
consumer.getConsumerName(),
|
|
||||||
message.getKey(),
|
|
||||||
message.getProperties(),
|
|
||||||
avroGenericRecord.toString());
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
logger.debug("({}) Sync message received: msg-key={}; msg-properties={}; msg-payload={}",
|
|
||||||
consumer.getConsumerName(),
|
|
||||||
message.getKey(),
|
|
||||||
message.getProperties(),
|
|
||||||
new String(message.getData()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!payloadRttTrackingField.isEmpty()) {
|
|
||||||
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
|
||||||
org.apache.avro.Schema avroSchema =
|
|
||||||
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
|
|
||||||
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
|
||||||
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
|
|
||||||
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
|
|
||||||
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
|
|
||||||
long delta = System.currentTimeMillis() - extractedSendTime;
|
|
||||||
payloadRttHistogram.update(delta);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep track end-to-end message processing latency
|
|
||||||
if (e2eMsgProc) {
|
|
||||||
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
|
|
||||||
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep track of message errors and update error counters
|
|
||||||
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
|
|
||||||
|
|
||||||
int messageSize = message.getData().length;
|
|
||||||
bytesCounter.inc(messageSize);
|
|
||||||
messageSizeHistogram.update(messageSize);
|
|
||||||
|
|
||||||
if (!useTransaction) {
|
|
||||||
consumer.acknowledge(message.getMessageId());
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
|
|
||||||
|
|
||||||
// little problem: here we are counting the "commit" time
|
|
||||||
// inside the overall time spent for the execution of the consume operation
|
|
||||||
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
|
|
||||||
// to track with precision the time spent for the operation and for the commit
|
|
||||||
try (Timer.Context ctx = transactionCommitTimer.time()) {
|
|
||||||
transaction.commit().get();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
logger.error(
|
logger.error(
|
||||||
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds);
|
"Sync message receiving failed - timeout value: {} seconds ", timeoutSeconds, e);
|
||||||
e.printStackTrace();
|
|
||||||
throw new PulsarDriverUnexpectedException("" +
|
throw new PulsarDriverUnexpectedException("" +
|
||||||
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
|
"Sync message receiving failed - timeout value: " + timeoutSeconds + " seconds ");
|
||||||
}
|
}
|
||||||
@@ -213,52 +149,16 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
msgRecvFuture.whenComplete((message, error) -> {
|
msgRecvFuture.thenAccept(message -> {
|
||||||
int messageSize = message.getData().length;
|
try {
|
||||||
bytesCounter.inc(messageSize);
|
handleMessage(transaction, message);
|
||||||
messageSizeHistogram.update(messageSize);
|
} catch (PulsarClientException e) {
|
||||||
|
pulsarActivity.asyncOperationFailed(e);
|
||||||
if (logger.isDebugEnabled()) {
|
} catch (InterruptedException e) {
|
||||||
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
Thread.currentThread().interrupt();
|
||||||
|
} catch (ExecutionException e) {
|
||||||
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
pulsarActivity.asyncOperationFailed(e.getCause());
|
||||||
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
|
||||||
org.apache.avro.Schema avroSchema =
|
|
||||||
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
|
|
||||||
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
|
||||||
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
|
|
||||||
|
|
||||||
logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
|
|
||||||
consumer.getConsumerName(),
|
|
||||||
message.getKey(),
|
|
||||||
message.getProperties(),
|
|
||||||
avroGenericRecord.toString());
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
logger.debug("({}) Async message received: msg-key={}; msg-properties={}; msg-payload={})",
|
|
||||||
consumer.getConsumerName(),
|
|
||||||
message.getKey(),
|
|
||||||
message.getProperties(),
|
|
||||||
new String(message.getData()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (e2eMsgProc) {
|
|
||||||
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
|
|
||||||
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
|
|
||||||
}
|
|
||||||
|
|
||||||
// keep track of message errors and update error counters
|
|
||||||
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
|
|
||||||
|
|
||||||
if (!useTransaction) {
|
|
||||||
consumer.acknowledgeAsync(message);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
consumer.acknowledgeAsync(message.getMessageId(), transaction);
|
|
||||||
}
|
|
||||||
|
|
||||||
timeTracker.run();
|
|
||||||
}).exceptionally(ex -> {
|
}).exceptionally(ex -> {
|
||||||
pulsarActivity.asyncOperationFailed(ex);
|
pulsarActivity.asyncOperationFailed(ex);
|
||||||
return null;
|
return null;
|
||||||
@@ -270,4 +170,73 @@ public class PulsarConsumerOp implements PulsarOp {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void handleMessage(Transaction transaction, Message<?> message)
|
||||||
|
throws PulsarClientException, InterruptedException, ExecutionException {
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
|
SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
|
||||||
|
|
||||||
|
if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
|
||||||
|
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
||||||
|
org.apache.avro.Schema avroSchema =
|
||||||
|
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
|
||||||
|
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
||||||
|
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
|
||||||
|
|
||||||
|
logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
|
||||||
|
consumer.getConsumerName(),
|
||||||
|
message.getKey(),
|
||||||
|
message.getProperties(),
|
||||||
|
avroGenericRecord.toString());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
logger.debug("({}) message received: msg-key={}; msg-properties={}; msg-payload={}",
|
||||||
|
consumer.getConsumerName(),
|
||||||
|
message.getKey(),
|
||||||
|
message.getProperties(),
|
||||||
|
new String(message.getData()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!payloadRttTrackingField.isEmpty()) {
|
||||||
|
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
|
||||||
|
org.apache.avro.Schema avroSchema =
|
||||||
|
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
|
||||||
|
org.apache.avro.generic.GenericRecord avroGenericRecord =
|
||||||
|
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
|
||||||
|
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
|
||||||
|
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
|
||||||
|
long delta = System.currentTimeMillis() - extractedSendTime;
|
||||||
|
payloadRttHistogram.update(delta);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep track end-to-end message processing latency
|
||||||
|
if (e2eMsgProc) {
|
||||||
|
long e2eMsgLatency = System.currentTimeMillis() - message.getPublishTime();
|
||||||
|
e2eMsgProcLatencyHistogram.update(e2eMsgLatency);
|
||||||
|
}
|
||||||
|
|
||||||
|
// keep track of message errors and update error counters
|
||||||
|
if (seqTracking) checkAndUpdateMessageErrorCounter(message);
|
||||||
|
|
||||||
|
int messageSize = message.getData().length;
|
||||||
|
bytesCounter.inc(messageSize);
|
||||||
|
messageSizeHistogram.update(messageSize);
|
||||||
|
|
||||||
|
if (!useTransaction) {
|
||||||
|
consumer.acknowledge(message.getMessageId());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
|
||||||
|
|
||||||
|
// little problem: here we are counting the "commit" time
|
||||||
|
// inside the overall time spent for the execution of the consume operation
|
||||||
|
// we should refactor this operation as for PulsarProducerOp, and use the passed callback
|
||||||
|
// to track with precision the time spent for the operation and for the commit
|
||||||
|
try (Timer.Context ctx = transactionCommitTimer.time()) {
|
||||||
|
transaction.commit().get();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user