Return metrics in case of Async API

This commit is contained in:
Enrico Olivelli 2021-03-30 10:09:15 +02:00
parent c70385e6f8
commit f10bd283f3
11 changed files with 32 additions and 14 deletions

View File

@ -44,8 +44,11 @@ public class PulsarAction implements SyncAction {
}
for (int i = 0; i < maxTries; i++) {
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
Timer.Context ctx = activity.getExecuteTimer().time();
try {
// it is up to the pulsarOp to call Context#close when the activity is executed
// this allows us to track time for async operations
pulsarOp.run(ctx::close);
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity

View File

@ -19,7 +19,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
public class PulsarAdminOp implements PulsarOp {
public class PulsarAdminOp extends SimplePulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarAdminOp.class);

View File

@ -11,7 +11,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerEndOp implements PulsarOp {
public class PulsarBatchProducerEndOp extends SimplePulsarOp {
@Override
public void run() {
List<CompletableFuture<MessageId>> container = PulsarBatchProducerStartOp.threadLocalBatchMsgContainer.get();

View File

@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerOp implements PulsarOp {
public class PulsarBatchProducerOp extends SimplePulsarOp {
private final Schema<?> pulsarSchema;
private final String msgKey;

View File

@ -7,7 +7,7 @@ import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class PulsarBatchProducerStartOp implements PulsarOp {
public class PulsarBatchProducerStartOp extends SimplePulsarOp {
// TODO: ensure sane container lifecycle management
public final static ThreadLocal<List<CompletableFuture<MessageId>>> threadLocalBatchMsgContainer = new ThreadLocal<>();

View File

@ -12,7 +12,7 @@ import org.apache.pulsar.common.schema.SchemaType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class PulsarConsumerOp implements PulsarOp {
public class PulsarConsumerOp extends SimplePulsarOp {
private final static Logger logger = LogManager.getLogger(PulsarConsumerOp.class);

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public interface PulsarOp extends Runnable {
public interface PulsarOp {
void run(Runnable callback);
}

View File

@ -43,7 +43,7 @@ public class PulsarProducerOp implements PulsarOp {
}
@Override
public void run() {
public void run(Runnable timeTracker) {
if ((msgPayload == null) || msgPayload.isEmpty()) {
throw new RuntimeException("Message payload (\"msg-value\") can't be empty!");
}
@ -80,11 +80,14 @@ public class PulsarProducerOp implements PulsarOp {
logger.trace("failed sending message");
throw new RuntimeException(pce);
}
timeTracker.run();
} else {
try {
// we rely on blockIfQueueIsFull in order to throttle the request in this case
CompletableFuture<MessageId> future = typedMessageBuilder.sendAsync();
future.exceptionally(ex -> {
future.whenComplete((messageId, error) -> {
timeTracker.run();
}).exceptionally(ex -> {
logger.error("Producing message failed: key - " + msgKey + "; payload - " + msgPayload);
return null;
});

View File

@ -8,7 +8,7 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
public class PulsarReaderOp implements PulsarOp {
public class PulsarReaderOp extends SimplePulsarOp {
private final Reader<?> reader;
private final Schema<?> pulsarSchema;
private final boolean asyncPulsarOp;

View File

@ -50,9 +50,6 @@ public class ReadyPulsarOp implements OpDispenser<PulsarOp> {
}
this.opFunc = resolve();
ScopedSupplier scope = ScopedSupplier.valueOf(cmdTpl.getStaticOr("op_scope", "singleton"));
Supplier<LongFunction<PulsarOp>> opSupplier = scope.supplier(this::resolve);
}
@Override

View File

@ -0,0 +1,14 @@
package io.nosqlbench.driver.pulsar.ops;
/**
* Base type of all Pulsar Operations including Producers and Consumers.
*/
public abstract class SimplePulsarOp implements PulsarOp {
public void run(Runnable timeTracker) {
this.run();
timeTracker.run();
}
public abstract void run();
}