From 94b72156f04b7e7be4577c5453998fd8edd810ed Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 19 Jan 2022 14:43:22 +0100 Subject: [PATCH 1/2] Pulsar: improve AVRO decoding --- .../driver/pulsar/ops/PulsarConsumerOp.java | 86 +++++++++++++------ 1 file changed, 58 insertions(+), 28 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java index 1eb6ab20c..c70a3fa2b 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/ops/PulsarConsumerOp.java @@ -7,6 +7,8 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.function.Supplier; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import org.apache.commons.lang3.StringUtils; import com.codahale.metrics.Counter; @@ -18,10 +20,8 @@ import io.nosqlbench.driver.pulsar.util.AvroUtil; import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.common.schema.SchemaType; @@ -52,6 +52,8 @@ public class PulsarConsumerOp implements PulsarOp { private final Histogram payloadRttHistogram; private final String payloadRttTrackingField; + private org.apache.avro.Schema avroSchema; + public PulsarConsumerOp( PulsarActivity pulsarActivity, boolean asyncPulsarOp, @@ -152,7 +154,7 @@ public class PulsarConsumerOp implements PulsarOp { msgRecvFuture.thenAccept(message -> { try { handleMessage(transaction, message); - } catch (PulsarClientException e) { + } catch (PulsarClientException | TimeoutException e) { pulsarActivity.asyncOperationFailed(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -171,14 +173,30 @@ public class PulsarConsumerOp implements PulsarOp { } private void handleMessage(Transaction transaction, Message message) - throws PulsarClientException, InterruptedException, ExecutionException { + throws PulsarClientException, InterruptedException, ExecutionException, TimeoutException { + + // acknowledge the message as soon as possible + if (!useTransaction) { + consumer.acknowledgeAsync(message.getMessageId()) + .get(timeoutSeconds, TimeUnit.SECONDS); + } else { + consumer.acknowledgeAsync(message.getMessageId(), transaction) + .get(timeoutSeconds, TimeUnit.SECONDS); + + // little problem: here we are counting the "commit" time + // inside the overall time spent for the execution of the consume operation + // we should refactor this operation as for PulsarProducerOp, and use the passed callback + // to track with precision the time spent for the operation and for the commit + try (Timer.Context ctx = transactionCommitTimer.time()) { + transaction.commit().get(); + } + } + if (logger.isDebugEnabled()) { SchemaType schemaType = pulsarSchema.getSchemaInfo().getType(); if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.Schema avroSchema = - AvroUtil.GetSchema_ApacheAvro(avroDefStr); + org.apache.avro.Schema avroSchema = getSchemaFromConfiguration(); org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); @@ -198,13 +216,30 @@ public class PulsarConsumerOp implements PulsarOp { } if (!payloadRttTrackingField.isEmpty()) { - String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); - org.apache.avro.Schema avroSchema = - AvroUtil.GetSchema_ApacheAvro(avroDefStr); - org.apache.avro.generic.GenericRecord avroGenericRecord = + Object decodedPayload = message.getValue(); + Long extractedSendTime = null; + // if Pulsar is able to decode this it is better to let it do the work + // because Pulsar caches the Schema, handles Schema evolution + // as much efficiently as possible + if (decodedPayload instanceof GenericRecord) { + GenericRecord pulsarGenericRecord = (GenericRecord) decodedPayload; + Object field = pulsarGenericRecord.getField(payloadRttTrackingField); + if (field != null) { + if (field instanceof Number) { + extractedSendTime = ((Number) field).longValue(); + } else { + extractedSendTime = Long.valueOf(field.toString()); + } + } + } else { + org.apache.avro.Schema avroSchema = getSchemaFromConfiguration(); + org.apache.avro.generic.GenericRecord avroGenericRecord = AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData()); - if (avroGenericRecord.hasField(payloadRttTrackingField)) { - long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField); + if (avroGenericRecord.hasField(payloadRttTrackingField)) { + extractedSendTime = (Long) avroGenericRecord.get(payloadRttTrackingField); + } + } + if (extractedSendTime != null) { long delta = System.currentTimeMillis() - extractedSendTime; payloadRttHistogram.update(delta); } @@ -237,21 +272,16 @@ public class PulsarConsumerOp implements PulsarOp { int messageSize = message.getData().length; bytesCounter.inc(messageSize); messageSizeHistogram.update(messageSize); + } - if (!useTransaction) { - consumer.acknowledge(message.getMessageId()); - } - else { - consumer.acknowledgeAsync(message.getMessageId(), transaction).get(); - - // little problem: here we are counting the "commit" time - // inside the overall time spent for the execution of the consume operation - // we should refactor this operation as for PulsarProducerOp, and use the passed callback - // to track with precision the time spent for the operation and for the commit - try (Timer.Context ctx = transactionCommitTimer.time()) { - transaction.commit().get(); - } + private org.apache.avro.Schema getSchemaFromConfiguration() { + String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition(); + // no need for synchronization, this is only a cache + // in case of the race we will parse the string twice, not a big + if (avroSchema == null) { + avroSchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr); } + return avroSchema; } } From bcfa4911efee809f43142aa3e6b537ad6ff125b2 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Thu, 20 Jan 2022 10:03:41 +0100 Subject: [PATCH 2/2] Logging: do not set ROOT level to ALL --- .../engine/core/logging/LoggerConfig.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/logging/LoggerConfig.java b/engine-core/src/main/java/io/nosqlbench/engine/core/logging/LoggerConfig.java index bb395d5e8..222c1b173 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/logging/LoggerConfig.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/logging/LoggerConfig.java @@ -66,7 +66,6 @@ public class LoggerConfig extends ConfigurationFactory { private final String logfilePattern = DEFAULT_LOGFILE_PATTERN; private NBLogLevel fileLevel = NBLogLevel.DEBUG; - public static final Level ROOT_LOG_LEVEL = Level.ALL; private Map logLevelOverrides = new LinkedHashMap<>(); private Path loggerDir = Path.of("logs"); private String sessionName; @@ -116,9 +115,20 @@ public class LoggerConfig extends ConfigurationFactory { Level internalLoggingStatusThreshold = Level.ERROR; Level builderThresholdLevel = Level.INFO; -// Level rootLoggingLevel = Level.INFO; - RootLoggerComponentBuilder rootBuilder = builder.newRootLogger(ROOT_LOG_LEVEL); + Level fileLevel = Level.valueOf(getEffectiveFileLevel().toString()); + Level consoleLevel = Level.valueOf(this.consoleLevel.toString()); + + // configure the ROOT logger the same way as the File level + // this is because the fileLevel is supposed to show more than the console + + // therefore, it is very important that the ROOT level is as much specific as possible + // because NB code and especially third party libraries may rely on logging guards (if logger.isDebugEnabled()) + // to reduce memory allocations and resource waste due to debug/trace logging + // if you set ROOT to ALL or to TRACE then you will trigger the execution of trace/debugging code + // that will affect performances and impact on the measurements made with NB + Level rootLoggingLevel = fileLevel; + RootLoggerComponentBuilder rootBuilder = builder.newRootLogger(rootLoggingLevel); builder.setConfigurationName(name); @@ -186,14 +196,14 @@ public class LoggerConfig extends ConfigurationFactory { rootBuilder.add( builder.newAppenderRef("SCENARIO_APPENDER") - .addAttribute("level", Level.valueOf(getEffectiveFileLevel().toString())) + .addAttribute("level", fileLevel) ); } rootBuilder.add( builder.newAppenderRef("console") .addAttribute("level", - Level.valueOf(consoleLevel.toString()) + consoleLevel ) );