standard action improvements

This commit is contained in:
Jonathan Shook 2021-08-10 10:33:07 -05:00
parent e7668610c3
commit 069e96fceb

View File

@ -1,11 +1,14 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSource;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.*;
import java.util.concurrent.TimeUnit;
@ -19,51 +22,80 @@ import java.util.concurrent.TimeUnit;
* @param <A> The type of activity
* @param <R> The type of operation
*/
public class StandardAction<A extends StandardActivity, R extends Runnable> implements SyncAction, ActivityDefObserver {
public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> implements SyncAction, ActivityDefObserver {
private final A activity;
private final OpSource<R> opsource;
private final int slot;
private final Timer executeTimer;
private final Histogram triesHistogram;
private final Timer resultSuccessTimer;
private final Timer resultTimer;
private final Timer bindTimer;
private final NBErrorHandler errorHandler;
public StandardAction(A activity, int slot) {
this.activity = activity;
this.opsource = activity.getOpSource();
this.slot = slot;
bindTimer = activity.getInstrumentation().getOrCreateBindTimer();
executeTimer = activity.getInstrumentation().getOrCreateExecuteTimer();
triesHistogram = activity.getInstrumentation().getOrCreateTriesHistogram();
resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
errorHandler = activity.getErrorHandler();
}
@Override
public int runCycle(long cycle) {
R op = null;
try (Timer.Context ct = activity.getInstrumentation().getOrCreateInputTimer().time()) {
Op op = null;
try (Timer.Context ct = bindTimer.time()) {
op = opsource.apply(cycle);
}
int tries = 0;
int code= 0;
int code = 0;
Object result = null;
while (tries++ <= activity.getMaxTries()) {
Throwable error = null;
long startedAt = System.nanoTime();
try (Timer.Context ct = activity.getInstrumentation().getOrCreateExecuteTimer().time()) {
op.run();
break;
try (Timer.Context ct = executeTimer.time()) {
if (op instanceof CycleOp<?>) {
result = ((CycleOp) op).apply(cycle);
} else if (op instanceof ChainingOp) {
result = ((ChainingOp) op).apply(result);
} else {
throw new RuntimeException("The op implementation did not implement any active logic. Implement " +
"either InitialCycleFunction or ChainedCycleFunction");
}
if (op instanceof OpGenerator) {
op = ((OpGenerator) op).getNextOp();
} else {
break;
}
} catch (Exception e) {
error = e;
} finally {
long nanos = System.nanoTime() - startedAt;
activity.getInstrumentation().getOrCreateResultTimer().update(nanos, TimeUnit.NANOSECONDS);
resultTimer.update(nanos, TimeUnit.NANOSECONDS);
if (error == null) {
activity.getInstrumentation().getOrCreateResultSuccessTimer().update(nanos, TimeUnit.NANOSECONDS);
resultSuccessTimer.update(nanos, TimeUnit.NANOSECONDS);
} else {
ErrorDetail detail = activity.getErrorHandler().handleError(error, cycle, nanos);
code=detail.resultCode;
ErrorDetail detail = errorHandler.handleError(error, cycle, nanos);
code = detail.resultCode;
if (!detail.isRetryable()) {
break;
}
}
}
}
activity.getInstrumentation().getOrCreateTriesHistogram().update(tries);
triesHistogram.update(tries);
return code;
}