From abbea4df7175b62e6155cecb1412b34380e8537a Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 17 Mar 2021 17:49:32 +0100 Subject: [PATCH] Add ErrorHandler --- .../driver/pulsar/PulsarAction.java | 20 +++++++++++++++---- .../driver/pulsar/PulsarActivity.java | 4 ++++ 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java index e7da412fa..853a70ad7 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarAction.java @@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar; import com.codahale.metrics.Timer; import io.nosqlbench.driver.pulsar.ops.PulsarOp; import io.nosqlbench.engine.api.activityapi.core.SyncAction; +import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -12,6 +13,7 @@ public class PulsarAction implements SyncAction { private final static Logger logger = LogManager.getLogger(PulsarAction.class); + private static final int MAX_TRIALS = 100; private final int slot; private final PulsarActivity activity; @@ -26,6 +28,7 @@ public class PulsarAction implements SyncAction { @Override public int runCycle(long cycle) { + long start = System.nanoTime(); PulsarOp pulsarOp; try (Timer.Context ctx = activity.getBindTimer().time()) { @@ -33,17 +36,26 @@ public class PulsarAction implements SyncAction { pulsarOp = readyPulsarOp.apply(cycle); } catch (Exception bindException) { // if diagnostic mode ... + activity.getErrorhandler().handleError(bindException, cycle, 0); throw new RuntimeException( "while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException ); } - try (Timer.Context ctx = activity.getExecuteTimer().time()) { - pulsarOp.run(); + for (int i = 0; i < MAX_TRIALS; i++) { + try (Timer.Context ctx = activity.getExecuteTimer().time()) { + pulsarOp.run(); + break; + } catch (RuntimeException err) { + ErrorDetail errorDetail = activity + .getErrorhandler() + .handleError(err, cycle, System.nanoTime() - start); + if (!errorDetail.isRetryable()) { + break; + } + } } - // TODO: add retries and use standard error handler - return 0; } } diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index fda1f16cd..c0ff8fe7e 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -59,6 +59,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve ); } + public NBErrorHandler getErrorhandler() { + return errorhandler; + } + @Override public synchronized void onActivityDefUpdate(ActivityDef activityDef) { super.onActivityDefUpdate(activityDef);