diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java index ad52de2fd..049daec64 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java @@ -23,10 +23,7 @@ import io.nosqlbench.engine.api.activityimpl.input.ProgressCapable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -357,21 +354,30 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen * @param m motor instance * @param waitTime milliseconds to wait, total * @param pollTime polling interval between state checks - * @param runState any desired SlotState + * @param desiredRunStates any desired SlotState * @return true, if the desired SlotState was detected */ - private boolean awaitMotorState(Motor m, int waitTime, int pollTime, RunState... runState) { + private boolean awaitMotorState(Motor m, int waitTime, int pollTime, RunState... desiredRunStates) { + Set desiredStates = new HashSet<>(Arrays.asList(desiredRunStates)); + long startedAt = System.currentTimeMillis(); while (System.currentTimeMillis() < (startedAt + waitTime)) { - for (RunState state : runState) { - if (m.getSlotStateTracker().getSlotState() == state) { - logger.trace(activityDef.getAlias() + "/Motor[" + m.getSlotId() + "] is now in state " + m.getSlotStateTracker().getSlotState()); - return true; - } + Map actualStates = new HashMap<>(); + for (RunState state : desiredRunStates) { + actualStates.compute(state, (k, v) -> (v == null ? 0 : v) + 1); } - try { - Thread.sleep(pollTime); - } catch (InterruptedException ignored) { + for (RunState desiredRunState : desiredRunStates) { + actualStates.remove(desiredRunState); + } + logger.trace("state of remaining slots:" + actualStates.toString()); + if (actualStates.size()==0) { + return true; + } else { + System.out.println("motor states:" + actualStates.toString()); + try { + Thread.sleep(pollTime); + } catch (InterruptedException ignored) { + } } } logger.trace(activityDef.getAlias() + "/Motor[" + m.getSlotId() + "] is now in state " + m.getSlotStateTracker().getSlotState());