Extract payload rtt field from Avro record if it exists

This commit is contained in:
Matt Fleming 2021-11-09 20:44:00 +00:00
parent c15bd97c97
commit 8353a9d842

View File

@ -1,5 +1,12 @@
package io.nosqlbench.driver.pulsar.ops;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
@ -7,7 +14,6 @@ import io.nosqlbench.driver.pulsar.PulsarActivity;
import io.nosqlbench.driver.pulsar.exception.PulsarDriverUnexpectedException;
import io.nosqlbench.driver.pulsar.util.AvroUtil;
import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.Consumer;
@ -16,11 +22,6 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
public class PulsarConsumerOp implements PulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);
@ -147,10 +148,16 @@ public class PulsarConsumerOp implements PulsarOp {
}
if (!payloadRttTrackingField.isEmpty()) {
// TODO: Extract the extractedSendTime from the payload and convert it to a long.
long extractedSendTime = 0L;
long delta = System.currentTimeMillis()-extractedSendTime;
payloadRttHistogram.update(delta);
String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
org.apache.avro.Schema avroSchema =
AvroUtil.GetSchema_ApacheAvro(avroDefStr);
org.apache.avro.generic.GenericRecord avroGenericRecord =
AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
if (avroGenericRecord.hasField(payloadRttTrackingField)) {
long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
long delta = System.currentTimeMillis() - extractedSendTime;
payloadRttHistogram.update(delta);
}
}
// keep track end-to-end message processing latency