Merge branch 'main' of github.com:nosqlbench/nosqlbench
commit d98e1aaaee
@@ -7,6 +7,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.commons.lang3.StringUtils;
 
 import com.codahale.metrics.Counter;
@@ -18,10 +20,8 @@ import io.nosqlbench.driver.pulsar.util.AvroUtil;
 import io.nosqlbench.driver.pulsar.util.PulsarActivityUtil;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.*;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.schema.SchemaType;
 
@@ -52,6 +52,8 @@ public class PulsarConsumerOp implements PulsarOp {
     private final Histogram payloadRttHistogram;
     private final String payloadRttTrackingField;
 
+    private org.apache.avro.Schema avroSchema;
+
     public PulsarConsumerOp(
         PulsarActivity pulsarActivity,
         boolean asyncPulsarOp,
@@ -152,7 +154,7 @@ public class PulsarConsumerOp implements PulsarOp {
             msgRecvFuture.thenAccept(message -> {
                 try {
                     handleMessage(transaction, message);
-                } catch (PulsarClientException e) {
+                } catch (PulsarClientException | TimeoutException e) {
                     pulsarActivity.asyncOperationFailed(e);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -171,14 +173,30 @@ public class PulsarConsumerOp implements PulsarOp {
     }
 
     private void handleMessage(Transaction transaction, Message<?> message)
-        throws PulsarClientException, InterruptedException, ExecutionException {
+        throws PulsarClientException, InterruptedException, ExecutionException, TimeoutException {
+
+        // acknowledge the message as soon as possible
+        if (!useTransaction) {
+            consumer.acknowledgeAsync(message.getMessageId())
+                .get(timeoutSeconds, TimeUnit.SECONDS);
+        } else {
+            consumer.acknowledgeAsync(message.getMessageId(), transaction)
+                .get(timeoutSeconds, TimeUnit.SECONDS);
+
+            // little problem: here we are counting the "commit" time
+            // inside the overall time spent for the execution of the consume operation
+            // we should refactor this operation as for PulsarProducerOp, and use the passed callback
+            // to track with precision the time spent for the operation and for the commit
+            try (Timer.Context ctx = transactionCommitTimer.time()) {
+                transaction.commit().get();
+            }
+        }
+
         if (logger.isDebugEnabled()) {
            SchemaType schemaType = pulsarSchema.getSchemaInfo().getType();
 
            if (PulsarActivityUtil.isAvroSchemaTypeStr(schemaType.name())) {
-                String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
-                org.apache.avro.Schema avroSchema =
-                    AvroUtil.GetSchema_ApacheAvro(avroDefStr);
+                org.apache.avro.Schema avroSchema = getSchemaFromConfiguration();
                org.apache.avro.generic.GenericRecord avroGenericRecord =
                    AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
 
@@ -198,13 +216,30 @@ public class PulsarConsumerOp implements PulsarOp {
         }
 
         if (!payloadRttTrackingField.isEmpty()) {
-            String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
-            org.apache.avro.Schema avroSchema =
-                AvroUtil.GetSchema_ApacheAvro(avroDefStr);
-            org.apache.avro.generic.GenericRecord avroGenericRecord =
+            Object decodedPayload = message.getValue();
+            Long extractedSendTime = null;
+            // if Pulsar is able to decode this it is better to let it do the work
+            // because Pulsar caches the Schema, handles Schema evolution
+            // as much efficiently as possible
+            if (decodedPayload instanceof GenericRecord) {
+                GenericRecord pulsarGenericRecord = (GenericRecord) decodedPayload;
+                Object field = pulsarGenericRecord.getField(payloadRttTrackingField);
+                if (field != null) {
+                    if (field instanceof Number) {
+                        extractedSendTime = ((Number) field).longValue();
+                    } else {
+                        extractedSendTime = Long.valueOf(field.toString());
+                    }
+                }
+            } else {
+                org.apache.avro.Schema avroSchema = getSchemaFromConfiguration();
+                org.apache.avro.generic.GenericRecord avroGenericRecord =
                    AvroUtil.GetGenericRecord_ApacheAvro(avroSchema, message.getData());
                if (avroGenericRecord.hasField(payloadRttTrackingField)) {
-                long extractedSendTime = (Long)avroGenericRecord.get(payloadRttTrackingField);
+                    extractedSendTime = (Long) avroGenericRecord.get(payloadRttTrackingField);
+                }
+            }
+            if (extractedSendTime != null) {
                long delta = System.currentTimeMillis() - extractedSendTime;
                payloadRttHistogram.update(delta);
            }
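A note on the payload RTT hunk above: the new code first asks Pulsar for its already-decoded view of the payload (message.getValue(), an org.apache.pulsar.client.api.schema.GenericRecord) and only falls back to decoding the raw Avro bytes itself. A minimal, self-contained sketch of that field-extraction step; the class and method names (RttFieldSketch, extractEpochMillis) are illustrative and not part of the commit:

    import org.apache.pulsar.client.api.schema.GenericRecord;

    public final class RttFieldSketch {

        // Returns the tracking field as epoch millis, or null when the record does not carry it.
        // Mirrors the Number / toString() fallback used in the hunk above; a non-numeric string
        // would still throw NumberFormatException, as it would in the committed code.
        static Long extractEpochMillis(GenericRecord record, String fieldName) {
            Object field = record.getField(fieldName);
            if (field == null) {
                return null;
            }
            if (field instanceof Number) {
                return ((Number) field).longValue();
            }
            return Long.valueOf(field.toString());
        }
    }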
@@ -237,21 +272,16 @@ public class PulsarConsumerOp implements PulsarOp {
         int messageSize = message.getData().length;
         bytesCounter.inc(messageSize);
         messageSizeHistogram.update(messageSize);
+    }
 
-        if (!useTransaction) {
-            consumer.acknowledge(message.getMessageId());
-        }
-        else {
-            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
-
-            // little problem: here we are counting the "commit" time
-            // inside the overall time spent for the execution of the consume operation
-            // we should refactor this operation as for PulsarProducerOp, and use the passed callback
-            // to track with precision the time spent for the operation and for the commit
-            try (Timer.Context ctx = transactionCommitTimer.time()) {
-                transaction.commit().get();
-            }
+    private org.apache.avro.Schema getSchemaFromConfiguration() {
+        String avroDefStr = pulsarSchema.getSchemaInfo().getSchemaDefinition();
+        // no need for synchronization, this is only a cache
+        // in case of the race we will parse the string twice, not a big
+        if (avroSchema == null) {
+            avroSchema = AvroUtil.GetSchema_ApacheAvro(avroDefStr);
         }
+        return avroSchema;
     }
 
 }
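The getSchemaFromConfiguration() helper introduced above is a deliberately unsynchronized, benign-race cache: any threads that race all compute the same value, so the worst case is parsing the schema definition twice. A generic sketch of that idiom under the same assumption (the cached value is effectively immutable and cheap to recompute); the names LazyCache and parser are hypothetical, not from the commit:

    import java.util.function.Function;

    public final class LazyCache<T> {

        private final Function<String, T> parser;
        // written without synchronization on purpose: a racy or stale read only
        // causes the value to be parsed again, never an incorrect result
        private T cached;

        public LazyCache(Function<String, T> parser) {
            this.parser = parser;
        }

        public T get(String definition) {
            T value = cached;
            if (value == null) {
                value = parser.apply(definition);
                cached = value;
            }
            return value;
        }
    }

Used against the diff above, this would look something like new LazyCache<>(AvroUtil::GetSchema_ApacheAvro).get(avroDefStr).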
@@ -66,7 +66,6 @@ public class LoggerConfig extends ConfigurationFactory {
     private final String logfilePattern = DEFAULT_LOGFILE_PATTERN;
     private NBLogLevel fileLevel = NBLogLevel.DEBUG;
 
-    public static final Level ROOT_LOG_LEVEL = Level.ALL;
     private Map<String, String> logLevelOverrides = new LinkedHashMap<>();
     private Path loggerDir = Path.of("logs");
     private String sessionName;
@@ -116,9 +115,20 @@ public class LoggerConfig extends ConfigurationFactory {
 
         Level internalLoggingStatusThreshold = Level.ERROR;
         Level builderThresholdLevel = Level.INFO;
-        // Level rootLoggingLevel = Level.INFO;
 
-        RootLoggerComponentBuilder rootBuilder = builder.newRootLogger(ROOT_LOG_LEVEL);
+        Level fileLevel = Level.valueOf(getEffectiveFileLevel().toString());
+        Level consoleLevel = Level.valueOf(this.consoleLevel.toString());
+
+        // configure the ROOT logger the same way as the File level
+        // this is because the fileLevel is supposed to show more than the console
+
+        // therefore, it is very important that the ROOT level is as much specific as possible
+        // because NB code and especially third party libraries may rely on logging guards (if logger.isDebugEnabled())
+        // to reduce memory allocations and resource waste due to debug/trace logging
+        // if you set ROOT to ALL or to TRACE then you will trigger the execution of trace/debugging code
+        // that will affect performances and impact on the measurements made with NB
+        Level rootLoggingLevel = fileLevel;
+        RootLoggerComponentBuilder rootBuilder = builder.newRootLogger(rootLoggingLevel);
 
         builder.setConfigurationName(name);
 
@@ -186,14 +196,14 @@ public class LoggerConfig extends ConfigurationFactory {
 
             rootBuilder.add(
                 builder.newAppenderRef("SCENARIO_APPENDER")
-                    .addAttribute("level", Level.valueOf(getEffectiveFileLevel().toString()))
+                    .addAttribute("level", fileLevel)
             );
         }
 
         rootBuilder.add(
             builder.newAppenderRef("console")
                 .addAttribute("level",
-                    Level.valueOf(consoleLevel.toString())
+                    consoleLevel
                )
         );
 
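For context on the LoggerConfig hunks above: the root logger is now capped at the effective file level instead of Level.ALL, and each appender reference carries its own (usually stricter) level, so logging guards such as logger.isDebugEnabled() in NB and third-party code stay false unless some appender could actually emit the event. A minimal log4j2 ConfigurationBuilder sketch of that shape; the class name and the concrete DEBUG/INFO levels are illustrative stand-ins for getEffectiveFileLevel() and the configured console level, not values taken from the commit:

    import org.apache.logging.log4j.Level;
    import org.apache.logging.log4j.core.config.Configurator;
    import org.apache.logging.log4j.core.config.builder.api.AppenderComponentBuilder;
    import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilder;
    import org.apache.logging.log4j.core.config.builder.api.ConfigurationBuilderFactory;
    import org.apache.logging.log4j.core.config.builder.api.RootLoggerComponentBuilder;
    import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;

    public class RootLevelSketch {
        public static void main(String[] args) {
            ConfigurationBuilder<BuiltConfiguration> builder =
                ConfigurationBuilderFactory.newConfigurationBuilder();

            Level fileLevel = Level.DEBUG;    // stand-in for getEffectiveFileLevel()
            Level consoleLevel = Level.INFO;  // stand-in for the configured console level

            AppenderComponentBuilder console = builder.newAppender("console", "Console")
                .add(builder.newLayout("PatternLayout")
                    .addAttribute("pattern", "%d %p %c{1.} %m%n"));
            builder.add(console);

            // root capped at the file level, appender ref filtered at its own level
            RootLoggerComponentBuilder root = builder.newRootLogger(fileLevel);
            root.add(builder.newAppenderRef("console").addAttribute("level", consoleLevel));
            builder.add(root);

            Configurator.initialize(builder.build());
        }
    }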