Add ErrorHandler

This commit is contained in:
Enrico Olivelli 2021-03-17 17:49:32 +01:00
parent 8aff79cba4
commit abbea4df71
2 changed files with 20 additions and 4 deletions

View File

@ -3,6 +3,7 @@ package io.nosqlbench.driver.pulsar;
import com.codahale.metrics.Timer;
import io.nosqlbench.driver.pulsar.ops.PulsarOp;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -12,6 +13,7 @@ public class PulsarAction implements SyncAction {
private final static Logger logger = LogManager.getLogger(PulsarAction.class);
private static final int MAX_TRIALS = 100;
private final int slot;
private final PulsarActivity activity;
@ -26,6 +28,7 @@ public class PulsarAction implements SyncAction {
@Override
public int runCycle(long cycle) {
long start = System.nanoTime();
PulsarOp pulsarOp;
try (Timer.Context ctx = activity.getBindTimer().time()) {
@ -33,17 +36,26 @@ public class PulsarAction implements SyncAction {
pulsarOp = readyPulsarOp.apply(cycle);
} catch (Exception bindException) {
// if diagnostic mode ...
activity.getErrorhandler().handleError(bindException, cycle, 0);
throw new RuntimeException(
"while binding request in cycle " + cycle + ": " + bindException.getMessage(), bindException
);
}
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
for (int i = 0; i < MAX_TRIALS; i++) {
try (Timer.Context ctx = activity.getExecuteTimer().time()) {
pulsarOp.run();
break;
} catch (RuntimeException err) {
ErrorDetail errorDetail = activity
.getErrorhandler()
.handleError(err, cycle, System.nanoTime() - start);
if (!errorDetail.isRetryable()) {
break;
}
}
}
// TODO: add retries and use standard error handler
return 0;
}
}

View File

@ -59,6 +59,10 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
);
}
public NBErrorHandler getErrorhandler() {
return errorhandler;
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);