improve concurrency patterns for activity execution

This commit is contained in:
Jonathan Shook
2022-12-20 20:04:17 -06:00
parent bf5a31b342
commit b9365bff72
14 changed files with 624 additions and 649 deletions

View File

@@ -17,7 +17,7 @@
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityimpl.SlotStateTracker;
import io.nosqlbench.engine.api.activityimpl.MotorState;
/**
* The core threading harness within an activity.
@@ -54,6 +54,7 @@ public interface Motor<T> extends Runnable, Stoppable {
* Get a description of the current slot run status.
* @return - a value from the {@link RunState} enum
*/
SlotStateTracker getSlotStateTracker();
MotorState getState();
void removeState();
}

View File

@@ -29,12 +29,11 @@ import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.output.Output;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.SlotStateTracker;
import io.nosqlbench.engine.api.activityimpl.MotorState;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static io.nosqlbench.engine.api.activityapi.core.RunState.*;
@@ -70,8 +69,8 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
private final Activity activity;
private Output output;
private final SlotStateTracker slotStateTracker;
private final AtomicReference<RunState> slotState;
private final MotorState motorState;
// private final AtomicReference<RunState> slotState;
private int stride = 1;
private OpTracker<D> opTracker;
@@ -86,14 +85,13 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
* @param input A LongSupplier which provides the cycle number inputs.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input) {
Activity activity,
long slotId,
Input input) {
this.activity = activity;
this.slotId = slotId;
setInput(input);
slotStateTracker = new SlotStateTracker(slotId);
slotState = slotStateTracker.getAtomicSlotState();
motorState = new MotorState(slotId, activity.getRunStateTally());
onActivityDefUpdate(activity.getActivityDef());
}
@@ -107,10 +105,10 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
* @param action An LongConsumer which is applied to the input for each cycle.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input,
Action action
Activity activity,
long slotId,
Input input,
Action action
) {
this(activity, slotId, input);
setAction(action);
@@ -126,11 +124,11 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
* @param output An optional opTracker.
*/
public CoreMotor(
Activity activity,
long slotId,
Input input,
Action action,
Output output
Activity activity,
long slotId,
Input input,
Action action,
Output output
) {
this(activity, slotId, input);
setAction(action);
@@ -178,12 +176,18 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
@Override
public SlotStateTracker getSlotStateTracker() {
return slotStateTracker;
public MotorState getState() {
return motorState;
}
@Override
public void removeState() {
motorState.removeState();
}
@Override
public void run() {
motorState.enterState(Starting);
try {
inputTimer = activity.getInstrumentation().getOrCreateInputTimer();
@@ -195,12 +199,10 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
cycleRateLimiter = activity.getCycleLimiter();
if (slotState.get() == Finished) {
if (motorState.get() == Finished) {
logger.warn("Input was already exhausted for slot " + slotId + ", remaining in finished state.");
}
slotStateTracker.enterState(Running);
long cyclenum;
action.init();
@@ -235,7 +237,8 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
strideconsumer = (StrideOutputConsumer<D>) async;
}
while (slotState.get() == Running) {
motorState.enterState(Running);
while (motorState.get() == Running) {
CycleSegment cycleSegment = null;
@@ -245,7 +248,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (cycleSegment == null) {
logger.trace("input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId);
slotStateTracker.enterState(Finished);
motorState.enterState(Finished);
continue;
}
@@ -256,27 +259,27 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
StrideTracker<D> strideTracker = new StrideTracker<>(
strideServiceTimer,
stridesResponseTimer,
strideDelay,
cycleSegment.peekNextCycle(),
stride,
output,
strideconsumer);
stridesResponseTimer,
strideDelay,
cycleSegment.peekNextCycle(),
stride,
output,
strideconsumer);
strideTracker.start();
long strideStart = System.nanoTime();
while (!cycleSegment.isExhausted() && slotState.get() == Running) {
while (!cycleSegment.isExhausted() && motorState.get() == Running) {
cyclenum = cycleSegment.nextCycle();
if (cyclenum < 0) {
if (cycleSegment.isExhausted()) {
logger.trace("input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId);
slotStateTracker.enterState(Finished);
motorState.enterState(Finished);
continue;
}
}
if (slotState.get() != Running) {
if (motorState.get() != Running) {
logger.trace("motor stopped in cycle " + cyclenum + ", stopping motor thread " + slotId);
continue;
}
@@ -287,7 +290,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
try {
TrackedOp<D> op = opTracker.newOp(cyclenum,strideTracker);
TrackedOp<D> op = opTracker.newOp(cyclenum, strideTracker);
op.setWaitTime(cycleDelay);
synchronized (opTracker) {
@@ -312,7 +315,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
if (slotState.get() == Finished) {
if (motorState.get() == Finished) {
boolean finished = opTracker.awaitCompletion(60000);
if (finished) {
logger.debug("slot " + this.slotId + " completed successfully");
@@ -321,12 +324,12 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
}
if (slotState.get() == Stopping) {
slotStateTracker.enterState(Stopped);
if (motorState.get() == Stopping) {
motorState.enterState(Stopped);
}
} else if (action instanceof SyncAction) {
} else if (action instanceof SyncAction sync) {
cycleServiceTimer = activity.getInstrumentation().getOrCreateCyclesServiceTimer();
strideServiceTimer = activity.getInstrumentation().getOrCreateStridesServiceTimer();
@@ -335,9 +338,8 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async.");
}
SyncAction sync = (SyncAction) action;
while (slotState.get() == Running) {
motorState.enterState(Running);
while (motorState.get() == Running) {
CycleSegment cycleSegment = null;
CycleResultSegmentBuffer segBuffer = new CycleResultSegmentBuffer(stride);
@@ -348,7 +350,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (cycleSegment == null) {
logger.trace("input exhausted (input " + input + ") via null segment, stopping motor thread " + slotId);
slotStateTracker.enterState(Finished);
motorState.enterState(Finished);
continue;
}
@@ -366,12 +368,12 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (cyclenum < 0) {
if (cycleSegment.isExhausted()) {
logger.trace("input exhausted (input " + input + ") via negative read, stopping motor thread " + slotId);
slotStateTracker.enterState(Finished);
motorState.enterState(Finished);
continue;
}
}
if (slotState.get() != Running) {
if (motorState.get() != Running) {
logger.trace("motor stopped after input (input " + cyclenum + "), stopping motor thread " + slotId);
continue;
}
@@ -391,6 +393,9 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
result = sync.runCycle(cyclenum);
long phaseEnd = System.nanoTime();
} catch (Exception e) {
motorState.enterState(Errored);
throw e;
} finally {
long cycleEnd = System.nanoTime();
cycleServiceTimer.update((cycleEnd - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS);
@@ -414,25 +419,29 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
}
}
if (slotState.get() == Stopping) {
slotStateTracker.enterState(Stopped);
}
} else {
throw new RuntimeException("Valid Action implementations must implement either the SyncAction or the AsyncAction sub-interface");
}
if (motorState.get() == Stopping) {
motorState.enterState(Stopped);
logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
} else if (motorState.get() == Finished) {
logger.trace(() -> Thread.currentThread().getName() + " shutting down as " + motorState.get());
} else {
logger.warn("Unexpected motor state for CoreMotor shutdown: " + motorState.get());
}
} catch (Throwable t) {
logger.error("Error in core motor loop:" + t, t);
motorState.enterState(Errored);
throw t;
}
}
@Override
public String toString() {
return "slot:" + this.slotId + "; state:" + slotState.get();
return "slot:" + this.slotId + "; state:" + motorState.get();
}
@Override
@@ -452,17 +461,17 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
@Override
public synchronized void requestStop() {
if (slotState.get() == Running) {
if (motorState.get() == Running) {
if (input instanceof Stoppable) {
((Stoppable) input).requestStop();
}
if (action instanceof Stoppable) {
((Stoppable) action).requestStop();
}
slotStateTracker.enterState(RunState.Stopping);
motorState.enterState(RunState.Stopping);
} else {
if (slotState.get() != Stopped && slotState.get() != Stopping) {
logger.warn("attempted to stop motor " + this.getSlotId() + ": from non Running state:" + slotState.get());
if (motorState.get() != Stopped && motorState.get() != Stopping) {
logger.warn("attempted to stop motor " + this.getSlotId() + ": from non Running state:" + motorState.get());
}
}
}