diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index e7da412fa..f5f64609a 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.engine.api.activityapi.core.SyncAction; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -14,10 +15,12 @@ public class PulsarAction implements SyncAction { private final int slot; private final PulsarActivity activity; + int maxTries = 1; public PulsarAction(PulsarActivity activity, int slot) { this.activity = activity; this.slot = slot; + this.maxTries = activity.getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10); } @Override @@ -26,6 +29,7 @@ public class PulsarAction implements SyncAction { @Override public int runCycle(long cycle) { + long start = System.nanoTime(); PulsarOp pulsarOp; try (Timer.Context ctx = activity.getBindTimer().time()) { @@ -33,17 +37,26 @@ public class PulsarAction implements SyncAction { pulsarOp = readyPulsarOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... + activity.getErrorhandler().handleError(bindException, cycle, 0); throw new RuntimeException( "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException ); } - try (Timer.Context ctx = activity.getExecuteTimer().time()) { - pulsarOp.run(); + for (int i = 0; i < maxTries; i++) { + try (Timer.Context ctx = activity.getExecuteTimer().time()) { + pulsarOp.run(); + break; + } catch (RuntimeException err) { + ErrorDetail errorDetail = activity + .getErrorhandler() + .handleError(err, cycle, System.nanoTime() - start); + if (!errorDetail.isRetryable()) { + break; + } + } } - // TODO: add retries and use standard error handler - return 0; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index fda1f16cd..c0ff8fe7e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -59,6 +59,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve ); } + public NBErrorHandler getErrorhandler() { + return errorhandler; + } + @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef); diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java index ee58f6848..8dea929bc 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarSpace.java @@ -11,6 +11,7 @@ import org.apache.pulsar.client.impl.DefaultBatcherBuilder; import org.apache.pulsar.client.impl.ProducerImpl; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -323,7 +324,8 @@ public class PulsarSpace { PulsarClient pulsarClient = getPulsarClient(); // Get other possible producer settings that are set at global level - Map consumerConf = pulsarNBClientConf.getConsumerConfMap(); + Map consumerConf = new HashMap<>(pulsarNBClientConf.getConsumerConfMap()); + consumerConf.remove("timeout"); // Explicit topic names will take precedence over topics pattern if (!topicNames.isEmpty()) {